solution_challenge_backend / backend /vision_engine.py
github-actions
Deploy to Hugging Face
c794b6b
Raw
History Blame Contribute Delete
46.8 kB
"""
vision_engine.py — Single source of all camera frames and AI processing.
Key principles:
- ONE cv2.VideoCapture per camera index. No other module opens cameras.
- AI processing (YOLO crowd count + face recognition) is gated by `ai_enabled`.
- When AI runs, detections are pushed into gossip_bridge for interaction tracking.
- gossip_bridge never opens a camera itself.
"""
import cv2
import base64
import numpy as np
import logging
import threading
import os
import sys
import uuid
from datetime import datetime, timezone
from ultralytics import YOLO
# ── Add Face_Recognition/ to path so gossip_bridge can be imported
_FR_DIR = os.path.join(os.path.dirname(__file__), "Face_Recognition")
# Prepend (not append) so the local Face_Recognition modules take precedence
# over any same-named modules elsewhere on sys.path; skip if already present.
if _FR_DIR not in sys.path:
    sys.path.insert(0, _FR_DIR)
import gossip_bridge # noqa: E402
from vision_runtime import detect_acceleration, insightface_ctx_id
logger = logging.getLogger(__name__)
def _build_face_engine():
    """Return the face engine used by VisionEngine.

    Prefers ``face_matcher.FaceMatcher`` (its model pack is read from the
    ``FACE_MODEL_PACK`` env var, default ``buffalo_sc``). If importing or
    constructing it raises anything, falls back to the legacy
    :class:`FaceRecognitionEngine` defined in this module.
    """
    try:
        from face_matcher import FaceMatcher

        engine = FaceMatcher()
    except Exception as exc:
        logger.warning("FaceMatcher unavailable (%s) — using legacy FaceRecognitionEngine", exc)
        return FaceRecognitionEngine()
    # Log only after construction succeeded, so a failing FaceMatcher() does
    # not leave a misleading "using FaceMatcher" line in the logs.
    logger.info(
        "VisionEngine using FaceMatcher (model=%s)",
        os.getenv("FACE_MODEL_PACK", "buffalo_sc"),
    )
    return engine
# ─────────────────────────────────────────────────────────────────────────────
# Face Recognition Engine (lazy-loads InsightFace)
# ─────────────────────────────────────────────────────────────────────────────
class FaceRecognitionEngine:
def __init__(self):
self.app = None
self.db: dict[str, np.ndarray] = {}
self.lock = threading.Lock()
self._load_model()
self._load_db()
def _load_model(self):
try:
from insightface.app import FaceAnalysis # type: ignore
accel = detect_acceleration()
ctx_id = insightface_ctx_id()
provider = "cuda" if ctx_id >= 0 else "cpu"
fa = FaceAnalysis(name="buffalo_l", allowed_modules=["detection", "recognition"])
fa.prepare(ctx_id=ctx_id, det_size=(640, 640))
self.app = fa
if provider == "cuda":
logger.info(
"InsightFace model loaded (provider=cuda, device=%s).",
accel.get("device_name") or "cuda:0",
)
else:
reason = accel.get("fallback_reason") or "no GPU detected"
logger.info("InsightFace model loaded (provider=cpu, fallback=%s).", reason)
except Exception as e:
logger.warning(f"InsightFace not available ({e}). Face recognition disabled.")
def _load_db(self):
base = _FR_DIR
self.db = {}
for folder in ["faces_db", "temp_faces_db"]:
path = os.path.join(base, folder)
if not os.path.exists(path):
logger.warning(f"Face DB folder not found: {path}")
continue
files = [f for f in os.listdir(path) if f.endswith(".npy")]
for f in files:
name = f.replace(".npy", "")
try:
emb = np.load(os.path.join(path, f))
self.db[name] = emb
logger.info(f"Loaded face: {name} from {folder}")
except Exception as e:
logger.error(f"Failed to load face {f}: {e}")
if not self.db:
logger.warning("Face database is EMPTY. No faces will be recognized.")
else:
logger.info(f"Face DB loaded successfully: {list(self.db.keys())}")
@staticmethod
def _cosine(a: np.ndarray, b: np.ndarray) -> float:
na, nb = np.linalg.norm(a), np.linalg.norm(b)
if na == 0 or nb == 0:
return 0.0
return float(np.dot(a, b) / (na * nb))
def _allocate_unknown_id(self) -> int:
existing_ids = []
for k in self.db.keys():
if k.startswith("unknown_"):
try:
existing_ids.append(int(k.split("_")[1]))
except (IndexError, ValueError):
pass
base = _FR_DIR
for folder in ["faces_db", "temp_faces_db", "face_database", "temp_face_database"]:
path = os.path.join(base, folder)
if os.path.exists(path):
for item in os.listdir(path):
name = item.replace(".npy", "")
if name.startswith("unknown_"):
try:
existing_ids.append(int(name.split("_")[1]))
except (IndexError, ValueError):
pass
return max(existing_ids) + 1 if existing_ids else 1
def recognize(self, frame: np.ndarray) -> list[dict]:
"""Return recognized faces with bbox, name, confidence."""
if self.app is None:
return []
with self.lock:
try:
faces = self.app.get(frame)
except Exception as e:
logger.error(f"InsightFace inference error: {e}")
return []
results = []
for face in faces:
x1, y1, x2, y2 = face.bbox.astype(int)
emb = face.embedding
best_name = "Unknown"
best_score = 0.0
for name, db_emb in self.db.items():
# db_emb can be (512,) or (N, 512)
if db_emb.ndim == 1:
score = self._cosine(emb, db_emb)
else:
score = max((self._cosine(emb, v) for v in db_emb), default=0.0)
if score > best_score:
best_score = score
best_name = name
# Conservative threshold to reduce false-positive identity matches.
threshold = 0.35 if best_name.startswith("unknown") else 0.30
if best_score < threshold:
# Assign a new persistent unknown identity
next_id = self._allocate_unknown_id()
new_name = f"unknown_{next_id}"
# Update in-memory db
self.db[new_name] = np.array([emb])
# Save embedding on disk
emb_dir = os.path.join(_FR_DIR, "temp_faces_db")
os.makedirs(emb_dir, exist_ok=True)
np.save(os.path.join(emb_dir, f"{new_name}.npy"), np.array([emb]))
# Save crop image to face_database
img_dir = os.path.join(_FR_DIR, "face_database", new_name)
os.makedirs(img_dir, exist_ok=True)
h, w, _ = frame.shape
x1_c, y1_c = max(0, x1), max(0, y1)
x2_c, y2_c = min(w, x2), min(h, y2)
face_img = frame[y1_c:y2_c, x1_c:x2_c]
if face_img.size > 0:
cv2.imwrite(os.path.join(img_dir, "0.jpg"), face_img)
best_name = new_name
best_score = 1.0 # Initial creation match confidence
results.append({
"name": best_name,
"confidence": round(best_score, 3),
"bbox": [int(x1), int(y1), int(x2), int(y2)],
})
return results
def match_all_faces(self, frame: np.ndarray, threshold: float | None = None) -> list[dict]:
"""Detect and identify every face in the frame. Parity with FaceMatcher."""
return self.recognize(frame)
def register_face_from_frame(self, name: str, frame: np.ndarray) -> bool:
"""
Register a new face:
1. Saves image to face_database/<name>/
2. Generates embeddings (including occluded version)
3. Saves .npy to faces_db/
4. Updates in-memory DB
"""
if self.app is None:
return False
try:
import register_face
# Detect face to get initial embedding (to ensure it works and for robustness)
faces = self.app.get(frame)
if not faces:
logger.warning(f"Registration failed: No face detected in frame for '{name}'")
return False
primary_emb = faces[0].embedding
# Define paths
db_root = os.path.join(_FR_DIR, "face_database")
emb_root = os.path.join(_FR_DIR, "faces_db")
os.makedirs(db_root, exist_ok=True)
os.makedirs(emb_root, exist_ok=True)
# Save temporary image for the register_face module to process
temp_path = os.path.join(_FR_DIR, f"temp_reg_{uuid.uuid4().hex}.jpg")
cv2.imwrite(temp_path, frame)
try:
# Call the specialized register_face logic (handles folder creation, image copying, occlusion)
# We pass self.app to avoid redundant model loading
new_embs = register_face.register_face(
name, temp_path, db_root=db_root, emb_root=emb_root,
known_embedding=primary_emb, app=self.app
)
# Update in-memory DB so it's available immediately without restart
with self.lock:
self.db[name] = new_embs
logger.info(f"Registered face '{name}': Image saved to database and embeddings generated.")
return True
finally:
if os.path.exists(temp_path):
os.remove(temp_path)
except Exception as e:
logger.error(f"Error registering face: {e}")
return False
def search_missing_person(self, query_frame: np.ndarray, cam_frames: dict) -> dict:
if self.app is None:
return {"found": False, "reason": "Face recognition unavailable"}
try:
query_faces = self.app.get(query_frame)
except Exception as e:
return {"found": False, "reason": str(e)}
if not query_faces:
return {"found": False, "reason": "No face detected in query image"}
query_emb = max(query_faces, key=lambda f: (f.bbox[2] - f.bbox[0]) * (f.bbox[3] - f.bbox[1])).embedding
best_name = None
best_score = 0.0
for name, db_emb in self.db.items():
if db_emb.ndim == 1:
score = self._cosine(query_emb, db_emb)
else:
score = max((self._cosine(query_emb, v) for v in db_emb), default=0.0)
if score > best_score:
best_score = score
best_name = name
threshold = 0.35 if (best_name and best_name.startswith("unknown")) else 0.30
if best_name and best_score >= threshold:
return {
"found": True,
"name": best_name.replace("_", " "),
"confidence": round(best_score, 3),
"cam_id": "database",
"location": "Enrolled database",
"search_mode": "database",
"timestamp": datetime.now(timezone.utc).isoformat(),
"reason": "Matched enrolled face database",
}
best_match = {"found": False, "cam_id": None, "confidence": round(best_score, 3), "timestamp": None}
for cam_id, frame in cam_frames.items():
if frame is None:
continue
try:
live_faces = self.app.get(frame)
except Exception:
continue
for face in live_faces:
score = self._cosine(query_emb, face.embedding)
if score > best_match["confidence"]:
matched = score >= threshold
best_match = {
"found": matched,
"name": best_name.replace("_", " ") if best_name and matched else None,
"cam_id": cam_id,
"confidence": round(score, 3),
"timestamp": datetime.now(timezone.utc).isoformat(),
"location": cam_id,
"search_mode": "live" if matched else None,
}
if not best_match.get("found"):
best_match["reason"] = "Face detected but no match in enrolled database"
best_match["best_score"] = round(best_score, 3)
best_match["threshold"] = threshold
best_match["enrolled_count"] = len([k for k in self.db if not k.startswith("unknown_")])
best_match.setdefault("search_mode", "none")
elif best_match.get("found"):
best_match.setdefault("search_mode", "live")
return best_match
# ─────────────────────────────────────────────────────────────────────────────
# Vision Engine — single owner of all VideoCapture objects
# ─────────────────────────────────────────────────────────────────────────────
class VisionEngine:
def __init__(self):
model_path = os.getenv("YOLO_MODEL_PATH", "yolov5nu.pt")
logger.info("Loading YOLO model (%s)...", model_path)
try:
self.model = YOLO(model_path)
logger.info("YOLO model loaded.")
except Exception as e:
logger.error(f"Failed to load YOLO: {e}")
self.model = None
self.active_cameras: dict[str, cv2.VideoCapture] = {}
self.camera_indices: dict[str, int | str] = {}
self._client_cameras: dict[str, set[str]] = {}
self._cam_refcounts: dict[str, int] = {}
self.latest_raw_frames: dict[str, np.ndarray | None] = {}
self.browser_annotated_frames: dict[str, tuple[int, np.ndarray | None]] = {}
self.lock = threading.Lock()
self.open_lock = threading.Lock() # strictly sequential cv2.VideoCapture
self._pending_opens: set[str] = set() # cam_ids currently being opened
self.face_engine = _build_face_engine()
self.face_results: dict[str, list[dict]] = {}
# ── AI toggle β€” controlled by the Spatial page models button ──────────
# Dict of model_id β†’ enabled bool.
# Facial recognition is ON by default: it auto-starts on every camera
# feed at launch and stays on until explicitly disabled.
self.ai_models: dict[str, bool] = {
"crowd": True, # YOLO crowd detection
"facial": True, # InsightFace recognition β€” always-on by default
"fight": False,
"anomaly": False, # Used for Stampede Detection
"medical": False, # Used for Fall Detection
}
# ── Model States ──────────────────────────────────────────────────────
self.fall_state: dict[str, int] = {}
self.prev_gray: dict[str, np.ndarray] = {}
self.last_event_ts: dict[str, datetime] = {} # {cam_id_event_type: ts}
self.last_total_count: int = 0
self.last_crowd_count: int = 0
self.last_face_count: int = 0
self.room_stats: dict[str, dict] = {}
accel = detect_acceleration()
if self.model is not None and accel["cuda_available"]:
logger.info(
"YOLO will use CUDA (device=%s).",
accel.get("device_name") or "cuda:0",
)
elif self.model is not None:
logger.info(
"YOLO using CPU (fallback=%s).",
accel.get("fallback_reason") or "no GPU",
)
def warmload_models(self) -> dict:
"""Run lightweight dummy inference to populate GPU memory and warm ONNX/torch."""
accel = detect_acceleration()
result: dict = {
"yolo": False,
"insightface": False,
"provider": accel.get("provider", "cpu"),
"cuda_available": accel.get("cuda_available", False),
"device_name": accel.get("device_name"),
"fallback_reason": accel.get("fallback_reason"),
}
dummy_bgr = np.zeros((480, 640, 3), dtype=np.uint8)
if self.model is not None:
try:
self.model.predict(dummy_bgr, verbose=False)
result["yolo"] = True
logger.info("Warmload: YOLO dummy inference OK (provider=%s).", result["provider"])
except Exception as exc:
logger.warning("Warmload: YOLO dummy inference failed: %s", exc)
result["yolo_error"] = str(exc)
if self.face_engine.app is not None:
try:
# Step 1: Call prepare() exactly once
if hasattr(self.face_engine, "_force_prepare"):
self.face_engine._force_prepare()
# Step 2: Force detection model with real-sized image
dummy_insight = np.zeros((320, 320, 3), dtype=np.uint8)
with self.face_engine.lock:
faces = self.face_engine.app.get(dummy_insight)
det_ok = True
rec_ok = True
# Force InsightFace's internal per-model ONNX sessions β€” prevent lazy init on first WS frame
import numpy as _np
for _mname, _mobj in self.face_engine.app.models.items():
_session = getattr(_mobj, 'session', None)
if _session is not None:
try:
_inp_meta = _session.get_inputs()[0]
_shape = [1 if (d is None or isinstance(d, str)) else d
for d in _inp_meta.shape]
_dummy = _np.zeros(_shape, dtype=_np.float32)
_session.run(None, {_inp_meta.name: _dummy})
logger.info("Warmload: InsightFace model '%s' ONNX session force-run OK", _mname)
except Exception as _exc:
logger.warning("Warmload: InsightFace model '%s' ONNX force-run skipped: %s",
_mname, _exc)
else:
logger.warning("Warmload: InsightFace model '%s' has no session attribute β€” "
"may still lazy-init", _mname)
result["insightface"] = True
ctx = insightface_ctx_id()
logger.info(
"Warmload: InsightFace force-warmload OK (ctx_id=%s, faces on blank: %d, det_ok: %s, rec_ok: %s).",
ctx, len(faces), det_ok, rec_ok
)
except Exception as exc:
logger.warning("Warmload: InsightFace dummy inference failed: %s", exc)
result["insightface_error"] = str(exc)
ok = result["yolo"] or result["insightface"] or (self.model is None and self.face_engine.app is None)
result["success"] = bool(ok)
if result["success"]:
logger.info("Startup warmload success: %s", result)
else:
logger.warning("Startup warmload incomplete: %s", result)
return result
# ── AI toggle API ─────────────────────────────────────────────────────────
def toggle_ai_model(self, model_id: str) -> bool:
"""Toggle a specific AI model on/off. Returns new state."""
with self.lock:
current = self.ai_models.get(model_id, False)
self.ai_models[model_id] = not current
logger.info(f"AI model '{model_id}' β†’ {'ON' if not current else 'OFF'}")
return not current
def set_ai_model(self, model_id: str, enabled: bool):
with self.lock:
self.ai_models[model_id] = enabled
logger.info(f"AI model '{model_id}' set to {'ON' if enabled else 'OFF'}")
def get_ai_status(self) -> dict[str, bool]:
with self.lock:
return dict(self.ai_models)
def get_ai_capabilities(self) -> dict[str, dict]:
"""Per-model support metadata for UI and cloud/local deployment hints."""
yolo_ok = self.model is not None
return {
"facial": {"supported": True, "mode": "browser_frames"},
"crowd": {"supported": yolo_ok, "mode": "yolo" if yolo_ok else "unavailable"},
"medical": {"supported": yolo_ok, "mode": "heuristic_fall" if yolo_ok else "unavailable"},
"anomaly": {"supported": True, "mode": "optical_flow"},
"fight": {"supported": yolo_ok, "mode": "heuristic_proximity" if yolo_ok else "unavailable"},
"fire": {
"supported": False,
"mode": "not_implemented",
"note": "Fire alerts use SOS/IOT integrations β€” not a CV model",
},
}
# ── Internal helpers ──────────────────────────────────────────────────────
def _release_camera_internal(self, cam_id: str):
if cam_id in self.active_cameras:
self.active_cameras[cam_id].release()
del self.active_cameras[cam_id]
self.camera_indices.pop(cam_id, None)
self.latest_raw_frames.pop(cam_id, None)
logger.info(f"Released camera {cam_id}")
@staticmethod
def _open_with_timeout(index, backend, timeout_s: float = 10.0):
"""Open a cv2.VideoCapture in a daemon thread with a timeout.
Returns the cap if successful, None on timeout or failure."""
result = [None]
def _worker():
try:
cap = cv2.VideoCapture(index, backend)
result[0] = cap
except Exception:
result[0] = None
t = threading.Thread(target=_worker, daemon=True)
t.start()
t.join(timeout=timeout_s)
if t.is_alive():
logger.warning(f"cv2.VideoCapture({index}, ...) timed out after {timeout_s}s")
return None
return result[0]
def _open_cap(self, index) -> cv2.VideoCapture | None:
"""Try backends in order: DSHOW (fastest on Windows) β†’ MSMF β†’ ANY.
Called OUTSIDE the lock to avoid blocking the entire engine.
Each cv2.VideoCapture attempt is wrapped in a 10-second timeout."""
if isinstance(index, str) and not index.isdigit():
backends = [("FFMPEG", cv2.CAP_FFMPEG, 15), ("ANY", cv2.CAP_ANY, 15)]
else:
index = int(index)
backends = [
("ANY", cv2.CAP_ANY, 10),
("MSMF", cv2.CAP_MSMF, 12),
("DSHOW", cv2.CAP_DSHOW, 8),
]
for name, backend, timeout in backends:
try:
logger.info(f"Trying camera index {index} via {name} (timeout {timeout}s)...")
cap = self._open_with_timeout(index, backend, timeout_s=timeout)
if cap is None:
continue
if cap.isOpened():
# We avoid setting CAP_PROP properties here (width/height/fourcc)
# as it frequently breaks virtual cameras (like OBS or Samsung S23).
# Wait for virtual camera pipelines (especially MSMF) to fully buffer
import time
time.sleep(1.5)
# Verify we can actually read a frame (retry up to 5 times for virtual cams)
ret = False
for _ in range(5):
ret, _ = cap.read()
if ret:
break
time.sleep(0.5)
if ret:
logger.info(f"Camera index {index} opened via {name} βœ“")
return cap
else:
logger.warning(f"Camera index {index} opened via {name} but read() failed after retries")
cap.release()
else:
cap.release()
except Exception as e:
logger.warning(f"Backend {name} failed for index {index}: {e}")
return None
# ── Public camera API ─────────────────────────────────────────────────────
def set_camera(self, cam_id: str, index) -> bool:
if isinstance(index, str):
if index == "browser":
pass
elif index.isdigit():
index = int(index)
elif index.lower().startswith("rstp://"):
index = "rtsp://" + index[7:]
elif index.lower().startswith("rtsp://") and not index.startswith("rtsp://"):
index = "rtsp://" + index[7:]
# Check under lock if already open or already being opened
with self.lock:
if cam_id in self._pending_opens:
logger.info(f"Camera {cam_id} is already being opened, skipping duplicate request")
return False
if cam_id in self.active_cameras and self.camera_indices.get(cam_id) == index:
if self.active_cameras[cam_id].isOpened():
logger.info(f"Camera {cam_id} already open at index {index}")
return True
# Release any OTHER cam_id that claims this index
for other_id, other_idx in list(self.camera_indices.items()):
if other_id != cam_id and other_idx == index:
logger.info(f"Releasing {other_id} (index {index}) β†’ reassigning to {cam_id}")
self._release_camera_internal(other_id)
# Release current capture for this cam_id
if cam_id in self.active_cameras:
self._release_camera_internal(cam_id)
if index == "browser":
self.camera_indices[cam_id] = index
self.latest_raw_frames[cam_id] = None
self.browser_annotated_frames[cam_id] = (0, None)
logger.info(f"Camera {cam_id} β†’ index {index} (Browser Feed) OK")
return True
self._pending_opens.add(cam_id)
# Open camera OUTSIDE the main lock, but INSIDE a strict open_lock.
# Windows DirectShow crashes/deadlocks if cv2.VideoCapture is called concurrently.
logger.info(f"Opening camera {cam_id} at index {index}...")
try:
with self.open_lock:
cap = self._open_cap(index)
finally:
with self.lock:
self._pending_opens.discard(cam_id)
with self.lock:
if cap:
self.active_cameras[cam_id] = cap
self.camera_indices[cam_id] = index
self.latest_raw_frames[cam_id] = None
logger.info(f"Camera {cam_id} β†’ index {index} OK")
return True
logger.error(f"All backends failed for {cam_id} at index {index}")
return False
def process_browser_frame(self, cam_id: str, base64_str: str):
if not base64_str:
return
try:
import base64
encoded = base64_str.split(",")[1] if "," in base64_str else base64_str
nparr = np.frombuffer(base64.b64decode(encoded), np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if frame is None:
return
with self.lock:
self.latest_raw_frames[cam_id] = frame.copy()
crowd_on = self.ai_models.get("crowd", True)
facial_on = self.ai_models.get("facial", False)
medical_on = self.ai_models.get("medical", False)
anomaly_on = self.ai_models.get("anomaly", False)
fight_on = self.ai_models.get("fight", False)
annotated = frame.copy()
count = 0
boxes = []
if crowd_on or medical_on or anomaly_on or fight_on:
count, annotated, boxes = self._run_yolo(annotated)
if medical_on:
annotated, _ = self._run_medical_fall(annotated, cam_id, boxes)
if anomaly_on:
annotated, _ = self._run_anomaly_heuristic(annotated, cam_id, boxes)
if fight_on:
annotated, _ = self._run_fight_heuristic(annotated, cam_id, boxes)
if facial_on:
fe = self.face_engine
try:
if hasattr(fe, "match_all_faces"):
faces = fe.match_all_faces(annotated)
elif hasattr(fe, "recognize"):
faces = fe.recognize(annotated)
else:
faces = []
except (AttributeError, TypeError) as e:
logger.warning("[WS] face match failed: %s β€” returning empty result", e)
faces = []
with self.lock:
self.face_results[cam_id] = faces if isinstance(faces, list) else []
if isinstance(faces, list):
annotated = self._annotate_faces(annotated, faces)
else:
with self.lock:
self.face_results[cam_id] = []
with self.lock:
self.browser_annotated_frames[cam_id] = (count, annotated)
except Exception as e:
logger.error(f"process_browser_frame error for {cam_id}: {e}")
def release_camera(self, cam_id: str):
with self.lock:
self._release_camera_internal(cam_id)
def set_camera_for_client(self, client_id: str, cam_id: str, index) -> bool:
"""Per-client camera registration β€” browser feeds do not steal from other clients."""
if isinstance(index, str):
if index == "browser":
pass
elif index.isdigit():
index = int(index)
elif index.lower().startswith("rstp://"):
index = "rtsp://" + index[7:]
elif index.lower().startswith("rtsp://") and not index.startswith("rtsp://"):
index = "rtsp://" + index[7:]
if index == "browser":
with self.lock:
self._client_cameras.setdefault(client_id, set()).add(cam_id)
self._cam_refcounts[cam_id] = self._cam_refcounts.get(cam_id, 0) + 1
self.camera_indices[cam_id] = "browser"
self.latest_raw_frames[cam_id] = None
self.browser_annotated_frames[cam_id] = (0, None)
logger.info("Camera %s β†’ index browser (client %s) OK", cam_id, client_id[:8])
return True
with self.lock:
self._client_cameras.setdefault(client_id, set()).add(cam_id)
return self.set_camera(cam_id, index)
def release_camera_for_client(self, client_id: str, cam_id: str) -> None:
with self.lock:
client_cams = self._client_cameras.get(client_id)
if not client_cams or cam_id not in client_cams:
return
client_cams.discard(cam_id)
if not client_cams:
self._client_cameras.pop(client_id, None)
idx = self.camera_indices.get(cam_id)
if idx == "browser":
with self.lock:
ref = self._cam_refcounts.get(cam_id, 1) - 1
if ref <= 0:
self._cam_refcounts.pop(cam_id, None)
self._release_camera_internal(cam_id)
else:
self._cam_refcounts[cam_id] = ref
return
with self.lock:
other_holders = any(
cam_id in cams for cid, cams in self._client_cameras.items() if cid != client_id
)
if not other_holders:
self.release_camera(cam_id)
def release_client_cameras(self, client_id: str) -> None:
for cam_id in list(self._client_cameras.get(client_id, set())):
self.release_camera_for_client(client_id, cam_id)
# ── Frame processing ──────────────────────────────────────────────────────
def _run_yolo(self, frame: np.ndarray) -> tuple[int, np.ndarray, list]:
"""Run crowd detection YOLO. Only called when crowd or medical model is enabled."""
if frame is None or self.model is None:
return 0, frame, []
results = self.model(frame, verbose=False, classes=[0])
count = 0
boxes_out = []
for r in results:
for box in r.boxes:
count += 1
x1, y1, x2, y2 = map(int, box.xyxy[0])
boxes_out.append((x1, y1, x2, y2))
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(frame, f"People: {count}", (10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2)
return count, frame, boxes_out
def _run_medical_fall(self, frame: np.ndarray, cam_id: str, boxes: list) -> tuple[np.ndarray, bool]:
fall_detected = False
for (x1, y1, x2, y2) in boxes:
w = x2 - x1
h = y2 - y1
if w > h * 1.5: # Person is horizontal
fall_detected = True
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 0, 255), 3)
cv2.putText(frame, "FALL DETECTED", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2)
if cam_id not in self.fall_state:
self.fall_state[cam_id] = 0
event_triggered = False
if fall_detected:
self.fall_state[cam_id] += 1
if self.fall_state[cam_id] == 20: # Stayed down for ~1.5 seconds
event_key = f"{cam_id}_fall"
now = datetime.now()
last = self.last_event_ts.get(event_key, datetime.min)
if (now - last).total_seconds() > 60: # 1 min cooldown
logger.warning(f"MEDICAL EMERGENCY: Persistent fall detected on {cam_id}")
self.last_event_ts[event_key] = now
event_triggered = True
else:
self.fall_state[cam_id] = max(0, self.fall_state[cam_id] - 1)
return frame, event_triggered
def _run_stampede_flow(self, frame: np.ndarray, cam_id: str, orig_frame: np.ndarray) -> tuple[np.ndarray, bool]:
gray = cv2.cvtColor(orig_frame, cv2.COLOR_BGR2GRAY)
gray = cv2.resize(gray, (320, 240))
if cam_id not in self.prev_gray:
self.prev_gray[cam_id] = gray
return frame, False
prev = self.prev_gray[cam_id]
flow = cv2.calcOpticalFlowFarneback(prev, gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
self.prev_gray[cam_id] = gray
mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
avg_mag = np.mean(mag)
event_triggered = False
if avg_mag > 2.8: # High average motion magnitude
cv2.putText(frame, "STAMPEDE / CHAOS DETECTED", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 165, 255), 2)
event_key = f"{cam_id}_stampede"
now = datetime.now()
last = self.last_event_ts.get(event_key, datetime.min)
if (now - last).total_seconds() > 60:
logger.warning(f"STAMPEDE DETECTED on {cam_id}")
self.last_event_ts[event_key] = now
event_triggered = True
return frame, event_triggered
def _run_fight_heuristic(
self, frame: np.ndarray, cam_id: str, boxes: list
) -> tuple[np.ndarray, bool]:
"""Experimental fight stub: flag when multiple persons are extremely close."""
if len(boxes) < 2:
return frame, False
close_pairs = 0
for i in range(len(boxes)):
for j in range(i + 1, len(boxes)):
x1a, y1a, x2a, y2a = boxes[i]
x1b, y1b, x2b, y2b = boxes[j]
cax, cay = (x1a + x2a) / 2, (y1a + y2a) / 2
cbx, cby = (x1b + x2b) / 2, (y1b + y2b) / 2
dist = ((cax - cbx) ** 2 + (cay - cby) ** 2) ** 0.5
scale = max(1.0, ((x2a - x1a) + (x2b - x1b)) / 2)
if dist < scale * 0.55:
close_pairs += 1
cv2.rectangle(frame, (x1a, y1a), (x2a, y2a), (0, 0, 255), 3)
cv2.rectangle(frame, (x1b, y1b), (x2b, y2b), (0, 0, 255), 3)
if close_pairs < 1:
return frame, False
cv2.putText(
frame, "FIGHT? (heuristic)", (10, 90),
cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2,
)
event_key = f"{cam_id}_fight"
now = datetime.now()
last = self.last_event_ts.get(event_key, datetime.min)
if (now - last).total_seconds() > 90:
self.last_event_ts[event_key] = now
logger.warning("FIGHT HEURISTIC triggered on %s", cam_id)
return frame, True
return frame, False
def _annotate_faces(self, frame: np.ndarray, faces: list[dict]) -> np.ndarray:
for f in faces:
x1, y1, x2, y2 = f["bbox"]
name_disp = f["name"]
if name_disp.startswith("unknown_"):
try:
num = name_disp.split("_")[1]
name_disp = f"Unknown #{num}"
except IndexError:
pass
label = f'{name_disp} ({f["confidence"]:.2f})'
is_unknown = f["name"].startswith("unknown")
color = (0, 140, 255) if is_unknown else (0, 255, 0)
cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
cv2.putText(frame, label, (x1, max(y1 - 10, 15)),
cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 1)
return frame
def get_active_frames(self) -> tuple[dict[str, tuple[int, np.ndarray | None]], list[dict]]:
"""
Read one frame from each active camera.
Returns:
(results_dict, list_of_new_alerts)
"""
results: dict[str, tuple[int, np.ndarray | None]] = {}
new_alerts = []
with self.lock:
browser_feeds = [cid for cid, idx in self.camera_indices.items() if idx == "browser"]
if not self.active_cameras and not browser_feeds:
return results, new_alerts
cam_items = list(self.active_cameras.items())
crowd_on = self.ai_models.get("crowd", False)
facial_on = self.ai_models.get("facial", False)
medical_on = self.ai_models.get("medical", False)
anomaly_on = self.ai_models.get("anomaly", False)
fight_on = self.ai_models.get("fight", False)
# Periodic status log
if not hasattr(self, "_last_status_log"): self._last_status_log = 0
self._last_status_log += 1
if self._last_status_log % 50 == 0:
logger.info(f"Vision Engine AI Status: Crowd={crowd_on}, Facial={facial_on}, Medical={medical_on}")
for cam_id, cap in cam_items:
try:
ret, frame = cap.read()
except Exception as e:
logger.warning(f"cap.read() error for {cam_id}: {e}")
ret, frame = False, None
if not ret or frame is None:
results[cam_id] = (0, None)
continue
# Store raw frame for missing-person search
with self.lock:
self.latest_raw_frames[cam_id] = frame.copy()
annotated = frame.copy()
count = 0
# ── Crowd, medical & fight detection (YOLO based) ─────────────────
boxes = []
if crowd_on or medical_on or fight_on:
count, annotated, boxes = self._run_yolo(annotated)
if fight_on:
annotated, fight_trig = self._run_fight_heuristic(annotated, cam_id, boxes)
if fight_trig:
new_alerts.append({
"type": "fight",
"severity": "high",
"location": cam_id,
"message": (
f"POSSIBLE ALTERCATION (heuristic): close proximity of multiple "
f"persons on {cam_id} β€” verify manually"
),
})
if medical_on:
annotated, fall_trig = self._run_medical_fall(annotated, cam_id, boxes)
if fall_trig:
new_alerts.append({
"type": "medical",
"severity": "critical",
"location": cam_id,
"message": f"WARNING: MEDICAL EMERGENCY: Persistent fall detected on {cam_id}"
})
# ── Stampede detection (Optical Flow based) ───────────────────────
if anomaly_on:
annotated, stampede_trig = self._run_stampede_flow(annotated, cam_id, frame)
if stampede_trig:
new_alerts.append({
"type": "stampede",
"severity": "critical",
"location": cam_id,
"message": f"WARNING: CROWD ANOMALY: Stampede/chaos detected on {cam_id}"
})
# ── Face recognition ──────────────────────────────────────────────
faces: list[dict] = []
if facial_on:
fe = self.face_engine
if hasattr(fe, "match_all_faces"):
faces = fe.match_all_faces(frame)
elif hasattr(fe, "recognize"):
faces = fe.recognize(frame)
else:
faces = []
# Extract full frame as thumbnail for context
for f in faces:
# We send a small version of the FULL frame instead of just the face crop.
# This allows the user to see exactly where the person was in the frame.
small_frame = cv2.resize(frame, (640, int(640 * frame.shape[0] / frame.shape[1])))
_, buf = cv2.imencode(".jpg", small_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 65])
f["thumbnail"] = f"data:image/jpeg;base64,{base64.b64encode(buf).decode()}"
annotated = self._annotate_faces(annotated, faces)
# Push detections into gossip_bridge (no camera opened there)
names = [f["name"] for f in faces]
bboxes = [tuple(f["bbox"]) for f in faces]
if names:
gossip_bridge.on_detections(cam_id, names, bboxes, frame.shape[1])
self.face_results[cam_id] = faces
results[cam_id] = (count, annotated)
# Merge browser feeds
with self.lock:
for cid in [k for k, v in self.camera_indices.items() if v == "browser"]:
if cid in self.browser_annotated_frames:
results[cid] = self.browser_annotated_frames[cid]
return results, new_alerts
def get_face_results(self) -> dict[str, list[dict]]:
return dict(self.face_results)
def search_missing_person(self, query_frame: np.ndarray) -> dict:
fe = self.face_engine
if hasattr(fe, "search_missing_person") and not hasattr(fe, "match_all_faces"):
with self.lock:
live_frames = {k: v for k, v in self.latest_raw_frames.items() if v is not None}
return fe.search_missing_person(query_frame, live_frames)
if hasattr(fe, "ensure_db"):
fe.ensure_db()
db_match = dict(fe.match_frame(query_frame))
if db_match.get("found"):
db_match.setdefault("search_mode", "database")
return db_match
try:
import face_live_search
live = face_live_search.search_query_in_live_feeds(query_frame, fe, self)
if live.get("found"):
return live
except Exception as exc:
logger.debug("live search fallback skipped: %s", exc)
db_match.setdefault("search_mode", "database")
return db_match
# ── Legacy: base64 single-frame processing ────────────────────────────────
def process_frame(self, base64_string: str) -> tuple[int, str]:
try:
encoded = base64_string.split(",")[1] if "," in base64_string else base64_string
nparr = np.frombuffer(base64.b64decode(encoded), np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
count, annotated, _ = self._run_yolo(frame.copy()) if self.model else (0, frame, [])
if annotated is None:
return 0, base64_string
_, buf = cv2.imencode(".jpg", annotated, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
return count, f"data:image/jpeg;base64,{base64.b64encode(buf).decode()}"
except Exception as e:
logger.error(f"process_frame error: {e}")
return 0, base64_string