rescored / docs /integration /websocket-protocol.md
calebhan's picture
initial docs
c27ae8d

WebSocket Protocol for Real-Time Updates

Overview

WebSocket connections provide real-time progress updates from backend workers to the frontend during transcription.

Why WebSocket?

  • Push-Based: Server pushes updates, no client polling
  • Low Latency: ~10-50ms vs. 1000ms for HTTP polling
  • Efficient: Single persistent connection vs. repeated HTTP requests
  • Real-Time UX: Smooth progress bar, stage updates

Connection Lifecycle

sequenceDiagram
    participant Client
    participant Server
    participant Worker

    Client->>Server: 1. Connect WS ws://localhost:8000/api/v1/jobs/{job_id}/stream
    Server->>Client: 2. Connection established
    Worker->>Server: 3. Send updates
    Server->>Client: Broadcast updates
    Worker->>Server: 4. Job complete
    Server->>Client: Send "completed" message
    Client->>Server: 5. Close connection

Message Types

1. Progress Update

Sent by: Worker during processing Frequency: Every 5-10% progress or stage change

{
  "type": "progress",
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "progress": 45,
  "stage": "separation",
  "message": "Separated drums stem (2/4)",
  "timestamp": "2025-01-15T10:30:45Z"
}

Fields:

  • progress: 0-100
  • stage: "download" | "separation" | "transcription" | "musicxml"
  • message: Human-readable status

2. Completion

Sent by: Worker when job finishes successfully

{
  "type": "completed",
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "progress": 100,
  "result_url": "/api/v1/scores/550e8400-e29b-41d4-a716-446655440000",
  "duration_seconds": 125,
  "timestamp": "2025-01-15T10:32:15Z"
}

Client Action: Fetch MusicXML from result_url


3. Error

Sent by: Worker when job fails

{
  "type": "error",
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "error": {
    "message": "GPU out of memory during source separation",
    "retryable": true,
    "stage": "separation"
  },
  "timestamp": "2025-01-15T10:31:00Z"
}

Client Action: Show error, optionally retry if retryable: true


4. Heartbeat (Keep-Alive)

Sent by: Server every 30 seconds Purpose: Detect dropped connections

{
  "type": "heartbeat",
  "timestamp": "2025-01-15T10:30:00Z"
}

Client Action: Send "pong" response

{
  "type": "pong",
  "timestamp": "2025-01-15T10:30:00Z"
}

Frontend Implementation

WebSocket Hook

import { useEffect, useState } from 'react';

interface ProgressUpdate {
  type: 'progress' | 'completed' | 'error';
  progress: number;
  stage?: string;
  message?: string;
  result_url?: string;
  error?: { message: string; retryable: boolean };
}

export function useJobProgress(jobId: string) {
  const [progress, setProgress] = useState(0);
  const [status, setStatus] = useState<'connecting' | 'processing' | 'completed' | 'failed'>('connecting');
  const [error, setError] = useState<string | null>(null);

  useEffect(() => {
    const ws = new WebSocket(`ws://localhost:8000/api/v1/jobs/${jobId}/stream`);

    ws.onopen = () => {
      console.log('WebSocket connected');
      setStatus('processing');
    };

    ws.onmessage = (event) => {
      const update: ProgressUpdate = JSON.parse(event.data);

      switch (update.type) {
        case 'progress':
          setProgress(update.progress);
          break;

        case 'completed':
          setProgress(100);
          setStatus('completed');
          // Fetch score
          fetchScore(update.result_url!);
          break;

        case 'error':
          setStatus('failed');
          setError(update.error!.message);
          break;

        case 'heartbeat':
          // Send pong
          ws.send(JSON.stringify({ type: 'pong', timestamp: new Date().toISOString() }));
          break;
      }
    };

    ws.onerror = (error) => {
      console.error('WebSocket error:', error);
      setStatus('failed');
      setError('Connection error');
    };

    ws.onclose = () => {
      console.log('WebSocket closed');
    };

    return () => {
      ws.close();
    };
  }, [jobId]);

  return { progress, status, error };
}

Backend Implementation (FastAPI)

Connection Manager

from fastapi import WebSocket
from typing import Dict, List
import json

class ConnectionManager:
    def __init__(self):
        self.active_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, job_id: str):
        await websocket.accept()
        if job_id not in self.active_connections:
            self.active_connections[job_id] = []
        self.active_connections[job_id].append(websocket)

    def disconnect(self, websocket: WebSocket, job_id: str):
        if job_id in self.active_connections:
            self.active_connections[job_id].remove(websocket)

    async def broadcast(self, job_id: str, message: dict):
        """Send message to all clients connected to this job."""
        if job_id in self.active_connections:
            dead_connections = []

            for connection in self.active_connections[job_id]:
                try:
                    await connection.send_json(message)
                except:
                    dead_connections.append(connection)

            # Clean up dead connections
            for conn in dead_connections:
                self.disconnect(conn, job_id)

manager = ConnectionManager()

WebSocket Endpoint

@app.websocket("/api/v1/jobs/{job_id}/stream")
async def websocket_endpoint(websocket: WebSocket, job_id: str):
    await manager.connect(websocket, job_id)

    try:
        # Subscribe to Redis pub/sub for this job
        pubsub = redis_client.pubsub()
        pubsub.subscribe(f"job:{job_id}:updates")

        async for message in pubsub.listen():
            if message['type'] == 'message':
                update = json.loads(message['data'])
                await websocket.send_json(update)

                # Close connection if job completed or failed
                if update.get('type') in ['completed', 'error']:
                    break

    except WebSocketDisconnect:
        manager.disconnect(websocket, job_id)
    finally:
        pubsub.unsubscribe(f"job:{job_id}:updates")

Error Handling

Reconnection Strategy

let reconnectAttempts = 0;
const MAX_RECONNECTS = 5;

function connectWithRetry(jobId: string) {
  const ws = new WebSocket(`ws://localhost:8000/api/v1/jobs/${jobId}/stream`);

  ws.onerror = () => {
    if (reconnectAttempts < MAX_RECONNECTS) {
      reconnectAttempts++;
      const delay = Math.min(1000 * 2 ** reconnectAttempts, 10000);  // Exponential backoff

      setTimeout(() => connectWithRetry(jobId), delay);
    } else {
      // Fallback to polling
      pollJobStatus(jobId);
    }
  };
}

Fallback to Polling

If WebSocket fails, fall back to HTTP polling:

function pollJobStatus(jobId: string) {
  const interval = setInterval(async () => {
    const response = await fetch(`/api/v1/jobs/${jobId}`);
    const job = await response.json();

    setProgress(job.progress);

    if (job.status === 'completed' || job.status === 'failed') {
      clearInterval(interval);
    }
  }, 2000);  // Poll every 2 seconds
}

Security

Authentication (Future)

// Include JWT in connection
const ws = new WebSocket(`ws://localhost:8000/api/v1/jobs/${jobId}/stream?token=${jwtToken}`);

Rate Limiting

Limit connections per IP to prevent abuse:

from slowapi import Limiter

@app.websocket("/api/v1/jobs/{job_id}/stream")
@limiter.limit("10/minute")
async def websocket_endpoint(...):
    pass

Next Steps

See API Design for REST endpoint integration.