Performance Metrics
Performance monitoring, metrics collection, benchmarking, and observability patterns in SysManage.
Metrics Architecture Overview
SysManage implements a comprehensive metrics collection and monitoring system that provides deep visibility into system performance, user experience, and operational health across all components.
Metrics Collection Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Metrics Sources │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Application │ │ Database │ │ Infrastructure │ │
│ │ Metrics │ │ Metrics │ │ Metrics │ │
│ │ │ │ │ │ │ │
│ │ • API Times │ │ • Query │ │ • CPU/Memory │ │
│ │ • Task Exec │ │ Times │ │ • Network I/O │ │
│ │ • User Acts │ │ • Conn Pool │ │ • Disk I/O │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Metrics Pipeline │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Collection │ │ Processing │ │ Storage │ │
│ │ │ │ │ │ │ │
│ │ • Scrapers │ │ • Aggreg. │ │ • Time Series DB │ │
│ │ • Agents │ │ • Filtering │ │ • PostgreSQL │ │
│ │ • Push │ │ • Enrichment│ │ • Redis Cache │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────┐
│ Visualization & Alerting │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │ Dashboards │ │ Alerts │ │ APIs │ │
│ │ │ │ │ │ │ │
│ │ • Real-time │ │ • Thresholds│ │ • Metrics Query │ │
│ │ • Historical│ │ • ML-based │ │ • Export Formats │ │
│ │ • Custom │ │ • Escalation│ │ • Integration │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Key Design Principles
- Low Overhead: Metrics collection adds <5% performance overhead
- High Cardinality: Support for dimensional metrics with labels
- Real-time Processing: Sub-second latency for critical metrics
- Scalable Storage: Efficient time-series data storage and retrieval
- Flexible Alerting: Rule-based and ML-powered anomaly detection
Metric Categories
Application Performance Metrics
API Performance
Request Metrics
- http_requests_total: Total HTTP requests by method, endpoint, status
- http_request_duration_seconds: Request latency histogram
- http_request_size_bytes: Request payload size distribution
- http_response_size_bytes: Response payload size distribution
# Example Prometheus metrics
http_requests_total{method="GET", endpoint="/api/agents", status="200"} 15234
http_requests_total{method="POST", endpoint="/api/tasks", status="201"} 542
http_request_duration_seconds_bucket{endpoint="/api/agents", le="0.1"} 12890
http_request_duration_seconds_bucket{endpoint="/api/agents", le="0.5"} 15123
http_request_duration_seconds_bucket{endpoint="/api/agents", le="1.0"} 15200
Business Logic Metrics
- task_execution_duration_seconds: Task execution time by type
- task_success_rate: Task success percentage by type
- agent_connection_duration_seconds: Agent session duration
- package_operations_total: Package install/update/remove counts
Error Metrics
- application_errors_total: Application errors by type and severity
- task_failures_total: Failed tasks by type and reason
- agent_communication_errors_total: Agent communication failures
- authentication_failures_total: Failed login attempts
Implementation Example
# Python FastAPI metrics implementation
from fastapi import FastAPI, Request
from prometheus_client import Counter, Histogram, Gauge
import time
from functools import wraps

app = FastAPI()  # application instance used by the middleware below

# Define metrics
REQUEST_COUNT = Counter(
'http_requests_total',
'Total HTTP requests',
['method', 'endpoint', 'status_code']
)
REQUEST_DURATION = Histogram(
'http_request_duration_seconds',
'HTTP request latency',
['endpoint'],
buckets=[0.01, 0.05, 0.1, 0.5, 1.0, 2.5, 5.0, 10.0]
)
ACTIVE_CONNECTIONS = Gauge(
'websocket_connections_active',
'Active WebSocket connections',
['room']
)
# Middleware for automatic metrics collection
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
start_time = time.time()
response = await call_next(request)
# Record metrics
endpoint = request.url.path
method = request.method
status_code = str(response.status_code)
duration = time.time() - start_time
REQUEST_COUNT.labels(
method=method,
endpoint=endpoint,
status_code=status_code
).inc()
REQUEST_DURATION.labels(endpoint=endpoint).observe(duration)
return response
# Business metrics used by the decorator below
TASK_DURATION = Histogram(
    'task_execution_duration_seconds',
    'Task execution time',
    ['task_type']
)
TASK_SUCCESS_RATE = Counter(
    'task_successes_total',
    'Successful task executions (the success rate is derived at query time)',
    ['task_type']
)
TASK_FAILURES = Counter(
    'task_failures_total',
    'Failed tasks',
    ['task_type', 'error_type']
)
# Business metrics decorator
def track_task_execution(task_type: str):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
TASK_SUCCESS_RATE.labels(task_type=task_type).inc()
return result
except Exception as e:
TASK_FAILURES.labels(
task_type=task_type,
error_type=type(e).__name__
).inc()
raise
finally:
duration = time.time() - start_time
TASK_DURATION.labels(task_type=task_type).observe(duration)
return wrapper
return decorator
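The counters and histograms above only become useful once they are exposed for scraping. Below is a minimal sketch, assuming the same FastAPI app object, of a /metrics endpoint rendered with prometheus_client's text exposition format; the route path is illustrative rather than the exact SysManage route.
# Expose registered metrics in Prometheus text format (illustrative sketch)
from fastapi import Response
from prometheus_client import generate_latest, CONTENT_TYPE_LATEST

@app.get("/metrics")
async def metrics_endpoint():
    # generate_latest() renders every metric in the default registry
    return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)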
Infrastructure Metrics
System Resource Metrics
CPU & Memory
- cpu_usage_percent: CPU utilization by core and process
- memory_usage_bytes: Memory consumption by process
- memory_available_bytes: Available system memory
- gc_collections_total: Garbage collection metrics (Python)
Network & I/O
- network_bytes_sent/received: Network traffic by interface
- disk_io_bytes_read/written: Disk I/O by device
- disk_usage_percent: Disk space utilization
- file_descriptors_open: Open file descriptor count
Process Metrics
- process_cpu_seconds_total: Process CPU time
- process_resident_memory_bytes: Process memory usage
- process_start_time_seconds: Process start timestamp
- python_info: Python version and implementation info
Agent Metrics Collection
# Agent-side metrics collection
import logging
import os
import time
from datetime import datetime

import psutil

logger = logging.getLogger(__name__)

class AgentMetricsCollector:
def __init__(self):
self.metrics_buffer = []
self.collection_interval = 30 # seconds
self.max_buffer_size = 1000
async def collect_system_metrics(self):
"""Collect system-level metrics"""
metrics = {
'timestamp': datetime.utcnow().isoformat(),
'cpu': {
'usage_percent': psutil.cpu_percent(interval=1),
'cores': psutil.cpu_count(),
'load_avg': os.getloadavg() if hasattr(os, 'getloadavg') else None
},
'memory': {
'total': psutil.virtual_memory().total,
'available': psutil.virtual_memory().available,
'used': psutil.virtual_memory().used,
'percent': psutil.virtual_memory().percent
},
'disk': {
dev.device: {
'total': psutil.disk_usage(dev.mountpoint).total,
'used': psutil.disk_usage(dev.mountpoint).used,
'free': psutil.disk_usage(dev.mountpoint).free
}
for dev in psutil.disk_partitions()
},
'network': {
interface: {
'bytes_sent': stats.bytes_sent,
'bytes_recv': stats.bytes_recv,
'packets_sent': stats.packets_sent,
'packets_recv': stats.packets_recv
}
for interface, stats in psutil.net_io_counters(pernic=True).items()
}
}
return metrics
async def collect_application_metrics(self):
"""Collect application-specific metrics"""
return {
'agent': {
'version': self.agent_version,
'uptime_seconds': time.time() - self.start_time,
'tasks_completed': self.tasks_completed,
'last_heartbeat': self.last_heartbeat.isoformat()
},
'services': await self.get_service_metrics(),
'packages': await self.get_package_metrics()
}
async def send_metrics(self):
"""Send collected metrics to server"""
if self.metrics_buffer:
try:
await self.api_client.post('/api/metrics', {
'agent_id': self.agent_id,
'metrics': self.metrics_buffer
})
self.metrics_buffer.clear()
except Exception as e:
logger.error(f"Failed to send metrics: {e}")
# Implement exponential backoff retry
await self.schedule_retry()
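A short sketch of how the collector above might be driven from the agent's main loop, buffering one sample per interval and flushing it to the server. The run_forever entry point and the buffer-trimming policy are assumptions for illustration, not the agent's actual scheduler.
# Hypothetical collection loop driving AgentMetricsCollector (sketch)
import asyncio

async def run_forever(collector: AgentMetricsCollector):
    while True:
        sample = {
            'system': await collector.collect_system_metrics(),
            'application': await collector.collect_application_metrics(),
        }
        collector.metrics_buffer.append(sample)
        # Drop the oldest samples if the buffer grows past its cap
        if len(collector.metrics_buffer) > collector.max_buffer_size:
            del collector.metrics_buffer[:-collector.max_buffer_size]
        await collector.send_metrics()
        await asyncio.sleep(collector.collection_interval)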
Database Performance Metrics
Connection Pool Metrics
- db_connections_active: Active database connections
- db_connections_idle: Idle connections in pool
- db_connection_wait_time_seconds: Time waiting for connection
- db_pool_exhausted_total: Pool exhaustion events
Query Performance Metrics
- db_query_duration_seconds: Query execution time by type
- db_queries_total: Query count by operation (SELECT, INSERT, etc.)
- db_slow_queries_total: Queries exceeding threshold
- db_deadlocks_total: Database deadlock occurrences
Database Monitoring Implementation
# SQLAlchemy metrics integration
from prometheus_client import Counter, Gauge, Histogram
from sqlalchemy import event
from sqlalchemy.engine import Engine
from sqlalchemy.pool import QueuePool
import time
# Query timing metrics
QUERY_DURATION = Histogram(
'db_query_duration_seconds',
'Database query execution time',
['operation', 'table']
)
QUERY_COUNT = Counter(
    'db_queries_total',
    'Total database queries',
    ['operation', 'table', 'status']
)
DB_CONNECTION_WAIT_TIME = Histogram(
    'db_connection_wait_time_seconds',
    'Time spent waiting to acquire a database connection'
)

def extract_table_name(statement: str) -> str:
    """Best-effort extraction of the first table name from a SQL statement."""
    tokens = statement.strip().split()
    upper = [t.upper() for t in tokens]
    for keyword in ("FROM", "INTO", "UPDATE", "JOIN"):
        if keyword in upper:
            index = upper.index(keyword)
            if index + 1 < len(tokens):
                return tokens[index + 1].strip('"();,')
    return "unknown"
@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
context._query_start_time = time.time()
@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
duration = time.time() - context._query_start_time
# Parse SQL to extract operation and table
operation = statement.strip().split()[0].upper()
table = extract_table_name(statement)
QUERY_DURATION.labels(operation=operation, table=table).observe(duration)
QUERY_COUNT.labels(operation=operation, table=table, status='success').inc()
# Connection pool monitoring
class MonitoredConnectionPool(QueuePool):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.active_connections = Gauge(
'db_connections_active',
'Active database connections'
)
self.pool_size = Gauge(
'db_pool_size',
'Database connection pool size'
)
def _do_get(self):
start_time = time.time()
try:
conn = super()._do_get()
self.active_connections.inc()
return conn
finally:
wait_time = time.time() - start_time
if wait_time > 0.1: # Log slow connection acquisition
DB_CONNECTION_WAIT_TIME.observe(wait_time)
def _do_return_conn(self, conn):
super()._do_return_conn(conn)
self.active_connections.dec()
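Subclassing the pool is one option; SQLAlchemy's pool events offer a lighter-weight alternative that needs no custom pool class. The sketch below assumes an already-created engine and uses an illustrative gauge name and DSN, which may differ from SysManage's actual wiring.
# Alternative: track connection checkouts/checkins with SQLAlchemy pool events (sketch)
from prometheus_client import Gauge
from sqlalchemy import create_engine, event

engine = create_engine("postgresql+psycopg2://sysmanage@localhost/sysmanage")  # placeholder DSN
DB_CONNECTIONS_IN_USE = Gauge('db_connections_in_use', 'Connections currently checked out')

@event.listens_for(engine, "checkout")
def on_checkout(dbapi_connection, connection_record, connection_proxy):
    DB_CONNECTIONS_IN_USE.inc()

@event.listens_for(engine, "checkin")
def on_checkin(dbapi_connection, connection_record):
    DB_CONNECTIONS_IN_USE.dec()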
User Experience Metrics
Frontend Performance
- page_load_time_seconds: Page load duration by route
- api_call_duration_seconds: Client-side API call timing
- websocket_latency_seconds: Real-time update latency
- ui_interactions_total: User interaction counts by type
Client-side Metrics Collection
// JavaScript client-side metrics
class ClientMetrics {
constructor() {
this.metrics = [];
this.batchSize = 50;
this.flushInterval = 30000; // 30 seconds
this.startFlushTimer();
}
// Performance timing
recordPageLoad(route) {
const navigation = performance.getEntriesByType('navigation')[0];
this.addMetric({
name: 'page_load_time_seconds',
value: navigation.loadEventEnd / 1000,
labels: { route: route },
timestamp: Date.now()
});
}
// API call timing
recordApiCall(endpoint, method, duration, status) {
this.addMetric({
name: 'api_call_duration_seconds',
value: duration / 1000,
labels: {
endpoint: endpoint,
method: method,
status: status.toString()
},
timestamp: Date.now()
});
}
// WebSocket latency
recordWebSocketLatency(messageType, latency) {
this.addMetric({
name: 'websocket_latency_seconds',
value: latency / 1000,
labels: { message_type: messageType },
timestamp: Date.now()
});
}
// User interactions
recordInteraction(action, component) {
this.addMetric({
name: 'ui_interactions_total',
value: 1,
labels: {
action: action,
component: component
},
timestamp: Date.now()
});
}
addMetric(metric) {
this.metrics.push(metric);
if (this.metrics.length >= this.batchSize) {
this.flush();
}
}
async flush() {
if (this.metrics.length === 0) return;
const batch = this.metrics.splice(0);
try {
await fetch('/api/metrics/client', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ metrics: batch })
});
} catch (error) {
console.error('Failed to send client metrics:', error);
// Could implement retry logic here
}
}
startFlushTimer() {
setInterval(() => this.flush(), this.flushInterval);
}
}
// Integration with React components
import { useCallback, useRef } from 'react';

const useMetrics = () => {
const metrics = useRef(new ClientMetrics());
const recordPageView = useCallback((route) => {
metrics.current.recordPageLoad(route);
}, []);
const recordApiCall = useCallback((endpoint, method, duration, status) => {
metrics.current.recordApiCall(endpoint, method, duration, status);
}, []);
return { recordPageView, recordApiCall };
};
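On the server side, the batched metrics posted by flush() need a receiving endpoint. Below is a hedged sketch of what an /api/metrics/client handler could look like in FastAPI with Pydantic validation; the model fields mirror the JavaScript payload above, while the in-memory buffer stands in for the real ingestion path and the schema may differ from SysManage's actual implementation.
# Hypothetical receiver for batched client metrics (sketch)
from typing import Dict, List
from fastapi import APIRouter
from pydantic import BaseModel

router = APIRouter()

class ClientMetric(BaseModel):
    name: str
    value: float
    labels: Dict[str, str] = {}
    timestamp: int  # epoch milliseconds, as sent by Date.now()

class ClientMetricsBatch(BaseModel):
    metrics: List[ClientMetric]

CLIENT_METRIC_BUFFER: List[ClientMetric] = []

@router.post("/api/metrics/client")
async def ingest_client_metrics(batch: ClientMetricsBatch):
    # In the real pipeline these samples would feed aggregation/storage;
    # buffering in memory keeps this sketch self-contained.
    CLIENT_METRIC_BUFFER.extend(batch.metrics)
    return {"accepted": len(batch.metrics)}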
Metrics Storage & Retention
Time-Series Storage Strategy
Multi-Resolution Storage
High Resolution (Raw Data)
- Retention: 24 hours
- Resolution: 1-second intervals
- Use Case: Real-time monitoring, debugging
- Storage: Redis + PostgreSQL
Medium Resolution
- Retention: 7 days
- Resolution: 10-second intervals
- Use Case: Recent trend analysis
- Storage: PostgreSQL time partitions
Low Resolution (Aggregated)
- Retention: 90 days
- Resolution: 1-minute intervals
- Use Case: Historical analysis, reporting
- Storage: PostgreSQL compressed tables
Archive Resolution
- Retention: 2 years
- Resolution: 1-hour intervals
- Use Case: Long-term trends, compliance
- Storage: Compressed PostgreSQL / Object storage
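The retention tiers above imply a periodic pruning job. Below is a minimal sketch of such a job, assuming the aggregated rows live in a metric_aggregates table keyed by interval_seconds (matching the aggregation example further below); the table and column names are assumptions for illustration.
# Hypothetical retention pruning job for aggregated metrics (sketch)
from datetime import datetime, timedelta
from sqlalchemy import text

# interval_seconds -> retention window (mirrors the tiers listed above)
RETENTION = {
    10: timedelta(days=7),      # medium resolution
    60: timedelta(days=90),     # low resolution
    3600: timedelta(days=730),  # archive resolution
}

async def prune_expired_aggregates(db_session):
    now = datetime.utcnow()
    for interval_seconds, keep_for in RETENTION.items():
        cutoff = now - keep_for
        await db_session.execute(
            text("DELETE FROM metric_aggregates "
                 "WHERE interval_seconds = :interval AND timestamp < :cutoff"),
            {"interval": interval_seconds, "cutoff": cutoff},
        )
    await db_session.commit()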
Aggregation Pipeline
Raw Metrics Aggregation Levels Storage Tiers
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 1-second │──────▶│ 10-second │──────▶│ Redis │
│ samples │ │ aggregation │ │ (24h TTL) │
└─────────────┘ └─────────────────┘ └─────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ 1-minute │──────▶│ PostgreSQL │
│ aggregation │ │ (partitioned) │
└─────────────────┘ └─────────────────┘
│ │
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ 1-hour │──────▶│ Compressed │
│ aggregation │ │ Storage │
└─────────────────┘ └─────────────────┘
Implementation Example
# Metrics aggregation pipeline
import asyncio
import json
import logging
import time
from datetime import datetime

logger = logging.getLogger(__name__)

class MetricsAggregator:
    def __init__(self, redis_client, db_session):
self.redis = redis_client
self.db = db_session
self.aggregation_intervals = {
'raw': 1, # 1 second
'short': 10, # 10 seconds
'medium': 60, # 1 minute
'long': 3600 # 1 hour
}
async def aggregate_metrics(self, interval_type: str):
"""Aggregate metrics for specified interval"""
interval = self.aggregation_intervals[interval_type]
current_time = int(time.time())
window_start = (current_time // interval) * interval
# Get raw metrics from Redis for aggregation window
raw_metrics = await self.get_raw_metrics(window_start, interval)
# Perform aggregations
aggregated = {}
        for metric_name, samples in raw_metrics.items():
            if not samples:
                continue  # skip empty windows to avoid division by zero
            aggregated[metric_name] = {
'timestamp': window_start,
'interval': interval,
'count': len(samples),
'sum': sum(s['value'] for s in samples),
'min': min(s['value'] for s in samples),
'max': max(s['value'] for s in samples),
'avg': sum(s['value'] for s in samples) / len(samples),
'p50': self.percentile(samples, 0.5),
'p95': self.percentile(samples, 0.95),
'p99': self.percentile(samples, 0.99)
}
# Store aggregated metrics
await self.store_aggregated_metrics(aggregated, interval_type)
async def store_aggregated_metrics(self, metrics, interval_type):
"""Store aggregated metrics in appropriate storage tier"""
if interval_type == 'raw':
# Store in Redis with TTL
for metric_name, data in metrics.items():
key = f"metrics:{metric_name}:{data['timestamp']}"
await self.redis.setex(key, 86400, json.dumps(data)) # 24h TTL
else:
# Store in PostgreSQL
async with self.db.begin():
for metric_name, data in metrics.items():
metric_record = MetricAggregate(
name=metric_name,
timestamp=datetime.fromtimestamp(data['timestamp']),
interval_seconds=data['interval'],
count=data['count'],
sum_value=data['sum'],
min_value=data['min'],
max_value=data['max'],
avg_value=data['avg'],
p50_value=data['p50'],
p95_value=data['p95'],
p99_value=data['p99']
)
self.db.add(metric_record)
def percentile(self, samples, p):
"""Calculate percentile value"""
values = sorted([s['value'] for s in samples])
index = int(len(values) * p)
return values[min(index, len(values) - 1)]
# Automated aggregation scheduler
class MetricsScheduler:
def __init__(self, aggregator):
self.aggregator = aggregator
async def start_scheduling(self):
"""Start background aggregation tasks"""
# Schedule different aggregation intervals
asyncio.create_task(self.schedule_aggregation('short', 10)) # Every 10s
asyncio.create_task(self.schedule_aggregation('medium', 60)) # Every 1m
asyncio.create_task(self.schedule_aggregation('long', 3600)) # Every 1h
async def schedule_aggregation(self, interval_type, frequency):
"""Schedule periodic aggregation"""
while True:
try:
await self.aggregator.aggregate_metrics(interval_type)
await asyncio.sleep(frequency)
except Exception as e:
logger.error(f"Aggregation failed for {interval_type}: {e}")
await asyncio.sleep(frequency) # Continue despite errors
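A short sketch of how the aggregator and scheduler might be wired together at application startup. The redis.asyncio client and the session argument here are assumptions about the surrounding setup rather than SysManage's exact bootstrap code.
# Hypothetical startup wiring for the aggregation pipeline (sketch)
import redis.asyncio as redis

async def start_metrics_pipeline(db_session):
    redis_client = redis.Redis(host="localhost", port=6379, decode_responses=True)
    aggregator = MetricsAggregator(redis_client, db_session)
    scheduler = MetricsScheduler(aggregator)
    await scheduler.start_scheduling()  # spawns the 10s / 1m / 1h aggregation tasks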
Monitoring Dashboards
Dashboard Categories
Operational Dashboard
High-level system health and key performance indicators
Key Metrics Displayed:
- System Health Score: Composite health indicator (0-100)
- Active Agents: Connected agents vs. total registered
- Active Tasks: Running tasks and queue depth
- Alert Summary: Critical/high/medium alerts count
- API Performance: Average response time and error rate
- Resource Utilization: Server CPU/memory/disk usage
┌─────────────────────────────────────────────────────────────────┐
│ SysManage Operations Dashboard │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Health │ │ Agents │ │ Tasks │ │ Alerts │ │
│ │ Score │ │ Online │ │ Active │ │ Critical │ │
│ │ 95% │ │ 247/250 │ │ 12/5 │ │ 3 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────┐ ┌─────────────────────────────┐ │
│ │ API Response Times │ │ Resource Utilization │ │
│ │ ────────────────────────── │ │ ───────────────────────── │ │
│ │ /api/agents: 45ms │ │ CPU: ████████░░ 80% │ │
│ │ /api/tasks: 67ms │ │ Memory: ██████░░░░ 60% │ │
│ │ /api/metrics: 23ms │ │ Disk: ███░░░░░░░░ 30% │ │
│ └─────────────────────────────┘ └─────────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ Task Execution Timeline │ │
│ │ ────────────────────────────────────────────────────────── │ │
│ │ 10:00 10:15 10:30 10:45 11:00 11:15 11:30 │ │
│ │ │ │ │ │ │ │ │ │ │
│ │ ████████░░░░░░░░░░██████████░░░░░░░░██████ │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
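The health score shown in the mock above is a composite indicator. The sketch below shows one way such a 0-100 score could be computed from component ratios; the weights, inputs, and thresholds are illustrative assumptions, not SysManage's actual formula.
# Hypothetical composite health score (0-100) from component signals (sketch)
def health_score(agents_online: int, agents_total: int,
                 api_error_rate: float, cpu_percent: float) -> float:
    agent_health = agents_online / agents_total if agents_total else 1.0
    api_health = max(0.0, 1.0 - api_error_rate / 0.05)           # 5% errors -> 0
    cpu_health = max(0.0, 1.0 - max(cpu_percent - 70, 0) / 30)   # degrade above 70% CPU
    # Weighted blend; the weights are illustrative
    score = 100 * (0.4 * agent_health + 0.4 * api_health + 0.2 * cpu_health)
    return round(score, 1)

# Example: 247/250 agents online, 0.5% error rate, 80% CPU -> 88.9
print(health_score(247, 250, 0.005, 80))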
Performance Dashboard
Detailed performance metrics and trends
Performance Panels:
- Request Latency: P50, P95, P99 latency by endpoint
- Throughput: Requests per second trends
- Error Rates: 4xx/5xx error percentages
- Database Performance: Query times and connection pool stats
- Agent Performance: Task execution times by agent
- WebSocket Metrics: Connection count and message latency
Infrastructure Dashboard
System resource monitoring and capacity planning
Infrastructure Panels:
- Server Resources: CPU, memory, disk, network by host
- Database Health: Query performance, locks, replication lag
- Network Health: Connectivity, latency between components
- Storage Health: Disk usage, I/O performance, backup status
- Security Metrics: Failed logins, certificate status
Dashboard Implementation
# React dashboard component with real-time updates
import React, { useState, useEffect } from 'react';
import { useWebSocket } from './hooks/useWebSocket';
import { MetricsAPI } from './services/api';
const OperationalDashboard = () => {
const [metrics, setMetrics] = useState({});
const [timeRange, setTimeRange] = useState('1h');
// WebSocket for real-time updates
const { lastMessage } = useWebSocket('/ws', {
onMessage: (message) => {
if (message.type === 'metrics_update') {
setMetrics(prev => ({
...prev,
[message.data.metric_name]: message.data
}));
}
}
});
// Load initial metrics
useEffect(() => {
const loadMetrics = async () => {
try {
const data = await MetricsAPI.getDashboardMetrics(timeRange);
setMetrics(data);
} catch (error) {
console.error('Failed to load metrics:', error);
}
};
loadMetrics();
const interval = setInterval(loadMetrics, 30000); // Refresh every 30s
return () => clearInterval(interval);
}, [timeRange]);
  return (
    <div className="operational-dashboard">
      {/* Dashboard panels (health score, agents, tasks, alerts, charts) render here */}
    </div>
  );
};
// Metrics API service (exported below as the MetricsAPI singleton imported above)
class MetricsAPIService {
async getDashboardMetrics(timeRange) {
const response = await fetch(`/api/metrics/dashboard?range=${timeRange}`);
if (!response.ok) {
throw new Error('Failed to fetch dashboard metrics');
}
return response.json();
}
async getMetricHistory(metricName, timeRange, resolution = 'auto') {
const params = new URLSearchParams({
metric: metricName,
range: timeRange,
resolution: resolution
});
const response = await fetch(`/api/metrics/history?${params}`);
if (!response.ok) {
throw new Error(`Failed to fetch ${metricName} history`);
}
return response.json();
}
async getCustomQuery(query) {
const response = await fetch('/api/metrics/query', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ query })
});
if (!response.ok) {
throw new Error('Failed to execute custom query');
}
return response.json();
}
}

export const MetricsAPI = new MetricsAPIService();
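The service above assumes matching endpoints on the server. Below is a hedged sketch of what the /api/metrics/dashboard handler might look like; the range parsing and the fetch_dashboard_aggregates helper are placeholders for whatever aggregate query the server actually runs.
# Hypothetical /api/metrics/dashboard endpoint backing the React service above (sketch)
from fastapi import APIRouter, Query

dashboard_router = APIRouter()

RANGE_SECONDS = {"1h": 3600, "6h": 21600, "24h": 86400}

@dashboard_router.get("/api/metrics/dashboard")
async def dashboard_metrics(time_range: str = Query("1h", alias="range")):
    window = RANGE_SECONDS.get(time_range, 3600)
    # fetch_dashboard_aggregates is a placeholder for the real aggregate query
    return await fetch_dashboard_aggregates(window_seconds=window)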
Alerting & Anomaly Detection
Multi-tier Alerting System
Threshold-based Alerts
Simple rule-based alerts for known failure conditions
Common Alert Rules:
- High CPU: CPU usage > 90% for 5 minutes
- Memory Pressure: Available memory < 10%
- Disk Space: Disk usage > 85%
- API Errors: Error rate > 5% for 2 minutes
- Agent Offline: Agent offline for > 5 minutes
- Task Failures: Task failure rate > 10%
# Alert rule configuration
alert_rules:
- name: "High CPU Usage"
metric: "cpu_usage_percent"
condition: ">"
threshold: 90
duration: "5m"
severity: "warning"
labels:
team: "infrastructure"
annotations:
summary: "High CPU usage detected on {{ $labels.hostname }}"
description: "CPU usage is {{ $value }}% on {{ $labels.hostname }}"
- name: "API Error Rate High"
metric: "http_requests_total"
condition: "rate"
threshold: 0.05 # 5% error rate
duration: "2m"
severity: "critical"
labels:
team: "backend"
annotations:
summary: "High API error rate detected"
description: "API error rate is {{ $value | humanizePercentage }} over the last 2 minutes"
Anomaly Detection
Machine learning-based detection of unusual patterns
Anomaly Detection Methods:
- Statistical Anomalies: Z-score based detection
- Seasonal Patterns: Time-series decomposition
- Correlation Analysis: Multi-metric pattern detection
- Predictive Models: Forecast-based anomaly detection
# Anomaly detection implementation
import asyncio
import logging
from typing import List

import numpy as np
from scipy import stats
from sklearn.ensemble import IsolationForest

logger = logging.getLogger(__name__)

class AnomalyDetector:
def __init__(self):
self.models = {}
self.historical_data = {}
self.detection_window = 3600 # 1 hour
async def train_models(self, metric_name: str, historical_data: List[float]):
"""Train anomaly detection models for a metric"""
# Statistical model (Z-score based)
mean = np.mean(historical_data)
std = np.std(historical_data)
self.models[f"{metric_name}_zscore"] = {'mean': mean, 'std': std}
# Isolation Forest for multivariate anomalies
if len(historical_data) > 100:
iso_forest = IsolationForest(contamination=0.1, random_state=42)
data_reshaped = np.array(historical_data).reshape(-1, 1)
iso_forest.fit(data_reshaped)
self.models[f"{metric_name}_isolation"] = iso_forest
        # Seasonal decomposition for time-series patterns
        # (fit_seasonal_model is assumed to wrap a seasonal decomposition routine elsewhere)
        if len(historical_data) > 288:  # Need at least 1 day of 5-min samples
            seasonal_model = self.fit_seasonal_model(historical_data)
            self.models[f"{metric_name}_seasonal"] = seasonal_model
def detect_anomaly(self, metric_name: str, current_value: float,
recent_values: List[float]) -> dict:
"""Detect if current value is anomalous"""
anomalies = {}
# Z-score detection
zscore_model = self.models.get(f"{metric_name}_zscore")
        if zscore_model and zscore_model['std'] > 0:
            z_score = abs((current_value - zscore_model['mean']) / zscore_model['std'])
anomalies['zscore'] = {
'is_anomaly': z_score > 3, # 3 standard deviations
'score': z_score,
'threshold': 3
}
# Isolation Forest detection
iso_model = self.models.get(f"{metric_name}_isolation")
if iso_model:
anomaly_score = iso_model.decision_function([[current_value]])[0]
is_anomaly = iso_model.predict([[current_value]])[0] == -1
anomalies['isolation'] = {
'is_anomaly': is_anomaly,
'score': anomaly_score,
'threshold': 0
}
# Rate of change detection
        if len(recent_values) >= 2 and recent_values[-1] != 0:
            rate_of_change = abs(current_value - recent_values[-1]) / recent_values[-1]
anomalies['rate_change'] = {
'is_anomaly': rate_of_change > 0.5, # 50% change
'score': rate_of_change,
'threshold': 0.5
}
return anomalies
async def evaluate_alert_rules(self):
"""Continuously evaluate alert rules"""
while True:
try:
# Get current metrics
current_metrics = await self.get_current_metrics()
for metric_name, value in current_metrics.items():
# Get recent history for context
recent_values = await self.get_recent_values(
metric_name, self.detection_window
)
# Run anomaly detection
anomalies = self.detect_anomaly(metric_name, value, recent_values)
# Check if any anomaly detection triggered
for detection_type, result in anomalies.items():
if result['is_anomaly']:
await self.trigger_alert({
'metric': metric_name,
'value': value,
'detection_type': detection_type,
'score': result['score'],
'threshold': result['threshold'],
'severity': self.calculate_severity(result['score'])
})
await asyncio.sleep(60) # Check every minute
except Exception as e:
logger.error(f"Error in anomaly detection: {e}")
await asyncio.sleep(60)
Alert Management
Intelligent alert routing, escalation, and notification
Alert Lifecycle:
Triggered → Pending → Acknowledged → Resolved
│ │ │ │
▼ ▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Generate │ │ Evaluate │ │ Suppress │ │ Archive │
│ Alert │ │ Rules │ │ Similar │ │ Alert │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐
│ Apply │ │ Route to │ │ Update │ │ Update │
│ Routing │ │ Teams │ │ Status │ │ Metrics │
└──────────┘ └──────────┘ └──────────┘ └──────────┘
# Alert management system
# (Alert is the alert model defined elsewhere; generate_alert_id is an ID helper)
import logging
from datetime import datetime

logger = logging.getLogger(__name__)

class AlertManager:
def __init__(self):
self.active_alerts = {}
self.notification_channels = {}
self.escalation_rules = {}
async def process_alert(self, alert_data: dict):
"""Process incoming alert"""
# Create alert object
alert = Alert(
id=generate_alert_id(),
metric=alert_data['metric'],
value=alert_data['value'],
threshold=alert_data.get('threshold'),
severity=alert_data['severity'],
timestamp=datetime.utcnow(),
status='triggered'
)
# Check for duplicate/similar alerts
similar_alert = await self.find_similar_alert(alert)
if similar_alert:
await self.update_similar_alert(similar_alert, alert)
return
# Apply routing rules
routing_config = await self.get_routing_config(alert)
# Store alert
self.active_alerts[alert.id] = alert
await self.store_alert(alert)
# Send notifications
await self.send_notifications(alert, routing_config)
# Schedule escalation if needed
if routing_config.get('escalation'):
await self.schedule_escalation(alert, routing_config['escalation'])
async def send_notifications(self, alert: Alert, routing_config: dict):
"""Send alert notifications via configured channels"""
channels = routing_config.get('channels', [])
for channel_config in channels:
channel_type = channel_config['type']
channel = self.notification_channels.get(channel_type)
if channel:
try:
await channel.send_notification(alert, channel_config)
except Exception as e:
logger.error(f"Failed to send notification via {channel_type}: {e}")
async def acknowledge_alert(self, alert_id: str, user_id: str):
"""Acknowledge an alert"""
alert = self.active_alerts.get(alert_id)
if alert and alert.status == 'triggered':
alert.status = 'acknowledged'
alert.acknowledged_by = user_id
alert.acknowledged_at = datetime.utcnow()
await self.update_alert(alert)
# Cancel escalation
await self.cancel_escalation(alert_id)
async def resolve_alert(self, alert_id: str, user_id: str):
"""Resolve an alert"""
alert = self.active_alerts.get(alert_id)
if alert:
alert.status = 'resolved'
alert.resolved_by = user_id
alert.resolved_at = datetime.utcnow()
await self.update_alert(alert)
# Remove from active alerts
del self.active_alerts[alert_id]
# Notification channels
import aiohttp

class SlackNotificationChannel:
    SEVERITY_COLORS = {'critical': 'danger', 'high': 'warning', 'warning': 'warning', 'info': 'good'}

    def __init__(self, webhook_url: str):
        self.webhook_url = webhook_url

    def get_color(self, severity: str) -> str:
        return self.SEVERITY_COLORS.get(severity, '#cccccc')
async def send_notification(self, alert: Alert, config: dict):
payload = {
"text": f"🚨 {alert.severity.upper()} Alert",
"attachments": [
{
"color": self.get_color(alert.severity),
"fields": [
{"title": "Metric", "value": alert.metric, "short": True},
{"title": "Value", "value": str(alert.value), "short": True},
{"title": "Threshold", "value": str(alert.threshold), "short": True},
{"title": "Time", "value": alert.timestamp.isoformat(), "short": True}
]
}
]
}
async with aiohttp.ClientSession() as session:
await session.post(self.webhook_url, json=payload)
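A brief usage sketch showing how a notification channel might be registered and an alert routed through the manager; the webhook URL and alert payload are placeholders.
# Hypothetical wiring of AlertManager with a Slack channel (sketch)
import asyncio

async def main():
    alert_manager = AlertManager()
    alert_manager.notification_channels['slack'] = SlackNotificationChannel(
        webhook_url="https://hooks.slack.com/services/EXAMPLE"  # placeholder URL
    )
    await alert_manager.process_alert({
        'metric': 'cpu_usage_percent',
        'value': 97.2,
        'threshold': 90,
        'severity': 'warning',
    })

asyncio.run(main())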
Performance Benchmarking
Automated Performance Testing
Load Testing Strategy
Scenario 1: Normal Load
- Agents: 1,000 concurrent connections
- API Calls: 100 requests/second
- Tasks: 10 tasks/minute per agent
- Duration: 1 hour
Scenario 2: Peak Load
- Agents: 5,000 concurrent connections
- API Calls: 500 requests/second
- Tasks: 50 tasks/minute per agent
- Duration: 30 minutes
Scenario 3: Stress Test
- Agents: 10,000 concurrent connections
- API Calls: 1,000 requests/second
- Tasks: 100 tasks/minute per agent
- Duration: 15 minutes
Performance Testing Implementation
# Performance testing framework
import asyncio
import aiohttp
import time
from dataclasses import dataclass
from typing import List
@dataclass
class LoadTestResult:
scenario: str
duration_seconds: float
total_requests: int
successful_requests: int
failed_requests: int
avg_response_time: float
p95_response_time: float
p99_response_time: float
max_response_time: float
requests_per_second: float
errors: List[str]
class PerformanceTester:
def __init__(self, base_url: str, auth_token: str):
self.base_url = base_url
self.auth_token = auth_token
self.session = None
async def run_load_test(self, scenario_config: dict) -> LoadTestResult:
"""Run a load test scenario"""
start_time = time.time()
response_times = []
total_requests = 0
successful_requests = 0
failed_requests = 0
errors = []
# Create HTTP session
connector = aiohttp.TCPConnector(limit=1000)
self.session = aiohttp.ClientSession(
connector=connector,
headers={'Authorization': f'Bearer {self.auth_token}'},
timeout=aiohttp.ClientTimeout(total=30)
)
try:
# Run concurrent workers
workers = []
for i in range(scenario_config['concurrent_users']):
worker = asyncio.create_task(
self.load_test_worker(
scenario_config,
response_times,
errors
)
)
workers.append(worker)
# Wait for test duration
await asyncio.sleep(scenario_config['duration_seconds'])
# Cancel all workers
for worker in workers:
worker.cancel()
# Wait for workers to complete
await asyncio.gather(*workers, return_exceptions=True)
finally:
await self.session.close()
# Calculate metrics
end_time = time.time()
duration = end_time - start_time
if response_times:
response_times.sort()
avg_response_time = sum(response_times) / len(response_times)
p95_response_time = response_times[int(len(response_times) * 0.95)]
p99_response_time = response_times[int(len(response_times) * 0.99)]
max_response_time = max(response_times)
else:
avg_response_time = p95_response_time = p99_response_time = max_response_time = 0
total_requests = len(response_times) + len(errors)
successful_requests = len(response_times)
failed_requests = len(errors)
return LoadTestResult(
scenario=scenario_config['name'],
duration_seconds=duration,
total_requests=total_requests,
successful_requests=successful_requests,
failed_requests=failed_requests,
avg_response_time=avg_response_time,
p95_response_time=p95_response_time,
p99_response_time=p99_response_time,
max_response_time=max_response_time,
requests_per_second=total_requests / duration if duration > 0 else 0,
errors=errors[:100] # Limit error samples
)
async def load_test_worker(self, config: dict, response_times: List[float],
errors: List[str]):
"""Individual load test worker"""
endpoints = config['endpoints']
request_interval = 1.0 / config['requests_per_second_per_user']
try:
while True:
for endpoint_config in endpoints:
start = time.time()
try:
async with self.session.request(
endpoint_config['method'],
f"{self.base_url}{endpoint_config['path']}",
json=endpoint_config.get('data')
) as response:
await response.text() # Consume response
if response.status < 400:
response_times.append(time.time() - start)
else:
errors.append(f"HTTP {response.status}")
except Exception as e:
errors.append(str(e))
# Rate limiting
await asyncio.sleep(request_interval)
except asyncio.CancelledError:
pass
# Benchmark configuration
BENCHMARK_SCENARIOS = {
'normal_load': {
'name': 'Normal Load',
'concurrent_users': 100,
'requests_per_second_per_user': 1,
'duration_seconds': 300, # 5 minutes
'endpoints': [
{'method': 'GET', 'path': '/api/agents'},
{'method': 'GET', 'path': '/api/tasks'},
{'method': 'POST', 'path': '/api/tasks', 'data': {'type': 'ping'}}
]
},
'peak_load': {
'name': 'Peak Load',
'concurrent_users': 500,
'requests_per_second_per_user': 2,
'duration_seconds': 600, # 10 minutes
'endpoints': [
{'method': 'GET', 'path': '/api/agents'},
{'method': 'GET', 'path': '/api/metrics'},
{'method': 'POST', 'path': '/api/tasks', 'data': {'type': 'update_packages'}}
]
}
}
# Automated benchmark runner
class BenchmarkRunner:
def __init__(self):
self.results_history = []
async def run_all_benchmarks(self):
"""Run all benchmark scenarios"""
tester = PerformanceTester(
base_url="https://sysmanage.example.com",
auth_token=await self.get_test_token()
)
results = {}
for scenario_name, config in BENCHMARK_SCENARIOS.items():
print(f"Running {scenario_name} benchmark...")
result = await tester.run_load_test(config)
results[scenario_name] = result
# Store results
await self.store_benchmark_result(result)
# Generate report
self.print_result_summary(result)
return results
def print_result_summary(self, result: LoadTestResult):
"""Print benchmark result summary"""
print(f"""
Benchmark: {result.scenario}
Duration: {result.duration_seconds:.1f}s
Total Requests: {result.total_requests}
Success Rate: {result.successful_requests/result.total_requests*100:.1f}%
Requests/sec: {result.requests_per_second:.1f}
Avg Response Time: {result.avg_response_time*1000:.1f}ms
P95 Response Time: {result.p95_response_time*1000:.1f}ms
P99 Response Time: {result.p99_response_time*1000:.1f}ms
Max Response Time: {result.max_response_time*1000:.1f}ms
Failed Requests: {result.failed_requests}
""")