| """Per-session camera ownership and face-result freshness (TTL).""" |
| from __future__ import annotations |
|
|
| import os |
| import threading |
| import time |
| from typing import Any |
|
|
# Seconds a processed frame keeps a camera's presence data fresh. Read once at
# import from CEPHEUS_PRESENCE_TTL; an unset or empty variable means "10".
PRESENCE_TTL_S = float(os.environ.get("CEPHEUS_PRESENCE_TTL") or "10")


# Single non-reentrant lock guarding every registry below.
_lock = threading.Lock()
# cam_id -> freshness record written by record_processed_frame().
_frame_meta: dict[str, dict[str, Any]] = {}
# cam_id -> client_id that currently owns the camera.
_camera_owners: dict[str, str] = {}
# client_id -> set of cam_ids the client has claimed.
_client_cameras: dict[str, set[str]] = {}
# client_id -> time.time() of the client's last register/touch/claim.
_client_alive: dict[str, float] = {}
# client_id -> (cam_id, str(index)) of the client's current selection.
_client_selection: dict[str, tuple[str, str]] = {}
# Id given to the most recently processed frame; 0 until the first frame.
_frame_counter = 0
|
|
|
|
def register_client(client_id: str) -> None:
    """Mark ``client_id`` as alive, stamped with the current wall-clock time.

    The stamp is written to the liveness table under the module lock; any
    stamp already stored for the client is overwritten.
    """
    with _lock:
        seen_at = time.time()
        _client_alive[client_id] = seen_at
|
|
|
|
def touch_client(client_id: str) -> None:
    """Refresh the last-seen timestamp stored for ``client_id``.

    Takes the module lock itself. That lock is a plain ``threading.Lock``
    (non-reentrant), so this must not be called from code already holding it.
    """
    with _lock:
        _client_alive.update({client_id: time.time()})
|
|
|
|
def release_client(client_id: str) -> None:
    """Forget ``client_id`` and free every camera it still owns.

    Removes the client's liveness stamp, current selection and camera set.
    Unknown clients are ignored.
    """
    with _lock:
        claimed = _client_cameras.pop(client_id, set())
        _client_selection.pop(client_id, None)
        _client_alive.pop(client_id, None)
        # Drop an ownership entry only if it still points at this client, so a
        # camera that has since been claimed by someone else is left alone.
        still_owned = [
            cam for cam in claimed if _camera_owners.get(cam) == client_id
        ]
        for cam in still_owned:
            _camera_owners.pop(cam, None)
|
|
|
|
def camera_count() -> int:
    """Return the number of cameras that currently have an owner entry."""
    with _lock:
        owned_total = len(_camera_owners)
    return owned_total
|
|
|
|
def claim_camera(client_id: str, cam_id: str, index: Any) -> tuple[bool, str]:
    """Claim ``cam_id`` for ``client_id``: one owner per camera, one camera per client.

    Args:
        client_id: Client requesting the camera; its liveness stamp is
            refreshed as part of the call.
        cam_id: Camera being claimed.
        index: Device index of the selection; compared and stored as
            ``str(index)``.

    Returns:
        ``(granted, reason)`` where ``reason`` is one of:

        * ``"ok"`` - the claim was recorded.
        * ``"duplicate_ignored"`` - the client already holds exactly this
          ``(cam_id, index)`` selection; nothing was changed.
        * ``"camera_owned_by_other_client"`` - another client that is still
          in the liveness table owns the camera (``granted`` is ``False``).
    """
    selection = (cam_id, str(index))
    with _lock:
        # Refresh liveness inline. Calling touch_client() here would try to
        # re-acquire _lock, which is a non-reentrant threading.Lock, so the
        # calling thread would block forever.
        _client_alive[client_id] = time.time()

        previous = _client_selection.get(client_id)
        if previous == selection:
            return True, "duplicate_ignored"

        owner = _camera_owners.get(cam_id)
        if owner and owner != client_id and owner in _client_alive:
            return False, "camera_owned_by_other_client"

        # Switching cameras: give up the previously selected one first so the
        # client never holds two cameras at once.
        if previous and previous[0] != cam_id:
            old_cam = previous[0]
            if _camera_owners.get(old_cam) == client_id:
                _camera_owners.pop(old_cam, None)
            _client_cameras.get(client_id, set()).discard(old_cam)

        _camera_owners[cam_id] = client_id
        _client_cameras.setdefault(client_id, set()).add(cam_id)
        _client_selection[client_id] = selection
        return True, "ok"
|
|
|
|
def record_processed_frame(cam_id: str, had_match: bool) -> int:
    """Register one processed frame for ``cam_id`` and return its frame id.

    Bumps the global frame counter and replaces the camera's freshness
    record: frame time, frame id and a presence expiry ``PRESENCE_TTL_S``
    seconds ahead. ``last_match_ts`` moves to the current time only when
    ``had_match`` is true; otherwise the previously stored value (or
    ``None``) is carried over.
    """
    global _frame_counter
    stamp = time.time()
    with _lock:
        if had_match:
            match_stamp = stamp
        else:
            match_stamp = _frame_meta.get(cam_id, {}).get("last_match_ts")
        _frame_counter += 1
        frame_id = _frame_counter
        _frame_meta[cam_id] = {
            "last_frame_ts": stamp,
            "last_match_ts": match_stamp,
            "frame_id": frame_id,
            "presence_expires_at": stamp + PRESENCE_TTL_S,
        }
        return frame_id
|
|
|
|
def get_cam_meta(cam_id: str) -> dict[str, Any]:
    """Return a shallow copy of the freshness record for ``cam_id``.

    An empty dict is returned when no frame has been recorded for the camera.
    The copy means callers cannot mutate the stored record.
    """
    with _lock:
        stored = _frame_meta.get(cam_id)
        return dict(stored) if stored is not None else {}
|
|
|
|
def is_cam_stale(cam_id: str) -> bool:
    """Tell whether the presence data of ``cam_id`` has expired.

    A camera with no (or an empty) freshness record counts as stale. Otherwise
    it is stale once the current time is past ``presence_expires_at``; a
    record missing that key is treated as expiring at 0.
    """
    with _lock:
        record = _frame_meta.get(cam_id)
        if record:
            deadline = float(record.get("presence_expires_at", 0))
            return time.time() > deadline
        return True
|
|
|
|
def presence_summary() -> dict[str, Any]:
    """Summarise presence freshness across all cameras.

    Returns:
        A dict that always has the same five keys:

        * ``presence_state`` - ``"active"`` if the newest frame is no older
          than ``PRESENCE_TTL_S`` seconds, else ``"inactive"``.
        * ``presence_is_stale`` - boolean counterpart of ``presence_state``.
        * ``last_frame_age_ms`` - age of the newest frame on any camera, or
          ``None`` when no frame has been recorded.
        * ``last_presence_update_age_ms`` / ``last_match_age_ms`` - age of
          the newest match on any camera (the same value under both keys), or
          ``None`` when nothing has matched yet.
    """
    now = time.time()
    with _lock:
        if not _frame_meta:
            # Same key set as the populated case, so consumers can index
            # "last_match_age_ms" without checking for its presence.
            return {
                "presence_state": "inactive",
                "presence_is_stale": True,
                "last_frame_age_ms": None,
                "last_presence_update_age_ms": None,
                "last_match_age_ms": None,
            }
        latest_ts = max(m.get("last_frame_ts", 0) for m in _frame_meta.values())
        match_ts = max((m.get("last_match_ts") or 0) for m in _frame_meta.values())

    # `now` is sampled before the lock is taken, so a frame recorded while this
    # call was waiting can carry a later timestamp; clamp so ages are never
    # reported as negative.
    age_ms = max(0, int((now - latest_ts) * 1000)) if latest_ts else None
    match_age_ms = max(0, int((now - match_ts) * 1000)) if match_ts else None
    stale = age_ms is None or age_ms > int(PRESENCE_TTL_S * 1000)
    return {
        "presence_state": "inactive" if stale else "active",
        "presence_is_stale": stale,
        "last_frame_age_ms": age_ms,
        "last_presence_update_age_ms": match_age_ms,
        "last_match_age_ms": match_age_ms,
    }
|
|
|
|
def build_face_results_payload(raw_results: dict, strip_fn) -> dict:
    """Build the per-camera face payload, blanking cameras whose data expired.

    Args:
        raw_results: Mapping of cam_id to that camera's face results; ``None``
            or an empty mapping yields an empty payload.
        strip_fn: Callable given ``{cam_id: faces}``; the entry for ``cam_id``
            in the mapping it returns is what gets published.

    Returns:
        Mapping of cam_id to a payload dict. Cameras with no freshness record
        or past ``presence_expires_at`` get ``live=False`` with empty face
        lists and a ``reason``; all others get the faces from ``strip_fn``.
    """
    now = time.time()
    payload: dict[str, Any] = {}
    for cam_id, faces in (raw_results or {}).items():
        meta = get_cam_meta(cam_id)
        expires_at = float(meta.get("presence_expires_at", 0))
        frame_ts = float(meta.get("last_frame_ts", 0))
        frame_age_ms = int((now - frame_ts) * 1000) if frame_ts else None

        # Expired or never-seen camera: publish an explicit empty result so
        # old recognitions are not shown as current.
        if not meta or now > expires_at:
            payload[cam_id] = {
                "live": False,
                "stale": True,
                "reason": "frame_timeout" if meta else "no_frames",
                "tracked_faces": [],
                "faces": [],
                "confidence": None,
                "presence": False,
                "last_frame_ts": frame_ts or None,
                "last_match_ts": meta.get("last_match_ts"),
                "presence_expires_at": expires_at or None,
                "last_frame_age_ms": frame_age_ms,
                "frame_id": meta.get("frame_id"),
            }
            continue

        # NOTE(review): if strip_fn's result has no entry for cam_id this
        # falls back to the raw `faces` value - confirm that is intended.
        visible = strip_fn({cam_id: faces}).get(cam_id, faces)
        payload[cam_id] = {
            "live": True,
            "stale": False,
            "tracked_faces": visible,
            "faces": visible,
            "presence": bool(visible),
            "last_frame_ts": frame_ts,
            "last_match_ts": meta.get("last_match_ts"),
            "presence_expires_at": expires_at,
            "last_frame_age_ms": frame_age_ms,
            "frame_id": meta.get("frame_id"),
        }
    return payload
|
|