REST API Design
REST API design patterns, endpoint structure, authentication, error handling, and best practices in SysManage.
API Design Philosophy
SysManage's REST API follows industry best practices and RESTful principles to provide a consistent, intuitive, and powerful interface for system management operations. The API is designed to be self-documenting, version-safe, and developer-friendly.
Core Design Principles
Resource-Oriented
URLs represent resources, not actions. Clear resource hierarchy with logical nesting.
Security First
Authentication, authorization, and input validation on every endpoint.
Consistent Responses
Standardized response formats, error codes, and pagination across all endpoints.
Performance Optimized
Efficient queries, caching, rate limiting, and bulk operations support.
Version Safe
Backward compatibility, gradual deprecation, and clear migration paths.
Self-Documenting
OpenAPI specification, comprehensive examples, and interactive documentation.
API Architecture Overview
┌─────────────────────────────────────────────────────────────┐
│                         API Gateway                         │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │    Rate     │ │   Auth &    │ │   Request Validation    │ │
│ │  Limiting   │ │    AuthZ    │ │     & Sanitization      │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────┤
│                      API Routing Layer                      │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │  /api/v1/   │ │  /api/v1/   │ │        /api/v1/         │ │
│ │    auth     │ │   agents    │ │          tasks          │ │
│ │             │ │             │ │                         │ │
│ │ • login     │ │ • CRUD ops  │ │ • CRUD ops              │ │
│ │ • logout    │ │ • actions   │ │ • execution             │ │
│ │ • refresh   │ │ • metrics   │ │ • scheduling            │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
│                                                             │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────────┐ │
│ │  /api/v1/   │ │  /api/v1/   │ │        /api/v1/         │ │
│ │  inventory  │ │   config    │ │         metrics         │ │
│ │             │ │             │ │                         │ │
│ │ • packages  │ │ • settings  │ │ • collection            │ │
│ │ • services  │ │ • policies  │ │ • aggregation           │ │
│ │ • hardware  │ │ • templates │ │ • alerts                │ │
│ └─────────────┘ └─────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
                               │
                               ▼
                    ┌─────────────────────┐
                    │   Business Logic    │
                    │        Layer        │
                    └─────────────────────┘
                               │
                               ▼
                    ┌─────────────────────┐
                    │     Data Access     │
                    │        Layer        │
                    └─────────────────────┘
URL Structure & Resource Hierarchy
URL Convention
SysManage follows a consistent URL pattern that clearly expresses resource relationships:
https://api.sysmanage.example.com/api/v1/{resource}[/{id}][/{sub-resource}][/{sub-id}]
Examples:
GET /api/v1/agents # List all agents
GET /api/v1/agents/123 # Get specific agent
GET /api/v1/agents/123/packages # List packages on agent
POST /api/v1/agents/123/tasks # Create task for agent
GET /api/v1/agents/123/tasks/456 # Get specific task
POST /api/v1/agents/123/tasks/456/cancel # Cancel specific task (an action, so POST)
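The URL convention above can be sketched as a small client-side helper. This is only an illustration: `build_url` and its parameter names are hypothetical and not part of SysManage; the base URL is the example host used in this document.

```python
from typing import Optional

BASE_URL = "https://api.sysmanage.example.com/api/v1"


def build_url(resource: str, resource_id: Optional[int] = None,
              sub_resource: Optional[str] = None,
              sub_id: Optional[int] = None) -> str:
    """Assemble {resource}[/{id}][/{sub-resource}][/{sub-id}] left to right."""
    # Each optional segment only makes sense if the one before it is present.
    if sub_resource is not None and resource_id is None:
        raise ValueError("a sub-resource requires a parent resource id")
    if sub_id is not None and sub_resource is None:
        raise ValueError("a sub-id requires a sub-resource")

    parts = [BASE_URL, resource]
    for part in (resource_id, sub_resource, sub_id):
        if part is None:
            break
        parts.append(str(part))
    return "/".join(parts)


print(build_url("agents"))
print(build_url("agents", 123, "tasks", 456))
```

Rejecting a sub-resource without a parent ID keeps the helper from producing URLs that fall outside the documented pattern.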
Resource Hierarchy
/api/v1/
├── auth/                          # Authentication endpoints
│   ├── login                      # POST: User login
│   ├── logout                     # POST: User logout
│   ├── refresh                    # POST: Token refresh
│   └── profile                    # GET/PUT: User profile
│
├── agents/                        # Agent management
│   ├── {id}/                      # Specific agent operations
│   │   ├── packages/              # Package inventory
│   │   ├── services/              # Service status
│   │   ├── tasks/                 # Agent-specific tasks
│   │   ├── metrics/               # Agent metrics
│   │   └── actions/               # Agent actions (reboot, etc.)
│   │       ├── reboot
│   │       ├── shutdown
│   │       └── update-agent
│   └── groups/                    # Agent grouping
│       ├── {id}/
│       └── {id}/members/
│
├── tasks/                         # Task management
│   ├── {id}/                      # Specific task operations
│   │   ├── cancel                 # POST: Cancel task
│   │   ├── retry                  # POST: Retry failed task
│   │   ├── logs                   # GET: Task execution logs
│   │   └── artifacts/             # Task output artifacts
│   ├── schedules/                 # Scheduled tasks
│   │   ├── {id}/
│   │   └── {id}/executions/
│   └── templates/                 # Task templates
│       └── {id}/
│
├── inventory/                     # System inventory
│   ├── packages/                  # Package management
│   │   ├── search                 # GET: Search packages
│   │   ├── updates                # GET: Available updates
│   │   └── vulnerabilities        # GET: Security issues
│   ├── services/                  # Service management
│   │   └── {name}/                # Specific service
│   └── hardware/                  # Hardware inventory
│       ├── cpu/
│       ├── memory/
│       └── storage/
│
├── metrics/                       # Metrics and monitoring
│   ├── query                      # POST: Custom metric queries
│   ├── dashboards/                # Dashboard management
│   │   └── {id}/
│   └── alerts/                    # Alert management
│       ├── {id}/
│       ├── rules/                 # Alert rules
│       └── notifications/         # Notification channels
│
├── config/                        # Configuration management
│   ├── settings/                  # System settings
│   ├── policies/                  # Security policies
│   │   └── {id}/
│   └── certificates/              # Certificate management
│       ├── {id}/
│       ├── ca/                    # Certificate Authority
│       └── revoked/               # Revoked certificates
│
└── admin/                         # Administrative endpoints
    ├── users/                     # User management
    │   ├── {id}/
    │   └── {id}/roles/
    ├── roles/                     # Role management
    │   └── {id}/
    ├── audit/                     # Audit logs
    └── system/                    # System status
        ├── health                 # Health check
        ├── version                # Version info
        └── status                 # System status
HTTP Method Usage
GET
Purpose: Retrieve resources
- Safe and idempotent
- Cacheable responses
- No request body
- Supports filtering and pagination
GET /api/v1/agents?status=online&limit=50&offset=0
GET /api/v1/agents/123/packages?search=nginx
POST
Purpose: Create resources or trigger actions
- Not idempotent
- Request body contains data
- Returns 201 for creation, 200 for actions
POST /api/v1/agents/123/tasks
POST /api/v1/agents/123/actions/reboot
POST /api/v1/auth/login
PUT
Purpose: Replace entire resource
- Idempotent
- Complete resource replacement
- Creates the resource if it doesn't exist
PUT /api/v1/agents/123
PUT /api/v1/config/settings/email
PATCH
Purpose: Partial resource updates
- Not guaranteed idempotent (a JSON Merge Patch is; a JSON Patch that appends to an array is not)
- Partial modifications
- JSON Patch or merge semantics
PATCH /api/v1/agents/123
PATCH /api/v1/tasks/456
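To make the "merge semantics" option concrete, here is a minimal sketch of JSON Merge Patch as defined in RFC 7396. Whether SysManage applies patches exactly this way is an assumption; the agent fields used below (`hostname`, `tags`, `notes`) are hypothetical.

```python
from typing import Any


def merge_patch(target: Any, patch: Any) -> Any:
    """Apply a JSON Merge Patch (RFC 7396) and return the patched document."""
    if not isinstance(patch, dict):
        # A non-object patch replaces the target wholesale.
        return patch
    # Copy so the caller's document is not modified in place.
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            result.pop(key, None)  # null means "remove this member"
        else:
            result[key] = merge_patch(result.get(key), value)
    return result


agent = {"hostname": "web-01", "tags": {"env": "prod", "tier": "web"}, "notes": "temp"}
patch = {"tags": {"tier": "frontend"}, "notes": None}

once = merge_patch(agent, patch)
twice = merge_patch(once, patch)
print(once)
print(once == twice)
```

Applying the same merge patch a second time leaves the document unchanged, which is what makes retries of this style of PATCH safe.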
DELETE
Purpose: Remove resources
- Idempotent
- No request body
- Returns 204 on success
DELETE /api/v1/agents/123
DELETE /api/v1/tasks/456
Response Formats & Standards
Standard Response Format
All API responses follow a consistent structure for predictable client integration:
Success Response Structure
{
"success": true,
"data": {
// Actual response data
},
"meta": {
"timestamp": "2024-01-15T10:30:00Z",
"request_id": "req-123e4567-e89b-12d3-a456-426614174000",
"api_version": "1.0",
"execution_time_ms": 45
},
"pagination": { // Only for paginated responses
"page": 1,
"per_page": 50,
"total": 247,
"total_pages": 5,
"has_next": true,
"has_prev": false
}
}
Error Response Structure
{
"success": false,
"error": {
"code": "VALIDATION_ERROR",
"message": "Invalid input provided",
"details": [
{
"field": "agent_id",
"message": "Agent ID must be a positive integer",
"value": "invalid-id"
}
],
"help_url": "https://docs.sysmanage.com/api/errors#validation-error"
},
"meta": {
"timestamp": "2024-01-15T10:30:00Z",
"request_id": "req-123e4567-e89b-12d3-a456-426614174000",
"api_version": "1.0"
}
}
Response Implementation
# Response formatting utilities
import logging
import time
import uuid
from datetime import datetime, timezone
from typing import Any, List, Optional

from fastapi import Depends, FastAPI, Request
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from pydantic import BaseModel

app = FastAPI()
logger = logging.getLogger(__name__)

class PaginationMeta(BaseModel):
    page: int
    per_page: int
    total: int
    total_pages: int
    has_next: bool
    has_prev: bool

class ResponseMeta(BaseModel):
    timestamp: datetime
    request_id: str
    api_version: str
    execution_time_ms: Optional[int] = None

class ErrorDetail(BaseModel):
    field: Optional[str] = None
    message: str
    value: Optional[Any] = None

class ErrorInfo(BaseModel):
    code: str
    message: str
    details: Optional[List[ErrorDetail]] = None
    help_url: Optional[str] = None

class APIResponse(BaseModel):
    success: bool
    data: Optional[Any] = None
    error: Optional[ErrorInfo] = None
    meta: ResponseMeta
    pagination: Optional[PaginationMeta] = None

class ResponseBuilder:
    def __init__(self, request_id: Optional[str] = None):
        self.request_id = request_id or str(uuid.uuid4())
        self.start_time = time.time()

    def success(self, data: Any = None,
                pagination: Optional[PaginationMeta] = None) -> dict:
        """Build successful response"""
        execution_time = int((time.time() - self.start_time) * 1000)
        response = APIResponse(
            success=True,
            data=data,
            meta=ResponseMeta(
                timestamp=datetime.now(timezone.utc),
                request_id=self.request_id,
                api_version="1.0",
                execution_time_ms=execution_time
            ),
            pagination=pagination
        )
        # .dict() is the Pydantic v1 spelling; on Pydantic v2 use .model_dump().
        # jsonable_encoder turns the datetime into an ISO 8601 string so the
        # result can be passed directly to JSONResponse(content=...).
        return jsonable_encoder(response.dict(exclude_none=True))

    def error(self, code: str, message: str,
              details: Optional[List[ErrorDetail]] = None,
              status_code: int = 400) -> dict:
        """Build error response"""
        # status_code is unused here: callers set the HTTP status on the JSONResponse.
        help_url = f"https://docs.sysmanage.com/api/errors#{code.lower().replace('_', '-')}"
        response = APIResponse(
            success=False,
            error=ErrorInfo(
                code=code,
                message=message,
                details=details,
                help_url=help_url
            ),
            meta=ResponseMeta(
                timestamp=datetime.now(timezone.utc),
                request_id=self.request_id,
                api_version="1.0"
            )
        )
        return jsonable_encoder(response.dict(exclude_none=True))
# FastAPI response formatting middleware
@app.middleware("http")
async def response_formatting_middleware(request: Request, call_next):
    # Generate request ID and record the start time for downstream handlers
    request_id = str(uuid.uuid4())
    request.state.request_id = request_id
    request.state.start_time = time.time()

    response = await call_next(request)

    # Add request ID to response headers
    response.headers["X-Request-ID"] = request_id
    return response
# Usage in API endpoints
# (User, get_current_user, and agent_service are application-specific and defined elsewhere)
@app.get("/api/v1/agents/{agent_id}")
async def get_agent(
    agent_id: int,
    request: Request,
    current_user: User = Depends(get_current_user)
):
    builder = ResponseBuilder(request.state.request_id)
    try:
        # Validate agent ID
        if agent_id <= 0:
            return JSONResponse(
                status_code=400,
                content=builder.error(
                    code="INVALID_AGENT_ID",
                    message="Agent ID must be a positive integer",
                    details=[ErrorDetail(
                        field="agent_id",
                        message="Must be greater than 0",
                        value=agent_id
                    )]
                )
            )

        # Get agent
        agent = await agent_service.get_agent(agent_id, current_user)
        if not agent:
            return JSONResponse(
                status_code=404,
                content=builder.error(
                    code="AGENT_NOT_FOUND",
                    message=f"Agent with ID {agent_id} not found"
                )
            )

        # Return success response
        return builder.success(data=agent.dict())
    except PermissionError:
        return JSONResponse(
            status_code=403,
            content=builder.error(
                code="PERMISSION_DENIED",
                message="You don't have permission to access this agent"
            )
        )
    except Exception as e:
        # Log the details server-side; return only a generic message to the client
        logger.error(f"Unexpected error in get_agent: {e}", exc_info=True)
        return JSONResponse(
            status_code=500,
            content=builder.error(
                code="INTERNAL_ERROR",
                message="An unexpected error occurred"
            )
        )
Pagination Standards
Pagination Parameters
- limit: Number of items per page (max 100, default 50)
- offset: Number of items to skip (default 0)
- sort: Sort field and direction (e.g., "created_at:desc")
- filter: Filter parameters (resource-specific)
Pagination Links
{
"success": true,
"data": {
"agents": [
// ... agent objects
]
},
"pagination": {
"page": 2,
"per_page": 50,
"total": 247,
"total_pages": 5,
"has_next": true,
"has_prev": true,
"links": {
"first": "/api/v1/agents?limit=50&offset=0",
"prev": "/api/v1/agents?limit=50&offset=0",
"self": "/api/v1/agents?limit=50&offset=50",
"next": "/api/v1/agents?limit=50&offset=100",
"last": "/api/v1/agents?limit=50&offset=200"
}
}
}
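The pagination block above can be derived from `limit`, `offset`, and the total row count. The following is a minimal sketch under stated assumptions: the helper name `paginate` is hypothetical, and returning `None` for a missing `prev` or `next` link is a choice made here, not something the document specifies.

```python
from math import ceil
from urllib.parse import urlencode

MAX_LIMIT = 100  # documented maximum page size


def paginate(path: str, total: int, limit: int = 50, offset: int = 0) -> dict:
    """Compute pagination metadata and links for a limit/offset query."""
    limit = max(1, min(limit, MAX_LIMIT))
    offset = max(0, offset)
    total_pages = max(1, ceil(total / limit))
    page = offset // limit + 1
    last_offset = (total_pages - 1) * limit
    has_prev = offset > 0
    has_next = offset + limit < total

    def link(link_offset: int) -> str:
        return f"{path}?{urlencode({'limit': limit, 'offset': link_offset})}"

    return {
        "page": page,
        "per_page": limit,
        "total": total,
        "total_pages": total_pages,
        "has_next": has_next,
        "has_prev": has_prev,
        "links": {
            "first": link(0),
            "prev": link(max(0, offset - limit)) if has_prev else None,
            "self": link(offset),
            "next": link(offset + limit) if has_next else None,
            "last": link(last_offset),
        },
    }


meta = paginate("/api/v1/agents", total=247, limit=50, offset=50)
print(meta["page"], meta["total_pages"], meta["links"]["next"])
```

Called with the values from the example response (247 agents, `limit=50`, `offset=50`), the sketch reproduces the same page number, page count, and links.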
Filtering and Sorting
# Filtering examples
GET /api/v1/agents?status=online&platform=linux&group_id=5
GET /api/v1/tasks?status=running&created_after=2024-01-01&agent_id=123
GET /api/v1/packages?name=nginx&version_gt=1.20
# Sorting examples
GET /api/v1/agents?sort=hostname:asc
GET /api/v1/tasks?sort=created_at:desc,priority:asc
GET /api/v1/metrics?sort=timestamp:desc&limit=100
# Complex queries
GET /api/v1/agents?search=web-server&status=online&sort=last_seen:desc&limit=25
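A server has to turn the `sort` parameter into validated (field, direction) pairs before it reaches a query. Here is one way that could look; `parse_sort` is a hypothetical helper, and defaulting to ascending order when no direction is given is an assumption, not documented behavior.

```python
from typing import List, Set, Tuple


def parse_sort(sort: str, allowed_fields: Set[str]) -> List[Tuple[str, str]]:
    """Parse 'field:dir,field:dir' into a list of (field, direction) tuples."""
    clauses = []
    for item in sort.split(","):
        field, _, direction = item.strip().partition(":")
        direction = (direction or "asc").lower()  # assumed default: ascending
        # Whitelisting fields keeps arbitrary column names out of the query.
        if field not in allowed_fields:
            raise ValueError(f"cannot sort by {field!r}")
        if direction not in ("asc", "desc"):
            raise ValueError(f"invalid sort direction {direction!r}")
        clauses.append((field, direction))
    return clauses


print(parse_sort("created_at:desc,priority:asc", {"created_at", "priority"}))
```

Checking each field against an allow-list matters because sort fields typically end up in an ORDER BY clause.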
Error Handling
HTTP Status Codes
2xx Success
- 200 OK: Successful GET, PUT, PATCH, or action
- 201 Created: Successful POST that creates resource
- 202 Accepted: Async operation started
- 204 No Content: Successful DELETE or empty result
4xx Client Errors
- 400 Bad Request: Invalid request format or parameters
- 401 Unauthorized: Authentication required or invalid
- 403 Forbidden: Permission denied
- 404 Not Found: Resource doesn't exist
- 409 Conflict: Resource state conflict
- 422 Unprocessable Entity: Validation errors
- 429 Too Many Requests: Rate limit exceeded
5xx Server Errors
- 500 Internal Server Error: Unexpected server error
- 502 Bad Gateway: Upstream service error
- 503 Service Unavailable: Temporary unavailability
- 504 Gateway Timeout: Upstream timeout
Error Code System
Error Code Categories
Category | Prefix | Examples
-----------------|--------|------------------------------------------
Authentication | AUTH_ | AUTH_INVALID_TOKEN, AUTH_EXPIRED_TOKEN
Authorization | PERM_ | PERM_DENIED, PERM_INSUFFICIENT
Validation | VAL_ | VAL_REQUIRED_FIELD, VAL_INVALID_FORMAT
Resource | RES_ | RES_NOT_FOUND, RES_ALREADY_EXISTS
Business Logic | BIZ_ | BIZ_AGENT_OFFLINE, BIZ_TASK_ALREADY_RUNNING
External | EXT_ | EXT_DATABASE_ERROR, EXT_SERVICE_UNAVAILABLE
Rate Limiting | RATE_ | RATE_LIMIT_EXCEEDED, RATE_QUOTA_EXCEEDED
System | SYS_ | SYS_MAINTENANCE, SYS_OVERLOADED
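Because every error code carries its category as a prefix, a client can route errors without keeping a list of individual codes. A minimal sketch, assuming the prefixes in the table above are exhaustive; `error_category` is a hypothetical helper name.

```python
# Prefix-to-category mapping taken from the table above.
ERROR_CATEGORIES = {
    "AUTH": "Authentication",
    "PERM": "Authorization",
    "VAL": "Validation",
    "RES": "Resource",
    "BIZ": "Business Logic",
    "EXT": "External",
    "RATE": "Rate Limiting",
    "SYS": "System",
}


def error_category(code: str) -> str:
    """Return the category for an error code, or 'Unknown' if the prefix is not listed."""
    prefix, separator, _ = code.partition("_")
    if not separator or prefix not in ERROR_CATEGORIES:
        return "Unknown"
    return ERROR_CATEGORIES[prefix]


print(error_category("RES_NOT_FOUND"))
print(error_category("RATE_LIMIT_EXCEEDED"))
```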
Validation Error Examples
{
"success": false,
"error": {
"code": "VAL_MULTIPLE_ERRORS",
"message": "Multiple validation errors occurred",
"details": [
{
"field": "hostname",
"message": "Hostname is required",
"value": null
},
{
"field": "platform",
"message": "Platform must be one of: linux, windows, macos, bsd",
"value": "unknown"
},
{
"field": "ip_address",
"message": "Invalid IP address format",
"value": "192.168.1"
}
]
}
}
Error Handling Implementation
# Custom exception classes
from fastapi.exceptions import RequestValidationError

class APIException(Exception):
    def __init__(self, code: str, message: str, status_code: int = 400,
                 details: Optional[List[ErrorDetail]] = None):
        self.code = code
        self.message = message
        self.status_code = status_code
        self.details = details or []
        super().__init__(message)

class ValidationError(APIException):
    def __init__(self, details: List[ErrorDetail]):
        super().__init__(
            code="VAL_MULTIPLE_ERRORS" if len(details) > 1 else "VAL_ERROR",
            message="Validation error" + ("s" if len(details) > 1 else ""),
            status_code=422,
            details=details
        )

class NotFoundError(APIException):
    def __init__(self, resource: str, identifier: str):
        super().__init__(
            code="RES_NOT_FOUND",
            message=f"{resource} with identifier '{identifier}' not found",
            status_code=404
        )

class PermissionDeniedError(APIException):
    def __init__(self, action: str, resource: str):
        super().__init__(
            code="PERM_DENIED",
            message=f"Permission denied: {action} on {resource}",
            status_code=403
        )

# Global exception handler
@app.exception_handler(APIException)
async def api_exception_handler(request: Request, exc: APIException):
    builder = ResponseBuilder(getattr(request.state, 'request_id', None))
    return JSONResponse(
        status_code=exc.status_code,
        content=builder.error(
            code=exc.code,
            message=exc.message,
            details=exc.details
        )
    )

# FastAPI raises RequestValidationError when request parsing or validation fails
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    builder = ResponseBuilder(getattr(request.state, 'request_id', None))
    details = [
        ErrorDetail(
            # loc entries can be ints (list indexes), so coerce to str
            field=str(error['loc'][-1]) if error['loc'] else None,
            message=error['msg'],
            value=error.get('input')
        )
        for error in exc.errors()
    ]
    return JSONResponse(
        status_code=422,
        content=builder.error(
            code="VAL_MULTIPLE_ERRORS",
            message="Request validation failed",
            details=details
        )
    )

@app.exception_handler(500)
async def internal_error_handler(request: Request, exc: Exception):
    builder = ResponseBuilder(getattr(request.state, 'request_id', None))
    # Log the full exception for debugging
    logger.error(f"Internal server error: {exc}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content=builder.error(
            code="SYS_INTERNAL_ERROR",
            message="An internal server error occurred"
        )
    )

# Rate limiting error handler
# RateLimitExceeded is assumed to be an application-defined exception that
# exposes limit, window, remaining, and reset_time attributes.
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    builder = ResponseBuilder(getattr(request.state, 'request_id', None))
    return JSONResponse(
        status_code=429,
        content=builder.error(
            code="RATE_LIMIT_EXCEEDED",
            message=f"Rate limit exceeded: {exc.limit} requests per {exc.window}"
        ),
        headers={
            "X-RateLimit-Limit": str(exc.limit),
            "X-RateLimit-Remaining": str(exc.remaining),
            "X-RateLimit-Reset": str(exc.reset_time)
        }
    )
API Versioning & Deprecation
Versioning Strategy
URL-based Versioning
SysManage uses URL path versioning for clear, explicit version identification:
# Current version
https://api.sysmanage.example.com/api/v1/agents
# Future version
https://api.sysmanage.example.com/api/v2/agents
# Version-specific features
https://api.sysmanage.example.com/api/v1/agents # Basic agent management
https://api.sysmanage.example.com/api/v2/agents # Enhanced with grouping
https://api.sysmanage.example.com/api/v3/agents # AI-powered insights
Backward Compatibility
β Non-Breaking Changes (Within Version)
- Adding optional request parameters
- Adding fields to response objects
- Adding new endpoints
- Adding new optional headers
- Adding new enum values (with default handling)
β Breaking Changes (Require New Version)
- Removing or renaming fields
- Changing field types or formats
- Making optional parameters required
- Changing endpoint URLs
- Changing authentication mechanisms
- Removing endpoints
Deprecation Process
Version Release Timeline:
v1.0 Released ────────▶ v2.0 Released ────────▶ v1.0 Deprecated ─────▶ v1.0 Removed
      │                       │                        │                    │
      │ ◀──── 18 months ────▶ │ ◀──── 6 months ─────▶ │ ◀─── 6 months ───▶ │
      │                       │                        │                    │
   Active                Maintenance              Deprecated           Unsupported
   Support                  Only                   Warning
                                                   Headers
Phase 1: Active Support (18 months)
- Full feature development
- Bug fixes and security updates
- Performance optimizations
Phase 2: Maintenance Only (6 months)
- Security updates only
- Critical bug fixes
- Deprecation warnings added
Phase 3: Deprecated (6 months)
- No updates except security
- Strong deprecation warnings
- Migration guides published
Phase 4: Removed
- API version no longer available
- 410 Gone responses
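The four phases can be computed from a version's release date. The sketch below assumes the 18/6/6-month durations listed above run back to back from the release date; `lifecycle_phase` and `add_months` are hypothetical helpers, and clamping the day of month to 28 is a simplification to keep every shifted date valid.

```python
from datetime import date

# (phase name, duration in months), in the order the phases occur
PHASES = [("active", 18), ("maintenance", 6), ("deprecated", 6)]


def add_months(d: date, months: int) -> date:
    """Shift a date by whole months, clamping the day to 28 so it stays valid."""
    index = d.year * 12 + (d.month - 1) + months
    return date(index // 12, index % 12 + 1, min(d.day, 28))


def lifecycle_phase(released: date, today: date) -> str:
    """Return the phase a version is in on the given day."""
    boundary = released
    for name, months in PHASES:
        boundary = add_months(boundary, months)
        if today < boundary:
            return name
    return "removed"


released = date(2023, 1, 15)
print(lifecycle_phase(released, date(2024, 1, 15)))
print(lifecycle_phase(released, date(2025, 3, 1)))
```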
Deprecation Headers
# Deprecation warning headers
HTTP/1.1 200 OK
Deprecation: true
Sunset: Mon, 11 Nov 2024 23:59:59 GMT
Link: <https://docs.sysmanage.com/api/v2/migration>; rel="successor-version"
Warning: 299 - "This API version is deprecated. Please migrate to v2 by November 2024"
{
"success": true,
"data": { ... },
"meta": {
"deprecation_notice": {
"deprecated": true,
"sunset_date": "2024-11-11T23:59:59Z",
"migration_guide": "https://docs.sysmanage.com/api/v2/migration",
"contact": "api-support@sysmanage.com"
}
}
}
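On the client side, these headers can drive an early warning before the sunset date arrives. A minimal sketch, assuming the headers arrive as a plain mapping with exactly the capitalization shown above (real HTTP header names are case-insensitive); `days_until_sunset` is a hypothetical helper.

```python
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Mapping, Optional


def days_until_sunset(headers: Mapping[str, str], now: datetime) -> Optional[int]:
    """Return whole days left before the Sunset date, or None if not deprecated."""
    if "Deprecation" not in headers or "Sunset" not in headers:
        return None
    # The Sunset header uses the HTTP date format, which the email utilities parse.
    sunset = parsedate_to_datetime(headers["Sunset"])
    return (sunset - now).days


headers = {
    "Deprecation": "true",
    "Sunset": "Mon, 11 Nov 2024 23:59:59 GMT",
}
now = datetime(2024, 11, 1, tzinfo=timezone.utc)
print(days_until_sunset(headers, now))
```

A negative result means the sunset date has already passed and requests may start receiving 410 Gone.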
Version Management Implementation
# API version management
from datetime import datetime, timezone
from enum import Enum
from typing import Optional

class APIVersion(Enum):
    V1 = "v1"
    V2 = "v2"

class VersionInfo:
    def __init__(self, version: APIVersion, status: str,
                 sunset_date: Optional[datetime] = None):
        self.version = version
        self.status = status  # active, maintenance, deprecated, removed
        self.sunset_date = sunset_date

# Version configuration
VERSION_CONFIG = {
    APIVersion.V1: VersionInfo(
        version=APIVersion.V1,
        status="deprecated",
        sunset_date=datetime(2024, 11, 11, 23, 59, 59, tzinfo=timezone.utc)
    ),
    APIVersion.V2: VersionInfo(
        version=APIVersion.V2,
        status="active"
    )
}
# Version extraction middleware
@app.middleware("http")
async def version_middleware(request: Request, call_next):
    # Extract version from URL path, e.g. /api/v1/agents -> "v1"
    path_parts = request.url.path.strip('/').split('/')
    if len(path_parts) >= 3 and path_parts[0] == 'api':
        version_str = path_parts[1]
        try:
            api_version = APIVersion(version_str)
            request.state.api_version = api_version

            # Check if version is still supported
            version_info = VERSION_CONFIG.get(api_version)
            if not version_info:
                return JSONResponse(
                    status_code=400,
                    content={"error": f"Unsupported API version: {version_str}"}
                )
            if version_info.status == "removed":
                return JSONResponse(
                    status_code=410,
                    content={
                        "error": "This API version has been removed",
                        "migration_guide": "https://docs.sysmanage.com/api/v2/migration"
                    }
                )
            request.state.version_info = version_info
        except ValueError:
            # The path segment is not a known APIVersion value
            return JSONResponse(
                status_code=400,
                content={"error": f"Invalid API version: {version_str}"}
            )

    response = await call_next(request)

    # Add deprecation headers if needed
    if hasattr(request.state, 'version_info'):
        version_info = request.state.version_info
        if version_info.status in ["maintenance", "deprecated"]:
            response.headers["Deprecation"] = "true"
            if version_info.sunset_date:
                response.headers["Sunset"] = version_info.sunset_date.strftime(
                    "%a, %d %b %Y %H:%M:%S GMT"
                )
            if version_info.status == "deprecated":
                response.headers["Warning"] = (
                    "299 - \"This API version is deprecated. "
                    "Please migrate to the latest version.\""
                )
    return response
# Version-specific endpoint handling
class VersionedEndpoint:
    def __init__(self):
        self.handlers = {}

    def version(self, api_version: APIVersion):
        def decorator(func):
            self.handlers[api_version] = func
            return func
        return decorator

    async def handle(self, request: Request, *args, **kwargs):
        api_version = getattr(request.state, 'api_version', APIVersion.V1)
        handler = self.handlers.get(api_version)
        if not handler:
            # Fall back to the latest registered version (compare numerically, so v10 > v2)
            latest_version = max(
                self.handlers.keys(), key=lambda v: int(v.value.lstrip('v'))
            )
            handler = self.handlers[latest_version]
        return await handler(request, *args, **kwargs)

# Usage example
agents_endpoint = VersionedEndpoint()

# These handlers are called directly by VersionedEndpoint.handle(), so FastAPI
# does not resolve Depends() for them; the route below injects current_user.
@agents_endpoint.version(APIVersion.V1)
async def get_agents_v1(request: Request, current_user: User):
    """Legacy agent list format"""
    agents = await agent_service.list_agents(current_user)
    return {
        "agents": [
            {
                "id": agent.id,
                "hostname": agent.hostname,
                "status": agent.status,
                "last_seen": agent.last_seen
            }
            for agent in agents
        ]
    }

@agents_endpoint.version(APIVersion.V2)
async def get_agents_v2(request: Request, current_user: User):
    """Enhanced agent list with grouping and metadata"""
    agents = await agent_service.list_agents_with_groups(current_user)
    return {
        "agents": [
            {
                "id": agent.id,
                "hostname": agent.hostname,
                "status": agent.status,
                "last_seen": agent.last_seen,
                "group": {
                    "id": agent.group.id,
                    "name": agent.group.name
                } if agent.group else None,
                "metadata": agent.metadata,
                "health_score": agent.health_score
            }
            for agent in agents
        ],
        "summary": {
            "total": len(agents),
            "online": sum(1 for a in agents if a.status == "online"),
            "groups": len(set(a.group_id for a in agents if a.group_id))
        }
    }

@app.get("/api/{version}/agents")
async def list_agents(request: Request, current_user: User = Depends(get_current_user)):
    return await agents_endpoint.handle(request, current_user)
Performance Optimization
Caching Strategy
Multi-Level Caching
Client Request → CDN Cache → API Gateway Cache → Application Cache → Database
      │              │               │                   │               │
      ▼              ▼               ▼                   ▼               ▼
 Static Assets  API Responses    Auth Tokens       Query Results      Raw Data
 (Images, JS)   (1-5 minutes)    (15 minutes)      (30 seconds)       (Source)
 TTL: 1 hour    TTL: Variable    TTL: Token Life   TTL: Variable      TTL: N/A
Caching Implementation
# Multi-level caching system
import hashlib
import json
import logging
from functools import wraps
from typing import Any, Optional

import redis.asyncio as redis  # async client: the calls below are awaited

logger = logging.getLogger(__name__)

class CacheManager:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.default_ttl = 300  # 5 minutes

    def generate_cache_key(self, prefix: str, *args, **kwargs) -> str:
        """Generate cache key from function arguments"""
        key_data = {
            'args': args,
            'kwargs': sorted(kwargs.items())
        }
        # default=str lets non-JSON arguments (such as a User object) contribute
        # to the key; this assumes their str() is stable and unique per value.
        serialized = json.dumps(key_data, sort_keys=True, default=str)
        key_hash = hashlib.md5(serialized.encode()).hexdigest()
        return f"{prefix}:{key_hash}"

    async def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        try:
            value = await self.redis.get(key)
            return json.loads(value) if value else None
        except Exception as e:
            logger.warning(f"Cache get error: {e}")
            return None

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Set value in cache"""
        try:
            ttl = ttl or self.default_ttl
            serialized = json.dumps(value, default=str)
            return await self.redis.setex(key, ttl, serialized)
        except Exception as e:
            logger.warning(f"Cache set error: {e}")
            return False

    async def delete(self, pattern: str) -> int:
        """Delete keys matching pattern"""
        try:
            # SCAN iterates incrementally; KEYS would block Redis on a large keyspace
            keys = [key async for key in self.redis.scan_iter(match=pattern)]
            if keys:
                return await self.redis.delete(*keys)
            return 0
        except Exception as e:
            logger.warning(f"Cache delete error: {e}")
            return 0

    def cached(self, prefix: str, ttl: Optional[int] = None,
               invalidate_on: Optional[list] = None):
        """Decorator for caching function results"""
        # Note: invalidate_on is accepted but not used by this implementation.
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Generate cache key
                cache_key = self.generate_cache_key(prefix, *args, **kwargs)

                # Try to get from cache
                cached_result = await self.get(cache_key)
                if cached_result is not None:
                    return cached_result

                # Execute function
                result = await func(*args, **kwargs)

                # Cache the result
                await self.set(cache_key, result, ttl)
                return result
            return wrapper
        return decorator
# Cache usage in API endpoints
cache_manager = CacheManager(redis_client)

@app.get("/api/v1/agents")
@cache_manager.cached("agents_list", ttl=60)  # 1 minute cache
async def list_agents(
    current_user: User = Depends(get_current_user),
    status: Optional[str] = None,
    limit: int = 50,
    offset: int = 0
):
    return await agent_service.list_agents(
        user=current_user,
        status=status,
        limit=limit,
        offset=offset
    )

@app.get("/api/v1/agents/{agent_id}")
@cache_manager.cached("agent_detail", ttl=30)  # 30 second cache
async def get_agent(
    agent_id: int,
    current_user: User = Depends(get_current_user)
):
    return await agent_service.get_agent(agent_id, current_user)

# Cache invalidation on updates
@app.put("/api/v1/agents/{agent_id}")
async def update_agent(
    agent_id: int,
    agent_data: AgentUpdate,
    current_user: User = Depends(get_current_user)
):
    # Update agent
    updated_agent = await agent_service.update_agent(
        agent_id, agent_data, current_user
    )

    # Invalidate related caches. Keys are "<prefix>:<md5 hash>", so the agent ID
    # does not appear in the key and a single agent cannot be targeted by pattern;
    # clear the whole prefix instead.
    await cache_manager.delete("agent_detail:*")
    await cache_manager.delete("agents_list:*")
    return updated_agent
Database Query Optimization
Query Patterns
# Optimized database queries
from typing import List, Optional

from sqlalchemy import String, func, or_, select
from sqlalchemy.orm import joinedload, load_only, selectinload

class OptimizedAgentService:
    def __init__(self, db_session):
        self.db = db_session

    async def list_agents_with_stats(self, user: User, filters: Optional[dict] = None):
        """Optimized agent listing with statistics"""
        filters = filters or {}

        # Base query: soft delete filter plus user-based permissions.
        # The permission filter is applied unconditionally so that a user with
        # no accessible agents gets an empty result, not every agent.
        accessible_agent_ids = await self.get_accessible_agent_ids(user)
        query = (
            select(Agent)
            .where(Agent.deleted_at.is_(None))
            .where(Agent.id.in_(accessible_agent_ids))
        )

        # Apply filters efficiently
        if 'status' in filters:
            query = query.where(Agent.status == filters['status'])
        if 'platform' in filters:
            query = query.where(Agent.platform == filters['platform'])
        if 'search' in filters:
            search_term = f"%{filters['search']}%"
            query = query.where(
                or_(
                    Agent.hostname.ilike(search_term),
                    Agent.ip_address.cast(String).like(search_term)
                )
            )

        # Get total count efficiently (without loading all data)
        count_query = select(func.count()).select_from(query.subquery())
        total_count = await self.db.scalar(count_query)

        # Eager-load relationships only for the page that is actually returned
        query = query.options(
            joinedload(Agent.group),  # Avoid N+1 queries for groups
            # Load only the id and status columns of each package
            selectinload(Agent.packages).load_only(Package.id, Package.status)
        ).order_by(Agent.id)  # Stable ordering keeps pages consistent

        # Apply pagination
        if 'limit' in filters:
            query = query.limit(filters['limit'])
        if 'offset' in filters:
            query = query.offset(filters['offset'])

        # Execute query
        result = await self.db.execute(query)
        agents = result.unique().scalars().all()

        # Calculate statistics in a single query
        stats = await self.get_agent_statistics(accessible_agent_ids)
        return {
            'agents': agents,
            'total': total_count,
            'statistics': stats
        }

    async def get_agent_statistics(self, agent_ids: List[int]):
        """Get agent statistics in a single optimized query"""
        stats_query = (
            select(
                Agent.status,
                func.count(Agent.id).label('count'),
                func.avg(Agent.cpu_usage).label('avg_cpu'),
                func.avg(Agent.memory_usage).label('avg_memory')
            )
            .where(Agent.id.in_(agent_ids))
            .group_by(Agent.status)
        )
        result = await self.db.execute(stats_query)
        stats_by_status = {row.status: {
            'count': row.count,
            'avg_cpu': row.avg_cpu,
            'avg_memory': row.avg_memory
        } for row in result}
        return {
            'by_status': stats_by_status,
            'total': sum(s['count'] for s in stats_by_status.values()),
            'online': stats_by_status.get('online', {}).get('count', 0)
        }
# Bulk operations for efficiency
from typing import List

from sqlalchemy import bindparam, func, update
from sqlalchemy.dialects.postgresql import insert  # provides on_conflict_do_update

class BulkOperations:
    def __init__(self, db_session):
        self.db = db_session

    async def bulk_update_agent_status(self, agent_updates: List[dict]):
        """Efficiently update multiple agents"""
        # Each dict needs the keys agent_id, status, and last_seen
        stmt = (
            update(Agent)
            .where(Agent.id == bindparam('agent_id'))
            .values(
                status=bindparam('status'),
                last_seen=bindparam('last_seen'),
                updated_at=func.now()
            )
        )
        # Run the executemany on the session's connection: with SQLAlchemy 2.0,
        # Session.execute() treats update() plus a parameter list as an ORM
        # bulk update by primary key, which does not allow a custom WHERE clause.
        connection = await self.db.connection()
        await connection.execute(stmt, agent_updates)
        await self.db.commit()

    async def bulk_create_metrics(self, metrics: List[dict]):
        """Efficiently insert multiple metrics"""
        # Use bulk insert with ON CONFLICT handling (PostgreSQL)
        stmt = insert(Metric).values(metrics)
        # Handle conflicts for duplicate timestamps
        stmt = stmt.on_conflict_do_update(
            index_elements=['agent_id', 'metric_name', 'timestamp'],
            set_={
                'value': stmt.excluded.value,
                'updated_at': func.now()
            }
        )
        await self.db.execute(stmt)
        await self.db.commit()
Rate Limiting
Adaptive Rate Limiting
# Advanced rate limiting implementation
import time
from dataclasses import dataclass
from typing import Optional

@dataclass
class RateLimit:
    requests: int
    window_seconds: int
    burst_requests: Optional[int] = None

class AdaptiveRateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.default_limits = {
            'user': RateLimit(1000, 3600, 50),       # 1000/hour, burst 50
            'api_key': RateLimit(10000, 3600, 200),  # 10000/hour, burst 200
            'ip': RateLimit(100, 3600, 10)           # 100/hour, burst 10
        }

    async def check_rate_limit(self, identifier: str,
                               limit_type: str = 'user') -> dict:
        """Check if request is within rate limits"""
        limit = self.default_limits.get(limit_type)
        if not limit:
            return {'allowed': True}

        now = time.time()
        # Fixed window: round the current time down to the start of its window
        window_start = int(now // limit.window_seconds) * limit.window_seconds

        # Keys for tracking
        window_key = f"rate_limit:{identifier}:{window_start}"
        burst_key = f"rate_limit_burst:{identifier}"

        # Use Redis pipeline for atomic operations
        async with self.redis.pipeline() as pipe:
            pipe.incr(window_key)
            pipe.expire(window_key, limit.window_seconds)
            pipe.incr(burst_key)
            pipe.expire(burst_key, 60)  # 1-minute burst window
            results = await pipe.execute()

        # results holds one entry per queued command, in order
        window_count = results[0]
        burst_count = results[2]

        # Check limits
        window_exceeded = window_count > limit.requests
        burst_exceeded = (limit.burst_requests is not None and
                          burst_count > limit.burst_requests)

        if window_exceeded or burst_exceeded:
            reset_time = window_start + limit.window_seconds
            return {
                'allowed': False,
                'limit': limit.requests,
                'remaining': max(0, limit.requests - window_count),
                'reset_time': reset_time,
                'retry_after': max(1, reset_time - now)
            }

        return {
            'allowed': True,
            'limit': limit.requests,
            'remaining': limit.requests - window_count,
            'reset_time': window_start + limit.window_seconds
        }
# Rate limiting middleware
import jwt  # PyJWT

@app.middleware("http")
async def rate_limiting_middleware(request: Request, call_next):
    # Skip rate limiting for health checks
    if request.url.path in ['/health', '/metrics']:
        return await call_next(request)

    # Determine rate limit identifier
    identifier = None
    limit_type = 'ip'

    # Check for API key
    api_key = request.headers.get('X-API-Key')
    if api_key:
        identifier = f"api_key:{api_key}"
        limit_type = 'api_key'
    else:
        # Check for authenticated user
        auth_header = request.headers.get('Authorization')
        if auth_header and auth_header.startswith('Bearer '):
            try:
                token = auth_header.split(' ')[1]
                # Signature is NOT verified here: the payload only selects a
                # rate limit bucket and must never be used for authorization.
                payload = jwt.decode(token, options={"verify_signature": False})
                identifier = f"user:{payload.get('user_id')}"
                limit_type = 'user'
            except jwt.InvalidTokenError:
                # Malformed token: fall through to IP-based limiting
                pass

    # Fall back to IP-based limiting
    if not identifier:
        # Get real IP (considering proxies). These headers are client-supplied,
        # so trust them only when the API sits behind a proxy that sets them.
        ip = request.headers.get('X-Forwarded-For', '').split(',')[0].strip()
        if not ip:
            ip = request.headers.get('X-Real-IP', '')
        if not ip:
            ip = request.client.host
        identifier = f"ip:{ip}"

    # Check rate limit
    rate_limiter = get_rate_limiter()
    result = await rate_limiter.check_rate_limit(identifier, limit_type)
    if not result['allowed']:
        return JSONResponse(
            status_code=429,
            content={
                "error": "Rate limit exceeded",
                "retry_after": result['retry_after']
            },
            headers={
                "X-RateLimit-Limit": str(result['limit']),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(int(result['reset_time'])),
                "Retry-After": str(int(result['retry_after']))
            }
        )

    # Continue with request
    response = await call_next(request)

    # Add rate limit headers to response
    if 'limit' in result:
        response.headers["X-RateLimit-Limit"] = str(result['limit'])
        response.headers["X-RateLimit-Remaining"] = str(result['remaining'])
        response.headers["X-RateLimit-Reset"] = str(int(result['reset_time']))
    return response