
Retry Logic

Exponential backoff, circuit breakers, failure handling patterns, and resilience strategies in SysManage distributed systems.

Retry Logic Overview

SysManage implements retry and resilience patterns to handle transient failures, network issues, and service unavailability while preventing cascading failures and keeping the system stable.

Core Principles

  • Graceful Degradation: System continues operating despite component failures
  • Intelligent Backoff: Progressive delays between retry attempts (see the delay-schedule sketch after this list)
  • Circuit Breaking: Automatic failure detection and prevention
  • Jitter Implementation: Randomization to prevent thundering herd

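To make the backoff and jitter principles concrete, the short sketch below prints a sample delay schedule for a base delay of 1 second, a multiplier of 2, and ±10% jitter (the parameter values are illustrative, not SysManage defaults):

import random
from typing import List

def backoff_delays(attempts: int = 5, base: float = 1.0, factor: float = 2.0,
                   max_delay: float = 60.0, jitter: float = 0.1) -> List[float]:
    """delay_n = min(base * factor**n, max_delay), then randomized by +/- jitter."""
    delays = []
    for attempt in range(attempts):
        delay = min(base * (factor ** attempt), max_delay)
        delay += random.uniform(-jitter, jitter) * delay
        delays.append(round(delay, 2))
    return delays

print(backoff_delays())  # e.g. [0.97, 2.12, 3.86, 8.3, 15.49]
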
Retry Pattern Implementation

Resilience Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    Resilience Layer                            │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Circuit     │  │ Retry        │  │ Timeout                 │ │
│  │ Breaker     │  │ Manager      │  │ Manager                 │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Bulkhead    │  │ Rate         │  │ Health                  │ │
│  │ Isolation   │  │ Limiter      │  │ Monitor                 │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Fallback    │  │ Cache        │  │ Dead Letter             │ │
│  │ Handler     │  │ Manager      │  │ Queue                   │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
                          │    ▲
                  Request │    │ Response/Error
                          ▼    │
┌─────────────────────────────────────────────────────────────────┐
│                    Application Services                        │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Agent       │  │ Database     │  │ External APIs           │ │
│  │ Service     │  │ Service      │  │ Service                 │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
                        

Exponential Backoff Implementation

Advanced Retry Manager

import asyncio
import random
import time
import logging
from typing import Callable, Any, Optional, Dict, List, Type, Union
from dataclasses import dataclass
from enum import Enum
import math

class FailureType(Enum):
    NETWORK_ERROR = "network_error"
    TIMEOUT = "timeout"
    SERVICE_UNAVAILABLE = "service_unavailable"
    AUTHENTICATION_ERROR = "authentication_error"
    RATE_LIMITED = "rate_limited"
    UNKNOWN = "unknown"

@dataclass
class RetryPolicy:
    max_attempts: int = 3
    base_delay: float = 1.0  # seconds
    max_delay: float = 60.0  # seconds
    exponential_base: float = 2.0
    jitter: bool = True
    jitter_range: float = 0.1  # 10% jitter
    backoff_strategy: str = "exponential"  # exponential, linear, fixed
    retryable_exceptions: Optional[List[Type[Exception]]] = None
    retryable_status_codes: Optional[List[int]] = None

    def __post_init__(self):
        if self.retryable_exceptions is None:
            self.retryable_exceptions = [
                ConnectionError, TimeoutError, OSError
            ]
        if self.retryable_status_codes is None:
            self.retryable_status_codes = [502, 503, 504, 429]


class RetryManager:
    """Advanced retry manager with multiple backoff strategies"""

    def __init__(self, policy: RetryPolicy):
        self.policy = policy
        self.attempt_history: Dict[str, List[float]] = {}

    async def retry_with_backoff(self,
                               func: Callable,
                               *args,
                               operation_id: Optional[str] = None,
                               **kwargs) -> Any:
        """Execute function with retry logic and backoff"""

        if operation_id is None:
            operation_id = f"{func.__name__}_{id(func)}"

        last_exception = None
        attempt_times = []

        for attempt in range(1, self.policy.max_attempts + 1):
            try:
                start_time = time.time()
                result = await self._execute_function(func, *args, **kwargs)

                # Record successful attempt
                execution_time = time.time() - start_time
                attempt_times.append(execution_time)
                self.attempt_history[operation_id] = attempt_times

                if attempt > 1:
                    logging.info(f"Operation {operation_id} succeeded on attempt {attempt}")

                return result

            except Exception as e:
                last_exception = e
                execution_time = time.time() - start_time
                attempt_times.append(execution_time)

                # Check if exception is retryable
                if not self._is_retryable_exception(e):
                    logging.error(f"Non-retryable exception in {operation_id}: {e}")
                    raise e

                # Log attempt failure
                logging.warning(f"Attempt {attempt}/{self.policy.max_attempts} failed for {operation_id}: {e}")

                # Don't sleep after last attempt
                if attempt < self.policy.max_attempts:
                    delay = self._calculate_delay(attempt)
                    logging.info(f"Retrying {operation_id} in {delay:.2f} seconds")
                    await asyncio.sleep(delay)

        # All attempts failed
        self.attempt_history[operation_id] = attempt_times
        logging.error(f"All {self.policy.max_attempts} attempts failed for {operation_id}")
        raise last_exception

    async def _execute_function(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function (sync or async)"""
        if asyncio.iscoroutinefunction(func):
            return await func(*args, **kwargs)
        else:
            return func(*args, **kwargs)

    def _is_retryable_exception(self, exception: Exception) -> bool:
        """Determine if exception is retryable"""

        # Check exception type
        for exc_type in self.policy.retryable_exceptions:
            if isinstance(exception, exc_type):
                return True

        # Check HTTP status codes if available
        if hasattr(exception, 'status_code'):
            if exception.status_code in self.policy.retryable_status_codes:
                return True

        # Check specific error patterns
        error_message = str(exception).lower()
        retryable_patterns = [
            'connection refused', 'connection timeout', 'connection reset',
            'temporary failure', 'service unavailable', 'rate limit'
        ]

        for pattern in retryable_patterns:
            if pattern in error_message:
                return True

        return False

    def _calculate_delay(self, attempt: int) -> float:
        """Calculate delay for retry attempt"""

        if self.policy.backoff_strategy == "exponential":
            delay = self.policy.base_delay * (self.policy.exponential_base ** (attempt - 1))
        elif self.policy.backoff_strategy == "linear":
            delay = self.policy.base_delay * attempt
        elif self.policy.backoff_strategy == "fixed":
            delay = self.policy.base_delay
        else:
            raise ValueError(f"Unknown backoff strategy: {self.policy.backoff_strategy}")

        # Apply maximum delay limit
        delay = min(delay, self.policy.max_delay)

        # Add jitter to prevent thundering herd
        if self.policy.jitter:
            jitter_amount = delay * self.policy.jitter_range
            jitter = random.uniform(-jitter_amount, jitter_amount)
            delay += jitter

        return max(0, delay)

    def get_retry_statistics(self, operation_id: str) -> Dict[str, Any]:
        """Get retry statistics for operation"""

        if operation_id not in self.attempt_history:
            return {}

        attempts = self.attempt_history[operation_id]

        return {
            'total_attempts': len(attempts),
            'total_time': sum(attempts),
            'average_attempt_time': sum(attempts) / len(attempts),
            'max_attempt_time': max(attempts),
            'min_attempt_time': min(attempts),
            'success_rate': 1.0  # Simplification: per-attempt outcomes are not recorded, so failed runs are not reflected here
        }


class RetryDecorator:
    """Decorator for automatic retry functionality"""

    def __init__(self, policy: RetryPolicy):
        self.retry_manager = RetryManager(policy)

    def __call__(self, func):
        async def wrapper(*args, **kwargs):
            operation_id = f"{func.__module__}.{func.__name__}"
            return await self.retry_manager.retry_with_backoff(
                func, *args, operation_id=operation_id, **kwargs
            )
        return wrapper


# Predefined retry policies
NETWORK_RETRY_POLICY = RetryPolicy(
    max_attempts=5,
    base_delay=1.0,
    max_delay=30.0,
    exponential_base=2.0,
    jitter=True,
    retryable_exceptions=[ConnectionError, TimeoutError, OSError]
)

DATABASE_RETRY_POLICY = RetryPolicy(
    max_attempts=3,
    base_delay=0.5,
    max_delay=10.0,
    exponential_base=2.0,
    jitter=True,
    retryable_exceptions=[ConnectionError, TimeoutError]
)

API_RETRY_POLICY = RetryPolicy(
    max_attempts=4,
    base_delay=2.0,
    max_delay=60.0,
    exponential_base=1.5,
    jitter=True,
    retryable_status_codes=[429, 502, 503, 504]
)

# Usage examples (assume aiohttp is imported and Database is the application's async client)
@RetryDecorator(NETWORK_RETRY_POLICY)
async def send_agent_command(agent_id: str, command: Dict) -> Dict:
    """Send command to agent with retry logic"""
    async with aiohttp.ClientSession() as session:
        async with session.post(
            f"https://agent-{agent_id}.example.com/command",
            json=command,
            timeout=aiohttp.ClientTimeout(total=30)
        ) as response:
            if response.status >= 400:
                raise aiohttp.ClientResponseError(
                    request_info=response.request_info,
                    history=response.history,
                    status=response.status
                )
            return await response.json()

@RetryDecorator(DATABASE_RETRY_POLICY)
async def update_agent_status(db: Database, agent_id: str, status: str):
    """Update agent status with retry logic"""
    await db.execute(
        "UPDATE agents SET status = $1, last_updated = NOW() WHERE id = $2",
        status, agent_id
    )

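The decorators cover the common case; the retry manager can also be driven directly when the operation id or policy needs to vary at runtime. A sketch (fetch_package_list is a hypothetical coroutine used only for illustration):

retry_manager = RetryManager(API_RETRY_POLICY)

async def list_packages(agent_id: str) -> List[Dict]:
    return await retry_manager.retry_with_backoff(
        fetch_package_list,              # hypothetical coroutine to retry
        agent_id,
        operation_id=f"list_packages_{agent_id}"
    )
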
Advanced Backoff Strategies

Adaptive Backoff Implementation

class AdaptiveBackoffStrategy:
    """Adaptive backoff that adjusts based on success/failure patterns"""

    def __init__(self, initial_policy: RetryPolicy):
        self.base_policy = initial_policy
        self.success_history: Dict[str, List[bool]] = {}
        self.adaptation_window = 100  # Track last 100 operations

    def calculate_adaptive_delay(self, operation_id: str, attempt: int) -> float:
        """Calculate delay with adaptation based on historical success rates"""

        base_delay = self._calculate_base_delay(attempt)

        # Get success rate for this operation
        success_rate = self._get_success_rate(operation_id)

        # Adjust delay based on success rate
        if success_rate > 0.9:
            # High success rate, reduce delays
            adaptive_factor = 0.7
        elif success_rate > 0.7:
            # Moderate success rate, normal delays
            adaptive_factor = 1.0
        elif success_rate > 0.5:
            # Low success rate, increase delays
            adaptive_factor = 1.5
        else:
            # Very low success rate, significantly increase delays
            adaptive_factor = 2.0

        adapted_delay = base_delay * adaptive_factor

        # Apply jitter
        if self.base_policy.jitter:
            jitter = adapted_delay * 0.1 * (random.random() - 0.5)
            adapted_delay += jitter

        return min(adapted_delay, self.base_policy.max_delay)

    def record_operation_result(self, operation_id: str, success: bool):
        """Record operation result for adaptive learning"""

        if operation_id not in self.success_history:
            self.success_history[operation_id] = []

        history = self.success_history[operation_id]
        history.append(success)

        # Keep only recent history
        if len(history) > self.adaptation_window:
            history.pop(0)

    def _get_success_rate(self, operation_id: str) -> float:
        """Get success rate for operation"""

        if operation_id not in self.success_history:
            return 0.8  # Default assumed success rate

        history = self.success_history[operation_id]
        if not history:
            return 0.8

        return sum(history) / len(history)

    def _calculate_base_delay(self, attempt: int) -> float:
        """Calculate base delay using configured strategy"""

        if self.base_policy.backoff_strategy == "exponential":
            return self.base_policy.base_delay * (self.base_policy.exponential_base ** (attempt - 1))
        elif self.base_policy.backoff_strategy == "linear":
            return self.base_policy.base_delay * attempt
        else:
            return self.base_policy.base_delay


class IntelligentRetryManager:
    """Intelligent retry manager with multiple strategies and learning"""

    def __init__(self):
        self.strategies = {
            'exponential': RetryManager(NETWORK_RETRY_POLICY),
            'linear': RetryManager(RetryPolicy(backoff_strategy="linear")),
            'adaptive': AdaptiveBackoffStrategy(NETWORK_RETRY_POLICY)
        }
        self.operation_strategies: Dict[str, str] = {}
        self.performance_metrics: Dict[str, Dict] = {}

    async def smart_retry(self, func: Callable, *args,
                         operation_type: str = "default",
                         **kwargs) -> Any:
        """Smart retry with automatic strategy selection"""

        # Select best strategy for operation type
        strategy_name = self._select_strategy(operation_type)
        strategy = self.strategies[strategy_name]

        operation_id = f"{operation_type}_{func.__name__}"

        start_time = time.time()
        try:
            if strategy_name == 'adaptive':
                result = await self._adaptive_retry(strategy, func, operation_id, *args, **kwargs)
                success = True
            else:
                result = await strategy.retry_with_backoff(func, *args, operation_id=operation_id, **kwargs)
                success = True
        except Exception as e:
            success = False
            raise e
        finally:
            # Record performance metrics
            execution_time = time.time() - start_time
            self._record_performance(operation_type, strategy_name, execution_time, success)

            # Update adaptive strategy
            if strategy_name == 'adaptive':
                strategy.record_operation_result(operation_id, success)

        return result

    async def _adaptive_retry(self, adaptive_strategy: AdaptiveBackoffStrategy,
                            func: Callable, operation_id: str,
                            *args, **kwargs) -> Any:
        """Execute retry with adaptive strategy"""

        last_exception = None
        # IntelligentRetryManager defines no execution helpers of its own, so it
        # borrows _execute_function and _is_retryable_exception from the
        # exponential RetryManager instance
        executor = self.strategies['exponential']

        for attempt in range(1, adaptive_strategy.base_policy.max_attempts + 1):
            try:
                result = await executor._execute_function(func, *args, **kwargs)
                return result
            except Exception as e:
                last_exception = e

                if not executor._is_retryable_exception(e):
                    raise e

                if attempt < adaptive_strategy.base_policy.max_attempts:
                    delay = adaptive_strategy.calculate_adaptive_delay(operation_id, attempt)
                    await asyncio.sleep(delay)

        raise last_exception

    def _select_strategy(self, operation_type: str) -> str:
        """Select best retry strategy for operation type"""

        # Use historical performance to select strategy
        if operation_type in self.performance_metrics:
            metrics = self.performance_metrics[operation_type]
            best_strategy = min(metrics.keys(),
                              key=lambda s: metrics[s].get('avg_time', float('inf')))
            return best_strategy

        # Default strategy mapping
        default_mapping = {
            'network': 'exponential',
            'database': 'linear',
            'api': 'adaptive',
            'default': 'exponential'
        }

        return default_mapping.get(operation_type, 'exponential')

    def _record_performance(self, operation_type: str, strategy: str,
                          execution_time: float, success: bool):
        """Record performance metrics for strategy selection"""

        if operation_type not in self.performance_metrics:
            self.performance_metrics[operation_type] = {}

        if strategy not in self.performance_metrics[operation_type]:
            self.performance_metrics[operation_type][strategy] = {
                'total_time': 0,
                'total_operations': 0,
                'successful_operations': 0,
                'avg_time': 0,
                'success_rate': 0
            }

        metrics = self.performance_metrics[operation_type][strategy]
        metrics['total_time'] += execution_time
        metrics['total_operations'] += 1

        if success:
            metrics['successful_operations'] += 1

        metrics['avg_time'] = metrics['total_time'] / metrics['total_operations']
        metrics['success_rate'] = metrics['successful_operations'] / metrics['total_operations']

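A brief usage sketch of the intelligent manager (fetch_inventory is a placeholder coroutine, not part of SysManage):

intelligent_retry = IntelligentRetryManager()

async def refresh_host_inventory(agent_id: str) -> Dict:
    return await intelligent_retry.smart_retry(
        fetch_inventory,              # hypothetical coroutine to protect
        agent_id,
        operation_type="network"      # maps to the exponential strategy by default
    )
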
Circuit Breaker Pattern

Circuit Breaker Implementation

Advanced Circuit Breaker

from enum import Enum
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple, Type
import time
import asyncio
import logging

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"          # Failing, rejecting requests
    HALF_OPEN = "half_open" # Testing if service recovered

@dataclass
class CircuitBreakerConfig:
    failure_threshold: int = 5          # Failures before opening circuit
    timeout: float = 60.0               # Seconds to wait before trying again
    success_threshold: int = 3          # Successes needed to close circuit
    monitoring_window: float = 300.0    # Window for failure rate calculation
    expected_exception: Type[Exception] = Exception

class CircuitBreaker:
    """Advanced circuit breaker with monitoring and statistics"""

    def __init__(self, name: str, config: CircuitBreakerConfig):
        self.name = name
        self.config = config
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0
        self.last_attempt_time = 0
        self.call_history: List[Tuple[float, bool]] = []  # (timestamp, success)

    async def call(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function through circuit breaker"""

        current_time = time.time()

        # Check circuit state and decide whether to proceed
        if not self._should_attempt_call(current_time):
            raise CircuitBreakerOpenException(
                f"Circuit breaker '{self.name}' is OPEN. "
                f"Last failure: {current_time - self.last_failure_time:.1f}s ago"
            )

        try:
            # Execute the function
            result = await self._execute_function(func, *args, **kwargs)

            # Record success
            self._record_success(current_time)

            return result

        except self.config.expected_exception as e:
            # Record failure
            self._record_failure(current_time)
            raise e

    def _should_attempt_call(self, current_time: float) -> bool:
        """Determine if call should be attempted based on circuit state"""

        if self.state == CircuitState.CLOSED:
            return True

        elif self.state == CircuitState.OPEN:
            # Check if timeout period has elapsed
            if current_time - self.last_failure_time >= self.config.timeout:
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
                logging.info(f"Circuit breaker '{self.name}' moving to HALF_OPEN")
                return True
            return False

        elif self.state == CircuitState.HALF_OPEN:
            return True

        return False

    def _record_success(self, timestamp: float):
        """Record successful call"""

        self.call_history.append((timestamp, True))
        self._cleanup_old_history(timestamp)

        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1

            if self.success_count >= self.config.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
                logging.info(f"Circuit breaker '{self.name}' CLOSED after {self.success_count} successes")

        elif self.state == CircuitState.CLOSED:
            # Reset failure count on success
            self.failure_count = 0

    def _record_failure(self, timestamp: float):
        """Record failed call"""

        self.call_history.append((timestamp, False))
        self._cleanup_old_history(timestamp)

        self.last_failure_time = timestamp
        self.failure_count += 1

        if self.state == CircuitState.HALF_OPEN:
            # Failure during half-open, go back to open
            self.state = CircuitState.OPEN
            logging.warning(f"Circuit breaker '{self.name}' back to OPEN after failure in HALF_OPEN")

        elif self.state == CircuitState.CLOSED:
            # Check if we should open the circuit
            failure_rate = self._calculate_failure_rate(timestamp)

            if (self.failure_count >= self.config.failure_threshold or
                failure_rate > 0.5):  # 50% failure rate threshold

                self.state = CircuitState.OPEN
                logging.error(f"Circuit breaker '{self.name}' OPENED after {self.failure_count} failures")

    def _calculate_failure_rate(self, current_time: float) -> float:
        """Calculate failure rate within monitoring window"""

        window_start = current_time - self.config.monitoring_window
        recent_calls = [
            (timestamp, success) for timestamp, success in self.call_history
            if timestamp >= window_start
        ]

        if not recent_calls:
            return 0.0

        failures = sum(1 for _, success in recent_calls if not success)
        return failures / len(recent_calls)

    def _cleanup_old_history(self, current_time: float):
        """Remove old entries from call history"""

        cutoff_time = current_time - self.config.monitoring_window
        self.call_history = [
            (timestamp, success) for timestamp, success in self.call_history
            if timestamp >= cutoff_time
        ]

    async def _execute_function(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function (sync or async)"""
        if asyncio.iscoroutinefunction(func):
            return await func(*args, **kwargs)
        else:
            return func(*args, **kwargs)

    def get_stats(self) -> Dict[str, Any]:
        """Get circuit breaker statistics"""

        current_time = time.time()
        failure_rate = self._calculate_failure_rate(current_time)

        return {
            'name': self.name,
            'state': self.state.value,
            'failure_count': self.failure_count,
            'success_count': self.success_count,
            'failure_rate': failure_rate,
            'last_failure_time': self.last_failure_time,
            'time_since_last_failure': current_time - self.last_failure_time,
            'total_calls': len(self.call_history)
        }

    def reset(self):
        """Reset circuit breaker to closed state"""
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.success_count = 0
        self.call_history.clear()
        logging.info(f"Circuit breaker '{self.name}' manually reset")


class CircuitBreakerOpenException(Exception):
    """Exception raised when circuit breaker is open"""
    pass


class CircuitBreakerRegistry:
    """Registry for managing multiple circuit breakers"""

    def __init__(self):
        self.breakers: Dict[str, CircuitBreaker] = {}
        self.default_config = CircuitBreakerConfig()

    def get_breaker(self, name: str, config: Optional[CircuitBreakerConfig] = None) -> CircuitBreaker:
        """Get or create circuit breaker"""

        if name not in self.breakers:
            breaker_config = config or self.default_config
            self.breakers[name] = CircuitBreaker(name, breaker_config)

        return self.breakers[name]

    def get_all_stats(self) -> Dict[str, Dict[str, Any]]:
        """Get statistics for all circuit breakers"""

        return {
            name: breaker.get_stats()
            for name, breaker in self.breakers.items()
        }

    def reset_all(self):
        """Reset all circuit breakers"""
        for breaker in self.breakers.values():
            breaker.reset()


# Global circuit breaker registry
circuit_breaker_registry = CircuitBreakerRegistry()


class CircuitBreakerDecorator:
    """Decorator for automatic circuit breaker protection"""

    def __init__(self, name: str, config: Optional[CircuitBreakerConfig] = None):
        self.name = name
        self.config = config

    def __call__(self, func):
        async def wrapper(*args, **kwargs):
            breaker = circuit_breaker_registry.get_breaker(self.name, self.config)
            return await breaker.call(func, *args, **kwargs)
        return wrapper


# Usage examples (assume aiohttp is imported)
@CircuitBreakerDecorator(
    name="agent_communication",
    config=CircuitBreakerConfig(
        failure_threshold=3,
        timeout=30.0,
        success_threshold=2
    )
)
async def send_agent_command(agent_id: str, command: Dict) -> Dict:
    """Send command to agent with circuit breaker protection"""
    # Implementation here
    pass

@CircuitBreakerDecorator(
    name="external_api",
    config=CircuitBreakerConfig(
        failure_threshold=5,
        timeout=60.0,
        expected_exception=aiohttp.ClientError
    )
)
async def call_external_api(endpoint: str, data: Dict) -> Dict:
    """Call external API with circuit breaker protection"""
    # Implementation here
    pass

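While a breaker is open, calls fail fast with CircuitBreakerOpenException, so callers should catch it and degrade rather than propagate the error. A minimal sketch:

async def safe_send(agent_id: str, command: Dict) -> Dict:
    try:
        return await send_agent_command(agent_id, command)
    except CircuitBreakerOpenException:
        # Breaker is open: skip the call and report a degraded result
        return {"status": "skipped", "message": "Agent channel temporarily disabled"}
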
Circuit Breaker Monitoring

Circuit Breaker Health Dashboard

class CircuitBreakerMonitor:
    """Monitor and alert on circuit breaker status"""

    def __init__(self, registry: CircuitBreakerRegistry):
        self.registry = registry
        self.alert_thresholds = {
            'failure_rate': 0.3,  # 30% failure rate
            'open_duration': 300,  # 5 minutes open
        }

    async def monitor_circuit_breakers(self) -> List[Dict[str, Any]]:
        """Monitor all circuit breakers and generate alerts"""

        alerts = []
        all_stats = self.registry.get_all_stats()

        for name, stats in all_stats.items():
            # Check for concerning conditions
            alerts.extend(self._check_breaker_health(name, stats))

        return alerts

    def _check_breaker_health(self, name: str, stats: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Check individual circuit breaker health"""

        alerts = []
        current_time = time.time()

        # Check if circuit is open for too long
        if stats['state'] == 'open':
            open_duration = stats.get('time_since_last_failure', 0)

            if open_duration > self.alert_thresholds['open_duration']:
                alerts.append({
                    'type': 'circuit_breaker_open',
                    'severity': 'warning',
                    'circuit_breaker': name,
                    'message': f"Circuit breaker '{name}' has been open for {open_duration:.1f} seconds",
                    'duration': open_duration,
                    'threshold': self.alert_thresholds['open_duration']
                })

        # Check failure rate
        failure_rate = stats.get('failure_rate', 0)
        if failure_rate > self.alert_thresholds['failure_rate']:
            alerts.append({
                'type': 'high_failure_rate',
                'severity': 'warning',
                'circuit_breaker': name,
                'message': f"High failure rate for circuit breaker '{name}': {failure_rate:.2%}",
                'failure_rate': failure_rate,
                'threshold': self.alert_thresholds['failure_rate']
            })

        return alerts

    async def get_dashboard_data(self) -> Dict[str, Any]:
        """Get comprehensive dashboard data"""

        all_stats = self.registry.get_all_stats()
        alerts = await self.monitor_circuit_breakers()

        # Calculate summary metrics
        total_breakers = len(all_stats)
        open_breakers = sum(1 for stats in all_stats.values() if stats['state'] == 'open')
        half_open_breakers = sum(1 for stats in all_stats.values() if stats['state'] == 'half_open')
        avg_failure_rate = sum(stats.get('failure_rate', 0) for stats in all_stats.values()) / max(total_breakers, 1)

        return {
            'summary': {
                'total_breakers': total_breakers,
                'open_breakers': open_breakers,
                'half_open_breakers': half_open_breakers,
                'closed_breakers': total_breakers - open_breakers - half_open_breakers,
                'avg_failure_rate': avg_failure_rate
            },
            'breakers': all_stats,
            'alerts': alerts,
            'timestamp': time.time()
        }

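The monitor is typically driven by a periodic background task; a minimal sketch (the 60-second interval and log-only alerting are assumptions):

async def circuit_breaker_watchdog(monitor: CircuitBreakerMonitor, interval: float = 60.0):
    """Poll circuit breaker health and log any alerts."""
    while True:
        alerts = await monitor.monitor_circuit_breakers()
        for alert in alerts:
            logging.warning("[%s] %s", alert['severity'], alert['message'])
        await asyncio.sleep(interval)
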
Bulkhead Isolation Pattern

Resource Isolation Implementation

Resource Pool Isolation

import asyncio
import functools
from typing import Dict, Any, Optional, List, Callable
from dataclasses import dataclass
import time

@dataclass
class BulkheadConfig:
    max_concurrent_calls: int = 10
    max_queue_size: int = 100
    timeout: float = 30.0
    isolation_level: str = "strict"  # strict, shared, adaptive

class ResourceBulkhead:
    """Isolate resources using bulkhead pattern"""

    def __init__(self, name: str, config: BulkheadConfig):
        self.name = name
        self.config = config
        self.semaphore = asyncio.Semaphore(config.max_concurrent_calls)
        self.queue = asyncio.Queue(maxsize=config.max_queue_size)  # reserved for queued execution; not consumed in this simplified listing
        self.active_calls = 0
        self.queued_calls = 0
        self.rejected_calls = 0
        self.total_calls = 0

    async def execute(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function within bulkhead constraints"""

        self.total_calls += 1

        # Check if we can accept the call
        if not await self._can_accept_call():
            self.rejected_calls += 1
            raise BulkheadRejectedException(
                f"Bulkhead '{self.name}' rejected call - resource limit exceeded"
            )

        try:
            async with self.semaphore:
                self.active_calls += 1
                try:
                    # Execute with timeout
                    result = await asyncio.wait_for(
                        self._execute_function(func, *args, **kwargs),
                        timeout=self.config.timeout
                    )
                    return result
                finally:
                    self.active_calls -= 1

        except asyncio.TimeoutError:
            raise BulkheadTimeoutException(
                f"Bulkhead '{self.name}' operation timed out after {self.config.timeout}s"
            )

    async def _can_accept_call(self) -> bool:
        """Check if bulkhead can accept new call"""

        # Strict isolation - never exceed limits
        if self.config.isolation_level == "strict":
            return (self.active_calls < self.config.max_concurrent_calls and
                    self.queued_calls < self.config.max_queue_size)

        # Shared isolation - allow some overflow
        elif self.config.isolation_level == "shared":
            return self.active_calls < self.config.max_concurrent_calls * 1.2

        # Adaptive isolation - adjust based on performance
        elif self.config.isolation_level == "adaptive":
            success_rate = self._calculate_success_rate()
            if success_rate > 0.9:
                # High success rate, allow more calls
                return self.active_calls < self.config.max_concurrent_calls * 1.1
            else:
                # Lower success rate, be more restrictive
                return self.active_calls < self.config.max_concurrent_calls * 0.8

        return True

    async def _execute_function(self, func: Callable, *args, **kwargs) -> Any:
        """Execute function (sync or async)"""
        if asyncio.iscoroutinefunction(func):
            return await func(*args, **kwargs)
        else:
            # run_in_executor does not accept **kwargs, so bind them with functools.partial
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                None, functools.partial(func, *args, **kwargs)
            )

    def _calculate_success_rate(self) -> float:
        """Calculate recent success rate"""
        # Simplified - would track actual success/failure in real implementation
        if self.total_calls == 0:
            return 1.0
        return max(0, 1.0 - (self.rejected_calls / self.total_calls))

    def get_stats(self) -> Dict[str, Any]:
        """Get bulkhead statistics"""
        return {
            'name': self.name,
            'active_calls': self.active_calls,
            'queued_calls': self.queued_calls,
            'rejected_calls': self.rejected_calls,
            'total_calls': self.total_calls,
            'rejection_rate': self.rejected_calls / max(self.total_calls, 1),
            'utilization': self.active_calls / self.config.max_concurrent_calls,
            'config': {
                'max_concurrent': self.config.max_concurrent_calls,
                'max_queue_size': self.config.max_queue_size,
                'timeout': self.config.timeout,
                'isolation_level': self.config.isolation_level
            }
        }


class BulkheadRejectedException(Exception):
    """Exception raised when bulkhead rejects a call"""
    pass

class BulkheadTimeoutException(Exception):
    """Exception raised when bulkhead operation times out"""
    pass


class BulkheadManager:
    """Manage multiple bulkheads for different resource types"""

    def __init__(self):
        self.bulkheads: Dict[str, ResourceBulkhead] = {}
        self.default_configs = {
            'database': BulkheadConfig(max_concurrent_calls=20, timeout=10.0),
            'agent_communication': BulkheadConfig(max_concurrent_calls=50, timeout=30.0),
            'external_api': BulkheadConfig(max_concurrent_calls=10, timeout=60.0),
            'file_operations': BulkheadConfig(max_concurrent_calls=5, timeout=120.0)
        }

    def get_bulkhead(self, resource_type: str, config: Optional[BulkheadConfig] = None) -> ResourceBulkhead:
        """Get or create bulkhead for resource type"""

        if resource_type not in self.bulkheads:
            bulkhead_config = config or self.default_configs.get(
                resource_type, BulkheadConfig()
            )
            self.bulkheads[resource_type] = ResourceBulkhead(resource_type, bulkhead_config)

        return self.bulkheads[resource_type]

    async def execute_with_bulkhead(self, resource_type: str, func: Callable,
                                  *args, **kwargs) -> Any:
        """Execute function with appropriate bulkhead protection"""

        bulkhead = self.get_bulkhead(resource_type)
        return await bulkhead.execute(func, *args, **kwargs)

    def get_all_stats(self) -> Dict[str, Dict[str, Any]]:
        """Get statistics for all bulkheads"""
        return {
            name: bulkhead.get_stats()
            for name, bulkhead in self.bulkheads.items()
        }


# Global bulkhead manager
bulkhead_manager = BulkheadManager()


class BulkheadDecorator:
    """Decorator for automatic bulkhead protection"""

    def __init__(self, resource_type: str, config: Optional[BulkheadConfig] = None):
        self.resource_type = resource_type
        self.config = config

    def __call__(self, func):
        async def wrapper(*args, **kwargs):
            return await bulkhead_manager.execute_with_bulkhead(
                self.resource_type, func, *args, **kwargs
            )
        return wrapper


# Usage examples
@BulkheadDecorator("database")
async def update_agent_metrics(db: Database, agent_id: str, metrics: Dict):
    """Update agent metrics with database bulkhead protection"""
    # Database operation
    pass

@BulkheadDecorator("agent_communication")
async def send_command_to_agent(agent_id: str, command: Dict):
    """Send command to agent with communication bulkhead protection"""
    # Agent communication
    pass

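Bulkhead rejections and timeouts are expected under load, so callers should treat them as a signal to degrade. A sketch (queue_agent_command is a hypothetical deferral helper):

async def send_command_with_degradation(agent_id: str, command: Dict) -> Dict:
    try:
        return await send_command_to_agent(agent_id, command)
    except BulkheadRejectedException:
        await queue_agent_command(agent_id, command)   # hypothetical deferral queue
        return {"status": "queued", "message": "Communication pool saturated"}
    except BulkheadTimeoutException:
        return {"status": "timeout", "message": "Agent did not respond in time"}
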
Fallback and Graceful Degradation

Fallback Strategy Implementation

Comprehensive Fallback System

from typing import List, Callable, Any, Optional, Dict
from dataclasses import dataclass
import asyncio
import functools
import logging
import time

@dataclass
class FallbackConfig:
    enable_cache_fallback: bool = True
    enable_default_value_fallback: bool = True
    enable_degraded_service_fallback: bool = True
    cache_ttl: int = 300  # 5 minutes
    degraded_timeout: float = 5.0  # Faster timeout for degraded service

class FallbackStrategy:
    """Comprehensive fallback strategy with multiple levels"""

    def __init__(self, config: FallbackConfig):
        self.config = config
        self.cache: Dict[str, Any] = {}
        self.cache_timestamps: Dict[str, float] = {}

    async def execute_with_fallback(self,
                                  primary_func: Callable,
                                  fallback_funcs: List[Callable],
                                  *args,
                                  cache_key: Optional[str] = None,
                                  default_value: Any = None,
                                  **kwargs) -> Any:
        """Execute function with multiple fallback levels"""

        # Try primary function first
        try:
            result = await self._execute_with_timeout(primary_func, *args, **kwargs)

            # Cache successful result
            if cache_key and self.config.enable_cache_fallback:
                self._cache_result(cache_key, result)

            return result

        except Exception as primary_error:
            logging.warning(f"Primary function failed: {primary_error}")

            # Try fallback functions in order
            for i, fallback_func in enumerate(fallback_funcs):
                try:
                    logging.info(f"Trying fallback {i + 1}/{len(fallback_funcs)}")

                    # Use shorter timeout for fallbacks
                    result = await self._execute_with_timeout(
                        fallback_func,
                        *args,
                        timeout=self.config.degraded_timeout,
                        **kwargs
                    )

                    logging.info(f"Fallback {i + 1} succeeded")
                    return result

                except Exception as fallback_error:
                    logging.warning(f"Fallback {i + 1} failed: {fallback_error}")
                    continue

            # Try cache fallback
            if cache_key and self.config.enable_cache_fallback:
                cached_result = self._get_cached_result(cache_key)
                if cached_result is not None:
                    logging.info("Using cached fallback value")
                    return cached_result

            # Try default value fallback
            if default_value is not None and self.config.enable_default_value_fallback:
                logging.info("Using default fallback value")
                return default_value

            # All fallbacks failed
            logging.error("All fallback strategies failed")
            raise FallbackExhaustedException("All fallback strategies exhausted") from primary_error

    async def _execute_with_timeout(self, func: Callable, *args,
                                   timeout: Optional[float] = None, **kwargs) -> Any:
        """Execute function with timeout"""

        if timeout is None:
            timeout = 30.0  # Default timeout

        if asyncio.iscoroutinefunction(func):
            return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout)
        else:
            # Bind kwargs with functools.partial; run_in_executor takes positional args only
            loop = asyncio.get_running_loop()
            return await asyncio.wait_for(
                loop.run_in_executor(None, functools.partial(func, *args, **kwargs)),
                timeout=timeout
            )

    def _cache_result(self, key: str, result: Any):
        """Cache result for fallback use"""
        self.cache[key] = result
        self.cache_timestamps[key] = time.time()

    def _get_cached_result(self, key: str) -> Optional[Any]:
        """Get cached result if still valid"""

        if key not in self.cache:
            return None

        # Check if cache is still valid
        cache_age = time.time() - self.cache_timestamps[key]
        if cache_age > self.config.cache_ttl:
            # Cache expired
            del self.cache[key]
            del self.cache_timestamps[key]
            return None

        return self.cache[key]

    def clear_cache(self, key: Optional[str] = None):
        """Clear cache entries"""
        if key:
            self.cache.pop(key, None)
            self.cache_timestamps.pop(key, None)
        else:
            self.cache.clear()
            self.cache_timestamps.clear()


class FallbackExhaustedException(Exception):
    """Exception raised when all fallback strategies fail"""
    pass


class ServiceFallbackManager:
    """Manage fallback strategies for different services"""

    def __init__(self):
        self.strategies: Dict[str, FallbackStrategy] = {}
        self.default_config = FallbackConfig()

    def get_strategy(self, service_name: str, config: Optional[FallbackConfig] = None) -> FallbackStrategy:
        """Get or create fallback strategy for service"""

        if service_name not in self.strategies:
            strategy_config = config or self.default_config
            self.strategies[service_name] = FallbackStrategy(strategy_config)

        return self.strategies[service_name]

    async def agent_communication_with_fallback(self, agent_id: str, command: Dict) -> Dict:
        """Agent communication with comprehensive fallback"""

        strategy = self.get_strategy("agent_communication")

        # Primary: Direct agent communication
        async def primary():
            return await send_agent_command_direct(agent_id, command)

        # Fallback 1: Try alternative agent endpoint
        async def fallback_1():
            return await send_agent_command_alt_endpoint(agent_id, command)

        # Fallback 2: Queue command for later execution
        async def fallback_2():
            await queue_agent_command(agent_id, command)
            return {"status": "queued", "message": "Command queued for later execution"}

        # Final fallback: static default response when the agent cannot be reached
        def default_value():
            return {"status": "unavailable", "message": "Agent temporarily unavailable"}

        return await strategy.execute_with_fallback(
            primary,
            [fallback_1, fallback_2],
            cache_key=f"agent_command_{agent_id}",
            default_value=default_value()
        )

    async def database_query_with_fallback(self, query: str, params: tuple = ()) -> List[Dict]:
        """Database query with fallback to read replicas"""

        strategy = self.get_strategy("database")

        # Primary: Main database
        async def primary():
            return await execute_query_primary_db(query, params)

        # Fallback 1: Read replica 1
        async def fallback_1():
            return await execute_query_replica_db(query, params, replica=1)

        # Fallback 2: Read replica 2
        async def fallback_2():
            return await execute_query_replica_db(query, params, replica=2)

        return await strategy.execute_with_fallback(
            primary,
            [fallback_1, fallback_2],
            cache_key=f"query_{hash(query)}",
            default_value=[]
        )

    async def external_api_with_fallback(self, endpoint: str, data: Dict) -> Dict:
        """External API call with fallback strategies"""

        strategy = self.get_strategy("external_api")

        # Primary: Main API endpoint
        async def primary():
            return await call_external_api_primary(endpoint, data)

        # Fallback 1: Backup API endpoint
        async def fallback_1():
            return await call_external_api_backup(endpoint, data)

        # Fallback 2: Cached response if available
        async def fallback_2():
            cached = strategy._get_cached_result(f"api_{endpoint}")
            if cached:
                return cached
            raise Exception("No cached data available")

        return await strategy.execute_with_fallback(
            primary,
            [fallback_1, fallback_2],
            cache_key=f"api_{endpoint}",
            default_value={"status": "unavailable", "data": None}
        )


# Global fallback manager
fallback_manager = ServiceFallbackManager()


class FallbackDecorator:
    """Decorator for automatic fallback protection"""

    def __init__(self, service_name: str, fallback_funcs: List[Callable] = None,
                 cache_key_func: Optional[Callable] = None,
                 default_value: Any = None):
        self.service_name = service_name
        self.fallback_funcs = fallback_funcs or []
        self.cache_key_func = cache_key_func
        self.default_value = default_value

    def __call__(self, func):
        async def wrapper(*args, **kwargs):
            strategy = fallback_manager.get_strategy(self.service_name)

            # Generate cache key if function provided
            cache_key = None
            if self.cache_key_func:
                cache_key = self.cache_key_func(*args, **kwargs)

            return await strategy.execute_with_fallback(
                func,
                self.fallback_funcs,
                *args,
                cache_key=cache_key,
                default_value=self.default_value,
                **kwargs
            )
        return wrapper


# Usage examples
def generate_agent_cache_key(agent_id: str) -> str:
    # Must accept the same arguments as the decorated function (get_agent_status)
    return f"agent_status_{agent_id}"

@FallbackDecorator(
    service_name="agent_communication",
    cache_key_func=generate_agent_cache_key,
    default_value={"status": "error", "message": "Service unavailable"}
)
async def get_agent_status(agent_id: str) -> Dict:
    """Get agent status with fallback protection"""
    # Primary implementation
    async with aiohttp.ClientSession() as session:
        async with session.get(f"http://agent-{agent_id}/status") as response:
            return await response.json()

Resilience Monitoring

Comprehensive Resilience Dashboard

Resilience Health Monitor

class ResilienceMonitor:
    """Monitor all resilience patterns and generate health reports"""

    def __init__(self):
        self.circuit_breaker_monitor = CircuitBreakerMonitor(circuit_breaker_registry)
        self.retry_statistics: Dict[str, Dict] = {}
        self.fallback_statistics: Dict[str, Dict] = {}

    async def collect_resilience_metrics(self) -> Dict[str, Any]:
        """Collect comprehensive resilience metrics"""

        # Circuit breaker metrics
        cb_data = await self.circuit_breaker_monitor.get_dashboard_data()

        # Bulkhead metrics
        bulkhead_stats = bulkhead_manager.get_all_stats()

        # Retry statistics
        retry_stats = self._aggregate_retry_statistics()

        # Fallback statistics
        fallback_stats = self._aggregate_fallback_statistics()

        return {
            'circuit_breakers': cb_data,
            'bulkheads': bulkhead_stats,
            'retry_patterns': retry_stats,
            'fallback_patterns': fallback_stats,
            'overall_health': self._calculate_overall_health(
                cb_data, bulkhead_stats, retry_stats, fallback_stats
            ),
            'timestamp': time.time()
        }

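    # The two aggregation helpers called above are not part of the original
    # listing; the minimal sketches below assume retry_statistics and
    # fallback_statistics are populated externally (e.g. from
    # RetryManager.get_retry_statistics()).
    def _aggregate_retry_statistics(self) -> Dict[str, Any]:
        """Aggregate recorded retry statistics across operations."""
        if not self.retry_statistics:
            return {'total_operations': 0, 'success_rate': 1.0}
        ops = len(self.retry_statistics)
        avg_success = sum(s.get('success_rate', 1.0) for s in self.retry_statistics.values()) / ops
        return {'total_operations': ops, 'success_rate': avg_success}

    def _aggregate_fallback_statistics(self) -> Dict[str, Any]:
        """Return a snapshot of recorded fallback statistics."""
        return dict(self.fallback_statistics)
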
    def _calculate_overall_health(self, cb_data: Dict, bulkhead_stats: Dict,
                                retry_stats: Dict, fallback_stats: Dict) -> Dict[str, Any]:
        """Calculate overall system resilience health"""

        # Circuit breaker health
        cb_health = 1.0
        if cb_data['summary']['open_breakers'] > 0:
            cb_health = 1.0 - (cb_data['summary']['open_breakers'] /
                             max(cb_data['summary']['total_breakers'], 1))

        # Bulkhead health
        bulkhead_health = 1.0
        total_rejection_rate = 0
        bulkhead_count = len(bulkhead_stats)

        if bulkhead_count > 0:
            total_rejection_rate = sum(
                stats.get('rejection_rate', 0) for stats in bulkhead_stats.values()
            ) / bulkhead_count
            bulkhead_health = 1.0 - total_rejection_rate

        # Retry pattern health
        retry_health = 1.0
        if retry_stats.get('total_operations', 0) > 0:
            retry_success_rate = retry_stats.get('success_rate', 1.0)
            retry_health = retry_success_rate

        # Overall health score
        overall_health = (cb_health + bulkhead_health + retry_health) / 3

        return {
            'score': overall_health,
            'circuit_breaker_health': cb_health,
            'bulkhead_health': bulkhead_health,
            'retry_health': retry_health,
            'status': 'healthy' if overall_health > 0.8 else 'degraded' if overall_health > 0.6 else 'critical'
        }

    async def generate_resilience_report(self) -> str:
        """Generate human-readable resilience report"""

        metrics = await self.collect_resilience_metrics()

        report = []
        report.append("=== SysManage Resilience Health Report ===")
        report.append(f"Generated at: {time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())}")
        report.append("")

        # Overall health
        health = metrics['overall_health']
        report.append(f"Overall Health Score: {health['score']:.2%} ({health['status'].upper()})")
        report.append("")

        # Circuit breakers
        cb_data = metrics['circuit_breakers']
        report.append("Circuit Breakers:")
        report.append(f"  Total: {cb_data['summary']['total_breakers']}")
        report.append(f"  Open: {cb_data['summary']['open_breakers']}")
        report.append(f"  Half-Open: {cb_data['summary']['half_open_breakers']}")

        if cb_data['alerts']:
            report.append("  Alerts:")
            for alert in cb_data['alerts'][:5]:  # Show top 5 alerts
                report.append(f"    - {alert['message']}")

        report.append("")

        # Bulkheads
        bulkhead_data = metrics['bulkheads']
        report.append("Resource Bulkheads:")
        for name, stats in bulkhead_data.items():
            utilization = stats['utilization']
            rejection_rate = stats['rejection_rate']
            report.append(f"  {name}: {utilization:.1%} utilization, {rejection_rate:.2%} rejection rate")

        return "\n".join(report)

# Prometheus metrics for resilience monitoring
from prometheus_client import Counter, Histogram, Gauge

# Circuit breaker metrics
circuit_breaker_state = Gauge('circuit_breaker_state', 'Circuit breaker state', ['name', 'state'])
circuit_breaker_failures = Counter('circuit_breaker_failures_total', 'Circuit breaker failures', ['name'])

# Retry metrics
retry_attempts = Counter('retry_attempts_total', 'Total retry attempts', ['operation', 'attempt'])
retry_success = Counter('retry_success_total', 'Successful retries', ['operation'])
retry_duration = Histogram('retry_duration_seconds', 'Retry operation duration', ['operation'])

# Bulkhead metrics
bulkhead_active_calls = Gauge('bulkhead_active_calls', 'Active calls in bulkhead', ['resource'])
bulkhead_rejected_calls = Counter('bulkhead_rejected_calls_total', 'Rejected calls', ['resource'])
bulkhead_utilization = Gauge('bulkhead_utilization_ratio', 'Bulkhead utilization', ['resource'])

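The series above only declare the metrics; a small exporter loop can keep them in sync with the registries (a sketch, assuming the global circuit_breaker_registry and bulkhead_manager defined earlier):

async def export_resilience_metrics(interval: float = 15.0):
    """Periodically copy registry statistics into the Prometheus gauges."""
    while True:
        for name, stats in circuit_breaker_registry.get_all_stats().items():
            for state in ('closed', 'open', 'half_open'):
                circuit_breaker_state.labels(name=name, state=state).set(
                    1 if stats['state'] == state else 0
                )
        for resource, stats in bulkhead_manager.get_all_stats().items():
            bulkhead_active_calls.labels(resource=resource).set(stats['active_calls'])
            bulkhead_utilization.labels(resource=resource).set(stats['utilization'])
        await asyncio.sleep(interval)
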
Retry Logic Best Practices

Design Guidelines

  • Idempotency: Ensure operations are safe to retry (see the idempotency-key sketch after this list)
  • Exponential Backoff: Use exponential backoff with jitter
  • Circuit Breakers: Implement circuit breakers for external dependencies
  • Timeouts: Set appropriate timeouts for all operations

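A minimal sketch of the idempotency guideline: the caller supplies an idempotency key so a retried command replays the stored result instead of repeating the side effect (the key-value store API here is an assumption for illustration):

async def apply_command_idempotently(store, agent_id: str, command: Dict,
                                     idempotency_key: str) -> Dict:
    existing = await store.get(idempotency_key)
    if existing is not None:
        return existing  # retry of an already-applied command
    result = await send_agent_command(agent_id, command)
    await store.set(idempotency_key, result, ttl=3600)  # hypothetical store API
    return result
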
Implementation Guidelines

  • Selective Retries: Only retry transient failures
  • Monitoring: Monitor retry patterns and failure rates
  • Fallback Strategies: Implement graceful degradation
  • Testing: Test failure scenarios and recovery

Operational Guidelines

  • Alert Configuration: Set up alerts for resilience pattern failures
  • Capacity Planning: Plan for retry load on dependent services
  • Documentation: Document retry and fallback behaviors
  • Performance Impact: Monitor performance impact of resilience patterns

Next Steps

To learn more about related system reliability topics:

  1. Scaling Strategies: System scaling and capacity management
  2. Performance Optimization: System performance tuning
  3. Queue Management: Background task processing
  4. Load Balancing: Traffic distribution strategies