REST API Design

This page covers the REST API design patterns, endpoint structure, authentication, error handling, and best practices used in SysManage.

API Design Philosophy

SysManage's REST API follows industry best practices and RESTful principles to provide a consistent, intuitive, and powerful interface for system management operations. The API is designed to be self-documenting, version-safe, and developer-friendly.

Core Design Principles

🎯 Resource-Oriented

URLs represent resources, not actions. Clear resource hierarchy with logical nesting.

🔒 Security First

Authentication, authorization, and input validation on every endpoint.

📊 Consistent Responses

Standardized response formats, error codes, and pagination across all endpoints.

📈 Performance Optimized

Efficient queries, caching, rate limiting, and bulk operations support.

🔄 Version Safe

Backward compatibility, gradual deprecation, and clear migration paths.

📝 Self-Documenting

OpenAPI specification, comprehensive examples, and interactive documentation.

API Architecture Overview

┌────────────────────────────────────────────────────────────────┐
│                        API Gateway                             │
├────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐ │
│  │ Rate        │  │ Auth &      │  │ Request Validation      │ │
│  │ Limiting    │  │ AuthZ       │  │ & Sanitization          │ │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘ │
├────────────────────────────────────────────────────────────────┤
│                     API Routing Layer                          │
├────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐ │
│  │ /api/v1/    │  │ /api/v1/    │  │ /api/v1/                │ │
│  │ auth        │  │ agents      │  │ tasks                   │ │
│  │             │  │             │  │                         │ │
│  │ • login     │  │ • CRUD ops  │  │ • CRUD ops              │ │
│  │ • logout    │  │ • actions   │  │ • execution             │ │
│  │ • refresh   │  │ • metrics   │  │ • scheduling            │ │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘ │
│                                                                │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────────────────┐ │
│  │ /api/v1/    │  │ /api/v1/    │  │ /api/v1/                │ │
│  │ inventory   │  │ config      │  │ metrics                 │ │
│  │             │  │             │  │                         │ │
│  │ • packages  │  │ • settings  │  │ • collection            │ │
│  │ • services  │  │ • policies  │  │ • aggregation           │ │
│  │ • hardware  │  │ • templates │  │ • alerts                │ │
│  └─────────────┘  └─────────────┘  └─────────────────────────┘ │
└────────────────────────────────────────────────────────────────┘
                               │
                               ▼
                    ┌─────────────────────┐
                    │  Business Logic     │
                    │  Layer              │
                    └─────────────────────┘
                               │
                               ▼
                    ┌─────────────────────┐
                    │  Data Access        │
                    │  Layer              │
                    └─────────────────────┘
                        

URL Structure & Resource Hierarchy

URL Convention

SysManage follows a consistent URL pattern that clearly expresses resource relationships:

https://api.sysmanage.example.com/api/v1/{resource}[/{id}][/{sub-resource}][/{sub-id}]

Examples:
GET    /api/v1/agents                           # List all agents
GET    /api/v1/agents/123                       # Get specific agent
GET    /api/v1/agents/123/packages              # List packages on agent
POST   /api/v1/agents/123/tasks                 # Create task for agent
GET    /api/v1/agents/123/tasks/456             # Get specific task
POST   /api/v1/agents/123/tasks/456/cancel      # Cancel specific task (action, so POST)
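
The URL pattern above can be sketched as a small path builder. This is a minimal illustration, not SysManage code: `build_path` and its parameter names are hypothetical, and only the `/api/v1` prefix and segment order come from the convention shown.

```python
from typing import Optional, Union

API_PREFIX = "/api/v1"  # version prefix taken from the pattern above

def build_path(resource: str,
               resource_id: Optional[Union[int, str]] = None,
               sub_resource: Optional[str] = None,
               sub_id: Optional[Union[int, str]] = None) -> str:
    """Assemble /api/v1/{resource}[/{id}][/{sub-resource}][/{sub-id}]."""
    # Each optional segment only makes sense if its parent segment is present
    if sub_resource is not None and resource_id is None:
        raise ValueError("a sub-resource requires a parent resource id")
    if sub_id is not None and sub_resource is None:
        raise ValueError("a sub-id requires a sub-resource")
    parts = [API_PREFIX, resource]
    for part in (resource_id, sub_resource, sub_id):
        if part is not None:
            parts.append(str(part))
    return "/".join(parts)
```

For example, `build_path("agents", 123, "tasks", 456)` yields the path used in the "Get specific task" example.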
                            

Resource Hierarchy

/api/v1/
├── auth/                           # Authentication endpoints
│   ├── login                       # POST: User login
│   ├── logout                      # POST: User logout
│   ├── refresh                     # POST: Token refresh
│   └── profile                     # GET/PUT: User profile
│
├── agents/                         # Agent management
│   ├── {id}/                       # Specific agent operations
│   │   ├── packages/               # Package inventory
│   │   ├── services/               # Service status
│   │   ├── tasks/                  # Agent-specific tasks
│   │   ├── metrics/                # Agent metrics
│   │   └── actions/                # Agent actions (reboot, etc.)
│   │       ├── reboot
│   │       ├── shutdown
│   │       └── update-agent
│   └── groups/                     # Agent grouping
│       ├── {id}/
│       └── {id}/members/
│
├── tasks/                          # Task management
│   ├── {id}/                       # Specific task operations
│   │   ├── cancel                  # POST: Cancel task
│   │   ├── retry                   # POST: Retry failed task
│   │   ├── logs                    # GET: Task execution logs
│   │   └── artifacts/              # Task output artifacts
│   ├── schedules/                  # Scheduled tasks
│   │   ├── {id}/
│   │   └── {id}/executions/
│   └── templates/                  # Task templates
│       └── {id}/
│
├── inventory/                      # System inventory
│   ├── packages/                   # Package management
│   │   ├── search                  # GET: Search packages
│   │   ├── updates                 # GET: Available updates
│   │   └── vulnerabilities         # GET: Security issues
│   ├── services/                   # Service management
│   │   └── {name}/                 # Specific service
│   └── hardware/                   # Hardware inventory
│       ├── cpu/
│       ├── memory/
│       └── storage/
│
├── metrics/                        # Metrics and monitoring
│   ├── query                       # POST: Custom metric queries
│   ├── dashboards/                 # Dashboard management
│   │   └── {id}/
│   └── alerts/                     # Alert management
│       ├── {id}/
│       ├── rules/                  # Alert rules
│       └── notifications/          # Notification channels
│
├── config/                         # Configuration management
│   ├── settings/                   # System settings
│   ├── policies/                   # Security policies
│   │   └── {id}/
│   └── certificates/               # Certificate management
│       ├── {id}/
│       ├── ca/                     # Certificate Authority
│       └── revoked/                # Revoked certificates
│
└── admin/                          # Administrative endpoints
    ├── users/                      # User management
    │   ├── {id}/
    │   └── {id}/roles/
    ├── roles/                      # Role management
    │   └── {id}/
    ├── audit/                      # Audit logs
    └── system/                     # System status
        ├── health                  # Health check
        ├── version                 # Version info
        └── status                  # System status
                            

HTTP Method Usage

GET

Purpose: Retrieve resources

  • Safe and idempotent
  • Cacheable responses
  • No request body
  • Supports filtering and pagination
GET /api/v1/agents?status=online&limit=50&offset=0
GET /api/v1/agents/123/packages?search=nginx
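
How a list request splits into pagination and filter parameters can be sketched as follows. This is a standalone illustration under stated assumptions: `parse_list_query` and `MAX_LIMIT` are hypothetical, and only the `limit`, `offset`, and filter names come from the example requests above.

```python
from urllib.parse import parse_qs, urlsplit

DEFAULT_LIMIT = 50   # matches the limit used in the example request
MAX_LIMIT = 200      # hypothetical upper bound, not a documented SysManage value

def parse_list_query(url: str) -> dict:
    """Split a list URL into pagination settings and filter parameters."""
    query = parse_qs(urlsplit(url).query)
    # parse_qs returns a list per key; keep the first value of each parameter
    params = {key: values[0] for key, values in query.items()}
    limit = int(params.pop("limit", DEFAULT_LIMIT))
    offset = int(params.pop("offset", 0))
    if limit < 1 or offset < 0:
        raise ValueError("limit must be >= 1 and offset must be >= 0")
    # Whatever remains after removing limit/offset is treated as a filter
    return {"limit": min(limit, MAX_LIMIT), "offset": offset, "filters": params}
```

Capping `limit` on the server side keeps a single GET from returning an unbounded result set.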
                                

POST

Purpose: Create resources or trigger actions

  • Not idempotent
  • Request body contains data
  • Returns 201 for creation, 200 for actions
POST /api/v1/agents/123/tasks
POST /api/v1/agents/123/actions/reboot
POST /api/v1/auth/login
                                

PUT

Purpose: Replace entire resource

  • Idempotent
  • Complete resource replacement
  • Creates the resource if it doesn't exist
PUT /api/v1/agents/123
PUT /api/v1/config/settings/email
                                

PATCH

Purpose: Partial resource updates

  • Idempotent with merge semantics; not guaranteed with JSON Patch operations
  • Partial modifications
  • JSON Patch or merge semantics
PATCH /api/v1/agents/123
PATCH /api/v1/tasks/456
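
The merge variant of PATCH can be sketched with the JSON Merge Patch rules from RFC 7396. This is an illustrative implementation of that standard, not SysManage's handler; `merge_patch` and the sample agent fields are assumptions.

```python
import copy
from typing import Any

def merge_patch(target: Any, patch: Any) -> Any:
    """Apply a JSON Merge Patch (RFC 7396) and return the patched document."""
    if not isinstance(patch, dict):
        # A non-object patch replaces the target wholesale
        return copy.deepcopy(patch)
    # Work on a copy so the caller's document is not modified
    result = dict(target) if isinstance(target, dict) else {}
    for key, value in patch.items():
        if value is None:
            result.pop(key, None)   # null removes the member
        else:
            result[key] = merge_patch(result.get(key), value)
    return result

agent = {"hostname": "web-01", "tags": {"env": "prod", "tier": "web"}, "notes": "old"}
patch = {"tags": {"tier": None, "owner": "ops"}, "notes": None}
once = merge_patch(agent, patch)
twice = merge_patch(once, patch)
```

Applying the same merge patch a second time leaves the document unchanged, which is what makes this variant safe to retry.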
                                

DELETE

Purpose: Remove resources

  • Idempotent
  • No request body
  • Returns 204 on success
DELETE /api/v1/agents/123
DELETE /api/v1/tasks/456
                                

Authentication & Authorization

Authentication Flow

Client                     Auth Server                 API Server
  │                           │                           │
  │ 1. Login Request          │                           │
  ├──────────────────────────▶│                           │
  │   POST /api/v1/auth/login │                           │
  │   { username, password }  │                           │
  │                           │                           │
  │ 2. Validate Credentials   │                           │
  │                           │ ← ← ← ← ← ← ← ← ← ← ← ←   │
  │                           │   Check User Database     │
  │                           │                           │
  │ 3. JWT Tokens             │                           │
  │ ◀──────────────────────── │                           │
  │   { access_token,         │                           │
  │     refresh_token }       │                           │
  │                           │                           │
  │ 4. API Request            │                           │
  ├──────────────────────────────────────────────────────▶│
  │   GET /api/v1/agents      │                           │
  │   Authorization: Bearer   │                           │
  │   {access_token}          │                           │
  │                           │                           │
  │                           │ 5. Validate Token         │
  │                           │ ◀─────────────────────────│
  │                           │                           │
  │ 6. API Response           │                           │
  │ ◀─────────────────────────────────────────────────────│
  │   { agents: [...] }       │                           │
                        

JWT Token Structure

{
  "header": {
    "alg": "RS256",
    "typ": "JWT",
    "kid": "sysmanage-2024-01"
  },
  "payload": {
    "iss": "sysmanage.example.com",
    "sub": "user-123",
    "aud": "sysmanage-api",
    "exp": 1642608900,
    "iat": 1642608000,
    "jti": "token-uuid-123",
    "user_id": 123,
    "username": "admin",
    "roles": ["admin", "operator"],
    "permissions": {
      "agents": ["read", "write", "delete"],
      "tasks": ["read", "write", "execute"],
      "metrics": ["read"],
      "admin": ["read", "write"]
    },
    "session_id": "session-uuid-456"
  }
}
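
On the wire, a JWT is three base64url segments joined by dots (header, payload, signature) rather than one JSON object. The sketch below decodes the first two segments with the standard library to show that layout. It does not verify the signature, the helper names are hypothetical, and the demo token carries a placeholder instead of a real RS256 signature.

```python
import base64
import json

def b64url_encode(data: bytes) -> str:
    """Base64url without padding, as used in JWT segments."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")

def b64url_decode(segment: str) -> bytes:
    # Restore the padding that JWT encoding strips
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)

def inspect_jwt(token: str) -> dict:
    """Return header and payload WITHOUT verifying the signature."""
    header_b64, payload_b64, _signature = token.split(".")
    return {
        "header": json.loads(b64url_decode(header_b64)),
        "payload": json.loads(b64url_decode(payload_b64)),
    }

# Build an unsigned demo token from a subset of the claims shown above
header = {"alg": "RS256", "typ": "JWT", "kid": "sysmanage-2024-01"}
payload = {"sub": "user-123", "aud": "sysmanage-api", "roles": ["admin", "operator"]}
demo_token = ".".join([
    b64url_encode(json.dumps(header).encode()),
    b64url_encode(json.dumps(payload).encode()),
    "signature-placeholder",
])
```

Decoding like this is useful for debugging only; claims must not be trusted until the signature has been checked.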
                        

Token Management Implementation

# FastAPI authentication implementation
import functools
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import ExpiredSignatureError, JWTError, jwt

# app, User, LoginCredentials, agent_service, get_auth_service, and the
# get_user_by_* / verify_password helpers are defined elsewhere in the application

# Extracts the token from the "Authorization: Bearer <token>" header
security = HTTPBearer()

class AuthenticationService:
    def __init__(self, secret_key: str, algorithm: str = "RS256"):
        # RS256 is asymmetric: tokens are signed with the RSA private key
        # and can be verified with the public key alone
        self.secret_key = secret_key
        self.algorithm = algorithm
        self.access_token_expire_minutes = 15
        self.refresh_token_expire_days = 30

    async def authenticate_user(self, username: str, password: str) -> Optional[User]:
        """Authenticate user credentials"""
        user = await get_user_by_username(username)
        if not user or not verify_password(password, user.password_hash):
            return None
        return user

    async def create_access_token(self, user: User) -> str:
        """Create JWT access token"""
        now = datetime.now(timezone.utc)
        expire = now + timedelta(minutes=self.access_token_expire_minutes)

        payload = {
            "iss": "sysmanage.example.com",
            "sub": str(user.id),
            "aud": "sysmanage-api",
            "exp": expire,
            "iat": now,
            "jti": str(uuid.uuid4()),
            "user_id": user.id,
            "username": user.username,
            "roles": [role.name for role in user.roles],
            # Awaited lookup, which is why this method is a coroutine
            "permissions": await self.get_user_permissions(user),
            "session_id": str(uuid.uuid4())
        }

        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    def create_refresh_token(self, user: User) -> str:
        """Create JWT refresh token"""
        now = datetime.now(timezone.utc)
        expire = now + timedelta(days=self.refresh_token_expire_days)

        payload = {
            "iss": "sysmanage.example.com",
            "sub": str(user.id),
            "aud": "sysmanage-refresh",
            "exp": expire,
            "iat": now,
            "jti": str(uuid.uuid4()),
            "token_type": "refresh"
        }

        return jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

    async def verify_token(self, token: str) -> dict:
        """Verify and decode JWT token"""
        try:
            # jwt.decode checks the signature, the audience, and the "exp" claim
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=[self.algorithm],
                audience="sysmanage-api"
            )
        except ExpiredSignatureError:
            # Subclass of JWTError, so it must be caught first
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token expired"
            )
        except JWTError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token"
            )

        # Check if session is still valid
        if not await self.is_session_valid(payload['session_id']):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Session invalid"
            )

        return payload

# Dependency for requiring authentication
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> User:
    """Get current authenticated user"""
    auth_service = get_auth_service()
    payload = await auth_service.verify_token(credentials.credentials)

    user = await get_user_by_id(payload['user_id'])
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found"
        )

    return user

# Role-based authorization decorator
def require_permission(resource: str, action: str):
    def decorator(func):
        # functools.wraps keeps the endpoint signature visible to FastAPI,
        # so dependencies and query parameters are still resolved
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            user = kwargs.get('current_user')
            if not user:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Authentication required"
                )

            if not await user.has_permission(resource, action):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Permission denied: {action} on {resource}"
                )

            return await func(*args, **kwargs)
        return wrapper
    return decorator

# Usage in API endpoints
@app.post("/api/v1/auth/login")
async def login(credentials: LoginCredentials):
    auth_service = get_auth_service()
    user = await auth_service.authenticate_user(
        credentials.username,
        credentials.password
    )

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials"
        )

    access_token = await auth_service.create_access_token(user)
    refresh_token = auth_service.create_refresh_token(user)

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
        "expires_in": auth_service.access_token_expire_minutes * 60
    }

@app.get("/api/v1/agents")
@require_permission("agents", "read")
async def list_agents(
    current_user: User = Depends(get_current_user),
    limit: int = 50,
    offset: int = 0
):
    return await agent_service.list_agents(
        user=current_user,
        limit=limit,
        offset=offset
    )
                        

Role-Based Access Control (RBAC)

Permission Model

Permissions Hierarchy:

Organization
├── Global Admin
│   ├── All Resources: [create, read, update, delete, execute]
│   └── All Administrative Functions
│
├── Team Admin
│   ├── Team Agents: [create, read, update, delete]
│   ├── Team Tasks: [create, read, update, execute]
│   └── Team Users: [read, update]
│
├── Operator
│   ├── Assigned Agents: [read, update]
│   ├── Tasks: [create, read, execute]
│   └── Metrics: [read]
│
├── Read-Only User
│   ├── Assigned Agents: [read]
│   ├── Tasks: [read]
│   └── Metrics: [read]
│
└── Service Account
    ├── API Access: [specific endpoints]
    └── Automated Tasks: [execute]
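
The hierarchy above can be expressed as plain data. The sketch below is a simplified reading of it, under stated assumptions: the role keys, `effective_permissions`, and `is_allowed` are hypothetical names, and the scoping ("Team", "Assigned") is not modeled.

```python
from typing import Dict, Iterable, Set

# Resource and action names follow the hierarchy above; scoping is omitted
ROLE_PERMISSIONS: Dict[str, Dict[str, Set[str]]] = {
    "team_admin": {
        "agents": {"create", "read", "update", "delete"},
        "tasks": {"create", "read", "update", "execute"},
        "users": {"read", "update"},
    },
    "operator": {
        "agents": {"read", "update"},
        "tasks": {"create", "read", "execute"},
        "metrics": {"read"},
    },
    "read_only": {
        "agents": {"read"},
        "tasks": {"read"},
        "metrics": {"read"},
    },
}

def effective_permissions(roles: Iterable[str]) -> Dict[str, Set[str]]:
    """Union the permissions of every role a user holds."""
    merged: Dict[str, Set[str]] = {}
    for role in roles:
        # Unknown role names contribute nothing
        for resource, actions in ROLE_PERMISSIONS.get(role, {}).items():
            merged.setdefault(resource, set()).update(actions)
    return merged

def is_allowed(roles: Iterable[str], resource: str, action: str) -> bool:
    """True if any of the user's roles grants the action on the resource."""
    return action in effective_permissions(roles).get(resource, set())
```

Because permissions are unioned, a user holding both `read_only` and `operator` can update agents even though `read_only` alone cannot.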
                            

Permission Checking Implementation

# Permission checking system
from fastapi import Request
from fastapi.responses import JSONResponse

class PermissionChecker:
    def __init__(self, user: User):
        self.user = user
        self.permissions = self._load_user_permissions()

    def _load_user_permissions(self) -> dict:
        """Load user permissions from roles and direct assignments"""
        permissions = {}

        # Aggregate permissions from all roles
        for role in self.user.roles:
            for resource, actions in role.permissions.items():
                if resource not in permissions:
                    permissions[resource] = set()
                permissions[resource].update(actions)

        # Add direct user permissions
        for resource, actions in self.user.direct_permissions.items():
            if resource not in permissions:
                permissions[resource] = set()
            permissions[resource].update(actions)

        return permissions

    async def can_access_resource(self, resource_type: str, resource_id: str,
                                action: str) -> bool:
        """Check if user can perform action on specific resource"""

        # Check global permissions
        if self.has_global_permission(resource_type, action):
            return True

        # Check resource-specific permissions
        return await self.has_resource_permission(
            resource_type, resource_id, action
        )

    def has_global_permission(self, resource_type: str, action: str) -> bool:
        """Check if user has global permission for resource type"""
        return (resource_type in self.permissions and
                action in self.permissions[resource_type])

    async def has_resource_permission(self, resource_type: str,
                                   resource_id: str, action: str) -> bool:
        """Check resource-specific permissions (team/group based)"""

        if resource_type == "agents":
            agent = await get_agent_by_id(resource_id)
            if not agent:
                return False

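            # Check if user has access to agent's group; keep looking when a
            # group role lacks the action, so other roles and direct
            # assignment are still considered
            for role in self.user.roles:
                if (role.scope_type == "group" and role.scope_id == agent.group_id
                        and action in role.permissions.get("agents", [])):
                    return True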

            # Check if user is assigned to specific agent
            return await self.is_user_assigned_to_agent(self.user.id, resource_id)

        elif resource_type == "tasks":
            task = await get_task_by_id(resource_id)
            if not task:
                return False

            # Check if user created the task or is assigned to it
            if task.created_by == self.user.id or task.assigned_to == self.user.id:
                return True

            # Check agent permissions for the task's target agent
            if task.agent_id:
                return await self.has_resource_permission(
                    "agents", str(task.agent_id), action
                )

        return False

# Middleware for automatic permission checking
@app.middleware("http")
async def permission_middleware(request: Request, call_next):
    # Skip permission checks for public endpoints
    if request.url.path in PUBLIC_ENDPOINTS:
        return await call_next(request)

    # Extract resource type and ID from URL; collection paths such as
    # /api/v1/agents have three parts and no resource ID
    path_parts = request.url.path.strip('/').split('/')
    if len(path_parts) >= 3 and path_parts[0] == 'api' and path_parts[1] == 'v1':
        resource_type = path_parts[2]
        resource_id = path_parts[3] if len(path_parts) > 3 else None

        # Get current user from token
        user = await get_current_user_from_request(request)
        if not user:
            return JSONResponse(
                status_code=401,
                content={"error": "Authentication required"}
            )

        # Map HTTP method to action
        action_map = {
            "GET": "read",
            "POST": "create",
            "PUT": "update",
            "PATCH": "update",
            "DELETE": "delete"
        }
        action = action_map.get(request.method, "read")

        # Check permissions
        checker = PermissionChecker(user)
        if resource_id:
            has_permission = await checker.can_access_resource(
                resource_type, resource_id, action
            )
        else:
            has_permission = checker.has_global_permission(resource_type, action)

        if not has_permission:
            return JSONResponse(
                status_code=403,
                content={
                    "error": "Permission denied",
                    "details": f"Action '{action}' not allowed on '{resource_type}'"
                }
            )

    return await call_next(request)
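
The path and method mapping used by the middleware can be isolated into a pure function, which makes it easy to check on its own. This is a sketch: `AccessRequest` and `parse_access_request` are hypothetical names, and collection paths are treated as having no resource ID, the case the middleware's `else` branch handles.

```python
from typing import NamedTuple, Optional

# Same method-to-action mapping as the middleware above
ACTION_BY_METHOD = {
    "GET": "read", "POST": "create", "PUT": "update",
    "PATCH": "update", "DELETE": "delete",
}

class AccessRequest(NamedTuple):
    resource_type: str
    resource_id: Optional[str]
    action: str

def parse_access_request(method: str, path: str) -> Optional[AccessRequest]:
    """Return what the middleware would check, or None for non-API paths."""
    parts = path.strip("/").split("/")
    if len(parts) < 3 or parts[:2] != ["api", "v1"]:
        return None
    # The fourth segment, when present, is taken as the resource ID
    resource_id = parts[3] if len(parts) > 3 else None
    action = ACTION_BY_METHOD.get(method.upper(), "read")
    return AccessRequest(parts[2], resource_id, action)
```

Note that a path such as `/api/v1/agents/groups` would yield `groups` as the resource ID under this scheme, so nested collections need their own handling.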
                        

Response Formats & Standards

Standard Response Format

All API responses follow a consistent structure for predictable client integration:

Success Response Structure

{
  "success": true,
  "data": {
    // Actual response data
  },
  "meta": {
    "timestamp": "2024-01-15T10:30:00Z",
    "request_id": "req-123e4567-e89b-12d3-a456-426614174000",
    "api_version": "1.0",
    "execution_time_ms": 45
  },
  "pagination": {  // Only for paginated responses
    "page": 1,
    "per_page": 50,
    "total": 247,
    "total_pages": 5,
    "has_next": true,
    "has_prev": false
  }
}
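
The `pagination` block can be derived from three inputs. A minimal sketch, assuming 1-based page numbers; `build_pagination` is a hypothetical helper, and the field names are the ones shown above.

```python
import math

def build_pagination(page: int, per_page: int, total: int) -> dict:
    """Compute the pagination block for a 1-based page number."""
    if page < 1 or per_page < 1 or total < 0:
        raise ValueError("page and per_page must be >= 1, total must be >= 0")
    # Round up so a partially filled last page still counts
    total_pages = math.ceil(total / per_page)
    return {
        "page": page,
        "per_page": per_page,
        "total": total,
        "total_pages": total_pages,
        "has_next": page < total_pages,
        "has_prev": page > 1,
    }
```

With the values from the example, `build_pagination(1, 50, 247)` gives five pages, since 247 items at 50 per page leave a partial fifth page.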
                        

Error Response Structure

{
  "success": false,
  "error": {
    "code": "VALIDATION_ERROR",
    "message": "Invalid input provided",
    "details": [
      {
        "field": "agent_id",
        "message": "Agent ID must be a positive integer",
        "value": "invalid-id"
      }
    ],
    "help_url": "https://docs.sysmanage.com/api/errors#validation-error"
  },
  "meta": {
    "timestamp": "2024-01-15T10:30:00Z",
    "request_id": "req-123e4567-e89b-12d3-a456-426614174000",
    "api_version": "1.0"
  }
}
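
On the client side, the two envelopes can be handled by a single function that either returns `data` or raises. This is a sketch from the consumer's point of view: `APIError`, `unwrap`, and the `"UNKNOWN"` fallback code are assumptions, not part of the SysManage API.

```python
import json
from typing import Any

class APIError(Exception):
    """Raised when an envelope reports success=false."""
    def __init__(self, code: str, message: str, details: list, request_id: str):
        super().__init__(f"{code}: {message}")
        self.code = code
        self.details = details
        self.request_id = request_id

def unwrap(body: str) -> Any:
    """Return the data of a success envelope or raise APIError."""
    envelope = json.loads(body)
    if envelope.get("success"):
        return envelope.get("data")
    error = envelope.get("error", {})
    raise APIError(
        code=error.get("code", "UNKNOWN"),   # hypothetical fallback code
        message=error.get("message", ""),
        details=error.get("details", []),
        # Keep the request ID so the failure can be correlated with server logs
        request_id=envelope.get("meta", {}).get("request_id", ""),
    )
```

Carrying `request_id` on the exception lets a client report the exact request when asking for support.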
                        

Response Implementation

# Response formatting utilities
import logging
import time
import uuid
from datetime import datetime, timezone
from typing import Any, List, Optional

from fastapi import Request
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse
from pydantic import BaseModel

logger = logging.getLogger(__name__)

class PaginationMeta(BaseModel):
    page: int
    per_page: int
    total: int
    total_pages: int
    has_next: bool
    has_prev: bool

class ResponseMeta(BaseModel):
    timestamp: datetime
    request_id: str
    api_version: str
    execution_time_ms: Optional[int] = None

class ErrorDetail(BaseModel):
    field: Optional[str] = None
    message: str
    value: Optional[Any] = None

class ErrorInfo(BaseModel):
    code: str
    message: str
    details: Optional[List[ErrorDetail]] = None
    help_url: Optional[str] = None

class APIResponse(BaseModel):
    success: bool
    data: Optional[Any] = None
    error: Optional[ErrorInfo] = None
    meta: ResponseMeta
    pagination: Optional[PaginationMeta] = None

class ResponseBuilder:
    def __init__(self, request_id: Optional[str] = None):
        self.request_id = request_id or str(uuid.uuid4())
        # Timing starts when the builder is created
        self.start_time = time.time()

    def success(self, data: Any = None,
                pagination: Optional[PaginationMeta] = None) -> dict:
        """Build successful response"""
        execution_time = int((time.time() - self.start_time) * 1000)

        response = APIResponse(
            success=True,
            data=data,
            meta=ResponseMeta(
                timestamp=datetime.now(timezone.utc),
                request_id=self.request_id,
                api_version="1.0",
                execution_time_ms=execution_time
            ),
            pagination=pagination
        )

        # jsonable_encoder converts the datetime to an ISO 8601 string, so the
        # result can be passed directly to JSONResponse
        return jsonable_encoder(response, exclude_none=True)

    def error(self, code: str, message: str,
              details: Optional[List[ErrorDetail]] = None,
              status_code: int = 400) -> dict:
        """Build error response"""
        # status_code is not used here; callers set it on the JSONResponse

        help_url = f"https://docs.sysmanage.com/api/errors#{code.lower().replace('_', '-')}"

        response = APIResponse(
            success=False,
            error=ErrorInfo(
                code=code,
                message=message,
                details=details,
                help_url=help_url
            ),
            meta=ResponseMeta(
                timestamp=datetime.now(timezone.utc),
                request_id=self.request_id,
                api_version="1.0"
            )
        )

        return jsonable_encoder(response, exclude_none=True)

# FastAPI response formatting middleware
@app.middleware("http")
async def response_formatting_middleware(request: Request, call_next):
    # Generate request ID
    request_id = str(uuid.uuid4())
    request.state.request_id = request_id
    request.state.start_time = time.time()

    # Add request ID to response headers
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id

    return response

# Usage in API endpoints
@app.get("/api/v1/agents/{agent_id}")
async def get_agent(
    agent_id: int,
    request: Request,
    current_user: User = Depends(get_current_user)
):
    builder = ResponseBuilder(request.state.request_id)

    try:
        # Validate agent ID
        if agent_id <= 0:
            return JSONResponse(
                status_code=400,
                content=builder.error(
                    code="INVALID_AGENT_ID",
                    message="Agent ID must be a positive integer",
                    details=[ErrorDetail(
                        field="agent_id",
                        message="Must be greater than 0",
                        value=agent_id
                    )]
                )
            )

        # Get agent
        agent = await agent_service.get_agent(agent_id, current_user)
        if not agent:
            return JSONResponse(
                status_code=404,
                content=builder.error(
                    code="AGENT_NOT_FOUND",
                    message=f"Agent with ID {agent_id} not found"
                )
            )

        # Return success response
        return builder.success(data=agent.dict())

    except PermissionError:
        return JSONResponse(
            status_code=403,
            content=builder.error(
                code="PERMISSION_DENIED",
                message="You don't have permission to access this agent"
            )
        )
    except Exception as e:
        logger.error(f"Unexpected error in get_agent: {e}")
        return JSONResponse(
            status_code=500,
            content=builder.error(
                code="INTERNAL_ERROR",
                message="An unexpected error occurred"
            )
        )
                        

Error Handling

HTTP Status Codes

2xx Success

  • 200 OK: Successful GET, PUT, PATCH, or action
  • 201 Created: Successful POST that creates resource
  • 202 Accepted: Async operation started
  • 204 No Content: Successful DELETE or empty result

4xx Client Errors

  • 400 Bad Request: Invalid request format or parameters
  • 401 Unauthorized: Authentication required or invalid
  • 403 Forbidden: Permission denied
  • 404 Not Found: Resource doesn't exist
  • 409 Conflict: Resource state conflict
  • 422 Unprocessable Entity: Validation errors
  • 429 Too Many Requests: Rate limit exceeded

5xx Server Errors

  • 500 Internal Server Error: Unexpected server error
  • 502 Bad Gateway: Upstream service error
  • 503 Service Unavailable: Temporary unavailability
  • 504 Gateway Timeout: Upstream timeout

Error Code System

Error Code Categories

Category         | Prefix | Examples
-----------------|--------|------------------------------------------
Authentication   | AUTH_  | AUTH_INVALID_TOKEN, AUTH_EXPIRED_TOKEN
Authorization    | PERM_  | PERM_DENIED, PERM_INSUFFICIENT
Validation       | VAL_   | VAL_REQUIRED_FIELD, VAL_INVALID_FORMAT
Resource         | RES_   | RES_NOT_FOUND, RES_ALREADY_EXISTS
Business Logic   | BIZ_   | BIZ_AGENT_OFFLINE, BIZ_TASK_ALREADY_RUNNING
External         | EXT_   | EXT_DATABASE_ERROR, EXT_SERVICE_UNAVAILABLE
Rate Limiting    | RATE_  | RATE_LIMIT_EXCEEDED, RATE_QUOTA_EXCEEDED
System           | SYS_   | SYS_MAINTENANCE, SYS_OVERLOADED
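
The prefix scheme in the table lends itself to a simple lookup. A minimal sketch: `categorize` and the `"Unknown"` fallback are hypothetical, while the prefixes and category names are copied from the table.

```python
# Prefix-to-category mapping, taken from the table above
ERROR_CATEGORIES = {
    "AUTH_": "Authentication",
    "PERM_": "Authorization",
    "VAL_": "Validation",
    "RES_": "Resource",
    "BIZ_": "Business Logic",
    "EXT_": "External",
    "RATE_": "Rate Limiting",
    "SYS_": "System",
}

def categorize(error_code: str) -> str:
    """Map an error code to its category by prefix."""
    for prefix, category in ERROR_CATEGORIES.items():
        if error_code.startswith(prefix):
            return category
    return "Unknown"  # hypothetical fallback for codes outside the scheme
```

A client can use the category to decide on handling, for example prompting for re-login on any Authentication error without listing every individual code.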
                            

Validation Error Examples

{
  "success": false,
  "error": {
    "code": "VAL_MULTIPLE_ERRORS",
    "message": "Multiple validation errors occurred",
    "details": [
      {
        "field": "hostname",
        "message": "Hostname is required",
        "value": null
      },
      {
        "field": "platform",
        "message": "Platform must be one of: linux, windows, macos, bsd",
        "value": "unknown"
      },
      {
        "field": "ip_address",
        "message": "Invalid IP address format",
        "value": "192.168.1"
      }
    ]
  }
}
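
On the client side, the `details` array can be flattened into a field-to-message map, for example to highlight form inputs. This is a sketch under the assumption that the response body has exactly the shape shown above; `field_errors` is a hypothetical helper name.

```python
import json
from typing import Dict


def field_errors(body: str) -> Dict[str, str]:
    """Collect field -> message pairs from an error response body."""
    payload = json.loads(body)
    if payload.get("success", True):
        return {}  # not an error response
    details = payload.get("error", {}).get("details", [])
    return {d["field"]: d["message"] for d in details if d.get("field")}


example = """
{"success": false,
 "error": {"code": "VAL_MULTIPLE_ERRORS",
           "message": "Multiple validation errors occurred",
           "details": [
             {"field": "hostname", "message": "Hostname is required", "value": null},
             {"field": "ip_address", "message": "Invalid IP address format", "value": "192.168.1"}
           ]}}
"""
errors = field_errors(example)
```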
                            

Error Handling Implementation

# Custom exception classes
from typing import List, Optional

class APIException(Exception):
    def __init__(self, code: str, message: str, status_code: int = 400,
                 details: Optional[List[ErrorDetail]] = None):
        self.code = code
        self.message = message
        self.status_code = status_code
        self.details = details or []
        super().__init__(message)

class ValidationError(APIException):
    def __init__(self, details: List[ErrorDetail]):
        super().__init__(
            code="VAL_MULTIPLE_ERRORS" if len(details) > 1 else "VAL_ERROR",
            message="Validation error" + ("s" if len(details) > 1 else ""),
            status_code=422,
            details=details
        )

class NotFoundError(APIException):
    def __init__(self, resource: str, identifier: str):
        super().__init__(
            code="RES_NOT_FOUND",
            message=f"{resource} with identifier '{identifier}' not found",
            status_code=404
        )

class PermissionDeniedError(APIException):
    def __init__(self, action: str, resource: str):
        super().__init__(
            code="PERM_DENIED",
            message=f"Permission denied: {action} on {resource}",
            status_code=403
        )

# Global exception handler
@app.exception_handler(APIException)
async def api_exception_handler(request: Request, exc: APIException):
    builder = ResponseBuilder(getattr(request.state, 'request_id', None))

    return JSONResponse(
        status_code=exc.status_code,
        content=builder.error(
            code=exc.code,
            message=exc.message,
            details=exc.details
        )
    )

# FastAPI raises RequestValidationError when request parsing or validation fails
from fastapi.exceptions import RequestValidationError

@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    builder = ResponseBuilder(getattr(request.state, 'request_id', None))

    details = [
        ErrorDetail(
            field=error['loc'][-1] if error['loc'] else None,
            message=error['msg'],
            value=error.get('input')
        )
        for error in exc.errors()
    ]

    return JSONResponse(
        status_code=422,
        content=builder.error(
            # Same rule as ValidationError: plural code only for several errors
            code="VAL_MULTIPLE_ERRORS" if len(details) > 1 else "VAL_ERROR",
            message="Request validation failed",
            details=details
        )
    )

@app.exception_handler(500)
async def internal_error_handler(request: Request, exc: Exception):
    builder = ResponseBuilder(getattr(request.state, 'request_id', None))

    # Log the full exception for debugging. Pass the exception object itself,
    # because this handler runs outside the original except block
    logger.error(f"Internal server error: {exc}", exc_info=exc)

    return JSONResponse(
        status_code=500,
        content=builder.error(
            code="SYS_INTERNAL_ERROR",
            message="An internal server error occurred"
        )
    )

# Rate limiting error handler
@app.exception_handler(RateLimitExceeded)
async def rate_limit_handler(request: Request, exc: RateLimitExceeded):
    builder = ResponseBuilder(getattr(request.state, 'request_id', None))

    return JSONResponse(
        status_code=429,
        content=builder.error(
            code="RATE_LIMIT_EXCEEDED",
            message=f"Rate limit exceeded: {exc.limit} requests per {exc.window}"
        ),
        headers={
            "X-RateLimit-Limit": str(exc.limit),
            "X-RateLimit-Remaining": str(exc.remaining),
            "X-RateLimit-Reset": str(exc.reset_time)
        }
    )
                        

API Versioning & Deprecation

Versioning Strategy

URL-based Versioning

SysManage uses URL path versioning for clear, explicit version identification:

# Current version
https://api.sysmanage.example.com/api/v1/agents

# Future version
https://api.sysmanage.example.com/api/v2/agents

# Version-specific features
https://api.sysmanage.example.com/api/v1/agents          # Basic agent management
https://api.sysmanage.example.com/api/v2/agents          # Enhanced with grouping
https://api.sysmanage.example.com/api/v3/agents          # AI-powered insights
                        

Backward Compatibility

✅ Non-Breaking Changes (Within Version)
  • Adding optional request parameters
  • Adding fields to response objects
  • Adding new endpoints
  • Adding new optional headers
  • Adding new enum values (with default handling)
❌ Breaking Changes (Require New Version)
  • Removing or renaming fields
  • Changing field types or formats
  • Making optional parameters required
  • Changing endpoint URLs
  • Changing authentication mechanisms
  • Removing endpoints
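
The non-breaking changes above only stay non-breaking if clients tolerate them. The following sketch shows one way a Python client might ignore added response fields and map unrecognized enum values to a default. `AgentView`, `AgentStatus`, and `parse_agent` are hypothetical names, and the status values are assumptions for illustration.

```python
from dataclasses import dataclass, fields
from enum import Enum
from typing import Any, Dict


class AgentStatus(Enum):
    ONLINE = "online"
    OFFLINE = "offline"
    UNKNOWN = "unknown"  # fallback for values added after this client shipped

    @classmethod
    def _missing_(cls, value):
        # Enum calls this hook when the value matches no member
        return cls.UNKNOWN


@dataclass
class AgentView:
    id: int
    hostname: str
    status: AgentStatus


def parse_agent(raw: Dict[str, Any]) -> AgentView:
    """Build an AgentView, ignoring response fields this client does not know."""
    known = {f.name for f in fields(AgentView)}
    data = {k: v for k, v in raw.items() if k in known}
    data["status"] = AgentStatus(data["status"])
    return AgentView(**data)
```

With this approach, a server that adds a `health_score` field or a new status value does not break an older client.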

Deprecation Process

Version Release Timeline:

v1.0 Released ────────▶ v2.0 Released ────────▶ v1.0 Deprecated ────▶ v1.0 Removed
     │                       │                       │                     │
     │ ◀──── 18 months ────▶ │ ◀──── 6 months ────▶ │ ◀─── 6 months ───▶ │
     │                       │                       │                     │
   Active                 Maintenance             Deprecated           Unsupported
   Support                   Only                  Warning
                                                   Headers

Phase 1: Active Support (18 months)
- Full feature development
- Bug fixes and security updates
- Performance optimizations

Phase 2: Maintenance Only (6 months)
- Security updates only
- Critical bug fixes
- Deprecation warnings added

Phase 3: Deprecated (6 months)
- No updates except security
- Strong deprecation warnings
- Migration guides published

Phase 4: Removed
- API version no longer available
- 410 Gone responses
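
The phase boundaries can be derived from a version's release date. The sketch below assumes the durations in the timeline are applied exactly (18, 6, and 6 months, counted from the release date); the function names are hypothetical, and real schedules may be adjusted per release.

```python
from datetime import date


def add_months(d: date, months: int) -> date:
    """Shift a date by whole months, clamping the day to 28 so the result is valid."""
    index = d.year * 12 + (d.month - 1) + months
    return date(index // 12, index % 12 + 1, min(d.day, 28))


def lifecycle_phase(released: date, today: date) -> str:
    """Return the support phase of an API version on a given day."""
    maintenance_start = add_months(released, 18)  # successor version released
    deprecated_start = add_months(released, 24)   # after 6 months of maintenance
    removed_on = add_months(released, 30)         # after 6 months deprecated
    if today < maintenance_start:
        return "active"
    if today < deprecated_start:
        return "maintenance"
    if today < removed_on:
        return "deprecated"
    return "removed"
```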
                        

Deprecation Headers

# Deprecation warning headers
HTTP/1.1 200 OK
Deprecation: true
Sunset: Mon, 11 Nov 2024 23:59:59 GMT
Link: <https://docs.sysmanage.com/api/v2/migration>; rel="successor-version"
Warning: 299 - "This API version is deprecated. Please migrate to v2 by November 2024"

{
  "success": true,
  "data": { ... },
  "meta": {
    "deprecation_notice": {
      "deprecated": true,
      "sunset_date": "2024-11-11T23:59:59Z",
      "migration_guide": "https://docs.sysmanage.com/api/v2/migration",
      "contact": "api-support@sysmanage.com"
    }
  }
}
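
A client can watch for these headers and surface the sunset date to its operators. This is a minimal sketch using only the standard library. `sunset_date` is a hypothetical helper, and the plain `dict` used here is case-sensitive, whereas HTTP client libraries usually expose case-insensitive header mappings.

```python
from datetime import datetime
from email.utils import parsedate_to_datetime
from typing import Mapping, Optional


def sunset_date(headers: Mapping[str, str]) -> Optional[datetime]:
    """Return the Sunset date when the response marks the version as deprecated."""
    if headers.get("Deprecation", "").lower() != "true":
        return None
    sunset = headers.get("Sunset")
    # Sunset uses the HTTP date format, which parsedate_to_datetime understands
    return parsedate_to_datetime(sunset) if sunset else None


headers = {
    "Deprecation": "true",
    "Sunset": "Mon, 11 Nov 2024 23:59:59 GMT",
}
when = sunset_date(headers)
```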
                        

Version Management Implementation

# API version management
from enum import Enum
from datetime import datetime, timezone
from typing import Optional

class APIVersion(Enum):
    V1 = "v1"
    V2 = "v2"

class VersionInfo:
    def __init__(self, version: APIVersion, status: str,
                 sunset_date: Optional[datetime] = None):
        self.version = version
        self.status = status  # active, maintenance, deprecated, removed
        self.sunset_date = sunset_date

# Version configuration
VERSION_CONFIG = {
    APIVersion.V1: VersionInfo(
        version=APIVersion.V1,
        status="deprecated",
        sunset_date=datetime(2024, 11, 11, 23, 59, 59, tzinfo=timezone.utc)
    ),
    APIVersion.V2: VersionInfo(
        version=APIVersion.V2,
        status="active"
    )
}

# Version extraction middleware
@app.middleware("http")
async def version_middleware(request: Request, call_next):
    # Extract version from URL path
    path_parts = request.url.path.strip('/').split('/')
    if len(path_parts) >= 3 and path_parts[0] == 'api':
        version_str = path_parts[1]
        try:
            api_version = APIVersion(version_str)
            request.state.api_version = api_version

            # Check if version is still supported
            version_info = VERSION_CONFIG.get(api_version)
            if not version_info:
                return JSONResponse(
                    status_code=400,
                    content={"error": f"Unsupported API version: {version_str}"}
                )

            if version_info.status == "removed":
                return JSONResponse(
                    status_code=410,
                    content={
                        "error": "This API version has been removed",
                        "migration_guide": "https://docs.sysmanage.com/api/v2/migration"
                    }
                )

            request.state.version_info = version_info

        except ValueError:
            return JSONResponse(
                status_code=400,
                content={"error": f"Invalid API version: {version_str}"}
            )

    response = await call_next(request)

    # Add deprecation headers if needed
    if hasattr(request.state, 'version_info'):
        version_info = request.state.version_info
        if version_info.status in ["maintenance", "deprecated"]:
            response.headers["Deprecation"] = "true"

            if version_info.sunset_date:
                response.headers["Sunset"] = version_info.sunset_date.strftime(
                    "%a, %d %b %Y %H:%M:%S GMT"
                )

            if version_info.status == "deprecated":
                response.headers["Warning"] = (
                    "299 - \"This API version is deprecated. "
                    "Please migrate to the latest version.\""
                )

    return response

# Version-specific endpoint handling
class VersionedEndpoint:
    def __init__(self):
        self.handlers = {}

    def version(self, api_version: APIVersion):
        def decorator(func):
            self.handlers[api_version] = func
            return func
        return decorator

    async def handle(self, request: Request, *args, **kwargs):
        api_version = getattr(request.state, 'api_version', APIVersion.V1)
        handler = self.handlers.get(api_version)

        if not handler:
            # Fall back to the latest registered version, compared numerically
            # so that "v10" sorts after "v2"
            latest_version = max(self.handlers.keys(),
                                 key=lambda v: int(v.value.lstrip('v')))
            handler = self.handlers[latest_version]

        return await handler(request, *args, **kwargs)

# Usage example
agents_endpoint = VersionedEndpoint()

@agents_endpoint.version(APIVersion.V1)
async def get_agents_v1(request: Request, current_user: User = Depends(get_current_user)):
    """Legacy agent list format"""
    agents = await agent_service.list_agents(current_user)
    return {
        "agents": [
            {
                "id": agent.id,
                "hostname": agent.hostname,
                "status": agent.status,
                "last_seen": agent.last_seen
            }
            for agent in agents
        ]
    }

@agents_endpoint.version(APIVersion.V2)
async def get_agents_v2(request: Request, current_user: User = Depends(get_current_user)):
    """Enhanced agent list with grouping and metadata"""
    agents = await agent_service.list_agents_with_groups(current_user)
    return {
        "agents": [
            {
                "id": agent.id,
                "hostname": agent.hostname,
                "status": agent.status,
                "last_seen": agent.last_seen,
                "group": {
                    "id": agent.group.id,
                    "name": agent.group.name
                } if agent.group else None,
                "metadata": agent.metadata,
                "health_score": agent.health_score
            }
            for agent in agents
        ],
        "summary": {
            "total": len(agents),
            "online": sum(1 for a in agents if a.status == "online"),
            "groups": len(set(a.group_id for a in agents if a.group_id))
        }
    }

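@app.get("/api/{version}/agents")
async def list_agents(request: Request, current_user: User = Depends(get_current_user)):
    # Resolve dependencies here: Depends() defaults are only evaluated by FastAPI
    # on the route function, not when the versioned handlers are called directly
    return await agents_endpoint.handle(request, current_user=current_user)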
                        

Performance Optimization

Caching Strategy

Multi-Level Caching

Client Request → CDN Cache → API Gateway Cache → Application Cache → Database
     │              │              │                    │              │
     ▼              ▼              ▼                    ▼              ▼
Static Assets   API Responses  Auth Tokens        Query Results   Raw Data
(Images, JS)    (1-5 minutes)  (15 minutes)      (30 seconds)    (Source)
TTL: 1 hour     TTL: Variable  TTL: Token Life   TTL: Variable   TTL: N/A
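
Each tier in the diagram is a TTL cache: an entry is served until its time-to-live elapses, then fetched again from the next tier. The sketch below illustrates that behavior with an in-process cache. `TTLCache` is a hypothetical class for illustration, not the SysManage implementation, and the clock is injectable so the expiry logic can be tested without sleeping.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLCache:
    """In-process cache whose entries expire after a fixed number of seconds."""

    def __init__(self, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self._entries: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._entries[key]  # expired: drop it and report a miss
            return None
        return value

    def set(self, key: str, value: Any) -> None:
        self._entries[key] = (self.clock() + self.ttl, value)
```

A 30-second instance would correspond to the "Query Results" column of the diagram.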
                        

Caching Implementation

# Multi-level caching system
import redis.asyncio as redis  # async client: the methods below await Redis calls
from functools import wraps
import json
import hashlib
from typing import Any, Optional

class CacheManager:
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.default_ttl = 300  # 5 minutes

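    def generate_cache_key(self, prefix: str, *args, **kwargs) -> str:
        """Generate cache key from function arguments"""
        key_data = {
            'args': args,
            'kwargs': sorted(kwargs.items())
        }
        # default=str keeps non-JSON arguments (such as a User object) from
        # raising TypeError; they need a stable __str__ to produce stable keys
        serialized = json.dumps(key_data, sort_keys=True, default=str)
        key_hash = hashlib.md5(serialized.encode()).hexdigest()
        return f"{prefix}:{key_hash}"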

    async def get(self, key: str) -> Optional[Any]:
        """Get value from cache"""
        try:
            value = await self.redis.get(key)
            return json.loads(value) if value else None
        except Exception as e:
            logger.warning(f"Cache get error: {e}")
            return None

    async def set(self, key: str, value: Any, ttl: int = None) -> bool:
        """Set value in cache"""
        try:
            ttl = ttl or self.default_ttl
            serialized = json.dumps(value, default=str)
            return await self.redis.setex(key, ttl, serialized)
        except Exception as e:
            logger.warning(f"Cache set error: {e}")
            return False

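    async def delete(self, pattern: str) -> int:
        """Delete keys matching pattern"""
        try:
            # SCAN iterates incrementally; KEYS can block Redis on a large keyspace
            keys = [key async for key in self.redis.scan_iter(match=pattern)]
            if keys:
                return await self.redis.delete(*keys)
            return 0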
        except Exception as e:
            logger.warning(f"Cache delete error: {e}")
            return 0

    def cached(self, prefix: str, ttl: int = None,
               invalidate_on: list = None):
        """Decorator for caching function results"""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Generate cache key
                cache_key = self.generate_cache_key(prefix, *args, **kwargs)

                # Try to get from cache
                cached_result = await self.get(cache_key)
                if cached_result is not None:
                    return cached_result

                # Execute function
                result = await func(*args, **kwargs)

                # Cache the result
                await self.set(cache_key, result, ttl)

                return result
            return wrapper
        return decorator

# Cache usage in API endpoints
cache_manager = CacheManager(redis_client)

@app.get("/api/v1/agents")
@cache_manager.cached("agents_list", ttl=60)  # 1 minute cache
async def list_agents(
    current_user: User = Depends(get_current_user),
    status: Optional[str] = None,
    limit: int = 50,
    offset: int = 0
):
    return await agent_service.list_agents(
        user=current_user,
        status=status,
        limit=limit,
        offset=offset
    )

@app.get("/api/v1/agents/{agent_id}")
@cache_manager.cached("agent_detail", ttl=30)  # 30 second cache
async def get_agent(
    agent_id: int,
    current_user: User = Depends(get_current_user)
):
    return await agent_service.get_agent(agent_id, current_user)

# Cache invalidation on updates
@app.put("/api/v1/agents/{agent_id}")
async def update_agent(
    agent_id: int,
    agent_data: AgentUpdate,
    current_user: User = Depends(get_current_user)
):
    # Update agent
    updated_agent = await agent_service.update_agent(
        agent_id, agent_data, current_user
    )

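    # Invalidate related caches. Keys have the form "prefix:<md5 hash>", so the
    # agent ID is not visible in the key; clear the whole prefix instead
    await cache_manager.delete("agent_detail:*")
    await cache_manager.delete("agents_list:*")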

    return updated_agent
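
Because `generate_cache_key` hashes all of its arguments, a key pattern cannot single out the entries for one agent. One alternative, sketched here as an assumption rather than as the SysManage implementation, keeps the entity ID readable in the key and hashes only the remaining parameters. `entity_cache_key` and `entity_pattern` are hypothetical helpers.

```python
import hashlib
import json


def entity_cache_key(prefix: str, entity_id: int, **params) -> str:
    """Build "<prefix>:<entity_id>:<hash of the remaining parameters>"."""
    digest = hashlib.md5(
        json.dumps(params, sort_keys=True, default=str).encode()
    ).hexdigest()
    return f"{prefix}:{entity_id}:{digest}"


def entity_pattern(prefix: str, entity_id: int) -> str:
    """Glob pattern that matches every cached variant of one entity."""
    return f"{prefix}:{entity_id}:*"
```

The colon after the ID matters: the pattern for agent 42 must not match keys for agent 142 or 420.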
                        

Database Query Optimization

Query Patterns

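# Optimized database queries
from typing import List

from sqlalchemy import String, and_, bindparam, func, or_, select, update
# on_conflict_do_update is dialect-specific; PostgreSQL is assumed here
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import joinedload, load_only, selectinload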

class OptimizedAgentService:
    def __init__(self, db_session):
        self.db = db_session

    async def list_agents_with_stats(self, user: User, filters: dict = None):
        """Optimized agent listing with statistics"""

        # Base query with necessary joins
        query = (
            select(Agent)
            .options(
                joinedload(Agent.group),  # Avoid N+1 queries for groups
                selectinload(Agent.packages).options(
                    # Only load package counts, not full objects
                    load_only(Package.id, Package.status)
                )
            )
            .where(Agent.deleted_at.is_(None))  # Soft delete filter
        )

        # Apply user-based permissions. Filter even when the list is empty:
        # in_([]) matches nothing, so a user with no access gets no rows
        accessible_agent_ids = await self.get_accessible_agent_ids(user)
        query = query.where(Agent.id.in_(accessible_agent_ids))

        # Apply filters efficiently
        if filters:
            if 'status' in filters:
                query = query.where(Agent.status == filters['status'])

            if 'platform' in filters:
                query = query.where(Agent.platform == filters['platform'])

            if 'search' in filters:
                search_term = f"%{filters['search']}%"
                query = query.where(
                    or_(
                        Agent.hostname.ilike(search_term),
                        Agent.ip_address.cast(String).like(search_term)
                    )
                )

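        # Get total count efficiently (without loading all data). Count rows of
        # the subquery only; referencing Agent.id here would add a second FROM
        # entry and produce a cartesian product
        count_query = select(func.count()).select_from(query.subquery())
        total_count = await self.db.scalar(count_query)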

        # Apply pagination (filters may be None)
        if filters and 'limit' in filters:
            query = query.limit(filters['limit'])
        if filters and 'offset' in filters:
            query = query.offset(filters['offset'])

        # Execute query
        result = await self.db.execute(query)
        agents = result.unique().scalars().all()

        # Calculate statistics in a single query
        stats = await self.get_agent_statistics(accessible_agent_ids)

        return {
            'agents': agents,
            'total': total_count,
            'statistics': stats
        }

    async def get_agent_statistics(self, agent_ids: List[int]):
        """Get agent statistics in a single optimized query"""

        stats_query = (
            select(
                Agent.status,
                func.count(Agent.id).label('count'),
                func.avg(Agent.cpu_usage).label('avg_cpu'),
                func.avg(Agent.memory_usage).label('avg_memory')
            )
            .where(Agent.id.in_(agent_ids))
            .group_by(Agent.status)
        )

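        result = await self.db.execute(stats_query)
        # Read the 'count' label through row._mapping: row.count resolves to the
        # tuple-style count() method on Row, not to the labelled column
        stats_by_status = {row.status: {
            'count': row._mapping['count'],
            'avg_cpu': row.avg_cpu,
            'avg_memory': row.avg_memory
        } for row in result}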

        return {
            'by_status': stats_by_status,
            'total': sum(s['count'] for s in stats_by_status.values()),
            'online': stats_by_status.get('online', {}).get('count', 0)
        }

# Bulk operations for efficiency
class BulkOperations:
    def __init__(self, db_session):
        self.db = db_session

    async def bulk_update_agent_status(self, agent_updates: List[dict]):
        """Efficiently update multiple agents"""

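        # Use bulk update for better performance. Bind names must differ from
        # the column names set in values(), which SQLAlchemy reserves
        stmt = (
            update(Agent)
            .where(Agent.id == bindparam('agent_id'))
            .values(
                status=bindparam('new_status'),
                last_seen=bindparam('new_last_seen'),
                updated_at=func.now()
            )
        )

        # Each dict needs the keys agent_id, new_status, and new_last_seen
        await self.db.execute(stmt, agent_updates)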
        await self.db.commit()

    async def bulk_create_metrics(self, metrics: List[dict]):
        """Efficiently insert multiple metrics"""

        # Use bulk insert with ON CONFLICT handling
        stmt = insert(Metric).values(metrics)

        # Handle conflicts for duplicate timestamps
        stmt = stmt.on_conflict_do_update(
            index_elements=['agent_id', 'metric_name', 'timestamp'],
            set_={
                'value': stmt.excluded.value,
                'updated_at': func.now()
            }
        )

        await self.db.execute(stmt)
        await self.db.commit()
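
The grouped statistics query in `get_agent_statistics` can be tried without an ORM. The sketch below reproduces the idea with the standard-library `sqlite3` module; the table layout is a simplified assumption for illustration and not the SysManage schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agents (id INTEGER PRIMARY KEY, status TEXT, cpu_usage REAL)"
)
conn.executemany(
    "INSERT INTO agents (status, cpu_usage) VALUES (?, ?)",
    [("online", 20.0), ("online", 40.0), ("offline", 0.0)],
)

# One grouped query returns every per-status aggregate in a single round trip,
# instead of one COUNT and one AVG query per status value
rows = conn.execute(
    "SELECT status, COUNT(id), AVG(cpu_usage) FROM agents GROUP BY status"
).fetchall()
stats_by_status = {
    status: {"count": count, "avg_cpu": avg_cpu}
    for status, count, avg_cpu in rows
}
total = sum(s["count"] for s in stats_by_status.values())
```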
                        

Rate Limiting

Adaptive Rate Limiting

# Advanced rate limiting implementation
from typing import Dict, Optional
import time
import asyncio
from dataclasses import dataclass

@dataclass
class RateLimit:
    requests: int
    window_seconds: int
    burst_requests: Optional[int] = None

class AdaptiveRateLimiter:
    def __init__(self, redis_client):
        self.redis = redis_client
        self.default_limits = {
            'user': RateLimit(1000, 3600, 50),      # 1000/hour, burst 50
            'api_key': RateLimit(10000, 3600, 200), # 10000/hour, burst 200
            'ip': RateLimit(100, 3600, 10)          # 100/hour, burst 10
        }

    async def check_rate_limit(self, identifier: str,
                              limit_type: str = 'user') -> dict:
        """Check if request is within rate limits"""

        limit = self.default_limits.get(limit_type)
        if not limit:
            return {'allowed': True}

        now = time.time()
        window_start = int(now // limit.window_seconds) * limit.window_seconds

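        # Keys for tracking. The burst key carries its one-minute bucket so the
        # counter resets every minute instead of having its TTL extended by
        # each request
        window_key = f"rate_limit:{identifier}:{window_start}"
        burst_key = f"rate_limit_burst:{identifier}:{int(now // 60)}"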

        # Use Redis pipeline for atomic operations
        async with self.redis.pipeline() as pipe:
            pipe.incr(window_key)
            pipe.expire(window_key, limit.window_seconds)
            pipe.incr(burst_key)
            pipe.expire(burst_key, 60)  # 1-minute burst window

            results = await pipe.execute()

        window_count = results[0]
        burst_count = results[2]

        # Check limits
        window_exceeded = window_count > limit.requests
        burst_exceeded = (limit.burst_requests and
                         burst_count > limit.burst_requests)

        if window_exceeded or burst_exceeded:
            reset_time = window_start + limit.window_seconds
            return {
                'allowed': False,
                'limit': limit.requests,
                'remaining': max(0, limit.requests - window_count),
                'reset_time': reset_time,
                'retry_after': max(1, reset_time - now)
            }

        return {
            'allowed': True,
            'limit': limit.requests,
            'remaining': limit.requests - window_count,
            'reset_time': window_start + limit.window_seconds
        }

# Rate limiting middleware
@app.middleware("http")
async def rate_limiting_middleware(request: Request, call_next):
    # Skip rate limiting for health checks
    if request.url.path in ['/health', '/metrics']:
        return await call_next(request)

    # Determine rate limit identifier
    identifier = None
    limit_type = 'ip'

    # Check for API key
    api_key = request.headers.get('X-API-Key')
    if api_key:
        identifier = f"api_key:{api_key}"
        limit_type = 'api_key'
    else:
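        # Check for authenticated user
        auth_header = request.headers.get('Authorization')
        if auth_header and auth_header.startswith('Bearer '):
            try:
                token = auth_header.split(' ')[1]
                # Unverified decode (PyJWT 2.x syntax assumed), used only to pick
                # a rate-limit bucket; never trust these claims for authorization
                payload = jwt.decode(token, options={"verify_signature": False})
                user_id = payload.get('user_id')
                if user_id is not None:
                    identifier = f"user:{user_id}"
                    limit_type = 'user'
            except Exception:
                # Malformed token: fall through to IP-based limiting
                pass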

    # Fall back to IP-based limiting
    if not identifier:
        # Get real IP (considering proxies). Clients can spoof these headers,
        # so rely on them only when a trusted proxy sets them
        ip = request.headers.get('X-Forwarded-For', '').split(',')[0].strip()
        if not ip:
            ip = request.headers.get('X-Real-IP', '')
        if not ip:
            ip = request.client.host
        identifier = f"ip:{ip}"

    # Check rate limit
    rate_limiter = get_rate_limiter()
    result = await rate_limiter.check_rate_limit(identifier, limit_type)

    if not result['allowed']:
        return JSONResponse(
            status_code=429,
            content={
                "error": "Rate limit exceeded",
                "retry_after": result['retry_after']
            },
            headers={
                "X-RateLimit-Limit": str(result['limit']),
                "X-RateLimit-Remaining": "0",
                "X-RateLimit-Reset": str(int(result['reset_time'])),
                "Retry-After": str(int(result['retry_after']))
            }
        )

    # Continue with request
    response = await call_next(request)

    # Add rate limit headers to response
    if 'limit' in result:
        response.headers["X-RateLimit-Limit"] = str(result['limit'])
        response.headers["X-RateLimit-Remaining"] = str(result['remaining'])
        response.headers["X-RateLimit-Reset"] = str(int(result['reset_time']))

    return response
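
The fixed-window arithmetic used by `check_rate_limit` can be exercised without Redis. The following in-memory sketch is a simplified stand-in for illustration: `FixedWindowLimiter` is a hypothetical class, it omits the burst limit, and it never evicts old windows, which a real implementation would handle through key expiry.

```python
import time
from collections import defaultdict
from typing import Callable, Dict, Tuple


class FixedWindowLimiter:
    """In-memory fixed-window counter with an injectable clock for testing."""

    def __init__(self, requests: int, window_seconds: int,
                 clock: Callable[[], float] = time.time):
        self.requests = requests
        self.window_seconds = window_seconds
        self.clock = clock
        self._counts: Dict[Tuple[str, int], int] = defaultdict(int)

    def check(self, identifier: str) -> dict:
        now = self.clock()
        # Round the timestamp down to the start of its window
        window_start = int(now // self.window_seconds) * self.window_seconds
        self._counts[(identifier, window_start)] += 1
        count = self._counts[(identifier, window_start)]
        return {
            "allowed": count <= self.requests,
            "remaining": max(0, self.requests - count),
            "reset_time": window_start + self.window_seconds,
        }
```

Fixed windows are simple and cheap, but a client can send up to twice the limit across a window boundary; sliding-window or token-bucket schemes avoid that at the cost of more state.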
                        

Next Steps

To explore related API design and architecture topics:

  1. WebSocket Protocol: Real-time communication complementing the REST API
  2. Database Schema: Data models that support the API operations
  3. Performance Metrics: API performance monitoring and optimization
  4. Design Principles: Architectural patterns behind the API design
  5. Scaling Strategies: Scale API infrastructure for high loads