# STream3R: Jobs, Events, and Storage Design
## **Executive Summary**
The **STream3R Job System** provides an asynchronous GPU job orchestration layer for 3D scene reconstruction and perception tasks.
It standardizes how **pose and world-coordinate extraction** and **scene model building** are executed, stored, and tracked across services.
### **Primary Goals**
- **Asynchronous GPU processing:**
  All heavy inference runs on background RQ workers; FastAPI services only enqueue jobs and monitor progress.
- **Unified observability:**
  - **Redis Streams** for job lifecycle events and progress (`stream3r:events`)
  - **Postgres (`stream3r_jobs`)** as the canonical job record
  - **S3/Backblaze** for durable artifacts and results
- **Two calling modes:**
  - `get_pose_and_world_coords`: **Streams-based (Option A)** for near-real-time updates
  - `create_model`: **Polling (Option B)** for long-running model generation
- **Consistent storage under** `/scene/{scene_id}/stream3r/`, containing:
  - `kv_cache.pt`: serialized key/value cache state
  - `predictions.npz`: packed outputs from the model build
  - `session_settings.json`: runtime/config parameters
  - `selected_frames.json`: frame subset selection
  - `scene.glb`: final assembled scene model
  - `poses.jsonl`: per-frame extrinsics (camera poses)
  - `pointmaps/*.npz`: per-frame world coordinates + confidence maps
### **Key Outcomes**
- Clean separation of API and GPU worker responsibilities
- Event-driven feedback for quick jobs; reliable polling for long ones
- Durable, versioned scene data under a unified layout
- End-to-end traceability of all STream3R jobs via Redis + Postgres + S3
---
## 1. Queues, Streams, and Locks
| Component | Purpose | Notes |
|------------|----------|-------|
| `pose_pointmap` | RQ queue for latency-sensitive `pose_pointmap` jobs | |
| `model_build` | RQ queue for long `model_build` jobs | |
| `stream3r:events` | Redis Stream for all job events (`started`, `progress`, `finished`, `failed`) | trimmed periodically |
| `gpu:lock` | Redis lock ensuring single GPU job at a time per machine | |
Each Stream event is a flat map of strings:
```
job_id, job_type, scene_id,
status, progress, result_url, model_dir,
error, ts
```
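Because every field on the stream is a plain string, producers have to stringify numbers and drop unset fields before calling `XADD`. A minimal Python sketch of that step, assuming the field list above is exhaustive; `make_event` and `EVENT_FIELDS` are hypothetical names, and defaulting `ts` to the current Unix time is an assumption based on the example payloads.

```python
import time

# Field names from the event schema; every value on the wire is a string.
EVENT_FIELDS = (
    "job_id", "job_type", "scene_id",
    "status", "progress", "result_url", "model_dir",
    "error", "ts",
)


def make_event(job_id, job_type, scene_id, status, **extra):
    """Build the flat str -> str map passed to XADD, dropping unset fields."""
    unknown = set(extra) - set(EVENT_FIELDS)
    if unknown:
        raise ValueError(f"unknown event fields: {sorted(unknown)}")
    event = {"job_id": job_id, "job_type": job_type,
             "scene_id": scene_id, "status": status, **extra}
    event.setdefault("ts", time.time())  # Unix seconds, as in the examples
    return {k: str(v) for k, v in event.items() if v is not None}
```

Rejecting unknown field names early keeps producers from silently drifting away from the schema that consumers parse.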
---
## 2. S3 / Backblaze Storage Layout
All STream3R artifacts live under a **scene folder**:
```
s3://<bucket>/scene/{scene_id}/stream3r/
    results/
        {job_id}.json            # per-job result JSON (pose_pointmap)
    models/
        kv_cache.pt              # serialized KV cache
        predictions.npz          # packed model outputs
        session_settings.json    # runtime/config settings
        selected_frames.json     # frame subset indices
        scene.glb                # fused 3D scene
        poses.jsonl              # per-frame extrinsics
        summary.json             # canonical model_build result JSON
    pointmaps/
        {frame_token}.npz        # per-frame world_coords + confidence
```
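The layout can be captured in a few key-building helpers so the API, workers, and clients never hand-assemble paths. This is a sketch: the function names are hypothetical, `bucket` is a placeholder, and rejecting `/` inside `scene_id` is an added safety assumption rather than part of the layout.

```python
import posixpath

# Files written under models/ (from the layout above).
MODEL_FILES = (
    "kv_cache.pt", "predictions.npz", "session_settings.json",
    "selected_frames.json", "scene.glb", "poses.jsonl", "summary.json",
)


def scene_prefix(scene_id: str) -> str:
    # Assumption: scene IDs never contain '/', so they cannot escape the prefix.
    if not scene_id or "/" in scene_id:
        raise ValueError(f"invalid scene_id: {scene_id!r}")
    return f"scene/{scene_id}/stream3r"


def result_key(scene_id: str, job_id: str) -> str:
    return posixpath.join(scene_prefix(scene_id), "results", f"{job_id}.json")


def model_dir_key(scene_id: str) -> str:
    return posixpath.join(scene_prefix(scene_id), "models") + "/"


def model_key(scene_id: str, name: str) -> str:
    if name not in MODEL_FILES:
        raise ValueError(f"not a model artifact: {name!r}")
    return model_dir_key(scene_id) + name


def pointmap_key(scene_id: str, frame_token: str) -> str:
    return posixpath.join(scene_prefix(scene_id), "pointmaps", f"{frame_token}.npz")


def s3_url(bucket: str, key: str) -> str:
    return f"s3://{bucket}/{key}"
```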
**Key Result URLs**
- Pose/pointmap job: `s3://.../scene/{scene_id}/stream3r/results/{job_id}.json`
- Model build job: `s3://.../scene/{scene_id}/stream3r/models/summary.json`
---
## 3. Database: `stream3r_jobs`
Canonical job table in Postgres.
```sql
CREATE TABLE IF NOT EXISTS stream3r_jobs (
    job_id        UUID PRIMARY KEY,
    job_type      TEXT NOT NULL,       -- 'pose_pointmap' | 'model_build'
    scene_id      TEXT NOT NULL,
    status        TEXT NOT NULL,       -- 'queued' | 'started' | 'finished' | 'failed'
    created_at    TIMESTAMPTZ NOT NULL DEFAULT now(),
    started_at    TIMESTAMPTZ,
    completed_at  TIMESTAMPTZ,
    payload       JSONB,               -- enqueue-time payload
    result        JSONB,               -- URLs / metrics
    error         TEXT
);

CREATE INDEX IF NOT EXISTS stream3r_jobs_scene_id_idx ON stream3r_jobs (scene_id);
CREATE INDEX IF NOT EXISTS stream3r_jobs_status_idx ON stream3r_jobs (status);
```
**Upsert pattern:**
* Insert on enqueue (`queued`)
* Update on start → `started`
* Update on finish → `finished`, add `result`
* Update on failure → `failed`, add `error`
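The upsert pattern maps naturally onto `INSERT ... ON CONFLICT`. The sketch below uses Python's built-in `sqlite3` as a stand-in so it runs without a Postgres server (UUID, JSONB, and TIMESTAMPTZ columns are stored as TEXT); the function names are hypothetical. The same statements should carry over to Postgres with psycopg, mainly by switching the `?` placeholders to `%s`. Using an upsert for `started`, rather than a plain update, covers a worker picking up a job whose `queued` row was never written.

```python
import json
import sqlite3

# SQLite stand-in for the Postgres table; UUID/JSONB/TIMESTAMPTZ stored as TEXT.
SCHEMA = """
CREATE TABLE IF NOT EXISTS stream3r_jobs (
    job_id       TEXT PRIMARY KEY,
    job_type     TEXT NOT NULL,
    scene_id     TEXT NOT NULL,
    status       TEXT NOT NULL,
    created_at   TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    started_at   TEXT,
    completed_at TEXT,
    payload      TEXT,
    result       TEXT,
    error        TEXT
)
"""


def record_queued(conn, job_id, job_type, scene_id, payload):
    """Insert on enqueue; repeating the call for the same job_id is a no-op."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO stream3r_jobs (job_id, job_type, scene_id, status, payload) "
            "VALUES (?, ?, ?, 'queued', ?) ON CONFLICT (job_id) DO NOTHING",
            (job_id, job_type, scene_id, json.dumps(payload)),
        )


def record_started(conn, job_id, job_type, scene_id):
    """Upsert: creates the row if the 'queued' insert never happened."""
    with conn:
        conn.execute(
            "INSERT INTO stream3r_jobs (job_id, job_type, scene_id, status, started_at) "
            "VALUES (?, ?, ?, 'started', CURRENT_TIMESTAMP) "
            "ON CONFLICT (job_id) DO UPDATE SET "
            "status = 'started', started_at = excluded.started_at",
            (job_id, job_type, scene_id),
        )


def record_finished(conn, job_id, result):
    with conn:
        conn.execute(
            "UPDATE stream3r_jobs SET status = 'finished', result = ?, "
            "completed_at = CURRENT_TIMESTAMP WHERE job_id = ?",
            (json.dumps(result), job_id),
        )


def record_failed(conn, job_id, error):
    with conn:
        conn.execute(
            "UPDATE stream3r_jobs SET status = 'failed', error = ?, "
            "completed_at = CURRENT_TIMESTAMP WHERE job_id = ?",
            (error, job_id),
        )
```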
---
## 4. Result JSON Schemas
### a. Pose + World Coords (per-frame)
`s3://…/scene/{scene_id}/stream3r/results/{job_id}.json`
```json
{
"job_id": "uuid",
"job_type": "pose_pointmap",
"scene_id": "SCENE123",
"artifacts": {
"pointmap_url": "s3://.../scene/SCENE123/stream3r/pointmaps/frame_000010.npz"
},
"pose": { "R": [[...]], "t": [x, y, z] },
"intrinsics": { "fx":..., "fy":..., "cx":..., "cy":... },
"metrics": { "runtime_s": 1.23 },
"stream3r": { "cfg": "configs/stream3r_base.yaml", "commit": "<git_sha>" }
}
```
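A caller in Option A eventually has to turn this JSON into typed values. A minimal sketch, assuming `R` is a row-major 3x3 list and `t` a length-3 list as the schema suggests; `PoseResult`, `parse_pose_result`, and `as_matrix` are hypothetical names. The schema does not say whether `[R | t]` maps world to camera or camera to world, so the 4x4 helper leaves that convention to be confirmed.

```python
import json
from dataclasses import dataclass


@dataclass
class PoseResult:
    job_id: str
    scene_id: str
    R: list            # 3x3 rotation, row-major
    t: list            # translation [x, y, z]
    intrinsics: dict   # fx, fy, cx, cy
    pointmap_url: str  # .npz with per-frame world coords + confidence

    def as_matrix(self):
        """4x4 homogeneous [R | t; 0 0 0 1]. Direction (world->camera or
        camera->world) is not fixed by the schema and must be confirmed."""
        return [list(row) + [tc] for row, tc in zip(self.R, self.t)] + [[0, 0, 0, 1]]


def parse_pose_result(text: str) -> PoseResult:
    doc = json.loads(text)
    if doc.get("job_type") != "pose_pointmap":
        raise ValueError(f"unexpected job_type: {doc.get('job_type')!r}")
    R, t = doc["pose"]["R"], doc["pose"]["t"]
    if len(R) != 3 or any(len(row) != 3 for row in R) or len(t) != 3:
        raise ValueError("pose needs a 3x3 'R' and a length-3 't'")
    intrinsics = doc["intrinsics"]
    missing = {"fx", "fy", "cx", "cy"} - set(intrinsics)
    if missing:
        raise ValueError(f"missing intrinsics: {sorted(missing)}")
    return PoseResult(
        job_id=doc["job_id"],
        scene_id=doc["scene_id"],
        R=R,
        t=t,
        intrinsics=intrinsics,
        pointmap_url=doc["artifacts"]["pointmap_url"],
    )
```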
### b. Model Build (scene-level)
`s3://…/scene/{scene_id}/stream3r/models/summary.json`
```json
{
"job_id": "uuid",
"job_type": "model_build",
"scene_id": "SCENE123",
"artifacts": {
"model_dir": "s3://.../scene/SCENE123/stream3r/models/",
"kv_cache": "s3://.../scene/SCENE123/stream3r/models/kv_cache.pt",
"predictions": "s3://.../scene/SCENE123/stream3r/models/predictions.npz",
"session_settings": "s3://.../scene/SCENE123/stream3r/models/session_settings.json",
"selected_frames": "s3://.../scene/SCENE123/stream3r/models/selected_frames.json",
"scene_glb": "s3://.../scene/SCENE123/stream3r/models/scene.glb",
"poses_jsonl": "s3://.../scene/SCENE123/stream3r/models/poses.jsonl"
},
"metrics": { "frames": 128, "runtime_s": 42.3 },
"stream3r": { "cfg": "configs/stream3r_base.yaml", "commit": "<git_sha>" }
}
```
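Since every model artifact is expected under one `model_dir`, a reader of `summary.json` can sanity-check it before downloading anything. The checker below is a hypothetical helper; the set of required keys is taken from the example above and may need adjusting if the schema grows.

```python
from urllib.parse import urlparse

# Artifact keys present in the example summary.json.
REQUIRED_ARTIFACTS = (
    "model_dir", "kv_cache", "predictions", "session_settings",
    "selected_frames", "scene_glb", "poses_jsonl",
)


def check_summary(summary: dict) -> list:
    """Return a list of problems; an empty list means the summary is consistent."""
    problems = []
    if summary.get("job_type") != "model_build":
        problems.append("job_type is not 'model_build'")
    artifacts = summary.get("artifacts", {})
    for key in REQUIRED_ARTIFACTS:
        if key not in artifacts:
            problems.append(f"missing artifact: {key}")
    model_dir = artifacts.get("model_dir", "")
    expected_suffix = f"/scene/{summary.get('scene_id')}/stream3r/models/"
    if not model_dir.endswith(expected_suffix):
        problems.append(f"model_dir does not end with {expected_suffix}")
    for key, url in artifacts.items():
        if urlparse(url).scheme != "s3":
            problems.append(f"{key}: not an s3:// URL")
        elif key != "model_dir" and not url.startswith(model_dir):
            problems.append(f"{key}: outside model_dir")
    return problems
```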
---
## 5. Caller API Responsibilities
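### `get_pose_and_world_coords`: **Option A (Streams)**
1. Record the newest entry ID of `stream3r:events`, then enqueue the job → get `job_id`
2. `XREAD BLOCK` on `stream3r:events` starting from that ID, ignoring events for other jobs, until this `job_id` reports `status=finished` (or `status=failed`)
3. On finish:
   * Fetch `result_url`
   * Load JSON → retrieve `pose`, `intrinsics`, and `pointmap_url`
   * Download `.npz` to get `world_coords` + `confidence`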
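Because `stream3r:events` is shared by all jobs, the caller must filter by `job_id`, and it should treat `failed` as terminal too. It also matters where the read starts: blocking from `$` after enqueueing can miss the events of a job that finishes very quickly, so this sketch records the newest stream ID before enqueueing and reads forward from there. It assumes a redis-py client created with `decode_responses=True` (RESP2 reply shapes); `wait_for_job` and `stream_tail_id` are hypothetical names.

```python
import time

STREAM = "stream3r:events"
TERMINAL = {"finished", "failed"}


def stream_tail_id(r):
    """ID of the newest stream entry, or '0-0' if the stream is empty."""
    newest = r.xrevrange(STREAM, count=1)
    return newest[0][0] if newest else "0-0"


def wait_for_job(r, job_id, last_id, timeout_s=60.0, block_ms=5000):
    """Block on the shared stream until `job_id` emits a terminal event.

    Returns that event's fields; the caller checks status == "finished".
    """
    deadline = time.monotonic() + timeout_s
    while True:
        remaining_ms = int((deadline - time.monotonic()) * 1000)
        if remaining_ms <= 0:
            raise TimeoutError(f"no terminal event for job {job_id}")
        # XREAD BLOCK returns only entries newer than last_id.
        reply = r.xread({STREAM: last_id}, count=100, block=min(block_ms, remaining_ms))
        for _stream, entries in reply or []:
            for entry_id, fields in entries:
                last_id = entry_id  # resume after the newest entry seen
                if fields.get("job_id") != job_id:
                    continue  # another job's event on the shared stream
                if fields.get("status") in TERMINAL:
                    return fields
```

Typical use: `tail = stream_tail_id(r)`, enqueue the job, then `event = wait_for_job(r, job_id, tail)`; when `event["status"] == "finished"`, fetch `event["result_url"]`.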
### `create_model`: **Option B (Polling)**
1. Enqueue job → return `job_id` immediately
2. Periodically poll `GET /jobs/{job_id}`
3. On `finished`:
   * Read `result` with `result_url` + `model_dir`
   * Download `summary.json` and listed model files
4. On `failed`: stop polling and surface `error`
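The polling side can be a small loop with a deadline and gentle backoff so long model builds do not hammer the API. `get_job` is a hypothetical callable standing in for `GET /jobs/{job_id}`; the response shape (`status`, `result`, `error`) is assumed to mirror the `stream3r_jobs` columns, and the intervals are illustrative defaults.

```python
import time


def poll_job(get_job, job_id, interval_s=2.0, timeout_s=3600.0, max_interval_s=30.0):
    """Poll until the job reaches a terminal status; return its `result`."""
    deadline = time.monotonic() + timeout_s
    while True:
        job = get_job(job_id)
        status = job.get("status")
        if status == "finished":
            return job["result"]  # expected to hold result_url + model_dir
        if status == "failed":
            raise RuntimeError(f"job {job_id} failed: {job.get('error')}")
        if time.monotonic() + interval_s > deadline:
            raise TimeoutError(f"job {job_id} still {status!r} after {timeout_s}s")
        time.sleep(interval_s)
        interval_s = min(interval_s * 1.5, max_interval_s)  # gentle backoff
```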
---
## 6. Worker Event & Persistence Flow
1. **Acquire GPU lock** (`gpu:lock`)
2. **Emit** `started`
3. **Upsert** DB row (`stream3r_jobs`)
4. **Run inference**, emitting `progress` events (every N frames)
5. **Save** artifacts to S3:
   * `pointmaps/*.npz` with `{xyz, conf}`
   * `poses.jsonl`
   * Model outputs listed above
6. **Write** result JSON → emit `finished`
7. **Update** DB row → `status=finished, result=…`
8. On error → emit `failed`, update DB (`status=failed, error=…`)
9. **Release GPU lock** on both success and failure
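The worker flow can be sketched as a single function. This assumes a redis-py client (`Redis.lock`, `Redis.xadd`) and uses hypothetical `db`, `infer`, and `save_artifacts` collaborators; the lock timeouts are illustrative. Passing `maxlen=50000, approximate=True` to `XADD` trims the stream on every write, which is one way to apply the retention rule from the operational guidelines. The `with` block releases `gpu:lock` whether the job succeeds or fails, and a failure after `finished` has been emitted (for example a DB outage) is deliberately not reported as a `failed` event, so clients never see contradictory terminal events.

```python
import time

STREAM = "stream3r:events"


def emit(r, job, status, **fields):
    """XADD one flat string event; MAXLEN ~ 50000 trims the stream approximately."""
    event = {
        "job_id": job["job_id"], "job_type": job["job_type"],
        "scene_id": job["scene_id"], "status": status, "ts": time.time(), **fields,
    }
    r.xadd(STREAM, {k: str(v) for k, v in event.items()}, maxlen=50000, approximate=True)


def run_job(r, db, job, infer, save_artifacts):
    """Hypothetical worker body: lock, emit/upsert, infer, save, finish or fail.

    Leaving the `with` block (normally or via an exception) releases gpu:lock.
    """
    # timeout: lock expires if the worker dies; blocking_timeout: stop waiting for the GPU.
    with r.lock("gpu:lock", timeout=3600, blocking_timeout=600):
        try:
            emit(r, job, "started", progress=0)
            db.record_started(job)
            outputs = infer(job, lambda pct: emit(r, job, "progress", progress=pct))
            result = save_artifacts(job, outputs)  # uploads files + result JSON
        except Exception as exc:
            error = f"{type(exc).__name__}: {exc}"
            emit(r, job, "failed", error=error)
            db.record_failed(job, error)
            raise  # let RQ mark the job failed as well
        # Only URLs go on the stream; the full result lives in S3 and Postgres.
        links = {k: result[k] for k in ("result_url", "model_dir") if k in result}
        emit(r, job, "finished", progress=100, **links)
        db.record_finished(job, result)
        return result
```

If the lock cannot be acquired within `blocking_timeout`, redis-py raises `LockError` before any event is emitted, and RQ records the job as failed.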
---
## 7. Example Event Payloads (Redis Stream)
**Started**
```
job_id=uuid
job_type=pose_pointmap
scene_id=SCENE123
status=started
progress=1
ts=1730312345.12
```
**Progress**
```
job_id=uuid
job_type=model_build
scene_id=SCENE123
status=progress
progress=40
ts=1730312456.22
```
**Finished**
```
job_id=uuid
job_type=model_build
scene_id=SCENE123
status=finished
progress=100
result_url=s3://bucket/scene/SCENE123/stream3r/models/summary.json
model_dir=s3://bucket/scene/SCENE123/stream3r/models/
ts=1730312567.33
```
**Failed**
```
job_id=uuid
job_type=pose_pointmap
scene_id=SCENE123
status=failed
error=RuntimeError: CUDA OOM
ts=1730312570.00
```
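On the consuming side, the flat strings above need converting back into typed values. A sketch, assuming `progress` is an integer percentage and `ts` is Unix time in seconds, as the examples suggest; `JobEvent` and `parse_event` are hypothetical names.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobEvent:
    job_id: str
    job_type: str
    scene_id: str
    status: str       # started | progress | finished | failed
    ts: float         # Unix seconds
    progress: Optional[int] = None
    result_url: Optional[str] = None
    model_dir: Optional[str] = None
    error: Optional[str] = None

    @property
    def terminal(self) -> bool:
        return self.status in ("finished", "failed")


def parse_event(fields: dict) -> JobEvent:
    """Convert one stream entry's string fields into typed values."""
    progress = fields.get("progress")
    return JobEvent(
        job_id=fields["job_id"],
        job_type=fields["job_type"],
        scene_id=fields["scene_id"],
        status=fields["status"],
        ts=float(fields["ts"]),
        progress=int(progress) if progress is not None else None,
        result_url=fields.get("result_url"),
        model_dir=fields.get("model_dir"),
        error=fields.get("error"),
    )
```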
---
## 8. Operational Guidelines
| Concern | Best Practice |
| -------------------------- | --------------------------------------------------------------- |
| **GPU Safety** | Use `gpu:lock` to serialize jobs per GPU |
| **Redis Stream retention** | `XTRIM stream3r:events MAXLEN ~ 50000` (or `MAXLEN ~ 50000` on each `XADD`) |
| **Durability** | All artifacts and summaries must persist to S3/Backblaze |
| **DB Reliability** | Upsert on each transition; retry writes if DB unavailable |
| **Idempotency** | Support caller-supplied `job_id` or `request_id` |
| **Security** | Keep Redis internal; use signed or private S3 URLs |
| **Backpressure**           | Enqueueing API should reject with `429` when queue depth is too large |
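The idempotency and backpressure rows can be combined at enqueue time. A sketch, assuming `queue` is an `rq.Queue` (it uses `fetch_job`, `count`, `name`, and `enqueue(..., job_id=...)`); deriving `job_id` from a caller `request_id` with `uuid5` is one possible choice, and the namespace UUID and `max_depth` are made-up values. The FastAPI layer would translate `QueueFull` into an HTTP 429. The existence check is not atomic, so concurrent retries can still race; the `job_id` primary key on `stream3r_jobs` is the stronger guard.

```python
import uuid

# Hypothetical fixed namespace: the same request_id always maps to the same job_id.
JOB_NAMESPACE = uuid.UUID("6f1c2a4e-0000-4000-8000-000000000000")


class QueueFull(Exception):
    """Raised when the queue is too deep; the API maps this to HTTP 429."""


def job_id_for(request_id: str) -> str:
    return str(uuid.uuid5(JOB_NAMESPACE, request_id))


def enqueue_idempotent(queue, func, payload, request_id, max_depth=100):
    """Enqueue at most once per request_id, rejecting when the queue is too deep."""
    job_id = job_id_for(request_id)
    existing = queue.fetch_job(job_id)
    if existing is not None:
        return existing.id  # a retry of a request that was already accepted
    depth = queue.count
    if depth >= max_depth:
        raise QueueFull(f"{queue.name}: depth {depth} >= {max_depth}")
    job = queue.enqueue(func, payload, job_id=job_id)
    return job.id
```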
---
## 9. End-to-End Flows
### Pose + World Coords (short job)
1. API enqueues job → returns `job_id`
2. Client subscribes via Redis Stream (blocking `XREAD`)
3. Worker runs inference → writes `pointmaps/{frame_token}.npz` + `results/{job_id}.json`
4. Worker emits `finished` → client downloads results
### Model Build (long job)
1. API enqueues → returns `job_id`
2. Client polls `GET /jobs/{id}` or DB row
3. Worker fuses frames → writes full scene model files
4. Worker updates DB + emits `finished`
5. Client retrieves `summary.json` + artifacts under `/scene/{scene_id}/stream3r/models/`
---
## 10. Summary
| Component | Responsibility | Persistence |
| ------------------------------ | --------------------------------------- | ------------------------------- |
| **FastAPI API** | Enqueue jobs, expose `/jobs/{id}` | DB (via worker), Redis (events) |
| **GPU Worker** | Execute STream3R inference, emit events | S3/Backblaze, DB |
| **Redis Streams** | Event bus for progress + completion | ephemeral |
| **Postgres (`stream3r_jobs`)** | Canonical job record | durable |
| **S3/Backblaze /scene/** | Scene artifacts, model data | durable |
---
**Outcome:**
This design provides an **asynchronous, event-driven, and durable** framework for managing STream3R GPU jobs, with standardized scene storage, traceable job metadata, and clear integration points for both real-time and long-running workflows.