STream3R — Jobs, Events, and Storage Design

Executive Summary

The STream3R Job System provides an asynchronous GPU job orchestration layer for 3D scene reconstruction and perception tasks.
It standardizes how pose/world-coordinate extraction and scene-model builds are executed, stored, and tracked across services.

Primary Goals

  • Asynchronous GPU processing:
    All heavy inference runs on background RQ workers; FastAPI services only enqueue jobs and monitor progress.
  • Unified observability:
    • Redis Streams for job lifecycle events and progress (stream3r:events)
    • Postgres (stream3r_jobs) as the canonical job record
    • S3/Backblaze for durable artifacts and results
  • Two calling modes:
    • get_pose_and_world_coords → Streams-based (Option A) for near-real-time updates
    • create_model → Polling (Option B) for long-running model generation
  • Consistent storage under /scene/{scene_id}/stream3r/, containing:
    • kv_cache.pt — serialized key/value cache state
    • predictions.npz — packed outputs from the model build
    • session_settings.json — runtime/config parameters
    • selected_frames.json — frame subset selection
    • scene.glb — final assembled scene model
    • poses.jsonl — per-frame extrinsics (camera poses)
    • pointmaps/*.npz — per-frame world coordinates + confidence maps

Key Outcomes

  • Clean separation of API ↔ GPU worker responsibilities
  • Event-driven feedback for quick jobs; reliable polling for long ones
  • Durable, versioned scene data under a unified layout
  • End-to-end traceability of all STream3R jobs via Redis + Postgres + S3

1. Queues, Streams, and Locks

Component         Purpose                                                                Notes
pose_pointmap     RQ queue for latency-sensitive pose_pointmap jobs
model_build       RQ queue for long-running model_build jobs
stream3r:events   Redis Stream for all job events (started, progress, finished, failed)  trimmed periodically
gpu:lock          Redis lock ensuring a single GPU job at a time per machine

Each Stream event is a flat map of strings:


job_id, job_type, scene_id,
status, progress, result_url, model_dir,
error, ts
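As a sketch, a worker could publish these flat string maps with redis-py; the field set follows the schema above, while the helper names and the trim length are illustrative:

```python
import time

def build_event_fields(job_id, job_type, scene_id, status, progress=0, **extra):
    """Flat map of strings, matching the event schema above."""
    return {
        "job_id": job_id,
        "job_type": job_type,
        "scene_id": scene_id,
        "status": status,
        "progress": str(progress),
        "ts": f"{time.time():.2f}",
        **extra,
    }

def emit_event(r, **kwargs):
    """Publish to stream3r:events; r is a redis-py client.

    MAXLEN ~50000 keeps the stream bounded; approximate trimming is cheap.
    """
    return r.xadd("stream3r:events", build_event_fields(**kwargs),
                  maxlen=50_000, approximate=True)
```

Keeping every value a string matters because Redis Stream entries are field/value pairs of bytes or strings; consumers parse progress and ts back out.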

2. S3 / Backblaze Storage Layout

All STream3R artifacts live under a scene folder:


s3://<bucket>/scene/{scene_id}/stream3r/
  results/
    {job_id}.json                   # per-job result JSON (pose_pointmap)
  models/
    kv_cache.pt                     # serialized KV cache
    predictions.npz                 # packed model outputs
    session_settings.json           # runtime/config settings
    selected_frames.json            # frame subset indices
    scene.glb                       # fused 3D scene
    poses.jsonl                     # per-frame extrinsics
    summary.json                    # canonical model_build result JSON
  pointmaps/
    {frame_token}.npz               # per-frame world_coords + confidence

Key Result URLs

  • Pose/pointmap job → s3://.../scene/{scene_id}/stream3r/results/{job_id}.json
  • Model build job → s3://.../scene/{scene_id}/stream3r/models/summary.json
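Both canonical URLs are fully determined by scene_id and job_type, so callers can derive them; a hypothetical helper (bucket value and function name are illustrative):

```python
def result_url(bucket: str, scene_id: str, job_type: str, job_id: str) -> str:
    """Build the canonical result URL for a job (layout from section 2)."""
    base = f"s3://{bucket}/scene/{scene_id}/stream3r"
    if job_type == "pose_pointmap":
        # One result JSON per job.
        return f"{base}/results/{job_id}.json"
    if job_type == "model_build":
        # One summary per scene's model directory.
        return f"{base}/models/summary.json"
    raise ValueError(f"unknown job_type: {job_type}")
```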

3. Database: stream3r_jobs

Canonical job table in Postgres.

CREATE TABLE IF NOT EXISTS stream3r_jobs (
  job_id         UUID PRIMARY KEY,
  job_type       TEXT NOT NULL,           -- 'pose_pointmap' | 'model_build'
  scene_id       TEXT NOT NULL,
  status         TEXT NOT NULL,           -- 'queued' | 'started' | 'finished' | 'failed'
  created_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
  started_at     TIMESTAMPTZ,
  completed_at   TIMESTAMPTZ,
  payload        JSONB,                   -- enqueue-time payload
  result         JSONB,                   -- URLs / metrics
  error          TEXT
);

CREATE INDEX IF NOT EXISTS stream3r_jobs_scene_id_idx ON stream3r_jobs(scene_id);
CREATE INDEX IF NOT EXISTS stream3r_jobs_status_idx   ON stream3r_jobs(status);

Upsert pattern:

  • Insert on enqueue (queued)
  • Update on start → started
  • Update on finish → finished, add result
  • Update on failure → failed, add error
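One possible mapping of these transitions to SQL against the table above; a sketch assuming psycopg-style %s placeholders:

```python
# Transition -> parametrized SQL (Postgres). The insert is idempotent so a
# retried enqueue with the same job_id does not clobber an existing row.
UPSERT_SQL = {
    "queued": """
        INSERT INTO stream3r_jobs (job_id, job_type, scene_id, status, payload)
        VALUES (%s, %s, %s, 'queued', %s)
        ON CONFLICT (job_id) DO NOTHING
    """,
    "started": """
        UPDATE stream3r_jobs
        SET status = 'started', started_at = now()
        WHERE job_id = %s
    """,
    "finished": """
        UPDATE stream3r_jobs
        SET status = 'finished', completed_at = now(), result = %s
        WHERE job_id = %s
    """,
    "failed": """
        UPDATE stream3r_jobs
        SET status = 'failed', completed_at = now(), error = %s
        WHERE job_id = %s
    """,
}
```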

4. Result JSON Schemas

a. Pose + World Coords (per-frame)

s3://…/scene/{scene_id}/stream3r/results/{job_id}.json

{
  "job_id": "uuid",
  "job_type": "pose_pointmap",
  "scene_id": "SCENE123",
  "artifacts": {
    "pointmap_url": "s3://.../scene/SCENE123/stream3r/pointmaps/frame_000010.npz"
  },
  "pose": { "R": [[...]], "t": [x, y, z] },
  "intrinsics": { "fx":..., "fy":..., "cx":..., "cy":... },
  "metrics": { "runtime_s": 1.23 },
  "stream3r": { "cfg": "configs/stream3r_base.yaml", "commit": "<git_sha>" }
}
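The pointmap_url above points at an .npz whose array names follow section 6 ({xyz, conf}); unpacking it might look like this (helper name and shapes are illustrative):

```python
import io
import numpy as np

def load_pointmap(npz_bytes: bytes):
    """Unpack a pointmaps/{frame_token}.npz payload into (xyz, conf) arrays."""
    with np.load(io.BytesIO(npz_bytes)) as data:
        # xyz: per-pixel world coordinates; conf: per-pixel confidence map.
        return data["xyz"], data["conf"]
```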

b. Model Build (scene-level)

s3://…/scene/{scene_id}/stream3r/models/summary.json

{
  "job_id": "uuid",
  "job_type": "model_build",
  "scene_id": "SCENE123",
  "artifacts": {
    "model_dir":        "s3://.../scene/SCENE123/stream3r/models/",
    "kv_cache":         "s3://.../scene/SCENE123/stream3r/models/kv_cache.pt",
    "predictions":      "s3://.../scene/SCENE123/stream3r/models/predictions.npz",
    "session_settings": "s3://.../scene/SCENE123/stream3r/models/session_settings.json",
    "selected_frames":  "s3://.../scene/SCENE123/stream3r/models/selected_frames.json",
    "scene_glb":        "s3://.../scene/SCENE123/stream3r/models/scene.glb",
    "poses_jsonl":      "s3://.../scene/SCENE123/stream3r/models/poses.jsonl"
  },
  "metrics": { "frames": 128, "runtime_s": 42.3 },
  "stream3r": { "cfg": "configs/stream3r_base.yaml", "commit": "<git_sha>" }
}

5. Caller API Responsibilities

get_pose_and_world_coords → Option A (Streams)

  1. Enqueue job → get job_id

  2. XREAD BLOCK on stream3r:events until status=finished

  3. On finish:

    • Fetch result_url
    • Load JSON → retrieve pose, intrinsics, and pointmap_url
    • Download .npz to get world_coords + confidence
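The blocking wait in step 2 could be sketched with redis-py's XREAD; assumes a client created with decode_responses=True, and the helper name is illustrative:

```python
def wait_for_job(r, job_id, timeout_ms=5_000):
    """Block on stream3r:events until this job reports finished or failed.

    r is a redis-py client with decode_responses=True.
    """
    last_id = "$"  # only see events published after we start listening
    while True:
        resp = r.xread({"stream3r:events": last_id}, block=timeout_ms, count=50)
        if not resp:
            continue  # timed out with no new events; keep waiting
        for _stream, entries in resp:
            for entry_id, fields in entries:
                last_id = entry_id  # resume from the last-seen entry
                if fields.get("job_id") != job_id:
                    continue  # event for some other job
                if fields.get("status") in ("finished", "failed"):
                    return fields
```

The caller then fetches result_url from the returned fields and proceeds with step 3.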

create_model → Option B (Polling)

  1. Enqueue job → return job_id immediately

  2. Periodically poll GET /jobs/{job_id}

  3. On finished:

    • Read result with result_url + model_dir
    • Download summary.json and listed model files
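The polling loop can be sketched generically; here fetch stands in for a GET /jobs/{job_id} call, and the callable-based shape, interval, and attempt cap are illustrative:

```python
import time

def poll_job(fetch, job_id, interval_s=5.0, max_attempts=720):
    """Poll until the job reaches a terminal status.

    fetch(job_id) should return the job record as a dict, e.g. the JSON
    body of GET /jobs/{job_id}.
    """
    for _ in range(max_attempts):
        job = fetch(job_id)
        if job["status"] in ("finished", "failed"):
            return job
        time.sleep(interval_s)  # long model builds: poll gently
    raise TimeoutError(f"job {job_id} did not finish in time")
```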

6. Worker Event & Persistence Flow

  1. Acquire GPU lock

  2. Emit started

  3. Upsert DB row (stream3r_jobs)

  4. Run inference, emitting progress events (every N frames)

  5. Save artifacts to S3:

    • pointmaps/*.npz with {xyz, conf}
    • poses.jsonl
    • Model outputs listed above
  6. Write result JSON → emit finished

  7. Update DB row → status=finished, result=…

  8. On error → emit failed, update DB
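Step 1 could use redis-py's built-in Lock for gpu:lock; a minimal sketch, where the helper name and TTL are illustrative (choose a TTL longer than the worst-case job runtime so a crashed worker cannot hold the GPU forever):

```python
def run_with_gpu_lock(r, fn, lock_name="gpu:lock", ttl_s=3600):
    """Serialize GPU work per machine via a Redis lock.

    r is a redis-py client; fn is the inference callable.
    """
    lock = r.lock(lock_name, timeout=ttl_s, blocking_timeout=None)
    with lock:  # blocks until acquired; released on exit, even on error
        return fn()
```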


7. Example Event Payloads (Redis Stream)

Started

job_id=uuid
job_type=pose_pointmap
scene_id=SCENE123
status=started
progress=1
ts=1730312345.12

Progress

job_id=uuid
job_type=model_build
scene_id=SCENE123
status=progress
progress=40
ts=1730312456.22

Finished

job_id=uuid
job_type=model_build
scene_id=SCENE123
status=finished
progress=100
result_url=s3://bucket/scene/SCENE123/stream3r/models/summary.json
model_dir=s3://bucket/scene/SCENE123/stream3r/models/
ts=1730312567.33

Failed

job_id=uuid
job_type=pose_pointmap
scene_id=SCENE123
status=failed
error=RuntimeError: CUDA OOM
ts=1730312570.00

8. Operational Guidelines

Concern                  Best Practice
GPU Safety               Use gpu:lock to serialize jobs per GPU
Redis Stream retention   XTRIM stream3r:events MAXLEN ~ 50000
Durability               All artifacts and summaries must persist to S3/Backblaze
DB Reliability           Upsert on each transition; retry writes if the DB is unavailable
Idempotency              Support caller-supplied job_id or request_id
Security                 Keep Redis internal; use signed or private S3 URLs
Backpressure             Enqueueing API should reject (429) when queue depth is too large
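The backpressure rule can be sketched as a pre-enqueue check; the exception type and max_depth default are illustrative (with RQ, queue depth is len(queue), and the API layer maps the exception to HTTP 429):

```python
class QueueFull(Exception):
    """Raised when the queue is too deep; map to HTTP 429 in the API."""

def check_backpressure(queue_depth: int, max_depth: int = 100) -> None:
    # Reject before enqueueing so the worker pool never accumulates an
    # unbounded backlog of GPU jobs.
    if queue_depth >= max_depth:
        raise QueueFull(f"queue depth {queue_depth} >= {max_depth}")
```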

9. End-to-End Flows

🔹 Pose + World Coords (short job)

  1. API enqueues job → returns job_id
  2. Client subscribes via Redis Stream (blocking XREAD)
  3. Worker runs inference → writes pointmap.npz + result.json
  4. Worker emits finished → client downloads results

🔹 Model Build (long job)

  1. API enqueues → returns job_id
  2. Client polls GET /jobs/{id} or DB row
  3. Worker fuses frames → writes full scene model files
  4. Worker updates DB + emits finished
  5. Client retrieves summary.json + artifacts under /scene/{scene_id}/stream3r/models/

10. Summary

Component                  Responsibility                             Persistence
FastAPI API                Enqueue jobs, expose /jobs/{id}            DB (via worker), Redis (events)
GPU Worker                 Execute STream3R inference, emit events    S3/Backblaze, DB
Redis Streams              Event bus for progress + completion        ephemeral
Postgres (stream3r_jobs)   Canonical job record                       durable
S3/Backblaze /scene/       Scene artifacts, model data                durable

Outcome: This design provides an asynchronous, event-driven, and durable framework for managing STream3R GPU jobs, with standardized scene storage, traceable job metadata, and clear integration points for both real-time and long-running workflows.