# STream3R API β€” Job Orchestration and Integration Plan

## Executive Summary

This document proposes a lightweight STream3R API service that wraps the async job system implemented by the RQ worker. The API exposes RESTful endpoints and Redis Stream subscriptions that allow upstream applications to submit reconstruction jobs, track progress, and retrieve completed artifacts. Responsibilities are split cleanly: the API handles request validation, persistence, and orchestration, while the GPU worker (documented in `design_docs/worker.md`) performs heavy inference and storage of artifacts.

**Key outcomes**
- Unified interface for both short `pose_pointmap` jobs and long-running `model_build` jobs.
- Consistent job lifecycle backed by Postgres and Redis, mirroring the worker contract.
- Environment-agnostic integration so other services can enqueue jobs or consume progress events without GPU access.

---

## 1. Scope & Goals

### Goals
- Provide a REST API for submitting jobs and querying job status or results.
- Offer optional server-sent events (SSE) or WebSocket feeds for near-real-time updates on `pose_pointmap` jobs.
- Enforce validation and idempotency for job submissions.
- Expose artifacts written by the worker (S3 URLs, local paths) without re-hosting the files.

### Non-Goals
- Implement GPU inference or artifact generation (handled by RQ worker).
- Manage long-term artifact retention or CDN delivery.
- Provide fine-grained authorization beyond bearer token / API key patterns (left to integration).

---

## 2. Architecture Overview

```
Client ──HTTP──► STream3R API ──RQ enqueue──► Redis queue ──► Worker
   β”‚                  β”‚                                         β”‚
   β”‚                  β”œβ”€β”€ Postgres (job rows) β—„β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
   β”‚                  β”œβ”€β”€ Redis Stream (events) β—„β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€
   β”‚                  └── S3 (artifact URLs) β—„β”€β”€β”€β”€β”€β”€β”€β”€ worker writes
   β”‚
   └── Poll `/jobs/{id}` or subscribe via SSE/WebSocket for progress updates
```

Components:
- **FastAPI** service (recommended) running behind an ASGI server.
- **Redis**: shared with worker for queues and events.
- **Postgres**: `stream3r_jobs` table is the canonical job record.
- **S3/Backblaze** (or local storage): artifact URLs returned by worker.
- **RQ worker** (implemented separately) executing jobs and updating state.

---

## 3. Endpoints

### `POST /jobs`
Submit a job for either `pose_pointmap` or `model_build`.

**Request body**
```json
{
  "job_type": "pose_pointmap",
  "scene_id": "SCENE123",
  "mode": "causal",
  "streaming": true,
  "frames": [
    {"url": "https://.../frame_0000.jpg"},
    {"path": "/data/captures/frame_0001.png"}
  ],
  "session_settings": {"prediction_mode": "Predicted Pointmap"},
  "client_request_id": "optional-idempotency-key"
}
```

**Behavior**
- Validate payload (non-empty frames, supported job type, etc.).
- If `client_request_id` is provided, search Postgres for an existing job with the same key to ensure idempotency.
- Assign `job_id` (UUID) and enqueue the payload into the appropriate RQ queue (`pose_pointmap` or `model_build`).
- Insert `stream3r_jobs` row with `status=queued`.
- Return `202 Accepted` with job metadata:

```json
{
  "job_id": "uuid",
  "status": "queued",
  "job_type": "pose_pointmap",
  "scene_id": "SCENE123"
}
```
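The submit flow above can be sketched as a pure function, with in-memory stand-ins for the external pieces: `jobs_by_key` plays the role of the Postgres lookup on `client_request_id`, and `enqueue` plays the role of `rq.Queue(...).enqueue(...)`. The function name and stand-ins are illustrative, not the actual implementation:

```python
import uuid


def submit_job(payload: dict, jobs_by_key: dict, enqueue) -> dict:
    """Orchestration sketch for POST /jobs (names are illustrative).

    jobs_by_key stands in for a Postgres lookup on client_request_id;
    enqueue stands in for rq.Queue(queue_name).enqueue(...).
    """
    key = payload.get("client_request_id")
    if key is not None and key in jobs_by_key:
        # Idempotent replay: return the previously created job unchanged.
        return jobs_by_key[key]

    job = {
        "job_id": str(uuid.uuid4()),
        "status": "queued",
        "job_type": payload["job_type"],
        "scene_id": payload["scene_id"],
    }
    # Queue name matches job_type per the worker contract.
    enqueue(payload["job_type"], {"job_id": job["job_id"], **payload})
    if key is not None:
        jobs_by_key[key] = job
    return job
```

The key design point the sketch captures: the idempotency check happens before the UUID is minted and before anything is enqueued, so a retried request never produces a second queue entry.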

### `GET /jobs/{job_id}`
Fetch job state and artifact references from Postgres.

**Response example** (`status=finished`)
```json
{
  "job_id": "uuid",
  "job_type": "model_build",
  "scene_id": "SCENE123",
  "status": "finished",
  "created_at": "...",
  "started_at": "...",
  "completed_at": "...",
  "result": {
    "result_url": "s3://bucket/scene/SCENE123/stream3r/models/summary.json",
    "model_dir": "s3://bucket/scene/SCENE123/stream3r/models/",
    "artifacts": {
      "scene_glb_url": "...",
      "poses_url": "...",
      "pointmaps": [ {"frame_id": "frame_0000", "url": "..."} ]
    }
  },
  "error": null
}
```

### `GET /jobs/{job_id}/events`
Server-Sent Events endpoint bridging Redis Streams.

- Uses `XREAD` on `stream3r:events`; Redis Streams have no server-side filtering, so the API drops entries whose `job_id` does not match before forwarding.
- Suitable for browser or gateway consumers needing near-real-time progress.
- Emits lines like:

```
event: progress
data: {"progress": 60, "status": "progress"}

event: finished
data: {"result_url": "s3://..."}
```

Optionally provide a WebSocket variant if SSE is insufficient.
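Converting a stream entry into an SSE frame is straightforward. A minimal sketch (the function name is illustrative; the event names follow the lifecycle statuses defined in the worker contract):

```python
import json


def to_sse_frame(entry: dict) -> str:
    """Format one Redis Stream entry as a Server-Sent Events frame."""
    status = entry.get("status", "progress")
    # Terminal and start statuses get their own event names; anything else
    # is treated as a progress tick.
    event = status if status in ("started", "finished", "failed") else "progress"
    # Drop empty fields so clients see compact payloads.
    data = {k: v for k, v in entry.items() if v not in (None, "")}
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"
```

Each frame ends with a blank line, which is what delimits events in the SSE wire format.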

### `GET /jobs`
Paged listing/filtering (optional but useful for dashboards).

Parameters: `scene_id`, `job_type`, `status`, pagination cursors.

---

## 4. Data Model & Persistence

### Postgres (`stream3r_jobs`)

The API is the authoritative owner of the job record. It should create and migrate the following schema during startup (extend with `client_request_id` if idempotency keys are required):

```sql
CREATE TABLE IF NOT EXISTS stream3r_jobs (
  job_id         UUID PRIMARY KEY,
  job_type       TEXT NOT NULL,           -- 'pose_pointmap' | 'model_build'
  scene_id       TEXT NOT NULL,
  status         TEXT NOT NULL,           -- 'queued' | 'started' | 'finished' | 'failed'
  created_at     TIMESTAMPTZ NOT NULL DEFAULT now(),
  started_at     TIMESTAMPTZ,
  completed_at   TIMESTAMPTZ,
  payload        JSONB,                   -- enqueue-time payload
  result         JSONB,                   -- worker-published result bundle
  error          TEXT,
  client_request_id TEXT UNIQUE           -- optional idempotency key
);

CREATE INDEX IF NOT EXISTS stream3r_jobs_scene_id_idx ON stream3r_jobs(scene_id);
CREATE INDEX IF NOT EXISTS stream3r_jobs_status_idx   ON stream3r_jobs(status);
```

### Redis

- **Queues**: two RQ queues existβ€”`pose_pointmap` for latency-sensitive pose extraction and `model_build` for long reconstruction jobs. The API selects the queue based on `job_type`.
- **Event Stream**: the worker pushes lifecycle updates to the Redis Stream `stream3r:events`. Every entry is a flat map with the following fields:

```
job_id, job_type, scene_id,
status, progress, result_url, model_dir,
error, ts
```

`status` is one of `started`, `progress`, `finished`, or `failed`. `progress` is an integer percentage (0–100). `result_url` and `model_dir` mirror the URLs stored in Postgres when a job completes.
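Since stream entries arrive as flat string maps (and as raw bytes when the Redis client is not configured with `decode_responses=True`), the API should normalize them once on read. A sketch, assuming the field names above:

```python
def parse_event(fields: dict) -> dict:
    """Decode one flat stream entry into typed values.

    Handles the bytes keys/values that redis-py returns by default,
    and coerces the numeric fields from the worker contract.
    """
    event = {
        (k.decode() if isinstance(k, bytes) else k):
        (v.decode() if isinstance(v, bytes) else v)
        for k, v in fields.items()
    }
    if event.get("progress") not in (None, ""):
        event["progress"] = int(event["progress"])
    if event.get("ts") not in (None, ""):
        event["ts"] = float(event["ts"])
    return event
```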

### Artifact Storage Layout

The worker persists artifacts to S3/Backblaze (or local storage) under a deterministic folder hierarchy. The API does not move files but should surface these URLs verbatim so consumers know where to fetch results:

```
scene/{scene_id}/stream3r/
  results/
    {job_id}.json                     # per-job result JSON (pose_pointmap)
  models/
    kv_cache.pt                       # serialized KV cache
    predictions.npz                   # packed model outputs
    session_settings.json             # runtime/config settings
    selected_frames.json              # frame subset indices (optional)
    scene.glb                         # fused 3D scene
    poses.jsonl                       # per-frame extrinsics
    summary.json                      # canonical model_build result JSON
    pointmaps/
      {frame_token}.npz               # per-frame world coords + confidence
```

`pose_pointmap` jobs typically populate `results/{job_id}.json` plus `pointmaps/`; `model_build` jobs populate the `models/` subtree. All URLs returned by the worker use this structure.
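Because the hierarchy is deterministic, both API and consumers can derive object keys without listing the bucket. A few helpers sketching the layout above (function names are illustrative):

```python
def stream3r_prefix(scene_id: str) -> str:
    """Root prefix for all STream3R artifacts of a scene."""
    return f"scene/{scene_id}/stream3r/"


def pose_result_key(scene_id: str, job_id: str) -> str:
    """Per-job result JSON for pose_pointmap jobs."""
    return f"scene/{scene_id}/stream3r/results/{job_id}.json"


def model_summary_key(scene_id: str) -> str:
    """Canonical model_build result JSON."""
    return f"scene/{scene_id}/stream3r/models/summary.json"


def pointmap_key(scene_id: str, frame_token: str) -> str:
    """Per-frame world coordinates + confidence."""
    return f"scene/{scene_id}/stream3r/models/pointmaps/{frame_token}.npz"
```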

---

## 5. Job Lifecycle

1. **Submit** (`POST /jobs`):
   - Validate input, persist `queued` row, enqueue payload.
   - Return `job_id`.
2. **Worker processing**:
   - Worker acquires GPU lock, runs inference, streams events, writes artifacts, and updates DB.
3. **Status checks**:
   - Clients poll `GET /jobs/{id}` or subscribe to `/jobs/{id}/events`.
4. **Completion**:
   - Job row contains `status=finished`, `result` JSON with URLs.
   - API response is the source of truth for artifact discovery.
5. **Failure**:
   - Worker updates DB with `status=failed`, `error` string.
   - API surfaces the error in `GET /jobs/{id}` and via events.
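From the client side, steps 3–5 reduce to polling until a terminal status. A sketch with the HTTP call injected as a callable (`fetch` would typically wrap `requests.get` on `GET /jobs/{id}`; all names here are illustrative):

```python
import time


def poll_until_done(fetch, job_id: str, interval: float = 2.0,
                    timeout: float = 600.0, sleep=time.sleep) -> dict:
    """Poll the job record until it reaches 'finished' or 'failed'.

    fetch(job_id) must return the GET /jobs/{id} body as a dict.
    """
    deadline = time.monotonic() + timeout
    while True:
        job = fetch(job_id)
        if job["status"] in ("finished", "failed"):
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job_id} still '{job['status']}' after {timeout}s")
        sleep(interval)
```

Injecting `fetch` and `sleep` keeps the loop unit-testable; a production client would also back off the interval for long `model_build` jobs.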

---

## 6. Request Validation Contracts

`POST /jobs` validation rules:
- `job_type` ∈ {`pose_pointmap`, `model_build`}.
- `scene_id` non-empty string.
- `frames` list size β‰₯ 1 (unless `frames_dir` provided).
- Each frame entry must have exactly one of `url`, `path`, or `content` (base64 image string).
- `mode` default `causal`; forbid `full` for streaming jobs to match worker behavior.
- Optional numeric fields converted to `int/float` before enqueueing.
- Enforce max frames (configurable) to avoid resource exhaustion.
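The rules above can be collected into a single pure validator that returns a list of errors (empty list means the payload is accepted). A sketch; the `max_frames` default is an assumption and should come from configuration:

```python
ALLOWED_JOB_TYPES = {"pose_pointmap", "model_build"}
FRAME_SOURCE_KEYS = {"url", "path", "content"}


def validate_job_payload(payload: dict, max_frames: int = 2000) -> list:
    """Return validation errors for a POST /jobs body (sketch)."""
    errors = []
    if payload.get("job_type") not in ALLOWED_JOB_TYPES:
        errors.append("job_type must be 'pose_pointmap' or 'model_build'")
    scene_id = payload.get("scene_id")
    if not (isinstance(scene_id, str) and scene_id.strip()):
        errors.append("scene_id must be a non-empty string")
    frames = payload.get("frames") or []
    if not frames and not payload.get("frames_dir"):
        errors.append("frames must contain at least one entry")
    if len(frames) > max_frames:
        errors.append(f"frames exceeds limit of {max_frames}")
    for i, frame in enumerate(frames):
        # Exactly one source per frame: url, path, or content.
        if len(FRAME_SOURCE_KEYS & set(frame)) != 1:
            errors.append(f"frames[{i}] must have exactly one of url, path, content")
    if payload.get("streaming") and payload.get("mode", "causal") == "full":
        errors.append("mode 'full' is not allowed for streaming jobs")
    return errors
```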

---

## 7. Security & Authentication

- Deploy behind an API gateway that injects `X-Client-Id` or similar metadata for auditing.
- Support bearer token / API key auth via middleware; store hashed keys in Postgres if needed.
- Restrict access to the internal network when possible, since artifacts contain scene data.
- Sanitize inbound URLs to prevent SSRF; optionally proxy downloads through a whitelist.
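A minimal URL check for the SSRF point above might look like the following. The allowlist host is hypothetical, and this sketch is not exhaustive: a hardened version would also resolve the hostname and re-check the resulting IPs, since DNS can point an allowed-looking name at an internal address.

```python
import ipaddress
from urllib.parse import urlparse

ALLOWED_SCHEMES = {"https"}
# Hypothetical allowlist; replace with your trusted storage/CDN hosts.
ALLOWED_HOSTS = {"storage.example.com"}


def is_safe_frame_url(url: str) -> bool:
    """Reject frame URLs that could reach internal services (sketch)."""
    parsed = urlparse(url)
    if parsed.scheme not in ALLOWED_SCHEMES:
        return False
    host = parsed.hostname or ""
    try:
        ip = ipaddress.ip_address(host)
        # Literal IPs: refuse private/loopback/link-local ranges outright
        # (blocks e.g. the cloud metadata endpoint 169.254.169.254).
        if ip.is_private or ip.is_loopback or ip.is_link_local:
            return False
    except ValueError:
        pass  # a hostname, not a literal IP
    return host in ALLOWED_HOSTS
```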

---

## 8. Observability & Operations

- **Logging**: Structured logs capturing `job_id`, `scene_id`, `client_request_id`, and remote IP.
- **Metrics**: Track enqueue latency, job duration (from DB timestamps), queue depth, event lag.
- **Health checks**: `GET /healthz` verifying Redis and Postgres connectivity.
- **Backpressure**: Before accepting a new job, check queue length; if above threshold, return `429 Too Many Requests`.
- **Timeouts**: Configure HTTP request timeouts to avoid hanging on large payloads.
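The backpressure rule can be isolated as a small decision function, queried before enqueueing. The per-queue limits below are illustrative defaults, to be tuned per deployment:

```python
def admission_check(queue_depths: dict, limits: dict, job_type: str):
    """Return (http_status, error_body) for a submission attempt.

    queue_depths maps queue name -> current length (e.g. from rq's
    Queue.count); limits maps queue name -> maximum accepted depth.
    """
    depth = queue_depths.get(job_type, 0)
    limit = limits.get(job_type, 100)  # fallback limit is an assumption
    if depth >= limit:
        return 429, {"error": f"queue '{job_type}' is full ({depth}/{limit}), retry later"}
    return 202, None
```

Keeping the check pure makes the 429 threshold trivially testable and leaves queue-depth sampling (and any caching of it) to the caller.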

---

## 9. Deployment Considerations

- Package as a standalone FastAPI app (e.g., `stream3r_api.main:app`).
- Run under Uvicorn/Gunicorn with workers sized for I/O-bound traffic.
- Configure service with the same environment variables as worker (`STREAM3R_REDIS_URL`, `STREAM3R_DB_DSN`, etc.).
- Use infrastructure-as-code to provision Redis, Postgres, and S3 credentials shared with worker.
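Configuration can be centralized in a small settings object read from those environment variables. `STREAM3R_REDIS_URL` and `STREAM3R_DB_DSN` come from the worker contract; `STREAM3R_MAX_FRAMES` and the defaults below are assumptions for the sketch:

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    """API configuration shared with the worker environment."""
    redis_url: str
    db_dsn: str
    max_frames: int

    @classmethod
    def from_env(cls, env=os.environ) -> "Settings":
        return cls(
            redis_url=env.get("STREAM3R_REDIS_URL", "redis://localhost:6379/0"),
            db_dsn=env.get("STREAM3R_DB_DSN", ""),
            # Hypothetical knob backing the max-frames validation rule.
            max_frames=int(env.get("STREAM3R_MAX_FRAMES", "2000")),
        )
```

Passing `env` as a mapping keeps the loader testable without mutating the real process environment.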

---

## 10. Future Enhancements

- **Job cancellation**: Add `DELETE /jobs/{id}` to flag jobs for cancellation (requires worker support).
- **Scene-level dashboards**: Aggregate artifacts from multiple jobs for a scene.
- **Signed download URLs**: API could issue pre-signed URLs for public sharing, decoupled from worker credentials.
- **Batch submissions**: Support uploading a tar/zip and asynchronously unpacking/validating frames.

---

**References**
- `design_docs/worker.md` β€” Worker design and artifact contracts
- `stream3r/worker/tasks.py` β€” Concrete payload fields exchanged with the API