STream3R API: Job Orchestration and Integration Plan
Executive Summary
This document proposes a lightweight STream3R API service that wraps the async job system implemented by the RQ worker. The API exposes RESTful endpoints and Redis Stream subscriptions that allow upstream applications to submit reconstruction jobs, track progress, and retrieve completed artifacts. Responsibilities are split cleanly: the API handles request validation, persistence, and orchestration, while the GPU worker (documented in design_docs/worker.md) performs heavy inference and storage of artifacts.
Key outcomes
- Unified interface for both short `pose_pointmap` jobs and long-running `model_build` jobs.
- Consistent job lifecycle backed by Postgres and Redis, mirroring the worker contract.
- Environment-agnostic integration so other services can enqueue jobs or consume progress events without GPU access.
1. Scope & Goals
Goals
- Provide a REST API for submitting jobs and querying job status or results.
- Offer optional server-sent events (SSE) or WebSocket feeds for near-real-time updates on `pose_pointmap` jobs.
- Enforce validation and idempotency for job submissions.
- Expose artifacts written by the worker (S3 URLs, local paths) without re-hosting the files.
Non-Goals
- Implement GPU inference or artifact generation (handled by RQ worker).
- Manage long-term artifact retention or CDN delivery.
- Provide fine-grained authorization beyond bearer token / API key patterns (left to integration).
2. Architecture Overview
```
Client ──HTTP──► STream3R API ──RQ Enqueue──► Redis Queue ──► Worker
  │                   │                                         │
  │                   ◄──Postgres───────────────────────────────┤
  │                   ◄──Redis Stream (events)──────────────────┤
  │                   ◄──S3 (artifact URLs)─────────────────────┤  worker writes
  │
  └── Poll `/jobs/{id}` or subscribe via SSE/WebSocket for progress updates
```
Components:
- FastAPI service (recommended) running behind an ASGI server.
- Redis: shared with worker for queues and events.
- Postgres: the `stream3r_jobs` table is the canonical job record.
- S3/Backblaze (or local storage): artifact URLs returned by the worker.
- RQ worker (implemented separately) executing jobs and updating state.
3. Endpoints
POST /jobs
Submit a job for either `pose_pointmap` or `model_build`.
Request body
```json
{
  "job_type": "pose_pointmap",
  "scene_id": "SCENE123",
  "mode": "causal",
  "streaming": true,
  "frames": [
    {"url": "https://.../frame_0000.jpg"},
    {"path": "/data/captures/frame_0001.png"}
  ],
  "session_settings": {"prediction_mode": "Predicted Pointmap"},
  "client_request_id": "optional-idempotency-key"
}
```
Behavior
- Validate payload (non-empty frames, supported job type, etc.).
- If `client_request_id` is provided, search Postgres for an existing job with the same key to ensure idempotency.
- Assign a `job_id` (UUID) and enqueue the payload into the appropriate RQ queue (`pose_pointmap` or `model_build`).
- Insert a `stream3r_jobs` row with `status=queued`.
- Return `202 Accepted` with job metadata:
```json
{
  "job_id": "uuid",
  "status": "queued",
  "job_type": "pose_pointmap",
  "scene_id": "SCENE123"
}
```
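The submission flow above can be sketched framework-agnostically. In this sketch, `find_by_request_id`, `insert_job`, and `enqueue` are hypothetical stand-ins for the Postgres and RQ calls, injected so the orchestration logic stays testable; this is not actual service code.

```python
import uuid

VALID_JOB_TYPES = {"pose_pointmap", "model_build"}

def submit_job(payload, find_by_request_id, insert_job, enqueue):
    """Validate, dedupe on client_request_id, persist a queued row, enqueue.

    The three callables are injected stand-ins for the Postgres and RQ
    operations.  Returns the 202 response body.
    """
    if payload.get("job_type") not in VALID_JOB_TYPES:
        raise ValueError("unsupported job_type")
    if not payload.get("frames"):
        raise ValueError("frames must be non-empty")

    # Idempotency: reuse the existing job if the key was seen before.
    key = payload.get("client_request_id")
    if key:
        existing = find_by_request_id(key)
        if existing:
            return existing

    job_id = str(uuid.uuid4())
    # Queue name matches job_type: 'pose_pointmap' or 'model_build'.
    enqueue(queue=payload["job_type"], job_id=job_id, payload=payload)
    record = {
        "job_id": job_id,
        "status": "queued",
        "job_type": payload["job_type"],
        "scene_id": payload["scene_id"],
    }
    insert_job(record, payload)
    return record
```

A FastAPI handler would call this function and translate `ValueError` into a `422` response.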
GET /jobs/{job_id}
Fetch job state and artifact references from Postgres.
Response example (status=finished)
```json
{
  "job_id": "uuid",
  "job_type": "model_build",
  "scene_id": "SCENE123",
  "status": "finished",
  "created_at": "...",
  "started_at": "...",
  "completed_at": "...",
  "result": {
    "result_url": "s3://bucket/scene/SCENE123/stream3r/models/summary.json",
    "model_dir": "s3://bucket/scene/SCENE123/stream3r/models/",
    "artifacts": {
      "scene_glb_url": "...",
      "poses_url": "...",
      "pointmaps": [ {"frame_id": "frame_0000", "url": "..."} ]
    }
  },
  "error": null
}
```
GET /jobs/{job_id}/events
Server-Sent Events endpoint bridging Redis Streams.
- Uses `XREAD` on `stream3r:events` with a `job_id` filter.
- Suitable for browser or gateway consumers needing near-real-time progress.
- Emits lines like:
```
event: progress
data: {"progress": 60, "status": "progress"}

event: finished
data: {"result_url": "s3://..."}
```
Optionally provide a WebSocket variant if SSE is insufficient.
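A minimal sketch of the bridging logic, with the SSE formatting separated from the Redis client so it stays testable. The event dicts stand in for entries read off `stream3r:events` via `XREAD` (wiring up the blocking read loop is left to the Redis client), and closing the stream on a terminal event is an assumption about the desired connection lifetime.

```python
import json

def sse_frames(events, job_id):
    """Yield SSE-formatted frames for one job from a stream of event dicts.

    `events` stands in for entries read off `stream3r:events`; each frame
    is `event: <name>` plus a `data:` line, terminated by a blank line.
    """
    for entry in events:
        if entry.get("job_id") != job_id:
            continue  # the stream is shared; drop other jobs' events
        status = entry.get("status", "progress")
        name = status if status in ("finished", "failed") else "progress"
        data = {k: v for k, v in entry.items() if k not in ("job_id", "ts")}
        yield f"event: {name}\ndata: {json.dumps(data)}\n\n"
        if name in ("finished", "failed"):
            return  # terminal event: close the SSE connection
```

In FastAPI this generator could back a `StreamingResponse` with media type `text/event-stream`.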
GET /jobs
Paged listing/filtering (optional but useful for dashboards).
Parameters: `scene_id`, `job_type`, `status`, pagination cursors.
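One way to implement the listing is keyset pagination over `(created_at, job_id)`, which stays stable under concurrent inserts. The helper below builds a parameterized query; it is a sketch of the approach, not the service's actual query builder.

```python
def list_jobs_query(scene_id=None, job_type=None, status=None,
                    cursor=None, limit=50):
    """Build a filtered, keyset-paginated SELECT for GET /jobs.

    `cursor` is the (created_at, job_id) pair of the last row the client
    saw.  Returns (sql, params) for a parameterized execute.
    """
    clauses, params = [], []
    for col, val in (("scene_id", scene_id), ("job_type", job_type),
                     ("status", status)):
        if val is not None:
            clauses.append(f"{col} = %s")
            params.append(val)
    if cursor is not None:
        created_at, job_id = cursor
        # Row-value comparison keeps the page boundary unambiguous.
        clauses.append("(created_at, job_id) < (%s, %s)")
        params.extend([created_at, job_id])
    where = f"WHERE {' AND '.join(clauses)} " if clauses else ""
    sql = ("SELECT job_id, job_type, scene_id, status, created_at "
           f"FROM stream3r_jobs {where}"
           "ORDER BY created_at DESC, job_id DESC LIMIT %s")
    params.append(limit)
    return sql, params
```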
4. Data Model & Persistence
Postgres (stream3r_jobs)
The API is the authoritative owner of the job record. It should create and migrate the following schema during startup (the `client_request_id` column supports optional idempotency keys):
```sql
CREATE TABLE IF NOT EXISTS stream3r_jobs (
    job_id            UUID PRIMARY KEY,
    job_type          TEXT NOT NULL,        -- 'pose_pointmap' | 'model_build'
    scene_id          TEXT NOT NULL,
    status            TEXT NOT NULL,        -- 'queued' | 'started' | 'finished' | 'failed'
    created_at        TIMESTAMPTZ NOT NULL DEFAULT now(),
    started_at        TIMESTAMPTZ,
    completed_at      TIMESTAMPTZ,
    payload           JSONB,                -- enqueue-time payload
    result            JSONB,                -- worker-published result bundle
    error             TEXT,
    client_request_id TEXT UNIQUE           -- optional idempotency key
);

CREATE INDEX IF NOT EXISTS stream3r_jobs_scene_id_idx ON stream3r_jobs(scene_id);
CREATE INDEX IF NOT EXISTS stream3r_jobs_status_idx ON stream3r_jobs(status);
```
Redis
- Queues: two RQ queues exist: `pose_pointmap` for latency-sensitive pose extraction and `model_build` for long reconstruction jobs. The API selects the queue based on `job_type`.
- Event Stream: the worker pushes lifecycle updates to the Redis Stream `stream3r:events`. Every entry is a flat map with the fields `job_id`, `job_type`, `scene_id`, `status`, `progress`, `result_url`, `model_dir`, `error`, and `ts`.

`status` takes `started`, `progress`, `finished`, or `failed`. `progress` is an integer percentage (0–100). `result_url` and `model_dir` mirror the URLs stored in Postgres when a job completes.
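The event contract can be captured in a small builder that stringifies every field before `XADD`, since Redis stream entries are flat string maps. The helper name and the choice to omit unset optional fields (rather than send empty strings) are assumptions, not part of the worker contract.

```python
STATUSES = {"started", "progress", "finished", "failed"}

def make_event(job_id, job_type, scene_id, status, ts, progress=None,
               result_url=None, model_dir=None, error=None):
    """Build a flat string map suitable for XADD to stream3r:events."""
    if status not in STATUSES:
        raise ValueError(f"bad status: {status}")
    entry = {"job_id": job_id, "job_type": job_type,
             "scene_id": scene_id, "status": status, "ts": str(ts)}
    if progress is not None:
        entry["progress"] = str(int(progress))  # integer percentage 0-100
    # Optional fields are omitted when absent instead of sent as "".
    for k, v in (("result_url", result_url), ("model_dir", model_dir),
                 ("error", error)):
        if v is not None:
            entry[k] = v
    return entry
```

With a real client this would be published as `redis.xadd("stream3r:events", make_event(...))`.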
Artifact Storage Layout
The worker persists artifacts to S3/Backblaze (or local storage) under a deterministic folder hierarchy. The API does not move files but should surface these URLs verbatim so consumers know where to fetch results:
```
scene/{scene_id}/stream3r/
  results/
    {job_id}.json          # per-job result JSON (pose_pointmap)
  models/
    kv_cache.pt            # serialized KV cache
    predictions.npz        # packed model outputs
    session_settings.json  # runtime/config settings
    selected_frames.json   # frame subset indices (optional)
    scene.glb              # fused 3D scene
    poses.jsonl            # per-frame extrinsics
    summary.json           # canonical model_build result JSON
  pointmaps/
    {frame_token}.npz      # per-frame world coords + confidence
```

`pose_pointmap` jobs typically populate `results/{job_id}.json` plus `pointmaps/`; `model_build` jobs populate the `models/` subtree. All URLs returned by the worker use this structure.
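A sketch of how the deterministic layout might be computed as object keys. The helper is illustrative only: in practice the API surfaces the URLs the worker reports verbatim rather than deriving them.

```python
def artifact_keys(scene_id, job_id=None, frame_token=None):
    """Return storage keys under scene/{scene_id}/stream3r/ (illustrative)."""
    base = f"scene/{scene_id}/stream3r"
    keys = {
        "models_summary": f"{base}/models/summary.json",
        "scene_glb": f"{base}/models/scene.glb",
        "poses": f"{base}/models/poses.jsonl",
    }
    if job_id is not None:
        # Per-job result JSON written by pose_pointmap jobs.
        keys["result"] = f"{base}/results/{job_id}.json"
    if frame_token is not None:
        # Per-frame world coordinates + confidence.
        keys["pointmap"] = f"{base}/pointmaps/{frame_token}.npz"
    return keys
```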
5. Job Lifecycle
- Submit (`POST /jobs`):
  - Validate input, persist a `queued` row, enqueue the payload.
  - Return `job_id`.
- Worker processing:
  - Worker acquires the GPU lock, runs inference, streams events, writes artifacts, and updates the DB.
- Status checks:
  - Clients poll `GET /jobs/{id}` or subscribe to `/jobs/{id}/events`.
- Completion:
  - Job row contains `status=finished` and a `result` JSON with artifact URLs.
  - The API response is the source of truth for artifact discovery.
- Failure:
  - Worker updates the DB with `status=failed` and an `error` string.
  - The API surfaces the error in `GET /jobs/{id}` and via events.
6. Request Validation Contracts
POST /jobs validation rules:
- `job_type` ∈ {`pose_pointmap`, `model_build`}.
- `scene_id` is a non-empty string.
- `frames` list size ≥ 1 (unless `frames_dir` is provided).
- Each frame entry must have exactly one of `url`, `path`, or `content` (base64 image string).
- `mode` defaults to `causal`; forbid `full` for streaming jobs to match worker behavior.
- Optional numeric fields are converted to `int`/`float` before enqueueing.
- Enforce a maximum frame count (configurable) to avoid resource exhaustion.
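The rules above, sketched as plain validation functions (a Pydantic model would work equally well). The `max_frames` default of 500 is an illustrative value, not a documented limit.

```python
FRAME_SOURCES = ("url", "path", "content")

def validate_frame(frame):
    """Enforce the exactly-one-of url/path/content rule for a frame entry."""
    present = [k for k in FRAME_SOURCES if frame.get(k)]
    if len(present) != 1:
        raise ValueError(f"frame must set exactly one of {FRAME_SOURCES}")
    return present[0]

def validate_request(req, max_frames=500):
    """Apply the POST /jobs rules; raises ValueError on any violation."""
    if req.get("job_type") not in ("pose_pointmap", "model_build"):
        raise ValueError("unsupported job_type")
    if not req.get("scene_id"):
        raise ValueError("scene_id required")
    frames = req.get("frames") or []
    if not frames and not req.get("frames_dir"):
        raise ValueError("frames or frames_dir required")
    if len(frames) > max_frames:
        raise ValueError("too many frames")
    for f in frames:
        validate_frame(f)
    mode = req.get("mode", "causal")
    # Mirror the worker contract: streaming jobs cannot run in full mode.
    if req.get("streaming") and mode == "full":
        raise ValueError("mode=full is not allowed for streaming jobs")
    return {**req, "mode": mode}
```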
7. Security & Authentication
- Deploy behind an API gateway that injects `X-Client-Id` or similar metadata for auditing.
- Support bearer token / API key auth via middleware; store hashed keys in Postgres if needed.
- Restrict access to the internal network when possible, as artifacts contain scene data.
- Sanitize inbound URLs to prevent SSRF; optionally proxy downloads through a whitelist.
8. Observability & Operations
- Logging: structured logs capturing `job_id`, `scene_id`, `client_request_id`, and remote IP.
- Metrics: track enqueue latency, job duration (from DB timestamps), queue depth, and event lag.
- Health checks: `GET /healthz` verifying Redis and Postgres connectivity.
- Backpressure: before accepting a new job, check queue length; if above a threshold, return `429 Too Many Requests`.
- Timeouts: configure HTTP request timeouts to avoid hanging on large payloads.
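The backpressure rule might look like the following; the threshold and `Retry-After` values are illustrative defaults, not part of the worker contract.

```python
def check_backpressure(queue_depth, limit=100, retry_after_s=30):
    """Admission control for POST /jobs: reject when the queue is saturated.

    Returns (http_status, headers).  429 tells well-behaved clients to
    back off and retry after the hinted delay.
    """
    if queue_depth >= limit:
        return 429, {"Retry-After": str(retry_after_s)}
    return 202, {}
```

In practice `queue_depth` would come from `len(rq_queue)` for the queue matching the request's `job_type`.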
9. Deployment Considerations
- Package as a standalone FastAPI app (e.g., `stream3r_api.main:app`).
- Run under Uvicorn/Gunicorn with workers sized for I/O-bound traffic.
- Configure the service with the same environment variables as the worker (`STREAM3R_REDIS_URL`, `STREAM3R_DB_DSN`, etc.).
- Use infrastructure-as-code to provision Redis, Postgres, and S3 credentials shared with the worker.
10. Future Enhancements
- Job cancellation: add `DELETE /jobs/{id}` to flag jobs for cancellation (requires worker support).
- Scene-level dashboards: aggregate artifacts from multiple jobs for a scene.
- Signed download URLs: the API could issue pre-signed URLs for public sharing, decoupled from worker credentials.
- Batch submissions: support uploading a tar/zip and asynchronously unpacking/validating frames.
References
- `design_docs/worker.md`: Worker design and artifact contracts
- `stream3r/worker/tasks.py`: Concrete payload fields exchanged with the API