""" vision_engine.py — Single source of all camera frames and AI processing. Key principles: - ONE cv2.VideoCapture per camera index. No other module opens cameras. - AI processing (YOLO crowd count + face recognition) is gated by `ai_enabled`. - When AI runs, detections are pushed into gossip_bridge for interaction tracking. - gossip_bridge never opens a camera itself. """ import cv2 import base64 import numpy as np import logging import threading import os import sys import uuid from datetime import datetime, timezone from ultralytics import YOLO # ── Add Face_Recognition/ to path so gossip_bridge can be imported _FR_DIR = os.path.join(os.path.dirname(__file__), "Face_Recognition") if _FR_DIR not in sys.path: sys.path.insert(0, _FR_DIR) import gossip_bridge # noqa: E402 from vision_runtime import detect_acceleration, insightface_ctx_id logger = logging.getLogger(__name__) def _build_face_engine(): """Prefer FaceMatcher (buffalo_sc, enrolled-first, unknown persistence) over legacy engine.""" try: from face_matcher import FaceMatcher logger.info( "VisionEngine using FaceMatcher (model=%s)", os.getenv("FACE_MODEL_PACK", "buffalo_sc"), ) return FaceMatcher() except Exception as exc: logger.warning("FaceMatcher unavailable (%s) — using legacy FaceRecognitionEngine", exc) return FaceRecognitionEngine() # ───────────────────────────────────────────────────────────────────────────── # Face Recognition Engine (lazy-loads InsightFace) # ───────────────────────────────────────────────────────────────────────────── class FaceRecognitionEngine: def __init__(self): self.app = None self.db: dict[str, np.ndarray] = {} self.lock = threading.Lock() self._load_model() self._load_db() def _load_model(self): try: from insightface.app import FaceAnalysis # type: ignore accel = detect_acceleration() ctx_id = insightface_ctx_id() provider = "cuda" if ctx_id >= 0 else "cpu" fa = FaceAnalysis(name="buffalo_l", allowed_modules=["detection", "recognition"]) fa.prepare(ctx_id=ctx_id, det_size=(640, 640)) self.app = fa if provider == "cuda": logger.info( "InsightFace model loaded (provider=cuda, device=%s).", accel.get("device_name") or "cuda:0", ) else: reason = accel.get("fallback_reason") or "no GPU detected" logger.info("InsightFace model loaded (provider=cpu, fallback=%s).", reason) except Exception as e: logger.warning(f"InsightFace not available ({e}). Face recognition disabled.") def _load_db(self): base = _FR_DIR self.db = {} for folder in ["faces_db", "temp_faces_db"]: path = os.path.join(base, folder) if not os.path.exists(path): logger.warning(f"Face DB folder not found: {path}") continue files = [f for f in os.listdir(path) if f.endswith(".npy")] for f in files: name = f.replace(".npy", "") try: emb = np.load(os.path.join(path, f)) self.db[name] = emb logger.info(f"Loaded face: {name} from {folder}") except Exception as e: logger.error(f"Failed to load face {f}: {e}") if not self.db: logger.warning("Face database is EMPTY. No faces will be recognized.") else: logger.info(f"Face DB loaded successfully: {list(self.db.keys())}") @staticmethod def _cosine(a: np.ndarray, b: np.ndarray) -> float: na, nb = np.linalg.norm(a), np.linalg.norm(b) if na == 0 or nb == 0: return 0.0 return float(np.dot(a, b) / (na * nb)) def _allocate_unknown_id(self) -> int: existing_ids = [] for k in self.db.keys(): if k.startswith("unknown_"): try: existing_ids.append(int(k.split("_")[1])) except (IndexError, ValueError): pass base = _FR_DIR for folder in ["faces_db", "temp_faces_db", "face_database", "temp_face_database"]: path = os.path.join(base, folder) if os.path.exists(path): for item in os.listdir(path): name = item.replace(".npy", "") if name.startswith("unknown_"): try: existing_ids.append(int(name.split("_")[1])) except (IndexError, ValueError): pass return max(existing_ids) + 1 if existing_ids else 1 def recognize(self, frame: np.ndarray) -> list[dict]: """Return recognized faces with bbox, name, confidence.""" if self.app is None: return [] with self.lock: try: faces = self.app.get(frame) except Exception as e: logger.error(f"InsightFace inference error: {e}") return [] results = [] for face in faces: x1, y1, x2, y2 = face.bbox.astype(int) emb = face.embedding best_name = "Unknown" best_score = 0.0 for name, db_emb in self.db.items(): # db_emb can be (512,) or (N, 512) if db_emb.ndim == 1: score = self._cosine(emb, db_emb) else: score = max((self._cosine(emb, v) for v in db_emb), default=0.0) if score > best_score: best_score = score best_name = name # Conservative threshold to reduce false-positive identity matches. threshold = 0.35 if best_name.startswith("unknown") else 0.30 if best_score < threshold: # Assign a new persistent unknown identity next_id = self._allocate_unknown_id() new_name = f"unknown_{next_id}" # Update in-memory db self.db[new_name] = np.array([emb]) # Save embedding on disk emb_dir = os.path.join(_FR_DIR, "temp_faces_db") os.makedirs(emb_dir, exist_ok=True) np.save(os.path.join(emb_dir, f"{new_name}.npy"), np.array([emb])) # Save crop image to face_database img_dir = os.path.join(_FR_DIR, "face_database", new_name) os.makedirs(img_dir, exist_ok=True) h, w, _ = frame.shape x1_c, y1_c = max(0, x1), max(0, y1) x2_c, y2_c = min(w, x2), min(h, y2) face_img = frame[y1_c:y2_c, x1_c:x2_c] if face_img.size > 0: cv2.imwrite(os.path.join(img_dir, "0.jpg"), face_img) best_name = new_name best_score = 1.0 # Initial creation match confidence results.append({ "name": best_name, "confidence": round(best_score, 3), "bbox": [int(x1), int(y1), int(x2), int(y2)], }) return results def match_all_faces(self, frame: np.ndarray, threshold: float | None = None) -> list[dict]: """Detect and identify every face in the frame. Parity with FaceMatcher.""" return self.recognize(frame) def register_face_from_frame(self, name: str, frame: np.ndarray) -> bool: """ Register a new face: 1. Saves image to face_database// 2. Generates embeddings (including occluded version) 3. Saves .npy to faces_db/ 4. Updates in-memory DB """ if self.app is None: return False try: import register_face # Detect face to get initial embedding (to ensure it works and for robustness) faces = self.app.get(frame) if not faces: logger.warning(f"Registration failed: No face detected in frame for '{name}'") return False primary_emb = faces[0].embedding # Define paths db_root = os.path.join(_FR_DIR, "face_database") emb_root = os.path.join(_FR_DIR, "faces_db") os.makedirs(db_root, exist_ok=True) os.makedirs(emb_root, exist_ok=True) # Save temporary image for the register_face module to process temp_path = os.path.join(_FR_DIR, f"temp_reg_{uuid.uuid4().hex}.jpg") cv2.imwrite(temp_path, frame) try: # Call the specialized register_face logic (handles folder creation, image copying, occlusion) # We pass self.app to avoid redundant model loading new_embs = register_face.register_face( name, temp_path, db_root=db_root, emb_root=emb_root, known_embedding=primary_emb, app=self.app ) # Update in-memory DB so it's available immediately without restart with self.lock: self.db[name] = new_embs logger.info(f"Registered face '{name}': Image saved to database and embeddings generated.") return True finally: if os.path.exists(temp_path): os.remove(temp_path) except Exception as e: logger.error(f"Error registering face: {e}") return False def search_missing_person(self, query_frame: np.ndarray, cam_frames: dict) -> dict: if self.app is None: return {"found": False, "reason": "Face recognition unavailable"} try: query_faces = self.app.get(query_frame) except Exception as e: return {"found": False, "reason": str(e)} if not query_faces: return {"found": False, "reason": "No face detected in query image"} query_emb = max(query_faces, key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1])).embedding best_name = None best_score = 0.0 for name, db_emb in self.db.items(): if db_emb.ndim == 1: score = self._cosine(query_emb, db_emb) else: score = max((self._cosine(query_emb, v) for v in db_emb), default=0.0) if score > best_score: best_score = score best_name = name threshold = 0.35 if (best_name and best_name.startswith("unknown")) else 0.30 if best_name and best_score >= threshold: return { "found": True, "name": best_name.replace("_", " "), "confidence": round(best_score, 3), "cam_id": "database", "location": "Enrolled database", "search_mode": "database", "timestamp": datetime.now(timezone.utc).isoformat(), "reason": "Matched enrolled face database", } best_match = {"found": False, "cam_id": None, "confidence": round(best_score, 3), "timestamp": None} for cam_id, frame in cam_frames.items(): if frame is None: continue try: live_faces = self.app.get(frame) except Exception: continue for face in live_faces: score = self._cosine(query_emb, face.embedding) if score > best_match["confidence"]: matched = score >= threshold best_match = { "found": matched, "name": best_name.replace("_", " ") if best_name and matched else None, "cam_id": cam_id, "confidence": round(score, 3), "timestamp": datetime.now(timezone.utc).isoformat(), "location": cam_id, "search_mode": "live" if matched else None, } if not best_match.get("found"): best_match["reason"] = "Face detected but no match in enrolled database" best_match["best_score"] = round(best_score, 3) best_match["threshold"] = threshold best_match["enrolled_count"] = len([k for k in self.db if not k.startswith("unknown_")]) best_match.setdefault("search_mode", "none") elif best_match.get("found"): best_match.setdefault("search_mode", "live") return best_match # ───────────────────────────────────────────────────────────────────────────── # Vision Engine — single owner of all VideoCapture objects # ───────────────────────────────────────────────────────────────────────────── class VisionEngine: def __init__(self): model_path = os.getenv("YOLO_MODEL_PATH", "yolov5nu.pt") logger.info("Loading YOLO model (%s)...", model_path) try: self.model = YOLO(model_path) logger.info("YOLO model loaded.") except Exception as e: logger.error(f"Failed to load YOLO: {e}") self.model = None self.active_cameras: dict[str, cv2.VideoCapture] = {} self.camera_indices: dict[str, int | str] = {} self._client_cameras: dict[str, set[str]] = {} self._cam_refcounts: dict[str, int] = {} self.latest_raw_frames: dict[str, np.ndarray | None] = {} self.browser_annotated_frames: dict[str, tuple[int, np.ndarray | None]] = {} self.lock = threading.Lock() self.open_lock = threading.Lock() # strictly sequential cv2.VideoCapture self._pending_opens: set[str] = set() # cam_ids currently being opened self.face_engine = _build_face_engine() self.face_results: dict[str, list[dict]] = {} # ── AI toggle — controlled by the Spatial page models button ────────── # Dict of model_id → enabled bool. # Facial recognition is ON by default: it auto-starts on every camera # feed at launch and stays on until explicitly disabled. self.ai_models: dict[str, bool] = { "crowd": True, # YOLO crowd detection "facial": True, # InsightFace recognition — always-on by default "fight": False, "anomaly": False, # Used for Stampede Detection "medical": False, # Used for Fall Detection } # ── Model States ────────────────────────────────────────────────────── self.fall_state: dict[str, int] = {} self.prev_gray: dict[str, np.ndarray] = {} self.last_event_ts: dict[str, datetime] = {} # {cam_id_event_type: ts} self.last_total_count: int = 0 self.last_crowd_count: int = 0 self.last_face_count: int = 0 self.room_stats: dict[str, dict] = {} accel = detect_acceleration() if self.model is not None and accel["cuda_available"]: logger.info( "YOLO will use CUDA (device=%s).", accel.get("device_name") or "cuda:0", ) elif self.model is not None: logger.info( "YOLO using CPU (fallback=%s).", accel.get("fallback_reason") or "no GPU", ) def warmload_models(self) -> dict: """Run lightweight dummy inference to populate GPU memory and warm ONNX/torch.""" accel = detect_acceleration() result: dict = { "yolo": False, "insightface": False, "provider": accel.get("provider", "cpu"), "cuda_available": accel.get("cuda_available", False), "device_name": accel.get("device_name"), "fallback_reason": accel.get("fallback_reason"), } dummy_bgr = np.zeros((480, 640, 3), dtype=np.uint8) if self.model is not None: try: self.model.predict(dummy_bgr, verbose=False) result["yolo"] = True logger.info("Warmload: YOLO dummy inference OK (provider=%s).", result["provider"]) except Exception as exc: logger.warning("Warmload: YOLO dummy inference failed: %s", exc) result["yolo_error"] = str(exc) if self.face_engine.app is not None: try: # Step 1: Call prepare() exactly once if hasattr(self.face_engine, "_force_prepare"): self.face_engine._force_prepare() # Step 2: Force detection model with real-sized image dummy_insight = np.zeros((320, 320, 3), dtype=np.uint8) with self.face_engine.lock: faces = self.face_engine.app.get(dummy_insight) det_ok = True rec_ok = True # Force InsightFace's internal per-model ONNX sessions — prevent lazy init on first WS frame import numpy as _np for _mname, _mobj in self.face_engine.app.models.items(): _session = getattr(_mobj, 'session', None) if _session is not None: try: _inp_meta = _session.get_inputs()[0] _shape = [1 if (d is None or isinstance(d, str)) else d for d in _inp_meta.shape] _dummy = _np.zeros(_shape, dtype=_np.float32) _session.run(None, {_inp_meta.name: _dummy}) logger.info("Warmload: InsightFace model '%s' ONNX session force-run OK", _mname) except Exception as _exc: logger.warning("Warmload: InsightFace model '%s' ONNX force-run skipped: %s", _mname, _exc) else: logger.warning("Warmload: InsightFace model '%s' has no session attribute — " "may still lazy-init", _mname) result["insightface"] = True ctx = insightface_ctx_id() logger.info( "Warmload: InsightFace force-warmload OK (ctx_id=%s, faces on blank: %d, det_ok: %s, rec_ok: %s).", ctx, len(faces), det_ok, rec_ok ) except Exception as exc: logger.warning("Warmload: InsightFace dummy inference failed: %s", exc) result["insightface_error"] = str(exc) ok = result["yolo"] or result["insightface"] or (self.model is None and self.face_engine.app is None) result["success"] = bool(ok) if result["success"]: logger.info("Startup warmload success: %s", result) else: logger.warning("Startup warmload incomplete: %s", result) return result # ── AI toggle API ───────────────────────────────────────────────────────── def toggle_ai_model(self, model_id: str) -> bool: """Toggle a specific AI model on/off. Returns new state.""" with self.lock: current = self.ai_models.get(model_id, False) self.ai_models[model_id] = not current logger.info(f"AI model '{model_id}' → {'ON' if not current else 'OFF'}") return not current def set_ai_model(self, model_id: str, enabled: bool): with self.lock: self.ai_models[model_id] = enabled logger.info(f"AI model '{model_id}' set to {'ON' if enabled else 'OFF'}") def get_ai_status(self) -> dict[str, bool]: with self.lock: return dict(self.ai_models) def get_ai_capabilities(self) -> dict[str, dict]: """Per-model support metadata for UI and cloud/local deployment hints.""" yolo_ok = self.model is not None return { "facial": {"supported": True, "mode": "browser_frames"}, "crowd": {"supported": yolo_ok, "mode": "yolo" if yolo_ok else "unavailable"}, "medical": {"supported": yolo_ok, "mode": "heuristic_fall" if yolo_ok else "unavailable"}, "anomaly": {"supported": True, "mode": "optical_flow"}, "fight": {"supported": yolo_ok, "mode": "heuristic_proximity" if yolo_ok else "unavailable"}, "fire": { "supported": False, "mode": "not_implemented", "note": "Fire alerts use SOS/IOT integrations — not a CV model", }, } # ── Internal helpers ────────────────────────────────────────────────────── def _release_camera_internal(self, cam_id: str): if cam_id in self.active_cameras: self.active_cameras[cam_id].release() del self.active_cameras[cam_id] self.camera_indices.pop(cam_id, None) self.latest_raw_frames.pop(cam_id, None) logger.info(f"Released camera {cam_id}") @staticmethod def _open_with_timeout(index, backend, timeout_s: float = 10.0): """Open a cv2.VideoCapture in a daemon thread with a timeout. Returns the cap if successful, None on timeout or failure.""" result = [None] def _worker(): try: cap = cv2.VideoCapture(index, backend) result[0] = cap except Exception: result[0] = None t = threading.Thread(target=_worker, daemon=True) t.start() t.join(timeout=timeout_s) if t.is_alive(): logger.warning(f"cv2.VideoCapture({index}, ...) timed out after {timeout_s}s") return None return result[0] def _open_cap(self, index) -> cv2.VideoCapture | None: """Try backends in order: DSHOW (fastest on Windows) → MSMF → ANY. Called OUTSIDE the lock to avoid blocking the entire engine. Each cv2.VideoCapture attempt is wrapped in a 10-second timeout.""" if isinstance(index, str) and not index.isdigit(): backends = [("FFMPEG", cv2.CAP_FFMPEG, 15), ("ANY", cv2.CAP_ANY, 15)] else: index = int(index) backends = [ ("ANY", cv2.CAP_ANY, 10), ("MSMF", cv2.CAP_MSMF, 12), ("DSHOW", cv2.CAP_DSHOW, 8), ] for name, backend, timeout in backends: try: logger.info(f"Trying camera index {index} via {name} (timeout {timeout}s)...") cap = self._open_with_timeout(index, backend, timeout_s=timeout) if cap is None: continue if cap.isOpened(): # We avoid setting CAP_PROP properties here (width/height/fourcc) # as it frequently breaks virtual cameras (like OBS or Samsung S23). # Wait for virtual camera pipelines (especially MSMF) to fully buffer import time time.sleep(1.5) # Verify we can actually read a frame (retry up to 5 times for virtual cams) ret = False for _ in range(5): ret, _ = cap.read() if ret: break time.sleep(0.5) if ret: logger.info(f"Camera index {index} opened via {name} ✓") return cap else: logger.warning(f"Camera index {index} opened via {name} but read() failed after retries") cap.release() else: cap.release() except Exception as e: logger.warning(f"Backend {name} failed for index {index}: {e}") return None # ── Public camera API ───────────────────────────────────────────────────── def set_camera(self, cam_id: str, index) -> bool: if isinstance(index, str): if index == "browser": pass elif index.isdigit(): index = int(index) elif index.lower().startswith("rstp://"): index = "rtsp://" + index[7:] elif index.lower().startswith("rtsp://") and not index.startswith("rtsp://"): index = "rtsp://" + index[7:] # Check under lock if already open or already being opened with self.lock: if cam_id in self._pending_opens: logger.info(f"Camera {cam_id} is already being opened, skipping duplicate request") return False if cam_id in self.active_cameras and self.camera_indices.get(cam_id) == index: if self.active_cameras[cam_id].isOpened(): logger.info(f"Camera {cam_id} already open at index {index}") return True # Release any OTHER cam_id that claims this index for other_id, other_idx in list(self.camera_indices.items()): if other_id != cam_id and other_idx == index: logger.info(f"Releasing {other_id} (index {index}) → reassigning to {cam_id}") self._release_camera_internal(other_id) # Release current capture for this cam_id if cam_id in self.active_cameras: self._release_camera_internal(cam_id) if index == "browser": self.camera_indices[cam_id] = index self.latest_raw_frames[cam_id] = None self.browser_annotated_frames[cam_id] = (0, None) logger.info(f"Camera {cam_id} → index {index} (Browser Feed) OK") return True self._pending_opens.add(cam_id) # Open camera OUTSIDE the main lock, but INSIDE a strict open_lock. # Windows DirectShow crashes/deadlocks if cv2.VideoCapture is called concurrently. logger.info(f"Opening camera {cam_id} at index {index}...") try: with self.open_lock: cap = self._open_cap(index) finally: with self.lock: self._pending_opens.discard(cam_id) with self.lock: if cap: self.active_cameras[cam_id] = cap self.camera_indices[cam_id] = index self.latest_raw_frames[cam_id] = None logger.info(f"Camera {cam_id} → index {index} OK") return True logger.error(f"All backends failed for {cam_id} at index {index}") return False def process_browser_frame(self, cam_id: str, base64_str: str): if not base64_str: return try: import base64 encoded = base64_str.split(",")[1] if "," in base64_str else base64_str nparr = np.frombuffer(base64.b64decode(encoded), np.uint8) frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if frame is None: return with self.lock: self.latest_raw_frames[cam_id] = frame.copy() crowd_on = self.ai_models.get("crowd", True) facial_on = self.ai_models.get("facial", False) medical_on = self.ai_models.get("medical", False) anomaly_on = self.ai_models.get("anomaly", False) fight_on = self.ai_models.get("fight", False) annotated = frame.copy() count = 0 boxes = [] if crowd_on or medical_on or anomaly_on or fight_on: count, annotated, boxes = self._run_yolo(annotated) if medical_on: annotated, _ = self._run_medical_fall(annotated, cam_id, boxes) if anomaly_on: annotated, _ = self._run_anomaly_heuristic(annotated, cam_id, boxes) if fight_on: annotated, _ = self._run_fight_heuristic(annotated, cam_id, boxes) if facial_on: fe = self.face_engine try: if hasattr(fe, "match_all_faces"): faces = fe.match_all_faces(annotated) elif hasattr(fe, "recognize"): faces = fe.recognize(annotated) else: faces = [] except (AttributeError, TypeError) as e: logger.warning("[WS] face match failed: %s — returning empty result", e) faces = [] with self.lock: self.face_results[cam_id] = faces if isinstance(faces, list) else [] if isinstance(faces, list): annotated = self._annotate_faces(annotated, faces) else: with self.lock: self.face_results[cam_id] = [] with self.lock: self.browser_annotated_frames[cam_id] = (count, annotated) except Exception as e: logger.error(f"process_browser_frame error for {cam_id}: {e}") def release_camera(self, cam_id: str): with self.lock: self._release_camera_internal(cam_id) def set_camera_for_client(self, client_id: str, cam_id: str, index) -> bool: """Per-client camera registration — browser feeds do not steal from other clients.""" if isinstance(index, str): if index == "browser": pass elif index.isdigit(): index = int(index) elif index.lower().startswith("rstp://"): index = "rtsp://" + index[7:] elif index.lower().startswith("rtsp://") and not index.startswith("rtsp://"): index = "rtsp://" + index[7:] if index == "browser": with self.lock: self._client_cameras.setdefault(client_id, set()).add(cam_id) self._cam_refcounts[cam_id] = self._cam_refcounts.get(cam_id, 0) + 1 self.camera_indices[cam_id] = "browser" self.latest_raw_frames[cam_id] = None self.browser_annotated_frames[cam_id] = (0, None) logger.info("Camera %s → index browser (client %s) OK", cam_id, client_id[:8]) return True with self.lock: self._client_cameras.setdefault(client_id, set()).add(cam_id) return self.set_camera(cam_id, index) def release_camera_for_client(self, client_id: str, cam_id: str) -> None: with self.lock: client_cams = self._client_cameras.get(client_id) if not client_cams or cam_id not in client_cams: return client_cams.discard(cam_id) if not client_cams: self._client_cameras.pop(client_id, None) idx = self.camera_indices.get(cam_id) if idx == "browser": with self.lock: ref = self._cam_refcounts.get(cam_id, 1) - 1 if ref <= 0: self._cam_refcounts.pop(cam_id, None) self._release_camera_internal(cam_id) else: self._cam_refcounts[cam_id] = ref return with self.lock: other_holders = any( cam_id in cams for cid, cams in self._client_cameras.items() if cid != client_id ) if not other_holders: self.release_camera(cam_id) def release_client_cameras(self, client_id: str) -> None: for cam_id in list(self._client_cameras.get(client_id, set())): self.release_camera_for_client(client_id, cam_id) # ── Frame processing ────────────────────────────────────────────────────── def _run_yolo(self, frame: np.ndarray) -> tuple[int, np.ndarray, list]: """Run crowd detection YOLO. Only called when crowd or medical model is enabled.""" if frame is None or self.model is None: return 0, frame, [] results = self.model(frame, verbose=False, classes=[0]) count = 0 boxes_out = [] for r in results: for box in r.boxes: count += 1 x1, y1, x2, y2 = map(int, box.xyxy[0]) boxes_out.append((x1, y1, x2, y2)) cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2) cv2.putText(frame, f"People: {count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2) return count, frame, boxes_out def _run_medical_fall(self, frame: np.ndarray, cam_id: str, boxes: list) -> tuple[np.ndarray, bool]: fall_detected = False for (x1, y1, x2, y2) in boxes: w = x2 - x1 h = y2 - y1 if w > h * 1.5: # Person is horizontal fall_detected = True cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 3) cv2.putText(frame, "FALL DETECTED", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2) if cam_id not in self.fall_state: self.fall_state[cam_id] = 0 event_triggered = False if fall_detected: self.fall_state[cam_id] += 1 if self.fall_state[cam_id] == 20: # Stayed down for ~1.5 seconds event_key = f"{cam_id}_fall" now = datetime.now() last = self.last_event_ts.get(event_key, datetime.min) if (now - last).total_seconds() > 60: # 1 min cooldown logger.warning(f"MEDICAL EMERGENCY: Persistent fall detected on {cam_id}") self.last_event_ts[event_key] = now event_triggered = True else: self.fall_state[cam_id] = max(0, self.fall_state[cam_id] - 1) return frame, event_triggered def _run_stampede_flow(self, frame: np.ndarray, cam_id: str, orig_frame: np.ndarray) -> tuple[np.ndarray, bool]: gray = cv2.cvtColor(orig_frame, cv2.COLOR_BGR2GRAY) gray = cv2.resize(gray, (320, 240)) if cam_id not in self.prev_gray: self.prev_gray[cam_id] = gray return frame, False prev = self.prev_gray[cam_id] flow = cv2.calcOpticalFlowFarneback(prev, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0) self.prev_gray[cam_id] = gray mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1]) avg_mag = np.mean(mag) event_triggered = False if avg_mag > 2.8: # High average motion magnitude cv2.putText(frame, "STAMPEDE / CHAOS DETECTED", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 165, 255), 2) event_key = f"{cam_id}_stampede" now = datetime.now() last = self.last_event_ts.get(event_key, datetime.min) if (now - last).total_seconds() > 60: logger.warning(f"STAMPEDE DETECTED on {cam_id}") self.last_event_ts[event_key] = now event_triggered = True return frame, event_triggered def _run_fight_heuristic( self, frame: np.ndarray, cam_id: str, boxes: list ) -> tuple[np.ndarray, bool]: """Experimental fight stub: flag when multiple persons are extremely close.""" if len(boxes) < 2: return frame, False close_pairs = 0 for i in range(len(boxes)): for j in range(i + 1, len(boxes)): x1a, y1a, x2a, y2a = boxes[i] x1b, y1b, x2b, y2b = boxes[j] cax, cay = (x1a + x2a) / 2, (y1a + y2a) / 2 cbx, cby = (x1b + x2b) / 2, (y1b + y2b) / 2 dist = ((cax - cbx) ** 2 + (cay - cby) ** 2) ** 0.5 scale = max(1.0, ((x2a - x1a) + (x2b - x1b)) / 2) if dist < scale * 0.55: close_pairs += 1 cv2.rectangle(frame, (x1a, y1a), (x2a, y2a), (0, 0, 255), 3) cv2.rectangle(frame, (x1b, y1b), (x2b, y2b), (0, 0, 255), 3) if close_pairs < 1: return frame, False cv2.putText( frame, "FIGHT? (heuristic)", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2, ) event_key = f"{cam_id}_fight" now = datetime.now() last = self.last_event_ts.get(event_key, datetime.min) if (now - last).total_seconds() > 90: self.last_event_ts[event_key] = now logger.warning("FIGHT HEURISTIC triggered on %s", cam_id) return frame, True return frame, False def _annotate_faces(self, frame: np.ndarray, faces: list[dict]) -> np.ndarray: for f in faces: x1, y1, x2, y2 = f["bbox"] name_disp = f["name"] if name_disp.startswith("unknown_"): try: num = name_disp.split("_")[1] name_disp = f"Unknown #{num}" except IndexError: pass label = f'{name_disp} ({f["confidence"]:.2f})' is_unknown = f["name"].startswith("unknown") color = (0, 140, 255) if is_unknown else (0, 255, 0) cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2) cv2.putText(frame, label, (x1, max(y1 - 10, 15)), cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 1) return frame def get_active_frames(self) -> tuple[dict[str, tuple[int, np.ndarray | None]], list[dict]]: """ Read one frame from each active camera. Returns: (results_dict, list_of_new_alerts) """ results: dict[str, tuple[int, np.ndarray | None]] = {} new_alerts = [] with self.lock: browser_feeds = [cid for cid, idx in self.camera_indices.items() if idx == "browser"] if not self.active_cameras and not browser_feeds: return results, new_alerts cam_items = list(self.active_cameras.items()) crowd_on = self.ai_models.get("crowd", False) facial_on = self.ai_models.get("facial", False) medical_on = self.ai_models.get("medical", False) anomaly_on = self.ai_models.get("anomaly", False) fight_on = self.ai_models.get("fight", False) # Periodic status log if not hasattr(self, "_last_status_log"): self._last_status_log = 0 self._last_status_log += 1 if self._last_status_log % 50 == 0: logger.info(f"Vision Engine AI Status: Crowd={crowd_on}, Facial={facial_on}, Medical={medical_on}") for cam_id, cap in cam_items: try: ret, frame = cap.read() except Exception as e: logger.warning(f"cap.read() error for {cam_id}: {e}") ret, frame = False, None if not ret or frame is None: results[cam_id] = (0, None) continue # Store raw frame for missing-person search with self.lock: self.latest_raw_frames[cam_id] = frame.copy() annotated = frame.copy() count = 0 # ── Crowd, medical & fight detection (YOLO based) ───────────────── boxes = [] if crowd_on or medical_on or fight_on: count, annotated, boxes = self._run_yolo(annotated) if fight_on: annotated, fight_trig = self._run_fight_heuristic(annotated, cam_id, boxes) if fight_trig: new_alerts.append({ "type": "fight", "severity": "high", "location": cam_id, "message": ( f"POSSIBLE ALTERCATION (heuristic): close proximity of multiple " f"persons on {cam_id} — verify manually" ), }) if medical_on: annotated, fall_trig = self._run_medical_fall(annotated, cam_id, boxes) if fall_trig: new_alerts.append({ "type": "medical", "severity": "critical", "location": cam_id, "message": f"WARNING: MEDICAL EMERGENCY: Persistent fall detected on {cam_id}" }) # ── Stampede detection (Optical Flow based) ─────────────────────── if anomaly_on: annotated, stampede_trig = self._run_stampede_flow(annotated, cam_id, frame) if stampede_trig: new_alerts.append({ "type": "stampede", "severity": "critical", "location": cam_id, "message": f"WARNING: CROWD ANOMALY: Stampede/chaos detected on {cam_id}" }) # ── Face recognition ────────────────────────────────────────────── faces: list[dict] = [] if facial_on: fe = self.face_engine if hasattr(fe, "match_all_faces"): faces = fe.match_all_faces(frame) elif hasattr(fe, "recognize"): faces = fe.recognize(frame) else: faces = [] # Extract full frame as thumbnail for context for f in faces: # We send a small version of the FULL frame instead of just the face crop. # This allows the user to see exactly where the person was in the frame. small_frame = cv2.resize(frame, (640, int(640 * frame.shape[0] / frame.shape[1]))) _, buf = cv2.imencode(".jpg", small_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 65]) f["thumbnail"] = f"data:image/jpeg;base64,{base64.b64encode(buf).decode()}" annotated = self._annotate_faces(annotated, faces) # Push detections into gossip_bridge (no camera opened there) names = [f["name"] for f in faces] bboxes = [tuple(f["bbox"]) for f in faces] if names: gossip_bridge.on_detections(cam_id, names, bboxes, frame.shape[1]) self.face_results[cam_id] = faces results[cam_id] = (count, annotated) # Merge browser feeds with self.lock: for cid in [k for k, v in self.camera_indices.items() if v == "browser"]: if cid in self.browser_annotated_frames: results[cid] = self.browser_annotated_frames[cid] return results, new_alerts def get_face_results(self) -> dict[str, list[dict]]: return dict(self.face_results) def search_missing_person(self, query_frame: np.ndarray) -> dict: fe = self.face_engine if hasattr(fe, "search_missing_person") and not hasattr(fe, "match_all_faces"): with self.lock: live_frames = {k: v for k, v in self.latest_raw_frames.items() if v is not None} return fe.search_missing_person(query_frame, live_frames) if hasattr(fe, "ensure_db"): fe.ensure_db() db_match = dict(fe.match_frame(query_frame)) if db_match.get("found"): db_match.setdefault("search_mode", "database") return db_match try: import face_live_search live = face_live_search.search_query_in_live_feeds(query_frame, fe, self) if live.get("found"): return live except Exception as exc: logger.debug("live search fallback skipped: %s", exc) db_match.setdefault("search_mode", "database") return db_match # ── Legacy: base64 single-frame processing ──────────────────────────────── def process_frame(self, base64_string: str) -> tuple[int, str]: try: encoded = base64_string.split(",")[1] if "," in base64_string else base64_string nparr = np.frombuffer(base64.b64decode(encoded), np.uint8) frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) count, annotated, _ = self._run_yolo(frame.copy()) if self.model else (0, frame, []) if annotated is None: return 0, base64_string _, buf = cv2.imencode(".jpg", annotated, [int(cv2.IMWRITE_JPEG_QUALITY), 70]) return count, f"data:image/jpeg;base64,{base64.b64encode(buf).decode()}" except Exception as e: logger.error(f"process_frame error: {e}") return 0, base64_string