solution_challenge_backend / backend /vision_session.py
github-actions
Deploy to Hugging Face
c794b6b
Raw
History Blame Contribute Delete
5.6 kB
"""Per-session camera ownership and face-result freshness (TTL)."""
from __future__ import annotations
import os
import threading
import time
from typing import Any
# Seconds a processed frame keeps a camera's presence "fresh". Read once at
# import time from CEPHEUS_PRESENCE_TTL; an unset or empty value means 10.
PRESENCE_TTL_S = float(os.environ.get("CEPHEUS_PRESENCE_TTL") or "10")

# Guards every registry below. It is a plain (non-reentrant) Lock, so code
# that already holds it must not call another function that acquires it.
_lock = threading.Lock()

# cam_id -> bookkeeping for the most recently processed frame
# (last_frame_ts, last_match_ts, frame_id, presence_expires_at).
_frame_meta: dict[str, dict[str, Any]] = {}

# cam_id -> client_id that currently owns the camera.
_camera_owners: dict[str, str] = {}

# client_id -> set of cam_ids recorded as claimed by that client.
_client_cameras: dict[str, set[str]] = {}

# client_id -> time.time() of the last register/touch/claim.
_client_alive: dict[str, float] = {}

# client_id -> (cam_id, str(index)) of the client's latest accepted claim.
_client_selection: dict[str, tuple[str, str]] = {}

# Id of the last processed frame; shared by all cameras.
_frame_counter = 0
def register_client(client_id: str) -> None:
    """Record *client_id* as a live client.

    Stores the current wall-clock time in ``_client_alive`` while holding the
    module lock, creating the entry or overwriting an earlier timestamp.
    """
    with _lock:
        seen_at = time.time()
        _client_alive[client_id] = seen_at
def touch_client(client_id: str) -> None:
    """Refresh the liveness timestamp kept for *client_id*.

    Acquires the module lock itself, so it must not be called by code that
    already holds ``_lock`` (a plain ``threading.Lock`` is not reentrant).
    """
    with _lock:
        _client_alive.update({client_id: time.time()})
def release_client(client_id: str) -> None:
    """Forget *client_id* and give up every camera it still owns.

    Drops the client's liveness timestamp, its last selection and its camera
    set. A camera's ownership entry is removed only when it still points at
    this client, so a camera that has since moved to another owner is left
    alone. Unknown clients are a no-op.
    """
    with _lock:
        _client_alive.pop(client_id, None)
        _client_selection.pop(client_id, None)
        claimed = _client_cameras.pop(client_id, None) or ()
        still_owned = [
            cam_id
            for cam_id in claimed
            if _camera_owners.get(cam_id) == client_id
        ]
        for cam_id in still_owned:
            _camera_owners.pop(cam_id, None)
def camera_count() -> int:
    """Return how many cameras currently have an owning client."""
    with _lock:
        owned = len(_camera_owners)
    return owned
def claim_camera(client_id: str, cam_id: str, index: Any) -> tuple[bool, str]:
    """Try to make *client_id* the owner of *cam_id*.

    Enforces one owner per ``cam_id`` and one active camera per client.
    Every call, successful or not, refreshes the client's liveness timestamp.

    Args:
        client_id: Client requesting the camera.
        cam_id: Camera being claimed.
        index: Selector accompanying the camera id; it is compared through
            its ``str()`` form as part of the client's selection.

    Returns:
        ``(True, "ok")`` when ownership was granted or updated,
        ``(True, "duplicate_ignored")`` when the client repeats its current
        ``(cam_id, index)`` selection, and
        ``(False, "camera_owned_by_other_client")`` when a different client
        that is still registered as alive owns the camera.
    """
    selection = (cam_id, str(index))
    with _lock:
        # Refresh liveness inline. Calling touch_client() here would try to
        # re-acquire the non-reentrant _lock and deadlock the calling thread.
        _client_alive[client_id] = time.time()

        previous = _client_selection.get(client_id)
        if previous == selection:
            return True, "duplicate_ignored"

        owner = _camera_owners.get(cam_id)
        if owner and owner != client_id and owner in _client_alive:
            return False, "camera_owned_by_other_client"

        # Switching cameras: release the previous one so the client never
        # holds two at once.
        if previous and previous[0] != cam_id:
            old_cam = previous[0]
            if _camera_owners.get(old_cam) == client_id:
                _camera_owners.pop(old_cam, None)
            _client_cameras.get(client_id, set()).discard(old_cam)

        _camera_owners[cam_id] = client_id
        _client_cameras.setdefault(client_id, set()).add(cam_id)
        _client_selection[client_id] = selection
        return True, "ok"
def record_processed_frame(cam_id: str, had_match: bool) -> int:
    """Register that a frame from *cam_id* was actually processed.

    Advances the module-wide frame counter, stamps the camera's metadata with
    the current time and pushes its presence expiry ``PRESENCE_TTL_S`` seconds
    into the future. ``last_match_ts`` moves to the current time only when
    *had_match* is true; otherwise the previously stored value (or ``None``)
    is carried over.

    Returns:
        The frame id assigned to this frame.
    """
    global _frame_counter
    stamp = time.time()
    with _lock:
        _frame_counter += 1
        frame_id = _frame_counter
        if had_match:
            match_ts = stamp
        else:
            # No match in this frame: keep whatever match time was known.
            match_ts = _frame_meta.get(cam_id, {}).get("last_match_ts")
        _frame_meta[cam_id] = {
            "last_frame_ts": stamp,
            "last_match_ts": match_ts,
            "frame_id": frame_id,
            "presence_expires_at": stamp + PRESENCE_TTL_S,
        }
        return frame_id
def get_cam_meta(cam_id: str) -> dict[str, Any]:
    """Return a shallow copy of the frame metadata stored for *cam_id*.

    The result is an empty dict when no frame has been recorded for the
    camera. Because a copy is returned, callers may read or modify it without
    holding the module lock.
    """
    with _lock:
        if cam_id not in _frame_meta:
            return {}
        return dict(_frame_meta[cam_id])
def is_cam_stale(cam_id: str) -> bool:
    """Tell whether *cam_id* has no fresh frame.

    A camera is stale when nothing has been recorded for it, or when the
    current time is past its stored ``presence_expires_at``.
    """
    with _lock:
        meta = _frame_meta.get(cam_id)
        expires_at = float(meta.get("presence_expires_at", 0)) if meta else None
    # Compare after releasing the lock; only the stored expiry is shared state.
    return expires_at is None or time.time() > expires_at
def presence_summary() -> dict[str, Any]:
    """Summarise presence freshness across every camera.

    Presence is "active" while the newest processed frame, taken over all
    cameras, is no older than ``PRESENCE_TTL_S`` seconds.

    Returns:
        A dict that always carries the same keys:

        * ``presence_state``: ``"active"`` or ``"inactive"``.
        * ``presence_is_stale``: ``True`` when the state is inactive.
        * ``last_frame_age_ms``: age of the newest frame in milliseconds, or
          ``None`` when no frame was recorded.
        * ``last_presence_update_age_ms`` / ``last_match_age_ms``: age of the
          newest match in milliseconds (both keys hold the same value), or
          ``None`` when no match was recorded.
    """
    now = time.time()
    with _lock:
        if not _frame_meta:
            return {
                "presence_state": "inactive",
                "presence_is_stale": True,
                "last_frame_age_ms": None,
                "last_presence_update_age_ms": None,
                # Present so consumers see the same keys with and without frames.
                "last_match_age_ms": None,
            }
        metas = list(_frame_meta.values())
        latest_ts = max(m.get("last_frame_ts", 0) for m in metas)
        # last_match_ts is None until a camera has had a match.
        match_ts = max((m.get("last_match_ts") or 0) for m in metas)

    age_ms = int((now - latest_ts) * 1000) if latest_ts else None
    match_age_ms = int((now - match_ts) * 1000) if match_ts else None
    stale = age_ms is None or age_ms > int(PRESENCE_TTL_S * 1000)
    return {
        "presence_state": "inactive" if stale else "active",
        "presence_is_stale": stale,
        "last_frame_age_ms": age_ms,
        "last_presence_update_age_ms": match_age_ms,
        "last_match_age_ms": match_age_ms,
    }
def build_face_results_payload(raw_results: dict, strip_fn) -> dict:
    """Build the per-camera face payload without reviving expired results.

    Args:
        raw_results: Mapping of ``cam_id`` to that camera's face results;
            ``None`` or an empty mapping yields an empty payload.
        strip_fn: Callable invoked with ``{cam_id: faces}`` for every fresh
            camera. The entry it returns for ``cam_id`` is published; if that
            key is missing from its result, the unmodified ``faces`` are used.

    Returns:
        Mapping of ``cam_id`` to a payload dict. A camera with no recorded
        frame, or whose ``presence_expires_at`` has passed, is reported as
        not live with empty face lists; otherwise the stripped faces are
        included.
    """
    checked_at = time.time()
    payload: dict[str, Any] = {}
    for cam_id, faces in (raw_results or {}).items():
        meta = get_cam_meta(cam_id)
        expires_at = float(meta.get("presence_expires_at", 0))
        frame_ts = float(meta.get("last_frame_ts", 0))
        frame_age_ms = None
        if frame_ts:
            frame_age_ms = int((checked_at - frame_ts) * 1000)

        if not meta or checked_at > expires_at:
            # Expired or never seen: report the camera but publish no faces.
            payload[cam_id] = dict(
                live=False,
                stale=True,
                reason="frame_timeout" if meta else "no_frames",
                tracked_faces=[],
                faces=[],
                confidence=None,
                presence=False,
                last_frame_ts=frame_ts or None,
                last_match_ts=meta.get("last_match_ts"),
                presence_expires_at=expires_at or None,
                last_frame_age_ms=frame_age_ms,
                frame_id=meta.get("frame_id"),
            )
            continue

        visible = strip_fn({cam_id: faces}).get(cam_id, faces)
        payload[cam_id] = dict(
            live=True,
            stale=False,
            tracked_faces=visible,
            faces=visible,
            presence=bool(visible),
            last_frame_ts=frame_ts,
            last_match_ts=meta.get("last_match_ts"),
            presence_expires_at=expires_at,
            last_frame_age_ms=frame_age_ms,
            frame_id=meta.get("frame_id"),
        )
    return payload