
STream3R API: Job Orchestration and Integration Plan

Executive Summary

This document proposes a lightweight STream3R API service that wraps the async job system implemented by the RQ worker. The API exposes RESTful endpoints and Redis Stream subscriptions that allow upstream applications to submit reconstruction jobs, track progress, and retrieve completed artifacts. Responsibilities are split cleanly: the API handles request validation, persistence, and orchestration, while the GPU worker (documented in design_docs/worker.md) performs heavy inference and storage of artifacts.

Key outcomes

  • Unified interface for both short pose_pointmap jobs and long-running model_build jobs.
  • Consistent job lifecycle backed by Postgres and Redis, mirroring the worker contract.
  • Environment-agnostic integration so other services can enqueue jobs or consume progress events without GPU access.

1. Scope & Goals

Goals

  • Provide a REST API for submitting jobs and querying job status or results.
  • Offer optional server-sent events (SSE) or WebSocket feeds for near-real-time updates on pose_pointmap jobs.
  • Enforce validation and idempotency for job submissions.
  • Expose artifacts written by the worker (S3 URLs, local paths) without re-hosting the files.

Non-Goals

  • Implement GPU inference or artifact generation (handled by RQ worker).
  • Manage long-term artifact retention or CDN delivery.
  • Provide fine-grained authorization beyond bearer token / API key patterns (left to integration).

2. Architecture Overview

Client ──HTTP──► STream3R API ──RQ Enqueue──► Redis Queue ──► Worker
   │                          │                         │
   │                          └──Postgres──────────────►│
   │                          └──Redis Stream (events)◄─┘
   │                          └──S3 (artifact URLs)◄───── Worker writes
   │
   └── Poll `/jobs/{id}` or subscribe via SSE/WebSocket for progress updates

Components:

  • FastAPI service (recommended) running behind an ASGI server.
  • Redis: shared with worker for queues and events.
  • Postgres: stream3r_jobs table is the canonical job record.
  • S3/Backblaze (or local storage): artifact URLs returned by worker.
  • RQ worker (implemented separately) executing jobs and updating state.

3. Endpoints

POST /jobs

Submit a job for either pose_pointmap or model_build.

Request body

{
  "job_type": "pose_pointmap",
  "scene_id": "SCENE123",
  "mode": "causal",
  "streaming": true,
  "frames": [
    {"url": "https://.../frame_0000.jpg"},
    {"path": "/data/captures/frame_0001.png"}
  ],
  "session_settings": {"prediction_mode": "Predicted Pointmap"},
  "client_request_id": "optional-idempotency-key"
}

Behavior

  • Validate payload (non-empty frames, supported job type, etc.).
  • If client_request_id is provided, search Postgres for an existing job with the same key to ensure idempotency.
  • Assign job_id (UUID) and enqueue the payload into the appropriate RQ queue (pose_pointmap or model_build).
  • Insert stream3r_jobs row with status=queued.
  • Return 202 Accepted with job metadata:
{
  "job_id": "uuid",
  "status": "queued",
  "job_type": "pose_pointmap",
  "scene_id": "SCENE123"
}
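The submission flow above can be sketched end to end. This is a minimal illustration under stated assumptions, not the actual implementation: `db` and `queues` are hypothetical in-memory stand-ins for the Postgres table and the two RQ queues.

```python
import uuid

# Sketch of the POST /jobs flow: validate, deduplicate on the idempotency
# key, persist a queued row, and enqueue. `db` maps job_id -> row; `queues`
# maps job_type -> a list standing in for an RQ queue.
def submit_job(payload: dict, db: dict, queues: dict) -> dict:
    if payload.get("job_type") not in ("pose_pointmap", "model_build"):
        raise ValueError("unsupported job_type")
    if not payload.get("frames"):
        raise ValueError("frames must be non-empty")

    # Idempotency: if the client key was seen before, return the existing job.
    key = payload.get("client_request_id")
    if key is not None:
        for row in db.values():
            if row.get("client_request_id") == key:
                return {k: row[k] for k in ("job_id", "status", "job_type", "scene_id")}

    job_id = str(uuid.uuid4())
    row = {
        "job_id": job_id,
        "status": "queued",
        "job_type": payload["job_type"],
        "scene_id": payload["scene_id"],
        "client_request_id": key,
    }
    db[job_id] = row                                                   # INSERT ... status='queued'
    queues[payload["job_type"]].append({**payload, "job_id": job_id})  # rq enqueue
    return {k: row[k] for k in ("job_id", "status", "job_type", "scene_id")}
```

In a real deployment the row insert and the enqueue should be ordered so that a crash between them cannot leave an enqueued job with no Postgres record.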

GET /jobs/{job_id}

Fetch job state and artifact references from Postgres.

Response example (status=finished)

{
  "job_id": "uuid",
  "job_type": "model_build",
  "scene_id": "SCENE123",
  "status": "finished",
  "created_at": "...",
  "started_at": "...",
  "completed_at": "...",
  "result": {
    "result_url": "s3://bucket/scene/SCENE123/stream3r/models/summary.json",
    "model_dir": "s3://bucket/scene/SCENE123/stream3r/models/",
    "artifacts": {
      "scene_glb_url": "...",
      "poses_url": "...",
      "pointmaps": [ {"frame_id": "frame_0000", "url": "..."} ]
    }
  },
  "error": null
}

GET /jobs/{job_id}/events

Server-Sent Events endpoint bridging Redis Streams.

  • Uses XREAD on stream3r:events with job_id filter.
  • Suitable for browser or gateway consumers needing near-real-time progress.
  • Emits lines like:
event: progress
data: {"progress": 60, "status": "progress"}

event: finished
data: {"result_url": "s3://..."}

Optionally provide a WebSocket variant if SSE is insufficient.
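The translation from a stream entry to an SSE frame can be sketched as a pure function; a real endpoint would loop on XREAD (e.g. with redis-py) and yield these frames. The event-name mapping is an assumption consistent with the examples above.

```python
import json
from typing import Optional

# Convert one stream3r:events entry (a flat field map) into an SSE frame.
# Entries for other jobs are dropped, since XREAD returns the whole stream.
def entry_to_sse(fields: dict, job_id: str) -> Optional[str]:
    if fields.get("job_id") != job_id:
        return None
    status = fields.get("status", "progress")
    if status == "finished":
        event = "finished"
    elif status == "failed":
        event = "failed"
    else:
        event = "progress"
    # Drop identity fields; the subscriber already knows which job this is.
    data = {k: v for k, v in fields.items()
            if k not in ("job_id", "job_type", "scene_id")}
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"
```

A FastAPI handler would wrap a generator of these frames in a StreamingResponse with media type text/event-stream.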

GET /jobs

Paged listing/filtering (optional but useful for dashboards).

Parameters: scene_id, job_type, status, pagination cursors.


4. Data Model & Persistence

Postgres (stream3r_jobs)

The API is the authoritative owner of the job record. It should create and migrate the following schema during startup (extend with client_request_id if idempotency keys are required):

CREATE TABLE IF NOT EXISTS stream3r_jobs (
  job_id         UUID PRIMARY KEY,
  job_type       TEXT NOT NULL,           -- 'pose_pointmap' | 'model_build'
  scene_id       TEXT NOT NULL,
  status         TEXT NOT NULL,           -- 'queued' | 'started' | 'finished' | 'failed'
  created_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
  started_at     TIMESTAMPTZ,
  completed_at   TIMESTAMPTZ,
  payload        JSONB,                   -- enqueue-time payload
  result         JSONB,                   -- worker-published result bundle
  error          TEXT,
  client_request_id TEXT UNIQUE           -- optional idempotency key
);

CREATE INDEX IF NOT EXISTS stream3r_jobs_scene_id_idx ON stream3r_jobs(scene_id);
CREATE INDEX IF NOT EXISTS stream3r_jobs_status_idx   ON stream3r_jobs(status);

Redis

  • Queues: two RQ queues existβ€”pose_pointmap for latency-sensitive pose extraction and model_build for long reconstruction jobs. The API selects the queue based on job_type.
  • Event Stream: the worker pushes lifecycle updates to the Redis Stream stream3r:events. Every entry is a flat map with the following fields:
job_id, job_type, scene_id,
status, progress, result_url, model_dir,
error, ts

status is one of started, progress, finished, or failed. progress is an integer percentage (0–100). result_url and model_dir mirror the URLs stored in Postgres when a job completes.
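For reference, worker-side event construction can be sketched as follows. Field names come from the list above; the helper itself is illustrative (the actual worker code lives in stream3r/worker/tasks.py), and values are stringified because Redis stream fields are strings.

```python
import time

# Build the flat field map for one stream3r:events entry.
def make_event(job_id: str, job_type: str, scene_id: str, status: str,
               progress: float = 0, result_url: str = "",
               model_dir: str = "", error: str = "") -> dict:
    return {
        "job_id": job_id,
        "job_type": job_type,
        "scene_id": scene_id,
        "status": status,                 # started | progress | finished | failed
        "progress": str(int(progress)),   # integer percentage, 0-100
        "result_url": result_url,
        "model_dir": model_dir,
        "error": error,
        "ts": str(time.time()),
    }
```

With redis-py the worker would publish this via `r.xadd("stream3r:events", make_event(...))`.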

Artifact Storage Layout

The worker persists artifacts to S3/Backblaze (or local storage) under a deterministic folder hierarchy. The API does not move files but should surface these URLs verbatim so consumers know where to fetch results:

scene/{scene_id}/stream3r/
  results/
    {job_id}.json                     # per-job result JSON (pose_pointmap)
  models/
    kv_cache.pt                       # serialized KV cache
    predictions.npz                   # packed model outputs
    session_settings.json             # runtime/config settings
    selected_frames.json              # frame subset indices (optional)
    scene.glb                         # fused 3D scene
    poses.jsonl                       # per-frame extrinsics
    summary.json                      # canonical model_build result JSON
    pointmaps/
      {frame_token}.npz               # per-frame world coords + confidence

pose_pointmap jobs typically populate results/{job_id}.json plus pointmaps/; model_build jobs populate the models/ subtree. All URLs returned by the worker use this structure.
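Helpers that mirror this layout keep key construction in one place. A sketch under the layout above (the helper names are illustrative; the bucket itself is supplied by configuration):

```python
# Deterministic artifact keys matching scene/{scene_id}/stream3r/...
def result_key(scene_id: str, job_id: str) -> str:
    # per-job result JSON (pose_pointmap)
    return f"scene/{scene_id}/stream3r/results/{job_id}.json"

def model_key(scene_id: str, name: str) -> str:
    # files in the models/ subtree, e.g. scene.glb or summary.json
    return f"scene/{scene_id}/stream3r/models/{name}"

def pointmap_key(scene_id: str, frame_token: str) -> str:
    # per-frame world coords + confidence
    return f"scene/{scene_id}/stream3r/models/pointmaps/{frame_token}.npz"
```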


5. Job Lifecycle

  1. Submit (POST /jobs):
    • Validate input, persist a queued row, enqueue the payload.
    • Return job_id.
  2. Worker processing:
    • Worker acquires the GPU lock, runs inference, streams events, writes artifacts, and updates the DB.
  3. Status checks:
    • Clients poll GET /jobs/{id} or subscribe to /jobs/{id}/events.
  4. Completion:
    • Job row contains status=finished and result JSON with URLs.
    • The API response is the source of truth for artifact discovery.
  5. Failure:
    • Worker updates the DB with status=failed and an error string.
    • The API surfaces the error in GET /jobs/{id} and via events.

6. Request Validation Contracts

POST /jobs validation rules:

  • job_type ∈ {pose_pointmap, model_build}.
  • scene_id non-empty string.
  • frames list size β‰₯ 1 (unless frames_dir provided).
  • Each frame entry must have exactly one of url, path, or content (base64 image string).
  • mode default causal; forbid full for streaming jobs to match worker behavior.
  • Optional numeric fields converted to int/float before enqueueing.
  • Enforce max frames (configurable) to avoid resource exhaustion.

7. Security & Authentication

  • Deploy behind an API gateway that injects X-Client-Id or similar metadata for auditing.
  • Support bearer token / API key auth via middleware; store hashed keys in Postgres if needed.
  • Restrict access to internal network when possibleβ€”as artifacts contain scene data.
  • Sanitize inbound URLs to prevent SSRF; optionally proxy downloads through a whitelist.

8. Observability & Operations

  • Logging: Structured logs capturing job_id, scene_id, client_request_id, and remote IP.
  • Metrics: Track enqueue latency, job duration (from DB timestamps), queue depth, event lag.
  • Health checks: GET /healthz verifying Redis and Postgres connectivity.
  • Backpressure: Before accepting a new job, check queue length; if above threshold, return 429 Too Many Requests.
  • Timeouts: Configure HTTP request timeouts to avoid hanging on large payloads.

9. Deployment Considerations

  • Package as a standalone FastAPI app (e.g., stream3r_api.main:app).
  • Run under Uvicorn/Gunicorn with workers sized for I/O-bound traffic.
  • Configure service with the same environment variables as worker (STREAM3R_REDIS_URL, STREAM3R_DB_DSN, etc.).
  • Use infrastructure-as-code to provision Redis, Postgres, and S3 credentials shared with worker.

10. Future Enhancements

  • Job cancellation: Add DELETE /jobs/{id} to flag jobs for cancellation (requires worker support).
  • Scene-level dashboards: Aggregate artifacts from multiple jobs for a scene.
  • Signed download URLs: API could issue pre-signed URLs for public sharing, decoupled from worker credentials.
  • Batch submissions: Support uploading a tar/zip and asynchronously unpacking/validating frames.

References

  • design_docs/worker.md β€” Worker design and artifact contracts
  • stream3r/worker/tasks.py β€” Concrete payload fields exchanged with the API