"""Non-blocking inference bridge for live WebRTC streams.

Wraps an ``EveWorker`` so that the main asyncio thread never blocks on pipe
I/O.  A dedicated daemon thread per connection handles the blocking
``send_inference`` round-trip (~35-50 ms), while the frame handler deposits
the latest camera frame and picks up the most recent completed result in
under 1 ms.

Trade-off: one frame of display latency (~33 ms at 30 fps) — imperceptible.

FPS cap
~~~~~~~
When ``max_fps`` is set, the bridge caps its inference rate between
``max_fps`` and ``min_fps``.  The ``load_fn`` signal (0.0–1.0) linearly
interpolates between the two: idle → ``max_fps``, full pressure →
``min_fps``.  With ``max_fps=None`` the cap is disabled entirely.
"""

import math
import threading
import time
from dataclasses import dataclass
from typing import Callable

import numpy as np

from eve_messages import FeatureFlags
from eve_worker_pool import EveWorker, compute_fps_sleep
from log_utils import setup_logger

logger = setup_logger("NonBlockingBridge")


@dataclass
class _FrameInput:
    """Snapshot of what the main thread wants processed next."""

    bgr_frame: np.ndarray
    features: FeatureFlags


class NonBlockingInferenceBridge:
    """Non-blocking wrapper around ``EveWorker.send_inference``.

    The background thread is the **sole** caller of worker pipe methods, so
    ``EveWorker``'s single-thread contract is satisfied.

    Args:
        worker: The ``EveWorker`` to drive.  Must already be acquired.
        setup_fn: Optional callable executed on the bridge thread before the
            inference loop starts.  Use this for blocking worker setup
            (gallery restore, ``send_remove_all_users``, etc.) so the asyncio
            thread is never held up.  Pipe errors are fatal; other errors are
            logged and the inference loop continues.
        max_fps: Inference rate when the system is idle.  ``None`` disables
            the FPS cap entirely (unlimited).
        min_fps: Inference rate when the system is under full pressure.
            Defaults to ``max_fps`` (no scaling).  Ignored when ``max_fps``
            is ``None``.
        load_fn: Callable returning a float in [0.0, 1.0] indicating system
            pressure.  Interpolates between ``max_fps`` (0.0) and ``min_fps``
            (1.0).  ``None`` defaults to 0.0 (always run at ``max_fps``).
            Out-of-range values are clamped to [0.0, 1.0] and NaN is treated
            as 0.0.  If the callable raises, the failure is logged and 0.0 is
            used, so a broken load signal never stops inference.
    """

    def __init__(
        self,
        worker: EveWorker,
        setup_fn: Callable[[], None] | None = None,
        max_fps: float | None = None,
        min_fps: float | None = None,
        load_fn: Callable[[], float] | None = None,
    ) -> None:
        self._worker = worker
        self._setup_fn = setup_fn
        self._max_fps = max_fps
        self._min_fps = min_fps if min_fps is not None else max_fps
        self._load_fn = load_fn
        # True while load_fn keeps raising; used to log only the first failure
        # of a streak instead of once per frame.
        self._load_fn_failing = False

        # Shared between the caller thread and the bridge thread; guarded by
        # _lock.  Each slot only ever holds the newest value.
        self._latest_input: _FrameInput | None = None
        self._latest_result: np.ndarray | None = None
        self._lock = threading.Lock()

        self._new_frame = threading.Event()
        self._stop = threading.Event()
        # Set by the bridge thread on a fatal failure; re-raised to callers.
        self._error: BaseException | None = None

        # All state above must exist before the thread starts reading it.
        self._thread = threading.Thread(
            target=self._run,
            name=f"bridge-w{worker.worker_id}",
            daemon=True,
        )
        self._thread.start()

    # -- public API (called from asyncio / main thread) --------------------

    def submit_and_get_latest(
        self,
        bgr_frame: np.ndarray,
        features: FeatureFlags,
    ) -> np.ndarray | None:
        """Deposit a frame and return the latest completed result.

        Always completes in <1 ms.  Returns ``None`` until the first
        inference finishes (typically one frame period).  A frame deposited
        before the previous one was picked up replaces it (only the newest
        frame is processed).

        Raises:
            BrokenPipeError / EOFError / OSError: re-raised from the bridge
                thread when the worker pipe dies.
            Exception: any other error that stopped the bridge thread.
        """
        if self._error is not None:
            raise self._error  # noqa: B904 – intentional re-raise

        with self._lock:
            self._latest_input = _FrameInput(bgr_frame, features)
            result = self._latest_result
        self._new_frame.set()
        return result

    @property
    def is_alive(self) -> bool:
        """``True`` while the bridge thread is running and error-free."""
        return self._thread.is_alive() and self._error is None

    def stop(self) -> None:
        """Signal the bridge thread to exit and wait (up to 2 s) for it."""
        self._stop.set()
        self._new_frame.set()  # unblock if waiting
        self._thread.join(timeout=2.0)

    # -- background thread -------------------------------------------------

    def _run(self) -> None:
        """Bridge thread entry point: optional setup, then inference loop.

        Any exception escaping the loop is stored in ``_error``.  Without this,
        the thread would die silently and ``submit_and_get_latest`` would keep
        returning a stale result forever.
        """
        if self._setup_fn is not None:
            try:
                self._setup_fn()
            except (BrokenPipeError, EOFError, OSError) as exc:
                logger.warning(
                    f"Bridge setup (worker {self._worker.worker_id}) pipe error: {exc}"
                )
                self._error = exc
                return
            except Exception as exc:
                # Gallery restore failed — not fatal, inference still works
                logger.warning(
                    f"Bridge setup (worker {self._worker.worker_id}) error: {exc}"
                )

        try:
            self._inference_loop()
        except Exception as exc:
            # e.g. compute_fps_sleep raising — surface it to the caller.
            logger.error(
                f"Bridge thread (worker {self._worker.worker_id}) unexpected error: {exc}"
            )
            self._error = exc

    def _current_load(self) -> float:
        """Sample ``load_fn`` as a pressure value clamped to [0.0, 1.0].

        Returns 0.0 (run at ``max_fps``) when no ``load_fn`` was given, when
        it returns NaN, or when it raises.  A raising ``load_fn`` is logged
        once per failure streak rather than on every frame.
        """
        if self._load_fn is None:
            return 0.0
        try:
            load = float(self._load_fn())
        except Exception as exc:
            if not self._load_fn_failing:
                logger.warning(
                    f"Bridge load_fn (worker {self._worker.worker_id}) error: "
                    f"{exc}; assuming idle"
                )
                self._load_fn_failing = True
            return 0.0
        self._load_fn_failing = False
        if math.isnan(load):
            return 0.0
        return min(max(load, 0.0), 1.0)

    def _inference_loop(self) -> None:
        """Run inference on the newest deposited frame until stopped or failed.

        Pipe errors and unexpected ``send_inference`` errors are stored in
        ``_error`` and end the loop.
        """
        while not self._stop.is_set():
            # Wait for a frame (or periodic wake to check _stop).
            self._new_frame.wait(timeout=0.5)
            # Clearing before taking the lock is safe: a frame deposited after
            # this point sets the event again and is handled next iteration.
            self._new_frame.clear()
            if self._stop.is_set():
                break

            with self._lock:
                frame_input = self._latest_input
                self._latest_input = None
            if frame_input is None:
                # Timeout wake, or the frame was already consumed last pass.
                continue

            t0 = time.monotonic()
            try:
                result = self._worker.send_inference(
                    frame_input.bgr_frame, frame_input.features
                )
            except (BrokenPipeError, EOFError, OSError) as exc:
                logger.warning(
                    f"Bridge thread (worker {self._worker.worker_id}) pipe error: {exc}"
                )
                self._error = exc
                return
            except Exception as exc:
                logger.error(
                    f"Bridge thread (worker {self._worker.worker_id}) unexpected error: {exc}"
                )
                self._error = exc
                return

            with self._lock:
                self._latest_result = result

            # FPS cap — sleep to keep inference rate within target range.
            # Uses _stop.wait() so bridge.stop() returns immediately.
            load = self._current_load()
            sleep = compute_fps_sleep(
                time.monotonic() - t0, self._max_fps, self._min_fps, load
            )
            if sleep > 0:
                self._stop.wait(timeout=sleep)