# STream3R API: Job Orchestration and Integration Plan
## Executive Summary
This document proposes a lightweight STream3R API service that wraps the async job system implemented by the RQ worker. The API exposes RESTful endpoints and Redis Stream subscriptions that allow upstream applications to submit reconstruction jobs, track progress, and retrieve completed artifacts. Responsibilities are split cleanly: the API handles request validation, persistence, and orchestration, while the GPU worker (documented in `design_docs/worker.md`) performs heavy inference and storage of artifacts.
**Key outcomes**
- Unified interface for both short `pose_pointmap` jobs and long-running `model_build` jobs.
- Consistent job lifecycle backed by Postgres and Redis, mirroring the worker contract.
- Environment-agnostic integration so other services can enqueue jobs or consume progress events without GPU access.
---
## 1. Scope & Goals
### Goals
- Provide a REST API for submitting jobs and querying job status or results.
- Offer optional server-sent events (SSE) or WebSocket feeds for near-real-time updates on `pose_pointmap` jobs.
- Enforce validation and idempotency for job submissions.
- Expose artifacts written by the worker (S3 URLs, local paths) without re-hosting the files.
### Non-Goals
- Implement GPU inference or artifact generation (handled by RQ worker).
- Manage long-term artifact retention or CDN delivery.
- Provide fine-grained authorization beyond bearer token / API key patterns (left to integration).
---
## 2. Architecture Overview
```
Client ──HTTP──▶ STream3R API ──RQ Enqueue──▶ Redis Queue ──▶ Worker
  │                   │                                         │
  │                   ◀──Postgres───────────────────────────────┤
  │                   ◀──Redis Stream (events)──────────────────┤
  │                   ◀──S3 (artifact URLs)────────── Worker writes
  │
  └── Poll `/jobs/{id}` or subscribe SSE/WebSocket for progress updates
```
Components:
- **FastAPI** service (recommended) running behind an ASGI server.
- **Redis**: shared with worker for queues and events.
- **Postgres**: `stream3r_jobs` table is the canonical job record.
- **S3/Backblaze** (or local storage): artifact URLs returned by worker.
- **RQ worker** (implemented separately) executing jobs and updating state.
---
## 3. Endpoints
### `POST /jobs`
Submit a job for either `pose_pointmap` or `model_build`.
**Request body**
```json
{
"job_type": "pose_pointmap",
"scene_id": "SCENE123",
"mode": "causal",
"streaming": true,
"frames": [
{"url": "https://.../frame_0000.jpg"},
{"path": "/data/captures/frame_0001.png"}
],
"session_settings": {"prediction_mode": "Predicted Pointmap"},
"client_request_id": "optional-idempotency-key"
}
```
**Behavior**
- Validate payload (non-empty frames, supported job type, etc.).
- If `client_request_id` is provided, search Postgres for an existing job with the same key to ensure idempotency.
- Assign `job_id` (UUID) and enqueue the payload into the appropriate RQ queue (`pose_pointmap` or `model_build`).
- Insert `stream3r_jobs` row with `status=queued`.
- Return `202 Accepted` with job metadata:
```json
{
"job_id": "uuid",
"status": "queued",
"job_type": "pose_pointmap",
"scene_id": "SCENE123"
}
```
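The behavior above can be sketched as a framework-agnostic submit function. The injected callables (`find_by_request_id`, `insert_row`, `enqueue`) are illustrative stand-ins for the Postgres queries and RQ enqueue call, not names from the worker contract:

```python
import uuid
from typing import Any, Callable, Dict, Optional

VALID_JOB_TYPES = {"pose_pointmap", "model_build"}

def submit_job(
    payload: Dict[str, Any],
    find_by_request_id: Callable[[str], Optional[Dict[str, Any]]],  # Postgres lookup by idempotency key
    insert_row: Callable[[Dict[str, Any]], None],                   # INSERT INTO stream3r_jobs
    enqueue: Callable[[str, Dict[str, Any]], None],                 # RQ enqueue on the named queue
) -> Dict[str, Any]:
    """Validate, dedupe, persist, and enqueue a job; return the 202 body."""
    job_type = payload.get("job_type")
    if job_type not in VALID_JOB_TYPES:
        raise ValueError(f"unsupported job_type: {job_type!r}")
    if not payload.get("scene_id"):
        raise ValueError("scene_id is required")
    if not payload.get("frames") and not payload.get("frames_dir"):
        raise ValueError("frames must be non-empty")

    # Idempotency: re-submissions with a known key return the existing record.
    request_id = payload.get("client_request_id")
    if request_id:
        existing = find_by_request_id(request_id)
        if existing:
            return existing

    job_id = str(uuid.uuid4())
    summary = {
        "job_id": job_id,
        "status": "queued",
        "job_type": job_type,
        "scene_id": payload["scene_id"],
    }
    insert_row({**summary, "payload": payload, "client_request_id": request_id})
    enqueue(job_type, {**payload, "job_id": job_id})  # queue name matches job_type
    return summary
```

In a FastAPI handler, the callables would close over the database pool and the two RQ queues; keeping them injected makes the orchestration logic unit-testable without Redis or Postgres.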
### `GET /jobs/{job_id}`
Fetch job state and artifact references from Postgres.
**Response example** (`status=finished`)
```json
{
"job_id": "uuid",
"job_type": "model_build",
"scene_id": "SCENE123",
"status": "finished",
"created_at": "...",
"started_at": "...",
"completed_at": "...",
"result": {
"result_url": "s3://bucket/scene/SCENE123/stream3r/models/summary.json",
"model_dir": "s3://bucket/scene/SCENE123/stream3r/models/",
"artifacts": {
"scene_glb_url": "...",
"poses_url": "...",
"pointmaps": [ {"frame_id": "frame_0000", "url": "..."} ]
}
},
"error": null
}
```
### `GET /jobs/{job_id}/events`
Server-Sent Events endpoint bridging Redis Streams.
- Uses `XREAD` on `stream3r:events` with `job_id` filter.
- Suitable for browser or gateway consumers needing near-real-time progress.
- Emits lines like:
```
event: progress
data: {"progress": 60, "status": "progress"}
event: finished
data: {"result_url": "s3://..."}
```
Optionally provide a WebSocket variant if SSE is insufficient.
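A minimal bridge from the stream to SSE frames can be sketched as follows. `read_batch` is an injected stand-in for the `XREAD` call on `stream3r:events` (kept abstract so the filtering logic stays framework-agnostic); the frame layout follows the `event:`/`data:` example above:

```python
import json
from typing import Callable, Dict, Iterable, Iterator, Tuple

def format_sse(fields: Dict[str, str]) -> str:
    """Render one stream entry as an SSE frame; the event name tracks terminal statuses."""
    status = fields.get("status", "progress")
    event = status if status in ("finished", "failed") else "progress"
    data = {k: v for k, v in fields.items() if k not in ("job_id", "ts")}
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"

def job_event_frames(
    job_id: str,
    read_batch: Callable[[str], Iterable[Tuple[str, Dict[str, str]]]],  # stand-in for XREAD
    last_id: str = "0-0",
) -> Iterator[str]:
    """Yield SSE frames for one job, filtering the shared stream by job_id."""
    while True:
        batch = list(read_batch(last_id))
        if not batch:
            return  # a real bridge would block on XREAD with a timeout instead
        for entry_id, fields in batch:
            last_id = entry_id  # track the stream cursor across reads
            if fields.get("job_id") != job_id:
                continue  # the stream is shared by all jobs
            yield format_sse(fields)
            if fields.get("status") in ("finished", "failed"):
                return  # terminal event closes the SSE connection
```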
### `GET /jobs`
Paged listing/filtering (optional but useful for dashboards).
Parameters: `scene_id`, `job_type`, `status`, pagination cursors.
---
## 4. Data Model & Persistence
### Postgres (`stream3r_jobs`)
The API is the authoritative owner of the job record. It should create and migrate the following schema during startup (extend with `client_request_id` if idempotency keys are required):
```sql
CREATE TABLE IF NOT EXISTS stream3r_jobs (
job_id UUID PRIMARY KEY,
job_type TEXT NOT NULL, -- 'pose_pointmap' | 'model_build'
scene_id TEXT NOT NULL,
status TEXT NOT NULL, -- 'queued' | 'started' | 'finished' | 'failed'
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
payload JSONB, -- enqueue-time payload
result JSONB, -- worker-published result bundle
error TEXT,
client_request_id TEXT UNIQUE -- optional idempotency key
);
CREATE INDEX IF NOT EXISTS stream3r_jobs_scene_id_idx ON stream3r_jobs(scene_id);
CREATE INDEX IF NOT EXISTS stream3r_jobs_status_idx ON stream3r_jobs(status);
```
### Redis
- **Queues**: two RQ queues are used, `pose_pointmap` for latency-sensitive pose extraction and `model_build` for long reconstruction jobs. The API selects the queue based on `job_type`.
- **Event Stream**: the worker pushes lifecycle updates to the Redis Stream `stream3r:events`. Every entry is a flat map with the following fields:
```
job_id, job_type, scene_id,
status, progress, result_url, model_dir,
error, ts
```
`status` takes `started`, `progress`, `finished`, or `failed`. `progress` is an integer percentage (0–100). `result_url` and `model_dir` mirror the URLs stored in Postgres when a job completes.
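On the worker side, an event entry is just a flat string map handed to `XADD`. A helper for building it could look like the following (the field names follow the list above; the helper itself is illustrative, not the worker's actual code — Redis Stream values must be strings, so everything is stringified):

```python
import time
from typing import Dict, Optional

def build_event(
    job_id: str,
    job_type: str,
    scene_id: str,
    status: str,                     # 'started' | 'progress' | 'finished' | 'failed'
    progress: Optional[int] = None,  # integer percentage, 0-100
    result_url: Optional[str] = None,
    model_dir: Optional[str] = None,
    error: Optional[str] = None,
) -> Dict[str, str]:
    """Build the flat field map pushed to stream3r:events via XADD."""
    fields = {
        "job_id": job_id,
        "job_type": job_type,
        "scene_id": scene_id,
        "status": status,
        "ts": str(int(time.time())),
    }
    # Optional fields are omitted rather than sent empty.
    if progress is not None:
        fields["progress"] = str(progress)
    if result_url is not None:
        fields["result_url"] = result_url
    if model_dir is not None:
        fields["model_dir"] = model_dir
    if error is not None:
        fields["error"] = error
    return fields
```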
### Artifact Storage Layout
The worker persists artifacts to S3/Backblaze (or local storage) under a deterministic folder hierarchy. The API does not move files but should surface these URLs verbatim so consumers know where to fetch results:
```
scene/{scene_id}/stream3r/
results/
{job_id}.json # per-job result JSON (pose_pointmap)
models/
kv_cache.pt # serialized KV cache
predictions.npz # packed model outputs
session_settings.json # runtime/config settings
selected_frames.json # frame subset indices (optional)
scene.glb # fused 3D scene
poses.jsonl # per-frame extrinsics
summary.json # canonical model_build result JSON
pointmaps/
{frame_token}.npz # per-frame world coords + confidence
```
`pose_pointmap` jobs typically populate `results/{job_id}.json` plus `pointmaps/`; `model_build` jobs populate the `models/` subtree. All URLs returned by the worker use this structure.
---
## 5. Job Lifecycle
1. **Submit** (`POST /jobs`):
- Validate input, persist `queued` row, enqueue payload.
- Return `job_id`.
2. **Worker processing**:
- Worker acquires GPU lock, runs inference, streams events, writes artifacts, and updates DB.
3. **Status checks**:
- Clients poll `GET /jobs/{id}` or subscribe to `/jobs/{id}/events`.
4. **Completion**:
- Job row contains `status=finished`, `result` JSON with URLs.
- API response is the source of truth for artifact discovery.
5. **Failure**:
- Worker updates DB with `status=failed`, `error` string.
- API surfaces the error in `GET /jobs/{id}` and via events.
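From a consumer's perspective, steps 3–5 reduce to a polling loop. A sketch, with `get_job` as an injected wrapper around `GET /jobs/{job_id}` so the loop stays transport-agnostic:

```python
import time
from typing import Any, Callable, Dict

def wait_for_job(
    get_job: Callable[[], Dict[str, Any]],        # wraps GET /jobs/{job_id}
    interval: float = 2.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,  # injectable for testing
) -> Dict[str, Any]:
    """Poll until the job reaches a terminal state; return the final record."""
    waited = 0.0
    while True:
        job = get_job()
        if job["status"] in ("finished", "failed"):
            return job  # caller inspects 'result' or 'error'
        if waited >= timeout:
            raise TimeoutError(f"job still {job['status']!r} after {timeout}s")
        sleep(interval)
        waited += interval
```

Latency-sensitive `pose_pointmap` consumers should prefer the `/jobs/{id}/events` feed over this loop; polling suits long `model_build` jobs where a 2-second interval is negligible.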
---
## 6. Request Validation Contracts
`POST /jobs` validation rules:
- `job_type` ∈ {`pose_pointmap`, `model_build`}.
- `scene_id` non-empty string.
- `frames` list size ≥ 1 (unless `frames_dir` is provided).
- Each frame entry must have exactly one of `url`, `path`, or `content` (base64 image string).
- `mode` default `causal`; forbid `full` for streaming jobs to match worker behavior.
- Optional numeric fields converted to `int/float` before enqueueing.
- Enforce max frames (configurable) to avoid resource exhaustion.
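The per-frame and mode rules can be expressed as small helpers (a sketch; function names are illustrative, and in a FastAPI service these would naturally live in Pydantic validators):

```python
from typing import Any, Dict

def validate_frame(frame: Dict[str, Any]) -> None:
    """Each frame entry must carry exactly one source: url, path, or content."""
    sources = [k for k in ("url", "path", "content") if frame.get(k)]
    if len(sources) != 1:
        raise ValueError(f"frame needs exactly one of url/path/content, got {sources}")

def resolve_mode(mode: str, streaming: bool) -> str:
    """Apply the 'causal' default and reject 'full' for streaming jobs."""
    mode = mode or "causal"
    if streaming and mode == "full":
        raise ValueError("mode 'full' is not supported for streaming jobs")
    return mode
```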
---
## 7. Security & Authentication
- Deploy behind an API gateway that injects `X-Client-Id` or similar metadata for auditing.
- Support bearer token / API key auth via middleware; store hashed keys in Postgres if needed.
- Restrict access to the internal network when possible, as artifacts contain scene data.
- Sanitize inbound URLs to prevent SSRF; optionally proxy downloads through a whitelist.
---
## 8. Observability & Operations
- **Logging**: Structured logs capturing `job_id`, `scene_id`, `client_request_id`, and remote IP.
- **Metrics**: Track enqueue latency, job duration (from DB timestamps), queue depth, event lag.
- **Health checks**: `GET /healthz` verifying Redis and Postgres connectivity.
- **Backpressure**: Before accepting a new job, check queue length; if above threshold, return `429 Too Many Requests`.
- **Timeouts**: Configure HTTP request timeouts to avoid hanging on large payloads.
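The health-check and backpressure rules reduce to two small functions. A sketch with injected dependency pingers; the queue-depth threshold is configuration, not a fixed value:

```python
from typing import Callable, Dict, Tuple

def healthz(
    ping_redis: Callable[[], bool],
    ping_postgres: Callable[[], bool],
) -> Tuple[int, Dict[str, bool]]:
    """GET /healthz: 200 only when both Redis and Postgres respond."""
    checks = {"redis": ping_redis(), "postgres": ping_postgres()}
    return (200 if all(checks.values()) else 503), checks

def admission_status(queue_depth: int, max_depth: int) -> int:
    """Backpressure gate for POST /jobs: 202 to accept, 429 when saturated."""
    return 202 if queue_depth < max_depth else 429
```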
---
## 9. Deployment Considerations
- Package as a standalone FastAPI app (e.g., `stream3r_api.main:app`).
- Run under Uvicorn/Gunicorn with workers sized for I/O-bound traffic.
- Configure service with the same environment variables as worker (`STREAM3R_REDIS_URL`, `STREAM3R_DB_DSN`, etc.).
- Use infrastructure-as-code to provision Redis, Postgres, and S3 credentials shared with worker.
---
## 10. Future Enhancements
- **Job cancellation**: Add `DELETE /jobs/{id}` to flag jobs for cancellation (requires worker support).
- **Scene-level dashboards**: Aggregate artifacts from multiple jobs for a scene.
- **Signed download URLs**: API could issue pre-signed URLs for public sharing, decoupled from worker credentials.
- **Batch submissions**: Support uploading a tar/zip and asynchronously unpacking/validating frames.
---
**References**
- `design_docs/worker.md` β Worker design and artifact contracts
- `stream3r/worker/tasks.py` β Concrete payload fields exchanged with the API