
Performance Metrics

Performance monitoring, metrics collection, benchmarking, and observability patterns in SysManage.

Metrics Architecture Overview

SysManage implements a comprehensive metrics collection and monitoring system that provides deep visibility into system performance, user experience, and operational health across all components.

Metrics Collection Architecture

┌─────────────────────────────────────────────────────────────────┐
│                     Metrics Sources                            │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐ │
│  │ Application │  │ Database    │  │ Infrastructure          │ │
│  │ Metrics     │  │ Metrics     │  │ Metrics                 │ │
│  │             │  │             │  │                         │ │
│  │ • API Times │  │ • Query     │  │ • CPU/Memory            │ │
│  │ • Task Exec │  │   Times     │  │ • Network I/O           │ │
│  │ • User Acts │  │ • Conn Pool │  │ • Disk I/O              │ │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│                   Metrics Pipeline                             │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐ │
│  │ Collection  │  │ Processing  │  │ Storage                 │ │
│  │             │  │             │  │                         │ │
│  │ • Scrapers  │  │ • Aggreg.   │  │ • Time Series DB        │ │
│  │ • Agents    │  │ • Filtering │  │ • PostgreSQL            │ │
│  │ • Push      │  │ • Enrichment│  │ • Redis Cache           │ │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
                               │
                               ▼
┌─────────────────────────────────────────────────────────────────┐
│                 Visualization & Alerting                       │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐ │
│  │ Dashboards  │  │ Alerts      │  │ APIs                    │ │
│  │             │  │             │  │                         │ │
│  │ • Real-time │  │ • Thresholds│  │ • Metrics Query         │ │
│  │ • Historical│  │ • ML-based  │  │ • Export Formats        │ │
│  │ • Custom    │  │ • Escalation│  │ • Integration           │ │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
                        

Key Design Principles

  • Low Overhead: Metrics collection adds <5% performance overhead
  • High Cardinality: Support for dimensional metrics with labels
  • Real-time Processing: Sub-second latency for critical metrics
  • Scalable Storage: Efficient time-series data storage and retrieval
  • Flexible Alerting: Rule-based and ML-powered anomaly detection

Metric Categories

Application Performance Metrics

API Performance

Request Metrics
  • http_requests_total: Total HTTP requests by method, endpoint, status
  • http_request_duration_seconds: Request latency histogram
  • http_request_size_bytes: Request payload size distribution
  • http_response_size_bytes: Response payload size distribution
# Example Prometheus metrics
http_requests_total{method="GET", endpoint="/api/agents", status="200"} 15234
http_requests_total{method="POST", endpoint="/api/tasks", status="201"} 542
http_request_duration_seconds_bucket{endpoint="/api/agents", le="0.1"} 12890
http_request_duration_seconds_bucket{endpoint="/api/agents", le="0.5"} 15123
http_request_duration_seconds_bucket{endpoint="/api/agents", le="1.0"} 15200
                                
Business Logic Metrics
  • task_execution_duration_seconds: Task execution time by type
  • task_success_rate: Task success percentage by type
  • agent_connection_duration_seconds: Agent session duration
  • package_operations_total: Package install/update/remove counts
Error Metrics
  • application_errors_total: Application errors by type and severity
  • task_failures_total: Failed tasks by type and reason
  • agent_communication_errors_total: Agent communication failures
  • authentication_failures_total: Failed login attempts
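
These error counters are plain Prometheus counters; a minimal sketch of how they might be declared is shown below. The label sets are illustrative, not necessarily the ones SysManage uses.

# Hedged sketch: possible declarations for the error counters above.
# Label sets are illustrative, not necessarily the ones SysManage uses.
from prometheus_client import Counter

APPLICATION_ERRORS = Counter(
    'application_errors_total',
    'Application errors by type and severity',
    ['error_type', 'severity']
)

AGENT_COMMUNICATION_ERRORS = Counter(
    'agent_communication_errors_total',
    'Agent communication failures',
    ['reason']
)

AUTHENTICATION_FAILURES = Counter(
    'authentication_failures_total',
    'Failed login attempts',
    ['method']
)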

Implementation Example

# Python FastAPI metrics implementation
from fastapi import FastAPI, Request
from prometheus_client import Counter, Histogram, Gauge
import time
from functools import wraps

app = FastAPI()

# Define metrics
REQUEST_COUNT = Counter(
    'http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status_code']
)

REQUEST_DURATION = Histogram(
    'http_request_duration_seconds',
    'HTTP request latency',
    ['endpoint'],
    buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0]
)

ACTIVE_CONNECTIONS = Gauge(
    'websocket_connections_active',
    'Active WebSocket connections',
    ['room']
)

# Task metrics used by the decorator below
TASK_DURATION = Histogram(
    'task_execution_duration_seconds',
    'Task execution time by type',
    ['task_type']
)

TASK_SUCCESS_RATE = Counter(
    'task_successes_total',
    'Successfully completed tasks (used to derive task_success_rate)',
    ['task_type']
)

TASK_FAILURES = Counter(
    'task_failures_total',
    'Failed tasks by type and error',
    ['task_type', 'error_type']
)

# Middleware for automatic metrics collection
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start_time = time.time()

    response = await call_next(request)

    # Record metrics
    endpoint = request.url.path
    method = request.method
    status_code = str(response.status_code)
    duration = time.time() - start_time

    REQUEST_COUNT.labels(
        method=method,
        endpoint=endpoint,
        status_code=status_code
    ).inc()

    REQUEST_DURATION.labels(endpoint=endpoint).observe(duration)

    return response

# Business metrics decorator
def track_task_execution(task_type: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
                TASK_SUCCESS_RATE.labels(task_type=task_type).inc()
                return result
            except Exception as e:
                TASK_FAILURES.labels(
                    task_type=task_type,
                    error_type=type(e).__name__
                ).inc()
                raise
            finally:
                duration = time.time() - start_time
                TASK_DURATION.labels(task_type=task_type).observe(duration)
        return wrapper
    return decorator
                        

Infrastructure Metrics

System Resource Metrics

CPU & Memory
  • cpu_usage_percent: CPU utilization by core and process
  • memory_usage_bytes: Memory consumption by process
  • memory_available_bytes: Available system memory
  • gc_collections_total: Garbage collection metrics (Python)
Network & I/O
  • network_bytes_sent/received: Network traffic by interface
  • disk_io_bytes_read/written: Disk I/O by device
  • disk_usage_percent: Disk space utilization
  • file_descriptors_open: Open file descriptor count
Process Metrics
  • process_cpu_seconds_total: Process CPU time
  • process_resident_memory_bytes: Process memory usage
  • process_start_time_seconds: Process start timestamp
  • python_info: Python version and implementation info
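
Most of the process-level metrics above (process_cpu_seconds_total, process_resident_memory_bytes, process_start_time_seconds, python_info, and the GC counters) are exported automatically by prometheus_client's default registry; a minimal sketch of exposing them follows, with an illustrative port.

# Hedged sketch: prometheus_client's default registry already exports the
# process_* metrics and python_info via its ProcessCollector / PlatformCollector
# (plus GC counters via GCCollector on CPython); serving the registry is enough.
from prometheus_client import start_http_server

# Illustrative port; the real exporter endpoint is deployment-specific.
start_http_server(8000)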

Agent Metrics Collection

# Agent-side metrics collection
import logging
import os
import time
from datetime import datetime

import psutil

logger = logging.getLogger(__name__)

class AgentMetricsCollector:
    def __init__(self):
        self.metrics_buffer = []
        self.collection_interval = 30  # seconds
        self.max_buffer_size = 1000

    async def collect_system_metrics(self):
        """Collect system-level metrics"""
        metrics = {
            'timestamp': datetime.utcnow().isoformat(),
            'cpu': {
                'usage_percent': psutil.cpu_percent(interval=1),
                'cores': psutil.cpu_count(),
                'load_avg': os.getloadavg() if hasattr(os, 'getloadavg') else None
            },
            'memory': {
                'total': psutil.virtual_memory().total,
                'available': psutil.virtual_memory().available,
                'used': psutil.virtual_memory().used,
                'percent': psutil.virtual_memory().percent
            },
            'disk': {
                dev.device: {
                    'total': psutil.disk_usage(dev.mountpoint).total,
                    'used': psutil.disk_usage(dev.mountpoint).used,
                    'free': psutil.disk_usage(dev.mountpoint).free
                }
                for dev in psutil.disk_partitions()
            },
            'network': {
                interface: {
                    'bytes_sent': stats.bytes_sent,
                    'bytes_recv': stats.bytes_recv,
                    'packets_sent': stats.packets_sent,
                    'packets_recv': stats.packets_recv
                }
                for interface, stats in psutil.net_io_counters(pernic=True).items()
            }
        }
        return metrics

    async def collect_application_metrics(self):
        """Collect application-specific metrics"""
        return {
            'agent': {
                'version': self.agent_version,
                'uptime_seconds': time.time() - self.start_time,
                'tasks_completed': self.tasks_completed,
                'last_heartbeat': self.last_heartbeat.isoformat()
            },
            'services': await self.get_service_metrics(),
            'packages': await self.get_package_metrics()
        }

    async def send_metrics(self):
        """Send collected metrics to server"""
        if self.metrics_buffer:
            try:
                await self.api_client.post('/api/metrics', {
                    'agent_id': self.agent_id,
                    'metrics': self.metrics_buffer
                })
                self.metrics_buffer.clear()
            except Exception as e:
                logger.error(f"Failed to send metrics: {e}")
                # Implement exponential backoff retry
                await self.schedule_retry()
                        

Database Performance Metrics

Connection Pool Metrics

  • db_connections_active: Active database connections
  • db_connections_idle: Idle connections in pool
  • db_connection_wait_time_seconds: Time waiting for connection
  • db_pool_exhausted_total: Pool exhaustion events

Query Performance Metrics

  • db_query_duration_seconds: Query execution time by type
  • db_queries_total: Query count by operation (SELECT, INSERT, etc.)
  • db_slow_queries_total: Queries exceeding threshold
  • db_deadlocks_total: Database deadlock occurrences

Database Monitoring Implementation

# SQLAlchemy metrics integration
from prometheus_client import Counter, Gauge, Histogram
from sqlalchemy import event
from sqlalchemy.engine import Engine
from sqlalchemy.pool import QueuePool
import time

# Query timing metrics
QUERY_DURATION = Histogram(
    'db_query_duration_seconds',
    'Database query execution time',
    ['operation', 'table']
)

QUERY_COUNT = Counter(
    'db_queries_total',
    'Total database queries',
    ['operation', 'table', 'status']
)

DB_CONNECTION_WAIT_TIME = Histogram(
    'db_connection_wait_time_seconds',
    'Time spent waiting to acquire a pooled connection'
)

@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    context._query_start_time = time.time()

@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    duration = time.time() - context._query_start_time

    # Parse SQL to extract operation and table
    # (extract_table_name is a project helper that pulls the target table out of the SQL)
    operation = statement.strip().split()[0].upper()
    table = extract_table_name(statement)

    QUERY_DURATION.labels(operation=operation, table=table).observe(duration)
    QUERY_COUNT.labels(operation=operation, table=table, status='success').inc()

# Connection pool monitoring (subclass of SQLAlchemy's QueuePool)
class MonitoredConnectionPool(QueuePool):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.active_connections = Gauge(
            'db_connections_active',
            'Active database connections'
        )
        self.pool_size = Gauge(
            'db_pool_size',
            'Database connection pool size'
        )

    def _do_get(self):
        start_time = time.time()
        try:
            conn = super()._do_get()
            self.active_connections.inc()
            return conn
        finally:
            wait_time = time.time() - start_time
            if wait_time > 0.1:  # Log slow connection acquisition
                DB_CONNECTION_WAIT_TIME.observe(wait_time)

    def _do_return_conn(self, conn):
        super()._do_return_conn(conn)
        self.active_connections.dec()
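
The db_slow_queries_total metric listed earlier can be fed from the same event hook; the sketch below reuses the imports and the extract_table_name helper from the example above, with an illustrative 0.5-second threshold.

# Hedged sketch: counting slow queries from a second after_cursor_execute listener.
# The 0.5 s threshold is illustrative, not a documented SysManage default.
SLOW_QUERY_THRESHOLD_SECONDS = 0.5

SLOW_QUERIES = Counter(
    'db_slow_queries_total',
    'Queries exceeding the slow-query threshold',
    ['operation', 'table']
)

@event.listens_for(Engine, "after_cursor_execute")
def count_slow_queries(conn, cursor, statement, parameters, context, executemany):
    duration = time.time() - context._query_start_time
    if duration > SLOW_QUERY_THRESHOLD_SECONDS:
        operation = statement.strip().split()[0].upper()
        SLOW_QUERIES.labels(operation=operation, table=extract_table_name(statement)).inc()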
                        

User Experience Metrics

Frontend Performance

  • page_load_time_seconds: Page load duration by route
  • api_call_duration_seconds: Client-side API call timing
  • websocket_latency_seconds: Real-time update latency
  • ui_interactions_total: User interaction counts by type

Client-side Metrics Collection

// JavaScript client-side metrics
class ClientMetrics {
    constructor() {
        this.metrics = [];
        this.batchSize = 50;
        this.flushInterval = 30000; // 30 seconds
        this.startFlushTimer();
    }

    // Performance timing
    recordPageLoad(route) {
        const navigation = performance.getEntriesByType('navigation')[0];
        this.addMetric({
            name: 'page_load_time_seconds',
            value: navigation.loadEventEnd / 1000,
            labels: { route: route },
            timestamp: Date.now()
        });
    }

    // API call timing
    recordApiCall(endpoint, method, duration, status) {
        this.addMetric({
            name: 'api_call_duration_seconds',
            value: duration / 1000,
            labels: {
                endpoint: endpoint,
                method: method,
                status: status.toString()
            },
            timestamp: Date.now()
        });
    }

    // WebSocket latency
    recordWebSocketLatency(messageType, latency) {
        this.addMetric({
            name: 'websocket_latency_seconds',
            value: latency / 1000,
            labels: { message_type: messageType },
            timestamp: Date.now()
        });
    }

    // User interactions
    recordInteraction(action, component) {
        this.addMetric({
            name: 'ui_interactions_total',
            value: 1,
            labels: {
                action: action,
                component: component
            },
            timestamp: Date.now()
        });
    }

    addMetric(metric) {
        this.metrics.push(metric);
        if (this.metrics.length >= this.batchSize) {
            this.flush();
        }
    }

    async flush() {
        if (this.metrics.length === 0) return;

        const batch = this.metrics.splice(0);
        try {
            await fetch('/api/metrics/client', {
                method: 'POST',
                headers: { 'Content-Type': 'application/json' },
                body: JSON.stringify({ metrics: batch })
            });
        } catch (error) {
            console.error('Failed to send client metrics:', error);
            // Could implement retry logic here
        }
    }

    startFlushTimer() {
        setInterval(() => this.flush(), this.flushInterval);
    }
}

// Integration with React components
import { useCallback, useRef } from 'react';

const useMetrics = () => {
    const metrics = useRef(new ClientMetrics());

    const recordPageView = useCallback((route) => {
        metrics.current.recordPageLoad(route);
    }, []);

    const recordApiCall = useCallback((endpoint, method, duration, status) => {
        metrics.current.recordApiCall(endpoint, method, duration, status);
    }, []);

    return { recordPageView, recordApiCall };
};
                        

Metrics Storage & Retention

Time-Series Storage Strategy

Multi-Resolution Storage

High Resolution (Raw Data)
  • Retention: 24 hours
  • Resolution: 1-second intervals
  • Use Case: Real-time monitoring, debugging
  • Storage: Redis + PostgreSQL
Medium Resolution
  • Retention: 7 days
  • Resolution: 10-second intervals
  • Use Case: Recent trend analysis
  • Storage: PostgreSQL time partitions
Low Resolution (Aggregated)
  • Retention: 90 days
  • Resolution: 1-minute intervals
  • Use Case: Historical analysis, reporting
  • Storage: PostgreSQL compressed tables
Archive Resolution
  • Retention: 2 years
  • Resolution: 1-hour intervals
  • Use Case: Long-term trends, compliance
  • Storage: Compressed PostgreSQL / Object storage
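
Retention for the PostgreSQL tiers can be enforced by a periodic pruning job. The sketch below assumes the MetricAggregate model used in the aggregation example later in this page; the retention windows mirror the tiers above, and the raw tier expires on its own via the 24-hour Redis TTL.

# Hedged sketch: periodic retention enforcement for the aggregated tiers.
# MetricAggregate is assumed to match the model in the aggregation example below.
from datetime import datetime, timedelta

from sqlalchemy import delete

RETENTION = {
    10:   timedelta(days=7),     # 10-second aggregates
    60:   timedelta(days=90),    # 1-minute aggregates
    3600: timedelta(days=730),   # 1-hour aggregates (~2 years)
}

async def prune_expired_aggregates(db_session):
    """Delete aggregates older than their tier's retention window."""
    now = datetime.utcnow()
    async with db_session.begin():
        for interval_seconds, keep_for in RETENTION.items():
            await db_session.execute(
                delete(MetricAggregate)
                .where(MetricAggregate.interval_seconds == interval_seconds)
                .where(MetricAggregate.timestamp < now - keep_for)
            )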

Aggregation Pipeline

Raw Metrics           Aggregation Levels            Storage Tiers
     │                       │                          │
     ▼                       ▼                          ▼
┌─────────────┐      ┌─────────────────┐      ┌─────────────────┐
│   1-second  │──────▶│   10-second     │──────▶│    Redis        │
│   samples   │      │   aggregation   │      │    (24h TTL)    │
└─────────────┘      └─────────────────┘      └─────────────────┘
                              │                          │
                              ▼                          ▼
                     ┌─────────────────┐      ┌─────────────────┐
                     │   1-minute      │──────▶│   PostgreSQL    │
                     │   aggregation   │      │   (partitioned) │
                     └─────────────────┘      └─────────────────┘
                              │                          │
                              ▼                          ▼
                     ┌─────────────────┐      ┌─────────────────┐
                     │   1-hour        │──────▶│   Compressed    │
                     │   aggregation   │      │   Storage       │
                     └─────────────────┘      └─────────────────┘
                        

Implementation Example

# Metrics aggregation pipeline
import asyncio
import json
import logging
import time
from datetime import datetime

logger = logging.getLogger(__name__)

class MetricsAggregator:
    def __init__(self, redis_client, db_session):
        self.redis = redis_client
        self.db = db_session
        self.aggregation_intervals = {
            'raw': 1,      # 1 second
            'short': 10,   # 10 seconds
            'medium': 60,  # 1 minute
            'long': 3600   # 1 hour
        }

    async def aggregate_metrics(self, interval_type: str):
        """Aggregate metrics for specified interval"""
        interval = self.aggregation_intervals[interval_type]
        current_time = int(time.time())
        window_start = (current_time // interval) * interval

        # Get raw metrics from Redis for aggregation window
        raw_metrics = await self.get_raw_metrics(window_start, interval)

        # Perform aggregations
        aggregated = {}
        for metric_name, samples in raw_metrics.items():
            if not samples:
                continue
            aggregated[metric_name] = {
                'timestamp': window_start,
                'interval': interval,
                'count': len(samples),
                'sum': sum(s['value'] for s in samples),
                'min': min(s['value'] for s in samples),
                'max': max(s['value'] for s in samples),
                'avg': sum(s['value'] for s in samples) / len(samples),
                'p50': self.percentile(samples, 0.5),
                'p95': self.percentile(samples, 0.95),
                'p99': self.percentile(samples, 0.99)
            }

        # Store aggregated metrics
        await self.store_aggregated_metrics(aggregated, interval_type)

    async def store_aggregated_metrics(self, metrics, interval_type):
        """Store aggregated metrics in appropriate storage tier"""
        if interval_type == 'raw':
            # Store in Redis with TTL
            for metric_name, data in metrics.items():
                key = f"metrics:{metric_name}:{data['timestamp']}"
                await self.redis.setex(key, 86400, json.dumps(data))  # 24h TTL
        else:
            # Store in PostgreSQL
            async with self.db.begin():
                for metric_name, data in metrics.items():
                    metric_record = MetricAggregate(
                        name=metric_name,
                        timestamp=datetime.fromtimestamp(data['timestamp']),
                        interval_seconds=data['interval'],
                        count=data['count'],
                        sum_value=data['sum'],
                        min_value=data['min'],
                        max_value=data['max'],
                        avg_value=data['avg'],
                        p50_value=data['p50'],
                        p95_value=data['p95'],
                        p99_value=data['p99']
                    )
                    self.db.add(metric_record)

    def percentile(self, samples, p):
        """Calculate percentile value"""
        values = sorted([s['value'] for s in samples])
        index = int(len(values) * p)
        return values[min(index, len(values) - 1)]

# Automated aggregation scheduler
class MetricsScheduler:
    def __init__(self, aggregator):
        self.aggregator = aggregator

    async def start_scheduling(self):
        """Start background aggregation tasks"""
        # Schedule different aggregation intervals
        asyncio.create_task(self.schedule_aggregation('short', 10))   # Every 10s
        asyncio.create_task(self.schedule_aggregation('medium', 60))  # Every 1m
        asyncio.create_task(self.schedule_aggregation('long', 3600))  # Every 1h

    async def schedule_aggregation(self, interval_type, frequency):
        """Schedule periodic aggregation"""
        while True:
            try:
                await self.aggregator.aggregate_metrics(interval_type)
                await asyncio.sleep(frequency)
            except Exception as e:
                logger.error(f"Aggregation failed for {interval_type}: {e}")
                await asyncio.sleep(frequency)  # Continue despite errors
                        

Monitoring Dashboards

Dashboard Categories

Operational Dashboard

High-level system health and key performance indicators

Key Metrics Displayed:
  • System Health Score: Composite health indicator (0-100)
  • Active Agents: Connected agents vs. total registered
  • Active Tasks: Running tasks and queue depth
  • Alert Summary: Critical/high/medium alerts count
  • API Performance: Average response time and error rate
  • Resource Utilization: Server CPU/memory/disk usage
┌─────────────────────────────────────────────────────────────────┐
│                    SysManage Operations Dashboard               │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │   Health    │ │   Agents    │ │   Tasks     │ │   Alerts    │ │
│ │    Score    │ │   Online    │ │   Active    │ │  Critical   │ │
│ │     95%     │ │  247/250    │ │    12/5     │ │      3      │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────┐ ┌─────────────────────────────┐ │
│ │      API Response Times     │ │     Resource Utilization    │ │
│ │  ──────────────────────────  │ │  ─────────────────────────  │ │
│ │     /api/agents: 45ms       │ │    CPU: ████████░░ 80%      │ │
│ │     /api/tasks:  67ms       │ │    Memory: ██████░░░░ 60%   │ │
│ │     /api/metrics: 23ms      │ │    Disk: ███░░░░░░░░ 30%    │ │
│ └─────────────────────────────┘ └─────────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │                  Task Execution Timeline                   │ │
│ │  ──────────────────────────────────────────────────────────  │ │
│ │    10:00   10:15   10:30   10:45   11:00   11:15   11:30   │ │
│ │      │      │      │      │      │      │      │           │ │
│ │  ████████░░░░░░░░░░██████████░░░░░░░░██████                │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
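
The health score in the mockup is a composite indicator; the sketch below shows one possible weighting of normalized sub-scores. The weights and sub-scores are illustrative, not SysManage's documented formula.

# Hedged sketch: one way a composite 0-100 health score could be derived.
# Sub-scores and weights are illustrative, not SysManage's documented formula.
def composite_health_score(agent_online_ratio: float,
                           api_error_rate: float,
                           task_failure_rate: float,
                           resource_headroom: float) -> int:
    """Blend normalized sub-scores (each 0.0-1.0) into a 0-100 health score."""
    weights = {
        'agents': 0.35,
        'api': 0.25,
        'tasks': 0.25,
        'resources': 0.15,
    }
    score = (
        weights['agents'] * agent_online_ratio
        + weights['api'] * (1.0 - min(api_error_rate, 1.0))
        + weights['tasks'] * (1.0 - min(task_failure_rate, 1.0))
        + weights['resources'] * resource_headroom
    )
    return round(score * 100)

# Example: composite_health_score(0.988, 0.005, 0.02, 0.7) -> 94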
                                

Performance Dashboard

Detailed performance metrics and trends

Performance Panels:
  • Request Latency: P50, P95, P99 latency by endpoint
  • Throughput: Requests per second trends
  • Error Rates: 4xx/5xx error percentages
  • Database Performance: Query times and connection pool stats
  • Agent Performance: Task execution times by agent
  • WebSocket Metrics: Connection count and message latency

Infrastructure Dashboard

System resource monitoring and capacity planning

Infrastructure Panels:
  • Server Resources: CPU, memory, disk, network by host
  • Database Health: Query performance, locks, replication lag
  • Network Health: Connectivity, latency between components
  • Storage Health: Disk usage, I/O performance, backup status
  • Security Metrics: Failed logins, certificate status
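
For the certificate-status panel, a small collector can publish days-to-expiry as a gauge. The sketch below uses only the standard library plus prometheus_client; the metric name is illustrative.

# Hedged sketch: publishing TLS certificate expiry for the security panel.
# The metric name is illustrative.
import socket
import ssl
import time

from prometheus_client import Gauge

CERT_EXPIRY_DAYS = Gauge(
    'tls_certificate_expiry_days',
    'Days until the TLS certificate expires',
    ['host']
)

def collect_certificate_expiry(host: str, port: int = 443) -> None:
    """Connect, read the peer certificate, and record days until notAfter."""
    context = ssl.create_default_context()
    with socket.create_connection((host, port), timeout=5) as sock:
        with context.wrap_socket(sock, server_hostname=host) as tls:
            cert = tls.getpeercert()
    expires_at = ssl.cert_time_to_seconds(cert['notAfter'])
    CERT_EXPIRY_DAYS.labels(host=host).set((expires_at - time.time()) / 86400)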

Dashboard Implementation

# React dashboard component with real-time updates
import React, { useState, useEffect } from 'react';
import { useWebSocket } from './hooks/useWebSocket';
import { MetricsAPI } from './services/api';

const OperationalDashboard = () => {
    const [metrics, setMetrics] = useState({});
    const [timeRange, setTimeRange] = useState('1h');

    // WebSocket for real-time updates
    const { lastMessage } = useWebSocket('/ws', {
        onMessage: (message) => {
            if (message.type === 'metrics_update') {
                setMetrics(prev => ({
                    ...prev,
                    [message.data.metric_name]: message.data
                }));
            }
        }
    });

    // Load initial metrics
    useEffect(() => {
        const loadMetrics = async () => {
            try {
                const data = await MetricsAPI.getDashboardMetrics(timeRange);
                setMetrics(data);
            } catch (error) {
                console.error('Failed to load metrics:', error);
            }
        };

        loadMetrics();
        const interval = setInterval(loadMetrics, 30000); // Refresh every 30s
        return () => clearInterval(interval);
    }, [timeRange]);

    return (
        /* dashboard panel markup elided */
    );
};

// Metrics API service
class MetricsAPIService {
    async getDashboardMetrics(timeRange) {
        const response = await fetch(`/api/metrics/dashboard?range=${timeRange}`);
        if (!response.ok) {
            throw new Error('Failed to fetch dashboard metrics');
        }
        return response.json();
    }

    async getMetricHistory(metricName, timeRange, resolution = 'auto') {
        const params = new URLSearchParams({
            metric: metricName,
            range: timeRange,
            resolution: resolution
        });
        const response = await fetch(`/api/metrics/history?${params}`);
        if (!response.ok) {
            throw new Error(`Failed to fetch ${metricName} history`);
        }
        return response.json();
    }

    async getCustomQuery(query) {
        const response = await fetch('/api/metrics/query', {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify({ query })
        });
        if (!response.ok) {
            throw new Error('Failed to execute custom query');
        }
        return response.json();
    }
}

Alerting & Anomaly Detection

Multi-tier Alerting System

Threshold-based Alerts

Simple rule-based alerts for known failure conditions

Common Alert Rules:
  • High CPU: CPU usage > 90% for 5 minutes
  • Memory Pressure: Available memory < 10%
  • Disk Space: Disk usage > 85%
  • API Errors: Error rate > 5% for 2 minutes
  • Agent Offline: Agent offline for > 5 minutes
  • Task Failures: Task failure rate > 10%
# Alert rule configuration
alert_rules:
  - name: "High CPU Usage"
    metric: "cpu_usage_percent"
    condition: ">"
    threshold: 90
    duration: "5m"
    severity: "warning"
    labels:
      team: "infrastructure"
    annotations:
      summary: "High CPU usage detected on {{ $labels.hostname }}"
      description: "CPU usage is {{ $value }}% on {{ $labels.hostname }}"

  - name: "API Error Rate High"
    metric: "http_requests_total"
    condition: "rate"
    threshold: 0.05  # 5% error rate
    duration: "2m"
    severity: "critical"
    labels:
      team: "backend"
    annotations:
      summary: "High API error rate detected"
      description: "API error rate is {{ $value | humanizePercentage }} over the last 2 minutes"
                                

Anomaly Detection

Machine learning-based detection of unusual patterns

Anomaly Detection Methods:
  • Statistical Anomalies: Z-score based detection
  • Seasonal Patterns: Time-series decomposition
  • Correlation Analysis: Multi-metric pattern detection
  • Predictive Models: Forecast-based anomaly detection
# Anomaly detection implementation
import asyncio
import logging
from typing import List

import numpy as np
from sklearn.ensemble import IsolationForest

logger = logging.getLogger(__name__)

class AnomalyDetector:
    def __init__(self):
        self.models = {}
        self.historical_data = {}
        self.detection_window = 3600  # 1 hour

    async def train_models(self, metric_name: str, historical_data: List[float]):
        """Train anomaly detection models for a metric"""

        # Statistical model (Z-score based)
        mean = np.mean(historical_data)
        std = np.std(historical_data)
        self.models[f"{metric_name}_zscore"] = {'mean': mean, 'std': std}

        # Isolation Forest for multivariate anomalies
        if len(historical_data) > 100:
            iso_forest = IsolationForest(contamination=0.1, random_state=42)
            data_reshaped = np.array(historical_data).reshape(-1, 1)
            iso_forest.fit(data_reshaped)
            self.models[f"{metric_name}_isolation"] = iso_forest

        # Seasonal decomposition for time-series patterns
        if len(historical_data) > 288:  # Need at least 1 day of 5-min samples
            seasonal_model = self.fit_seasonal_model(historical_data)
            self.models[f"{metric_name}_seasonal"] = seasonal_model

    def detect_anomaly(self, metric_name: str, current_value: float,
                      recent_values: List[float]) -> dict:
        """Detect if current value is anomalous"""

        anomalies = {}

        # Z-score detection
        zscore_model = self.models.get(f"{metric_name}_zscore")
        if zscore_model and zscore_model['std'] > 0:
            z_score = abs((current_value - zscore_model['mean']) / zscore_model['std'])
            anomalies['zscore'] = {
                'is_anomaly': z_score > 3,  # 3 standard deviations
                'score': z_score,
                'threshold': 3
            }

        # Isolation Forest detection
        iso_model = self.models.get(f"{metric_name}_isolation")
        if iso_model:
            anomaly_score = iso_model.decision_function([[current_value]])[0]
            is_anomaly = iso_model.predict([[current_value]])[0] == -1
            anomalies['isolation'] = {
                'is_anomaly': is_anomaly,
                'score': anomaly_score,
                'threshold': 0
            }

        # Rate of change detection
        if len(recent_values) >= 2 and recent_values[-1] != 0:
            rate_of_change = abs(current_value - recent_values[-1]) / abs(recent_values[-1])
            anomalies['rate_change'] = {
                'is_anomaly': rate_of_change > 0.5,  # 50% change
                'score': rate_of_change,
                'threshold': 0.5
            }

        return anomalies

    async def evaluate_alert_rules(self):
        """Continuously evaluate alert rules"""
        while True:
            try:
                # Get current metrics
                current_metrics = await self.get_current_metrics()

                for metric_name, value in current_metrics.items():
                    # Get recent history for context
                    recent_values = await self.get_recent_values(
                        metric_name, self.detection_window
                    )

                    # Run anomaly detection
                    anomalies = self.detect_anomaly(metric_name, value, recent_values)

                    # Check if any anomaly detection triggered
                    for detection_type, result in anomalies.items():
                        if result['is_anomaly']:
                            await self.trigger_alert({
                                'metric': metric_name,
                                'value': value,
                                'detection_type': detection_type,
                                'score': result['score'],
                                'threshold': result['threshold'],
                                'severity': self.calculate_severity(result['score'])
                            })

                await asyncio.sleep(60)  # Check every minute

            except Exception as e:
                logger.error(f"Error in anomaly detection: {e}")
                await asyncio.sleep(60)
                                

Alert Management

Intelligent alert routing, escalation, and notification

Alert Lifecycle:
Triggered → Pending → Acknowledged → Resolved
    │           │           │           │
    ▼           ▼           ▼           ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Generate │ │ Evaluate │ │ Suppress │ │ Archive  │
│ Alert    │ │ Rules    │ │ Similar  │ │ Alert    │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
    │           │           │           │
    ▼           ▼           ▼           ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Apply    │ │ Route to │ │ Update   │ │ Update   │
│ Routing  │ │ Teams    │ │ Status   │ │ Metrics  │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
                                
# Alert management system
import logging
from datetime import datetime

import aiohttp

logger = logging.getLogger(__name__)

class AlertManager:
    def __init__(self):
        self.active_alerts = {}
        self.notification_channels = {}
        self.escalation_rules = {}

    async def process_alert(self, alert_data: dict):
        """Process incoming alert"""

        # Create alert object
        alert = Alert(
            id=generate_alert_id(),
            metric=alert_data['metric'],
            value=alert_data['value'],
            threshold=alert_data.get('threshold'),
            severity=alert_data['severity'],
            timestamp=datetime.utcnow(),
            status='triggered'
        )

        # Check for duplicate/similar alerts
        similar_alert = await self.find_similar_alert(alert)
        if similar_alert:
            await self.update_similar_alert(similar_alert, alert)
            return

        # Apply routing rules
        routing_config = await self.get_routing_config(alert)

        # Store alert
        self.active_alerts[alert.id] = alert
        await self.store_alert(alert)

        # Send notifications
        await self.send_notifications(alert, routing_config)

        # Schedule escalation if needed
        if routing_config.get('escalation'):
            await self.schedule_escalation(alert, routing_config['escalation'])

    async def send_notifications(self, alert: Alert, routing_config: dict):
        """Send alert notifications via configured channels"""

        channels = routing_config.get('channels', [])

        for channel_config in channels:
            channel_type = channel_config['type']
            channel = self.notification_channels.get(channel_type)

            if channel:
                try:
                    await channel.send_notification(alert, channel_config)
                except Exception as e:
                    logger.error(f"Failed to send notification via {channel_type}: {e}")

    async def acknowledge_alert(self, alert_id: str, user_id: str):
        """Acknowledge an alert"""

        alert = self.active_alerts.get(alert_id)
        if alert and alert.status == 'triggered':
            alert.status = 'acknowledged'
            alert.acknowledged_by = user_id
            alert.acknowledged_at = datetime.utcnow()

            await self.update_alert(alert)

            # Cancel escalation
            await self.cancel_escalation(alert_id)

    async def resolve_alert(self, alert_id: str, user_id: str):
        """Resolve an alert"""

        alert = self.active_alerts.get(alert_id)
        if alert:
            alert.status = 'resolved'
            alert.resolved_by = user_id
            alert.resolved_at = datetime.utcnow()

            await self.update_alert(alert)

            # Remove from active alerts
            del self.active_alerts[alert_id]

# Notification channels
class SlackNotificationChannel:
    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    async def send_notification(self, alert: Alert, config: dict):
        payload = {
            "text": f"🚨 {alert.severity.upper()} Alert",
            "attachments": [
                {
                    "color": self.get_color(alert.severity),
                    "fields": [
                        {"title": "Metric", "value": alert.metric, "short": True},
                        {"title": "Value", "value": str(alert.value), "short": True},
                        {"title": "Threshold", "value": str(alert.threshold), "short": True},
                        {"title": "Time", "value": alert.timestamp.isoformat(), "short": True}
                    ]
                }
            ]
        }

        async with aiohttp.ClientSession() as session:
            await session.post(self.webhook_url, json=payload)
                                

Performance Benchmarking

Automated Performance Testing

Load Testing Strategy

Scenario 1: Normal Load
  • Agents: 1,000 concurrent connections
  • API Calls: 100 requests/second
  • Tasks: 10 tasks/minute per agent
  • Duration: 1 hour
Scenario 2: Peak Load
  • Agents: 5,000 concurrent connections
  • API Calls: 500 requests/second
  • Tasks: 50 tasks/minute per agent
  • Duration: 30 minutes
Scenario 3: Stress Test
  • Agents: 10,000 concurrent connections
  • API Calls: 1,000 requests/second
  • Tasks: 100 tasks/minute per agent
  • Duration: 15 minutes

Performance Testing Implementation

# Performance testing framework
import asyncio
import aiohttp
import time
from dataclasses import dataclass
from typing import List

@dataclass
class LoadTestResult:
    scenario: str
    duration_seconds: float
    total_requests: int
    successful_requests: int
    failed_requests: int
    avg_response_time: float
    p95_response_time: float
    p99_response_time: float
    max_response_time: float
    requests_per_second: float
    errors: List[str]

class PerformanceTester:
    def __init__(self, base_url: str, auth_token: str):
        self.base_url = base_url
        self.auth_token = auth_token
        self.session = None

    async def run_load_test(self, scenario_config: dict) -> LoadTestResult:
        """Run a load test scenario"""

        start_time = time.time()
        response_times = []
        total_requests = 0
        successful_requests = 0
        failed_requests = 0
        errors = []

        # Create HTTP session
        connector = aiohttp.TCPConnector(limit=1000)
        self.session = aiohttp.ClientSession(
            connector=connector,
            headers={'Authorization': f'Bearer {self.auth_token}'},
            timeout=aiohttp.ClientTimeout(total=30)
        )

        try:
            # Run concurrent workers
            workers = []
            for i in range(scenario_config['concurrent_users']):
                worker = asyncio.create_task(
                    self.load_test_worker(
                        scenario_config,
                        response_times,
                        errors
                    )
                )
                workers.append(worker)

            # Wait for test duration
            await asyncio.sleep(scenario_config['duration_seconds'])

            # Cancel all workers
            for worker in workers:
                worker.cancel()

            # Wait for workers to complete
            await asyncio.gather(*workers, return_exceptions=True)

        finally:
            await self.session.close()

        # Calculate metrics
        end_time = time.time()
        duration = end_time - start_time

        if response_times:
            response_times.sort()
            avg_response_time = sum(response_times) / len(response_times)
            p95_response_time = response_times[int(len(response_times) * 0.95)]
            p99_response_time = response_times[int(len(response_times) * 0.99)]
            max_response_time = max(response_times)
        else:
            avg_response_time = p95_response_time = p99_response_time = max_response_time = 0

        total_requests = len(response_times) + len(errors)
        successful_requests = len(response_times)
        failed_requests = len(errors)

        return LoadTestResult(
            scenario=scenario_config['name'],
            duration_seconds=duration,
            total_requests=total_requests,
            successful_requests=successful_requests,
            failed_requests=failed_requests,
            avg_response_time=avg_response_time,
            p95_response_time=p95_response_time,
            p99_response_time=p99_response_time,
            max_response_time=max_response_time,
            requests_per_second=total_requests / duration if duration > 0 else 0,
            errors=errors[:100]  # Limit error samples
        )

    async def load_test_worker(self, config: dict, response_times: List[float],
                              errors: List[str]):
        """Individual load test worker"""

        endpoints = config['endpoints']
        request_interval = 1.0 / config['requests_per_second_per_user']

        try:
            while True:
                for endpoint_config in endpoints:
                    start = time.time()
                    try:
                        async with self.session.request(
                            endpoint_config['method'],
                            f"{self.base_url}{endpoint_config['path']}",
                            json=endpoint_config.get('data')
                        ) as response:
                            await response.text()  # Consume response

                            if response.status < 400:
                                response_times.append(time.time() - start)
                            else:
                                errors.append(f"HTTP {response.status}")

                    except Exception as e:
                        errors.append(str(e))

                    # Rate limiting
                    await asyncio.sleep(request_interval)

        except asyncio.CancelledError:
            pass

# Benchmark configuration
BENCHMARK_SCENARIOS = {
    'normal_load': {
        'name': 'Normal Load',
        'concurrent_users': 100,
        'requests_per_second_per_user': 1,
        'duration_seconds': 300,  # 5 minutes
        'endpoints': [
            {'method': 'GET', 'path': '/api/agents'},
            {'method': 'GET', 'path': '/api/tasks'},
            {'method': 'POST', 'path': '/api/tasks', 'data': {'type': 'ping'}}
        ]
    },
    'peak_load': {
        'name': 'Peak Load',
        'concurrent_users': 500,
        'requests_per_second_per_user': 2,
        'duration_seconds': 600,  # 10 minutes
        'endpoints': [
            {'method': 'GET', 'path': '/api/agents'},
            {'method': 'GET', 'path': '/api/metrics'},
            {'method': 'POST', 'path': '/api/tasks', 'data': {'type': 'update_packages'}}
        ]
    }
}

# Automated benchmark runner
class BenchmarkRunner:
    def __init__(self):
        self.results_history = []

    async def run_all_benchmarks(self):
        """Run all benchmark scenarios"""
        tester = PerformanceTester(
            base_url="https://sysmanage.example.com",
            auth_token=await self.get_test_token()
        )

        results = {}
        for scenario_name, config in BENCHMARK_SCENARIOS.items():
            print(f"Running {scenario_name} benchmark...")
            result = await tester.run_load_test(config)
            results[scenario_name] = result

            # Store results
            await self.store_benchmark_result(result)

            # Generate report
            self.print_result_summary(result)

        return results

    def print_result_summary(self, result: LoadTestResult):
        """Print benchmark result summary"""
        print(f"""
Benchmark: {result.scenario}
Duration: {result.duration_seconds:.1f}s
Total Requests: {result.total_requests}
Success Rate: {result.successful_requests/result.total_requests*100:.1f}%
Requests/sec: {result.requests_per_second:.1f}
Avg Response Time: {result.avg_response_time*1000:.1f}ms
P95 Response Time: {result.p95_response_time*1000:.1f}ms
P99 Response Time: {result.p99_response_time*1000:.1f}ms
Max Response Time: {result.max_response_time*1000:.1f}ms
Failed Requests: {result.failed_requests}
        """)
                        

Next Steps

To explore related performance and monitoring topics:

  1. WebSocket Protocol: Real-time metrics delivery and performance
  2. Database Schema: Metrics storage and query optimization
  3. REST API Design: API performance monitoring and optimization
  4. Design Principles: How observability principles guide architecture
  5. Scaling Strategies: Scale monitoring infrastructure with system growth