"""
Worker entry for the parallel multi-GPU Auto-Mode dispatcher.

This module is intentionally lightweight at top level — it must NOT import
torch (or anything that imports torch) before `worker_main()` has a chance to
narrow ``CUDA_VISIBLE_DEVICES`` to a single device. ``mp.spawn`` re-imports the
target module in each child process; importing torch at top level here would
cause CUDA to initialize against all visible devices before we can pin the
worker to one GPU.

Flow:
  1. Parent dispatcher in app.py spawns one of these per concurrent video.
  2. ``worker_main(gpu_index, args, progress_queue)`` is the spawn target.
  3. It sets ``CUDA_VISIBLE_DEVICES`` to ``str(gpu_index)`` BEFORE importing
     torch, so the child sees only one device (always referenced as cuda:0).
  4. It then imports ``app._segment_video_core`` and runs the segmentation,
     streaming progress / status / result / error messages back to the parent
     via ``progress_queue``. Each message carries ``gpu_index`` so the
     dispatcher can route it to the right per-video UI slot.
"""
|
|
| import os |
| import pathlib |
| import shutil |
| import sys |
| import tempfile |
| import traceback |
| import uuid |
|
|
|
|
# Distributed-launcher environment variables that `_force_single_rank_env`
# removes from the worker's environment before pinning it to a single rank.
# The names match those exported by torchrun / torchelastic.
_DISTRIBUTED_ENV_KEYS = (
    # Rank / world size at global, local, group and role scope.
    "RANK", "WORLD_SIZE",
    "LOCAL_RANK", "LOCAL_WORLD_SIZE",
    "GROUP_RANK", "GROUP_WORLD_SIZE",
    "ROLE_RANK", "ROLE_WORLD_SIZE",
    # Rendezvous endpoint.
    "MASTER_ADDR", "MASTER_PORT",
    # Elastic-launch bookkeeping.
    "TORCHELASTIC_RUN_ID",
    "TORCHELASTIC_RESTART_COUNT",
    "TORCHELASTIC_MAX_RESTARTS",
)
|
|
|
|
def _truthy(value):
    """Interpret *value* as an on/off switch that defaults to "on".

    The value is stringified, stripped and lower-cased. Only the explicit
    "off" spellings ``0``, ``false``, ``no`` and ``off`` yield ``False``;
    anything else (including ``None`` and the empty string) yields ``True``.
    """
    normalized = str(value).strip().lower()
    if normalized in ("0", "false", "no", "off"):
        return False
    return True
|
|
|
|
def _force_single_rank_env(gpu_index=None, cpu_only=False):
    """Rewrite ``os.environ`` so this process looks like a lone rank-0 worker.

    Args:
        gpu_index: When not ``None`` (and ``cpu_only`` is false),
            ``CUDA_VISIBLE_DEVICES`` is set to ``str(gpu_index)``.
        cpu_only: When true, ``CUDA_VISIBLE_DEVICES`` is set to the empty
            string; this takes precedence over ``gpu_index``.

    Every key in ``_DISTRIBUTED_ENV_KEYS`` is removed and the four rank /
    world-size variables are then re-set to a single-process configuration.
    ``SAM3_WORKER_MODE`` is always forced to ``"1"``; the remaining ``SAM3_*``
    and bytecode settings are only defaults and never override a value that
    is already present.
    """
    env = os.environ

    # Keep an existing device ordering if the caller exported one.
    env.setdefault("CUDA_DEVICE_ORDER", "PCI_BUS_ID")

    if cpu_only:
        env["CUDA_VISIBLE_DEVICES"] = ""
    elif gpu_index is not None:
        env["CUDA_VISIBLE_DEVICES"] = str(gpu_index)

    # Drop whatever launcher state was inherited, then describe a world of one.
    for inherited in _DISTRIBUTED_ENV_KEYS:
        env.pop(inherited, None)
    env.update({
        "RANK": "0",
        "WORLD_SIZE": "1",
        "LOCAL_RANK": "0",
        "LOCAL_WORLD_SIZE": "1",
    })

    env["SAM3_WORKER_MODE"] = "1"
    soft_defaults = (
        ("SAM3_CACHE_FRAME_OUTPUTS", "0"),
        ("SAM3_OFFLOAD_TRACKER_STATE_TO_CPU", "1"),
        ("PYTHONDONTWRITEBYTECODE", "1"),
    )
    for key, fallback in soft_defaults:
        env.setdefault(key, fallback)
    sys.dont_write_bytecode = True
|
|
|
|
def _copy_runtime_item(source_dir, runtime_dir, name):
    """Copy the file or directory *name* into *runtime_dir*.

    The item is looked up in *source_dir* first and, if absent there, next to
    this module. When neither location has it the call is a silent no-op.

    Directories are copied recursively (symlinks are followed, and
    ``__pycache__``, ``*.pyc`` and ``.git`` entries are skipped); plain files
    are copied with their metadata via ``shutil.copy2``.
    """
    origin = pathlib.Path(source_dir) / name
    if not origin.exists():
        beside_module = pathlib.Path(__file__).resolve().parent / name
        if not beside_module.exists():
            return
        origin = beside_module

    target = pathlib.Path(runtime_dir) / name
    if not origin.is_dir():
        shutil.copy2(origin, target)
        return

    skip = shutil.ignore_patterns("__pycache__", "*.pyc", ".git")
    shutil.copytree(origin, target, symlinks=False, ignore=skip)
|
|
|
|
def _prepare_runtime(worker_options, tag):
    """Resolve the app root for this worker, optionally as a private copy.

    Args:
        worker_options: Optional mapping. ``source_app_dir`` overrides the
            source directory (default: the directory containing this module);
            ``isolate_runtime`` is passed through ``_truthy`` and defaults to
            enabled.
        tag: Short label embedded in the temp directory name.

    Returns:
        ``(app_root, runtime_dir)``. With isolation disabled, or when the
        source directory has no ``app.py``, this is ``(source_dir, None)``.
        Otherwise both elements are the path of a fresh temp directory that
        received copies of ``app.py``, ``parallel_segment_worker.py``,
        ``sam3`` and ``assets`` (whichever exist).

    Side effects:
        Always sets ``SAM3_OUTPUT_ROOT`` to the resolved source directory, and
        sets ``SAM3_ISOLATED_RUNTIME_DIR`` when a private copy is made.

    Raises:
        Any exception from the copy step; the partially built temp directory
        is removed before the exception propagates.

    NOTE(review): earlier documentation mentioned disabling isolation through
    ``SAM3_PARALLEL_COPY_RUNTIME=0``; this function only reads
    ``worker_options["isolate_runtime"]`` — presumably the parent maps the
    environment variable onto that option. Confirm against the dispatcher.
    """
    options = worker_options if worker_options else {}

    configured_root = options.get("source_app_dir")
    if not configured_root:
        configured_root = pathlib.Path(__file__).resolve().parent
    source_dir = pathlib.Path(configured_root).resolve()
    os.environ["SAM3_OUTPUT_ROOT"] = str(source_dir)

    isolation_wanted = _truthy(options.get("isolate_runtime", "1"))
    if not isolation_wanted or not (source_dir / "app.py").exists():
        # Run straight from the source tree; nothing to clean up later.
        return str(source_dir), None

    scratch_name = tempfile.mkdtemp(prefix=f"sam3_{tag}_{uuid.uuid4().hex[:8]}_")
    scratch = pathlib.Path(scratch_name).resolve()
    try:
        for item in ("app.py", "parallel_segment_worker.py", "sam3", "assets"):
            _copy_runtime_item(source_dir, scratch, item)
        os.environ["SAM3_ISOLATED_RUNTIME_DIR"] = str(scratch)
    except Exception:
        # Do not leave a half-populated runtime behind.
        shutil.rmtree(scratch, ignore_errors=True)
        raise
    return str(scratch), str(scratch)
|
|
|
|
def _prepend_import_paths(*paths):
    """Move the given entries to the front of ``sys.path``, in argument order.

    Falsy arguments are ignored. An entry already on ``sys.path`` has its
    first occurrence removed before being re-inserted at position 0, so the
    first argument ends up with the highest import priority.
    """
    pending = [entry for entry in paths if entry]
    # Work from the last argument backwards so the first one lands on top.
    while pending:
        entry = pending.pop()
        try:
            sys.path.remove(entry)
        except ValueError:
            pass
        sys.path.insert(0, entry)
|
|
|
|
def _cleanup_runtime(runtime_dir):
    """Best-effort removal of an isolated runtime directory.

    A falsy *runtime_dir* (no isolation was set up) is a no-op; removal errors
    are ignored.
    """
    if not runtime_dir:
        return
    shutil.rmtree(runtime_dir, ignore_errors=True)
|
|
|
|
def worker_main(gpu_index, args, progress_queue, worker_options=None):
    """Spawn target: run one video segmentation pinned to a single GPU.

    The environment is narrowed to one device before torch is first imported,
    an (optionally isolated) app runtime is prepared, and
    ``app._segment_video_core`` is executed. All communication with the parent
    goes through ``progress_queue`` as dict messages tagged with ``gpu_index``.

    Args:
        gpu_index: Device index written to ``CUDA_VISIBLE_DEVICES`` and
            attached to every message.
        args: 8-item sequence ``(video_path, text_prompt, duration_limit,
            id_corrections_text, id_drop_text, id_override_start_sec,
            show_trails, view_mode)`` forwarded to ``_segment_video_core``.
        progress_queue: Object with a ``put`` method. Receives messages whose
            ``"type"`` is ``"status"``, ``"progress"``, ``"result"`` or
            ``"error"``.
        worker_options: Optional mapping forwarded to ``_prepare_runtime``.

    Returns:
        None. Failures (including a malformed ``args``) are reported as
        ``"error"`` messages rather than raised. The isolated runtime
        directory, if one was created, is removed on every exit path.
    """
    # Must run before the first torch import so CUDA only ever sees one device.
    _force_single_rank_env(gpu_index=gpu_index)

    def _report_error(message):
        # Only called from inside ``except`` blocks, so format_exc() still
        # describes the exception currently being handled.
        progress_queue.put({
            "type": "error",
            "message": message,
            "traceback": traceback.format_exc(),
            "gpu_index": gpu_index,
        })

    try:
        app_root, runtime_dir = _prepare_runtime(worker_options, f"gpu{gpu_index}")
    except Exception as exc:
        # _prepare_runtime removes its own partial temp dir before raising.
        _report_error(f"runtime isolation setup failed on GPU {gpu_index}: {exc}")
        return

    # From here on a runtime dir may exist; the single ``finally`` below
    # guarantees it is removed no matter how the function is left.
    try:
        repo_root = os.path.dirname(os.path.abspath(__file__))
        _prepend_import_paths(app_root, repo_root)
        # Force ``app`` to be imported fresh from the path set up above.
        sys.modules.pop("app", None)

        try:
            import torch
            if torch.cuda.is_available():
                # Only one device is visible, so it is always index 0.
                torch.cuda.set_device(0)
        except Exception as exc:
            _report_error(f"torch init failed on GPU {gpu_index}: {exc}")
            return

        try:
            progress_queue.put({
                "type": "status",
                "message": (
                    f"🔒 GPU {gpu_index}: isolated worker ready "
                    f"(CUDA_VISIBLE_DEVICES={os.environ.get('CUDA_VISIBLE_DEVICES')}, "
                    f"WORLD_SIZE={os.environ.get('WORLD_SIZE')}, runtime={app_root})"
                ),
                "gpu_index": gpu_index,
            })
            from app import _segment_video_core
        except Exception as exc:
            _report_error(f"import _segment_video_core failed: {exc}")
            return

        try:
            (
                video_path,
                text_prompt,
                duration_limit,
                id_corrections_text,
                id_drop_text,
                id_override_start_sec,
                show_trails,
                view_mode,
            ) = args
        except (TypeError, ValueError) as exc:
            # A malformed task must surface on the queue like every other
            # failure instead of killing the process silently.
            _report_error(f"invalid worker args on GPU {gpu_index}: {exc}")
            return

        def _progress_cb(val, desc):
            progress_queue.put({
                "type": "progress",
                "value": val,
                "desc": desc,
                "gpu_index": gpu_index,
            })

        def _status_cb(msg):
            progress_queue.put({
                "type": "status",
                "message": msg,
                "gpu_index": gpu_index,
            })

        try:
            progress_queue.put({
                "type": "progress",
                "value": 0.0,
                "desc": f"GPU {gpu_index}: starting...",
                "gpu_index": gpu_index,
            })
            out_path, status, loc_path = _segment_video_core(
                video_path,
                text_prompt,
                duration_limit,
                id_corrections_text=id_corrections_text,
                id_drop_text=id_drop_text,
                id_override_start_sec=id_override_start_sec,
                show_trails=show_trails,
                view_mode=view_mode,
                progress_callback=_progress_cb,
                status_callback=_status_cb,
            )
            progress_queue.put({
                "type": "result",
                "data": (out_path, status, loc_path),
                "gpu_index": gpu_index,
            })
        except Exception as exc:
            _report_error(str(exc))
        finally:
            # Best-effort release of cached CUDA memory; never mask the
            # outcome of the segmentation itself.
            try:
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()
                    torch.cuda.ipc_collect()
            except Exception:
                pass
    finally:
        _cleanup_runtime(runtime_dir)
|
|
|
|
def overlay_worker_main(task_id, args, event_queue, worker_options=None):
    """
    Render segmentation + trails overlays in an isolated process so that
    multiple videos' CPU-bound overlay work can run on different vCPUs in
    parallel after their GPU segmentation completes.

    Pushes a single ``overlay_done`` (or ``overlay_error``) message onto
    ``event_queue`` carrying ``task_id`` so the parent dispatcher can route
    it back to the correct video slot.

    Args:
        task_id: Identifier echoed back in every message.
        args: 3-item sequence ``(output_video, location_path, text_prompt)``.
        event_queue: Object with a ``put`` method that receives the message.
        worker_options: Optional mapping forwarded to ``_prepare_runtime``.

    Returns:
        None. A failing segmentation overlay, import, runtime setup or
        malformed ``args`` yields ``overlay_error``. A failing trails overlay
        is downgraded to the ``"warning"`` field of ``overlay_done``; failures
        while building filter options or persisting the download are ignored.
        The isolated runtime directory, if any, is removed on every exit path.
    """
    # Hide all CUDA devices: this worker is CPU-only.
    _force_single_rank_env(cpu_only=True)

    def _report_error(message):
        # Only called from inside ``except`` blocks, so format_exc() still
        # describes the exception currently being handled.
        event_queue.put({
            "type": "overlay_error",
            "task_id": task_id,
            "error": message,
            "traceback": traceback.format_exc(),
        })

    try:
        app_root, runtime_dir = _prepare_runtime(worker_options, f"overlay_{task_id}")
    except Exception as exc:
        # _prepare_runtime removes its own partial temp dir before raising.
        _report_error(f"runtime isolation setup failed: {exc}")
        return

    # One ``finally`` owns the runtime cleanup for every exit path below.
    try:
        repo_root = os.path.dirname(os.path.abspath(__file__))
        _prepend_import_paths(app_root, repo_root)
        # Force ``app`` to be imported fresh from the path set up above.
        sys.modules.pop("app", None)

        try:
            from app import (
                _render_segmentation_overlay_video,
                _render_trails_overlay_video,
                _build_trail_filter_options,
                _persist_for_download,
            )
        except Exception as exc:
            _report_error(f"import failed: {exc}")
            return

        try:
            (output_video, location_path, text_prompt) = args
        except (TypeError, ValueError) as exc:
            # Report a malformed task instead of dying without a message.
            _report_error(f"invalid overlay args: {exc}")
            return

        try:
            seg_overlay = _render_segmentation_overlay_video(
                output_video, location_path, text_prompt
            )
        except Exception as exc:
            _report_error(f"seg overlay failed: {exc}")
            return

        # Fall back to the raw output when no overlay file was produced.
        seg_display_path = seg_overlay or output_video

        try:
            trails_overlay = _render_trails_overlay_video(
                seg_display_path, location_path, text_prompt, force_unique=True
            )
        except Exception as exc:
            # Trails are optional: keep going and surface this as a warning.
            trails_overlay = None
            trails_error = f"trails overlay failed: {exc}"
        else:
            trails_error = None

        try:
            choices, defaults, _legend = _build_trail_filter_options(
                location_path, text_prompt
            )
        except Exception:
            choices, defaults = [], []

        download_path = trails_overlay or seg_display_path
        try:
            persisted = _persist_for_download(download_path, subdir="downloads")
            if persisted:
                download_path = persisted
        except Exception:
            # Deliberate best effort: keep the un-persisted path on failure.
            pass

        event_queue.put({
            "type": "overlay_done",
            "task_id": task_id,
            "seg_overlay": seg_display_path,
            "trails_overlay": trails_overlay,
            "download_path": download_path,
            "trail_choices": choices,
            "trail_selected": defaults,
            "warning": trails_error,
        })
    finally:
        _cleanup_runtime(runtime_dir)
|
|