""" Cloud-capable VisionEngine: cameras may be stubbed, but face recognition uses Face_Recognition/face_matcher.py (InsightFace + embeddings) when available. """ from __future__ import annotations import base64 import logging import os import sys import threading from datetime import datetime from typing import Any import numpy as np logger = logging.getLogger(__name__) _FR_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "Face_Recognition") if _FR_DIR not in sys.path: sys.path.insert(0, _FR_DIR) def _build_face_engine(): if os.getenv("CEPHEUS_CI_STUB_VISION", "").strip() == "1": from vision_ci_stub import StubFaceEngine logger.info("CEPHEUS: VisionEngine using CI stub face engine (no InsightFace load)") return StubFaceEngine() from face_matcher import FaceMatcher # noqa: E402 logger.info("CEPHEUS: VisionEngine loading FaceMatcher (InsightFace)") return FaceMatcher() class VisionEngine: def __init__(self): logger.info("CEPHEUS: VisionEngine (cloud-capable) initializing...") self.model = None self.active_cameras = {} self.camera_indices: dict[str, int] = {} self._client_cameras: dict[str, set[str]] = {} self._cam_refcounts: dict[str, int] = {} self.latest_raw_frames: dict[str, np.ndarray | None] = {} self.lock = threading.Lock() self.open_lock = threading.Lock() self._pending_opens: set[str] = set() self.face_engine = _build_face_engine() self.face_results: dict[str, list[dict]] = {} # Browser-supplied camera feeds (cloud mode) — updated by POST /vision/track_frame. self.browser_feeds: dict[str, dict] = {} # Facial recognition is ON by default so it auto-starts at launch and # stays on across all feeds until explicitly disabled. self.ai_models: dict[str, bool] = { "crowd": False, "facial": True, "fight": False, "anomaly": False, "medical": False, } self.last_total_count: int = 0 self.room_stats: dict[str, dict] = {} def toggle_ai_model(self, model_id: str) -> bool: with self.lock: current = self.ai_models.get(model_id, False) self.ai_models[model_id] = not current return not current def set_ai_model(self, model_id: str, enabled: bool) -> None: with self.lock: self.ai_models[model_id] = bool(enabled) logger.info("AI model '%s' set to %s", model_id, "ON" if enabled else "OFF") def mark_browser_feed_active(self, cam_id: str, index: int = 0) -> None: """Record that a browser camera is actively streaming frames to the backend.""" with self.lock: self.browser_feeds[cam_id] = { "index": index, "status": "ACTIVE", "source": "browser", "last_seen": datetime.now().isoformat(), } self.camera_indices[cam_id] = index def clear_browser_feeds(self) -> None: with self.lock: self.browser_feeds.clear() self.camera_indices.clear() self.face_results.clear() def get_ai_status(self) -> dict[str, bool]: with self.lock: return dict(self.ai_models) def get_ai_capabilities(self) -> dict[str, dict]: """Cloud engine: facial via browser frames only; YOLO/flow models unavailable.""" _cloud_note = "Requires local GPU vision engine (CEPHEUS_GPU_VISION=1)" return { "facial": {"supported": True, "mode": "browser_frames"}, "crowd": {"supported": False, "mode": "cloud_unavailable", "note": _cloud_note}, "medical": {"supported": False, "mode": "cloud_unavailable", "note": _cloud_note}, "anomaly": {"supported": False, "mode": "cloud_unavailable", "note": _cloud_note}, "fight": {"supported": False, "mode": "cloud_unavailable", "note": _cloud_note}, "fire": { "supported": False, "mode": "not_implemented", "note": "Fire alerts use SOS/IOT integrations — not a CV model", }, } def set_camera(self, cam_id: str, index: int | str) -> bool: """Local OpenCV devices are unavailable in cloud mode; register browser feed slot.""" try: idx = int(index) if not isinstance(index, int) else index except (TypeError, ValueError): idx = 0 self.mark_browser_feed_active(cam_id, max(idx, 0)) logger.info("CEPHEUS_CLOUD: browser camera slot registered for %s (index=%s)", cam_id, index) return True def release_camera(self, cam_id: str) -> None: with self.lock: self.active_cameras.pop(cam_id, None) self.camera_indices.pop(cam_id, None) self.latest_raw_frames.pop(cam_id, None) self.browser_feeds.pop(cam_id, None) def set_camera_for_client(self, client_id: str, cam_id: str, index: int | str) -> bool: with self.lock: self._client_cameras.setdefault(client_id, set()).add(cam_id) self._cam_refcounts[cam_id] = self._cam_refcounts.get(cam_id, 0) + 1 self.mark_browser_feed_active(cam_id, 0 if str(index) == "browser" else index) logger.info( "CEPHEUS_CLOUD: browser camera slot for %s (client %s, index=%s)", cam_id, client_id[:8], index, ) return True def release_camera_for_client(self, client_id: str, cam_id: str) -> None: with self.lock: client_cams = self._client_cameras.get(client_id) if not client_cams or cam_id not in client_cams: return client_cams.discard(cam_id) if not client_cams: self._client_cameras.pop(client_id, None) ref = self._cam_refcounts.get(cam_id, 1) - 1 if ref <= 0: self._cam_refcounts.pop(cam_id, None) self.active_cameras.pop(cam_id, None) self.camera_indices.pop(cam_id, None) self.latest_raw_frames.pop(cam_id, None) self.browser_feeds.pop(cam_id, None) else: self._cam_refcounts[cam_id] = ref def release_client_cameras(self, client_id: str) -> None: for cam_id in list(self._client_cameras.get(client_id, set())): self.release_camera_for_client(client_id, cam_id) def get_active_frames(self) -> tuple[dict, list]: return {}, [] def get_face_results(self) -> dict[str, list[dict]]: return dict(self.face_results) def search_missing_person(self, query_frame: np.ndarray) -> dict: result = dict(self.face_engine.match_frame(query_frame)) result["search_mode"] = "database_only" result["live_cameras_searched"] = len(self.active_cameras) if not result.get("found") and not result.get("reason"): result["reason"] = "No match in enrolled database." return result def warmload_models(self) -> dict: ok = self.face_engine.app is not None return {"stub": not ok, "success": True, "insightface": ok} def process_browser_frame(self, cam_id: str, base64_str: str) -> None: """Handle WS track_frame from BrowserCameraBroadcaster (facial-only in cloud mode).""" if not base64_str: return try: import cv2 encoded = base64_str.split(",")[1] if "," in base64_str else base64_str nparr = np.frombuffer(base64.b64decode(encoded), np.uint8) frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if frame is None: return with self.lock: self.latest_raw_frames[cam_id] = frame.copy() facial_on = self.ai_models.get("facial", False) if not facial_on: with self.lock: self.face_results[cam_id] = [] return fe = self.face_engine if hasattr(fe, "match_all_faces"): matches = fe.match_all_faces(frame) else: single = fe.match_frame(frame) matches = [single] if single else [] faces_payload = [] for m in matches: entry = { "name": m.get("name", "Unknown"), "confidence": round(float(m.get("confidence", 0.0)), 3), "bbox": m.get("bbox", [0, 0, 0, 0]), "found": bool(m.get("found", True)), } if m.get("embedding") is not None: entry["embedding"] = m["embedding"] faces_payload.append(entry) with self.lock: self.face_results[cam_id] = faces_payload self.mark_browser_feed_active(cam_id, 0) except Exception as e: logger.error("process_browser_frame (cloud) error for %s: %s", cam_id, e) def process_frame(self, base64_string: str) -> tuple[int, str]: try: encoded = base64_string.split(",")[1] if "," in base64_string else base64_string nparr = np.frombuffer(base64.b64decode(encoded), np.uint8) import cv2 frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if frame is None: return 0, base64_string _, buf = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70]) return 0, f"data:image/jpeg;base64,{base64.b64encode(buf).decode()}" except Exception as e: logger.error(f"process_frame (cloud): {e}") return 0, base64_string