calebhan's picture
initial docs
c27ae8d

API Design

Overview

The Rescored API provides REST endpoints for job submission and status tracking, plus WebSocket connections for real-time progress updates.

Base URL

  • Development: http://localhost:8000
  • Production: https://api.rescored.com (future)

Authentication

MVP: No authentication (local development only)

Future: JWT-based authentication

Authorization: Bearer <jwt_token>

REST Endpoints

1. Submit Transcription Job

Endpoint: POST /api/v1/transcribe

Purpose: Submit a YouTube URL for transcription processing.

Request:

{
  "youtube_url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
  "options": {
    "instruments": ["piano"],  // MVP: only "piano", future: ["drums", "bass", "guitar", etc.]
    "tempo_override": null,    // Optional: override detected tempo (BPM)
    "key_override": null       // Optional: override detected key (e.g., "C", "Gm")
  }
}

Response (201 Created):

{
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "queued",
  "created_at": "2025-01-15T10:30:00Z",
  "estimated_duration_seconds": 120,
  "websocket_url": "ws://localhost:8000/api/v1/jobs/550e8400-e29b-41d4-a716-446655440000/stream"
}

Errors:

// 400 Bad Request - Invalid URL
{
  "error": "Invalid YouTube URL format"
}

// 422 Unprocessable Entity - Video unavailable
{
  "error": "Video is age-restricted or private"
}

// 429 Too Many Requests - Rate limit exceeded
{
  "error": "Rate limit exceeded. Max 10 jobs per hour.",
  "retry_after_seconds": 3600
}

Implementation:

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, HttpUrl
from uuid import uuid4
from datetime import datetime

app = FastAPI()

class TranscribeRequest(BaseModel):
    youtube_url: HttpUrl
    options: dict = {"instruments": ["piano"]}

class TranscribeResponse(BaseModel):
    job_id: str
    status: str
    created_at: datetime
    estimated_duration_seconds: int
    websocket_url: str

@app.post("/api/v1/transcribe", response_model=TranscribeResponse, status_code=201)
async def submit_transcription(request: TranscribeRequest):
    # Validate YouTube URL
    is_valid, video_id_or_error = validate_youtube_url(str(request.youtube_url))
    if not is_valid:
        raise HTTPException(status_code=400, detail=video_id_or_error)

    # Check video availability
    availability = check_video_availability(video_id_or_error)
    if not availability['available']:
        raise HTTPException(status_code=422, detail=availability['reason'])

    # Create job
    job_id = str(uuid4())
    job_data = {
        "job_id": job_id,
        "status": "queued",
        "youtube_url": str(request.youtube_url),
        "video_id": video_id_or_error,
        "options": request.options,
        "created_at": datetime.utcnow().isoformat(),
        "progress": 0,
    }

    # Store in Redis
    redis_client.hset(f"job:{job_id}", mapping=job_data)

    # Queue Celery task
    process_transcription_task.delay(job_id)

    return TranscribeResponse(
        job_id=job_id,
        status="queued",
        created_at=datetime.utcnow(),
        estimated_duration_seconds=120,
        websocket_url=f"ws://localhost:8000/api/v1/jobs/{job_id}/stream"
    )

2. Get Job Status

Endpoint: GET /api/v1/jobs/{job_id}

Purpose: Poll job status (alternative to WebSocket).

Response:

{
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "processing",  // "queued" | "processing" | "completed" | "failed"
  "progress": 65,          // 0-100
  "current_stage": "transcription",  // "download" | "separation" | "transcription" | "musicxml"
  "created_at": "2025-01-15T10:30:00Z",
  "started_at": "2025-01-15T10:30:05Z",
  "completed_at": null,
  "error": null,
  "result_url": null       // Available when status="completed"
}

Completed Job:

{
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "completed",
  "progress": 100,
  "current_stage": "musicxml",
  "created_at": "2025-01-15T10:30:00Z",
  "started_at": "2025-01-15T10:30:05Z",
  "completed_at": "2025-01-15T10:32:15Z",
  "duration_seconds": 130,
  "result_url": "/api/v1/scores/550e8400-e29b-41d4-a716-446655440000"
}

Failed Job:

{
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "status": "failed",
  "progress": 35,
  "current_stage": "separation",
  "error": {
    "message": "GPU out of memory",
    "retryable": true
  },
  "created_at": "2025-01-15T10:30:00Z",
  "started_at": "2025-01-15T10:30:05Z",
  "failed_at": "2025-01-15T10:31:20Z"
}

Implementation:

@app.get("/api/v1/jobs/{job_id}")
async def get_job_status(job_id: str):
    job_data = redis_client.hgetall(f"job:{job_id}")

    if not job_data:
        raise HTTPException(status_code=404, detail="Job not found")

    return job_data

3. Download MusicXML

Endpoint: GET /api/v1/scores/{job_id}

Purpose: Download the generated MusicXML file.

Response: application/vnd.recordare.musicxml+xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE score-partwise PUBLIC "-//Recordare//DTD MusicXML 4.0 Partwise//EN" "http://www.musicxml.org/dtds/partwise.dtd">
<score-partwise version="4.0">
  <part-list>
    <score-part id="P1">
      <part-name>Piano</part-name>
    </score-part>
  </part-list>
  <part id="P1">
    <!-- measures, notes, etc. -->
  </part>
</score-partwise>

Headers:

Content-Type: application/vnd.recordare.musicxml+xml
Content-Disposition: attachment; filename="score_550e8400.musicxml"

Errors:

// 404 Not Found
{
  "error": "Job not found or not yet completed"
}

Implementation:

from fastapi.responses import FileResponse

@app.get("/api/v1/scores/{job_id}")
async def download_score(job_id: str):
    job_data = redis_client.hgetall(f"job:{job_id}")

    if not job_data or job_data['status'] != 'completed':
        raise HTTPException(status_code=404, detail="Score not available")

    file_path = Path(job_data['output_path'])

    if not file_path.exists():
        raise HTTPException(status_code=404, detail="Score file not found")

    return FileResponse(
        path=file_path,
        media_type="application/vnd.recordare.musicxml+xml",
        filename=f"score_{job_id}.musicxml"
    )

4. Download MIDI

Endpoint: GET /api/v1/scores/{job_id}/midi

Purpose: Download MIDI version of the score.

Response: audio/midi

Headers:

Content-Type: audio/midi
Content-Disposition: attachment; filename="score_550e8400.mid"

Implementation:

@app.get("/api/v1/scores/{job_id}/midi")
async def download_midi(job_id: str):
    # Similar to MusicXML download, but serve .mid file
    pass

WebSocket Endpoint

Real-Time Progress Updates

Endpoint: WS /api/v1/jobs/{job_id}/stream

Purpose: Stream real-time progress updates to the client.

Connection:

// Frontend code
const ws = new WebSocket(`ws://localhost:8000/api/v1/jobs/${jobId}/stream`);

ws.onmessage = (event) => {
  const update = JSON.parse(event.data);
  console.log(update);
};

Message Types:

1. Progress Update:

{
  "type": "progress",
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "progress": 45,
  "stage": "separation",
  "message": "Separated drums stem",
  "timestamp": "2025-01-15T10:30:45Z"
}

2. Completion:

{
  "type": "completed",
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "progress": 100,
  "result_url": "/api/v1/scores/550e8400-e29b-41d4-a716-446655440000",
  "timestamp": "2025-01-15T10:32:15Z"
}

3. Error:

{
  "type": "error",
  "job_id": "550e8400-e29b-41d4-a716-446655440000",
  "error": {
    "message": "Failed to download audio: Network error",
    "retryable": true
  },
  "timestamp": "2025-01-15T10:31:00Z"
}

Implementation:

from fastapi import WebSocket
from fastapi.websockets import WebSocketDisconnect
import asyncio

class ConnectionManager:
    def __init__(self):
        self.active_connections: dict[str, list[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, job_id: str):
        await websocket.accept()
        if job_id not in self.active_connections:
            self.active_connections[job_id] = []
        self.active_connections[job_id].append(websocket)

    def disconnect(self, websocket: WebSocket, job_id: str):
        self.active_connections[job_id].remove(websocket)

    async def send_update(self, job_id: str, message: dict):
        if job_id in self.active_connections:
            for connection in self.active_connections[job_id]:
                await connection.send_json(message)

manager = ConnectionManager()

@app.websocket("/api/v1/jobs/{job_id}/stream")
async def websocket_endpoint(websocket: WebSocket, job_id: str):
    await manager.connect(websocket, job_id)

    try:
        # Keep connection alive and send updates
        while True:
            # Check for updates in Redis
            job_data = redis_client.hgetall(f"job:{job_id}")

            if job_data:
                message = {
                    "type": "progress",
                    "job_id": job_id,
                    "progress": int(job_data.get('progress', 0)),
                    "stage": job_data.get('current_stage', ''),
                    "message": job_data.get('status_message', ''),
                    "timestamp": datetime.utcnow().isoformat(),
                }

                await websocket.send_json(message)

                # Check if job completed or failed
                if job_data['status'] in ['completed', 'failed']:
                    break

            await asyncio.sleep(1)  # Poll every second

    except WebSocketDisconnect:
        manager.disconnect(websocket, job_id)

Alternative: Redis Pub/Sub for Efficiency

Instead of polling Redis every second, use pub/sub:

# Worker publishes updates
redis_client.publish(f"job:{job_id}:updates", json.dumps(message))

# WebSocket subscribes
pubsub = redis_client.pubsub()
pubsub.subscribe(f"job:{job_id}:updates")

for message in pubsub.listen():
    await websocket.send_json(json.loads(message['data']))

Data Models

Job Status

from enum import Enum

class JobStatus(str, Enum):
    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

class JobStage(str, Enum):
    DOWNLOAD = "download"
    SEPARATION = "separation"
    TRANSCRIPTION = "transcription"
    MUSICXML = "musicxml"

Job Schema (Redis)

job_data = {
    "job_id": str,
    "status": JobStatus,
    "youtube_url": str,
    "video_id": str,
    "progress": int,  # 0-100
    "current_stage": JobStage,
    "status_message": str,  # e.g., "Separated drums stem"
    "created_at": str,  # ISO 8601
    "started_at": str | None,
    "completed_at": str | None,
    "failed_at": str | None,
    "output_path": str | None,  # Path to .musicxml file
    "error": dict | None,
}

Error Handling

HTTP Error Codes

Code Meaning Example
400 Bad Request Invalid URL format
404 Not Found Job ID doesn't exist
422 Unprocessable Entity Video unavailable or age-restricted
429 Too Many Requests Rate limit exceeded
500 Internal Server Error Unexpected server error

Error Response Format

{
  "error": "Human-readable error message",
  "details": {
    "field": "youtube_url",
    "issue": "Invalid format"
  }
}

Rate Limiting

MVP: Simple in-memory rate limiting

Production: Redis-based rate limiting with sliding window

Limits:

  • 10 jobs per IP per hour (unauthenticated)
  • 100 jobs per user per hour (authenticated, future)

Implementation:

from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter

@app.post("/api/v1/transcribe")
@limiter.limit("10/hour")
async def submit_transcription(request: Request, transcribe_request: TranscribeRequest):
    # ... implementation
    pass

CORS Configuration

Development:

from fastapi.middleware.cors import CORSMiddleware

app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173"],  # Vite dev server
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

Production:

allow_origins=["https://rescored.com", "https://www.rescored.com"]

API Versioning

Current: /api/v1/

Future: /api/v2/ for breaking changes

Deprecation Policy: Support old version for 6 months after new version release


Next Steps

  1. Implement Celery workers to process jobs
  2. Test WebSocket connections with frontend
  3. Add monitoring for API latency and error rates
  4. Implement proper authentication for production

See WebSocket Protocol for detailed message specs.