# STream3R: Jobs, Events, and Storage Design
## **Executive Summary**
The **STream3R Job System** provides an asynchronous GPU job orchestration layer for 3D scene reconstruction and perception tasks.
It standardizes how **pose and world-coordinate extraction** and **scene model building** are executed, stored, and tracked across services.
### **Primary Goals**
- **Asynchronous GPU processing:**
  All heavy inference runs on background RQ workers; FastAPI services only enqueue jobs and monitor progress.
- **Unified observability:**
  - **Redis Streams** for job lifecycle events and progress (`stream3r:events`)
  - **Postgres (`stream3r_jobs`)** as the canonical job record
  - **S3/Backblaze** for durable artifacts and results
- **Two calling modes:**
  - `get_pose_and_world_coords` → **Streams-based (Option A)** for near-real-time updates
  - `create_model` → **Polling (Option B)** for long-running model generation
- **Consistent storage under** `/scene/{scene_id}/stream3r/`, containing:
  - `kv_cache.pt`: serialized key/value cache state
  - `predictions.npz`: packed outputs from the model build
  - `session_settings.json`: runtime/config parameters
  - `selected_frames.json`: frame subset selection
  - `scene.glb`: final assembled scene model
  - `poses.jsonl`: per-frame extrinsics (camera poses)
  - `pointmaps/*.npz`: per-frame world coordinates + confidence maps
### **Key Outcomes**
- Clean separation of API ↔ GPU worker responsibilities
- Event-driven feedback for quick jobs; reliable polling for long ones
- Durable, versioned scene data under a unified layout
- End-to-end traceability of all STream3R jobs via Redis + Postgres + S3
---
## 1. Queues, Streams, and Locks
| Component | Purpose | Notes |
|------------|----------|-------|
| `pose_pointmap` | RQ queue for latency-sensitive `pose_pointmap` jobs | |
| `model_build` | RQ queue for long `model_build` jobs | |
| `stream3r:events` | Redis Stream for all job events (`started`, `progress`, `finished`, `failed`) | trimmed periodically |
| `gpu:lock` | Redis lock ensuring single GPU job at a time per machine | |
Each Stream event is a flat map of strings:
```
job_id, job_type, scene_id,
status, progress, result_url, model_dir,
error, ts
```
---
## 2. S3 / Backblaze Storage Layout
All STream3R artifacts live under a **scene folder**:
```
s3://<bucket>/scene/{scene_id}/stream3r/
  results/
    {job_id}.json            # per-job result JSON (pose_pointmap)
  models/
    kv_cache.pt              # serialized KV cache
    predictions.npz          # packed model outputs
    session_settings.json    # runtime/config settings
    selected_frames.json     # frame subset indices
    scene.glb                # fused 3D scene
    poses.jsonl              # per-frame extrinsics
    summary.json             # canonical model_build result JSON
  pointmaps/
    {frame_token}.npz        # per-frame world_coords + confidence
```
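Building every key from one prefix function keeps producers and consumers on the same layout. A minimal sketch; the helper names, the `MODEL_FILES` allow-list, and the rule that `scene_id` may not contain `/` are assumptions, not part of the design above.

```python
# Hypothetical helpers mirroring the scene layout; the bucket name is a placeholder.
MODEL_FILES = frozenset({
    "kv_cache.pt", "predictions.npz", "session_settings.json",
    "selected_frames.json", "scene.glb", "poses.jsonl", "summary.json",
})


def scene_prefix(bucket: str, scene_id: str) -> str:
    """Root of all STream3R artifacts for one scene (trailing slash included)."""
    if not scene_id or "/" in scene_id:
        raise ValueError(f"invalid scene_id: {scene_id!r}")
    return f"s3://{bucket}/scene/{scene_id}/stream3r/"


def result_url(bucket: str, scene_id: str, job_id: str) -> str:
    """Per-job result JSON for pose_pointmap jobs."""
    return f"{scene_prefix(bucket, scene_id)}results/{job_id}.json"


def model_url(bucket: str, scene_id: str, filename: str = "") -> str:
    """URL of a file under models/; an empty filename gives model_dir itself."""
    if filename and filename not in MODEL_FILES:
        raise ValueError(f"unexpected model file: {filename}")
    return f"{scene_prefix(bucket, scene_id)}models/{filename}"


def pointmap_url(bucket: str, scene_id: str, frame_token: str) -> str:
    """Per-frame world coordinates + confidence."""
    return f"{scene_prefix(bucket, scene_id)}pointmaps/{frame_token}.npz"
```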
**Key Result URLs**
- Pose/pointmap job → `s3://.../scene/{scene_id}/stream3r/results/{job_id}.json`
- Model build job → `s3://.../scene/{scene_id}/stream3r/models/summary.json`
---
## 3. Database: `stream3r_jobs`
Canonical job table in Postgres.
```sql
CREATE TABLE IF NOT EXISTS stream3r_jobs (
  job_id        UUID PRIMARY KEY,
  job_type      TEXT NOT NULL,                        -- 'pose_pointmap' | 'model_build'
  scene_id      TEXT NOT NULL,
  status        TEXT NOT NULL,                        -- 'queued' | 'started' | 'finished' | 'failed'
  created_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
  started_at    TIMESTAMPTZ,
  completed_at  TIMESTAMPTZ,
  payload       JSONB,                                -- enqueue-time payload
  result        JSONB,                                -- URLs / metrics
  error         TEXT
);
CREATE INDEX IF NOT EXISTS stream3r_jobs_scene_id_idx ON stream3r_jobs(scene_id);
CREATE INDEX IF NOT EXISTS stream3r_jobs_status_idx ON stream3r_jobs(status);
```
**Upsert pattern:**
* Insert on enqueue (`queued`)
* Update on start → `started`
* Update on finish → `finished`, add `result`
* Update on failure → `failed`, add `error`
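One way to express these transitions, assuming a psycopg-style driver (`%(name)s` placeholders, Python lists adapted to Postgres arrays). Strictly this is an insert followed by guarded updates rather than a single `ON CONFLICT DO UPDATE`: each `UPDATE` only matches rows in an allowed previous status, so a retried write is a harmless no-op. `INSERT_QUEUED`, `transition_sql`, and the transition table are hypothetical; the code only builds SQL and never touches a database.

```python
import json
from typing import Any, Dict, Optional, Tuple

# Insert on enqueue; a replayed enqueue with the same job_id is ignored.
INSERT_QUEUED = """
INSERT INTO stream3r_jobs (job_id, job_type, scene_id, status, payload)
VALUES (%(job_id)s, %(job_type)s, %(scene_id)s, 'queued', %(payload)s::jsonb)
ON CONFLICT (job_id) DO NOTHING
"""

# Allowed previous statuses for each transition (assumed state machine).
PREVIOUS = {
    "started": ["queued"],
    "finished": ["started"],
    "failed": ["queued", "started"],
}

# Column updates per transition; setting completed_at on failure is an assumption.
SET_CLAUSES = {
    "started": "status = 'started', started_at = now()",
    "finished": "status = 'finished', completed_at = now(), result = %(result)s::jsonb",
    "failed": "status = 'failed', completed_at = now(), error = %(error)s",
}


def transition_sql(job_id: str, new_status: str,
                   result: Optional[Dict[str, Any]] = None,
                   error: Optional[str] = None) -> Tuple[str, Dict[str, Any]]:
    """Build a guarded UPDATE; it matches no row if the job already moved on."""
    if new_status not in SET_CLAUSES:
        raise ValueError(f"no transition to {new_status!r}")
    sql = (f"UPDATE stream3r_jobs SET {SET_CLAUSES[new_status]} "
           "WHERE job_id = %(job_id)s AND status = ANY(%(prev)s)")
    params: Dict[str, Any] = {"job_id": job_id, "prev": list(PREVIOUS[new_status])}
    if new_status == "finished":
        params["result"] = json.dumps(result or {})
    elif new_status == "failed":
        params["error"] = error or "unknown error"
    return sql, params
```

A caller would run `cursor.execute(*transition_sql(job_id, "started"))` and can inspect `cursor.rowcount` to see whether the transition applied.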
---
## 4. Result JSON Schemas
### a. Pose + World Coords (per-frame)
`s3://…/scene/{scene_id}/stream3r/results/{job_id}.json`
```json
{
  "job_id": "uuid",
  "job_type": "pose_pointmap",
  "scene_id": "SCENE123",
  "artifacts": {
    "pointmap_url": "s3://.../scene/SCENE123/stream3r/pointmaps/frame_000010.npz"
  },
  "pose": { "R": [[...]], "t": [x, y, z] },
  "intrinsics": { "fx":..., "fy":..., "cx":..., "cy":... },
  "metrics": { "runtime_s": 1.23 },
  "stream3r": { "cfg": "configs/stream3r_base.yaml", "commit": "<git_sha>" }
}
```
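Consumers can check a pose result before using it. A minimal sketch, assuming `R` is a 3×3 rotation stored as nested lists and `t` a 3-vector, as the placeholders suggest; the document does not say whether the pose maps camera to world or world to camera, so the check does not assume either. `validate_pose_result` is a hypothetical name.

```python
from typing import Any, Dict

REQUIRED = ("job_id", "job_type", "scene_id", "artifacts", "pose", "intrinsics")


def validate_pose_result(doc: Dict[str, Any]) -> None:
    """Raise ValueError if a pose_pointmap result JSON does not match the schema."""
    missing = [k for k in REQUIRED if k not in doc]
    if missing:
        raise ValueError(f"missing keys: {missing}")
    if doc["job_type"] != "pose_pointmap":
        raise ValueError(f"unexpected job_type: {doc['job_type']!r}")
    R, t = doc["pose"].get("R"), doc["pose"].get("t")
    if not (isinstance(R, list) and len(R) == 3
            and all(isinstance(row, list) and len(row) == 3 for row in R)):
        raise ValueError("pose.R must be a 3x3 nested list")
    if not (isinstance(t, list) and len(t) == 3):
        raise ValueError("pose.t must have 3 elements")
    for key in ("fx", "fy", "cx", "cy"):
        if not isinstance(doc["intrinsics"].get(key), (int, float)):
            raise ValueError(f"intrinsics.{key} must be a number")
    # The point map must live under this scene's pointmaps/ folder.
    url = doc["artifacts"].get("pointmap_url", "")
    prefix = f"/scene/{doc['scene_id']}/stream3r/pointmaps/"
    if prefix not in url or not url.endswith(".npz"):
        raise ValueError(f"pointmap_url outside the scene layout: {url!r}")
```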
### b. Model Build (scene-level)
`s3://…/scene/{scene_id}/stream3r/models/summary.json`
```json
{
  "job_id": "uuid",
  "job_type": "model_build",
  "scene_id": "SCENE123",
  "artifacts": {
    "model_dir": "s3://.../scene/SCENE123/stream3r/models/",
    "kv_cache": "s3://.../scene/SCENE123/stream3r/models/kv_cache.pt",
    "predictions": "s3://.../scene/SCENE123/stream3r/models/predictions.npz",
    "session_settings": "s3://.../scene/SCENE123/stream3r/models/session_settings.json",
    "selected_frames": "s3://.../scene/SCENE123/stream3r/models/selected_frames.json",
    "scene_glb": "s3://.../scene/SCENE123/stream3r/models/scene.glb",
    "poses_jsonl": "s3://.../scene/SCENE123/stream3r/models/poses.jsonl"
  },
  "metrics": { "frames": 128, "runtime_s": 42.3 },
  "stream3r": { "cfg": "configs/stream3r_base.yaml", "commit": "<git_sha>" }
}
```
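Because every artifact URL in the summary derives from `model_dir`, the worker can build the document from that prefix alone. A minimal sketch; `build_summary`, `summary_bytes`, and the `ARTIFACT_FILES` mapping are hypothetical names chosen to match the keys above.

```python
import json
from typing import Any, Dict

# Artifact keys in summary.json mapped to file names under models/.
ARTIFACT_FILES = {
    "kv_cache": "kv_cache.pt",
    "predictions": "predictions.npz",
    "session_settings": "session_settings.json",
    "selected_frames": "selected_frames.json",
    "scene_glb": "scene.glb",
    "poses_jsonl": "poses.jsonl",
}


def build_summary(job_id: str, scene_id: str, model_dir: str, frames: int,
                  runtime_s: float, cfg: str, commit: str) -> Dict[str, Any]:
    """Assemble the model_build summary; every artifact URL derives from model_dir."""
    if not model_dir.endswith("/"):
        model_dir += "/"
    artifacts = {"model_dir": model_dir}
    artifacts.update({key: model_dir + name for key, name in ARTIFACT_FILES.items()})
    return {
        "job_id": job_id,
        "job_type": "model_build",
        "scene_id": scene_id,
        "artifacts": artifacts,
        "metrics": {"frames": frames, "runtime_s": runtime_s},
        "stream3r": {"cfg": cfg, "commit": commit},
    }


def summary_bytes(summary: Dict[str, Any]) -> bytes:
    """Serialize for upload to models/summary.json."""
    return json.dumps(summary, indent=2).encode("utf-8")
```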
---
## 5. Caller API Responsibilities
### `get_pose_and_world_coords` → **Option A (Streams)**
1. Enqueue job → get `job_id`
2. `XREAD BLOCK` on `stream3r:events` until an event for this `job_id` arrives with `status=finished` (or `status=failed`)
3. On finish:
   * Fetch `result_url`
   * Load JSON → retrieve `pose`, `intrinsics`, and `pointmap_url`
   * Download `.npz` to get `world_coords` + `confidence`
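Steps 2 and 3 can be sketched as a blocking read loop, assuming redis-py with `decode_responses=True` and the default RESP2 reply shape. `wait_for_job`, `JobFailed`, and the caller-supplied `start_id` are hypothetical. The loop filters on `job_id` because all jobs share one stream, treats `failed` as terminal, and advances the last-seen ID so no entry is read twice.

```python
import time
from typing import Callable, Dict

STREAM_KEY = "stream3r:events"


class JobFailed(RuntimeError):
    """Raised when the job's terminal event has status=failed."""


def wait_for_job(client, job_id: str, start_id: str, timeout_s: float = 60.0,
                 block_ms: int = 5000,
                 clock: Callable[[], float] = time.monotonic) -> Dict[str, str]:
    """Block on XREAD until this job's `finished` event; return its fields.

    start_id is assumed to be captured before enqueueing (for example the ID of
    the newest entry), so events emitted before the first XREAD are not missed.
    """
    deadline = clock() + timeout_s
    last_id = start_id
    while (remaining := deadline - clock()) > 0:
        # BLOCK 0 means "wait forever" in Redis, so always pass at least 1 ms.
        block = max(1, min(block_ms, int(remaining * 1000)))
        # RESP2 reply: [[stream, [(entry_id, fields), ...]], ...], empty on timeout.
        batches = client.xread({STREAM_KEY: last_id}, count=100, block=block) or []
        for _stream, entries in batches:
            for entry_id, fields in entries:
                last_id = entry_id  # resume after this entry on the next call
                if fields.get("job_id") != job_id:
                    continue  # the stream is shared by all jobs
                if fields.get("status") == "finished":
                    return fields
                if fields.get("status") == "failed":
                    raise JobFailed(fields.get("error", "unknown error"))
    raise TimeoutError(f"job {job_id} did not finish within {timeout_s}s")
```

Once `wait_for_job` returns, `fields["result_url"]` points at the per-job result JSON described in Section 4a.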
### `create_model` → **Option B (Polling)**
1. Enqueue job → return `job_id` immediately
2. Periodically poll `GET /jobs/{job_id}`
3. On `finished`:
   * Read `result`, which contains `result_url` and `model_dir`
   * Download `summary.json` and the model files it lists
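A polling loop with capped exponential backoff is enough for long builds. A minimal sketch; `poll_job` is a hypothetical name, `fetch` stands in for whatever HTTP client wraps `GET /jobs/{job_id}`, and the response shape in the docstring is an assumption.

```python
import time
from typing import Any, Callable, Dict


def poll_job(fetch: Callable[[str], Dict[str, Any]], job_id: str,
             timeout_s: float = 3600.0, initial_delay_s: float = 2.0,
             max_delay_s: float = 30.0,
             sleep: Callable[[float], None] = time.sleep,
             clock: Callable[[], float] = time.monotonic) -> Dict[str, Any]:
    """Poll until the job is finished (return it) or failed (raise).

    fetch(job_id) is assumed to wrap GET /jobs/{job_id} and return its JSON body,
    e.g. {"status": "finished", "result": {"result_url": ..., "model_dir": ...}}.
    """
    deadline = clock() + timeout_s
    delay = initial_delay_s
    while True:
        job = fetch(job_id)
        status = job.get("status")
        if status == "finished":
            return job
        if status == "failed":
            raise RuntimeError(f"job {job_id} failed: {job.get('error')}")
        remaining = deadline - clock()
        if remaining <= 0:
            raise TimeoutError(f"job {job_id} still {status!r} after {timeout_s}s")
        sleep(min(delay, remaining))
        delay = min(delay * 2, max_delay_s)  # exponential backoff, capped
```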
---
## 6. Worker Event & Persistence Flow
1. **Acquire GPU lock**
2. **Emit** `started`
3. **Upsert** DB row (`stream3r_jobs`)
4. **Run inference**, emitting `progress` events every N frames
5. **Save** artifacts to S3:
   * `pointmaps/*.npz` with arrays `xyz` (world coordinates) and `conf` (confidence)
   * `poses.jsonl`
   * Model outputs listed in Section 2 (for `model_build` jobs)
6. **Write** result JSON → emit `finished`
7. **Update** DB row → `status=finished, result=…`
8. On error → emit `failed`, update DB row → `status=failed, error=…`
9. **Release GPU lock**, on success and on error
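The worker steps can be sketched as a single function. Everything is injected so the control flow can be read (and tested) without Redis, Postgres, or S3: `emit` would wrap `XADD`, `db_update` the status transitions, `persist` the S3 uploads plus result JSON, and `gpu_lock` could be a redis-py lock such as `redis_client.lock("gpu:lock", timeout=600)`. All names, and the `(index, total, output)` tuples yielded by `infer`, are assumptions.

```python
from contextlib import nullcontext
from typing import Any, Callable, ContextManager, Dict, Iterable, Optional, Tuple


def run_job(job: Dict[str, str],
            emit: Callable[..., None],
            db_update: Callable[..., None],
            infer: Callable[[Dict[str, str]], Iterable[Tuple[int, int, Any]]],
            persist: Callable[[Dict[str, str], list], Dict[str, str]],
            gpu_lock: Optional[ContextManager[Any]] = None,
            progress_every: int = 10) -> Dict[str, str]:
    """Run one STream3R job following the event/persistence steps above."""
    ids = {k: job[k] for k in ("job_id", "job_type", "scene_id")}
    finished_emitted = False
    try:
        # 1. Acquire the GPU lock; leaving the `with` block releases it, even on error.
        with gpu_lock if gpu_lock is not None else nullcontext():
            emit(**ids, status="started", progress="1")           # 2
            db_update(job["job_id"], "started")                   # 3
            outputs = []
            for index, total, output in infer(job):               # 4
                outputs.append(output)
                if (index + 1) % progress_every == 0:
                    # Cap at 99 so that 100 is only ever sent with `finished`.
                    pct = min(99, 100 * (index + 1) // total)
                    emit(**ids, status="progress", progress=str(pct))
            result = persist(job, outputs)                        # 5-6: artifacts + result JSON
            emit(**ids, status="finished", progress="100", **result)
            finished_emitted = True
            db_update(job["job_id"], "finished", result=result)   # 7
            return result
    except Exception as exc:                                      # 8
        if finished_emitted:
            raise  # results are already published; retry the DB write instead
        error = f"{type(exc).__name__}: {exc}"
        emit(**ids, status="failed", error=error)
        db_update(job["job_id"], "failed", error=error)
        raise
```

The `finished_emitted` flag keeps a late DB failure from emitting `failed` after `finished`, which would confuse Stream subscribers.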
---
## 7. Example Event Payloads (Redis Stream)
**Started**
```
job_id=uuid
job_type=pose_pointmap
scene_id=SCENE123
status=started
progress=1
ts=1730312345.12
```
**Progress**
```
job_id=uuid
job_type=model_build
scene_id=SCENE123
status=progress
progress=40
ts=1730312456.22
```
**Finished**
```
job_id=uuid
job_type=model_build
scene_id=SCENE123
status=finished
progress=100
result_url=s3://bucket/scene/SCENE123/stream3r/models/summary.json
model_dir=s3://bucket/scene/SCENE123/stream3r/models/
ts=1730312567.33
```
**Failed**
```
job_id=uuid
job_type=pose_pointmap
scene_id=SCENE123
status=failed
error=RuntimeError: CUDA OOM
ts=1730312570.00
```
---
## 8. Operational Guidelines
| Concern | Best Practice |
| -------------------------- | --------------------------------------------------------------- |
| **GPU Safety** | Use `gpu:lock` to serialize jobs per GPU |
| **Redis Stream retention** | `XTRIM stream3r:events MAXLEN ~ 50000`                          |
| **Durability** | All artifacts and summaries must persist to S3/Backblaze |
| **DB Reliability** | Upsert on each transition; retry writes if DB unavailable |
| **Idempotency** | Support caller-supplied `job_id` or `request_id` |
| **Security** | Keep Redis internal; use signed or private S3 URLs |
| **Backpressure**           | Enqueueing API rejects with `429` when queue depth exceeds a configured limit |
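The idempotency and backpressure rows can be combined at enqueue time. A minimal sketch, assuming an RQ `Queue` (its `count` property and the `job_id=` keyword to `enqueue` are part of RQ) and an `exists` callback such as a lookup in `stream3r_jobs`. `enqueue_or_reject`, `job_id_for`, `QueueFull`, and `JOB_ID_NAMESPACE` are hypothetical; a FastAPI handler would translate `QueueFull` into `HTTPException(status_code=429)`. Deriving the ID with `uuid5` keeps it valid for the `job_id UUID` column.

```python
import uuid
from typing import Any, Callable, Optional

# Hypothetical namespace used to derive deterministic job IDs from request IDs.
JOB_ID_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "stream3r-jobs")


class QueueFull(Exception):
    """Raised when the queue is too deep; the API layer maps it to HTTP 429."""


def job_id_for(request_id: Optional[str]) -> str:
    """Same request_id -> same UUID job_id, so client retries do not duplicate work."""
    if request_id:
        return str(uuid.uuid5(JOB_ID_NAMESPACE, request_id))
    return str(uuid.uuid4())


def enqueue_or_reject(queue, func: Callable[..., Any], payload: dict,
                      exists: Callable[[str], bool],
                      request_id: Optional[str] = None,
                      max_depth: int = 100) -> str:
    """Enqueue unless the job already exists or the queue is at max_depth."""
    job_id = job_id_for(request_id)
    if exists(job_id):
        return job_id  # duplicate request: return the existing job
    depth = queue.count  # rq.Queue.count: number of jobs waiting in the queue
    if depth >= max_depth:
        raise QueueFull(f"{queue.name} has {depth} queued jobs (limit {max_depth})")
    queue.enqueue(func, payload, job_id=job_id)
    return job_id
```

The `exists` check and the enqueue are not atomic; under concurrent duplicate requests, the primary key on `stream3r_jobs.job_id` is the more reliable guard.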
---
## 9. End-to-End Flows
### 🔹 Pose + World Coords (short job)
1. API enqueues job → returns `job_id`
2. Client subscribes via Redis Stream (blocking XREAD)
3. Worker runs inference → writes `pointmaps/{frame_token}.npz` + `results/{job_id}.json`
4. Worker emits `finished` → client downloads results
### 🔹 Model Build (long job)
1. API enqueues → returns `job_id`
2. Client polls `GET /jobs/{id}` or DB row
3. Worker fuses frames → writes full scene model files
4. Worker emits `finished` and updates the DB row
5. Client retrieves `summary.json` + artifacts under `/scene/{scene_id}/stream3r/models/`
---
## 10. Summary
| Component | Responsibility | Persistence |
| ------------------------------ | --------------------------------------- | ------------------------------- |
| **FastAPI API** | Enqueue jobs, expose `/jobs/{id}` | DB (via worker), Redis (events) |
| **GPU Worker** | Execute STream3R inference, emit events | S3/Backblaze, DB |
| **Redis Streams** | Event bus for progress + completion | ephemeral |
| **Postgres (`stream3r_jobs`)** | Canonical job record | durable |
| **S3/Backblaze /scene/** | Scene artifacts, model data | durable |
---
**Outcome:**
This design provides an **asynchronous, event-driven, and durable** framework for managing STream3R GPU jobs, with standardized scene storage, traceable job metadata, and clear integration points for both real-time and long-running workflows.