Performance Optimization
Caching strategies, query optimization, resource management, and performance tuning techniques for SysManage systems.
Performance Optimization Overview
SysManage applies performance optimizations at every layer of the stack to keep response times low, use resources efficiently, and scale predictably under load.
Core Principles
- Measure First: Base optimization work on metrics and profiling rather than intuition (see the timing sketch after this list)
- Layered Optimization: Tune the database, application, and infrastructure layers together
- Proactive Monitoring: Monitor performance continuously and alert on regressions
- Graceful Degradation: Maintain functionality under high load
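A lightweight timing decorator is often enough to put numbers behind "Measure First" before reaching for a full profiler such as cProfile or py-spy. A minimal sketch:

import functools
import logging
import time

logger = logging.getLogger(__name__)

def timed(func):
    """Log coroutine wall-clock time so optimization targets are chosen from data."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            elapsed_ms = (time.perf_counter() - start) * 1000
            logger.info("%s took %.2f ms", func.__name__, elapsed_ms)
    return wrapper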
Caching Architecture
Multi-Layer Caching Strategy
┌──────────────────────────────────────────────────────────────┐
│                         Client Layer                         │
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │   Browser   │ │  Mobile App  │ │       API Clients       │ │
│ │    Cache    │ │    Cache     │ │          Cache          │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
└─────────────────────────┬────────────────────────────────────┘
                          │
┌─────────────────────────┼────────────────────────────────────┐
│                       CDN / Edge Cache                       │
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ CloudFlare  │ │     AWS      │ │      Static Assets      │ │
│ │    Cache    │ │  CloudFront  │ │          Cache          │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
└─────────────────────────┼────────────────────────────────────┘
                          │
┌─────────────────────────┼────────────────────────────────────┐
│                   Application Layer Cache                    │
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │    HTTP     │ │ API Response │ │         Session         │ │
│ │    Cache    │ │    Cache     │ │          Cache          │ │
│ │   (Nginx)   │ │   (Redis)    │ │         (Redis)         │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
└─────────────────────────┼────────────────────────────────────┘
                          │
┌─────────────────────────┼────────────────────────────────────┐
│                       Data Layer Cache                       │
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │    Query    │ │    Object    │ │        Computed         │ │
│ │    Cache    │ │    Cache     │ │      Results Cache      │ │
│ │   (Redis)   │ │   (Redis)    │ │         (Redis)         │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
└─────────────────────────┼────────────────────────────────────┘
                          │
┌─────────────────────────┼────────────────────────────────────┐
│                     Database Layer Cache                     │
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │   Buffer    │ │  Query Plan  │ │       Connection        │ │
│ │    Pool     │ │    Cache     │ │          Pool           │ │
│ │(PostgreSQL) │ │ (PostgreSQL) │ │       (PgBouncer)       │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
└──────────────────────────────────────────────────────────────┘
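Reads flow down these layers until one hits; each miss falls through, and the result repopulates the cache on the way back up. A minimal sketch of the application-layer half of that walk, which the CacheManager below generalizes (the Redis connection settings and the fetch_from_db callable are assumptions):

import json
import redis.asyncio as redis

cache = redis.Redis(host="localhost", port=6379)

async def layered_get(key: str, fetch_from_db, ttl: int = 300):
    """Application-layer read-through: try Redis first, hit the database on a miss."""
    cached = await cache.get(key)
    if cached is not None:
        return json.loads(cached)     # application-layer hit
    value = await fetch_from_db()     # fall through to the data layer
    await cache.setex(key, ttl, json.dumps(value, default=str))
    return value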
Redis Caching Implementation
Intelligent Cache Manager
import asyncio
import functools
import hashlib
import json
import logging
from typing import Any, Callable, Dict, Optional

import redis.asyncio as redis  # async client; the sync `redis` client cannot be awaited

logger = logging.getLogger(__name__)

class CacheManager:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.default_ttl = 3600  # 1 hour
        self.key_prefix = "sysmanage:"

    def _generate_key(self, namespace: str, identifier: str,
                      params: Optional[Dict] = None) -> str:
        """Generate cache key with optional parameters"""
        key_parts = [namespace, identifier]
        if params:
            # Sort parameters for consistent key generation
            param_string = json.dumps(params, sort_keys=True)
            param_hash = hashlib.md5(param_string.encode()).hexdigest()[:8]
            key_parts.append(param_hash)
        # key_prefix already ends with ":", so join only the remaining parts
        return self.key_prefix + ":".join(key_parts)

    async def get(self, namespace: str, identifier: str,
                  params: Optional[Dict] = None) -> Optional[Any]:
        """Get cached value"""
        key = self._generate_key(namespace, identifier, params)
        try:
            value = await self.redis.get(key)
            if value:
                return json.loads(value)
        except Exception as e:
            logger.warning(f"Cache get failed for {key}: {e}")
        return None

    async def set(self, namespace: str, identifier: str, value: Any,
                  ttl: Optional[int] = None, params: Optional[Dict] = None):
        """Set cached value with TTL"""
        key = self._generate_key(namespace, identifier, params)
        ttl = ttl or self.default_ttl
        try:
            serialized_value = json.dumps(value, default=str)
            await self.redis.setex(key, ttl, serialized_value)
        except Exception as e:
            logger.warning(f"Cache set failed for {key}: {e}")

    async def delete(self, namespace: str, identifier: str,
                     params: Optional[Dict] = None):
        """Delete cached value"""
        key = self._generate_key(namespace, identifier, params)
        await self.redis.delete(key)

    async def invalidate_pattern(self, pattern: str):
        """Invalidate all keys matching pattern (SCAN avoids blocking Redis the way KEYS does)"""
        keys = [key async for key in self.redis.scan_iter(match=f"{self.key_prefix}{pattern}")]
        if keys:
            await self.redis.delete(*keys)

    async def get_or_set(self, namespace: str, identifier: str,
                         fetch_func: Callable, ttl: Optional[int] = None,
                         params: Optional[Dict] = None) -> Any:
        """Get from cache or execute function and cache result"""
        # Try to get from cache first
        cached_value = await self.get(namespace, identifier, params)
        if cached_value is not None:
            return cached_value

        # Cache miss - execute function
        try:
            value = await fetch_func() if asyncio.iscoroutinefunction(fetch_func) else fetch_func()
            # Cache the result
            await self.set(namespace, identifier, value, ttl, params)
            return value
        except Exception as e:
            logger.error(f"Failed to fetch and cache {namespace}:{identifier}: {e}")
            raise

class SmartCacheDecorator:
    """Decorator for automatic caching of function results"""

    def __init__(self, cache_manager: CacheManager, namespace: str,
                 ttl: int = 3600, key_func: Optional[Callable] = None):
        self.cache_manager = cache_manager
        self.namespace = namespace
        self.ttl = ttl
        self.key_func = key_func or self._default_key_func

    def _default_key_func(self, func_name: str, *args, **kwargs) -> str:
        """Default key generation function"""
        key_parts = [func_name]
        # Add positional args
        for arg in args:
            key_parts.append(str(arg))
        # Add keyword args
        for k, v in sorted(kwargs.items()):
            key_parts.append(f"{k}={v}")
        return ":".join(key_parts)

    def __call__(self, func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Generate cache key
            cache_key = self.key_func(func.__name__, *args, **kwargs)
            # Try cache first
            result = await self.cache_manager.get(self.namespace, cache_key)
            if result is not None:
                return result
            # Execute function
            result = await func(*args, **kwargs)
            # Cache result
            await self.cache_manager.set(self.namespace, cache_key, result, self.ttl)
            return result
        return wrapper

# Usage examples (connection settings are illustrative)
redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
cache_manager = CacheManager(redis_client)

@SmartCacheDecorator(cache_manager, "agents", ttl=1800)
async def get_agent_list(user_id: int, filters: Dict = None):
    """Get agent list with caching"""
    query = "SELECT * FROM agents WHERE user_id = $1"
    if filters:
        # Apply filters...
        pass
    return await db.fetch_all(query, user_id)

@SmartCacheDecorator(cache_manager, "metrics", ttl=300)
async def get_agent_metrics(agent_id: str, time_range: str):
    """Get agent metrics with 5-minute cache"""
    # Expensive computation...
    return await compute_metrics(agent_id, time_range)
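Cache entries under these namespaces must also be dropped on write paths, or readers will see stale data until the TTL expires. A sketch of a hypothetical update handler doing exactly that (update_agent and its query are illustrative):

async def update_agent(agent_id: str, hostname: str):
    """Persist a change, then invalidate the cached reads it affects."""
    await db.execute("UPDATE agents SET hostname = $1 WHERE id = $2",
                     hostname, agent_id)
    # Drop every cached agent list plus this agent's cached metrics
    await cache_manager.invalidate_pattern("agents:*")
    await cache_manager.invalidate_pattern(f"metrics:*{agent_id}*")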
HTTP Caching Strategy
FastAPI Response Caching
import hashlib
import json
import time
from typing import Dict

from fastapi import Request, Response
from fastapi.responses import JSONResponse

class HTTPCacheMiddleware:
    def __init__(self, app, cache_manager: CacheManager):
        self.app = app
        self.cache_manager = cache_manager

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive)

        # Check if request is cacheable
        if not self._is_cacheable(request):
            await self.app(scope, receive, send)
            return

        # Generate cache key
        cache_key = self._generate_cache_key(request)

        # Try cache
        cached_response = await self.cache_manager.get("http_responses", cache_key)
        if cached_response:
            response = JSONResponse(
                content=cached_response["content"],
                status_code=cached_response["status_code"],
                headers=cached_response["headers"]
            )
            response.headers["X-Cache"] = "HIT"
            await response(scope, receive, send)
            return

        # Cache miss - execute request, capturing the response as it streams out
        response_data = {"body": b""}

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                response_data["status_code"] = message["status"]
                response_data["headers"] = message.get("headers", [])
            elif message["type"] == "http.response.body":
                response_data["body"] += message.get("body", b"")
            await send(message)

        await self.app(scope, receive, send_wrapper)

        # Cache successful responses
        if response_data.get("status_code", 500) < 400:
            await self._cache_response(cache_key, response_data)

    async def _cache_response(self, cache_key: str, response_data: Dict, ttl: int = 300):
        """Store a captured response for later replay"""
        try:
            content = json.loads(response_data["body"])
        except (ValueError, UnicodeDecodeError):
            return  # only JSON bodies are cached
        # ASGI headers are byte pairs; decode them and drop length/transport
        # headers, which JSONResponse recomputes on replay
        skip = {"content-length", "transfer-encoding", "connection"}
        headers = {
            name.decode("latin-1"): value.decode("latin-1")
            for name, value in response_data.get("headers", [])
            if name.decode("latin-1").lower() not in skip
        }
        await self.cache_manager.set("http_responses", cache_key, {
            "content": content,
            "status_code": response_data["status_code"],
            "headers": headers,
        }, ttl)

    def _is_cacheable(self, request: Request) -> bool:
        """Determine if request is cacheable"""
        # Only cache GET requests
        if request.method != "GET":
            return False
        # Don't cache authenticated requests (unless explicitly allowed)
        if request.headers.get("authorization"):
            return False
        # Check for cache control headers
        cache_control = request.headers.get("cache-control", "")
        if "no-cache" in cache_control:
            return False
        return True

    def _generate_cache_key(self, request: Request) -> str:
        """Generate cache key for request"""
        key_parts = [
            request.method,
            str(request.url.path),
            str(request.url.query)
        ]
        # Include relevant headers
        relevant_headers = ["accept", "accept-encoding", "accept-language"]
        for header in relevant_headers:
            if header in request.headers:
                key_parts.append(f"{header}:{request.headers[header]}")
        return hashlib.md5(":".join(key_parts).encode()).hexdigest()

# Cache configuration for different endpoints
CACHE_RULES = {
    "/api/agents": {"ttl": 300, "vary": ["user-id"]},
    "/api/packages": {"ttl": 1800, "vary": ["agent-id"]},
    "/api/metrics": {"ttl": 60, "vary": ["agent-id", "time-range"]},
    "/api/health": {"ttl": 30, "public": True},
}

def add_cache_headers(response: Response, path: str):
    """Add appropriate cache headers"""
    if path in CACHE_RULES:
        rule = CACHE_RULES[path]
        if rule.get("public"):
            response.headers["Cache-Control"] = f"public, max-age={rule['ttl']}"
        else:
            response.headers["Cache-Control"] = f"private, max-age={rule['ttl']}"
        # Add a time-based ETag for validation (a content hash makes a stronger ETag)
        response.headers["ETag"] = f'"{int(time.time())}"'
Database Performance Optimization
Query Optimization
Optimized Database Queries
from __future__ import annotations  # lets the Database annotations resolve lazily

import hashlib
import json
import logging
import time
from datetime import datetime
from typing import Dict, List

logger = logging.getLogger(__name__)

class OptimizedQueries:
    """Collection of optimized database queries"""

    @staticmethod
    async def get_agent_summary(db: Database, user_id: int,
                                limit: int = 50, offset: int = 0):
        """Optimized agent summary with pagination"""
        # Use index on (user_id, last_seen) for efficient filtering and sorting
        query = """
            SELECT
                a.id,
                a.hostname,
                a.ip_address,
                a.os_type,
                a.last_seen,
                a.status,
                COUNT(p.id) as package_count,
                MAX(p.updated_at) as last_package_update
            FROM agents a
            LEFT JOIN packages p ON a.id = p.agent_id
            WHERE a.user_id = $1
            GROUP BY a.id, a.hostname, a.ip_address, a.os_type, a.last_seen, a.status
            ORDER BY a.last_seen DESC
            LIMIT $2 OFFSET $3
        """
        return await db.fetch_all(query, user_id, limit, offset)

    @staticmethod
    async def get_package_updates_bulk(db: Database, agent_ids: List[str]):
        """Bulk fetch package updates for multiple agents"""
        # Use ANY() for an efficient IN clause with large lists
        query = """
            SELECT
                p.agent_id,
                p.name,
                p.current_version,
                p.available_version,
                p.update_available,
                p.last_checked
            FROM packages p
            WHERE p.agent_id = ANY($1)
              AND p.update_available = true
            ORDER BY p.agent_id, p.priority DESC, p.name
        """
        return await db.fetch_all(query, agent_ids)

    @staticmethod
    async def get_metrics_aggregated(db: Database, agent_id: str,
                                     metric_type: str, start_time: datetime,
                                     end_time: datetime, interval: str = 'hour'):
        """Get aggregated metrics with time bucketing"""
        # interval must be a date_trunc unit ('minute', 'hour', 'day', ...)
        query = """
            SELECT
                date_trunc($5, collected_at) as time_bucket,
                AVG((metric_value->>'value')::numeric) as avg_value,
                MIN((metric_value->>'value')::numeric) as min_value,
                MAX((metric_value->>'value')::numeric) as max_value,
                COUNT(*) as sample_count
            FROM agent_metrics
            WHERE agent_id = $1
              AND metric_type = $2
              AND collected_at BETWEEN $3 AND $4
            GROUP BY time_bucket
            ORDER BY time_bucket
        """
        return await db.fetch_all(query, agent_id, metric_type,
                                  start_time, end_time, interval)

class QueryProfiler:
    """Database query profiler for performance monitoring"""

    def __init__(self, db: Database):
        self.db = db
        self.slow_query_threshold = 1000  # milliseconds

    async def profile_query(self, query: str, params: tuple = ()):
        """Profile query execution"""
        start_time = time.perf_counter()

        # Enable query profiling
        await self.db.execute("SET track_io_timing = on")

        try:
            # Execute query
            result = await self.db.fetch_all(query, *params)

            # Get execution plan (note: EXPLAIN ANALYZE runs the query a second time)
            explain_query = f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}"
            plan = await self.db.fetch_one(explain_query, *params)

            execution_time = (time.perf_counter() - start_time) * 1000
            profile_data = {
                "query": query,
                "execution_time_ms": execution_time,
                "row_count": len(result),
                "execution_plan": plan[0] if plan else None
            }

            # Log slow queries
            if execution_time > self.slow_query_threshold:
                await self._log_slow_query(profile_data)

            return result, profile_data
        finally:
            await self.db.execute("SET track_io_timing = off")

    async def _log_slow_query(self, profile_data: Dict):
        """Log slow query for analysis"""
        logger.warning(f"Slow query detected: {profile_data['execution_time_ms']:.2f}ms")
        logger.warning(f"Query: {profile_data['query'][:200]}...")

        # Store in slow query log table
        await self.db.execute("""
            INSERT INTO slow_query_log
                (query_hash, query_text, execution_time_ms, execution_plan, created_at)
            VALUES ($1, $2, $3, $4, NOW())
            """,
            hashlib.md5(profile_data['query'].encode()).hexdigest(),
            profile_data['query'],
            profile_data['execution_time_ms'],
            json.dumps(profile_data['execution_plan'])
        )
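To see the profiler in use, a sketch of wrapping a hot query during an investigation (assuming a db handle exposing the fetch_all/fetch_one/execute interface used above):

async def investigate_agent_summary(db, user_id: int):
    profiler = QueryProfiler(db)
    rows, profile = await profiler.profile_query(
        "SELECT id, hostname, last_seen FROM agents "
        "WHERE user_id = $1 ORDER BY last_seen DESC LIMIT 50",
        (user_id,)
    )
    logger.info(f"{len(rows)} rows in {profile['execution_time_ms']:.1f} ms")
    return rows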
Index Optimization
Strategic Index Management
-- Performance-critical indexes for SysManage

-- Agents table indexes
CREATE INDEX CONCURRENTLY idx_agents_user_id_last_seen
    ON agents(user_id, last_seen DESC);

CREATE INDEX CONCURRENTLY idx_agents_status_last_seen
    ON agents(status, last_seen DESC)
    WHERE status IN ('active', 'inactive');

-- Packages table indexes
CREATE INDEX CONCURRENTLY idx_packages_agent_id_update_available
    ON packages(agent_id, update_available)
    WHERE update_available = true;

CREATE INDEX CONCURRENTLY idx_packages_name_version
    ON packages(name, current_version);

-- Agent metrics indexes (for time-series data)
CREATE INDEX CONCURRENTLY idx_agent_metrics_agent_time
    ON agent_metrics(agent_id, collected_at DESC);

CREATE INDEX CONCURRENTLY idx_agent_metrics_type_time
    ON agent_metrics(metric_type, collected_at DESC);

-- Partial index for recent metrics only.
-- Note: index predicates must be immutable, so NOW() is not allowed here;
-- use a fixed cutoff and recreate the index periodically (e.g. from a scheduled job).
CREATE INDEX CONCURRENTLY idx_agent_metrics_recent
    ON agent_metrics(agent_id, metric_type, collected_at)
    WHERE collected_at > '2024-01-01';

-- BRIN index for time-series data (space-efficient)
CREATE INDEX CONCURRENTLY idx_agent_metrics_collected_at_brin
    ON agent_metrics USING BRIN(collected_at);

-- GIN index for JSONB metric_value searching
CREATE INDEX CONCURRENTLY idx_agent_metrics_value_gin
    ON agent_metrics USING GIN(metric_value);

-- Tasks table indexes
CREATE INDEX CONCURRENTLY idx_tasks_agent_id_status_created
    ON tasks(agent_id, status, created_at DESC);

CREATE INDEX CONCURRENTLY idx_tasks_user_id_status
    ON tasks(user_id, status)
    WHERE status IN ('pending', 'running');

-- Composite index for complex queries
CREATE INDEX CONCURRENTLY idx_agents_composite
    ON agents(user_id, status, os_type, last_seen DESC);
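Indexes only pay for their write overhead if the planner actually uses them. PostgreSQL's pg_stat_user_indexes view tracks per-index scan counts, which makes unused indexes easy to flag; a small asyncpg sketch (the DSN is a placeholder):

import asyncpg

async def report_unused_indexes(dsn: str = "postgresql://sysmanage@localhost/sysmanage"):
    """List user indexes never scanned since the last statistics reset."""
    conn = await asyncpg.connect(dsn)
    try:
        rows = await conn.fetch("""
            SELECT relname AS table_name,
                   indexrelname AS index_name,
                   pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
            FROM pg_stat_user_indexes
            WHERE idx_scan = 0
            ORDER BY pg_relation_size(indexrelid) DESC
        """)
        for row in rows:
            print(f"unused: {row['index_name']} on {row['table_name']} ({row['index_size']})")
    finally:
        await conn.close()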
Connection Pooling
Optimized Database Connections
import asyncpg
from contextlib import asynccontextmanager
from typing import List, Optional

class DatabasePool:
    """Optimized database connection pool"""

    def __init__(self, database_url: str, **pool_kwargs):
        self.database_url = database_url
        self.pool_kwargs = {
            'min_size': 5,
            'max_size': 20,
            'max_queries': 50000,
            'max_inactive_connection_lifetime': 300,
            'command_timeout': 60,
            **pool_kwargs
        }
        self.pool: Optional[asyncpg.Pool] = None

    async def create_pool(self):
        """Create connection pool with optimization"""
        self.pool = await asyncpg.create_pool(
            self.database_url,
            **self.pool_kwargs,
            init=self._init_connection
        )

    async def _init_connection(self, connection):
        """Initialize new connection with optimizations"""
        # Set connection-level optimizations
        await connection.execute("SET default_transaction_isolation = 'read committed'")
        await connection.execute("SET statement_timeout = '30s'")
        await connection.execute("SET lock_timeout = '10s'")
        await connection.execute("SET idle_in_transaction_session_timeout = '5min'")
        # Enable JIT for complex queries
        await connection.execute("SET jit = on")
        await connection.execute("SET jit_above_cost = 100000")

    async def execute(self, query: str, *args):
        """Execute query with connection from pool"""
        async with self.pool.acquire() as connection:
            return await connection.execute(query, *args)

    async def fetch_all(self, query: str, *args):
        """Fetch all results with connection from pool"""
        async with self.pool.acquire() as connection:
            return await connection.fetch(query, *args)

    async def fetch_one(self, query: str, *args):
        """Fetch one result with connection from pool"""
        async with self.pool.acquire() as connection:
            return await connection.fetchrow(query, *args)

    @asynccontextmanager
    async def transaction(self):
        """Yield a pooled connection wrapped in a transaction"""
        async with self.pool.acquire() as connection:
            async with connection.transaction():
                yield connection

    async def close(self):
        """Close connection pool"""
        if self.pool:
            await self.pool.close()

class ReadWriteConnectionManager:
    """Separate read/write connection pools for better performance"""

    def __init__(self, write_url: str, read_urls: List[str]):
        self.write_pool = DatabasePool(write_url, min_size=3, max_size=10)
        self.read_pools = [
            DatabasePool(url, min_size=5, max_size=15)
            for url in read_urls
        ]
        self.read_pool_index = 0

    async def initialize(self):
        """Initialize all connection pools"""
        await self.write_pool.create_pool()
        for pool in self.read_pools:
            await pool.create_pool()

    def get_read_pool(self) -> DatabasePool:
        """Get read pool using round-robin"""
        pool = self.read_pools[self.read_pool_index]
        self.read_pool_index = (self.read_pool_index + 1) % len(self.read_pools)
        return pool

    def get_write_pool(self) -> DatabasePool:
        """Get write pool"""
        return self.write_pool

    async def execute_read(self, query: str, *args):
        """Execute read query"""
        return await self.get_read_pool().fetch_all(query, *args)

    async def execute_write(self, query: str, *args):
        """Execute write query"""
        return await self.write_pool.execute(query, *args)
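Wiring the pools up at startup might look like this; the URLs are placeholders for a primary and two read replicas:

db = ReadWriteConnectionManager(
    write_url="postgresql://sysmanage@primary:5432/sysmanage",
    read_urls=[
        "postgresql://sysmanage@replica1:5432/sysmanage",
        "postgresql://sysmanage@replica2:5432/sysmanage",
    ],
)

async def startup():
    await db.initialize()
    # Reads round-robin across the replicas; writes always hit the primary
    rows = await db.execute_read("SELECT id, hostname FROM agents LIMIT 10")
    async with db.get_write_pool().transaction() as conn:
        await conn.execute("UPDATE agents SET status = 'active' WHERE id = $1", rows[0]["id"])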
Application Performance Optimization
Asynchronous Processing
Optimized Async Patterns
import asyncio
import functools
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict, List

import aiohttp

logger = logging.getLogger(__name__)

class AsyncBatchProcessor:
    """Efficient batch processing with concurrency control"""

    def __init__(self, max_concurrency: int = 10, batch_size: int = 100):
        self.max_concurrency = max_concurrency
        self.batch_size = batch_size
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def process_items(self, items: List[Any],
                            processor_func: Callable) -> List[Any]:
        """Process items in batches with controlled concurrency"""
        # Split items into batches
        batches = [
            items[i:i + self.batch_size]
            for i in range(0, len(items), self.batch_size)
        ]

        # Process batches concurrently
        tasks = [
            self._process_batch(batch, processor_func)
            for batch in batches
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Flatten results and handle exceptions
        flattened_results = []
        for result in results:
            if isinstance(result, Exception):
                logger.error(f"Batch processing failed: {result}")
                continue
            flattened_results.extend(result)
        return flattened_results

    async def _process_batch(self, batch: List[Any],
                             processor_func: Callable) -> List[Any]:
        """Process a single batch"""
        async with self.semaphore:
            tasks = [processor_func(item) for item in batch]
            return await asyncio.gather(*tasks, return_exceptions=True)

class ResourcePool:
    """Generic resource pool for expensive operations"""

    def __init__(self, create_resource: Callable, max_size: int = 10):
        self.create_resource = create_resource
        self.max_size = max_size
        self.pool = asyncio.Queue(maxsize=max_size)
        self.created_count = 0

    async def acquire(self):
        """Acquire resource from pool"""
        try:
            # Try to get existing resource
            return self.pool.get_nowait()
        except asyncio.QueueEmpty:
            # Create new resource if under limit
            if self.created_count < self.max_size:
                resource = self.create_resource()
                if asyncio.iscoroutine(resource):  # support sync and async factories
                    resource = await resource
                self.created_count += 1
                return resource
            # Otherwise wait for an available resource
            return await self.pool.get()

    async def release(self, resource):
        """Release resource back to pool"""
        try:
            self.pool.put_nowait(resource)
        except asyncio.QueueFull:
            # Pool is full, discard resource
            pass

class PerformanceOptimizer:
    """Application-level performance optimizations"""

    def __init__(self):
        self.thread_pool = ThreadPoolExecutor(max_workers=4)
        self.cpu_bound_pool = ResourcePool(
            create_resource=lambda: ThreadPoolExecutor(max_workers=1),
            max_size=4
        )

    async def run_cpu_bound_task(self, func: Callable, *args, **kwargs):
        """Run CPU-bound task in thread pool"""
        # run_in_executor only forwards positional args, so bind kwargs first
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.thread_pool, functools.partial(func, *args, **kwargs)
        )

    async def parallel_http_requests(self, urls: List[str],
                                     max_concurrent: int = 10) -> List[Dict]:
        """Make parallel HTTP requests with controlled concurrency"""
        semaphore = asyncio.Semaphore(max_concurrent)

        async def fetch_url(session, url):
            async with semaphore:
                try:
                    timeout = aiohttp.ClientTimeout(total=10)
                    async with session.get(url, timeout=timeout) as response:
                        return {
                            'url': url,
                            'status': response.status,
                            'data': await response.json()
                        }
                except Exception as e:
                    return {
                        'url': url,
                        'error': str(e)
                    }

        async with aiohttp.ClientSession() as session:
            tasks = [fetch_url(session, url) for url in urls]
            return await asyncio.gather(*tasks)

    async def cache_warming(self, cache_entries: List[Dict]):
        """Warm up cache with commonly accessed data"""
        batch_processor = AsyncBatchProcessor(max_concurrency=20, batch_size=50)

        async def warm_cache_entry(entry):
            try:
                key = entry['key']
                value = await entry['fetch_func']()
                # cache_manager is the CacheManager instance defined earlier
                await cache_manager.set(entry['namespace'], key, value, entry.get('ttl'))
                return True
            except Exception as e:
                logger.warning(f"Cache warming failed for {entry['key']}: {e}")
                return False

        results = await batch_processor.process_items(cache_entries, warm_cache_entry)
        successful = sum(1 for r in results if r is True)
        logger.info(f"Cache warming completed: {successful}/{len(cache_entries)} entries")

# Usage examples
async def optimize_agent_data_loading(agent_ids: List[str]):
    """Optimized agent data loading"""
    batch_processor = AsyncBatchProcessor(max_concurrency=15, batch_size=20)

    async def load_agent_data(agent_id: str):
        # Load the independent pieces concurrently
        agent_info, packages, metrics = await asyncio.gather(
            get_agent_info(agent_id),
            get_agent_packages(agent_id),
            get_recent_metrics(agent_id),
            return_exceptions=True
        )
        return {
            'agent_id': agent_id,
            'info': agent_info if not isinstance(agent_info, Exception) else None,
            'packages': packages if not isinstance(packages, Exception) else [],
            'metrics': metrics if not isinstance(metrics, Exception) else []
        }

    return await batch_processor.process_items(agent_ids, load_agent_data)
Memory Optimization
Memory-Efficient Data Processing
import asyncio
import gc
import json
import logging
import weakref
from typing import Any, AsyncGenerator, Callable, Iterator, List

import psutil

logger = logging.getLogger(__name__)

class MemoryOptimizer:
    """Memory optimization utilities"""

    def __init__(self, memory_threshold: float = 0.8):
        self.memory_threshold = memory_threshold
        self.object_cache = weakref.WeakValueDictionary()

    def get_memory_usage(self) -> float:
        """Get current memory usage as a fraction (0.0-1.0)"""
        return psutil.virtual_memory().percent / 100.0

    def check_memory_pressure(self) -> bool:
        """Check if system is under memory pressure"""
        return self.get_memory_usage() > self.memory_threshold

    async def streaming_processor(self, data_source: Iterator[Any],
                                  processor_func: Callable,
                                  batch_size: int = 1000) -> AsyncGenerator[Any, None]:
        """Process large datasets in a memory-efficient streaming manner"""
        batch = []
        for item in data_source:
            batch.append(item)
            if len(batch) >= batch_size:
                # Process batch
                results = await self._process_batch_efficiently(batch, processor_func)
                for result in results:
                    yield result
                # Clear batch and force garbage collection if under pressure
                batch.clear()
                if self.check_memory_pressure():
                    gc.collect()
                    await asyncio.sleep(0.1)  # Allow other tasks to run

        # Process remaining items
        if batch:
            results = await self._process_batch_efficiently(batch, processor_func)
            for result in results:
                yield result

    async def _process_batch_efficiently(self, batch: List[Any],
                                         processor_func: Callable) -> List[Any]:
        """Process batch with memory optimizations"""
        try:
            # Process in smaller chunks if memory pressure is high
            if self.check_memory_pressure():
                chunk_size = max(1, len(batch) // 4)  # process in quarters, at least one item
                results = []
                for i in range(0, len(batch), chunk_size):
                    chunk = batch[i:i + chunk_size]
                    chunk_results = await processor_func(chunk)
                    results.extend(chunk_results)
                    # Force cleanup between chunks
                    del chunk
                    gc.collect()
                return results
            return await processor_func(batch)
        except MemoryError:
            # Fallback to item-by-item processing
            logger.warning("Memory error in batch processing, falling back to individual processing")
            results = []
            for item in batch:
                results.extend(await processor_func([item]))
            return results

class ObjectPool:
    """Object pool for reusing expensive objects"""

    def __init__(self, factory: Callable, max_size: int = 100):
        self.factory = factory
        self.max_size = max_size
        self.pool = []
        self.in_use = set()

    def acquire(self):
        """Acquire object from pool"""
        if self.pool:
            obj = self.pool.pop()
        else:
            obj = self.factory()
        self.in_use.add(id(obj))
        return obj

    def release(self, obj):
        """Release object back to pool"""
        obj_id = id(obj)
        if obj_id in self.in_use:
            self.in_use.remove(obj_id)
            if len(self.pool) < self.max_size:
                # Reset object state if needed
                if hasattr(obj, 'reset'):
                    obj.reset()
                self.pool.append(obj)

# Memory-efficient data structures
class CircularBuffer:
    """Memory-efficient circular buffer for metrics data"""

    def __init__(self, max_size: int):
        self.max_size = max_size
        self.buffer = [None] * max_size
        self.head = 0
        self.size = 0

    def append(self, item):
        """Add item to buffer, overwriting the oldest entry when full"""
        self.buffer[self.head] = item
        self.head = (self.head + 1) % self.max_size
        if self.size < self.max_size:
            self.size += 1

    def get_recent(self, count: int) -> List[Any]:
        """Get most recent items"""
        if count >= self.size:
            return self.to_list()
        start = (self.head - count) % self.max_size
        result = []
        for i in range(count):
            idx = (start + i) % self.max_size
            result.append(self.buffer[idx])
        return result

    def to_list(self) -> List[Any]:
        """Convert to list in chronological order"""
        if self.size == 0:
            return []
        if self.size < self.max_size:
            return self.buffer[:self.size]
        return self.buffer[self.head:] + self.buffer[:self.head]

# Usage example
memory_optimizer = MemoryOptimizer()

async def process_large_dataset(data_file_path: str):
    """Process large dataset with memory optimization"""
    def data_generator():
        with open(data_file_path, 'r') as f:
            for line in f:
                yield json.loads(line)

    async def process_batch(items):
        # process_item is an application-specific helper
        return [process_item(item) for item in items]

    results = []
    async for result in memory_optimizer.streaming_processor(
        data_generator(), process_batch, batch_size=500
    ):
        results.append(result)
        # Periodically save to disk to free memory
        if len(results) >= 10000:
            await save_results_to_disk(results)
            results.clear()
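The CircularBuffer is useful for keeping a bounded window of recent samples without unbounded memory growth; for instance, a rolling per-minute CPU window:

cpu_samples = CircularBuffer(max_size=1440)  # roughly one day of per-minute samples

def record_cpu_sample(value: float):
    cpu_samples.append(value)

def rolling_average(minutes: int = 5) -> float:
    recent = cpu_samples.get_recent(minutes)
    return sum(recent) / len(recent) if recent else 0.0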
Performance Monitoring
Application Performance Monitoring
Comprehensive Performance Monitoring
import gc
import logging
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import List

import psutil
from fastapi import Request
from prometheus_client import Counter, Gauge, Histogram

logger = logging.getLogger(__name__)

@dataclass
class PerformanceMetrics:
    response_time: float
    memory_usage: float
    cpu_usage: float
    db_query_time: float
    cache_hit_ratio: float
    active_connections: int

class PerformanceMonitor:
    """Comprehensive performance monitoring system"""

    def __init__(self):
        # Prometheus metrics
        self.request_duration = Histogram(
            'http_request_duration_seconds',
            'HTTP request duration',
            ['method', 'endpoint', 'status']
        )
        self.db_query_duration = Histogram(
            'db_query_duration_seconds',
            'Database query duration',
            ['query_type']
        )
        self.cache_operations = Counter(
            'cache_operations_total',
            'Cache operations',
            ['operation', 'result']
        )
        self.memory_usage = Gauge(
            'memory_usage_bytes',
            'Memory usage in bytes'
        )
        self.cpu_usage = Gauge(
            'cpu_usage_percent',
            'CPU usage percentage'
        )
        self.active_connections = Gauge(
            'active_connections',
            'Active database connections'
        )

    @asynccontextmanager
    async def track_request(self, method: str, endpoint: str):
        """Async context manager for request tracking"""
        start_time = time.perf_counter()
        status = 'success'
        try:
            yield
        except Exception:
            status = 'error'
            raise
        finally:
            duration = time.perf_counter() - start_time
            self.request_duration.labels(
                method=method,
                endpoint=endpoint,
                status=status
            ).observe(duration)

    @asynccontextmanager
    async def track_db_query(self, query_type: str):
        """Async context manager for database query tracking"""
        start_time = time.perf_counter()
        try:
            yield
        finally:
            duration = time.perf_counter() - start_time
            self.db_query_duration.labels(query_type=query_type).observe(duration)

    def track_cache_operation(self, operation: str, hit: bool):
        """Track cache operations"""
        result = 'hit' if hit else 'miss'
        self.cache_operations.labels(operation=operation, result=result).inc()

    async def collect_system_metrics(self):
        """Collect system-level metrics"""
        # Memory usage
        memory = psutil.virtual_memory()
        self.memory_usage.set(memory.used)
        # CPU usage
        cpu_percent = psutil.cpu_percent(interval=1)
        self.cpu_usage.set(cpu_percent)
        # Database connections - specific to your connection pool, e.g.:
        # self.active_connections.set(db_pool.size)

    async def get_performance_summary(self) -> PerformanceMetrics:
        """Get current performance summary"""
        memory = psutil.virtual_memory()
        cpu_percent = psutil.cpu_percent(interval=0.1)

        # Calculate averages from metrics
        avg_response_time = self._get_metric_average('http_request_duration_seconds')
        avg_db_time = self._get_metric_average('db_query_duration_seconds')
        cache_hit_ratio = self._calculate_cache_hit_ratio()

        return PerformanceMetrics(
            response_time=avg_response_time,
            memory_usage=memory.percent,
            cpu_usage=cpu_percent,
            db_query_time=avg_db_time,
            cache_hit_ratio=cache_hit_ratio,
            active_connections=0  # Get from connection pool
        )

    def _get_metric_average(self, metric_name: str) -> float:
        """Get average value from histogram metric"""
        # Placeholder: integrate with your metrics collection system
        return 0.0

    def _calculate_cache_hit_ratio(self) -> float:
        """Calculate cache hit ratio"""
        # Placeholder: calculate from the cache operation counters
        return 0.0

class PerformanceOptimizationEngine:
    """Automated performance optimization"""

    def __init__(self, monitor: PerformanceMonitor):
        self.monitor = monitor
        self.optimization_rules = {
            'high_memory_usage': self._optimize_memory,
            'slow_db_queries': self._optimize_database,
            'low_cache_hit_ratio': self._optimize_cache,
            'high_response_time': self._optimize_response_time
        }

    async def analyze_and_optimize(self):
        """Analyze performance and apply optimizations"""
        metrics = await self.monitor.get_performance_summary()
        # Check for performance issues
        issues = self._identify_issues(metrics)
        # Apply optimizations
        for issue in issues:
            if issue in self.optimization_rules:
                await self.optimization_rules[issue](metrics)

    def _identify_issues(self, metrics: PerformanceMetrics) -> List[str]:
        """Identify performance issues"""
        issues = []
        if metrics.memory_usage > 80:
            issues.append('high_memory_usage')
        if metrics.db_query_time > 1.0:
            issues.append('slow_db_queries')
        if metrics.cache_hit_ratio < 0.8:
            issues.append('low_cache_hit_ratio')
        if metrics.response_time > 2.0:
            issues.append('high_response_time')
        return issues

    async def _optimize_memory(self, metrics: PerformanceMetrics):
        """Optimize memory usage"""
        logger.info("Applying memory optimizations")
        # Force garbage collection
        gc.collect()
        # Shed the least critical cache namespace under severe pressure
        if metrics.memory_usage > 90:
            await cache_manager.invalidate_pattern("http_responses:*")

    async def _optimize_database(self, metrics: PerformanceMetrics):
        """Optimize database performance"""
        logger.info("Applying database optimizations")
        # Could implement query plan analysis, index suggestions, etc.

    async def _optimize_cache(self, metrics: PerformanceMetrics):
        """Optimize cache performance"""
        logger.info("Applying cache optimizations")
        # Adjust cache TTLs, warm popular data, etc.

    async def _optimize_response_time(self, metrics: PerformanceMetrics):
        """Optimize response times"""
        logger.info("Applying response time optimizations")
        # Could implement circuit breakers, request throttling, etc.

# FastAPI middleware integration
class PerformanceMiddleware:
    def __init__(self, app, monitor: PerformanceMonitor):
        self.app = app
        self.monitor = monitor

    async def __call__(self, scope, receive, send):
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        request = Request(scope, receive)
        method = request.method
        path = request.url.path

        async with self.monitor.track_request(method, path):
            await self.app(scope, receive, send)
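A sketch of wiring the monitor into a FastAPI application: wrap the ASGI app so every request is timed, and sample system metrics on a background task (the 15-second cadence is arbitrary):

import asyncio
import uvicorn
from fastapi import FastAPI

monitor = PerformanceMonitor()
app = FastAPI()

@app.on_event("startup")
async def start_metric_sampler():
    async def sampler():
        while True:
            await monitor.collect_system_metrics()
            await asyncio.sleep(15)
    asyncio.create_task(sampler())

# Wrap the ASGI app so every HTTP request is observed
asgi_app = PerformanceMiddleware(app, monitor)

if __name__ == "__main__":
    uvicorn.run(asgi_app, host="0.0.0.0", port=8000)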
Performance Alerting
Automated Performance Alerts
import logging
import time
from typing import Dict

logger = logging.getLogger(__name__)

class PerformanceAlerting:
    """Performance alerting system"""

    def __init__(self, alert_config: Dict):
        self.thresholds = alert_config['thresholds']
        self.alert_channels = alert_config['channels']
        self.alert_history = {}

    async def check_performance_thresholds(self, metrics: PerformanceMetrics):
        """Check metrics against thresholds and alert if needed"""
        alerts = []

        # Response time threshold
        if metrics.response_time > self.thresholds['response_time']:
            alerts.append({
                'type': 'response_time',
                'severity': 'warning',
                'message': f'High response time: {metrics.response_time:.2f}s',
                'value': metrics.response_time,
                'threshold': self.thresholds['response_time']
            })

        # Memory usage threshold
        if metrics.memory_usage > self.thresholds['memory_usage']:
            alerts.append({
                'type': 'memory_usage',
                'severity': 'critical' if metrics.memory_usage > 90 else 'warning',
                'message': f'High memory usage: {metrics.memory_usage:.1f}%',
                'value': metrics.memory_usage,
                'threshold': self.thresholds['memory_usage']
            })

        # Database query time threshold
        if metrics.db_query_time > self.thresholds['db_query_time']:
            alerts.append({
                'type': 'db_performance',
                'severity': 'warning',
                'message': f'Slow database queries: {metrics.db_query_time:.2f}s',
                'value': metrics.db_query_time,
                'threshold': self.thresholds['db_query_time']
            })

        # Send alerts
        for alert in alerts:
            await self._send_alert(alert)

    async def _send_alert(self, alert: Dict):
        """Send performance alert"""
        alert_key = f"{alert['type']}_{alert['severity']}"

        # Check if we've already alerted recently (avoid spam)
        last_alert = self.alert_history.get(alert_key)
        if last_alert and (time.time() - last_alert) < 3600:  # 1 hour cooldown
            return

        # Send to configured channels
        for channel in self.alert_channels:
            await self._send_to_channel(channel, alert)

        # Record alert
        self.alert_history[alert_key] = time.time()

    async def _send_to_channel(self, channel: str, alert: Dict):
        """Send alert to specific channel"""
        if channel == 'slack':
            await self._send_slack_alert(alert)
        elif channel == 'email':
            await self._send_email_alert(alert)
        elif channel == 'webhook':
            await self._send_webhook_alert(alert)

    # The channel senders below are integration points; these bodies just log
    async def _send_slack_alert(self, alert: Dict):
        logger.warning(f"[slack] {alert['message']}")

    async def _send_email_alert(self, alert: Dict):
        logger.warning(f"[email] {alert['message']}")

    async def _send_webhook_alert(self, alert: Dict):
        logger.warning(f"[webhook] {alert['message']}")

# Alert configuration
ALERT_CONFIG = {
    'thresholds': {
        'response_time': 2.0,    # seconds
        'memory_usage': 80,      # percentage
        'cpu_usage': 80,         # percentage
        'db_query_time': 1.0,    # seconds
        'cache_hit_ratio': 0.7   # ratio
    },
    'channels': ['slack', 'email']
}
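A periodic loop ties the monitor and the alerting together; the 60-second cadence is arbitrary:

import asyncio

alerting = PerformanceAlerting(ALERT_CONFIG)

async def alerting_loop(monitor: PerformanceMonitor):
    while True:
        metrics = await monitor.get_performance_summary()
        await alerting.check_performance_thresholds(metrics)
        await asyncio.sleep(60)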
Performance Best Practices
Development Guidelines
- Profiling First: Always profile before optimizing
- Caching Strategy: Implement multi-layer caching strategically
- Database Optimization: Optimize queries and use appropriate indexes
- Async Processing: Use asynchronous patterns for I/O operations
Operational Guidelines
- Monitor Continuously: Implement comprehensive monitoring
- Load Testing: Regular load testing and capacity planning
- Resource Management: Proper resource allocation and cleanup
- Performance Budgets: Set and maintain performance budgets