brian4dwell commited on
Commit
4b27cfa
·
1 Parent(s): 01e8928
stream3r/worker/main.py CHANGED
@@ -83,11 +83,14 @@ def main() -> None:
83
  runtime = get_runtime()
84
 
85
  queues = [Queue(name, connection=runtime.redis) for name in args.queues]
 
 
86
  for queue in queues:
87
  logger.info("Listening on queue '%s'", queue.name)
88
 
89
  worker = Stream3RWorker(
90
  queues,
 
91
  name=settings.worker_name,
92
  default_timeout=settings.default_job_timeout,
93
  )
 
83
  runtime = get_runtime()
84
 
85
  queues = [Queue(name, connection=runtime.redis) for name in args.queues]
86
+ if not queues:
87
+ raise ValueError("No queues configured for worker")
88
  for queue in queues:
89
  logger.info("Listening on queue '%s'", queue.name)
90
 
91
  worker = Stream3RWorker(
92
  queues,
93
+ connection=runtime.redis,
94
  name=settings.worker_name,
95
  default_timeout=settings.default_job_timeout,
96
  )
stream3r/worker/tasks.py CHANGED
@@ -160,7 +160,13 @@ def _register_scene_media_entries(runtime: WorkerRuntime, scene_id: str, entries
160
  logger.exception("Failed to register scene media entries for scene %s", scene_id)
161
 
162
 
163
- def _resolve_frame_entry(entry: Any, *, index: int, dest_dir: Path) -> FrameRecord:
 
 
 
 
 
 
164
  metadata: dict[str, Any] = {}
165
  timestamp = None
166
  source = None
@@ -190,6 +196,13 @@ def _resolve_frame_entry(entry: Any, *, index: int, dest_dir: Path) -> FrameReco
190
  raise FileNotFoundError(f"Frame path does not exist: {path}")
191
  destination = dest_dir / (path.name if path.suffix else f"{frame_id}.jpg")
192
  shutil.copy2(path, destination)
 
 
 
 
 
 
 
193
  elif url := entry.get("url"):
194
  source = url
195
  suffix = Path(url).suffix or ".jpg"
@@ -198,10 +211,9 @@ def _resolve_frame_entry(entry: Any, *, index: int, dest_dir: Path) -> FrameReco
198
  elif content := entry.get("content"):
199
  destination = dest_dir / f"{frame_id}.jpg"
200
  _write_base64(content, destination)
201
- else:
202
- raise ValueError("Frame entry must include 'path', 'url', or 'content'")
203
  else:
204
- raise TypeError("Unsupported frame specification")
 
205
 
206
  if destination.suffix.lower() not in IMAGE_EXTENSIONS:
207
  destination = destination.with_suffix(".png")
@@ -231,7 +243,9 @@ def _collect_frames(
231
  for entry in frames_payload:
232
  if frame_limit and frame_limit > 0 and len(records) >= frame_limit:
233
  break
234
- records.append(_resolve_frame_entry(entry, index=len(records), dest_dir=frames_dir))
 
 
235
  else:
236
  directory = payload.get("frames_dir") or payload.get("images_dir")
237
  if directory:
 
160
  logger.exception("Failed to register scene media entries for scene %s", scene_id)
161
 
162
 
163
+ def _resolve_frame_entry(
164
+ entry: Any,
165
+ *,
166
+ index: int,
167
+ dest_dir: Path,
168
+ runtime: WorkerRuntime | None = None,
169
+ ) -> FrameRecord:
170
  metadata: dict[str, Any] = {}
171
  timestamp = None
172
  source = None
 
196
  raise FileNotFoundError(f"Frame path does not exist: {path}")
197
  destination = dest_dir / (path.name if path.suffix else f"{frame_id}.jpg")
198
  shutil.copy2(path, destination)
199
+ elif storage_key := entry.get("storage_key") or entry.get("file") or entry.get("key"):
200
+ if runtime is None:
201
+ raise ValueError("Frame entry provided storage key but runtime is not available")
202
+ filename = Path(str(storage_key)).name or f"{frame_id}.jpg"
203
+ destination = dest_dir / filename
204
+ runtime.storage.download_to_path(str(storage_key), destination)
205
+ source = str(storage_key)
206
  elif url := entry.get("url"):
207
  source = url
208
  suffix = Path(url).suffix or ".jpg"
 
211
  elif content := entry.get("content"):
212
  destination = dest_dir / f"{frame_id}.jpg"
213
  _write_base64(content, destination)
 
 
214
  else:
215
+ raise ValueError("Frame entry must include 'path', 'url', or 'content'")
216
+
217
 
218
  if destination.suffix.lower() not in IMAGE_EXTENSIONS:
219
  destination = destination.with_suffix(".png")
 
243
  for entry in frames_payload:
244
  if frame_limit and frame_limit > 0 and len(records) >= frame_limit:
245
  break
246
+ records.append(
247
+ _resolve_frame_entry(entry, index=len(records), dest_dir=frames_dir, runtime=runtime)
248
+ )
249
  else:
250
  directory = payload.get("frames_dir") or payload.get("images_dir")
251
  if directory: