from __future__ import annotations from fastapi import FastAPI, WebSocket, WebSocketDisconnect, File, UploadFile, Form, Depends, Header, HTTPException, status, Request, Query, Body from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from pydantic import BaseModel, Field from typing import List, Optional import datetime import uuid import logging import json import cv2 import base64 import asyncio import time import math import numpy as np import sys import os from concurrent.futures import ThreadPoolExecutor from vision_pipeline import ( LIVE_INFER_WIDTH, VisionFramePipeline, resize_for_infer, scale_match_bboxes, ) import vision_session # Triggering reload to load updated issues.json try: import httpx except Exception: # pragma: no cover - httpx is a hard dependency httpx = None from pathlib import Path from dotenv import load_dotenv # ── Add Face_Recognition/ to path so gossip_bridge can be imported _FR_PATH = os.path.join(os.path.dirname(__file__), "Face_Recognition") if _FR_PATH not in sys.path: sys.path.insert(0, _FR_PATH) import gossip_bridge import auth_service import store_locks from observability import RequestContextMiddleware from security_headers import SecurityHeadersMiddleware from rate_limiter import login_limiter, refresh_limiter, sos_limiter from agentic_service import generate_agentic_plan, generate_chat_response from alert_routing import route_alert import persistence as persist import security_config import face_metadata try: import face_live_search except ImportError as e: import traceback print(f"ERROR: Failed to import face_live_search: {e}") traceback.print_exc() face_live_search = None try: import agentic_orchestrator except ImportError as _orch_import_err: print(f"WARNING: agentic_orchestrator unavailable ({_orch_import_err}) — agent WS uses stub responses.") class _AgenticOrchestratorStub: @staticmethod def inject_context(**_kwargs): pass @staticmethod async def run_agent_stream(prompt: str, session_id: str = "default", **_kwargs): yield { "agent": "System", "content": "Agent runtime unavailable. Install google-adk and set GEMINI_API_KEY for live agent queries.", "step_type": "error", } agentic_orchestrator = _AgenticOrchestratorStub() load_dotenv(os.path.join(os.path.dirname(__file__), ".env"), override=True) if not os.getenv("GEMINI_API_KEY"): print("WARNING: GEMINI_API_KEY not found in environment!") else: print("INFO: GEMINI_API_KEY loaded successfully.") # Cloud flag: skips local camera broadcast loop; stub vision unless GPU vision is enabled. CEPHEUS_CLOUD = os.getenv("CEPHEUS_CLOUD", "").lower() in ("1", "true", "yes") from vision_runtime import ( detect_acceleration, get_warmload_state, live_status_payload, mark_warmload_complete, mark_warmload_failed, mark_warmload_started, register_face_ready_probe, use_full_vision_engine, ) if use_full_vision_engine(): from vision_engine import VisionEngine else: from vision_engine_cloud import VisionEngine # ── Persistent Storage ─────────────────────────────── DATA_DIR = os.path.join(os.path.dirname(__file__), "data") os.makedirs(DATA_DIR, exist_ok=True) DETECTIONS_FILE = os.path.join(DATA_DIR, "detections_history.json") ALERTS_FILE = os.path.join(DATA_DIR, "alerts.json") ISSUES_FILE = os.path.join(DATA_DIR, "issues.json") LOGS_FILE = os.path.join(DATA_DIR, "incident_logs.json") SOS_FILE = os.path.join(DATA_DIR, "sos_events.json") AGENT_STEPS_FILE = os.path.join(DATA_DIR, "agent_steps.json") STAFF_REQUESTS_FILE = os.path.join(DATA_DIR, "staff_requests.json") SITE_BLUEPRINT_FILE = os.path.join(DATA_DIR, "site_blueprint.json") SIGNAGE_PLACEMENTS_FILE = os.path.join(DATA_DIR, "signage_placements.json") SIGNAGE_STATE_FILE = os.path.join(DATA_DIR, "signage_state.json") AGENTIC_PLANS_FILE = os.path.join(DATA_DIR, "agentic_plans.json") TARGETED_DISPATCHES_FILE = os.path.join(DATA_DIR, "targeted_dispatches.json") DISPATCH_LOG_FILE = os.path.join(DATA_DIR, "emergency_dispatch_log.json") ALERTS_LOG_FILE = os.path.join(DATA_DIR, "alerts_broadcast_log.json") STAFF_ACTIVITY_FILE = os.path.join(DATA_DIR, "staff_activity.json") STAFF_DUTY_FILE = os.path.join(DATA_DIR, "staff_duty.json") STAFF_INBOX_ACKS_FILE = os.path.join(DATA_DIR, "staff_inbox_acks.json") SIGNAGE_RECORDS_FILE = os.path.join(DATA_DIR, "signage_records.json") GOSSIP_HISTORY_DIR = os.path.join(DATA_DIR, "gossip_history") BLUEPRINT_DIR = os.path.join(DATA_DIR, "uploads", "blueprints") _warmload_complete = False detections_history: List[dict] = [] alerts_db: List[dict] = [] issues_db: List[dict] = [] incident_logs: List[dict] = [] sos_events: List[dict] = [] agent_steps_history: List[dict] = [] staff_requests_db: List[dict] = [] site_blueprint: dict = {} signage_placements: dict = {} emergency_dispatch_log: List[dict] = [] alerts_broadcast_log: List[dict] = [] staff_activity: List[dict] = [] staff_duty: dict[str, dict] = {} staff_inbox_acks: List[dict] = [] signage_records: List[dict] = [] SIGNAGE_BROADCAST_TEMPLATES = { "FIRE_HERE": "FIRE ALERT: Fire reported at {location}. Evacuate the area immediately. Follow E-L markers to exits.", "DO_NOT_ENTER": "AREA CLOSED: {location} is blocked. Do not enter. Seek alternate route.", "SHELTER": "SHELTER IN PLACE: Report to shelter zone at {location}. Await further instructions.", "ALL_CLEAR": "ALL CLEAR: {location} is now safe. Normal operations may resume.", "LOCKDOWN": "LOCKDOWN INITIATED: Full lockdown active at {location}. Remain in place. Do not move until further notice.", "EL": "EMERGENCY EXIT: Follow E-L route at {location} to nearest exit.", } def load_all_data(): global detections_history, alerts_db, issues_db, incident_logs, sos_events global agent_steps_history, staff_requests_db, site_blueprint, signage_placements global signage_state, agentic_plans, targeted_dispatches global emergency_dispatch_log, alerts_broadcast_log, staff_activity, signage_records global staff_duty, staff_inbox_acks list_files = { DETECTIONS_FILE: "detections_history", ALERTS_FILE: "alerts_db", ISSUES_FILE: "issues_db", LOGS_FILE: "incident_logs", SOS_FILE: "sos_events", AGENT_STEPS_FILE: "agent_steps_history", STAFF_REQUESTS_FILE: "staff_requests_db", } dict_files = { SITE_BLUEPRINT_FILE: "site_blueprint", SIGNAGE_PLACEMENTS_FILE: "signage_placements", SIGNAGE_STATE_FILE: "signage_state", } list_defaults = { AGENTIC_PLANS_FILE: "agentic_plans", TARGETED_DISPATCHES_FILE: "targeted_dispatches", DISPATCH_LOG_FILE: "emergency_dispatch_log", ALERTS_LOG_FILE: "alerts_broadcast_log", STAFF_ACTIVITY_FILE: "staff_activity", STAFF_INBOX_ACKS_FILE: "staff_inbox_acks", SIGNAGE_RECORDS_FILE: "signage_records", } dict_list_defaults = { STAFF_DUTY_FILE: "staff_duty", } for path, var_name in list_files.items(): loaded = persist.load_json(path, []) if isinstance(loaded, list): globals()[var_name] = loaded logger.info("Loaded %s items from %s", len(loaded), os.path.basename(path)) detections_history[:] = [_normalize_detection_entry(d) for d in detections_history] for path, var_name in dict_files.items(): loaded = persist.load_json(path, {}) if isinstance(loaded, dict): globals()[var_name] = loaded logger.info("Loaded dict from %s", os.path.basename(path)) for path, var_name in list_defaults.items(): loaded = persist.load_json(path, []) if isinstance(loaded, list): globals()[var_name] = loaded for path, var_name in dict_list_defaults.items(): loaded = persist.load_json(path, {}) if isinstance(loaded, dict): globals()[var_name] = loaded if not signage_state: signage_state.update({ "s1": False, "s2": False, "s3": False, "s4": False, "s5": False, "s6": True, "s8": False, }) def save_json(path, data): persist.save_json(path, data) def save_detections(): save_json(DETECTIONS_FILE, detections_history[-100:]) def _normalize_detection_entry(raw: dict) -> dict: """Ensure persisted sightings use ISO seen_at (ISSUE-031).""" entry = dict(raw) if not entry.get("seen_at"): ts = entry.get("timestamp") if ts and "T" in str(ts): entry["seen_at"] = ts else: entry["seen_at"] = None if entry.get("seen_at") and not entry.get("timestamp"): try: entry["timestamp"] = datetime.datetime.fromisoformat( str(entry["seen_at"]).replace("Z", "+00:00") ).strftime("%H:%M:%S") except ValueError: entry["timestamp"] = str(entry["seen_at"]) return entry def _upsert_detection_sighting( name: str, *, confidence: float, cam_id: str, thumbnail: str | None = None, ) -> dict: """Record or refresh one person's sighting with ISO seen_at.""" seen_at = datetime.datetime.now(datetime.timezone.utc).isoformat() entry: dict = { "name": name, "confidence": round(float(confidence), 3), "camId": cam_id, "seen_at": seen_at, "timestamp": datetime.datetime.now().strftime("%H:%M:%S"), } if thumbnail is not None: entry["thumbnail"] = thumbnail idx = next((i for i, d in enumerate(detections_history) if d.get("name") == name), None) if idx is not None: detections_history[idx] = entry else: detections_history.append(entry) _trim_memory_list(detections_history, 500) return entry async def _ensure_staff_request_issue_id(alert: "Alert", extra_context: dict | None = None) -> str | None: """Require a real issue for staff dispatch — create one if missing (ISSUE-033).""" issue_id = alert.issue_id or alert.issue or (extra_context or {}).get("issue_id") if issue_id or alert.type != "staff_request": return issue_id new_issue = { "id": f"ISS-{uuid.uuid4().hex[:6].upper()}", "title": f"Staff request @ {alert.location or 'site'}", "desc": alert.message, "status": "ONGOING", "progress": 0, "priority": alert.severity or "medium", "timestamp": datetime.datetime.now().isoformat(), "staff_request_status": "pending", "staff_needed": 1, } async with store_locks.issues_lock: issues_db.insert(0, new_issue) save_issues() await manager.broadcast(json.dumps({"type": "issue_update", "issue": new_issue})) return new_issue["id"] def _issue_context_for_staff(issue_id: str | None) -> dict: """Snapshot issue fields for staff request detail views.""" if not issue_id: return {} for iss in issues_db: if iss.get("id") != issue_id: continue meta = iss.get("metadata") or {} attachments = list(meta.get("attachments") or []) if iss.get("image") and iss["image"] not in attachments: attachments.append(iss["image"]) return { "issue_title": iss.get("title") or "", "issue_priority": str(iss.get("priority") or "MEDIUM").upper(), "issue_status": iss.get("status") or "ONGOING", "issue_desc": iss.get("desc") or "", "instructions": meta.get("instructions") or (iss.get("desc") or "")[:800], "zone": meta.get("zone") or meta.get("location") or "", "lat": meta.get("lat") or iss.get("lat"), "lng": meta.get("lng") or iss.get("lng"), "linked_sos_id": meta.get("sos_id") or meta.get("linked_sos_id"), "attachments": attachments, } return {} def _enriched_staff_request(req: dict) -> dict: """Merge live issue context into a staff request for detail views.""" merged = dict(req) merged.update(_issue_context_for_staff(req.get("issue_id"))) return merged _STAFF_MILESTONE_PROGRESS = { "accepted": 25, "en_route": 60, "on_scene": 85, "resolved": 100, } async def _append_staff_activity_entry( name: str, zone: str, message: str, *, event_type: str = "staff_action", metadata: dict | None = None, ) -> dict: entry = { "id": f"act-{uuid.uuid4().hex[:8]}", "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(), "name": name or "Staff", "zone": zone or "", "message": message or "", "event_type": event_type, "metadata": metadata or {}, } staff_activity.insert(0, entry) del staff_activity[100:] save_json(STAFF_ACTIVITY_FILE, staff_activity) await manager.broadcast(json.dumps({"type": "staff_activity", "entry": entry})) return entry _ISSUE_TERMINAL = frozenset({"RESOLVED", "CLOSED", "CANCELLED"}) def _validate_issue_patch(current: dict, patch: dict) -> None: """Guard invalid issue state transitions (ISSUE-041).""" cur_status = str(current.get("status", "ONGOING")).upper() new_status = patch.get("status") if new_status is not None: new_status_u = str(new_status).upper() if cur_status in _ISSUE_TERMINAL and new_status_u not in _ISSUE_TERMINAL: raise HTTPException( status_code=409, detail=f"Cannot reopen issue from terminal status {cur_status}", ) new_progress = patch.get("progress") if new_progress is not None: prog = int(new_progress) if prog >= 100 and new_status is None and cur_status not in _ISSUE_TERMINAL: patch["status"] = "RESOLVED" if prog < 100 and str(patch.get("status", cur_status)).upper() in _ISSUE_TERMINAL: raise HTTPException( status_code=409, detail="Cannot set progress below 100 on a resolved/closed issue", ) def save_alerts(): save_json(ALERTS_FILE, [a if isinstance(a, dict) else a.model_dump() for a in alerts_db[-200:]]) def save_issues(): save_json(ISSUES_FILE, issues_db[-100:]) def save_logs(): save_json(LOGS_FILE, incident_logs[-300:]) def save_sos(): save_json(SOS_FILE, sos_events[-100:]) def save_agent_steps(): save_json(AGENT_STEPS_FILE, agent_steps_history[-200:]) def save_staff_requests(): save_json(STAFF_REQUESTS_FILE, staff_requests_db[-200:]) def save_site_blueprint(): save_json(SITE_BLUEPRINT_FILE, site_blueprint) def save_signage_placements(): save_json(SIGNAGE_PLACEMENTS_FILE, signage_placements) def save_signage_state(): save_json(SIGNAGE_STATE_FILE, signage_state) def save_agentic_plans(): save_json(AGENTIC_PLANS_FILE, agentic_plans[-100:]) def save_targeted_dispatches(): save_json(TARGETED_DISPATCHES_FILE, targeted_dispatches[-200:]) def save_signage_records(): save_json(SIGNAGE_RECORDS_FILE, signage_records[-500:]) def _trim_memory_list(store: list, max_len: int) -> None: if len(store) > max_len: del store[max_len:] def _operator_id(principal: dict | None) -> str: return str((principal or {}).get("sub") or "operator") async def _append_agent_step(step: dict, user_id: str, session_id: str) -> None: record = { **step, "user_id": user_id, "session_id": session_id, "timestamp": datetime.datetime.now().isoformat(), } async with store_locks.agent_steps_lock: agent_steps_history.append(record) _trim_memory_list(agent_steps_history, 500) save_agent_steps() async def _clear_agent_steps(user_id: str, session_id: str | None = None) -> None: global agent_steps_history async with store_locks.agent_steps_lock: if session_id: agent_steps_history = [ s for s in agent_steps_history if not (s.get("user_id") == user_id and s.get("session_id") == session_id) ] else: agent_steps_history = [s for s in agent_steps_history if s.get("user_id") != user_id] save_agent_steps() _detections_dirty = False _last_detection_flush = datetime.datetime.min DETECTION_SAVE_INTERVAL_SECONDS = float(os.getenv("DETECTION_SAVE_INTERVAL_SECONDS", "2")) MAX_IMAGE_BYTES = int(os.getenv("MAX_IMAGE_UPLOAD_BYTES", str(5 * 1024 * 1024))) MAX_VIDEO_BYTES = int(os.getenv("MAX_VIDEO_UPLOAD_BYTES", str(50 * 1024 * 1024))) ALLOWED_VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".webm"} ALLOWED_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp"} ALLOWED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/webp"} API_KEY = security_config.resolve_api_key() DEMO_MODE = security_config.is_demo_mode() IS_PRODUCTION = security_config.is_production() # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- logging.basicConfig( level=logging.INFO, format="%(levelname)s: %(message)s", ) logger = logging.getLogger(__name__) def _task_done_callback(task: asyncio.Task) -> None: if task.cancelled(): return exc = task.exception() if exc is not None: logger.error("Unhandled error in background task: %s", exc, exc_info=exc) def _spawn(coro) -> asyncio.Task: task = asyncio.create_task(coro) task.add_done_callback(_task_done_callback) return task # --------------------------------------------------------------------------- # App # --------------------------------------------------------------------------- app = FastAPI( title="Crisis Communication System", docs_url=None if IS_PRODUCTION else "/docs", redoc_url=None if IS_PRODUCTION else "/redoc", openapi_url=None if IS_PRODUCTION else "/openapi.json", ) def _parse_origins() -> list[str]: origins_env = os.getenv("CORS_ORIGINS", "") if origins_env.strip(): return [o.strip() for o in origins_env.split(",") if o.strip()] return [ "https://community-security-and-emergency-ma.vercel.app", "https://community-security-and-emergency-ma-gamma.vercel.app", "https://community-security-manag-78489.web.app", "https://community-security-manag-78489.firebaseapp.com", "https://rapid-eec43.web.app", "https://rapid-eec43.firebaseapp.com", "http://localhost:5173", "http://localhost:5174", "http://127.0.0.1:5173", "http://127.0.0.1:5174", ] def _vision_warming_response() -> JSONResponse | None: """Return 503 while InsightFace/YOLO warmload is still running on cold start.""" if not use_full_vision_engine(): return None if _warmload_complete or get_warmload_state().get("complete"): return None return JSONResponse( status_code=503, content={"status": "warming_up", "message": "Vision model loading. Retry in 10s."}, ) def _api_key_valid(provided: str | None) -> bool: if not provided: return False if API_KEY and security_config.safe_compare_strings(provided, API_KEY): return True readonly = security_config.resolve_readonly_api_key() return bool(readonly and security_config.safe_compare_strings(provided, readonly)) def _api_key_role(provided: str | None) -> str: if provided and API_KEY and security_config.safe_compare_strings(provided, API_KEY): return os.getenv("CEPHEUS_API_KEY_ROLE", "service") readonly = security_config.resolve_readonly_api_key() if provided and readonly and security_config.safe_compare_strings(provided, readonly): return "readonly" guest = os.getenv("CEPHEUS_GUEST_API_KEY", "").strip() if provided and guest and security_config.safe_compare_strings(provided, guest): return "guest" return "service" def _enforce_readonly_writes(request: Request, principal: dict) -> None: if principal.get("role") == "readonly" and request.method not in ("GET", "HEAD", "OPTIONS"): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Readonly API key cannot perform write operations", ) def _guest_api_key_valid(provided: str | None) -> bool: if not provided: return False guest = os.getenv("CEPHEUS_GUEST_API_KEY", "").strip() if guest and security_config.safe_compare_strings(provided, guest): return True # Local dev fallback: primary API key with guest role scope if not security_config.is_production() and _api_key_valid(provided): return _api_key_role(provided) in ("guest", "service") return False def _demo_login_allowed(request: Request) -> bool: if security_config.is_production() or not DEMO_MODE: return False host = (request.client.host if request.client else "") or "" return host in ("127.0.0.1", "localhost", "::1") def require_api_key(x_api_key: str | None = Header(default=None, alias="X-API-Key")) -> None: if not API_KEY: raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Server API key not configured") if not _api_key_valid(x_api_key): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized") def _resolve_bearer(authorization: str | None) -> dict | None: if not authorization or not authorization.lower().startswith("bearer "): return None token = authorization.split(" ", 1)[1].strip() if not token: return None try: payload = auth_service.decode_access_token(token) return {"auth": "jwt", "role": payload.get("role"), "sub": payload.get("sub")} except Exception: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired session") def public_vision_allowed() -> bool: """Anonymous access to vision/WS routes (HF Spaces — no JWT refresh interruptions).""" val = os.getenv("ALLOW_PUBLIC_VISION", os.getenv("CEPHEUS_PUBLIC_VISION", "")) return str(val).strip().lower() in ("1", "true", "yes", "on") PUBLIC_VISION_PRINCIPAL = {"auth": "public", "role": "admin", "sub": "public-vision"} def require_vision_access( request: Request, x_api_key: str | None = Header(default=None, alias="X-API-Key"), authorization: str | None = Header(default=None), ) -> dict: if public_vision_allowed(): return PUBLIC_VISION_PRINCIPAL return require_operator(request, x_api_key, authorization) def require_operator( request: Request, x_api_key: str | None = Header(default=None, alias="X-API-Key"), authorization: str | None = Header(default=None), ) -> dict: """API key or JWT for operator routes. CEPHEUS_API_KEY grants service-role automation (full operator API access). Optional CEPHEUS_READONLY_API_KEY maps to role ``readonly`` (GET-only routes). Set CEPHEUS_API_KEY_SCOPE to document intended key use for operators/audit. """ if not auth_service.auth_enabled(): require_api_key(x_api_key) principal = {"auth": "api_key", "role": _api_key_role(x_api_key)} _enforce_readonly_writes(request, principal) return principal bearer = _resolve_bearer(authorization) if bearer: _enforce_readonly_writes(request, bearer) return bearer if _api_key_valid(x_api_key): principal = {"auth": "api_key", "role": _api_key_role(x_api_key)} _enforce_readonly_writes(request, principal) return principal raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required") def require_operator_flexible( request: Request, x_api_key: str | None = Header(default=None, alias="X-API-Key"), authorization: str | None = Header(default=None), api_key: str | None = Query(default=None), token: str | None = Query(default=None), ) -> dict: """Like require_operator; dev-only query credentials for legacy img/file tags.""" q_api = request.query_params.get("api_key") q_token = request.query_params.get("token") if security_config.is_production() and (q_api or q_token or api_key or token): raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Query-string authentication is disabled in production", ) if token and auth_service.auth_enabled() and not security_config.is_production(): try: payload = auth_service.decode_access_token(token) return {"auth": "jwt", "role": payload.get("role"), "sub": payload.get("sub")} except Exception: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired session") return require_operator(request, x_api_key or api_key, authorization) def require_operator_if_auth_enabled( request: Request, x_api_key: str | None = Header(default=None, alias="X-API-Key"), authorization: str | None = Header(default=None), ) -> dict: """Require credentials when auth is on; allow anonymous access on public-vision HF.""" if public_vision_allowed(): try: return require_operator(request, x_api_key, authorization) except HTTPException as exc: if exc.status_code == status.HTTP_401_UNAUTHORIZED: return PUBLIC_VISION_PRINCIPAL raise return require_operator(request, x_api_key, authorization) def require_dashboard_read( request: Request, x_api_key: str | None = Header(default=None, alias="X-API-Key"), authorization: str | None = Header(default=None), ) -> dict: """SOS/staff read routes — anonymous on HF public vision (stale JWT must not 401).""" if public_vision_allowed(): return PUBLIC_VISION_PRINCIPAL principal = require_operator(request, x_api_key, authorization) if not auth_service.has_role(principal, "staff", "admin", "operator", "service"): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions") return principal def require_role(*roles: str): def _dep(principal: dict = Depends(require_operator)) -> dict: if not auth_service.has_role(principal, *roles): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions") return principal return _dep require_admin = require_role("admin") require_staff = require_role("staff", "admin", "operator") require_staff_read = require_role("staff", "admin", "operator", "service") def _audit_destructive(principal: dict, action: str, detail: str = "") -> None: """Log destructive operations for interim RBAC audit trail (ISSUE-182 mitigation).""" logger.warning( "DESTRUCTIVE action=%s user=%s role=%s detail=%s", action, principal.get("sub"), principal.get("role"), detail, ) def require_admin_audited(action: str): """Admin-only dependency with destructive-action audit logging.""" def _dep(principal: dict = Depends(require_admin)) -> dict: _audit_destructive(principal, action) return principal return _dep def _ws_auth_ok(websocket: WebSocket) -> bool: if public_vision_allowed(): return True if os.getenv("CEPHEUS_WS_OPEN", "").strip() == "1": return True if not auth_service.auth_enabled() and not API_KEY: if not security_config.is_production(): return True return False ticket = websocket.query_params.get("ticket") if ticket and auth_service.decode_ws_ticket(ticket): return True hdr_key = websocket.headers.get("x-api-key") if hdr_key and _api_key_valid(hdr_key): return True api_key = websocket.query_params.get("api_key") if api_key and _api_key_valid(api_key): return True if security_config.is_production(): return False token = websocket.query_params.get("token") if token and auth_service.auth_enabled() and auth_service.decode_ws_token(token): return True return False def _ws_principal(websocket: WebSocket) -> dict: if public_vision_allowed(): return PUBLIC_VISION_PRINCIPAL ticket = websocket.query_params.get("ticket") if ticket: decoded = auth_service.decode_ws_ticket(ticket) if decoded: return {"auth": "jwt", "role": decoded.get("role"), "sub": decoded.get("sub")} if not IS_PRODUCTION: token = websocket.query_params.get("token") if token and auth_service.auth_enabled(): ws_decoded = auth_service.decode_ws_token(token) if ws_decoded: return {"auth": "jwt", "role": ws_decoded.get("role"), "sub": ws_decoded.get("sub")} try: payload = auth_service.decode_access_token(token) return {"auth": "jwt", "role": payload.get("role"), "sub": payload.get("sub")} except Exception: pass hdr_key = websocket.headers.get("x-api-key") if hdr_key and _api_key_valid(hdr_key): return {"auth": "api_key", "role": _api_key_role(hdr_key), "sub": "api_key"} api_key = websocket.query_params.get("api_key") if api_key and _api_key_valid(api_key): return {"auth": "api_key", "role": _api_key_role(api_key), "sub": "api_key"} return {"auth": "open", "role": "operator", "sub": "operator"} def _ws_can_mutate(principal: dict) -> bool: """Signage/camera WS mutations require operator-level roles (not guest/readonly).""" if public_vision_allowed(): return True role = principal.get("role") if role in ("readonly", "guest"): return False if role in ("admin", "operator", "staff", "service"): return True return principal.get("auth") != "open" def _read_limited_bytes(data: bytes, max_bytes: int, kind: str) -> bytes: if len(data) > max_bytes: raise HTTPException( status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, detail=f"{kind} exceeds maximum allowed size", ) return data _UPLOADS_PATH = os.path.join(DATA_DIR, "uploads") def _safe_upload_path(original_name: str) -> tuple[str, str]: suffix = Path(original_name).suffix.lower() if suffix not in ALLOWED_VIDEO_EXTENSIONS: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported video format") safe_name = f"{uuid.uuid4().hex}{suffix}" upload_dir = _UPLOADS_PATH os.makedirs(upload_dir, exist_ok=True) return upload_dir, os.path.join(upload_dir, safe_name) def _safe_image_upload_path(original_name: str) -> tuple[str, str]: suffix = Path(original_name).suffix.lower() or ".jpg" if suffix == ".jpeg": suffix = ".jpg" if suffix not in ALLOWED_IMAGE_EXTENSIONS: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image format") safe_name = f"{uuid.uuid4().hex}{suffix}" upload_dir = _UPLOADS_PATH os.makedirs(upload_dir, exist_ok=True) return upload_dir, os.path.join(upload_dir, safe_name) from fastapi.responses import FileResponse app.add_middleware(SecurityHeadersMiddleware) app.add_middleware(RequestContextMiddleware) app.add_middleware( CORSMiddleware, allow_origins=_parse_origins(), allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Sensitive file directories — served only via authenticated proxy routes (not public mounts) _FACE_DB_PATH = os.path.join(_FR_PATH, "face_database") _TEMP_UNKNOWN_IMG_ROOT = os.path.join(_FR_PATH, "temp_unknown_faces") os.makedirs(_FACE_DB_PATH, exist_ok=True) os.makedirs(_UPLOADS_PATH, exist_ok=True) os.makedirs(BLUEPRINT_DIR, exist_ok=True) os.makedirs(GOSSIP_HISTORY_DIR, exist_ok=True) vision_engine = VisionEngine() def _face_engine_usable() -> bool: fe = getattr(vision_engine, "face_engine", None) if fe is None or getattr(fe, "app", None) is None: return False return True register_face_ready_probe(_face_engine_usable) # InsightFace loads at import — mark ready immediately so /health/live never blocks clients. if _face_engine_usable(): mark_warmload_complete({"insightface": True, "early_ready": True}) _warmload_complete = True logger.info("Face engine ready at startup (InsightFace loaded, enrolled DB pending warmload).") # Parallel face inference — single worker: InsightFace ONNX is not thread-safe on CPU. _FACE_WORKERS = 1 os.environ["OMP_NUM_THREADS"] = "4" _FACE_EXECUTOR = ThreadPoolExecutor(max_workers=_FACE_WORKERS, thread_name_prefix="cepheus-face") _CAMERA_EXECUTOR = ThreadPoolExecutor(max_workers=4, thread_name_prefix="cepheus-camera") _vision_pipeline: VisionFramePipeline | None = None def _infer_matches_sync(cam_id: str, frame) -> list: fe = vision_engine.face_engine _ensure_face_db(fe) if hasattr(fe, "match_all_faces"): from Face_Recognition.face_matcher import scaled_min_bbox_area return fe.match_all_faces( frame, allow_near_match=False, min_bbox_area=scaled_min_bbox_area(frame), ) return [] def _infer_live_matches_sync(cam_id: str, frame) -> list: """Fresh live scan — strict enrolled threshold; unknown faces become unknown_N.""" fe = vision_engine.face_engine _ensure_face_db(fe) if hasattr(fe, "match_all_faces"): from Face_Recognition.face_matcher import scaled_min_bbox_area return fe.match_all_faces( frame, allow_near_match=False, min_det_score=0.30, min_bbox_area=scaled_min_bbox_area(frame, floor=180), ) return [] async def _run_face_work(fn, *args, timeout: float = 20.0): loop = asyncio.get_event_loop() try: return await asyncio.wait_for( loop.run_in_executor(_FACE_EXECUTOR, fn, *args), timeout=timeout, ) except asyncio.TimeoutError: logger.error( "Face inference timed out after %.0fs (%s)", timeout, getattr(fn, "__name__", repr(fn)), ) return [] def _get_vision_pipeline() -> VisionFramePipeline: global _vision_pipeline if _vision_pipeline is None: _vision_pipeline = VisionFramePipeline(_infer_matches_sync, async_runner=_run_face_work) return _vision_pipeline def _ensure_face_db(fe) -> None: """Reload embeddings only when the on-disk store changed (not every frame).""" if fe is None: return if hasattr(fe, "ensure_db"): fe.ensure_db() elif hasattr(fe, "reload_db"): fe.reload_db() def _invalidate_face_db(fe) -> None: if fe is None: return if hasattr(fe, "invalidate_db"): fe.invalidate_db() elif hasattr(fe, "reload_db"): fe.reload_db() # --------------------------------------------------------------------------- # Data models # --------------------------------------------------------------------------- class Alert(BaseModel): id: str = Field(default_factory=lambda: str(uuid.uuid4())) type: str location: str = "" message: str severity: str = "medium" timestamp: str = Field(default_factory=lambda: datetime.datetime.now().isoformat()) recipient: Optional[str] = None sender: Optional[str] = None issue_id: Optional[str] = None issue: Optional[str] = None attachment_url: Optional[str] = None lat: Optional[float] = None lng: Optional[float] = None class IssueCreate(BaseModel): title: Optional[str] = Field(default=None, max_length=200) desc: str = Field(..., max_length=8000) status: Optional[str] = Field(default="ONGOING", max_length=32) priority: Optional[str] = Field(default="medium", max_length=32) staff: Optional[int] = Field(default=1, ge=0, le=100) room: Optional[str] = Field(default=None, max_length=120) class IssuePatch(BaseModel): status: Optional[str] = Field(default=None, max_length=32) progress: Optional[int] = Field(default=None, ge=0, le=100) desc: Optional[str] = Field(default=None, max_length=8000) staff_request_status: Optional[str] = Field(default=None, max_length=32) class SOSPayload(BaseModel): guest_id: Optional[str] = None lat: float lng: float location_label: Optional[str] = "Unknown" message: Optional[str] = "SOS Activated" class GossipTrackingStart(BaseModel): staffId: Optional[str] = None personName: Optional[str] = None cause: Optional[str] = None doc_names: Optional[List[str]] = None doc_urls: Optional[List[str]] = None class LoginPayload(BaseModel): username: str password: str class RefreshPayload(BaseModel): refresh_token: str class LogoutPayload(BaseModel): refresh_token: Optional[str] = None class TrackingResetPayload(BaseModel): broadcast: bool = False session_id: Optional[str] = None clear_agent_steps: bool = True full_reset: bool = False class ChatPayload(BaseModel): prompt: str # --------------------------------------------------------------------------- # In-memory stores # --------------------------------------------------------------------------- # signage state: id → active signage_state: dict[str, bool] = { "s1": False, "s2": False, "s3": False, "s4": False, "s5": False, "s6": True, "s8": False, } agentic_plans: list[dict] = [] targeted_dispatches: list[dict] = [] user_profile: dict = { "name": os.getenv("USER_PROFILE_NAME", "Pranav Kumar"), "role": os.getenv("USER_PROFILE_ROLE", "Command Operator"), "age": int(os.getenv("USER_PROFILE_AGE", "27")), "department": os.getenv("USER_PROFILE_DEPARTMENT", "Crisis Response"), "shift": os.getenv("USER_PROFILE_SHIFT", "Night"), } def normalize_files_url(path: str | None) -> str | None: if not path: return None if path.startswith("/files/"): return path if path.startswith("/uploads/"): return f"/files{path}" return path async def _broadcast_signage_state(sign_id: str, active: bool) -> dict: async with store_locks.signage_lock: if sign_id not in signage_state: signage_state[sign_id] = False signage_state[sign_id] = active save_signage_state() payload = json.dumps({ "type": "signage_update", "id": sign_id, "active": active, "all": signage_state, }) await manager.broadcast(payload) return {"id": sign_id, "active": active} def _sanitize_person_name(name: str) -> str: import re cleaned = name.strip().replace(" ", "_") if not re.fullmatch(r"[A-Za-z0-9_-]{1,64}", cleaned): raise HTTPException(status_code=400, detail="Name must be 1-64 alphanumeric characters, spaces, hyphens, or underscores") return cleaned def _reverse_geocode_label(lat: float, lng: float) -> str: api_key = os.getenv("GOOGLE_MAPS_API_KEY", "").strip() if not api_key: return f"Lat: {lat:.4f}, Lng: {lng:.4f}" try: import requests resp = requests.get( "https://maps.googleapis.com/maps/api/geocode/json", params={"latlng": f"{lat},{lng}", "key": api_key}, timeout=5, ) data = resp.json() results = data.get("results") or [] if results: return str(results[0].get("formatted_address", ""))[:120] or f"Lat: {lat:.4f}, Lng: {lng:.4f}" except Exception: pass return f"Lat: {lat:.4f}, Lng: {lng:.4f}" def _build_contact_network(hours: float | None = None) -> dict: """Aggregate gossip co-presence into contact-network nodes/edges.""" raw = gossip_bridge.get_gossip_json() detection_by_name = {d.get("name"): d for d in detections_history if d.get("name")} cutoff = None if hours is not None: cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=hours) meet_counts: dict[str, int] = {} all_people: set[str] = set() for link in raw.get("links") or []: if link.get("source"): all_people.add(link["source"]) if link.get("target"): all_people.add(link["target"]) # Include people currently on camera even if no edges yet import re as _re fr = getattr(vision_engine, "face_results", None) or {} for faces in fr.values(): for f in faces or []: name = f.get("name") if isinstance(f, dict) else None if not name: continue lowered = name.lower() # Allow unknown_N (system-assigned persistent IDs), reject bare "Unknown" if lowered in ("unknown", "unidentified", ""): continue all_people.add(name) nodes = [] for person in sorted(p for p in all_people if p): det = detection_by_name.get(person) or {} meet_counts[person] = sum( 1 for lnk in (raw.get("links") or []) if person in (lnk.get("source"), lnk.get("target")) ) nodes.append({ "id": person, "name": person, "role": "Staff", "photoUrl": normalize_files_url(f"/files/face/{person}/{person}.jpg"), "detectionCount": meet_counts.get(person, 0) + (1 if det else 0), "lastSeen": det.get("seen_at"), "lastLocation": det.get("camId") or "", }) edges = [] for link in raw.get("links") or []: pa, pb = link.get("source"), link.get("target") interactions = link.get("interactions") or [] filtered = interactions if cutoff: filtered = [ i for i in interactions if i.get("timestamp") and datetime.datetime.fromisoformat( str(i["timestamp"]).replace("Z", "+00:00") ) >= cutoff ] if cutoff and not filtered: continue use = filtered or interactions last_ts = max((i.get("timestamp") or "") for i in use) if use else "" locations = sorted({i.get("camera") or i.get("cam_id") or "Unknown" for i in use}) edges.append({ "source": pa, "target": pb, "meetCount": len(use) or link.get("strength", 1), "lastMet": last_ts, "locations": locations, }) return {"nodes": nodes, "edges": edges, "updatedAt": datetime.datetime.now(datetime.timezone.utc).isoformat()} async def _broadcast_signage_alert(sign_type: str, lat: float, lng: float, action: str = "deployed"): location = await asyncio.get_event_loop().run_in_executor(None, _reverse_geocode_label, lat, lng) if action == "removed": message = f"SIGNAGE REMOVED: {sign_type.replace('_', ' ')} at {location} has been cleared." else: template = SIGNAGE_BROADCAST_TEMPLATES.get(sign_type, "NEW SIGNAGE DEPLOYED: {type} at {location}.") message = template.format(location=location, type=sign_type.replace("_", " ")) alert = Alert( id=f"signage-{uuid.uuid4().hex[:8]}", type="broadcast", location=location, message=message, severity="high", timestamp=datetime.datetime.now().isoformat(), ) await _process_new_alert(alert, source="SIGNAGE") entry = { "id": f"bc-{uuid.uuid4().hex[:8]}", "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(), "message": message, "recipients": ["All Staff & Units"], "relatedIssue": None, "sentBy": "signage-system", "deliveryCount": 1, } alerts_broadcast_log.insert(0, entry) del alerts_broadcast_log[100:] save_json(ALERTS_LOG_FILE, alerts_broadcast_log) return message async def _append_incident_log(level: str, source: str, event_type: str, message: str, metadata: dict | None = None): log_entry = { "type": "incident_log_entry", "ts": datetime.datetime.now().strftime("%H:%M:%S"), "level": level, "source": source, "event_type": event_type, "msg": message, "metadata": metadata or {}, } incident_logs.insert(0, log_entry) del incident_logs[300:] save_logs() await manager.broadcast(json.dumps(log_entry)) def _count_live_faces() -> int: fr = getattr(vision_engine, "face_results", None) or {} total = 0 for faces in fr.values(): total += len([ f for f in (faces or []) if str(f.get("name", "")).lower() not in ("unknown", "unidentified", "none", "") ]) return total def _strip_face_embeddings(face_results: dict | None) -> dict: """Return face results suitable for API/WS clients without raw embeddings.""" sanitized: dict = {} for cam_id, faces in (face_results or {}).items(): sanitized[cam_id] = [ {k: v for k, v in (face or {}).items() if k != "embedding"} for face in (faces or []) ] return sanitized def _normalize_dispatch_event( *, dispatch_id: str | None = None, alert_id: str | None = None, issue_id: str | None = None, recipient: str | None = None, recipients: list | None = None, message: str | None = None, location: str | None = None, severity: str | None = None, alert_type: str | None = None, status: str = "sent", timestamp: str | None = None, simulation: bool = False, routing_demo: bool = False, ) -> dict: """Single schema for targeted_alert_dispatch WS events and /dispatches storage.""" ts = timestamp or datetime.datetime.now().isoformat() rec_list = list(recipients) if recipients else [] rec = recipient or (", ".join(str(r) for r in rec_list[:5]) if rec_list else "Staff") if not rec_list and rec: rec_list = [rec] msg = message or (f"Dispatch to {rec}" + (f" @ {location}" if location else "")) return { "type": "targeted_alert_dispatch", "dispatch_id": dispatch_id or str(uuid.uuid4()), "timestamp": ts, "created_at": ts, "alert_id": alert_id, "issue_id": issue_id, "recipient": rec, "recipients": rec_list, "message": msg, "location": location or "", "severity": severity or "medium", "alert_type": alert_type, "status": status, "simulation": bool(simulation), "routing_demo": bool(routing_demo), } def _build_incident_payload(alert: Alert, extra_context: dict | None = None) -> dict: room_counts = {room["label"]: room.get("occupancy", 0) for room in vision_engine.room_stats.values()} if hasattr(vision_engine, "room_stats") else {} # Include recent alerts for context recent_alerts = [a if isinstance(a, dict) else a.model_dump() for a in alerts_db[-5:]] if alerts_db else [] crowd = getattr(vision_engine, "last_crowd_count", None) if crowd is None: crowd = getattr(vision_engine, "last_total_count", 0) context = { "alert_id": alert.id, "type": alert.type, "severity": alert.severity, "location": alert.location, "message": alert.message, "lat": alert.lat, "lng": alert.lng, "crowd_count": crowd, "live_face_count": _count_live_faces(), "room_counts": room_counts, "recent_alerts": recent_alerts, "timestamp": alert.timestamp, } if extra_context: context.update(extra_context) return context async def _process_new_alert(alert: Alert, source: str = "SYSTEM", extra_context: dict | None = None, *, sos_mode: bool = False): async with store_locks.alerts_lock: alerts_db.append(alert) _trim_memory_list(alerts_db, 200) save_alerts() logger.info(f"Alert: {alert.type} @ {alert.location}") if not sos_mode: await manager.broadcast(json.dumps({"type": "alert_record", "alert": alert.model_dump()})) log_entry = { "type": "log_entry", "ts": datetime.datetime.now().strftime("%H:%M:%S"), "level": "CRIT" if alert.severity == "critical" else "WARN", "source": source, "event_type": alert.type, "msg": alert.message, "metadata": {"severity": alert.severity}, } await manager.broadcast(json.dumps(log_entry)) if alert.type == "chat" or sos_mode: return if alert.type in ("direct_message", "staff_request", "broadcast"): recipient = alert.recipient or (extra_context or {}).get("recipient") or alert.location or "Staff" issue_id = await _ensure_staff_request_issue_id(alert, extra_context) if alert.type == "staff_request" and not issue_id: logger.error("Staff request dispatch blocked: could not resolve issue_id") return dispatch_event = _normalize_dispatch_event( issue_id=issue_id, recipient=recipient, message=alert.message, location=alert.location, severity=alert.severity, alert_type=alert.type, alert_id=alert.id, ) targeted_dispatches.insert(0, dispatch_event) _trim_memory_list(targeted_dispatches, 200) save_targeted_dispatches() if alert.type == "broadcast": bc_entry = { "id": alert.id, "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(), "message": alert.message, "recipients": [recipient] if recipient else ["All Staff & Units"], "relatedIssue": issue_id, "sentBy": source or "command", "deliveryCount": 1, } alerts_broadcast_log.insert(0, bc_entry) del alerts_broadcast_log[100:] save_json(ALERTS_LOG_FILE, alerts_broadcast_log) await manager.broadcast(json.dumps(dispatch_event)) if alert.type == "staff_request": staff_req = { "id": f"SR-{uuid.uuid4().hex[:6].upper()}", "issue_id": issue_id, "location": alert.location, "message": alert.message, "status": "pending", "created_at": datetime.datetime.now().isoformat(), "assignments": [], "progress": 0, "severity": alert.severity or "medium", "alert_id": alert.id, **_issue_context_for_staff(issue_id), } staff_requests_db.insert(0, staff_req) save_staff_requests() await manager.broadcast(json.dumps({"type": "staff_request_created", "request": staff_req})) if issue_id: for iss in issues_db: if iss.get("id") == issue_id: iss["staff_request_status"] = "pending" iss.setdefault("staff_needed", iss.get("staff", 1)) save_issues() await manager.broadcast(json.dumps({"type": "issue_update", "issue": iss})) break if alert.type in ("staff_request", "broadcast"): incident_payload = _build_incident_payload(alert, extra_context) plan = generate_agentic_plan(incident_payload) plan_event = { "type": "agentic_plan_update", "plan_id": str(uuid.uuid4()), "created_at": datetime.datetime.now().isoformat(), "incident": incident_payload, "plan": plan, } agentic_plans.insert(0, plan_event) _trim_memory_list(agentic_plans, 100) save_agentic_plans() await manager.broadcast(json.dumps(plan_event)) return incident_payload = _build_incident_payload(alert, extra_context) plan = generate_agentic_plan(incident_payload) plan_event = { "type": "agentic_plan_update", "plan_id": str(uuid.uuid4()), "created_at": datetime.datetime.now().isoformat(), "incident": incident_payload, "plan": plan, } agentic_plans.insert(0, plan_event) _trim_memory_list(agentic_plans, 100) save_agentic_plans() await manager.broadcast(json.dumps(plan_event)) recipients = route_alert(incident_payload) dispatch_event = _normalize_dispatch_event( alert_id=alert.id, issue_id=alert.issue_id or alert.issue, location=alert.location, severity=alert.severity, recipients=recipients, message=alert.message, alert_type=alert.type, routing_demo=True, ) targeted_dispatches.insert(0, dispatch_event) _trim_memory_list(targeted_dispatches, 200) save_targeted_dispatches() await manager.broadcast(json.dumps(dispatch_event)) await _append_incident_log( "CRIT" if alert.severity == "critical" else "WARN", source, alert.type, f"Agentic plan generated and dispatched to {len(recipients)} recipients.", {"alert_id": alert.id, "plan_id": plan_event["plan_id"], "dispatch_id": dispatch_event["dispatch_id"]}, ) # --------------------------------------------------------------------------- # WebSocket Connection Manager # --------------------------------------------------------------------------- class ConnectionManager: def __init__(self): self.active_connections: List[WebSocket] = [] self.agent_connections: List[WebSocket] = [] self._locks: dict[WebSocket, asyncio.Lock] = {} async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) self._locks[websocket] = asyncio.Lock() logger.info(f"Client connected. Active: {len(self.active_connections)}") async def connect_agent(self, websocket: WebSocket): await websocket.accept() self.agent_connections.append(websocket) self._locks[websocket] = asyncio.Lock() logger.info(f"Agent Client connected. Active: {len(self.agent_connections)}") def disconnect(self, websocket: WebSocket): if websocket in self.active_connections: self.active_connections.remove(websocket) if websocket in self.agent_connections: self.agent_connections.remove(websocket) if websocket in self._locks: del self._locks[websocket] logger.info(f"Client disconnected. Active: {len(self.active_connections)}, Agents: {len(self.agent_connections)}") async def send_personal_message(self, message: str, websocket: WebSocket): try: if websocket in self._locks: async with self._locks[websocket]: await asyncio.wait_for(websocket.send_text(message), timeout=2.0) else: await asyncio.wait_for(websocket.send_text(message), timeout=2.0) except Exception: pass async def broadcast(self, message: str): dead = [] for ws in list(self.active_connections): try: if ws in self._locks: async with self._locks[ws]: await asyncio.wait_for(ws.send_text(message), timeout=2.0) else: await asyncio.wait_for(ws.send_text(message), timeout=2.0) except Exception: dead.append(ws) for ws in dead: self.disconnect(ws) async def broadcast_to_agents(self, message: str): dead = [] for ws in list(self.agent_connections): try: if ws in self._locks: async with self._locks[ws]: await asyncio.wait_for(ws.send_text(message), timeout=2.0) else: await asyncio.wait_for(ws.send_text(message), timeout=2.0) except Exception: dead.append(ws) for ws in dead: self.disconnect(ws) manager = ConnectionManager() # --------------------------------------------------------------------------- # REST Endpoints # --------------------------------------------------------------------------- @app.get("/") async def root(): """Base URL: browsers often open this first; the API lives on named paths, not here.""" return { "service": "Cepheus API", "status": "running", "liveness": "/health", "websocket": "/ws", } @app.get("/health") async def health(): if IS_PRODUCTION: return {"status": "ok"} return { "status": "ok", "active_ws": len(manager.active_connections), "cameras": list(vision_engine.camera_indices.keys()), "auth_enabled": auth_service.auth_enabled(), "refresh_store": auth_service.refresh_store_backend(), } @app.get("/health/live") async def health_live(): """Lightweight liveness probe — no auth, no ML warmload side effects.""" try: return live_status_payload() except Exception as exc: logger.warning("health/live failed: %s", exc) return {"status": "warming", "message": "recovering"} @app.get("/debug/face_status") async def debug_face_status(): """Diagnose face engine state on deployed instances (no auth).""" fe = getattr(vision_engine, "face_engine", None) db = getattr(fe, "db", None) or {} cache = getattr(fe, "_unknown_cache", None) or {} try: return { "warmload_complete": bool(_warmload_complete or get_warmload_state().get("complete")), "model_pack": os.environ.get("FACE_MODEL_PACK", "buffalo_sc"), "model_loaded": fe is not None and getattr(fe, "app", None) is not None, "enrolled_faces": len([k for k in db if not str(k).startswith("unknown_")]), "unknown_cache_size": len(cache), "threshold": float(os.environ.get("FACE_MATCH_THRESHOLD", "0.22")), } except Exception as exc: return { "error": str(exc), "warmload_complete": bool(_warmload_complete or get_warmload_state().get("complete")), } @app.get("/health/ready") async def health_ready(): if os.getenv("CEPHEUS_PRODUCTION", "").strip() == "1": if not os.getenv("CEPHEUS_JWT_SECRET", "").strip(): raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="JWT secret not configured") if os.getenv("CEPHEUS_AUTH_DEV_MODE", "").strip() == "1": raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Dev auth disabled in production") return {"status": "ready", "auth_enabled": auth_service.auth_enabled()} @app.get("/auth/status") async def auth_status(): try: mode = "demo" if auth_service.auth_enabled(): mode = "production" if os.getenv("CEPHEUS_JWT_SECRET", "").strip() else "dev" return {"auth_enabled": auth_service.auth_enabled(), "mode": mode} except Exception as exc: logger.warning("auth/status failed: %s", exc) return {"auth_enabled": False, "mode": "demo", "error": "status_unavailable"} @app.post("/auth/login") async def auth_login(payload: LoginPayload, request: Request): login_limiter.check(request, "login") if not auth_service.auth_enabled(): if not _demo_login_allowed(request): raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Authentication not configured", ) role = "admin" if (payload.username or "").lower() in ("admin", "command") else "staff" return { "mode": "demo", "token_type": "demo", "user": {"username": payload.username or "operator", "role": role}, } try: user = auth_service.verify_user(payload.username, payload.password) except RuntimeError as exc: logger.error("Auth misconfiguration: %s", exc) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Authentication misconfigured", ) from exc if not user: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials") role = user.get("role") or "admin" # HF public vision uses tokenless demo sessions for the admin dashboard, but staff # portal accept/complete routes require a real JWT (auth == "jwt"). if public_vision_allowed(): if role in ("staff", "admin"): try: return auth_service.create_token_pair(user["username"], role) except RuntimeError as exc: logger.error("Auth misconfiguration: %s", exc) raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Authentication misconfigured", ) from exc return { "mode": "demo", "token_type": "demo", "user": {"username": user.get("username") or payload.username, "role": role}, } return auth_service.create_token_pair(user["username"], role) @app.post("/auth/refresh") async def auth_refresh(payload: RefreshPayload, request: Request): refresh_limiter.check(request, "refresh") if not auth_service.auth_enabled(): raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Auth not enabled") if public_vision_allowed(): if not payload.refresh_token: return { "mode": "demo", "token_type": "demo", "access_token": "", "refresh_token": "", "expires_in": 86400, } try: return auth_service.refresh_access_token(payload.refresh_token) except Exception: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token") try: return auth_service.refresh_access_token(payload.refresh_token) except Exception: raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token") @app.post("/auth/logout") async def auth_logout(payload: LogoutPayload): if payload.refresh_token: auth_service.revoke_refresh_token(payload.refresh_token) return {"status": "ok"} @app.post("/auth/ws-ticket") async def auth_ws_ticket(principal: dict = Depends(require_operator_if_auth_enabled)): """Issue a short-lived WebSocket ticket (preferred over api_key query param).""" if principal.get("auth") == "open": raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Auth not enabled") username = principal.get("sub") or "operator" role = principal.get("role") or "staff" ticket = auth_service.create_ws_ticket(username, role) return {"ticket": ticket, "expires_in": 60} @app.get("/files/face/{person}/{filename}") async def serve_face_image(person: str, filename: str, _: dict = Depends(require_operator_flexible)): safe_person = Path(person).name safe_name = Path(filename).name path = os.path.join(_FACE_DB_PATH, safe_person, safe_name) if not os.path.isfile(path) and safe_person.startswith("unknown_"): path = os.path.join(_TEMP_UNKNOWN_IMG_ROOT, safe_person, safe_name) if not os.path.isfile(path): raise HTTPException(status_code=404, detail="Image not found") return FileResponse(path) @app.get("/face_db/image/{name}") async def get_enrolled_face_image(name: str): import os, glob base = os.path.join(_FACE_DB_PATH, name) if not os.path.isdir(base): raise HTTPException(status_code=404, detail="Not found") exts = ["*.jpg", "*.jpeg", "*.png", "*.webp"] files = [] for ext in exts: files.extend(glob.glob(os.path.join(base, ext))) if not files: raise HTTPException(status_code=404, detail="No image") return FileResponse(files[0]) @app.post("/files/upload") async def upload_image_file(file: UploadFile = File(...), _: dict = Depends(require_operator)): """Upload an image; returns authenticated file URL path.""" if file.content_type not in ALLOWED_IMAGE_TYPES: raise HTTPException(status_code=400, detail="Unsupported image type") contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image") _, file_path = _safe_image_upload_path(file.filename or "upload.jpg") with open(file_path, "wb") as f: f.write(contents) rel = normalize_files_url(f"/uploads/{os.path.basename(file_path)}") return {"url": rel, "path": rel} @app.get("/files/uploads/{file_path:path}") async def serve_upload_file(file_path: str, _: dict = Depends(require_operator_if_auth_enabled)): base = os.path.abspath(_UPLOADS_PATH) target = os.path.abspath(os.path.join(_UPLOADS_PATH, file_path)) if not target.startswith(base + os.sep) and target != base: raise HTTPException(status_code=400, detail="Invalid path") if not os.path.isfile(target): raise HTTPException(status_code=404, detail="File not found") return FileResponse(target) @app.get("/auth/me") async def auth_me(principal: dict = Depends(require_operator_if_auth_enabled)): if principal.get("auth") == "public": return {"authenticated": True, "username": "public-vision", "role": principal.get("role", "admin")} if principal.get("auth") == "jwt": return { "authenticated": True, "username": principal.get("sub"), "role": principal.get("role"), } return {"authenticated": True, "role": principal.get("role", "service"), "auth": "api_key"} @app.get("/alerts", response_model=List[Alert]) async def get_alerts(_: dict = Depends(require_operator_if_auth_enabled)): return alerts_db @app.post("/alert", response_model=Alert) async def create_alert(alert: Alert, _: dict = Depends(require_operator_if_auth_enabled)): if any( (a.get("id") if isinstance(a, dict) else getattr(a, "id", None)) == alert.id for a in alerts_db ): return alert await _process_new_alert(alert, source=alert.location or "ALERT_API") return alert async def _process_sos_event(payload: SOSPayload, *, source: str = "GUEST_APP") -> dict: event = { "id": str(uuid.uuid4()), "guest_id": payload.guest_id or "unknown-guest", "lat": payload.lat, "lng": payload.lng, "location_label": payload.location_label, "message": payload.message, "timestamp": datetime.datetime.now().isoformat(), } async with store_locks.sos_lock: sos_events.append(event) _trim_memory_list(sos_events, 100) save_sos() alert = Alert( id=event["id"], type="sos", location=payload.location_label or f"{payload.lat:.4f},{payload.lng:.4f}", message=payload.message, severity="critical", lat=payload.lat, lng=payload.lng, ) await _process_new_alert(alert, source=source, extra_context={"guest_id": event["guest_id"]}, sos_mode=True) await manager.broadcast(json.dumps({"type": "sos_event", **event})) await _append_incident_log( "CRIT", source, "sos", f"SOS: {payload.message}", {"event_id": event["id"], "lat": payload.lat, "lng": payload.lng}, ) logger.info(f"SOS received: {event}") return {"status": "received", "event": event} @app.post("/sos") async def handle_sos(payload: SOSPayload, request: Request, _: dict = Depends(require_operator)): sos_limiter.check(request, "sos") return await _process_sos_event(payload, source="GUEST_APP") @app.post("/sos/guest") async def handle_guest_sos( payload: SOSPayload, request: Request, x_api_key: str | None = Header(default=None, alias="X-API-Key"), ): """Guest-scoped SOS — uses CEPHEUS_GUEST_API_KEY (or service key in local dev).""" if not _guest_api_key_valid(x_api_key): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid guest API key") sos_limiter.check(request, "sos_guest") return await _process_sos_event(payload, source="GUEST_APP") @app.get("/sos") async def get_sos_events(_: dict = Depends(require_dashboard_read)): return sos_events[-50:] def _haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float: radius = 6371.0 d_lat = math.radians(lat2 - lat1) d_lng = math.radians(lng2 - lng1) a = ( math.sin(d_lat / 2) ** 2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(d_lng / 2) ** 2 ) return radius * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) _EMERGENCY_AMENITIES = { "hospital": {"label": "Hospitals", "icon": "H"}, "fire_station": {"label": "Fire Stn", "icon": "F"}, "police": {"label": "Police", "icon": "P"}, "ambulance": {"label": "Ambulance", "icon": "A"}, "emergency_supplies": {"label": "Supplies", "icon": "S"}, } _OSM_AMENITY_TO_SERVICE = { "hospital": "hospital", "fire_station": "fire_station", "police": "police", "ambulance_station": "ambulance", "clinic": "ambulance", "pharmacy": "emergency_supplies", "medical_supply": "emergency_supplies", } _NEARBY_CACHE_MAX = 64 _nearby_cache: dict[str, tuple[float, dict]] = {} async def _fetch_emergency_nearby_overpass(lat: float, lng: float, radius_m: int = 6000) -> dict: """OpenStreetMap Overpass lookup — shared by /emergency/nearby and Maps fallback.""" results: dict[str, list] = {k: [] for k in _EMERGENCY_AMENITIES} cache_key = f"{round(lat, 3)},{round(lng, 3)},{radius_m}" cached = _nearby_cache.get(cache_key) now = datetime.datetime.now().timestamp() if cached and now - cached[0] < 600: return cached[1] if httpx is None: return {"services": results, "source": "unavailable", "error": "httpx not installed"} query = f""" [out:json][timeout:15]; ( node["amenity"="hospital"](around:{radius_m},{lat},{lng}); node["amenity"="fire_station"](around:{radius_m},{lat},{lng}); node["amenity"="police"](around:{radius_m},{lat},{lng}); node["emergency"="ambulance_station"](around:{radius_m},{lat},{lng}); node["amenity"="clinic"]["emergency"="yes"](around:{radius_m},{lat},{lng}); node["amenity"="pharmacy"](around:{radius_m},{lat},{lng}); node["shop"="medical_supply"](around:{radius_m},{lat},{lng}); ); out body 120; """ mirrors = [ "https://overpass-api.de/api/interpreter", "https://overpass.kumi.systems/api/interpreter", "https://maps.mail.ru/osm/tools/overpass/api/interpreter", ] payload = None last_error = None import asyncio async with httpx.AsyncClient(timeout=30.0) as client: for url in mirrors: for attempt in range(2): try: resp = await client.post( url, data={"data": query}, headers={"User-Agent": "CepheusEmergencyConsole/1.0"}, ) resp.raise_for_status() payload = resp.json() break except Exception as exc: last_error = exc if attempt == 0: await asyncio.sleep(1.5) if payload is not None: break logger.warning("emergency_nearby: Overpass mirror %s failed: %s", url, last_error) if payload is None: logger.warning("All Overpass mirrors failed, falling back to synthetic mock data.") import random for key, meta in _EMERGENCY_AMENITIES.items(): for i in range(random.randint(1, 3)): dlat = (random.random() - 0.5) * 0.05 dlng = (random.random() - 0.5) * 0.05 elat, elng = lat + dlat, lng + dlng results[key].append({ "id": f"mock-{key}-{i}", "name": f"Local {meta['label']} {i+1}", "vicinity": f"{random.randint(100, 999)} Emergency Road", "phone": f"+1-555-01{random.randint(10,99)}", "lat": elat, "lng": elng, "icon": meta["icon"], "label": meta["label"], "type": key, "distKm": round(_haversine_km(lat, lng, elat, elng), 2), }) for key in results: results[key].sort(key=lambda x: x["distKm"]) results[key] = results[key][:5] return {"services": results, "source": "mock_fallback", "error": str(last_error)} for el in payload.get("elements", []): tags = el.get("tags", {}) amenity = tags.get("amenity") emergency_tag = tags.get("emergency") shop = tags.get("shop") service_key = _OSM_AMENITY_TO_SERVICE.get(amenity) if not service_key and emergency_tag == "ambulance_station": service_key = "ambulance" if not service_key and shop == "medical_supply": service_key = "emergency_supplies" meta = _EMERGENCY_AMENITIES.get(service_key) if service_key else None if not meta: continue elat, elng = el.get("lat"), el.get("lon") if elat is None or elng is None: continue results[service_key].append({ "id": str(el.get("id")), "name": tags.get("name") or meta["label"], "vicinity": tags.get("addr:full") or ", ".join(filter(None, [tags.get("addr:street"), tags.get("addr:city")])) or tags.get("operator", ""), "phone": tags.get("phone") or tags.get("contact:phone", ""), "lat": elat, "lng": elng, "icon": meta["icon"], "label": meta["label"], "type": service_key, "distKm": round(_haversine_km(lat, lng, elat, elng), 2), }) for key in results: results[key].sort(key=lambda x: x["distKm"]) results[key] = results[key][:5] response = {"services": results, "source": "overpass"} if len(_nearby_cache) >= _NEARBY_CACHE_MAX: oldest_key = min(_nearby_cache, key=lambda k: _nearby_cache[k][0]) _nearby_cache.pop(oldest_key, None) _nearby_cache[cache_key] = (now, response) return response @app.get("/emergency/nearby") async def emergency_nearby( lat: float, lng: float, radius_m: int = 6000, _: dict = Depends(require_operator_if_auth_enabled), ): """Nearest hospitals / fire stations / police via OpenStreetMap Overpass.""" try: return await _fetch_emergency_nearby_overpass(lat, lng, radius_m) except Exception as exc: logger.warning("emergency/nearby failed: %s", exc) empty = {k: [] for k in _EMERGENCY_AMENITIES} return {"services": empty, "source": "overpass", "error": str(exc)} @app.get("/emergency/dispatch-log") async def get_emergency_dispatch_log(_: dict = Depends(require_operator_if_auth_enabled)): try: return emergency_dispatch_log[-100:] except Exception as exc: logger.warning("emergency/dispatch-log failed: %s", exc) return [] # ── Google Maps emergency intelligence ──────────────────────────────────────── from emergency_maps_service import ( # noqa: E402 get_directions_with_traffic, get_hexagonal_coverage_data, maps_configured, maps_status_detail, ) @app.get("/maps/status") async def maps_status(_: dict = Depends(require_operator_if_auth_enabled)): return maps_status_detail() @app.get("/maps/directions") async def get_route( origin_lat: float, origin_lng: float, dest_lat: float, dest_lng: float, place_name: str = "", _: dict = Depends(require_operator), ): return get_directions_with_traffic(origin_lat, origin_lng, dest_lat, dest_lng, place_name) @app.get("/maps/hex-coverage") async def hex_coverage( lat: float, lng: float, radius_km: float = 5.0, _: dict = Depends(require_operator), ): return get_hexagonal_coverage_data(lat, lng, radius_km=radius_km) @app.post("/maps/dispatch-route") async def dispatch_route(body: dict, principal: dict = Depends(require_operator)): origin = body.get("origin", {}) dest = body.get("destination", {}) result = get_directions_with_traffic( origin.get("lat"), origin.get("lng"), dest.get("lat"), dest.get("lng"), dest.get("name", ""), ) _audit_destructive( principal, "DISPATCH_ROUTE", f"destination={dest.get('name')} eta={result.get('duration_traffic')}", ) return result @app.get("/incident/logs") async def get_incident_logs(_: dict = Depends(require_operator_if_auth_enabled)): return incident_logs @app.get("/agentic/plans") async def get_agentic_plans(_: dict = Depends(require_operator_if_auth_enabled)): return agentic_plans @app.get("/issues") async def get_issues(_: dict = Depends(require_operator_if_auth_enabled)): return issues_db @app.get("/issues/{issue_id}") async def get_issue_by_id(issue_id: str, _: dict = Depends(require_staff_read)): """Read-only issue snapshot for staff portal (linked request detail).""" for iss in issues_db: if iss.get("id") == issue_id: return iss raise HTTPException(status_code=404, detail="Issue not found") @app.get("/staff/requests") async def list_staff_requests(status: Optional[str] = None, _: dict = Depends(require_dashboard_read)): rows = staff_requests_db if status: rows = [r for r in staff_requests_db if r.get("status") == status] return [_enriched_staff_request(r) for r in rows] @app.post("/staff/requests/{request_id}/accept") async def accept_staff_request(request_id: str, payload: dict, principal: dict = Depends(require_staff)): if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"): raise HTTPException(status_code=403, detail="Only staff or admin JWT accounts may accept requests") staff_name = (payload.get("staff_name") or principal.get("sub") or "").strip() or "Staff" staff_id = (payload.get("staff_id") or staff_name).strip() or staff_name async with store_locks.staff_requests_lock: req = next((r for r in staff_requests_db if r.get("id") == request_id), None) if not req: raise HTTPException(status_code=404, detail="Staff request not found") if req.get("status") not in ("pending", "sent"): raise HTTPException(status_code=409, detail=f"Request already {req.get('status')}") req["status"] = "accepted" req["accepted_at"] = datetime.datetime.now().isoformat() req["assignments"] = [{"staff_id": staff_id, "name": staff_name, "progress": 25, "stage": "accepted"}] req["progress"] = 25 save_staff_requests() issue_id = req.get("issue_id") if issue_id: await _broadcast_issue_update( issue_id, f"[UPDATE] {staff_name} accepted the staff request.", progress=25, staff_request_status="accepted", staff_assignments=req["assignments"], ) if DEMO_MODE: _spawn(simulate_issue_progress(issue_id, staff_name)) await _append_staff_activity_entry( staff_name, req.get("location") or "", f"Accepted staff request {request_id}", event_type="staff_accept", metadata={"request_id": request_id, "issue_id": issue_id}, ) await manager.broadcast(json.dumps({"type": "staff_request_updated", "request": req})) return req @app.post("/staff/requests/{request_id}/decline") async def decline_staff_request(request_id: str, payload: dict, principal: dict = Depends(require_staff)): if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"): raise HTTPException(status_code=403, detail="Only staff or admin JWT accounts may decline requests") async with store_locks.staff_requests_lock: req = next((r for r in staff_requests_db if r.get("id") == request_id), None) if not req: raise HTTPException(status_code=404, detail="Staff request not found") if req.get("status") not in ("pending", "sent"): raise HTTPException(status_code=409, detail=f"Request already {req.get('status')}") req["status"] = "declined" req["declined_at"] = datetime.datetime.now().isoformat() req["decline_reason"] = payload.get("reason", "") save_staff_requests() issue_id = req.get("issue_id") if issue_id: await _broadcast_issue_update( issue_id, f"[UPDATE] Staff declined request{(': ' + req['decline_reason']) if req.get('decline_reason') else ''}.", staff_request_status="declined", ) await _append_staff_activity_entry( (principal.get("sub") or "Staff").strip() or "Staff", req.get("location") or "", f"Declined request {request_id}: {req.get('decline_reason') or 'no reason'}", event_type="staff_decline", metadata={"request_id": request_id, "issue_id": issue_id}, ) await manager.broadcast(json.dumps({"type": "staff_request_updated", "request": req})) return req async def _sync_staff_request_progress( issue_id: str, progress: int, stage: str, *, status: str | None = None, ): """Keep staff_requests_db in sync when issue progress changes.""" async with store_locks.staff_requests_lock: req = next( ( r for r in staff_requests_db if r.get("issue_id") == issue_id and r.get("status") in ("pending", "sent", "accepted") ), None, ) if not req: return None req["progress"] = progress if status: req["status"] = status if status == "resolved": req["resolved_at"] = datetime.datetime.now().isoformat() for assignment in req.get("assignments") or []: assignment["progress"] = progress assignment["stage"] = stage save_staff_requests() snapshot = dict(req) await manager.broadcast(json.dumps({"type": "staff_request_updated", "request": snapshot})) return snapshot @app.post("/staff/requests/{request_id}/complete") async def complete_staff_request( request_id: str, payload: dict | None = None, principal: dict = Depends(require_staff), ): payload = payload or {} if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"): raise HTTPException(status_code=403, detail="Only staff or admin JWT accounts may complete requests") staff_name = (principal.get("sub") or "").strip() or "Staff" notes = (payload.get("notes") or "").strip() async with store_locks.staff_requests_lock: req = next((r for r in staff_requests_db if r.get("id") == request_id), None) if not req: raise HTTPException(status_code=404, detail="Staff request not found") if req.get("status") != "accepted": raise HTTPException( status_code=409, detail=f"Request must be accepted before completion (current: {req.get('status')})", ) req["status"] = "resolved" req["progress"] = 100 req["resolved_at"] = datetime.datetime.now().isoformat() if notes: req["completion_notes"] = notes assignments = req.get("assignments") or [] if assignments: for assignment in assignments: assignment["progress"] = 100 assignment["stage"] = "resolved" else: req["assignments"] = [ {"staff_id": staff_name, "name": staff_name, "progress": 100, "stage": "resolved"} ] save_staff_requests() issue_id = req.get("issue_id") location = req.get("location") or "General" request_message = req.get("message") or "" if issue_id: completion_msg = f"[RESOLVED] {staff_name} marked staff request complete." if notes: completion_msg += f" Notes: {notes}" await _broadcast_issue_update( issue_id, completion_msg, progress=100, status="RESOLVED", staff_request_status="resolved", staff_assignments=req["assignments"], ) log_msg = f"{staff_name} completed staff request {request_id}" if issue_id: log_msg += f" for issue {issue_id}" if location: log_msg += f" ({location})" if notes: log_msg += f" — {notes}" await _append_incident_log( "INFO", "staff_portal", "staff_request_completed", log_msg, metadata={ "request_id": request_id, "issue_id": issue_id, "staff_name": staff_name, "location": location, "message": request_message, "notes": notes or None, }, ) await _append_staff_activity_entry( staff_name, location, log_msg, event_type="staff_complete", metadata={"request_id": request_id, "issue_id": issue_id, "notes": notes or None}, ) await manager.broadcast(json.dumps({"type": "staff_request_updated", "request": req})) return req @app.post("/staff/requests/{request_id}/progress") async def update_staff_request_progress( request_id: str, payload: dict, principal: dict = Depends(require_staff), ): """Report en route / on scene milestones back to command.""" if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"): raise HTTPException(status_code=403, detail="Only staff JWT accounts may update progress") stage = (payload.get("stage") or "").strip().lower().replace(" ", "_") if stage not in ("en_route", "on_scene"): raise HTTPException(status_code=400, detail="stage must be en_route or on_scene") progress = _STAFF_MILESTONE_PROGRESS[stage] staff_name = (principal.get("sub") or "").strip() or "Staff" note = (payload.get("note") or "").strip() async with store_locks.staff_requests_lock: req = next((r for r in staff_requests_db if r.get("id") == request_id), None) if not req: raise HTTPException(status_code=404, detail="Staff request not found") if req.get("status") != "accepted": raise HTTPException(status_code=409, detail="Progress updates require an accepted request") req["progress"] = progress req["stage"] = stage for assignment in req.get("assignments") or []: assignment["progress"] = progress assignment["stage"] = stage save_staff_requests() issue_id = req.get("issue_id") location = req.get("location") or req.get("zone") or "site" label = "En route" if stage == "en_route" else "On scene" update_msg = f"[UPDATE] {staff_name} — {label}." if note: update_msg += f" Note: {note}" if issue_id: await _broadcast_issue_update( issue_id, update_msg, progress=progress, staff_request_status="accepted", staff_assignments=req.get("assignments"), ) await _append_staff_activity_entry( staff_name, location, update_msg.replace("[UPDATE] ", ""), event_type="staff_progress", metadata={"request_id": request_id, "issue_id": issue_id, "stage": stage}, ) await manager.broadcast(json.dumps({"type": "staff_request_updated", "request": req})) return req @app.get("/staff/inbox") async def get_staff_inbox(_: dict = Depends(require_staff_read)): """Live alerts, broadcasts, signage, and ack state for staff portal.""" active_signage = [ {"id": sid, "active": bool(active), "label": sid.upper()} for sid, active in (signage_state or {}).items() if active ] return { "alerts": alerts_broadcast_log[:40], "dispatches": targeted_dispatches[:40], "signage_active": active_signage, "acks": staff_inbox_acks[:100], } @app.post("/staff/inbox/ack") async def post_staff_inbox_ack(body: dict, principal: dict = Depends(require_staff)): if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"): raise HTTPException(status_code=403, detail="Staff JWT required") staff_name = (principal.get("sub") or "").strip() or "Staff" ref_id = (body.get("ref_id") or body.get("alert_id") or body.get("dispatch_id") or "").strip() ref_type = (body.get("ref_type") or "broadcast").strip() message = (body.get("message") or body.get("title") or "Broadcast acknowledged").strip() entry = { "id": f"ack-{uuid.uuid4().hex[:8]}", "staff_name": staff_name, "ref_id": ref_id, "ref_type": ref_type, "message": message, "timestamp": datetime.datetime.now().isoformat(), } staff_inbox_acks.insert(0, entry) del staff_inbox_acks[200:] save_json(STAFF_INBOX_ACKS_FILE, staff_inbox_acks) await _append_staff_activity_entry( staff_name, body.get("zone") or "", f"Acknowledged: {message}", event_type="broadcast_ack", metadata={"ref_id": ref_id, "ref_type": ref_type}, ) await manager.broadcast(json.dumps({"type": "staff_inbox_ack", "ack": entry})) return entry @app.get("/staff/duty") async def get_staff_duty(_: dict = Depends(require_staff_read)): return staff_duty @app.post("/staff/duty") async def set_staff_duty(body: dict, principal: dict = Depends(require_staff)): if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"): raise HTTPException(status_code=403, detail="Staff JWT required") name = (principal.get("sub") or "").strip() or "staff" status = (body.get("status") or "available").strip().lower() if status not in ("available", "busy", "off_duty"): raise HTTPException(status_code=400, detail="status must be available, busy, or off_duty") staff_duty[name] = { "status": status, "updated_at": datetime.datetime.now().isoformat(), } save_json(STAFF_DUTY_FILE, staff_duty) await manager.broadcast(json.dumps({"type": "staff_duty_update", "staff": name, **staff_duty[name]})) return staff_duty[name] @app.post("/staff/check-in") async def staff_check_in(body: dict, principal: dict = Depends(require_staff)): if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"): raise HTTPException(status_code=403, detail="Staff JWT required") staff_name = (principal.get("sub") or "").strip() or "Staff" zone = (body.get("zone") or body.get("location") or "Unknown zone").strip() request_id = (body.get("request_id") or "").strip() message = f"Checked in at {zone}" entry = await _append_staff_activity_entry( staff_name, zone, message, event_type="check_in", metadata={"request_id": request_id or None, "zone": zone}, ) if request_id: try: await update_staff_request_progress( request_id, {"stage": "on_scene", "note": f"Checked in at {zone}"}, principal, ) except HTTPException as exc: if exc.status_code not in (404, 409): raise logger.warning("check-in progress skipped for %s: %s", request_id, exc.detail) return entry @app.get("/staff/activity") async def get_staff_activity(_: dict = Depends(require_dashboard_read)): try: if staff_activity: return staff_activity[:50] built: list[dict] = [] for row in incident_logs[:30]: meta = row.get("metadata") or {} if row.get("event_type") in ("staff_request_completed", "staff_request_sent", "sos"): built.append({ "id": meta.get("request_id") or row.get("ts", ""), "timestamp": row.get("ts", ""), "name": meta.get("staff_name") or row.get("source", "Staff"), "zone": meta.get("location") or row.get("source", ""), "message": row.get("msg") or "", }) for d in targeted_dispatches[:20]: built.append({ "id": d.get("dispatch_id") or d.get("id", ""), "timestamp": d.get("timestamp") or "", "name": d.get("recipient") or "Staff", "zone": d.get("status") or "", "message": d.get("message") or "Dispatch update", }) return built[:50] except Exception as exc: logger.warning("staff/activity failed: %s", exc) return [] @app.post("/staff/activity") async def post_staff_activity(body: dict, principal: dict = Depends(require_staff)): if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"): raise HTTPException(status_code=403, detail="Staff JWT required") name = (body.get("name") or principal.get("sub") or "Staff").strip() entry = await _append_staff_activity_entry( name, body.get("zone") or "", body.get("message") or "", event_type=body.get("event_type") or "staff_note", metadata=body.get("metadata") or {}, ) return entry @app.get("/site/signage-placements") async def get_signage_placements(_: dict = Depends(require_operator_if_auth_enabled)): return signage_placements or {} @app.delete("/site/signage-placements/{sign_id}") async def delete_signage_placement( sign_id: str, _: dict = Depends(require_admin_audited("site/signage-placements/delete")), ): global signage_placements if sign_id not in signage_placements: raise HTTPException(status_code=404, detail="Placement not found") del signage_placements[sign_id] save_signage_placements() if signage_state.get(sign_id): await _broadcast_signage_state(sign_id, False) await manager.broadcast(json.dumps({"type": "signage_placement_removed", "id": sign_id})) return {"removed": sign_id} @app.post("/site/signage-placements/{sign_id}/remove") async def remove_signage_placement_post( sign_id: str, _: dict = Depends(require_admin_audited("site/signage-placements/remove")), ): """POST fallback for clients where DELETE preflight fails.""" return await delete_signage_placement(sign_id, _) @app.post("/site/signage-placements") async def save_signage_placement(payload: dict, _: None = Depends(require_operator)): global signage_placements sign_id = payload.get("id") if not sign_id: raise HTTPException(status_code=400, detail="Signage id required") signage_placements[sign_id] = { "lat": payload.get("lat"), "lng": payload.get("lng"), "updatedAt": datetime.datetime.now().isoformat(), } save_signage_placements() await manager.broadcast(json.dumps({"type": "signage_placement", "id": sign_id, **signage_placements[sign_id]})) return signage_placements[sign_id] @app.get("/api/signage") async def list_active_signage(_: dict = Depends(require_operator_if_auth_enabled)): active = [r for r in signage_records if r.get("status", "active") == "active"] return active @app.get("/api/signage/history") async def signage_history(_: dict = Depends(require_operator_if_auth_enabled)): return signage_records[-200:] @app.post("/api/signage") async def create_signage_record(body: dict, principal: dict = Depends(require_operator)): sign_type = (body.get("type") or "").upper().replace(" ", "_").replace("-", "_") lat = body.get("lat") lng = body.get("lng") if not sign_type or lat is None or lng is None: raise HTTPException(status_code=400, detail="type, lat, lng required") placed_by = principal.get("sub") or principal.get("username") or "admin" record = { "id": f"sign-{uuid.uuid4().hex[:8]}", "type": sign_type, "lat": float(lat), "lng": float(lng), "placedAt": datetime.datetime.now(datetime.timezone.utc).isoformat(), "placedBy": placed_by, "status": "active", "broadcastSent": False, } signage_records.insert(0, record) del signage_records[500:] save_signage_records() try: await _broadcast_signage_alert(sign_type, float(lat), float(lng), action="deployed") record["broadcastSent"] = True save_signage_records() except Exception as exc: logger.warning("Signage broadcast failed: %s", exc) await manager.broadcast(json.dumps({"type": "signage_record_created", "record": record})) return record @app.delete("/api/signage/{sign_id}") async def remove_signage_record(sign_id: str, _: dict = Depends(require_operator)): global signage_records record = next((r for r in signage_records if r.get("id") == sign_id), None) if not record: raise HTTPException(status_code=404, detail="Sign not found") record["status"] = "removed" record["removedAt"] = datetime.datetime.now(datetime.timezone.utc).isoformat() save_signage_records() try: await _broadcast_signage_alert(record.get("type", "SIGN"), record["lat"], record["lng"], action="removed") except Exception as exc: logger.warning("Signage removal broadcast failed: %s", exc) await manager.broadcast(json.dumps({"type": "signage_record_removed", "id": sign_id})) return {"removed": sign_id} @app.get("/site/blueprint") async def get_site_blueprint(_: dict = Depends(require_operator_if_auth_enabled)): return site_blueprint or {} @app.post("/site/blueprint") async def upload_site_blueprint( file: UploadFile = File(...), sw_lat: float = Form(...), sw_lng: float = Form(...), ne_lat: float = Form(...), ne_lng: float = Form(...), _: dict = Depends(require_admin_audited("site/blueprint/upload")), ): global site_blueprint if file.content_type not in ALLOWED_IMAGE_TYPES: raise HTTPException(status_code=400, detail="Blueprint must be JPEG, PNG, or WebP") contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Blueprint") ext = Path(file.filename or "blueprint.png").suffix.lower() if ext not in ALLOWED_IMAGE_EXTENSIONS: ext = ".png" fname = f"site_blueprint{ext}" out_path = os.path.join(BLUEPRINT_DIR, fname) with open(out_path, "wb") as f: f.write(contents) site_blueprint = { "url": f"/files/uploads/blueprints/{fname}", "bounds": [[sw_lat, sw_lng], [ne_lat, ne_lng]], "uploaded_at": datetime.datetime.now().isoformat(), } save_site_blueprint() return site_blueprint @app.get("/agentic/steps") async def get_agentic_steps( session_id: Optional[str] = Query(default=None), principal: dict = Depends(require_operator_if_auth_enabled), ): user_id = _operator_id(principal) async with store_locks.agent_steps_lock: scoped = [s for s in agent_steps_history if s.get("user_id") == user_id] if session_id: scoped = [s for s in scoped if s.get("session_id") == session_id] return scoped[-50:] async def _broadcast_issue_update(issue_id: str, append_text: str | None = None, **kwargs): issue_snapshot = None for iss in issues_db: if iss["id"] == issue_id: if append_text: if "original_desc" not in iss: iss["original_desc"] = iss.get("desc", "") iss["desc"] = iss["original_desc"] + f"\n\n{append_text}" iss.update(kwargs) save_issues() issue_snapshot = dict(iss) break if issue_snapshot: await manager.broadcast(json.dumps({"type": "issue_update", "issue": issue_snapshot})) return issue_snapshot async def simulate_issue_progress(issue_id: str, staff_name: str = "Staff"): if not DEMO_MODE or security_config.is_production(): return await asyncio.sleep(4) await _broadcast_issue_update( issue_id, f"[UPDATE] {staff_name} is en route.", progress=60, staff_assignments=[{"name": staff_name, "progress": 60, "stage": "en_route"}], ) await _sync_staff_request_progress(issue_id, 60, "en_route") await asyncio.sleep(4) await _broadcast_issue_update( issue_id, f"[UPDATE] {staff_name} on scene — handling incident.", progress=85, staff_assignments=[{"name": staff_name, "progress": 85, "stage": "on_scene"}], ) await _sync_staff_request_progress(issue_id, 85, "on_scene") await asyncio.sleep(4) await _broadcast_issue_update( issue_id, "[RESOLVED] Incident resolved.", progress=100, status="RESOLVED", staff_request_status="resolved", staff_assignments=[{"name": staff_name, "progress": 100, "stage": "resolved"}], ) await _sync_staff_request_progress(issue_id, 100, "resolved", status="resolved") await _append_incident_log( "INFO", "staff_portal", "staff_request_completed", f"{staff_name} completed issue {issue_id} (demo auto-progress).", metadata={"issue_id": issue_id, "staff_name": staff_name, "demo": True}, ) async def simulate_vivek_progress(issue_id: str): await simulate_issue_progress(issue_id, "Vivek") @app.post("/issues") async def create_issue(body: IssueCreate, _: None = Depends(require_operator_if_auth_enabled)): issue = body.model_dump() issue.setdefault("id", f"ISS-{uuid.uuid4().hex[:6].upper()}") issue.setdefault("progress", 0) issue.setdefault("timestamp", datetime.datetime.now().isoformat()) if not issue.get("title"): desc = (issue.get("desc") or "").strip() issue["title"] = desc.split("\n")[0][:80] if desc else "Incident" is_missing = "missing" in issue.get("title", "").lower() or "missing" in issue.get("desc", "").lower() if is_missing and getattr(app.state, "last_missing_person_img", None): issue["image"] = normalize_files_url(app.state.last_missing_person_img) async with store_locks.issues_lock: issues_db.insert(0, issue) save_issues() await manager.broadcast(json.dumps({"type": "issue_update", "issue": issue})) return issue @app.patch("/issues/{issue_id}") async def patch_issue(issue_id: str, body: IssuePatch, _: None = Depends(require_operator_if_auth_enabled)): async with store_locks.issues_lock: snapshot = None for iss in issues_db: if iss.get("id") == issue_id: data = body.model_dump(exclude_unset=True) _validate_issue_patch(iss, data) iss.update(data) save_issues() snapshot = dict(iss) break if not snapshot: raise HTTPException(status_code=404, detail="Issue not found") await manager.broadcast(json.dumps({"type": "issue_update", "issue": snapshot})) return snapshot @app.post("/agentic/plan") async def create_agentic_plan(payload: dict, _: None = Depends(require_operator)): plan = generate_agentic_plan(payload) event = { "type": "agentic_plan_update", "plan_id": str(uuid.uuid4()), "created_at": datetime.datetime.now().isoformat(), "incident": payload, "plan": plan, } async with store_locks.agentic_plans_lock: agentic_plans.insert(0, event) _trim_memory_list(agentic_plans, 100) save_agentic_plans() await manager.broadcast(json.dumps(event)) return event @app.post("/agentic/chat") async def agentic_chat(payload: ChatPayload, _: None = Depends(require_operator)): room_counts = {room["label"]: room.get("occupancy", 0) for room in vision_engine.room_stats.values()} if hasattr(vision_engine, "room_stats") else {} crowd = getattr(vision_engine, "last_crowd_count", None) if crowd is None: crowd = getattr(vision_engine, "last_total_count", 0) context = { "crowd_count": crowd, "live_face_count": _count_live_faces(), "room_counts": room_counts, "recent_alerts": [a if isinstance(a, dict) else a.model_dump() for a in alerts_db[-5:]] if alerts_db else [], "active_models": vision_engine.get_ai_status() } loop = asyncio.get_event_loop() reply = await loop.run_in_executor(None, generate_chat_response, payload.prompt, context) return {"reply": reply} @app.get("/dispatches") async def get_dispatches(_: dict = Depends(require_operator_if_auth_enabled)): return targeted_dispatches @app.get("/user/profile") async def get_user_profile(principal: dict = Depends(require_operator_if_auth_enabled)): profile = dict(user_profile) sub = principal.get("sub") role = principal.get("role") if principal.get("auth") == "jwt" and sub: profile["name"] = sub if role: profile["role"] = role.replace("_", " ").title() elif principal.get("auth") == "api_key" and role: profile["role"] = f"API ({role})" return profile @app.get("/signage") async def get_signage(_: dict = Depends(require_operator_if_auth_enabled)): return signage_state @app.post("/signage/{sign_id}/toggle") async def toggle_signage(sign_id: str, payload: dict | None = Body(default=None), _: None = Depends(require_operator)): """Set signage to explicit active state, or flip when active omitted.""" body = payload or {} if "active" in body: return await _broadcast_signage_state(sign_id, bool(body["active"])) current = signage_state.get(sign_id, False) return await _broadcast_signage_state(sign_id, not current) @app.get("/gossip/contact-network") async def get_gossip_contact_network( hours: Optional[float] = Query(default=None), _: dict = Depends(require_operator_if_auth_enabled), ): return _build_contact_network(hours) @app.get("/gossip") async def get_gossip( root: Optional[str] = None, date: Optional[str] = None, _: dict = Depends(require_operator_if_auth_enabled), ): """ Return gossip graph. Optional ?date=YYYY-MM-DD loads a persisted snapshot. Pass ?root=PersonName to filter the contact tree. """ if date: snap_path = os.path.join(GOSSIP_HISTORY_DIR, f"{date}.json") if os.path.isfile(snap_path): return persist.load_json(snap_path, {}) return {"nodes": [], "links": [], "root_person": root or "", "is_tracking": False, "date": date, "empty": True} if root: gossip_bridge.set_root_person(root) loop = asyncio.get_event_loop() data = await loop.run_in_executor(None, gossip_bridge.get_gossip_json, root) return data @app.post("/gossip/set_root") async def set_gossip_root(root: str, _: None = Depends(require_vision_access)): """Set the root person for gossip graph tracking.""" gossip_bridge.set_root_person(root) return {"status": "ok", "root_person": root} @app.post("/gossip/start") async def start_gossip_tracking( payload: GossipTrackingStart = GossipTrackingStart(), _: None = Depends(require_vision_access), ): person = (payload.personName or "").strip() if person: gossip_bridge.set_root_person(person) gossip_bridge.start_tracking() tracking = { "staffId": payload.staffId, "personName": payload.personName, "cause": payload.cause, "doc_names": payload.doc_names or [], "started_at": datetime.datetime.now().isoformat(), } gossip_bridge.set_tracking_meta(tracking) await manager.broadcast(json.dumps({"type": "gossip_tracking_update", "status": "started", **tracking})) return {"status": "started", "tracking": tracking, "root_person": gossip_bridge.get_gossip_json().get("root_person")} @app.post("/gossip/stop") async def stop_gossip_tracking(_: None = Depends(require_vision_access)): gossip_bridge.stop_tracking() gossip_bridge.clear_tracking_meta() await manager.broadcast(json.dumps({"type": "gossip_tracking_update", "status": "stopped"})) return {"status": "stopped"} @app.post("/gossip/clear") async def clear_gossip_tracking(_: dict = Depends(require_admin_audited("gossip/clear"))): gossip_bridge.clear_graph() return {"status": "cleared"} @app.post("/gossip/ingest_frame") async def gossip_ingest_frame( file: UploadFile = File(...), cam_id: str = Form("cam-01"), _: None = Depends(require_vision_access), ): """Run face recognition on a (browser webcam) frame and feed the gossip graph. This is what makes contact tracing work without a server-side camera: the UI streams frames here while tracking is active. Detected, enrolled people are linked to the root person in the interaction graph. """ if file.content_type not in ALLOWED_IMAGE_TYPES: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type") contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image") nparr = np.frombuffer(contents, np.uint8) frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if frame is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image") fe = vision_engine.face_engine await _run_face_work(_ensure_face_db, fe) matches: list[dict] = [] if hasattr(fe, "match_all_faces"): matches = await _run_face_work(fe.match_all_faces, frame) else: single = await _run_face_work(vision_engine.search_missing_person, frame) if single.get("found"): matches = [{"name": single["name"], "confidence": single.get("confidence", 0.0), "bbox": [0, 0, 0, 0], "found": True}] detected = [m["name"] for m in matches] known_names = [m["name"] for m in matches if m.get("found") and m["name"] != "Unknown"] summary = gossip_bridge.ingest_detected_names(cam_id, detected) data = gossip_bridge.get_gossip_json() if summary.get("tracking"): async with store_locks.gossip_lock: today = datetime.datetime.now().strftime("%Y-%m-%d") persist.save_json(os.path.join(GOSSIP_HISTORY_DIR, f"{today}.json"), data) await manager.broadcast(json.dumps({ "type": "gossip_tracking_update", "status": "detections", "cam_id": cam_id, "names": known_names, })) return { "names": known_names, "matches": matches, "graph": data, "is_tracking": data.get("is_tracking", False), } @app.get("/debug/vision") async def debug_vision(_: None = Depends(require_vision_access)): """Regression detector for vision pipeline.""" fe = vision_engine.face_engine pipeline = _get_vision_pipeline() stats = pipeline.stats.as_dict() if hasattr(pipeline, "stats") else {} return { "engine_ready": getattr(fe, "ready", False), "db_size": len(fe.db) if hasattr(fe, "db") else 0, "queue_depth": stats.get("queue_depth", 0), "fps": stats.get("fps", 0), "inference_ms": stats.get("last_inference_ms", 0), "processed": stats.get("processed", 0), "dropped": stats.get("dropped", 0) } @app.get("/debug/backend") async def debug_backend(_: None = Depends(require_vision_access)): """Regression detector for backend health.""" import psutil process = psutil.Process() return { "active_sockets": len(manager.active_connections), "agent_sockets": len(manager.agent_connections), "memory_mb": round(process.memory_info().rss / 1024 / 1024, 2), "detections_history_size": len(detections_history), "alerts_size": len(alerts) } async def _build_track_frame_result( cam_id: str, frame, matches: list, *, scan_mode: str = "batch", skipped: bool = False, ) -> dict: """Shared response builder after face inference (live scan or batch track_frame).""" def _face_thumbnail_url(match: dict) -> str | None: try: x1, y1, x2, y2 = [int(v) for v in (match.get("bbox") or [0, 0, 0, 0])] h, w = frame.shape[:2] x1, y1 = max(0, x1), max(0, y1) x2, y2 = min(w, x2), min(h, y2) if x2 <= x1 or y2 <= y1: return None face_img = frame[y1:y2, x1:x2] if face_img.size == 0: return None base_name = str(match.get("name", "unknown")).strip().lower().replace(" ", "_") thumb_filename = f"face_{base_name}_{cam_id}.jpg" thumb_path = os.path.join(_UPLOADS_PATH, thumb_filename) if not cv2.imwrite(thumb_path, face_img): return None return f"/files/uploads/{thumb_filename}" except Exception: return None faces_payload = [] for m in matches: thumbnail_url = _face_thumbnail_url(m) face_payload = { "name": m.get("name", "Unknown"), "confidence": round(float(m.get("confidence", 0.0)), 3), "bbox": m.get("bbox", [0, 0, 0, 0]), "found": bool(m.get("found", True)), "is_unknown": bool(m.get("is_unknown")) or str(m.get("name", "")).lower().startswith("unknown_"), **({"embedding": m.get("embedding")} if m.get("embedding") is not None else {}), } if thumbnail_url: face_payload["thumbnail"] = normalize_files_url(thumbnail_url) faces_payload.append(face_payload) public_faces_payload = [ {k: v for k, v in face.items() if k != "embedding"} for face in faces_payload ] with vision_engine.lock: vision_engine.face_results[cam_id] = faces_payload tracked_names: list[str] = [] gossip_names: list[str] = [] gossip_bboxes: list[tuple] = [] frame_updates: list[dict] = [] global _detections_dirty async with store_locks.detections_lock: for m in matches: name = str(m.get("name") or "Unknown").strip() if name in ("Unknown", "unknown", "") or not m.get("found", True): continue tracked_names.append(name) gossip_names.append(name) gossip_bboxes.append(tuple(m.get("bbox", [0, 0, 0, 0]))) thumbnail_filename = f"sighting_{name.lower()}_{cam_id}.jpg" thumbnail_path = os.path.join(_UPLOADS_PATH, thumbnail_filename) cv2.imwrite(thumbnail_path, frame) thumbnail_url = f"/files/uploads/{thumbnail_filename}" entry = _upsert_detection_sighting( name, confidence=float(m.get("confidence", 0.0)), cam_id=cam_id, thumbnail=thumbnail_url, ) frame_updates.append(entry) _detections_dirty = True if _detections_dirty: save_detections() _detections_dirty = False try: if gossip_names: gossip_bridge.on_detections(cam_id, gossip_names, gossip_bboxes, frame.shape[1]) gossip_bridge.ingest_detected_names(cam_id, gossip_names) except Exception as exc: logger.debug("gossip ingest from track_frame skipped: %s", exc) frame_id = vision_session.record_processed_frame(cam_id, had_match=bool(tracked_names)) meta = vision_session.get_cam_meta(cam_id) count = len(public_faces_payload) vision_engine.last_face_count = count browser_feed_count = len(getattr(vision_engine, "browser_feeds", {}) or {}) await manager.broadcast(json.dumps({ "type": "camera_broadcast", "total_count": count, "cameras": {cam_id: { "count": count, "faces": public_faces_payload, "live": True, "stale": False, "frame_id": frame_id, }}, "active_cam_count": browser_feed_count, "detection_updates": frame_updates, "timestamp": datetime.datetime.now().isoformat(), })) return { "tracking": True, "cam_id": cam_id, "live": True, "stale": False, "skipped": skipped, "fresh": not skipped, "scan_mode": scan_mode, "presence": bool(tracked_names), "faces": public_faces_payload, "tracked_faces": public_faces_payload, "frame_id": frame_id, "last_frame_ts": meta.get("last_frame_ts"), "presence_expires_at": meta.get("presence_expires_at"), "known_names": tracked_names, "detection_updates": frame_updates, } @app.post("/vision/track_frame") async def vision_track_frame( file: UploadFile = File(...), cam_id: str = Form("cam-01"), _: None = Depends(require_vision_access), ): """Continuous always-on tracking from a browser-supplied camera frame. The frontend's global CameraTracker streams frames here as long as facial recognition is enabled. This recognizes every face, records sightings into the live detection history + per-camera face results (so the Agentic Copilot and every page see the same live tracking), feeds the interaction graph, and broadcasts a camera update to all clients. """ warming = _vision_warming_response() if warming is not None: return warming if not vision_engine.get_ai_status().get("facial", False): return {"tracking": False, "faces": [], "reason": "Facial recognition is disabled"} if file.content_type not in ALLOWED_IMAGE_TYPES: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type") contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image") nparr = np.frombuffer(contents, np.uint8) frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if frame is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image") fe = vision_engine.face_engine pipeline = _get_vision_pipeline() with vision_engine.lock: vision_engine.camera_indices[cam_id] = "browser" vision_engine.latest_raw_frames[cam_id] = frame.copy() infer_frame = resize_for_infer(frame) matches, skipped = await pipeline.infer(cam_id, frame) if not skipped: scale_match_bboxes(matches, infer_frame, frame) if skipped: if vision_session.is_cam_stale(cam_id): return { "tracking": True, "cam_id": cam_id, "live": False, "stale": True, "reason": "frame_timeout", "faces": [], "tracked_faces": [], "presence": False, "skipped": True, "known_names": [], "detection_updates": [], } cached = vision_engine.get_face_results().get(cam_id, []) public_cached = [ {k: v for k, v in (f if isinstance(f, dict) else {}).items() if k != "embedding"} for f in cached ] # We skipped inference, but the frame arrived successfully. Keep the TTL alive. had_match = any(f.get("found") and f.get("name") not in ("Unknown", "unknown", "") for f in public_cached) frame_id = vision_session.record_processed_frame(cam_id, had_match=had_match) meta = vision_session.get_cam_meta(cam_id) return { "tracking": True, "cam_id": cam_id, "live": True, "stale": False, "faces": public_cached, "tracked_faces": public_cached, "presence": bool(public_cached), "skipped": True, "frame_id": meta.get("frame_id"), "presence_expires_at": meta.get("presence_expires_at"), "known_names": [ f.get("name") for f in public_cached if f.get("found") and f.get("name") not in ("Unknown", "unknown", "") ], "detection_updates": [], } return await _build_track_frame_result(cam_id, frame, matches, scan_mode="batch", skipped=False) @app.post("/vision/live_scan") async def vision_live_scan( file: UploadFile = File(...), cam_id: str = Form("cam-01"), _: None = Depends(require_vision_access), ): """On-demand live face scan — always runs fresh inference (bypasses batch skip queue).""" warming = _vision_warming_response() if warming is not None: return warming if not vision_engine.get_ai_status().get("facial", False): return {"tracking": False, "faces": [], "reason": "Facial recognition is disabled", "scan_mode": "live"} if file.content_type not in ALLOWED_IMAGE_TYPES: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type") contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image") nparr = np.frombuffer(contents, np.uint8) frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if frame is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image") with vision_engine.lock: vision_engine.camera_indices[cam_id] = "browser" vision_engine.latest_raw_frames[cam_id] = frame.copy() infer_frame = resize_for_infer(frame, width=LIVE_INFER_WIDTH) matches = await _run_face_work(_infer_live_matches_sync, cam_id, infer_frame.copy(), timeout=20.0) if not isinstance(matches, list): matches = [] scale_match_bboxes(matches, infer_frame, frame) result = await _build_track_frame_result(cam_id, frame, matches, scan_mode="live", skipped=False) if not matches: result["reason"] = "no_face_detected" return result @app.post("/tracking/session/reset") async def reset_tracking_session( body: TrackingResetPayload = Body(default_factory=TrackingResetPayload), principal: dict = Depends(require_operator), ): """Clear live tracking state for a fresh operator session. Wipes sighting history, live face results, browser feed registry, and the gossip interaction graph (edges). Enrolled face DB is untouched. Gossip tracking stays enabled so new co-presence events accumulate cleanly. By default does not broadcast — the initiating client resets local UI state. Pass ``broadcast: true`` only for an explicit global coordination reset (admin only). Pass ``full_reset: true`` when the operator explicitly starts a new session. """ if body.broadcast and not auth_service.has_role(principal, "admin"): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Global tracking reset requires admin role", ) _audit_destructive( principal, "tracking/session/reset", f"broadcast={body.broadcast} full_reset={body.full_reset}", ) global detections_history, _detections_dirty user_id = _operator_id(principal) async with store_locks.detections_lock: detections_history.clear() _detections_dirty = False save_detections() vision_engine.face_results.clear() if hasattr(vision_engine, "clear_browser_feeds"): vision_engine.clear_browser_feeds() gossip_bridge.clear_graph() gossip_bridge.clear_tracking_meta() gossip_bridge.start_tracking() if os.path.exists(_FACE_DB_PATH): enrolled = [ n for n in os.listdir(_FACE_DB_PATH) if os.path.isdir(os.path.join(_FACE_DB_PATH, n)) and not n.startswith("unknown_") ] gossip_bridge.register_enrolled_roster(enrolled) today_path = os.path.join(GOSSIP_HISTORY_DIR, f"{datetime.datetime.now():%Y-%m-%d}.json") if os.path.exists(today_path): async with store_locks.gossip_lock: try: os.remove(today_path) except OSError: pass if body.clear_agent_steps and body.session_id: await _clear_agent_steps(user_id, body.session_id) await agentic_orchestrator.prune_adk_sessions(user_id, keep_session_id=None) if os.path.exists(_UPLOADS_PATH): for item in os.listdir(_UPLOADS_PATH): item_path = os.path.join(_UPLOADS_PATH, item) if os.path.isfile(item_path): try: os.remove(item_path) except OSError: pass if hasattr(app.state, "last_missing_person_img"): app.state.last_missing_person_img = None payload = { "type": "tracking_session_reset", "detections_history": [], "timestamp": datetime.datetime.now().isoformat(), "broadcast": body.broadcast, } if body.broadcast: await manager.broadcast(json.dumps(payload)) agentic_orchestrator.inject_context( alerts=alerts_db, sos_events=sos_events, vision_engine=vision_engine, rooms=[], detections_history=detections_history, issues=issues_db, incident_logs=incident_logs, signage_state=signage_state, ) return {"status": "reset", "message": "Live tracking session cleared", "broadcast": body.broadcast} @app.get("/gossip/known_people") async def get_known_people(_: dict = Depends(require_operator_if_auth_enabled)): """Return all people currently recognized by the face engine.""" # Merge: gossip_bridge known people + face DB keys known = set(gossip_bridge.get_known_people()) known.update(vision_engine.face_engine.db.keys()) return {"known_people": sorted(known)} # ── AI model toggle endpoints ────────────────────────────────────────────── @app.get("/ai/status") async def ai_status(_: dict = Depends(require_vision_access)): """Return enabled AI models plus per-model capability metadata.""" caps = ( vision_engine.get_ai_capabilities() if hasattr(vision_engine, "get_ai_capabilities") else {} ) return { "models": vision_engine.get_ai_status(), "capabilities": caps, "cloud": CEPHEUS_CLOUD, } @app.post("/ai/toggle/{model_id}") async def ai_toggle(model_id: str, _: dict = Depends(require_operator)): """Toggle a specific AI model on/off.""" caps = ( vision_engine.get_ai_capabilities() if hasattr(vision_engine, "get_ai_capabilities") else {} ) model_cap = caps.get(model_id, {}) if model_cap.get("supported") is False: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail=model_cap.get("note", f"Model '{model_id}' is not available in this deployment"), ) new_state = vision_engine.toggle_ai_model(model_id) # Broadcast state change to all WS clients so frontend store stays in sync await manager.broadcast(json.dumps({ "type": "ai_model_update", "model_id": model_id, "enabled": new_state, "all": vision_engine.get_ai_status(), })) return {"model_id": model_id, "enabled": new_state} @app.get("/face_db/list") async def list_face_db(_: dict = Depends(require_operator_if_auth_enabled)): """List enrolled persons — roster aligned with the embedding store used for search.""" enrolled_keys: set[str] = set() fe = getattr(vision_engine, "face_engine", None) if fe is not None: try: _ensure_face_db(fe) db = getattr(fe, "db", {}) or {} enrolled_keys = {str(k) for k in db if not str(k).startswith("unknown_")} except Exception as exc: logger.debug("face_db list reload skipped: %s", exc) people: list[dict] = [] disk_names: set[str] = set() if os.path.exists(_FACE_DB_PATH): for person_name in os.listdir(_FACE_DB_PATH): if person_name.startswith("unknown_"): continue person_dir = os.path.join(_FACE_DB_PATH, person_name) if not os.path.isdir(person_dir): continue disk_names.add(person_name) imgs = [f for f in os.listdir(person_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png'))] img_url = f"/files/face/{person_name}/{imgs[0]}" if imgs else None has_emb = person_name in enrolled_keys meta = face_metadata.read_person_metadata(person_name, _FACE_DB_PATH) person_entry = { "name": person_name, "image": img_url, "status": "AUTHORIZED" if has_emb else "PENDING_EMBEDDING", "role": meta.get("role") or face_metadata.DEFAULT_ROLE, } people.append(person_entry) for name in enrolled_keys: if name not in disk_names: meta = face_metadata.read_person_metadata(name, _FACE_DB_PATH) person_entry = { "name": name, "image": None, "status": "AUTHORIZED", "role": meta.get("role") or face_metadata.DEFAULT_ROLE, } people.append(person_entry) for p in people: person_dir = os.path.join(_FACE_DB_PATH, p["name"]) if os.path.isdir(person_dir): try: mtime = os.path.getmtime(person_dir) p["enrolledAt"] = datetime.datetime.fromtimestamp(mtime, tz=datetime.timezone.utc).isoformat() except OSError: pass if not p.get("role"): p["role"] = face_metadata.DEFAULT_ROLE det = next((d for d in detections_history if d.get("name") == p["name"]), None) if det: p["lastSeen"] = det.get("seen_at") p["lastLocation"] = det.get("camId") if os.path.isdir(_TEMP_UNKNOWN_IMG_ROOT): for unknown_name in sorted(os.listdir(_TEMP_UNKNOWN_IMG_ROOT)): if not unknown_name.startswith("unknown_"): continue unknown_dir = os.path.join(_TEMP_UNKNOWN_IMG_ROOT, unknown_name) if not os.path.isdir(unknown_dir): continue imgs = [f for f in os.listdir(unknown_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png'))] img_url = f"/files/face/{unknown_name}/{imgs[0]}" if imgs else None det = next((d for d in detections_history if d.get("name") == unknown_name), None) people.append({ "name": unknown_name, "image": img_url, "status": "TEMP_UNKNOWN", "role": "visitor", "lastSeen": det.get("seen_at") if det else None, "lastLocation": det.get("camId") if det else None, }) return sorted(people, key=lambda x: x["name"]) @app.delete("/face_db/{person_name}") async def delete_face_db_person( person_name: str, _: dict = Depends(require_admin_audited("face_db/delete")), ): cleaned = _sanitize_person_name(person_name) # 1. Remove image folder from face_database/ person_dir = os.path.join(_FACE_DB_PATH, cleaned) if os.path.isdir(person_dir): import shutil shutil.rmtree(person_dir, ignore_errors=True) # 2. Remove .npy embedding files (the root cause of the bug — these were never deleted) fe = getattr(vision_engine, "face_engine", None) fr_dir = os.path.dirname(_FACE_DB_PATH) # Face_Recognition/ for emb_folder in ("faces_db", "temp_faces_db"): npy_path = os.path.join(fr_dir, emb_folder, f"{cleaned}.npy") if os.path.exists(npy_path): try: os.remove(npy_path) logger.info("Deleted embedding file: %s", npy_path) except OSError as exc: logger.warning("Could not delete %s: %s", npy_path, exc) # 3. Remove from in-memory db (both cleaned and display-name variants) if fe and hasattr(fe, "db"): for key in [cleaned, cleaned.replace("_", " "), person_name.strip()]: fe.db.pop(key, None) # 4. Invalidate DB so next access reloads from disk (where the npy is now gone) _invalidate_face_db(fe) logger.info("Deleted face identity: %s", cleaned) return {"status": "deleted", "name": cleaned} @app.get("/face_db/debug") async def face_db_debug(_: dict = Depends(require_operator_if_auth_enabled)): """Return embedding shape and norm for every enrolled identity — useful for diagnosing score=0 issues.""" fe = getattr(vision_engine, "face_engine", None) if fe is None: return {"error": "face_engine not initialized", "db": []} _ensure_face_db(fe) db = getattr(fe, "db", {}) or {} entries = [] for name, emb in db.items(): if emb is None: entries.append({"name": name, "shape": None, "norm": None, "status": "null_embedding", "is_unknown": name.startswith("unknown_")}) continue try: shape = list(emb.shape) norm = float(np.linalg.norm(emb)) status = "ok" if norm > 0.1 else "zero_or_near_zero" except Exception as exc: shape = None norm = None status = f"error: {exc}" entries.append({ "name": name, "shape": shape, "norm": round(norm, 4) if norm is not None else None, "status": status, "is_unknown": name.startswith("unknown_"), }) return { "insightface_loaded": getattr(fe, "app", None) is not None, "db_size": len(db), "enrolled_named": len([e for e in entries if not e["is_unknown"]]), "entries": sorted(entries, key=lambda e: e["name"]), } @app.post("/face_db/clear_temp") async def clear_temp_faces(_: dict = Depends(require_vision_access)): """Clear temporary faces and embeddings from the current session.""" fr_dir = os.path.dirname(_FACE_DB_PATH) temp_emb = os.path.join(fr_dir, "temp_faces_db") temp_img = os.path.join(fr_dir, "temp_face_database") deleted = 0 for d in (temp_emb, temp_img): if not os.path.exists(d): continue for f in os.listdir(d): # Preserve persisted unknown identities across sessions if f.startswith("unknown_"): continue fp = os.path.join(d, f) if os.path.isfile(fp): try: os.remove(fp) deleted += 1 except Exception as e: logger.warning("Failed to delete temp face file %s: %s", fp, e) fe = getattr(vision_engine, "face_engine", None) if fe: _invalidate_face_db(fe) return {"status": "success", "deleted": deleted} @app.post("/emergency/dispatch-log") async def post_emergency_dispatch_log(body: dict, _: None = Depends(require_operator)): entry = { "id": body.get("id") or f"disp-{uuid.uuid4().hex[:8]}", "timestamp": body.get("timestamp") or datetime.datetime.now(datetime.timezone.utc).isoformat(), "type": body.get("type") or "general", "message": body.get("message") or "", "status": body.get("status") or "pending", } emergency_dispatch_log.insert(0, entry) del emergency_dispatch_log[200:] save_json(DISPATCH_LOG_FILE, emergency_dispatch_log) return entry @app.get("/alerts/log") async def get_alerts_log(_: dict = Depends(require_operator_if_auth_enabled)): return alerts_broadcast_log[-50:] @app.post("/alerts/log") async def post_alerts_log(body: dict, _: None = Depends(require_operator_if_auth_enabled)): entry = { "id": f"bc-{uuid.uuid4().hex[:8]}", "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(), "message": body.get("message") or "", "recipients": body.get("recipients") or [], "relatedIssue": body.get("relatedIssue"), "sentBy": body.get("sentBy") or "admin", "deliveryCount": body.get("deliveryCount") or len(body.get("recipients") or []), } alerts_broadcast_log.insert(0, entry) del alerts_broadcast_log[100:] save_json(ALERTS_LOG_FILE, alerts_broadcast_log) return entry @app.get("/face_results") async def get_face_results(_: dict = Depends(require_vision_access)): """Return latest face recognition hits per camera (expired results are empty/stale).""" raw = vision_engine.get_face_results() return vision_session.build_face_results_payload(raw, _strip_face_embeddings) @app.get("/face_results/session-persons") async def get_session_detected_persons(_: dict = Depends(require_vision_access)): """Unique names detected since server startup (enrolled + unknown_N), with last-seen metadata.""" persons: dict[str, dict] = {} for det in detections_history: name = det.get("name") or det.get("identity") if not name: continue name = str(name).strip() lowered = name.lower() if lowered in ("unknown", "unidentified", "none"): continue seen_at = det.get("seen_at") or det.get("timestamp") conf = float(det.get("confidence") or det.get("score") or 0.0) cam = det.get("camId") or det.get("cam_id") is_unknown = lowered.startswith("unknown_") existing = persons.get(name) if existing: existing["count"] += 1 if seen_at and (not existing.get("last_seen") or str(seen_at) >= str(existing.get("last_seen"))): existing["last_seen"] = seen_at existing["last_cam"] = cam existing["last_score"] = round(conf, 4) else: persons[name] = { "name": name, "count": 1, "last_seen": seen_at, "last_cam": cam, "last_score": round(conf, 4), "group": "unknown" if is_unknown else "known", } values = list(persons.values()) return {"persons": values, "total": len(values)} @app.get("/debug/vision") async def debug_vision(_: dict = Depends(require_vision_access)): """Live vision pipeline diagnostics for HF debugging.""" try: pipe = _get_vision_pipeline() stats = pipe.stats.as_dict() except Exception as exc: logger.warning("debug/vision pipeline stats failed: %s", exc) stats = {} presence = vision_session.presence_summary() agent_open = len(manager.agent_connections) > 0 ws_open = len(manager.active_connections) > 0 raw_faces = vision_engine.get_face_results() last_match = None last_score = None for cam_id, faces in (raw_faces or {}).items(): for f in faces or []: if not isinstance(f, dict): continue if f.get("found") and f.get("name"): last_match = str(f.get("name")) last_score = float(f.get("confidence") or 0) break if last_match: break return { "fps": stats.get("fps", 0), "queue": stats.get("queue_depth", 0), "socket_count": len(manager.active_connections), "agent_socket_count": len(manager.agent_connections), "camera_count": vision_session.camera_count(), "polling_enabled": False, "websocket_state": "open" if ws_open else "closed", "ws_connected": ws_open, "agent_ws_connected": agent_open, "agent_ws_state": "open" if agent_open else "closed", "ws_state": "open" if ws_open else "closed", "watchdog_state": "client_side", "inference_ms": stats.get("last_inference_ms", 0), "dropped_frames": stats.get("dropped", 0), "skipped_frames": stats.get("skipped", 0), "processed_frames": stats.get("processed", 0), "face_workers": _FACE_WORKERS, "public_vision": public_vision_allowed(), "facial_enabled": vision_engine.get_ai_status().get("facial", False), "presence_state": presence.get("presence_state"), "presence_is_stale": presence.get("presence_is_stale"), "last_frame_age_ms": presence.get("last_frame_age_ms"), "last_presence_update_age_ms": presence.get("last_presence_update_age_ms"), "last_match_age_ms": presence.get("last_match_age_ms"), "presence_ttl_s": vision_session.PRESENCE_TTL_S, "track_frame_ok": bool(raw_faces), "inference_complete": stats.get("processed", 0) > 0, "result_emitted": last_match is not None, "last_match": last_match, "last_score": last_score, "last_result_age_ms": presence.get("last_match_age_ms"), } @app.get("/debug/emergency") async def debug_emergency(): """Emergency subsystem diagnostics.""" return { "dispatch_log_count": len(emergency_dispatch_log), "staff_activity_count": len(staff_activity), "sos_event_count": len(sos_events), "staff_request_count": len(staff_requests_db), "httpx_available": httpx is not None, "nearby_cache_entries": len(_nearby_cache), "maps_configured": maps_configured() if "maps_configured" in globals() else False, "public_vision": public_vision_allowed(), "fallback_mode": "overpass" if httpx is not None else "unavailable", "service_availability": { "overpass": httpx is not None, "maps_api": maps_configured() if "maps_configured" in globals() else False, }, } @app.post("/missing_person") async def missing_person_search(file: UploadFile = File(...), _: None = Depends(require_operator)): """Upload an image, search for that person across all live camera feeds, and trigger agent.""" try: if file.content_type not in ALLOWED_IMAGE_TYPES: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type") contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image") nparr = np.frombuffer(contents, np.uint8) query_frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if query_frame is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image") fe = vision_engine.face_engine await _run_face_work(_ensure_face_db, fe) result = await _run_face_work(vision_engine.search_missing_person, query_frame) # Save image for evidence upload_dir, file_path = _safe_image_upload_path(file.filename or "capture.jpg") with open(file_path, "wb") as f: f.write(contents) app.state.last_missing_person_img = normalize_files_url(f"/uploads/{os.path.basename(file_path)}") result = dict(result) result["image_url"] = app.state.last_missing_person_img if CEPHEUS_CLOUD: result.setdefault("search_mode", result.get("search_mode") or "database_only") if not result.get("found"): best_score = result.get("best_score", 0) enrolled = result.get("enrolled_count", 0) if enrolled == 0 or best_score == 0: result["reason"] = ( "Face database is empty — no enrolled faces to match against. " "Go to Issues → Face Database to enroll faces first." ) else: result["reason"] = result.get("reason") or ( "No match in enrolled database. Live camera search requires GPU vision backend." ) # Operator decides whether to raise an issue — do not auto-create via agent. return result except HTTPException: raise except Exception as e: logger.error(f"Missing person search error: {e}") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Missing person search failed") @app.post("/face/search_live") async def face_search_live(file: UploadFile = File(...), _: None = Depends(require_operator)): """ Search for the face in the uploaded image across ALL active live camera feeds. Unlike /missing_person (which searches the enrolled face DB), this endpoint takes the embedding of the uploaded image and compares it directly against the live face embeddings currently detected by the vision engine. This is the correct implementation of "Search Live Feed" — the uploaded image is the query, not the current camera frame. """ warming = _vision_warming_response() if warming is not None: return warming try: if file.content_type not in ALLOWED_IMAGE_TYPES: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type") contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image") nparr = np.frombuffer(contents, np.uint8) query_frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if query_frame is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image") fe = vision_engine.face_engine await _run_face_work(_ensure_face_db, fe) if face_live_search is not None: loop = asyncio.get_event_loop() result = await loop.run_in_executor( None, face_live_search.search_query_in_live_feeds, query_frame, fe, vision_engine, None, ) result["search_mode"] = "live" else: logger.info("face_live_search unavailable, falling back to database search") result = await _run_face_work(vision_engine.search_missing_person, query_frame) result["search_mode"] = "database_only" # Save upload for evidence / UI preview upload_dir, file_path = _safe_image_upload_path(file.filename or "search_query.jpg") with open(file_path, "wb") as f: f.write(contents) result["image_url"] = result.get("match_image") or normalize_files_url(f"/uploads/{os.path.basename(file_path)}") return result except HTTPException: raise except Exception as e: logger.error("face/search_live error: %s", e) raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Live face search failed") @app.post("/register_face") async def register_face( name: str = Form(...), role: str = Form(default="Staff"), file: UploadFile = File(...), _: dict = Depends(require_admin_audited("register_face")), ): """Register a named face from an uploaded image into the face database.""" try: if file.content_type not in ALLOWED_IMAGE_TYPES: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type") cleaned_name = _sanitize_person_name(name) contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image") nparr = np.frombuffer(contents, np.uint8) frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if frame is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image") fe = vision_engine.face_engine if hasattr(fe, "register_from_frame"): ok = await _run_face_work(fe.register_from_frame, cleaned_name, frame) else: ok = await _run_face_work(fe.register_face_from_frame, cleaned_name, frame) if not ok: raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="No face detected for registration") face_metadata.write_person_metadata(cleaned_name, _FACE_DB_PATH, role=role) _invalidate_face_db(fe) return {"success": ok, "name": cleaned_name, "role": role.strip() or face_metadata.DEFAULT_ROLE} except HTTPException: raise except Exception as e: logger.error(f"register_face error: {e}") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Face registration failed") @app.post("/upload_video") async def upload_video(file: UploadFile = File(...), _: None = Depends(require_operator)): """Upload a video file to be used as a camera source.""" try: upload_dir, file_path = _safe_upload_path(file.filename) with open(file_path, "wb") as f: f.write(_read_limited_bytes(await file.read(MAX_VIDEO_BYTES + 1), MAX_VIDEO_BYTES, "Video")) rel_name = os.path.basename(file_path) rel_url = f"/files/uploads/{rel_name}" logger.info("Video uploaded: %s", rel_url) return {"success": True, "file_path": rel_url, "url": rel_url} except HTTPException: raise except Exception as e: logger.error(f"upload_video error: {e}") raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Video upload failed") # --------------------------------------------------------------------------- # WebSocket — /ws (alerts + camera control + signage sync + SOS) # --------------------------------------------------------------------------- WS_HEARTBEAT_INTERVAL_S = 20.0 WS_RECEIVE_TIMEOUT_S = float(os.getenv("CEPHEUS_WS_RECEIVE_TIMEOUT", "300") or "300") async def _ws_send_heartbeat(websocket: WebSocket) -> None: """Keep HF/nginx proxy from killing idle WebSocket connections.""" try: while True: await asyncio.sleep(WS_HEARTBEAT_INTERVAL_S) await manager.send_personal_message( json.dumps({"type": "ping", "ts": time.time()}), websocket, ) except Exception: pass def _vision_set_camera_for_client(client_id: str, cam_id: str, index) -> bool: if hasattr(vision_engine, "set_camera_for_client"): return vision_engine.set_camera_for_client(client_id, cam_id, index) return vision_engine.set_camera(cam_id, index) def _vision_release_camera_for_client(client_id: str, cam_id: str) -> None: if hasattr(vision_engine, "release_camera_for_client"): vision_engine.release_camera_for_client(client_id, cam_id) else: vision_engine.release_camera(cam_id) def _vision_release_client_cameras(client_id: str) -> None: if hasattr(vision_engine, "release_client_cameras"): vision_engine.release_client_cameras(client_id) import threading _camera_state_lock = threading.Lock() # ONLY _set_camera_sync touches this _face_inference_lock = threading.Lock() # ONLY _process_frame_sync touches this _active_cam_id = None _active_cam_index = None def _set_camera_sync(cam_id: str, cam_index: str) -> None: """Pure state assignment. No inference, no shared locks, no I/O.""" global _active_cam_id, _active_cam_index with _camera_state_lock: _active_cam_id = cam_id _active_cam_index = cam_index logger.info("_set_camera_sync: cam=%s index=%s", cam_id, cam_index) def _process_frame_sync(frame_bytes: bytes) -> dict: """ Runs inside thread pool. Pure sync. No asyncio. No app.prepare(). """ import cv2 import numpy as np nparr = np.frombuffer(frame_bytes, np.uint8) frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if frame is None: return {"type": "face_results", "faces": []} # Vision engine should NOT use asyncio primitives internally. # All locking inside must use threading.Lock (NOT asyncio.Lock) with _face_inference_lock: matches = vision_engine.face_engine.match_all_faces(frame) faces_payload = [] for m in matches: faces_payload.append({ "name": m.get("name", "Unknown"), "confidence": round(float(m.get("confidence", 0.0)), 3), "bbox": m.get("bbox", [0, 0, 0, 0]), "found": bool(m.get("found", True)) }) cam_id = _active_cam_id if cam_id: with vision_engine.lock: vision_engine.camera_indices[cam_id] = "browser" vision_engine.latest_raw_frames[cam_id] = frame.copy() vision_engine.face_results[cam_id] = faces_payload return {"type": "face_results", "faces": faces_payload} @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): await manager.connect(websocket) loop = asyncio.get_running_loop() try: while True: try: raw = await asyncio.wait_for(websocket.receive_text(), timeout=30.0) except asyncio.TimeoutError: await websocket.send_json({"type": "ping"}) continue except WebSocketDisconnect: break try: msg = json.loads(raw) except json.JSONDecodeError: continue msg_type = msg.get("type", "") logger.info("Received from client: %s", raw[:120]) if msg_type == "select_camera": cam_id = msg.get("cam_id", "cam-01") cam_index = msg.get("index", "browser") await loop.run_in_executor( _CAMERA_EXECUTOR, _set_camera_sync, cam_id, cam_index, ) await websocket.send_json({"type": "camera_selected", "cam_id": cam_id}) logger.info("camera_selected %s", cam_id) continue if msg_type == "frame": frame_b64 = msg.get("data", "") if not frame_b64: continue try: frame_bytes = base64.b64decode(frame_b64) except Exception: continue result = await loop.run_in_executor( _FACE_EXECUTOR, _process_frame_sync, frame_bytes, ) await websocket.send_json(result) continue # Unknown message type — ack and continue await websocket.send_json({"type": "ack", "received": msg_type}) except WebSocketDisconnect: pass except Exception as exc: logger.error("WebSocket /ws error: %s", exc, exc_info=True) finally: manager.disconnect(websocket) # --------------------------------------------------------------------------- # WebSocket — /ws/agents (ADK multi-agent streaming) # --------------------------------------------------------------------------- def _decode_copilot_image_data(image_data: str) -> tuple[bytes | None, str]: """Decode a data-URL or raw base64 image from the agent copilot UI.""" if not image_data or not isinstance(image_data, str): return None, "image/jpeg" data = image_data.strip() mime = "image/jpeg" if data.startswith("data:"): header, _, payload = data.partition(",") if ";" in header: mime = header[5:].split(";")[0].strip() or mime data = payload try: return base64.b64decode(data, validate=False), mime except Exception: return None, mime async def _vision_context_from_copilot_image(image_bytes: bytes) -> dict | None: """Run missing-person vision search on a copilot-attached image.""" if not image_bytes: return None try: nparr = np.frombuffer(image_bytes, np.uint8) frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if frame is None: return None result = await _run_face_work(vision_engine.search_missing_person, frame) return dict(result) if result else None except Exception as exc: logger.warning("Copilot image vision search failed: %s", exc) return None @app.websocket("/ws/agents") async def agents_ws(websocket: WebSocket): """Streaming WebSocket for the ADK multi-agent orchestrator.""" if not _ws_auth_ok(websocket): await websocket.close(code=1008, reason="Unauthorized") return try: await manager.connect_agent(websocket) except Exception as exc: logger.warning("Agent WS accept failed: %s", exc) try: await websocket.close(code=1011, reason="Agent socket unavailable") except Exception: pass return try: await manager.send_personal_message( json.dumps({"type": "connected", "msg": "agent_ws_ready"}), websocket, ) except Exception: pass ws_principal = _ws_principal(websocket) ws_user_id = _operator_id(ws_principal) heartbeat_task = asyncio.create_task(_ws_send_heartbeat(websocket)) try: while True: try: raw = await asyncio.wait_for( websocket.receive_text(), timeout=WS_RECEIVE_TIMEOUT_S, ) except asyncio.TimeoutError: logger.debug("Agents WS receive timeout — keeping connection") continue try: msg = json.loads(raw) except Exception: await manager.send_personal_message(json.dumps({"type": "error", "msg": "Invalid JSON"}), websocket) continue if msg.get("type") == "pong": continue if msg.get("type") == "ping": await manager.send_personal_message( json.dumps({"type": "pong", "ts": msg.get("ts", time.time())}), websocket, ) continue if msg.get("type") != "agent_query": continue prompt = (msg.get("prompt") or "").strip() raw_image_data = msg.get("image_data") image_bytes, image_mime = _decode_copilot_image_data(raw_image_data) if raw_image_data else (None, "image/jpeg") if raw_image_data and image_bytes is None: await manager.send_personal_message( json.dumps({"type": "error", "msg": "Could not decode attached image"}), websocket, ) continue if image_bytes and not prompt: prompt = "Analyze the attached image and report tactical findings." elif image_bytes: vision_ctx = await _vision_context_from_copilot_image(image_bytes) if vision_ctx: prompt = ( f"{prompt}\n\n[Vision engine analysis of attached image: " f"{json.dumps(vision_ctx)}]" ) else: prompt = f"{prompt}\n\n[User attached an image for visual analysis.]" if not prompt: await manager.send_personal_message( json.dumps({"type": "error", "msg": "Prompt or image required"}), websocket, ) continue session_id = msg.get("session_id", "default") agent_override = (msg.get("agent_override") or "").strip() or None # Update orchestrator context with freshest live data each query agentic_orchestrator.inject_context( alerts=alerts_db, sos_events=sos_events, vision_engine=vision_engine, rooms=[], detections_history=detections_history, issues=issues_db, incident_logs=incident_logs, signage_state=signage_state, ) try: async for step in agentic_orchestrator.run_agent_stream( prompt, session_id, user_id=ws_user_id, agent_override=agent_override, image_data=image_bytes, image_mime=image_mime, ): await _append_agent_step(step, ws_user_id, session_id) await manager.send_personal_message(json.dumps({"type": "agent_step", **step}), websocket) save_issues() # Save after activity await manager.send_personal_message(json.dumps({"type": "agent_done"}), websocket) except Exception as exc: logger.error(f"Agent stream error: {exc}") await manager.send_personal_message( json.dumps({"type": "agent_step", "agent": "System", "content": "Agent request failed", "step_type": "error"}), websocket, ) await manager.send_personal_message(json.dumps({"type": "agent_done"}), websocket) except WebSocketDisconnect: logger.info("Agent client disconnected.") except Exception as e: logger.error(f"Agents WS error: {e}") finally: heartbeat_task.cancel() try: await heartbeat_task except asyncio.CancelledError: pass manager.disconnect(websocket) # --------------------------------------------------------------------------- # WebSocket — /ws/vision (legacy single-frame processing) # --------------------------------------------------------------------------- @app.websocket("/ws/vision") async def vision_ws(websocket: WebSocket): """Legacy single-frame endpoint — prefer browser track_frame + /ws camera_broadcast.""" if not _ws_auth_ok(websocket): await websocket.close(code=1008, reason="Unauthorized") return logger.warning("Client connected to deprecated /ws/vision — migrate to track_frame flow") await websocket.accept() try: while True: data = await websocket.receive_text() count, frame_b64 = vision_engine.process_frame(data) await websocket.send_text(json.dumps({"count": count, "frame": frame_b64})) except WebSocketDisconnect: pass except Exception as e: logger.error(f"Vision WS error: {e}") # --------------------------------------------------------------------------- # Background task — camera broadcast loop # --------------------------------------------------------------------------- async def background_vision_task(): """Read from all cameras every ~100ms, broadcast frames + AI results.""" loop = asyncio.get_event_loop() while True: try: active_results, new_events = await loop.run_in_executor(None, vision_engine.get_active_frames) # Process AI-triggered alerts (Falls, Stampedes) for evt in new_events: alert = Alert(**evt) # This will broadcast to both Tactical and Agentic via existing logic _spawn(_process_new_alert(alert, source="VISION_ENGINE_AI")) def to_b64(frame) -> str | None: if frame is None: return None _, buf = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70]) return f"data:image/jpeg;base64,{base64.b64encode(buf).decode()}" total_count = sum(count for count, _ in active_results.values()) vision_engine.last_crowd_count = total_count vision_engine.last_total_count = total_count prev_crowd = getattr(vision_engine, "_last_broadcast_crowd", None) if prev_crowd != total_count: vision_engine._last_broadcast_crowd = total_count await manager.broadcast(json.dumps({ "type": "crowd_update", "total": total_count, "timestamp": datetime.datetime.now().isoformat(), })) cameras_payload = {} for cam_id, (count, annotated) in active_results.items(): face_hits = vision_engine.face_results.get(cam_id, []) cameras_payload[cam_id] = { "count": count, "frame": to_b64(annotated), "faces": [ {k: v for k, v in (face or {}).items() if k != "embedding"} for face in face_hits ], } # Update persistent history for known faces async with store_locks.detections_lock: for f in face_hits: if f["name"] != "Unknown": _upsert_detection_sighting( f["name"], confidence=float(f.get("confidence", 0.0)), cam_id=cam_id, thumbnail=f.get("thumbnail"), ) global _detections_dirty _detections_dirty = True global _last_detection_flush now = datetime.datetime.now() if _detections_dirty and (now - _last_detection_flush).total_seconds() >= DETECTION_SAVE_INTERVAL_SECONDS: save_detections() _detections_dirty = False _last_detection_flush = now frame_detection_updates: list[dict] = [] for cam_id, (count, _) in active_results.items(): for f in vision_engine.face_results.get(cam_id, []): if f.get("name") and f["name"] != "Unknown": frame_detection_updates.append( next( (d for d in detections_history if d.get("name") == f["name"]), { "name": f["name"], "camId": cam_id, "confidence": f.get("confidence"), "seen_at": datetime.datetime.now(datetime.timezone.utc).isoformat(), }, ) ) broadcast_data = { "type": "camera_broadcast", "total_count": total_count, "cameras": cameras_payload, "active_cam_count": len(active_results), "detection_updates": frame_detection_updates, "timestamp": datetime.datetime.now().isoformat(), } await manager.broadcast(json.dumps(broadcast_data)) except Exception as e: logger.error(f"Background vision task error: {e}") await asyncio.sleep(0.1) # ~10 fps def _schedule_coro(coro) -> None: """Schedule a coroutine on the main event loop from any thread (agent tools run in a worker thread).""" loop = getattr(app.state, "loop", None) if loop is None: try: _spawn(coro) except RuntimeError: logger.warning("No event loop available to schedule agent broadcast") return try: asyncio.run_coroutine_threadsafe(coro, loop) except Exception as exc: # pragma: no cover - defensive logger.warning("Failed to schedule agent broadcast: %s", exc) def _on_agent_issue_created(issue: dict) -> None: """Persist and broadcast issues created/updated by the agent orchestrator.""" save_issues() _schedule_coro(manager.broadcast(json.dumps({"type": "issue_update", "issue": issue}))) def _on_agent_ai_model_changed(model_id: str, enabled: bool, all_status: dict) -> None: """Broadcast vision AI model changes triggered by the agent so the UI stays in sync.""" _schedule_coro(manager.broadcast(json.dumps({ "type": "ai_model_update", "model_id": model_id, "enabled": enabled, "all": all_status, }))) def _agent_broadcast_alert(alert_type: str, message: str, location: str, severity: str) -> dict: """Agent action: raise a real alert through the standard alert pipeline.""" alert = Alert( type=alert_type or "info", location=location or "", message=message or "", severity=severity or "high", ) _schedule_coro(_process_new_alert(alert, source="AGENT")) return alert.model_dump() def _agent_set_signage(signage_id: str, active: bool) -> None: """Agent action: toggle a digital signage panel.""" _schedule_coro(_broadcast_signage_state(signage_id, bool(active))) def _agent_update_issue(issue: dict) -> None: """Agent action: persist + broadcast an issue the agent mutated in place.""" save_issues() _schedule_coro(manager.broadcast(json.dumps({"type": "issue_update", "issue": issue}))) _warmload_in_progress_lock = asyncio.Lock() _warmload_in_progress = False async def _safe_warmload_models() -> dict | None: global _warmload_in_progress if _warmload_in_progress: logger.info("Warmload already in progress — skipping concurrent call.") return None _warmload_in_progress = True try: loop = asyncio.get_event_loop() return await loop.run_in_executor(_FACE_EXECUTOR, vision_engine.warmload_models) finally: _warmload_in_progress = False async def _run_vision_warmload() -> None: """Background model warmload — idempotent, safe to call from startup or /health/live.""" global _warmload_complete if not mark_warmload_started(): return accel = detect_acceleration() logger.info( "Vision warmload starting (full_vision=%s, provider=%s, cuda=%s).", use_full_vision_engine(), accel.get("provider"), accel.get("cuda_available"), ) loop = asyncio.get_event_loop() try: result = await _safe_warmload_models() if result is None: return mark_warmload_complete(result) _warmload_complete = True logger.info("[Startup] Vision warmload complete") except Exception as exc: mark_warmload_failed(str(exc)) logger.error("Vision warmload failed: %s", exc) _startup_warmload_done: bool = False async def _keep_warm_loop() -> None: """Sleep FIRST. Never run any warmload at t=0.""" while not _startup_warmload_done: await asyncio.sleep(1) logger.info("Keep-warm loop started — first YOLO ping in 120s.") await asyncio.sleep(120) # mandatory first sleep — no warmload at t=0 while True: try: loop = asyncio.get_running_loop() await loop.run_in_executor(_FACE_EXECUTOR, _blocking_keepwarm) except Exception as exc: logger.warning("Keep-warm error (non-fatal): %s", exc) await asyncio.sleep(120) def _blocking_keepwarm() -> None: """YOLO ping only. MUST NOT call app.prepare() or any InsightFace inference.""" import numpy as np if vision_engine and vision_engine.model: dummy = np.zeros((320, 320, 3), dtype=np.uint8) _ = vision_engine.model(dummy, verbose=False) logger.info("Keep-warm: YOLO ping OK.") @app.on_event("startup") async def startup_event(): app.state.loop = asyncio.get_running_loop() security_config.validate_startup() auth_service.validate_production_users() load_all_data() auth_service.init_refresh_store() if os.getenv("CEPHEUS_PRODUCTION", "").strip() == "1": if not os.getenv("CEPHEUS_JWT_SECRET", "").strip(): logger.critical("CEPHEUS_PRODUCTION=1 but CEPHEUS_JWT_SECRET is missing") if os.getenv("CEPHEUS_AUTH_DEV_MODE", "").strip() == "1": logger.critical("CEPHEUS_AUTH_DEV_MODE must be 0 in production") logger.info("Application started. Persistent data loaded. Refresh store: %s", auth_service.refresh_store_backend()) logger.info("Face inference pool: %d workers (OMP_NUM_THREADS=4)", _FACE_WORKERS) # Cloud engine loads InsightFace at import time; warmload + keep-warm maintain hot models. global _startup_warmload_done await _run_vision_warmload() _startup_warmload_done = True if vision_engine.face_engine is not None: fe = vision_engine.face_engine fe.reload_db() fe.backfill_from_db() fe._db_stamp = fe._enrolled_dirs_mtime() _spawn(_keep_warm_loop()) if not CEPHEUS_CLOUD: _spawn(background_vision_task()) if os.getenv("CEPHEUS_GOSSIP_AUTO_START", "1").strip().lower() not in ("0", "false", "no", "off"): gossip_bridge.start_tracking() gossip_root = os.getenv("CEPHEUS_GOSSIP_ROOT", "").strip() enrolled_names: list[str] = [] if os.path.exists(_FACE_DB_PATH): for person_name in os.listdir(_FACE_DB_PATH): if person_name.startswith("unknown_"): continue person_dir = os.path.join(_FACE_DB_PATH, person_name) if os.path.isdir(person_dir): gossip_bridge.seed_known_person(person_name) enrolled_names.append(person_name) if gossip_root: gossip_bridge.set_root_person(gossip_root) elif enrolled_names: gossip_bridge.set_root_person(sorted(enrolled_names)[0]) if CEPHEUS_CLOUD and use_full_vision_engine(): accel = detect_acceleration() mode = ( "cloud full vision (GPU, no local camera broadcast loop)" if accel.get("cuda_available") else "cloud full vision (CPU, no local camera broadcast loop)" ) elif CEPHEUS_CLOUD: mode = "cloud stub (no ML / cameras)" else: mode = "full vision (local)" logger.info("Gossip bridge ready. Root: %s. Mode: %s", gossip_bridge.get_gossip_json().get("root_person"), mode) # Inject live context into ADK agent orchestrator agentic_orchestrator.inject_context( alerts=alerts_db, sos_events=sos_events, vision_engine=vision_engine, rooms=[], detections_history=detections_history, issues=issues_db, incident_logs=incident_logs, signage_state=signage_state, ) if hasattr(agentic_orchestrator, "set_issue_created_callback"): agentic_orchestrator.set_issue_created_callback(_on_agent_issue_created) if hasattr(agentic_orchestrator, "set_ai_model_changed_callback"): agentic_orchestrator.set_ai_model_changed_callback(_on_agent_ai_model_changed) if hasattr(agentic_orchestrator, "set_action_handlers"): agentic_orchestrator.set_action_handlers( broadcast_alert=_agent_broadcast_alert, set_signage=_agent_set_signage, update_issue=_agent_update_issue, ) logger.info("Vision AI defaults: %s", vision_engine.get_ai_status()) _orch_mode = "ADK multi-agent" if getattr(agentic_orchestrator, "_ADK_AVAILABLE", False) else "native google.genai" logger.info("Agentic orchestrator ready (%s).", _orch_mode) if __name__ == "__main__": import uvicorn _port = int(os.environ.get("PORT", os.environ.get("UVICORN_PORT", "8000"))) uvicorn.run(app, host="0.0.0.0", port=_port)