# STream3R API — Job Orchestration and Integration Plan ## Executive Summary This document proposes a lightweight STream3R API service that wraps the async job system implemented by the RQ worker. The API exposes RESTful endpoints and Redis Stream subscriptions that allow upstream applications to submit reconstruction jobs, track progress, and retrieve completed artifacts. Responsibilities are split cleanly: the API handles request validation, persistence, and orchestration, while the GPU worker (documented in `design_docs/worker.md`) performs heavy inference and storage of artifacts. **Key outcomes** - Unified interface for both short `pose_pointmap` jobs and long-running `model_build` jobs. - Consistent job lifecycle backed by Postgres and Redis, mirroring the worker contract. - Environment-agnostic integration so other services can enqueue jobs or consume progress events without GPU access. --- ## 1. Scope & Goals ### Goals - Provide a REST API for submitting jobs and querying job status or results. - Offer optional server-sent events (SSE) or WebSocket feeds for near-real-time updates on `pose_pointmap` jobs. - Enforce validation and idempotency for job submissions. - Expose artifacts written by the worker (S3 URLs, local paths) without re-hosting the files. ### Non-Goals - Implement GPU inference or artifact generation (handled by RQ worker). - Manage long-term artifact retention or CDN delivery. - Provide fine-grained authorization beyond bearer token / API key patterns (left to integration). --- ## 2. Architecture Overview ``` Client ──HTTP──► STream3R API ──RQ Enqueue──► Redis Queue ─► Worker │ │ │ │ └──Postgres──────────────►│ │ └──Redis Stream (events)◄─┘ │ └──S3 (artifact URLs)◄───── Worker writes │ └── Poll `/jobs/{id}` or Subscribe SSE/WebSocket for progress updates ``` Components: - **FastAPI** service (recommended) running behind an ASGI server. - **Redis**: shared with worker for queues and events. - **Postgres**: `stream3r_jobs` table is the canonical job record. - **S3/Backblaze** (or local storage): artifact URLs returned by worker. - **RQ worker** (implemented separately) executing jobs and updating state. --- ## 3. Endpoints ### `POST /jobs` Submit a job for either `pose_pointmap` or `model_build`. **Request body** ```json { "job_type": "pose_pointmap", "scene_id": "SCENE123", "mode": "causal", "streaming": true, "frames": [ {"url": "https://.../frame_0000.jpg"}, {"path": "/data/captures/frame_0001.png"} ], "session_settings": {"prediction_mode": "Predicted Pointmap"}, "client_request_id": "optional-idempotency-key" } ``` **Behavior** - Validate payload (non-empty frames, supported job type, etc.). - If `client_request_id` is provided, search Postgres for an existing job with the same key to ensure idempotency. - Assign `job_id` (UUID) and enqueue the payload into the appropriate RQ queue (`pose_pointmap` or `model_build`). - Insert `stream3r_jobs` row with `status=queued`. - Return `202 Accepted` with job metadata: ```json { "job_id": "uuid", "status": "queued", "job_type": "pose_pointmap", "scene_id": "SCENE123" } ``` ### `GET /jobs/{job_id}` Fetch job state and artifact references from Postgres. **Response example** (`status=finished`) ```json { "job_id": "uuid", "job_type": "model_build", "scene_id": "SCENE123", "status": "finished", "created_at": "...", "started_at": "...", "completed_at": "...", "result": { "result_url": "s3://bucket/scene/SCENE123/stream3r/models/summary.json", "model_dir": "s3://bucket/scene/SCENE123/stream3r/models/", "artifacts": { "scene_glb_url": "...", "poses_url": "...", "pointmaps": [ {"frame_id": "frame_0000", "url": "..."} ] } }, "error": null } ``` ### `GET /jobs/{job_id}/events` Server-Sent Events endpoint bridging Redis Streams. - Uses `XREAD` on `stream3r:events` with `job_id` filter. - Suitable for browser or gateway consumers needing near-real-time progress. - Emits lines like: ``` event: progress data: {"progress": 60, "status": "progress"} event: finished data: {"result_url": "s3://..."} ``` Optionally provide a WebSocket variant if SSE is insufficient. ### `GET /jobs` Paged listing/filtering (optional but useful for dashboards). Parameters: `scene_id`, `job_type`, `status`, pagination cursors. --- ## 4. Data Model & Persistence ### Postgres (`stream3r_jobs`) The API is the authoritative owner of the job record. It should create and migrate the following schema during startup (extend with `client_request_id` if idempotency keys are required): ```sql CREATE TABLE IF NOT EXISTS stream3r_jobs ( job_id UUID PRIMARY KEY, job_type TEXT NOT NULL, -- 'pose_pointmap' | 'model_build' scene_id TEXT NOT NULL, status TEXT NOT NULL, -- 'queued' | 'started' | 'finished' | 'failed' created_at TIMESTAMPTZ NOT NULL DEFAULT now(), started_at TIMESTAMPTZ, completed_at TIMESTAMPTZ, payload JSONB, -- enqueue-time payload result JSONB, -- worker-published result bundle error TEXT, client_request_id TEXT UNIQUE -- optional idempotency key ); CREATE INDEX IF NOT EXISTS stream3r_jobs_scene_id_idx ON stream3r_jobs(scene_id); CREATE INDEX IF NOT EXISTS stream3r_jobs_status_idx ON stream3r_jobs(status); ``` ### Redis - **Queues**: two RQ queues exist—`pose_pointmap` for latency-sensitive pose extraction and `model_build` for long reconstruction jobs. The API selects the queue based on `job_type`. - **Event Stream**: the worker pushes lifecycle updates to the Redis Stream `stream3r:events`. Every entry is a flat map with the following fields: ``` job_id, job_type, scene_id, status, progress, result_url, model_dir, error, ts ``` `status` takes `started`, `progress`, `finished`, or `failed`. `progress` is an integer percentage (0–100). `result_url` and `model_dir` mirror the URLs stored in Postgres when a job completes. ### Artifact Storage Layout The worker persists artifacts to S3/Backblaze (or local storage) under a deterministic folder hierarchy. The API does not move files but should surface these URLs verbatim so consumers know where to fetch results: ``` scene/{scene_id}/stream3r/ results/ {job_id}.json # per-job result JSON (pose_pointmap) models/ kv_cache.pt # serialized KV cache predictions.npz # packed model outputs session_settings.json # runtime/config settings selected_frames.json # frame subset indices (optional) scene.glb # fused 3D scene poses.jsonl # per-frame extrinsics summary.json # canonical model_build result JSON pointmaps/ {frame_token}.npz # per-frame world coords + confidence ``` `pose_pointmap` jobs typically populate `results/{job_id}.json` plus `pointmaps/`; `model_build` jobs populate the `models/` subtree. All URLs returned by the worker use this structure. --- ## 5. Job Lifecycle 1. **Submit** (`POST /jobs`): - Validate input, persist `queued` row, enqueue payload. - Return `job_id`. 2. **Worker processing**: - Worker acquires GPU lock, runs inference, streams events, writes artifacts, and updates DB. 3. **Status checks**: - Clients poll `GET /jobs/{id}` or subscribe to `/jobs/{id}/events`. 4. **Completion**: - Job row contains `status=finished`, `result` JSON with URLs. - API response is the source of truth for artifact discovery. 5. **Failure**: - Worker updates DB with `status=failed`, `error` string. - API surfaces the error in `GET /jobs/{id}` and via events. --- ## 6. Request Validation Contracts `POST /jobs` validation rules: - `job_type` ∈ {`pose_pointmap`, `model_build`}. - `scene_id` non-empty string. - `frames` list size ≥ 1 (unless `frames_dir` provided). - Each frame entry must have exactly one of `url`, `path`, or `content` (base64 image string). - `mode` default `causal`; forbid `full` for streaming jobs to match worker behavior. - Optional numeric fields converted to `int/float` before enqueueing. - Enforce max frames (configurable) to avoid resource exhaustion. --- ## 7. Security & Authentication - Deploy behind an API gateway that injects `X-Client-Id` or similar metadata for auditing. - Support bearer token / API key auth via middleware; store hashed keys in Postgres if needed. - Restrict access to internal network when possible—as artifacts contain scene data. - Sanitize inbound URLs to prevent SSRF; optionally proxy downloads through a whitelist. --- ## 8. Observability & Operations - **Logging**: Structured logs capturing `job_id`, `scene_id`, `client_request_id`, and remote IP. - **Metrics**: Track enqueue latency, job duration (from DB timestamps), queue depth, event lag. - **Health checks**: `GET /healthz` verifying Redis and Postgres connectivity. - **Backpressure**: Before accepting a new job, check queue length; if above threshold, return `429 Too Many Requests`. - **Timeouts**: Configure HTTP request timeouts to avoid hanging on large payloads. --- ## 9. Deployment Considerations - Package as a standalone FastAPI app (e.g., `stream3r_api.main:app`). - Run under Uvicorn/Gunicorn with workers sized for I/O-bound traffic. - Configure service with the same environment variables as worker (`STREAM3R_REDIS_URL`, `STREAM3R_DB_DSN`, etc.). - Use infrastructure-as-code to provision Redis, Postgres, and S3 credentials shared with worker. --- ## 10. Future Enhancements - **Job cancellation**: Add `DELETE /jobs/{id}` to flag jobs for cancellation (requires worker support). - **Scene-level dashboards**: Aggregate artifacts from multiple jobs for a scene. - **Signed download URLs**: API could issue pre-signed URLs for public sharing, decoupled from worker credentials. - **Batch submissions**: Support uploading a tar/zip and asynchronously unpacking/validating frames. --- **References** - `design_docs/worker.md` — Worker design and artifact contracts - `stream3r/worker/tasks.py` — Concrete payload fields exchanged with the API