Data Synchronization
Data synchronization strategies, conflict resolution, and consistency models in SysManage's distributed architecture.
Synchronization Overview
SysManage implements a sophisticated data synchronization system to maintain consistency across distributed components while handling network partitions, agent disconnections, and concurrent updates gracefully.
Core Principles
- Eventually Consistent: System converges to consistency over time
- Deterministic Resolution: Conflicts resolve via last-writer-wins with timestamp ordering
- Partition Tolerant: Continues operation during network issues
- Agent Autonomy: Agents can operate independently when disconnected
Synchronization Architecture
Data Flow and Synchronization Points
┌─────────────────────────────────────────────────────────────────┐
│ SysManage Server │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Sync Engine │ │ Conflict │ │ Version Vector │ │
│ │ (Master) │ │ Resolver │ │ Manager │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Change Log │ │ Delta │ │ PostgreSQL │ │
│ │ Processor │ │ Calculator │ │ WAL Streaming │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│ ▲
Push │ │ Pull
Changes │ │ Requests
▼ │
┌─────────────────────────────────────────────────────────────────┐
│ SysManage Agents │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Local Sync │ │ Change │ │ SQLite WAL │ │
│ │ Manager │ │ Detector │ │ (Local Store) │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
├─────────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌──────────────┐ ┌─────────────────────────┐ │
│ │ Offline │ │ Merge │ │ Incremental │ │
│ │ Queue │ │ Resolver │ │ Backup │ │
│ └─────────────┘ └──────────────┘ └─────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
Consistency Models
Eventual Consistency
SysManage uses eventual consistency with vector clocks for tracking causality:
Vector Clock Implementation
class VectorClock:
    def __init__(self, node_id: str):
        self.node_id = node_id
        self.clock = {node_id: 0}

    def increment(self) -> 'VectorClock':
        """Increment local clock for new event"""
        self.clock[self.node_id] += 1
        return self

    def update(self, other: 'VectorClock') -> 'VectorClock':
        """Update clock when receiving message"""
        for node, timestamp in other.clock.items():
            self.clock[node] = max(
                self.clock.get(node, 0),
                timestamp
            )
        self.increment()
        return self

    def compare(self, other: 'VectorClock') -> str:
        """Compare causality relationship"""
        all_nodes = set(self.clock.keys()) | set(other.clock.keys())
        less_or_equal = all(
            self.clock.get(node, 0) <= other.clock.get(node, 0)
            for node in all_nodes
        )
        greater_or_equal = all(
            self.clock.get(node, 0) >= other.clock.get(node, 0)
            for node in all_nodes
        )
        if less_or_equal and not greater_or_equal:
            return "before"
        elif greater_or_equal and not less_or_equal:
            return "after"
        elif less_or_equal and greater_or_equal:
            return "equal"
        else:
            return "concurrent"
Strong Consistency (Critical Operations)
For critical operations like certificate management and user authentication, SysManage uses strong consistency with two-phase commit:
Two-Phase Commit Protocol
import asyncio

class TwoPhaseCommitCoordinator:
    async def execute_transaction(self, operation: Operation) -> bool:
        """Execute distributed transaction across participants"""
        participants = self.get_participants(operation)
        transaction_id = self.generate_transaction_id()

        # Phase 1: Prepare
        prepare_responses = await asyncio.gather(*[
            participant.prepare(transaction_id, operation)
            for participant in participants
        ])

        if all(response.ready for response in prepare_responses):
            # Phase 2: Commit
            commit_responses = await asyncio.gather(*[
                participant.commit(transaction_id)
                for participant in participants
            ])
            return all(response.success for response in commit_responses)
        else:
            # Abort transaction on any failed prepare vote
            await asyncio.gather(*[
                participant.abort(transaction_id)
                for participant in participants
            ])
            return False
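The coordinator above assumes each participant exposes prepare/commit/abort. A minimal sketch of that interface, where the PrepareResponse and CommitResponse names are illustrative rather than part of the documented API:

from dataclasses import dataclass

@dataclass
class PrepareResponse:
    ready: bool  # participant has locked resources and can commit

@dataclass
class CommitResponse:
    success: bool

class Participant:
    async def prepare(self, transaction_id: str, operation: 'Operation') -> PrepareResponse:
        """Validate and lock resources; vote yes/no without applying the change."""
        ...

    async def commit(self, transaction_id: str) -> CommitResponse:
        """Apply the prepared change and release locks."""
        ...

    async def abort(self, transaction_id: str) -> None:
        """Discard the prepared change and release locks."""
        ...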
Conflict Resolution
Conflict Detection
SysManage detects conflicts using a combination of techniques:
Timestamp-Based
- Last-writer-wins for simple conflicts
- Lamport timestamps for ordering
- Wall clock synchronization with NTP
Content-Based
- Hash comparison for data integrity
- Field-level conflict detection (sketched after this list)
- Semantic conflict identification
Operational Transform
- Transform concurrent operations
- Maintain operation commutativity
- Preserve user intentions
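As a concrete illustration of the content-based techniques above, here is a minimal sketch of field-level conflict detection via hash comparison against a common ancestor; the dict-shaped change data is an assumption borrowed from the merge example below:

import hashlib
import json

def field_hash(value) -> str:
    """Stable hash of a field value for cheap comparison."""
    return hashlib.sha256(
        json.dumps(value, sort_keys=True, default=str).encode()
    ).hexdigest()

def detect_field_conflicts(base: dict, local: dict, remote: dict) -> list:
    """Return fields where local and remote both diverged from the
    common ancestor, and in different ways (a true conflict)."""
    conflicts = []
    for field in set(local) | set(remote):
        base_h = field_hash(base.get(field))
        local_h = field_hash(local.get(field))
        remote_h = field_hash(remote.get(field))
        if local_h != base_h and remote_h != base_h and local_h != remote_h:
            conflicts.append(field)
    return conflicts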
Conflict Resolution Strategies
Configurable Resolution
class ConflictResolver:
    def __init__(self, strategy: ConflictStrategy):
        self.strategy = strategy

    def resolve_conflict(self, local_change: Change,
                         remote_change: Change) -> Change:
        """Resolve conflict between local and remote changes"""
        if self.strategy == ConflictStrategy.LAST_WRITER_WINS:
            return self._last_writer_wins(local_change, remote_change)
        elif self.strategy == ConflictStrategy.MERGE:
            return self._merge_changes(local_change, remote_change)
        elif self.strategy == ConflictStrategy.MANUAL:
            return self._queue_for_manual_resolution(
                local_change, remote_change
            )
        elif self.strategy == ConflictStrategy.CUSTOM:
            return self._apply_custom_rules(local_change, remote_change)
        raise ValueError(f"Unknown conflict strategy: {self.strategy}")

    def _last_writer_wins(self, local: Change, remote: Change) -> Change:
        """Simple timestamp-based resolution"""
        if remote.timestamp > local.timestamp:
            return remote
        elif local.timestamp > remote.timestamp:
            return local
        else:
            # Same timestamp, use node ID for deterministic ordering
            return remote if remote.node_id > local.node_id else local

    def _merge_changes(self, local: Change, remote: Change) -> Change:
        """Intelligent merge of non-conflicting fields"""
        merged_data = {}
        for field in set(local.data.keys()) | set(remote.data.keys()):
            local_val = local.data.get(field)
            remote_val = remote.data.get(field)
            if local_val == remote_val:
                merged_data[field] = local_val
            elif local_val is None:
                merged_data[field] = remote_val
            elif remote_val is None:
                merged_data[field] = local_val
            else:
                # Field-level conflict, apply resolution strategy
                merged_data[field] = self._resolve_field_conflict(
                    field, local_val, remote_val
                )
        return Change(
            id=self.generate_merge_id(local, remote),
            data=merged_data,
            timestamp=max(local.timestamp, remote.timestamp),
            # merge() is assumed to be a non-mutating variant of VectorClock.update()
            vector_clock=local.vector_clock.merge(remote.vector_clock)
        )
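_resolve_field_conflict is referenced above but not shown. A minimal sketch, assuming per-field rules with a last-writer-wins-style default; the field_rules registry is hypothetical:

def _resolve_field_conflict(self, field: str, local_val, remote_val):
    """Resolve a single conflicting field.

    Hypothetical example rule registry mapping field names to merge
    functions, e.g. {"tags": lambda a, b: sorted(set(a) | set(b))}.
    """
    rule = getattr(self, "field_rules", {}).get(field)
    if rule is not None:
        return rule(local_val, remote_val)
    # Default: prefer the remote (server-side) value
    return remote_val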
Synchronization Protocols
Push-Based Synchronization
Agents push changes to the server immediately when connected:
Push Protocol Implementation
import logging
from typing import List

logger = logging.getLogger(__name__)

class PushSyncManager:
    async def sync_changes(self, changes: List[Change]) -> SyncResult:
        """Push local changes to server"""
        # Batch changes by type for efficiency
        batched_changes = self.batch_changes(changes)
        results = []
        for batch in batched_changes:
            try:
                # Send batch with retry logic
                response = await self.send_with_retry(
                    endpoint="/api/sync/push",
                    data=batch.serialize(),
                    max_retries=3
                )
                if response.conflicts:
                    # Handle conflicts returned by server
                    resolved_changes = await self.resolve_conflicts(
                        response.conflicts
                    )
                    results.extend(resolved_changes)
                else:
                    results.extend(response.accepted_changes)
            except NetworkError as e:
                # Queue for later when connection restored
                await self.queue_for_retry(batch)
                logger.warning(f"Sync failed, queued for retry: {e}")
        return SyncResult(
            successful=len([r for r in results if r.success]),
            failed=len([r for r in results if not r.success]),
            conflicts=len([r for r in results if r.conflict])
        )
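send_with_retry is assumed above. A minimal sketch of such a method with exponential backoff and jitter; the aiohttp usage and self.base_url attribute are illustrative, not documented dependencies:

import asyncio
import random

import aiohttp

async def send_with_retry(self, endpoint: str, data: dict,
                          max_retries: int = 3) -> dict:
    """POST a batch to the server, backing off exponentially on failure."""
    delay = 1.0
    for attempt in range(max_retries + 1):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(self.base_url + endpoint, json=data) as resp:
                    resp.raise_for_status()
                    return await resp.json()
        except aiohttp.ClientError:
            if attempt == max_retries:
                raise
            # Exponential backoff with jitter to avoid thundering herds
            await asyncio.sleep(delay + random.uniform(0, delay / 2))
            delay *= 2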
Pull-Based Synchronization
Periodic pull synchronization to catch missed updates:
Pull Protocol with Delta Sync
from datetime import datetime
from typing import List

class PullSyncManager:
    BATCH_SIZE = 500  # changes fetched per request

    async def pull_updates(self, since: datetime) -> List[Change]:
        """Pull updates from server since timestamp"""
        # Get incremental changes using server cursor
        cursor = await self.get_sync_cursor(since)
        changes = []
        while cursor.has_more:
            batch = await self.fetch_change_batch(
                cursor=cursor.position,
                limit=self.BATCH_SIZE
            )
            for change in batch.changes:
                # Apply conflict resolution
                if await self.has_local_conflict(change):
                    resolved_change = await self.resolve_pull_conflict(change)
                    changes.append(resolved_change)
                else:
                    changes.append(change)
            cursor = batch.next_cursor
        # Apply changes atomically
        await self.apply_changes_batch(changes)
        return changes
Merkle Tree Synchronization
For efficient bulk synchronization and verification:
Merkle Tree Sync
from typing import Any, List

class MerkleTreeSync:
    def __init__(self, chunk_size: int = 1000):
        self.chunk_size = chunk_size

    def build_merkle_tree(self, data: List[Any]) -> MerkleNode:
        """Build Merkle tree for data set"""
        leaves = []
        # Create leaf nodes in chunks
        for i in range(0, len(data), self.chunk_size):
            chunk = data[i:i + self.chunk_size]
            chunk_hash = self.hash_chunk(chunk)
            leaves.append(MerkleLeaf(chunk_hash, chunk))
        # Build tree bottom-up
        return self.build_tree_recursive(leaves)

    async def sync_with_merkle(self, remote_root_hash: str) -> List[Change]:
        """Synchronize using Merkle tree comparison"""
        local_tree = self.build_merkle_tree(await self.get_local_data())
        if local_tree.hash == remote_root_hash:
            return []  # Already in sync
        # Find differing subtrees
        diff_ranges = await self.find_differences(
            local_tree, remote_root_hash
        )
        # Pull only the differing data
        changes = []
        for range_start, range_end in diff_ranges:
            range_changes = await self.pull_range(range_start, range_end)
            changes.extend(range_changes)
        return changes
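build_tree_recursive and hash_chunk are assumed above. A minimal sketch of the bottom-up construction; the MerkleNode shape here is illustrative (a leaf would additionally carry its chunk, as MerkleLeaf does above):

import hashlib
import json
from dataclasses import dataclass
from typing import Any, List, Optional

@dataclass
class MerkleNode:
    hash: str
    left: Optional['MerkleNode'] = None
    right: Optional['MerkleNode'] = None

def hash_chunk(chunk: List[Any]) -> str:
    """Canonical-JSON hash of one chunk of records."""
    return hashlib.sha256(
        json.dumps(chunk, sort_keys=True, default=str).encode()
    ).hexdigest()

def build_tree_recursive(nodes: List[MerkleNode]) -> MerkleNode:
    """Pair up nodes level by level until a single root remains."""
    if len(nodes) == 1:
        return nodes[0]
    parents = []
    for i in range(0, len(nodes), 2):
        left = nodes[i]
        right = nodes[i + 1] if i + 1 < len(nodes) else left  # duplicate odd tail
        parent_hash = hashlib.sha256((left.hash + right.hash).encode()).hexdigest()
        parents.append(MerkleNode(parent_hash, left, right))
    return build_tree_recursive(parents)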
Offline Operations
Local State Management
Agents maintain local state and operation queues during disconnection:
Offline Queue Management
import logging
from datetime import datetime
from typing import List

logger = logging.getLogger(__name__)

class OfflineOperationQueue:
    def __init__(self, storage: LocalStorage):
        self.storage = storage
        self.operation_log = []

    async def queue_operation(self, operation: Operation) -> str:
        """Queue operation for later execution"""
        operation.id = self.generate_operation_id()
        operation.timestamp = datetime.utcnow()
        operation.status = OperationStatus.QUEUED
        # Store in local database
        await self.storage.store_operation(operation)
        self.operation_log.append(operation)
        # Try immediate execution if possible
        if await self.can_execute_locally(operation):
            await self.execute_locally(operation)
        return operation.id

    async def replay_operations(self) -> List[OperationResult]:
        """Replay queued operations when connection restored"""
        results = []
        # Sort operations by timestamp for correct ordering
        sorted_ops = sorted(
            await self.storage.get_queued_operations(),
            key=lambda op: op.timestamp
        )
        for operation in sorted_ops:
            try:
                result = await self.execute_remote(operation)
                if result.success:
                    operation.status = OperationStatus.COMPLETED
                    await self.storage.update_operation(operation)
                else:
                    # Handle operation failure
                    await self.handle_operation_failure(operation, result)
                results.append(result)
            except Exception as e:
                logger.error(f"Failed to replay operation {operation.id}: {e}")
                results.append(OperationResult(
                    operation_id=operation.id,
                    success=False,
                    error=str(e)
                ))
        return results
Conflict Detection on Reconnection
When agents reconnect, potential conflicts must be identified and resolved:
Reconnection Sync Protocol
from datetime import datetime

class ReconnectionSyncManager:
    async def handle_reconnection(self) -> SyncStatus:
        """Handle agent reconnection and sync conflicts"""
        # 1. Get last known sync state
        last_sync = await self.get_last_sync_timestamp()

        # 2. Pull server changes since last sync
        server_changes = await self.pull_server_changes(since=last_sync)

        # 3. Get local changes made while offline
        local_changes = await self.get_local_changes(since=last_sync)

        # 4. Detect conflicts between server and local changes
        conflicts = self.detect_conflicts(server_changes, local_changes)

        resolved_changes = []
        if conflicts:
            # 5. Resolve conflicts using configured strategy
            resolved_changes = await self.resolve_conflicts(conflicts)
            # 6. Apply resolved changes
            await self.apply_resolved_changes(resolved_changes)

        # 7. Push any remaining local changes
        remaining_local = [c for c in local_changes if not c.conflicted]
        if remaining_local:
            await self.push_changes(remaining_local)

        # 8. Update sync state
        await self.update_sync_timestamp(datetime.utcnow())

        return SyncStatus(
            conflicts_detected=len(conflicts),
            conflicts_resolved=len(resolved_changes),
            changes_pushed=len(remaining_local)
        )
Performance Optimization
Batching and Compression
- Change Batching: Group related changes for efficient transmission
- Delta Compression: Only send differences between versions (see the sketch after this list)
- Data Compression: GZIP compression for large payloads
- Deduplication: Remove duplicate operations in queue
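A minimal sketch of batching plus GZIP-compressed payloads, assuming JSON-serializable change dicts; the payload shape is illustrative, not the wire format:

import gzip
import json
from typing import Iterator, List

def batch_changes(changes: List[dict], batch_size: int = 500) -> Iterator[List[dict]]:
    """Group changes into fixed-size batches for transmission."""
    for i in range(0, len(changes), batch_size):
        yield changes[i:i + batch_size]

def compress_batch(batch: List[dict]) -> bytes:
    """Serialize a batch and GZIP it; worthwhile for payloads over ~1 KB."""
    raw = json.dumps(batch, sort_keys=True, default=str).encode()
    return gzip.compress(raw)

def decompress_batch(payload: bytes) -> List[dict]:
    """Inverse of compress_batch."""
    return json.loads(gzip.decompress(payload))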
Adaptive Synchronization
Adaptive Sync Intervals
class AdaptiveSyncScheduler:
    def __init__(self):
        self.base_interval = 60    # seconds
        self.max_interval = 3600   # 1 hour
        self.min_interval = 10     # 10 seconds
        self.current_interval = self.base_interval

    def adjust_sync_interval(self, sync_result: SyncResult) -> float:
        """Adjust sync frequency based on activity"""
        if sync_result.changes_count > 0:
            # Frequent changes, sync more often
            self.current_interval = max(
                self.min_interval,
                self.current_interval * 0.8
            )
        else:
            # No changes, back off
            self.current_interval = min(
                self.max_interval,
                self.current_interval * 1.2
            )
        # Factor in network conditions, still respecting the cap
        if sync_result.network_latency > 1000:  # ms
            self.current_interval = min(
                self.max_interval,
                self.current_interval * 1.5
            )
        return self.current_interval
Caching and Memoization
- Change Log Caching: Cache recent changes for faster conflict detection
- Hash Caching: Cache Merkle tree hashes for quick comparison (see the sketch after this list)
- Network Response Caching: Cache immutable data locally
- Computation Memoization: Cache expensive operations like conflict resolution
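A minimal memoization sketch using functools.lru_cache for Merkle chunk hashes. The bytes-keyed signature is an assumption: lru_cache needs hashable arguments, so callers would serialize each chunk (e.g. to canonical JSON) first:

import hashlib
from functools import lru_cache

@lru_cache(maxsize=4096)
def cached_chunk_hash(chunk_key: bytes) -> str:
    """Memoized chunk hash; repeated syncs over unchanged chunks hit the cache."""
    return hashlib.sha256(chunk_key).hexdigest()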
Synchronization Monitoring
Key Metrics
Sync Performance
- Sync frequency and duration
- Data transfer volume
- Operation queue lengths
- Conflict resolution time
Consistency Metrics
- Time to eventual consistency
- Conflict occurrence rate
- Data integrity violations
- Synchronization lag
Network Health
- Connection stability
- Bandwidth utilization
- Failed sync attempts
- Retry queue sizes
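A minimal sketch of exporting a few of these metrics with the prometheus_client library; the metric names are illustrative, not SysManage's actual metric names:

from prometheus_client import Counter, Gauge, Histogram, start_http_server

SYNC_DURATION = Histogram("sync_duration_seconds", "Duration of sync cycles")
SYNC_FAILURES = Counter("sync_failures_total", "Failed sync attempts")
CONFLICTS = Counter("sync_conflicts_total", "Conflicts detected during sync")
QUEUE_SIZE = Gauge("sync_queue_length", "Pending operations in the offline queue")

def record_sync(duration_s: float, failed: bool, conflicts: int, queue_len: int):
    SYNC_DURATION.observe(duration_s)
    if failed:
        SYNC_FAILURES.inc()
    CONFLICTS.inc(conflicts)
    QUEUE_SIZE.set(queue_len)

# Expose metrics for scraping, e.g. on port 9100
start_http_server(9100)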
Health Checks
Sync Health Monitoring
from datetime import datetime

class SyncHealthMonitor:
    async def check_sync_health(self) -> HealthStatus:
        """Comprehensive sync health check"""
        health = HealthStatus()

        # Check sync lag
        last_sync = await self.get_last_successful_sync()
        sync_lag = datetime.utcnow() - last_sync
        health.sync_lag = sync_lag.total_seconds()

        # Check queue sizes
        health.pending_operations = await self.get_queue_size()
        health.failed_operations = await self.get_failed_operations_count()

        # Check conflict rates
        conflicts_24h = await self.get_conflicts_last_24h()
        total_ops_24h = await self.get_total_operations_24h()
        health.conflict_rate = conflicts_24h / max(total_ops_24h, 1)

        # Check data consistency
        consistency_check = await self.verify_data_consistency()
        health.consistency_violations = consistency_check.violations

        # Overall health score
        health.score = self.calculate_health_score(health)
        return health
Best Practices
Design Guidelines
- Idempotency: Design operations to be safely retryable (see the sketch after this list)
- Immutability: Use immutable data structures where possible
- Event Sourcing: Store events rather than just current state
- Graceful Degradation: System should work even with sync failures
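A minimal sketch of the idempotency guideline, using a client-generated operation ID so replayed operations are applied at most once; the IdempotentExecutor name and the store's get/put interface are assumptions:

import uuid

class IdempotentExecutor:
    """Executes each operation at most once, keyed by a stable operation ID."""

    def __init__(self, store):
        self.store = store  # assumed to offer async get(key) / put(key, value)

    async def execute(self, operation) -> 'OperationResult':
        # Reuse an existing ID on retry; generate one only for a new operation
        op_id = getattr(operation, "id", None) or str(uuid.uuid4())
        operation.id = op_id

        # If a replayed operation already ran, return the recorded result
        previous = await self.store.get(op_id)
        if previous is not None:
            return previous

        result = await self.apply(operation)
        await self.store.put(op_id, result)  # record before acknowledging
        return result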
Operational Guidelines
- Monitor Sync Health: Set up alerts for sync failures and delays
- Conflict Analysis: Regularly analyze conflict patterns
- Capacity Planning: Monitor queue sizes and growth trends
- Testing: Test sync behavior under various failure scenarios