Retry Logic
Exponential backoff, circuit breakers, failure handling patterns, and resilience strategies in SysManage distributed systems.
Retry Logic Overview
SysManage implements retry and resilience patterns to handle transient failures, network issues, and service unavailability while preventing cascading failures and keeping the overall system stable.
Core Principles
- Graceful Degradation: System continues operating despite component failures
- Intelligent Backoff: Progressive delays between retry attempts
- Circuit Breaking: Automatic failure detection and prevention
- Jitter Implementation: Randomization to prevent the thundering-herd effect (see the delay sketch below)
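As a quick illustration of the backoff and jitter principles, the following standalone sketch (with illustrative defaults) computes the same delay schedule that the full implementation later in this section produces:

import random

def backoff_delay(attempt: int, base: float = 1.0, factor: float = 2.0,
                  max_delay: float = 60.0, jitter_range: float = 0.1) -> float:
    """Exponential backoff capped at max_delay, with symmetric jitter."""
    delay = min(base * (factor ** (attempt - 1)), max_delay)
    jitter = delay * jitter_range
    return max(0.0, delay + random.uniform(-jitter, jitter))

# Attempts 1..5 -> roughly 1s, 2s, 4s, 8s, 16s, each varied by up to 10%
print([round(backoff_delay(n), 2) for n in range(1, 6)])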
Retry Pattern Implementation
Resilience Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Resilience Layer │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Circuit │ │ Retry │ │ Timeout │ │
│ │ Breaker │ │ Manager │ │ Manager │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Bulkhead │ │ Rate │ │ Health │ │
│ │ Isolation │ │ Limiter │ │ Monitor │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Fallback │ │ Cache │ │ Dead Letter │ │
│ │ Handler │ │ Manager │ │ Queue │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ ▲
Request │ │ Response/Error
▼ │
┌─────────────────────────────────────────────────────────────────┐
│ Application Services │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Agent │ │ Database │ │ External APIs │ │
│ │ Service │ │ Service │ │ Service │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Exponential Backoff Implementation
Advanced Retry Manager
import asyncio
import random
import time
import logging
from typing import Callable, Any, Optional, Dict, List, Type
from dataclasses import dataclass
from enum import Enum
import aiohttp
class FailureType(Enum):
NETWORK_ERROR = "network_error"
TIMEOUT = "timeout"
SERVICE_UNAVAILABLE = "service_unavailable"
AUTHENTICATION_ERROR = "authentication_error"
RATE_LIMITED = "rate_limited"
UNKNOWN = "unknown"
@dataclass
class RetryPolicy:
max_attempts: int = 3
base_delay: float = 1.0 # seconds
max_delay: float = 60.0 # seconds
exponential_base: float = 2.0
jitter: bool = True
jitter_range: float = 0.1 # 10% jitter
backoff_strategy: str = "exponential" # exponential, linear, fixed
    retryable_exceptions: Optional[List[Type[Exception]]] = None
    retryable_status_codes: Optional[List[int]] = None
def __post_init__(self):
if self.retryable_exceptions is None:
self.retryable_exceptions = [
ConnectionError, TimeoutError, OSError
]
if self.retryable_status_codes is None:
self.retryable_status_codes = [502, 503, 504, 429]
class RetryManager:
"""Advanced retry manager with multiple backoff strategies"""
def __init__(self, policy: RetryPolicy):
self.policy = policy
self.attempt_history: Dict[str, List[float]] = {}
async def retry_with_backoff(self,
func: Callable,
*args,
operation_id: Optional[str] = None,
**kwargs) -> Any:
"""Execute function with retry logic and backoff"""
if operation_id is None:
operation_id = f"{func.__name__}_{id(func)}"
last_exception = None
attempt_times = []
for attempt in range(1, self.policy.max_attempts + 1):
try:
start_time = time.time()
result = await self._execute_function(func, *args, **kwargs)
# Record successful attempt
execution_time = time.time() - start_time
attempt_times.append(execution_time)
self.attempt_history[operation_id] = attempt_times
if attempt > 1:
logging.info(f"Operation {operation_id} succeeded on attempt {attempt}")
return result
except Exception as e:
last_exception = e
execution_time = time.time() - start_time
attempt_times.append(execution_time)
# Check if exception is retryable
if not self._is_retryable_exception(e):
logging.error(f"Non-retryable exception in {operation_id}: {e}")
raise e
# Log attempt failure
logging.warning(f"Attempt {attempt}/{self.policy.max_attempts} failed for {operation_id}: {e}")
# Don't sleep after last attempt
if attempt < self.policy.max_attempts:
delay = self._calculate_delay(attempt)
logging.info(f"Retrying {operation_id} in {delay:.2f} seconds")
await asyncio.sleep(delay)
# All attempts failed
self.attempt_history[operation_id] = attempt_times
logging.error(f"All {self.policy.max_attempts} attempts failed for {operation_id}")
raise last_exception
async def _execute_function(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function (sync or async)"""
if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
return func(*args, **kwargs)
def _is_retryable_exception(self, exception: Exception) -> bool:
"""Determine if exception is retryable"""
# Check exception type
for exc_type in self.policy.retryable_exceptions:
if isinstance(exception, exc_type):
return True
        # Check HTTP status codes if available (aiohttp exposes `status`,
        # other clients expose `status_code`)
        status = getattr(exception, 'status_code', None) or getattr(exception, 'status', None)
        if status in self.policy.retryable_status_codes:
            return True
# Check specific error patterns
error_message = str(exception).lower()
retryable_patterns = [
'connection refused', 'connection timeout', 'connection reset',
'temporary failure', 'service unavailable', 'rate limit'
]
for pattern in retryable_patterns:
if pattern in error_message:
return True
return False
def _calculate_delay(self, attempt: int) -> float:
"""Calculate delay for retry attempt"""
if self.policy.backoff_strategy == "exponential":
delay = self.policy.base_delay * (self.policy.exponential_base ** (attempt - 1))
elif self.policy.backoff_strategy == "linear":
delay = self.policy.base_delay * attempt
elif self.policy.backoff_strategy == "fixed":
delay = self.policy.base_delay
else:
raise ValueError(f"Unknown backoff strategy: {self.policy.backoff_strategy}")
# Apply maximum delay limit
delay = min(delay, self.policy.max_delay)
# Add jitter to prevent thundering herd
if self.policy.jitter:
jitter_amount = delay * self.policy.jitter_range
jitter = random.uniform(-jitter_amount, jitter_amount)
delay += jitter
return max(0, delay)
def get_retry_statistics(self, operation_id: str) -> Dict[str, Any]:
"""Get retry statistics for operation"""
if operation_id not in self.attempt_history:
return {}
attempts = self.attempt_history[operation_id]
return {
'total_attempts': len(attempts),
'total_time': sum(attempts),
'average_attempt_time': sum(attempts) / len(attempts),
'max_attempt_time': max(attempts),
'min_attempt_time': min(attempts),
            # Note: attempt_history records failed runs too, so a success
            # rate cannot be derived from the timing data alone.
}
class RetryDecorator:
"""Decorator for automatic retry functionality"""
def __init__(self, policy: RetryPolicy):
self.retry_manager = RetryManager(policy)
def __call__(self, func):
async def wrapper(*args, **kwargs):
operation_id = f"{func.__module__}.{func.__name__}"
return await self.retry_manager.retry_with_backoff(
func, *args, operation_id=operation_id, **kwargs
)
return wrapper
# Predefined retry policies
NETWORK_RETRY_POLICY = RetryPolicy(
max_attempts=5,
base_delay=1.0,
max_delay=30.0,
exponential_base=2.0,
jitter=True,
retryable_exceptions=[ConnectionError, TimeoutError, OSError]
)
DATABASE_RETRY_POLICY = RetryPolicy(
max_attempts=3,
base_delay=0.5,
max_delay=10.0,
exponential_base=2.0,
jitter=True,
retryable_exceptions=[ConnectionError, TimeoutError]
)
API_RETRY_POLICY = RetryPolicy(
max_attempts=4,
base_delay=2.0,
max_delay=60.0,
exponential_base=1.5,
jitter=True,
retryable_status_codes=[429, 502, 503, 504]
)
# Usage examples
@RetryDecorator(NETWORK_RETRY_POLICY)
async def send_agent_command(agent_id: str, command: Dict) -> Dict:
"""Send command to agent with retry logic"""
async with aiohttp.ClientSession() as session:
async with session.post(
f"https://agent-{agent_id}.example.com/command",
json=command,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status >= 400:
raise aiohttp.ClientResponseError(
request_info=response.request_info,
history=response.history,
status=response.status
)
return await response.json()
@RetryDecorator(DATABASE_RETRY_POLICY)
async def update_agent_status(db: Database, agent_id: str, status: str):
"""Update agent status with retry logic"""
await db.execute(
"UPDATE agents SET status = $1, last_updated = NOW() WHERE id = $2",
status, agent_id
)
Advanced Backoff Strategies
Adaptive Backoff Implementation
class AdaptiveBackoffStrategy:
"""Adaptive backoff that adjusts based on success/failure patterns"""
def __init__(self, initial_policy: RetryPolicy):
self.base_policy = initial_policy
self.success_history: Dict[str, List[bool]] = {}
self.adaptation_window = 100 # Track last 100 operations
def calculate_adaptive_delay(self, operation_id: str, attempt: int) -> float:
"""Calculate delay with adaptation based on historical success rates"""
base_delay = self._calculate_base_delay(attempt)
# Get success rate for this operation
success_rate = self._get_success_rate(operation_id)
# Adjust delay based on success rate
if success_rate > 0.9:
# High success rate, reduce delays
adaptive_factor = 0.7
elif success_rate > 0.7:
# Moderate success rate, normal delays
adaptive_factor = 1.0
elif success_rate > 0.5:
# Low success rate, increase delays
adaptive_factor = 1.5
else:
# Very low success rate, significantly increase delays
adaptive_factor = 2.0
adapted_delay = base_delay * adaptive_factor
# Apply jitter
if self.base_policy.jitter:
jitter = adapted_delay * 0.1 * (random.random() - 0.5)
adapted_delay += jitter
return min(adapted_delay, self.base_policy.max_delay)
def record_operation_result(self, operation_id: str, success: bool):
"""Record operation result for adaptive learning"""
if operation_id not in self.success_history:
self.success_history[operation_id] = []
history = self.success_history[operation_id]
history.append(success)
# Keep only recent history
if len(history) > self.adaptation_window:
history.pop(0)
def _get_success_rate(self, operation_id: str) -> float:
"""Get success rate for operation"""
if operation_id not in self.success_history:
return 0.8 # Default assumed success rate
history = self.success_history[operation_id]
if not history:
return 0.8
return sum(history) / len(history)
def _calculate_base_delay(self, attempt: int) -> float:
"""Calculate base delay using configured strategy"""
if self.base_policy.backoff_strategy == "exponential":
return self.base_policy.base_delay * (self.base_policy.exponential_base ** (attempt - 1))
elif self.base_policy.backoff_strategy == "linear":
return self.base_policy.base_delay * attempt
else:
return self.base_policy.base_delay
class IntelligentRetryManager:
"""Intelligent retry manager with multiple strategies and learning"""
def __init__(self):
self.strategies = {
'exponential': RetryManager(NETWORK_RETRY_POLICY),
'linear': RetryManager(RetryPolicy(backoff_strategy="linear")),
'adaptive': AdaptiveBackoffStrategy(NETWORK_RETRY_POLICY)
}
self.operation_strategies: Dict[str, str] = {}
self.performance_metrics: Dict[str, Dict] = {}
async def smart_retry(self, func: Callable, *args,
operation_type: str = "default",
**kwargs) -> Any:
"""Smart retry with automatic strategy selection"""
# Select best strategy for operation type
strategy_name = self._select_strategy(operation_type)
strategy = self.strategies[strategy_name]
operation_id = f"{operation_type}_{func.__name__}"
start_time = time.time()
try:
if strategy_name == 'adaptive':
result = await self._adaptive_retry(strategy, func, operation_id, *args, **kwargs)
success = True
else:
result = await strategy.retry_with_backoff(func, *args, operation_id=operation_id, **kwargs)
success = True
except Exception as e:
success = False
raise e
finally:
# Record performance metrics
execution_time = time.time() - start_time
self._record_performance(operation_type, strategy_name, execution_time, success)
# Update adaptive strategy
if strategy_name == 'adaptive':
strategy.record_operation_result(operation_id, success)
return result
async def _adaptive_retry(self, adaptive_strategy: AdaptiveBackoffStrategy,
func: Callable, operation_id: str,
*args, **kwargs) -> Any:
"""Execute retry with adaptive strategy"""
last_exception = None
for attempt in range(1, adaptive_strategy.base_policy.max_attempts + 1):
try:
result = await self._execute_function(func, *args, **kwargs)
return result
except Exception as e:
last_exception = e
if not self._is_retryable_exception(e, adaptive_strategy.base_policy):
raise e
if attempt < adaptive_strategy.base_policy.max_attempts:
delay = adaptive_strategy.calculate_adaptive_delay(operation_id, attempt)
await asyncio.sleep(delay)
        raise last_exception
    async def _execute_function(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function (sync or async)"""
        if asyncio.iscoroutinefunction(func):
            return await func(*args, **kwargs)
        return func(*args, **kwargs)
    def _is_retryable_exception(self, exception: Exception, policy: RetryPolicy) -> bool:
        """Delegate the retryability check to a RetryManager with this policy"""
        return RetryManager(policy)._is_retryable_exception(exception)
def _select_strategy(self, operation_type: str) -> str:
"""Select best retry strategy for operation type"""
# Use historical performance to select strategy
if operation_type in self.performance_metrics:
metrics = self.performance_metrics[operation_type]
best_strategy = min(metrics.keys(),
key=lambda s: metrics[s].get('avg_time', float('inf')))
return best_strategy
# Default strategy mapping
default_mapping = {
'network': 'exponential',
'database': 'linear',
'api': 'adaptive',
'default': 'exponential'
}
return default_mapping.get(operation_type, 'exponential')
def _record_performance(self, operation_type: str, strategy: str,
execution_time: float, success: bool):
"""Record performance metrics for strategy selection"""
if operation_type not in self.performance_metrics:
self.performance_metrics[operation_type] = {}
if strategy not in self.performance_metrics[operation_type]:
self.performance_metrics[operation_type][strategy] = {
'total_time': 0,
'total_operations': 0,
'successful_operations': 0,
'avg_time': 0,
'success_rate': 0
}
metrics = self.performance_metrics[operation_type][strategy]
metrics['total_time'] += execution_time
metrics['total_operations'] += 1
if success:
metrics['successful_operations'] += 1
metrics['avg_time'] = metrics['total_time'] / metrics['total_operations']
metrics['success_rate'] = metrics['successful_operations'] / metrics['total_operations']
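A sketch of how the intelligent manager might be driven; fetch_inventory is a hypothetical agent call, and the operation_type string selects (and later tunes) the strategy:

intelligent_retry = IntelligentRetryManager()

async def fetch_inventory(agent_id: str) -> Dict:
    # Hypothetical stand-in for a real agent API call
    return {"agent_id": agent_id, "packages": []}

async def main():
    # "api" maps to the adaptive strategy until recorded metrics say otherwise
    result = await intelligent_retry.smart_retry(
        fetch_inventory, "agent-42", operation_type="api"
    )
    print(result)

asyncio.run(main())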
Circuit Breaker Pattern
Circuit Breaker Implementation
Advanced Circuit Breaker
from enum import Enum
from dataclasses import dataclass
from typing import Callable, Any, Optional, Dict, List, Tuple, Type
import time
import asyncio
import logging
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Failing, rejecting requests
HALF_OPEN = "half_open" # Testing if service recovered
@dataclass
class CircuitBreakerConfig:
failure_threshold: int = 5 # Failures before opening circuit
timeout: float = 60.0 # Seconds to wait before trying again
success_threshold: int = 3 # Successes needed to close circuit
monitoring_window: float = 300.0 # Window for failure rate calculation
expected_exception: Type[Exception] = Exception
class CircuitBreaker:
"""Advanced circuit breaker with monitoring and statistics"""
def __init__(self, name: str, config: CircuitBreakerConfig):
self.name = name
self.config = config
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = 0
self.last_attempt_time = 0
self.call_history: List[Tuple[float, bool]] = [] # (timestamp, success)
async def call(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function through circuit breaker"""
current_time = time.time()
# Check circuit state and decide whether to proceed
if not self._should_attempt_call(current_time):
raise CircuitBreakerOpenException(
f"Circuit breaker '{self.name}' is OPEN. "
f"Last failure: {current_time - self.last_failure_time:.1f}s ago"
)
try:
# Execute the function
result = await self._execute_function(func, *args, **kwargs)
# Record success
self._record_success(current_time)
return result
except self.config.expected_exception as e:
# Record failure
self._record_failure(current_time)
raise e
def _should_attempt_call(self, current_time: float) -> bool:
"""Determine if call should be attempted based on circuit state"""
if self.state == CircuitState.CLOSED:
return True
elif self.state == CircuitState.OPEN:
# Check if timeout period has elapsed
if current_time - self.last_failure_time >= self.config.timeout:
self.state = CircuitState.HALF_OPEN
self.success_count = 0
logging.info(f"Circuit breaker '{self.name}' moving to HALF_OPEN")
return True
return False
elif self.state == CircuitState.HALF_OPEN:
return True
return False
def _record_success(self, timestamp: float):
"""Record successful call"""
self.call_history.append((timestamp, True))
self._cleanup_old_history(timestamp)
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.config.success_threshold:
self.state = CircuitState.CLOSED
self.failure_count = 0
logging.info(f"Circuit breaker '{self.name}' CLOSED after {self.success_count} successes")
elif self.state == CircuitState.CLOSED:
# Reset failure count on success
self.failure_count = 0
def _record_failure(self, timestamp: float):
"""Record failed call"""
self.call_history.append((timestamp, False))
self._cleanup_old_history(timestamp)
self.last_failure_time = timestamp
self.failure_count += 1
if self.state == CircuitState.HALF_OPEN:
# Failure during half-open, go back to open
self.state = CircuitState.OPEN
logging.warning(f"Circuit breaker '{self.name}' back to OPEN after failure in HALF_OPEN")
elif self.state == CircuitState.CLOSED:
# Check if we should open the circuit
failure_rate = self._calculate_failure_rate(timestamp)
if (self.failure_count >= self.config.failure_threshold or
failure_rate > 0.5): # 50% failure rate threshold
self.state = CircuitState.OPEN
logging.error(f"Circuit breaker '{self.name}' OPENED after {self.failure_count} failures")
def _calculate_failure_rate(self, current_time: float) -> float:
"""Calculate failure rate within monitoring window"""
window_start = current_time - self.config.monitoring_window
recent_calls = [
(timestamp, success) for timestamp, success in self.call_history
if timestamp >= window_start
]
if not recent_calls:
return 0.0
failures = sum(1 for _, success in recent_calls if not success)
return failures / len(recent_calls)
def _cleanup_old_history(self, current_time: float):
"""Remove old entries from call history"""
cutoff_time = current_time - self.config.monitoring_window
self.call_history = [
(timestamp, success) for timestamp, success in self.call_history
if timestamp >= cutoff_time
]
async def _execute_function(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function (sync or async)"""
if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs)
else:
return func(*args, **kwargs)
def get_stats(self) -> Dict[str, Any]:
"""Get circuit breaker statistics"""
current_time = time.time()
failure_rate = self._calculate_failure_rate(current_time)
return {
'name': self.name,
'state': self.state.value,
'failure_count': self.failure_count,
'success_count': self.success_count,
'failure_rate': failure_rate,
'last_failure_time': self.last_failure_time,
'time_since_last_failure': current_time - self.last_failure_time,
'total_calls': len(self.call_history)
}
def reset(self):
"""Reset circuit breaker to closed state"""
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.call_history.clear()
logging.info(f"Circuit breaker '{self.name}' manually reset")
class CircuitBreakerOpenException(Exception):
"""Exception raised when circuit breaker is open"""
pass
class CircuitBreakerRegistry:
"""Registry for managing multiple circuit breakers"""
def __init__(self):
self.breakers: Dict[str, CircuitBreaker] = {}
self.default_config = CircuitBreakerConfig()
def get_breaker(self, name: str, config: Optional[CircuitBreakerConfig] = None) -> CircuitBreaker:
"""Get or create circuit breaker"""
if name not in self.breakers:
breaker_config = config or self.default_config
self.breakers[name] = CircuitBreaker(name, breaker_config)
return self.breakers[name]
def get_all_stats(self) -> Dict[str, Dict[str, Any]]:
"""Get statistics for all circuit breakers"""
return {
name: breaker.get_stats()
for name, breaker in self.breakers.items()
}
def reset_all(self):
"""Reset all circuit breakers"""
for breaker in self.breakers.values():
breaker.reset()
# Global circuit breaker registry
circuit_breaker_registry = CircuitBreakerRegistry()
class CircuitBreakerDecorator:
"""Decorator for automatic circuit breaker protection"""
def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
self.name = name
self.config = config
def __call__(self, func):
async def wrapper(*args, **kwargs):
breaker = circuit_breaker_registry.get_breaker(self.name, self.config)
return await breaker.call(func, *args, **kwargs)
return wrapper
# Usage examples
@CircuitBreakerDecorator(
name="agent_communication",
config=CircuitBreakerConfig(
failure_threshold=3,
timeout=30.0,
success_threshold=2
)
)
async def send_agent_command(agent_id: str, command: Dict) -> Dict:
"""Send command to agent with circuit breaker protection"""
# Implementation here
pass
@CircuitBreakerDecorator(
name="external_api",
config=CircuitBreakerConfig(
failure_threshold=5,
timeout=60.0,
expected_exception=aiohttp.ClientError
)
)
async def call_external_api(endpoint: str, data: Dict) -> Dict:
"""Call external API with circuit breaker protection"""
# Implementation here
pass
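Callers should expect CircuitBreakerOpenException and degrade explicitly rather than letting it propagate; the registry also makes breaker state easy to inspect. A sketch:

async def safe_send(agent_id: str, command: Dict) -> Dict:
    try:
        return await send_agent_command(agent_id, command)
    except CircuitBreakerOpenException:
        # Fail fast while the breaker is open instead of adding load
        return {"status": "skipped", "reason": "circuit open"}

for name, stats in circuit_breaker_registry.get_all_stats().items():
    print(f"{name}: {stats['state']} (failure rate {stats['failure_rate']:.1%})")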
Circuit Breaker Monitoring
Circuit Breaker Health Dashboard
class CircuitBreakerMonitor:
"""Monitor and alert on circuit breaker status"""
def __init__(self, registry: CircuitBreakerRegistry):
self.registry = registry
self.alert_thresholds = {
'failure_rate': 0.3, # 30% failure rate
'open_duration': 300, # 5 minutes open
}
async def monitor_circuit_breakers(self) -> List[Dict[str, Any]]:
"""Monitor all circuit breakers and generate alerts"""
alerts = []
all_stats = self.registry.get_all_stats()
for name, stats in all_stats.items():
# Check for concerning conditions
alerts.extend(self._check_breaker_health(name, stats))
return alerts
def _check_breaker_health(self, name: str, stats: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Check individual circuit breaker health"""
alerts = []
current_time = time.time()
# Check if circuit is open for too long
if stats['state'] == 'open':
open_duration = stats.get('time_since_last_failure', 0)
if open_duration > self.alert_thresholds['open_duration']:
alerts.append({
'type': 'circuit_breaker_open',
'severity': 'warning',
'circuit_breaker': name,
'message': f"Circuit breaker '{name}' has been open for {open_duration:.1f} seconds",
'duration': open_duration,
'threshold': self.alert_thresholds['open_duration']
})
# Check failure rate
failure_rate = stats.get('failure_rate', 0)
if failure_rate > self.alert_thresholds['failure_rate']:
alerts.append({
'type': 'high_failure_rate',
'severity': 'warning',
'circuit_breaker': name,
'message': f"High failure rate for circuit breaker '{name}': {failure_rate:.2%}",
'failure_rate': failure_rate,
'threshold': self.alert_thresholds['failure_rate']
})
return alerts
async def get_dashboard_data(self) -> Dict[str, Any]:
"""Get comprehensive dashboard data"""
all_stats = self.registry.get_all_stats()
alerts = await self.monitor_circuit_breakers()
# Calculate summary metrics
total_breakers = len(all_stats)
open_breakers = sum(1 for stats in all_stats.values() if stats['state'] == 'open')
half_open_breakers = sum(1 for stats in all_stats.values() if stats['state'] == 'half_open')
avg_failure_rate = sum(stats.get('failure_rate', 0) for stats in all_stats.values()) / max(total_breakers, 1)
return {
'summary': {
'total_breakers': total_breakers,
'open_breakers': open_breakers,
'half_open_breakers': half_open_breakers,
'closed_breakers': total_breakers - open_breakers - half_open_breakers,
'avg_failure_rate': avg_failure_rate
},
'breakers': all_stats,
'alerts': alerts,
'timestamp': time.time()
}
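One way to operationalize the monitor is a small background task that polls it on an interval; a sketch, with the logging call standing in for a real alerting pipeline:

async def run_breaker_monitor(interval: float = 30.0):
    monitor = CircuitBreakerMonitor(circuit_breaker_registry)
    while True:
        for alert in await monitor.monitor_circuit_breakers():
            # Stand-in for the real alerting pipeline
            logging.warning("[%s] %s", alert['severity'], alert['message'])
        await asyncio.sleep(interval)

# Schedule alongside the application, e.g.:
# asyncio.create_task(run_breaker_monitor())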
Bulkhead Isolation Pattern
Resource Isolation Implementation
Resource Pool Isolation
import asyncio
import functools
import logging
from typing import Dict, Any, Optional, List, Callable
from dataclasses import dataclass
import time
@dataclass
class BulkheadConfig:
max_concurrent_calls: int = 10
max_queue_size: int = 100
timeout: float = 30.0
isolation_level: str = "strict" # strict, shared, adaptive
class ResourceBulkhead:
"""Isolate resources using bulkhead pattern"""
def __init__(self, name: str, config: BulkheadConfig):
self.name = name
self.config = config
self.semaphore = asyncio.Semaphore(config.max_concurrent_calls)
self.queue = asyncio.Queue(maxsize=config.max_queue_size)
self.active_calls = 0
self.queued_calls = 0
self.rejected_calls = 0
self.total_calls = 0
async def execute(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function within bulkhead constraints"""
self.total_calls += 1
# Check if we can accept the call
if not await self._can_accept_call():
self.rejected_calls += 1
raise BulkheadRejectedException(
f"Bulkhead '{self.name}' rejected call - resource limit exceeded"
)
try:
async with self.semaphore:
self.active_calls += 1
try:
# Execute with timeout
result = await asyncio.wait_for(
self._execute_function(func, *args, **kwargs),
timeout=self.config.timeout
)
return result
finally:
self.active_calls -= 1
except asyncio.TimeoutError:
raise BulkheadTimeoutException(
f"Bulkhead '{self.name}' operation timed out after {self.config.timeout}s"
)
async def _can_accept_call(self) -> bool:
"""Check if bulkhead can accept new call"""
# Strict isolation - never exceed limits
if self.config.isolation_level == "strict":
return (self.active_calls < self.config.max_concurrent_calls and
self.queued_calls < self.config.max_queue_size)
# Shared isolation - allow some overflow
elif self.config.isolation_level == "shared":
return self.active_calls < self.config.max_concurrent_calls * 1.2
# Adaptive isolation - adjust based on performance
elif self.config.isolation_level == "adaptive":
success_rate = self._calculate_success_rate()
if success_rate > 0.9:
# High success rate, allow more calls
return self.active_calls < self.config.max_concurrent_calls * 1.1
else:
# Lower success rate, be more restrictive
return self.active_calls < self.config.max_concurrent_calls * 0.8
return True
async def _execute_function(self, func: Callable, *args, **kwargs) -> Any:
"""Execute function (sync or async)"""
if asyncio.iscoroutinefunction(func):
return await func(*args, **kwargs)
        else:
            # run_in_executor does not accept keyword arguments, so bind them first
            loop = asyncio.get_event_loop()
            return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))
def _calculate_success_rate(self) -> float:
"""Calculate recent success rate"""
# Simplified - would track actual success/failure in real implementation
if self.total_calls == 0:
return 1.0
return max(0, 1.0 - (self.rejected_calls / self.total_calls))
def get_stats(self) -> Dict[str, Any]:
"""Get bulkhead statistics"""
return {
'name': self.name,
'active_calls': self.active_calls,
'queued_calls': self.queued_calls,
'rejected_calls': self.rejected_calls,
'total_calls': self.total_calls,
'rejection_rate': self.rejected_calls / max(self.total_calls, 1),
'utilization': self.active_calls / self.config.max_concurrent_calls,
'config': {
'max_concurrent': self.config.max_concurrent_calls,
'max_queue_size': self.config.max_queue_size,
'timeout': self.config.timeout,
'isolation_level': self.config.isolation_level
}
}
class BulkheadRejectedException(Exception):
"""Exception raised when bulkhead rejects a call"""
pass
class BulkheadTimeoutException(Exception):
"""Exception raised when bulkhead operation times out"""
pass
class BulkheadManager:
"""Manage multiple bulkheads for different resource types"""
def __init__(self):
self.bulkheads: Dict[str, ResourceBulkhead] = {}
self.default_configs = {
'database': BulkheadConfig(max_concurrent_calls=20, timeout=10.0),
'agent_communication': BulkheadConfig(max_concurrent_calls=50, timeout=30.0),
'external_api': BulkheadConfig(max_concurrent_calls=10, timeout=60.0),
'file_operations': BulkheadConfig(max_concurrent_calls=5, timeout=120.0)
}
def get_bulkhead(self, resource_type: str, config: Optional[BulkheadConfig] = None) -> ResourceBulkhead:
"""Get or create bulkhead for resource type"""
if resource_type not in self.bulkheads:
bulkhead_config = config or self.default_configs.get(
resource_type, BulkheadConfig()
)
self.bulkheads[resource_type] = ResourceBulkhead(resource_type, bulkhead_config)
return self.bulkheads[resource_type]
async def execute_with_bulkhead(self, resource_type: str, func: Callable,
*args, **kwargs) -> Any:
"""Execute function with appropriate bulkhead protection"""
bulkhead = self.get_bulkhead(resource_type)
return await bulkhead.execute(func, *args, **kwargs)
def get_all_stats(self) -> Dict[str, Dict[str, Any]]:
"""Get statistics for all bulkheads"""
return {
name: bulkhead.get_stats()
for name, bulkhead in self.bulkheads.items()
}
# Global bulkhead manager
bulkhead_manager = BulkheadManager()
class BulkheadDecorator:
"""Decorator for automatic bulkhead protection"""
def __init__(self, resource_type: str, config: Optional[BulkheadConfig] = None):
self.resource_type = resource_type
self.config = config
def __call__(self, func):
async def wrapper(*args, **kwargs):
return await bulkhead_manager.execute_with_bulkhead(
self.resource_type, func, *args, **kwargs
)
return wrapper
# Usage examples
@BulkheadDecorator("database")
async def update_agent_metrics(db: Database, agent_id: str, metrics: Dict):
"""Update agent metrics with database bulkhead protection"""
# Database operation
pass
@BulkheadDecorator("agent_communication")
async def send_command_to_agent(agent_id: str, command: Dict):
"""Send command to agent with communication bulkhead protection"""
# Agent communication
pass
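Bulkhead rejections and timeouts are expected events under load, so call sites should shed work deliberately; a sketch (record_metrics_safely is illustrative):

async def record_metrics_safely(db, agent_id: str, metrics: Dict):
    try:
        await update_agent_metrics(db, agent_id, metrics)
    except BulkheadRejectedException:
        # Shed load rather than queue unbounded work
        logging.warning("metrics write shed for %s", agent_id)
    except BulkheadTimeoutException:
        logging.warning("metrics write timed out for %s", agent_id)

for name, stats in bulkhead_manager.get_all_stats().items():
    print(f"{name}: {stats['utilization']:.0%} busy, {stats['rejection_rate']:.1%} rejected")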
Fallback and Graceful Degradation
Fallback Strategy Implementation
Comprehensive Fallback System
from typing import List, Callable, Any, Optional, Dict
from dataclasses import dataclass
import aiohttp
import asyncio
import functools
import logging
import time
@dataclass
class FallbackConfig:
enable_cache_fallback: bool = True
enable_default_value_fallback: bool = True
enable_degraded_service_fallback: bool = True
cache_ttl: int = 300 # 5 minutes
degraded_timeout: float = 5.0 # Faster timeout for degraded service
class FallbackStrategy:
"""Comprehensive fallback strategy with multiple levels"""
def __init__(self, config: FallbackConfig):
self.config = config
self.cache: Dict[str, Any] = {}
self.cache_timestamps: Dict[str, float] = {}
    async def execute_with_fallback(self,
                                    primary_func: Callable,
                                    fallback_funcs: List[Callable],
                                    *args,
                                    cache_key: Optional[str] = None,
                                    default_value: Any = None,
                                    **kwargs) -> Any:
"""Execute function with multiple fallback levels"""
# Try primary function first
try:
result = await self._execute_with_timeout(primary_func, *args, **kwargs)
# Cache successful result
if cache_key and self.config.enable_cache_fallback:
self._cache_result(cache_key, result)
return result
except Exception as primary_error:
logging.warning(f"Primary function failed: {primary_error}")
# Try fallback functions in order
for i, fallback_func in enumerate(fallback_funcs):
try:
logging.info(f"Trying fallback {i + 1}/{len(fallback_funcs)}")
# Use shorter timeout for fallbacks
                    result = await self._execute_with_timeout(
                        fallback_func, *args,
                        timeout=self.config.degraded_timeout,
                        **kwargs
                    )
logging.info(f"Fallback {i + 1} succeeded")
return result
except Exception as fallback_error:
logging.warning(f"Fallback {i + 1} failed: {fallback_error}")
continue
# Try cache fallback
if cache_key and self.config.enable_cache_fallback:
cached_result = self._get_cached_result(cache_key)
if cached_result is not None:
logging.info("Using cached fallback value")
return cached_result
# Try default value fallback
if default_value is not None and self.config.enable_default_value_fallback:
logging.info("Using default fallback value")
return default_value
# All fallbacks failed
logging.error("All fallback strategies failed")
raise FallbackExhaustedException("All fallback strategies exhausted") from primary_error
    async def _execute_with_timeout(self, func: Callable, *args,
                                    timeout: Optional[float] = None,
                                    **kwargs) -> Any:
        """Execute function with timeout (keyword-only so that positional
        arguments pass through to func untouched)"""
        if timeout is None:
            timeout = 30.0  # Default timeout
        if asyncio.iscoroutinefunction(func):
            return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout)
        else:
            # run_in_executor does not accept keyword arguments, so bind them first
            loop = asyncio.get_event_loop()
            return await asyncio.wait_for(
                loop.run_in_executor(None, functools.partial(func, *args, **kwargs)),
                timeout=timeout
            )
def _cache_result(self, key: str, result: Any):
"""Cache result for fallback use"""
self.cache[key] = result
self.cache_timestamps[key] = time.time()
def _get_cached_result(self, key: str) -> Optional[Any]:
"""Get cached result if still valid"""
if key not in self.cache:
return None
# Check if cache is still valid
cache_age = time.time() - self.cache_timestamps[key]
if cache_age > self.config.cache_ttl:
# Cache expired
del self.cache[key]
del self.cache_timestamps[key]
return None
return self.cache[key]
def clear_cache(self, key: Optional[str] = None):
"""Clear cache entries"""
if key:
self.cache.pop(key, None)
self.cache_timestamps.pop(key, None)
else:
self.cache.clear()
self.cache_timestamps.clear()
class FallbackExhaustedException(Exception):
"""Exception raised when all fallback strategies fail"""
pass
class ServiceFallbackManager:
"""Manage fallback strategies for different services"""
def __init__(self):
self.strategies: Dict[str, FallbackStrategy] = {}
self.default_config = FallbackConfig()
def get_strategy(self, service_name: str, config: Optional[FallbackConfig] = None) -> FallbackStrategy:
"""Get or create fallback strategy for service"""
if service_name not in self.strategies:
strategy_config = config or self.default_config
self.strategies[service_name] = FallbackStrategy(strategy_config)
return self.strategies[service_name]
async def agent_communication_with_fallback(self, agent_id: str, command: Dict) -> Dict:
"""Agent communication with comprehensive fallback"""
strategy = self.get_strategy("agent_communication")
# Primary: Direct agent communication
async def primary():
return await send_agent_command_direct(agent_id, command)
# Fallback 1: Try alternative agent endpoint
async def fallback_1():
return await send_agent_command_alt_endpoint(agent_id, command)
# Fallback 2: Queue command for later execution
async def fallback_2():
await queue_agent_command(agent_id, command)
return {"status": "queued", "message": "Command queued for later execution"}
        # Final fallback: default value returned when every strategy fails
        def default_value():
            return {"status": "unavailable", "message": "Agent temporarily unavailable"}
return await strategy.execute_with_fallback(
primary,
[fallback_1, fallback_2],
cache_key=f"agent_command_{agent_id}",
default_value=default_value()
)
async def database_query_with_fallback(self, query: str, params: tuple = ()) -> List[Dict]:
"""Database query with fallback to read replicas"""
strategy = self.get_strategy("database")
# Primary: Main database
async def primary():
return await execute_query_primary_db(query, params)
# Fallback 1: Read replica 1
async def fallback_1():
return await execute_query_replica_db(query, params, replica=1)
# Fallback 2: Read replica 2
async def fallback_2():
return await execute_query_replica_db(query, params, replica=2)
return await strategy.execute_with_fallback(
primary,
[fallback_1, fallback_2],
cache_key=f"query_{hash(query)}",
default_value=[]
)
async def external_api_with_fallback(self, endpoint: str, data: Dict) -> Dict:
"""External API call with fallback strategies"""
strategy = self.get_strategy("external_api")
# Primary: Main API endpoint
async def primary():
return await call_external_api_primary(endpoint, data)
# Fallback 1: Backup API endpoint
async def fallback_1():
return await call_external_api_backup(endpoint, data)
# Fallback 2: Cached response if available
async def fallback_2():
cached = strategy._get_cached_result(f"api_{endpoint}")
if cached:
return cached
raise Exception("No cached data available")
return await strategy.execute_with_fallback(
primary,
[fallback_1, fallback_2],
cache_key=f"api_{endpoint}",
default_value={"status": "unavailable", "data": None}
)
# Global fallback manager
fallback_manager = ServiceFallbackManager()
class FallbackDecorator:
"""Decorator for automatic fallback protection"""
    def __init__(self, service_name: str, fallback_funcs: Optional[List[Callable]] = None,
cache_key_func: Optional[Callable] = None,
default_value: Any = None):
self.service_name = service_name
self.fallback_funcs = fallback_funcs or []
self.cache_key_func = cache_key_func
self.default_value = default_value
def __call__(self, func):
async def wrapper(*args, **kwargs):
strategy = fallback_manager.get_strategy(self.service_name)
# Generate cache key if function provided
cache_key = None
if self.cache_key_func:
cache_key = self.cache_key_func(*args, **kwargs)
            return await strategy.execute_with_fallback(
                func,
                self.fallback_funcs,
                *args,
                cache_key=cache_key,
                default_value=self.default_value,
                **kwargs
            )
return wrapper
# Usage examples
def generate_agent_cache_key(agent_id: str) -> str:
    # The key function must accept the same arguments as the decorated function
    return f"agent_status_{agent_id}"
@FallbackDecorator(
service_name="agent_communication",
cache_key_func=generate_agent_cache_key,
default_value={"status": "error", "message": "Service unavailable"}
)
async def get_agent_status(agent_id: str) -> Dict:
"""Get agent status with fallback protection"""
# Primary implementation
async with aiohttp.ClientSession() as session:
async with session.get(f"http://agent-{agent_id}/status") as response:
return await response.json()
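Because the decorator supplies a default value, callers of get_agent_status never see a transient outage as an exception; they should instead check for the degraded payload. A sketch:

async def main():
    status = await get_agent_status("web-01")
    if status.get("status") == "error":
        # The decorator's default value: the service was unreachable
        print("agent web-01 unavailable, showing placeholder data")
    else:
        print("agent web-01:", status)

asyncio.run(main())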
Resilience Monitoring
Comprehensive Resilience Dashboard
Resilience Health Monitor
class ResilienceMonitor:
"""Monitor all resilience patterns and generate health reports"""
def __init__(self):
self.circuit_breaker_monitor = CircuitBreakerMonitor(circuit_breaker_registry)
self.retry_statistics: Dict[str, Dict] = {}
self.fallback_statistics: Dict[str, Dict] = {}
async def collect_resilience_metrics(self) -> Dict[str, Any]:
"""Collect comprehensive resilience metrics"""
# Circuit breaker metrics
cb_data = await self.circuit_breaker_monitor.get_dashboard_data()
# Bulkhead metrics
bulkhead_stats = bulkhead_manager.get_all_stats()
# Retry statistics
retry_stats = self._aggregate_retry_statistics()
# Fallback statistics
fallback_stats = self._aggregate_fallback_statistics()
return {
'circuit_breakers': cb_data,
'bulkheads': bulkhead_stats,
'retry_patterns': retry_stats,
'fallback_patterns': fallback_stats,
'overall_health': self._calculate_overall_health(
cb_data, bulkhead_stats, retry_stats, fallback_stats
),
'timestamp': time.time()
        }
    def _aggregate_retry_statistics(self) -> Dict[str, Any]:
        """Aggregate retry statistics reported by retry managers"""
        return dict(self.retry_statistics)
    def _aggregate_fallback_statistics(self) -> Dict[str, Any]:
        """Aggregate fallback statistics reported by fallback strategies"""
        return dict(self.fallback_statistics)
def _calculate_overall_health(self, cb_data: Dict, bulkhead_stats: Dict,
retry_stats: Dict, fallback_stats: Dict) -> Dict[str, Any]:
"""Calculate overall system resilience health"""
# Circuit breaker health
cb_health = 1.0
if cb_data['summary']['open_breakers'] > 0:
cb_health = 1.0 - (cb_data['summary']['open_breakers'] /
max(cb_data['summary']['total_breakers'], 1))
# Bulkhead health
bulkhead_health = 1.0
total_rejection_rate = 0
bulkhead_count = len(bulkhead_stats)
if bulkhead_count > 0:
total_rejection_rate = sum(
stats.get('rejection_rate', 0) for stats in bulkhead_stats.values()
) / bulkhead_count
bulkhead_health = 1.0 - total_rejection_rate
# Retry pattern health
retry_health = 1.0
if retry_stats.get('total_operations', 0) > 0:
retry_success_rate = retry_stats.get('success_rate', 1.0)
retry_health = retry_success_rate
# Overall health score
overall_health = (cb_health + bulkhead_health + retry_health) / 3
return {
'score': overall_health,
'circuit_breaker_health': cb_health,
'bulkhead_health': bulkhead_health,
'retry_health': retry_health,
'status': 'healthy' if overall_health > 0.8 else 'degraded' if overall_health > 0.6 else 'critical'
}
async def generate_resilience_report(self) -> str:
"""Generate human-readable resilience report"""
metrics = await self.collect_resilience_metrics()
report = []
report.append("=== SysManage Resilience Health Report ===")
report.append(f"Generated at: {time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())}")
report.append("")
# Overall health
health = metrics['overall_health']
report.append(f"Overall Health Score: {health['score']:.2%} ({health['status'].upper()})")
report.append("")
# Circuit breakers
cb_data = metrics['circuit_breakers']
report.append("Circuit Breakers:")
report.append(f" Total: {cb_data['summary']['total_breakers']}")
report.append(f" Open: {cb_data['summary']['open_breakers']}")
report.append(f" Half-Open: {cb_data['summary']['half_open_breakers']}")
if cb_data['alerts']:
report.append(" Alerts:")
for alert in cb_data['alerts'][:5]: # Show top 5 alerts
report.append(f" - {alert['message']}")
report.append("")
# Bulkheads
bulkhead_data = metrics['bulkheads']
report.append("Resource Bulkheads:")
for name, stats in bulkhead_data.items():
utilization = stats['utilization']
rejection_rate = stats['rejection_rate']
report.append(f" {name}: {utilization:.1%} utilization, {rejection_rate:.2%} rejection rate")
return "\n".join(report)
# Prometheus metrics for resilience monitoring
from prometheus_client import Counter, Histogram, Gauge
# Circuit breaker metrics
circuit_breaker_state = Gauge('circuit_breaker_state', 'Circuit breaker state', ['name', 'state'])
circuit_breaker_failures = Counter('circuit_breaker_failures_total', 'Circuit breaker failures', ['name'])
# Retry metrics
retry_attempts = Counter('retry_attempts_total', 'Total retry attempts', ['operation', 'attempt'])
retry_success = Counter('retry_success_total', 'Successful retries', ['operation'])
retry_duration = Histogram('retry_duration_seconds', 'Retry operation duration', ['operation'])
# Bulkhead metrics
bulkhead_active_calls = Gauge('bulkhead_active_calls', 'Active calls in bulkhead', ['resource'])
bulkhead_rejected_calls = Counter('bulkhead_rejected_calls_total', 'Rejected calls', ['resource'])
bulkhead_utilization = Gauge('bulkhead_utilization_ratio', 'Bulkhead utilization', ['resource'])
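These metric objects still need to be fed; one approach (a sketch, assuming the registry and manager defined above) is a periodic export function:

def export_resilience_metrics():
    """Copy current breaker and bulkhead state into the Prometheus gauges."""
    for name, stats in circuit_breaker_registry.get_all_stats().items():
        for state in ('closed', 'open', 'half_open'):
            circuit_breaker_state.labels(name=name, state=state).set(
                1 if stats['state'] == state else 0
            )
    for resource, stats in bulkhead_manager.get_all_stats().items():
        bulkhead_active_calls.labels(resource=resource).set(stats['active_calls'])
        bulkhead_utilization.labels(resource=resource).set(stats['utilization'])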
Retry Logic Best Practices
Design Guidelines
- Idempotency: Ensure operations are safe to retry (see the idempotency-key sketch after this list)
- Exponential Backoff: Use exponential backoff with jitter
- Circuit Breakers: Implement circuit breakers for external dependencies
- Timeouts: Set appropriate timeouts for all operations
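A common way to make retried writes safe is a client-generated idempotency key that the receiver deduplicates on. A sketch; the idempotency_key field and the agent-side deduplication are assumptions, not part of the protocol shown above:

import uuid

async def send_command_idempotently(agent_id: str, command: Dict) -> Dict:
    # Attach a client-generated key so the receiver can drop duplicate retries
    keyed_command = {**command, "idempotency_key": str(uuid.uuid4())}
    return await send_agent_command(agent_id, keyed_command)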
Implementation Guidelines
- Selective Retries: Only retry transient failures
- Monitoring: Monitor retry patterns and failure rates
- Fallback Strategies: Implement graceful degradation
- Testing: Test failure scenarios and recovery paths (a test sketch follows this list)
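Failure scenarios are straightforward to exercise with a stub that fails a fixed number of times before succeeding; a pytest sketch (assumes the pytest-asyncio plugin is installed):

import pytest

@pytest.mark.asyncio
async def test_retry_recovers_from_transient_failure():
    calls = {"count": 0}

    async def flaky():
        calls["count"] += 1
        if calls["count"] < 3:
            raise ConnectionError("transient")
        return "ok"

    manager = RetryManager(RetryPolicy(max_attempts=3, base_delay=0.01))
    assert await manager.retry_with_backoff(flaky) == "ok"
    assert calls["count"] == 3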
Operational Guidelines
- Alert Configuration: Set up alerts for resilience pattern failures
- Capacity Planning: Plan for the extra load retries place on dependent services (see the amplification math after this list)
- Documentation: Document retry and fallback behaviors
- Performance Impact: Monitor performance impact of resilience patterns
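For capacity planning, a useful back-of-the-envelope figure is the retry amplification factor: if each attempt fails with probability p and the policy allows n attempts, the expected number of requests per logical operation is 1 + p + p² + … + p^(n-1). A quick check:

def retry_amplification(p: float, max_attempts: int) -> float:
    """Expected attempts per operation when each attempt fails with probability p."""
    return sum(p ** k for k in range(max_attempts))

# At a 20% failure rate and 5 attempts each call costs ~1.25 requests;
# at 90% it balloons to ~4.1, which downstream services must absorb.
print(retry_amplification(0.2, 5), retry_amplification(0.9, 5))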