solution_challenge_backend / backend /vision_engine_cloud.py
github-actions
Deploy to Hugging Face
c794b6b
Raw
History Blame Contribute Delete
9.8 kB
"""
Cloud-capable VisionEngine: cameras may be stubbed, but face recognition uses
Face_Recognition/face_matcher.py (InsightFace + embeddings) when available.
"""
from __future__ import annotations
import base64
import logging
import os
import sys
import threading
from datetime import datetime
from typing import Any
import numpy as np
logger = logging.getLogger(__name__)

# The face-matching code lives in a sibling "Face_Recognition" directory that
# is imported by bare module name (e.g. ``face_matcher``), so that directory
# must be on sys.path before the face engine is built.
_FR_DIR = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "Face_Recognition",
)
if sys.path.count(_FR_DIR) == 0:
    # Prepend so these modules win over any same-named module further down.
    sys.path.insert(0, _FR_DIR)
def _build_face_engine():
    """Construct the face engine used by ``VisionEngine``.

    When the environment variable ``CEPHEUS_CI_STUB_VISION`` is exactly ``"1"``
    (surrounding whitespace ignored), a ``StubFaceEngine`` from
    ``vision_ci_stub`` is returned; otherwise a ``FaceMatcher`` from
    ``face_matcher`` is returned. Both imports are deferred to call time so
    only the selected implementation is loaded.
    """
    stub_requested = os.getenv("CEPHEUS_CI_STUB_VISION", "").strip() == "1"
    if not stub_requested:
        from face_matcher import FaceMatcher  # noqa: E402

        logger.info("CEPHEUS: VisionEngine loading FaceMatcher (InsightFace)")
        return FaceMatcher()

    from vision_ci_stub import StubFaceEngine

    logger.info("CEPHEUS: VisionEngine using CI stub face engine (no InsightFace load)")
    return StubFaceEngine()
class VisionEngine:
    """Cloud-mode vision engine driven by browser-supplied camera frames.

    No local capture device is opened by this class: "setting" a camera only
    registers a browser feed slot, and frames arrive base64-encoded through
    ``process_browser_frame``. Face matching is delegated to
    ``self.face_engine``; the other AI model flags are tracked but not run here.

    ``self.lock`` is a plain (non-reentrant) ``threading.Lock``. Methods that
    hold it must not call another lock-taking method until it is released.
    """

    def __init__(self):
        """Create empty bookkeeping tables and build the face engine."""
        logger.info("CEPHEUS: VisionEngine (cloud-capable) initializing...")
        self.model = None
        self.active_cameras = {}
        self.camera_indices: dict[str, int] = {}
        # client_id -> camera ids that client registered.
        self._client_cameras: dict[str, set[str]] = {}
        # cam_id -> number of distinct clients currently holding the camera.
        self._cam_refcounts: dict[str, int] = {}
        self.latest_raw_frames: dict[str, np.ndarray | None] = {}
        self.lock = threading.Lock()
        self.open_lock = threading.Lock()
        self._pending_opens: set[str] = set()
        self.face_engine = _build_face_engine()
        self.face_results: dict[str, list[dict]] = {}
        # Browser-supplied camera feeds (cloud mode) — updated by POST /vision/track_frame.
        self.browser_feeds: dict[str, dict] = {}
        # Facial recognition is ON by default so it auto-starts at launch and
        # stays on across all feeds until explicitly disabled.
        self.ai_models: dict[str, bool] = {
            "crowd": False,
            "facial": True,
            "fight": False,
            "anomaly": False,
            "medical": False,
        }
        self.last_total_count: int = 0
        self.room_stats: dict[str, dict] = {}

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _normalize_index(index: Any) -> int:
        """Coerce a caller-supplied camera index to a non-negative int.

        Anything that cannot be converted with ``int()`` (e.g. the literal
        ``"browser"``) becomes 0; negative values are clamped to 0.
        """
        try:
            idx = index if isinstance(index, int) else int(index)
        except (TypeError, ValueError):
            idx = 0
        return max(idx, 0)

    @staticmethod
    def _decode_frame(payload: str):
        """Decode a base64 image string (optionally ``data:...,`` prefixed).

        Returns whatever ``cv2.imdecode`` returns, i.e. ``None`` when the bytes
        are not a decodable image. Decoding errors propagate to the caller.
        """
        import cv2

        encoded = payload.split(",")[1] if "," in payload else payload
        raw = np.frombuffer(base64.b64decode(encoded), np.uint8)
        return cv2.imdecode(raw, cv2.IMREAD_COLOR)

    def _forget_camera_locked(self, cam_id: str) -> None:
        """Drop every per-camera entry for *cam_id*; caller must hold ``self.lock``."""
        self.active_cameras.pop(cam_id, None)
        self.camera_indices.pop(cam_id, None)
        self.latest_raw_frames.pop(cam_id, None)
        self.browser_feeds.pop(cam_id, None)
        # Results for a released camera would otherwise linger in
        # get_face_results() indefinitely.
        self.face_results.pop(cam_id, None)

    # ------------------------------------------------------------------
    # AI model flags
    # ------------------------------------------------------------------
    def toggle_ai_model(self, model_id: str) -> bool:
        """Flip the enabled flag for *model_id* and return the new state.

        An unknown *model_id* is treated as currently disabled, so toggling it
        creates the entry as enabled.
        """
        with self.lock:
            new_state = not self.ai_models.get(model_id, False)
            self.ai_models[model_id] = new_state
            return new_state

    def set_ai_model(self, model_id: str, enabled: bool) -> None:
        """Set the enabled flag for *model_id* to ``bool(enabled)`` and log it."""
        with self.lock:
            self.ai_models[model_id] = bool(enabled)
        logger.info("AI model '%s' set to %s", model_id, "ON" if enabled else "OFF")

    def get_ai_status(self) -> dict[str, bool]:
        """Return a copy of the model-id -> enabled mapping."""
        with self.lock:
            return dict(self.ai_models)

    def get_ai_capabilities(self) -> dict[str, dict]:
        """Cloud engine: facial via browser frames only; YOLO/flow models unavailable."""
        _cloud_note = "Requires local GPU vision engine (CEPHEUS_GPU_VISION=1)"
        return {
            "facial": {"supported": True, "mode": "browser_frames"},
            "crowd": {"supported": False, "mode": "cloud_unavailable", "note": _cloud_note},
            "medical": {"supported": False, "mode": "cloud_unavailable", "note": _cloud_note},
            "anomaly": {"supported": False, "mode": "cloud_unavailable", "note": _cloud_note},
            "fight": {"supported": False, "mode": "cloud_unavailable", "note": _cloud_note},
            "fire": {
                "supported": False,
                "mode": "not_implemented",
                "note": "Fire alerts use SOS/IOT integrations — not a CV model",
            },
        }

    # ------------------------------------------------------------------
    # Browser feed bookkeeping
    # ------------------------------------------------------------------
    def mark_browser_feed_active(self, cam_id: str, index: int = 0) -> None:
        """Record that a browser camera is actively streaming frames to the backend."""
        with self.lock:
            self.browser_feeds[cam_id] = {
                "index": index,
                "status": "ACTIVE",
                "source": "browser",
                "last_seen": datetime.now().isoformat(),
            }
            self.camera_indices[cam_id] = index

    def clear_browser_feeds(self) -> None:
        """Forget all browser feeds, their indices and their face results."""
        with self.lock:
            self.browser_feeds.clear()
            self.camera_indices.clear()
            self.face_results.clear()

    def set_camera(self, cam_id: str, index: int | str) -> bool:
        """Local OpenCV devices are unavailable in cloud mode; register browser feed slot."""
        self.mark_browser_feed_active(cam_id, self._normalize_index(index))
        logger.info("CEPHEUS_CLOUD: browser camera slot registered for %s (index=%s)", cam_id, index)
        return True

    def release_camera(self, cam_id: str) -> None:
        """Drop all per-camera state for *cam_id* (client refcounts are untouched)."""
        with self.lock:
            self._forget_camera_locked(cam_id)

    def set_camera_for_client(self, client_id: str, cam_id: str, index: int | str) -> bool:
        """Register *cam_id* as a browser feed on behalf of *client_id*.

        The camera's refcount counts distinct clients: registering the same
        camera again from the same client does not bump it, so one release
        from that client is enough to let the slot go. Always returns True.
        """
        with self.lock:
            owned = self._client_cameras.setdefault(client_id, set())
            if cam_id not in owned:
                owned.add(cam_id)
                self._cam_refcounts[cam_id] = self._cam_refcounts.get(cam_id, 0) + 1
        # Called after the lock is released: mark_browser_feed_active takes
        # the same non-reentrant lock.
        self.mark_browser_feed_active(cam_id, self._normalize_index(index))
        logger.info(
            "CEPHEUS_CLOUD: browser camera slot for %s (client %s, index=%s)",
            cam_id,
            client_id[:8],
            index,
        )
        return True

    def release_camera_for_client(self, client_id: str, cam_id: str) -> None:
        """Release *client_id*'s hold on *cam_id*; drop the camera at refcount 0.

        A no-op when the client does not currently hold the camera.
        """
        with self.lock:
            owned = self._client_cameras.get(client_id)
            if not owned or cam_id not in owned:
                return
            owned.discard(cam_id)
            if not owned:
                self._client_cameras.pop(client_id, None)
            remaining = self._cam_refcounts.get(cam_id, 1) - 1
            if remaining > 0:
                self._cam_refcounts[cam_id] = remaining
                return
            self._cam_refcounts.pop(cam_id, None)
            self._forget_camera_locked(cam_id)

    def release_client_cameras(self, client_id: str) -> None:
        """Release every camera currently held by *client_id*."""
        # Iterate over a snapshot: each release mutates the client's set.
        for cam_id in list(self._client_cameras.get(client_id, set())):
            self.release_camera_for_client(client_id, cam_id)

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------
    def get_active_frames(self) -> tuple[dict, list]:
        """Return an empty frame mapping and an empty list (no server-side capture)."""
        return {}, []

    def get_face_results(self) -> dict[str, list[dict]]:
        """Return a shallow copy of the cam_id -> face-result list mapping."""
        return dict(self.face_results)

    def search_missing_person(self, query_frame: np.ndarray) -> dict:
        """Match *query_frame* with the face engine and annotate the result.

        Adds ``search_mode`` and ``live_cameras_searched`` and, when the result
        reports neither ``found`` nor a ``reason``, a default ``reason``.
        """
        result = dict(self.face_engine.match_frame(query_frame))
        result["search_mode"] = "database_only"
        result["live_cameras_searched"] = len(self.active_cameras)
        if not result.get("found") and not result.get("reason"):
            result["reason"] = "No match in enrolled database."
        return result

    def warmload_models(self) -> dict:
        """Report whether the face engine exposes a non-None ``app`` attribute.

        An engine without an ``app`` attribute at all is reported as a stub
        instead of raising ``AttributeError``.
        """
        ok = getattr(self.face_engine, "app", None) is not None
        return {"stub": not ok, "success": True, "insightface": ok}

    # ------------------------------------------------------------------
    # Frame processing
    # ------------------------------------------------------------------
    def process_browser_frame(self, cam_id: str, base64_str: str) -> None:
        """Handle WS track_frame from BrowserCameraBroadcaster (facial-only in cloud mode)."""
        if not base64_str:
            return
        try:
            frame = self._decode_frame(base64_str)
            if frame is None:
                return
            with self.lock:
                self.latest_raw_frames[cam_id] = frame.copy()
                facial_on = self.ai_models.get("facial", False)
            if not facial_on:
                with self.lock:
                    self.face_results[cam_id] = []
                return

            fe = self.face_engine
            if hasattr(fe, "match_all_faces"):
                matches = fe.match_all_faces(frame)
            else:
                # Engines without multi-face support yield at most one match.
                single = fe.match_frame(frame)
                matches = [single] if single else []

            faces_payload = []
            for m in matches:
                entry = {
                    "name": m.get("name", "Unknown"),
                    # A None confidence must not discard the whole frame's results.
                    "confidence": round(float(m.get("confidence") or 0.0), 3),
                    "bbox": m.get("bbox", [0, 0, 0, 0]),
                    "found": bool(m.get("found", True)),
                }
                if m.get("embedding") is not None:
                    entry["embedding"] = m["embedding"]
                faces_payload.append(entry)

            with self.lock:
                self.face_results[cam_id] = faces_payload
            # Outside the lock: mark_browser_feed_active acquires it itself.
            self.mark_browser_feed_active(cam_id, 0)
        except Exception as e:
            # Best-effort per-frame handler: log and drop the frame.
            logger.error("process_browser_frame (cloud) error for %s: %s", cam_id, e)

    def process_frame(self, base64_string: str) -> tuple[int, str]:
        """Re-encode a base64 image as a quality-70 JPEG data URL.

        Returns ``(0, data_url)``. If the input cannot be decoded or re-encoded,
        the original string is returned unchanged with the same count of 0.
        """
        try:
            import cv2

            frame = self._decode_frame(base64_string)
            if frame is None:
                return 0, base64_string
            ok, buf = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
            if not ok:
                return 0, base64_string
            return 0, f"data:image/jpeg;base64,{base64.b64encode(buf).decode()}"
        except Exception as e:
            logger.error("process_frame (cloud): %s", e)
            return 0, base64_string