Chapter 5: Real-Time Synchronization
Building a collaborative AI coding assistant requires keeping multiple clients synchronized in real-time. When one developer makes changes, their teammates need to see updates immediately. But unlike traditional real-time applications, AI assistants face unique challenges: long-running operations, large payloads, unreliable networks, and the need for eventual consistency.
This chapter explores synchronization patterns using polling, observables, and smart batching that prove more reliable than traditional WebSocket approaches for AI systems.
The Synchronization Challenge
Real-time sync for AI assistants differs from typical collaborative applications:
- Large Payloads - AI responses can be megabytes of text and code
- Long Operations - Tool executions may take minutes to complete
- Unreliable Networks - Developers work from cafes, trains, and flaky WiFi
- Cost Sensitivity - Every sync operation costs money in API calls
- Consistency Requirements - Code changes must apply in the correct order
Traditional WebSocket approaches struggle with these constraints. Amp takes a different path.
WebSocket Challenges for AI Systems
WebSockets seem ideal for real-time synchronization, but AI workloads introduce operational characteristics that make persistent connections problematic.
Recognition Pattern: WebSockets become problematic when:
- Clients frequently disconnect (mobile networks, laptop sleep)
- Message sizes vary dramatically (small updates vs. large AI responses)
- Operations have long durations (multi-minute tool executions)
- Debugging requires message replay and inspection
WebSocket Complications:
- Stateful connections require careful lifecycle management
- Message ordering must be handled explicitly for correctness
- Reconnection storms can overwhelm servers during outages
- Debugging is difficult without proper message logging
- Load balancing requires sticky sessions or complex routing
- Firewall issues in enterprise environments
Alternative Approach: Smart polling with observables provides:
- Stateless interactions that survive network interruptions
- Natural batching that reduces server load
- Simple debugging with standard HTTP request logs
- Easy caching and CDN compatibility
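To make the alternative concrete, here is a minimal sketch of stateless polling with a version cursor (an illustrative model, not Amp's actual API): each poll carries the last version the client has seen, the server returns only newer changes, and a dropped request can simply be retried because no connection state exists.

```typescript
// Hypothetical sketch: stateless polling with a version cursor.
interface Change { version: number; payload: string }

class VersionedStore {
  private changes: Change[] = [];
  private version = 0;

  append(payload: string): void {
    this.changes.push({ version: ++this.version, payload });
  }

  // Stateless read: return everything after the client's cursor,
  // plus the new cursor for the next poll
  changesSince(cursor: number): { changes: Change[]; cursor: number } {
    return {
      changes: this.changes.filter(c => c.version > cursor),
      cursor: this.version,
    };
  }
}

const store = new VersionedStore();
store.append("edit A");
store.append("edit B");

// First poll from a fresh client (cursor 0) sees both changes
const first = store.changesSince(0);
// A repeated poll with the updated cursor sees nothing new
const second = store.changesSince(first.cursor);
```

Because each poll is an ordinary idempotent read, retries after a network failure are safe, and standard HTTP caching and logging apply.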
Observable-Based Architecture
At the heart of Amp's sync system is a custom Observable implementation:
export class Observable<T> {
// Concrete rather than abstract: `Observable.from` below calls `new Observable(...)`
// with a producer function, so the constructor accepts one (optional for subclasses
// that override subscribe themselves)
constructor(private producer?: (observer: Observer<T>) => (() => void) | void) {}
subscribe(observer: Observer<T>): Subscription<T> {
const teardown = this.producer?.(observer);
return { unsubscribe: () => { if (teardown) teardown(); } };
}
pipe<Out>(...operators: Operator[]): Observable<Out> {
return operators.reduce(
(source, operator) => operator(source),
this as Observable<any>
);
}
// Convert various sources to Observables
static from<T>(source: ObservableLike<T>): Observable<T> {
if (source instanceof Observable) return source;
if (isPromise(source)) {
return new Observable(observer => {
source.then(
value => {
observer.next(value);
observer.complete();
},
error => observer.error(error)
);
});
}
if (isIterable(source)) {
return new Observable(observer => {
for (const value of source) {
observer.next(value);
}
observer.complete();
});
}
throw new Error('Invalid source');
}
}
This provides a foundation for reactive data flow throughout the system.
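The flow becomes clearer with a small usage example. The following self-contained miniature (illustrative only, not Amp's class) shows the same shape: a constructor-function observable, `from` for iterables, and a `map` operator applied through `pipe`.

```typescript
// Miniature observable with `from`, `pipe`, and a `map` operator (illustrative).
type Observer<T> = { next(v: T): void; complete(): void };
type Operator<In, Out> = (src: Obs<In>) => Obs<Out>;

class Obs<T> {
  constructor(private producer: (observer: Observer<T>) => void) {}
  subscribe(observer: Observer<T>): void { this.producer(observer); }
  pipe<Out>(op: Operator<T, Out>): Obs<Out> { return op(this); }

  static from<T>(source: Iterable<T>): Obs<T> {
    return new Obs<T>(observer => {
      for (const value of source) observer.next(value);
      observer.complete();
    });
  }
}

// An operator is just a function from one observable to another
function map<In, Out>(fn: (v: In) => Out): Operator<In, Out> {
  return src => new Obs<Out>(observer =>
    src.subscribe({
      next: v => observer.next(fn(v)),
      complete: () => observer.complete(),
    }));
}

const results: number[] = [];
Obs.from([1, 2, 3])
  .pipe(map(n => n * 10))
  .subscribe({ next: v => results.push(v), complete: () => {} });
```

The reduce inside `pipe` in the full implementation generalizes this to any number of chained operators.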
Subjects for State Broadcasting
Amp uses specialized Subject types for different synchronization needs:
// BehaviorSubject maintains current state
export class BehaviorSubject<T> extends Observable<T> {
// Track subscribers explicitly so next() can broadcast to them
private observers = new Set<Observer<T>>();
constructor(private currentValue: T) {
super();
}
getValue(): T {
return this.currentValue;
}
next(value: T): void {
this.currentValue = value;
this.observers.forEach(observer => observer.next(value));
}
subscribe(observer: Observer<T>): Subscription<T> {
// New subscribers immediately receive current value
observer.next(this.currentValue);
this.observers.add(observer);
return { unsubscribe: () => this.observers.delete(observer) };
}
complete(): void {
this.observers.forEach(observer => observer.complete?.());
this.observers.clear();
}
asObservable(): Observable<T> {
return this;
}
}
// SetSubject for managing collections
export function createSetSubject<T>(): SetSubject<T> {
const set = new Set<T>();
// Emit defensive copies so subscribers never observe in-place mutation
const subject = new BehaviorSubject<Set<T>>(new Set(set));
return {
add(value: T): void {
set.add(value);
subject.next(new Set(set));
},
delete(value: T): void {
set.delete(value);
subject.next(new Set(set));
},
has(value: T): boolean {
return set.has(value);
},
clear(): void {
set.clear();
subject.next(new Set(set));
},
get size(): number {
return set.size;
},
// Read-only view of the underlying set for synchronous access
get set(): ReadonlySet<T> {
return set;
},
observable: subject.asObservable()
};
}
These patterns enable efficient state synchronization across components.
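The property that makes BehaviorSubject useful here is that a late subscriber immediately receives the current value. A minimal standalone sketch (illustrative, not Amp's implementation) demonstrates it:

```typescript
// Minimal behavior-subject: late subscribers replay the current value.
class MiniBehaviorSubject<T> {
  private observers = new Set<(v: T) => void>();
  constructor(private current: T) {}

  getValue(): T { return this.current; }

  next(value: T): void {
    this.current = value;
    this.observers.forEach(fn => fn(value));
  }

  subscribe(fn: (v: T) => void): () => void {
    fn(this.current);              // replay current state immediately
    this.observers.add(fn);
    return () => this.observers.delete(fn);
  }
}

const status = new MiniBehaviorSubject<string>("idle");
const seen: string[] = [];
status.next("syncing");            // no subscribers yet - state still updates
const unsub = status.subscribe(v => seen.push(v)); // receives "syncing" at once
status.next("synced");
unsub();
status.next("idle");               // not delivered after unsubscribe
```

A UI component that mounts mid-sync therefore renders the correct status without any extra "fetch current state" step.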
Sync Service Architecture
Amp's synchronization system provides observable streams and queue management:
// Core synchronization interface
export interface SyncService {
// Observable data streams
observeSyncStatus(threadId: ThreadID): Observable<SyncStatus>;
observePendingItems(): Observable<Set<ThreadID>>;
// Sync operations
queueForSync(threadId: ThreadID): void;
syncImmediately(threadId: ThreadID): Promise<void>;
// Service lifecycle
start(): void;
stop(): void;
dispose(): void;
}
// Factory function creates configured sync service
export function createSyncService(dependencies: {
threadService: ThreadService;
cloudAPI: CloudAPIClient;
configuration: ConfigService;
}): SyncService {
// Track items waiting for synchronization
const pendingItems = createSetSubject<ThreadID>();
// Per-thread sync status tracking
const statusTracking = new Map<ThreadID, BehaviorSubject<SyncStatus>>();
// Failure tracking for exponential backoff
const failureHistory = new Map<ThreadID, number>();
// Configurable sync parameters
const SYNC_INTERVAL = 5000; // 5 seconds
const RETRY_BACKOFF = 60000; // 1 minute
const BATCH_SIZE = 50; // Items per batch
let syncTimer: NodeJS.Timeout | null = null;
let serviceRunning = false;
return {
observeSyncStatus(threadId: ThreadID): Observable<SyncStatus> {
if (!statusTracking.has(threadId)) {
statusTracking.set(threadId, new BehaviorSubject<SyncStatus>({
state: 'unknown',
lastSync: null
}));
}
return statusTracking.get(threadId)!.asObservable();
},
observePendingItems(): Observable<Set<ThreadID>> {
return pendingItems.observable;
},
queueForSync(threadId: ThreadID): void {
pendingItems.add(threadId);
updateSyncStatus(threadId, { state: 'pending' });
},
async syncImmediately(threadId: ThreadID): Promise<void> {
// Bypass queue for high-priority sync
await performThreadSync(threadId);
},
start(): void {
if (serviceRunning) return;
serviceRunning = true;
// Begin periodic sync processing
scheduleSyncLoop();
// Set up reactive change detection
setupChangeListeners();
},
stop(): void {
serviceRunning = false;
if (syncTimer) {
clearTimeout(syncTimer);
syncTimer = null;
}
},
dispose(): void {
this.stop();
statusTracking.forEach(subject => subject.complete());
statusTracking.clear();
}
};
function scheduleSyncLoop(): void {
if (!serviceRunning) return;
syncTimer = setTimeout(async () => {
await processQueuedItems();
scheduleSyncLoop();
}, SYNC_INTERVAL);
}
async function processQueuedItems(): Promise<void> {
const queuedThreads = Array.from(pendingItems.set);
if (queuedThreads.length === 0) return;
// Filter items ready for sync (respecting backoff)
const readyItems = queuedThreads.filter(shouldAttemptSync);
if (readyItems.length === 0) return;
// Process in manageable batches
for (let i = 0; i < readyItems.length; i += BATCH_SIZE) {
const batch = readyItems.slice(i, i + BATCH_SIZE);
await processBatch(batch);
}
}
function shouldAttemptSync(threadId: ThreadID): boolean {
const lastFailure = failureHistory.get(threadId);
if (!lastFailure) return true;
const timeSinceFailure = Date.now() - lastFailure;
return timeSinceFailure >= RETRY_BACKOFF;
}
}
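The backoff gate at the end of the service is easiest to see in isolation. Here it is extracted as a pure function with the same constant as in the text (an illustrative sketch):

```typescript
// Backoff gate: a thread is ready to sync if it never failed,
// or if its last failure was at least RETRY_BACKOFF ago.
const RETRY_BACKOFF = 60_000; // 1 minute

function shouldAttemptSync(
  lastFailureAt: number | undefined,
  now: number,
): boolean {
  if (lastFailureAt === undefined) return true;
  return now - lastFailureAt >= RETRY_BACKOFF;
}

const now = 1_000_000;
const neverFailed = shouldAttemptSync(undefined, now);     // ready
const justFailed = shouldAttemptSync(now - 5_000, now);    // still backing off
const longAgo = shouldAttemptSync(now - 120_000, now);     // backoff expired
```

Filtering the queue through this gate before each batch prevents a persistently failing thread from consuming every sync cycle.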
Adaptive Polling Strategy
Instead of fixed-interval polling, Amp adapts to user activity:
// Dynamically adjusts polling frequency based on activity
export class AdaptivePoller {
private baseInterval = 5000; // 5 seconds baseline
private maxInterval = 60000; // 1 minute maximum
private currentInterval = this.baseInterval;
private activityLevel = 0;
constructor(
private syncService: SyncService,
private threadService: ThreadService
) {
this.setupActivityMonitoring();
}
private setupActivityMonitoring(): void {
// Monitor thread modifications for user activity
this.threadService.observeActiveThread().pipe(
pairwise(),
filter(([previous, current]) => previous?.v !== current?.v),
tap(() => this.recordUserActivity())
).subscribe();
// Monitor sync queue depth to adjust frequency
this.syncService.observePendingItems().pipe(
map(pending => pending.size),
tap(queueDepth => {
if (queueDepth > 10) this.increaseSyncFrequency();
if (queueDepth === 0) this.decreaseSyncFrequency();
})
).subscribe();
}
private recordUserActivity(): void {
this.activityLevel = Math.min(100, this.activityLevel + 10);
this.adjustPollingInterval();
}
private adjustPollingInterval(): void {
// Higher activity shrinks the interval; the 0.8 factor keeps the floor
// above baseInterval even at maximum activity
const scaleFactor = 1 - (this.activityLevel / 100) * 0.8;
this.currentInterval = Math.floor(
this.baseInterval + (this.maxInterval - this.baseInterval) * scaleFactor
);
// Schedule activity decay for gradual slow-down
this.scheduleActivityDecay();
}
private scheduleActivityDecay(): void {
if (this.activityLevel === 0) return; // fully decayed - stop rescheduling
setTimeout(() => {
this.activityLevel = Math.max(0, this.activityLevel - 1);
this.adjustPollingInterval();
}, 1000);
}
getCurrentInterval(): number {
return this.currentInterval;
}
}
Debouncing and Throttling
Amp implements sophisticated flow control to prevent overwhelming the system:
// Debounce rapid changes
export function debounceTime<T>(
duration: number
): OperatorFunction<T, T> {
return (source: Observable<T>) =>
new Observable<T>(observer => {
let timeoutId: NodeJS.Timeout | null = null;
let lastValue: T;
let hasValue = false;
const subscription = source.subscribe({
next(value: T) {
lastValue = value;
hasValue = true;
if (timeoutId) {
clearTimeout(timeoutId);
}
timeoutId = setTimeout(() => {
if (hasValue) {
observer.next(lastValue);
hasValue = false;
}
timeoutId = null;
}, duration);
},
error(err) {
observer.error(err);
},
complete() {
if (timeoutId) {
clearTimeout(timeoutId);
if (hasValue) {
observer.next(lastValue);
}
}
observer.complete();
}
});
return () => {
if (timeoutId) {
clearTimeout(timeoutId);
}
subscription.unsubscribe();
};
});
}
// Throttle with leading and trailing edges
export function throttleTime<T>(
duration: number,
{ leading = true, trailing = true } = {}
): OperatorFunction<T, T> {
return (source: Observable<T>) =>
new Observable<T>(observer => {
let lastEmitTime = 0;
let trailingTimeout: NodeJS.Timeout | null = null;
let lastValue: T;
let hasTrailingValue = false;
const emit = (value: T) => {
lastEmitTime = Date.now();
hasTrailingValue = false;
observer.next(value);
};
const subscription = source.subscribe({
next(value: T) {
const now = Date.now();
const elapsed = now - lastEmitTime;
lastValue = value;
if (elapsed >= duration) {
// Enough time has passed
if (leading) {
emit(value);
}
if (trailing && !leading) {
// Schedule trailing emit
hasTrailingValue = true;
trailingTimeout = setTimeout(() => {
if (hasTrailingValue) {
emit(lastValue);
}
trailingTimeout = null;
}, duration);
}
} else {
// Still within throttle window
if (trailing && !trailingTimeout) {
hasTrailingValue = true;
trailingTimeout = setTimeout(() => {
if (hasTrailingValue) {
emit(lastValue);
}
trailingTimeout = null;
}, duration - elapsed);
}
}
},
error(err) {
observer.error(err);
},
complete() {
// Emit any pending trailing value before completing
if (trailingTimeout && hasTrailingValue) {
clearTimeout(trailingTimeout);
observer.next(lastValue);
}
observer.complete();
}
});
return () => {
if (trailingTimeout) {
clearTimeout(trailingTimeout);
}
subscription.unsubscribe();
};
});
}
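To build intuition for the debounce semantics without real timers, the behavior can be modeled as a pure function over timestamped events: a value survives only if no newer value arrives within the debounce window. (This simulator is an illustrative aid, not Amp's code.)

```typescript
// Pure simulation of trailing-edge debounce: given (timestamp, value)
// events in order, return the values that would actually be emitted.
function simulateDebounce<T>(
  events: Array<[number, T]>,
  duration: number,
): T[] {
  const emitted: T[] = [];
  for (let i = 0; i < events.length; i++) {
    const [t, value] = events[i];
    const next = events[i + 1];
    // A value is emitted only if no newer value arrives within `duration`
    if (!next || next[0] - t >= duration) emitted.push(value);
  }
  return emitted;
}

// Keystrokes at 0ms, 50ms, 80ms, then a pause, then one final edit at 500ms
const emitted = simulateDebounce(
  [[0, "a"], [50, "ab"], [80, "abc"], [500, "abcd"]],
  200,
);
```

Only "abc" (the value standing when the pause begins) and the final "abcd" survive; the rapid intermediate keystrokes never trigger a sync.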
Batch Synchronization
Amp groups sync operations for network efficiency:
// Collects individual sync requests into efficient batches
export class BatchSyncOrchestrator {
private requestQueue = new Map<ThreadID, SyncRequest>();
private batchTimer: NodeJS.Timeout | null = null;
private readonly BATCH_WINDOW = 100; // 100ms collection window
private readonly MAX_BATCH_SIZE = 50; // Maximum items per batch
constructor(private cloudAPI: CloudAPIClient) {}
queueRequest(threadId: ThreadID, request: SyncRequest): void {
// Merge with any existing request for same thread
const existing = this.requestQueue.get(threadId);
if (existing) {
request = this.mergeRequests(existing, request);
}
this.requestQueue.set(threadId, request);
// Start batch timer if not already running
if (!this.batchTimer) {
this.batchTimer = setTimeout(() => {
this.flushBatch();
}, this.BATCH_WINDOW);
}
}
private async flushBatch(): Promise<void> {
this.batchTimer = null;
if (this.requestQueue.size === 0) return;
// Extract batch of requests up to size limit
const batchEntries = Array.from(this.requestQueue.entries())
.slice(0, this.MAX_BATCH_SIZE);
// Remove processed items from queue
batchEntries.forEach(([id]) => this.requestQueue.delete(id));
// Format batch request for API
const batchRequest: BatchSyncRequest = {
items: batchEntries.map(([id, request]) => ({
threadId: id,
version: request.version,
changes: request.operations
}))
};
try {
const response = await this.cloudAPI.syncBatch(batchRequest);
this.handleBatchResponse(response);
} catch (error) {
// Re-queue failed requests; give up after three attempts
batchEntries.forEach(([id, request]) => {
request.attempts = (request.attempts || 0) + 1;
if (request.attempts < 3) {
this.queueRequest(id, request);
}
});
}
// Continue processing if more items queued
if (this.requestQueue.size > 0) {
this.batchTimer = setTimeout(() => {
this.flushBatch();
}, this.BATCH_WINDOW);
}
}
private mergeRequests(
existing: SyncRequest,
incoming: SyncRequest
): SyncRequest {
return {
version: Math.max(existing.version, incoming.version),
operations: [...existing.operations, ...incoming.operations],
attempts: existing.attempts || 0
};
}
}
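The merge step deserves a closer look: two queued requests for the same thread collapse into one, keeping the highest version and all operations in arrival order. A standalone copy with simplified types (illustrative):

```typescript
// Merge two sync requests for the same thread into one batch entry.
interface SyncRequest {
  version: number;
  operations: string[];
  attempts?: number;
}

function mergeRequests(existing: SyncRequest, incoming: SyncRequest): SyncRequest {
  return {
    version: Math.max(existing.version, incoming.version),
    operations: [...existing.operations, ...incoming.operations],
    attempts: existing.attempts ?? 0,
  };
}

const merged = mergeRequests(
  { version: 3, operations: ["add-message"], attempts: 1 },
  { version: 5, operations: ["update-title"] },
);
```

Merging before transmission means a thread edited five times inside the batch window still costs a single network round trip.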
Conflict Resolution
When concurrent edits occur, Amp resolves conflicts intelligently:
export class ConflictResolver {
async resolveConflict(
local: Thread,
remote: Thread,
base?: Thread
): Promise<Thread> {
// Simple case: one side didn't change
if (!base) {
return this.resolveWithoutBase(local, remote);
}
// Three-way merge
const merged: Thread = {
id: local.id,
created: base.created,
v: Math.max(local.v, remote.v) + 1,
messages: await this.mergeMessages(
base.messages,
local.messages,
remote.messages
),
title: this.mergeScalar(base.title, local.title, remote.title),
env: base.env
};
return merged;
}
private async mergeMessages(
base: Message[],
local: Message[],
remote: Message[]
): Promise<Message[]> {
// Find divergence point
let commonIndex = 0;
while (
commonIndex < base.length &&
commonIndex < local.length &&
commonIndex < remote.length &&
this.messagesEqual(
base[commonIndex],
local[commonIndex],
remote[commonIndex]
)
) {
commonIndex++;
}
// Common prefix
const merged = base.slice(0, commonIndex);
// Get new messages from each branch
const localNew = local.slice(commonIndex);
const remoteNew = remote.slice(commonIndex);
// Merge by timestamp
const allNew = [...localNew, ...remoteNew].sort(
(a, b) => a.timestamp - b.timestamp
);
// Remove duplicates
const seen = new Set<string>();
for (const msg of allNew) {
const key = this.messageKey(msg);
if (!seen.has(key)) {
seen.add(key);
merged.push(msg);
}
}
return merged;
}
private messageKey(msg: Message): string {
// Create unique key for deduplication
return `${msg.role}:${msg.timestamp}:${msg.content.slice(0, 50)}`;
}
private mergeScalar<T>(base: T, local: T, remote: T): T {
// If both changed to same value, use it
if (local === remote) return local;
// If only one changed, use the change
if (local === base) return remote;
if (remote === base) return local;
// Both changed differently - prefer local
return local;
}
}
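The scalar rule is the three-way merge in miniature, so it is worth exercising all four cases. A standalone copy for illustration:

```typescript
// Three-way scalar merge: prefer the side that changed; on a true
// conflict, prefer local.
function mergeScalar<T>(base: T, local: T, remote: T): T {
  if (local === remote) return local;  // both sides agree
  if (local === base) return remote;   // only remote changed
  if (remote === base) return local;   // only local changed
  return local;                        // both changed differently: prefer local
}

const agree = mergeScalar("Old", "New", "New");        // both chose "New"
const remoteWin = mergeScalar("Old", "Old", "Edited"); // only remote edited
const localWin = mergeScalar("Old", "Edited", "Old");  // only local edited
const conflict = mergeScalar("Old", "Mine", "Theirs"); // true conflict
```

Preferring local on a true conflict is a deliberate choice: the user looking at the screen should not see their own edit silently replaced.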
Network Resilience
Amp handles network failures gracefully:
export class ResilientSyncClient {
private online$ = new BehaviorSubject(navigator.onLine);
private retryDelays = [1000, 2000, 5000, 10000, 30000]; // Escalating backoff delays
constructor(private api: ServerAPIClient) {
// Monitor network status
window.addEventListener('online', () => this.online$.next(true));
window.addEventListener('offline', () => this.online$.next(false));
// Test connectivity periodically
this.startConnectivityCheck();
}
async syncWithRetry(
request: SyncRequest,
attempt = 0
): Promise<SyncResponse> {
try {
// Wait for network if offline
await this.waitForNetwork();
// Make request with timeout
const response = await this.withTimeout(
this.api.sync(request),
10000 // 10 second timeout
);
return response;
} catch (error) {
if (this.isRetryable(error) && attempt < this.retryDelays.length) {
const delay = this.retryDelays[attempt];
logger.debug(
`Sync failed, retrying in ${delay}ms (attempt ${attempt + 1})`
);
await this.delay(delay);
return this.syncWithRetry(request, attempt + 1);
}
throw error;
}
}
private async waitForNetwork(): Promise<void> {
if (this.online$.getValue()) return;
return new Promise(resolve => {
const sub = this.online$.subscribe(online => {
if (online) {
sub.unsubscribe();
resolve();
}
});
});
}
private isRetryable(error: unknown): boolean {
if (error instanceof NetworkError) return true;
if (error instanceof TimeoutError) return true;
if (error instanceof HTTPError) {
return error.status >= 500 || error.status === 429;
}
return false;
}
private async startConnectivityCheck(): Promise<void> {
while (true) {
if (!this.online$.getValue()) {
// Try to ping server
try {
await this.api.ping();
this.online$.next(true);
} catch {
// Still offline
}
}
await this.delay(30000); // Check every 30 seconds
}
}
}
Optimistic Updates
To maintain responsiveness, Amp applies changes optimistically:
export class OptimisticSyncManager {
private pendingUpdates = new Map<string, PendingUpdate<unknown>>();
// Optional hook the UI registers to revert state when an update is rolled back
onRollback?: (key: string, originalValue: unknown) => void;
async applyOptimisticUpdate<T>(
key: string,
currentValue: T,
update: (value: T) => T,
persist: (value: T) => Promise<void>
): Promise<T> {
// Apply update locally immediately
const optimisticValue = update(currentValue);
// Track pending update
const pendingUpdate: PendingUpdate<T> = {
key,
originalValue: currentValue,
optimisticValue,
promise: null
};
this.pendingUpdates.set(key, pendingUpdate);
// Persist asynchronously
pendingUpdate.promise = persist(optimisticValue)
.then(() => {
// Success - remove from pending
this.pendingUpdates.delete(key);
})
.catch(error => {
// Failure - record for rollback; swallow here so the stored promise
// does not become an unhandled rejection if rollback() is never awaited
pendingUpdate.error = error;
});
return optimisticValue;
}
async rollback(key: string): Promise<void> {
const pending = this.pendingUpdates.get(key);
if (!pending) return;
// Wait for pending operation to complete
try {
await pending.promise;
} catch {
// Expected to fail
}
// Rollback if it failed
if (pending.error) {
// Notify UI to revert to original value
this.onRollback?.(key, pending.originalValue);
}
this.pendingUpdates.delete(key);
}
hasPendingUpdates(): boolean {
return this.pendingUpdates.size > 0;
}
async waitForPendingUpdates(): Promise<void> {
const promises = Array.from(this.pendingUpdates.values())
.map(update => update.promise);
await Promise.allSettled(promises);
}
}
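The lifecycle reduces to three steps: apply locally, attempt persistence, roll back on failure. A synchronous sketch makes the contract visible (the real manager tracks async promises per key; this is a simplified illustration):

```typescript
// Optimistic update in miniature: the caller sees the optimistic value
// immediately; a failed persist reverts to the original.
function optimisticApply<T>(
  current: T,
  update: (v: T) => T,
  persist: (v: T) => boolean,   // returns false to simulate a failed save
): { value: T; rolledBack: boolean } {
  const optimistic = update(current);   // UI renders this right away
  if (persist(optimistic)) {
    return { value: optimistic, rolledBack: false };
  }
  return { value: current, rolledBack: true }; // revert on failure
}

const ok = optimisticApply(1, n => n + 1, () => true);
const failed = optimisticApply(1, n => n + 1, () => false);
```

The asynchronous version adds bookkeeping (the pending map and rollback hook above) but preserves exactly this invariant: the visible value is either the persisted optimistic value or the original.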
Performance Monitoring
Amp tracks sync performance to optimize behavior:
export class SyncPerformanceMonitor {
private metrics = new Map<string, MetricHistory>();
recordSyncTime(
threadId: string,
duration: number,
size: number
): void {
const history = this.getHistory('sync-time');
history.add({
timestamp: Date.now(),
value: duration,
metadata: { threadId, size }
});
// Analyze for anomalies
if (duration > this.getP95(history)) {
logger.warn(`Slow sync detected: ${duration}ms for thread ${threadId}`);
}
}
recordBatchSize(size: number): void {
this.getHistory('batch-size').add({
timestamp: Date.now(),
value: size
});
}
recordConflictRate(hadConflict: boolean): void {
this.getHistory('conflicts').add({
timestamp: Date.now(),
value: hadConflict ? 1 : 0
});
}
getOptimalBatchSize(): number {
// Correlate recent sync times with the batch sizes that produced them
const sizeToTime = new Map<number, number[]>();
for (const entry of this.getHistory('sync-time').getRecent(100)) {
const size = entry.metadata?.size || 1;
if (!sizeToTime.has(size)) {
sizeToTime.set(size, []);
}
sizeToTime.get(size)!.push(entry.value);
}
// Calculate average time per size
let optimalSize = 50;
let minAvgTime = Infinity;
for (const [size, times] of sizeToTime) {
const avgTime = times.reduce((a, b) => a + b) / times.length;
if (avgTime < minAvgTime) {
minAvgTime = avgTime;
optimalSize = size;
}
}
return Math.max(10, Math.min(100, optimalSize));
}
private getP95(history: MetricHistory): number {
const values = history.getRecent(100)
.map(entry => entry.value)
.sort((a, b) => a - b);
const index = Math.floor(values.length * 0.95);
return values[index] || 0;
}
}
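The p95 helper at the bottom uses simple index arithmetic worth spelling out: sort ascending and take the value at floor(0.95 x n). A standalone version with the same arithmetic:

```typescript
// 95th-percentile over a sample window: sort ascending, index at floor(0.95 * n).
function p95(values: number[]): number {
  const sorted = [...values].sort((a, b) => a - b);
  const index = Math.floor(sorted.length * 0.95);
  return sorted[index] ?? 0;
}

// 20 samples: durations 1..19ms plus one slow outlier at 500ms
const samples = [...Array(19).keys()].map(n => n + 1).concat([500]);
const threshold = p95(samples); // index floor(20 * 0.95) = 19 -> the outlier
```

With 20 samples the index lands on the single worst value, which is why one slow sync is enough to trip the "slow sync detected" warning above.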
Testing Synchronization
Amp includes comprehensive sync testing utilities:
export class SyncTestHarness {
private mockServer = new MockSyncServer();
private clients: TestClient[] = [];
async testConcurrentEdits(): Promise<void> {
// Create multiple clients
const client1 = this.createClient('user1');
const client2 = this.createClient('user2');
// Both edit same thread
const threadId = 'test-thread';
await Promise.all([
client1.addMessage(threadId, 'Hello from user 1'),
client2.addMessage(threadId, 'Hello from user 2')
]);
// Let sync complete
await this.waitForSync();
// Both clients should have both messages
const thread1 = await client1.getThread(threadId);
const thread2 = await client2.getThread(threadId);
assert.equal(thread1.messages.length, 2);
assert.equal(thread2.messages.length, 2);
assert.deepEqual(thread1, thread2);
}
async testNetworkPartition(): Promise<void> {
const client = this.createClient('user1');
// Make changes while online
await client.addMessage('thread1', 'Online message');
// Go offline
this.mockServer.disconnect(client);
// Make offline changes
await client.addMessage('thread1', 'Offline message 1');
await client.addMessage('thread1', 'Offline message 2');
// Verify changes are queued (the queue tracks pending threads, so two
// messages to the same thread count as one pending item)
assert.equal(client.getPendingSyncCount(), 1);
// Reconnect
this.mockServer.connect(client);
// Wait for sync
await this.waitForSync();
// Verify all changes synced
assert.equal(client.getPendingSyncCount(), 0);
const serverThread = this.mockServer.getThread('thread1');
assert.equal(serverThread.messages.length, 3);
}
async testSyncPerformance(): Promise<void> {
const client = this.createClient('user1');
const messageCount = 1000;
// Add many messages
const startTime = Date.now();
for (let i = 0; i < messageCount; i++) {
await client.addMessage('perf-thread', `Message ${i}`);
}
await this.waitForSync();
const duration = Date.now() - startTime;
const throughput = messageCount / (duration / 1000);
console.log(`Synced ${messageCount} messages in ${duration}ms`);
console.log(`Throughput: ${throughput.toFixed(2)} messages/second`);
// Should sync within reasonable time
assert(throughput > 100, 'Sync throughput too low');
}
}
Summary
This chapter demonstrated that real-time synchronization doesn't require WebSockets:
- Adaptive polling adjusts frequency based on activity patterns
- Observable architectures provide reactive local state management
- Intelligent batching optimizes network efficiency
- Optimistic updates maintain responsive user interfaces
- Resilient retry logic handles network failures gracefully
- Conflict resolution strategies ensure eventual consistency
This approach proves more reliable and debuggable than traditional WebSocket solutions while maintaining real-time user experience. The key insight: for AI systems, eventual consistency with intelligent conflict resolution often outperforms complex real-time protocols.
The next chapter explores tool system architecture for distributed execution with safety and performance at scale.