Migration Strategy

Database migrations, zero-downtime deployments, rollback procedures, and version management strategies for SysManage.

Migration Strategy Overview

SysManage's migration strategy is designed to keep the system reliable, preserve data integrity, and minimize downtime during updates and schema changes.

Core Principles

  • Zero-Downtime: Updates without service interruption
  • Rollback Safety: Safe rollback procedures for all changes
  • Data Integrity: Guaranteed data consistency during migrations
  • Gradual Deployment: Phased rollouts with canary deployments

Database Migration Framework

Migration Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    Migration Controller                        │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Schema      │  │ Data         │  │ Index                   │ │
│  │ Migrator    │  │ Migrator     │  │ Migrator                │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Version     │  │ Dependency   │  │ Rollback                │ │
│  │ Manager     │  │ Resolver     │  │ Planner                 │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Backup      │  │ Validation   │  │ Progress                │ │
│  │ Manager     │  │ Engine       │  │ Monitor                 │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
                          │    ▲
                 Migration │    │ Status
                  Commands │    │ Reports
                          ▼    │
┌─────────────────────────────────────────────────────────────────┐
│                     PostgreSQL Cluster                        │
│    ┌──────────────┐    ┌──────────────┐    ┌──────────────┐    │
│    │   Primary    │    │   Replica    │    │   Replica    │    │
│    │  (Write)     │    │   (Read)     │    │   (Read)     │    │
│    └──────────────┘    └──────────────┘    └──────────────┘    │
└─────────────────────────────────────────────────────────────────┘
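
The Dependency Resolver in the diagram has to turn the dependencies each migration declares into an execution order. The following is a minimal sketch of that step, assuming migrations are identified by version strings and that their dependencies form an acyclic graph. The function name `resolve_order` and the sample versions are illustrative and are not taken from the SysManage code base.

```python
from graphlib import CycleError, TopologicalSorter
from typing import Dict, List


def resolve_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Return versions so that every migration follows its dependencies.

    `dependencies` maps a migration version to the versions it requires.
    Hypothetical helper, for illustration only.
    """
    sorter = TopologicalSorter()
    for version, required in dependencies.items():
        # add(node, *predecessors): predecessors must run before node
        sorter.add(version, *required)
    try:
        return list(sorter.static_order())
    except CycleError as exc:
        # The second element of args holds the detected cycle
        raise ValueError(f"Circular migration dependency: {exc.args[1]}") from exc


order = resolve_order({
    "20241201_002": ["20241201_001"],
    "20241201_001": ["20241130_005"],
    "20241130_005": [],
})
print(order)
```

Rejecting cycles up front means a misdeclared dependency fails before any schema change is attempted.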
                        

Migration Script Framework

Base Migration Class

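from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any
from datetime import datetime
import asyncio
import logging

# `Database` is the project's async database wrapper; its import is not shown here.
logger = logging.getLogger(__name__)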

class BaseMigration(ABC):
    """Base class for all database migrations"""

    def __init__(self):
        self.version: str = ""
        self.description: str = ""
        self.dependencies: List[str] = []
        self.reversible: bool = True
        self.batch_size: int = 1000
        self.timeout: int = 3600  # 1 hour

    @abstractmethod
    async def up(self, db: Database) -> None:
        """Apply the migration"""
        pass

    @abstractmethod
    async def down(self, db: Database) -> None:
        """Rollback the migration"""
        pass

    async def validate_preconditions(self, db: Database) -> bool:
        """Validate that preconditions are met"""
        return True

    async def validate_postconditions(self, db: Database) -> bool:
        """Validate that migration was successful"""
        return True

    async def estimate_duration(self, db: Database) -> int:
        """Estimate migration duration in seconds"""
        return 60

    async def get_rollback_data(self, db: Database) -> Optional[Dict[str, Any]]:
        """Collect data needed for rollback"""
        return None


class Migration_20241201_001_AddAgentMetricsTable(BaseMigration):
    def __init__(self):
        super().__init__()
        self.version = "20241201_001"
        self.description = "Add agent_metrics table for performance tracking"
        self.dependencies = ["20241130_005"]
        self.reversible = True

    async def up(self, db: Database) -> None:
        """Create agent_metrics table"""

        # Create table with proper indexing
        await db.execute("""
            CREATE TABLE agent_metrics (
                id BIGSERIAL PRIMARY KEY,
                agent_id UUID NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
                metric_type VARCHAR(50) NOT NULL,
                metric_value JSONB NOT NULL,
                collected_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
                created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
            )
        """)

        # Add indexes for performance
        await db.execute("""
            CREATE INDEX idx_agent_metrics_agent_id
            ON agent_metrics(agent_id)
        """)

        await db.execute("""
            CREATE INDEX idx_agent_metrics_type_time
            ON agent_metrics(metric_type, collected_at DESC)
        """)

        await db.execute("""
            CREATE INDEX idx_agent_metrics_collected_at
            ON agent_metrics(collected_at DESC)
        """)

        # Add table comment
        await db.execute("""
            COMMENT ON TABLE agent_metrics IS
            'Stores performance and health metrics from SysManage agents'
        """)

    async def down(self, db: Database) -> None:
        """Remove agent_metrics table"""
        await db.execute("DROP TABLE IF EXISTS agent_metrics CASCADE")

    async def validate_postconditions(self, db: Database) -> bool:
        """Verify table creation"""
        result = await db.fetch_one("""
            SELECT table_name FROM information_schema.tables
            WHERE table_name = 'agent_metrics' AND table_schema = 'public'
        """)
        return result is not None


class MigrationRunner:
    def __init__(self, db: Database):
        self.db = db
        self.migration_table = "schema_migrations"

    async def initialize(self):
        """Initialize migration tracking table"""
        await self.db.execute(f"""
            CREATE TABLE IF NOT EXISTS {self.migration_table} (
                version VARCHAR(255) PRIMARY KEY,
                description TEXT NOT NULL,
                applied_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
                applied_by VARCHAR(100) NOT NULL DEFAULT CURRENT_USER,
                execution_time_ms INTEGER NOT NULL,
                rollback_data JSONB,
                checksum VARCHAR(64) NOT NULL
            )
        """)

    async def get_applied_migrations(self) -> List[str]:
        """Get list of applied migration versions"""
        result = await self.db.fetch_all(f"""
            SELECT version FROM {self.migration_table}
            ORDER BY applied_at ASC
        """)
        return [row["version"] for row in result]

    async def apply_migration(self, migration: BaseMigration) -> bool:
        """Apply a single migration with full transaction safety"""

        # The try block wraps the transaction so that a failure propagates
        # through the context manager and triggers the rollback. Catching the
        # exception inside the block would let the transaction commit.
        try:
            async with self.db.transaction():
                # Validate preconditions
                if not await migration.validate_preconditions(self.db):
                    raise Exception(f"Preconditions failed for {migration.version}")

                # Get rollback data before applying
                rollback_data = await migration.get_rollback_data(self.db)

                # Apply migration with timing
                start_time = datetime.utcnow()
                await migration.up(self.db)
                execution_time = (datetime.utcnow() - start_time).total_seconds() * 1000

                # Validate postconditions
                if not await migration.validate_postconditions(self.db):
                    raise Exception(f"Postconditions failed for {migration.version}")

                # Record successful migration
                await self.db.execute(f"""
                    INSERT INTO {self.migration_table}
                    (version, description, execution_time_ms, rollback_data, checksum)
                    VALUES ($1, $2, $3, $4, $5)
                """, migration.version, migration.description,
                    int(execution_time), rollback_data,
                    self._calculate_checksum(migration))

            return True

        except Exception as e:
            # The transaction has already been rolled back at this point
            logger.error(f"Migration {migration.version} failed: {e}")
            return False

    async def rollback_migration(self, migration: BaseMigration) -> bool:
        """Rollback a migration"""
        if not migration.reversible:
            raise Exception(f"Migration {migration.version} is not reversible")

        # The try block wraps the transaction so that a failure propagates
        # through the context manager and undoes the partial rollback
        try:
            async with self.db.transaction():
                # Get rollback data
                result = await self.db.fetch_one(f"""
                    SELECT rollback_data FROM {self.migration_table}
                    WHERE version = $1
                """, migration.version)

                if result and result["rollback_data"]:
                    # Restore rollback data if needed
                    pass

                # Execute rollback
                await migration.down(self.db)

                # Remove from migration table
                await self.db.execute(f"""
                    DELETE FROM {self.migration_table} WHERE version = $1
                """, migration.version)

            return True

        except Exception as e:
            logger.error(f"Rollback of {migration.version} failed: {e}")
            return False
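
`apply_migration` stores a checksum for each migration but `_calculate_checksum` is not shown. The sketch below is one way such a helper could work. It assumes SHA-256, chosen only because its hex digest fits the 64-character `checksum` column, and it assumes the checksum covers the version, the description, and the SQL statements. The function name and signature are hypothetical.

```python
import hashlib
from typing import Iterable


def calculate_checksum(version: str, description: str,
                       statements: Iterable[str]) -> str:
    """Return a 64-character SHA-256 hex digest for a migration.

    Hypothetical stand-in for MigrationRunner._calculate_checksum.
    """
    digest = hashlib.sha256()
    for part in (version, description, *statements):
        # Collapse whitespace so re-indenting the SQL does not change the checksum
        normalized = " ".join(part.split())
        digest.update(normalized.encode("utf-8"))
        # Separator byte so ("ab", "c") and ("a", "bc") hash differently
        digest.update(b"\x00")
    return digest.hexdigest()


checksum = calculate_checksum(
    "20241201_001",
    "Add agent_metrics table for performance tracking",
    ["CREATE TABLE agent_metrics (id BIGSERIAL PRIMARY KEY)"],
)
print(len(checksum))
```

Comparing the stored checksum with a freshly computed one before each run is a common way to detect a migration that was edited after it was applied.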

Safe Schema Changes

Online Schema Migration Patterns

class SafeSchemaMigration(BaseMigration):
    """Base class for safe online schema changes"""

    async def add_column_safely(self, db: Database, table: str,
                               column: str, definition: str,
                               default_value: Any = None) -> None:
        """Add column with zero downtime"""

        # Step 1: Add the column as nullable (fast operation); any NOT NULL
        # constraint is applied in step 3, after existing rows are backfilled
        nullable_definition = definition.replace("NOT NULL", "").strip()
        await db.execute(f"""
            ALTER TABLE {table}
            ADD COLUMN {column} {nullable_definition}
        """)

        if default_value is not None:
            # Step 2: Update existing rows in batches
            # (assumes db.execute returns the number of affected rows)
            while True:
                affected = await db.execute(f"""
                    UPDATE {table}
                    SET {column} = $1
                    WHERE {column} IS NULL
                    AND id IN (
                        SELECT id FROM {table}
                        WHERE {column} IS NULL
                        ORDER BY id
                        LIMIT $2
                    )
                """, default_value, self.batch_size)

                if affected == 0:
                    break

                # Small delay to avoid overwhelming the database
                await asyncio.sleep(0.1)

        # Step 3: Add NOT NULL constraint if needed
        if "NOT NULL" in definition:
            await db.execute(f"""
                ALTER TABLE {table}
                ALTER COLUMN {column} SET NOT NULL
            """)

    async def drop_column_safely(self, db: Database, table: str,
                                column: str) -> None:
        """Drop column with proper cleanup"""

        # Step 1: Drop any dependent objects first
        await db.execute(f"""
            DROP INDEX IF EXISTS idx_{table}_{column}
        """)

        # Step 2: Drop the column
        await db.execute(f"""
            ALTER TABLE {table} DROP COLUMN {column}
        """)

    async def rename_table_safely(self, db: Database, old_name: str,
                                 new_name: str) -> None:
        """Rename table with dependency handling"""

        # Step 1: Rename the table
        await db.execute(f"ALTER TABLE {old_name} RENAME TO {new_name}")

        # Step 2: Create compatibility view
        await db.execute(f"""
            CREATE VIEW {old_name} AS SELECT * FROM {new_name}
        """)

        # Note: Remove view in a future migration after code update


class Migration_20241201_002_AddAgentStatusColumn(SafeSchemaMigration):
    def __init__(self):
        super().__init__()
        self.version = "20241201_002"
        self.description = "Add status column to agents table"
        self.dependencies = ["20241201_001"]

    async def up(self, db: Database) -> None:
        await self.add_column_safely(
            db, "agents", "status",
            "VARCHAR(20) DEFAULT 'active'",
            "active"
        )

        # Add check constraint
        await db.execute("""
            ALTER TABLE agents
            ADD CONSTRAINT chk_agent_status
            CHECK (status IN ('active', 'inactive', 'maintenance'))
        """)

    async def down(self, db: Database) -> None:
        await db.execute("ALTER TABLE agents DROP CONSTRAINT chk_agent_status")
        await self.drop_column_safely(db, "agents", "status")
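
The batched backfill in `add_column_safely` can be tried out without a PostgreSQL cluster. The sketch below reproduces the loop against an in-memory SQLite database from Python's standard library. SQLite, the sample `agents` table, and the name `backfill_in_batches` are stand-ins chosen for illustration, and the loop is synchronous for brevity.

```python
import sqlite3


def backfill_in_batches(conn: sqlite3.Connection, table: str, column: str,
                        value: str, batch_size: int = 1000) -> int:
    """Fill NULLs in `column` one batch at a time; return the number of batches.

    Table and column names are interpolated into the SQL, so they must come
    from trusted migration code, never from user input.
    """
    batches = 0
    while True:
        cursor = conn.execute(
            f"""
            UPDATE {table} SET {column} = ?
            WHERE id IN (
                SELECT id FROM {table}
                WHERE {column} IS NULL
                ORDER BY id
                LIMIT ?
            )
            """,
            (value, batch_size),
        )
        conn.commit()  # commit per batch so each transaction stays short
        if cursor.rowcount == 0:
            break
        batches += 1
    return batches


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE agents (id INTEGER PRIMARY KEY, status TEXT)")
conn.executemany("INSERT INTO agents (id) VALUES (?)", [(i,) for i in range(2500)])
conn.commit()

batches = backfill_in_batches(conn, "agents", "status", "active", batch_size=1000)
remaining = conn.execute(
    "SELECT COUNT(*) FROM agents WHERE status IS NULL"
).fetchone()[0]
print(batches, remaining)
```

Committing after every batch only shortens lock times if the backfill is not wrapped in one outer transaction. `MigrationRunner.apply_migration` runs `up` inside a single transaction, so a long backfill there would still hold its locks until the end.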

Zero-Downtime Deployment

Blue-Green Deployment Strategy

Blue-Green Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    Load Balancer                               │
│                   (Traffic Router)                             │
│              ┌─────────────────────────┐                       │
│              │  100% → Blue (Current)  │                       │
│              │   0%  → Green (New)     │                       │
│              └─────────────────────────┘                       │
└─────────────────────┬───────────────────┬─────────────────────┘
                      │                   │
     ┌────────────────▼──────────────┐   ┌▼──────────────────────────┐
     │        Blue Environment       │   │       Green Environment    │
     │         (Production)          │   │        (Staging)           │
     ├───────────────────────────────┤   ├───────────────────────────┤
     │  ┌─────┐  ┌─────┐  ┌─────┐   │   │  ┌─────┐  ┌─────┐  ┌─────┐ │
     │  │App 1│  │App 2│  │App 3│   │   │  │App 1│  │App 2│  │App 3│ │
     │  │v1.5 │  │v1.5 │  │v1.5 │   │   │  │v1.6 │  │v1.6 │  │v1.6 │ │
     │  └─────┘  └─────┘  └─────┘   │   │  └─────┘  └─────┘  └─────┘ │
     └───────────────┬───────────────┘   └───────────────┬───────────┘
                     │                                   │
     ┌───────────────▼─────────────────────────────────────▼───────────┐
     │                  Shared Database Cluster                       │
     │    ┌──────────────┐    ┌──────────────┐    ┌──────────────┐    │
     │    │   Primary    │    │   Replica    │    │   Replica    │    │
     │    └──────────────┘    └──────────────┘    └──────────────┘    │
     └─────────────────────────────────────────────────────────────────┘

Phase 1: Deploy to Green          Phase 2: Switch Traffic
┌─────────────────────────┐       ┌─────────────────────────┐
│ 100% → Blue (v1.5)     │  ───→ │   0% → Blue (v1.5)     │
│   0% → Green (v1.6)    │       │ 100% → Green (v1.6)    │
└─────────────────────────┘       └─────────────────────────┘
                        

Deployment Automation

Blue-Green Deployment Script

import asyncio
import logging
import time
from typing import Dict, List
from enum import Enum

import aiohttp

logger = logging.getLogger(__name__)

class DeploymentEnvironment(Enum):
    BLUE = "blue"
    GREEN = "green"

class BlueGreenDeployer:
    def __init__(self, config: Dict):
        self.config = config
        self.load_balancer = LoadBalancerController(config["lb_config"])
        self.k8s_client = KubernetesClient(config["k8s_config"])

    async def deploy(self, version: str, environment: DeploymentEnvironment) -> bool:
        """Deploy new version to specified environment"""

        try:
            # Step 1: Prepare new environment
            await self._prepare_environment(environment, version)

            # Step 2: Run database migrations on new environment
            if not await self._run_migrations(environment):
                raise Exception("Database migration failed")

            # Step 3: Deploy application to new environment
            if not await self._deploy_application(environment, version):
                raise Exception("Application deployment failed")

            # Step 4: Run health checks
            if not await self._health_check(environment):
                raise Exception("Health checks failed")

            # Step 5: Run smoke tests
            if not await self._smoke_tests(environment):
                raise Exception("Smoke tests failed")

            return True

        except Exception as e:
            logger.error(f"Deployment to {environment.value} failed: {e}")
            await self._cleanup_failed_deployment(environment)
            return False

    async def switch_traffic(self, target_environment: DeploymentEnvironment) -> bool:
        """Switch traffic to target environment"""

        try:
            # Step 1: Gradual traffic shift (canary deployment)
            traffic_percentages = [10, 25, 50, 75, 100]

            for percentage in traffic_percentages:
                await self.load_balancer.set_traffic_split(
                    target_environment, percentage
                )

                # Monitor for 2 minutes at each level
                await asyncio.sleep(120)

                # Check error rates and performance
                if not await self._monitor_traffic_shift(target_environment, percentage):
                    # Rollback on issues
                    await self.load_balancer.set_traffic_split(target_environment, 0)
                    return False

            # Step 2: Final verification
            if await self._verify_full_traffic_switch(target_environment):
                logger.info(f"Successfully switched to {target_environment.value}")
                return True
            else:
                await self.rollback()
                return False

        except Exception as e:
            logger.error(f"Traffic switch failed: {e}")
            await self.rollback()
            return False

    async def rollback(self) -> bool:
        """Rollback to previous environment"""
        current_env = await self.load_balancer.get_active_environment()
        target_env = (DeploymentEnvironment.BLUE
                     if current_env == DeploymentEnvironment.GREEN
                     else DeploymentEnvironment.GREEN)

        return await self.load_balancer.set_traffic_split(target_env, 100)

    async def _run_migrations(self, environment: DeploymentEnvironment) -> bool:
        """Run database migrations for environment"""

        # Create migration job in Kubernetes
        migration_job = {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {
                "name": f"migration-{environment.value}-{int(time.time())}",
                "namespace": "sysmanage"
            },
            "spec": {
                "template": {
                    "spec": {
                        "containers": [{
                            "name": "migration",
                            "image": f"sysmanage/migrate:{self.config['version']}",
                            "env": [
                                {"name": "DATABASE_URL", "valueFrom": {"secretKeyRef": {
                                    "name": "db-secret", "key": "url"
                                }}},
                                {"name": "ENVIRONMENT", "value": environment.value}
                            ],
                            "command": ["python", "-m", "sysmanage.migrations", "apply"]
                        }],
                        "restartPolicy": "Never"
                    }
                },
                "backoffLimit": 3
            }
        }

        # Apply job and wait for completion
        job_name = await self.k8s_client.create_job(migration_job)
        return await self.k8s_client.wait_for_job_completion(job_name, timeout=1800)

    async def _deploy_application(self, environment: DeploymentEnvironment,
                                 version: str) -> bool:
        """Deploy application to environment"""

        deployment_name = f"sysmanage-{environment.value}"

        # Update deployment with new image
        await self.k8s_client.update_deployment(
            deployment_name,
            image=f"sysmanage/app:{version}",
            environment_vars={
                "ENVIRONMENT": environment.value,
                "VERSION": version
            }
        )

        # Wait for rollout to complete
        return await self.k8s_client.wait_for_rollout(deployment_name, timeout=600)

    async def _health_check(self, environment: DeploymentEnvironment) -> bool:
        """Comprehensive health check for environment"""

        health_endpoint = f"https://{environment.value}.sysmanage.internal/api/health/detailed"

        for attempt in range(30):  # 5 minutes of retries
            try:
                async with aiohttp.ClientSession() as session:
                    # aiohttp expects a ClientTimeout object rather than a bare number
                    async with session.get(
                        health_endpoint, timeout=aiohttp.ClientTimeout(total=10)
                    ) as response:
                        if response.status == 200:
                            health_data = await response.json()
                            if health_data.get("status") == "healthy":
                                return True

            except Exception as e:
                logger.warning(f"Health check attempt {attempt + 1} failed: {e}")

            await asyncio.sleep(10)

        return False

    async def _smoke_tests(self, environment: DeploymentEnvironment) -> bool:
        """Run smoke tests against environment"""

        test_cases = [
            self._test_authentication,
            self._test_agent_registration,
            self._test_basic_operations,
            self._test_websocket_connectivity
        ]

        base_url = f"https://{environment.value}.sysmanage.internal"

        for test_case in test_cases:
            if not await test_case(base_url):
                return False

        return True
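
The stepwise logic of `switch_traffic` can be isolated from the load balancer and the monitoring stack, which makes it easy to exercise in a test. The sketch below is one possible shape for that, assuming the split setter and the health check are passed in as async callables. `shift_traffic`, its parameters, and the simulated health check are hypothetical names introduced here.

```python
import asyncio
from typing import Awaitable, Callable, List, Sequence, Tuple


async def shift_traffic(
    set_split: Callable[[int], Awaitable[None]],
    is_healthy: Callable[[int], Awaitable[bool]],
    steps: Sequence[int] = (10, 25, 50, 75, 100),
    soak_seconds: float = 120.0,
) -> bool:
    """Raise traffic step by step; drop to 0% at the first unhealthy step."""
    for percentage in steps:
        await set_split(percentage)
        await asyncio.sleep(soak_seconds)  # let metrics accumulate at this level
        if not await is_healthy(percentage):
            await set_split(0)
            return False
    return True


async def _demo() -> Tuple[bool, List[int]]:
    history: List[int] = []

    async def set_split(percentage: int) -> None:
        history.append(percentage)

    async def healthy_below_half(percentage: int) -> bool:
        # Simulated check that starts failing once half the traffic has moved
        return percentage < 50

    ok = await shift_traffic(set_split, healthy_below_half, soak_seconds=0)
    return ok, history


result, history = asyncio.run(_demo())
print(result, history)
```

With the simulated check, the shift stops at 50% and traffic to the new environment is set back to 0%, mirroring the early-exit branch in `switch_traffic`.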

Canary Deployment

Gradual Traffic Shifting

import asyncio
import logging
import time

logger = logging.getLogger(__name__)

class CanaryDeployment:
    def __init__(self, load_balancer: LoadBalancerController,
                 metrics_client: MetricsClient):
        self.load_balancer = load_balancer
        self.metrics_client = metrics_client

    async def canary_deploy(self, new_version: str,
                           canary_percentage: int = 5) -> bool:
        """Deploy using canary strategy"""

        # Step 1: Deploy canary version
        await self._deploy_canary(new_version, canary_percentage)

        # Step 2: Monitor canary performance
        monitoring_duration = 300  # 5 minutes
        if not await self._monitor_canary(monitoring_duration):
            await self._rollback_canary()
            return False

        # Step 3: Gradually increase traffic
        traffic_levels = [10, 25, 50, 75, 100]

        for level in traffic_levels:
            await self.load_balancer.set_canary_traffic(level)

            # Monitor at each level
            if not await self._monitor_canary(monitoring_duration):
                await self._rollback_canary()
                return False

        # Step 4: Complete deployment
        await self._promote_canary()
        return True

    async def _monitor_canary(self, duration: int) -> bool:
        """Monitor canary deployment metrics"""

        start_time = time.time()

        while time.time() - start_time < duration:
            metrics = await self.metrics_client.get_canary_metrics()

            # Check error rate
            if metrics["error_rate"] > 0.01:  # 1% threshold
                logger.error(f"High error rate: {metrics['error_rate']}")
                return False

            # Check response time
            if metrics["avg_response_time"] > 500:  # 500ms threshold
                logger.error(f"High response time: {metrics['avg_response_time']}ms")
                return False

            # Check memory usage
            if metrics["memory_usage"] > 0.9:  # 90% threshold
                logger.error(f"High memory usage: {metrics['memory_usage']}")
                return False

            await asyncio.sleep(30)  # Check every 30 seconds

        return True
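
The three threshold checks in `_monitor_canary` can also be written as a pure function, which keeps the limits in one place and lets them be unit-tested without a metrics backend. This is a sketch under the assumption that metrics arrive as a flat dictionary with the same keys used above. `CanaryThresholds` and `find_violations` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class CanaryThresholds:
    max_error_rate: float = 0.01        # 1% of requests
    max_avg_response_ms: float = 500.0  # milliseconds
    max_memory_usage: float = 0.9       # fraction of available memory


def find_violations(metrics: Dict[str, float],
                    limits: CanaryThresholds = CanaryThresholds()) -> List[str]:
    """Return one message for each metric that exceeds its threshold."""
    checks = [
        ("error_rate", limits.max_error_rate),
        ("avg_response_time", limits.max_avg_response_ms),
        ("memory_usage", limits.max_memory_usage),
    ]
    violations = []
    for key, limit in checks:
        value = metrics.get(key)
        if value is None:
            # A missing metric counts as a violation instead of passing silently
            violations.append(f"{key} missing")
        elif value > limit:
            violations.append(f"{key}={value} exceeds {limit}")
    return violations


print(find_violations(
    {"error_rate": 0.002, "avg_response_time": 640, "memory_usage": 0.71}
))
```

An empty list means the canary is within limits. Returning every violation, not just the first, gives the operator the full picture in a single log line.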

Rollback Procedures

Automated Rollback System

Rollback Controller

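class RollbackController:
    def __init__(self, config: Dict, db: Database):
        self.config = config
        # DeploymentHistory and MigrationRunner both need a database handle
        self.deployment_history = DeploymentHistory(db)
        self.migration_runner = MigrationRunner(db)
        self.health_monitor = HealthMonitor()
        # k8s_client, load_balancer, alerting_system and incident_management
        # are used below and are assumed to be injected in the same way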

    async def setup_automatic_rollback(self, deployment_id: str):
        """Setup automatic rollback triggers"""

        # Monitor key metrics
        triggers = [
            ErrorRateTrigger(threshold=0.05, duration=300),  # 5% error rate for 5 min
            ResponseTimeTrigger(threshold=1000, duration=180),  # 1s response time for 3 min
            HealthCheckTrigger(failure_count=3),  # 3 consecutive health check failures
        ]

        for trigger in triggers:
            trigger.on_trigger(lambda: self.execute_rollback(deployment_id))
            await self.health_monitor.add_trigger(trigger)

    async def execute_rollback(self, deployment_id: str) -> bool:
        """Execute automatic rollback"""

        try:
            # Get previous stable deployment
            previous_deployment = await self.deployment_history.get_previous_stable(
                deployment_id
            )

            if not previous_deployment:
                raise Exception("No previous stable deployment found")

            logger.info(f"Starting rollback to {previous_deployment.version}")

            # Step 1: Database rollback
            if not await self._rollback_database(previous_deployment):
                raise Exception("Database rollback failed")

            # Step 2: Application rollback
            if not await self._rollback_application(previous_deployment):
                raise Exception("Application rollback failed")

            # Step 3: Traffic switch
            if not await self._switch_traffic(previous_deployment):
                raise Exception("Traffic switch failed")

            # Step 4: Verify rollback
            if not await self._verify_rollback(previous_deployment):
                raise Exception("Rollback verification failed")

            await self.deployment_history.mark_rollback_successful(
                deployment_id, previous_deployment.id
            )

            return True

        except Exception as e:
            logger.error(f"Rollback failed: {e}")
            await self._emergency_procedures()
            return False

    async def _rollback_database(self, target_deployment: Deployment) -> bool:
        """Rollback database to previous version"""

        current_version = await self._get_current_db_version()
        target_version = target_deployment.database_version

        if current_version == target_version:
            return True  # Already at target version

        # Get migrations to rollback
        migrations_to_rollback = await self._get_migrations_to_rollback(
            current_version, target_version
        )

        # Execute rollback migrations in reverse order
        for migration in reversed(migrations_to_rollback):
            if not migration.reversible:
                raise Exception(f"Migration {migration.version} is not reversible")

            if not await self.migration_runner.rollback_migration(migration):
                raise Exception(f"Failed to rollback migration {migration.version}")

        return True

    async def _rollback_application(self, target_deployment: Deployment) -> bool:
        """Rollback application to previous version"""

        # Update all deployments to target version
        deployments = ["sysmanage-backend", "sysmanage-frontend", "sysmanage-worker"]

        rollback_tasks = []
        for deployment_name in deployments:
            task = self.k8s_client.rollback_deployment(
                deployment_name,
                target_deployment.version
            )
            rollback_tasks.append(task)

        # Wait for all rollbacks to complete
        results = await asyncio.gather(*rollback_tasks, return_exceptions=True)

        return all(result is True for result in results)

    async def _emergency_procedures(self):
        """Emergency procedures when rollback fails"""

        # Step 1: Enable maintenance mode
        await self.load_balancer.enable_maintenance_mode()

        # Step 2: Notify operations team
        await self.alerting_system.send_critical_alert(
            "CRITICAL: Automated rollback failed. Manual intervention required."
        )

        # Step 3: Create incident
        await self.incident_management.create_incident(
            title="Failed automated rollback",
            severity="critical",
            description="Automated rollback procedures failed. System may be unstable."
        )


class DeploymentHistory:
    def __init__(self, db: Database):
        self.db = db

    async def record_deployment(self, deployment: Deployment):
        """Record successful deployment"""
        await self.db.execute("""
            INSERT INTO deployment_history
            (id, version, database_version, deployed_at, deployed_by, status)
            VALUES ($1, $2, $3, $4, $5, $6)
        """, deployment.id, deployment.version, deployment.database_version,
            deployment.deployed_at, deployment.deployed_by, "stable")

    async def get_previous_stable(self, current_deployment_id: str) -> Optional[Deployment]:
        """Get previous stable deployment"""
        result = await self.db.fetch_one("""
            SELECT * FROM deployment_history
            WHERE status = 'stable' AND id != $1
            ORDER BY deployed_at DESC
            LIMIT 1
        """, current_deployment_id)

        return Deployment.from_db_row(result) if result else None

    async def mark_rollback_successful(self, failed_deployment_id: str,
                                     target_deployment_id: str):
        """Mark rollback as successful"""
        await self.db.execute("""
            UPDATE deployment_history
            SET status = 'rolled_back', rolled_back_at = NOW()
            WHERE id = $1
        """, failed_deployment_id)

        await self.db.execute("""
            UPDATE deployment_history
            SET status = 'stable'
            WHERE id = $1
        """, target_deployment_id)

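`_rollback_database` relies on `_get_migrations_to_rollback`, which is not shown. A minimal sketch of that selection follows, assuming the applied versions are available in application order, as `MigrationRunner.get_applied_migrations()` returns them. The function name and the sample versions are illustrative.

```python
from typing import List


def migrations_to_rollback(applied: List[str], target_version: str) -> List[str]:
    """Return versions applied after `target_version`, oldest first.

    `applied` is the list of applied versions in application order.
    Hypothetical helper, for illustration only.
    """
    if target_version not in applied:
        raise ValueError(f"Target version {target_version} was never applied")
    position = applied.index(target_version)
    # Everything after the target has to be undone; the target itself stays
    return applied[position + 1:]


applied = ["20241130_005", "20241201_001", "20241201_002"]
pending = migrations_to_rollback(applied, "20241130_005")
# The caller undoes the newest migration first
print(list(reversed(pending)))
```

Returning the list oldest first matches the `reversed(...)` call in `_rollback_database`, so the newest migration is rolled back first.
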
Manual Rollback Procedures

Rollback CLI Tool

#!/bin/bash
# SysManage Rollback Tool

set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
source "$SCRIPT_DIR/common.sh"

usage() {
    cat << EOF
SysManage Rollback Tool

Usage: $0 [OPTIONS] COMMAND

Commands:
    list                    List available rollback targets
    plan VERSION            Show rollback plan for version
    execute VERSION         Execute rollback to version
    status                  Show current rollback status
    abort                   Abort ongoing rollback

Options:
    -e, --environment ENV   Target environment (prod, staging)
    -f, --force             Force rollback without confirmation
    -v, --verbose           Verbose output
    -h, --help              Show this help

Examples:
    $0 list
    $0 plan v1.5.2
    $0 execute v1.5.2
    $0 -e staging execute v1.5.1

EOF
}

rollback_list() {
    echo "Available rollback targets:"
    echo "=========================="

    kubectl get deployments -n sysmanage -o custom-columns=\
NAME:.metadata.name,VERSION:.metadata.labels.version,DEPLOYED:.metadata.creationTimestamp \
        --sort-by=.metadata.creationTimestamp | tail -10
}

rollback_plan() {
    local target_version="$1"
    local current_version

    current_version=$(get_current_version)

    echo "Rollback Plan"
    echo "============="
    echo "Current Version: $current_version"
    echo "Target Version:  $target_version"
    echo ""

    # Show database migration rollbacks needed
    echo "Database Changes:"
    python3 -m sysmanage.migrations plan-rollback \
        --from-version "$current_version" \
        --to-version "$target_version"

    echo ""
    echo "Application Components:"
    echo "- Backend API: $current_version -> $target_version"
    echo "- Frontend: $current_version -> $target_version"
    echo "- Worker Processes: $current_version -> $target_version"

    echo ""
    echo "Estimated Downtime: 5-10 minutes"
    echo "Risk Level: $(calculate_risk_level "$current_version" "$target_version")"
}

rollback_execute() {
    local target_version="$1"
    local current_version

    current_version=$(get_current_version)

    # Default to "false" so an unset FORCE does not abort the script under `set -u`
    if [[ "${FORCE:-false}" != "true" ]]; then
        echo "WARNING: This will rollback from $current_version to $target_version"
        read -p "Are you sure? (y/N): " -n 1 -r
        echo
        if [[ ! $REPLY =~ ^[Yy]$ ]]; then
            echo "Rollback cancelled."
            exit 0
        fi
    fi

    log "Starting rollback to $target_version"

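    # Step 1: Enable maintenance mode
    # NOTE: assumes the ingress layer is configured to act on this annotation;
    # it is not a built-in ingress-nginx annotation.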
    log "Enabling maintenance mode..."
    kubectl patch ingress sysmanage-ingress -n sysmanage \
        -p '{"metadata":{"annotations":{"nginx.ingress.kubernetes.io/maintenance":"true"}}}'

    # Step 2: Scale down current deployment
    log "Scaling down current deployment..."
    kubectl scale deployment sysmanage-backend -n sysmanage --replicas=0

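    # Step 3: Back up, then roll back the database
    # The backup is what rollback_abort restores from if a later step fails
    log "Backing up database..."
    if ! PGPASSWORD="$DB_PASSWORD" pg_dump -h "$DB_HOST" -U "$DB_USER" -d "$DB_NAME" \
            --clean --if-exists -f "/tmp/rollback_db_backup.sql"; then
        log "ERROR: Database backup failed"
        rm -f "/tmp/rollback_db_backup.sql"
        rollback_abort
        exit 1
    fi

    log "Rolling back database..."
    if ! python3 -m sysmanage.migrations rollback --to-version "$target_version"; then
        log "ERROR: Database rollback failed"
        rollback_abort
        exit 1
    fi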

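    # Step 4: Deploy target version
    log "Deploying target version..."
    kubectl set image deployment/sysmanage-backend -n sysmanage \
        app="sysmanage/backend:$target_version"

    kubectl set image deployment/sysmanage-frontend -n sysmanage \
        app="sysmanage/frontend:$target_version"

    # The backend was scaled to zero in step 2; bring it back up
    kubectl scale deployment sysmanage-backend -n sysmanage --replicas=3

    # Step 5: Wait for deployment (abort instead of exiting in maintenance mode)
    log "Waiting for deployment to be ready..."
    if ! kubectl rollout status deployment/sysmanage-backend -n sysmanage --timeout=300s ||
       ! kubectl rollout status deployment/sysmanage-frontend -n sysmanage --timeout=300s; then
        log "ERROR: Deployment did not become ready"
        rollback_abort
        exit 1
    fi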

    # Step 6: Run health checks
    log "Running health checks..."
    if ! run_health_checks; then
        log "ERROR: Health checks failed"
        rollback_abort
        exit 1
    fi

    # Step 7: Disable maintenance mode
    log "Disabling maintenance mode..."
    kubectl patch ingress sysmanage-ingress -n sysmanage \
        -p '{"metadata":{"annotations":{"nginx.ingress.kubernetes.io/maintenance":"false"}}}'

    log "Rollback to $target_version completed successfully"
}

rollback_abort() {
    log "Aborting rollback and restoring previous state..."

    # Restore from backup if database was modified
    if [[ -f "/tmp/rollback_db_backup.sql" ]]; then
        log "Restoring database from backup..."
        PGPASSWORD="$DB_PASSWORD" psql -h "$DB_HOST" -U "$DB_USER" -d "$DB_NAME" \
            -f "/tmp/rollback_db_backup.sql"
    fi

    # Scale back up
    kubectl scale deployment sysmanage-backend -n sysmanage --replicas=3

    # Disable maintenance mode
    kubectl patch ingress sysmanage-ingress -n sysmanage \
        -p '{"metadata":{"annotations":{"nginx.ingress.kubernetes.io/maintenance":"false"}}}'

    log "Rollback aborted. System restored to previous state."
}

run_health_checks() {
    local max_attempts=30
    local attempt=1

    while [[ $attempt -le $max_attempts ]]; do
        if curl -sf "https://sysmanage.example.com/api/health" > /dev/null; then
            log "Health check passed"
            return 0
        fi

        log "Health check attempt $attempt/$max_attempts failed, retrying..."
        sleep 10
        ((attempt++))
    done

    log "Health checks failed after $max_attempts attempts"
    return 1
}
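
The polling loop in run_health_checks can also be expressed in Python, which makes it easier to unit test. The sketch below is a hypothetical equivalent, not part of the SysManage tooling: the probe and the sleep function are passed in as parameters (both assumptions) so the loop can be exercised without a network or real delays. Unlike the shell version, it does not sleep after the final failed attempt.

```python
import time
from typing import Callable


def wait_until_healthy(check: Callable[[], bool],
                       max_attempts: int = 30,
                       delay_seconds: float = 10.0,
                       sleep: Callable[[float], None] = time.sleep) -> bool:
    """Poll `check` until it returns True or the attempts run out."""
    for attempt in range(1, max_attempts + 1):
        try:
            if check():
                return True
        except OSError:
            # Treat connection errors like a failed probe and keep polling
            pass
        if attempt < max_attempts:
            sleep(delay_seconds)
    return False


# Hypothetical probe that only succeeds on the third call
responses = iter([False, False, True])
waits = []
healthy = wait_until_healthy(lambda: next(responses), max_attempts=5,
                             sleep=waits.append)
print(healthy, waits)  # True [10.0, 10.0]
```

In real use, `check` would wrap an HTTP request to the health endpoint and return whether it succeeded.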

Version Management

Semantic Versioning Strategy

Version Management System

import re
from typing import List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum

class VersionType(Enum):
    MAJOR = "major"
    MINOR = "minor"
    PATCH = "patch"
    PRERELEASE = "prerelease"

@dataclass
class Version:
    major: int
    minor: int
    patch: int
    prerelease: Optional[str] = None
    build: Optional[str] = None

    @classmethod
    def parse(cls, version_string: str) -> 'Version':
        """Parse version string (e.g., '1.2.3-beta.1+build.123')"""

        # Remove a single 'v' prefix if present (lstrip would strip repeated 'v's)
        if version_string.startswith('v'):
            version_string = version_string[1:]

        # Regex for semantic version
        pattern = r'^(\d+)\.(\d+)\.(\d+)(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?$'
        match = re.match(pattern, version_string)

        if not match:
            raise ValueError(f"Invalid version string: {version_string}")

        return cls(
            major=int(match.group(1)),
            minor=int(match.group(2)),
            patch=int(match.group(3)),
            prerelease=match.group(4),
            build=match.group(5)
        )

    def __str__(self) -> str:
        version = f"{self.major}.{self.minor}.{self.patch}"
        if self.prerelease:
            version += f"-{self.prerelease}"
        if self.build:
            version += f"+{self.build}"
        return version

    def __lt__(self, other: 'Version') -> bool:
        """Compare versions according to semver rules"""

        # Compare major.minor.patch
        self_tuple = (self.major, self.minor, self.patch)
        other_tuple = (other.major, other.minor, other.patch)

        if self_tuple != other_tuple:
            return self_tuple < other_tuple

        # Handle prerelease versions
        if self.prerelease is None and other.prerelease is None:
            return False
        elif self.prerelease is None:
            return False  # Release > prerelease
        elif other.prerelease is None:
            return True   # Prerelease < release
        else:
            return self._compare_prerelease(self.prerelease, other.prerelease)

    def _compare_prerelease(self, pre1: str, pre2: str) -> bool:
        """Return True if pre1 has lower semver precedence than pre2"""
        parts1 = pre1.split('.')
        parts2 = pre2.split('.')

        for part1, part2 in zip(parts1, parts2):
            numeric1, numeric2 = part1.isdigit(), part2.isdigit()

            if numeric1 and numeric2:
                # Numeric identifiers compare as integers
                if int(part1) != int(part2):
                    return int(part1) < int(part2)
            elif numeric1 != numeric2:
                # Numeric identifiers rank below alphanumeric ones
                return numeric1
            elif part1 != part2:
                # Alphanumeric identifiers compare in ASCII order
                return part1 < part2

        # All shared identifiers are equal: fewer identifiers ranks lower
        return len(parts1) < len(parts2)

    def bump(self, version_type: VersionType) -> 'Version':
        """Bump version according to type"""

        if version_type == VersionType.MAJOR:
            return Version(self.major + 1, 0, 0)
        elif version_type == VersionType.MINOR:
            return Version(self.major, self.minor + 1, 0)
        elif version_type == VersionType.PATCH:
            return Version(self.major, self.minor, self.patch + 1)
        else:
            raise ValueError(f"Unsupported version type: {version_type}")

    def is_compatible(self, other: 'Version') -> bool:
        """Check if versions are backward compatible"""

        # Major version 0 is special: minor versions are not compatible.
        # This check must come first, otherwise 0.1.x and 0.2.x would
        # be treated as compatible by the same-major rule below.
        if self.major == 0 or other.major == 0:
            return self.major == other.major and self.minor == other.minor

        # Otherwise the same major version is generally compatible
        return self.major == other.major


class VersionManager:
    def __init__(self, db: Database):
        self.db = db

    async def get_current_version(self) -> Version:
        """Get current system version"""
        result = await self.db.fetch_one("""
            SELECT version FROM system_version
            ORDER BY created_at DESC LIMIT 1
        """)

        if not result:
            raise RuntimeError("No version information found")

        return Version.parse(result["version"])

    async def get_version_history(self, limit: int = 50) -> List[Tuple[Version, dict]]:
        """Get version deployment history"""
        results = await self.db.fetch_all("""
            SELECT version, deployed_at, deployed_by, release_notes
            FROM deployment_history
            ORDER BY deployed_at DESC
            LIMIT $1
        """, limit)

        return [(Version.parse(row["version"]), {
            "deployed_at": row["deployed_at"],
            "deployed_by": row["deployed_by"],
            "release_notes": row["release_notes"]
        }) for row in results]

    async def validate_version_compatibility(self,
                                           target_version: Version) -> List[str]:
        """Validate if target version is compatible for deployment"""

        current_version = await self.get_current_version()
        issues = []

        # Check backward compatibility
        if not current_version.is_compatible(target_version):
            if target_version < current_version:
                # Downgrade scenario
                if target_version.major < current_version.major:
                    issues.append(
                        f"Major version downgrade from {current_version.major} "
                        f"to {target_version.major} may cause data compatibility issues"
                    )
            else:
                # Upgrade scenario
                if target_version.major > current_version.major:
                    issues.append(
                        f"Major version upgrade from {current_version.major} "
                        f"to {target_version.major} requires migration planning"
                    )

        # Check database schema compatibility
        schema_issues = await self._check_schema_compatibility(
            current_version, target_version
        )
        issues.extend(schema_issues)

        # Check API compatibility
        api_issues = await self._check_api_compatibility(
            current_version, target_version
        )
        issues.extend(api_issues)

        return issues

    async def _check_schema_compatibility(self,
                                        current: Version,
                                        target: Version) -> List[str]:
        """Check database schema compatibility"""

        # Get schema versions for both versions
        current_schema = await self._get_schema_version(current)
        target_schema = await self._get_schema_version(target)

        issues = []

        if target_schema < current_schema:
            # Check if rollback is safe
            migrations_to_rollback = await self._get_migrations_between_versions(
                current_schema, target_schema
            )

            irreversible_migrations = [
                m for m in migrations_to_rollback if not m.reversible
            ]

            if irreversible_migrations:
                issues.append(
                    "Cannot roll back due to irreversible migrations: "
                    f"{[m.version for m in irreversible_migrations]}"
                )

        return issues
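
The ordering rules that Version relies on can be checked in isolation. The following is a minimal standalone sketch of semver precedence as a sort key; precedence_key and the sample version strings are illustrative names and data, not part of the SysManage codebase. Build metadata is ignored for ordering, as the semver specification requires.

```python
import re
from typing import Tuple

SEMVER = re.compile(
    r'^v?(\d+)\.(\d+)\.(\d+)'
    r'(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?'
    r'(?:\+[0-9A-Za-z.-]+)?$'
)


def precedence_key(version_string: str) -> Tuple:
    """Build a sort key that follows semver precedence."""
    match = SEMVER.match(version_string)
    if not match:
        raise ValueError(f"Invalid version string: {version_string}")

    major, minor, patch, prerelease = match.groups()
    if prerelease is None:
        # A release outranks every prerelease of the same major.minor.patch
        return (int(major), int(minor), int(patch), 1, ())

    identifiers = tuple(
        # Numeric identifiers sort before alphanumeric ones
        (0, int(part), '') if part.isdigit() else (1, 0, part)
        for part in prerelease.split('.')
    )
    return (int(major), int(minor), int(patch), 0, identifiers)


versions = ['1.0.0', '1.0.0-rc.1', '1.0.0-alpha', '1.0.0-alpha.1',
            '1.0.0-beta.11', '1.0.0-beta.2', 'v0.9.3']
print(sorted(versions, key=precedence_key))
# ['v0.9.3', '1.0.0-alpha', '1.0.0-alpha.1', '1.0.0-beta.2',
#  '1.0.0-beta.11', '1.0.0-rc.1', '1.0.0']
```

Note that 1.0.0-beta.2 sorts before 1.0.0-beta.11 because numeric identifiers are compared as integers, not as strings.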

Migration Monitoring

Key Metrics

Deployment Success

  • Deployment success rate
  • Average deployment duration
  • Rollback frequency
  • Failed deployment causes

Migration Performance

  • Migration execution time
  • Database lock duration
  • Data migration volume
  • Index rebuild time

System Health

  • Post-deployment error rates
  • Performance regression detection
  • Resource utilization changes
  • User experience impact
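
As a rough illustration of how the deployment success metrics could be derived from deployment records, the sketch below computes a success rate, an average duration, and a rollback share. The DeploymentRecord fields, the status values, and the sample history are assumptions made for the example, not the actual SysManage schema.

```python
from dataclasses import dataclass
from statistics import mean
from typing import Dict, List


@dataclass
class DeploymentRecord:
    version: str
    status: str            # assumed values: "success", "failed", "rollback"
    duration_seconds: float


def summarize(records: List[DeploymentRecord]) -> Dict[str, float]:
    """Compute success rate, mean duration and rollback share."""
    if not records:
        raise ValueError("No deployment records to summarize")
    total = len(records)
    successes = sum(1 for r in records if r.status == "success")
    rollbacks = sum(1 for r in records if r.status == "rollback")
    return {
        "success_rate_percent": 100.0 * successes / total,
        "avg_duration_minutes": mean(r.duration_seconds for r in records) / 60,
        "rollback_rate_percent": 100.0 * rollbacks / total,
    }


# Hypothetical history for illustration only
history = [
    DeploymentRecord("v1.5.0", "success", 240.0),
    DeploymentRecord("v1.5.1", "success", 300.0),
    DeploymentRecord("v1.5.2", "rollback", 420.0),
    DeploymentRecord("v1.5.3", "success", 240.0),
]
print(summarize(history))
# {'success_rate_percent': 75.0, 'avg_duration_minutes': 5.0,
#  'rollback_rate_percent': 25.0}
```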

Deployment Dashboard

Grafana Queries for Deployment Monitoring

# Deployment success rate (last 30 days)
(
  sum(rate(deployment_total{status="success"}[30d])) /
  sum(rate(deployment_total[30d]))
) * 100

# Average deployment duration (minutes, last 24 hours)
avg_over_time(deployment_duration_seconds[24h]) / 60

# Rollback frequency (rollbacks per day, averaged over 7 days)
rate(deployment_total{status="rollback"}[7d]) * 86400

# Migration execution time by type (95th percentile)
histogram_quantile(0.95,
  sum by (le, migration_type) (rate(migration_duration_seconds_bucket[24h]))
)

# Post-deployment error rate (5xx requests per second, first 30 minutes after a deployment)
rate(http_requests_total{status=~"5.."}[5m]) and on()
  (time() - deployment_timestamp_seconds < 1800)

Best Practices

Migration Guidelines

  • Backward Compatibility: Keep each release backward compatible with at least the previous version
  • Incremental Changes: Make small, incremental changes rather than large migrations
  • Testing: Thoroughly test migrations in staging environments
  • Monitoring: Monitor system health closely after deployments

Deployment Guidelines

  • Off-Peak Timing: Schedule deployments during low-traffic periods
  • Feature Flags: Use feature flags for gradual feature rollouts
  • Automated Testing: Implement comprehensive automated testing
  • Communication: Notify stakeholders of planned deployments
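
One common way to implement the gradual rollouts mentioned under Feature Flags is to hash each user into a stable bucket and enable the feature for buckets below the rollout percentage. The sketch below illustrates that idea only; is_enabled, the flag name, and the user IDs are hypothetical and do not describe an existing SysManage API.

```python
import hashlib


def is_enabled(flag: str, user_id: str, rollout_percent: int) -> bool:
    """Deterministically place a user inside or outside a rollout percentage."""
    if not 0 <= rollout_percent <= 100:
        raise ValueError("rollout_percent must be between 0 and 100")
    digest = hashlib.sha256(f"{flag}:{user_id}".encode("utf-8")).digest()
    # Map the first 4 bytes of the hash onto buckets 0..99
    bucket = int.from_bytes(digest[:4], "big") % 100
    return bucket < rollout_percent


users = [f"user-{n}" for n in range(1000)]
at_10 = {u for u in users if is_enabled("new-dashboard", u, 10)}
at_50 = {u for u in users if is_enabled("new-dashboard", u, 50)}

# Raising the percentage only adds users; nobody who had the feature loses it
print(len(at_10), len(at_50), at_10 <= at_50)
```

Because the bucket depends only on the flag name and user ID, a user sees the same behavior on every request, and including the flag name in the hash keeps different flags from always selecting the same users.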

Next Steps

To learn more about related deployment and operations topics:

  1. Performance Optimization: System performance tuning strategies
  2. Queue Management: Background task processing
  3. Retry Logic: Resilience and fault tolerance
  4. Scaling Strategies: Horizontal and vertical scaling