
WebSocket Protocol

Real-time communication protocol, message types, connection management, and WebSocket implementation in SysManage.

Protocol Overview

SysManage uses WebSocket connections to provide real-time communication between the web interface and server, enabling instant updates for system status, task progress, agent connectivity, and alerts without polling.

WebSocket Communication Flow

┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│   Browser   │◀───────▶│  WebSocket  │◀───────▶│   Server    │
│   Client    │  Events │   Gateway   │ Events  │  Backend    │
└─────────────┘         └─────────────┘         └─────────────┘
       │                       │                       │
       │ HTTP Upgrade          │ Connection Pool       │ Event Bus
       │                       │                       │
       ▼                       ▼                       ▼
┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│ Connection  │         │ Message     │         │ Business    │
│ Management  │         │ Routing     │         │ Logic       │
│             │         │             │         │ Events      │
│ • Auth      │         │ • Broadcast │         │             │
│ • Heartbeat │         │ • Unicast   │         │ • Tasks     │
│ • Reconnect │         │ • Room Mgmt │         │ • Agents    │
└─────────────┘         └─────────────┘         └─────────────┘
                        

Key Features

  • Real-time Updates: Instant propagation of system changes
  • Bi-directional Communication: Client can send commands and receive events
  • Connection Management: Automatic reconnection and heartbeat monitoring
  • Room-based Broadcasting: Targeted message delivery based on user permissions
  • Message Queuing: Reliable delivery with offline message storage

Connection Lifecycle

Connection State Machine

                    ┌─────────────────┐
                    │   DISCONNECTED  │
                    │                 │
                    └─────────┬───────┘
                              │ connect()
                              ▼
                    ┌─────────────────┐
                    │   CONNECTING    │
                    │                 │
                    └─────┬─────┬─────┘
                          │     │
               success    │     │ failure
                          ▼     ▼
               ┌─────────────┐ ┌─────────────┐
               │ CONNECTED   │ │ FAILED      │
               │             │ │             │
               └─────┬─┬─────┘ └─────┬───────┘
                     │ │             │
          heartbeat  │ │ disconnect  │ retry
          timeout    │ │             │
                     ▼ ▼             ▼
               ┌─────────────┐ ┌─────────────┐
               │ RECONNECTING│ │ RETRYING    │
               │             │ │             │
               └─────────────┘ └─────────────┘
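
The state machine can be expressed as a transition table. The Python sketch below encodes the arrows drawn above. The diagram does not show how RECONNECTING and RETRYING exit, so the two "attempt" transitions back to CONNECTING are assumptions, as are the event names and the ConnState/next_state names.

```python
from enum import Enum


class ConnState(Enum):
    DISCONNECTED = "disconnected"
    CONNECTING = "connecting"
    CONNECTED = "connected"
    FAILED = "failed"
    RECONNECTING = "reconnecting"
    RETRYING = "retrying"


# (current state, event) -> next state, following the diagram's arrows
TRANSITIONS = {
    (ConnState.DISCONNECTED, "connect"): ConnState.CONNECTING,
    (ConnState.CONNECTING, "success"): ConnState.CONNECTED,
    (ConnState.CONNECTING, "failure"): ConnState.FAILED,
    (ConnState.CONNECTED, "heartbeat_timeout"): ConnState.RECONNECTING,
    (ConnState.CONNECTED, "disconnect"): ConnState.RECONNECTING,
    (ConnState.FAILED, "retry"): ConnState.RETRYING,
    # Assumption (not drawn in the diagram): both recovery states start a new attempt
    (ConnState.RECONNECTING, "attempt"): ConnState.CONNECTING,
    (ConnState.RETRYING, "attempt"): ConnState.CONNECTING,
}


def next_state(state: ConnState, event: str) -> ConnState:
    """Return the state after `event`; reject transitions the table does not allow."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"invalid transition: {state.name} --{event}-->") from None
```

Rejecting undeclared transitions makes bugs such as handling a "success" event while already disconnected fail loudly instead of silently corrupting the client state.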
                        

1. Connection Establishment

// Client-side connection establishment
// getJWTToken() is an application helper that returns the current JWT
const authToken = getJWTToken();

// Option 1: pass the token as a query parameter on the HTTP upgrade request
const wsUrl = `wss://sysmanage.example.com/ws?token=${encodeURIComponent(authToken)}`;
const ws = new WebSocket(wsUrl);

// Connection events
ws.onopen = (event) => {
    console.log('WebSocket connected');
    // Option 2: send the token in the first message after the socket opens
    ws.send(JSON.stringify({
        type: 'auth',
        token: authToken
    }));
};
                            

2. Authentication & Authorization

# Server-side authentication flow
# (get_auth_token, authenticate_token, get_user_permissions, join_user_rooms
#  and send_message are application helpers; logger is a module logger)
async def handle_connection(websocket, path):
    try:
        # Extract token from query params or first message
        token = get_auth_token(websocket)

        # Validate JWT token
        user = await authenticate_token(token)
        if not user:
            # Close codes 4000-4999 are reserved for applications (RFC 6455)
            await websocket.close(code=4001, reason="Unauthorized")
            return

        # Store user context
        websocket.user = user
        websocket.permissions = await get_user_permissions(user)

        # Join appropriate rooms based on permissions
        await join_user_rooms(websocket, user)

        # Send welcome message
        await send_message(websocket, {
            'type': 'welcome',
            'user_id': user.id,
            'permissions': websocket.permissions
        })

    except Exception:
        logger.exception("WebSocket authentication failed")
        await websocket.close(code=4000, reason="Authentication failed")
                            

3. Heartbeat & Keep-alive

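// Client-side heartbeat implementation
class WebSocketManager {
    constructor(url) {
        this.url = url;
        this.ws = null;
        this.heartbeatTimer = null;
        this.heartbeatInterval = 30000; // 30 seconds
        this.reconnectDelay = 5000;     // 5 seconds
        this.maxReconnectAttempts = 10;
    }

    connect() {
        this.ws = new WebSocket(this.url);
        this.ws.onopen = () => this.startHeartbeat();
        this.ws.onclose = () => this.stopHeartbeat();
        this.ws.onmessage = (event) => {
            const message = JSON.parse(event.data);
            if (message.type === 'pong') {
                this.handlePong(message);
            }
        };
    }

    send(message) {
        if (this.ws && this.ws.readyState === WebSocket.OPEN) {
            this.ws.send(JSON.stringify(message));
        }
    }

    startHeartbeat() {
        this.stopHeartbeat(); // never run two heartbeat timers at once
        this.heartbeatTimer = setInterval(() => {
            this.send({
                type: 'ping',
                timestamp: Date.now()
            });
        }, this.heartbeatInterval);
    }

    stopHeartbeat() {
        clearInterval(this.heartbeatTimer);
        this.heartbeatTimer = null;
    }

    // Assumes the server echoes the ping's timestamp in its pong
    handlePong(message) {
        const latency = Date.now() - message.timestamp;
        console.log(`Heartbeat latency: ${latency}ms`);
    }
}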
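
On the server, the counterpart of this heartbeat is noticing when a client goes quiet. A minimal Python sketch, assuming the server answers each ping with a pong that echoes the client's timestamp (which is what handlePong relies on) and treats a connection as stale after two missed 30-second intervals. HeartbeatTracker and the connection ids are hypothetical names; the clock is injectable so the logic can be tested without waiting.

```python
import time
from typing import Callable, Dict, List


class HeartbeatTracker:
    """Record when each connection last pinged and report the ones gone quiet."""

    def __init__(self, interval: float = 30.0, missed_allowed: int = 2,
                 clock: Callable[[], float] = time.monotonic):
        # A connection is stale after `missed_allowed` heartbeat intervals of silence
        self.timeout = interval * missed_allowed
        self.clock = clock
        self.last_seen: Dict[str, float] = {}

    def on_ping(self, conn_id: str, ping: dict) -> dict:
        """Note activity and build a pong that echoes the client's timestamp."""
        self.last_seen[conn_id] = self.clock()
        return {"type": "pong", "timestamp": ping["timestamp"]}

    def stale(self) -> List[str]:
        """Connection ids silent for longer than the timeout."""
        now = self.clock()
        return [cid for cid, seen in self.last_seen.items() if now - seen > self.timeout]

    def forget(self, conn_id: str) -> None:
        """Drop a connection once it has been closed."""
        self.last_seen.pop(conn_id, None)
```

A periodic task could call stale() once per interval and close the listed sockets; the client then experiences this as the heartbeat-timeout path in the state machine.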
                            

4. Graceful Disconnection

# Server-side cleanup
async def handle_disconnect(websocket):
    # websocket.user is only set once authentication has succeeded
    user = getattr(websocket, 'user', None)
    try:
        # Remove from all rooms
        await leave_all_rooms(websocket)

        if user is not None:
            # Clean up user session
            await cleanup_user_session(user)

            # Log disconnection
            logger.info(f"User {user.id} disconnected")

    except Exception as e:
        logger.error(f"Error during disconnect cleanup: {e}")
                            

Message Protocol

Message Format

All WebSocket messages are JSON objects with a standardized envelope. The // comments below are annotations only; JSON itself does not allow comments.

{
    "id": "uuid-v4",                    // Unique message ID
    "type": "message_type",             // Message type identifier
    "timestamp": "2024-01-15T10:30:00Z", // ISO 8601 timestamp
    "sender": "user_id",                // Sender identifier
    "room": "room_name",                // Target room (optional)
    "data": {                           // Message payload
        // Type-specific data
    },
    "metadata": {                       // Optional metadata
        "correlation_id": "uuid",       // For request/response correlation
        "priority": "high|normal|low",  // Message priority
        "ttl": 300                      // Time to live in seconds
    }
}
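
A Python sketch of building and expiring messages in this envelope. make_envelope and is_expired are hypothetical helpers; the sketch assumes timestamps are UTC with second precision, as in the examples, and that ttl counts from the envelope timestamp. The result is a plain dict ready for json.dumps.

```python
import uuid
from datetime import datetime, timezone
from typing import Optional

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"  # UTC, second precision, as in the examples


def make_envelope(msg_type: str, data: dict, sender: str, room: Optional[str] = None,
                  priority: str = "normal", ttl: Optional[int] = None,
                  correlation_id: Optional[str] = None) -> dict:
    """Build a message dict in the documented envelope format."""
    if priority not in ("high", "normal", "low"):
        raise ValueError(f"unknown priority: {priority!r}")
    metadata: dict = {"priority": priority}
    if ttl is not None:
        metadata["ttl"] = ttl
    if correlation_id is not None:
        metadata["correlation_id"] = correlation_id
    envelope = {
        "id": str(uuid.uuid4()),
        "type": msg_type,
        "timestamp": datetime.now(timezone.utc).strftime(TIMESTAMP_FORMAT),
        "sender": sender,
        "data": data,
        "metadata": metadata,
    }
    if room is not None:
        envelope["room"] = room  # room is optional in the envelope
    return envelope


def is_expired(envelope: dict, now: Optional[datetime] = None) -> bool:
    """True once more than metadata.ttl seconds have passed since the timestamp."""
    ttl = envelope.get("metadata", {}).get("ttl")
    if ttl is None:
        return False
    sent = datetime.strptime(envelope["timestamp"], TIMESTAMP_FORMAT).replace(tzinfo=timezone.utc)
    now = now or datetime.now(timezone.utc)
    return (now - sent).total_seconds() > ttl
```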
                        

Message Types

System Messages

  • ping/pong: Heartbeat mechanism
  • auth: Authentication credentials
  • welcome: Connection confirmation
  • error: Error notifications
  • subscribe/unsubscribe: Room management

Agent Messages

  • agent_connected: Agent comes online
  • agent_disconnected: Agent goes offline
  • agent_updated: Agent information changed
  • agent_status: Status update from agent

Task Messages

  • task_created: New task scheduled
  • task_started: Task execution began
  • task_progress: Task progress update
  • task_completed: Task finished
  • task_failed: Task execution failed

Alert Messages

  • alert_triggered: New alert generated
  • alert_acknowledged: Alert acknowledged
  • alert_resolved: Alert resolved
  • alert_escalated: Alert escalated

Inventory Messages

  • package_updated: Package inventory changed
  • service_status: Service status changed
  • metrics_update: New metrics available
  • inventory_refresh: Full inventory update
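
One way to consume these types is a dispatch table keyed on the envelope's type field. This Python sketch (Dispatcher is a hypothetical name) groups the types listed above and refuses to register handlers for unknown ones, which catches typos such as "task_complete" early.

```python
from typing import Callable, Dict, List

Handler = Callable[[dict], None]

# Message types grouped as in the lists above
MESSAGE_TYPES = {
    "system": {"ping", "pong", "auth", "welcome", "error", "subscribe", "unsubscribe"},
    "agent": {"agent_connected", "agent_disconnected", "agent_updated", "agent_status"},
    "task": {"task_created", "task_started", "task_progress", "task_completed", "task_failed"},
    "alert": {"alert_triggered", "alert_acknowledged", "alert_resolved", "alert_escalated"},
    "inventory": {"package_updated", "service_status", "metrics_update", "inventory_refresh"},
}
KNOWN_TYPES = set().union(*MESSAGE_TYPES.values())


class Dispatcher:
    """Route each incoming envelope to the handlers registered for its type."""

    def __init__(self) -> None:
        self.handlers: Dict[str, List[Handler]] = {}

    def on(self, msg_type: str, handler: Handler) -> None:
        if msg_type not in KNOWN_TYPES:
            raise ValueError(f"unknown message type: {msg_type!r}")
        self.handlers.setdefault(msg_type, []).append(handler)

    def dispatch(self, message: dict) -> int:
        """Call every handler for message['type']; return how many ran."""
        msg_type = message.get("type")
        if not isinstance(msg_type, str):
            return 0  # malformed envelope: nothing to route
        handlers = self.handlers.get(msg_type, [])
        for handler in handlers:
            handler(message)
        return len(handlers)
```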

Message Examples

Agent Status Update

{
    "id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
    "type": "agent_status",
    "timestamp": "2024-01-15T10:30:00Z",
    "sender": "agent-001",
    "room": "agents",
    "data": {
        "agent_id": 123,
        "hostname": "web-server-01",
        "status": "online",
        "last_seen": "2024-01-15T10:29:45Z",
        "cpu_usage": 15.2,
        "memory_usage": 68.7,
        "disk_usage": 45.1,
        "network_io": {
            "bytes_sent": 1048576,
            "bytes_received": 2097152
        }
    },
    "metadata": {
        "priority": "normal"
    }
}
                        

Task Progress Update

{
    "id": "b2c3d4e5-f6a7-8901-bcde-f12345678901",
    "type": "task_progress",
    "timestamp": "2024-01-15T10:35:00Z",
    "sender": "task-executor",
    "room": "tasks",
    "data": {
        "task_id": 456,
        "title": "Update packages on web servers",
        "progress": 65,
        "status": "running",
        "current_step": "Installing nginx update",
        "completed_agents": 13,
        "total_agents": 20,
        "estimated_completion": "2024-01-15T10:42:00Z",
        "logs": [
            {
                "timestamp": "2024-01-15T10:34:30Z",
                "level": "info",
                "message": "Package nginx updated successfully on web-server-03"
            }
        ]
    },
    "metadata": {
        "correlation_id": "task-456-progress",
        "priority": "high"
    }
}
                        

Alert Notification

{
    "id": "c3d4e5f6-a7b8-9012-cdef-123456789012",
    "type": "alert_triggered",
    "timestamp": "2024-01-15T10:40:00Z",
    "sender": "alert-manager",
    "room": "alerts",
    "data": {
        "alert_id": 789,
        "severity": "critical",
        "title": "High CPU usage detected",
        "description": "CPU usage exceeded 90% threshold",
        "agent_id": 123,
        "hostname": "web-server-01",
        "metric": {
            "name": "cpu_usage",
            "current_value": 94.7,
            "threshold": 90.0,
            "unit": "percent"
        },
        "actions": [
            {
                "type": "escalate",
                "delay": 300,
                "target": "ops-team"
            }
        ]
    },
    "metadata": {
        "priority": "high",
        "ttl": 3600
    }
}
                        

Room Management

Room-based Broadcasting

SysManage uses a room-based system to efficiently deliver messages to relevant users based on their permissions and interests.

Room Types

Global Rooms
  • system: System-wide notifications
  • alerts: All alert notifications
  • maintenance: Maintenance announcements
Entity-based Rooms
  • agents: All agent status updates
  • agent-{id}: Specific agent updates
  • tasks: All task updates
  • task-{id}: Specific task progress
Group-based Rooms
  • group-{id}: Agent group updates
  • team-{id}: Team-specific notifications
  • role-{name}: Role-based announcements
User Rooms
  • user-{id}: Personal notifications
  • session-{id}: Session-specific messages
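
Because permission checks and routing both depend on a room's type, it helps to parse room names in one place. A Python sketch, assuming names are a lowercase prefix joined to a key by a hyphen as listed above. Treating agent and group keys as integers mirrors the int() conversion in the server-side room manager; extending that to task, team and user ids, and treating role and session keys as free-form strings, are assumptions. parse_room and Room are hypothetical names.

```python
import re
from typing import NamedTuple, Optional, Union

FIXED_ROOMS = {"system", "alerts", "maintenance", "agents", "tasks"}
NUMERIC_PREFIXES = {"agent", "task", "group", "team", "user"}
NAMED_PREFIXES = {"role", "session"}  # assumption: these keys need not be numeric
ROOM_PATTERN = re.compile(r"([a-z]+)-(.+)")
DIGITS = re.compile(r"[0-9]+")


class Room(NamedTuple):
    kind: str
    key: Optional[Union[int, str]]  # None for fixed rooms


def parse_room(name: str) -> Room:
    """Split a room name such as 'agent-123' into its kind and key."""
    if name in FIXED_ROOMS:
        return Room(name, None)
    match = ROOM_PATTERN.fullmatch(name)
    if not match:
        raise ValueError(f"unknown room: {name!r}")
    prefix, key = match.groups()
    if prefix in NUMERIC_PREFIXES:
        if not DIGITS.fullmatch(key):
            raise ValueError(f"room {name!r} needs a numeric id")
        return Room(prefix, int(key))
    if prefix in NAMED_PREFIXES:
        return Room(prefix, key)
    raise ValueError(f"unknown room prefix: {prefix!r}")
```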

Room Subscription Management

// Client-side room subscription
class RoomManager {
    constructor(wsManager) {
        this.wsManager = wsManager;
        this.subscribedRooms = new Set();
    }

    subscribe(roomName) {
        if (!this.subscribedRooms.has(roomName)) {
            this.wsManager.send({
                type: 'subscribe',
                data: { room: roomName }
            });
            this.subscribedRooms.add(roomName);
        }
    }

    unsubscribe(roomName) {
        if (this.subscribedRooms.has(roomName)) {
            this.wsManager.send({
                type: 'unsubscribe',
                data: { room: roomName }
            });
            this.subscribedRooms.delete(roomName);
        }
    }

    // Automatically subscribe based on current page
    subscribeForPage(page) {
        switch (page) {
            case 'dashboard':
                this.subscribe('system');
                this.subscribe('alerts');
                this.subscribe('agents');
                break;
            case 'agent-detail': {
                // Block scope keeps agentId local to this case clause
                const agentId = getCurrentAgentId();
                this.subscribe(`agent-${agentId}`);
                break;
            }
            case 'tasks':
                this.subscribe('tasks');
                break;
        }
    }
}
                        

Server-side Room Management

# Python server-side room management
import asyncio
import logging
from collections import defaultdict

logger = logging.getLogger(__name__)


class WebSocketRoomManager:
    # send_message() and send_error() are assumed to be provided elsewhere
    def __init__(self):
        self.rooms = defaultdict(set)  # room_name -> set of websockets
        self.user_rooms = defaultdict(set)  # user_id -> set of room_names

    async def join_room(self, websocket, room_name):
        # Check permissions
        if not await self.can_join_room(websocket.user, room_name):
            await self.send_error(websocket, "Permission denied")
            return False

        # Add to room
        self.rooms[room_name].add(websocket)
        self.user_rooms[websocket.user.id].add(room_name)

        logger.info(f"User {websocket.user.id} joined room {room_name}")
        return True

    async def leave_room(self, websocket, room_name):
        members = self.rooms.get(room_name)
        if members is not None:
            members.discard(websocket)
            if not members:
                del self.rooms[room_name]  # don't keep empty rooms around
        self.user_rooms[websocket.user.id].discard(room_name)

    async def broadcast_to_room(self, room_name, message):
        # Snapshot members so joins/leaves during the sends don't break iteration
        websockets = list(self.rooms.get(room_name, ()))
        await asyncio.gather(
            *[self.send_message(ws, message) for ws in websockets],
            return_exceptions=True  # one failed socket must not abort the broadcast
        )

    async def can_join_room(self, user, room_name):
        # Permission logic; anything not matched below is denied
        prefix, _, suffix = room_name.partition('-')
        try:
            if prefix == 'agent' and suffix:
                return await user.can_view_agent(int(suffix))
            if prefix == 'group' and suffix:
                return await user.can_view_group(int(suffix))
        except ValueError:
            return False  # non-numeric id such as "agent-abc"
        if room_name in ('system', 'alerts'):
            return user.has_permission('view_system')
        # Other room types (e.g. 'agents', 'tasks', 'task-{id}') need their own
        # checks here; until then, client subscriptions to them are rejected
        return False
                        

Connection Scaling & Load Balancing

Horizontal Scaling Architecture

                    Load Balancer
                    (Sticky Sessions)
                           │
        ┌──────────────────┼──────────────────┐
        │                  │                  │
        ▼                  ▼                  ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ WebSocket    │ │ WebSocket    │ │ WebSocket    │
│ Server 1     │ │ Server 2     │ │ Server 3     │
│              │ │              │ │              │
│ • 10k conns  │ │ • 10k conns  │ │ • 10k conns  │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
       │                │                │
       └────────────────┼────────────────┘
                        │
                        ▼
                ┌──────────────┐
                │    Redis     │
                │   Message    │
                │    Bus       │
                └──────────────┘
                        │
                        ▼
                ┌──────────────┐
                │  Application │
                │   Servers    │
                │              │
                └──────────────┘
                        

Scaling Strategies

Session Affinity
  • Load balancer routes users to same server
  • Maintains WebSocket connection state
  • Reduces cross-server message routing
Redis Message Bus
  • Pub/Sub for cross-server communication
  • Room state synchronization
  • Message persistence for offline users
Connection Pooling
  • Efficient memory usage per connection
  • Connection lifecycle management
  • Automatic cleanup of stale connections
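
Session affinity is normally configured on the load balancer itself, for example with cookies or source-IP hashing. To illustrate the property it relies on, here is a Python sketch of rendezvous (highest-random-weight) hashing, a swapped-in technique this document does not name: each user maps to a stable server, and removing a server only moves the users that were on it. pick_server and the server names are hypothetical.

```python
import hashlib
from typing import List


def _weight(server: str, user_id: str) -> int:
    # hashlib rather than hash(): Python's str hash is randomized per process,
    # so it would give different answers on different servers
    digest = hashlib.sha256(f"{server}|{user_id}".encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def pick_server(user_id: str, servers: List[str]) -> str:
    """Rendezvous hashing: every node computes the same user -> server choice."""
    if not servers:
        raise ValueError("no WebSocket servers available")
    return max(servers, key=lambda server: _weight(server, user_id))
```

Unlike hash(user) % N, removing a server does not reshuffle users on the surviving servers, which keeps the reconnect storm after a node leaves limited to that node's clients.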

Redis Integration

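# Redis-based cross-server message routing (redis-py asyncio client)
import asyncio
import json
from collections import defaultdict


class RedisMessageBus:
    def __init__(self, redis_client):
        self.redis = redis_client  # e.g. a redis.asyncio.Redis instance
        self.local_rooms = defaultdict(set)  # room_name -> websockets on this server

    # add_local/remove_local: keep local_rooms in sync with room joins and leaves
    def add_local(self, room_name, websocket):
        self.local_rooms[room_name].add(websocket)

    def remove_local(self, room_name, websocket):
        self.local_rooms[room_name].discard(websocket)

    async def publish_to_room(self, room_name, message):
        # Publish to Redis so every server (including this one) receives it
        await self.redis.publish(f"room:{room_name}", json.dumps(message))

    async def subscribe_to_rooms(self):
        # Pattern-subscribe to all room channels
        pubsub = self.redis.pubsub()
        await pubsub.psubscribe("room:*")

        async for message in pubsub.listen():
            if message['type'] == 'pmessage':
                channel = message['channel']
                if isinstance(channel, bytes):  # bytes unless decode_responses=True
                    channel = channel.decode('utf-8')
                room_name = channel.split(':', 1)[1]
                data = json.loads(message['data'])
                await self.deliver_to_local_room(room_name, data)

    async def deliver_to_local_room(self, room_name, message):
        # Deliver only to WebSocket connections held by this server
        websockets = list(self.local_rooms.get(room_name, ()))
        payload = json.dumps(message)  # serialize once, not once per socket
        await asyncio.gather(
            *[ws.send(payload) for ws in websockets],
            return_exceptions=True
        )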
                        

Performance Optimization

Connection Optimization

  • Connection Limits: Per-user and per-IP connection limits
  • Memory Management: Efficient message buffering and cleanup
  • CPU Optimization: Async processing for all I/O operations
  • Network Optimization: Message compression for large payloads

Message Optimization

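// Client-side message optimization
class MessageOptimizer {
    constructor(wsManager) {
        this.wsManager = wsManager;
        this.messageQueue = [];
        this.batchSize = 10;
        this.batchTimeout = 100; // 100ms
        this.batchTimer = null;
    }

    // Batch multiple messages for efficiency
    queueMessage(message) {
        this.messageQueue.push(message);

        if (this.messageQueue.length >= this.batchSize) {
            this.flushQueue();
        } else if (this.batchTimer === null) {
            // Start timeout for first message in queue
            this.batchTimer = setTimeout(() => this.flushQueue(), this.batchTimeout);
        }
    }

    flushQueue() {
        // Cancel any pending timer so it cannot flush the next batch early
        clearTimeout(this.batchTimer);
        this.batchTimer = null;
        if (this.messageQueue.length > 0) {
            const batch = {
                type: 'batch',
                messages: this.messageQueue.splice(0)
            };
            this.wsManager.send(batch);
        }
    }

    // Compress large messages (requires the pako library). pako.deflate returns
    // a Uint8Array, so it is base64-encoded to survive JSON serialization.
    // If the server negotiates permessage-deflate (RFC 7692), the browser
    // already compresses frames and this step may be unnecessary.
    compressMessage(message) {
        const serialized = JSON.stringify(message);
        if (serialized.length > 1024) { // ~1KB threshold (UTF-16 code units)
            const bytes = pako.deflate(serialized);
            let binary = '';
            for (const b of bytes) {
                binary += String.fromCharCode(b);
            }
            return {
                type: 'compressed',
                data: btoa(binary)
            };
        }
        return message;
    }
}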
                        

Monitoring & Metrics

Connection Metrics
  • Active connections per server
  • Connection establishment rate
  • Connection duration distribution
  • Reconnection frequency
Message Metrics
  • Messages per second throughput
  • Message delivery latency
  • Message queue depth
  • Failed message delivery rate
Resource Metrics
  • Memory usage per connection
  • CPU usage for WebSocket handling
  • Network bandwidth utilization
  • Redis memory usage
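
A Python sketch of in-process bookkeeping for three of these metrics: active connections per server, message delivery latency and failed delivery rate. The class and method names are hypothetical, and a real deployment would export these values to its metrics system rather than keep every latency sample in memory. Percentiles use the nearest-rank method.

```python
import math
from collections import Counter
from typing import List


class WebSocketMetrics:
    """In-process counters for a few of the metrics listed above."""

    def __init__(self) -> None:
        self.active_connections: Counter = Counter()  # server -> open connections
        self.latencies_ms: List[float] = []           # successful delivery latencies
        self.attempted = 0
        self.failed = 0

    def connection_opened(self, server: str) -> None:
        self.active_connections[server] += 1

    def connection_closed(self, server: str) -> None:
        self.active_connections[server] -= 1

    def record_delivery(self, latency_ms: float, ok: bool = True) -> None:
        self.attempted += 1
        if ok:
            self.latencies_ms.append(latency_ms)
        else:
            self.failed += 1

    def latency_percentile(self, p: float) -> float:
        """Nearest-rank percentile (0 < p <= 100) of delivery latency."""
        if not self.latencies_ms:
            raise ValueError("no latencies recorded")
        data = sorted(self.latencies_ms)
        rank = max(1, math.ceil(p * len(data) / 100))
        return data[rank - 1]

    def failed_delivery_rate(self) -> float:
        return self.failed / self.attempted if self.attempted else 0.0
```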

Error Handling & Resilience

Client-side Error Handling

Automatic Reconnection

// Extends EventTarget so callers can use addEventListener('max_reconnect_attempts_reached', ...)
class ResilientWebSocket extends EventTarget {
    constructor(url, options = {}) {
        super();
        this.url = url;
        this.options = {
            maxReconnectAttempts: 10,
            reconnectDelay: 1000,
            maxReconnectDelay: 30000,
            backoffFactor: 1.5,
            ...options
        };

        this.reconnectAttempts = 0;
        this.connectionState = 'disconnected';
        this.messageQueue = [];
    }

    connect() {
        return new Promise((resolve, reject) => {
            this.ws = new WebSocket(this.url);
            this.connectionState = 'connecting';

            this.ws.onopen = (event) => {
                this.connectionState = 'connected';
                this.reconnectAttempts = 0;
                this.flushMessageQueue();
                resolve(event);
            };

            // Browsers fire 'error' and then 'close', so reconnection is
            // scheduled only from onclose to avoid scheduling it twice
            this.ws.onerror = (error) => {
                console.error('WebSocket error:', error);
            };

            this.ws.onclose = (event) => {
                this.connectionState = 'disconnected';
                reject(event); // no-op if the promise already resolved
                if (!event.wasClean) {
                    this.scheduleReconnect();
                }
            };
        });
    }

    scheduleReconnect() {
        if (this.reconnectAttempts >= this.options.maxReconnectAttempts) {
            this.dispatchEvent(new Event('max_reconnect_attempts_reached'));
            return;
        }

        const delay = Math.min(
            this.options.reconnectDelay * Math.pow(this.options.backoffFactor, this.reconnectAttempts),
            this.options.maxReconnectDelay
        );
        this.reconnectAttempts++;

        setTimeout(() => {
            // Failures are handled by onclose, so the returned promise is ignored
            this.connect().catch(() => {});
        }, delay);
    }

    send(message) {
        if (this.connectionState === 'connected') {
            this.ws.send(JSON.stringify(message));
        } else {
            // Queue messages while disconnected
            this.messageQueue.push(message);
        }
    }

    flushMessageQueue() {
        while (this.messageQueue.length > 0) {
            const message = this.messageQueue.shift();
            this.ws.send(JSON.stringify(message));
        }
    }
}
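
With the default options above (1000 ms base delay, factor 1.5, 30000 ms cap, 10 attempts), the delay before reconnect attempt n is min(1000 × 1.5^n, 30000) for n = 0 to 9. The Python sketch below reproduces that arithmetic (reconnect_delays is a hypothetical helper): 1000, 1500, 2250, 3375 ms and so on, reaching the cap only on the tenth attempt, for about 105 seconds of cumulative waiting before the client gives up.

```python
from typing import List


def reconnect_delays(reconnect_delay: float = 1000, backoff_factor: float = 1.5,
                     max_reconnect_delay: float = 30000,
                     max_reconnect_attempts: int = 10) -> List[float]:
    """Delay in ms before each reconnect attempt, mirroring the backoff formula."""
    return [min(reconnect_delay * backoff_factor ** attempt, max_reconnect_delay)
            for attempt in range(max_reconnect_attempts)]


delays = reconnect_delays()
total_seconds = sum(delays) / 1000  # cumulative wait if every attempt fails
```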
                        

Message Acknowledgment

class ReliableMessageSender {
    constructor(wsManager) {
        this.wsManager = wsManager;
        this.pendingMessages = new Map();
        this.ackTimeout = 5000; // 5 seconds
        this.maxRetries = 3;
    }

    generateMessageId() {
        return crypto.randomUUID(); // UUID v4, matching the envelope's id field
    }

    sendReliable(message) {
        const messageId = this.generateMessageId();
        message.id = messageId;
        message.requiresAck = true;

        // Store for potential retry
        this.pendingMessages.set(messageId, {
            message,
            timestamp: Date.now(),
            retryCount: 0
        });

        // Send message
        this.wsManager.send(message);

        // Set timeout for acknowledgment
        setTimeout(() => {
            this.handleAckTimeout(messageId);
        }, this.ackTimeout);

        return messageId;
    }

    handleAck(ackMessage) {
        // Removing the entry turns any pending timeout into a no-op
        this.pendingMessages.delete(ackMessage.ackId);
    }

    handleAckTimeout(messageId) {
        const pending = this.pendingMessages.get(messageId);
        if (!pending) {
            return; // already acknowledged
        }
        if (pending.retryCount < this.maxRetries) {
            pending.retryCount++;
            // Resend with the same id so the server can deduplicate
            this.wsManager.send(pending.message);

            // Reschedule timeout
            setTimeout(() => {
                this.handleAckTimeout(messageId);
            }, this.ackTimeout);
        } else {
            // Give up after maxRetries retries
            this.pendingMessages.delete(messageId);
            this.handleMessageFailed(messageId);
        }
    }

    // Hook for callers: override to surface delivery failures
    handleMessageFailed(messageId) {
        console.warn(`Message ${messageId} was not acknowledged`);
    }
}
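
Because ReliableMessageSender resends a message with the same id when an acknowledgment times out, the server can receive duplicates. A Python sketch of the receiving side, assuming the ack reply has the shape { type: 'ack', ackId } that handleAck() reads. AckTracker and the size of its remembered-id window are hypothetical.

```python
from collections import OrderedDict
from typing import Callable, Optional


class AckTracker:
    """Acknowledge every message that asks for it, but process each id only once."""

    def __init__(self, max_remembered: int = 10_000) -> None:
        self.seen: "OrderedDict[str, bool]" = OrderedDict()  # oldest id first
        self.max_remembered = max_remembered

    def receive(self, message: dict, process: Callable[[dict], None]) -> Optional[dict]:
        msg_id = message.get("id")
        if msg_id is None:
            process(message)  # nothing to deduplicate on
            return None
        if msg_id not in self.seen:
            process(message)  # if this raises, the id is not marked as seen
            self.seen[msg_id] = True
            if len(self.seen) > self.max_remembered:
                self.seen.popitem(last=False)  # forget the oldest id
        # Re-ack duplicates too: a retry usually means the first ack was lost
        if message.get("requiresAck"):
            return {"type": "ack", "ackId": msg_id}
        return None
```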
                        

Server-side Error Handling

Circuit Breaker for External Dependencies

import time


class CircuitBreakerOpenError(Exception):
    """Raised while the breaker is open and calls are being short-circuited."""


class WebSocketCircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout  # seconds
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half-open

    async def call_protected(self, func, *args, **kwargs):
        if self.state == 'open':
            if self.should_attempt_reset():
                # Allow a trial call; failure_count is still >= threshold,
                # so a single failure here reopens the breaker
                self.state = 'half-open'
            else:
                raise CircuitBreakerOpenError("Circuit breaker is open")

        try:
            result = await func(*args, **kwargs)
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise

    def on_success(self):
        self.failure_count = 0
        self.state = 'closed'

    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = 'open'

    def should_attempt_reset(self):
        return (time.time() - self.last_failure_time) >= self.recovery_timeout

# Usage in WebSocket handler
breaker = WebSocketCircuitBreaker()

async def send_to_external_service(data):
    try:
        await breaker.call_protected(external_api_call, data)
    except CircuitBreakerOpenError:
        # Use cached data or alternative approach
        await send_cached_response(data)
                        

Graceful Degradation

  • Fallback to HTTP: When WebSocket fails, fall back to HTTP polling
  • Reduced Functionality: Disable real-time features during high load
  • Cached Responses: Serve cached data when live data unavailable
  • Queue Overflow Handling: Drop low-priority messages when queues full
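
Queue overflow handling can be made concrete with the envelope's metadata.priority field. A Python sketch of a bounded per-connection outbox (BoundedOutbox is a hypothetical name): when it is full, a new message evicts the lowest-priority queued message only if it outranks it; otherwise the new message is the one dropped. Defaulting a missing priority to "normal" is an assumption.

```python
import heapq
import itertools
from typing import List, Tuple

PRIORITY_RANK = {"high": 0, "normal": 1, "low": 2}  # lower rank is sent first


class BoundedOutbox:
    """Per-connection send queue that sheds the least important message when full."""

    def __init__(self, capacity: int = 100) -> None:
        self.capacity = capacity
        self._heap: List[Tuple[int, int, dict]] = []
        self._seq = itertools.count()  # FIFO order within one priority level
        self.dropped = 0

    def put(self, message: dict) -> bool:
        """Queue a message; return False if the new message itself was shed."""
        priority = message.get("metadata", {}).get("priority", "normal")
        entry = (PRIORITY_RANK[priority], next(self._seq), message)
        if len(self._heap) < self.capacity:
            heapq.heappush(self._heap, entry)
            return True
        self.dropped += 1
        worst = max(self._heap)  # lowest priority, newest; O(n) but queues are small
        if entry < worst:
            # New message outranks the worst queued one: evict that instead
            self._heap.remove(worst)
            heapq.heapify(self._heap)
            heapq.heappush(self._heap, entry)
            return True
        return False

    def get(self) -> dict:
        """Pop the highest-priority, oldest message."""
        return heapq.heappop(self._heap)[2]

    def __len__(self) -> int:
        return len(self._heap)
```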

Security Considerations

Authentication & Authorization

  • JWT Token Validation: Every WebSocket connection validates JWT tokens
  • Permission-based Rooms: Users can only join rooms they have permission for
  • Token Refresh: Automatic token refresh for long-lived connections
  • Session Management: Tracking and management of active WebSocket sessions
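
For token refresh, a long-lived connection needs to know when its JWT expires. A Python sketch that reads the standard exp claim from the token payload and computes when to refresh, assuming a 60-second safety margin. How the refreshed token reaches the server over the open socket (for example a new auth message) is not specified here and is an assumption; jwt_expiry and seconds_until_refresh are hypothetical helpers.

```python
import base64
import json
import time
from typing import Optional


def jwt_expiry(token: str) -> int:
    """Read the exp claim (seconds since the epoch) WITHOUT verifying the signature.

    Good enough for scheduling a refresh; the server must still fully verify tokens.
    """
    payload = token.split(".")[1]
    payload += "=" * (-len(payload) % 4)  # JWTs strip base64url padding
    claims = json.loads(base64.urlsafe_b64decode(payload))
    return int(claims["exp"])


def seconds_until_refresh(token: str, margin: float = 60,
                          now: Optional[float] = None) -> float:
    """How long to wait before refreshing, leaving `margin` seconds of validity."""
    now = time.time() if now is None else now
    return max(0.0, jwt_expiry(token) - margin - now)
```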

Rate Limiting & Abuse Prevention

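import time
from collections import defaultdict, deque


class ConnectionLimitExceeded(Exception):
    pass


class RateLimitExceeded(Exception):
    pass


class WebSocketRateLimiter:
    def __init__(self):
        self.connections_per_ip = defaultdict(int)
        self.messages_per_user = defaultdict(deque)  # user_id -> recent send times
        self.max_connections_per_ip = 10
        self.max_messages_per_minute = 100

    def check_connection_limit(self, ip_address):
        if self.connections_per_ip[ip_address] >= self.max_connections_per_ip:
            raise ConnectionLimitExceeded(f"Too many connections from {ip_address}")
        # Count this connection; call release_connection() when it closes
        self.connections_per_ip[ip_address] += 1
        return True

    def release_connection(self, ip_address):
        self.connections_per_ip[ip_address] -= 1
        if self.connections_per_ip[ip_address] <= 0:
            del self.connections_per_ip[ip_address]

    def check_message_rate(self, user_id):
        now = time.monotonic()
        user_messages = self.messages_per_user[user_id]

        # Sliding window: drop timestamps older than 1 minute
        while user_messages and now - user_messages[0] >= 60:
            user_messages.popleft()

        if len(user_messages) >= self.max_messages_per_minute:
            raise RateLimitExceeded(f"Message rate limit exceeded for user {user_id}")

        user_messages.append(now)
        return True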
                        

Data Validation & Sanitization

  • Message Schema Validation: All incoming messages validated against schemas
  • Input Sanitization: User input sanitized to prevent injection attacks
  • Size Limits: Maximum message size limits to prevent DoS
  • Content Filtering: Filtering of potentially malicious content
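
Schema validation, sanitization of structure and size limits can be enforced in one parsing step before any handler runs. A Python sketch, assuming a 64 KiB size cap and a small allowlist of client-sent types (both hypothetical values; the document gives neither), plus a room-name pattern in the style of the room types listed earlier. A fuller implementation would typically validate each message type against its own schema.

```python
import json
import re
from typing import Union

MAX_MESSAGE_BYTES = 64 * 1024  # hypothetical limit; not specified in this document
# Hypothetical allowlist of types a browser client may send; extend as needed
CLIENT_MESSAGE_TYPES = {"ping", "auth", "subscribe", "unsubscribe"}
ROOM_NAME = re.compile(r"[a-z]+(-[A-Za-z0-9_]+)?")


class InvalidMessage(ValueError):
    """Raised for frames that should be rejected before any handler sees them."""


def parse_client_message(raw: Union[str, bytes]) -> dict:
    """Size-check, decode and structurally validate one incoming frame."""
    size = len(raw.encode("utf-8")) if isinstance(raw, str) else len(raw)
    if size > MAX_MESSAGE_BYTES:
        raise InvalidMessage(f"message too large ({size} bytes)")
    try:
        message = json.loads(raw)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        raise InvalidMessage(f"malformed JSON: {exc}") from None
    if not isinstance(message, dict):
        raise InvalidMessage("message must be a JSON object")
    msg_type = message.get("type")
    if not isinstance(msg_type, str) or msg_type not in CLIENT_MESSAGE_TYPES:
        raise InvalidMessage(f"unsupported message type: {msg_type!r}")
    data = message.get("data", {})
    if not isinstance(data, dict):
        raise InvalidMessage("'data' must be a JSON object")
    if msg_type in ("subscribe", "unsubscribe"):
        room = data.get("room")
        if not isinstance(room, str) or not ROOM_NAME.fullmatch(room):
            raise InvalidMessage(f"invalid room name: {room!r}")
    return message
```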

Monitoring & Alerting

  • Anomaly Detection: Detection of unusual connection patterns
  • Security Event Logging: Comprehensive logging of security events
  • Real-time Alerts: Immediate alerts for security violations
  • Forensic Capabilities: Message history for security investigations

Next Steps

To explore related WebSocket implementation topics:

  1. REST API Design: Understand how WebSocket complements the REST API
  2. Performance Metrics: Learn about WebSocket performance monitoring
  3. Design Principles: See how real-time communication fits the overall architecture
  4. Database Schema: Understand data flow from database to WebSocket
  5. Scaling Strategies: Scale WebSocket infrastructure effectively