| """ |
| vision_engine.py — Single source of all camera frames and AI processing. |
| |
| Key principles: |
| - ONE cv2.VideoCapture per camera index. No other module opens cameras. |
| - AI processing (YOLO crowd count + face recognition) is gated per model by the `ai_models` flags. |
| - When AI runs, detections are pushed into gossip_bridge for interaction tracking. |
| - gossip_bridge never opens a camera itself. |
| """ |
| import cv2 |
| import base64 |
| import numpy as np |
| import logging |
| import threading |
| import os |
| import sys |
| import uuid |
| from datetime import datetime, timezone |
| from ultralytics import YOLO |
|
|
| |
| _FR_DIR = os.path.join(os.path.dirname(__file__), "Face_Recognition") |
| if _FR_DIR not in sys.path: |
| sys.path.insert(0, _FR_DIR) |
| import gossip_bridge |
| from vision_runtime import detect_acceleration, insightface_ctx_id |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
def _build_face_engine():
    """Build the face engine, preferring ``FaceMatcher`` over the legacy engine.

    ``FaceMatcher`` (buffalo_sc, enrolled-first, unknown persistence) is
    imported lazily so that a missing or broken ``face_matcher`` module does
    not prevent this module from loading.

    Returns:
        A ``FaceMatcher`` instance when it can be imported and constructed,
        otherwise a ``FaceRecognitionEngine``.
    """
    try:
        from face_matcher import FaceMatcher

        logger.info(
            "VisionEngine using FaceMatcher (model=%s)",
            os.getenv("FACE_MODEL_PACK", "buffalo_sc"),
        )
        return FaceMatcher()
    except Exception as exc:
        # Deliberate best-effort fallback: any import or construction failure
        # (missing dependency, missing model files, ...) selects the legacy
        # engine instead of aborting start-up.
        logger.warning("FaceMatcher unavailable (%s) - using legacy FaceRecognitionEngine", exc)
        return FaceRecognitionEngine()
|
|
|
|
| |
| |
| |
|
|
| class FaceRecognitionEngine: |
| def __init__(self): |
| self.app = None |
| self.db: dict[str, np.ndarray] = {} |
| self.lock = threading.Lock() |
| self._load_model() |
| self._load_db() |
|
|
| def _load_model(self): |
| try: |
| from insightface.app import FaceAnalysis |
|
|
| accel = detect_acceleration() |
| ctx_id = insightface_ctx_id() |
| provider = "cuda" if ctx_id >= 0 else "cpu" |
| fa = FaceAnalysis(name="buffalo_l", allowed_modules=["detection", "recognition"]) |
| fa.prepare(ctx_id=ctx_id, det_size=(640, 640)) |
| self.app = fa |
| if provider == "cuda": |
| logger.info( |
| "InsightFace model loaded (provider=cuda, device=%s).", |
| accel.get("device_name") or "cuda:0", |
| ) |
| else: |
| reason = accel.get("fallback_reason") or "no GPU detected" |
| logger.info("InsightFace model loaded (provider=cpu, fallback=%s).", reason) |
| except Exception as e: |
| logger.warning(f"InsightFace not available ({e}). Face recognition disabled.") |
|
|
| def _load_db(self): |
| base = _FR_DIR |
| self.db = {} |
| for folder in ["faces_db", "temp_faces_db"]: |
| path = os.path.join(base, folder) |
| if not os.path.exists(path): |
| logger.warning(f"Face DB folder not found: {path}") |
| continue |
| files = [f for f in os.listdir(path) if f.endswith(".npy")] |
| for f in files: |
| name = f.replace(".npy", "") |
| try: |
| emb = np.load(os.path.join(path, f)) |
| self.db[name] = emb |
| logger.info(f"Loaded face: {name} from {folder}") |
| except Exception as e: |
| logger.error(f"Failed to load face {f}: {e}") |
| |
| if not self.db: |
| logger.warning("Face database is EMPTY. No faces will be recognized.") |
| else: |
| logger.info(f"Face DB loaded successfully: {list(self.db.keys())}") |
|
|
| @staticmethod |
| def _cosine(a: np.ndarray, b: np.ndarray) -> float: |
| na, nb = np.linalg.norm(a), np.linalg.norm(b) |
| if na == 0 or nb == 0: |
| return 0.0 |
| return float(np.dot(a, b) / (na * nb)) |
|
|
| def _allocate_unknown_id(self) -> int: |
| existing_ids = [] |
| for k in self.db.keys(): |
| if k.startswith("unknown_"): |
| try: |
| existing_ids.append(int(k.split("_")[1])) |
| except (IndexError, ValueError): |
| pass |
| base = _FR_DIR |
| for folder in ["faces_db", "temp_faces_db", "face_database", "temp_face_database"]: |
| path = os.path.join(base, folder) |
| if os.path.exists(path): |
| for item in os.listdir(path): |
| name = item.replace(".npy", "") |
| if name.startswith("unknown_"): |
| try: |
| existing_ids.append(int(name.split("_")[1])) |
| except (IndexError, ValueError): |
| pass |
| return max(existing_ids) + 1 if existing_ids else 1 |
|
|
| def recognize(self, frame: np.ndarray) -> list[dict]: |
| """Return recognized faces with bbox, name, confidence.""" |
| if self.app is None: |
| return [] |
| with self.lock: |
| try: |
| faces = self.app.get(frame) |
| except Exception as e: |
| logger.error(f"InsightFace inference error: {e}") |
| return [] |
|
|
| results = [] |
| for face in faces: |
| x1, y1, x2, y2 = face.bbox.astype(int) |
| emb = face.embedding |
| best_name = "Unknown" |
| best_score = 0.0 |
| for name, db_emb in self.db.items(): |
| |
| if db_emb.ndim == 1: |
| score = self._cosine(emb, db_emb) |
| else: |
| score = max((self._cosine(emb, v) for v in db_emb), default=0.0) |
| |
| if score > best_score: |
| best_score = score |
| best_name = name |
| |
| |
| threshold = 0.35 if best_name.startswith("unknown") else 0.30 |
| if best_score < threshold: |
| |
| next_id = self._allocate_unknown_id() |
| new_name = f"unknown_{next_id}" |
| |
| |
| self.db[new_name] = np.array([emb]) |
| |
| |
| emb_dir = os.path.join(_FR_DIR, "temp_faces_db") |
| os.makedirs(emb_dir, exist_ok=True) |
| np.save(os.path.join(emb_dir, f"{new_name}.npy"), np.array([emb])) |
| |
| |
| img_dir = os.path.join(_FR_DIR, "face_database", new_name) |
| os.makedirs(img_dir, exist_ok=True) |
| |
| h, w, _ = frame.shape |
| x1_c, y1_c = max(0, x1), max(0, y1) |
| x2_c, y2_c = min(w, x2), min(h, y2) |
| face_img = frame[y1_c:y2_c, x1_c:x2_c] |
| if face_img.size > 0: |
| cv2.imwrite(os.path.join(img_dir, "0.jpg"), face_img) |
| |
| best_name = new_name |
| best_score = 1.0 |
| |
| results.append({ |
| "name": best_name, |
| "confidence": round(best_score, 3), |
| "bbox": [int(x1), int(y1), int(x2), int(y2)], |
| }) |
| return results |
|
|
| def match_all_faces(self, frame: np.ndarray, threshold: float | None = None) -> list[dict]: |
| """Detect and identify every face in the frame. Parity with FaceMatcher.""" |
| return self.recognize(frame) |
|
|
|
|
| def register_face_from_frame(self, name: str, frame: np.ndarray) -> bool: |
| """ |
| Register a new face: |
| 1. Saves image to face_database/<name>/ |
| 2. Generates embeddings (including occluded version) |
| 3. Saves .npy to faces_db/ |
| 4. Updates in-memory DB |
| """ |
| if self.app is None: |
| return False |
| try: |
| import register_face |
| |
| |
| faces = self.app.get(frame) |
| if not faces: |
| logger.warning(f"Registration failed: No face detected in frame for '{name}'") |
| return False |
| primary_emb = faces[0].embedding |
| |
| |
| db_root = os.path.join(_FR_DIR, "face_database") |
| emb_root = os.path.join(_FR_DIR, "faces_db") |
| os.makedirs(db_root, exist_ok=True) |
| os.makedirs(emb_root, exist_ok=True) |
| |
| |
| temp_path = os.path.join(_FR_DIR, f"temp_reg_{uuid.uuid4().hex}.jpg") |
| cv2.imwrite(temp_path, frame) |
| |
| try: |
| |
| |
| new_embs = register_face.register_face( |
| name, temp_path, db_root=db_root, emb_root=emb_root, |
| known_embedding=primary_emb, app=self.app |
| ) |
| |
| |
| with self.lock: |
| self.db[name] = new_embs |
| |
| logger.info(f"Registered face '{name}': Image saved to database and embeddings generated.") |
| return True |
| finally: |
| if os.path.exists(temp_path): |
| os.remove(temp_path) |
| |
| except Exception as e: |
| logger.error(f"Error registering face: {e}") |
| return False |
|
|
| def search_missing_person(self, query_frame: np.ndarray, cam_frames: dict) -> dict: |
| if self.app is None: |
| return {"found": False, "reason": "Face recognition unavailable"} |
| try: |
| query_faces = self.app.get(query_frame) |
| except Exception as e: |
| return {"found": False, "reason": str(e)} |
| if not query_faces: |
| return {"found": False, "reason": "No face detected in query image"} |
|
|
| query_emb = max(query_faces, key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1])).embedding |
|
|
| best_name = None |
| best_score = 0.0 |
| for name, db_emb in self.db.items(): |
| if db_emb.ndim == 1: |
| score = self._cosine(query_emb, db_emb) |
| else: |
| score = max((self._cosine(query_emb, v) for v in db_emb), default=0.0) |
| if score > best_score: |
| best_score = score |
| best_name = name |
|
|
| threshold = 0.35 if (best_name and best_name.startswith("unknown")) else 0.30 |
| if best_name and best_score >= threshold: |
| return { |
| "found": True, |
| "name": best_name.replace("_", " "), |
| "confidence": round(best_score, 3), |
| "cam_id": "database", |
| "location": "Enrolled database", |
| "search_mode": "database", |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "reason": "Matched enrolled face database", |
| } |
|
|
| best_match = {"found": False, "cam_id": None, "confidence": round(best_score, 3), "timestamp": None} |
| for cam_id, frame in cam_frames.items(): |
| if frame is None: |
| continue |
| try: |
| live_faces = self.app.get(frame) |
| except Exception: |
| continue |
| for face in live_faces: |
| score = self._cosine(query_emb, face.embedding) |
| if score > best_match["confidence"]: |
| matched = score >= threshold |
| best_match = { |
| "found": matched, |
| "name": best_name.replace("_", " ") if best_name and matched else None, |
| "cam_id": cam_id, |
| "confidence": round(score, 3), |
| "timestamp": datetime.now(timezone.utc).isoformat(), |
| "location": cam_id, |
| "search_mode": "live" if matched else None, |
| } |
|
|
| if not best_match.get("found"): |
| best_match["reason"] = "Face detected but no match in enrolled database" |
| best_match["best_score"] = round(best_score, 3) |
| best_match["threshold"] = threshold |
| best_match["enrolled_count"] = len([k for k in self.db if not k.startswith("unknown_")]) |
| best_match.setdefault("search_mode", "none") |
| elif best_match.get("found"): |
| best_match.setdefault("search_mode", "live") |
| return best_match |
|
|
|
|
| |
| |
| |
|
|
| class VisionEngine: |
| def __init__(self): |
| model_path = os.getenv("YOLO_MODEL_PATH", "yolov5nu.pt") |
| logger.info("Loading YOLO model (%s)...", model_path) |
| try: |
| self.model = YOLO(model_path) |
| logger.info("YOLO model loaded.") |
| except Exception as e: |
| logger.error(f"Failed to load YOLO: {e}") |
| self.model = None |
|
|
| self.active_cameras: dict[str, cv2.VideoCapture] = {} |
| self.camera_indices: dict[str, int | str] = {} |
| self._client_cameras: dict[str, set[str]] = {} |
| self._cam_refcounts: dict[str, int] = {} |
| self.latest_raw_frames: dict[str, np.ndarray | None] = {} |
| self.browser_annotated_frames: dict[str, tuple[int, np.ndarray | None]] = {} |
| self.lock = threading.Lock() |
| self.open_lock = threading.Lock() |
| self._pending_opens: set[str] = set() |
|
|
| self.face_engine = _build_face_engine() |
| self.face_results: dict[str, list[dict]] = {} |
|
|
| |
| |
| |
| |
| self.ai_models: dict[str, bool] = { |
| "crowd": True, |
| "facial": True, |
| "fight": False, |
| "anomaly": False, |
| "medical": False, |
| } |
| |
| |
| self.fall_state: dict[str, int] = {} |
| self.prev_gray: dict[str, np.ndarray] = {} |
| self.last_event_ts: dict[str, datetime] = {} |
| self.last_total_count: int = 0 |
| self.last_crowd_count: int = 0 |
| self.last_face_count: int = 0 |
| self.room_stats: dict[str, dict] = {} |
|
|
| accel = detect_acceleration() |
| if self.model is not None and accel["cuda_available"]: |
| logger.info( |
| "YOLO will use CUDA (device=%s).", |
| accel.get("device_name") or "cuda:0", |
| ) |
| elif self.model is not None: |
| logger.info( |
| "YOLO using CPU (fallback=%s).", |
| accel.get("fallback_reason") or "no GPU", |
| ) |
|
|
| def warmload_models(self) -> dict: |
| """Run lightweight dummy inference to populate GPU memory and warm ONNX/torch.""" |
| accel = detect_acceleration() |
| result: dict = { |
| "yolo": False, |
| "insightface": False, |
| "provider": accel.get("provider", "cpu"), |
| "cuda_available": accel.get("cuda_available", False), |
| "device_name": accel.get("device_name"), |
| "fallback_reason": accel.get("fallback_reason"), |
| } |
| dummy_bgr = np.zeros((480, 640, 3), dtype=np.uint8) |
|
|
| if self.model is not None: |
| try: |
| self.model.predict(dummy_bgr, verbose=False) |
| result["yolo"] = True |
| logger.info("Warmload: YOLO dummy inference OK (provider=%s).", result["provider"]) |
| except Exception as exc: |
| logger.warning("Warmload: YOLO dummy inference failed: %s", exc) |
| result["yolo_error"] = str(exc) |
|
|
| if self.face_engine.app is not None: |
| try: |
| |
| if hasattr(self.face_engine, "_force_prepare"): |
| self.face_engine._force_prepare() |
|
|
| |
| dummy_insight = np.zeros((320, 320, 3), dtype=np.uint8) |
| with self.face_engine.lock: |
| faces = self.face_engine.app.get(dummy_insight) |
| |
| det_ok = True |
| rec_ok = True |
|
|
| |
| import numpy as _np |
| for _mname, _mobj in self.face_engine.app.models.items(): |
| _session = getattr(_mobj, 'session', None) |
| if _session is not None: |
| try: |
| _inp_meta = _session.get_inputs()[0] |
| _shape = [1 if (d is None or isinstance(d, str)) else d |
| for d in _inp_meta.shape] |
| _dummy = _np.zeros(_shape, dtype=_np.float32) |
| _session.run(None, {_inp_meta.name: _dummy}) |
| logger.info("Warmload: InsightFace model '%s' ONNX session force-run OK", _mname) |
| except Exception as _exc: |
| logger.warning("Warmload: InsightFace model '%s' ONNX force-run skipped: %s", |
| _mname, _exc) |
| else: |
| logger.warning("Warmload: InsightFace model '%s' has no session attribute β " |
| "may still lazy-init", _mname) |
|
|
| result["insightface"] = True |
| ctx = insightface_ctx_id() |
| logger.info( |
| "Warmload: InsightFace force-warmload OK (ctx_id=%s, faces on blank: %d, det_ok: %s, rec_ok: %s).", |
| ctx, len(faces), det_ok, rec_ok |
| ) |
| except Exception as exc: |
| logger.warning("Warmload: InsightFace dummy inference failed: %s", exc) |
| result["insightface_error"] = str(exc) |
|
|
| ok = result["yolo"] or result["insightface"] or (self.model is None and self.face_engine.app is None) |
| result["success"] = bool(ok) |
| if result["success"]: |
| logger.info("Startup warmload success: %s", result) |
| else: |
| logger.warning("Startup warmload incomplete: %s", result) |
| return result |
|
|
| |
|
|
| def toggle_ai_model(self, model_id: str) -> bool: |
| """Toggle a specific AI model on/off. Returns new state.""" |
| with self.lock: |
| current = self.ai_models.get(model_id, False) |
| self.ai_models[model_id] = not current |
| logger.info(f"AI model '{model_id}' β {'ON' if not current else 'OFF'}") |
| return not current |
|
|
| def set_ai_model(self, model_id: str, enabled: bool): |
| with self.lock: |
| self.ai_models[model_id] = enabled |
| logger.info(f"AI model '{model_id}' set to {'ON' if enabled else 'OFF'}") |
|
|
| def get_ai_status(self) -> dict[str, bool]: |
| with self.lock: |
| return dict(self.ai_models) |
|
|
| def get_ai_capabilities(self) -> dict[str, dict]: |
| """Per-model support metadata for UI and cloud/local deployment hints.""" |
| yolo_ok = self.model is not None |
| return { |
| "facial": {"supported": True, "mode": "browser_frames"}, |
| "crowd": {"supported": yolo_ok, "mode": "yolo" if yolo_ok else "unavailable"}, |
| "medical": {"supported": yolo_ok, "mode": "heuristic_fall" if yolo_ok else "unavailable"}, |
| "anomaly": {"supported": True, "mode": "optical_flow"}, |
| "fight": {"supported": yolo_ok, "mode": "heuristic_proximity" if yolo_ok else "unavailable"}, |
| "fire": { |
| "supported": False, |
| "mode": "not_implemented", |
| "note": "Fire alerts use SOS/IOT integrations β not a CV model", |
| }, |
| } |
|
|
| |
|
|
| def _release_camera_internal(self, cam_id: str): |
| if cam_id in self.active_cameras: |
| self.active_cameras[cam_id].release() |
| del self.active_cameras[cam_id] |
| self.camera_indices.pop(cam_id, None) |
| self.latest_raw_frames.pop(cam_id, None) |
| logger.info(f"Released camera {cam_id}") |
|
|
| @staticmethod |
| def _open_with_timeout(index, backend, timeout_s: float = 10.0): |
| """Open a cv2.VideoCapture in a daemon thread with a timeout. |
| Returns the cap if successful, None on timeout or failure.""" |
| result = [None] |
| def _worker(): |
| try: |
| cap = cv2.VideoCapture(index, backend) |
| result[0] = cap |
| except Exception: |
| result[0] = None |
|
|
| t = threading.Thread(target=_worker, daemon=True) |
| t.start() |
| t.join(timeout=timeout_s) |
| if t.is_alive(): |
| logger.warning(f"cv2.VideoCapture({index}, ...) timed out after {timeout_s}s") |
| return None |
| return result[0] |
|
|
| def _open_cap(self, index) -> cv2.VideoCapture | None: |
| """Try backends in order: DSHOW (fastest on Windows) β MSMF β ANY. |
| Called OUTSIDE the lock to avoid blocking the entire engine. |
| Each cv2.VideoCapture attempt is wrapped in a 10-second timeout.""" |
| if isinstance(index, str) and not index.isdigit(): |
| backends = [("FFMPEG", cv2.CAP_FFMPEG, 15), ("ANY", cv2.CAP_ANY, 15)] |
| else: |
| index = int(index) |
| backends = [ |
| ("ANY", cv2.CAP_ANY, 10), |
| ("MSMF", cv2.CAP_MSMF, 12), |
| ("DSHOW", cv2.CAP_DSHOW, 8), |
| ] |
| for name, backend, timeout in backends: |
| try: |
| logger.info(f"Trying camera index {index} via {name} (timeout {timeout}s)...") |
| cap = self._open_with_timeout(index, backend, timeout_s=timeout) |
| if cap is None: |
| continue |
| if cap.isOpened(): |
| |
| |
| |
| |
| import time |
| time.sleep(1.5) |
| |
| |
| ret = False |
| for _ in range(5): |
| ret, _ = cap.read() |
| if ret: |
| break |
| time.sleep(0.5) |
| |
| if ret: |
| logger.info(f"Camera index {index} opened via {name} β") |
| return cap |
| else: |
| logger.warning(f"Camera index {index} opened via {name} but read() failed after retries") |
| cap.release() |
| else: |
| cap.release() |
| except Exception as e: |
| logger.warning(f"Backend {name} failed for index {index}: {e}") |
| return None |
|
|
| |
|
|
| def set_camera(self, cam_id: str, index) -> bool: |
| if isinstance(index, str): |
| if index == "browser": |
| pass |
| elif index.isdigit(): |
| index = int(index) |
| elif index.lower().startswith("rstp://"): |
| index = "rtsp://" + index[7:] |
| elif index.lower().startswith("rtsp://") and not index.startswith("rtsp://"): |
| index = "rtsp://" + index[7:] |
|
|
| |
| with self.lock: |
| if cam_id in self._pending_opens: |
| logger.info(f"Camera {cam_id} is already being opened, skipping duplicate request") |
| return False |
|
|
| if cam_id in self.active_cameras and self.camera_indices.get(cam_id) == index: |
| if self.active_cameras[cam_id].isOpened(): |
| logger.info(f"Camera {cam_id} already open at index {index}") |
| return True |
|
|
| |
| for other_id, other_idx in list(self.camera_indices.items()): |
| if other_id != cam_id and other_idx == index: |
| logger.info(f"Releasing {other_id} (index {index}) β reassigning to {cam_id}") |
| self._release_camera_internal(other_id) |
|
|
| |
| if cam_id in self.active_cameras: |
| self._release_camera_internal(cam_id) |
| if index == "browser": |
| self.camera_indices[cam_id] = index |
| self.latest_raw_frames[cam_id] = None |
| self.browser_annotated_frames[cam_id] = (0, None) |
| logger.info(f"Camera {cam_id} β index {index} (Browser Feed) OK") |
| return True |
|
|
| self._pending_opens.add(cam_id) |
|
|
| |
| |
| logger.info(f"Opening camera {cam_id} at index {index}...") |
| try: |
| with self.open_lock: |
| cap = self._open_cap(index) |
| finally: |
| with self.lock: |
| self._pending_opens.discard(cam_id) |
|
|
| with self.lock: |
|
|
| if cap: |
| self.active_cameras[cam_id] = cap |
| self.camera_indices[cam_id] = index |
| self.latest_raw_frames[cam_id] = None |
| logger.info(f"Camera {cam_id} β index {index} OK") |
| return True |
|
|
| logger.error(f"All backends failed for {cam_id} at index {index}") |
| return False |
|
|
| def process_browser_frame(self, cam_id: str, base64_str: str): |
| if not base64_str: |
| return |
| try: |
| import base64 |
| encoded = base64_str.split(",")[1] if "," in base64_str else base64_str |
| nparr = np.frombuffer(base64.b64decode(encoded), np.uint8) |
| frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) |
| if frame is None: |
| return |
|
|
| with self.lock: |
| self.latest_raw_frames[cam_id] = frame.copy() |
| crowd_on = self.ai_models.get("crowd", True) |
| facial_on = self.ai_models.get("facial", False) |
| medical_on = self.ai_models.get("medical", False) |
| anomaly_on = self.ai_models.get("anomaly", False) |
| fight_on = self.ai_models.get("fight", False) |
|
|
| annotated = frame.copy() |
| count = 0 |
| boxes = [] |
|
|
| if crowd_on or medical_on or anomaly_on or fight_on: |
| count, annotated, boxes = self._run_yolo(annotated) |
|
|
| if medical_on: |
| annotated, _ = self._run_medical_fall(annotated, cam_id, boxes) |
| if anomaly_on: |
| annotated, _ = self._run_anomaly_heuristic(annotated, cam_id, boxes) |
| if fight_on: |
| annotated, _ = self._run_fight_heuristic(annotated, cam_id, boxes) |
|
|
| if facial_on: |
| fe = self.face_engine |
| try: |
| if hasattr(fe, "match_all_faces"): |
| faces = fe.match_all_faces(annotated) |
| elif hasattr(fe, "recognize"): |
| faces = fe.recognize(annotated) |
| else: |
| faces = [] |
| except (AttributeError, TypeError) as e: |
| logger.warning("[WS] face match failed: %s β returning empty result", e) |
| faces = [] |
| with self.lock: |
| self.face_results[cam_id] = faces if isinstance(faces, list) else [] |
| if isinstance(faces, list): |
| annotated = self._annotate_faces(annotated, faces) |
| else: |
| with self.lock: |
| self.face_results[cam_id] = [] |
|
|
| with self.lock: |
| self.browser_annotated_frames[cam_id] = (count, annotated) |
| except Exception as e: |
| logger.error(f"process_browser_frame error for {cam_id}: {e}") |
|
|
| def release_camera(self, cam_id: str): |
| with self.lock: |
| self._release_camera_internal(cam_id) |
|
|
| def set_camera_for_client(self, client_id: str, cam_id: str, index) -> bool: |
| """Per-client camera registration β browser feeds do not steal from other clients.""" |
| if isinstance(index, str): |
| if index == "browser": |
| pass |
| elif index.isdigit(): |
| index = int(index) |
| elif index.lower().startswith("rstp://"): |
| index = "rtsp://" + index[7:] |
| elif index.lower().startswith("rtsp://") and not index.startswith("rtsp://"): |
| index = "rtsp://" + index[7:] |
|
|
| if index == "browser": |
| with self.lock: |
| self._client_cameras.setdefault(client_id, set()).add(cam_id) |
| self._cam_refcounts[cam_id] = self._cam_refcounts.get(cam_id, 0) + 1 |
| self.camera_indices[cam_id] = "browser" |
| self.latest_raw_frames[cam_id] = None |
| self.browser_annotated_frames[cam_id] = (0, None) |
| logger.info("Camera %s β index browser (client %s) OK", cam_id, client_id[:8]) |
| return True |
|
|
| with self.lock: |
| self._client_cameras.setdefault(client_id, set()).add(cam_id) |
| return self.set_camera(cam_id, index) |
|
|
| def release_camera_for_client(self, client_id: str, cam_id: str) -> None: |
| with self.lock: |
| client_cams = self._client_cameras.get(client_id) |
| if not client_cams or cam_id not in client_cams: |
| return |
| client_cams.discard(cam_id) |
| if not client_cams: |
| self._client_cameras.pop(client_id, None) |
| idx = self.camera_indices.get(cam_id) |
|
|
| if idx == "browser": |
| with self.lock: |
| ref = self._cam_refcounts.get(cam_id, 1) - 1 |
| if ref <= 0: |
| self._cam_refcounts.pop(cam_id, None) |
| self._release_camera_internal(cam_id) |
| else: |
| self._cam_refcounts[cam_id] = ref |
| return |
|
|
| with self.lock: |
| other_holders = any( |
| cam_id in cams for cid, cams in self._client_cameras.items() if cid != client_id |
| ) |
| if not other_holders: |
| self.release_camera(cam_id) |
|
|
| def release_client_cameras(self, client_id: str) -> None: |
| for cam_id in list(self._client_cameras.get(client_id, set())): |
| self.release_camera_for_client(client_id, cam_id) |
|
|
| |
|
|
| def _run_yolo(self, frame: np.ndarray) -> tuple[int, np.ndarray, list]: |
| """Run crowd detection YOLO. Only called when crowd or medical model is enabled.""" |
| if frame is None or self.model is None: |
| return 0, frame, [] |
| results = self.model(frame, verbose=False, classes=[0]) |
| count = 0 |
| boxes_out = [] |
| for r in results: |
| for box in r.boxes: |
| count += 1 |
| x1, y1, x2, y2 = map(int, box.xyxy[0]) |
| boxes_out.append((x1, y1, x2, y2)) |
| cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) |
| cv2.putText(frame, f"People: {count}", (10, 30), |
| cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) |
| return count, frame, boxes_out |
|
|
| def _run_medical_fall(self, frame: np.ndarray, cam_id: str, boxes: list) -> tuple[np.ndarray, bool]: |
| fall_detected = False |
| for (x1, y1, x2, y2) in boxes: |
| w = x2 - x1 |
| h = y2 - y1 |
| if w > h * 1.5: |
| fall_detected = True |
| cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 3) |
| cv2.putText(frame, "FALL DETECTED", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) |
| |
| if cam_id not in self.fall_state: |
| self.fall_state[cam_id] = 0 |
| |
| event_triggered = False |
| if fall_detected: |
| self.fall_state[cam_id] += 1 |
| if self.fall_state[cam_id] == 20: |
| event_key = f"{cam_id}_fall" |
| now = datetime.now() |
| last = self.last_event_ts.get(event_key, datetime.min) |
| if (now - last).total_seconds() > 60: |
| logger.warning(f"MEDICAL EMERGENCY: Persistent fall detected on {cam_id}") |
| self.last_event_ts[event_key] = now |
| event_triggered = True |
| else: |
| self.fall_state[cam_id] = max(0, self.fall_state[cam_id] - 1) |
| |
| return frame, event_triggered |
|
|
| def _run_stampede_flow(self, frame: np.ndarray, cam_id: str, orig_frame: np.ndarray) -> tuple[np.ndarray, bool]: |
| gray = cv2.cvtColor(orig_frame, cv2.COLOR_BGR2GRAY) |
| gray = cv2.resize(gray, (320, 240)) |
| |
| if cam_id not in self.prev_gray: |
| self.prev_gray[cam_id] = gray |
| return frame, False |
| |
| prev = self.prev_gray[cam_id] |
| flow = cv2.calcOpticalFlowFarneback(prev, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0) |
| self.prev_gray[cam_id] = gray |
| |
| mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1]) |
| avg_mag = np.mean(mag) |
| |
| event_triggered = False |
| if avg_mag > 2.8: |
| cv2.putText(frame, "STAMPEDE / CHAOS DETECTED", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 165, 255), 2) |
| event_key = f"{cam_id}_stampede" |
| now = datetime.now() |
| last = self.last_event_ts.get(event_key, datetime.min) |
| if (now - last).total_seconds() > 60: |
| logger.warning(f"STAMPEDE DETECTED on {cam_id}") |
| self.last_event_ts[event_key] = now |
| event_triggered = True |
| |
| return frame, event_triggered |
|
|
| def _run_fight_heuristic( |
| self, frame: np.ndarray, cam_id: str, boxes: list |
| ) -> tuple[np.ndarray, bool]: |
| """Experimental fight stub: flag when multiple persons are extremely close.""" |
| if len(boxes) < 2: |
| return frame, False |
|
|
| close_pairs = 0 |
| for i in range(len(boxes)): |
| for j in range(i + 1, len(boxes)): |
| x1a, y1a, x2a, y2a = boxes[i] |
| x1b, y1b, x2b, y2b = boxes[j] |
| cax, cay = (x1a + x2a) / 2, (y1a + y2a) / 2 |
| cbx, cby = (x1b + x2b) / 2, (y1b + y2b) / 2 |
| dist = ((cax - cbx) ** 2 + (cay - cby) ** 2) ** 0.5 |
| scale = max(1.0, ((x2a - x1a) + (x2b - x1b)) / 2) |
| if dist < scale * 0.55: |
| close_pairs += 1 |
| cv2.rectangle(frame, (x1a, y1a), (x2a, y2a), (0, 0, 255), 3) |
| cv2.rectangle(frame, (x1b, y1b), (x2b, y2b), (0, 0, 255), 3) |
|
|
| if close_pairs < 1: |
| return frame, False |
|
|
| cv2.putText( |
| frame, "FIGHT? (heuristic)", (10, 90), |
| cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2, |
| ) |
| event_key = f"{cam_id}_fight" |
| now = datetime.now() |
| last = self.last_event_ts.get(event_key, datetime.min) |
| if (now - last).total_seconds() > 90: |
| self.last_event_ts[event_key] = now |
| logger.warning("FIGHT HEURISTIC triggered on %s", cam_id) |
| return frame, True |
| return frame, False |
|
|
| def _annotate_faces(self, frame: np.ndarray, faces: list[dict]) -> np.ndarray: |
| for f in faces: |
| x1, y1, x2, y2 = f["bbox"] |
| name_disp = f["name"] |
| if name_disp.startswith("unknown_"): |
| try: |
| num = name_disp.split("_")[1] |
| name_disp = f"Unknown #{num}" |
| except IndexError: |
| pass |
| label = f'{name_disp} ({f["confidence"]:.2f})' |
| is_unknown = f["name"].startswith("unknown") |
| color = (0, 140, 255) if is_unknown else (0, 255, 0) |
| cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) |
| cv2.putText(frame, label, (x1, max(y1 - 10, 15)), |
| cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 1) |
| return frame |
|
|
| def get_active_frames(self) -> tuple[dict[str, tuple[int, np.ndarray | None]], list[dict]]: |
| """ |
| Read one frame from each active camera. |
| Returns: |
| (results_dict, list_of_new_alerts) |
| """ |
| results: dict[str, tuple[int, np.ndarray | None]] = {} |
| new_alerts = [] |
|
|
| with self.lock: |
| browser_feeds = [cid for cid, idx in self.camera_indices.items() if idx == "browser"] |
| if not self.active_cameras and not browser_feeds: |
| return results, new_alerts |
| cam_items = list(self.active_cameras.items()) |
| crowd_on = self.ai_models.get("crowd", False) |
| facial_on = self.ai_models.get("facial", False) |
| medical_on = self.ai_models.get("medical", False) |
| anomaly_on = self.ai_models.get("anomaly", False) |
| fight_on = self.ai_models.get("fight", False) |
|
|
| |
| if not hasattr(self, "_last_status_log"): self._last_status_log = 0 |
| self._last_status_log += 1 |
| if self._last_status_log % 50 == 0: |
| logger.info(f"Vision Engine AI Status: Crowd={crowd_on}, Facial={facial_on}, Medical={medical_on}") |
|
|
| for cam_id, cap in cam_items: |
| try: |
| ret, frame = cap.read() |
| except Exception as e: |
| logger.warning(f"cap.read() error for {cam_id}: {e}") |
| ret, frame = False, None |
|
|
| if not ret or frame is None: |
| results[cam_id] = (0, None) |
| continue |
|
|
| |
| with self.lock: |
| self.latest_raw_frames[cam_id] = frame.copy() |
|
|
| annotated = frame.copy() |
| count = 0 |
|
|
| |
| boxes = [] |
| if crowd_on or medical_on or fight_on: |
| count, annotated, boxes = self._run_yolo(annotated) |
|
|
| if fight_on: |
| annotated, fight_trig = self._run_fight_heuristic(annotated, cam_id, boxes) |
| if fight_trig: |
| new_alerts.append({ |
| "type": "fight", |
| "severity": "high", |
| "location": cam_id, |
| "message": ( |
| f"POSSIBLE ALTERCATION (heuristic): close proximity of multiple " |
| f"persons on {cam_id} β verify manually" |
| ), |
| }) |
|
|
| if medical_on: |
| annotated, fall_trig = self._run_medical_fall(annotated, cam_id, boxes) |
| if fall_trig: |
| new_alerts.append({ |
| "type": "medical", |
| "severity": "critical", |
| "location": cam_id, |
| "message": f"WARNING: MEDICAL EMERGENCY: Persistent fall detected on {cam_id}" |
| }) |
| |
| |
| if anomaly_on: |
| annotated, stampede_trig = self._run_stampede_flow(annotated, cam_id, frame) |
| if stampede_trig: |
| new_alerts.append({ |
| "type": "stampede", |
| "severity": "critical", |
| "location": cam_id, |
| "message": f"WARNING: CROWD ANOMALY: Stampede/chaos detected on {cam_id}" |
| }) |
|
|
| |
| faces: list[dict] = [] |
| if facial_on: |
| fe = self.face_engine |
| if hasattr(fe, "match_all_faces"): |
| faces = fe.match_all_faces(frame) |
| elif hasattr(fe, "recognize"): |
| faces = fe.recognize(frame) |
| else: |
| faces = [] |
| |
| |
| for f in faces: |
| |
| |
| small_frame = cv2.resize(frame, (640, int(640 * frame.shape[0] / frame.shape[1]))) |
| _, buf = cv2.imencode(".jpg", small_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 65]) |
| f["thumbnail"] = f"data:image/jpeg;base64,{base64.b64encode(buf).decode()}" |
|
|
| annotated = self._annotate_faces(annotated, faces) |
|
|
| |
| names = [f["name"] for f in faces] |
| bboxes = [tuple(f["bbox"]) for f in faces] |
| if names: |
| gossip_bridge.on_detections(cam_id, names, bboxes, frame.shape[1]) |
|
|
| self.face_results[cam_id] = faces |
| results[cam_id] = (count, annotated) |
|
|
| |
| with self.lock: |
| for cid in [k for k, v in self.camera_indices.items() if v == "browser"]: |
| if cid in self.browser_annotated_frames: |
| results[cid] = self.browser_annotated_frames[cid] |
|
|
| return results, new_alerts |
|
|
| def get_face_results(self) -> dict[str, list[dict]]: |
| return dict(self.face_results) |
|
|
| def search_missing_person(self, query_frame: np.ndarray) -> dict: |
| fe = self.face_engine |
| if hasattr(fe, "search_missing_person") and not hasattr(fe, "match_all_faces"): |
| with self.lock: |
| live_frames = {k: v for k, v in self.latest_raw_frames.items() if v is not None} |
| return fe.search_missing_person(query_frame, live_frames) |
| if hasattr(fe, "ensure_db"): |
| fe.ensure_db() |
| db_match = dict(fe.match_frame(query_frame)) |
| if db_match.get("found"): |
| db_match.setdefault("search_mode", "database") |
| return db_match |
| try: |
| import face_live_search |
|
|
| live = face_live_search.search_query_in_live_feeds(query_frame, fe, self) |
| if live.get("found"): |
| return live |
| except Exception as exc: |
| logger.debug("live search fallback skipped: %s", exc) |
| db_match.setdefault("search_mode", "database") |
| return db_match |
|
|
| |
|
|
| def process_frame(self, base64_string: str) -> tuple[int, str]: |
| try: |
| encoded = base64_string.split(",")[1] if "," in base64_string else base64_string |
| nparr = np.frombuffer(base64.b64decode(encoded), np.uint8) |
| frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) |
| count, annotated, _ = self._run_yolo(frame.copy()) if self.model else (0, frame, []) |
| if annotated is None: |
| return 0, base64_string |
| _, buf = cv2.imencode(".jpg", annotated, [int(cv2.IMWRITE_JPEG_QUALITY), 70]) |
| return count, f"data:image/jpeg;base64,{base64.b64encode(buf).decode()}" |
| except Exception as e: |
| logger.error(f"process_frame error: {e}") |
| return 0, base64_string |
|
|