
Data Synchronization

Data synchronization strategies, conflict resolution, and consistency models in SysManage distributed systems.

Synchronization Overview

SysManage implements a data synchronization system that maintains consistency across distributed components while gracefully handling network partitions, agent disconnections, and concurrent updates.

Core Principles

  • Eventually Consistent: System converges to consistency over time
  • Deterministic Conflict Resolution: Last-writer-wins with timestamp ordering by default
  • Partition Tolerant: Continues operation during network issues
  • Agent Autonomy: Agents can operate independently when disconnected

Synchronization Architecture

Data Flow and Synchronization Points

┌─────────────────────────────────────────────────────────────────┐
│                    SysManage Server                             │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Sync Engine │  │ Conflict     │  │ Version Vector          │ │
│  │ (Master)    │  │ Resolver     │  │ Manager                 │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Change Log  │  │ Delta        │  │ PostgreSQL              │ │
│  │ Processor   │  │ Calculator   │  │ WAL Streaming           │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
                          │    ▲
                   Push   │    │ Pull
                  Changes │    │ Requests
                          ▼    │
┌─────────────────────────────────────────────────────────────────┐
│                   SysManage Agents                              │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Local Sync  │  │ Change       │  │ SQLite WAL              │ │
│  │ Manager     │  │ Detector     │  │ (Local Store)           │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│  ┌─────────────┐  ┌──────────────┐  ┌─────────────────────────┐ │
│  │ Offline     │  │ Merge        │  │ Incremental             │ │
│  │ Queue       │  │ Resolver     │  │ Backup                  │ │
│  └─────────────┘  └──────────────┘  └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
                        

Consistency Models

Eventual Consistency

SysManage uses eventual consistency with vector clocks for tracking causality:

Vector Clock Implementation

class VectorClock:
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.clock = {node_id: 0}

    def increment(self) -> 'VectorClock':
        """Increment local clock for new event"""
        self.clock[self.node_id] += 1
        return self

    def update(self, other: 'VectorClock') -> 'VectorClock':
        """Update clock when receiving message"""
        for node, timestamp in other.clock.items():
            self.clock[node] = max(
                self.clock.get(node, 0),
                timestamp
            )
        self.increment()
        return self

    def compare(self, other: 'VectorClock') -> str:
        """Compare causality relationship"""
        nodes = set(self.clock) | set(other.clock)
        less_or_equal = all(
            self.clock.get(node, 0) <= other.clock.get(node, 0)
            for node in nodes
        )
        greater_or_equal = all(
            self.clock.get(node, 0) >= other.clock.get(node, 0)
            for node in nodes
        )

        if less_or_equal and not greater_or_equal:
            return "before"
        elif greater_or_equal and not less_or_equal:
            return "after"
        elif less_or_equal and greater_or_equal:
            return "equal"
        else:
            return "concurrent"

Strong Consistency (Critical Operations)

For critical operations like certificate management and user authentication, SysManage uses strong consistency with two-phase commit:

Two-Phase Commit Protocol

import asyncio

class TwoPhaseCommitCoordinator:
    async def execute_transaction(self, operation: Operation) -> bool:
        """Execute distributed transaction across participants"""
        participants = self.get_participants(operation)
        transaction_id = self.generate_transaction_id()

        # Phase 1: Prepare (a raised exception counts as a "no" vote)
        prepare_responses = await asyncio.gather(*[
            participant.prepare(transaction_id, operation)
            for participant in participants
        ], return_exceptions=True)

        all_ready = all(
            not isinstance(response, Exception) and response.ready
            for response in prepare_responses
        )

        if all_ready:
            # Phase 2: Commit
            commit_responses = await asyncio.gather(*[
                participant.commit(transaction_id)
                for participant in participants
            ])
            return all(response.success for response in commit_responses)
        else:
            # Abort transaction
            await asyncio.gather(*[
                participant.abort(transaction_id)
                for participant in participants
            ])
            return False
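
On the participant side, each vote must be durably recorded before replying so the decision survives a crash between phases. A minimal sketch, assuming hypothetical PrepareResponse/CommitResponse objects, a durable write-ahead-log helper, and can_apply/apply hooks (none of these are part of the SysManage API):

from dataclasses import dataclass

@dataclass
class PrepareResponse:
    ready: bool

@dataclass
class CommitResponse:
    success: bool

class Participant:
    def __init__(self, wal):
        self.wal = wal  # hypothetical durable log, written before any reply

    async def prepare(self, transaction_id, operation):
        # Vote "yes" only if the operation can be applied locally,
        # and persist the vote so it survives a restart
        if not self.can_apply(operation):
            return PrepareResponse(ready=False)
        await self.wal.record(transaction_id, "prepared", operation)
        return PrepareResponse(ready=True)

    async def commit(self, transaction_id):
        operation = await self.wal.get_operation(transaction_id)
        await self.apply(operation)
        await self.wal.record(transaction_id, "committed")
        return CommitResponse(success=True)

    async def abort(self, transaction_id):
        # Discard any state prepared for this transaction
        await self.wal.record(transaction_id, "aborted")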

Conflict Resolution

Conflict Detection

SysManage detects conflicts using a combination of techniques; a sketch of the content-based approach follows these lists:

Timestamp-Based

  • Last-writer-wins for simple conflicts
  • Lamport timestamps for ordering
  • Wall clock synchronization with NTP

Content-Based

  • Hash comparison for data integrity
  • Field-level conflict detection
  • Semantic conflict identification

Operational Transform

  • Transform concurrent operations
  • Maintain operation commutativity
  • Preserve user intentions
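
As referenced above, here is a minimal sketch of content-based, field-level detection. It assumes records are plain dicts and that a common ancestor version is available; the function names are illustrative:

import hashlib
import json

def record_hash(record: dict) -> str:
    """Stable content hash of a record (key order normalized)."""
    canonical = json.dumps(record, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode()).hexdigest()

def detect_field_conflicts(base: dict, local: dict, remote: dict) -> list:
    """Three-way, field-level conflict detection against a common ancestor.

    A field conflicts only when both sides changed it, to different values.
    """
    conflicts = []
    for field in set(local) | set(remote):
        base_val = base.get(field)
        local_val = local.get(field)
        remote_val = remote.get(field)
        local_changed = local_val != base_val
        remote_changed = remote_val != base_val
        if local_changed and remote_changed and local_val != remote_val:
            conflicts.append(field)
    return conflicts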

Conflict Resolution Strategies

Configurable Resolution

class ConflictResolver:
    def __init__(self, strategy: ConflictStrategy):
        self.strategy = strategy

    def resolve_conflict(self, local_change: Change,
                        remote_change: Change) -> Change:
        """Resolve conflict between local and remote changes"""

        if self.strategy == ConflictStrategy.LAST_WRITER_WINS:
            return self._last_writer_wins(local_change, remote_change)
        elif self.strategy == ConflictStrategy.MERGE:
            return self._merge_changes(local_change, remote_change)
        elif self.strategy == ConflictStrategy.MANUAL:
            return self._queue_for_manual_resolution(
                local_change, remote_change
            )
        elif self.strategy == ConflictStrategy.CUSTOM:
            return self._apply_custom_rules(local_change, remote_change)
        else:
            raise ValueError(f"Unknown conflict strategy: {self.strategy}")

    def _last_writer_wins(self, local: Change, remote: Change) -> Change:
        """Simple timestamp-based resolution"""
        if remote.timestamp > local.timestamp:
            return remote
        elif local.timestamp > remote.timestamp:
            return local
        else:
            # Same timestamp, use node ID for deterministic ordering
            return remote if remote.node_id > local.node_id else local

    def _merge_changes(self, local: Change, remote: Change) -> Change:
        """Intelligent merge of non-conflicting fields"""
        merged_data = {}

        for field in set(local.data.keys()) | set(remote.data.keys()):
            local_val = local.data.get(field)
            remote_val = remote.data.get(field)

            if local_val == remote_val:
                merged_data[field] = local_val
            elif local_val is None:
                merged_data[field] = remote_val
            elif remote_val is None:
                merged_data[field] = local_val
            else:
                # Field-level conflict, apply resolution strategy
                merged_data[field] = self._resolve_field_conflict(
                    field, local_val, remote_val
                )

        return Change(
            id=self.generate_merge_id(local, remote),
            data=merged_data,
            timestamp=max(local.timestamp, remote.timestamp),
            vector_clock=local.vector_clock.merge(remote.vector_clock)
        )
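
The _resolve_field_conflict helper above is deliberately left to per-field policy. One possible shape for it on ConflictResolver, assuming a hypothetical per-field rule registry:

    def _resolve_field_conflict(self, field, local_val, remote_val):
        """Illustrative policy: a registered per-field rule wins;
        otherwise fall back to the remote (server-side) value."""
        rule = self.field_rules.get(field)  # hypothetical registry
        if rule is not None:
            return rule(local_val, remote_val)
        return remote_val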

Synchronization Protocols

Push-Based Synchronization

Agents push changes to the server immediately when connected:

Push Protocol Implementation

import logging

logger = logging.getLogger(__name__)

class PushSyncManager:
    async def sync_changes(self, changes: List[Change]) -> SyncResult:
        """Push local changes to server"""

        # Batch changes by type for efficiency
        batched_changes = self.batch_changes(changes)

        results = []
        for batch in batched_changes:
            try:
                # Send batch with retry logic
                response = await self.send_with_retry(
                    endpoint="/api/sync/push",
                    data=batch.serialize(),
                    max_retries=3
                )

                if response.conflicts:
                    # Handle conflicts returned by server
                    resolved_changes = await self.resolve_conflicts(
                        response.conflicts
                    )
                    results.extend(resolved_changes)
                else:
                    results.extend(response.accepted_changes)

            except NetworkError as e:
                # Queue for later when connection restored
                await self.queue_for_retry(batch)
                logger.warning(f"Sync failed, queued for retry: {e}")

        return SyncResult(
            successful=len([r for r in results if r.success]),
            failed=len([r for r in results if not r.success]),
            conflicts=len([r for r in results if r.conflict])
        )

Pull-Based Synchronization

Agents also perform periodic pull synchronization to catch updates missed by the push path:

Pull Protocol with Delta Sync

from datetime import datetime
from typing import List

class PullSyncManager:
    async def pull_updates(self, since: datetime) -> List[Change]:
        """Pull updates from server since timestamp"""

        # Get incremental changes using server cursor
        cursor = await self.get_sync_cursor(since)

        changes = []
        while cursor.has_more:
            batch = await self.fetch_change_batch(
                cursor=cursor.position,
                limit=self.BATCH_SIZE
            )

            for change in batch.changes:
                # Apply conflict resolution
                if await self.has_local_conflict(change):
                    resolved_change = await self.resolve_pull_conflict(change)
                    changes.append(resolved_change)
                else:
                    changes.append(change)

            cursor = batch.next_cursor

        # Apply changes atomically
        await self.apply_changes_batch(changes)

        return changes

Merkle Tree Synchronization

For efficient bulk synchronization and verification, SysManage compares Merkle trees of the data set:

Merkle Tree Sync

from typing import Any, List

class MerkleTreeSync:
    def __init__(self, chunk_size: int = 1000):
        self.chunk_size = chunk_size

    def build_merkle_tree(self, data: List[Any]) -> MerkleNode:
        """Build Merkle tree for data set"""
        leaves = []

        # Create leaf nodes in chunks
        for i in range(0, len(data), self.chunk_size):
            chunk = data[i:i + self.chunk_size]
            chunk_hash = self.hash_chunk(chunk)
            leaves.append(MerkleLeaf(chunk_hash, chunk))

        # Build tree bottom-up
        return self.build_tree_recursive(leaves)

    async def sync_with_merkle(self, remote_root_hash: str) -> List[Change]:
        """Synchronize using Merkle tree comparison"""
        local_tree = self.build_merkle_tree(await self.get_local_data())

        if local_tree.hash == remote_root_hash:
            return []  # Already in sync

        # Find differing subtrees
        diff_ranges = await self.find_differences(
            local_tree, remote_root_hash
        )

        # Pull only the differing data
        changes = []
        for range_start, range_end in diff_ranges:
            range_changes = await self.pull_range(range_start, range_end)
            changes.extend(range_changes)

        return changes
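
The hash_chunk and build_tree_recursive helpers referenced above are not shown in full; a minimal sketch, written as free functions for brevity, with MerkleLeaf/MerkleNode as simple stand-ins:

import hashlib
from dataclasses import dataclass
from typing import Any, List

@dataclass
class MerkleLeaf:
    hash: str
    chunk: List[Any]

@dataclass
class MerkleNode:
    hash: str
    left: Any
    right: Any = None

def hash_chunk(chunk: List[Any]) -> str:
    """Deterministically hash one chunk of records."""
    h = hashlib.sha256()
    for record in chunk:
        h.update(repr(record).encode())
    return h.hexdigest()

def build_tree_recursive(nodes: List[Any]) -> Any:
    """Pairwise-combine nodes until a single root remains.

    Assumes at least one leaf; an odd node is promoted with an
    empty right sibling.
    """
    if len(nodes) == 1:
        return nodes[0]
    parents = []
    for i in range(0, len(nodes), 2):
        left = nodes[i]
        right = nodes[i + 1] if i + 1 < len(nodes) else None
        combined = left.hash + (right.hash if right else "")
        parent_hash = hashlib.sha256(combined.encode()).hexdigest()
        parents.append(MerkleNode(parent_hash, left, right))
    return build_tree_recursive(parents)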

Offline Operations

Local State Management

Agents maintain local state and operation queues during disconnection:

Offline Queue Management

import logging
from datetime import datetime
from typing import List

logger = logging.getLogger(__name__)

class OfflineOperationQueue:
    def __init__(self, storage: LocalStorage):
        self.storage = storage
        self.operation_log = []

    async def queue_operation(self, operation: Operation) -> str:
        """Queue operation for later execution"""
        operation.id = self.generate_operation_id()
        operation.timestamp = datetime.utcnow()
        operation.status = OperationStatus.QUEUED

        # Store in local database
        await self.storage.store_operation(operation)
        self.operation_log.append(operation)

        # Try immediate execution if possible
        if await self.can_execute_locally(operation):
            await self.execute_locally(operation)

        return operation.id

    async def replay_operations(self) -> List[OperationResult]:
        """Replay queued operations when connection restored"""
        results = []

        # Sort operations by timestamp for correct ordering
        sorted_ops = sorted(
            await self.storage.get_queued_operations(),
            key=lambda op: op.timestamp
        )

        for operation in sorted_ops:
            try:
                result = await self.execute_remote(operation)

                if result.success:
                    operation.status = OperationStatus.COMPLETED
                    await self.storage.update_operation(operation)
                else:
                    # Handle operation failure
                    await self.handle_operation_failure(operation, result)

                results.append(result)

            except Exception as e:
                logger.error(f"Failed to replay operation {operation.id}: {e}")
                results.append(OperationResult(
                    operation_id=operation.id,
                    success=False,
                    error=str(e)
                ))

        return results

Conflict Detection on Reconnection

When agents reconnect, potential conflicts must be identified and resolved:

Reconnection Sync Protocol

from datetime import datetime

class ReconnectionSyncManager:
    async def handle_reconnection(self) -> SyncStatus:
        """Handle agent reconnection and sync conflicts"""

        # 1. Get last known sync state
        last_sync = await self.get_last_sync_timestamp()

        # 2. Pull server changes since last sync
        server_changes = await self.pull_server_changes(since=last_sync)

        # 3. Get local changes made while offline
        local_changes = await self.get_local_changes(since=last_sync)

        # 4. Detect conflicts between server and local changes
        conflicts = self.detect_conflicts(server_changes, local_changes)

        # Initialize so the summary below is well-defined when there
        # are no conflicts
        resolved_changes = []
        if conflicts:
            # 5. Resolve conflicts using configured strategy
            resolved_changes = await self.resolve_conflicts(conflicts)

            # 6. Apply resolved changes
            await self.apply_resolved_changes(resolved_changes)

        # 7. Push any remaining local changes
        remaining_local = [c for c in local_changes if not c.conflicted]
        if remaining_local:
            await self.push_changes(remaining_local)

        # 8. Update sync state
        await self.update_sync_timestamp(datetime.utcnow())

        return SyncStatus(
            conflicts_detected=len(conflicts),
            conflicts_resolved=len(resolved_changes),
            changes_pushed=len(remaining_local)
        )

Performance Optimization

Batching and Compression

  • Change Batching: Group related changes for efficient transmission
  • Delta Compression: Only send differences between versions
  • Data Compression: GZIP compression for large payloads
  • Deduplication: Remove duplicate operations in queue
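
A sketch of how batching and payload compression might combine on the agent side (the entity_type key and the size threshold are illustrative):

import gzip
import json
from itertools import groupby

COMPRESSION_THRESHOLD = 1024  # bytes; only compress when it pays off

def batch_changes(changes):
    """Group changes by entity type so related rows travel together."""
    keyed = sorted(changes, key=lambda c: c["entity_type"])
    return [list(group) for _, group in groupby(keyed, key=lambda c: c["entity_type"])]

def serialize_batch(batch):
    """Serialize one batch, gzip-compressing large payloads."""
    payload = json.dumps(batch, default=str).encode()
    if len(payload) > COMPRESSION_THRESHOLD:
        return gzip.compress(payload), "gzip"
    return payload, "identity"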

Adaptive Synchronization

Adaptive Sync Intervals

class AdaptiveSyncScheduler:
    def __init__(self):
        self.base_interval = 60  # seconds
        self.max_interval = 3600  # 1 hour
        self.min_interval = 10   # 10 seconds
        self.current_interval = self.base_interval

    def adjust_sync_interval(self, sync_result: SyncResult):
        """Adjust sync frequency based on activity"""

        if sync_result.changes_count > 0:
            # Frequent changes, sync more often
            self.current_interval = max(
                self.min_interval,
                self.current_interval * 0.8
            )
        else:
            # No changes, back off
            self.current_interval = min(
                self.max_interval,
                self.current_interval * 1.2
            )

        # Factor in network conditions
        if sync_result.network_latency > 1000:  # ms
            self.current_interval *= 1.5

        return self.current_interval
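
In use, the scheduler sits inside the agent's sync loop, for example (run_sync is a hypothetical stand-in for the agent's sync invocation):

import asyncio

async def sync_loop(scheduler: AdaptiveSyncScheduler):
    """Drive periodic syncs, letting each result tune the next interval."""
    while True:
        sync_result = await run_sync()  # hypothetical sync call
        interval = scheduler.adjust_sync_interval(sync_result)
        await asyncio.sleep(interval)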

Caching and Memoization

  • Change Log Caching: Cache recent changes for faster conflict detection
  • Hash Caching: Cache Merkle tree hashes for quick comparison
  • Network Response Caching: Cache immutable data locally
  • Computation Memoization: Cache expensive operations like conflict resolution
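
For example, content hashes over immutable chunks are natural memoization candidates; a sketch using functools (the function name is illustrative):

import hashlib
from functools import lru_cache

@lru_cache(maxsize=4096)
def cached_chunk_hash(content: bytes) -> str:
    """Memoized digest; safe because a chunk's bytes never change
    once written, so the same input always yields the same hash."""
    return hashlib.sha256(content).hexdigest()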

Synchronization Monitoring

Key Metrics

Sync Performance

  • Sync frequency and duration
  • Data transfer volume
  • Operation queue lengths
  • Conflict resolution time

Consistency Metrics

  • Time to eventual consistency
  • Conflict occurrence rate
  • Data integrity violations
  • Synchronization lag

Network Health

  • Connection stability
  • Bandwidth utilization
  • Failed sync attempts
  • Retry queue sizes
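
These metrics might be gathered into a simple container for export to whatever monitoring backend is in use; the field names below are illustrative:

from dataclasses import dataclass

@dataclass
class SyncMetrics:
    # Sync performance
    sync_duration_seconds: float = 0.0
    bytes_transferred: int = 0
    operation_queue_length: int = 0
    # Consistency
    conflicts_detected: int = 0
    sync_lag_seconds: float = 0.0
    # Network health
    failed_sync_attempts: int = 0
    retry_queue_size: int = 0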

Health Checks

Sync Health Monitoring

from datetime import datetime

class SyncHealthMonitor:
    async def check_sync_health(self) -> HealthStatus:
        """Comprehensive sync health check"""

        health = HealthStatus()

        # Check sync lag
        last_sync = await self.get_last_successful_sync()
        sync_lag = datetime.utcnow() - last_sync
        health.sync_lag = sync_lag.total_seconds()

        # Check queue sizes
        health.pending_operations = await self.get_queue_size()
        health.failed_operations = await self.get_failed_operations_count()

        # Check conflict rates
        conflicts_24h = await self.get_conflicts_last_24h()
        total_ops_24h = await self.get_total_operations_24h()
        health.conflict_rate = conflicts_24h / max(total_ops_24h, 1)

        # Check data consistency
        consistency_check = await self.verify_data_consistency()
        health.consistency_violations = consistency_check.violations

        # Overall health score
        health.score = self.calculate_health_score(health)

        return health
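
The calculate_health_score method is policy-dependent; one simple weighting, with illustrative thresholds:

    def calculate_health_score(self, health) -> float:
        """Illustrative scoring: start at 1.0 and subtract per symptom."""
        score = 1.0
        if health.sync_lag > 300:             # more than 5 minutes behind
            score -= 0.3
        if health.pending_operations > 1000:  # queue backing up
            score -= 0.2
        if health.conflict_rate > 0.05:       # >5% of operations conflict
            score -= 0.2
        if health.consistency_violations > 0:
            score -= 0.3
        return max(score, 0.0)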

Best Practices

Design Guidelines

  • Idempotency: Design operations to be safely retryable
  • Immutability: Use immutable data structures where possible
  • Event Sourcing: Store events rather than just current state
  • Graceful Degradation: System should work even with sync failures
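
For instance, idempotency is commonly achieved by keying every operation and refusing replays, as in this sketch (the in-memory applied_ids set stands in for a durable store):

import uuid

class IdempotentExecutor:
    """Execute each operation at most once by tracking operation IDs."""

    def __init__(self):
        self.applied_ids = set()  # in production, a durable store

    def new_operation_id(self) -> str:
        return str(uuid.uuid4())

    def execute(self, operation_id: str, apply_fn):
        # A retried operation with a known ID becomes a no-op, so
        # callers can safely retry after a timeout
        if operation_id in self.applied_ids:
            return "already-applied"
        result = apply_fn()
        self.applied_ids.add(operation_id)
        return result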

Operational Guidelines

  • Monitor Sync Health: Set up alerts for sync failures and delays
  • Conflict Analysis: Regularly analyze conflict patterns
  • Capacity Planning: Monitor queue sizes and growth trends
  • Testing: Test sync behavior under various failure scenarios

Next Steps

To learn more about related architectural concepts:

  1. Queue Management: Understand the message queue architecture
  2. Performance Optimization: Learn about system optimization strategies
  3. Retry Logic: Explore retry and resilience patterns
  4. Database Schema: Understand the data model