Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """Non-blocking inference bridge for live WebRTC streams. | |
| Wraps an ``EveWorker`` so that the main asyncio thread never blocks on | |
| pipe I/O. A dedicated daemon thread per connection handles the blocking | |
| ``send_inference`` round-trip (~35-50 ms), while the frame handler | |
| deposits the latest camera frame and picks up the most recent completed | |
| result in under 1 ms. | |
| Trade-off: one frame of display latency (~33 ms at 30 fps) — imperceptible. | |
| FPS cap | |
| ~~~~~~~ | |
| When ``max_fps`` is set, the bridge caps its inference rate between | |
| ``max_fps`` and ``min_fps``. The ``load_fn`` signal (0.0–1.0) linearly | |
| interpolates between the two: idle → ``max_fps``, full pressure → | |
| ``min_fps``. Passing ``max_fps=None`` disables the cap entirely. | |
| """ | |
| import threading | |
| import time | |
| from dataclasses import dataclass | |
| from typing import Callable | |
| import numpy as np | |
| from eve_messages import FeatureFlags | |
| from eve_worker_pool import EveWorker, compute_fps_sleep | |
| from log_utils import setup_logger | |
| logger = setup_logger("NonBlockingBridge") | |
@dataclass
class _FrameInput:
    """Snapshot of what the main thread wants processed next.

    Instances are built positionally as ``_FrameInput(bgr_frame, features)``,
    so the ``@dataclass``-generated ``__init__`` is required; without the
    decorator the class has annotations only and construction raises
    ``TypeError``.
    """

    # Frame forwarded as the first argument to ``EveWorker.send_inference``.
    bgr_frame: np.ndarray
    # Feature flags forwarded as the second argument to ``send_inference``.
    features: FeatureFlags
class NonBlockingInferenceBridge:
    """Non-blocking wrapper around ``EveWorker.send_inference``.

    The background thread is the **sole** caller of worker pipe methods,
    so ``EveWorker``'s single-thread contract is satisfied.

    The bridge keeps exactly one pending input and one completed result:
    a newly submitted frame overwrites any frame that has not been picked
    up yet, and each finished inference overwrites the previous result.

    Args:
        worker: The ``EveWorker`` to drive. Must already be acquired.
        setup_fn: Optional callable executed on the bridge thread before
            the inference loop starts. Use this for blocking worker
            setup (gallery restore, ``send_remove_all_users``, etc.) so
            the asyncio thread is never held up. Pipe errors are fatal;
            other errors are logged and the inference loop continues.
        max_fps: Inference rate when the system is idle. ``None``
            disables the FPS cap entirely (unlimited).
        min_fps: Inference rate when the system is under full pressure.
            Defaults to ``max_fps`` (no scaling). Ignored when
            ``max_fps`` is ``None``.
        load_fn: Callable returning a float in [0.0, 1.0] indicating
            system pressure. Interpolates between ``max_fps`` (0.0)
            and ``min_fps`` (1.0). ``None`` defaults to 0.0 (always
            run at ``max_fps``). If it raises, the bridge thread stops
            and the exception is re-raised by ``submit_and_get_latest``.
    """

    def __init__(
        self,
        worker: EveWorker,
        setup_fn: Callable[[], None] | None = None,
        max_fps: float | None = None,
        min_fps: float | None = None,
        load_fn: Callable[[], float] | None = None,
    ) -> None:
        self._worker = worker
        self._setup_fn = setup_fn
        self._max_fps = max_fps
        self._min_fps = min_fps if min_fps is not None else max_fps
        self._load_fn = load_fn

        # Single-slot mailboxes shared with the bridge thread; guarded by _lock.
        self._latest_input: _FrameInput | None = None
        self._latest_result: np.ndarray | None = None
        self._lock = threading.Lock()

        # _new_frame wakes the bridge thread; _stop asks it to exit.
        self._new_frame = threading.Event()
        self._stop = threading.Event()

        # Set by the bridge thread when it terminates because of an error.
        self._error: BaseException | None = None

        # The thread is started immediately, so setup_fn (if any) begins
        # running as soon as the bridge is constructed.
        self._thread = threading.Thread(
            target=self._run,
            name=f"bridge-w{worker.worker_id}",
            daemon=True,
        )
        self._thread.start()

    # -- public API (called from asyncio / main thread) --------------------

    def submit_and_get_latest(
        self,
        bgr_frame: np.ndarray,
        features: FeatureFlags,
    ) -> np.ndarray | None:
        """Deposit a frame and return the latest completed result.

        Only takes a short lock and sets an event; it never touches the
        worker pipe. Returns ``None`` until the first inference finishes
        (typically one frame period).

        Args:
            bgr_frame: Frame to process next; replaces any frame that the
                bridge thread has not picked up yet.
            features: Feature flags to send with the frame.

        Returns:
            The most recent inference result, or ``None`` if no inference
            has completed yet.

        Raises:
            BrokenPipeError / EOFError / OSError: re-raised from the
                bridge thread when the worker pipe dies.
            Exception: any other error that stopped the bridge thread
                (unexpected inference failure, or a failure in
                ``load_fn`` / FPS pacing) is re-raised as well.
        """
        if self._error is not None:
            raise self._error  # noqa: B904 – intentional re-raise
        with self._lock:
            self._latest_input = _FrameInput(bgr_frame, features)
            result = self._latest_result
        # Signal after releasing the lock so the woken thread can take it.
        self._new_frame.set()
        return result

    def is_alive(self) -> bool:
        """``True`` while the bridge thread is running and error-free."""
        return self._thread.is_alive() and self._error is None

    def stop(self) -> None:
        """Signal the bridge thread to exit and wait up to 2 s for it.

        The wait is bounded: if the thread is still inside a blocking
        worker call when the timeout expires, a warning is logged and the
        method returns anyway (the thread is a daemon).
        """
        self._stop.set()
        self._new_frame.set()  # unblock if waiting
        self._thread.join(timeout=2.0)
        if self._thread.is_alive():
            logger.warning(
                f"Bridge thread (worker {self._worker.worker_id}) "
                "did not exit within 2.0 s"
            )

    # -- background thread -------------------------------------------------

    def _run(self) -> None:
        """Bridge thread entry point: optional setup, then inference loop."""
        if self._setup_fn is not None:
            try:
                self._setup_fn()
            except (BrokenPipeError, EOFError, OSError) as exc:
                # A dead pipe makes the worker unusable: record and exit.
                logger.warning(
                    f"Bridge setup (worker {self._worker.worker_id}) pipe error: {exc}"
                )
                self._error = exc
                return
            except Exception as exc:
                # Gallery restore failed — not fatal, inference still works
                logger.warning(
                    f"Bridge setup (worker {self._worker.worker_id}) error: {exc}"
                )
        self._inference_loop()

    def _inference_loop(self) -> None:
        """Process the newest submitted frame until stopped or an error occurs.

        Every exit caused by an exception stores it in ``self._error`` so
        that ``submit_and_get_latest`` surfaces the failure instead of
        silently returning a stale result.
        """
        while not self._stop.is_set():
            # Wait for a frame (or periodic wake to check _stop)
            self._new_frame.wait(timeout=0.5)
            self._new_frame.clear()
            if self._stop.is_set():
                break

            # Take ownership of the pending frame. Clearing the event before
            # reading means a frame deposited afterwards re-sets the event,
            # so it is handled on the next iteration rather than lost.
            with self._lock:
                frame_input = self._latest_input
                self._latest_input = None
            if frame_input is None:
                # Timeout or spurious wake-up with nothing to do.
                continue

            t0 = time.monotonic()
            try:
                result = self._worker.send_inference(
                    frame_input.bgr_frame, frame_input.features
                )
            except (BrokenPipeError, EOFError, OSError) as exc:
                logger.warning(
                    f"Bridge thread (worker {self._worker.worker_id}) pipe error: {exc}"
                )
                self._error = exc
                return
            except Exception as exc:
                logger.error(
                    f"Bridge thread (worker {self._worker.worker_id}) unexpected error: {exc}"
                )
                self._error = exc
                return

            with self._lock:
                self._latest_result = result

            # FPS cap — sleep to keep inference rate within target range.
            # load_fn is caller-supplied code; if it (or the pacing helper)
            # raises, record the error like any other unexpected failure
            # rather than letting the thread die unnoticed.
            try:
                load = self._load_fn() if self._load_fn is not None else 0.0
                sleep = compute_fps_sleep(
                    time.monotonic() - t0, self._max_fps, self._min_fps, load
                )
            except Exception as exc:
                logger.error(
                    f"Bridge thread (worker {self._worker.worker_id}) FPS pacing error: {exc}"
                )
                self._error = exc
                return

            # Uses _stop.wait() so bridge.stop() returns immediately.
            if sleep > 0:
                self._stop.wait(timeout=sleep)