sensAI-Generic-Object-Detection / shared /non_blocking_bridge.py
beaupreda's picture
Upload sensAI-Generic-Object-Detection with upload_repo.py
13170f7 verified
Raw
History Blame Contribute Delete
6.95 kB
"""Non-blocking inference bridge for live WebRTC streams.
Wraps an ``EveWorker`` so that the main asyncio thread never blocks on
pipe I/O. A dedicated daemon thread per connection handles the blocking
``send_inference`` round-trip (~35-50 ms), while the frame handler
deposits the latest camera frame and picks up the most recent completed
result in under 1 ms.
Trade-off: one frame of display latency (~33 ms at 30 fps) — imperceptible.
FPS cap
~~~~~~~
When ``max_fps`` is set, the bridge caps its inference rate between
``max_fps`` and ``min_fps`` (``max_fps=None`` disables the cap entirely).
The ``load_fn`` signal (0.0–1.0) linearly interpolates between the two:
idle → ``max_fps``, full pressure → ``min_fps``.
"""
import threading
import time
from dataclasses import dataclass
from typing import Callable
import numpy as np
from eve_messages import FeatureFlags
from eve_worker_pool import EveWorker, compute_fps_sleep
from log_utils import setup_logger
# Module-level logger used by the bridge thread for setup/pipe/unexpected errors.
logger = setup_logger("NonBlockingBridge")
@dataclass()
class _FrameInput:
    """Pending work item: the newest frame plus the flags to run it with.

    Deposited by the main thread in ``submit_and_get_latest`` and taken
    (then cleared) by the bridge thread, both under the bridge's lock.
    """

    # Frame passed unchanged to ``EveWorker.send_inference``.
    bgr_frame: np.ndarray
    # Feature flags passed unchanged to ``EveWorker.send_inference``.
    features: FeatureFlags
class NonBlockingInferenceBridge:
    """Non-blocking wrapper around ``EveWorker.send_inference``.

    The background thread is the **sole** caller of worker pipe methods,
    so ``EveWorker``'s single-thread contract is satisfied.

    Any exception that terminates the background thread is recorded and
    re-raised on the next ``submit_and_get_latest`` call, so callers never
    keep receiving a stale result from a dead thread.

    Args:
        worker: The ``EveWorker`` to drive. Must already be acquired.
        setup_fn: Optional callable executed on the bridge thread before
            the inference loop starts. Use this for blocking worker
            setup (gallery restore, ``send_remove_all_users``, etc.) so
            the asyncio thread is never held up. Pipe errors are fatal;
            other errors are logged and the inference loop continues.
        max_fps: Inference rate when the system is idle. ``None``
            disables the FPS cap entirely (unlimited).
        min_fps: Inference rate when the system is under full pressure.
            Defaults to ``max_fps`` (no scaling). Ignored when
            ``max_fps`` is ``None``.
        load_fn: Callable returning a float in [0.0, 1.0] indicating
            system pressure. Interpolates between ``max_fps`` (0.0)
            and ``min_fps`` (1.0). ``None`` defaults to 0.0 (always
            run at ``max_fps``). An exception raised by ``load_fn`` is
            treated as fatal for the bridge thread.
    """

    def __init__(
        self,
        worker: EveWorker,
        setup_fn: Callable[[], None] | None = None,
        max_fps: float | None = None,
        min_fps: float | None = None,
        load_fn: Callable[[], float] | None = None,
    ) -> None:
        self._worker = worker
        self._setup_fn = setup_fn
        self._max_fps = max_fps
        self._min_fps = min_fps if min_fps is not None else max_fps
        self._load_fn = load_fn
        # Shared state between the main thread and the bridge thread;
        # both fields are read/written under ``self._lock``.
        self._latest_input: _FrameInput | None = None
        self._latest_result: np.ndarray | None = None
        self._lock = threading.Lock()
        # Set by the main thread when a new frame is deposited (or on stop).
        self._new_frame = threading.Event()
        self._stop = threading.Event()
        # Written once by the bridge thread right before it exits on error.
        self._error: BaseException | None = None
        self._thread = threading.Thread(
            target=self._run,
            name=f"bridge-w{worker.worker_id}",
            daemon=True,
        )
        # Start last so every attribute above exists before _run reads it.
        self._thread.start()

    # -- public API (called from asyncio / main thread) --------------------

    def submit_and_get_latest(
        self,
        bgr_frame: np.ndarray,
        features: FeatureFlags,
    ) -> np.ndarray | None:
        """Deposit a frame and return the latest completed result.

        Always completes in <1 ms. Returns ``None`` until the first
        inference finishes (typically one frame period).

        Raises:
            BrokenPipeError / EOFError / OSError: re-raised from the
                bridge thread when the worker pipe dies.
            Exception: any other error that terminated the bridge thread
                (e.g. an unexpected ``send_inference`` failure, or a
                failing ``load_fn``), re-raised here.
        """
        if self._error is not None:
            raise self._error  # noqa: B904 – intentional re-raise
        with self._lock:
            # Overwrite any unconsumed frame: only the newest one matters.
            self._latest_input = _FrameInput(bgr_frame, features)
            result = self._latest_result
        self._new_frame.set()
        return result

    @property
    def is_alive(self) -> bool:
        """``True`` while the bridge thread is running and error-free."""
        return self._thread.is_alive() and self._error is None

    def stop(self) -> None:
        """Signal the bridge thread to exit and wait up to 2 s for it.

        If the thread is still busy (e.g. blocked inside ``send_inference``)
        when the timeout expires, a warning is logged and this method
        returns anyway; the loop exits once the in-flight call returns.
        """
        self._stop.set()
        self._new_frame.set()  # unblock if waiting
        self._thread.join(timeout=2.0)
        if self._thread.is_alive():
            logger.warning(
                f"Bridge thread (worker {self._worker.worker_id}) did not exit "
                f"within 2.0 s of stop()"
            )

    # -- background thread -------------------------------------------------

    def _run(self) -> None:
        """Bridge thread entry point: optional setup, then inference loop.

        Anything that escapes the inference loop (e.g. an error from
        ``load_fn`` or the FPS-cap computation) is logged and stored in
        ``_error`` so the main thread learns about it on its next submit.
        """
        if self._setup_fn is not None:
            try:
                self._setup_fn()
            except (BrokenPipeError, EOFError, OSError) as exc:
                logger.warning(
                    f"Bridge setup (worker {self._worker.worker_id}) pipe error: {exc}"
                )
                self._error = exc
                return
            except Exception as exc:
                # Gallery restore failed — not fatal, inference still works
                logger.warning(
                    f"Bridge setup (worker {self._worker.worker_id}) error: {exc}"
                )
        try:
            self._inference_loop()
        except Exception as exc:
            # Without this, the thread would die silently and callers would
            # keep receiving the last (stale) result with no error raised.
            logger.exception(
                f"Bridge thread (worker {self._worker.worker_id}) crashed: {exc}"
            )
            self._error = exc

    def _inference_loop(self) -> None:
        """Run inference on the newest deposited frame until stopped.

        Pipe and unexpected ``send_inference`` errors are recorded in
        ``_error`` and end the loop; other exceptions propagate to ``_run``.
        """
        while not self._stop.is_set():
            # Wait for a frame (or periodic wake to check _stop)
            self._new_frame.wait(timeout=0.5)
            self._new_frame.clear()
            if self._stop.is_set():
                break
            with self._lock:
                frame_input = self._latest_input
                self._latest_input = None
            if frame_input is None:
                continue
            t0 = time.monotonic()
            try:
                result = self._worker.send_inference(
                    frame_input.bgr_frame, frame_input.features
                )
            except (BrokenPipeError, EOFError, OSError) as exc:
                logger.warning(
                    f"Bridge thread (worker {self._worker.worker_id}) pipe error: {exc}"
                )
                self._error = exc
                return
            except Exception as exc:
                # logger.exception keeps the traceback for post-mortem debugging.
                logger.exception(
                    f"Bridge thread (worker {self._worker.worker_id}) unexpected error: {exc}"
                )
                self._error = exc
                return
            with self._lock:
                self._latest_result = result
            # FPS cap — sleep to keep inference rate within target range.
            # Uses _stop.wait() so bridge.stop() returns immediately.
            load = self._load_fn() if self._load_fn is not None else 0.0
            sleep = compute_fps_sleep(
                time.monotonic() - t0, self._max_fps, self._min_fps, load
            )
            if sleep > 0:
                self._stop.wait(timeout=sleep)