Scaling Strategies
Horizontal scaling, database scaling, capacity planning, auto-scaling, and performance scaling strategies for SysManage infrastructure.
Scaling Strategies Overview
SysManage is designed to scale both horizontally and vertically as infrastructure management demands grow. This document describes scaling strategies for each system component: the load balancer, application, database, and cache layers.
Core Scaling Principles
- Stateless Design: Application servers keep no local session or request state, so any instance can serve any request
- Shared-Nothing Architecture: Components share no local resources, so each one scales independently
- Horizontal-First: Prefer horizontal scaling over vertical scaling
- Predictive Scaling: Scale ahead of demand based on metric trends, not only on current load
Scaling Architecture Overview
Multi-Tier Scaling Architecture
┌─────────────────────────────────────────────────────────────────┐
│                       Load Balancer Tier                        │
│                      (Horizontal Scaling)                       │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  │
│  │    Global LB    │  │   Regional LB   │  │    Local LB     │  │
│  │    (DNS/CDN)    │  │  (Geographic)   │  │   (Instance)    │  │
│  └─────────────────┘  └─────────────────┘  └─────────────────┘  │
└────────────────────────────────┬────────────────────────────────┘
                                 │
┌────────────────────────────────┴────────────────────────────────┐
│                        Application Tier                         │
│                 (Horizontal + Vertical Scaling)                 │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │                     Auto Scaling Groups                     │ │
│ │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  ...    │ │
│ │  │   API   │  │   API   │  │   API   │  │   API   │         │ │
│ │  │ Server  │  │ Server  │  │ Server  │  │ Server  │         │ │
│ │  └─────────┘  └─────────┘  └─────────┘  └─────────┘         │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │                    Worker Process Pools                     │ │
│ │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐  ...    │ │
│ │  │  Queue  │  │  Agent  │  │ Report  │  │ Cleanup │         │ │
│ │  │ Worker  │  │ Worker  │  │ Worker  │  │ Worker  │         │ │
│ │  └─────────┘  └─────────┘  └─────────┘  └─────────┘         │ │
│ └─────────────────────────────────────────────────────────────┘ │
└────────────────────────────────┬────────────────────────────────┘
                                 │
┌────────────────────────────────┴────────────────────────────────┐
│                            Data Tier                            │
│                   (Read Replicas + Sharding)                    │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │                      Database Cluster                       │ │
│ │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐         │ │
│ │  │ Primary │  │ Replica │  │ Replica │  │Analytics│         │ │
│ │  │ (Write) │  │ (Read)  │  │ (Read)  │  │ (OLAP)  │         │ │
│ │  └─────────┘  └─────────┘  └─────────┘  └─────────┘         │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │                         Cache Layer                         │ │
│ │  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────┐         │ │
│ │  │  Redis  │  │  Redis  │  │  Redis  │  │  Redis  │         │ │
│ │  │ Cluster │  │ Cluster │  │ Cluster │  │ Cluster │         │ │
│ │  └─────────┘  └─────────┘  └─────────┘  └─────────┘         │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Scaling Dimensions
Horizontal Scaling (Scale Out)
- Add more instances to handle increased load
- Distribute load across multiple servers
- Improve fault tolerance and availability
- Cost grows roughly linearly with capacity
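Because horizontal capacity grows roughly linearly with instance count, a first-pass replica estimate is simple arithmetic. A minimal sketch, assuming you have measured per-replica throughput under load and want each replica to stay below a utilization target (the 0.7 default mirrors the 70% CPU target in the HPA example later in this document). The function name, the `spare` parameter, and the numbers are illustrative, not part of SysManage:

```python
import math


def replicas_needed(peak_rps: float, per_replica_rps: float,
                    target_utilization: float = 0.7, spare: int = 1) -> int:
    """Replicas needed to serve peak_rps with every replica at or below
    target_utilization, plus `spare` replicas to absorb an instance failure."""
    if per_replica_rps <= 0 or not 0 < target_utilization <= 1:
        raise ValueError("need per_replica_rps > 0 and 0 < target_utilization <= 1")
    usable_rps = per_replica_rps * target_utilization
    return math.ceil(peak_rps / usable_rps) + spare


# Hypothetical numbers: 1,200 req/s at peak, 150 req/s measured per API replica
print(replicas_needed(1200, 150))  # ceil(1200 / 105) + 1 = 13
```

Rounding up matters: a fractional replica always means one more instance, and the spare replica keeps the target utilization intact when one instance is lost.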
Vertical Scaling (Scale Up)
- Increase resources (CPU, RAM) per instance
- Simpler to implement and manage
- Often a better fit for CPU- or memory-intensive operations
- Limited by the largest available instance size
Geographic Scaling
- Deploy across multiple regions/zones
- Reduce latency for global users
- Improve disaster recovery capabilities
- Comply with data locality requirements
Functional Scaling
- Scale different components independently
- Optimize resources for specific workloads
- Separate read and write operations
- Specialized instances for specific tasks
Application Layer Scaling
Stateless Application Design
Stateless FastAPI Application
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
import redis.asyncio as redis  # asyncio client, so commands can be awaited
import asyncpg
import json
import time
import uuid
import psutil
from typing import Dict, Any, Optional

# get_db_connection, get_redis_connection, validate_jwt_token and the
# check_* helpers used below are application-specific and defined elsewhere.


class StatelessApp:
    """Stateless application design for horizontal scaling"""

    def __init__(self):
        self.app = FastAPI(title="SysManage API")
        self.setup_middleware()
        self.setup_dependencies()

    def setup_middleware(self):
        """Setup middleware for stateless operation"""
        # CORS for multi-origin support. The CORS spec does not allow a
        # wildcard origin on credentialed requests, so list origins explicitly.
        self.app.add_middleware(
            CORSMiddleware,
            allow_origins=["https://sysmanage.example.com"],  # placeholder: set per deployment
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )

        # Request ID middleware for tracing; reuse an ID set upstream
        # (for example by the load balancer) so logs correlate across tiers
        @self.app.middleware("http")
        async def add_request_id(request: Request, call_next):
            request_id = request.headers.get("X-Request-ID") or str(uuid.uuid4())
            request.state.request_id = request_id
            response = await call_next(request)
            response.headers["X-Request-ID"] = request_id
            return response

    def setup_dependencies(self):
        """Setup shared dependencies for stateless operation"""
        # Database connection (pooling is handled externally by PgBouncer)
        async def get_database():
            return await get_db_connection()

        # Redis cache (shared across instances)
        async def get_cache():
            return await get_redis_connection()

        # Authentication is stateless: every request carries a JWT
        async def get_current_user(request: Request):
            token = request.headers.get("Authorization")
            if not token:
                raise HTTPException(status_code=401, detail="No token provided")
            return await validate_jwt_token(token)

        # dependency_overrides maps dependency callables to replacements (it is
        # meant for tests) and does not accept string keys, so expose the
        # dependencies for routes instead, e.g. Depends(stateless_app.get_current_user)
        self.get_database = get_database
        self.get_cache = get_cache
        self.get_current_user = get_current_user


class ScalableSessionManager:
    """Session management for scalable applications"""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.session_ttl = 3600  # 1 hour

    async def create_session(self, user_id: str, metadata: Dict[str, Any]) -> str:
        """Create new session (stored in Redis for sharing across instances)"""
        session_id = str(uuid.uuid4())
        session_data = {
            'user_id': user_id,
            'created_at': time.time(),
            'metadata': metadata
        }
        await self.redis.setex(
            f"session:{session_id}",
            self.session_ttl,
            json.dumps(session_data)
        )
        return session_id

    async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """Get session data"""
        session_data = await self.redis.get(f"session:{session_id}")
        if session_data:
            return json.loads(session_data)
        return None

    async def update_session(self, session_id: str, metadata: Dict[str, Any]):
        """Update session metadata (rewriting the key also resets its TTL)"""
        session = await self.get_session(session_id)
        if session:
            session['metadata'].update(metadata)
            await self.redis.setex(
                f"session:{session_id}",
                self.session_ttl,
                json.dumps(session)
            )

    async def delete_session(self, session_id: str):
        """Delete session"""
        await self.redis.delete(f"session:{session_id}")


class LoadBalancerHealthCheck:
    """Health check endpoints for load balancers and Kubernetes probes"""

    def __init__(self, app: FastAPI):
        self.app = app
        self.setup_health_endpoints()

    def setup_health_endpoints(self):
        """Setup health check endpoints"""

        @self.app.get("/health")
        async def health_check():
            """Basic health check"""
            return {"status": "healthy", "timestamp": time.time()}

        @self.app.get("/health/detailed")
        async def detailed_health_check():
            """Detailed health check for load balancer"""
            health_status = {
                "status": "healthy",
                "timestamp": time.time(),
                "checks": {}
            }

            # Database connectivity check
            try:
                await check_database_connectivity()
                health_status["checks"]["database"] = "healthy"
            except Exception as e:
                health_status["checks"]["database"] = f"unhealthy: {e}"
                health_status["status"] = "unhealthy"

            # Redis connectivity check
            try:
                await check_redis_connectivity()
                health_status["checks"]["redis"] = "healthy"
            except Exception as e:
                health_status["checks"]["redis"] = f"unhealthy: {e}"
                health_status["status"] = "unhealthy"

            # Memory usage check
            memory_usage = psutil.virtual_memory().percent
            if memory_usage > 90:
                health_status["checks"]["memory"] = f"high usage: {memory_usage}%"
                # Never downgrade an "unhealthy" result to "degraded"
                if health_status["status"] == "healthy":
                    health_status["status"] = "degraded"
            else:
                health_status["checks"]["memory"] = f"healthy: {memory_usage}%"

            # Unhealthy returns 503 so the load balancer stops routing here;
            # degraded still returns 200, with the details in the body
            if health_status["status"] == "unhealthy":
                raise HTTPException(status_code=503, detail=health_status)
            return health_status

        @self.app.get("/health/ready")
        async def readiness_check():
            """Readiness check for Kubernetes"""
            # Check if application is ready to serve requests
            ready_checks = [
                await check_database_pool_ready(),
                await check_redis_pool_ready(),
                await check_background_services_ready()
            ]
            if all(ready_checks):
                return {"status": "ready"}
            raise HTTPException(status_code=503, detail={"status": "not ready"})

        @self.app.get("/health/live")
        async def liveness_check():
            """Liveness check for Kubernetes"""
            # Keep liveness independent of dependencies: a database outage
            # should not make Kubernetes restart every pod
            return {"status": "alive", "timestamp": time.time()}
Auto-Scaling Configuration
Kubernetes Horizontal Pod Autoscaler
# HPA for API servers
# The Pods metrics below are custom metrics: they need a metrics adapter
# (for example Prometheus Adapter) serving the custom.metrics.k8s.io API.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: sysmanage-api-hpa
  namespace: sysmanage
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: sysmanage-api
  minReplicas: 3
  maxReplicas: 50
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80
    - type: Pods
      pods:
        metric:
          name: http_requests_per_second
        target:
          type: AverageValue
          averageValue: "100"
    - type: Pods
      pods:
        metric:
          name: response_time_p95
        target:
          type: AverageValue
          averageValue: "500m"  # 0.5 in the metric's unit: 500 ms if reported in seconds
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300  # 5 minutes
      # With the default selectPolicy (Max), the policy allowing the larger change wins
      policies:
        - type: Percent
          value: 50
          periodSeconds: 60
        - type: Pods
          value: 2
          periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 60  # 1 minute
      policies:
        - type: Percent
          value: 100
          periodSeconds: 15
        - type: Pods
          value: 5
          periodSeconds: 60
---
# HPA for worker processes (custom metrics, as above)
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: sysmanage-workers-hpa
  namespace: sysmanage
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: sysmanage-workers
  minReplicas: 2
  maxReplicas: 20
  metrics:
    - type: Pods
      pods:
        metric:
          name: queue_length
        target:
          type: AverageValue
          averageValue: "10"
    - type: Pods
      pods:
        metric:
          name: processing_time_avg
        target:
          type: AverageValue
          averageValue: "30"  # 30 seconds
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 600  # 10 minutes (longer for workers)
      policies:
        - type: Percent
          value: 25
          periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 30
      policies:
        - type: Percent
          value: 200
          periodSeconds: 30
---
# Vertical Pod Autoscaler for database-intensive workloads
# VPA is an add-on (its CRDs and controllers must be installed). Do not also
# let an HPA scale this Deployment on CPU or memory.
apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
  name: sysmanage-analytics-vpa
  namespace: sysmanage
spec:
  targetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: sysmanage-analytics
  updatePolicy:
    updateMode: "Auto"
  resourcePolicy:
    containerPolicies:
      - containerName: analytics
        minAllowed:
          cpu: 500m
          memory: 1Gi
        maxAllowed:
          cpu: 4
          memory: 16Gi
        controlledResources: ["cpu", "memory"]
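The Kubernetes documentation gives the HPA's core rule as desiredReplicas = ceil(currentReplicas × currentMetricValue / desiredMetricValue). It is computed for every metric, the largest result wins, and no change is made when the ratio lies within a tolerance of 1.0 (0.1 by default). A simplified sketch of that rule, useful for sanity-checking the targets above; it deliberately ignores the `behavior` policies, stabilization windows, and the special handling of unready pods and missing metrics. The function name and example values are illustrative:

```python
import math
from typing import Dict, Tuple


def hpa_desired_replicas(current_replicas: int,
                         metrics: Dict[str, Tuple[float, float]],
                         min_replicas: int, max_replicas: int,
                         tolerance: float = 0.1) -> int:
    """metrics maps a metric name to (current_value, target_value)."""
    proposals = []
    for current, target in metrics.values():
        ratio = current / target
        if abs(ratio - 1.0) <= tolerance:
            proposals.append(current_replicas)  # close enough: keep the count
        else:
            proposals.append(math.ceil(current_replicas * ratio))
    desired = max(proposals) if proposals else current_replicas
    # minReplicas/maxReplicas bound the result
    return max(min_replicas, min(max_replicas, desired))


# 4 API pods: CPU 84% vs 70% target, memory 60% vs 80%, 150 vs 100 req/s per pod
print(hpa_desired_replicas(4, {"cpu": (84, 70), "memory": (60, 80),
                               "rps": (150, 100)}, 3, 50))  # 6
```

In the example, CPU alone would ask for 5 pods and memory for 3, but the request-rate metric asks for 6, so the request rate drives the decision.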
Application-Level Auto-Scaling
Custom Auto-Scaling Logic
import asyncio
import math
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
from enum import Enum


class ScalingAction(Enum):
    SCALE_UP = "scale_up"
    SCALE_DOWN = "scale_down"
    NO_ACTION = "no_action"


@dataclass
class ScalingMetrics:
    cpu_utilization: float      # fraction, 0.0-1.0
    memory_utilization: float   # fraction, 0.0-1.0
    request_rate: float         # requests per second
    response_time_p95: float    # milliseconds
    queue_length: int
    error_rate: float           # fraction, 0.0-1.0
    timestamp: float            # Unix time (time.time())


@dataclass
class ScalingDecision:
    action: ScalingAction
    target_replicas: int
    current_replicas: int
    reason: str
    confidence: float
    metrics: ScalingMetrics


class IntelligentAutoScaler:
    """Intelligent auto-scaler with predictive capabilities"""

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.metrics_history: List[ScalingMetrics] = []
        self.scaling_history: List[ScalingDecision] = []
        self.min_replicas = config.get('min_replicas', 2)
        self.max_replicas = config.get('max_replicas', 50)
        self.scale_up_threshold = config.get('scale_up_threshold', 0.7)
        self.scale_down_threshold = config.get('scale_down_threshold', 0.3)
        self.prediction_window = config.get('prediction_window', 300)  # 5 minutes
        # Ignore combined factors this close to 1.0 (the Kubernetes HPA uses
        # a 10% tolerance by default)
        self.tolerance = config.get('tolerance', 0.1)

    async def evaluate_scaling_decision(self, current_metrics: ScalingMetrics,
                                        current_replicas: int) -> ScalingDecision:
        """Evaluate whether to scale up, down, or maintain current replicas"""
        # Add current metrics to history
        self.metrics_history.append(current_metrics)
        self._cleanup_old_metrics()

        # Calculate scaling factors
        scaling_factors = self._calculate_scaling_factors(current_metrics)

        # Get predicted load
        predicted_load = self._predict_future_load()

        # Make scaling decision
        decision = self._make_scaling_decision(
            current_metrics, scaling_factors, predicted_load, current_replicas
        )

        # Record decision
        self.scaling_history.append(decision)
        return decision

    def _calculate_scaling_factors(self, metrics: ScalingMetrics) -> Dict[str, float]:
        """Calculate scaling factors for different metrics"""
        factors = {}

        # CPU-based scaling factor
        if metrics.cpu_utilization > self.scale_up_threshold:
            factors['cpu'] = min(2.0, metrics.cpu_utilization / self.scale_up_threshold)
        elif metrics.cpu_utilization < self.scale_down_threshold:
            factors['cpu'] = max(0.5, metrics.cpu_utilization / self.scale_down_threshold)
        else:
            factors['cpu'] = 1.0

        # Memory-based scaling factor
        if metrics.memory_utilization > self.scale_up_threshold:
            factors['memory'] = min(2.0, metrics.memory_utilization / self.scale_up_threshold)
        elif metrics.memory_utilization < self.scale_down_threshold:
            factors['memory'] = max(0.5, metrics.memory_utilization / self.scale_down_threshold)
        else:
            factors['memory'] = 1.0

        # Response time-based scaling factor
        target_response_time = 500  # 500ms target
        if metrics.response_time_p95 > target_response_time:
            factors['response_time'] = min(3.0, metrics.response_time_p95 / target_response_time)
        else:
            factors['response_time'] = 1.0

        # Queue length-based scaling factor
        target_queue_length = 10
        if metrics.queue_length > target_queue_length:
            factors['queue'] = min(2.0, metrics.queue_length / target_queue_length)
        else:
            factors['queue'] = max(0.8, metrics.queue_length / target_queue_length)

        # Error rate consideration
        if metrics.error_rate > 0.05:  # 5% error rate
            factors['error_rate'] = min(2.0, 1.0 + metrics.error_rate)
        else:
            factors['error_rate'] = 1.0

        return factors

    def _predict_future_load(self) -> float:
        """Predict future load based on historical patterns"""
        if len(self.metrics_history) < 10:
            return 1.0  # Not enough data for prediction

        # Simple trend analysis over the 10 most recent samples
        recent_metrics = self.metrics_history[-10:]
        request_rates = [m.request_rate for m in recent_metrics]

        # Least-squares slope of request rate against sample index
        n = len(request_rates)
        sum_x = sum(range(n))
        sum_y = sum(request_rates)
        sum_xy = sum(i * rate for i, rate in enumerate(request_rates))
        sum_x2 = sum(i * i for i in range(n))
        slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x)

        # Project the load 5 samples ahead
        predicted_change = slope * 5
        current_avg = sum_y / n
        if current_avg > 0:
            predicted_load_factor = (current_avg + predicted_change) / current_avg
            return max(0.5, min(2.0, predicted_load_factor))
        return 1.0

    def _make_scaling_decision(self, metrics: ScalingMetrics,
                               scaling_factors: Dict[str, float],
                               predicted_load: float,
                               current_replicas: int) -> ScalingDecision:
        """Make final scaling decision"""
        # Calculate weighted scaling factor (weights sum to 1.0)
        weights = {
            'cpu': 0.3,
            'memory': 0.25,
            'response_time': 0.2,
            'queue': 0.15,
            'error_rate': 0.1
        }
        weighted_factor = sum(
            scaling_factors.get(metric, 1.0) * weight
            for metric, weight in weights.items()
        )

        # Apply prediction factor
        final_factor = weighted_factor * predicted_load

        # Calculate target replicas. Small deviations (including floating-point
        # noise when every factor is 1.0) keep the current count; otherwise round
        # up, as the HPA does, so a modest overload is not truncated away.
        if abs(final_factor - 1.0) <= self.tolerance:
            target_replicas = current_replicas
        else:
            target_replicas = math.ceil(current_replicas * final_factor)

        # Apply min/max constraints
        target_replicas = max(self.min_replicas, min(self.max_replicas, target_replicas))

        # Determine action and confidence
        if target_replicas > current_replicas:
            action = ScalingAction.SCALE_UP
            confidence = min(1.0, (final_factor - 1.0) * 2)
            reason = f"Scale up due to high load (factor: {final_factor:.2f})"
        elif target_replicas < current_replicas:
            action = ScalingAction.SCALE_DOWN
            confidence = min(1.0, (1.0 - final_factor) * 2)
            reason = f"Scale down due to low load (factor: {final_factor:.2f})"
        else:
            action = ScalingAction.NO_ACTION
            confidence = max(0.0, 1.0 - abs(final_factor - 1.0))
            reason = "Load is within acceptable range"

        # Apply dampening for frequent scaling
        if self._has_recent_scaling():
            confidence *= 0.5
            if confidence < 0.7:
                action = ScalingAction.NO_ACTION
                reason = "Dampened due to recent scaling activity"

        return ScalingDecision(
            action=action,
            target_replicas=target_replicas,
            current_replicas=current_replicas,
            reason=reason,
            confidence=confidence,
            metrics=metrics
        )

    def _has_recent_scaling(self) -> bool:
        """Check if there was recent scaling activity"""
        recent_decisions = [
            d for d in self.scaling_history
            if d.metrics.timestamp > time.time() - 300  # Last 5 minutes
        ]
        return any(
            d.action != ScalingAction.NO_ACTION
            for d in recent_decisions
        )

    def _cleanup_old_metrics(self):
        """Remove old metrics and decisions to prevent memory bloat"""
        now = time.time()
        cutoff_time = now - 3600  # Keep 1 hour of metrics
        self.metrics_history = [
            m for m in self.metrics_history
            if m.timestamp > cutoff_time
        ]
        # Keep 24 hours of decisions for get_scaling_statistics()
        decision_cutoff = now - 86400
        self.scaling_history = [
            d for d in self.scaling_history
            if d.metrics.timestamp > decision_cutoff
        ]

    def get_scaling_statistics(self) -> Dict[str, Any]:
        """Get scaling statistics and insights"""
        recent_decisions = [
            d for d in self.scaling_history
            if d.metrics.timestamp > time.time() - 86400  # Last 24 hours
        ]
        if not recent_decisions:
            return {"message": "No scaling decisions in the last 24 hours"}

        scale_up_count = sum(1 for d in recent_decisions if d.action == ScalingAction.SCALE_UP)
        scale_down_count = sum(1 for d in recent_decisions if d.action == ScalingAction.SCALE_DOWN)
        no_action_count = sum(1 for d in recent_decisions if d.action == ScalingAction.NO_ACTION)
        avg_confidence = sum(d.confidence for d in recent_decisions) / len(recent_decisions)

        return {
            "total_decisions_24h": len(recent_decisions),
            "scale_up_count": scale_up_count,
            "scale_down_count": scale_down_count,
            "no_action_count": no_action_count,
            "average_confidence": avg_confidence,
            "scaling_frequency": len(recent_decisions) / 24,  # Per hour
            "recent_trend": self._analyze_scaling_trend(recent_decisions)
        }

    def _analyze_scaling_trend(self, decisions: List[ScalingDecision]) -> str:
        """Analyze recent scaling trend"""
        if len(decisions) < 5:
            return "insufficient_data"

        recent_actions = [d.action for d in decisions[-5:]]
        if recent_actions.count(ScalingAction.SCALE_UP) >= 3:
            return "scaling_up"
        elif recent_actions.count(ScalingAction.SCALE_DOWN) >= 3:
            return "scaling_down"
        elif recent_actions.count(ScalingAction.NO_ACTION) >= 4:
            return "stable"
        else:
            return "oscillating"
Database Scaling Strategies
Read Replica Scaling
Dynamic Read Replica Management
import asyncio
import hashlib
import logging
import random
import time
from typing import List, Dict, Any, Optional

import asyncpg


class DatabaseConnectionManager:
    """Manage database connections with read replica scaling"""

    def __init__(self, config: Dict[str, Any]):
        self.write_db_url = config['write_db_url']
        self.read_db_urls = config['read_db_urls']
        self.connection_pools = {}
        self.read_replica_weights = {}
        self.health_status = {}
        self.connection_config = config.get('connection_config', {})

    async def initialize(self):
        """Initialize all connection pools"""
        # Primary write database
        self.connection_pools['write'] = await asyncpg.create_pool(
            self.write_db_url,
            min_size=5,
            max_size=20,
            **self.connection_config
        )

        # Read replicas
        for i, read_url in enumerate(self.read_db_urls):
            pool_name = f"read_{i}"
            self.connection_pools[pool_name] = await asyncpg.create_pool(
                read_url,
                min_size=10,
                max_size=50,
                **self.connection_config
            )
            self.read_replica_weights[pool_name] = 1.0
            self.health_status[pool_name] = True

    async def execute_write(self, query: str, *args) -> Any:
        """Execute write query on primary database"""
        async with self.connection_pools['write'].acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch_read(self, query: str, *args) -> List[Dict]:
        """Execute read query on best available read replica"""
        replica_pool = await self._select_read_replica()
        try:
            async with replica_pool.acquire() as conn:
                rows = await conn.fetch(query, *args)
                return [dict(row) for row in rows]
        except Exception as e:
            # Fall back to the primary if the replica fails. Replicas can lag
            # the primary, so read-after-write queries belong on 'write'.
            logging.warning(f"Read replica failed, falling back to write DB: {e}")
            async with self.connection_pools['write'].acquire() as conn:
                rows = await conn.fetch(query, *args)
                return [dict(row) for row in rows]

    async def _select_read_replica(self) -> asyncpg.Pool:
        """Select best read replica based on weights and health"""
        healthy_replicas = [
            (name, pool) for name, pool in self.connection_pools.items()
            if name.startswith('read_') and self.health_status.get(name, False)
        ]

        if not healthy_replicas:
            # No healthy read replicas, use write database
            return self.connection_pools['write']

        # Weighted random selection
        total_weight = sum(
            self.read_replica_weights[name] for name, _ in healthy_replicas
        )
        if total_weight == 0:
            return random.choice(healthy_replicas)[1]

        random_weight = random.uniform(0, total_weight)
        current_weight = 0
        for name, pool in healthy_replicas:
            current_weight += self.read_replica_weights[name]
            if random_weight <= current_weight:
                return pool

        return healthy_replicas[0][1]  # Fallback for floating-point rounding

    async def monitor_replica_health(self):
        """Monitor read replica health and adjust weights"""
        while True:
            for name, pool in self.connection_pools.items():
                if name.startswith('read_'):
                    health = await self._check_replica_health(pool)
                    self.health_status[name] = health

                    # Adjust weights based on performance
                    if health:
                        response_time = await self._measure_response_time(pool)
                        # Lower response time (seconds) = higher weight
                        self.read_replica_weights[name] = max(0.1, 2.0 - response_time)
                    else:
                        self.read_replica_weights[name] = 0.0

            await asyncio.sleep(30)  # Check every 30 seconds

    async def _check_replica_health(self, pool: asyncpg.Pool) -> bool:
        """Check if read replica is healthy"""
        try:
            async with pool.acquire() as conn:
                await conn.execute("SELECT 1")
            return True
        except Exception:
            return False

    async def _measure_response_time(self, pool: asyncpg.Pool) -> float:
        """Measure round-trip time of a trivial query, in seconds"""
        try:
            start_time = time.perf_counter()  # monotonic, suited to intervals
            async with pool.acquire() as conn:
                await conn.execute("SELECT 1")
            return time.perf_counter() - start_time
        except Exception:
            return float('inf')

    async def get_connection_stats(self) -> Dict[str, Any]:
        """Get connection pool statistics"""
        stats = {}
        for name, pool in self.connection_pools.items():
            stats[name] = {
                'size': pool.get_size(),
                'idle_size': pool.get_idle_size(),
                'weight': self.read_replica_weights.get(name, 1.0),
                'healthy': self.health_status.get(name, True)
            }
        return stats


class DatabaseShardingManager:
    """Manage database sharding for horizontal scaling"""

    def __init__(self, shard_configs: List[Dict[str, Any]]):
        self.shards = {}
        self.shard_keys = {}
        self.shard_configs = shard_configs

    async def initialize(self):
        """Initialize all shards"""
        for i, config in enumerate(self.shard_configs):
            shard_name = f"shard_{i}"
            self.shards[shard_name] = await asyncpg.create_pool(
                config['database_url'],
                min_size=config.get('min_connections', 5),
                max_size=config.get('max_connections', 20)
            )
            # Half-open bucket range [start, end) handled by this shard
            self.shard_keys[shard_name] = config.get('key_range', (i, i + 1))

    def _get_shard_for_key(self, key: str) -> str:
        """Determine which shard to use for a given key"""
        # Built-in hash() of a str is randomized per process (PYTHONHASHSEED),
        # so use a digest that every application instance computes identically
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        # The bucket space spans all configured ranges: (0, 2) and (2, 4)
        # give four buckets, so both shards receive keys
        total_buckets = max(end for _, end in self.shard_keys.values())
        bucket = int.from_bytes(digest[:8], "big") % total_buckets

        for shard_name, (start, end) in self.shard_keys.items():
            if start <= bucket < end:
                return shard_name

        # Fallback to first shard (only reached if the ranges leave gaps)
        return next(iter(self.shards))

    async def execute_sharded_query(self, shard_key: str, query: str, *args) -> Any:
        """Execute query on appropriate shard"""
        shard_name = self._get_shard_for_key(shard_key)
        shard_pool = self.shards[shard_name]
        async with shard_pool.acquire() as conn:
            return await conn.execute(query, *args)

    async def fetch_sharded_query(self, shard_key: str, query: str, *args) -> List[Dict]:
        """Fetch from appropriate shard"""
        shard_name = self._get_shard_for_key(shard_key)
        shard_pool = self.shards[shard_name]
        async with shard_pool.acquire() as conn:
            rows = await conn.fetch(query, *args)
            return [dict(row) for row in rows]

    async def execute_cross_shard_query(self, query: str, *args) -> List[Dict]:
        """Execute query across all shards and merge results"""
        tasks = [
            self._execute_on_shard(shard_pool, query, *args)
            for shard_pool in self.shards.values()
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Merge results from all shards (a failed shard contributes no rows)
        merged_results = []
        for result in results:
            if isinstance(result, Exception):
                logging.error(f"Shard query failed: {result}")
                continue
            merged_results.extend(result)
        return merged_results

    async def _execute_on_shard(self, shard_pool: asyncpg.Pool,
                                query: str, *args) -> List[Dict]:
        """Execute query on specific shard"""
        try:
            async with shard_pool.acquire() as conn:
                rows = await conn.fetch(query, *args)
                return [dict(row) for row in rows]
        except Exception as e:
            logging.error(f"Shard query failed: {e}")
            return []

    async def get_shard_statistics(self) -> Dict[str, Any]:
        """Get statistics for all shards"""
        stats = {}
        for shard_name, shard_pool in self.shards.items():
            stats[shard_name] = {
                'size': shard_pool.get_size(),
                'idle_size': shard_pool.get_idle_size(),
                'key_range': self.shard_keys[shard_name]
            }
        return stats


# Usage example
async def setup_scalable_database():
    """Setup scalable database configuration"""
    # Read replica configuration
    db_config = {
        'write_db_url': 'postgresql://user:pass@primary-db:5432/sysmanage',
        'read_db_urls': [
            'postgresql://user:pass@replica1-db:5432/sysmanage',
            'postgresql://user:pass@replica2-db:5432/sysmanage',
            'postgresql://user:pass@replica3-db:5432/sysmanage'
        ],
        'connection_config': {
            'command_timeout': 60,
            'max_inactive_connection_lifetime': 300
        }
    }

    db_manager = DatabaseConnectionManager(db_config)
    await db_manager.initialize()

    # Start health monitoring; keep a reference so the task is not
    # garbage-collected while it runs
    db_manager.health_task = asyncio.create_task(db_manager.monitor_replica_health())

    # Sharding configuration for large datasets: the key ranges split a
    # space of four hash buckets between two shards
    shard_configs = [
        {
            'database_url': 'postgresql://user:pass@shard1:5432/sysmanage_shard1',
            'key_range': (0, 2),
            'min_connections': 10,
            'max_connections': 30
        },
        {
            'database_url': 'postgresql://user:pass@shard2:5432/sysmanage_shard2',
            'key_range': (2, 4),
            'min_connections': 10,
            'max_connections': 30
        }
    ]

    shard_manager = DatabaseShardingManager(shard_configs)
    await shard_manager.initialize()

    return db_manager, shard_manager
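Shard routing must give the same answer on every application instance. Python's built-in `hash()` for `str` is salted per process (see `PYTHONHASHSEED`), so two replicas could send the same key to different shards; a cryptographic digest such as SHA-256 is stable across processes and machines. A minimal sketch of range-based bucket routing, assuming `key_range` tuples are half-open ranges over a fixed bucket space as in the shard configuration above (helper names and keys are illustrative):

```python
import hashlib
from typing import Dict, Tuple


def stable_bucket(key: str, total_buckets: int) -> int:
    # First 8 bytes of SHA-256 as an unsigned integer: identical in every process
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % total_buckets


def shard_for_key(key: str, key_ranges: Dict[str, Tuple[int, int]]) -> str:
    # Bucket space is [0, highest range end); ranges must cover it without gaps
    total_buckets = max(end for _, end in key_ranges.values())
    bucket = stable_bucket(key, total_buckets)
    for shard_name, (start, end) in key_ranges.items():
        if start <= bucket < end:
            return shard_name
    raise LookupError(f"bucket {bucket} is not covered by any shard range")


ranges = {"shard_0": (0, 2), "shard_1": (2, 4)}
print(shard_for_key("host-0001", ranges))  # same shard on every instance
```

Keeping the bucket count fixed is the point of the design: moving one bucket to a new shard (for example splitting `(2, 4)` into `(2, 3)` and `(3, 4)`) relocates only that bucket's keys, whereas changing the total number of buckets remaps almost every key.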
Database Connection Pooling
PgBouncer Configuration for Scaling
# PgBouncer configuration for database scaling
[databases]
# Primary database for writes
sysmanage_write = host=postgresql-primary port=5432 dbname=sysmanage user=sysmanage_write
# Read replicas for scaling reads
sysmanage_read1 = host=postgresql-replica1 port=5432 dbname=sysmanage user=sysmanage_read
sysmanage_read2 = host=postgresql-replica2 port=5432 dbname=sysmanage user=sysmanage_read
sysmanage_read3 = host=postgresql-replica3 port=5432 dbname=sysmanage user=sysmanage_read
[pgbouncer]
# Connection pooling settings for scaling
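listen_addr = 0.0.0.0
# 5432 lets clients connect as if to PostgreSQL directly; use PgBouncer's
# default 6432 if PostgreSQL runs on the same host
listen_port = 5432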
auth_type = md5
auth_file = /etc/pgbouncer/userlist.txt
# Pool configuration
pool_mode = transaction
max_client_conn = 1000
default_pool_size = 50
min_pool_size = 10
reserve_pool_size = 10
reserve_pool_timeout = 5
# Server connection limits
max_db_connections = 100
max_user_connections = 100
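# Connection lifecycle
# With pool_mode = transaction, PgBouncer skips server_reset_query unless
# server_reset_query_always = 1 is also set
server_reset_query = DISCARD ALL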
server_check_delay = 30
server_connect_timeout = 15
server_login_retry = 15
# Performance tuning
tcp_keepalive = 1
tcp_keepcnt = 3
tcp_keepidle = 600
tcp_keepintvl = 30
# Logging and monitoring
log_connections = 1
log_disconnections = 1
log_pooler_errors = 1
stats_period = 60
# Administrative settings
admin_users = pgbouncer_admin
stats_users = pgbouncer_stats
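`default_pool_size` applies per user/database pair, while `max_db_connections` caps server connections per PgBouncer database and `max_user_connections` caps them per user across all databases. A rough sketch of the resulting ceilings for the configuration above, assuming each database entry uses only its forced `user=` and treating the reserve pool as fully used (it ignores `min_pool_size` and the fact that the reserve pool only engages after `reserve_pool_timeout`):

```python
from collections import defaultdict

# Values copied from the pgbouncer.ini above
DEFAULT_POOL_SIZE = 50
RESERVE_POOL_SIZE = 10
MAX_DB_CONNECTIONS = 100
MAX_USER_CONNECTIONS = 100

# PgBouncer database entry -> the user it forces
entries = {
    "sysmanage_write": "sysmanage_write",
    "sysmanage_read1": "sysmanage_read",
    "sysmanage_read2": "sysmanage_read",
    "sysmanage_read3": "sysmanage_read",
}


def server_connection_ceilings(entries):
    # One pool per entry (one user each), capped per database
    per_entry = {db: min(DEFAULT_POOL_SIZE + RESERVE_POOL_SIZE, MAX_DB_CONNECTIONS)
                 for db in entries}
    per_user = defaultdict(int)
    for db, user in entries.items():
        per_user[user] += per_entry[db]
    # max_user_connections caps each user across all of its databases
    return per_entry, {u: min(total, MAX_USER_CONNECTIONS) for u, total in per_user.items()}


per_entry, per_user = server_connection_ceilings(entries)
print(per_user)  # {'sysmanage_write': 60, 'sysmanage_read': 100}
```

With these values the three read replicas share at most 100 server connections through the `sysmanage_read` user (roughly 33 each under even load), not 60 each, which is worth checking against each replica's PostgreSQL `max_connections` when tuning.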
Cache Layer Scaling
Redis Cluster Scaling
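Redis Cluster assigns every key to one of 16,384 hash slots: the slot is the CRC16 (XMODEM variant) of the key modulo 16384, and when the key contains a non-empty `{...}` hash tag only the tag is hashed. Hash tags are how related keys are kept on the same node for multi-key commands such as MGET. A standalone sketch of that rule, useful for checking the `_calculate_slot` helper in the listing that follows (function names and keys are illustrative):

```python
import binascii


def crc16_xmodem(data: bytes) -> int:
    # CRC-16/XMODEM: polynomial 0x1021, initial value 0, no bit reflection
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_hash_slot(key: str) -> int:
    raw = key.encode("utf-8")
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        # Only a non-empty {...} section counts as a hash tag
        if end != -1 and end != start + 1:
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % 16384


print(hex(crc16_xmodem(b"123456789")))  # 0x31c3, the standard check value
print(crc16_xmodem(b"somekey") == binascii.crc_hqx(b"somekey", 0))  # True
print(key_hash_slot("host:{42}:metrics") == key_hash_slot("host:{42}:packages"))  # True
```

The standard library's `binascii.crc_hqx(data, 0)` computes the same CRC, so production code can use it instead of a hand-written loop.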
Redis Cluster Management
import redis
import rediscluster
from typing import Dict, List, Any, Optional
import hashlib
import asyncio
class RedisClusterManager:
"""Manage Redis cluster for horizontal cache scaling"""
def __init__(self, cluster_nodes: List[Dict[str, Any]]):
self.cluster_nodes = cluster_nodes
self.cluster = None
self.node_clients = {}
self.hash_slots = 16384 # Redis cluster hash slots
async def initialize(self):
"""Initialize Redis cluster connection"""
startup_nodes = [
{"host": node["host"], "port": node["port"]}
for node in self.cluster_nodes
]
self.cluster = rediscluster.RedisCluster(
startup_nodes=startup_nodes,
decode_responses=True,
skip_full_coverage_check=True,
health_check_interval=30,
retry_on_timeout=True,
socket_keepalive=True,
socket_keepalive_options={
1: 1, # TCP_KEEPIDLE
2: 3, # TCP_KEEPINTVL
3: 5 # TCP_KEEPCNT
}
)
# Initialize individual node clients for monitoring
for node in self.cluster_nodes:
node_key = f"{node['host']}:{node['port']}"
self.node_clients[node_key] = redis.Redis(
host=node['host'],
port=node['port'],
decode_responses=True
)
async def set_with_scaling(self, key: str, value: Any, ttl: Optional[int] = None):
"""Set value with cluster scaling considerations"""
try:
serialized_value = self._serialize_value(value)
if ttl:
await self.cluster.setex(key, ttl, serialized_value)
else:
await self.cluster.set(key, serialized_value)
except rediscluster.exceptions.ClusterDownError:
# Handle cluster scaling events
await self._handle_cluster_scaling()
raise
async def get_with_scaling(self, key: str) -> Optional[Any]:
"""Get value with cluster scaling considerations"""
try:
value = await self.cluster.get(key)
if value:
return self._deserialize_value(value)
return None
except rediscluster.exceptions.ClusterDownError:
await self._handle_cluster_scaling()
raise
async def mget_with_scaling(self, keys: List[str]) -> List[Optional[Any]]:
"""Multi-get with cluster scaling optimization"""
# Group keys by cluster slots for efficient retrieval
slot_groups = self._group_keys_by_slot(keys)
results = {}
for slot, slot_keys in slot_groups.items():
try:
slot_results = await self.cluster.mget(slot_keys)
for key, value in zip(slot_keys, slot_results):
results[key] = self._deserialize_value(value) if value else None
except Exception as e:
logging.warning(f"Failed to get keys from slot {slot}: {e}")
# Set None for failed keys
for key in slot_keys:
results[key] = None
# Return results in original key order
return [results.get(key) for key in keys]
def _group_keys_by_slot(self, keys: List[str]) -> Dict[int, List[str]]:
"""Group keys by Redis cluster hash slots"""
slot_groups = {}
for key in keys:
slot = self._calculate_slot(key)
if slot not in slot_groups:
slot_groups[slot] = []
slot_groups[slot].append(key)
return slot_groups
def _calculate_slot(self, key: str) -> int:
"""Calculate Redis cluster slot for key"""
# Redis cluster key hashing
if '{' in key and '}' in key:
# Hash tag support
start = key.find('{')
end = key.find('}', start)
if end > start + 1:
key = key[start + 1:end]
return rediscluster.crc16.crc16(key.encode('utf-8')) % self.hash_slots
async def _handle_cluster_scaling(self):
"""Handle cluster scaling events"""
try:
# Refresh cluster topology
await self.cluster.cluster_nodes()
logging.info("Redis cluster topology refreshed")
except Exception as e:
logging.error(f"Failed to refresh cluster topology: {e}")
    async def scale_cluster(self, new_nodes: List[Dict[str, Any]]):
        """Scale Redis cluster by adding new nodes"""
        # get_cluster_stats() awaits client.info(), so node clients must be asyncio clients
        import redis.asyncio
        for node in new_nodes:
            try:
                # Add node to cluster
                await self._add_node_to_cluster(node)
                # Register an async client for the new node
                node_key = f"{node['host']}:{node['port']}"
                self.node_clients[node_key] = redis.asyncio.Redis(
                    host=node['host'],
                    port=node['port'],
                    decode_responses=True
                )
                logging.info(f"Added node {node_key} to Redis cluster")
            except Exception as e:
                logging.error(f"Failed to add node {node}: {e}")
        # Rebalance cluster
        await self._rebalance_cluster()
    async def _add_node_to_cluster(self, node: Dict[str, Any]):
        """Add new node to existing cluster"""
        # Placeholder: integrate with your cluster management tooling, for example
        # redis-cli --cluster add-node <new_host:port> <existing_host:port>
        # Implementation depends on your cluster management setup
        pass
    async def _rebalance_cluster(self):
        """Rebalance cluster slots after scaling"""
        # Placeholder: for example
        # redis-cli --cluster rebalance <host:port> --cluster-use-empty-masters
        # Implementation depends on your cluster management setup
        pass
    async def get_cluster_stats(self) -> Dict[str, Any]:
        """Get comprehensive cluster statistics"""
        stats = {
            'nodes': {},
            'total_memory': 0,
            'total_keys': 0,
            'cluster_health': 'unknown'
        }
        healthy_nodes = 0
        total_nodes = len(self.node_clients)
        for node_key, client in self.node_clients.items():
            try:
                info = await client.info()
                node_stats = {
                    'memory_used': info.get('used_memory', 0),
                    'keys': info.get('db0', {}).get('keys', 0),
                    'connected_clients': info.get('connected_clients', 0),
                    'ops_per_sec': info.get('instantaneous_ops_per_sec', 0),
                    'status': 'healthy'
                }
                stats['nodes'][node_key] = node_stats
                stats['total_memory'] += node_stats['memory_used']
                stats['total_keys'] += node_stats['keys']
                healthy_nodes += 1
            except Exception as e:
                stats['nodes'][node_key] = {
                    'status': 'unhealthy',
                    'error': str(e)
                }
        # Determine cluster health (guard against division by zero when no nodes are registered)
        health_ratio = healthy_nodes / total_nodes if total_nodes else 0.0
        if health_ratio >= 0.9:
            stats['cluster_health'] = 'healthy'
        elif health_ratio >= 0.7:
            stats['cluster_health'] = 'degraded'
        else:
            stats['cluster_health'] = 'critical'
        stats['healthy_nodes'] = healthy_nodes
        stats['total_nodes'] = total_nodes
        stats['health_ratio'] = health_ratio
        return stats
    def _serialize_value(self, value: Any) -> str:
        """Serialize value for Redis storage"""
        import json
        return json.dumps(value, default=str)
    def _deserialize_value(self, value: str) -> Any:
        """Deserialize value from Redis storage"""
        import json
        try:
            return json.loads(value)
        except (json.JSONDecodeError, TypeError):
            return value
class CacheScalingStrategy:
    """Strategy for scaling cache layer based on load"""
    def __init__(self, redis_manager: RedisClusterManager):
        self.redis_manager = redis_manager
        self.scaling_thresholds = {
            'memory_utilization': 0.8,  # 80%
            'ops_per_second': 10000,  # 10K ops/sec per node
            'response_time': 10  # 10ms average response time
        }
    async def evaluate_scaling_needs(self) -> Dict[str, Any]:
        """Evaluate if cache layer needs scaling"""
        stats = await self.redis_manager.get_cluster_stats()
        scaling_recommendation = {
            'action': 'none',
            'reason': '',
            'recommended_nodes': 0,
            'current_performance': stats
        }
        # Check memory utilization
        if self._check_memory_pressure(stats):
            scaling_recommendation['action'] = 'scale_out'
            scaling_recommendation['reason'] = 'High memory utilization'
            scaling_recommendation['recommended_nodes'] = self._calculate_memory_scaling(stats)
        # Check operations per second
        elif self._check_ops_pressure(stats):
            scaling_recommendation['action'] = 'scale_out'
            scaling_recommendation['reason'] = 'High operations load'
            scaling_recommendation['recommended_nodes'] = self._calculate_ops_scaling(stats)
        # Check for over-provisioning
        elif self._check_underutilization(stats):
            scaling_recommendation['action'] = 'scale_in'
            scaling_recommendation['reason'] = 'Underutilized resources'
            scaling_recommendation['recommended_nodes'] = self._calculate_scale_in(stats)
        return scaling_recommendation
    def _check_memory_pressure(self, stats: Dict[str, Any]) -> bool:
        """Check if memory utilization is too high"""
        for node_key, node_stats in stats['nodes'].items():
            if node_stats.get('status') == 'healthy':
                # Assumes a 1 GiB memory budget per node; adjust to match your maxmemory setting
                max_memory = 1024 * 1024 * 1024
                memory_ratio = node_stats.get('memory_used', 0) / max_memory
                if memory_ratio > self.scaling_thresholds['memory_utilization']:
                    return True
        return False
    def _check_ops_pressure(self, stats: Dict[str, Any]) -> bool:
        """Check if operations load is too high"""
        for node_key, node_stats in stats['nodes'].items():
            if node_stats.get('status') == 'healthy':
                ops_per_sec = node_stats.get('ops_per_sec', 0)
                if ops_per_sec > self.scaling_thresholds['ops_per_second']:
                    return True
        return False
    def _check_underutilization(self, stats: Dict[str, Any]) -> bool:
        """Check if resources are underutilized"""
        healthy_nodes = [
            node_stats for node_stats in stats['nodes'].values()
            if node_stats.get('status') == 'healthy'
        ]
        if len(healthy_nodes) <= 3:  # Minimum cluster size
            return False
        # Check whether most (at least 70%) of the healthy nodes are significantly underutilized
        underutilized_count = 0
        for node_stats in healthy_nodes:
            max_memory = 1024 * 1024 * 1024
            memory_ratio = node_stats.get('memory_used', 0) / max_memory
            ops_per_sec = node_stats.get('ops_per_sec', 0)
            if (memory_ratio < 0.3 and  # < 30% memory
                    ops_per_sec < 1000):  # < 1K ops/sec
                underutilized_count += 1
        return underutilized_count >= len(healthy_nodes) * 0.7  # 70% underutilized
    def _calculate_memory_scaling(self, stats: Dict[str, Any]) -> int:
        """Calculate number of nodes needed for memory scaling"""
        import math
        total_memory = stats.get('total_memory', 0)
        healthy_nodes = stats.get('healthy_nodes', 1)
        # Target 60% memory utilization after scaling
        target_utilization = 0.6
        max_memory_per_node = 1024 * 1024 * 1024  # 1 GiB
        # Round up: int() would truncate and under-provision the cluster
        required_nodes = math.ceil(total_memory / (max_memory_per_node * target_utilization))
        additional_nodes = max(1, required_nodes - healthy_nodes)
        return additional_nodes
    def _calculate_ops_scaling(self, stats: Dict[str, Any]) -> int:
        """Calculate number of nodes needed for operations scaling"""
        # Find max ops per second across healthy nodes (0 if none are healthy,
        # which avoids max() raising ValueError on an empty sequence)
        max_ops = max(
            (node_stats.get('ops_per_sec', 0)
             for node_stats in stats['nodes'].values()
             if node_stats.get('status') == 'healthy'),
            default=0
        )
        if max_ops > self.scaling_thresholds['ops_per_second']:
            # Add nodes in proportion to how far the busiest node exceeds the threshold
            scale_factor = max_ops / self.scaling_thresholds['ops_per_second']
            return int(scale_factor)
        return 1
    def _calculate_scale_in(self, stats: Dict[str, Any]) -> int:
        """Calculate how many nodes can be removed"""
        healthy_nodes = stats.get('healthy_nodes', 0)
        min_nodes = 3  # Minimum for Redis cluster
        if healthy_nodes <= min_nodes:
            return 0
        # Conservative scale-in: remove at most 1 node at a time
        return min(1, healthy_nodes - min_nodes)
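The `_calculate_memory_scaling` and `_calculate_ops_scaling` helpers above each size the cluster from a single signal. The sketch below combines the two: it computes the node count that each signal requires and takes the larger value. It reuses the 1 GiB per-node budget, 60% memory target, 10K ops/sec per-node threshold and three-node minimum from the code above. Unlike `_calculate_ops_scaling`, which looks at the busiest node, it also assumes that keys, and therefore load, spread evenly across nodes after resharding. The function name `required_cache_nodes` and its parameters are hypothetical.

```python
import math

GIB = 1024 ** 3


def required_cache_nodes(total_memory_bytes: int,
                         total_ops_per_sec: float,
                         memory_per_node: int = GIB,
                         target_memory_utilization: float = 0.6,
                         max_ops_per_node: float = 10_000,
                         min_nodes: int = 3) -> int:
    """Hypothetical helper: smallest node count that satisfies both memory and ops targets.

    Assumes keys (and therefore load) are spread evenly across nodes after resharding.
    """
    # Nodes needed so that each node stays at or below the target memory utilization
    by_memory = math.ceil(total_memory_bytes / (memory_per_node * target_memory_utilization))
    # Nodes needed so that each node stays at or below the ops/sec threshold
    by_ops = math.ceil(total_ops_per_sec / max_ops_per_node)
    # Never drop below the minimum cluster size used by the scale-in logic
    return max(min_nodes, by_memory, by_ops)


# 2.7 GiB of data at a 60% target needs ceil(2.7 / 0.6) = ceil(4.5) = 5 nodes;
# 25K ops/sec needs only ceil(2.5) = 3, so memory dominates
print(required_cache_nodes(int(2.7 * GIB), 25_000))  # prints 5
```

Rounding up with `math.ceil` matters here. Truncating 4.5 to 4 would leave each node above the 60% target.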
Capacity Planning and Monitoring
Predictive Capacity Planning
Capacity Planning System
import numpy as np
from typing import Dict, List, Any, Optional, Tuple
import time
from dataclasses import dataclass
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import PolynomialFeatures
import logging
@dataclass
class CapacityMetrics:
    timestamp: float
    cpu_utilization: float
    memory_utilization: float
    disk_utilization: float
    network_throughput: float
    request_rate: float
    response_time: float
    active_connections: int
    queue_length: int
@dataclass
class CapacityForecast:
    metric_name: str
    current_value: float
    predicted_values: List[Tuple[float, float]]  # (timestamp, value)
    capacity_limit: Optional[float]  # None if no limit is configured for the metric
    time_to_limit: Optional[float]  # seconds until limit reached
    confidence: float
    trend: str  # increasing, decreasing, stable
class CapacityPlanner:
    """Predictive capacity planning system"""
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.metrics_history: List[CapacityMetrics] = []
        self.capacity_limits = config.get('capacity_limits', {
            'cpu_utilization': 0.8,  # 80%
            'memory_utilization': 0.85,  # 85%
            'disk_utilization': 0.9,  # 90%
            'response_time': 2.0,  # 2 seconds
            'queue_length': 1000  # 1000 items
        })
        self.prediction_horizon = config.get('prediction_horizon', 86400 * 7)  # 7 days
    def add_metrics(self, metrics: CapacityMetrics):
        """Add new metrics for capacity planning"""
        self.metrics_history.append(metrics)
        self._cleanup_old_metrics()
    def _cleanup_old_metrics(self):
        """Remove old metrics to prevent memory bloat"""
        cutoff_time = time.time() - (86400 * 30)  # Keep 30 days
        self.metrics_history = [
            m for m in self.metrics_history
            if m.timestamp > cutoff_time
        ]
    def generate_capacity_forecast(self) -> List[CapacityForecast]:
        """Generate capacity forecasts for all metrics"""
        if len(self.metrics_history) < 10:
            logging.warning("Insufficient data for capacity forecasting")
            return []
        forecasts = []
        # Forecast each metric
        metrics_to_forecast = [
            'cpu_utilization', 'memory_utilization', 'disk_utilization',
            'response_time', 'queue_length'
        ]
        for metric_name in metrics_to_forecast:
            forecast = self._forecast_metric(metric_name)
            if forecast:
                forecasts.append(forecast)
        return forecasts
    def _forecast_metric(self, metric_name: str) -> Optional[CapacityForecast]:
        """Forecast specific metric"""
        # Extract time series data
        timestamps = [m.timestamp for m in self.metrics_history]
        values = [getattr(m, metric_name) for m in self.metrics_history]
        if not values or all(v == values[0] for v in values):
            return None  # No variation in data
        # Prepare data for forecasting
        X = np.array(timestamps).reshape(-1, 1)
        y = np.array(values)
        # Express time as hours since the first sample (keeps the squared term well conditioned)
        base_time = timestamps[0]
        X_norm = (X - base_time) / 3600
        try:
            # Degree-2 polynomial regression to capture curvature in the trend
            poly_features = PolynomialFeatures(degree=2)
            X_poly = poly_features.fit_transform(X_norm)
            model = LinearRegression()
            model.fit(X_poly, y)
            # Generate hourly timestamps across the prediction horizon
            future_timestamps = []
            current_time = time.time()
            for hours_ahead in range(1, int(self.prediction_horizon / 3600) + 1):
                future_time = current_time + (hours_ahead * 3600)
                future_timestamps.append(future_time)
            # Predict future values
            future_X = np.array(future_timestamps).reshape(-1, 1)
            future_X_norm = (future_X - base_time) / 3600
            future_X_poly = poly_features.transform(future_X_norm)
            predicted_values = model.predict(future_X_poly)
            # Use the training R^2 as a rough confidence proxy, clamped to [0.1, 0.95]
            train_score = model.score(X_poly, y)
            confidence = max(0.1, min(0.95, train_score))
            # Determine trend
            trend = self._analyze_trend(values[-10:])  # Last 10 values
            # Calculate time to capacity limit (only for metrics with a configured limit)
            capacity_limit = self.capacity_limits.get(metric_name)
            time_to_limit = None
            if capacity_limit is not None:
                time_to_limit = self._calculate_time_to_limit(
                    future_timestamps, predicted_values, capacity_limit
                )
            # Create forecast
            predictions = list(zip(future_timestamps, predicted_values))
            return CapacityForecast(
                metric_name=metric_name,
                current_value=values[-1],
                predicted_values=predictions[:168],  # Next week (hourly)
                capacity_limit=capacity_limit,
                time_to_limit=time_to_limit,
                confidence=confidence,
                trend=trend
            )
        except Exception as e:
            logging.error(f"Failed to forecast {metric_name}: {e}")
            return None
    def _analyze_trend(self, values: List[float]) -> str:
        """Analyze trend in recent values"""
        if len(values) < 3:
            return "stable"
        # Linear regression on recent values (x = sample index)
        X = np.arange(len(values)).reshape(-1, 1)
        y = np.array(values)
        model = LinearRegression()
        model.fit(X, y)
        slope = model.coef_[0]
        # The 0.01 threshold is in the metric's own units per sample, so it is far more
        # sensitive for queue_length (items) than for utilization ratios (0-1)
        if slope > 0.01:  # Increasing trend
            return "increasing"
        elif slope < -0.01:  # Decreasing trend
            return "decreasing"
        else:
            return "stable"
    def _calculate_time_to_limit(self, timestamps: List[float],
                                 predictions: List[float],
                                 limit: float) -> Optional[float]:
        """Calculate time until capacity limit is reached"""
        current_time = time.time()
        for timestamp, prediction in zip(timestamps, predictions):
            if prediction >= limit:
                return timestamp - current_time
        return None  # Limit not reached in prediction horizon
    def generate_scaling_recommendations(self) -> List[Dict[str, Any]]:
        """Generate scaling recommendations based on capacity forecasts"""
        forecasts = self.generate_capacity_forecast()
        recommendations = []
        for forecast in forecasts:
            # Recommend action when the limit is forecast to be reached within 3 days
            if forecast.time_to_limit is not None and forecast.time_to_limit < 86400 * 3:
                urgency = "high" if forecast.time_to_limit < 86400 else "medium"
                recommendation = {
                    'metric': forecast.metric_name,
                    'urgency': urgency,
                    'time_to_limit': forecast.time_to_limit,
                    'current_value': forecast.current_value,
                    'capacity_limit': forecast.capacity_limit,
                    'recommended_action': self._get_scaling_action(forecast),
                    'confidence': forecast.confidence
                }
                recommendations.append(recommendation)
        return recommendations
    def _get_scaling_action(self, forecast: CapacityForecast) -> str:
        """Get recommended scaling action for forecast"""
        metric_actions = {
            'cpu_utilization': 'Scale out application servers',
            'memory_utilization': 'Scale up instance memory or scale out',
            'disk_utilization': 'Add storage capacity or implement data archiving',
            'response_time': 'Scale out application servers or optimize queries',
            'queue_length': 'Scale out worker processes'
        }
        return metric_actions.get(forecast.metric_name, 'Review capacity requirements')
    def get_capacity_report(self) -> Dict[str, Any]:
        """Generate comprehensive capacity report"""
        forecasts = self.generate_capacity_forecast()
        recommendations = self.generate_scaling_recommendations()
        # Calculate overall capacity health
        near_limit_count = sum(
            1 for f in forecasts
            if f.time_to_limit is not None and f.time_to_limit < 86400 * 7  # Within a week
        )
        capacity_health = "healthy"
        if near_limit_count > 0:
            if any(r['urgency'] == 'high' for r in recommendations):
                capacity_health = "critical"
            else:
                capacity_health = "warning"
        return {
            'capacity_health': capacity_health,
            'forecasts': [
                {
                    'metric': f.metric_name,
                    'current_value': f.current_value,
                    'trend': f.trend,
                    'time_to_limit': f.time_to_limit,
                    'confidence': f.confidence
                }
                for f in forecasts
            ],
            'recommendations': recommendations,
            'summary': {
                'total_metrics_monitored': len(forecasts),
                'metrics_near_limit': near_limit_count,
                'high_urgency_items': sum(1 for r in recommendations if r['urgency'] == 'high'),
                'prediction_horizon_days': self.prediction_horizon / 86400
            },
            'generated_at': time.time()
        }
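`CapacityPlanner._forecast_metric` fits a degree-2 polynomial with scikit-learn and scans the hourly predictions for the first crossing of the capacity limit. The same idea can be expressed with NumPy alone. The sketch below makes that substitution, using `numpy.polyfit` and `numpy.polyval` in place of `PolynomialFeatures` plus `LinearRegression`. The name `hours_until_limit` and the synthetic CPU series are hypothetical.

```python
from typing import Optional, Sequence

import numpy as np


def hours_until_limit(hours: Sequence[float], values: Sequence[float],
                      limit: float, horizon_hours: int = 168) -> Optional[int]:
    """Hypothetical helper: first future hour at which a quadratic trend reaches `limit`.

    `hours` are sample times in hours relative to a common origin; the forecast
    starts one hour after the last sample. Returns None if the limit is not
    reached within `horizon_hours`.
    """
    # Least-squares fit of value = a*h^2 + b*h + c (same model as degree-2 PolynomialFeatures)
    coeffs = np.polyfit(np.asarray(hours, dtype=float),
                        np.asarray(values, dtype=float), deg=2)
    last = float(hours[-1])
    for ahead in range(1, horizon_hours + 1):
        if np.polyval(coeffs, last + ahead) >= limit:
            return ahead
    return None


# CPU utilization growing by 1 percentage point per hour from 50% (hours 0..23)
history_h = list(range(24))
cpu = [0.50 + 0.01 * h for h in history_h]
print(hours_until_limit(history_h, cpu, limit=0.805))  # prints 8
```

Measuring time in hours from the first sample, as `_forecast_metric` does with `base_time`, keeps the fit well conditioned. Squaring raw Unix timestamps (around 1.7e9) would produce values near 1e18 and lose precision in the least-squares solve.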
Scaling Best Practices
Design Principles
- Stateless Architecture: Design applications to be stateless for easy horizontal scaling
- Database Separation: Separate read and write workloads for independent scaling
- Caching Strategy: Implement multi-layer caching to reduce database load
- Microservices: Break down monoliths for independent component scaling
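Multi-layer caching commonly places a small in-process layer in front of a shared cache, such as the Redis cluster described earlier, with the database behind both. The sketch below shows a minimal cache-aside reader under these assumptions:

- The shared layer is any object with `get(key)` and `set(key, value)`. A dict-backed stand-in is used here.
- `TwoTierCache`, `DictCache`, `loader` and the 5-second local TTL are hypothetical names and values.
- The local layer is per-process and unbounded. A production version would cap its size.

```python
import time
from typing import Any, Callable, Dict, Optional, Protocol, Tuple


class SharedCache(Protocol):
    def get(self, key: str) -> Optional[Any]: ...
    def set(self, key: str, value: Any) -> None: ...


class DictCache:
    """Stand-in for a shared cache (for example a Redis client), for illustration only."""

    def __init__(self) -> None:
        self.data: Dict[str, Any] = {}

    def get(self, key: str) -> Optional[Any]:
        return self.data.get(key)

    def set(self, key: str, value: Any) -> None:
        self.data[key] = value


class TwoTierCache:
    """Hypothetical cache-aside reader: local TTL layer -> shared layer -> loader (database)."""

    def __init__(self, shared: SharedCache, local_ttl: float = 5.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.shared = shared
        self.local_ttl = local_ttl
        self.clock = clock
        self.local: Dict[str, Tuple[float, Any]] = {}  # key -> (expires_at, value)

    def get(self, key: str, loader: Callable[[str], Any]) -> Any:
        now = self.clock()
        entry = self.local.get(key)
        if entry is not None and entry[0] > now:
            return entry[1]                  # local hit: no network round trip
        value = self.shared.get(key)
        if value is None:
            value = loader(key)              # miss in both layers: query the database
            self.shared.set(key, value)
        self.local[key] = (now + self.local_ttl, value)
        return value


calls = []

def load_from_db(key: str) -> str:
    calls.append(key)
    return f"row:{key}"

cache = TwoTierCache(DictCache())
cache.get("host-1", load_from_db)
cache.get("host-1", load_from_db)
print(calls)  # prints ['host-1']
```

The short local TTL bounds how stale a process can be after the shared value changes. The shared layer absorbs most of the load that would otherwise reach the database.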
Implementation Guidelines
- Gradual Scaling: Scale gradually to avoid overwhelming dependencies
- Health Checks: Implement comprehensive health checks for load balancers
- Circuit Breakers: Use circuit breakers to prevent cascade failures
- Resource Limits: Set appropriate resource limits and requests
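A circuit breaker stops calling a failing dependency for a cool-down period, so requests fail fast instead of each one waiting on the failure. The sketch below is a minimal, single-threaded version of the classic closed / open / half-open state machine. The class names, the thresholds and the injectable clock are assumptions for illustration, not part of SysManage.

```python
import time
from typing import Any, Callable, Optional


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    """Hypothetical closed -> open -> half-open breaker (not thread-safe)."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.clock = clock
        self.failures = 0
        self.opened_at: Optional[float] = None

    @property
    def state(self) -> str:
        if self.opened_at is None:
            return "closed"
        if self.clock() - self.opened_at >= self.reset_timeout:
            return "half-open"  # cool-down elapsed: allow a trial call through
        return "open"

    def call(self, fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "open":
            raise CircuitOpenError("dependency unavailable; failing fast")
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            # Trip at the threshold, or re-trip immediately if a half-open trial fails
            if self.failures >= self.failure_threshold or self.opened_at is not None:
                self.opened_at = self.clock()
            raise
        # Any success closes the circuit and resets the failure count
        self.failures = 0
        self.opened_at = None
        return result
```

Wrapping calls to a database or downstream API in `breaker.call(...)` keeps one slow dependency from tying up every worker. This is the cascade failure the guideline above warns about.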
Operational Guidelines
- Monitor Everything: Monitor all layers of the application stack
- Automate Scaling: Automate scaling decisions based on metrics
- Test Scaling: Regularly test scaling scenarios and limits
- Capacity Planning: Plan capacity needs based on growth projections
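Automated, gradual scaling is often implemented as target tracking. The fleet is sized so the observed metric returns to its target, and each decision is capped in how far it may move the fleet. The proportional rule below is the one documented for the Kubernetes Horizontal Pod Autoscaler, `desired = ceil(current * observed / target)`. The tolerance band, step cap, bounds and the function name `desired_replicas` are illustrative assumptions.

```python
import math


def desired_replicas(current: int, observed: float, target: float,
                     min_replicas: int = 2, max_replicas: int = 50,
                     max_step: int = 4, tolerance: float = 0.1) -> int:
    """Hypothetical target-tracking rule with a per-decision step limit."""
    if current <= 0 or target <= 0:
        raise ValueError("current and target must be positive")
    ratio = observed / target
    # Ignore small deviations so the fleet does not flap around the target
    if abs(ratio - 1.0) <= tolerance:
        proposed = current
    else:
        proposed = math.ceil(current * ratio)
    # Gradual scaling: move at most max_step replicas per decision
    step_limited = max(current - max_step, min(current + max_step, proposed))
    # Respect the configured fleet bounds
    return max(min_replicas, min(max_replicas, step_limited))


# 4 API servers at 90% CPU with a 60% target: ceil(4 * 1.5) = 6 replicas
print(desired_replicas(current=4, observed=90, target=60))  # prints 6
```

The step cap means a sudden 5x spike grows the fleet over several evaluation cycles rather than all at once. This gives databases and caches time to absorb the new connections.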
Cost Optimization
- Right-Sizing: Regularly review and adjust resource allocation
- Spot Instances: Use spot instances for non-critical workloads
- Reserved Capacity: Use reserved instances for predictable workloads
- Auto-Shutdown: Automatically shut down unused resources