WebSocket Protocol
Real-time communication protocol, message types, connection management, and WebSocket implementation in SysManage.
Protocol Overview
SysManage uses WebSocket connections to provide real-time communication between the web interface and server, enabling instant updates for system status, task progress, agent connectivity, and alerts without polling.
WebSocket Communication Flow
┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│   Browser   │◀───────▶│  WebSocket  │◀───────▶│   Server    │
│   Client    │ Events  │   Gateway   │ Events  │   Backend   │
└─────────────┘         └─────────────┘         └─────────────┘
       │                       │                       │
       │ HTTP Upgrade          │ Connection Pool       │ Event Bus
       │                       │                       │
       ▼                       ▼                       ▼
┌─────────────┐         ┌─────────────┐         ┌─────────────┐
│ Connection  │         │   Message   │         │  Business   │
│ Management  │         │   Routing   │         │    Logic    │
│             │         │             │         │   Events    │
│ • Auth      │         │ • Broadcast │         │             │
│ • Heartbeat │         │ • Unicast   │         │ • Tasks     │
│ • Reconnect │         │ • Room Mgmt │         │ • Agents    │
└─────────────┘         └─────────────┘         └─────────────┘
Key Features
- Real-time Updates: Instant propagation of system changes
- Bi-directional Communication: Client can send commands and receive events
- Connection Management: Automatic reconnection and heartbeat monitoring
- Room-based Broadcasting: Targeted message delivery based on user permissions
- Message Queuing: Reliable delivery with offline message storage
Connection Lifecycle
Connection State Machine
             ┌─────────────────┐
             │  DISCONNECTED   │
             │                 │
             └─────────┬───────┘
                       │ connect()
                       ▼
             ┌─────────────────┐
             │   CONNECTING    │
             │                 │
             └─────┬─────┬─────┘
                   │     │
           success │     │ failure
                   ▼     ▼
      ┌─────────────┐   ┌─────────────┐
      │  CONNECTED  │   │   FAILED    │
      │             │   │             │
      └─────┬─┬─────┘   └─────┬───────┘
            │ │               │
  heartbeat │ │ disconnect    │ retry
    timeout │ │               │
            ▼ ▼               ▼
      ┌─────────────┐   ┌─────────────┐
      │ RECONNECTING│   │  RETRYING   │
      │             │   │             │
      └─────────────┘   └─────────────┘
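The transitions drawn above can also be written as a small lookup table. The following is an illustrative sketch only: the event names come from the diagram labels, while `ConnectionState`, `TRANSITIONS`, and `next_state` are hypothetical names and not part of SysManage. It assumes that both "heartbeat timeout" and "disconnect" lead to RECONNECTING, and it leaves out any transition the diagram does not draw.

```python
from enum import Enum


class ConnectionState(Enum):
    DISCONNECTED = "disconnected"
    CONNECTING = "connecting"
    CONNECTED = "connected"
    FAILED = "failed"
    RECONNECTING = "reconnecting"
    RETRYING = "retrying"


# (current state, event) -> next state, limited to the arrows in the diagram.
# Assumption: "heartbeat timeout" and "disconnect" both end in RECONNECTING.
TRANSITIONS = {
    (ConnectionState.DISCONNECTED, "connect"): ConnectionState.CONNECTING,
    (ConnectionState.CONNECTING, "success"): ConnectionState.CONNECTED,
    (ConnectionState.CONNECTING, "failure"): ConnectionState.FAILED,
    (ConnectionState.CONNECTED, "heartbeat_timeout"): ConnectionState.RECONNECTING,
    (ConnectionState.CONNECTED, "disconnect"): ConnectionState.RECONNECTING,
    (ConnectionState.FAILED, "retry"): ConnectionState.RETRYING,
}


def next_state(state: ConnectionState, event: str) -> ConnectionState:
    """Return the state reached from `state` on `event`, or raise ValueError."""
    try:
        return TRANSITIONS[(state, event)]
    except KeyError:
        raise ValueError(f"no transition from {state.name} on {event!r}") from None
```

Keeping the table explicit means an unexpected event (for example `success` while DISCONNECTED) fails loudly instead of silently changing state.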
1. Connection Establishment
// Client-side connection establishment
// Authentication via query parameter or first message
const authToken = getJWTToken();
const wsUrl = `wss://sysmanage.example.com/ws?token=${encodeURIComponent(authToken)}`;
const ws = new WebSocket(wsUrl);

// Connection events
ws.onopen = (event) => {
  console.log('WebSocket connected');
  // Alternatively, authenticate with an explicit first message
  ws.send(JSON.stringify({
    type: 'auth',
    token: authToken
  }));
};
2. Authentication & Authorization
# Server-side authentication flow
async def handle_connection(websocket, path):
    try:
        # Extract token from query params or first message
        token = get_auth_token(websocket)

        # Validate JWT token
        user = await authenticate_token(token)
        if not user:
            await websocket.close(code=4001, reason="Unauthorized")
            return

        # Store user context
        websocket.user = user
        websocket.permissions = await get_user_permissions(user)

        # Join appropriate rooms based on permissions
        await join_user_rooms(websocket, user)

        # Send welcome message
        await send_message(websocket, {
            'type': 'welcome',
            'user_id': user.id,
            'permissions': websocket.permissions
        })
    except Exception as e:
        # Log the cause before closing so failures can be diagnosed
        logger.error(f"Connection setup failed: {e}")
        await websocket.close(code=4000, reason="Authentication failed")
3. Heartbeat & Keep-alive
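// Client-side heartbeat implementation
class WebSocketManager {
  constructor(url) {
    this.url = url;
    this.ws = null;                 // assigned by connect()
    this.heartbeatTimer = null;
    this.heartbeatInterval = 30000; // 30 seconds
    this.reconnectDelay = 5000;     // 5 seconds
    this.maxReconnectAttempts = 10;
  }

  connect() {
    this.ws = new WebSocket(this.url);
    this.ws.onopen = () => this.startHeartbeat();
    this.ws.onclose = () => this.stopHeartbeat();
  }

  send(message) {
    this.ws.send(JSON.stringify(message));
  }

  startHeartbeat() {
    this.stopHeartbeat(); // avoid stacking timers across reconnects
    this.heartbeatTimer = setInterval(() => {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        this.send({
          type: 'ping',
          timestamp: Date.now()
        });
      }
    }, this.heartbeatInterval);
  }

  stopHeartbeat() {
    clearInterval(this.heartbeatTimer);
    this.heartbeatTimer = null;
  }

  // Assumes the server echoes the ping's timestamp back in its pong
  handlePong(message) {
    const latency = Date.now() - message.timestamp;
    console.log(`Heartbeat latency: ${latency}ms`);
  }
}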
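The client heartbeat above only works if the server answers each `ping` with a message that carries the same timestamp. A minimal server-side sketch follows. The reply type `pong`, the helper names `build_pong` and `is_stale`, and the timeout of two missed pings are assumptions for illustration, not confirmed SysManage behaviour.

```python
import time
from typing import Optional

HEARTBEAT_INTERVAL_S = 30.0                      # matches the client's 30-second interval
HEARTBEAT_TIMEOUT_S = 2 * HEARTBEAT_INTERVAL_S   # assumption: tolerate one missed ping


def build_pong(ping: dict) -> dict:
    """Echo the client's timestamp so it can compute round-trip latency."""
    return {"type": "pong", "timestamp": ping["timestamp"]}


def is_stale(last_ping_at: float, now: Optional[float] = None) -> bool:
    """Return True when no ping has arrived within the timeout window.

    Both arguments are readings of a monotonic clock, in seconds.
    """
    now = time.monotonic() if now is None else now
    return (now - last_ping_at) > HEARTBEAT_TIMEOUT_S
```

A periodic server task could call `is_stale` for each connection and close those that return True, which is what drives the "heartbeat timeout" transition on the client.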
4. Graceful Disconnection
# Server-side cleanup
async def handle_disconnect(websocket):
    try:
        # Remove from all rooms
        await leave_all_rooms(websocket)

        # Clean up user session
        await cleanup_user_session(websocket.user)

        # Log disconnection
        logger.info(f"User {websocket.user.id} disconnected")
    except Exception as e:
        logger.error(f"Error during disconnect cleanup: {e}")
Room Management
Room-based Broadcasting
SysManage uses a room-based system to efficiently deliver messages to relevant users based on their permissions and interests.
Room Types
Global Rooms
- system: System-wide notifications
- alerts: All alert notifications
- maintenance: Maintenance announcements
Entity-based Rooms
- agents: All agent status updates
- agent-{id}: Specific agent updates
- tasks: All task updates
- task-{id}: Specific task progress
Group-based Rooms
- group-{id}: Agent group updates
- team-{id}: Team-specific notifications
- role-{name}: Role-based announcements
User Rooms
- user-{id}: Personal notifications
- session-{id}: Session-specific messages
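The naming scheme above is regular enough to parse mechanically, which is useful before any permission check. The sketch below is hypothetical: `Room`, `parse_room`, and the two name sets are illustrative and simply mirror the room names listed in this section.

```python
from typing import NamedTuple, Optional

# Rooms that take no identifier, as listed above.
UNSCOPED_ROOMS = {"system", "alerts", "maintenance", "agents", "tasks"}
# Prefixes of rooms that are followed by "-{id}" or "-{name}".
SCOPED_PREFIXES = {"agent", "task", "group", "team", "role", "user", "session"}


class Room(NamedTuple):
    kind: str              # e.g. "system", "agent", "role"
    ident: Optional[str]   # None for rooms without an identifier


def parse_room(name: str) -> Room:
    """Split a room name into its kind and identifier, rejecting unknown names."""
    if name in UNSCOPED_ROOMS:
        return Room(name, None)
    kind, sep, ident = name.partition("-")
    if sep and ident and kind in SCOPED_PREFIXES:
        return Room(kind, ident)
    raise ValueError(f"unknown room name: {name!r}")
```

Splitting only on the first hyphen keeps identifiers that themselves contain hyphens (such as UUID-style session ids) intact.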
Room Subscription Management
// Client-side room subscription
class RoomManager {
  constructor(wsManager) {
    this.wsManager = wsManager;
    this.subscribedRooms = new Set();
  }

  subscribe(roomName) {
    if (!this.subscribedRooms.has(roomName)) {
      this.wsManager.send({
        type: 'subscribe',
        data: { room: roomName }
      });
      this.subscribedRooms.add(roomName);
    }
  }

  unsubscribe(roomName) {
    if (this.subscribedRooms.has(roomName)) {
      this.wsManager.send({
        type: 'unsubscribe',
        data: { room: roomName }
      });
      this.subscribedRooms.delete(roomName);
    }
  }

  // Automatically subscribe based on current page
  subscribeForPage(page) {
    switch (page) {
      case 'dashboard':
        this.subscribe('system');
        this.subscribe('alerts');
        this.subscribe('agents');
        break;
      case 'agent-detail': {
        // Braces scope the declaration to this case
        const agentId = getCurrentAgentId();
        this.subscribe(`agent-${agentId}`);
        break;
      }
      case 'tasks':
        this.subscribe('tasks');
        break;
    }
  }
}
Server-side Room Management
# Python server-side room management
import asyncio
import logging
from collections import defaultdict

logger = logging.getLogger(__name__)


class WebSocketRoomManager:
    def __init__(self):
        self.rooms = defaultdict(set)       # room_name -> set of websockets
        self.user_rooms = defaultdict(set)  # user_id -> set of room_names

    async def join_room(self, websocket, room_name):
        # Check permissions
        if not await self.can_join_room(websocket.user, room_name):
            await self.send_error(websocket, "Permission denied")
            return False

        # Add to room
        self.rooms[room_name].add(websocket)
        self.user_rooms[websocket.user.id].add(room_name)
        logger.info(f"User {websocket.user.id} joined room {room_name}")
        return True

    async def leave_room(self, websocket, room_name):
        self.rooms[room_name].discard(websocket)
        self.user_rooms[websocket.user.id].discard(room_name)

    async def broadcast_to_room(self, room_name, message):
        if room_name in self.rooms:
            # Copy so that joins/leaves during the broadcast do not break iteration
            websockets = self.rooms[room_name].copy()
            await asyncio.gather(
                *[self.send_message(ws, message) for ws in websockets],
                return_exceptions=True
            )

    async def can_join_room(self, user, room_name):
        # Implement permission logic; rooms not matched below are denied
        if room_name.startswith('agent-'):
            agent_id = int(room_name.split('-', 1)[1])
            return await user.can_view_agent(agent_id)
        elif room_name.startswith('group-'):
            group_id = int(room_name.split('-', 1)[1])
            return await user.can_view_group(group_id)
        elif room_name in ['system', 'alerts']:
            return user.has_permission('view_system')
        return False
Connection Scaling & Load Balancing
Horizontal Scaling Architecture
                    Load Balancer
                  (Sticky Sessions)
                          │
       ┌──────────────────┼──────────────────┐
       │                  │                  │
       ▼                  ▼                  ▼
┌──────────────┐   ┌──────────────┐   ┌──────────────┐
│  WebSocket   │   │  WebSocket   │   │  WebSocket   │
│   Server 1   │   │   Server 2   │   │   Server 3   │
│              │   │              │   │              │
│ • 10k conns  │   │ • 10k conns  │   │ • 10k conns  │
└──────┬───────┘   └──────┬───────┘   └──────┬───────┘
       │                  │                  │
       └──────────────────┼──────────────────┘
                          │
                          ▼
                   ┌──────────────┐
                   │    Redis     │
                   │   Message    │
                   │     Bus      │
                   └──────────────┘
                          │
                          ▼
                   ┌──────────────┐
                   │ Application  │
                   │   Servers    │
                   │              │
                   └──────────────┘
Scaling Strategies
Session Affinity
- Load balancer routes users to same server
- Maintains WebSocket connection state
- Reduces cross-server message routing
Redis Message Bus
- Pub/Sub for cross-server communication
- Room state synchronization
- Message persistence for offline users
Connection Pooling
- Efficient memory usage per connection
- Connection lifecycle management
- Automatic cleanup of stale connections
Redis Integration
# Redis-based cross-server message routing
import asyncio
import json


class RedisMessageBus:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.local_rooms = {}  # room_name -> set of local websockets

    async def publish_to_room(self, room_name, message):
        # Publish to Redis for all servers
        await self.redis.publish(f"room:{room_name}", json.dumps(message))

    async def subscribe_to_rooms(self):
        # Subscribe to Redis room channels
        pubsub = self.redis.pubsub()
        await pubsub.psubscribe("room:*")
        async for message in pubsub.listen():
            if message['type'] == 'pmessage':
                # Channel names arrive as bytes unless the client decodes responses
                room_name = message['channel'].decode('utf-8').split(':', 1)[1]
                data = json.loads(message['data'])
                await self.deliver_to_local_room(room_name, data)

    async def deliver_to_local_room(self, room_name, message):
        # Deliver to local WebSocket connections
        if room_name in self.local_rooms:
            websockets = self.local_rooms[room_name]
            await asyncio.gather(
                *[ws.send(json.dumps(message)) for ws in websockets],
                return_exceptions=True
            )
Performance Optimization
Connection Optimization
- Connection Limits: Per-user and per-IP connection limits
- Memory Management: Efficient message buffering and cleanup
- CPU Optimization: Async processing for all I/O operations
- Network Optimization: Message compression for large payloads
Message Optimization
// Client-side message optimization
class MessageOptimizer {
  constructor(wsManager) {
    this.wsManager = wsManager; // used by flushQueue()
    this.messageQueue = [];
    this.batchSize = 10;
    this.batchTimeout = 100; // 100ms
    this.flushTimer = null;
  }

  // Batch multiple messages for efficiency
  queueMessage(message) {
    this.messageQueue.push(message);
    if (this.messageQueue.length >= this.batchSize) {
      this.flushQueue();
    } else if (this.flushTimer === null) {
      // Start timeout for first message in queue
      this.flushTimer = setTimeout(() => this.flushQueue(), this.batchTimeout);
    }
  }

  flushQueue() {
    // Cancel any pending timer so it cannot flush the next batch early
    clearTimeout(this.flushTimer);
    this.flushTimer = null;
    if (this.messageQueue.length > 0) {
      const batch = {
        type: 'batch',
        messages: this.messageQueue.splice(0)
      };
      this.wsManager.send(batch);
    }
  }

  // Compress large messages
  compressMessage(message) {
    const serialized = JSON.stringify(message);
    // Threshold of 1024 characters (roughly 1KB for ASCII payloads)
    if (serialized.length > 1024) {
      return {
        type: 'compressed',
        // The `to: 'string'` option depends on the pako version in use
        data: pako.deflate(serialized, { to: 'string' })
      };
    }
    return message;
  }
}
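A server that accepts the `batch` envelope produced by `flushQueue()` has to unpack it before dispatching. The sketch below shows one way to do that. `expand_messages` is a hypothetical helper, and it assumes batches are never nested, which matches what the client code above sends.

```python
from typing import Iterator


def expand_messages(message: dict) -> Iterator[dict]:
    """Yield the individual messages contained in `message`.

    A 'batch' envelope yields each entry of its 'messages' list in order;
    any other message is yielded unchanged.
    """
    if message.get("type") == "batch":
        for inner in message.get("messages", []):
            yield inner
    else:
        yield message
```

For example, a handler could loop with `for msg in expand_messages(incoming): dispatch(msg)`, where `dispatch` stands for whatever per-message routing the server already has.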
Monitoring & Metrics
Connection Metrics
- Active connections per server
- Connection establishment rate
- Connection duration distribution
- Reconnection frequency
Message Metrics
- Messages per second throughput
- Message delivery latency
- Message queue depth
- Failed message delivery rate
Resource Metrics
- Memory usage per connection
- CPU usage for WebSocket handling
- Network bandwidth utilization
- Redis memory usage
Error Handling & Resilience
Client-side Error Handling
Automatic Reconnection
class ResilientWebSocket extends EventTarget {
  constructor(url, options = {}) {
    super();
    this.url = url;
    this.options = {
      maxReconnectAttempts: 10,
      reconnectDelay: 1000,
      maxReconnectDelay: 30000,
      backoffFactor: 1.5,
      ...options
    };
    this.reconnectAttempts = 0;
    this.connectionState = 'disconnected';
    this.messageQueue = [];
  }

  connect() {
    return new Promise((resolve, reject) => {
      this.ws = new WebSocket(this.url);
      this.connectionState = 'connecting';

      this.ws.onopen = (event) => {
        this.connectionState = 'connected';
        this.reconnectAttempts = 0;
        this.flushMessageQueue();
        resolve(event);
      };

      // Browsers fire a close event after an error, so reconnection is
      // handled in onclose only; doing it here too would schedule it twice
      this.ws.onerror = (error) => {
        console.error('WebSocket error:', error);
      };

      this.ws.onclose = (event) => {
        const wasConnected = this.connectionState === 'connected';
        this.connectionState = 'disconnected';
        if (!wasConnected) {
          reject(new Error('WebSocket closed before the connection opened'));
        }
        if (!event.wasClean) {
          this.handleConnectionError();
        }
      };
    });
  }

  handleConnectionError() {
    if (this.reconnectAttempts < this.options.maxReconnectAttempts) {
      this.scheduleReconnect();
    } else {
      this.dispatchEvent(new Event('max_reconnect_attempts_reached'));
    }
  }

  scheduleReconnect() {
    const delay = Math.min(
      this.options.reconnectDelay * Math.pow(this.options.backoffFactor, this.reconnectAttempts),
      this.options.maxReconnectDelay
    );
    setTimeout(() => {
      this.reconnectAttempts++;
      // A failed attempt is retried via onclose, so the rejection is ignored here
      this.connect().catch(() => {});
    }, delay);
  }

  send(message) {
    if (this.connectionState === 'connected') {
      this.ws.send(JSON.stringify(message));
    } else {
      // Queue messages while disconnected
      this.messageQueue.push(message);
    }
  }

  flushMessageQueue() {
    while (this.messageQueue.length > 0) {
      const message = this.messageQueue.shift();
      this.ws.send(JSON.stringify(message));
    }
  }
}
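To see what the default options mean in practice, the delay formula from `scheduleReconnect()` can be evaluated on its own. This sketch reproduces the same arithmetic in Python. `reconnect_delay_ms` is a hypothetical helper name, and its defaults are copied from the options in the class above.

```python
def reconnect_delay_ms(attempt: int,
                       base_ms: float = 1000,
                       factor: float = 1.5,
                       max_ms: float = 30000) -> float:
    """Delay before reconnect number `attempt` (0-based), capped at `max_ms`."""
    return min(base_ms * factor ** attempt, max_ms)


# Delays for the ten attempts allowed by maxReconnectAttempts = 10.
schedule = [reconnect_delay_ms(n) for n in range(10)]
total_wait_ms = sum(schedule)
```

With these defaults the first delays are 1000 ms, 1500 ms, and 2250 ms; the 30000 ms cap is first reached on the tenth attempt, and the ten delays add up to roughly 105 seconds of waiting before the client gives up.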
Message Acknowledgment
class ReliableMessageSender {
  constructor(wsManager) {
    this.wsManager = wsManager;
    this.pendingMessages = new Map();
    this.ackTimeout = 5000; // 5 seconds
    this.nextId = 0;
  }

  // Placeholder id scheme: unique within this sender instance
  generateMessageId() {
    this.nextId += 1;
    return `${Date.now()}-${this.nextId}`;
  }

  sendReliable(message) {
    const messageId = this.generateMessageId();
    message.id = messageId;
    message.requiresAck = true;

    // Store for potential retry
    this.pendingMessages.set(messageId, {
      message,
      timestamp: Date.now(),
      retryCount: 0
    });

    // Send message
    this.wsManager.send(message);

    // Set timeout for acknowledgment
    setTimeout(() => {
      this.handleAckTimeout(messageId);
    }, this.ackTimeout);
    return messageId;
  }

  handleAck(ackMessage) {
    const messageId = ackMessage.ackId;
    if (this.pendingMessages.has(messageId)) {
      this.pendingMessages.delete(messageId);
    }
  }

  handleAckTimeout(messageId) {
    const pending = this.pendingMessages.get(messageId);
    if (!pending) {
      // Already acknowledged; nothing to retry and nothing failed
      return;
    }
    if (pending.retryCount < 3) {
      pending.retryCount++;
      this.wsManager.send(pending.message);
      // Reschedule timeout
      setTimeout(() => {
        this.handleAckTimeout(messageId);
      }, this.ackTimeout);
    } else {
      // Give up after 3 retries
      this.pendingMessages.delete(messageId);
      this.handleMessageFailed(messageId);
    }
  }

  // Placeholder: surface the failure to the application
  handleMessageFailed(messageId) {
    console.error(`Message ${messageId} was not acknowledged`);
  }
}
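Because the client resends a message whenever an acknowledgment is late, the server may receive the same `id` more than once and should avoid processing it twice. The following is a minimal sketch of the receiving side. `AckTracker` and `build_ack` are hypothetical names, and the `ack` message type is an assumption; only the `id`, `requiresAck`, and `ackId` fields come from the client code above.

```python
from collections import OrderedDict
from typing import Optional


class AckTracker:
    """Remembers recently seen message ids so retries are not processed twice."""

    def __init__(self, max_ids: int = 1000):
        self.max_ids = max_ids
        self._seen = OrderedDict()  # insertion-ordered set of message ids

    def is_duplicate(self, message_id: str) -> bool:
        """Record `message_id` and report whether it was already seen."""
        if message_id in self._seen:
            return True
        self._seen[message_id] = True
        if len(self._seen) > self.max_ids:
            self._seen.popitem(last=False)  # evict the oldest id
        return False


def build_ack(message: dict) -> Optional[dict]:
    """Return the ack to send back, or None if the message did not request one."""
    if message.get("requiresAck") and "id" in message:
        return {"type": "ack", "ackId": message["id"]}
    return None
```

A handler would typically send the ack for every copy of a message but run the business logic only when `is_duplicate` returns False, so a lost ack does not cause a repeated side effect.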
Server-side Error Handling
Circuit Breaker for External Dependencies
import time


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class WebSocketCircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout  # seconds
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'closed'  # closed, open, half-open

    async def call_protected(self, func, *args, **kwargs):
        if self.state == 'open':
            if self.should_attempt_reset():
                # Let one trial call through to probe the dependency
                self.state = 'half-open'
            else:
                raise CircuitBreakerOpenError("Circuit breaker is open")
        try:
            result = await func(*args, **kwargs)
            self.on_success()
            return result
        except Exception:
            self.on_failure()
            raise

    def on_success(self):
        self.failure_count = 0
        self.state = 'closed'

    def on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = 'open'

    def should_attempt_reset(self):
        return (time.time() - self.last_failure_time) >= self.recovery_timeout


# Usage in WebSocket handler
breaker = WebSocketCircuitBreaker()

async def send_to_external_service(data):
    try:
        await breaker.call_protected(external_api_call, data)
    except CircuitBreakerOpenError:
        # Use cached data or alternative approach
        await send_cached_response(data)
Graceful Degradation
- Fallback to HTTP: When WebSocket fails, fall back to HTTP polling
- Reduced Functionality: Disable real-time features during high load
- Cached Responses: Serve cached data when live data unavailable
- Queue Overflow Handling: Drop low-priority messages when queues full
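As an illustration of the queue overflow policy, the sketch below keeps a bounded buffer and evicts the lowest-priority message once it is full. `BoundedPriorityQueue` is a hypothetical class, not the SysManage implementation, and it assumes that a larger number means a more important message. It only models the drop decision, not delivery order.

```python
import heapq
import itertools
from typing import Optional


class BoundedPriorityQueue:
    """Holds at most `capacity` messages, dropping the least important when full."""

    def __init__(self, capacity: int):
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self.capacity = capacity
        self._heap = []                    # min-heap: least important entry at index 0
        self._counter = itertools.count()  # tie-breaker so messages are never compared

    def push(self, priority: int, message: dict) -> Optional[dict]:
        """Add a message; return the message that was dropped, or None."""
        entry = (priority, next(self._counter), message)
        if len(self._heap) < self.capacity:
            heapq.heappush(self._heap, entry)
            return None
        if priority <= self._heap[0][0]:
            return message                 # the new message is the least important
        dropped = heapq.heapreplace(self._heap, entry)
        return dropped[2]

    def __len__(self) -> int:
        return len(self._heap)
```

Returning the dropped message lets the caller count or log drops, which feeds the "failed message delivery rate" metric listed earlier.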
Security Considerations
Authentication & Authorization
- JWT Token Validation: Every WebSocket connection validates JWT tokens
- Permission-based Rooms: Users can only join rooms they have permission for
- Token Refresh: Automatic token refresh for long-lived connections
- Session Management: Tracking and management of active WebSocket sessions
Rate Limiting & Abuse Prevention
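import time
from collections import defaultdict


class ConnectionLimitExceeded(Exception):
    """Raised when an IP address has too many open connections."""


class RateLimitExceeded(Exception):
    """Raised when a user sends messages too quickly."""


class WebSocketRateLimiter:
    def __init__(self):
        self.connections_per_ip = defaultdict(int)
        self.messages_per_user = defaultdict(list)
        self.max_connections_per_ip = 10
        self.max_messages_per_minute = 100

    def check_connection_limit(self, ip_address):
        if self.connections_per_ip[ip_address] >= self.max_connections_per_ip:
            raise ConnectionLimitExceeded(f"Too many connections from {ip_address}")
        # Count the accepted connection; call release_connection() on disconnect
        self.connections_per_ip[ip_address] += 1
        return True

    def release_connection(self, ip_address):
        self.connections_per_ip[ip_address] = max(0, self.connections_per_ip[ip_address] - 1)

    def check_message_rate(self, user_id):
        now = time.time()
        user_messages = self.messages_per_user[user_id]
        # Remove messages older than 1 minute
        user_messages[:] = [msg_time for msg_time in user_messages
                            if now - msg_time < 60]
        if len(user_messages) >= self.max_messages_per_minute:
            raise RateLimitExceeded(f"Message rate limit exceeded for user {user_id}")
        user_messages.append(now)
        return True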
Data Validation & Sanitization
- Message Schema Validation: All incoming messages validated against schemas
- Input Sanitization: User input sanitized to prevent injection attacks
- Size Limits: Maximum message size limits to prevent DoS
- Content Filtering: Filtering of potentially malicious content
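The size-limit and schema checks above can be combined into a single parsing step that runs before any message handler. The sketch below is illustrative. The 64 KB limit, the `InvalidMessage` exception, and `parse_client_message` are assumptions, and the allowed types are simply those that appear in the client examples in this document.

```python
import json

MAX_MESSAGE_BYTES = 64 * 1024  # assumption: choose a limit that suits the deployment
ALLOWED_TYPES = {"auth", "ping", "subscribe", "unsubscribe", "batch", "compressed"}


class InvalidMessage(ValueError):
    """Raised when an incoming message fails validation."""


def parse_client_message(raw: str) -> dict:
    """Decode and validate one incoming text frame."""
    # Enforce the size limit before parsing to bound the work done per message.
    if len(raw.encode("utf-8")) > MAX_MESSAGE_BYTES:
        raise InvalidMessage("message too large")
    try:
        message = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise InvalidMessage("not valid JSON") from exc
    if not isinstance(message, dict):
        raise InvalidMessage("message must be a JSON object")
    msg_type = message.get("type")
    if not isinstance(msg_type, str) or msg_type not in ALLOWED_TYPES:
        raise InvalidMessage(f"unknown message type: {msg_type!r}")
    if msg_type in ("subscribe", "unsubscribe"):
        data = message.get("data")
        if not isinstance(data, dict) or not isinstance(data.get("room"), str) or not data["room"]:
            raise InvalidMessage("subscribe/unsubscribe requires data.room")
    return message
```

Rejecting a bad frame with one exception type keeps the handler simple: it can catch `InvalidMessage`, send an error reply, and count the event for the security logging described below.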
Monitoring & Alerting
- Anomaly Detection: Detection of unusual connection patterns
- Security Event Logging: Comprehensive logging of security events
- Real-time Alerts: Immediate alerts for security violations
- Forensic Capabilities: Message history for security investigations