# STream3R — Jobs, Events, and Storage Design

## **Executive Summary**

The **STream3R Job System** provides an asynchronous GPU job orchestration layer for 3D scene reconstruction and perception tasks. It standardizes how **pose and world-coordinate extraction** and **scene model building** are executed, stored, and tracked across services.

### **Primary Goals**

- **Asynchronous GPU processing:** All heavy inference runs on background RQ workers; FastAPI services only enqueue jobs and monitor progress.
- **Unified observability:**
  - **Redis Streams** for job lifecycle events and progress (`stream3r:events`)
  - **Postgres (`stream3r_jobs`)** as the canonical job record
  - **S3/Backblaze** for durable artifacts and results
- **Two calling modes:**
  - `get_pose_and_world_coords` → **Streams-based (Option A)** for near-real-time updates
  - `create_model` → **Polling (Option B)** for long-running model generation
- **Consistent storage under** `/scene/{scene_id}/stream3r/`, containing:
  - `kv_cache.pt` — serialized key/value cache state
  - `predictions.npz` — packed outputs from the model build
  - `session_settings.json` — runtime/config parameters
  - `selected_frames.json` — frame subset selection
  - `scene.glb` — final assembled scene model
  - `poses.jsonl` — per-frame extrinsics (camera poses)
  - `pointmaps/*.npz` — per-frame world coordinates + confidence maps

### **Key Outcomes**

- Clean separation of API ↔ GPU worker responsibilities
- Event-driven feedback for quick jobs; reliable polling for long ones
- Durable, versioned scene data under a unified layout
- End-to-end traceability of all STream3R jobs via Redis + Postgres + S3

---

## 1. Queues, Streams, and Locks

| Component | Purpose | Notes |
|-----------|---------|-------|
| `pose_pointmap` | RQ queue for latency-sensitive `pose_pointmap` jobs | |
| `model_build` | RQ queue for long `model_build` jobs | |
| `stream3r:events` | Redis Stream for all job events (`started`, `progress`, `finished`, `failed`) | trimmed periodically (see Section 8) |
| `gpu:lock` | Redis lock ensuring a single GPU job at a time per machine | |

Each Stream event is a flat map of strings (a minimal emit sketch follows Section 2):

```
job_id, job_type, scene_id, status, progress, result_url, model_dir, error, ts
```

---

## 2. S3 / Backblaze Storage Layout

All STream3R artifacts live under a **scene folder**:

```
s3://{bucket}/scene/{scene_id}/stream3r/
  results/
    {job_id}.json            # per-job result JSON (pose_pointmap)
  models/
    kv_cache.pt              # serialized KV cache
    predictions.npz          # packed model outputs
    session_settings.json    # runtime/config settings
    selected_frames.json     # frame subset indices
    scene.glb                # fused 3D scene
    poses.jsonl              # per-frame extrinsics
    summary.json             # canonical model_build result JSON
  pointmaps/
    {frame_token}.npz        # per-frame world_coords + confidence
```

**Key Result URLs**

- Pose/pointmap job → `s3://.../scene/{scene_id}/stream3r/results/{job_id}.json`
- Model build job → `s3://.../scene/{scene_id}/stream3r/models/summary.json`

---
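As a concrete illustration of the event map from Section 1, here is a minimal sketch of emitting a lifecycle event with redis-py. The helper name `emit_event` and the connection settings are assumptions for illustration, not part of STream3R.

```python
import time

import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)

def emit_event(job_id: str, job_type: str, scene_id: str, status: str,
               progress: int = 0, **extra: str) -> str:
    """XADD a flat string map to stream3r:events; returns the entry ID."""
    fields = {
        "job_id": job_id,
        "job_type": job_type,
        "scene_id": scene_id,
        "status": status,
        "progress": str(progress),
        "ts": f"{time.time():.2f}",
    }
    # Optional fields (result_url, model_dir, error) ride along as strings.
    fields.update({k: str(v) for k, v in extra.items()})
    # MAXLEN with approximate=True bounds the stream cheaply, matching the
    # retention guidance in Section 8.
    return r.xadd("stream3r:events", fields, maxlen=50000, approximate=True)
```

Passing `maxlen` on each `XADD` complements the periodic `XTRIM` recommended in Section 8; either mechanism keeps the stream bounded.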
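Likewise, a small sketch of deriving keys under the storage layout above and persisting a result JSON. The bucket name and helper names (`scene_prefix`, `put_result`, and friends) are illustrative assumptions; Backblaze B2 exposes an S3-compatible API, so `boto3` also works there with an `endpoint_url` override.

```python
import json

import boto3

BUCKET = "my-stream3r-bucket"  # assumption: the deployment's bucket name
s3 = boto3.client("s3")

def scene_prefix(scene_id: str) -> str:
    return f"scene/{scene_id}/stream3r"

def pose_result_key(scene_id: str, job_id: str) -> str:
    return f"{scene_prefix(scene_id)}/results/{job_id}.json"

def model_summary_key(scene_id: str) -> str:
    return f"{scene_prefix(scene_id)}/models/summary.json"

def pointmap_key(scene_id: str, frame_token: str) -> str:
    return f"{scene_prefix(scene_id)}/pointmaps/{frame_token}.npz"

def put_result(scene_id: str, job_id: str, result: dict) -> str:
    """Persist a pose_pointmap result JSON to its canonical key."""
    key = pose_result_key(scene_id, job_id)
    s3.put_object(Bucket=BUCKET, Key=key,
                  Body=json.dumps(result).encode("utf-8"),
                  ContentType="application/json")
    return f"s3://{BUCKET}/{key}"
```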
## 3. Database: `stream3r_jobs`

Canonical job table in Postgres.

```sql
CREATE TABLE IF NOT EXISTS stream3r_jobs (
    job_id        UUID PRIMARY KEY,
    job_type      TEXT NOT NULL,   -- 'pose_pointmap' | 'model_build'
    scene_id      TEXT NOT NULL,
    status        TEXT NOT NULL,   -- 'queued' | 'started' | 'finished' | 'failed'
    created_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    started_at    TIMESTAMPTZ,
    completed_at  TIMESTAMPTZ,
    payload       JSONB,           -- enqueue-time payload
    result        JSONB,           -- URLs / metrics
    error         TEXT
);

CREATE INDEX IF NOT EXISTS stream3r_jobs_scene_id_idx ON stream3r_jobs(scene_id);
CREATE INDEX IF NOT EXISTS stream3r_jobs_status_idx ON stream3r_jobs(status);
```

**Upsert pattern:**

* Insert on enqueue (`queued`)
* Update on start → `started`
* Update on finish → `finished`, add `result`
* Update on failure → `failed`, add `error`

---

## 4. Result JSON Schemas

### a. Pose + World Coords (per-frame)

`s3://…/scene/{scene_id}/stream3r/results/{job_id}.json`

```json
{
  "job_id": "uuid",
  "job_type": "pose_pointmap",
  "scene_id": "SCENE123",
  "artifacts": {
    "pointmap_url": "s3://.../scene/SCENE123/stream3r/pointmaps/frame_000010.npz"
  },
  "pose": { "R": [[...]], "t": [x, y, z] },
  "intrinsics": { "fx": ..., "fy": ..., "cx": ..., "cy": ... },
  "metrics": { "runtime_s": 1.23 },
  "stream3r": { "cfg": "configs/stream3r_base.yaml", "commit": "" }
}
```

### b. Model Build (scene-level)

`s3://…/scene/{scene_id}/stream3r/models/summary.json`

```json
{
  "job_id": "uuid",
  "job_type": "model_build",
  "scene_id": "SCENE123",
  "artifacts": {
    "model_dir": "s3://.../scene/SCENE123/stream3r/models/",
    "kv_cache": "s3://.../scene/SCENE123/stream3r/models/kv_cache.pt",
    "predictions": "s3://.../scene/SCENE123/stream3r/models/predictions.npz",
    "session_settings": "s3://.../scene/SCENE123/stream3r/models/session_settings.json",
    "selected_frames": "s3://.../scene/SCENE123/stream3r/models/selected_frames.json",
    "scene_glb": "s3://.../scene/SCENE123/stream3r/models/scene.glb",
    "poses_jsonl": "s3://.../scene/SCENE123/stream3r/models/poses.jsonl"
  },
  "metrics": { "frames": 128, "runtime_s": 42.3 },
  "stream3r": { "cfg": "configs/stream3r_base.yaml", "commit": "" }
}
```

---

## 5. Caller API Responsibilities

Minimal caller sketches for both options appear after Section 6.

### `get_pose_and_world_coords` → **Option A (Streams)**

1. Enqueue job → get `job_id`
2. `XREAD BLOCK` on `stream3r:events` until `status=finished`
3. On finish:
   * Fetch `result_url`
   * Load JSON → retrieve `pose`, `intrinsics`, and `pointmap_url`
   * Download the `.npz` to get `world_coords` + `confidence`

### `create_model` → **Option B (Polling)**

1. Enqueue job → return `job_id` immediately
2. Periodically poll `GET /jobs/{job_id}`
3. On `finished`:
   * Read `result` with `result_url` + `model_dir`
   * Download `summary.json` and the listed model files

---

## 6. Worker Event & Persistence Flow

The worker performs the following steps (a skeleton is sketched after this section):

1. **Acquire GPU lock**
2. **Emit** `started`
3. **Upsert** DB row (`stream3r_jobs`)
4. **Run inference**, emitting `progress` events (every N frames)
5. **Save** artifacts to S3:
   * `pointmaps/*.npz` with `{xyz, conf}`
   * `poses.jsonl`
   * Model outputs listed above
6. **Write** result JSON → emit `finished`
7. **Update** DB row → `status=finished, result=…`
8. On error → emit `failed`, update DB

---
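A minimal worker skeleton for the eight steps above, assuming redis-py and psycopg2. `emit_event` is the helper sketched after Section 2, and `run_inference` / `save_artifacts` are placeholders for the actual STream3R inference and S3 upload code; this is a sketch, not the canonical worker implementation.

```python
import json

import psycopg2
import redis

r = redis.Redis(decode_responses=True)
db = psycopg2.connect("dbname=stream3r")  # assumption: connection DSN

def upsert_job(job_id, status, result=None, error=None):
    """Steps 3, 7, 8: keep the canonical stream3r_jobs row in sync.
    The row was inserted as 'queued' at enqueue time, so this is an UPDATE."""
    with db, db.cursor() as cur:
        cur.execute(
            """
            UPDATE stream3r_jobs
               SET status = %s,
                   result = COALESCE(%s::jsonb, result),
                   error  = COALESCE(%s, error),
                   started_at   = CASE WHEN %s = 'started'
                                       THEN now() ELSE started_at END,
                   completed_at = CASE WHEN %s IN ('finished', 'failed')
                                       THEN now() ELSE completed_at END
             WHERE job_id = %s
            """,
            (status, json.dumps(result) if result else None,
             error, status, status, job_id),
        )

def run_job(job_id, job_type, scene_id, payload):
    # 1. Acquire the GPU lock; the timeout guards against a crashed holder.
    with r.lock("gpu:lock", timeout=3600):
        emit_event(job_id, job_type, scene_id, "started", 1)       # 2
        upsert_job(job_id, "started")                              # 3
        try:
            preds = run_inference(                                 # 4 (placeholder)
                payload,
                on_progress=lambda p: emit_event(
                    job_id, job_type, scene_id, "progress", p),
            )
            result_url = save_artifacts(scene_id, job_id, preds)   # 5-6 (placeholder)
            emit_event(job_id, job_type, scene_id, "finished", 100,
                       result_url=result_url)                      # 6
            upsert_job(job_id, "finished",
                       result={"result_url": result_url})          # 7
        except Exception as exc:                                   # 8
            emit_event(job_id, job_type, scene_id, "failed", error=repr(exc))
            upsert_job(job_id, "failed", error=repr(exc))
            raise
```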
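On the caller side, a hedged sketch of Option A from Section 5: block on `stream3r:events` until our job reaches a terminal state, then fetch the result JSON. `wait_for_job` and `fetch_result` are illustrative names, not existing API.

```python
import json
from urllib.parse import urlparse

import boto3
import redis

r = redis.Redis(decode_responses=True)
s3 = boto3.client("s3")

def wait_for_job(job_id: str, block_ms: int = 5000) -> dict:
    """Block on stream3r:events until this job finishes or fails."""
    last_id = "$"  # only events newer than "now"; see the note below
    while True:
        resp = r.xread({"stream3r:events": last_id}, count=100, block=block_ms)
        for _stream, entries in resp:
            for entry_id, fields in entries:
                last_id = entry_id
                if fields.get("job_id") != job_id:
                    continue
                if fields["status"] == "failed":
                    raise RuntimeError(fields.get("error", "job failed"))
                if fields["status"] == "finished":
                    return fields  # carries result_url (and model_dir)

def fetch_result(result_url: str) -> dict:
    """Download and parse the result JSON at s3://bucket/key."""
    u = urlparse(result_url)
    obj = s3.get_object(Bucket=u.netloc, Key=u.path.lstrip("/"))
    return json.loads(obj["Body"].read())
```

Note that starting from `$` only observes events published after the first `XREAD`; a robust caller records the current stream ID at enqueue time, or falls back to the `stream3r_jobs` row when the job may already have finished.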
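Option B reduces to a simple poll loop against the jobs endpoint. A sketch, assuming the `requests` library and an illustrative base URL:

```python
import time

import requests

def poll_job(base_url: str, job_id: str, interval_s: float = 5.0) -> dict:
    """Poll GET /jobs/{job_id} until the job reaches a terminal status."""
    while True:
        job = requests.get(f"{base_url}/jobs/{job_id}", timeout=10).json()
        if job["status"] in ("finished", "failed"):
            return job
        time.sleep(interval_s)

# Usage (illustrative): job = poll_job("https://api.example.com", job_id)
```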
## 7. Example Event Payloads (Redis Stream)

**Started**

```
job_id=uuid job_type=pose_pointmap scene_id=SCENE123 status=started progress=1 ts=1730312345.12
```

**Progress**

```
job_id=uuid job_type=model_build scene_id=SCENE123 status=progress progress=40 ts=1730312456.22
```

**Finished**

```
job_id=uuid job_type=model_build scene_id=SCENE123 status=finished progress=100 result_url=s3://bucket/scene/SCENE123/stream3r/models/summary.json model_dir=s3://bucket/scene/SCENE123/stream3r/models/ ts=1730312567.33
```

**Failed**

```
job_id=uuid job_type=pose_pointmap scene_id=SCENE123 status=failed error=RuntimeError: CUDA OOM ts=1730312570.00
```

---

## 8. Operational Guidelines

| Concern | Best Practice |
| -------------------------- | ---------------------------------------------------------------- |
| **GPU Safety** | Use `gpu:lock` to serialize jobs per GPU |
| **Redis Stream retention** | `XTRIM stream3r:events MAXLEN ~ 50000` |
| **Durability** | All artifacts and summaries must persist to S3/Backblaze |
| **DB Reliability** | Upsert on each transition; retry writes if the DB is unavailable |
| **Idempotency** | Support a caller-supplied `job_id` or `request_id` |
| **Security** | Keep Redis internal; use signed or private S3 URLs |
| **Backpressure** | The enqueueing API should reject (`429`) when queue depth grows too large |

---

## 9. End-to-End Flows

### 🔹 Pose + World Coords (short job)

1. API enqueues job → returns `job_id`
2. Client subscribes via Redis Stream (blocking `XREAD`)
3. Worker runs inference → writes `pointmap.npz` + `result.json`
4. Worker emits `finished` → client downloads results

### 🔹 Model Build (long job)

1. API enqueues → returns `job_id`
2. Client polls `GET /jobs/{id}` or the DB row
3. Worker fuses frames → writes the full scene model files
4. Worker updates DB + emits `finished`
5. Client retrieves `summary.json` + artifacts under `/scene/{scene_id}/stream3r/models/`

---

## 10. Summary

| Component | Responsibility | Persistence |
| ------------------------------ | ---------------------------------------- | ------------------------------- |
| **FastAPI API** | Enqueue jobs, expose `/jobs/{id}` | DB (via worker), Redis (events) |
| **GPU Worker** | Execute STream3R inference, emit events | S3/Backblaze, DB |
| **Redis Streams** | Event bus for progress + completion | ephemeral |
| **Postgres (`stream3r_jobs`)** | Canonical job record | durable |
| **S3/Backblaze `/scene/`** | Scene artifacts, model data | durable |

---

**Outcome:** This design provides an **asynchronous, event-driven, and durable** framework for managing STream3R GPU jobs, with standardized scene storage, traceable job metadata, and clear integration points for both real-time and long-running workflows.