Queue Management
Message queues, async processing, task scheduling, reliability patterns, and queue management strategies in SysManage.
Queue Management Overview
SysManage implements a sophisticated queue management system to handle asynchronous task processing, inter-service communication, and reliable message delivery across the distributed architecture.
Core Principles
- Reliability: Guaranteed message delivery with acknowledgments
- Scalability: Horizontal scaling of queue workers and processors
- Priority Management: Priority-based task execution
- Dead Letter Handling: Failed messages are captured in a dead letter queue for inspection and replay rather than silently dropped
Queue Architecture
Multi-Queue Processing System
┌─────────────────────────────────────────────────────────────────┐
│ Queue Management Layer │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Task │ │ Priority │ │ Dead Letter │ │
│ │ Dispatcher │ │ Manager │ │ Queue Handler │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Queue │ │ Worker Pool │ │ Result │ │
│ │ Router │ │ Manager │ │ Aggregator │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ ▲
Messages │ │ Results
▼ │
┌─────────────────────────────────────────────────────────────────┐
│ Redis Queue System │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ High │ │ Normal │ │ Low Priority │ │
│ │ Priority │ │ Priority │ │ Queue │ │
│ │ Queue │ │ Queue │ │ │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Agent │ │ System │ │ Maintenance │ │
│ │ Tasks │ │ Tasks │ │ Tasks │ │
│ │ Queue │ │ Queue │ │ Queue │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Dead Letter │ │ Delayed │ │ Result │ │
│ │ Queue │ │ Queue │ │ Storage │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ ▲
Process │ │ Acknowledge
▼ │
┌─────────────────────────────────────────────────────────────────┐
│ Worker Processes │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Agent │ │ Package │ │ System │ │
│ │ Worker │ │ Worker │ │ Worker │ │
│ │ Pool │ │ Pool │ │ Pool │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Queue Types and Purposes
Agent Communication Queues
- Agent command dispatch
- Agent response collection
- Real-time status updates
- Metric data ingestion
Background Processing Queues
- Package update processing
- System health monitoring
- Report generation
- Data cleanup tasks
Integration Queues
- External API calls
- Webhook notifications
- Email delivery
- Audit log processing
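To make the routing concrete, the sketch below shows one plausible way a dispatcher could map task types onto these queue families and priorities. The TASK_ROUTES table and route_task helper are illustrative assumptions, not part of SysManage's actual API.
# Hypothetical routing table: task type -> (queue family, priority).
# Names are illustrative; the real dispatcher may route differently.
TASK_ROUTES = {
    # Agent communication
    "agent_command":     ("agent_tasks", "high"),
    "agent_response":    ("agent_tasks", "high"),
    "metric_ingestion":  ("agent_tasks", "normal"),
    # Background processing
    "package_update":    ("system_tasks", "normal"),
    "report_generation": ("maintenance_tasks", "low"),
    "data_cleanup":      ("maintenance_tasks", "low"),
    # Integration
    "webhook_delivery":  ("integration_tasks", "normal"),
    "email_delivery":    ("integration_tasks", "low"),
}

def route_task(task_type: str) -> tuple[str, str]:
    """Return (queue_family, priority) for a task type; unknown types default to normal priority."""
    return TASK_ROUTES.get(task_type, ("system_tasks", "normal"))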
Redis Queue Implementation
Core Queue Manager
Advanced Queue Management System
import redis.asyncio as redis  # asyncio client: every Redis call below is awaited
import json
import time
import uuid
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, asdict
from enum import Enum
import asyncio
import logging
class TaskPriority(Enum):
LOW = 1
NORMAL = 2
HIGH = 3
CRITICAL = 4
class TaskStatus(Enum):
PENDING = "pending"
PROCESSING = "processing"
COMPLETED = "completed"
FAILED = "failed"
RETRYING = "retrying"
@dataclass
class Task:
id: str
type: str
payload: Dict[str, Any]
priority: TaskPriority
max_retries: int = 3
retry_count: int = 0
created_at: Optional[float] = None
scheduled_at: Optional[float] = None
timeout: int = 300 # 5 minutes
status: TaskStatus = TaskStatus.PENDING
def __post_init__(self):
if self.created_at is None:
self.created_at = time.time()
if self.scheduled_at is None:
self.scheduled_at = self.created_at
def to_dict(self) -> Dict:
data = asdict(self)
data['priority'] = self.priority.value
data['status'] = self.status.value
return data
@classmethod
def from_dict(cls, data: Dict) -> 'Task':
data['priority'] = TaskPriority(data['priority'])
data['status'] = TaskStatus(data['status'])
return cls(**data)
class QueueManager:
"""Advanced Redis-based queue manager"""
def __init__(self, redis_client: redis.Redis, namespace: str = "sysmanage"):
self.redis = redis_client
self.namespace = namespace
self.queues = {
TaskPriority.CRITICAL: f"{namespace}:queue:critical",
TaskPriority.HIGH: f"{namespace}:queue:high",
TaskPriority.NORMAL: f"{namespace}:queue:normal",
TaskPriority.LOW: f"{namespace}:queue:low"
}
self.processing_queue = f"{namespace}:queue:processing"
self.dead_letter_queue = f"{namespace}:queue:dead_letter"
self.delayed_queue = f"{namespace}:queue:delayed"
self.task_storage = f"{namespace}:tasks"
async def enqueue(self, task: Task) -> str:
"""Enqueue task with priority"""
# Generate task ID if not provided
if not task.id:
task.id = str(uuid.uuid4())
# Store task metadata
await self._store_task(task)
# Handle delayed tasks
if task.scheduled_at > time.time():
await self._enqueue_delayed(task)
else:
await self._enqueue_immediate(task)
return task.id
async def _store_task(self, task: Task):
"""Store task metadata in Redis"""
task_data = json.dumps(task.to_dict())
await self.redis.hset(self.task_storage, task.id, task_data)
async def _enqueue_immediate(self, task: Task):
"""Enqueue task for immediate processing"""
queue_name = self.queues[task.priority]
# Score by enqueue time so tasks pop in FIFO order within their priority queue
score = time.time()
await self.redis.zadd(queue_name, {task.id: score})
async def _enqueue_delayed(self, task: Task):
"""Enqueue task for delayed processing"""
await self.redis.zadd(self.delayed_queue, {task.id: task.scheduled_at})
async def dequeue(self, worker_id: str, timeout: int = 10) -> Optional[Task]:
"""Dequeue highest priority task"""
# Check for ready delayed tasks first
await self._process_delayed_tasks()
# Pass all priority queues to a single BZPOPMIN call: Redis pops from the
# first non-empty key, so listing the queues in priority order gives strict
# priority without blocking on an empty higher-priority queue
queue_names = [self.queues[p] for p in (TaskPriority.CRITICAL, TaskPriority.HIGH,
TaskPriority.NORMAL, TaskPriority.LOW)]
result = await self.redis.bzpopmin(queue_names, timeout=timeout)
if result:
queue, task_id, score = result
task = await self._get_task(task_id)
if task:
# Move task to processing queue
await self._mark_processing(task, worker_id)
return task
return None
async def _process_delayed_tasks(self):
"""Move ready delayed tasks to appropriate queues"""
current_time = time.time()
# Get tasks ready for processing
ready_tasks = await self.redis.zrangebyscore(
self.delayed_queue, 0, current_time, withscores=True
)
for task_id, scheduled_time in ready_tasks:
task = await self._get_task(task_id)
if task:
# Remove from delayed queue
await self.redis.zrem(self.delayed_queue, task_id)
# Add to appropriate priority queue
await self._enqueue_immediate(task)
async def _get_task(self, task_id: str) -> Optional[Task]:
"""Retrieve task metadata"""
task_data = await self.redis.hget(self.task_storage, task_id)
if task_data:
data = json.loads(task_data)
return Task.from_dict(data)
return None
async def _mark_processing(self, task: Task, worker_id: str):
"""Mark task as being processed"""
task.status = TaskStatus.PROCESSING
# Store in processing queue with worker info
processing_data = {
'task_id': task.id,
'worker_id': worker_id,
'started_at': time.time(),
'timeout_at': time.time() + task.timeout
}
await self.redis.hset(
self.processing_queue,
task.id,
json.dumps(processing_data)
)
# Update task metadata
await self._store_task(task)
async def acknowledge(self, task_id: str, result: Dict[str, Any] = None):
"""Acknowledge successful task completion"""
task = await self._get_task(task_id)
if not task:
return False
task.status = TaskStatus.COMPLETED
# Remove from processing queue
await self.redis.hdel(self.processing_queue, task_id)
# Store result if provided
if result:
result_key = f"{self.namespace}:results:{task_id}"
await self.redis.setex(
result_key,
3600, # Keep results for 1 hour
json.dumps(result)
)
# Update task status
await self._store_task(task)
return True
async def fail(self, task_id: str, error: str, retry: bool = True):
"""Mark task as failed and optionally retry"""
task = await self._get_task(task_id)
if not task:
return False
# Remove from processing queue
await self.redis.hdel(self.processing_queue, task_id)
# Check if we should retry
if retry and task.retry_count < task.max_retries:
task.retry_count += 1
task.status = TaskStatus.RETRYING
# Calculate exponential backoff delay
delay = min(300, 2 ** task.retry_count * 10) # Max 5 minutes
task.scheduled_at = time.time() + delay
# Re-enqueue for retry
await self._enqueue_delayed(task)
logging.info(f"Task {task_id} scheduled for retry {task.retry_count}/{task.max_retries} in {delay}s")
else:
# Move to dead letter queue
task.status = TaskStatus.FAILED
dead_letter_data = {
'task': task.to_dict(),
'error': error,
'failed_at': time.time()
}
await self.redis.lpush(
self.dead_letter_queue,
json.dumps(dead_letter_data)
)
logging.error(f"Task {task_id} moved to dead letter queue: {error}")
# Update task status
await self._store_task(task)
return True
async def get_queue_stats(self) -> Dict[str, Any]:
"""Get comprehensive queue statistics"""
stats = {}
# Queue lengths
for priority, queue_name in self.queues.items():
stats[f"{priority.name.lower()}_queue"] = await self.redis.zcard(queue_name)
# Processing queue
stats['processing'] = await self.redis.hlen(self.processing_queue)
# Delayed queue
stats['delayed'] = await self.redis.zcard(self.delayed_queue)
# Dead letter queue
stats['dead_letter'] = await self.redis.llen(self.dead_letter_queue)
# Task status counts
all_tasks = await self.redis.hgetall(self.task_storage)
status_counts = {}
for task_data in all_tasks.values():
task = Task.from_dict(json.loads(task_data))
status = task.status.value
status_counts[status] = status_counts.get(status, 0) + 1
stats['task_status'] = status_counts
return stats
async def cleanup_stale_tasks(self):
"""Clean up stale processing tasks"""
current_time = time.time()
processing_tasks = await self.redis.hgetall(self.processing_queue)
for task_id, processing_data in processing_tasks.items():
data = json.loads(processing_data)
# Check if task has timed out
if current_time > data['timeout_at']:
await self.fail(
task_id,
f"Task timed out after {data['timeout_at'] - data['started_at']}s",
retry=True
)
logging.warning(f"Cleaned up stale task {task_id}")
class WorkerPool:
"""Pool of worker processes for task execution"""
def __init__(self, queue_manager: QueueManager, worker_count: int = 4):
self.queue_manager = queue_manager
self.worker_count = worker_count
self.workers = []
self.running = False
self.task_handlers = {}
def register_handler(self, task_type: str, handler: Callable):
"""Register handler for specific task type"""
self.task_handlers[task_type] = handler
async def start(self):
"""Start worker pool"""
self.running = True
# Start worker coroutines
for i in range(self.worker_count):
worker_id = f"worker-{i}"
worker_coro = self._worker_loop(worker_id)
task = asyncio.create_task(worker_coro)
self.workers.append(task)
logging.info(f"Started {self.worker_count} workers")
async def stop(self):
"""Stop worker pool gracefully"""
self.running = False
# Wait for workers to finish current tasks
await asyncio.gather(*self.workers, return_exceptions=True)
logging.info("Worker pool stopped")
async def _worker_loop(self, worker_id: str):
"""Main worker loop"""
while self.running:
try:
# Get next task
task = await self.queue_manager.dequeue(worker_id, timeout=5)
if task:
await self._process_task(task)
except Exception as e:
logging.error(f"Worker {worker_id} error: {e}")
await asyncio.sleep(1)
async def _process_task(self, task: Task):
"""Process individual task"""
try:
# Get handler for task type
handler = self.task_handlers.get(task.type)
if not handler:
raise Exception(f"No handler registered for task type: {task.type}")
# Execute task
result = await handler(task.payload)
# Acknowledge completion
await self.queue_manager.acknowledge(task.id, result)
logging.info(f"Task {task.id} completed successfully")
except Exception as e:
# Mark task as failed
await self.queue_manager.fail(task.id, str(e))
logging.error(f"Task {task.id} failed: {e}")
# Usage example
async def main():
redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
queue_manager = QueueManager(redis_client)
worker_pool = WorkerPool(queue_manager, worker_count=8)
# Register task handlers
async def handle_agent_command(payload: Dict) -> Dict:
# Process agent command
agent_id = payload['agent_id']
command = payload['command']
# Execute command logic
result = await execute_agent_command(agent_id, command)
return {'success': True, 'result': result}
async def handle_package_update(payload: Dict) -> Dict:
# Process package update
package_info = payload['package_info']
# Update package logic
result = await update_package(package_info)
return {'updated_packages': result}
worker_pool.register_handler('agent_command', handle_agent_command)
worker_pool.register_handler('package_update', handle_package_update)
# Start worker pool
await worker_pool.start()
# Example: Enqueue some tasks
await queue_manager.enqueue(Task(
id=None,
type='agent_command',
payload={'agent_id': 'agent-123', 'command': 'update_packages'},
priority=TaskPriority.HIGH
))
# Run for demonstration
await asyncio.sleep(60)
# Stop worker pool
await worker_pool.stop()
if __name__ == "__main__":
asyncio.run(main())
Message Serialization and Validation
Robust Message Handling
from pydantic import BaseModel, validator
from typing import Dict, List, Any
import gzip
import pickle
import json
class MessageSerializer:
"""Efficient message serialization with compression"""
@staticmethod
def serialize(data: Any, compress: bool = True) -> bytes:
"""Serialize data with optional compression"""
# Try JSON first (human readable)
try:
json_data = json.dumps(data, default=str)
serialized = json_data.encode('utf-8')
method = 'json'
except (TypeError, ValueError):
# Fallback to pickle for complex objects
serialized = pickle.dumps(data)
method = 'pickle'
# Add compression if data is large
if compress and len(serialized) > 1024: # 1KB threshold
serialized = gzip.compress(serialized)
method += '_gzip'
# Prepend method identifier
return method.encode('utf-8') + b':' + serialized
@staticmethod
def deserialize(data: bytes) -> Any:
"""Deserialize data with automatic method detection"""
# Extract method
separator_idx = data.find(b':')
if separator_idx == -1:
raise ValueError("Invalid serialized data format")
method = data[:separator_idx].decode('utf-8')
payload = data[separator_idx + 1:]
# Handle compression
if method.endswith('_gzip'):
payload = gzip.decompress(payload)
method = method[:-5] # Remove _gzip suffix
# Deserialize based on method
if method == 'json':
return json.loads(payload.decode('utf-8'))
elif method == 'pickle':
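# Security note: pickle.loads can execute arbitrary code embedded in the payload, so only deserialize messages produced by trusted SysManage components, never raw data from agents or external systems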
return pickle.loads(payload)
else:
raise ValueError(f"Unknown serialization method: {method}")
class AgentCommandPayload(BaseModel):
"""Validated payload for agent commands"""
agent_id: str
command: str
parameters: Dict[str, Any] = {}
timeout: int = 300
priority: str = "normal"
@validator('agent_id')
def validate_agent_id(cls, v):
if not v or len(v) < 8:
raise ValueError('Agent ID must be at least 8 characters')
return v
@validator('command')
def validate_command(cls, v):
allowed_commands = [
'update_packages', 'install_package', 'remove_package',
'collect_metrics', 'restart_service', 'run_script'
]
if v not in allowed_commands:
raise ValueError(f'Command must be one of: {allowed_commands}')
return v
@validator('priority')
def validate_priority(cls, v):
allowed_priorities = ['low', 'normal', 'high', 'critical']
if v not in allowed_priorities:
raise ValueError(f'Priority must be one of: {allowed_priorities}')
return v
class PackageUpdatePayload(BaseModel):
"""Validated payload for package updates"""
agent_id: str
packages: List[Dict[str, str]]
update_type: str = "security"
reboot_required: bool = False
@validator('packages')
def validate_packages(cls, v):
for package in v:
required_fields = ['name', 'current_version', 'target_version']
if not all(field in package for field in required_fields):
raise ValueError(f'Package must have fields: {required_fields}')
return v
@validator('update_type')
def validate_update_type(cls, v):
allowed_types = ['security', 'feature', 'bugfix', 'all']
if v not in allowed_types:
raise ValueError(f'Update type must be one of: {allowed_types}')
return v
class MessageValidator:
"""Message validation and sanitization"""
payload_schemas = {
'agent_command': AgentCommandPayload,
'package_update': PackageUpdatePayload
}
@classmethod
def validate_task(cls, task: Task) -> Task:
"""Validate and sanitize task payload"""
if task.type in cls.payload_schemas:
schema = cls.payload_schemas[task.type]
try:
# Validate payload against schema
validated_payload = schema(**task.payload)
task.payload = validated_payload.dict()
except Exception as e:
raise ValueError(f"Invalid payload for {task.type}: {e}")
return task
Advanced Task Scheduling
Cron-Style Scheduling
Periodic Task Scheduler
import croniter
from datetime import datetime, timedelta
from typing import List, Dict, Callable, Optional
@dataclass
class ScheduledTask:
name: str
task_type: str
payload: Dict[str, Any]
cron_expression: str
priority: TaskPriority = TaskPriority.NORMAL
enabled: bool = True
last_run: Optional[datetime] = None
next_run: Optional[datetime] = None
max_runtime: int = 3600 # 1 hour
overlap_strategy: str = "skip" # skip, queue, or replace
def __post_init__(self):
if self.next_run is None:
self.calculate_next_run()
def calculate_next_run(self):
"""Calculate next run time based on cron expression"""
cron = croniter.croniter(self.cron_expression, datetime.utcnow())
self.next_run = cron.get_next(datetime)
def should_run(self) -> bool:
"""Check if task should run now"""
if not self.enabled:
return False
return datetime.utcnow() >= self.next_run
class TaskScheduler:
"""Advanced task scheduler with cron support"""
def __init__(self, queue_manager: QueueManager):
self.queue_manager = queue_manager
self.scheduled_tasks: Dict[str, ScheduledTask] = {}
self.running_tasks: Dict[str, datetime] = {}
self.scheduler_running = False
def register_scheduled_task(self, scheduled_task: ScheduledTask):
"""Register a new scheduled task"""
self.scheduled_tasks[scheduled_task.name] = scheduled_task
def unregister_scheduled_task(self, task_name: str):
"""Unregister a scheduled task"""
self.scheduled_tasks.pop(task_name, None)
async def start_scheduler(self):
"""Start the task scheduler"""
self.scheduler_running = True
while self.scheduler_running:
try:
await self._process_scheduled_tasks()
await asyncio.sleep(60) # Check every minute
except Exception as e:
logging.error(f"Scheduler error: {e}")
await asyncio.sleep(60)
async def stop_scheduler(self):
"""Stop the task scheduler"""
self.scheduler_running = False
async def _process_scheduled_tasks(self):
"""Process scheduled tasks that are ready to run"""
current_time = datetime.utcnow()
for task_name, scheduled_task in self.scheduled_tasks.items():
if scheduled_task.should_run():
await self._execute_scheduled_task(scheduled_task)
async def _execute_scheduled_task(self, scheduled_task: ScheduledTask):
"""Execute a scheduled task"""
# Check overlap strategy
if scheduled_task.name in self.running_tasks:
if scheduled_task.overlap_strategy == "skip":
logging.info(f"Skipping overlapping task: {scheduled_task.name}")
return
elif scheduled_task.overlap_strategy == "replace":
# Could implement task cancellation here
pass
# Create task
task = Task(
id=f"scheduled_{scheduled_task.name}_{int(time.time())}",
type=scheduled_task.task_type,
payload=scheduled_task.payload.copy(),
priority=scheduled_task.priority,
timeout=scheduled_task.max_runtime
)
# Enqueue task
await self.queue_manager.enqueue(task)
# Update scheduled task
scheduled_task.last_run = datetime.utcnow()
scheduled_task.calculate_next_run()
# Track running task
self.running_tasks[scheduled_task.name] = datetime.utcnow()
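# Note: entries in running_tasks should be removed once the enqueued task completes (e.g. from a completion callback); that bookkeeping is omitted from this sketch, so the "skip" strategy as written would skip every run after the first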
logging.info(f"Scheduled task '{scheduled_task.name}' enqueued. Next run: {scheduled_task.next_run}")
# Predefined scheduled tasks
def create_system_maintenance_tasks() -> List[ScheduledTask]:
"""Create common system maintenance tasks"""
return [
ScheduledTask(
name="daily_cleanup",
task_type="system_cleanup",
payload={"cleanup_type": "logs", "retention_days": 30},
cron_expression="0 2 * * *", # Daily at 2 AM
priority=TaskPriority.LOW
),
ScheduledTask(
name="hourly_metrics_aggregation",
task_type="aggregate_metrics",
payload={"time_window": "1h"},
cron_expression="0 * * * *", # Every hour
priority=TaskPriority.NORMAL
),
ScheduledTask(
name="weekly_package_scan",
task_type="scan_packages",
payload={"scan_type": "security"},
cron_expression="0 3 * * 0", # Weekly on Sunday at 3 AM
priority=TaskPriority.HIGH
),
ScheduledTask(
name="monthly_reports",
task_type="generate_reports",
payload={"report_types": ["usage", "security", "performance"]},
cron_expression="0 4 1 * *", # Monthly on 1st at 4 AM
priority=TaskPriority.NORMAL
)
]
class DynamicTaskScheduler:
"""Dynamic task scheduler that can be configured at runtime"""
def __init__(self, queue_manager: QueueManager, db: Database):
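# db is assumed to be an async database handle exposing fetch_all()/execute() with $n placeholders (asyncpg-style); substitute your own data-access layer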
self.queue_manager = queue_manager
self.db = db
self.base_scheduler = TaskScheduler(queue_manager)
async def load_scheduled_tasks_from_db(self):
"""Load scheduled tasks from database"""
tasks = await self.db.fetch_all("""
SELECT name, task_type, payload, cron_expression, priority,
enabled, max_runtime, overlap_strategy
FROM scheduled_tasks
WHERE enabled = true
""")
for task_row in tasks:
scheduled_task = ScheduledTask(
name=task_row['name'],
task_type=task_row['task_type'],
payload=json.loads(task_row['payload']),
cron_expression=task_row['cron_expression'],
priority=TaskPriority(task_row['priority']),
enabled=task_row['enabled'],
max_runtime=task_row['max_runtime'],
overlap_strategy=task_row['overlap_strategy']
)
self.base_scheduler.register_scheduled_task(scheduled_task)
async def add_scheduled_task(self, scheduled_task: ScheduledTask):
"""Add scheduled task and persist to database"""
# Save to database
await self.db.execute("""
INSERT INTO scheduled_tasks
(name, task_type, payload, cron_expression, priority,
enabled, max_runtime, overlap_strategy)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (name) DO UPDATE SET
task_type = EXCLUDED.task_type,
payload = EXCLUDED.payload,
cron_expression = EXCLUDED.cron_expression,
priority = EXCLUDED.priority,
enabled = EXCLUDED.enabled,
max_runtime = EXCLUDED.max_runtime,
overlap_strategy = EXCLUDED.overlap_strategy
""",
scheduled_task.name,
scheduled_task.task_type,
json.dumps(scheduled_task.payload),
scheduled_task.cron_expression,
scheduled_task.priority.value,
scheduled_task.enabled,
scheduled_task.max_runtime,
scheduled_task.overlap_strategy
)
# Register with scheduler
self.base_scheduler.register_scheduled_task(scheduled_task)
async def remove_scheduled_task(self, task_name: str):
"""Remove scheduled task"""
# Remove from database
await self.db.execute("""
DELETE FROM scheduled_tasks WHERE name = $1
""", task_name)
# Unregister from scheduler
self.base_scheduler.unregister_scheduled_task(task_name)
Dynamic Priority Adjustment
Intelligent Priority Management
class PriorityManager:
"""Dynamic priority adjustment based on system conditions"""
def __init__(self, queue_manager: QueueManager):
self.queue_manager = queue_manager
self.priority_rules = []
self.system_metrics = {}
def add_priority_rule(self, rule: Callable[[Task, Dict], TaskPriority]):
"""Add priority adjustment rule"""
self.priority_rules.append(rule)
async def update_system_metrics(self):
"""Update system metrics for priority decisions"""
stats = await self.queue_manager.get_queue_stats()
self.system_metrics = {
'queue_lengths': {
'critical': stats.get('critical_queue', 0),
'high': stats.get('high_queue', 0),
'normal': stats.get('normal_queue', 0),
'low': stats.get('low_queue', 0)
},
'processing_count': stats.get('processing', 0),
'system_load': await self._get_system_load(),
'error_rate': await self._get_error_rate()
}
async def adjust_task_priority(self, task: Task) -> TaskPriority:
"""Adjust task priority based on current system state"""
await self.update_system_metrics()
original_priority = task.priority
adjusted_priority = original_priority
# Apply priority rules
for rule in self.priority_rules:
try:
new_priority = rule(task, self.system_metrics)
if new_priority != original_priority:
adjusted_priority = new_priority
break
except Exception as e:
logging.warning(f"Priority rule failed: {e}")
return adjusted_priority
async def _get_system_load(self) -> float:
"""Get current system load"""
# Could integrate with system monitoring
return 0.5
async def _get_error_rate(self) -> float:
"""Get current error rate"""
# Could calculate from recent task failures
return 0.01
# Priority adjustment rules
def high_load_priority_rule(task: Task, metrics: Dict) -> TaskPriority:
"""Boost agent communication tasks during high load"""
if metrics['system_load'] > 0.8:
if task.type in ['agent_command', 'agent_response']:
# Boost agent tasks during high load
if task.priority == TaskPriority.NORMAL:
return TaskPriority.HIGH
elif task.priority == TaskPriority.LOW:
return TaskPriority.NORMAL
return task.priority
def queue_congestion_rule(task: Task, metrics: Dict) -> TaskPriority:
"""Adjust priority based on queue congestion"""
queue_lengths = metrics['queue_lengths']
# If high priority queue is congested, demote some tasks
if queue_lengths['high'] > 100:
if task.type in ['report_generation', 'cleanup_task']:
if task.priority == TaskPriority.HIGH:
return TaskPriority.NORMAL
# If system is idle, promote some tasks
total_queued = sum(queue_lengths.values())
if total_queued < 10:
if task.priority == TaskPriority.LOW:
return TaskPriority.NORMAL
return task.priority
def error_rate_priority_rule(task: Task, metrics: Dict) -> TaskPriority:
"""Adjust priority based on error rates"""
if metrics['error_rate'] > 0.05: # 5% error rate
# Prioritize diagnostic tasks
if task.type in ['health_check', 'system_diagnostic']:
return TaskPriority.CRITICAL
# Deprioritize non-essential tasks
if task.type in ['report_generation', 'analytics']:
if task.priority == TaskPriority.NORMAL:
return TaskPriority.LOW
return task.priority
Queue Monitoring and Alerting
Comprehensive Queue Monitoring
Queue Health Monitoring System
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
import time
@dataclass
class QueueHealth:
queue_name: str
length: int
processing_rate: float # tasks per second
average_wait_time: float # seconds
error_rate: float # fraction of failed tasks (0.05 = 5%)
oldest_task_age: float # seconds
status: str # healthy, warning, critical
@dataclass
class QueueAlert:
queue_name: str
alert_type: str
severity: str
message: str
value: float
threshold: float
timestamp: float
class QueueMonitor:
"""Comprehensive queue monitoring and alerting"""
def __init__(self, queue_manager: QueueManager):
self.queue_manager = queue_manager
self.metrics_history = {}
self.alert_thresholds = {
'queue_length': {'warning': 100, 'critical': 500},
'wait_time': {'warning': 300, 'critical': 900}, # seconds
'error_rate': {'warning': 0.05, 'critical': 0.15}, # fraction of failed tasks
'processing_rate': {'warning': 0.1, 'critical': 0.01}, # tasks/sec
'oldest_task_age': {'warning': 600, 'critical': 1800} # seconds
}
async def collect_queue_metrics(self) -> Dict[str, QueueHealth]:
"""Collect comprehensive queue health metrics"""
stats = await self.queue_manager.get_queue_stats()
queue_health = {}
for priority in TaskPriority:
queue_name = priority.name.lower()
health = await self._analyze_queue_health(queue_name, stats)
queue_health[queue_name] = health
# Special queues
for queue_name in ['processing', 'delayed', 'dead_letter']:
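# _analyze_special_queue_health (not shown here) is assumed to build a QueueHealth record from the raw counters for the processing, delayed, and dead letter queues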
health = await self._analyze_special_queue_health(queue_name, stats)
queue_health[queue_name] = health
return queue_health
async def _analyze_queue_health(self, queue_name: str, stats: Dict) -> QueueHealth:
"""Analyze health of a specific queue"""
queue_length = stats.get(f'{queue_name}_queue', 0)
# Calculate processing rate
processing_rate = await self._calculate_processing_rate(queue_name)
# Calculate average wait time
wait_time = await self._calculate_average_wait_time(queue_name)
# Calculate error rate
error_rate = await self._calculate_error_rate(queue_name)
# Get oldest task age
oldest_task_age = await self._get_oldest_task_age(queue_name)
# Determine overall status
status = self._determine_queue_status(
queue_length, processing_rate, wait_time, error_rate, oldest_task_age
)
return QueueHealth(
queue_name=queue_name,
length=queue_length,
processing_rate=processing_rate,
average_wait_time=wait_time,
error_rate=error_rate,
oldest_task_age=oldest_task_age,
status=status
)
async def _calculate_processing_rate(self, queue_name: str) -> float:
"""Calculate processing rate (tasks per second)"""
# Get metrics from last 5 minutes
current_time = time.time()
start_time = current_time - 300
if queue_name not in self.metrics_history:
self.metrics_history[queue_name] = []
# Add current measurement
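# _get_current_queue_length (not shown) is assumed to read the live queue depth, e.g. via ZCARD on the queue's sorted-set key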
current_length = await self._get_current_queue_length(queue_name)
self.metrics_history[queue_name].append((current_time, current_length))
# Clean old measurements
self.metrics_history[queue_name] = [
(t, length) for t, length in self.metrics_history[queue_name]
if t > start_time
]
# Calculate processing rate
measurements = self.metrics_history[queue_name]
if len(measurements) < 2:
return 0.0
# Estimate the processing rate from the net change in queue length over time
time_diffs = []
length_diffs = []
for i in range(1, len(measurements)):
prev_time, prev_length = measurements[i-1]
curr_time, curr_length = measurements[i]
time_diff = curr_time - prev_time
length_diff = prev_length - curr_length # Positive means processing
if time_diff > 0:
time_diffs.append(time_diff)
length_diffs.append(length_diff)
if not time_diffs:
return 0.0
# Average processing rate
total_processed = sum(length_diffs)
total_time = sum(time_diffs)
return max(0, total_processed / total_time)
async def _calculate_average_wait_time(self, queue_name: str) -> float:
"""Calculate average wait time for tasks in queue"""
# This would require tracking task creation times
# For now, estimate based on queue length and processing rate
queue_length = await self._get_current_queue_length(queue_name)
processing_rate = await self._calculate_processing_rate(queue_name)
if processing_rate <= 0:
return float('inf') if queue_length > 0 else 0
return queue_length / processing_rate
async def _calculate_error_rate(self, queue_name: str) -> float:
"""Calculate error rate for queue"""
# Get task status distribution
total_tasks = await self._get_total_tasks_processed(queue_name)
failed_tasks = await self._get_failed_tasks_count(queue_name)
if total_tasks == 0:
return 0.0
return failed_tasks / total_tasks
async def _get_oldest_task_age(self, queue_name: str) -> float:
"""Get age of oldest task in queue"""
# This would require tracking task creation times in queue
# For now, return 0 as placeholder
return 0.0
def _determine_queue_status(self, length: int, rate: float, wait_time: float,
error_rate: float, oldest_age: float) -> str:
"""Determine overall queue status"""
critical_conditions = [
length >= self.alert_thresholds['queue_length']['critical'],
wait_time >= self.alert_thresholds['wait_time']['critical'],
error_rate >= self.alert_thresholds['error_rate']['critical'],
rate <= self.alert_thresholds['processing_rate']['critical'],
oldest_age >= self.alert_thresholds['oldest_task_age']['critical']
]
warning_conditions = [
length >= self.alert_thresholds['queue_length']['warning'],
wait_time >= self.alert_thresholds['wait_time']['warning'],
error_rate >= self.alert_thresholds['error_rate']['warning'],
rate <= self.alert_thresholds['processing_rate']['warning'],
oldest_age >= self.alert_thresholds['oldest_task_age']['warning']
]
if any(critical_conditions):
return 'critical'
elif any(warning_conditions):
return 'warning'
else:
return 'healthy'
async def check_alerts(self) -> List[QueueAlert]:
"""Check for alert conditions and generate alerts"""
queue_health = await self.collect_queue_metrics()
alerts = []
for queue_name, health in queue_health.items():
# Check each metric against thresholds
alerts.extend(self._check_metric_alerts(health))
return alerts
def _check_metric_alerts(self, health: QueueHealth) -> List[QueueAlert]:
"""Check specific metrics for alert conditions"""
alerts = []
current_time = time.time()
# Queue length alerts
if health.length >= self.alert_thresholds['queue_length']['critical']:
alerts.append(QueueAlert(
queue_name=health.queue_name,
alert_type='queue_length',
severity='critical',
message=f'Queue length critically high: {health.length}',
value=health.length,
threshold=self.alert_thresholds['queue_length']['critical'],
timestamp=current_time
))
elif health.length >= self.alert_thresholds['queue_length']['warning']:
alerts.append(QueueAlert(
queue_name=health.queue_name,
alert_type='queue_length',
severity='warning',
message=f'Queue length high: {health.length}',
value=health.length,
threshold=self.alert_thresholds['queue_length']['warning'],
timestamp=current_time
))
# Error rate alerts
if health.error_rate >= self.alert_thresholds['error_rate']['critical']:
alerts.append(QueueAlert(
queue_name=health.queue_name,
alert_type='error_rate',
severity='critical',
message=f'Error rate critically high: {health.error_rate:.2%}',
value=health.error_rate,
threshold=self.alert_thresholds['error_rate']['critical'],
timestamp=current_time
))
# Processing rate alerts
if health.processing_rate <= self.alert_thresholds['processing_rate']['critical']:
alerts.append(QueueAlert(
queue_name=health.queue_name,
alert_type='processing_rate',
severity='critical',
message=f'Processing rate critically low: {health.processing_rate:.2f} tasks/sec',
value=health.processing_rate,
threshold=self.alert_thresholds['processing_rate']['critical'],
timestamp=current_time
))
return alerts
class QueueDashboard:
"""Real-time queue dashboard metrics"""
def __init__(self, queue_monitor: QueueMonitor):
self.queue_monitor = queue_monitor
async def get_dashboard_data(self) -> Dict[str, Any]:
"""Get comprehensive dashboard data"""
queue_health = await self.queue_monitor.collect_queue_metrics()
alerts = await self.queue_monitor.check_alerts()
# Calculate summary metrics
total_queued = sum(h.length for h in queue_health.values() if h.queue_name != 'processing')
total_processing = queue_health.get('processing', QueueHealth('processing', 0, 0, 0, 0, 0, 'healthy')).length
avg_processing_rate = sum(h.processing_rate for h in queue_health.values()) / len(queue_health)
# Alert summary
critical_alerts = [a for a in alerts if a.severity == 'critical']
warning_alerts = [a for a in alerts if a.severity == 'warning']
return {
'summary': {
'total_queued': total_queued,
'total_processing': total_processing,
'avg_processing_rate': avg_processing_rate,
'critical_alerts': len(critical_alerts),
'warning_alerts': len(warning_alerts)
},
'queue_health': {name: health for name, health in queue_health.items()},
'alerts': alerts,
'timestamp': time.time()
}
Queue Management Best Practices
Design Guidelines
- Idempotency: Design tasks so they can be retried safely (see the sketch after this list)
- Small Payloads: Keep task payloads minimal; reference large data rather than embedding it
- Timeout Handling: Implement proper timeouts and clean up tasks that exceed them
- Error Handling: Catch and log failures so tasks are retried or dead-lettered cleanly
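Because the retry and timeout logic above can deliver the same task more than once, a handler should check whether its work has already been applied before repeating it. A minimal sketch, using illustrative stand-in helpers rather than SysManage APIs:
# Idempotent handler sketch: running it twice for the same payload is harmless.
# The helpers below are illustrative stand-ins, not SysManage APIs.
_installed: dict = {}  # package name -> installed version

async def already_installed(name: str, version: str) -> bool:
    return _installed.get(name) == version

async def install(name: str, version: str) -> None:
    _installed[name] = version  # real code would invoke the package manager

async def handle_install_package(payload: dict) -> dict:
    name, version = payload["package_name"], payload["target_version"]
    # Re-delivery guard: if a previous attempt already completed,
    # report success instead of repeating the side effect.
    if await already_installed(name, version):
        return {"success": True, "skipped": "already installed"}
    await install(name, version)
    return {"success": True, "skipped": None}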
Operational Guidelines
- Monitor Queue Health: Continuous monitoring of queue metrics
- Capacity Planning: Plan for peak load scenarios
- Dead Letter Analysis: Regular analysis of failed tasks
- Worker Scaling: Dynamic worker scaling based on queue load, as sketched below
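One way to drive that scaling is to poll QueueManager.get_queue_stats() and resize the pool when the backlog grows. The loop below is a rough sketch; the thresholds, poll interval, and restart-based resize are illustrative choices, not tuned recommendations.
import asyncio

# Illustrative autoscaling loop: resize the worker pool based on total backlog.
async def autoscale_workers(queue_manager, worker_pool,
                            min_workers: int = 2, max_workers: int = 32,
                            tasks_per_worker: int = 25, interval: int = 30):
    while True:
        stats = await queue_manager.get_queue_stats()
        backlog = sum(stats.get(key, 0) for key in
                      ("critical_queue", "high_queue", "normal_queue", "low_queue"))
        desired = max(min_workers, min(max_workers, backlog // tasks_per_worker + 1))
        if desired != worker_pool.worker_count:
            # Simplest possible resize: restart the pool with the new count.
            # A production version would add or drain workers incrementally.
            await worker_pool.stop()
            worker_pool.worker_count = desired
            await worker_pool.start()
        await asyncio.sleep(interval)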
Performance Guidelines
- Batch Processing: Group related tasks into a single payload for efficiency (see the sketch after this list)
- Priority Tuning: Regular adjustment of priority rules
- Resource Management: Proper resource cleanup in workers
- Load Distribution: Even distribution across worker pools
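For example, many per-package update requests destined for the same agent can be collapsed into one task payload so a worker makes a single pass per host. A small, self-contained sketch (the payload shape is illustrative):
from collections import defaultdict

# Collapse per-package update requests into one payload per agent.
def batch_package_updates(updates: list) -> list:
    """updates: [{'agent_id': ..., 'package': {...}}, ...] -> one dict per agent."""
    grouped = defaultdict(list)
    for update in updates:
        grouped[update["agent_id"]].append(update["package"])
    return [{"agent_id": agent_id, "packages": packages}
            for agent_id, packages in grouped.items()]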