Queue Management

Message queues, async processing, task scheduling, reliability patterns, and queue management strategies in SysManage.

Queue Management Overview

SysManage implements a queue management system to handle asynchronous task processing, inter-service communication, and reliable message delivery across its distributed architecture.

Core Principles

  • Reliability: Guaranteed message delivery with acknowledgments
  • Scalability: Horizontal scaling of queue workers and processors
  • Priority Management: Priority-based task execution
  • Dead Letter Handling: Failed messages are captured in a dead letter queue for analysis and replay

Queue Architecture

Multi-Queue Processing System

┌─────────────────────────────────────────────────────────────────┐
│                    Queue Management Layer                      │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Task        │  │ Priority     │  │ Dead Letter             │ │
│  │ Dispatcher  │  │ Manager      │  │ Queue Handler           │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Queue       │  │ Worker Pool  │  │ Result                  │ │
│  │ Router      │  │ Manager      │  │ Aggregator              │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
                          │    ▲
                 Messages │    │ Results
                          ▼    │
┌─────────────────────────────────────────────────────────────────┐
│                      Redis Queue System                        │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ High        │  │ Normal       │  │ Low Priority            │ │
│  │ Priority    │  │ Priority     │  │ Queue                   │ │
│  │ Queue       │  │ Queue        │  │                         │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Agent       │  │ System       │  │ Maintenance             │ │
│  │ Tasks       │  │ Tasks        │  │ Tasks                   │ │
│  │ Queue       │  │ Queue        │  │ Queue                   │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Dead Letter │  │ Delayed      │  │ Result                  │ │
│  │ Queue       │  │ Queue        │  │ Storage                 │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
                          │    ▲
                 Process  │    │ Acknowledge
                          ▼    │
┌─────────────────────────────────────────────────────────────────┐
│                      Worker Processes                          │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Agent       │  │ Package      │  │ System                  │ │
│  │ Worker      │  │ Worker       │  │ Worker                  │ │
│  │ Pool        │  │ Pool         │  │ Pool                    │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
                        

Queue Types and Purposes

Agent Communication Queues

  • Agent command dispatch
  • Agent response collection
  • Real-time status updates
  • Metric data ingestion

Background Processing Queues

  • Package update processing
  • System health monitoring
  • Report generation
  • Data cleanup tasks

Integration Queues

  • External API calls
  • Webhook notifications
  • Email delivery
  • Audit log processing
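
As a rough illustration (the task type strings and Redis key names below are assumptions, not part of the SysManage codebase), a routing table can map these categories onto the named queues shown in the diagram above:

# Illustrative only: task-type-to-queue routing for the categories above.
TASK_QUEUE_ROUTING = {
    # Agent communication
    "agent_command":     "sysmanage:queue:agent_tasks",
    "agent_response":    "sysmanage:queue:agent_tasks",
    "metric_ingest":     "sysmanage:queue:agent_tasks",
    # Background processing
    "package_update":    "sysmanage:queue:system_tasks",
    "health_check":      "sysmanage:queue:system_tasks",
    "report_generation": "sysmanage:queue:maintenance_tasks",
    "data_cleanup":      "sysmanage:queue:maintenance_tasks",
    # Integrations
    "webhook_notify":    "sysmanage:queue:integration",
    "email_delivery":    "sysmanage:queue:integration",
    "audit_log":         "sysmanage:queue:integration",
}

def route_task(task_type: str, default: str = "sysmanage:queue:normal") -> str:
    """Return the queue a task type should be published to."""
    return TASK_QUEUE_ROUTING.get(task_type, default)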

Redis Queue Implementation

Core Queue Manager

Advanced Queue Management System

import redis.asyncio as redis  # async client; all queue operations below are awaited
import json
import time
import uuid
from typing import Dict, List, Optional, Any, Callable
from dataclasses import dataclass, asdict
from enum import Enum
import asyncio
import logging

class TaskPriority(Enum):
    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4

class TaskStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
    RETRYING = "retrying"

@dataclass
class Task:
    id: str
    type: str
    payload: Dict[str, Any]
    priority: TaskPriority
    max_retries: int = 3
    retry_count: int = 0
    created_at: Optional[float] = None
    scheduled_at: Optional[float] = None
    timeout: int = 300  # 5 minutes
    status: TaskStatus = TaskStatus.PENDING

    def __post_init__(self):
        if self.created_at is None:
            self.created_at = time.time()
        if self.scheduled_at is None:
            self.scheduled_at = self.created_at

    def to_dict(self) -> Dict:
        data = asdict(self)
        data['priority'] = self.priority.value
        data['status'] = self.status.value
        return data

    @classmethod
    def from_dict(cls, data: Dict) -> 'Task':
        data['priority'] = TaskPriority(data['priority'])
        data['status'] = TaskStatus(data['status'])
        return cls(**data)


class QueueManager:
    """Advanced Redis-based queue manager"""

    def __init__(self, redis_client: redis.Redis, namespace: str = "sysmanage"):
        self.redis = redis_client
        self.namespace = namespace
        self.queues = {
            TaskPriority.CRITICAL: f"{namespace}:queue:critical",
            TaskPriority.HIGH: f"{namespace}:queue:high",
            TaskPriority.NORMAL: f"{namespace}:queue:normal",
            TaskPriority.LOW: f"{namespace}:queue:low"
        }
        self.processing_queue = f"{namespace}:queue:processing"
        self.dead_letter_queue = f"{namespace}:queue:dead_letter"
        self.delayed_queue = f"{namespace}:queue:delayed"
        self.task_storage = f"{namespace}:tasks"

    async def enqueue(self, task: Task) -> str:
        """Enqueue task with priority"""

        # Generate task ID if not provided
        if not task.id:
            task.id = str(uuid.uuid4())

        # Store task metadata
        await self._store_task(task)

        # Handle delayed tasks
        if task.scheduled_at > time.time():
            await self._enqueue_delayed(task)
        else:
            await self._enqueue_immediate(task)

        return task.id

    async def _store_task(self, task: Task):
        """Store task metadata in Redis"""
        task_data = json.dumps(task.to_dict())
        await self.redis.hset(self.task_storage, task.id, task_data)

    async def _enqueue_immediate(self, task: Task):
        """Enqueue task for immediate processing"""
        queue_name = self.queues[task.priority]

        # Use priority score for sorting within queue
        score = time.time() - (task.priority.value * 1000000)
        await self.redis.zadd(queue_name, {task.id: score})

    async def _enqueue_delayed(self, task: Task):
        """Enqueue task for delayed processing"""
        await self.redis.zadd(self.delayed_queue, {task.id: task.scheduled_at})

    async def dequeue(self, worker_id: str, timeout: int = 10) -> Optional[Task]:
        """Dequeue highest priority task"""

        # Check for ready delayed tasks first
        await self._process_delayed_tasks()

        # BZPOPMIN pops from the first non-empty key it is given, so passing
        # the queues from critical to low yields priority ordering in a single
        # blocking call instead of blocking on each empty queue in turn.
        queue_names = [
            self.queues[priority]
            for priority in (TaskPriority.CRITICAL, TaskPriority.HIGH,
                             TaskPriority.NORMAL, TaskPriority.LOW)
        ]

        result = await self.redis.bzpopmin(queue_names, timeout=timeout)

        if result:
            queue, task_id, score = result
            task = await self._get_task(task_id)

            if task:
                # Move task to processing queue
                await self._mark_processing(task, worker_id)
                return task

        return None

    async def _process_delayed_tasks(self):
        """Move ready delayed tasks to appropriate queues"""
        current_time = time.time()

        # Get tasks ready for processing
        ready_tasks = await self.redis.zrangebyscore(
            self.delayed_queue, 0, current_time, withscores=True
        )

        for task_id, scheduled_time in ready_tasks:
            task = await self._get_task(task_id)
            if task:
                # Remove from delayed queue
                await self.redis.zrem(self.delayed_queue, task_id)

                # Add to appropriate priority queue
                await self._enqueue_immediate(task)

    async def _get_task(self, task_id: str) -> Optional[Task]:
        """Retrieve task metadata"""
        task_data = await self.redis.hget(self.task_storage, task_id)
        if task_data:
            data = json.loads(task_data)
            return Task.from_dict(data)
        return None

    async def _mark_processing(self, task: Task, worker_id: str):
        """Mark task as being processed"""
        task.status = TaskStatus.PROCESSING

        # Store in processing queue with worker info
        processing_data = {
            'task_id': task.id,
            'worker_id': worker_id,
            'started_at': time.time(),
            'timeout_at': time.time() + task.timeout
        }

        await self.redis.hset(
            self.processing_queue,
            task.id,
            json.dumps(processing_data)
        )

        # Update task metadata
        await self._store_task(task)

    async def acknowledge(self, task_id: str, result: Dict[str, Any] = None):
        """Acknowledge successful task completion"""

        task = await self._get_task(task_id)
        if not task:
            return False

        task.status = TaskStatus.COMPLETED

        # Remove from processing queue
        await self.redis.hdel(self.processing_queue, task_id)

        # Store result if provided
        if result:
            result_key = f"{self.namespace}:results:{task_id}"
            await self.redis.setex(
                result_key,
                3600,  # Keep results for 1 hour
                json.dumps(result)
            )

        # Update task status
        await self._store_task(task)

        return True

    async def fail(self, task_id: str, error: str, retry: bool = True):
        """Mark task as failed and optionally retry"""

        task = await self._get_task(task_id)
        if not task:
            return False

        # Remove from processing queue
        await self.redis.hdel(self.processing_queue, task_id)

        # Check if we should retry
        if retry and task.retry_count < task.max_retries:
            task.retry_count += 1
            task.status = TaskStatus.RETRYING

            # Calculate exponential backoff delay
            delay = min(300, 2 ** task.retry_count * 10)  # Max 5 minutes
            task.scheduled_at = time.time() + delay

            # Re-enqueue for retry
            await self._enqueue_delayed(task)

            logging.info(f"Task {task_id} scheduled for retry {task.retry_count}/{task.max_retries} in {delay}s")

        else:
            # Move to dead letter queue
            task.status = TaskStatus.FAILED

            dead_letter_data = {
                'task': task.to_dict(),
                'error': error,
                'failed_at': time.time()
            }

            await self.redis.lpush(
                self.dead_letter_queue,
                json.dumps(dead_letter_data)
            )

            logging.error(f"Task {task_id} moved to dead letter queue: {error}")

        # Update task status
        await self._store_task(task)

        return True

    async def get_queue_stats(self) -> Dict[str, Any]:
        """Get comprehensive queue statistics"""

        stats = {}

        # Queue lengths
        for priority, queue_name in self.queues.items():
            stats[f"{priority.name.lower()}_queue"] = await self.redis.zcard(queue_name)

        # Processing queue
        stats['processing'] = await self.redis.hlen(self.processing_queue)

        # Delayed queue
        stats['delayed'] = await self.redis.zcard(self.delayed_queue)

        # Dead letter queue
        stats['dead_letter'] = await self.redis.llen(self.dead_letter_queue)

        # Task status counts
        all_tasks = await self.redis.hgetall(self.task_storage)
        status_counts = {}

        for task_data in all_tasks.values():
            task = Task.from_dict(json.loads(task_data))
            status = task.status.value
            status_counts[status] = status_counts.get(status, 0) + 1

        stats['task_status'] = status_counts

        return stats

    async def cleanup_stale_tasks(self):
        """Clean up stale processing tasks"""

        current_time = time.time()
        processing_tasks = await self.redis.hgetall(self.processing_queue)

        for task_id, processing_data in processing_tasks.items():
            data = json.loads(processing_data)

            # Check if task has timed out
            if current_time > data['timeout_at']:
                await self.fail(
                    task_id,
                    f"Task timed out after {data['timeout_at'] - data['started_at']}s",
                    retry=True
                )

                logging.warning(f"Cleaned up stale task {task_id}")


class WorkerPool:
    """Pool of worker processes for task execution"""

    def __init__(self, queue_manager: QueueManager, worker_count: int = 4):
        self.queue_manager = queue_manager
        self.worker_count = worker_count
        self.workers = []
        self.running = False
        self.task_handlers = {}

    def register_handler(self, task_type: str, handler: Callable):
        """Register handler for specific task type"""
        self.task_handlers[task_type] = handler

    async def start(self):
        """Start worker pool"""
        self.running = True

        # Start worker coroutines
        for i in range(self.worker_count):
            worker_id = f"worker-{i}"
            worker_coro = self._worker_loop(worker_id)
            task = asyncio.create_task(worker_coro)
            self.workers.append(task)

        logging.info(f"Started {self.worker_count} workers")

    async def stop(self):
        """Stop worker pool gracefully"""
        self.running = False

        # Wait for workers to finish current tasks
        await asyncio.gather(*self.workers, return_exceptions=True)

        logging.info("Worker pool stopped")

    async def _worker_loop(self, worker_id: str):
        """Main worker loop"""

        while self.running:
            try:
                # Get next task
                task = await self.queue_manager.dequeue(worker_id, timeout=5)

                if task:
                    await self._process_task(task)

            except Exception as e:
                logging.error(f"Worker {worker_id} error: {e}")
                await asyncio.sleep(1)

    async def _process_task(self, task: Task):
        """Process individual task"""

        try:
            # Get handler for task type
            handler = self.task_handlers.get(task.type)

            if not handler:
                raise Exception(f"No handler registered for task type: {task.type}")

            # Execute task
            result = await handler(task.payload)

            # Acknowledge completion
            await self.queue_manager.acknowledge(task.id, result)

            logging.info(f"Task {task.id} completed successfully")

        except Exception as e:
            # Mark task as failed
            await self.queue_manager.fail(task.id, str(e))

            logging.error(f"Task {task.id} failed: {e}")


# Usage example
async def main():
    redis_client = redis.Redis(host='localhost', port=6379, decode_responses=True)
    queue_manager = QueueManager(redis_client)
    worker_pool = WorkerPool(queue_manager, worker_count=8)

    # Register task handlers
    async def handle_agent_command(payload: Dict) -> Dict:
        # Process agent command
        agent_id = payload['agent_id']
        command = payload['command']

        # Execute command logic
        result = await execute_agent_command(agent_id, command)

        return {'success': True, 'result': result}

    async def handle_package_update(payload: Dict) -> Dict:
        # Process package update
        package_info = payload['package_info']

        # Update package logic
        result = await update_package(package_info)

        return {'updated_packages': result}

    worker_pool.register_handler('agent_command', handle_agent_command)
    worker_pool.register_handler('package_update', handle_package_update)

    # Start worker pool
    await worker_pool.start()

    # Example: Enqueue some tasks
    await queue_manager.enqueue(Task(
        id=None,
        type='agent_command',
        payload={'agent_id': 'agent-123', 'command': 'update_packages'},
        priority=TaskPriority.HIGH
    ))

    # Run for demonstration
    await asyncio.sleep(60)

    # Stop worker pool
    await worker_pool.stop()

if __name__ == "__main__":
    asyncio.run(main())

Message Serialization and Validation

Robust Message Handling

from pydantic import BaseModel, validator
from typing import Any, Dict, List
import gzip
import pickle
import json

class MessageSerializer:
    """Efficient message serialization with compression"""

    @staticmethod
    def serialize(data: Any, compress: bool = True) -> bytes:
        """Serialize data with optional compression"""

        # Try JSON first (human readable)
        try:
            json_data = json.dumps(data, default=str)
            serialized = json_data.encode('utf-8')
            method = 'json'
        except (TypeError, ValueError):
            # Fallback to pickle for complex objects
            serialized = pickle.dumps(data)
            method = 'pickle'

        # Add compression if data is large
        if compress and len(serialized) > 1024:  # 1KB threshold
            serialized = gzip.compress(serialized)
            method += '_gzip'

        # Prepend method identifier
        return method.encode('utf-8') + b':' + serialized

    @staticmethod
    def deserialize(data: bytes) -> Any:
        """Deserialize data with automatic method detection"""

        # Extract method
        separator_idx = data.find(b':')
        if separator_idx == -1:
            raise ValueError("Invalid serialized data format")

        method = data[:separator_idx].decode('utf-8')
        payload = data[separator_idx + 1:]

        # Handle compression
        if method.endswith('_gzip'):
            payload = gzip.decompress(payload)
            method = method[:-5]  # Remove _gzip suffix

        # Deserialize based on method
        if method == 'json':
            return json.loads(payload.decode('utf-8'))
        elif method == 'pickle':
            return pickle.loads(payload)
        else:
            raise ValueError(f"Unknown serialization method: {method}")


class AgentCommandPayload(BaseModel):
    """Validated payload for agent commands"""

    agent_id: str
    command: str
    parameters: Dict[str, Any] = {}
    timeout: int = 300
    priority: str = "normal"

    @validator('agent_id')
    def validate_agent_id(cls, v):
        if not v or len(v) < 8:
            raise ValueError('Agent ID must be at least 8 characters')
        return v

    @validator('command')
    def validate_command(cls, v):
        allowed_commands = [
            'update_packages', 'install_package', 'remove_package',
            'collect_metrics', 'restart_service', 'run_script'
        ]
        if v not in allowed_commands:
            raise ValueError(f'Command must be one of: {allowed_commands}')
        return v

    @validator('priority')
    def validate_priority(cls, v):
        allowed_priorities = ['low', 'normal', 'high', 'critical']
        if v not in allowed_priorities:
            raise ValueError(f'Priority must be one of: {allowed_priorities}')
        return v


class PackageUpdatePayload(BaseModel):
    """Validated payload for package updates"""

    agent_id: str
    packages: List[Dict[str, str]]
    update_type: str = "security"
    reboot_required: bool = False

    @validator('packages')
    def validate_packages(cls, v):
        for package in v:
            required_fields = ['name', 'current_version', 'target_version']
            if not all(field in package for field in required_fields):
                raise ValueError(f'Package must have fields: {required_fields}')
        return v

    @validator('update_type')
    def validate_update_type(cls, v):
        allowed_types = ['security', 'feature', 'bugfix', 'all']
        if v not in allowed_types:
            raise ValueError(f'Update type must be one of: {allowed_types}')
        return v


class MessageValidator:
    """Message validation and sanitization"""

    payload_schemas = {
        'agent_command': AgentCommandPayload,
        'package_update': PackageUpdatePayload
    }

    @classmethod
    def validate_task(cls, task: Task) -> Task:
        """Validate and sanitize task payload"""

        if task.type in cls.payload_schemas:
            schema = cls.payload_schemas[task.type]

            try:
                # Validate payload against schema
                validated_payload = schema(**task.payload)
                task.payload = validated_payload.dict()
            except Exception as e:
                raise ValueError(f"Invalid payload for {task.type}: {e}")

        return task
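
A brief usage sketch tying the serializer and validator together, reusing the Task and TaskPriority types defined earlier on this page; the payload values are illustrative.

# Usage sketch: validate a task payload, then round-trip it over the wire format.
task = Task(
    id=str(uuid.uuid4()),
    type='agent_command',
    payload={'agent_id': 'agent-12345678', 'command': 'collect_metrics'},
    priority=TaskPriority.NORMAL
)

task = MessageValidator.validate_task(task)            # raises ValueError on bad payloads
wire_bytes = MessageSerializer.serialize(task.to_dict())
restored = Task.from_dict(MessageSerializer.deserialize(wire_bytes))
assert restored.id == task.id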

Advanced Task Scheduling

Cron-Style Scheduling

Periodic Task Scheduler

import croniter
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, List, Optional

@dataclass
class ScheduledTask:
    name: str
    task_type: str
    payload: Dict[str, Any]
    cron_expression: str
    priority: TaskPriority = TaskPriority.NORMAL
    enabled: bool = True
    last_run: Optional[datetime] = None
    next_run: Optional[datetime] = None
    max_runtime: int = 3600  # 1 hour
    overlap_strategy: str = "skip"  # skip, queue, or replace

    def __post_init__(self):
        if self.next_run is None:
            self.calculate_next_run()

    def calculate_next_run(self):
        """Calculate next run time based on cron expression"""
        cron = croniter.croniter(self.cron_expression, datetime.utcnow())
        self.next_run = cron.get_next(datetime)

    def should_run(self) -> bool:
        """Check if task should run now"""
        if not self.enabled:
            return False

        return datetime.utcnow() >= self.next_run


class TaskScheduler:
    """Advanced task scheduler with cron support"""

    def __init__(self, queue_manager: QueueManager):
        self.queue_manager = queue_manager
        self.scheduled_tasks: Dict[str, ScheduledTask] = {}
        self.running_tasks: Dict[str, datetime] = {}
        self.scheduler_running = False

    def register_scheduled_task(self, scheduled_task: ScheduledTask):
        """Register a new scheduled task"""
        self.scheduled_tasks[scheduled_task.name] = scheduled_task

    def unregister_scheduled_task(self, task_name: str):
        """Unregister a scheduled task"""
        self.scheduled_tasks.pop(task_name, None)

    async def start_scheduler(self):
        """Start the task scheduler"""
        self.scheduler_running = True

        while self.scheduler_running:
            try:
                await self._process_scheduled_tasks()
                await asyncio.sleep(60)  # Check every minute
            except Exception as e:
                logging.error(f"Scheduler error: {e}")
                await asyncio.sleep(60)

    async def stop_scheduler(self):
        """Stop the task scheduler"""
        self.scheduler_running = False

    async def _process_scheduled_tasks(self):
        """Process scheduled tasks that are ready to run"""

        current_time = datetime.utcnow()

        for task_name, scheduled_task in self.scheduled_tasks.items():
            if scheduled_task.should_run():
                await self._execute_scheduled_task(scheduled_task)

    async def _execute_scheduled_task(self, scheduled_task: ScheduledTask):
        """Execute a scheduled task"""

        # Check overlap strategy
        if scheduled_task.name in self.running_tasks:
            if scheduled_task.overlap_strategy == "skip":
                logging.info(f"Skipping overlapping task: {scheduled_task.name}")
                return
            elif scheduled_task.overlap_strategy == "replace":
                # Could implement task cancellation here
                pass

        # Create task
        task = Task(
            id=f"scheduled_{scheduled_task.name}_{int(time.time())}",
            type=scheduled_task.task_type,
            payload=scheduled_task.payload.copy(),
            priority=scheduled_task.priority,
            timeout=scheduled_task.max_runtime
        )

        # Enqueue task
        await self.queue_manager.enqueue(task)

        # Update scheduled task
        scheduled_task.last_run = datetime.utcnow()
        scheduled_task.calculate_next_run()

        # Track running task
        self.running_tasks[scheduled_task.name] = datetime.utcnow()

        logging.info(f"Scheduled task '{scheduled_task.name}' enqueued. Next run: {scheduled_task.next_run}")


# Predefined scheduled tasks
def create_system_maintenance_tasks() -> List[ScheduledTask]:
    """Create common system maintenance tasks"""

    return [
        ScheduledTask(
            name="daily_cleanup",
            task_type="system_cleanup",
            payload={"cleanup_type": "logs", "retention_days": 30},
            cron_expression="0 2 * * *",  # Daily at 2 AM
            priority=TaskPriority.LOW
        ),
        ScheduledTask(
            name="hourly_metrics_aggregation",
            task_type="aggregate_metrics",
            payload={"time_window": "1h"},
            cron_expression="0 * * * *",  # Every hour
            priority=TaskPriority.NORMAL
        ),
        ScheduledTask(
            name="weekly_package_scan",
            task_type="scan_packages",
            payload={"scan_type": "security"},
            cron_expression="0 3 * * 0",  # Weekly on Sunday at 3 AM
            priority=TaskPriority.HIGH
        ),
        ScheduledTask(
            name="monthly_reports",
            task_type="generate_reports",
            payload={"report_types": ["usage", "security", "performance"]},
            cron_expression="0 4 1 * *",  # Monthly on 1st at 4 AM
            priority=TaskPriority.NORMAL
        )
    ]


class DynamicTaskScheduler:
    """Dynamic task scheduler that can be configured at runtime"""

    def __init__(self, queue_manager: QueueManager, db: "Database"):
        # "Database" is an application-provided async DB handle (fetch_all/execute), not defined here
        self.queue_manager = queue_manager
        self.db = db
        self.base_scheduler = TaskScheduler(queue_manager)

    async def load_scheduled_tasks_from_db(self):
        """Load scheduled tasks from database"""

        tasks = await self.db.fetch_all("""
            SELECT name, task_type, payload, cron_expression, priority,
                   enabled, max_runtime, overlap_strategy
            FROM scheduled_tasks
            WHERE enabled = true
        """)

        for task_row in tasks:
            scheduled_task = ScheduledTask(
                name=task_row['name'],
                task_type=task_row['task_type'],
                payload=json.loads(task_row['payload']),
                cron_expression=task_row['cron_expression'],
                priority=TaskPriority(task_row['priority']),
                enabled=task_row['enabled'],
                max_runtime=task_row['max_runtime'],
                overlap_strategy=task_row['overlap_strategy']
            )

            self.base_scheduler.register_scheduled_task(scheduled_task)

    async def add_scheduled_task(self, scheduled_task: ScheduledTask):
        """Add scheduled task and persist to database"""

        # Save to database
        await self.db.execute("""
            INSERT INTO scheduled_tasks
            (name, task_type, payload, cron_expression, priority,
             enabled, max_runtime, overlap_strategy)
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
            ON CONFLICT (name) DO UPDATE SET
                task_type = EXCLUDED.task_type,
                payload = EXCLUDED.payload,
                cron_expression = EXCLUDED.cron_expression,
                priority = EXCLUDED.priority,
                enabled = EXCLUDED.enabled,
                max_runtime = EXCLUDED.max_runtime,
                overlap_strategy = EXCLUDED.overlap_strategy
        """,
            scheduled_task.name,
            scheduled_task.task_type,
            json.dumps(scheduled_task.payload),
            scheduled_task.cron_expression,
            scheduled_task.priority.value,
            scheduled_task.enabled,
            scheduled_task.max_runtime,
            scheduled_task.overlap_strategy
        )

        # Register with scheduler
        self.base_scheduler.register_scheduled_task(scheduled_task)

    async def remove_scheduled_task(self, task_name: str):
        """Remove scheduled task"""

        # Remove from database
        await self.db.execute("""
            DELETE FROM scheduled_tasks WHERE name = $1
        """, task_name)

        # Unregister from scheduler
        self.base_scheduler.unregister_scheduled_task(task_name)
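
A short sketch showing how the predefined maintenance tasks above might be registered and the scheduler started; it assumes a QueueManager instance from earlier on this page.

# Usage sketch: register the predefined maintenance tasks and run the scheduler.
async def run_maintenance_scheduler(queue_manager: QueueManager):
    scheduler = TaskScheduler(queue_manager)

    for scheduled_task in create_system_maintenance_tasks():
        scheduler.register_scheduled_task(scheduled_task)

    # Blocks until stop_scheduler() is called from elsewhere.
    await scheduler.start_scheduler()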

Dynamic Priority Adjustment

Intelligent Priority Management

class PriorityManager:
    """Dynamic priority adjustment based on system conditions"""

    def __init__(self, queue_manager: QueueManager):
        self.queue_manager = queue_manager
        self.priority_rules = []
        self.system_metrics = {}

    def add_priority_rule(self, rule: Callable[[Task, Dict], TaskPriority]):
        """Add priority adjustment rule"""
        self.priority_rules.append(rule)

    async def update_system_metrics(self):
        """Update system metrics for priority decisions"""
        stats = await self.queue_manager.get_queue_stats()

        self.system_metrics = {
            'queue_lengths': {
                'critical': stats.get('critical_queue', 0),
                'high': stats.get('high_queue', 0),
                'normal': stats.get('normal_queue', 0),
                'low': stats.get('low_queue', 0)
            },
            'processing_count': stats.get('processing', 0),
            'system_load': await self._get_system_load(),
            'error_rate': await self._get_error_rate()
        }

    async def adjust_task_priority(self, task: Task) -> TaskPriority:
        """Adjust task priority based on current system state"""

        await self.update_system_metrics()

        original_priority = task.priority
        adjusted_priority = original_priority

        # Apply priority rules
        for rule in self.priority_rules:
            try:
                new_priority = rule(task, self.system_metrics)
                if new_priority != original_priority:
                    adjusted_priority = new_priority
                    break
            except Exception as e:
                logging.warning(f"Priority rule failed: {e}")

        return adjusted_priority

    async def _get_system_load(self) -> float:
        """Get current system load"""
        # Could integrate with system monitoring
        return 0.5

    async def _get_error_rate(self) -> float:
        """Get current error rate"""
        # Could calculate from recent task failures
        return 0.01


# Priority adjustment rules
def high_load_priority_rule(task: Task, metrics: Dict) -> TaskPriority:
    """Boost agent communication tasks during high load"""

    if metrics['system_load'] > 0.8:
        if task.type in ['agent_command', 'agent_response']:
            # Boost agent tasks during high load
            if task.priority == TaskPriority.NORMAL:
                return TaskPriority.HIGH
            elif task.priority == TaskPriority.LOW:
                return TaskPriority.NORMAL

    return task.priority


def queue_congestion_rule(task: Task, metrics: Dict) -> TaskPriority:
    """Adjust priority based on queue congestion"""

    queue_lengths = metrics['queue_lengths']

    # If high priority queue is congested, demote some tasks
    if queue_lengths['high'] > 100:
        if task.type in ['report_generation', 'cleanup_task']:
            if task.priority == TaskPriority.HIGH:
                return TaskPriority.NORMAL

    # If system is idle, promote some tasks
    total_queued = sum(queue_lengths.values())
    if total_queued < 10:
        if task.priority == TaskPriority.LOW:
            return TaskPriority.NORMAL

    return task.priority


def error_rate_priority_rule(task: Task, metrics: Dict) -> TaskPriority:
    """Adjust priority based on error rates"""

    if metrics['error_rate'] > 0.05:  # 5% error rate
        # Prioritize diagnostic tasks
        if task.type in ['health_check', 'system_diagnostic']:
            return TaskPriority.CRITICAL

        # Deprioritize non-essential tasks
        if task.type in ['report_generation', 'analytics']:
            if task.priority == TaskPriority.NORMAL:
                return TaskPriority.LOW

    return task.priority
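
The rules above only take effect once they are registered with a PriorityManager; a minimal wiring sketch follows (the helper function name is an assumption).

# Wiring sketch: register the rules, then adjust a task's priority before enqueueing.
async def enqueue_with_dynamic_priority(queue_manager: QueueManager, task: Task) -> str:
    priority_manager = PriorityManager(queue_manager)
    priority_manager.add_priority_rule(high_load_priority_rule)
    priority_manager.add_priority_rule(queue_congestion_rule)
    priority_manager.add_priority_rule(error_rate_priority_rule)

    task.priority = await priority_manager.adjust_task_priority(task)
    return await queue_manager.enqueue(task)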

Queue Monitoring and Alerting

Comprehensive Queue Monitoring

Queue Health Monitoring System

from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
import time

@dataclass
class QueueHealth:
    queue_name: str
    length: int
    processing_rate: float  # tasks per second
    average_wait_time: float  # seconds
    error_rate: float  # fraction of tasks (0-1)
    oldest_task_age: float  # seconds
    status: str  # healthy, warning, critical

@dataclass
class QueueAlert:
    queue_name: str
    alert_type: str
    severity: str
    message: str
    value: float
    threshold: float
    timestamp: float

class QueueMonitor:
    """Comprehensive queue monitoring and alerting"""

    def __init__(self, queue_manager: QueueManager):
        self.queue_manager = queue_manager
        self.metrics_history = {}
        self.alert_thresholds = {
            'queue_length': {'warning': 100, 'critical': 500},
            'wait_time': {'warning': 300, 'critical': 900},  # seconds
            'error_rate': {'warning': 0.05, 'critical': 0.15},  # fraction of tasks
            'processing_rate': {'warning': 0.1, 'critical': 0.01},  # tasks/sec
            'oldest_task_age': {'warning': 600, 'critical': 1800}  # seconds
        }

    async def collect_queue_metrics(self) -> Dict[str, QueueHealth]:
        """Collect comprehensive queue health metrics"""

        stats = await self.queue_manager.get_queue_stats()
        queue_health = {}

        for priority in TaskPriority:
            queue_name = priority.name.lower()
            health = await self._analyze_queue_health(queue_name, stats)
            queue_health[queue_name] = health

        # Special queues
        for queue_name in ['processing', 'delayed', 'dead_letter']:
            health = await self._analyze_special_queue_health(queue_name, stats)
            queue_health[queue_name] = health

        return queue_health

    async def _analyze_queue_health(self, queue_name: str, stats: Dict) -> QueueHealth:
        """Analyze health of a specific queue"""

        queue_length = stats.get(f'{queue_name}_queue', 0)

        # Calculate processing rate
        processing_rate = await self._calculate_processing_rate(queue_name)

        # Calculate average wait time
        wait_time = await self._calculate_average_wait_time(queue_name)

        # Calculate error rate
        error_rate = await self._calculate_error_rate(queue_name)

        # Get oldest task age
        oldest_task_age = await self._get_oldest_task_age(queue_name)

        # Determine overall status
        status = self._determine_queue_status(
            queue_length, processing_rate, wait_time, error_rate, oldest_task_age
        )

        return QueueHealth(
            queue_name=queue_name,
            length=queue_length,
            processing_rate=processing_rate,
            average_wait_time=wait_time,
            error_rate=error_rate,
            oldest_task_age=oldest_task_age,
            status=status
        )

    async def _calculate_processing_rate(self, queue_name: str) -> float:
        """Calculate processing rate (tasks per second)"""

        # Get metrics from last 5 minutes
        current_time = time.time()
        start_time = current_time - 300

        if queue_name not in self.metrics_history:
            self.metrics_history[queue_name] = []

        # Add current measurement
        current_length = await self._get_current_queue_length(queue_name)
        self.metrics_history[queue_name].append((current_time, current_length))

        # Clean old measurements
        self.metrics_history[queue_name] = [
            (t, length) for t, length in self.metrics_history[queue_name]
            if t > start_time
        ]

        # Calculate processing rate
        measurements = self.metrics_history[queue_name]
        if len(measurements) < 2:
            return 0.0

        # Linear regression to find processing rate
        time_diffs = []
        length_diffs = []

        for i in range(1, len(measurements)):
            prev_time, prev_length = measurements[i-1]
            curr_time, curr_length = measurements[i]

            time_diff = curr_time - prev_time
            length_diff = prev_length - curr_length  # Positive means processing

            if time_diff > 0:
                time_diffs.append(time_diff)
                length_diffs.append(length_diff)

        if not time_diffs:
            return 0.0

        # Average processing rate
        total_processed = sum(length_diffs)
        total_time = sum(time_diffs)

        return max(0, total_processed / total_time)

    async def _calculate_average_wait_time(self, queue_name: str) -> float:
        """Calculate average wait time for tasks in queue"""

        # This would require tracking task creation times
        # For now, estimate based on queue length and processing rate
        queue_length = await self._get_current_queue_length(queue_name)
        processing_rate = await self._calculate_processing_rate(queue_name)

        if processing_rate <= 0:
            return float('inf') if queue_length > 0 else 0

        return queue_length / processing_rate

    async def _calculate_error_rate(self, queue_name: str) -> float:
        """Calculate error rate for queue"""

        # Get task status distribution
        total_tasks = await self._get_total_tasks_processed(queue_name)
        failed_tasks = await self._get_failed_tasks_count(queue_name)

        if total_tasks == 0:
            return 0.0

        return failed_tasks / total_tasks

    async def _get_oldest_task_age(self, queue_name: str) -> float:
        """Get age of oldest task in queue"""

        # This would require tracking task creation times in queue
        # For now, return 0 as placeholder
        return 0.0
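
    # The helpers below are referenced above but were not defined in this
    # document; these are minimal, assumed sketches. The two counter-based
    # helpers return 0 until real per-queue bookkeeping is wired in.
    async def _get_current_queue_length(self, queue_name: str) -> int:
        """Length of one of the priority queues managed by QueueManager."""
        try:
            priority = TaskPriority[queue_name.upper()]
        except KeyError:
            return 0
        return await self.queue_manager.redis.zcard(
            self.queue_manager.queues[priority]
        )

    async def _get_total_tasks_processed(self, queue_name: str) -> int:
        """Placeholder: total tasks processed for this queue."""
        return 0

    async def _get_failed_tasks_count(self, queue_name: str) -> int:
        """Placeholder: failed task count for this queue."""
        return 0

    async def _analyze_special_queue_health(self, queue_name: str, stats: Dict) -> QueueHealth:
        """Minimal health view for the processing/delayed/dead_letter queues."""
        length = stats.get(queue_name, 0)
        warning = self.alert_thresholds['queue_length']['warning']
        return QueueHealth(
            queue_name=queue_name,
            length=length,
            processing_rate=0.0,
            average_wait_time=0.0,
            error_rate=0.0,
            oldest_task_age=0.0,
            status='healthy' if length < warning else 'warning'
        )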

    def _determine_queue_status(self, length: int, rate: float, wait_time: float,
                               error_rate: float, oldest_age: float) -> str:
        """Determine overall queue status"""

        critical_conditions = [
            length >= self.alert_thresholds['queue_length']['critical'],
            wait_time >= self.alert_thresholds['wait_time']['critical'],
            error_rate >= self.alert_thresholds['error_rate']['critical'],
            rate <= self.alert_thresholds['processing_rate']['critical'],
            oldest_age >= self.alert_thresholds['oldest_task_age']['critical']
        ]

        warning_conditions = [
            length >= self.alert_thresholds['queue_length']['warning'],
            wait_time >= self.alert_thresholds['wait_time']['warning'],
            error_rate >= self.alert_thresholds['error_rate']['warning'],
            rate <= self.alert_thresholds['processing_rate']['warning'],
            oldest_age >= self.alert_thresholds['oldest_task_age']['warning']
        ]

        if any(critical_conditions):
            return 'critical'
        elif any(warning_conditions):
            return 'warning'
        else:
            return 'healthy'

    async def check_alerts(self) -> List[QueueAlert]:
        """Check for alert conditions and generate alerts"""

        queue_health = await self.collect_queue_metrics()
        alerts = []

        for queue_name, health in queue_health.items():
            # Check each metric against thresholds
            alerts.extend(self._check_metric_alerts(health))

        return alerts

    def _check_metric_alerts(self, health: QueueHealth) -> List[QueueAlert]:
        """Check specific metrics for alert conditions"""

        alerts = []
        current_time = time.time()

        # Queue length alerts
        if health.length >= self.alert_thresholds['queue_length']['critical']:
            alerts.append(QueueAlert(
                queue_name=health.queue_name,
                alert_type='queue_length',
                severity='critical',
                message=f'Queue length critically high: {health.length}',
                value=health.length,
                threshold=self.alert_thresholds['queue_length']['critical'],
                timestamp=current_time
            ))
        elif health.length >= self.alert_thresholds['queue_length']['warning']:
            alerts.append(QueueAlert(
                queue_name=health.queue_name,
                alert_type='queue_length',
                severity='warning',
                message=f'Queue length high: {health.length}',
                value=health.length,
                threshold=self.alert_thresholds['queue_length']['warning'],
                timestamp=current_time
            ))

        # Error rate alerts
        if health.error_rate >= self.alert_thresholds['error_rate']['critical']:
            alerts.append(QueueAlert(
                queue_name=health.queue_name,
                alert_type='error_rate',
                severity='critical',
                message=f'Error rate critically high: {health.error_rate:.2%}',
                value=health.error_rate,
                threshold=self.alert_thresholds['error_rate']['critical'],
                timestamp=current_time
            ))

        # Processing rate alerts
        if health.processing_rate <= self.alert_thresholds['processing_rate']['critical']:
            alerts.append(QueueAlert(
                queue_name=health.queue_name,
                alert_type='processing_rate',
                severity='critical',
                message=f'Processing rate critically low: {health.processing_rate:.2f} tasks/sec',
                value=health.processing_rate,
                threshold=self.alert_thresholds['processing_rate']['critical'],
                timestamp=current_time
            ))

        return alerts


class QueueDashboard:
    """Real-time queue dashboard metrics"""

    def __init__(self, queue_monitor: QueueMonitor):
        self.queue_monitor = queue_monitor

    async def get_dashboard_data(self) -> Dict[str, Any]:
        """Get comprehensive dashboard data"""

        queue_health = await self.queue_monitor.collect_queue_metrics()
        alerts = await self.queue_monitor.check_alerts()

        # Calculate summary metrics
        total_queued = sum(h.length for h in queue_health.values() if h.queue_name != 'processing')
        total_processing = queue_health.get('processing', QueueHealth('processing', 0, 0, 0, 0, 0, 'healthy')).length
        avg_processing_rate = sum(h.processing_rate for h in queue_health.values()) / len(queue_health)

        # Alert summary
        critical_alerts = [a for a in alerts if a.severity == 'critical']
        warning_alerts = [a for a in alerts if a.severity == 'warning']

        return {
            'summary': {
                'total_queued': total_queued,
                'total_processing': total_processing,
                'avg_processing_rate': avg_processing_rate,
                'critical_alerts': len(critical_alerts),
                'warning_alerts': len(warning_alerts)
            },
            'queue_health': {name: health for name, health in queue_health.items()},
            'alerts': alerts,
            'timestamp': time.time()
        }

Queue Management Best Practices

Design Guidelines

  • Idempotency: Design tasks to be safely retryable (see the sketch after this list)
  • Small Payloads: Keep task payloads minimal
  • Timeout Handling: Implement proper timeout and cleanup
  • Error Handling: Catch and log failures so tasks fail visibly and move to retry or the dead letter queue
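
A minimal idempotency sketch, reusing the result key written by QueueManager.acknowledge() above; apply_update() is a hypothetical, application-specific helper.

# Idempotency sketch: skip work if a stored result shows the task already completed.
async def handle_package_update_idempotent(queue_manager: QueueManager,
                                            task: Task) -> Dict[str, Any]:
    result_key = f"{queue_manager.namespace}:results:{task.id}"
    cached = await queue_manager.redis.get(result_key)
    if cached:
        # A previous attempt finished; return its result instead of repeating side effects.
        return json.loads(cached)

    return await apply_update(task.payload)  # hypothetical, application-specific helper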

Operational Guidelines

  • Monitor Queue Health: Continuous monitoring of queue metrics
  • Capacity Planning: Plan for peak load scenarios
  • Dead Letter Analysis: Regular analysis of failed tasks
  • Worker Scaling: Dynamic scaling based on queue load (see the sketch after this list)
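
A minimal scale-up sketch built on the WorkerPool defined earlier; the thresholds are illustrative and scaling down is deliberately left out.

# Autoscaling sketch: start extra worker loops when the backlog grows.
async def autoscale_workers(queue_manager: QueueManager, worker_pool: WorkerPool,
                            max_workers: int = 16, tasks_per_worker: int = 25,
                            interval: int = 30):
    while worker_pool.running:
        stats = await queue_manager.get_queue_stats()
        backlog = sum(stats.get(f"{p.name.lower()}_queue", 0) for p in TaskPriority)
        desired = min(max_workers, max(worker_pool.worker_count, backlog // tasks_per_worker + 1))

        # Scale up only; shrinking the pool is left to a full restart in this sketch.
        while len(worker_pool.workers) < desired:
            worker_id = f"worker-{len(worker_pool.workers)}"
            worker_pool.workers.append(
                asyncio.create_task(worker_pool._worker_loop(worker_id))
            )

        await asyncio.sleep(interval)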

Performance Guidelines

  • Batch Processing: Group related tasks for efficiency
  • Priority Tuning: Regular adjustment of priority rules
  • Resource Management: Proper resource cleanup in workers
  • Load Distribution: Even distribution across worker pools

Next Steps

To learn more about related system management topics:

  1. Retry Logic: Advanced retry and resilience patterns
  2. Scaling Strategies: System scaling approaches
  3. Performance Optimization: System performance tuning
  4. Load Balancing: Traffic distribution strategies