Migration Strategy
Database migrations, zero-downtime deployments, rollback procedures, and version management strategies for SysManage.
Migration Strategy Overview
SysManage implements a comprehensive migration strategy that ensures system reliability, data integrity, and minimal downtime during updates and schema changes.
Core Principles
- Zero-Downtime: Updates without service interruption
- Rollback Safety: Safe rollback procedures for all changes
- Data Integrity: Guaranteed data consistency during migrations
- Gradual Deployment: Phased rollouts with canary deployments
Database Migration Framework
Migration Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Migration Controller │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Schema │ │ Data │ │ Index │ │
│ │ Migrator │ │ Migrator │ │ Migrator │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Version │ │ Dependency │ │ Rollback │ │
│ │ Manager │ │ Resolver │ │ Planner │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Backup │ │ Validation │ │ Progress │ │
│ │ Manager │ │ Engine │ │ Monitor │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ ▲
Migration │ │ Status
Commands │ │ Reports
▼ │
┌─────────────────────────────────────────────────────────────────┐
│ PostgreSQL Cluster │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Primary │ │ Replica │ │ Replica │ │
│ │ (Write) │ │ (Read) │ │ (Read) │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Migration Script Framework
Base Migration Class
from abc import ABC, abstractmethod
from typing import Optional, List, Dict, Any
from datetime import datetime
import asyncio
import logging

logger = logging.getLogger(__name__)
class BaseMigration(ABC):
"""Base class for all database migrations"""
def __init__(self):
self.version: str = ""
self.description: str = ""
self.dependencies: List[str] = []
self.reversible: bool = True
self.batch_size: int = 1000
self.timeout: int = 3600 # 1 hour
@abstractmethod
async def up(self, db: Database) -> None:
"""Apply the migration"""
pass
@abstractmethod
async def down(self, db: Database) -> None:
"""Rollback the migration"""
pass
async def validate_preconditions(self, db: Database) -> bool:
"""Validate that preconditions are met"""
return True
async def validate_postconditions(self, db: Database) -> bool:
"""Validate that migration was successful"""
return True
async def estimate_duration(self, db: Database) -> int:
"""Estimate migration duration in seconds"""
return 60
async def get_rollback_data(self, db: Database) -> Optional[Dict[str, Any]]:
"""Collect data needed for rollback"""
return None
class Migration_20241201_001_AddAgentMetricsTable(BaseMigration):
def __init__(self):
super().__init__()
self.version = "20241201_001"
self.description = "Add agent_metrics table for performance tracking"
self.dependencies = ["20241130_005"]
self.reversible = True
async def up(self, db: Database) -> None:
"""Create agent_metrics table"""
# Create table with proper indexing
await db.execute("""
CREATE TABLE agent_metrics (
id BIGSERIAL PRIMARY KEY,
agent_id UUID NOT NULL REFERENCES agents(id) ON DELETE CASCADE,
metric_type VARCHAR(50) NOT NULL,
metric_value JSONB NOT NULL,
collected_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
)
""")
# Add indexes for performance
await db.execute("""
CREATE INDEX idx_agent_metrics_agent_id
ON agent_metrics(agent_id)
""")
await db.execute("""
CREATE INDEX idx_agent_metrics_type_time
ON agent_metrics(metric_type, collected_at DESC)
""")
await db.execute("""
CREATE INDEX idx_agent_metrics_collected_at
ON agent_metrics(collected_at DESC)
""")
# Add table comment
await db.execute("""
COMMENT ON TABLE agent_metrics IS
'Stores performance and health metrics from SysManage agents'
""")
async def down(self, db: Database) -> None:
"""Remove agent_metrics table"""
await db.execute("DROP TABLE IF EXISTS agent_metrics CASCADE")
async def validate_postconditions(self, db: Database) -> bool:
"""Verify table creation"""
result = await db.fetch_one("""
SELECT table_name FROM information_schema.tables
WHERE table_name = 'agent_metrics' AND table_schema = 'public'
""")
return result is not None
class MigrationRunner:
def __init__(self, db: Database):
self.db = db
self.migration_table = "schema_migrations"
async def initialize(self):
"""Initialize migration tracking table"""
await self.db.execute(f"""
CREATE TABLE IF NOT EXISTS {self.migration_table} (
version VARCHAR(255) PRIMARY KEY,
description TEXT NOT NULL,
applied_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
applied_by VARCHAR(100) NOT NULL DEFAULT CURRENT_USER,
execution_time_ms INTEGER NOT NULL,
rollback_data JSONB,
checksum VARCHAR(64) NOT NULL
)
""")
async def get_applied_migrations(self) -> List[str]:
"""Get list of applied migration versions"""
result = await self.db.fetch_all(f"""
SELECT version FROM {self.migration_table}
ORDER BY applied_at ASC
""")
return [row["version"] for row in result]
async def apply_migration(self, migration: BaseMigration) -> bool:
"""Apply a single migration with full transaction safety"""
# Start transaction
async with self.db.transaction():
try:
# Validate preconditions
if not await migration.validate_preconditions(self.db):
raise Exception(f"Preconditions failed for {migration.version}")
# Get rollback data before applying
rollback_data = await migration.get_rollback_data(self.db)
# Apply migration with timing
start_time = datetime.utcnow()
await migration.up(self.db)
execution_time = (datetime.utcnow() - start_time).total_seconds() * 1000
# Validate postconditions
if not await migration.validate_postconditions(self.db):
raise Exception(f"Postconditions failed for {migration.version}")
# Record successful migration
await self.db.execute(f"""
INSERT INTO {self.migration_table}
(version, description, execution_time_ms, rollback_data, checksum)
VALUES ($1, $2, $3, $4, $5)
""", migration.version, migration.description,
int(execution_time), rollback_data,
self._calculate_checksum(migration))
return True
except Exception as e:
logger.error(f"Migration {migration.version} failed: {e}")
# Transaction will auto-rollback
return False
async def rollback_migration(self, migration: BaseMigration) -> bool:
"""Rollback a migration"""
if not migration.reversible:
raise Exception(f"Migration {migration.version} is not reversible")
async with self.db.transaction():
try:
# Get rollback data
result = await self.db.fetch_one(f"""
SELECT rollback_data FROM {self.migration_table}
WHERE version = $1
""", migration.version)
if result and result["rollback_data"]:
# Restore rollback data if needed
pass
# Execute rollback
await migration.down(self.db)
# Remove from migration table
await self.db.execute(f"""
DELETE FROM {self.migration_table} WHERE version = $1
""", migration.version)
return True
except Exception as e:
logger.error(f"Rollback of {migration.version} failed: {e}")
return False
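Applying Pending Migrations
MigrationRunner applies one migration at a time; something still has to collect the pending migrations and order them by their declared dependencies (the Dependency Resolver in the architecture above). The loop below is a minimal sketch of that orchestration, assuming the migration instances are discovered elsewhere; it is illustrative, not the shipped entry point.
from typing import List

async def apply_pending(runner: MigrationRunner, migrations: List[BaseMigration]) -> None:
    """Apply all unapplied migrations in dependency order (illustrative sketch)."""
    await runner.initialize()
    applied = set(await runner.get_applied_migrations())
    pending = [m for m in migrations if m.version not in applied]

    while pending:
        # A migration is ready once all of its declared dependencies have been applied
        ready = [m for m in pending if all(dep in applied for dep in m.dependencies)]
        if not ready:
            raise RuntimeError(
                f"Unresolvable dependencies among: {[m.version for m in pending]}"
            )
        for migration in sorted(ready, key=lambda m: m.version):
            if not await runner.apply_migration(migration):
                raise RuntimeError(f"Migration {migration.version} failed; stopping")
            applied.add(migration.version)
            pending.remove(migration)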
Safe Schema Changes
Online Schema Migration Patterns
class SafeSchemaMigration(BaseMigration):
"""Base class for safe online schema changes"""
async def add_column_safely(self, db: Database, table: str,
column: str, definition: str,
default_value: Any = None) -> None:
"""Add column with zero downtime"""
        # Step 1: Add the column as nullable first (a fast, metadata-only change);
        # any NOT NULL constraint is deferred to Step 3 below
        await db.execute(f"""
            ALTER TABLE {table}
            ADD COLUMN {column} {definition.replace('NOT NULL', '').strip()}
        """)
if default_value is not None:
# Step 2: Update existing rows in batches
batch_size = 1000
offset = 0
while True:
affected = await db.execute(f"""
UPDATE {table}
SET {column} = $1
WHERE {column} IS NULL
AND id IN (
SELECT id FROM {table}
WHERE {column} IS NULL
ORDER BY id
LIMIT $2
)
""", default_value, batch_size)
if affected == 0:
break
# Small delay to avoid overwhelming the database
await asyncio.sleep(0.1)
# Step 3: Add NOT NULL constraint if needed
if "NOT NULL" in definition:
await db.execute(f"""
ALTER TABLE {table}
ALTER COLUMN {column} SET NOT NULL
""")
async def drop_column_safely(self, db: Database, table: str,
column: str) -> None:
"""Drop column with proper cleanup"""
# Step 1: Drop any dependent objects first
await db.execute(f"""
DROP INDEX IF EXISTS idx_{table}_{column}
""")
# Step 2: Drop the column
await db.execute(f"""
ALTER TABLE {table} DROP COLUMN {column}
""")
async def rename_table_safely(self, db: Database, old_name: str,
new_name: str) -> None:
"""Rename table with dependency handling"""
        # Step 1: Rename the table
await db.execute(f"ALTER TABLE {old_name} RENAME TO {new_name}")
# Step 2: Create compatibility view
await db.execute(f"""
CREATE VIEW {old_name} AS SELECT * FROM {new_name}
""")
# Note: Remove view in a future migration after code update
class Migration_20241201_002_AddAgentStatusColumn(SafeSchemaMigration):
def __init__(self):
super().__init__()
self.version = "20241201_002"
self.description = "Add status column to agents table"
self.dependencies = ["20241201_001"]
async def up(self, db: Database) -> None:
await self.add_column_safely(
db, "agents", "status",
"VARCHAR(20) DEFAULT 'active'",
"active"
)
        # Add the check constraint as NOT VALID first (avoids a full-table scan under lock),
        # then validate it separately
        await db.execute("""
            ALTER TABLE agents
            ADD CONSTRAINT chk_agent_status
            CHECK (status IN ('active', 'inactive', 'maintenance')) NOT VALID
        """)
        await db.execute("ALTER TABLE agents VALIDATE CONSTRAINT chk_agent_status")
async def down(self, db: Database) -> None:
await db.execute("ALTER TABLE agents DROP CONSTRAINT chk_agent_status")
await self.drop_column_safely(db, "agents", "status")
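Online Index Creation
Index builds on large tables need the same care as column changes. In PostgreSQL, CREATE INDEX CONCURRENTLY avoids blocking writes, but it cannot run inside a transaction block, so a migration using it has to opt out of the runner's transactional wrapper. The sketch below illustrates the pattern; the `transactional` flag and the index definition are assumptions for illustration, not part of the framework shown above.
class Migration_20241202_001_AddMetricsValueIndex(SafeSchemaMigration):
    """Illustrative sketch: build an index online with CREATE INDEX CONCURRENTLY."""

    def __init__(self):
        super().__init__()
        self.version = "20241202_001"
        self.description = "Index agent_metrics.metric_value for JSONB containment queries"
        self.dependencies = ["20241201_002"]
        # Hypothetical flag: CONCURRENTLY cannot run inside a transaction block,
        # so the runner would need to skip its transactional wrapper for this migration
        self.transactional = False

    async def up(self, db: Database) -> None:
        await db.execute("""
            CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_agent_metrics_value
            ON agent_metrics USING GIN (metric_value)
        """)

    async def down(self, db: Database) -> None:
        await db.execute("DROP INDEX CONCURRENTLY IF EXISTS idx_agent_metrics_value")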
Zero-Downtime Deployment
Blue-Green Deployment Strategy
Blue-Green Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Load Balancer │
│ (Traffic Router) │
│ ┌─────────────────────────┐ │
│ │ 100% → Blue (Current) │ │
│ │ 0% → Green (New) │ │
│ └─────────────────────────┘ │
└─────────────────────┬───────────────────┬─────────────────────┘
│ │
┌────────────────▼──────────────┐ ┌▼──────────────────────────┐
│ Blue Environment │ │ Green Environment │
│ (Production) │ │ (Staging) │
├───────────────────────────────┤ ├───────────────────────────┤
│ ┌─────┐ ┌─────┐ ┌─────┐ │ │ ┌─────┐ ┌─────┐ ┌─────┐ │
│ │App 1│ │App 2│ │App 3│ │ │ │App 1│ │App 2│ │App 3│ │
│ │v1.5 │ │v1.5 │ │v1.5 │ │ │ │v1.6 │ │v1.6 │ │v1.6 │ │
│ └─────┘ └─────┘ └─────┘ │ │ └─────┘ └─────┘ └─────┘ │
└───────────────┬───────────────┘ └───────────────┬───────────┘
│ │
┌───────────────▼─────────────────────────────────────▼───────────┐
│ Shared Database Cluster │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Primary │ │ Replica │ │ Replica │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Phase 1: Deploy to Green Phase 2: Switch Traffic
┌─────────────────────────┐ ┌─────────────────────────┐
│ 100% → Blue (v1.5) │ ───→ │ 0% → Blue (v1.5) │
│ 0% → Green (v1.6) │ │ 100% → Green (v1.6) │
└─────────────────────────┘ └─────────────────────────┘
Deployment Automation
Blue-Green Deployment Script
import asyncio
import logging
import time
from typing import Dict, List
from enum import Enum

import aiohttp

logger = logging.getLogger(__name__)
class DeploymentEnvironment(Enum):
BLUE = "blue"
GREEN = "green"
class BlueGreenDeployer:
def __init__(self, config: Dict):
self.config = config
self.load_balancer = LoadBalancerController(config["lb_config"])
self.k8s_client = KubernetesClient(config["k8s_config"])
async def deploy(self, version: str, environment: DeploymentEnvironment) -> bool:
"""Deploy new version to specified environment"""
try:
# Step 1: Prepare new environment
await self._prepare_environment(environment, version)
# Step 2: Run database migrations on new environment
if not await self._run_migrations(environment):
raise Exception("Database migration failed")
# Step 3: Deploy application to new environment
if not await self._deploy_application(environment, version):
raise Exception("Application deployment failed")
# Step 4: Run health checks
if not await self._health_check(environment):
raise Exception("Health checks failed")
# Step 5: Run smoke tests
if not await self._smoke_tests(environment):
raise Exception("Smoke tests failed")
return True
except Exception as e:
logger.error(f"Deployment to {environment.value} failed: {e}")
await self._cleanup_failed_deployment(environment)
return False
async def switch_traffic(self, target_environment: DeploymentEnvironment) -> bool:
"""Switch traffic to target environment"""
try:
# Step 1: Gradual traffic shift (canary deployment)
traffic_percentages = [10, 25, 50, 75, 100]
for percentage in traffic_percentages:
await self.load_balancer.set_traffic_split(
target_environment, percentage
)
# Monitor for 2 minutes at each level
await asyncio.sleep(120)
# Check error rates and performance
if not await self._monitor_traffic_shift(target_environment, percentage):
# Rollback on issues
await self.load_balancer.set_traffic_split(target_environment, 0)
return False
# Step 2: Final verification
if await self._verify_full_traffic_switch(target_environment):
logger.info(f"Successfully switched to {target_environment.value}")
return True
else:
await self.rollback()
return False
except Exception as e:
logger.error(f"Traffic switch failed: {e}")
await self.rollback()
return False
async def rollback(self) -> bool:
"""Rollback to previous environment"""
current_env = await self.load_balancer.get_active_environment()
target_env = (DeploymentEnvironment.BLUE
if current_env == DeploymentEnvironment.GREEN
else DeploymentEnvironment.GREEN)
return await self.load_balancer.set_traffic_split(target_env, 100)
async def _run_migrations(self, environment: DeploymentEnvironment) -> bool:
"""Run database migrations for environment"""
# Create migration job in Kubernetes
migration_job = {
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"name": f"migration-{environment.value}-{int(time.time())}",
"namespace": "sysmanage"
},
"spec": {
"template": {
"spec": {
"containers": [{
"name": "migration",
"image": f"sysmanage/migrate:{self.config['version']}",
"env": [
{"name": "DATABASE_URL", "valueFrom": {"secretKeyRef": {
"name": "db-secret", "key": "url"
}}},
{"name": "ENVIRONMENT", "value": environment.value}
],
"command": ["python", "-m", "sysmanage.migrations", "apply"]
}],
"restartPolicy": "Never"
}
},
"backoffLimit": 3
}
}
# Apply job and wait for completion
job_name = await self.k8s_client.create_job(migration_job)
return await self.k8s_client.wait_for_job_completion(job_name, timeout=1800)
async def _deploy_application(self, environment: DeploymentEnvironment,
version: str) -> bool:
"""Deploy application to environment"""
deployment_name = f"sysmanage-{environment.value}"
# Update deployment with new image
await self.k8s_client.update_deployment(
deployment_name,
image=f"sysmanage/app:{version}",
environment_vars={
"ENVIRONMENT": environment.value,
"VERSION": version
}
)
# Wait for rollout to complete
return await self.k8s_client.wait_for_rollout(deployment_name, timeout=600)
async def _health_check(self, environment: DeploymentEnvironment) -> bool:
"""Comprehensive health check for environment"""
health_endpoint = f"https://{environment.value}.sysmanage.internal/api/health/detailed"
for attempt in range(30): # 5 minutes of retries
try:
async with aiohttp.ClientSession() as session:
async with session.get(health_endpoint, timeout=10) as response:
if response.status == 200:
health_data = await response.json()
if health_data.get("status") == "healthy":
return True
except Exception as e:
logger.warning(f"Health check attempt {attempt + 1} failed: {e}")
await asyncio.sleep(10)
return False
async def _smoke_tests(self, environment: DeploymentEnvironment) -> bool:
"""Run smoke tests against environment"""
test_cases = [
self._test_authentication,
self._test_agent_registration,
self._test_basic_operations,
self._test_websocket_connectivity
]
base_url = f"https://{environment.value}.sysmanage.internal"
for test_case in test_cases:
if not await test_case(base_url):
return False
return True
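Deployment Orchestration Example
A minimal sketch of how the deployer above might be driven end to end, assuming the configuration keys it already expects (`lb_config`, `k8s_config`, `version`); error handling is reduced to the essentials.
async def release(version: str, config: Dict) -> bool:
    """Deploy a new version to the idle environment, then shift traffic (sketch)."""
    deployer = BlueGreenDeployer(config)

    # Deploy, migrate, and verify the new version on the idle (green) environment
    if not await deployer.deploy(version, DeploymentEnvironment.GREEN):
        return False

    # Gradually shift traffic; switch_traffic rolls back automatically on failure
    return await deployer.switch_traffic(DeploymentEnvironment.GREEN)

# Example invocation (values are placeholders):
# asyncio.run(release("1.6.0", {"version": "1.6.0", "lb_config": {...}, "k8s_config": {...}}))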
Canary Deployment
Gradual Traffic Shifting
import asyncio
import logging
import time

logger = logging.getLogger(__name__)

class CanaryDeployment:
def __init__(self, load_balancer: LoadBalancerController,
metrics_client: MetricsClient):
self.load_balancer = load_balancer
self.metrics_client = metrics_client
async def canary_deploy(self, new_version: str,
canary_percentage: int = 5) -> bool:
"""Deploy using canary strategy"""
# Step 1: Deploy canary version
await self._deploy_canary(new_version, canary_percentage)
# Step 2: Monitor canary performance
monitoring_duration = 300 # 5 minutes
if not await self._monitor_canary(monitoring_duration):
await self._rollback_canary()
return False
# Step 3: Gradually increase traffic
traffic_levels = [10, 25, 50, 75, 100]
for level in traffic_levels:
await self.load_balancer.set_canary_traffic(level)
# Monitor at each level
if not await self._monitor_canary(monitoring_duration):
await self._rollback_canary()
return False
# Step 4: Complete deployment
await self._promote_canary()
return True
async def _monitor_canary(self, duration: int) -> bool:
"""Monitor canary deployment metrics"""
start_time = time.time()
while time.time() - start_time < duration:
metrics = await self.metrics_client.get_canary_metrics()
# Check error rate
if metrics["error_rate"] > 0.01: # 1% threshold
logger.error(f"High error rate: {metrics['error_rate']}")
return False
# Check response time
if metrics["avg_response_time"] > 500: # 500ms threshold
logger.error(f"High response time: {metrics['avg_response_time']}ms")
return False
# Check memory usage
if metrics["memory_usage"] > 0.9: # 90% threshold
logger.error(f"High memory usage: {metrics['memory_usage']}")
return False
await asyncio.sleep(30) # Check every 30 seconds
return True
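Canary Metrics Collection
The monitor above expects get_canary_metrics() to return an error rate, average response time, and memory usage for the canary pods. One possible implementation, sketched below, reads these from the Prometheus HTTP API; the metric names and the canary labelling are assumptions about how the canary pods are instrumented, not a fixed contract.
from typing import Dict

import aiohttp


class MetricsClient:
    """Sketch of a metrics client backed by the Prometheus HTTP API."""

    def __init__(self, prometheus_url: str):
        self.prometheus_url = prometheus_url

    async def _query(self, promql: str) -> float:
        """Run an instant query and return the first sample value (0.0 if empty)."""
        async with aiohttp.ClientSession() as session:
            async with session.get(
                f"{self.prometheus_url}/api/v1/query", params={"query": promql}
            ) as response:
                data = await response.json()
        results = data.get("data", {}).get("result", [])
        return float(results[0]["value"][1]) if results else 0.0

    async def get_canary_metrics(self) -> Dict[str, float]:
        # Metric names and the track="canary" label are illustrative assumptions
        return {
            "error_rate": await self._query(
                'sum(rate(http_requests_total{track="canary",status=~"5.."}[5m])) / '
                'sum(rate(http_requests_total{track="canary"}[5m]))'
            ),
            "avg_response_time": await self._query(
                'sum(rate(http_request_duration_seconds_sum{track="canary"}[5m])) / '
                'sum(rate(http_request_duration_seconds_count{track="canary"}[5m])) * 1000'
            ),
            "memory_usage": await self._query(
                'avg(container_memory_working_set_bytes{pod=~"sysmanage-canary-.*"} / '
                'container_spec_memory_limit_bytes{pod=~"sysmanage-canary-.*"})'
            ),
        }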
Rollback Procedures
Automated Rollback System
Rollback Controller
class RollbackController:
    def __init__(self, config: Dict, db: Database):
        self.config = config
        self.deployment_history = DeploymentHistory(db)
        self.health_monitor = HealthMonitor()
        self.migration_runner = MigrationRunner(db)
        self.load_balancer = LoadBalancerController(config["lb_config"])
        self.k8s_client = KubernetesClient(config["k8s_config"])
        # Alerting and incident clients are placeholders for whatever integrations are configured
        self.alerting_system = AlertingSystem(config["alerting_config"])
        self.incident_management = IncidentManager(config["incident_config"])
async def setup_automatic_rollback(self, deployment_id: str):
"""Setup automatic rollback triggers"""
# Monitor key metrics
triggers = [
ErrorRateTrigger(threshold=0.05, duration=300), # 5% error rate for 5 min
ResponseTimeTrigger(threshold=1000, duration=180), # 1s response time for 3 min
HealthCheckTrigger(failure_count=3), # 3 consecutive health check failures
]
for trigger in triggers:
trigger.on_trigger(lambda: self.execute_rollback(deployment_id))
await self.health_monitor.add_trigger(trigger)
async def execute_rollback(self, deployment_id: str) -> bool:
"""Execute automatic rollback"""
try:
# Get previous stable deployment
previous_deployment = await self.deployment_history.get_previous_stable(
deployment_id
)
if not previous_deployment:
raise Exception("No previous stable deployment found")
logger.info(f"Starting rollback to {previous_deployment.version}")
# Step 1: Database rollback
if not await self._rollback_database(previous_deployment):
raise Exception("Database rollback failed")
# Step 2: Application rollback
if not await self._rollback_application(previous_deployment):
raise Exception("Application rollback failed")
# Step 3: Traffic switch
if not await self._switch_traffic(previous_deployment):
raise Exception("Traffic switch failed")
# Step 4: Verify rollback
if not await self._verify_rollback(previous_deployment):
raise Exception("Rollback verification failed")
await self.deployment_history.mark_rollback_successful(
deployment_id, previous_deployment.id
)
return True
except Exception as e:
logger.error(f"Rollback failed: {e}")
await self._emergency_procedures()
return False
async def _rollback_database(self, target_deployment: Deployment) -> bool:
"""Rollback database to previous version"""
current_version = await self._get_current_db_version()
target_version = target_deployment.database_version
if current_version == target_version:
return True # Already at target version
# Get migrations to rollback
migrations_to_rollback = await self._get_migrations_to_rollback(
current_version, target_version
)
# Execute rollback migrations in reverse order
for migration in reversed(migrations_to_rollback):
if not migration.reversible:
raise Exception(f"Migration {migration.version} is not reversible")
if not await self.migration_runner.rollback_migration(migration):
raise Exception(f"Failed to rollback migration {migration.version}")
return True
async def _rollback_application(self, target_deployment: Deployment) -> bool:
"""Rollback application to previous version"""
# Update all deployments to target version
deployments = ["sysmanage-backend", "sysmanage-frontend", "sysmanage-worker"]
rollback_tasks = []
for deployment_name in deployments:
task = self.k8s_client.rollback_deployment(
deployment_name,
target_deployment.version
)
rollback_tasks.append(task)
# Wait for all rollbacks to complete
results = await asyncio.gather(*rollback_tasks, return_exceptions=True)
return all(result is True for result in results)
async def _emergency_procedures(self):
"""Emergency procedures when rollback fails"""
# Step 1: Enable maintenance mode
await self.load_balancer.enable_maintenance_mode()
# Step 2: Notify operations team
await self.alerting_system.send_critical_alert(
"CRITICAL: Automated rollback failed. Manual intervention required."
)
# Step 3: Create incident
await self.incident_management.create_incident(
title="Failed automated rollback",
severity="critical",
description="Automated rollback procedures failed. System may be unstable."
)
class DeploymentHistory:
def __init__(self, db: Database):
self.db = db
async def record_deployment(self, deployment: Deployment):
"""Record successful deployment"""
await self.db.execute("""
INSERT INTO deployment_history
(id, version, database_version, deployed_at, deployed_by, status)
VALUES ($1, $2, $3, $4, $5, $6)
""", deployment.id, deployment.version, deployment.database_version,
deployment.deployed_at, deployment.deployed_by, "stable")
async def get_previous_stable(self, current_deployment_id: str) -> Optional[Deployment]:
"""Get previous stable deployment"""
result = await self.db.fetch_one("""
SELECT * FROM deployment_history
WHERE status = 'stable' AND id != $1
ORDER BY deployed_at DESC
LIMIT 1
""", current_deployment_id)
return Deployment.from_db_row(result) if result else None
async def mark_rollback_successful(self, failed_deployment_id: str,
target_deployment_id: str):
"""Mark rollback as successful"""
await self.db.execute("""
UPDATE deployment_history
SET status = 'rolled_back', rolled_back_at = NOW()
WHERE id = $1
""", failed_deployment_id)
await self.db.execute("""
UPDATE deployment_history
SET status = 'stable'
WHERE id = $1
""", target_deployment_id)
Manual Rollback Procedures
Rollback CLI Tool
#!/bin/bash
# SysManage Rollback Tool
set -euo pipefail

SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
source "$SCRIPT_DIR/common.sh"

# Defaults; overridden by option parsing (-e/--environment, -f/--force)
ENVIRONMENT="${ENVIRONMENT:-prod}"
FORCE="${FORCE:-false}"
usage() {
cat << EOF
SysManage Rollback Tool
Usage: $0 [OPTIONS] COMMAND
Commands:
list List available rollback targets
plan VERSION Show rollback plan for version
execute VERSION Execute rollback to version
status Show current rollback status
abort Abort ongoing rollback
Options:
-e, --environment ENV Target environment (prod, staging)
-f, --force Force rollback without confirmation
-v, --verbose Verbose output
-h, --help Show this help
Examples:
$0 list
$0 plan v1.5.2
$0 execute v1.5.2
$0 -e staging execute v1.5.1
EOF
}
rollback_list() {
echo "Available rollback targets:"
echo "=========================="
kubectl get deployments -n sysmanage -o custom-columns=\
NAME:.metadata.name,VERSION:.metadata.labels.version,DEPLOYED:.metadata.creationTimestamp \
--sort-by=.metadata.creationTimestamp | tail -10
}
rollback_plan() {
local target_version="$1"
local current_version
current_version=$(get_current_version)
echo "Rollback Plan"
echo "============="
echo "Current Version: $current_version"
echo "Target Version: $target_version"
echo ""
# Show database migration rollbacks needed
echo "Database Changes:"
python3 -m sysmanage.migrations plan-rollback \
--from-version "$current_version" \
--to-version "$target_version"
echo ""
echo "Application Components:"
echo "- Backend API: $current_version -> $target_version"
echo "- Frontend: $current_version -> $target_version"
echo "- Worker Processes: $current_version -> $target_version"
echo ""
echo "Estimated Downtime: 5-10 minutes"
echo "Risk Level: $(calculate_risk_level "$current_version" "$target_version")"
}
rollback_execute() {
local target_version="$1"
local current_version
current_version=$(get_current_version)
if [[ "$FORCE" != "true" ]]; then
echo "WARNING: This will rollback from $current_version to $target_version"
read -p "Are you sure? (y/N): " -n 1 -r
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]; then
echo "Rollback cancelled."
exit 0
fi
fi
log "Starting rollback to $target_version"
# Step 1: Enable maintenance mode
log "Enabling maintenance mode..."
kubectl patch ingress sysmanage-ingress -n sysmanage \
-p '{"metadata":{"annotations":{"nginx.ingress.kubernetes.io/maintenance":"true"}}}'
# Step 2: Scale down current deployment
log "Scaling down current deployment..."
kubectl scale deployment sysmanage-backend -n sysmanage --replicas=0
# Step 3: Back up, then roll back the database
log "Backing up database before rollback..."
PGPASSWORD="$DB_PASSWORD" pg_dump -h "$DB_HOST" -U "$DB_USER" -d "$DB_NAME" \
    -f /tmp/rollback_db_backup.sql
log "Rolling back database..."
if ! python3 -m sysmanage.migrations rollback --to-version "$target_version"; then
log "ERROR: Database rollback failed"
rollback_abort
exit 1
fi
# Step 4: Deploy target version
log "Deploying target version..."
kubectl set image deployment/sysmanage-backend -n sysmanage \
app="sysmanage/backend:$target_version"
kubectl set image deployment/sysmanage-frontend -n sysmanage \
app="sysmanage/frontend:$target_version"
# Step 5: Wait for deployment
log "Waiting for deployment to be ready..."
kubectl rollout status deployment/sysmanage-backend -n sysmanage --timeout=300s
kubectl rollout status deployment/sysmanage-frontend -n sysmanage --timeout=300s
# Step 6: Run health checks
log "Running health checks..."
if ! run_health_checks; then
log "ERROR: Health checks failed"
rollback_abort
exit 1
fi
# Step 7: Disable maintenance mode
log "Disabling maintenance mode..."
kubectl patch ingress sysmanage-ingress -n sysmanage \
-p '{"metadata":{"annotations":{"nginx.ingress.kubernetes.io/maintenance":"false"}}}'
log "Rollback to $target_version completed successfully"
}
rollback_abort() {
log "Aborting rollback and restoring previous state..."
# Restore from backup if database was modified
if [[ -f "/tmp/rollback_db_backup.sql" ]]; then
log "Restoring database from backup..."
PGPASSWORD="$DB_PASSWORD" psql -h "$DB_HOST" -U "$DB_USER" -d "$DB_NAME" \
-f "/tmp/rollback_db_backup.sql"
fi
# Scale back up
kubectl scale deployment sysmanage-backend -n sysmanage --replicas=3
# Disable maintenance mode
kubectl patch ingress sysmanage-ingress -n sysmanage \
-p '{"metadata":{"annotations":{"nginx.ingress.kubernetes.io/maintenance":"false"}}}'
log "Rollback aborted. System restored to previous state."
}
run_health_checks() {
local max_attempts=30
local attempt=1
while [[ $attempt -le $max_attempts ]]; do
if curl -sf "https://sysmanage.example.com/api/health" > /dev/null; then
log "Health check passed"
return 0
fi
log "Health check attempt $attempt/$max_attempts failed, retrying..."
sleep 10
((attempt++))
done
log "Health checks failed after $max_attempts attempts"
return 1
}
Version Management
Semantic Versioning Strategy
Version Management System
import re
from typing import List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
class VersionType(Enum):
MAJOR = "major"
MINOR = "minor"
PATCH = "patch"
PRERELEASE = "prerelease"
@dataclass
class Version:
major: int
minor: int
patch: int
prerelease: Optional[str] = None
build: Optional[str] = None
@classmethod
def parse(cls, version_string: str) -> 'Version':
"""Parse version string (e.g., '1.2.3-beta.1+build.123')"""
# Remove 'v' prefix if present
version_string = version_string.lstrip('v')
# Regex for semantic version
pattern = r'^(\d+)\.(\d+)\.(\d+)(?:-([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?(?:\+([0-9A-Za-z-]+(?:\.[0-9A-Za-z-]+)*))?$'
match = re.match(pattern, version_string)
if not match:
raise ValueError(f"Invalid version string: {version_string}")
return cls(
major=int(match.group(1)),
minor=int(match.group(2)),
patch=int(match.group(3)),
prerelease=match.group(4),
build=match.group(5)
)
def __str__(self) -> str:
version = f"{self.major}.{self.minor}.{self.patch}"
if self.prerelease:
version += f"-{self.prerelease}"
if self.build:
version += f"+{self.build}"
return version
def __lt__(self, other: 'Version') -> bool:
"""Compare versions according to semver rules"""
# Compare major.minor.patch
self_tuple = (self.major, self.minor, self.patch)
other_tuple = (other.major, other.minor, other.patch)
if self_tuple != other_tuple:
return self_tuple < other_tuple
# Handle prerelease versions
if self.prerelease is None and other.prerelease is None:
return False
elif self.prerelease is None:
return False # Release > prerelease
elif other.prerelease is None:
return True # Prerelease < release
else:
return self._compare_prerelease(self.prerelease, other.prerelease)
def _compare_prerelease(self, pre1: str, pre2: str) -> bool:
"""Compare prerelease versions"""
parts1 = pre1.split('.')
parts2 = pre2.split('.')
for i in range(max(len(parts1), len(parts2))):
part1 = parts1[i] if i < len(parts1) else ''
part2 = parts2[i] if i < len(parts2) else ''
# Try to compare as integers
try:
int1, int2 = int(part1), int(part2)
if int1 != int2:
return int1 < int2
except ValueError:
# String comparison
if part1 != part2:
return part1 < part2
return len(parts1) < len(parts2)
def bump(self, version_type: VersionType) -> 'Version':
"""Bump version according to type"""
if version_type == VersionType.MAJOR:
return Version(self.major + 1, 0, 0)
elif version_type == VersionType.MINOR:
return Version(self.major, self.minor + 1, 0)
elif version_type == VersionType.PATCH:
return Version(self.major, self.minor, self.patch + 1)
else:
raise ValueError(f"Unsupported version type: {version_type}")
    def is_compatible(self, other: 'Version') -> bool:
        """Check if versions are backward compatible"""
        # Major version 0 is special - only the same major.minor is compatible
        if self.major == 0 or other.major == 0:
            return self.major == other.major and self.minor == other.minor
        # Otherwise the same major version is generally compatible
        return self.major == other.major
class VersionManager:
def __init__(self, db: Database):
self.db = db
async def get_current_version(self) -> Version:
"""Get current system version"""
result = await self.db.fetch_one("""
SELECT version FROM system_version
ORDER BY created_at DESC LIMIT 1
""")
if not result:
raise Exception("No version information found")
return Version.parse(result["version"])
async def get_version_history(self, limit: int = 50) -> List[Tuple[Version, dict]]:
"""Get version deployment history"""
results = await self.db.fetch_all("""
SELECT version, deployed_at, deployed_by, release_notes
FROM deployment_history
ORDER BY deployed_at DESC
LIMIT $1
""", limit)
return [(Version.parse(row["version"]), {
"deployed_at": row["deployed_at"],
"deployed_by": row["deployed_by"],
"release_notes": row["release_notes"]
}) for row in results]
async def validate_version_compatibility(self,
target_version: Version) -> List[str]:
"""Validate if target version is compatible for deployment"""
current_version = await self.get_current_version()
issues = []
# Check backward compatibility
if not current_version.is_compatible(target_version):
if target_version < current_version:
# Downgrade scenario
if target_version.major < current_version.major:
issues.append(
f"Major version downgrade from {current_version.major} "
f"to {target_version.major} may cause data compatibility issues"
)
else:
# Upgrade scenario
if target_version.major > current_version.major:
issues.append(
f"Major version upgrade from {current_version.major} "
f"to {target_version.major} requires migration planning"
)
# Check database schema compatibility
schema_issues = await self._check_schema_compatibility(
current_version, target_version
)
issues.extend(schema_issues)
# Check API compatibility
api_issues = await self._check_api_compatibility(
current_version, target_version
)
issues.extend(api_issues)
return issues
async def _check_schema_compatibility(self,
current: Version,
target: Version) -> List[str]:
"""Check database schema compatibility"""
# Get schema versions for both versions
current_schema = await self._get_schema_version(current)
target_schema = await self._get_schema_version(target)
issues = []
if target_schema < current_schema:
# Check if rollback is safe
migrations_to_rollback = await self._get_migrations_between_versions(
current_schema, target_schema
)
irreversible_migrations = [
m for m in migrations_to_rollback if not m.reversible
]
if irreversible_migrations:
issues.append(
f"Cannot rollback due to irreversible migrations: "
f"{[m.version for m in irreversible_migrations]}"
)
return issues
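Version Comparison Example
A short usage sketch showing how the Version helpers above behave; the expected results follow directly from the comparison rules.
current = Version.parse("v1.5.2")
candidate = Version.parse("1.6.0-beta.1")

print(current < candidate)                  # True: 1.5.2 precedes 1.6.0-beta.1
print(candidate < Version.parse("1.6.0"))   # True: a prerelease precedes its release
print(current.is_compatible(candidate))     # True: same major version
print(current.bump(VersionType.MINOR))      # 1.6.0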
Migration Monitoring
Key Metrics
Deployment Success
- Deployment success rate
- Average deployment duration
- Rollback frequency
- Failed deployment causes
Migration Performance
- Migration execution time
- Database lock duration
- Data migration volume
- Index rebuild time
System Health
- Post-deployment error rates
- Performance regression detection
- Resource utilization changes
- User experience impact
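Metrics Instrumentation
The dashboard queries below assume series named deployment_total, deployment_duration_seconds, deployment_timestamp_seconds, and migration_duration_seconds. A minimal sketch of emitting them with prometheus_client follows; the metric types and label sets are assumptions chosen to match those queries.
import time

from prometheus_client import Counter, Gauge, Histogram

# Series referenced by the dashboard queries below
DEPLOYMENT_TOTAL = Counter(
    "deployment_total", "Deployments by outcome", ["status"]  # success | failure | rollback
)
DEPLOYMENT_DURATION = Gauge(
    "deployment_duration_seconds", "Duration of the most recent deployment"
)
DEPLOYMENT_TIMESTAMP = Gauge(
    "deployment_timestamp_seconds", "Unix time of the most recent successful deployment"
)
MIGRATION_DURATION = Histogram(
    "migration_duration_seconds", "Migration execution time", ["migration_type"]
)


def record_deployment(status: str, duration_seconds: float) -> None:
    """Record one deployment attempt for the dashboards below."""
    DEPLOYMENT_TOTAL.labels(status=status).inc()
    DEPLOYMENT_DURATION.set(duration_seconds)
    if status == "success":
        DEPLOYMENT_TIMESTAMP.set(time.time())


def record_migration(migration_type: str, duration_seconds: float) -> None:
    """Record a single migration run."""
    MIGRATION_DURATION.labels(migration_type=migration_type).observe(duration_seconds)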
Deployment Dashboard
Grafana Queries for Deployment Monitoring
# Deployment success rate (last 30 days)
(
sum(rate(deployment_total{status="success"}[30d])) /
sum(rate(deployment_total[30d]))
) * 100
# Average deployment duration
avg_over_time(deployment_duration_seconds[24h]) / 60
# Rollback frequency
rate(deployment_total{status="rollback"}[7d]) * 86400
# 95th percentile migration execution time by type
histogram_quantile(0.95,
  sum(rate(migration_duration_seconds_bucket[24h])) by (le, migration_type)
)
# Post-deployment error rate
rate(http_requests_total{status=~"5.."}[5m]) and on()
(time() - deployment_timestamp_seconds < 1800)
Best Practices
Migration Guidelines
- Backward Compatibility: Maintain compatibility for at least one version
- Incremental Changes: Make small, incremental changes rather than large migrations
- Testing: Thoroughly test migrations in staging environments
- Monitoring: Monitor system health closely after deployments
Deployment Guidelines
- Off-Peak Timing: Schedule deployments during low-traffic periods
- Feature Flags: Use feature flags for gradual feature rollouts (see the sketch after this list)
- Automated Testing: Implement comprehensive automated testing
- Communication: Notify stakeholders of planned deployments
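Feature Flag Example
To illustrate the feature-flag guideline, the sketch below gates a new code path by rollout percentage using a stable hash of the subject; the flag store and hashing scheme are assumptions, not the shipped implementation.
import hashlib


def feature_enabled(flag_name: str, subject_id: str, rollout_percentage: int) -> bool:
    """Deterministically enable a flag for a stable slice of subjects (sketch)."""
    digest = hashlib.sha256(f"{flag_name}:{subject_id}".encode()).hexdigest()
    bucket = int(digest[:8], 16) % 100  # stable bucket in [0, 100)
    return bucket < rollout_percentage


# Example: roll a new metrics pipeline out to 25% of agents
# if feature_enabled("new_metrics_pipeline", agent.id, rollout_percentage=25):
#     use_new_pipeline(agent)
# else:
#     use_legacy_pipeline(agent)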