
Performance Optimization

Caching strategies, query optimization, resource management, and performance tuning techniques for SysManage systems.

Performance Optimization Overview

SysManage implements comprehensive performance optimization strategies across all system layers to ensure fast response times, efficient resource utilization, and scalable operations.

Core Principles

  • Measure First: Base every optimization on metrics and profiling (see the timing sketch after this list)
  • Layered Optimization: Optimization at database, application, and infrastructure layers
  • Proactive Monitoring: Continuous performance monitoring and alerting
  • Graceful Degradation: Maintain functionality under high load
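
A minimal illustration of the measure-first principle, using only the standard library. The timed helper below is a hypothetical utility, not part of SysManage; it simply establishes a wall-clock baseline before any code path is changed.

import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)

@contextmanager
def timed(label: str):
    """Log wall-clock time for the wrapped block"""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.info("%s took %.2f ms", label, elapsed_ms)

# Example: establish a baseline before optimizing
# with timed("agent list query"):
#     rows = load_agent_list()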

Caching Architecture

Multi-Layer Caching Strategy

┌─────────────────────────────────────────────────────────────────┐
│                        Client Layer                            │
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Browser     │  │ Mobile App   │  │ API Clients             │ │
│  │ Cache       │  │ Cache        │  │ Cache                   │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
└─────────────────────────┬───────────────────────────────────────┘
                          │
┌─────────────────────────┼───────────────────────────────────────┐
│                    CDN / Edge Cache                             │
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ CloudFlare  │  │ AWS          │  │ Static Assets           │ │
│  │ Cache       │  │ CloudFront   │  │ Cache                   │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
└─────────────────────────┼───────────────────────────────────────┘
                          │
┌─────────────────────────┼───────────────────────────────────────┐
│                 Application Layer Cache                        │
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ HTTP        │  │ API Response │  │ Session                 │ │
│  │ Cache       │  │ Cache        │  │ Cache                   │ │
│  │ (Nginx)     │  │ (Redis)      │  │ (Redis)                 │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
└─────────────────────────┼───────────────────────────────────────┘
                          │
┌─────────────────────────┼───────────────────────────────────────┐
│                     Data Layer Cache                           │
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Query       │  │ Object       │  │ Computed                │ │
│  │ Cache       │  │ Cache        │  │ Results Cache           │ │
│  │ (Redis)     │  │ (Redis)      │  │ (Redis)                 │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
└─────────────────────────┼───────────────────────────────────────┘
                          │
┌─────────────────────────┼───────────────────────────────────────┐
│                  Database Layer Cache                          │
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Buffer      │  │ Query Plan   │  │ Connection              │ │
│  │ Pool        │  │ Cache        │  │ Pool                    │ │
│  │ (PostgreSQL)│  │ (PostgreSQL) │  │ (PgBouncer)             │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
                        

Redis Caching Implementation

Intelligent Cache Manager

import asyncio
import hashlib
import json
import logging
from datetime import datetime
from functools import wraps
from typing import Any, Callable, Dict, List, Optional

import redis.asyncio as redis  # async client (redis-py >= 4.2)

logger = logging.getLogger(__name__)

class CacheManager:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.default_ttl = 3600  # 1 hour
        self.key_prefix = "sysmanage"  # joined with ":" in _generate_key

    def _generate_key(self, namespace: str, identifier: str,
                     params: Optional[Dict] = None) -> str:
        """Generate cache key with optional parameters"""
        key_parts = [self.key_prefix, namespace, identifier]

        if params:
            # Sort parameters for consistent key generation
            param_string = json.dumps(params, sort_keys=True)
            param_hash = hashlib.md5(param_string.encode()).hexdigest()[:8]
            key_parts.append(param_hash)

        return ":".join(key_parts)

    async def get(self, namespace: str, identifier: str,
                  params: Optional[Dict] = None) -> Optional[Any]:
        """Get cached value"""
        key = self._generate_key(namespace, identifier, params)

        try:
            value = await self.redis.get(key)
            if value:
                return json.loads(value)
        except Exception as e:
            logger.warning(f"Cache get failed for {key}: {e}")

        return None

    async def set(self, namespace: str, identifier: str, value: Any,
                  ttl: Optional[int] = None, params: Optional[Dict] = None):
        """Set cached value with TTL"""
        key = self._generate_key(namespace, identifier, params)
        ttl = ttl or self.default_ttl

        try:
            serialized_value = json.dumps(value, default=str)
            await self.redis.setex(key, ttl, serialized_value)
        except Exception as e:
            logger.warning(f"Cache set failed for {key}: {e}")

    async def delete(self, namespace: str, identifier: str,
                     params: Optional[Dict] = None):
        """Delete cached value"""
        key = self._generate_key(namespace, identifier, params)
        await self.redis.delete(key)

    async def invalidate_pattern(self, pattern: str):
        """Invalidate all keys matching pattern (SCAN avoids blocking Redis
        the way KEYS does)"""
        keys = [key async for key in
                self.redis.scan_iter(match=f"{self.key_prefix}:{pattern}")]
        if keys:
            await self.redis.delete(*keys)

    async def get_or_set(self, namespace: str, identifier: str,
                         fetch_func: Callable, ttl: Optional[int] = None,
                         params: Optional[Dict] = None) -> Any:
        """Get from cache or execute function and cache result"""

        # Try to get from cache first
        cached_value = await self.get(namespace, identifier, params)
        if cached_value is not None:
            return cached_value

        # Cache miss - execute function
        try:
            value = await fetch_func() if asyncio.iscoroutinefunction(fetch_func) else fetch_func()

            # Cache the result
            await self.set(namespace, identifier, value, ttl, params)
            return value
        except Exception as e:
            logger.error(f"Failed to fetch and cache {namespace}:{identifier}: {e}")
            raise


class SmartCacheDecorator:
    """Decorator for automatic caching of function results"""

    def __init__(self, cache_manager: CacheManager, namespace: str,
                 ttl: int = 3600, key_func: Optional[Callable] = None):
        self.cache_manager = cache_manager
        self.namespace = namespace
        self.ttl = ttl
        self.key_func = key_func or self._default_key_func

    def _default_key_func(self, func_name: str, *args, **kwargs) -> str:
        """Default key generation function"""
        key_parts = [func_name]

        # Add positional args
        for arg in args:
            key_parts.append(str(arg))

        # Add keyword args
        for k, v in sorted(kwargs.items()):
            key_parts.append(f"{k}={v}")

        return ":".join(key_parts)

    def __call__(self, func):
        @wraps(func)  # preserve the wrapped function's name and docstring
        async def wrapper(*args, **kwargs):
            # Generate cache key
            cache_key = self.key_func(func.__name__, *args, **kwargs)

            # Try cache first
            result = await self.cache_manager.get(self.namespace, cache_key)
            if result is not None:
                return result

            # Execute function
            result = await func(*args, **kwargs)

            # Cache result
            await self.cache_manager.set(self.namespace, cache_key, result, self.ttl)

            return result

        return wrapper


# Usage examples
redis_client = redis.from_url("redis://localhost:6379/0")
cache_manager = CacheManager(redis_client)

@SmartCacheDecorator(cache_manager, "agents", ttl=1800)
async def get_agent_list(user_id: int, filters: Optional[Dict] = None):
    """Get agent list with caching"""
    query = "SELECT * FROM agents WHERE user_id = $1"
    if filters:
        # Apply filters...
        pass

    return await db.fetch_all(query, user_id)

@SmartCacheDecorator(cache_manager, "metrics", ttl=300)
async def get_agent_metrics(agent_id: str, time_range: str):
    """Get agent metrics with 5-minute cache"""
    # Expensive computation...
    return await compute_metrics(agent_id, time_range)
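
Cached reads need matching invalidation on writes, or clients see stale data for up to a full TTL. A sketch of how a write path might pair with invalidate_pattern, assuming the decorator's default key function; update_agent and the db handle are illustrative, not part of SysManage:

async def update_agent(agent_id: str, user_id: int, hostname: str):
    """Write-through update: persist first, then drop stale cache entries"""
    await db.execute(
        "UPDATE agents SET hostname = $1 WHERE id = $2",
        hostname, agent_id,
    )

    # Keys look like sysmanage:agents:get_agent_list:<user_id>..., so a
    # wildcard drops every cached variant of this user's agent list
    await cache_manager.invalidate_pattern(f"agents:get_agent_list:{user_id}*")

    # Metric caches for the agent are keyed by agent_id
    await cache_manager.invalidate_pattern(f"metrics:get_agent_metrics:{agent_id}*")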

HTTP Caching Strategy

FastAPI Response Caching

import hashlib
import json
import time

from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse

class HTTPCacheMiddleware:
    def __init__(self, app, cache_manager: CacheManager):
        self.app = app
        self.cache_manager = cache_manager

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive)

        # Check if request is cacheable
        if not self._is_cacheable(request):
            await self.app(scope, receive, send)
            return

        # Generate cache key
        cache_key = self._generate_cache_key(request)

        # Try cache
        cached_response = await self.cache_manager.get("http_responses", cache_key)
        if cached_response:
            response = JSONResponse(
                content=cached_response["content"],
                status_code=cached_response["status_code"],
                headers=cached_response["headers"]
            )
            response.headers["X-Cache"] = "HIT"
            await response(scope, receive, send)
            return

        # Cache miss - execute request
        response_data = {}

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                response_data["status_code"] = message["status"]
                # ASGI headers are (bytes, bytes) pairs; decode them so the
                # captured response can be JSON-serialized into the cache
                response_data["headers"] = {
                    k.decode("latin-1"): v.decode("latin-1")
                    for k, v in message.get("headers", [])
                }
            elif message["type"] == "http.response.body":
                # The body may arrive in multiple messages; accumulate it
                response_data["body"] = (
                    response_data.get("body", b"") + message.get("body", b"")
                )

            await send(message)

        await self.app(scope, receive, send_wrapper)

        # Cache successful responses
        if response_data.get("status_code", 500) < 400:
            await self._cache_response(cache_key, response_data)

    async def _cache_response(self, cache_key: str, response_data: Dict,
                              ttl: int = 300):
        """Store a captured JSON response in the cache"""
        try:
            content = json.loads(response_data.get("body", b"") or b"null")
        except (ValueError, UnicodeDecodeError):
            return  # only JSON bodies are cached

        # Drop volatile headers so the replayed response stays consistent
        headers = {
            k: v for k, v in response_data.get("headers", {}).items()
            if k.lower() not in ("content-length", "date")
        }

        await self.cache_manager.set("http_responses", cache_key, {
            "content": content,
            "status_code": response_data["status_code"],
            "headers": headers,
        }, ttl)

    def _is_cacheable(self, request: Request) -> bool:
        """Determine if request is cacheable"""
        # Only cache GET requests
        if request.method != "GET":
            return False

        # Don't cache authenticated requests (unless explicitly allowed)
        if request.headers.get("authorization"):
            return False

        # Check for cache control headers
        cache_control = request.headers.get("cache-control", "")
        if "no-cache" in cache_control:
            return False

        return True

    def _generate_cache_key(self, request: Request) -> str:
        """Generate cache key for request"""
        key_parts = [
            request.method,
            str(request.url.path),
            str(request.url.query)
        ]

        # Include relevant headers
        relevant_headers = ["accept", "accept-encoding", "accept-language"]
        for header in relevant_headers:
            if header in request.headers:
                key_parts.append(f"{header}:{request.headers[header]}")

        return hashlib.md5(":".join(key_parts).encode()).hexdigest()


# Cache configuration for different endpoints
CACHE_RULES = {
    "/api/agents": {"ttl": 300, "vary": ["user-id"]},
    "/api/packages": {"ttl": 1800, "vary": ["agent-id"]},
    "/api/metrics": {"ttl": 60, "vary": ["agent-id", "time-range"]},
    "/api/health": {"ttl": 30, "public": True},
}

def add_cache_headers(response: Response, path: str):
    """Add appropriate cache headers"""
    if path in CACHE_RULES:
        rule = CACHE_RULES[path]

        if rule.get("public"):
            response.headers["Cache-Control"] = f"public, max-age={rule['ttl']}"
        else:
            response.headers["Cache-Control"] = f"private, max-age={rule['ttl']}"

        # Add an ETag for validation (a timestamp changes every second, so a
        # content hash makes a better validator; this keeps the example simple)
        response.headers["ETag"] = f'"{int(time.time())}"'

Database Performance Optimization

Query Optimization

Optimized Database Queries

class OptimizedQueries:
    """Collection of optimized database queries"""

    @staticmethod
    async def get_agent_summary(db: Database, user_id: int,
                               limit: int = 50, offset: int = 0):
        """Optimized agent summary with pagination"""

        # Use index on (user_id, last_seen) for efficient filtering and sorting
        query = """
        SELECT
            a.id,
            a.hostname,
            a.ip_address,
            a.os_type,
            a.last_seen,
            a.status,
            COUNT(p.id) as package_count,
            MAX(p.updated_at) as last_package_update
        FROM agents a
        LEFT JOIN packages p ON a.id = p.agent_id
        WHERE a.user_id = $1
        GROUP BY a.id, a.hostname, a.ip_address, a.os_type, a.last_seen, a.status
        ORDER BY a.last_seen DESC
        LIMIT $2 OFFSET $3
        """

        return await db.fetch_all(query, user_id, limit, offset)

    @staticmethod
    async def get_package_updates_bulk(db: Database, agent_ids: List[str]):
        """Bulk fetch package updates for multiple agents"""

        # Use ANY() for efficient IN clause with large lists
        query = """
        SELECT
            p.agent_id,
            p.name,
            p.current_version,
            p.available_version,
            p.update_available,
            p.last_checked
        FROM packages p
        WHERE p.agent_id = ANY($1)
        AND p.update_available = true
        ORDER BY p.agent_id, p.priority DESC, p.name
        """

        return await db.fetch_all(query, agent_ids)

    @staticmethod
    async def get_metrics_aggregated(db: Database, agent_id: str,
                                   metric_type: str, start_time: datetime,
                                   end_time: datetime, interval: str = 'hour'):
        """Get aggregated metrics with time bucketing"""

        # Use date_trunc for time bucketing; it takes unit names
        # ('minute', 'hour', 'day'), not interval strings like '1h'
        query = """
        SELECT
            date_trunc($5, collected_at) as time_bucket,
            AVG((metric_value->>'value')::numeric) as avg_value,
            MIN((metric_value->>'value')::numeric) as min_value,
            MAX((metric_value->>'value')::numeric) as max_value,
            COUNT(*) as sample_count
        FROM agent_metrics
        WHERE agent_id = $1
        AND metric_type = $2
        AND collected_at BETWEEN $3 AND $4
        GROUP BY time_bucket
        ORDER BY time_bucket
        """

        return await db.fetch_all(query, agent_id, metric_type,
                                start_time, end_time, interval)


class QueryProfiler:
    """Database query profiler for performance monitoring"""

    def __init__(self, db: Database):
        self.db = db
        self.slow_query_threshold = 1000  # milliseconds

    async def profile_query(self, query: str, params: tuple = ()):
        """Profile query execution"""

        start_time = time.perf_counter()

        # Enable I/O timing for this session (changing track_io_timing
        # requires superuser or an appropriately granted role)
        await self.db.execute("SET track_io_timing = on")

        try:
            # Execute query
            result = await self.db.fetch_all(query, *params)

            # Get execution plan (EXPLAIN ANALYZE re-executes the statement,
            # so profiling roughly doubles the query's cost)
            explain_query = f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}"
            plan = await self.db.fetch_one(explain_query, *params)

            execution_time = (time.perf_counter() - start_time) * 1000

            profile_data = {
                "query": query,
                "execution_time_ms": execution_time,
                "row_count": len(result),
                "execution_plan": plan[0] if plan else None
            }

            # Log slow queries
            if execution_time > self.slow_query_threshold:
                await self._log_slow_query(profile_data)

            return result, profile_data

        finally:
            await self.db.execute("SET track_io_timing = off")

    async def _log_slow_query(self, profile_data: Dict):
        """Log slow query for analysis"""

        logger.warning(f"Slow query detected: {profile_data['execution_time_ms']:.2f}ms")
        logger.warning(f"Query: {profile_data['query'][:200]}...")

        # Store in slow query log table
        await self.db.execute("""
            INSERT INTO slow_query_log
            (query_hash, query_text, execution_time_ms, execution_plan, created_at)
            VALUES ($1, $2, $3, $4, NOW())
        """,
            hashlib.md5(profile_data['query'].encode()).hexdigest(),
            profile_data['query'],
            profile_data['execution_time_ms'],
            json.dumps(profile_data['execution_plan'])
        )
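
A sketch of exercising the profiler during development; db is assumed to be the pool wrapper from the connection-pooling section below, and the query is illustrative. Because EXPLAIN ANALYZE re-runs the statement, this belongs in staging rather than on hot production paths:

profiler = QueryProfiler(db)

async def debug_agent_summary(user_id: int):
    rows, profile = await profiler.profile_query(
        "SELECT id, hostname, last_seen FROM agents WHERE user_id = $1",
        (user_id,),
    )
    print(f"{profile['row_count']} rows in {profile['execution_time_ms']:.1f} ms")
    return rows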

Index Optimization

Strategic Index Management

-- Performance-critical indexes for SysManage

-- Agents table indexes
CREATE INDEX CONCURRENTLY idx_agents_user_id_last_seen
ON agents(user_id, last_seen DESC);

CREATE INDEX CONCURRENTLY idx_agents_status_last_seen
ON agents(status, last_seen DESC)
WHERE status IN ('active', 'inactive');

-- Packages table indexes
CREATE INDEX CONCURRENTLY idx_packages_agent_id_update_available
ON packages(agent_id, update_available)
WHERE update_available = true;

CREATE INDEX CONCURRENTLY idx_packages_name_version
ON packages(name, current_version);

-- Agent metrics indexes (for time-series data)
CREATE INDEX CONCURRENTLY idx_agent_metrics_agent_time
ON agent_metrics(agent_id, collected_at DESC);

CREATE INDEX CONCURRENTLY idx_agent_metrics_type_time
ON agent_metrics(metric_type, collected_at DESC);

-- Partial index for recent metrics only
-- Note: index predicates must use IMMUTABLE expressions, so NOW() is not
-- allowed here; use a fixed cutoff and rebuild the index on a schedule
CREATE INDEX CONCURRENTLY idx_agent_metrics_recent
ON agent_metrics(agent_id, metric_type, collected_at)
WHERE collected_at > '2024-01-01';  -- placeholder cutoff, refresh periodically

-- BRIN index for time-series data (space-efficient)
CREATE INDEX CONCURRENTLY idx_agent_metrics_collected_at_brin
ON agent_metrics USING BRIN(collected_at);

-- GIN index for JSONB metric_value searching
CREATE INDEX CONCURRENTLY idx_agent_metrics_value_gin
ON agent_metrics USING GIN(metric_value);

-- Tasks table indexes
CREATE INDEX CONCURRENTLY idx_tasks_agent_id_status_created
ON tasks(agent_id, status, created_at DESC);

CREATE INDEX CONCURRENTLY idx_tasks_user_id_status
ON tasks(user_id, status)
WHERE status IN ('pending', 'running');

-- Composite index for complex queries
CREATE INDEX CONCURRENTLY idx_agents_composite
ON agents(user_id, status, os_type, last_seen DESC);
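
Indexes speed up reads but tax every write, so unused ones are worth pruning. A sketch that reads PostgreSQL's pg_stat_user_indexes statistics view to flag candidates; the size threshold is illustrative:

async def find_unused_indexes(db, min_size_mb: int = 10):
    """List indexes that have never been scanned and exceed a size threshold"""
    query = """
    SELECT
        schemaname,
        relname AS table_name,
        indexrelname AS index_name,
        pg_relation_size(indexrelid) / (1024 * 1024) AS size_mb
    FROM pg_stat_user_indexes
    WHERE idx_scan = 0
    AND pg_relation_size(indexrelid) > $1::bigint * 1024 * 1024
    ORDER BY pg_relation_size(indexrelid) DESC
    """
    return await db.fetch_all(query, min_size_mb)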

Connection Pooling

Optimized Database Connections

import asyncpg
from contextlib import asynccontextmanager
from typing import List, Optional

class DatabasePool:
    """Optimized database connection pool"""

    def __init__(self, database_url: str, **pool_kwargs):
        self.database_url = database_url
        self.pool_kwargs = {
            'min_size': 5,
            'max_size': 20,
            'max_queries': 50000,
            'max_inactive_connection_lifetime': 300,
            'command_timeout': 60,
            **pool_kwargs
        }
        self.pool: Optional[asyncpg.Pool] = None

    async def create_pool(self):
        """Create connection pool with optimization"""

        self.pool = await asyncpg.create_pool(
            self.database_url,
            **self.pool_kwargs,
            init=self._init_connection
        )

    async def _init_connection(self, connection):
        """Initialize new connection with optimizations"""

        # Set connection-level optimizations
        await connection.execute("SET default_transaction_isolation = 'read committed'")
        await connection.execute("SET statement_timeout = '30s'")
        await connection.execute("SET lock_timeout = '10s'")
        await connection.execute("SET idle_in_transaction_session_timeout = '5min'")

        # Enable JIT for complex queries
        await connection.execute("SET jit = on")
        await connection.execute("SET jit_above_cost = 100000")

    async def execute(self, query: str, *args):
        """Execute query with connection from pool"""
        async with self.pool.acquire() as connection:
            return await connection.execute(query, *args)

    async def fetch_all(self, query: str, *args):
        """Fetch all results with connection from pool"""
        async with self.pool.acquire() as connection:
            return await connection.fetch(query, *args)

    async def fetch_one(self, query: str, *args):
        """Fetch one result with connection from pool"""
        async with self.pool.acquire() as connection:
            return await connection.fetchrow(query, *args)

    @asynccontextmanager
    async def transaction(self):
        """Yield a pooled connection inside a transaction:
        async with pool.transaction() as conn: ..."""
        async with self.pool.acquire() as connection:
            async with connection.transaction():
                yield connection

    async def close(self):
        """Close connection pool"""
        if self.pool:
            await self.pool.close()


class ReadWriteConnectionManager:
    """Separate read/write connection pools for better performance"""

    def __init__(self, write_url: str, read_urls: List[str]):
        self.write_pool = DatabasePool(write_url, min_size=3, max_size=10)
        self.read_pools = [
            DatabasePool(url, min_size=5, max_size=15)
            for url in read_urls
        ]
        self.read_pool_index = 0

    async def initialize(self):
        """Initialize all connection pools"""
        await self.write_pool.create_pool()

        for pool in self.read_pools:
            await pool.create_pool()

    def get_read_pool(self) -> DatabasePool:
        """Get read pool using round-robin"""
        pool = self.read_pools[self.read_pool_index]
        self.read_pool_index = (self.read_pool_index + 1) % len(self.read_pools)
        return pool

    def get_write_pool(self) -> DatabasePool:
        """Get write pool"""
        return self.write_pool

    async def execute_read(self, query: str, *args):
        """Execute read query"""
        return await self.get_read_pool().fetch_all(query, *args)

    async def execute_write(self, query: str, *args):
        """Execute write query"""
        return await self.write_pool.execute(query, *args)
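
A sketch of wiring the read/write manager up at startup; the connection URLs are placeholders:

async def init_database():
    manager = ReadWriteConnectionManager(
        write_url="postgresql://app@db-primary/sysmanage",
        read_urls=[
            "postgresql://app@db-replica-1/sysmanage",
            "postgresql://app@db-replica-2/sysmanage",
        ],
    )
    await manager.initialize()

    # Reads round-robin across replicas; writes always hit the primary
    agents = await manager.execute_read("SELECT id, hostname FROM agents LIMIT 10")
    await manager.execute_write(
        "UPDATE agents SET last_seen = NOW() WHERE id = $1", agents[0]["id"]
    )
    return manager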

Application Performance Optimization

Asynchronous Processing

Optimized Async Patterns

import asyncio
import functools
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List

import aiohttp

class AsyncBatchProcessor:
    """Efficient batch processing with concurrency control"""

    def __init__(self, max_concurrency: int = 10, batch_size: int = 100):
        self.max_concurrency = max_concurrency
        self.batch_size = batch_size
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def process_items(self, items: List[Any],
                          processor_func: Callable) -> List[Any]:
        """Process items in batches; the semaphore bounds concurrent batches,
        so peak task concurrency is max_concurrency * batch_size"""

        # Split items into batches
        batches = [
            items[i:i + self.batch_size]
            for i in range(0, len(items), self.batch_size)
        ]

        # Process batches concurrently
        tasks = [
            self._process_batch(batch, processor_func)
            for batch in batches
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Flatten results, handling exceptions at both batch and item level
        # (the inner gather also returns exceptions in place of results)
        flattened_results = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Batch processing failed: {result}")
                continue
            for item_result in result:
                if isinstance(item_result, Exception):
                    logger.error(f"Item processing failed: {item_result}")
                    continue
                flattened_results.append(item_result)

        return flattened_results

    async def _process_batch(self, batch: List[Any],
                           processor_func: Callable) -> List[Any]:
        """Process a single batch"""

        async with self.semaphore:
            tasks = [processor_func(item) for item in batch]
            return await asyncio.gather(*tasks, return_exceptions=True)


class ResourcePool:
    """Generic resource pool for expensive operations"""

    def __init__(self, create_resource: Callable, max_size: int = 10):
        self.create_resource = create_resource
        self.max_size = max_size
        self.pool = asyncio.Queue(maxsize=max_size)
        self.created_count = 0

    async def acquire(self):
        """Acquire resource from pool"""
        try:
            # Try to get an existing resource
            return self.pool.get_nowait()
        except asyncio.QueueEmpty:
            # Create a new resource if under the limit; reserve the slot
            # before awaiting so concurrent acquirers cannot over-create
            if self.created_count < self.max_size:
                self.created_count += 1
                resource = self.create_resource()
                if asyncio.iscoroutine(resource):  # support async factories
                    resource = await resource
                return resource
            # At capacity - wait for a resource to be released
            return await self.pool.get()

    async def release(self, resource):
        """Release resource back to pool"""
        try:
            self.pool.put_nowait(resource)
        except asyncio.QueueFull:
            # Pool is full, discard resource
            pass


class PerformanceOptimizer:
    """Application-level performance optimizations"""

    def __init__(self):
        self.thread_pool = ThreadPoolExecutor(max_workers=4)
        self.cpu_bound_pool = ResourcePool(
            create_resource=lambda: ThreadPoolExecutor(max_workers=1),
            max_size=4
        )

    async def run_cpu_bound_task(self, func: Callable, *args, **kwargs):
        """Run CPU-bound task in a thread pool (the GIL limits pure-Python
        CPU work; a ProcessPoolExecutor may suit heavy computation better)"""

        loop = asyncio.get_running_loop()
        # run_in_executor does not accept kwargs, so bind them with partial
        return await loop.run_in_executor(
            self.thread_pool, functools.partial(func, *args, **kwargs)
        )

    async def parallel_http_requests(self, urls: List[str],
                                   max_concurrent: int = 10) -> List[Dict]:
        """Make parallel HTTP requests with controlled concurrency"""

        semaphore = asyncio.Semaphore(max_concurrent)

        async def fetch_url(session, url):
            async with semaphore:
                try:
                    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
                        return {
                            'url': url,
                            'status': response.status,
                            'data': await response.json()
                        }
                except Exception as e:
                    return {
                        'url': url,
                        'error': str(e)
                    }

        async with aiohttp.ClientSession() as session:
            tasks = [fetch_url(session, url) for url in urls]
            return await asyncio.gather(*tasks)

    async def cache_warming(self, cache_entries: List[Dict]):
        """Warm up cache with commonly accessed data"""

        batch_processor = AsyncBatchProcessor(max_concurrency=20, batch_size=50)

        async def warm_cache_entry(entry):
            try:
                key = entry['key']
                value = await entry['fetch_func']()
                await cache_manager.set(entry['namespace'], key, value, entry.get('ttl'))
                return True
            except Exception as e:
                logger.warning(f"Cache warming failed for {entry['key']}: {e}")
                return False

        results = await batch_processor.process_items(cache_entries, warm_cache_entry)
        successful = sum(1 for r in results if r is True)

        logger.info(f"Cache warming completed: {successful}/{len(cache_entries)} entries")


# Usage examples
async def optimize_agent_data_loading(agent_ids: List[str]):
    """Optimized agent data loading"""

    batch_processor = AsyncBatchProcessor(max_concurrency=15, batch_size=20)

    async def load_agent_data(agent_id: str):
        # Load data concurrently
        agent_info, packages, metrics = await asyncio.gather(
            get_agent_info(agent_id),
            get_agent_packages(agent_id),
            get_recent_metrics(agent_id),
            return_exceptions=True
        )

        return {
            'agent_id': agent_id,
            'info': agent_info if not isinstance(agent_info, Exception) else None,
            'packages': packages if not isinstance(packages, Exception) else [],
            'metrics': metrics if not isinstance(metrics, Exception) else []
        }

    return await batch_processor.process_items(agent_ids, load_agent_data)

Memory Optimization

Memory-Efficient Data Processing

import asyncio
import gc
import json
import weakref
from typing import Any, AsyncGenerator, Callable, Iterator, List

import psutil

class MemoryOptimizer:
    """Memory optimization utilities"""

    def __init__(self, memory_threshold: float = 0.8):
        self.memory_threshold = memory_threshold
        self.object_cache = weakref.WeakValueDictionary()

    def get_memory_usage(self) -> float:
        """Get current memory usage percentage"""
        return psutil.virtual_memory().percent / 100.0

    def check_memory_pressure(self) -> bool:
        """Check if system is under memory pressure"""
        return self.get_memory_usage() > self.memory_threshold

    async def streaming_processor(self, data_source: Iterator[Any],
                                processor_func: Callable,
                                batch_size: int = 1000) -> AsyncGenerator[Any, None]:
        """Process large datasets in memory-efficient streaming manner"""

        batch = []

        for item in data_source:
            batch.append(item)

            if len(batch) >= batch_size:
                # Process batch
                results = await self._process_batch_efficiently(batch, processor_func)

                for result in results:
                    yield result

                # Clear batch and force garbage collection if under pressure
                batch.clear()

                if self.check_memory_pressure():
                    gc.collect()
                    await asyncio.sleep(0.1)  # Allow other tasks to run

        # Process remaining items
        if batch:
            results = await self._process_batch_efficiently(batch, processor_func)
            for result in results:
                yield result

    async def _process_batch_efficiently(self, batch: List[Any],
                                       processor_func: Callable) -> List[Any]:
        """Process batch with memory optimizations"""

        try:
            # Process in smaller chunks if memory pressure is high
            if self.check_memory_pressure():
                chunk_size = max(1, len(batch) // 4)  # quarters, never zero
                results = []

                for i in range(0, len(batch), chunk_size):
                    chunk = batch[i:i + chunk_size]
                    chunk_results = await processor_func(chunk)
                    results.extend(chunk_results)

                    # Force cleanup between chunks
                    del chunk
                    gc.collect()

                return results
            else:
                return await processor_func(batch)

        except MemoryError:
            # Fall back to item-by-item processing, flattening so the return
            # shape matches the batch path
            logger.warning("Memory error in batch processing, falling back to individual processing")
            results = []
            for item in batch:
                results.extend(await processor_func([item]))
            return results


class ObjectPool:
    """Object pool for reusing expensive objects"""

    def __init__(self, factory: Callable, max_size: int = 100):
        self.factory = factory
        self.max_size = max_size
        self.pool = []
        self.in_use = set()

    def acquire(self):
        """Acquire object from pool"""
        if self.pool:
            obj = self.pool.pop()
        else:
            obj = self.factory()

        self.in_use.add(id(obj))
        return obj

    def release(self, obj):
        """Release object back to pool"""
        obj_id = id(obj)

        if obj_id in self.in_use:
            self.in_use.remove(obj_id)

            if len(self.pool) < self.max_size:
                # Reset object state if needed
                if hasattr(obj, 'reset'):
                    obj.reset()

                self.pool.append(obj)


# Memory-efficient data structures
class CircularBuffer:
    """Memory-efficient circular buffer for metrics data"""

    def __init__(self, max_size: int):
        self.max_size = max_size
        self.buffer = [None] * max_size
        self.head = 0
        self.size = 0

    def append(self, item):
        """Add item to buffer"""
        self.buffer[self.head] = item
        self.head = (self.head + 1) % self.max_size

        if self.size < self.max_size:
            self.size += 1

    def get_recent(self, count: int) -> List[Any]:
        """Get most recent items"""
        if count >= self.size:
            return self.to_list()

        start = (self.head - count) % self.max_size
        result = []

        for i in range(count):
            idx = (start + i) % self.max_size
            result.append(self.buffer[idx])

        return result

    def to_list(self) -> List[Any]:
        """Convert to list in chronological order"""
        if self.size == 0:
            return []

        if self.size < self.max_size:
            return self.buffer[:self.size]

        return self.buffer[self.head:] + self.buffer[:self.head]
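
# Usage example: bounded window of per-agent CPU samples (sizes illustrative)
cpu_samples = CircularBuffer(max_size=720)   # one sample/minute ~= 12 hours

for sample in (12.5, 48.0, 73.2):
    cpu_samples.append(sample)

recent = cpu_samples.get_recent(60)          # most recent 60 samples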


# Usage example
memory_optimizer = MemoryOptimizer()

async def process_large_dataset(data_file_path: str):
    """Process large dataset with memory optimization"""

    def data_generator():
        with open(data_file_path, 'r') as f:
            for line in f:
                yield json.loads(line)

    async def process_batch(items):
        # Your processing logic here
        return [process_item(item) for item in items]

    results = []
    async for result in memory_optimizer.streaming_processor(
        data_generator(), process_batch, batch_size=500
    ):
        results.append(result)

        # Periodically save to disk to free memory
        if len(results) >= 10000:
            await save_results_to_disk(results)
            results.clear()

    # Flush whatever remains once the stream is exhausted
    if results:
        await save_results_to_disk(results)

Performance Monitoring

Application Performance Monitoring

Comprehensive Performance Monitoring

import time
import asyncio
import psutil
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Dict, List, Optional
from prometheus_client import Counter, Histogram, Gauge

@dataclass
class PerformanceMetrics:
    response_time: float
    memory_usage: float
    cpu_usage: float
    db_query_time: float
    cache_hit_ratio: float
    active_connections: int

class PerformanceMonitor:
    """Comprehensive performance monitoring system"""

    def __init__(self):
        # Prometheus metrics
        self.request_duration = Histogram(
            'http_request_duration_seconds',
            'HTTP request duration',
            ['method', 'endpoint', 'status']
        )

        self.db_query_duration = Histogram(
            'db_query_duration_seconds',
            'Database query duration',
            ['query_type']
        )

        self.cache_operations = Counter(
            'cache_operations_total',
            'Cache operations',
            ['operation', 'result']
        )

        self.memory_usage = Gauge(
            'memory_usage_bytes',
            'Memory usage in bytes'
        )

        self.cpu_usage = Gauge(
            'cpu_usage_percent',
            'CPU usage percentage'
        )

        self.active_connections = Gauge(
            'active_connections',
            'Active database connections'
        )

    @asynccontextmanager
    async def track_request(self, method: str, endpoint: str):
        """Async context manager for request tracking"""
        start_time = time.perf_counter()

        try:
            yield
            status = 'success'
        except Exception:
            status = 'error'
            raise
        finally:
            duration = time.perf_counter() - start_time
            self.request_duration.labels(
                method=method,
                endpoint=endpoint,
                status=status
            ).observe(duration)

    @asynccontextmanager
    async def track_db_query(self, query_type: str):
        """Async context manager for database query tracking"""
        start_time = time.perf_counter()

        try:
            yield
        finally:
            duration = time.perf_counter() - start_time
            self.db_query_duration.labels(query_type=query_type).observe(duration)

    def track_cache_operation(self, operation: str, hit: bool):
        """Track cache operations"""
        result = 'hit' if hit else 'miss'
        self.cache_operations.labels(operation=operation, result=result).inc()

    async def collect_system_metrics(self):
        """Collect system-level metrics"""

        # Memory usage
        memory = psutil.virtual_memory()
        self.memory_usage.set(memory.used)

        # CPU usage (interval=None is non-blocking: it reports usage since
        # the previous call instead of sleeping for a sampling window)
        cpu_percent = psutil.cpu_percent(interval=None)
        self.cpu_usage.set(cpu_percent)

        # Database connections
        # This would be specific to your connection pool
        # self.active_connections.set(db_pool.size)

    async def get_performance_summary(self) -> PerformanceMetrics:
        """Get current performance summary"""

        memory = psutil.virtual_memory()
        cpu_percent = psutil.cpu_percent(interval=0.1)

        # Calculate averages from metrics
        avg_response_time = self._get_metric_average('http_request_duration_seconds')
        avg_db_time = self._get_metric_average('db_query_duration_seconds')
        cache_hit_ratio = self._calculate_cache_hit_ratio()

        return PerformanceMetrics(
            response_time=avg_response_time,
            memory_usage=memory.percent,
            cpu_usage=cpu_percent,
            db_query_time=avg_db_time,
            cache_hit_ratio=cache_hit_ratio,
            active_connections=0  # Get from connection pool
        )

    def _get_metric_average(self, metric_name: str) -> float:
        """Get average value from histogram metric"""
        # This would integrate with your metrics collection system
        return 0.0

    def _calculate_cache_hit_ratio(self) -> float:
        """Calculate cache hit ratio"""
        # Calculate from cache operation counters
        return 0.0


class PerformanceOptimizationEngine:
    """Automated performance optimization"""

    def __init__(self, monitor: PerformanceMonitor):
        self.monitor = monitor
        self.optimization_rules = {
            'high_memory_usage': self._optimize_memory,
            'slow_db_queries': self._optimize_database,
            'low_cache_hit_ratio': self._optimize_cache,
            'high_response_time': self._optimize_response_time
        }

    async def analyze_and_optimize(self):
        """Analyze performance and apply optimizations"""

        metrics = await self.monitor.get_performance_summary()

        # Check for performance issues
        issues = self._identify_issues(metrics)

        # Apply optimizations
        for issue in issues:
            if issue in self.optimization_rules:
                await self.optimization_rules[issue](metrics)

    def _identify_issues(self, metrics: PerformanceMetrics) -> List[str]:
        """Identify performance issues"""
        issues = []

        if metrics.memory_usage > 80:
            issues.append('high_memory_usage')

        if metrics.db_query_time > 1.0:
            issues.append('slow_db_queries')

        if metrics.cache_hit_ratio < 0.8:
            issues.append('low_cache_hit_ratio')

        if metrics.response_time > 2.0:
            issues.append('high_response_time')

        return issues

    async def _optimize_memory(self, metrics: PerformanceMetrics):
        """Optimize memory usage"""
        logger.info("Applying memory optimizations")

        # Force garbage collection
        gc.collect()

        # Clear caches if memory pressure is high (assumes a clear_expired()
        # helper on the cache manager)
        if metrics.memory_usage > 90:
            await cache_manager.clear_expired()

    async def _optimize_database(self, metrics: PerformanceMetrics):
        """Optimize database performance"""
        logger.info("Applying database optimizations")

        # Could implement query plan analysis, index suggestions, etc.
        pass

    async def _optimize_cache(self, metrics: PerformanceMetrics):
        """Optimize cache performance"""
        logger.info("Applying cache optimizations")

        # Adjust cache TTLs, warm popular data, etc.
        pass

    async def _optimize_response_time(self, metrics: PerformanceMetrics):
        """Optimize response times"""
        logger.info("Applying response time optimizations")

        # Could implement circuit breakers, request throttling, etc.
        pass


# FastAPI middleware integration
class PerformanceMiddleware:
    def __init__(self, app, monitor: PerformanceMonitor):
        self.app = app
        self.monitor = monitor

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive)
        method = request.method
        path = request.url.path

        async with self.monitor.track_request(method, path):
            await self.app(scope, receive, send)
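
A sketch of registering the middleware and collecting system metrics in the background; it assumes the FastAPI app object from the HTTP caching example:

monitor = PerformanceMonitor()
app.add_middleware(PerformanceMiddleware, monitor=monitor)

@app.on_event("startup")
async def start_metric_collection():
    async def collect_loop():
        while True:
            await monitor.collect_system_metrics()
            await asyncio.sleep(15)  # align with the metrics scrape interval

    asyncio.create_task(collect_loop())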

Performance Alerting

Automated Performance Alerts

class PerformanceAlerting:
    """Performance alerting system"""

    def __init__(self, alert_config: Dict):
        self.thresholds = alert_config['thresholds']
        self.alert_channels = alert_config['channels']
        self.alert_history = {}

    async def check_performance_thresholds(self, metrics: PerformanceMetrics):
        """Check metrics against thresholds and alert if needed"""

        alerts = []

        # Response time threshold
        if metrics.response_time > self.thresholds['response_time']:
            alerts.append({
                'type': 'response_time',
                'severity': 'warning',
                'message': f'High response time: {metrics.response_time:.2f}s',
                'value': metrics.response_time,
                'threshold': self.thresholds['response_time']
            })

        # Memory usage threshold
        if metrics.memory_usage > self.thresholds['memory_usage']:
            alerts.append({
                'type': 'memory_usage',
                'severity': 'critical' if metrics.memory_usage > 90 else 'warning',
                'message': f'High memory usage: {metrics.memory_usage:.1f}%',
                'value': metrics.memory_usage,
                'threshold': self.thresholds['memory_usage']
            })

        # Database query time threshold
        if metrics.db_query_time > self.thresholds['db_query_time']:
            alerts.append({
                'type': 'db_performance',
                'severity': 'warning',
                'message': f'Slow database queries: {metrics.db_query_time:.2f}s',
                'value': metrics.db_query_time,
                'threshold': self.thresholds['db_query_time']
            })

        # Send alerts
        for alert in alerts:
            await self._send_alert(alert)

    async def _send_alert(self, alert: Dict):
        """Send performance alert"""

        alert_key = f"{alert['type']}_{alert['severity']}"

        # Check if we've already alerted recently (avoid spam)
        last_alert = self.alert_history.get(alert_key)
        if last_alert and (time.time() - last_alert) < 3600:  # 1 hour cooldown
            return

        # Send to configured channels
        for channel in self.alert_channels:
            await self._send_to_channel(channel, alert)

        # Record alert
        self.alert_history[alert_key] = time.time()

    async def _send_to_channel(self, channel: str, alert: Dict):
        """Send alert to a specific channel; the per-channel senders called
        below are integration points to implement for your environment"""

        if channel == 'slack':
            await self._send_slack_alert(alert)
        elif channel == 'email':
            await self._send_email_alert(alert)
        elif channel == 'webhook':
            await self._send_webhook_alert(alert)

# Alert configuration
ALERT_CONFIG = {
    'thresholds': {
        'response_time': 2.0,  # seconds
        'memory_usage': 80,    # percentage
        'cpu_usage': 80,       # percentage
        'db_query_time': 1.0,  # seconds
        'cache_hit_ratio': 0.7 # ratio
    },
    'channels': ['slack', 'email']
}
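
A sketch tying the monitor and the alerting system together in a periodic background task, reusing the monitor from the previous section; the 60-second cadence is an assumption:

alerting = PerformanceAlerting(ALERT_CONFIG)

async def performance_watchdog(monitor: PerformanceMonitor):
    """Sample performance periodically and alert on threshold breaches"""
    while True:
        metrics = await monitor.get_performance_summary()
        await alerting.check_performance_thresholds(metrics)
        await asyncio.sleep(60)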

Performance Best Practices

Development Guidelines

  • Profiling First: Always profile before optimizing
  • Caching Strategy: Implement multi-layer caching strategically
  • Database Optimization: Optimize queries and use appropriate indexes
  • Async Processing: Use asynchronous patterns for I/O operations

Operational Guidelines

  • Monitor Continuously: Implement comprehensive monitoring
  • Load Testing: Regular load testing and capacity planning
  • Resource Management: Proper resource allocation and cleanup
  • Performance Budgets: Set and maintain performance budgets

Next Steps

To learn more about related system optimization topics:

  1. Queue Management: Background task processing optimization
  2. Scaling Strategies: System scaling approaches
  3. Load Balancing: Traffic distribution and balancing
  4. Retry Logic: Resilience and fault tolerance