Chapter 2: Service-Oriented Architecture for AI Systems

Building a collaborative AI coding assistant requires careful architectural decisions. How do you create a system that feels responsive to individual users while managing the complexity of distributed state, multi-user collaboration, and AI model interactions?

This chapter explores service-oriented architecture patterns for AI systems, reactive state management approaches, and the design decisions that enable teams to work together seamlessly while maintaining system reliability.

Core Design Principles

AI systems require architecture that balances responsiveness, collaboration, and reliability. Five key principles guide technical decisions:

1. Service Isolation by Domain

Each service owns a specific domain and communicates through well-defined interfaces. This prevents tight coupling between AI processing, state management, and collaboration features.

Recognition Pattern: You need service isolation when:

  • Different parts of your system have distinct failure modes
  • Teams need to deploy features independently
  • You're mixing real-time collaboration with AI processing

Implementation Approach:

// Service interface defines clear boundaries
interface IThreadService {
  modifyThread(id: string, modifier: ThreadModifier): Promise<Thread>;
  observeThread(id: string): Observable<Thread>;
}

// Implementation holds the domain logic; storage and sync are injected, not constructed here
class ThreadService implements IThreadService {
  constructor(
    private storage: IThreadStorage,
    private syncService: ISyncService
  ) {}
}
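
One practical payoff of this boundary is that domain logic can be exercised against a stand-in for storage. The sketch below is illustrative only: `SimpleThread`, `ThreadStore`, `InMemoryThreadStore`, and `TitleService` are hypothetical names that do not appear elsewhere in this chapter, and the methods are synchronous for brevity where the real interfaces return promises.

```typescript
// Hypothetical, simplified types; the chapter's real interfaces are async.
interface SimpleThread {
  id: string;
  title: string;
  version: number;
}

// The domain service only knows about this narrow storage contract.
interface ThreadStore {
  load(id: string): SimpleThread | undefined;
  save(thread: SimpleThread): void;
}

// In-memory stand-in, useful for tests or an offline mode.
class InMemoryThreadStore implements ThreadStore {
  private threads = new Map<string, SimpleThread>();

  load(id: string): SimpleThread | undefined {
    return this.threads.get(id);
  }

  save(thread: SimpleThread): void {
    this.threads.set(thread.id, thread);
  }
}

// Domain logic lives here and never touches disk or network directly.
class TitleService {
  constructor(private store: ThreadStore) {}

  rename(id: string, title: string): SimpleThread {
    const current = this.store.load(id);
    if (!current) {
      throw new Error(`Unknown thread: ${id}`);
    }
    const updated = { ...current, title, version: current.version + 1 };
    this.store.save(updated);
    return updated;
  }
}
```

Because `TitleService` depends only on `ThreadStore`, swapping the in-memory store for a disk-backed or remote one requires no change to the domain code.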

2. Observable-First Communication

Replace callbacks and promises with reactive streams for state changes. This pattern handles the complex data flow between AI responses, user actions, and collaboration updates.

Recognition Pattern: You need reactive communication when:

  • Multiple components need to react to the same state changes
  • You're handling real-time updates from multiple sources
  • UI needs to stay synchronized with rapidly changing AI output

Implementation Approach:

// Services expose Observable interfaces
interface IThreadService {
  observeThread(id: string): Observable<Thread>;
  observeActiveThread(): Observable<Thread | null>;
}

// Consumers compose reactive streams
threadService.observeActiveThread().pipe(
  filter((thread): thread is Thread => thread !== null), // Type guard narrows away null
  switchMap(thread => combineLatest([
    of(thread),
    syncService.observeSyncStatus(thread.id)
  ]))
).subscribe(([thread, syncStatus]) => {
  updateUI(thread, syncStatus);
});

3. Optimistic Updates

Update local state immediately while syncing in the background. This provides a responsive user experience even with high-latency AI operations or network issues.

Recognition Pattern: You need optimistic updates when:

  • Users expect immediate feedback for their actions
  • Network latency affects user experience
  • AI operations take multiple seconds to complete

Implementation Approach:

// Apply changes locally first, sync later
class OptimisticUpdateService {
  async updateThread(id: string, update: ThreadUpdate): Promise<void> {
    // 1. Apply locally for immediate UI response
    this.applyLocalUpdate(id, update);
    
    // 2. Queue for background synchronization
    this.syncQueue.add({ threadId: id, update, timestamp: Date.now() });
    
    // 3. Process queue without blocking user
    this.processSyncQueue();
  }
}
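
The snippet above does not show what happens when the background sync fails. One common approach is to keep a snapshot of the state from before each optimistic change so it can be restored. The following is a minimal sketch under that assumption; `OptimisticStore` and its methods are hypothetical names, and the rollback strategy is deliberately simple.

```typescript
// Hypothetical sketch: optimistic state with rollback on a failed sync.
interface PendingUpdate<T> {
  id: number;
  previous: T; // Snapshot taken before the optimistic change
}

class OptimisticStore<T> {
  private pending = new Map<number, PendingUpdate<T>>();
  private nextId = 1;

  constructor(private state: T) {}

  get current(): T {
    return this.state;
  }

  // Apply the change immediately and return a ticket for later settlement.
  apply(change: (state: T) => T): number {
    const id = this.nextId++;
    this.pending.set(id, { id, previous: this.state });
    this.state = change(this.state);
    return id;
  }

  // The server accepted the change: forget the snapshot.
  confirm(id: number): void {
    this.pending.delete(id);
  }

  // The server rejected the change: restore the snapshot.
  // Simplification: this also discards any later optimistic changes.
  rollback(id: number): void {
    const entry = this.pending.get(id);
    if (!entry) return;
    this.state = entry.previous;
    Array.from(this.pending.keys()).forEach(key => {
      if (key >= id) this.pending.delete(key);
    });
  }
}
```

A production system would more likely rebase later changes onto the restored state instead of discarding them, but the snapshot-and-restore core stays the same.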

4. Graceful Degradation

Continue functioning even when external services are unavailable. AI systems depend on many external services (models, APIs, collaboration servers) that can fail independently.

Recognition Pattern: You need graceful degradation when:

  • Your system depends on external AI APIs or collaboration servers
  • Users need to work during network outages
  • System components have different availability requirements

Implementation Approach:

// Fallback patterns for service failures
class ResilientService {
  async fetchData(id: string): Promise<Data> {
    try {
      const data = await this.remoteAPI.get(`/data/${id}`);
      await this.localCache.set(id, data); // Cache for offline use
      return data;
    } catch (error) {
      if (this.isNetworkError(error)) {
        // The cache is async (see `await ... set` above), so await the read before falling back
        return (await this.localCache.get(id)) ?? this.getDefaultData(id);
      }
      throw error;
    }
  }
}

5. Explicit Resource Management

Prevent memory leaks and resource exhaustion through consistent lifecycle patterns. AI systems often create many subscriptions, connections, and cached resources.

Recognition Pattern: You need explicit resource management when:

  • Creating Observable subscriptions or WebSocket connections
  • Caching AI model responses or user data
  • Managing background processing tasks

Implementation Approach:

// Base class ensures consistent cleanup
abstract class BaseService implements IDisposable {
  protected disposables: IDisposable[] = [];
  
  protected addDisposable(disposable: IDisposable): void {
    this.disposables.push(disposable);
  }
  
  dispose(): void {
    this.disposables.forEach(d => d.dispose());
    this.disposables.length = 0;
  }
}
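
To show how a concrete service might use this base class, the sketch below registers a timer for cleanup. `toDisposable` and `HeartbeatService` are hypothetical names introduced for illustration; `IDisposable` and `BaseService` are repeated so the example stands alone.

```typescript
interface IDisposable {
  dispose(): void;
}

// Hypothetical helper: wrap any cleanup function as a disposable.
function toDisposable(cleanup: () => void): IDisposable {
  let done = false;
  return {
    dispose(): void {
      if (done) return; // Make double-dispose harmless
      done = true;
      cleanup();
    },
  };
}

abstract class BaseService implements IDisposable {
  protected disposables: IDisposable[] = [];

  protected addDisposable(disposable: IDisposable): void {
    this.disposables.push(disposable);
  }

  dispose(): void {
    this.disposables.forEach(d => d.dispose());
    this.disposables.length = 0;
  }
}

// Hypothetical service that owns a polling timer.
class HeartbeatService extends BaseService {
  ticks = 0;

  constructor(intervalMs: number) {
    super();
    const timer = setInterval(() => this.ticks++, intervalMs);
    // Register cleanup next to the allocation so it cannot be forgotten.
    this.addDisposable(toDisposable(() => clearInterval(timer)));
  }
}
```

Registering the cleanup on the line after the resource is created keeps allocation and release visibly paired, which makes leaks easier to spot in review.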

Service Architecture Patterns

AI systems benefit from a layered architecture in which each layer has specific responsibilities and failure modes. This separation allows different parts to evolve independently.

graph TB
    subgraph "Interface Layer"
        CLI[CLI Interface]
        IDE[IDE Extension]
        Web[Web Interface]
    end
    
    subgraph "Session Layer"
        Session[Session Management]
        Commands[Command Processing]
    end
    
    subgraph "Core Services"
        State[State Management]
        Sync[Synchronization]
        Auth[Authentication]
        Tools[Tool Execution]
        Config[Configuration]
    end
    
    subgraph "Infrastructure"
        Storage[Persistent Storage]
        Network[Network/API]
        External[External Services]
        Events[Event System]
    end
    
    CLI --> Session
    IDE --> Session
    Web --> Session
    
    Session --> State
    Session --> Tools
    Commands --> State
    
    State --> Storage
    State --> Sync
    Sync --> Network
    Tools --> External
    
    Events -.->|Reactive Updates| State
    Events -.->|Reactive Updates| Sync

Key Architectural Decisions:

  • Interface Layer: Multiple interfaces (CLI, IDE, web) share the same session layer
  • Session Layer: Manages user context and coordinates service interactions
  • Core Services: Business logic isolated from infrastructure concerns
  • Infrastructure: Handles persistence, networking, and external integrations

State Management: Conversation Threading

The conversation state service demonstrates key patterns for managing AI conversation state with collaborative features.

Core Responsibilities:

  • Maintain conversation state and history
  • Ensure single-writer semantics to prevent conflicts
  • Provide reactive updates to UI components
  • Handle auto-saving and background synchronization

Key Patterns:

// 1. Single-writer pattern prevents state conflicts
interface IStateManager<T> {
  observeState(id: string): Observable<T>;
  modifyState(id: string, modifier: (state: T) => T): Promise<T>;
}

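// 2. Auto-save with throttling prevents excessive I/O
class AutoSaveService {
  // Returns the subscription so the owner can dispose of it
  setupAutoSave(state$: Observable<State>): Subscription {
    return state$.pipe(
      skip(1), // Skip initial value
      // Limit saves to once per second. Assuming RxJS semantics, `trailing: true`
      // is needed so the last change in a burst is still saved.
      throttleTime(1000, undefined, { leading: true, trailing: true }),
      concatMap(state => this.storage.save(state)) // Saves run one at a time, in order
    ).subscribe();
  }
}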

// 3. Lazy loading with caching improves performance
class LazyStateLoader {
  private cache = new Map<string, Observable<State>>();

  getState(id: string): Observable<State> {
    let state$ = this.cache.get(id);
    if (!state$) {
      state$ = this.loadFromStorage(id);
      this.cache.set(id, state$);
    }
    return state$;
  }
}
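
The `IStateManager` interface above states the single-writer contract without showing how writes are serialized. One possible approach, sketched here under the assumption that all writers run in the same process, is to chain each write for a given id onto the previous one. `SerializedState` is a hypothetical name, and the observable side of the interface is omitted.

```typescript
// Hypothetical sketch: serialize writes per id with a promise chain.
class SerializedState<T> {
  private states = new Map<string, T>();
  private tails = new Map<string, Promise<unknown>>();

  constructor(private initial: () => T) {}

  modifyState(id: string, modifier: (state: T) => T | Promise<T>): Promise<T> {
    const tail = this.tails.get(id) ?? Promise.resolve();

    const run = async (): Promise<T> => {
      const current = this.states.has(id)
        ? (this.states.get(id) as T)
        : this.initial();
      const next = await modifier(current);
      this.states.set(id, next);
      return next;
    };

    // Start only after the previous write for this id has settled.
    const result = tail.then(run);
    // Store a non-rejecting tail so one failed write cannot block later ones.
    this.tails.set(id, result.catch(() => undefined));
    return result;
  }
}
```

Each modifier sees the state left by the write before it, even when an earlier modifier is slow, which is the property that prevents lost updates.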

Sync Service: Bridging Local and Remote

The ThreadSyncService manages the complex dance of keeping local and server state synchronized:

export class ThreadSyncService extends BaseService {
  private syncQueue = new Map<string, SyncQueueItem>();
  private syncStatus$ = new Map<string, BehaviorSubject<SyncStatus>>();
  private socket?: WebSocket;
  
  constructor(
    private api: ServerAPIClient,
    private threadService: IThreadService
  ) {
    super();
    this.initializeWebSocket();
    this.startSyncLoop();
  }
  
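  private initializeWebSocket(): void {
    // The `.on(...)` calls follow the Node.js `ws` package; browsers use addEventListener
    this.socket = new WebSocket(this.api.wsEndpoint);
    
    this.socket.on('message', (data) => {
      // `ws` delivers raw data, so convert to a string before parsing
      const message = JSON.parse(data.toString());
      this.handleServerMessage(message);
    });
    
    // Reconnection logic: retry after a fixed 5-second delay
    this.socket.on('close', () => {
      setTimeout(() => this.initializeWebSocket(), 5000);
    });
  }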
  
  async queueSync(threadId: string, thread: Thread): Promise<void> {
    // Calculate changes from last known server state
    const serverVersion = await this.getServerVersion(threadId);
    const changes = this.calculateChanges(thread, serverVersion);
    
    // Add to sync queue
    this.syncQueue.set(threadId, {
      threadId,
      changes,
      localVersion: thread.version,
      serverVersion,
      attempts: 0,
      lastAttempt: null
    });
    
    // Update sync status
    this.updateSyncStatus(threadId, 'pending');
  }
  
  private async processSyncQueue(): Promise<void> {
    for (const [threadId, item] of this.syncQueue) {
      if (this.shouldSync(item)) {
        try {
          await this.syncThread(item);
          this.syncQueue.delete(threadId);
          this.updateSyncStatus(threadId, 'synced');
        } catch (error) {
          this.handleSyncError(threadId, item, error);
        }
      }
    }
  }
  
  private async syncThread(item: SyncQueueItem): Promise<void> {
    const response = await this.api.syncThread({
      threadId: item.threadId,
      changes: item.changes,
      baseVersion: item.serverVersion
    });
    
    if (response.conflict) {
      // Handle conflict resolution using standard patterns
      await this.resolveConflict(item.threadId, response);
    }
  }
  
  private handleServerMessage(message: ServerMessage): void {
    switch (message.type) {
      case 'thread-updated':
        this.handleRemoteUpdate(message);
        break;
      case 'presence-update':
        this.handlePresenceUpdate(message);
        break;
      case 'permission-changed':
        this.handlePermissionChange(message);
        break;
    }
  }
}
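
The `shouldSync` check called in `processSyncQueue` is not shown above. A plausible version uses the `attempts` and `lastAttempt` fields to apply exponential backoff between retries. The sketch below is an assumption, not the actual implementation: the constants are illustrative, and `lastAttempt` is assumed to hold a millisecond timestamp.

```typescript
// Subset of the queue item fields used by the retry policy.
interface RetryState {
  attempts: number;
  lastAttempt: number | null; // Epoch milliseconds, null if never tried (assumed)
}

// Hypothetical policy values, chosen only for illustration.
const BASE_DELAY_MS = 1000;
const MAX_DELAY_MS = 60000;
const MAX_ATTEMPTS = 8;

// Delay doubles with each failed attempt, capped at MAX_DELAY_MS.
function retryDelay(attempts: number): number {
  return Math.min(BASE_DELAY_MS * Math.pow(2, attempts), MAX_DELAY_MS);
}

// `now` is a parameter so the policy can be tested without real clocks.
function shouldSync(item: RetryState, now: number = Date.now()): boolean {
  if (item.attempts >= MAX_ATTEMPTS) return false; // Give up; surface an error instead
  if (item.lastAttempt === null) return true; // Never tried, sync right away
  return now - item.lastAttempt >= retryDelay(item.attempts);
}
```

Under this policy, `handleSyncError` would increment `attempts` and record `lastAttempt`, leaving the item in the queue for a later pass of the sync loop.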

Observable System: The Reactive Foundation

Amp's custom Observable implementation provides the foundation for reactive state management:

// Core Observable implementation
export abstract class Observable<T> {
  abstract subscribe(observer: Observer<T>): Subscription;
  
  pipe<R>(...operators: Operator<any, any>[]): Observable<R> {
    return operators.reduce(
      (source, operator) => operator(source),
      this as Observable<any>
    );
  }
}

// BehaviorSubject maintains current value
export class BehaviorSubject<T> extends Subject<T> {
  constructor(private currentValue: T) {
    super();
  }
  
  get value(): T {
    return this.currentValue;
  }
  
  next(value: T): void {
    this.currentValue = value;
    super.next(value);
  }
  
  subscribe(observer: Observer<T>): Subscription {
    // Emit current value immediately
    observer.next(this.currentValue);
    return super.subscribe(observer);
  }
}

// Rich operator library
export const operators = {
  map: <T, R>(fn: (value: T) => R) => 
    (source: Observable<T>): Observable<R> => 
      new MapObservable(source, fn),
      
  filter: <T>(predicate: (value: T) => boolean) =>
    (source: Observable<T>): Observable<T> =>
      new FilterObservable(source, predicate),
      
  switchMap: <T, R>(fn: (value: T) => Observable<R>) =>
    (source: Observable<T>): Observable<R> =>
      new SwitchMapObservable(source, fn),
      
  throttleTime: <T>(ms: number) =>
    (source: Observable<T>): Observable<T> =>
      new ThrottleTimeObservable(source, ms)
};
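
The `Subject` and `MapObservable` classes referenced above are not shown. To make the mechanics concrete, here is a minimal sketch of how they might look. It is an assumption for illustration, not Amp's code: error and completion channels are omitted, and `Observer` is reduced to a single `next` method.

```typescript
interface Observer<T> {
  next(value: T): void;
}

interface Subscription {
  unsubscribe(): void;
}

abstract class Observable<T> {
  abstract subscribe(observer: Observer<T>): Subscription;
}

// Multicasts each value to every current subscriber.
class Subject<T> extends Observable<T> {
  private observers = new Set<Observer<T>>();

  next(value: T): void {
    // Copy first so observers can unsubscribe while being notified.
    Array.from(this.observers).forEach(o => o.next(value));
  }

  subscribe(observer: Observer<T>): Subscription {
    this.observers.add(observer);
    return {
      unsubscribe: () => {
        this.observers.delete(observer);
      },
    };
  }
}

// Transforms values on the way through; nothing happens until subscribe.
class MapObservable<T, R> extends Observable<R> {
  constructor(private source: Observable<T>, private fn: (value: T) => R) {
    super();
  }

  subscribe(observer: Observer<R>): Subscription {
    return this.source.subscribe({
      next: value => observer.next(this.fn(value)),
    });
  }
}
```

Each operator is a thin wrapper that subscribes to its source on demand, so unsubscribing from the outermost observable tears down the whole chain.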

Thread Model and Data Flow

Amp's thread model supports complex conversations with tool use, sub-agents, and rich metadata:

interface Thread {
  id: string;                    // Unique identifier
  version: number;               // Version for optimistic concurrency control
  title?: string;                // Thread title
  createdAt: string;             // Creation timestamp
  updatedAt: string;             // Last update timestamp
  sharing?: ThreadSharing;       // Visibility scope
  messages: Message[];           // Conversation history
  metadata?: ThreadMetadata;     // Additional properties
  
  // Thread relationships for hierarchical conversations
  summaryThreadId?: string;      // Link to summary thread
  parentThreadId?: string;       // Parent thread reference
  childThreadIds?: string[];     // Child thread references
}

interface Message {
  id: string;
  type: 'user' | 'assistant' | 'info';
  content: string;
  timestamp: string;
  
  // Tool interactions
  toolUse?: ToolUseBlock[];
  toolResults?: ToolResultBlock[];
  
  // Rich content
  attachments?: Attachment[];
  mentions?: FileMention[];
  
  // Metadata
  model?: string;
  cost?: UsageCost;
  error?: ErrorInfo;
}
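
As an example of how these types might be used, the sketch below appends a message while bumping `version` and `updatedAt`. The interfaces are reduced copies of the ones above with optional fields omitted, and `appendMessage` is a hypothetical helper, not part of the documented API.

```typescript
// Reduced versions of the interfaces above; optional fields omitted.
interface Message {
  id: string;
  type: 'user' | 'assistant' | 'info';
  content: string;
  timestamp: string;
}

interface Thread {
  id: string;
  version: number;
  createdAt: string;
  updatedAt: string;
  messages: Message[];
}

// Hypothetical helper: returns a new thread rather than mutating the input.
function appendMessage(thread: Thread, message: Message): Thread {
  return {
    ...thread,
    version: thread.version + 1,
    updatedAt: message.timestamp,
    messages: [...thread.messages, message],
  };
}
```

Returning a fresh object keeps earlier snapshots intact, which matters when observers compare versions or when an optimistic change has to be rolled back.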

Data Flow Through the System

When a user sends a message, it flows through multiple services:

sequenceDiagram
    participant User
    participant UI
    participant ThreadService
    participant ToolService
    participant LLMService
    participant SyncService
    participant Server
    
    User->>UI: Type message
    UI->>ThreadService: addMessage()
    ThreadService->>ThreadService: Update thread state
    ThreadService->>ToolService: Process tool requests
    ToolService->>LLMService: Generate completion
    LLMService->>ToolService: Stream response
    ToolService->>ThreadService: Update with results
    ThreadService->>UI: Observable update
    ThreadService->>SyncService: Queue sync
    SyncService->>Server: Sync changes
    Server->>SyncService: Acknowledge

Service Integration Patterns

Services in Amp integrate through several patterns that promote loose coupling:

1. Constructor Injection

Dependencies are explicitly declared and injected:

export class ThreadSession {
  constructor(
    private threadService: IThreadService,
    private toolService: IToolService,
    private configService: IConfigService,
    @optional private syncService?: IThreadSyncService
  ) {
    // Services are injected, not created
    this.initialize();
  }
}

2. Interface Segregation

Services depend on interfaces, not implementations:

// Minimal interface for consumers
export interface IThreadReader {
  observeThread(id: string): Observable<Thread | null>;
  observeThreadList(): Observable<ThreadListItem[]>;
}

// Extended interface for writers
export interface IThreadWriter extends IThreadReader {
  modifyThread(id: string, modifier: ThreadModifier): Promise<Thread>;
  deleteThread(id: string): Promise<void>;
}

// Full service interface
export interface IThreadService extends IThreadWriter {
  openThread(id: string): Promise<void>;
  closeThread(id: string): Promise<void>;
  createThread(options?: CreateThreadOptions): Promise<Thread>;
}

3. Event-Driven Communication

Services communicate through Observable streams:

class ConfigService {
  private config$ = new BehaviorSubject<Config>(defaultConfig);
  
  observeConfig(): Observable<Config> {
    return this.config$.asObservable();
  }
  
  updateConfig(updates: Partial<Config>): void {
    const current = this.config$.value;
    const updated = { ...current, ...updates };
    this.config$.next(updated);
  }
}

// Other services react to config changes
class ThemeService {
  constructor(private configService: ConfigService) {
    configService.observeConfig().pipe(
      map(config => config.theme),
      distinctUntilChanged()
    ).subscribe(theme => {
      this.applyTheme(theme);
    });
  }
}

4. Resource Lifecycle Management

Services manage resources consistently:

abstract class BaseService implements IDisposable {
  protected disposables: IDisposable[] = [];
  protected subscriptions: Subscription[] = [];
  
  protected addDisposable(disposable: IDisposable): void {
    this.disposables.push(disposable);
  }
  
  protected addSubscription(subscription: Subscription): void {
    this.subscriptions.push(subscription);
  }
  
  dispose(): void {
    // Clean up in reverse order
    [...this.subscriptions].reverse().forEach(s => s.unsubscribe());
    [...this.disposables].reverse().forEach(d => d.dispose());
    
    this.subscriptions = [];
    this.disposables = [];
  }
}

Performance Patterns

Amp employs several patterns to maintain responsiveness at scale:

1. Lazy Loading with Observables

Data is loaded on-demand and cached:

class LazyDataService {
  private cache = new Map<string, BehaviorSubject<Data | null>>();
  
  observeData(id: string): Observable<Data | null> {
    if (!this.cache.has(id)) {
      const subject = new BehaviorSubject<Data | null>(null);
      this.cache.set(id, subject);
      
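      // Load data asynchronously; on failure, evict the entry so a later call can retry
      this.loadData(id).then(
        data => subject.next(data),
        error => {
          this.cache.delete(id);
          subject.error(error);
        }
      );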
    }
    
    return this.cache.get(id)!.asObservable();
  }
  
  private async loadData(id: string): Promise<Data> {
    // Check memory cache, disk cache, then network
    return this.memCache.get(id) 
        || await this.diskCache.get(id)
        || await this.api.fetchData(id);
  }
}

2. Backpressure Handling

Rate-limiting operators such as throttleTime and debounceTime keep fast producers from overwhelming downstream consumers:

// Throttle rapid updates
threadService.observeActiveThread().pipe(
  throttleTime(100), // Max 10 updates per second
  distinctUntilChanged((a, b) => a?.version === b?.version)
).subscribe(thread => {
  updateExpensiveUI(thread);
});

// Debounce user input
searchInput$.pipe(
  debounceTime(300), // Wait for typing to stop
  distinctUntilChanged(),
  switchMap(query => searchService.search(query))
).subscribe(results => {
  displayResults(results);
});
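
To make the throttling semantics concrete, here is a small stand-alone sketch of a leading-edge throttle. It is not the operator implementation; `throttle` is a hypothetical helper, and the clock is injectable purely so the behavior can be checked deterministically.

```typescript
// Leading-edge throttle: run the first call, then ignore further calls
// until `windowMs` has elapsed since the last call that ran.
function throttle<T>(
  fn: (value: T) => void,
  windowMs: number,
  now: () => number = Date.now
): (value: T) => void {
  let lastRun = -Infinity;
  return (value: T): void => {
    const t = now();
    if (t - lastRun >= windowMs) {
      lastRun = t;
      fn(value);
    }
    // Otherwise the value is dropped, not queued.
  };
}
```

Note that dropped values are gone for good. That is acceptable for UI refreshes driven by a stream that keeps emitting, but it is the wrong choice when the final value must be delivered; RxJS `throttleTime` behaves the same way by default unless its `trailing` option is enabled.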

3. Optimistic Concurrency Control

Version numbers prevent lost updates:

class OptimisticUpdateService {
  async updateThread(id: string, updates: ThreadUpdate): Promise<Thread> {
    const maxAttempts = 3;
    
    for (let attempt = 1; ; attempt++) {
      try {
        const current = await this.getThread(id);
        const updated = {
          ...current,
          ...updates,
          version: current.version + 1
        };
        
        return await this.api.updateThread(id, updated);
      } catch (error) {
        const code = (error as { code?: string } | null)?.code;
        // Rethrow non-conflict errors, and conflicts once attempts are exhausted
        if (code !== 'VERSION_CONFLICT' || attempt >= maxAttempts) {
          throw error;
        }
        // Exponential backoff: 100 ms, then 200 ms
        await this.delay(100 * 2 ** (attempt - 1));
      }
    }
  }
}
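
The client loop above relies on the server rejecting stale writes. As a rough sketch of that other half, the in-memory store below accepts a write only when its version is exactly one ahead of the stored copy. `VersionedStore` and `VersionConflictError` are hypothetical names; only the `VERSION_CONFLICT` code comes from the text above.

```typescript
class VersionConflictError extends Error {
  readonly code = 'VERSION_CONFLICT';

  constructor(expected: number, actual: number) {
    super(`Expected version ${expected}, received ${actual}`);
  }
}

interface Versioned {
  version: number;
}

// Hypothetical in-memory store standing in for the server.
class VersionedStore<T extends Versioned> {
  private items = new Map<string, T>();

  get(id: string): T | undefined {
    return this.items.get(id);
  }

  // Accept a write only if it is exactly one version ahead of what is stored.
  put(id: string, next: T): T {
    const stored = this.items.get(id);
    const expected = (stored ? stored.version : 0) + 1;
    if (next.version !== expected) {
      throw new VersionConflictError(expected, next.version);
    }
    this.items.set(id, next);
    return next;
  }
}
```

When two clients both read version 1 and both write version 2, the second write fails the check, and that client's retry loop re-reads and reapplies its change on top of the first.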

Security and Isolation

Amp's architecture enforces security boundaries at multiple levels:

1. Service-Level Permissions

Each service validates permissions independently:

class SecureThreadService extends ThreadService {
  async modifyThread(
    id: string, 
    modifier: ThreadModifier
  ): Promise<Thread> {
    // Check permissions first
    const canModify = await this.permissionService.check({
      user: this.currentUser,
      action: 'thread:modify',
      resource: id
    });
    
    if (!canModify) {
      throw new PermissionError('Cannot modify thread');
    }
    
    return super.modifyThread(id, modifier);
  }
}
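
The `permissionService` used above is not defined in this chapter. For illustration, a deny-by-default check could be as small as the following sketch. Everything here is an assumption: the `Action` values beyond `thread:modify`, the grant table, and the synchronous `check` (the call above is awaited, so the real one is presumably asynchronous).

```typescript
type Action = 'thread:read' | 'thread:modify';

interface PermissionRequest {
  user: string;
  action: Action;
  resource: string;
}

// Hypothetical grant table keyed by (resource, user).
class InMemoryPermissionService {
  private grants = new Map<string, Set<Action>>();

  // JSON encoding keeps the key unambiguous whatever characters the ids contain.
  private key(user: string, resource: string): string {
    return JSON.stringify([resource, user]);
  }

  grant(user: string, resource: string, action: Action): void {
    const key = this.key(user, resource);
    const actions = this.grants.get(key) ?? new Set<Action>();
    actions.add(action);
    this.grants.set(key, actions);
  }

  // Deny by default: anything not explicitly granted is refused.
  check(request: PermissionRequest): boolean {
    const actions = this.grants.get(this.key(request.user, request.resource));
    return actions !== undefined && actions.has(request.action);
  }
}
```

The important property is the default: an unknown user, resource, or action yields `false` without any special-case code.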

2. Data Isolation

Services maintain separate data stores per team:

class TeamIsolatedStorage implements IThreadStorage {
  constructor(
    private teamId: string,
    private baseStorage: IStorage
  ) {}
  
  private getTeamPath(threadId: string): string {
    return `teams/${this.teamId}/threads/${threadId}`;
  }
  
  async loadThread(id: string): Promise<Thread> {
    const path = this.getTeamPath(id);
    const data = await this.baseStorage.read(path);
    
    // Verify access permissions
    if (data.teamId !== this.teamId) {
      throw new Error('Access denied: insufficient permissions');
    }
    
    return data;
  }
}
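
Path-based isolation only holds if identifiers cannot escape their prefix. A thread ID such as `../../other-team/threads/x` would defeat `getTeamPath` as written, so it is worth validating IDs before building the path. The sketch below shows one way to do that; the allow-list pattern and the helper names are assumptions, not part of the original design.

```typescript
// Hypothetical allow-list: letters, digits, underscore, and hyphen only.
const SAFE_ID = /^[A-Za-z0-9_-]{1,128}$/;

function assertSafeId(kind: string, id: string): string {
  if (!SAFE_ID.test(id)) {
    throw new Error(`Invalid ${kind} id`);
  }
  return id;
}

// Builds the same path shape as getTeamPath, but rejects unsafe ids first.
function teamThreadPath(teamId: string, threadId: string): string {
  const team = assertSafeId('team', teamId);
  const thread = assertSafeId('thread', threadId);
  return `teams/${team}/threads/${thread}`;
}
```

The `teamId` comparison after loading then acts as a second line of defense, not the only one.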

3. API Gateway Protection

The server API client enforces authentication:

class AuthenticatedAPIClient extends ServerAPIClient {
  constructor(
    endpoint: string,
    private authService: IAuthService
  ) {
    super(endpoint);
  }
  
  protected async request<T>(
    method: string,
    path: string,
    data?: any,
    isRetry = false
  ): Promise<T> {
    const token = await this.authService.getAccessToken();
    
    const response = await fetch(`${this.endpoint}${path}`, {
      method,
      headers: {
        'Authorization': `Bearer ${token}`,
        'Content-Type': 'application/json'
      },
      body: data ? JSON.stringify(data) : undefined
    });
    
    if (response.status === 401 && !isRetry) {
      // Token expired: refresh once and retry. A second 401 is treated
      // as a failure below, which avoids an endless retry loop.
      await this.authService.refreshToken();
      return this.request<T>(method, path, data, true);
    }
    
    if (!response.ok) {
      throw new Error(`Request failed with status ${response.status}`);
    }
    
    return response.json();
  }
}

Scaling Considerations

Amp's architecture supports horizontal scaling through several design decisions:

1. Stateless Services

Most services maintain no local state beyond caches:

// Services can be instantiated per-request for horizontal scaling
class StatelessThreadService {
  constructor(
    private storage: IThreadStorage,
    private cache: ICache
  ) {
    // No instance state maintained for scalability
  }
  
  async getThread(id: string): Promise<Thread> {
    // Check cache first for performance
    const cached = await this.cache.get(`thread:${id}`);
    if (cached) return cached;
    
    // Load from persistent storage
    const thread = await this.storage.load(id);
    await this.cache.set(`thread:${id}`, thread, { ttl: 300 });
    
    return thread;
  }
}

2. Distributed Caching

Cache layers can be shared across instances:

interface IDistributedCache {
  get<T>(key: string): Promise<T | null>;
  set<T>(key: string, value: T, options?: CacheOptions): Promise<void>;
  delete(key: string): Promise<void>;
  
  // Pub/sub for cache invalidation
  subscribe(pattern: string, handler: (key: string) => void): void;
  publish(key: string, event: CacheEvent): void;
}

3. Load Balancing Support

WebSocket connections support sticky sessions:

class WebSocketManager {
  private servers: string[] = [
    'wss://server1.example.com',
    'wss://server2.example.com',
    'wss://server3.example.com'
  ];
  
  async connect(sessionId: string): Promise<WebSocket> {
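    // Hash the session ID so a session always maps to the same server.
    // Note: plain modulo remaps most sessions when the server list changes;
    // a consistent-hash ring would limit that churn.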
    const serverIndex = this.hash(sessionId) % this.servers.length;
    const server = this.servers[serverIndex];
    
    const ws = new WebSocket(`${server}?session=${sessionId}`);
    await this.waitForConnection(ws);
    
    return ws;
  }
}
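
The `hash` method used above is not shown. Any stable, well-distributed string hash will do; the sketch below uses a 32-bit FNV-1a variant computed over UTF-16 code units. The function names are hypothetical, and the choice of FNV-1a is an assumption made for illustration.

```typescript
// 32-bit FNV-1a over UTF-16 code units; non-cryptographic.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5; // FNV offset basis
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0; // Multiply by the FNV prime, keep unsigned
  }
  return hash >>> 0;
}

// Maps a session to a server deterministically via modulo.
function pickServer(sessionId: string, servers: string[]): string {
  if (servers.length === 0) {
    throw new Error('No servers configured');
  }
  return servers[fnv1a(sessionId) % servers.length];
}
```

The mapping is stable only while the server list is unchanged. Adding or removing a server changes the modulus and moves most sessions, so deployments that resize often usually place servers on a hash ring instead.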

Summary

Amp's architecture demonstrates how to build a production-ready collaborative AI system:

  • Service isolation ensures maintainability and testability
  • Observable patterns enable reactive, real-time updates
  • Optimistic updates provide a responsive user experience
  • Careful resource management prevents memory leaks
  • Security boundaries protect user data
  • Scaling considerations support growth

The combination of these patterns creates a foundation that can evolve from serving individual developers to supporting entire engineering organizations. In the next chapter, we'll explore how Amp's authentication and identity system enables secure multi-user collaboration while maintaining the simplicity users expect.