github-actions
Deploy to Hugging Face
c794b6b
Raw
History Blame Contribute Delete
175 kB
from __future__ import annotations
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, File, UploadFile, Form, Depends, Header, HTTPException, status, Request, Query, Body
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from typing import List, Optional
import datetime
import uuid
import logging
import json
import cv2
import base64
import asyncio
import time
import math
import numpy as np
import sys
import os
from concurrent.futures import ThreadPoolExecutor
from vision_pipeline import VisionFramePipeline, resize_for_infer, scale_match_bboxes
import vision_session
# Triggering reload to load updated issues.json
try:
import httpx
except Exception: # pragma: no cover - httpx is a hard dependency
httpx = None
from pathlib import Path
from dotenv import load_dotenv
# ── Add Face_Recognition/ to path so gossip_bridge can be imported
_FR_PATH = os.path.join(os.path.dirname(__file__), "Face_Recognition")
if _FR_PATH not in sys.path:
sys.path.insert(0, _FR_PATH)
import gossip_bridge
import auth_service
import store_locks
from observability import RequestContextMiddleware
from security_headers import SecurityHeadersMiddleware
from rate_limiter import login_limiter, refresh_limiter, sos_limiter
from agentic_service import generate_agentic_plan, generate_chat_response
from alert_routing import route_alert
import persistence as persist
import security_config
import face_metadata
try:
import face_live_search
except ImportError as e:
import traceback
print(f"ERROR: Failed to import face_live_search: {e}")
traceback.print_exc()
face_live_search = None
try:
import agentic_orchestrator
except ImportError as _orch_import_err:
print(f"WARNING: agentic_orchestrator unavailable ({_orch_import_err}) — agent WS uses stub responses.")
class _AgenticOrchestratorStub:
@staticmethod
def inject_context(**_kwargs):
pass
@staticmethod
async def run_agent_stream(prompt: str, session_id: str = "default", **_kwargs):
yield {
"agent": "System",
"content": "Agent runtime unavailable. Install google-adk and set GEMINI_API_KEY for live agent queries.",
"step_type": "error",
}
agentic_orchestrator = _AgenticOrchestratorStub()
load_dotenv(os.path.join(os.path.dirname(__file__), ".env"), override=True)
if not os.getenv("GEMINI_API_KEY"):
print("WARNING: GEMINI_API_KEY not found in environment!")
else:
print("INFO: GEMINI_API_KEY loaded successfully.")
# Cloud flag: skips local camera broadcast loop; stub vision unless GPU vision is enabled.
CEPHEUS_CLOUD = os.getenv("CEPHEUS_CLOUD", "").lower() in ("1", "true", "yes")
from vision_runtime import (
detect_acceleration,
get_warmload_state,
live_status_payload,
mark_warmload_complete,
mark_warmload_failed,
mark_warmload_started,
register_face_ready_probe,
use_full_vision_engine,
)
if use_full_vision_engine():
from vision_engine import VisionEngine
else:
from vision_engine_cloud import VisionEngine
# ── Persistent Storage ───────────────────────────────
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
os.makedirs(DATA_DIR, exist_ok=True)
DETECTIONS_FILE = os.path.join(DATA_DIR, "detections_history.json")
ALERTS_FILE = os.path.join(DATA_DIR, "alerts.json")
ISSUES_FILE = os.path.join(DATA_DIR, "issues.json")
LOGS_FILE = os.path.join(DATA_DIR, "incident_logs.json")
SOS_FILE = os.path.join(DATA_DIR, "sos_events.json")
AGENT_STEPS_FILE = os.path.join(DATA_DIR, "agent_steps.json")
STAFF_REQUESTS_FILE = os.path.join(DATA_DIR, "staff_requests.json")
SITE_BLUEPRINT_FILE = os.path.join(DATA_DIR, "site_blueprint.json")
SIGNAGE_PLACEMENTS_FILE = os.path.join(DATA_DIR, "signage_placements.json")
SIGNAGE_STATE_FILE = os.path.join(DATA_DIR, "signage_state.json")
AGENTIC_PLANS_FILE = os.path.join(DATA_DIR, "agentic_plans.json")
TARGETED_DISPATCHES_FILE = os.path.join(DATA_DIR, "targeted_dispatches.json")
DISPATCH_LOG_FILE = os.path.join(DATA_DIR, "emergency_dispatch_log.json")
ALERTS_LOG_FILE = os.path.join(DATA_DIR, "alerts_broadcast_log.json")
STAFF_ACTIVITY_FILE = os.path.join(DATA_DIR, "staff_activity.json")
STAFF_DUTY_FILE = os.path.join(DATA_DIR, "staff_duty.json")
STAFF_INBOX_ACKS_FILE = os.path.join(DATA_DIR, "staff_inbox_acks.json")
SIGNAGE_RECORDS_FILE = os.path.join(DATA_DIR, "signage_records.json")
GOSSIP_HISTORY_DIR = os.path.join(DATA_DIR, "gossip_history")
BLUEPRINT_DIR = os.path.join(DATA_DIR, "uploads", "blueprints")
_warmload_complete = False
detections_history: List[dict] = []
alerts_db: List[dict] = []
issues_db: List[dict] = []
incident_logs: List[dict] = []
sos_events: List[dict] = []
agent_steps_history: List[dict] = []
staff_requests_db: List[dict] = []
site_blueprint: dict = {}
signage_placements: dict = {}
emergency_dispatch_log: List[dict] = []
alerts_broadcast_log: List[dict] = []
staff_activity: List[dict] = []
staff_duty: dict[str, dict] = {}
staff_inbox_acks: List[dict] = []
signage_records: List[dict] = []
SIGNAGE_BROADCAST_TEMPLATES = {
"FIRE_HERE": "FIRE ALERT: Fire reported at {location}. Evacuate the area immediately. Follow E-L markers to exits.",
"DO_NOT_ENTER": "AREA CLOSED: {location} is blocked. Do not enter. Seek alternate route.",
"SHELTER": "SHELTER IN PLACE: Report to shelter zone at {location}. Await further instructions.",
"ALL_CLEAR": "ALL CLEAR: {location} is now safe. Normal operations may resume.",
"LOCKDOWN": "LOCKDOWN INITIATED: Full lockdown active at {location}. Remain in place. Do not move until further notice.",
"EL": "EMERGENCY EXIT: Follow E-L route at {location} to nearest exit.",
}
def load_all_data():
global detections_history, alerts_db, issues_db, incident_logs, sos_events
global agent_steps_history, staff_requests_db, site_blueprint, signage_placements
global signage_state, agentic_plans, targeted_dispatches
global emergency_dispatch_log, alerts_broadcast_log, staff_activity, signage_records
global staff_duty, staff_inbox_acks
list_files = {
DETECTIONS_FILE: "detections_history",
ALERTS_FILE: "alerts_db",
ISSUES_FILE: "issues_db",
LOGS_FILE: "incident_logs",
SOS_FILE: "sos_events",
AGENT_STEPS_FILE: "agent_steps_history",
STAFF_REQUESTS_FILE: "staff_requests_db",
}
dict_files = {
SITE_BLUEPRINT_FILE: "site_blueprint",
SIGNAGE_PLACEMENTS_FILE: "signage_placements",
SIGNAGE_STATE_FILE: "signage_state",
}
list_defaults = {
AGENTIC_PLANS_FILE: "agentic_plans",
TARGETED_DISPATCHES_FILE: "targeted_dispatches",
DISPATCH_LOG_FILE: "emergency_dispatch_log",
ALERTS_LOG_FILE: "alerts_broadcast_log",
STAFF_ACTIVITY_FILE: "staff_activity",
STAFF_INBOX_ACKS_FILE: "staff_inbox_acks",
SIGNAGE_RECORDS_FILE: "signage_records",
}
dict_list_defaults = {
STAFF_DUTY_FILE: "staff_duty",
}
for path, var_name in list_files.items():
loaded = persist.load_json(path, [])
if isinstance(loaded, list):
globals()[var_name] = loaded
logger.info("Loaded %s items from %s", len(loaded), os.path.basename(path))
detections_history[:] = [_normalize_detection_entry(d) for d in detections_history]
for path, var_name in dict_files.items():
loaded = persist.load_json(path, {})
if isinstance(loaded, dict):
globals()[var_name] = loaded
logger.info("Loaded dict from %s", os.path.basename(path))
for path, var_name in list_defaults.items():
loaded = persist.load_json(path, [])
if isinstance(loaded, list):
globals()[var_name] = loaded
for path, var_name in dict_list_defaults.items():
loaded = persist.load_json(path, {})
if isinstance(loaded, dict):
globals()[var_name] = loaded
if not signage_state:
signage_state.update({
"s1": False, "s2": False, "s3": False,
"s4": False, "s5": False, "s6": True, "s8": False,
})
def save_json(path, data):
persist.save_json(path, data)
def save_detections(): save_json(DETECTIONS_FILE, detections_history[-100:])
def _normalize_detection_entry(raw: dict) -> dict:
"""Ensure persisted sightings use ISO seen_at (ISSUE-031)."""
entry = dict(raw)
if not entry.get("seen_at"):
ts = entry.get("timestamp")
if ts and "T" in str(ts):
entry["seen_at"] = ts
else:
entry["seen_at"] = None
if entry.get("seen_at") and not entry.get("timestamp"):
try:
entry["timestamp"] = datetime.datetime.fromisoformat(
str(entry["seen_at"]).replace("Z", "+00:00")
).strftime("%H:%M:%S")
except ValueError:
entry["timestamp"] = str(entry["seen_at"])
return entry
def _upsert_detection_sighting(
name: str,
*,
confidence: float,
cam_id: str,
thumbnail: str | None = None,
) -> dict:
"""Record or refresh one person's sighting with ISO seen_at."""
seen_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
entry: dict = {
"name": name,
"confidence": round(float(confidence), 3),
"camId": cam_id,
"seen_at": seen_at,
"timestamp": datetime.datetime.now().strftime("%H:%M:%S"),
}
if thumbnail is not None:
entry["thumbnail"] = thumbnail
idx = next((i for i, d in enumerate(detections_history) if d.get("name") == name), None)
if idx is not None:
detections_history[idx] = entry
else:
detections_history.append(entry)
_trim_memory_list(detections_history, 500)
return entry
async def _ensure_staff_request_issue_id(alert: "Alert", extra_context: dict | None = None) -> str | None:
"""Require a real issue for staff dispatch — create one if missing (ISSUE-033)."""
issue_id = alert.issue_id or alert.issue or (extra_context or {}).get("issue_id")
if issue_id or alert.type != "staff_request":
return issue_id
new_issue = {
"id": f"ISS-{uuid.uuid4().hex[:6].upper()}",
"title": f"Staff request @ {alert.location or 'site'}",
"desc": alert.message,
"status": "ONGOING",
"progress": 0,
"priority": alert.severity or "medium",
"timestamp": datetime.datetime.now().isoformat(),
"staff_request_status": "pending",
"staff_needed": 1,
}
async with store_locks.issues_lock:
issues_db.insert(0, new_issue)
save_issues()
await manager.broadcast(json.dumps({"type": "issue_update", "issue": new_issue}))
return new_issue["id"]
def _issue_context_for_staff(issue_id: str | None) -> dict:
"""Snapshot issue fields for staff request detail views."""
if not issue_id:
return {}
for iss in issues_db:
if iss.get("id") != issue_id:
continue
meta = iss.get("metadata") or {}
attachments = list(meta.get("attachments") or [])
if iss.get("image") and iss["image"] not in attachments:
attachments.append(iss["image"])
return {
"issue_title": iss.get("title") or "",
"issue_priority": str(iss.get("priority") or "MEDIUM").upper(),
"issue_status": iss.get("status") or "ONGOING",
"issue_desc": iss.get("desc") or "",
"instructions": meta.get("instructions") or (iss.get("desc") or "")[:800],
"zone": meta.get("zone") or meta.get("location") or "",
"lat": meta.get("lat") or iss.get("lat"),
"lng": meta.get("lng") or iss.get("lng"),
"linked_sos_id": meta.get("sos_id") or meta.get("linked_sos_id"),
"attachments": attachments,
}
return {}
def _enriched_staff_request(req: dict) -> dict:
"""Merge live issue context into a staff request for detail views."""
merged = dict(req)
merged.update(_issue_context_for_staff(req.get("issue_id")))
return merged
_STAFF_MILESTONE_PROGRESS = {
"accepted": 25,
"en_route": 60,
"on_scene": 85,
"resolved": 100,
}
async def _append_staff_activity_entry(
name: str,
zone: str,
message: str,
*,
event_type: str = "staff_action",
metadata: dict | None = None,
) -> dict:
entry = {
"id": f"act-{uuid.uuid4().hex[:8]}",
"timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"name": name or "Staff",
"zone": zone or "",
"message": message or "",
"event_type": event_type,
"metadata": metadata or {},
}
staff_activity.insert(0, entry)
del staff_activity[100:]
save_json(STAFF_ACTIVITY_FILE, staff_activity)
await manager.broadcast(json.dumps({"type": "staff_activity", "entry": entry}))
return entry
_ISSUE_TERMINAL = frozenset({"RESOLVED", "CLOSED", "CANCELLED"})
def _validate_issue_patch(current: dict, patch: dict) -> None:
"""Guard invalid issue state transitions (ISSUE-041)."""
cur_status = str(current.get("status", "ONGOING")).upper()
new_status = patch.get("status")
if new_status is not None:
new_status_u = str(new_status).upper()
if cur_status in _ISSUE_TERMINAL and new_status_u not in _ISSUE_TERMINAL:
raise HTTPException(
status_code=409,
detail=f"Cannot reopen issue from terminal status {cur_status}",
)
new_progress = patch.get("progress")
if new_progress is not None:
prog = int(new_progress)
if prog >= 100 and new_status is None and cur_status not in _ISSUE_TERMINAL:
patch["status"] = "RESOLVED"
if prog < 100 and str(patch.get("status", cur_status)).upper() in _ISSUE_TERMINAL:
raise HTTPException(
status_code=409,
detail="Cannot set progress below 100 on a resolved/closed issue",
)
def save_alerts(): save_json(ALERTS_FILE, [a if isinstance(a, dict) else a.model_dump() for a in alerts_db[-200:]])
def save_issues(): save_json(ISSUES_FILE, issues_db[-100:])
def save_logs(): save_json(LOGS_FILE, incident_logs[-300:])
def save_sos(): save_json(SOS_FILE, sos_events[-100:])
def save_agent_steps(): save_json(AGENT_STEPS_FILE, agent_steps_history[-200:])
def save_staff_requests(): save_json(STAFF_REQUESTS_FILE, staff_requests_db[-200:])
def save_site_blueprint(): save_json(SITE_BLUEPRINT_FILE, site_blueprint)
def save_signage_placements(): save_json(SIGNAGE_PLACEMENTS_FILE, signage_placements)
def save_signage_state(): save_json(SIGNAGE_STATE_FILE, signage_state)
def save_agentic_plans(): save_json(AGENTIC_PLANS_FILE, agentic_plans[-100:])
def save_targeted_dispatches(): save_json(TARGETED_DISPATCHES_FILE, targeted_dispatches[-200:])
def save_signage_records(): save_json(SIGNAGE_RECORDS_FILE, signage_records[-500:])
def _trim_memory_list(store: list, max_len: int) -> None:
if len(store) > max_len:
del store[max_len:]
def _operator_id(principal: dict | None) -> str:
return str((principal or {}).get("sub") or "operator")
async def _append_agent_step(step: dict, user_id: str, session_id: str) -> None:
record = {
**step,
"user_id": user_id,
"session_id": session_id,
"timestamp": datetime.datetime.now().isoformat(),
}
async with store_locks.agent_steps_lock:
agent_steps_history.append(record)
_trim_memory_list(agent_steps_history, 500)
save_agent_steps()
async def _clear_agent_steps(user_id: str, session_id: str | None = None) -> None:
global agent_steps_history
async with store_locks.agent_steps_lock:
if session_id:
agent_steps_history = [
s for s in agent_steps_history
if not (s.get("user_id") == user_id and s.get("session_id") == session_id)
]
else:
agent_steps_history = [s for s in agent_steps_history if s.get("user_id") != user_id]
save_agent_steps()
_detections_dirty = False
_last_detection_flush = datetime.datetime.min
DETECTION_SAVE_INTERVAL_SECONDS = float(os.getenv("DETECTION_SAVE_INTERVAL_SECONDS", "2"))
MAX_IMAGE_BYTES = int(os.getenv("MAX_IMAGE_UPLOAD_BYTES", str(5 * 1024 * 1024)))
MAX_VIDEO_BYTES = int(os.getenv("MAX_VIDEO_UPLOAD_BYTES", str(50 * 1024 * 1024)))
ALLOWED_VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".webm"}
ALLOWED_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp"}
ALLOWED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/webp"}
API_KEY = security_config.resolve_api_key()
DEMO_MODE = security_config.is_demo_mode()
IS_PRODUCTION = security_config.is_production()
# ---------------------------------------------------------------------------
# Logging
# ---------------------------------------------------------------------------
logging.basicConfig(
level=logging.INFO,
format="%(levelname)s: %(message)s",
)
logger = logging.getLogger(__name__)
def _task_done_callback(task: asyncio.Task) -> None:
if task.cancelled():
return
exc = task.exception()
if exc is not None:
logger.error("Unhandled error in background task: %s", exc, exc_info=exc)
def _spawn(coro) -> asyncio.Task:
task = asyncio.create_task(coro)
task.add_done_callback(_task_done_callback)
return task
# ---------------------------------------------------------------------------
# App
# ---------------------------------------------------------------------------
app = FastAPI(
title="Crisis Communication System",
docs_url=None if IS_PRODUCTION else "/docs",
redoc_url=None if IS_PRODUCTION else "/redoc",
openapi_url=None if IS_PRODUCTION else "/openapi.json",
)
def _parse_origins() -> list[str]:
origins_env = os.getenv("CORS_ORIGINS", "")
if origins_env.strip():
return [o.strip() for o in origins_env.split(",") if o.strip()]
return [
"https://community-security-and-emergency-ma.vercel.app",
"https://rapid-eec43.web.app",
"https://rapid-eec43.firebaseapp.com",
"http://localhost:5173",
"http://localhost:5174",
"http://127.0.0.1:5173",
"http://127.0.0.1:5174",
]
def _vision_warming_response() -> JSONResponse | None:
"""Return 503 while InsightFace/YOLO warmload is still running on cold start."""
if not use_full_vision_engine():
return None
if _warmload_complete or get_warmload_state().get("complete"):
return None
return JSONResponse(
status_code=503,
content={"status": "warming_up", "message": "Vision model loading. Retry in 10s."},
)
def _api_key_valid(provided: str | None) -> bool:
if not provided:
return False
if API_KEY and security_config.safe_compare_strings(provided, API_KEY):
return True
readonly = security_config.resolve_readonly_api_key()
return bool(readonly and security_config.safe_compare_strings(provided, readonly))
def _api_key_role(provided: str | None) -> str:
if provided and API_KEY and security_config.safe_compare_strings(provided, API_KEY):
return os.getenv("CEPHEUS_API_KEY_ROLE", "service")
readonly = security_config.resolve_readonly_api_key()
if provided and readonly and security_config.safe_compare_strings(provided, readonly):
return "readonly"
guest = os.getenv("CEPHEUS_GUEST_API_KEY", "").strip()
if provided and guest and security_config.safe_compare_strings(provided, guest):
return "guest"
return "service"
def _enforce_readonly_writes(request: Request, principal: dict) -> None:
if principal.get("role") == "readonly" and request.method not in ("GET", "HEAD", "OPTIONS"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Readonly API key cannot perform write operations",
)
def _guest_api_key_valid(provided: str | None) -> bool:
if not provided:
return False
guest = os.getenv("CEPHEUS_GUEST_API_KEY", "").strip()
if guest and security_config.safe_compare_strings(provided, guest):
return True
# Local dev fallback: primary API key with guest role scope
if not security_config.is_production() and _api_key_valid(provided):
return _api_key_role(provided) in ("guest", "service")
return False
def _demo_login_allowed(request: Request) -> bool:
if security_config.is_production() or not DEMO_MODE:
return False
host = (request.client.host if request.client else "") or ""
return host in ("127.0.0.1", "localhost", "::1")
def require_api_key(x_api_key: str | None = Header(default=None, alias="X-API-Key")) -> None:
if not API_KEY:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Server API key not configured")
if not _api_key_valid(x_api_key):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized")
def _resolve_bearer(authorization: str | None) -> dict | None:
if not authorization or not authorization.lower().startswith("bearer "):
return None
token = authorization.split(" ", 1)[1].strip()
if not token:
return None
try:
payload = auth_service.decode_access_token(token)
return {"auth": "jwt", "role": payload.get("role"), "sub": payload.get("sub")}
except Exception:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired session")
def public_vision_allowed() -> bool:
"""Anonymous access to vision/WS routes (HF Spaces — no JWT refresh interruptions)."""
val = os.getenv("ALLOW_PUBLIC_VISION", os.getenv("CEPHEUS_PUBLIC_VISION", ""))
return str(val).strip().lower() in ("1", "true", "yes", "on")
PUBLIC_VISION_PRINCIPAL = {"auth": "public", "role": "admin", "sub": "public-vision"}
def require_vision_access(
request: Request,
x_api_key: str | None = Header(default=None, alias="X-API-Key"),
authorization: str | None = Header(default=None),
) -> dict:
if public_vision_allowed():
return PUBLIC_VISION_PRINCIPAL
return require_operator(request, x_api_key, authorization)
def require_operator(
request: Request,
x_api_key: str | None = Header(default=None, alias="X-API-Key"),
authorization: str | None = Header(default=None),
) -> dict:
"""API key or JWT for operator routes.
CEPHEUS_API_KEY grants service-role automation (full operator API access).
Optional CEPHEUS_READONLY_API_KEY maps to role ``readonly`` (GET-only routes).
Set CEPHEUS_API_KEY_SCOPE to document intended key use for operators/audit.
"""
if not auth_service.auth_enabled():
require_api_key(x_api_key)
principal = {"auth": "api_key", "role": _api_key_role(x_api_key)}
_enforce_readonly_writes(request, principal)
return principal
bearer = _resolve_bearer(authorization)
if bearer:
_enforce_readonly_writes(request, bearer)
return bearer
if _api_key_valid(x_api_key):
principal = {"auth": "api_key", "role": _api_key_role(x_api_key)}
_enforce_readonly_writes(request, principal)
return principal
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required")
def require_operator_flexible(
request: Request,
x_api_key: str | None = Header(default=None, alias="X-API-Key"),
authorization: str | None = Header(default=None),
api_key: str | None = Query(default=None),
token: str | None = Query(default=None),
) -> dict:
"""Like require_operator; dev-only query credentials for legacy img/file tags."""
q_api = request.query_params.get("api_key")
q_token = request.query_params.get("token")
if security_config.is_production() and (q_api or q_token or api_key or token):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Query-string authentication is disabled in production",
)
if token and auth_service.auth_enabled() and not security_config.is_production():
try:
payload = auth_service.decode_access_token(token)
return {"auth": "jwt", "role": payload.get("role"), "sub": payload.get("sub")}
except Exception:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired session")
return require_operator(request, x_api_key or api_key, authorization)
def require_operator_if_auth_enabled(
request: Request,
x_api_key: str | None = Header(default=None, alias="X-API-Key"),
authorization: str | None = Header(default=None),
) -> dict:
"""Require credentials when auth is on; allow anonymous access on public-vision HF."""
if public_vision_allowed():
try:
return require_operator(request, x_api_key, authorization)
except HTTPException as exc:
if exc.status_code == status.HTTP_401_UNAUTHORIZED:
return PUBLIC_VISION_PRINCIPAL
raise
return require_operator(request, x_api_key, authorization)
def require_dashboard_read(
request: Request,
x_api_key: str | None = Header(default=None, alias="X-API-Key"),
authorization: str | None = Header(default=None),
) -> dict:
"""SOS/staff read routes — anonymous on HF public vision (stale JWT must not 401)."""
if public_vision_allowed():
return PUBLIC_VISION_PRINCIPAL
principal = require_operator(request, x_api_key, authorization)
if not auth_service.has_role(principal, "staff", "admin", "operator", "service"):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions")
return principal
def require_role(*roles: str):
def _dep(principal: dict = Depends(require_operator)) -> dict:
if not auth_service.has_role(principal, *roles):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions")
return principal
return _dep
require_admin = require_role("admin")
require_staff = require_role("staff", "admin", "operator")
require_staff_read = require_role("staff", "admin", "operator", "service")
def _audit_destructive(principal: dict, action: str, detail: str = "") -> None:
"""Log destructive operations for interim RBAC audit trail (ISSUE-182 mitigation)."""
logger.warning(
"DESTRUCTIVE action=%s user=%s role=%s detail=%s",
action,
principal.get("sub"),
principal.get("role"),
detail,
)
def require_admin_audited(action: str):
"""Admin-only dependency with destructive-action audit logging."""
def _dep(principal: dict = Depends(require_admin)) -> dict:
_audit_destructive(principal, action)
return principal
return _dep
def _ws_auth_ok(websocket: WebSocket) -> bool:
if public_vision_allowed():
return True
if os.getenv("CEPHEUS_WS_OPEN", "").strip() == "1":
return True
if not auth_service.auth_enabled() and not API_KEY:
if not security_config.is_production():
return True
return False
ticket = websocket.query_params.get("ticket")
if ticket and auth_service.decode_ws_ticket(ticket):
return True
hdr_key = websocket.headers.get("x-api-key")
if hdr_key and _api_key_valid(hdr_key):
return True
api_key = websocket.query_params.get("api_key")
if api_key and _api_key_valid(api_key):
return True
if security_config.is_production():
return False
token = websocket.query_params.get("token")
if token and auth_service.auth_enabled() and auth_service.decode_ws_token(token):
return True
return False
def _ws_principal(websocket: WebSocket) -> dict:
if public_vision_allowed():
return PUBLIC_VISION_PRINCIPAL
ticket = websocket.query_params.get("ticket")
if ticket:
decoded = auth_service.decode_ws_ticket(ticket)
if decoded:
return {"auth": "jwt", "role": decoded.get("role"), "sub": decoded.get("sub")}
if not IS_PRODUCTION:
token = websocket.query_params.get("token")
if token and auth_service.auth_enabled():
ws_decoded = auth_service.decode_ws_token(token)
if ws_decoded:
return {"auth": "jwt", "role": ws_decoded.get("role"), "sub": ws_decoded.get("sub")}
try:
payload = auth_service.decode_access_token(token)
return {"auth": "jwt", "role": payload.get("role"), "sub": payload.get("sub")}
except Exception:
pass
hdr_key = websocket.headers.get("x-api-key")
if hdr_key and _api_key_valid(hdr_key):
return {"auth": "api_key", "role": _api_key_role(hdr_key), "sub": "api_key"}
api_key = websocket.query_params.get("api_key")
if api_key and _api_key_valid(api_key):
return {"auth": "api_key", "role": _api_key_role(api_key), "sub": "api_key"}
return {"auth": "open", "role": "operator", "sub": "operator"}
def _ws_can_mutate(principal: dict) -> bool:
"""Signage/camera WS mutations require operator-level roles (not guest/readonly)."""
if public_vision_allowed():
return True
role = principal.get("role")
if role in ("readonly", "guest"):
return False
if role in ("admin", "operator", "staff", "service"):
return True
return principal.get("auth") != "open"
def _read_limited_bytes(data: bytes, max_bytes: int, kind: str) -> bytes:
if len(data) > max_bytes:
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f"{kind} exceeds maximum allowed size",
)
return data
_UPLOADS_PATH = os.path.join(DATA_DIR, "uploads")
def _safe_upload_path(original_name: str) -> tuple[str, str]:
suffix = Path(original_name).suffix.lower()
if suffix not in ALLOWED_VIDEO_EXTENSIONS:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported video format")
safe_name = f"{uuid.uuid4().hex}{suffix}"
upload_dir = _UPLOADS_PATH
os.makedirs(upload_dir, exist_ok=True)
return upload_dir, os.path.join(upload_dir, safe_name)
def _safe_image_upload_path(original_name: str) -> tuple[str, str]:
suffix = Path(original_name).suffix.lower() or ".jpg"
if suffix == ".jpeg":
suffix = ".jpg"
if suffix not in ALLOWED_IMAGE_EXTENSIONS:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image format")
safe_name = f"{uuid.uuid4().hex}{suffix}"
upload_dir = _UPLOADS_PATH
os.makedirs(upload_dir, exist_ok=True)
return upload_dir, os.path.join(upload_dir, safe_name)
from fastapi.responses import FileResponse
app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(RequestContextMiddleware)
app.add_middleware(
CORSMiddleware,
allow_origins=_parse_origins(),
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Sensitive file directories — served only via authenticated proxy routes (not public mounts)
_FACE_DB_PATH = os.path.join(_FR_PATH, "face_database")
os.makedirs(_FACE_DB_PATH, exist_ok=True)
os.makedirs(_UPLOADS_PATH, exist_ok=True)
os.makedirs(BLUEPRINT_DIR, exist_ok=True)
os.makedirs(GOSSIP_HISTORY_DIR, exist_ok=True)
vision_engine = VisionEngine()
def _face_engine_usable() -> bool:
fe = getattr(vision_engine, "face_engine", None)
if fe is None or getattr(fe, "app", None) is None:
return False
return True
register_face_ready_probe(_face_engine_usable)
# InsightFace loads at import — mark ready immediately so /health/live never blocks clients.
if _face_engine_usable():
mark_warmload_complete({"insightface": True, "early_ready": True})
_warmload_complete = True
logger.info("Face engine ready at startup (InsightFace loaded, enrolled DB pending warmload).")
# Parallel face inference — single worker on HF keeps CPU predictable.
_FACE_WORKERS = max(2, min(4, int(os.getenv("CEPHEUS_FACE_WORKERS", "0") or 0) or (os.cpu_count() or 2)))
if public_vision_allowed() or os.getenv("CEPHEUS_FORCE_SINGLE_INFER", "").strip() == "1":
_FACE_WORKERS = 2 # Minimum 2 to prevent deadlock between keep-warm and live inference
_FACE_EXECUTOR = ThreadPoolExecutor(max_workers=_FACE_WORKERS, thread_name_prefix="cepheus-face")
_CAMERA_EXECUTOR = ThreadPoolExecutor(max_workers=4, thread_name_prefix="cepheus-camera")
_vision_pipeline: VisionFramePipeline | None = None
def _infer_matches_sync(cam_id: str, frame) -> list:
fe = vision_engine.face_engine
_ensure_face_db(fe)
if hasattr(fe, "match_all_faces"):
return fe.match_all_faces(frame)
return []
def _get_vision_pipeline() -> VisionFramePipeline:
global _vision_pipeline
if _vision_pipeline is None:
_vision_pipeline = VisionFramePipeline(_infer_matches_sync)
return _vision_pipeline
async def _run_face_work(fn, *args):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(_FACE_EXECUTOR, fn, *args)
def _ensure_face_db(fe) -> None:
"""Reload embeddings only when the on-disk store changed (not every frame)."""
if fe is None:
return
if hasattr(fe, "ensure_db"):
fe.ensure_db()
elif hasattr(fe, "reload_db"):
fe.reload_db()
def _invalidate_face_db(fe) -> None:
if fe is None:
return
if hasattr(fe, "invalidate_db"):
fe.invalidate_db()
elif hasattr(fe, "reload_db"):
fe.reload_db()
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
class Alert(BaseModel):
id: str = Field(default_factory=lambda: str(uuid.uuid4()))
type: str
location: str = ""
message: str
severity: str = "medium"
timestamp: str = Field(default_factory=lambda: datetime.datetime.now().isoformat())
recipient: Optional[str] = None
sender: Optional[str] = None
issue_id: Optional[str] = None
issue: Optional[str] = None
attachment_url: Optional[str] = None
lat: Optional[float] = None
lng: Optional[float] = None
class IssueCreate(BaseModel):
title: Optional[str] = Field(default=None, max_length=200)
desc: str = Field(..., max_length=8000)
status: Optional[str] = Field(default="ONGOING", max_length=32)
priority: Optional[str] = Field(default="medium", max_length=32)
staff: Optional[int] = Field(default=1, ge=0, le=100)
room: Optional[str] = Field(default=None, max_length=120)
class IssuePatch(BaseModel):
status: Optional[str] = Field(default=None, max_length=32)
progress: Optional[int] = Field(default=None, ge=0, le=100)
desc: Optional[str] = Field(default=None, max_length=8000)
staff_request_status: Optional[str] = Field(default=None, max_length=32)
class SOSPayload(BaseModel):
guest_id: Optional[str] = None
lat: float
lng: float
location_label: Optional[str] = "Unknown"
message: Optional[str] = "SOS Activated"
class GossipTrackingStart(BaseModel):
staffId: Optional[str] = None
personName: Optional[str] = None
cause: Optional[str] = None
doc_names: Optional[List[str]] = None
doc_urls: Optional[List[str]] = None
class LoginPayload(BaseModel):
username: str
password: str
class RefreshPayload(BaseModel):
refresh_token: str
class LogoutPayload(BaseModel):
refresh_token: Optional[str] = None
class TrackingResetPayload(BaseModel):
broadcast: bool = False
session_id: Optional[str] = None
clear_agent_steps: bool = True
full_reset: bool = False
class ChatPayload(BaseModel):
prompt: str
# ---------------------------------------------------------------------------
# In-memory stores
# ---------------------------------------------------------------------------
# signage state: id → active
signage_state: dict[str, bool] = {
"s1": False, "s2": False, "s3": False,
"s4": False, "s5": False, "s6": True,
"s8": False,
}
agentic_plans: list[dict] = []
targeted_dispatches: list[dict] = []
user_profile: dict = {
"name": os.getenv("USER_PROFILE_NAME", "Pranav Kumar"),
"role": os.getenv("USER_PROFILE_ROLE", "Command Operator"),
"age": int(os.getenv("USER_PROFILE_AGE", "27")),
"department": os.getenv("USER_PROFILE_DEPARTMENT", "Crisis Response"),
"shift": os.getenv("USER_PROFILE_SHIFT", "Night"),
}
def normalize_files_url(path: str | None) -> str | None:
if not path:
return None
if path.startswith("/files/"):
return path
if path.startswith("/uploads/"):
return f"/files{path}"
return path
async def _broadcast_signage_state(sign_id: str, active: bool) -> dict:
async with store_locks.signage_lock:
if sign_id not in signage_state:
signage_state[sign_id] = False
signage_state[sign_id] = active
save_signage_state()
payload = json.dumps({
"type": "signage_update",
"id": sign_id,
"active": active,
"all": signage_state,
})
await manager.broadcast(payload)
return {"id": sign_id, "active": active}
def _sanitize_person_name(name: str) -> str:
import re
cleaned = name.strip().replace(" ", "_")
if not re.fullmatch(r"[A-Za-z0-9_-]{1,64}", cleaned):
raise HTTPException(status_code=400, detail="Name must be 1-64 alphanumeric characters, spaces, hyphens, or underscores")
return cleaned
def _reverse_geocode_label(lat: float, lng: float) -> str:
api_key = os.getenv("GOOGLE_MAPS_API_KEY", "").strip()
if not api_key:
return f"Lat: {lat:.4f}, Lng: {lng:.4f}"
try:
import requests
resp = requests.get(
"https://maps.googleapis.com/maps/api/geocode/json",
params={"latlng": f"{lat},{lng}", "key": api_key},
timeout=5,
)
data = resp.json()
results = data.get("results") or []
if results:
return str(results[0].get("formatted_address", ""))[:120] or f"Lat: {lat:.4f}, Lng: {lng:.4f}"
except Exception:
pass
return f"Lat: {lat:.4f}, Lng: {lng:.4f}"
def _build_contact_network(hours: float | None = None) -> dict:
"""Aggregate gossip co-presence into contact-network nodes/edges."""
raw = gossip_bridge.get_gossip_json()
detection_by_name = {d.get("name"): d for d in detections_history if d.get("name")}
cutoff = None
if hours is not None:
cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=hours)
meet_counts: dict[str, int] = {}
all_people: set[str] = set()
for link in raw.get("links") or []:
if link.get("source"):
all_people.add(link["source"])
if link.get("target"):
all_people.add(link["target"])
# Include people currently on camera even if no edges yet
import re as _re
fr = getattr(vision_engine, "face_results", None) or {}
for faces in fr.values():
for f in faces or []:
name = f.get("name") if isinstance(f, dict) else None
if not name:
continue
lowered = name.lower()
# Allow unknown_N (system-assigned persistent IDs), reject bare "Unknown"
if lowered in ("unknown", "unidentified", ""):
continue
all_people.add(name)
nodes = []
for person in sorted(p for p in all_people if p):
det = detection_by_name.get(person) or {}
meet_counts[person] = sum(
1 for lnk in (raw.get("links") or [])
if person in (lnk.get("source"), lnk.get("target"))
)
nodes.append({
"id": person,
"name": person,
"role": "Staff",
"photoUrl": normalize_files_url(f"/files/face/{person}/{person}.jpg"),
"detectionCount": meet_counts.get(person, 0) + (1 if det else 0),
"lastSeen": det.get("seen_at"),
"lastLocation": det.get("camId") or "",
})
edges = []
for link in raw.get("links") or []:
pa, pb = link.get("source"), link.get("target")
interactions = link.get("interactions") or []
filtered = interactions
if cutoff:
filtered = [
i for i in interactions
if i.get("timestamp") and datetime.datetime.fromisoformat(
str(i["timestamp"]).replace("Z", "+00:00")
) >= cutoff
]
if cutoff and not filtered:
continue
use = filtered or interactions
last_ts = max((i.get("timestamp") or "") for i in use) if use else ""
locations = sorted({i.get("camera") or i.get("cam_id") or "Unknown" for i in use})
edges.append({
"source": pa,
"target": pb,
"meetCount": len(use) or link.get("strength", 1),
"lastMet": last_ts,
"locations": locations,
})
return {"nodes": nodes, "edges": edges, "updatedAt": datetime.datetime.now(datetime.timezone.utc).isoformat()}
async def _broadcast_signage_alert(sign_type: str, lat: float, lng: float, action: str = "deployed"):
location = await asyncio.get_event_loop().run_in_executor(None, _reverse_geocode_label, lat, lng)
if action == "removed":
message = f"SIGNAGE REMOVED: {sign_type.replace('_', ' ')} at {location} has been cleared."
else:
template = SIGNAGE_BROADCAST_TEMPLATES.get(sign_type, "NEW SIGNAGE DEPLOYED: {type} at {location}.")
message = template.format(location=location, type=sign_type.replace("_", " "))
alert = Alert(
id=f"signage-{uuid.uuid4().hex[:8]}",
type="broadcast",
location=location,
message=message,
severity="high",
timestamp=datetime.datetime.now().isoformat(),
)
await _process_new_alert(alert, source="SIGNAGE")
entry = {
"id": f"bc-{uuid.uuid4().hex[:8]}",
"timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"message": message,
"recipients": ["All Staff & Units"],
"relatedIssue": None,
"sentBy": "signage-system",
"deliveryCount": 1,
}
alerts_broadcast_log.insert(0, entry)
del alerts_broadcast_log[100:]
save_json(ALERTS_LOG_FILE, alerts_broadcast_log)
return message
async def _append_incident_log(level: str, source: str, event_type: str, message: str, metadata: dict | None = None):
log_entry = {
"type": "incident_log_entry",
"ts": datetime.datetime.now().strftime("%H:%M:%S"),
"level": level,
"source": source,
"event_type": event_type,
"msg": message,
"metadata": metadata or {},
}
incident_logs.insert(0, log_entry)
del incident_logs[300:]
save_logs()
await manager.broadcast(json.dumps(log_entry))
def _count_live_faces() -> int:
fr = getattr(vision_engine, "face_results", None) or {}
total = 0
for faces in fr.values():
total += len([
f for f in (faces or [])
if str(f.get("name", "")).lower() not in ("unknown", "unidentified", "none", "")
])
return total
def _strip_face_embeddings(face_results: dict | None) -> dict:
"""Return face results suitable for API/WS clients without raw embeddings."""
sanitized: dict = {}
for cam_id, faces in (face_results or {}).items():
sanitized[cam_id] = [
{k: v for k, v in (face or {}).items() if k != "embedding"}
for face in (faces or [])
]
return sanitized
def _normalize_dispatch_event(
*,
dispatch_id: str | None = None,
alert_id: str | None = None,
issue_id: str | None = None,
recipient: str | None = None,
recipients: list | None = None,
message: str | None = None,
location: str | None = None,
severity: str | None = None,
alert_type: str | None = None,
status: str = "sent",
timestamp: str | None = None,
simulation: bool = False,
routing_demo: bool = False,
) -> dict:
"""Single schema for targeted_alert_dispatch WS events and /dispatches storage."""
ts = timestamp or datetime.datetime.now().isoformat()
rec_list = list(recipients) if recipients else []
rec = recipient or (", ".join(str(r) for r in rec_list[:5]) if rec_list else "Staff")
if not rec_list and rec:
rec_list = [rec]
msg = message or (f"Dispatch to {rec}" + (f" @ {location}" if location else ""))
return {
"type": "targeted_alert_dispatch",
"dispatch_id": dispatch_id or str(uuid.uuid4()),
"timestamp": ts,
"created_at": ts,
"alert_id": alert_id,
"issue_id": issue_id,
"recipient": rec,
"recipients": rec_list,
"message": msg,
"location": location or "",
"severity": severity or "medium",
"alert_type": alert_type,
"status": status,
"simulation": bool(simulation),
"routing_demo": bool(routing_demo),
}
def _build_incident_payload(alert: Alert, extra_context: dict | None = None) -> dict:
room_counts = {room["label"]: room.get("occupancy", 0) for room in vision_engine.room_stats.values()} if hasattr(vision_engine, "room_stats") else {}
# Include recent alerts for context
recent_alerts = [a if isinstance(a, dict) else a.model_dump() for a in alerts_db[-5:]] if alerts_db else []
crowd = getattr(vision_engine, "last_crowd_count", None)
if crowd is None:
crowd = getattr(vision_engine, "last_total_count", 0)
context = {
"alert_id": alert.id,
"type": alert.type,
"severity": alert.severity,
"location": alert.location,
"message": alert.message,
"lat": alert.lat,
"lng": alert.lng,
"crowd_count": crowd,
"live_face_count": _count_live_faces(),
"room_counts": room_counts,
"recent_alerts": recent_alerts,
"timestamp": alert.timestamp,
}
if extra_context:
context.update(extra_context)
return context
async def _process_new_alert(alert: Alert, source: str = "SYSTEM", extra_context: dict | None = None, *, sos_mode: bool = False):
async with store_locks.alerts_lock:
alerts_db.append(alert)
_trim_memory_list(alerts_db, 200)
save_alerts()
logger.info(f"Alert: {alert.type} @ {alert.location}")
if not sos_mode:
await manager.broadcast(json.dumps({"type": "alert_record", "alert": alert.model_dump()}))
log_entry = {
"type": "log_entry",
"ts": datetime.datetime.now().strftime("%H:%M:%S"),
"level": "CRIT" if alert.severity == "critical" else "WARN",
"source": source,
"event_type": alert.type,
"msg": alert.message,
"metadata": {"severity": alert.severity},
}
await manager.broadcast(json.dumps(log_entry))
if alert.type == "chat" or sos_mode:
return
if alert.type in ("direct_message", "staff_request", "broadcast"):
recipient = alert.recipient or (extra_context or {}).get("recipient") or alert.location or "Staff"
issue_id = await _ensure_staff_request_issue_id(alert, extra_context)
if alert.type == "staff_request" and not issue_id:
logger.error("Staff request dispatch blocked: could not resolve issue_id")
return
dispatch_event = _normalize_dispatch_event(
issue_id=issue_id,
recipient=recipient,
message=alert.message,
location=alert.location,
severity=alert.severity,
alert_type=alert.type,
alert_id=alert.id,
)
targeted_dispatches.insert(0, dispatch_event)
_trim_memory_list(targeted_dispatches, 200)
save_targeted_dispatches()
if alert.type == "broadcast":
bc_entry = {
"id": alert.id,
"timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"message": alert.message,
"recipients": [recipient] if recipient else ["All Staff & Units"],
"relatedIssue": issue_id,
"sentBy": source or "command",
"deliveryCount": 1,
}
alerts_broadcast_log.insert(0, bc_entry)
del alerts_broadcast_log[100:]
save_json(ALERTS_LOG_FILE, alerts_broadcast_log)
await manager.broadcast(json.dumps(dispatch_event))
if alert.type == "staff_request":
staff_req = {
"id": f"SR-{uuid.uuid4().hex[:6].upper()}",
"issue_id": issue_id,
"location": alert.location,
"message": alert.message,
"status": "pending",
"created_at": datetime.datetime.now().isoformat(),
"assignments": [],
"progress": 0,
"severity": alert.severity or "medium",
"alert_id": alert.id,
**_issue_context_for_staff(issue_id),
}
staff_requests_db.insert(0, staff_req)
save_staff_requests()
await manager.broadcast(json.dumps({"type": "staff_request_created", "request": staff_req}))
if issue_id:
for iss in issues_db:
if iss.get("id") == issue_id:
iss["staff_request_status"] = "pending"
iss.setdefault("staff_needed", iss.get("staff", 1))
save_issues()
await manager.broadcast(json.dumps({"type": "issue_update", "issue": iss}))
break
if alert.type in ("staff_request", "broadcast"):
incident_payload = _build_incident_payload(alert, extra_context)
plan = generate_agentic_plan(incident_payload)
plan_event = {
"type": "agentic_plan_update",
"plan_id": str(uuid.uuid4()),
"created_at": datetime.datetime.now().isoformat(),
"incident": incident_payload,
"plan": plan,
}
agentic_plans.insert(0, plan_event)
_trim_memory_list(agentic_plans, 100)
save_agentic_plans()
await manager.broadcast(json.dumps(plan_event))
return
incident_payload = _build_incident_payload(alert, extra_context)
plan = generate_agentic_plan(incident_payload)
plan_event = {
"type": "agentic_plan_update",
"plan_id": str(uuid.uuid4()),
"created_at": datetime.datetime.now().isoformat(),
"incident": incident_payload,
"plan": plan,
}
agentic_plans.insert(0, plan_event)
_trim_memory_list(agentic_plans, 100)
save_agentic_plans()
await manager.broadcast(json.dumps(plan_event))
recipients = route_alert(incident_payload)
dispatch_event = _normalize_dispatch_event(
alert_id=alert.id,
issue_id=alert.issue_id or alert.issue,
location=alert.location,
severity=alert.severity,
recipients=recipients,
message=alert.message,
alert_type=alert.type,
routing_demo=True,
)
targeted_dispatches.insert(0, dispatch_event)
_trim_memory_list(targeted_dispatches, 200)
save_targeted_dispatches()
await manager.broadcast(json.dumps(dispatch_event))
await _append_incident_log(
"CRIT" if alert.severity == "critical" else "WARN",
source,
alert.type,
f"Agentic plan generated and dispatched to {len(recipients)} recipients.",
{"alert_id": alert.id, "plan_id": plan_event["plan_id"], "dispatch_id": dispatch_event["dispatch_id"]},
)
# ---------------------------------------------------------------------------
# WebSocket Connection Manager
# ---------------------------------------------------------------------------
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
self.agent_connections: List[WebSocket] = []
self._locks: dict[WebSocket, asyncio.Lock] = {}
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
self._locks[websocket] = asyncio.Lock()
logger.info(f"Client connected. Active: {len(self.active_connections)}")
async def connect_agent(self, websocket: WebSocket):
await websocket.accept()
self.agent_connections.append(websocket)
self._locks[websocket] = asyncio.Lock()
logger.info(f"Agent Client connected. Active: {len(self.agent_connections)}")
def disconnect(self, websocket: WebSocket):
if websocket in self.active_connections:
self.active_connections.remove(websocket)
if websocket in self.agent_connections:
self.agent_connections.remove(websocket)
if websocket in self._locks:
del self._locks[websocket]
logger.info(f"Client disconnected. Active: {len(self.active_connections)}, Agents: {len(self.agent_connections)}")
async def send_personal_message(self, message: str, websocket: WebSocket):
try:
if websocket in self._locks:
async with self._locks[websocket]:
await asyncio.wait_for(websocket.send_text(message), timeout=2.0)
else:
await asyncio.wait_for(websocket.send_text(message), timeout=2.0)
except Exception:
pass
async def broadcast(self, message: str):
dead = []
for ws in list(self.active_connections):
try:
if ws in self._locks:
async with self._locks[ws]:
await asyncio.wait_for(ws.send_text(message), timeout=2.0)
else:
await asyncio.wait_for(ws.send_text(message), timeout=2.0)
except Exception:
dead.append(ws)
for ws in dead:
self.disconnect(ws)
async def broadcast_to_agents(self, message: str):
dead = []
for ws in list(self.agent_connections):
try:
if ws in self._locks:
async with self._locks[ws]:
await asyncio.wait_for(ws.send_text(message), timeout=2.0)
else:
await asyncio.wait_for(ws.send_text(message), timeout=2.0)
except Exception:
dead.append(ws)
for ws in dead:
self.disconnect(ws)
manager = ConnectionManager()
# ---------------------------------------------------------------------------
# REST Endpoints
# ---------------------------------------------------------------------------
@app.get("/")
async def root():
"""Base URL: browsers often open this first; the API lives on named paths, not here."""
return {
"service": "Cepheus API",
"status": "running",
"liveness": "/health",
"websocket": "/ws",
}
@app.get("/health")
async def health():
if IS_PRODUCTION:
return {"status": "ok"}
return {
"status": "ok",
"active_ws": len(manager.active_connections),
"cameras": list(vision_engine.camera_indices.keys()),
"auth_enabled": auth_service.auth_enabled(),
"refresh_store": auth_service.refresh_store_backend(),
}
@app.get("/health/live")
async def health_live():
"""Lightweight liveness probe — no auth, no ML warmload side effects."""
try:
return live_status_payload()
except Exception as exc:
logger.warning("health/live failed: %s", exc)
return {"status": "warming", "message": "recovering"}
@app.get("/debug/face_status")
async def debug_face_status():
"""Diagnose face engine state on deployed instances (no auth)."""
fe = getattr(vision_engine, "face_engine", None)
db = getattr(fe, "db", None) or {}
cache = getattr(fe, "_unknown_cache", None) or {}
try:
return {
"warmload_complete": bool(_warmload_complete or get_warmload_state().get("complete")),
"model_pack": os.environ.get("FACE_MODEL_PACK", "buffalo_sc"),
"model_loaded": fe is not None and getattr(fe, "app", None) is not None,
"enrolled_faces": len([k for k in db if not str(k).startswith("unknown_")]),
"unknown_cache_size": len(cache),
"threshold": float(os.environ.get("FACE_MATCH_THRESHOLD", "0.22")),
}
except Exception as exc:
return {
"error": str(exc),
"warmload_complete": bool(_warmload_complete or get_warmload_state().get("complete")),
}
@app.get("/health/ready")
async def health_ready():
if os.getenv("CEPHEUS_PRODUCTION", "").strip() == "1":
if not os.getenv("CEPHEUS_JWT_SECRET", "").strip():
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="JWT secret not configured")
if os.getenv("CEPHEUS_AUTH_DEV_MODE", "").strip() == "1":
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Dev auth disabled in production")
return {"status": "ready", "auth_enabled": auth_service.auth_enabled()}
@app.get("/auth/status")
async def auth_status():
try:
mode = "demo"
if auth_service.auth_enabled():
mode = "production" if os.getenv("CEPHEUS_JWT_SECRET", "").strip() else "dev"
return {"auth_enabled": auth_service.auth_enabled(), "mode": mode}
except Exception as exc:
logger.warning("auth/status failed: %s", exc)
return {"auth_enabled": False, "mode": "demo", "error": "status_unavailable"}
@app.post("/auth/login")
async def auth_login(payload: LoginPayload, request: Request):
login_limiter.check(request, "login")
if not auth_service.auth_enabled():
if not _demo_login_allowed(request):
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Authentication not configured",
)
role = "admin" if (payload.username or "").lower() in ("admin", "command") else "staff"
return {
"mode": "demo",
"token_type": "demo",
"user": {"username": payload.username or "operator", "role": role},
}
try:
user = auth_service.verify_user(payload.username, payload.password)
except RuntimeError as exc:
logger.error("Auth misconfiguration: %s", exc)
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail="Authentication misconfigured",
) from exc
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")
# HF Spaces: stateless replicas break refresh tokens — demo session for dashboard UI.
if public_vision_allowed():
role = user.get("role") or "admin"
return {
"mode": "demo",
"token_type": "demo",
"user": {"username": user.get("username") or payload.username, "role": role},
}
return auth_service.create_token_pair(user["username"], user["role"])
@app.post("/auth/refresh")
async def auth_refresh(payload: RefreshPayload, request: Request):
refresh_limiter.check(request, "refresh")
if public_vision_allowed():
return {
"mode": "demo",
"token_type": "demo",
"access_token": "",
"refresh_token": "",
"expires_in": 86400,
}
if not auth_service.auth_enabled():
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Auth not enabled")
try:
return auth_service.refresh_access_token(payload.refresh_token)
except Exception:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token")
@app.post("/auth/logout")
async def auth_logout(payload: LogoutPayload):
if payload.refresh_token:
auth_service.revoke_refresh_token(payload.refresh_token)
return {"status": "ok"}
@app.post("/auth/ws-ticket")
async def auth_ws_ticket(principal: dict = Depends(require_operator_if_auth_enabled)):
"""Issue a short-lived WebSocket ticket (preferred over api_key query param)."""
if principal.get("auth") == "open":
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Auth not enabled")
username = principal.get("sub") or "operator"
role = principal.get("role") or "staff"
ticket = auth_service.create_ws_ticket(username, role)
return {"ticket": ticket, "expires_in": 60}
@app.get("/files/face/{person}/{filename}")
async def serve_face_image(person: str, filename: str, _: dict = Depends(require_operator_flexible)):
safe_person = Path(person).name
safe_name = Path(filename).name
path = os.path.join(_FACE_DB_PATH, safe_person, safe_name)
if not os.path.isfile(path):
raise HTTPException(status_code=404, detail="Image not found")
return FileResponse(path)
@app.post("/files/upload")
async def upload_image_file(file: UploadFile = File(...), _: dict = Depends(require_operator)):
"""Upload an image; returns authenticated file URL path."""
if file.content_type not in ALLOWED_IMAGE_TYPES:
raise HTTPException(status_code=400, detail="Unsupported image type")
contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image")
_, file_path = _safe_image_upload_path(file.filename or "upload.jpg")
with open(file_path, "wb") as f:
f.write(contents)
rel = normalize_files_url(f"/uploads/{os.path.basename(file_path)}")
return {"url": rel, "path": rel}
@app.get("/files/uploads/{file_path:path}")
async def serve_upload_file(file_path: str, _: dict = Depends(require_operator_if_auth_enabled)):
base = os.path.abspath(_UPLOADS_PATH)
target = os.path.abspath(os.path.join(_UPLOADS_PATH, file_path))
if not target.startswith(base + os.sep) and target != base:
raise HTTPException(status_code=400, detail="Invalid path")
if not os.path.isfile(target):
raise HTTPException(status_code=404, detail="File not found")
return FileResponse(target)
@app.get("/auth/me")
async def auth_me(principal: dict = Depends(require_operator_if_auth_enabled)):
if principal.get("auth") == "public":
return {"authenticated": True, "username": "public-vision", "role": principal.get("role", "admin")}
if principal.get("auth") == "jwt":
return {
"authenticated": True,
"username": principal.get("sub"),
"role": principal.get("role"),
}
return {"authenticated": True, "role": principal.get("role", "service"), "auth": "api_key"}
@app.get("/alerts", response_model=List[Alert])
async def get_alerts(_: dict = Depends(require_operator_if_auth_enabled)):
return alerts_db
@app.post("/alert", response_model=Alert)
async def create_alert(alert: Alert, _: dict = Depends(require_operator)):
if any(
(a.get("id") if isinstance(a, dict) else getattr(a, "id", None)) == alert.id
for a in alerts_db
):
return alert
await _process_new_alert(alert, source=alert.location or "ALERT_API")
return alert
async def _process_sos_event(payload: SOSPayload, *, source: str = "GUEST_APP") -> dict:
event = {
"id": str(uuid.uuid4()),
"guest_id": payload.guest_id or "unknown-guest",
"lat": payload.lat,
"lng": payload.lng,
"location_label": payload.location_label,
"message": payload.message,
"timestamp": datetime.datetime.now().isoformat(),
}
async with store_locks.sos_lock:
sos_events.append(event)
_trim_memory_list(sos_events, 100)
save_sos()
alert = Alert(
id=event["id"],
type="sos",
location=payload.location_label or f"{payload.lat:.4f},{payload.lng:.4f}",
message=payload.message,
severity="critical",
lat=payload.lat,
lng=payload.lng,
)
await _process_new_alert(alert, source=source, extra_context={"guest_id": event["guest_id"]}, sos_mode=True)
await manager.broadcast(json.dumps({"type": "sos_event", **event}))
await _append_incident_log(
"CRIT",
source,
"sos",
f"SOS: {payload.message}",
{"event_id": event["id"], "lat": payload.lat, "lng": payload.lng},
)
logger.info(f"SOS received: {event}")
return {"status": "received", "event": event}
@app.post("/sos")
async def handle_sos(payload: SOSPayload, request: Request, _: dict = Depends(require_operator)):
sos_limiter.check(request, "sos")
return await _process_sos_event(payload, source="GUEST_APP")
@app.post("/sos/guest")
async def handle_guest_sos(
payload: SOSPayload,
request: Request,
x_api_key: str | None = Header(default=None, alias="X-API-Key"),
):
"""Guest-scoped SOS — uses CEPHEUS_GUEST_API_KEY (or service key in local dev)."""
if not _guest_api_key_valid(x_api_key):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid guest API key")
sos_limiter.check(request, "sos_guest")
return await _process_sos_event(payload, source="GUEST_APP")
@app.get("/sos")
async def get_sos_events(_: dict = Depends(require_dashboard_read)):
return sos_events[-50:]
def _haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
radius = 6371.0
d_lat = math.radians(lat2 - lat1)
d_lng = math.radians(lng2 - lng1)
a = (
math.sin(d_lat / 2) ** 2
+ math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(d_lng / 2) ** 2
)
return radius * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
_EMERGENCY_AMENITIES = {
"hospital": {"label": "Hospitals", "icon": "H"},
"fire_station": {"label": "Fire Stn", "icon": "F"},
"police": {"label": "Police", "icon": "P"},
"ambulance": {"label": "Ambulance", "icon": "A"},
"emergency_supplies": {"label": "Supplies", "icon": "S"},
}
_OSM_AMENITY_TO_SERVICE = {
"hospital": "hospital",
"fire_station": "fire_station",
"police": "police",
"ambulance_station": "ambulance",
"clinic": "ambulance",
"pharmacy": "emergency_supplies",
"medical_supply": "emergency_supplies",
}
_NEARBY_CACHE_MAX = 64
_nearby_cache: dict[str, tuple[float, dict]] = {}
async def _fetch_emergency_nearby_overpass(lat: float, lng: float, radius_m: int = 6000) -> dict:
"""OpenStreetMap Overpass lookup — shared by /emergency/nearby and Maps fallback."""
results: dict[str, list] = {k: [] for k in _EMERGENCY_AMENITIES}
cache_key = f"{round(lat, 3)},{round(lng, 3)},{radius_m}"
cached = _nearby_cache.get(cache_key)
now = datetime.datetime.now().timestamp()
if cached and now - cached[0] < 600:
return cached[1]
if httpx is None:
return {"services": results, "source": "unavailable", "error": "httpx not installed"}
query = f"""
[out:json][timeout:15];
(
node["amenity"="hospital"](around:{radius_m},{lat},{lng});
node["amenity"="fire_station"](around:{radius_m},{lat},{lng});
node["amenity"="police"](around:{radius_m},{lat},{lng});
node["emergency"="ambulance_station"](around:{radius_m},{lat},{lng});
node["amenity"="clinic"]["emergency"="yes"](around:{radius_m},{lat},{lng});
node["amenity"="pharmacy"](around:{radius_m},{lat},{lng});
node["shop"="medical_supply"](around:{radius_m},{lat},{lng});
);
out body 120;
"""
mirrors = [
"https://overpass-api.de/api/interpreter",
"https://overpass.kumi.systems/api/interpreter",
"https://maps.mail.ru/osm/tools/overpass/api/interpreter",
]
payload = None
last_error = None
import asyncio
async with httpx.AsyncClient(timeout=30.0) as client:
for url in mirrors:
for attempt in range(2):
try:
resp = await client.post(
url,
data={"data": query},
headers={"User-Agent": "CepheusEmergencyConsole/1.0"},
)
resp.raise_for_status()
payload = resp.json()
break
except Exception as exc:
last_error = exc
if attempt == 0:
await asyncio.sleep(1.5)
if payload is not None:
break
logger.warning("emergency_nearby: Overpass mirror %s failed: %s", url, last_error)
if payload is None:
logger.warning("All Overpass mirrors failed, falling back to synthetic mock data.")
import random
for key, meta in _EMERGENCY_AMENITIES.items():
for i in range(random.randint(1, 3)):
dlat = (random.random() - 0.5) * 0.05
dlng = (random.random() - 0.5) * 0.05
elat, elng = lat + dlat, lng + dlng
results[key].append({
"id": f"mock-{key}-{i}",
"name": f"Local {meta['label']} {i+1}",
"vicinity": f"{random.randint(100, 999)} Emergency Road",
"phone": f"+1-555-01{random.randint(10,99)}",
"lat": elat,
"lng": elng,
"icon": meta["icon"],
"label": meta["label"],
"type": key,
"distKm": round(_haversine_km(lat, lng, elat, elng), 2),
})
for key in results:
results[key].sort(key=lambda x: x["distKm"])
results[key] = results[key][:5]
return {"services": results, "source": "mock_fallback", "error": str(last_error)}
for el in payload.get("elements", []):
tags = el.get("tags", {})
amenity = tags.get("amenity")
emergency_tag = tags.get("emergency")
shop = tags.get("shop")
service_key = _OSM_AMENITY_TO_SERVICE.get(amenity)
if not service_key and emergency_tag == "ambulance_station":
service_key = "ambulance"
if not service_key and shop == "medical_supply":
service_key = "emergency_supplies"
meta = _EMERGENCY_AMENITIES.get(service_key) if service_key else None
if not meta:
continue
elat, elng = el.get("lat"), el.get("lon")
if elat is None or elng is None:
continue
results[service_key].append({
"id": str(el.get("id")),
"name": tags.get("name") or meta["label"],
"vicinity": tags.get("addr:full")
or ", ".join(filter(None, [tags.get("addr:street"), tags.get("addr:city")]))
or tags.get("operator", ""),
"phone": tags.get("phone") or tags.get("contact:phone", ""),
"lat": elat,
"lng": elng,
"icon": meta["icon"],
"label": meta["label"],
"type": service_key,
"distKm": round(_haversine_km(lat, lng, elat, elng), 2),
})
for key in results:
results[key].sort(key=lambda x: x["distKm"])
results[key] = results[key][:5]
response = {"services": results, "source": "overpass"}
if len(_nearby_cache) >= _NEARBY_CACHE_MAX:
oldest_key = min(_nearby_cache, key=lambda k: _nearby_cache[k][0])
_nearby_cache.pop(oldest_key, None)
_nearby_cache[cache_key] = (now, response)
return response
@app.get("/emergency/nearby")
async def emergency_nearby(
lat: float,
lng: float,
radius_m: int = 6000,
_: dict = Depends(require_operator_if_auth_enabled),
):
"""Nearest hospitals / fire stations / police via OpenStreetMap Overpass."""
try:
return await _fetch_emergency_nearby_overpass(lat, lng, radius_m)
except Exception as exc:
logger.warning("emergency/nearby failed: %s", exc)
empty = {k: [] for k in _EMERGENCY_AMENITIES}
return {"services": empty, "source": "overpass", "error": str(exc)}
@app.get("/emergency/dispatch-log")
async def get_emergency_dispatch_log(_: dict = Depends(require_operator_if_auth_enabled)):
try:
return emergency_dispatch_log[-100:]
except Exception as exc:
logger.warning("emergency/dispatch-log failed: %s", exc)
return []
# ── Google Maps emergency intelligence ────────────────────────────────────────
from emergency_maps_service import ( # noqa: E402
get_directions_with_traffic,
get_hexagonal_coverage_data,
maps_configured,
maps_status_detail,
)
@app.get("/maps/status")
async def maps_status(_: dict = Depends(require_operator_if_auth_enabled)):
return maps_status_detail()
@app.get("/maps/directions")
async def get_route(
origin_lat: float,
origin_lng: float,
dest_lat: float,
dest_lng: float,
place_name: str = "",
_: dict = Depends(require_operator),
):
return get_directions_with_traffic(origin_lat, origin_lng, dest_lat, dest_lng, place_name)
@app.get("/maps/hex-coverage")
async def hex_coverage(
lat: float,
lng: float,
radius_km: float = 5.0,
_: dict = Depends(require_operator),
):
return get_hexagonal_coverage_data(lat, lng, radius_km=radius_km)
@app.post("/maps/dispatch-route")
async def dispatch_route(body: dict, principal: dict = Depends(require_operator)):
origin = body.get("origin", {})
dest = body.get("destination", {})
result = get_directions_with_traffic(
origin.get("lat"),
origin.get("lng"),
dest.get("lat"),
dest.get("lng"),
dest.get("name", ""),
)
_audit_destructive(
principal,
"DISPATCH_ROUTE",
f"destination={dest.get('name')} eta={result.get('duration_traffic')}",
)
return result
@app.get("/incident/logs")
async def get_incident_logs(_: dict = Depends(require_operator_if_auth_enabled)):
return incident_logs
@app.get("/agentic/plans")
async def get_agentic_plans(_: dict = Depends(require_operator_if_auth_enabled)):
return agentic_plans
@app.get("/issues")
async def get_issues(_: dict = Depends(require_operator_if_auth_enabled)):
return issues_db
@app.get("/issues/{issue_id}")
async def get_issue_by_id(issue_id: str, _: dict = Depends(require_staff_read)):
"""Read-only issue snapshot for staff portal (linked request detail)."""
for iss in issues_db:
if iss.get("id") == issue_id:
return iss
raise HTTPException(status_code=404, detail="Issue not found")
@app.get("/staff/requests")
async def list_staff_requests(status: Optional[str] = None, _: dict = Depends(require_dashboard_read)):
rows = staff_requests_db
if status:
rows = [r for r in staff_requests_db if r.get("status") == status]
return [_enriched_staff_request(r) for r in rows]
@app.post("/staff/requests/{request_id}/accept")
async def accept_staff_request(request_id: str, payload: dict, principal: dict = Depends(require_staff)):
if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"):
raise HTTPException(status_code=403, detail="Only staff or admin JWT accounts may accept requests")
staff_name = (payload.get("staff_name") or principal.get("sub") or "").strip() or "Staff"
staff_id = (payload.get("staff_id") or staff_name).strip() or staff_name
async with store_locks.staff_requests_lock:
req = next((r for r in staff_requests_db if r.get("id") == request_id), None)
if not req:
raise HTTPException(status_code=404, detail="Staff request not found")
if req.get("status") not in ("pending", "sent"):
raise HTTPException(status_code=409, detail=f"Request already {req.get('status')}")
req["status"] = "accepted"
req["accepted_at"] = datetime.datetime.now().isoformat()
req["assignments"] = [{"staff_id": staff_id, "name": staff_name, "progress": 25, "stage": "accepted"}]
req["progress"] = 25
save_staff_requests()
issue_id = req.get("issue_id")
if issue_id:
await _broadcast_issue_update(
issue_id,
f"[UPDATE] {staff_name} accepted the staff request.",
progress=25,
staff_request_status="accepted",
staff_assignments=req["assignments"],
)
if DEMO_MODE:
_spawn(simulate_issue_progress(issue_id, staff_name))
await _append_staff_activity_entry(
staff_name,
req.get("location") or "",
f"Accepted staff request {request_id}",
event_type="staff_accept",
metadata={"request_id": request_id, "issue_id": issue_id},
)
await manager.broadcast(json.dumps({"type": "staff_request_updated", "request": req}))
return req
@app.post("/staff/requests/{request_id}/decline")
async def decline_staff_request(request_id: str, payload: dict, principal: dict = Depends(require_staff)):
if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"):
raise HTTPException(status_code=403, detail="Only staff or admin JWT accounts may decline requests")
async with store_locks.staff_requests_lock:
req = next((r for r in staff_requests_db if r.get("id") == request_id), None)
if not req:
raise HTTPException(status_code=404, detail="Staff request not found")
if req.get("status") not in ("pending", "sent"):
raise HTTPException(status_code=409, detail=f"Request already {req.get('status')}")
req["status"] = "declined"
req["declined_at"] = datetime.datetime.now().isoformat()
req["decline_reason"] = payload.get("reason", "")
save_staff_requests()
issue_id = req.get("issue_id")
if issue_id:
await _broadcast_issue_update(
issue_id,
f"[UPDATE] Staff declined request{(': ' + req['decline_reason']) if req.get('decline_reason') else ''}.",
staff_request_status="declined",
)
await _append_staff_activity_entry(
(principal.get("sub") or "Staff").strip() or "Staff",
req.get("location") or "",
f"Declined request {request_id}: {req.get('decline_reason') or 'no reason'}",
event_type="staff_decline",
metadata={"request_id": request_id, "issue_id": issue_id},
)
await manager.broadcast(json.dumps({"type": "staff_request_updated", "request": req}))
return req
async def _sync_staff_request_progress(
issue_id: str,
progress: int,
stage: str,
*,
status: str | None = None,
):
"""Keep staff_requests_db in sync when issue progress changes."""
async with store_locks.staff_requests_lock:
req = next(
(
r
for r in staff_requests_db
if r.get("issue_id") == issue_id and r.get("status") in ("pending", "sent", "accepted")
),
None,
)
if not req:
return None
req["progress"] = progress
if status:
req["status"] = status
if status == "resolved":
req["resolved_at"] = datetime.datetime.now().isoformat()
for assignment in req.get("assignments") or []:
assignment["progress"] = progress
assignment["stage"] = stage
save_staff_requests()
snapshot = dict(req)
await manager.broadcast(json.dumps({"type": "staff_request_updated", "request": snapshot}))
return snapshot
@app.post("/staff/requests/{request_id}/complete")
async def complete_staff_request(
request_id: str,
payload: dict | None = None,
principal: dict = Depends(require_staff),
):
payload = payload or {}
if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"):
raise HTTPException(status_code=403, detail="Only staff or admin JWT accounts may complete requests")
staff_name = (principal.get("sub") or "").strip() or "Staff"
notes = (payload.get("notes") or "").strip()
async with store_locks.staff_requests_lock:
req = next((r for r in staff_requests_db if r.get("id") == request_id), None)
if not req:
raise HTTPException(status_code=404, detail="Staff request not found")
if req.get("status") != "accepted":
raise HTTPException(
status_code=409,
detail=f"Request must be accepted before completion (current: {req.get('status')})",
)
req["status"] = "resolved"
req["progress"] = 100
req["resolved_at"] = datetime.datetime.now().isoformat()
if notes:
req["completion_notes"] = notes
assignments = req.get("assignments") or []
if assignments:
for assignment in assignments:
assignment["progress"] = 100
assignment["stage"] = "resolved"
else:
req["assignments"] = [
{"staff_id": staff_name, "name": staff_name, "progress": 100, "stage": "resolved"}
]
save_staff_requests()
issue_id = req.get("issue_id")
location = req.get("location") or "General"
request_message = req.get("message") or ""
if issue_id:
completion_msg = f"[RESOLVED] {staff_name} marked staff request complete."
if notes:
completion_msg += f" Notes: {notes}"
await _broadcast_issue_update(
issue_id,
completion_msg,
progress=100,
status="RESOLVED",
staff_request_status="resolved",
staff_assignments=req["assignments"],
)
log_msg = f"{staff_name} completed staff request {request_id}"
if issue_id:
log_msg += f" for issue {issue_id}"
if location:
log_msg += f" ({location})"
if notes:
log_msg += f" — {notes}"
await _append_incident_log(
"INFO",
"staff_portal",
"staff_request_completed",
log_msg,
metadata={
"request_id": request_id,
"issue_id": issue_id,
"staff_name": staff_name,
"location": location,
"message": request_message,
"notes": notes or None,
},
)
await _append_staff_activity_entry(
staff_name,
location,
log_msg,
event_type="staff_complete",
metadata={"request_id": request_id, "issue_id": issue_id, "notes": notes or None},
)
await manager.broadcast(json.dumps({"type": "staff_request_updated", "request": req}))
return req
@app.post("/staff/requests/{request_id}/progress")
async def update_staff_request_progress(
request_id: str,
payload: dict,
principal: dict = Depends(require_staff),
):
"""Report en route / on scene milestones back to command."""
if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"):
raise HTTPException(status_code=403, detail="Only staff JWT accounts may update progress")
stage = (payload.get("stage") or "").strip().lower().replace(" ", "_")
if stage not in ("en_route", "on_scene"):
raise HTTPException(status_code=400, detail="stage must be en_route or on_scene")
progress = _STAFF_MILESTONE_PROGRESS[stage]
staff_name = (principal.get("sub") or "").strip() or "Staff"
note = (payload.get("note") or "").strip()
async with store_locks.staff_requests_lock:
req = next((r for r in staff_requests_db if r.get("id") == request_id), None)
if not req:
raise HTTPException(status_code=404, detail="Staff request not found")
if req.get("status") != "accepted":
raise HTTPException(status_code=409, detail="Progress updates require an accepted request")
req["progress"] = progress
req["stage"] = stage
for assignment in req.get("assignments") or []:
assignment["progress"] = progress
assignment["stage"] = stage
save_staff_requests()
issue_id = req.get("issue_id")
location = req.get("location") or req.get("zone") or "site"
label = "En route" if stage == "en_route" else "On scene"
update_msg = f"[UPDATE] {staff_name}{label}."
if note:
update_msg += f" Note: {note}"
if issue_id:
await _broadcast_issue_update(
issue_id,
update_msg,
progress=progress,
staff_request_status="accepted",
staff_assignments=req.get("assignments"),
)
await _append_staff_activity_entry(
staff_name,
location,
update_msg.replace("[UPDATE] ", ""),
event_type="staff_progress",
metadata={"request_id": request_id, "issue_id": issue_id, "stage": stage},
)
await manager.broadcast(json.dumps({"type": "staff_request_updated", "request": req}))
return req
@app.get("/staff/inbox")
async def get_staff_inbox(_: dict = Depends(require_staff_read)):
"""Live alerts, broadcasts, signage, and ack state for staff portal."""
active_signage = [
{"id": sid, "active": bool(active), "label": sid.upper()}
for sid, active in (signage_state or {}).items()
if active
]
return {
"alerts": alerts_broadcast_log[:40],
"dispatches": targeted_dispatches[:40],
"signage_active": active_signage,
"acks": staff_inbox_acks[:100],
}
@app.post("/staff/inbox/ack")
async def post_staff_inbox_ack(body: dict, principal: dict = Depends(require_staff)):
if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"):
raise HTTPException(status_code=403, detail="Staff JWT required")
staff_name = (principal.get("sub") or "").strip() or "Staff"
ref_id = (body.get("ref_id") or body.get("alert_id") or body.get("dispatch_id") or "").strip()
ref_type = (body.get("ref_type") or "broadcast").strip()
message = (body.get("message") or body.get("title") or "Broadcast acknowledged").strip()
entry = {
"id": f"ack-{uuid.uuid4().hex[:8]}",
"staff_name": staff_name,
"ref_id": ref_id,
"ref_type": ref_type,
"message": message,
"timestamp": datetime.datetime.now().isoformat(),
}
staff_inbox_acks.insert(0, entry)
del staff_inbox_acks[200:]
save_json(STAFF_INBOX_ACKS_FILE, staff_inbox_acks)
await _append_staff_activity_entry(
staff_name,
body.get("zone") or "",
f"Acknowledged: {message}",
event_type="broadcast_ack",
metadata={"ref_id": ref_id, "ref_type": ref_type},
)
await manager.broadcast(json.dumps({"type": "staff_inbox_ack", "ack": entry}))
return entry
@app.get("/staff/duty")
async def get_staff_duty(_: dict = Depends(require_staff_read)):
return staff_duty
@app.post("/staff/duty")
async def set_staff_duty(body: dict, principal: dict = Depends(require_staff)):
if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"):
raise HTTPException(status_code=403, detail="Staff JWT required")
name = (principal.get("sub") or "").strip() or "staff"
status = (body.get("status") or "available").strip().lower()
if status not in ("available", "busy", "off_duty"):
raise HTTPException(status_code=400, detail="status must be available, busy, or off_duty")
staff_duty[name] = {
"status": status,
"updated_at": datetime.datetime.now().isoformat(),
}
save_json(STAFF_DUTY_FILE, staff_duty)
await manager.broadcast(json.dumps({"type": "staff_duty_update", "staff": name, **staff_duty[name]}))
return staff_duty[name]
@app.post("/staff/check-in")
async def staff_check_in(body: dict, principal: dict = Depends(require_staff)):
if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"):
raise HTTPException(status_code=403, detail="Staff JWT required")
staff_name = (principal.get("sub") or "").strip() or "Staff"
zone = (body.get("zone") or body.get("location") or "Unknown zone").strip()
request_id = (body.get("request_id") or "").strip()
message = f"Checked in at {zone}"
entry = await _append_staff_activity_entry(
staff_name,
zone,
message,
event_type="check_in",
metadata={"request_id": request_id or None, "zone": zone},
)
if request_id:
try:
await update_staff_request_progress(
request_id,
{"stage": "on_scene", "note": f"Checked in at {zone}"},
principal,
)
except HTTPException as exc:
if exc.status_code not in (404, 409):
raise
logger.warning("check-in progress skipped for %s: %s", request_id, exc.detail)
return entry
@app.get("/staff/activity")
async def get_staff_activity(_: dict = Depends(require_dashboard_read)):
try:
if staff_activity:
return staff_activity[:50]
built: list[dict] = []
for row in incident_logs[:30]:
meta = row.get("metadata") or {}
if row.get("event_type") in ("staff_request_completed", "staff_request_sent", "sos"):
built.append({
"id": meta.get("request_id") or row.get("ts", ""),
"timestamp": row.get("ts", ""),
"name": meta.get("staff_name") or row.get("source", "Staff"),
"zone": meta.get("location") or row.get("source", ""),
"message": row.get("msg") or "",
})
for d in targeted_dispatches[:20]:
built.append({
"id": d.get("dispatch_id") or d.get("id", ""),
"timestamp": d.get("timestamp") or "",
"name": d.get("recipient") or "Staff",
"zone": d.get("status") or "",
"message": d.get("message") or "Dispatch update",
})
return built[:50]
except Exception as exc:
logger.warning("staff/activity failed: %s", exc)
return []
@app.post("/staff/activity")
async def post_staff_activity(body: dict, principal: dict = Depends(require_staff)):
if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"):
raise HTTPException(status_code=403, detail="Staff JWT required")
name = (body.get("name") or principal.get("sub") or "Staff").strip()
entry = await _append_staff_activity_entry(
name,
body.get("zone") or "",
body.get("message") or "",
event_type=body.get("event_type") or "staff_note",
metadata=body.get("metadata") or {},
)
return entry
@app.get("/site/signage-placements")
async def get_signage_placements(_: dict = Depends(require_operator_if_auth_enabled)):
return signage_placements or {}
@app.delete("/site/signage-placements/{sign_id}")
async def delete_signage_placement(
sign_id: str,
_: dict = Depends(require_admin_audited("site/signage-placements/delete")),
):
global signage_placements
if sign_id not in signage_placements:
raise HTTPException(status_code=404, detail="Placement not found")
del signage_placements[sign_id]
save_signage_placements()
if signage_state.get(sign_id):
await _broadcast_signage_state(sign_id, False)
await manager.broadcast(json.dumps({"type": "signage_placement_removed", "id": sign_id}))
return {"removed": sign_id}
@app.post("/site/signage-placements/{sign_id}/remove")
async def remove_signage_placement_post(
sign_id: str,
_: dict = Depends(require_admin_audited("site/signage-placements/remove")),
):
"""POST fallback for clients where DELETE preflight fails."""
return await delete_signage_placement(sign_id, _)
@app.post("/site/signage-placements")
async def save_signage_placement(payload: dict, _: None = Depends(require_operator)):
global signage_placements
sign_id = payload.get("id")
if not sign_id:
raise HTTPException(status_code=400, detail="Signage id required")
signage_placements[sign_id] = {
"lat": payload.get("lat"),
"lng": payload.get("lng"),
"updatedAt": datetime.datetime.now().isoformat(),
}
save_signage_placements()
await manager.broadcast(json.dumps({"type": "signage_placement", "id": sign_id, **signage_placements[sign_id]}))
return signage_placements[sign_id]
@app.get("/api/signage")
async def list_active_signage(_: dict = Depends(require_operator_if_auth_enabled)):
active = [r for r in signage_records if r.get("status", "active") == "active"]
return active
@app.get("/api/signage/history")
async def signage_history(_: dict = Depends(require_operator_if_auth_enabled)):
return signage_records[-200:]
@app.post("/api/signage")
async def create_signage_record(body: dict, principal: dict = Depends(require_operator)):
sign_type = (body.get("type") or "").upper().replace(" ", "_").replace("-", "_")
lat = body.get("lat")
lng = body.get("lng")
if not sign_type or lat is None or lng is None:
raise HTTPException(status_code=400, detail="type, lat, lng required")
placed_by = principal.get("sub") or principal.get("username") or "admin"
record = {
"id": f"sign-{uuid.uuid4().hex[:8]}",
"type": sign_type,
"lat": float(lat),
"lng": float(lng),
"placedAt": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"placedBy": placed_by,
"status": "active",
"broadcastSent": False,
}
signage_records.insert(0, record)
del signage_records[500:]
save_signage_records()
try:
await _broadcast_signage_alert(sign_type, float(lat), float(lng), action="deployed")
record["broadcastSent"] = True
save_signage_records()
except Exception as exc:
logger.warning("Signage broadcast failed: %s", exc)
await manager.broadcast(json.dumps({"type": "signage_record_created", "record": record}))
return record
@app.delete("/api/signage/{sign_id}")
async def remove_signage_record(sign_id: str, _: dict = Depends(require_operator)):
global signage_records
record = next((r for r in signage_records if r.get("id") == sign_id), None)
if not record:
raise HTTPException(status_code=404, detail="Sign not found")
record["status"] = "removed"
record["removedAt"] = datetime.datetime.now(datetime.timezone.utc).isoformat()
save_signage_records()
try:
await _broadcast_signage_alert(record.get("type", "SIGN"), record["lat"], record["lng"], action="removed")
except Exception as exc:
logger.warning("Signage removal broadcast failed: %s", exc)
await manager.broadcast(json.dumps({"type": "signage_record_removed", "id": sign_id}))
return {"removed": sign_id}
@app.get("/site/blueprint")
async def get_site_blueprint(_: dict = Depends(require_operator_if_auth_enabled)):
return site_blueprint or {}
@app.post("/site/blueprint")
async def upload_site_blueprint(
file: UploadFile = File(...),
sw_lat: float = Form(...),
sw_lng: float = Form(...),
ne_lat: float = Form(...),
ne_lng: float = Form(...),
_: dict = Depends(require_admin_audited("site/blueprint/upload")),
):
global site_blueprint
if file.content_type not in ALLOWED_IMAGE_TYPES:
raise HTTPException(status_code=400, detail="Blueprint must be JPEG, PNG, or WebP")
contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Blueprint")
ext = Path(file.filename or "blueprint.png").suffix.lower()
if ext not in ALLOWED_IMAGE_EXTENSIONS:
ext = ".png"
fname = f"site_blueprint{ext}"
out_path = os.path.join(BLUEPRINT_DIR, fname)
with open(out_path, "wb") as f:
f.write(contents)
site_blueprint = {
"url": f"/files/uploads/blueprints/{fname}",
"bounds": [[sw_lat, sw_lng], [ne_lat, ne_lng]],
"uploaded_at": datetime.datetime.now().isoformat(),
}
save_site_blueprint()
return site_blueprint
@app.get("/agentic/steps")
async def get_agentic_steps(
session_id: Optional[str] = Query(default=None),
principal: dict = Depends(require_operator_if_auth_enabled),
):
user_id = _operator_id(principal)
async with store_locks.agent_steps_lock:
scoped = [s for s in agent_steps_history if s.get("user_id") == user_id]
if session_id:
scoped = [s for s in scoped if s.get("session_id") == session_id]
return scoped[-50:]
async def _broadcast_issue_update(issue_id: str, append_text: str | None = None, **kwargs):
issue_snapshot = None
for iss in issues_db:
if iss["id"] == issue_id:
if append_text:
if "original_desc" not in iss:
iss["original_desc"] = iss.get("desc", "")
iss["desc"] = iss["original_desc"] + f"\n\n{append_text}"
iss.update(kwargs)
save_issues()
issue_snapshot = dict(iss)
break
if issue_snapshot:
await manager.broadcast(json.dumps({"type": "issue_update", "issue": issue_snapshot}))
return issue_snapshot
async def simulate_issue_progress(issue_id: str, staff_name: str = "Staff"):
if not DEMO_MODE or security_config.is_production():
return
await asyncio.sleep(4)
await _broadcast_issue_update(
issue_id,
f"[UPDATE] {staff_name} is en route.",
progress=60,
staff_assignments=[{"name": staff_name, "progress": 60, "stage": "en_route"}],
)
await _sync_staff_request_progress(issue_id, 60, "en_route")
await asyncio.sleep(4)
await _broadcast_issue_update(
issue_id,
f"[UPDATE] {staff_name} on scene — handling incident.",
progress=85,
staff_assignments=[{"name": staff_name, "progress": 85, "stage": "on_scene"}],
)
await _sync_staff_request_progress(issue_id, 85, "on_scene")
await asyncio.sleep(4)
await _broadcast_issue_update(
issue_id,
"[RESOLVED] Incident resolved.",
progress=100,
status="RESOLVED",
staff_request_status="resolved",
staff_assignments=[{"name": staff_name, "progress": 100, "stage": "resolved"}],
)
await _sync_staff_request_progress(issue_id, 100, "resolved", status="resolved")
await _append_incident_log(
"INFO",
"staff_portal",
"staff_request_completed",
f"{staff_name} completed issue {issue_id} (demo auto-progress).",
metadata={"issue_id": issue_id, "staff_name": staff_name, "demo": True},
)
async def simulate_vivek_progress(issue_id: str):
await simulate_issue_progress(issue_id, "Vivek")
@app.post("/issues")
async def create_issue(body: IssueCreate, _: None = Depends(require_operator)):
issue = body.model_dump()
issue.setdefault("id", f"ISS-{uuid.uuid4().hex[:6].upper()}")
issue.setdefault("progress", 0)
issue.setdefault("timestamp", datetime.datetime.now().isoformat())
if not issue.get("title"):
desc = (issue.get("desc") or "").strip()
issue["title"] = desc.split("\n")[0][:80] if desc else "Incident"
is_missing = "missing" in issue.get("title", "").lower() or "missing" in issue.get("desc", "").lower()
if is_missing and getattr(app.state, "last_missing_person_img", None):
issue["image"] = normalize_files_url(app.state.last_missing_person_img)
async with store_locks.issues_lock:
issues_db.insert(0, issue)
save_issues()
await manager.broadcast(json.dumps({"type": "issue_update", "issue": issue}))
return issue
@app.patch("/issues/{issue_id}")
async def patch_issue(issue_id: str, body: IssuePatch, _: None = Depends(require_operator)):
async with store_locks.issues_lock:
snapshot = None
for iss in issues_db:
if iss.get("id") == issue_id:
data = body.model_dump(exclude_unset=True)
_validate_issue_patch(iss, data)
iss.update(data)
save_issues()
snapshot = dict(iss)
break
if not snapshot:
raise HTTPException(status_code=404, detail="Issue not found")
await manager.broadcast(json.dumps({"type": "issue_update", "issue": snapshot}))
return snapshot
@app.post("/agentic/plan")
async def create_agentic_plan(payload: dict, _: None = Depends(require_operator)):
plan = generate_agentic_plan(payload)
event = {
"type": "agentic_plan_update",
"plan_id": str(uuid.uuid4()),
"created_at": datetime.datetime.now().isoformat(),
"incident": payload,
"plan": plan,
}
async with store_locks.agentic_plans_lock:
agentic_plans.insert(0, event)
_trim_memory_list(agentic_plans, 100)
save_agentic_plans()
await manager.broadcast(json.dumps(event))
return event
@app.post("/agentic/chat")
async def agentic_chat(payload: ChatPayload, _: None = Depends(require_operator)):
room_counts = {room["label"]: room.get("occupancy", 0) for room in vision_engine.room_stats.values()} if hasattr(vision_engine, "room_stats") else {}
crowd = getattr(vision_engine, "last_crowd_count", None)
if crowd is None:
crowd = getattr(vision_engine, "last_total_count", 0)
context = {
"crowd_count": crowd,
"live_face_count": _count_live_faces(),
"room_counts": room_counts,
"recent_alerts": [a if isinstance(a, dict) else a.model_dump() for a in alerts_db[-5:]] if alerts_db else [],
"active_models": vision_engine.get_ai_status()
}
loop = asyncio.get_event_loop()
reply = await loop.run_in_executor(None, generate_chat_response, payload.prompt, context)
return {"reply": reply}
@app.get("/dispatches")
async def get_dispatches(_: dict = Depends(require_operator_if_auth_enabled)):
return targeted_dispatches
@app.get("/user/profile")
async def get_user_profile(principal: dict = Depends(require_operator_if_auth_enabled)):
profile = dict(user_profile)
sub = principal.get("sub")
role = principal.get("role")
if principal.get("auth") == "jwt" and sub:
profile["name"] = sub
if role:
profile["role"] = role.replace("_", " ").title()
elif principal.get("auth") == "api_key" and role:
profile["role"] = f"API ({role})"
return profile
@app.get("/signage")
async def get_signage(_: dict = Depends(require_operator_if_auth_enabled)):
return signage_state
@app.post("/signage/{sign_id}/toggle")
async def toggle_signage(sign_id: str, payload: dict | None = Body(default=None), _: None = Depends(require_operator)):
"""Set signage to explicit active state, or flip when active omitted."""
body = payload or {}
if "active" in body:
return await _broadcast_signage_state(sign_id, bool(body["active"]))
current = signage_state.get(sign_id, False)
return await _broadcast_signage_state(sign_id, not current)
@app.get("/gossip/contact-network")
async def get_gossip_contact_network(
hours: Optional[float] = Query(default=None),
_: dict = Depends(require_operator_if_auth_enabled),
):
return _build_contact_network(hours)
@app.get("/gossip")
async def get_gossip(
root: Optional[str] = None,
date: Optional[str] = None,
_: dict = Depends(require_operator_if_auth_enabled),
):
"""
Return gossip graph. Optional ?date=YYYY-MM-DD loads a persisted snapshot.
Pass ?root=PersonName to filter the contact tree.
"""
if date:
snap_path = os.path.join(GOSSIP_HISTORY_DIR, f"{date}.json")
if os.path.isfile(snap_path):
return persist.load_json(snap_path, {})
return {"nodes": [], "links": [], "root_person": root or "", "is_tracking": False, "date": date, "empty": True}
if root:
gossip_bridge.set_root_person(root)
loop = asyncio.get_event_loop()
data = await loop.run_in_executor(None, gossip_bridge.get_gossip_json, root)
return data
@app.post("/gossip/set_root")
async def set_gossip_root(root: str, _: None = Depends(require_vision_access)):
"""Set the root person for gossip graph tracking."""
gossip_bridge.set_root_person(root)
return {"status": "ok", "root_person": root}
@app.post("/gossip/start")
async def start_gossip_tracking(
payload: GossipTrackingStart = GossipTrackingStart(),
_: None = Depends(require_vision_access),
):
person = (payload.personName or "").strip()
if person:
gossip_bridge.set_root_person(person)
gossip_bridge.start_tracking()
tracking = {
"staffId": payload.staffId,
"personName": payload.personName,
"cause": payload.cause,
"doc_names": payload.doc_names or [],
"started_at": datetime.datetime.now().isoformat(),
}
gossip_bridge.set_tracking_meta(tracking)
await manager.broadcast(json.dumps({"type": "gossip_tracking_update", "status": "started", **tracking}))
return {"status": "started", "tracking": tracking, "root_person": gossip_bridge.get_gossip_json().get("root_person")}
@app.post("/gossip/stop")
async def stop_gossip_tracking(_: None = Depends(require_vision_access)):
gossip_bridge.stop_tracking()
gossip_bridge.clear_tracking_meta()
await manager.broadcast(json.dumps({"type": "gossip_tracking_update", "status": "stopped"}))
return {"status": "stopped"}
@app.post("/gossip/clear")
async def clear_gossip_tracking(_: dict = Depends(require_admin_audited("gossip/clear"))):
gossip_bridge.clear_graph()
return {"status": "cleared"}
@app.post("/gossip/ingest_frame")
async def gossip_ingest_frame(
file: UploadFile = File(...),
cam_id: str = Form("cam-01"),
_: None = Depends(require_vision_access),
):
"""Run face recognition on a (browser webcam) frame and feed the gossip graph.
This is what makes contact tracing work without a server-side camera: the UI
streams frames here while tracking is active. Detected, enrolled people are
linked to the root person in the interaction graph.
"""
if file.content_type not in ALLOWED_IMAGE_TYPES:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type")
contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image")
nparr = np.frombuffer(contents, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if frame is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image")
fe = vision_engine.face_engine
await _run_face_work(_ensure_face_db, fe)
matches: list[dict] = []
if hasattr(fe, "match_all_faces"):
matches = await _run_face_work(fe.match_all_faces, frame)
else:
single = await _run_face_work(vision_engine.search_missing_person, frame)
if single.get("found"):
matches = [{"name": single["name"], "confidence": single.get("confidence", 0.0),
"bbox": [0, 0, 0, 0], "found": True}]
detected = [m["name"] for m in matches]
known_names = [m["name"] for m in matches if m.get("found") and m["name"] != "Unknown"]
summary = gossip_bridge.ingest_detected_names(cam_id, detected)
data = gossip_bridge.get_gossip_json()
if summary.get("tracking"):
async with store_locks.gossip_lock:
today = datetime.datetime.now().strftime("%Y-%m-%d")
persist.save_json(os.path.join(GOSSIP_HISTORY_DIR, f"{today}.json"), data)
await manager.broadcast(json.dumps({
"type": "gossip_tracking_update",
"status": "detections",
"cam_id": cam_id,
"names": known_names,
}))
return {
"names": known_names,
"matches": matches,
"graph": data,
"is_tracking": data.get("is_tracking", False),
}
@app.get("/debug/vision")
async def debug_vision(_: None = Depends(require_vision_access)):
"""Regression detector for vision pipeline."""
fe = vision_engine.face_engine
pipeline = _get_vision_pipeline()
stats = pipeline.stats.as_dict() if hasattr(pipeline, "stats") else {}
return {
"engine_ready": getattr(fe, "ready", False),
"db_size": len(fe.db) if hasattr(fe, "db") else 0,
"queue_depth": stats.get("queue_depth", 0),
"fps": stats.get("fps", 0),
"inference_ms": stats.get("last_inference_ms", 0),
"processed": stats.get("processed", 0),
"dropped": stats.get("dropped", 0)
}
@app.get("/debug/backend")
async def debug_backend(_: None = Depends(require_vision_access)):
"""Regression detector for backend health."""
import psutil
process = psutil.Process()
return {
"active_sockets": len(manager.active_connections),
"agent_sockets": len(manager.agent_connections),
"memory_mb": round(process.memory_info().rss / 1024 / 1024, 2),
"detections_history_size": len(detections_history),
"alerts_size": len(alerts)
}
@app.post("/vision/track_frame")
async def vision_track_frame(
file: UploadFile = File(...),
cam_id: str = Form("cam-01"),
_: None = Depends(require_vision_access),
):
"""Continuous always-on tracking from a browser-supplied camera frame.
The frontend's global CameraTracker streams frames here as long as facial
recognition is enabled. This recognizes every face, records sightings into
the live detection history + per-camera face results (so the Agentic Copilot
and every page see the same live tracking), feeds the interaction graph, and
broadcasts a camera update to all clients.
"""
warming = _vision_warming_response()
if warming is not None:
return warming
if not vision_engine.get_ai_status().get("facial", False):
return {"tracking": False, "faces": [], "reason": "Facial recognition is disabled"}
if file.content_type not in ALLOWED_IMAGE_TYPES:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type")
contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image")
nparr = np.frombuffer(contents, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if frame is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image")
fe = vision_engine.face_engine
pipeline = _get_vision_pipeline()
infer_frame = resize_for_infer(frame)
matches, skipped = await pipeline.infer(cam_id, frame)
if not skipped:
scale_match_bboxes(matches, infer_frame, frame)
if skipped:
if vision_session.is_cam_stale(cam_id):
return {
"tracking": True,
"cam_id": cam_id,
"live": False,
"stale": True,
"reason": "frame_timeout",
"faces": [],
"tracked_faces": [],
"presence": False,
"skipped": True,
"known_names": [],
"detection_updates": [],
}
cached = vision_engine.get_face_results().get(cam_id, [])
public_cached = [
{k: v for k, v in (f if isinstance(f, dict) else {}).items() if k != "embedding"}
for f in cached
]
meta = vision_session.get_cam_meta(cam_id)
return {
"tracking": True,
"cam_id": cam_id,
"live": True,
"stale": False,
"faces": public_cached,
"tracked_faces": public_cached,
"presence": bool(public_cached),
"skipped": True,
"frame_id": meta.get("frame_id"),
"presence_expires_at": meta.get("presence_expires_at"),
"known_names": [
f.get("name") for f in public_cached
if f.get("found") and f.get("name") not in ("Unknown", "unknown", "")
],
"detection_updates": [],
}
def _face_thumbnail_url(match: dict) -> str | None:
try:
x1, y1, x2, y2 = [int(v) for v in (match.get("bbox") or [0, 0, 0, 0])]
h, w = frame.shape[:2]
x1, y1 = max(0, x1), max(0, y1)
x2, y2 = min(w, x2), min(h, y2)
if x2 <= x1 or y2 <= y1:
return None
face_img = frame[y1:y2, x1:x2]
if face_img.size == 0:
return None
base_name = str(match.get("name", "unknown")).strip().lower().replace(" ", "_")
thumb_filename = f"face_{base_name}_{cam_id}.jpg"
thumb_path = os.path.join(_UPLOADS_PATH, thumb_filename)
if not cv2.imwrite(thumb_path, face_img):
return None
return f"/files/uploads/{thumb_filename}"
except Exception:
return None
faces_payload = []
for m in matches:
thumbnail_url = _face_thumbnail_url(m)
face_payload = {
"name": m.get("name", "Unknown"),
"confidence": round(float(m.get("confidence", 0.0)), 3),
"bbox": m.get("bbox", [0, 0, 0, 0]),
"found": bool(m.get("found", True)),
**({"embedding": m.get("embedding")} if m.get("embedding") is not None else {}),
}
if thumbnail_url:
face_payload["thumbnail"] = normalize_files_url(thumbnail_url)
faces_payload.append(face_payload)
public_faces_payload = [
{k: v for k, v in face.items() if k != "embedding"}
for face in faces_payload
]
with vision_engine.lock:
vision_engine.latest_raw_frames[cam_id] = frame.copy()
vision_engine.face_results[cam_id] = faces_payload
if hasattr(vision_engine, "mark_browser_feed_active"):
vision_engine.mark_browser_feed_active(cam_id, 0)
# Record sightings + gossip for enrolled and unknown_N identities.
tracked_names: list[str] = []
gossip_names: list[str] = []
gossip_bboxes: list[tuple] = []
frame_updates: list[dict] = []
global _detections_dirty
async with store_locks.detections_lock:
for m in matches:
name = str(m.get("name") or "Unknown").strip()
if name in ("Unknown", "unknown", "") or not m.get("found"):
continue
tracked_names.append(name)
gossip_names.append(name)
gossip_bboxes.append(tuple(m.get("bbox", [0, 0, 0, 0])))
# Save the frame to uploads folder for later display
thumbnail_filename = f"sighting_{name.lower()}_{cam_id}.jpg"
thumbnail_path = os.path.join(_UPLOADS_PATH, thumbnail_filename)
cv2.imwrite(thumbnail_path, frame)
# Upsert sighting with thumbnail path relative to serve_upload_file API
thumbnail_url = f"/files/uploads/{thumbnail_filename}"
entry = _upsert_detection_sighting(
name,
confidence=float(m.get("confidence", 0.0)),
cam_id=cam_id,
thumbnail=thumbnail_url,
)
frame_updates.append(entry)
_detections_dirty = True
if _detections_dirty:
save_detections()
_detections_dirty = False
# Proximity-based gossip (same algorithm as local vision_engine — ISSUE-039).
try:
if gossip_names:
gossip_bridge.on_detections(cam_id, gossip_names, gossip_bboxes, frame.shape[1])
gossip_bridge.ingest_detected_names(cam_id, gossip_names)
except Exception as exc:
logger.debug("gossip ingest from track_frame skipped: %s", exc)
frame_id = vision_session.record_processed_frame(cam_id, had_match=bool(tracked_names))
meta = vision_session.get_cam_meta(cam_id)
count = len(public_faces_payload)
vision_engine.last_face_count = count
browser_feed_count = len(getattr(vision_engine, "browser_feeds", {}) or {})
await manager.broadcast(json.dumps({
"type": "camera_broadcast",
"total_count": count,
"cameras": {cam_id: {
"count": count,
"faces": public_faces_payload,
"live": True,
"stale": False,
"frame_id": frame_id,
}},
"active_cam_count": browser_feed_count,
"detection_updates": frame_updates,
"timestamp": datetime.datetime.now().isoformat(),
}))
return {
"tracking": True,
"cam_id": cam_id,
"live": True,
"stale": False,
"presence": bool(tracked_names),
"faces": public_faces_payload,
"tracked_faces": public_faces_payload,
"frame_id": frame_id,
"last_frame_ts": meta.get("last_frame_ts"),
"presence_expires_at": meta.get("presence_expires_at"),
"known_names": tracked_names,
"detection_updates": frame_updates,
}
@app.post("/tracking/session/reset")
async def reset_tracking_session(
body: TrackingResetPayload = Body(default_factory=TrackingResetPayload),
principal: dict = Depends(require_operator),
):
"""Clear live tracking state for a fresh operator session.
Wipes sighting history, live face results, browser feed registry, and the
gossip interaction graph (edges). Enrolled face DB is untouched. Gossip
tracking stays enabled so new co-presence events accumulate cleanly.
By default does not broadcast — the initiating client resets local UI state.
Pass ``broadcast: true`` only for an explicit global coordination reset (admin only).
Pass ``full_reset: true`` when the operator explicitly starts a new session.
"""
if body.broadcast and not auth_service.has_role(principal, "admin"):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Global tracking reset requires admin role",
)
_audit_destructive(
principal,
"tracking/session/reset",
f"broadcast={body.broadcast} full_reset={body.full_reset}",
)
global detections_history, _detections_dirty
user_id = _operator_id(principal)
async with store_locks.detections_lock:
detections_history.clear()
_detections_dirty = False
save_detections()
vision_engine.face_results.clear()
if hasattr(vision_engine, "clear_browser_feeds"):
vision_engine.clear_browser_feeds()
gossip_bridge.clear_graph()
gossip_bridge.clear_tracking_meta()
gossip_bridge.start_tracking()
if os.path.exists(_FACE_DB_PATH):
enrolled = [
n for n in os.listdir(_FACE_DB_PATH)
if os.path.isdir(os.path.join(_FACE_DB_PATH, n)) and not n.startswith("unknown_")
]
gossip_bridge.register_enrolled_roster(enrolled)
today_path = os.path.join(GOSSIP_HISTORY_DIR, f"{datetime.datetime.now():%Y-%m-%d}.json")
if os.path.exists(today_path):
async with store_locks.gossip_lock:
try:
os.remove(today_path)
except OSError:
pass
if body.clear_agent_steps and body.session_id:
await _clear_agent_steps(user_id, body.session_id)
await agentic_orchestrator.prune_adk_sessions(user_id, keep_session_id=None)
if os.path.exists(_UPLOADS_PATH):
for item in os.listdir(_UPLOADS_PATH):
item_path = os.path.join(_UPLOADS_PATH, item)
if os.path.isfile(item_path):
try:
os.remove(item_path)
except OSError:
pass
if hasattr(app.state, "last_missing_person_img"):
app.state.last_missing_person_img = None
payload = {
"type": "tracking_session_reset",
"detections_history": [],
"timestamp": datetime.datetime.now().isoformat(),
"broadcast": body.broadcast,
}
if body.broadcast:
await manager.broadcast(json.dumps(payload))
agentic_orchestrator.inject_context(
alerts=alerts_db,
sos_events=sos_events,
vision_engine=vision_engine,
rooms=[],
detections_history=detections_history,
issues=issues_db,
incident_logs=incident_logs,
signage_state=signage_state,
)
return {"status": "reset", "message": "Live tracking session cleared", "broadcast": body.broadcast}
@app.get("/gossip/known_people")
async def get_known_people(_: dict = Depends(require_operator_if_auth_enabled)):
"""Return all people currently recognized by the face engine."""
# Merge: gossip_bridge known people + face DB keys
known = set(gossip_bridge.get_known_people())
known.update(vision_engine.face_engine.db.keys())
return {"known_people": sorted(known)}
# ── AI model toggle endpoints ──────────────────────────────────────────────
@app.get("/ai/status")
async def ai_status(_: dict = Depends(require_vision_access)):
"""Return enabled AI models plus per-model capability metadata."""
caps = (
vision_engine.get_ai_capabilities()
if hasattr(vision_engine, "get_ai_capabilities")
else {}
)
return {
"models": vision_engine.get_ai_status(),
"capabilities": caps,
"cloud": CEPHEUS_CLOUD,
}
@app.post("/ai/toggle/{model_id}")
async def ai_toggle(model_id: str, _: dict = Depends(require_operator)):
"""Toggle a specific AI model on/off."""
caps = (
vision_engine.get_ai_capabilities()
if hasattr(vision_engine, "get_ai_capabilities")
else {}
)
model_cap = caps.get(model_id, {})
if model_cap.get("supported") is False:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=model_cap.get("note", f"Model '{model_id}' is not available in this deployment"),
)
new_state = vision_engine.toggle_ai_model(model_id)
# Broadcast state change to all WS clients so frontend store stays in sync
await manager.broadcast(json.dumps({
"type": "ai_model_update",
"model_id": model_id,
"enabled": new_state,
"all": vision_engine.get_ai_status(),
}))
return {"model_id": model_id, "enabled": new_state}
@app.get("/face_db/list")
async def list_face_db(_: dict = Depends(require_operator_if_auth_enabled)):
"""List enrolled persons — roster aligned with the embedding store used for search."""
enrolled_keys: set[str] = set()
fe = getattr(vision_engine, "face_engine", None)
if fe is not None:
try:
_ensure_face_db(fe)
db = getattr(fe, "db", {}) or {}
enrolled_keys = {str(k) for k in db if not str(k).startswith("unknown_")}
except Exception as exc:
logger.debug("face_db list reload skipped: %s", exc)
people: list[dict] = []
disk_names: set[str] = set()
if os.path.exists(_FACE_DB_PATH):
for person_name in os.listdir(_FACE_DB_PATH):
if person_name.startswith("unknown_"):
continue
person_dir = os.path.join(_FACE_DB_PATH, person_name)
if not os.path.isdir(person_dir):
continue
disk_names.add(person_name)
imgs = [f for f in os.listdir(person_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
img_url = f"/files/face/{person_name}/{imgs[0]}" if imgs else None
has_emb = person_name in enrolled_keys
meta = face_metadata.read_person_metadata(person_name, _FACE_DB_PATH)
person_entry = {
"name": person_name,
"image": img_url,
"status": "AUTHORIZED" if has_emb else "PENDING_EMBEDDING",
"role": meta.get("role") or face_metadata.DEFAULT_ROLE,
}
people.append(person_entry)
for name in enrolled_keys:
if name not in disk_names:
meta = face_metadata.read_person_metadata(name, _FACE_DB_PATH)
person_entry = {
"name": name,
"image": None,
"status": "AUTHORIZED",
"role": meta.get("role") or face_metadata.DEFAULT_ROLE,
}
people.append(person_entry)
for p in people:
person_dir = os.path.join(_FACE_DB_PATH, p["name"])
if os.path.isdir(person_dir):
try:
mtime = os.path.getmtime(person_dir)
p["enrolledAt"] = datetime.datetime.fromtimestamp(mtime, tz=datetime.timezone.utc).isoformat()
except OSError:
pass
if not p.get("role"):
p["role"] = face_metadata.DEFAULT_ROLE
det = next((d for d in detections_history if d.get("name") == p["name"]), None)
if det:
p["lastSeen"] = det.get("seen_at")
p["lastLocation"] = det.get("camId")
return sorted(people, key=lambda x: x["name"])
@app.delete("/face_db/{person_name}")
async def delete_face_db_person(
person_name: str,
_: dict = Depends(require_admin_audited("face_db/delete")),
):
cleaned = _sanitize_person_name(person_name)
# 1. Remove image folder from face_database/
person_dir = os.path.join(_FACE_DB_PATH, cleaned)
if os.path.isdir(person_dir):
import shutil
shutil.rmtree(person_dir, ignore_errors=True)
# 2. Remove .npy embedding files (the root cause of the bug — these were never deleted)
fe = getattr(vision_engine, "face_engine", None)
fr_dir = os.path.dirname(_FACE_DB_PATH) # Face_Recognition/
for emb_folder in ("faces_db", "temp_faces_db"):
npy_path = os.path.join(fr_dir, emb_folder, f"{cleaned}.npy")
if os.path.exists(npy_path):
try:
os.remove(npy_path)
logger.info("Deleted embedding file: %s", npy_path)
except OSError as exc:
logger.warning("Could not delete %s: %s", npy_path, exc)
# 3. Remove from in-memory db (both cleaned and display-name variants)
if fe and hasattr(fe, "db"):
for key in [cleaned, cleaned.replace("_", " "), person_name.strip()]:
fe.db.pop(key, None)
# 4. Invalidate DB so next access reloads from disk (where the npy is now gone)
_invalidate_face_db(fe)
logger.info("Deleted face identity: %s", cleaned)
return {"status": "deleted", "name": cleaned}
@app.get("/face_db/debug")
async def face_db_debug(_: dict = Depends(require_operator_if_auth_enabled)):
"""Return embedding shape and norm for every enrolled identity — useful for diagnosing score=0 issues."""
fe = getattr(vision_engine, "face_engine", None)
if fe is None:
return {"error": "face_engine not initialized", "db": []}
_ensure_face_db(fe)
db = getattr(fe, "db", {}) or {}
entries = []
for name, emb in db.items():
if emb is None:
entries.append({"name": name, "shape": None, "norm": None, "status": "null_embedding", "is_unknown": name.startswith("unknown_")})
continue
try:
shape = list(emb.shape)
norm = float(np.linalg.norm(emb))
status = "ok" if norm > 0.1 else "zero_or_near_zero"
except Exception as exc:
shape = None
norm = None
status = f"error: {exc}"
entries.append({
"name": name,
"shape": shape,
"norm": round(norm, 4) if norm is not None else None,
"status": status,
"is_unknown": name.startswith("unknown_"),
})
return {
"insightface_loaded": getattr(fe, "app", None) is not None,
"db_size": len(db),
"enrolled_named": len([e for e in entries if not e["is_unknown"]]),
"entries": sorted(entries, key=lambda e: e["name"]),
}
@app.post("/face_db/clear_temp")
async def clear_temp_faces(_: dict = Depends(require_vision_access)):
"""Clear temporary faces and embeddings from the current session."""
fr_dir = os.path.dirname(_FACE_DB_PATH)
temp_emb = os.path.join(fr_dir, "temp_faces_db")
temp_img = os.path.join(fr_dir, "temp_face_database")
deleted = 0
for d in (temp_emb, temp_img):
if not os.path.exists(d):
continue
for f in os.listdir(d):
# Preserve persisted unknown identities across sessions
if f.startswith("unknown_"):
continue
fp = os.path.join(d, f)
if os.path.isfile(fp):
try:
os.remove(fp)
deleted += 1
except Exception as e:
logger.warning("Failed to delete temp face file %s: %s", fp, e)
fe = getattr(vision_engine, "face_engine", None)
if fe:
_invalidate_face_db(fe)
return {"status": "success", "deleted": deleted}
@app.post("/emergency/dispatch-log")
async def post_emergency_dispatch_log(body: dict, _: None = Depends(require_operator)):
entry = {
"id": body.get("id") or f"disp-{uuid.uuid4().hex[:8]}",
"timestamp": body.get("timestamp") or datetime.datetime.now(datetime.timezone.utc).isoformat(),
"type": body.get("type") or "general",
"message": body.get("message") or "",
"status": body.get("status") or "pending",
}
emergency_dispatch_log.insert(0, entry)
del emergency_dispatch_log[200:]
save_json(DISPATCH_LOG_FILE, emergency_dispatch_log)
return entry
@app.get("/alerts/log")
async def get_alerts_log(_: dict = Depends(require_operator_if_auth_enabled)):
return alerts_broadcast_log[-50:]
@app.post("/alerts/log")
async def post_alerts_log(body: dict, _: None = Depends(require_operator)):
entry = {
"id": f"bc-{uuid.uuid4().hex[:8]}",
"timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"message": body.get("message") or "",
"recipients": body.get("recipients") or [],
"relatedIssue": body.get("relatedIssue"),
"sentBy": body.get("sentBy") or "admin",
"deliveryCount": body.get("deliveryCount") or len(body.get("recipients") or []),
}
alerts_broadcast_log.insert(0, entry)
del alerts_broadcast_log[100:]
save_json(ALERTS_LOG_FILE, alerts_broadcast_log)
return entry
@app.get("/face_results")
async def get_face_results(_: dict = Depends(require_vision_access)):
"""Return latest face recognition hits per camera (expired results are empty/stale)."""
raw = vision_engine.get_face_results()
return vision_session.build_face_results_payload(raw, _strip_face_embeddings)
@app.get("/debug/vision")
async def debug_vision(_: dict = Depends(require_vision_access)):
"""Live vision pipeline diagnostics for HF debugging."""
try:
pipe = _get_vision_pipeline()
stats = pipe.stats.as_dict()
except Exception as exc:
logger.warning("debug/vision pipeline stats failed: %s", exc)
stats = {}
presence = vision_session.presence_summary()
agent_open = len(manager.agent_connections) > 0
ws_open = len(manager.active_connections) > 0
raw_faces = vision_engine.get_face_results()
last_match = None
last_score = None
for cam_id, faces in (raw_faces or {}).items():
for f in faces or []:
if not isinstance(f, dict):
continue
if f.get("found") and f.get("name"):
last_match = str(f.get("name"))
last_score = float(f.get("confidence") or 0)
break
if last_match:
break
return {
"fps": stats.get("fps", 0),
"queue": stats.get("queue_depth", 0),
"socket_count": len(manager.active_connections),
"agent_socket_count": len(manager.agent_connections),
"camera_count": vision_session.camera_count(),
"polling_enabled": False,
"websocket_state": "open" if ws_open else "closed",
"ws_connected": ws_open,
"agent_ws_connected": agent_open,
"agent_ws_state": "open" if agent_open else "closed",
"ws_state": "open" if ws_open else "closed",
"watchdog_state": "client_side",
"inference_ms": stats.get("last_inference_ms", 0),
"dropped_frames": stats.get("dropped", 0),
"skipped_frames": stats.get("skipped", 0),
"processed_frames": stats.get("processed", 0),
"face_workers": _FACE_WORKERS,
"public_vision": public_vision_allowed(),
"facial_enabled": vision_engine.get_ai_status().get("facial", False),
"presence_state": presence.get("presence_state"),
"presence_is_stale": presence.get("presence_is_stale"),
"last_frame_age_ms": presence.get("last_frame_age_ms"),
"last_presence_update_age_ms": presence.get("last_presence_update_age_ms"),
"last_match_age_ms": presence.get("last_match_age_ms"),
"presence_ttl_s": vision_session.PRESENCE_TTL_S,
"track_frame_ok": bool(raw_faces),
"inference_complete": stats.get("processed", 0) > 0,
"result_emitted": last_match is not None,
"last_match": last_match,
"last_score": last_score,
"last_result_age_ms": presence.get("last_match_age_ms"),
}
@app.get("/debug/emergency")
async def debug_emergency():
"""Emergency subsystem diagnostics."""
return {
"dispatch_log_count": len(emergency_dispatch_log),
"staff_activity_count": len(staff_activity),
"sos_event_count": len(sos_events),
"staff_request_count": len(staff_requests_db),
"httpx_available": httpx is not None,
"nearby_cache_entries": len(_nearby_cache),
"maps_configured": maps_configured() if "maps_configured" in globals() else False,
"public_vision": public_vision_allowed(),
"fallback_mode": "overpass" if httpx is not None else "unavailable",
"service_availability": {
"overpass": httpx is not None,
"maps_api": maps_configured() if "maps_configured" in globals() else False,
},
}
@app.post("/missing_person")
async def missing_person_search(file: UploadFile = File(...), _: None = Depends(require_operator)):
"""Upload an image, search for that person across all live camera feeds, and trigger agent."""
try:
if file.content_type not in ALLOWED_IMAGE_TYPES:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type")
contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image")
nparr = np.frombuffer(contents, np.uint8)
query_frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if query_frame is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image")
fe = vision_engine.face_engine
await _run_face_work(_ensure_face_db, fe)
result = await _run_face_work(vision_engine.search_missing_person, query_frame)
# Save image for evidence
upload_dir, file_path = _safe_image_upload_path(file.filename or "capture.jpg")
with open(file_path, "wb") as f:
f.write(contents)
app.state.last_missing_person_img = normalize_files_url(f"/uploads/{os.path.basename(file_path)}")
result = dict(result)
result["image_url"] = app.state.last_missing_person_img
if CEPHEUS_CLOUD:
result.setdefault("search_mode", result.get("search_mode") or "database_only")
if not result.get("found"):
best_score = result.get("best_score", 0)
enrolled = result.get("enrolled_count", 0)
if enrolled == 0 or best_score == 0:
result["reason"] = (
"Face database is empty — no enrolled faces to match against. "
"Go to Issues → Face Database to enroll faces first."
)
else:
result["reason"] = result.get("reason") or (
"No match in enrolled database. Live camera search requires GPU vision backend."
)
# Operator decides whether to raise an issue — do not auto-create via agent.
return result
except HTTPException:
raise
except Exception as e:
logger.error(f"Missing person search error: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Missing person search failed")
@app.post("/face/search_live")
async def face_search_live(file: UploadFile = File(...), _: None = Depends(require_operator)):
"""
Search for the face in the uploaded image across ALL active live camera feeds.
Unlike /missing_person (which searches the enrolled face DB), this endpoint
takes the embedding of the uploaded image and compares it directly against the
live face embeddings currently detected by the vision engine. This is the
correct implementation of "Search Live Feed" — the uploaded image is the query,
not the current camera frame.
"""
warming = _vision_warming_response()
if warming is not None:
return warming
try:
if file.content_type not in ALLOWED_IMAGE_TYPES:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type")
contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image")
nparr = np.frombuffer(contents, np.uint8)
query_frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if query_frame is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image")
fe = vision_engine.face_engine
await _run_face_work(_ensure_face_db, fe)
if face_live_search is not None:
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
None,
face_live_search.search_query_in_live_feeds,
query_frame,
fe,
vision_engine,
None,
)
result["search_mode"] = "live"
else:
logger.info("face_live_search unavailable, falling back to database search")
result = await _run_face_work(vision_engine.search_missing_person, query_frame)
result["search_mode"] = "database_only"
# Save upload for evidence / UI preview
upload_dir, file_path = _safe_image_upload_path(file.filename or "search_query.jpg")
with open(file_path, "wb") as f:
f.write(contents)
result["image_url"] = result.get("match_image") or normalize_files_url(f"/uploads/{os.path.basename(file_path)}")
return result
except HTTPException:
raise
except Exception as e:
logger.error("face/search_live error: %s", e)
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Live face search failed")
@app.post("/register_face")
async def register_face(
name: str = Form(...),
role: str = Form(default="Staff"),
file: UploadFile = File(...),
_: dict = Depends(require_admin_audited("register_face")),
):
"""Register a named face from an uploaded image into the face database."""
try:
if file.content_type not in ALLOWED_IMAGE_TYPES:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type")
cleaned_name = _sanitize_person_name(name)
contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image")
nparr = np.frombuffer(contents, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if frame is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image")
fe = vision_engine.face_engine
if hasattr(fe, "register_from_frame"):
ok = await _run_face_work(fe.register_from_frame, cleaned_name, frame)
else:
ok = await _run_face_work(fe.register_face_from_frame, cleaned_name, frame)
if not ok:
raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="No face detected for registration")
face_metadata.write_person_metadata(cleaned_name, _FACE_DB_PATH, role=role)
_invalidate_face_db(fe)
return {"success": ok, "name": cleaned_name, "role": role.strip() or face_metadata.DEFAULT_ROLE}
except HTTPException:
raise
except Exception as e:
logger.error(f"register_face error: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Face registration failed")
@app.post("/upload_video")
async def upload_video(file: UploadFile = File(...), _: None = Depends(require_operator)):
"""Upload a video file to be used as a camera source."""
try:
upload_dir, file_path = _safe_upload_path(file.filename)
with open(file_path, "wb") as f:
f.write(_read_limited_bytes(await file.read(MAX_VIDEO_BYTES + 1), MAX_VIDEO_BYTES, "Video"))
rel_name = os.path.basename(file_path)
rel_url = f"/files/uploads/{rel_name}"
logger.info("Video uploaded: %s", rel_url)
return {"success": True, "file_path": rel_url, "url": rel_url}
except HTTPException:
raise
except Exception as e:
logger.error(f"upload_video error: {e}")
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Video upload failed")
# ---------------------------------------------------------------------------
# WebSocket — /ws (alerts + camera control + signage sync + SOS)
# ---------------------------------------------------------------------------
WS_HEARTBEAT_INTERVAL_S = 20.0
WS_RECEIVE_TIMEOUT_S = float(os.getenv("CEPHEUS_WS_RECEIVE_TIMEOUT", "300") or "300")
async def _ws_send_heartbeat(websocket: WebSocket) -> None:
"""Keep HF/nginx proxy from killing idle WebSocket connections."""
try:
while True:
await asyncio.sleep(WS_HEARTBEAT_INTERVAL_S)
await manager.send_personal_message(
json.dumps({"type": "ping", "ts": time.time()}),
websocket,
)
except Exception:
pass
def _vision_set_camera_for_client(client_id: str, cam_id: str, index) -> bool:
if hasattr(vision_engine, "set_camera_for_client"):
return vision_engine.set_camera_for_client(client_id, cam_id, index)
return vision_engine.set_camera(cam_id, index)
def _vision_release_camera_for_client(client_id: str, cam_id: str) -> None:
if hasattr(vision_engine, "release_camera_for_client"):
vision_engine.release_camera_for_client(client_id, cam_id)
else:
vision_engine.release_camera(cam_id)
def _vision_release_client_cameras(client_id: str) -> None:
if hasattr(vision_engine, "release_client_cameras"):
vision_engine.release_client_cameras(client_id)
_active_track_frames: set[str] = set()
def _process_frame_sync(frame_bytes: bytes) -> dict:
"""
Runs inside thread pool. Pure sync. No asyncio. No app.prepare().
"""
import cv2
import numpy as np
nparr = np.frombuffer(frame_bytes, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if frame is None:
return {"type": "face_results", "faces": []}
# Vision engine should NOT use asyncio primitives internally.
# All locking inside must use threading.Lock (NOT asyncio.Lock)
matches = vision_engine.face_engine.match_all_faces(frame)
faces_payload = []
for m in matches:
faces_payload.append({
"name": m.get("name", "Unknown"),
"confidence": round(float(m.get("confidence", 0.0)), 3),
"bbox": m.get("bbox", [0, 0, 0, 0]),
"found": bool(m.get("found", True))
})
return {"type": "face_results", "faces": faces_payload}
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
if not _ws_auth_ok(websocket):
await websocket.close(code=1008, reason="Unauthorized")
return
ws_principal = _ws_principal(websocket)
client_id = str(uuid.uuid4())
vision_session.register_client(client_id)
await manager.connect(websocket)
heartbeat_task = asyncio.create_task(_ws_send_heartbeat(websocket))
# Send current signage state on connection
await manager.send_personal_message(json.dumps({
"type": "signage_init",
"all": signage_state,
}), websocket)
try:
while True:
try:
data = await asyncio.wait_for(
websocket.receive_text(),
timeout=WS_RECEIVE_TIMEOUT_S,
)
except asyncio.TimeoutError:
logger.debug("WS receive timeout (no client message) — keeping connection %s", client_id[:8])
continue
try:
msg = json.loads(data)
mtype = msg.get("type")
if mtype == "pong":
continue
if mtype == "ping":
await manager.send_personal_message(
json.dumps({"type": "pong", "ts": msg.get("ts", time.time())}),
websocket,
)
continue
if mtype != "track_frame":
logger.info(f"Received from client: {data}")
if mtype == "track_frame":
cam_id = msg.get("cam_id")
image_b64 = msg.get("image")
if cam_id and image_b64:
if cam_id in _active_track_frames:
continue
_active_track_frames.add(cam_id)
loop = asyncio.get_event_loop()
async def _process_track_frame(cid: str, img: str) -> None:
try:
await _run_face_work(vision_engine.process_browser_frame, cid, img)
finally:
_active_track_frames.discard(cid)
_spawn(_process_track_frame(cam_id, image_b64))
continue
if mtype == "frame":
frame_b64 = msg.get("data", "")
if not frame_b64:
continue
try:
import base64
encoded = frame_b64.split(",")[1] if "," in frame_b64 else frame_b64
frame_bytes = base64.b64decode(encoded)
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(
_FACE_EXECUTOR,
_process_frame_sync,
frame_bytes
)
await manager.send_personal_message(json.dumps(result), websocket)
except Exception as exc:
logger.error("Frame processing error: %s", exc)
continue
if mtype == "select_camera":
if not _ws_can_mutate(ws_principal):
await manager.send_personal_message(json.dumps({
"type": "error",
"msg": "Insufficient permissions for camera control",
}), websocket)
continue
cam_id = msg.get("cam_id")
index = msg.get("index")
if cam_id is not None and index is not None:
ok_claim, claim_reason = vision_session.claim_camera(client_id, cam_id, index)
if not ok_claim:
await manager.send_personal_message(json.dumps({
"type": "camera_ack",
"cam_id": cam_id,
"index": index,
"success": False,
"reason": claim_reason,
}), websocket)
continue
if claim_reason == "duplicate_ignored":
await manager.send_personal_message(json.dumps({
"type": "camera_ack",
"cam_id": cam_id,
"index": index,
"success": True,
"duplicate": True,
}), websocket)
continue
async def _open_camera(ws, cid, idx, owner_id):
try:
loop = asyncio.get_event_loop()
success = await loop.run_in_executor(
_CAMERA_EXECUTOR, _vision_set_camera_for_client, owner_id, cid, idx
)
await manager.send_personal_message(json.dumps({
"type": "camera_ack",
"cam_id": cid,
"index": idx,
"success": success,
}), ws)
log_msg = f"Camera {cid} → index {idx}: {'OK' if success else 'FAILED'}"
logger.info(log_msg)
except Exception as e:
logger.error(f"Camera open task error for {cid}: {e}")
_spawn(_open_camera(websocket, cam_id, index, client_id))
elif mtype == "close_camera":
if not _ws_can_mutate(ws_principal):
await manager.send_personal_message(json.dumps({
"type": "error",
"msg": "Insufficient permissions for camera control",
}), websocket)
continue
cam_id = msg.get("cam_id")
if cam_id:
_vision_release_camera_for_client(client_id, cam_id)
logger.info(f"Released camera {cam_id} via WS request (client {client_id[:8]})")
await manager.send_personal_message(json.dumps({
"type": "camera_ack",
"cam_id": cam_id,
"index": None,
"success": True,
}), websocket)
elif mtype in ("set_signage", "toggle_signage"):
if not _ws_can_mutate(ws_principal):
await manager.send_personal_message(json.dumps({
"type": "error",
"msg": "Insufficient permissions for signage control",
}), websocket)
continue
sign_id = msg.get("id")
if sign_id:
if "active" in msg:
await _broadcast_signage_state(sign_id, bool(msg["active"]))
else:
current = signage_state.get(sign_id, False)
await _broadcast_signage_state(sign_id, not current)
elif mtype == "sos":
try:
sos_limiter.check_ws(websocket, "sos")
except HTTPException:
await manager.send_personal_message(json.dumps({
"type": "error",
"msg": "SOS rate limit exceeded",
}), websocket)
continue
event = {
"id": str(uuid.uuid4()),
"guest_id": msg.get("guest_id", "unknown"),
"lat": msg.get("lat", 0),
"lng": msg.get("lng", 0),
"location_label": msg.get("location", "Unknown"),
"message": msg.get("message", "SOS"),
"timestamp": datetime.datetime.now().isoformat(),
}
async with store_locks.sos_lock:
sos_events.append(event)
_trim_memory_list(sos_events, 100)
save_sos()
alert = Alert(
id=event["id"],
type="sos",
location=event["location_label"],
message=event["message"],
severity="critical",
lat=event["lat"],
lng=event["lng"],
)
await _process_new_alert(
alert,
source="GUEST_APP",
extra_context={"guest_id": event["guest_id"]},
sos_mode=True,
)
await manager.broadcast(json.dumps({"type": "sos_event", **event}))
except Exception as e:
logger.error(f"Error handling WS message: {e}")
try:
await manager.send_personal_message(
json.dumps({"type": "error", "msg": "Request failed"}),
websocket,
)
except Exception:
pass
except WebSocketDisconnect:
logger.info("Client disconnected.")
except Exception as e:
logger.error(f"WS error: {e}")
finally:
heartbeat_task.cancel()
try:
await heartbeat_task
except asyncio.CancelledError:
pass
_vision_release_client_cameras(client_id)
vision_session.release_client(client_id)
manager.disconnect(websocket)
# ---------------------------------------------------------------------------
# WebSocket — /ws/agents (ADK multi-agent streaming)
# ---------------------------------------------------------------------------
def _decode_copilot_image_data(image_data: str) -> tuple[bytes | None, str]:
"""Decode a data-URL or raw base64 image from the agent copilot UI."""
if not image_data or not isinstance(image_data, str):
return None, "image/jpeg"
data = image_data.strip()
mime = "image/jpeg"
if data.startswith("data:"):
header, _, payload = data.partition(",")
if ";" in header:
mime = header[5:].split(";")[0].strip() or mime
data = payload
try:
return base64.b64decode(data, validate=False), mime
except Exception:
return None, mime
async def _vision_context_from_copilot_image(image_bytes: bytes) -> dict | None:
"""Run missing-person vision search on a copilot-attached image."""
if not image_bytes:
return None
try:
nparr = np.frombuffer(image_bytes, np.uint8)
frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if frame is None:
return None
result = await _run_face_work(vision_engine.search_missing_person, frame)
return dict(result) if result else None
except Exception as exc:
logger.warning("Copilot image vision search failed: %s", exc)
return None
@app.websocket("/ws/agents")
async def agents_ws(websocket: WebSocket):
"""Streaming WebSocket for the ADK multi-agent orchestrator."""
if not _ws_auth_ok(websocket):
await websocket.close(code=1008, reason="Unauthorized")
return
try:
await manager.connect_agent(websocket)
except Exception as exc:
logger.warning("Agent WS accept failed: %s", exc)
try:
await websocket.close(code=1011, reason="Agent socket unavailable")
except Exception:
pass
return
try:
await manager.send_personal_message(
json.dumps({"type": "connected", "msg": "agent_ws_ready"}),
websocket,
)
except Exception:
pass
ws_principal = _ws_principal(websocket)
ws_user_id = _operator_id(ws_principal)
heartbeat_task = asyncio.create_task(_ws_send_heartbeat(websocket))
try:
while True:
try:
raw = await asyncio.wait_for(
websocket.receive_text(),
timeout=WS_RECEIVE_TIMEOUT_S,
)
except asyncio.TimeoutError:
logger.debug("Agents WS receive timeout — keeping connection")
continue
try:
msg = json.loads(raw)
except Exception:
await manager.send_personal_message(json.dumps({"type": "error", "msg": "Invalid JSON"}), websocket)
continue
if msg.get("type") == "pong":
continue
if msg.get("type") == "ping":
await manager.send_personal_message(
json.dumps({"type": "pong", "ts": msg.get("ts", time.time())}),
websocket,
)
continue
if msg.get("type") != "agent_query":
continue
prompt = (msg.get("prompt") or "").strip()
raw_image_data = msg.get("image_data")
image_bytes, image_mime = _decode_copilot_image_data(raw_image_data) if raw_image_data else (None, "image/jpeg")
if raw_image_data and image_bytes is None:
await manager.send_personal_message(
json.dumps({"type": "error", "msg": "Could not decode attached image"}),
websocket,
)
continue
if image_bytes and not prompt:
prompt = "Analyze the attached image and report tactical findings."
elif image_bytes:
vision_ctx = await _vision_context_from_copilot_image(image_bytes)
if vision_ctx:
prompt = (
f"{prompt}\n\n[Vision engine analysis of attached image: "
f"{json.dumps(vision_ctx)}]"
)
else:
prompt = f"{prompt}\n\n[User attached an image for visual analysis.]"
if not prompt:
await manager.send_personal_message(
json.dumps({"type": "error", "msg": "Prompt or image required"}),
websocket,
)
continue
session_id = msg.get("session_id", "default")
agent_override = (msg.get("agent_override") or "").strip() or None
# Update orchestrator context with freshest live data each query
agentic_orchestrator.inject_context(
alerts=alerts_db,
sos_events=sos_events,
vision_engine=vision_engine,
rooms=[],
detections_history=detections_history,
issues=issues_db,
incident_logs=incident_logs,
signage_state=signage_state,
)
try:
async for step in agentic_orchestrator.run_agent_stream(
prompt,
session_id,
user_id=ws_user_id,
agent_override=agent_override,
image_data=image_bytes,
image_mime=image_mime,
):
await _append_agent_step(step, ws_user_id, session_id)
await manager.send_personal_message(json.dumps({"type": "agent_step", **step}), websocket)
save_issues() # Save after activity
await manager.send_personal_message(json.dumps({"type": "agent_done"}), websocket)
except Exception as exc:
logger.error(f"Agent stream error: {exc}")
await manager.send_personal_message(
json.dumps({"type": "agent_step", "agent": "System", "content": "Agent request failed", "step_type": "error"}),
websocket,
)
await manager.send_personal_message(json.dumps({"type": "agent_done"}), websocket)
except WebSocketDisconnect:
logger.info("Agent client disconnected.")
except Exception as e:
logger.error(f"Agents WS error: {e}")
finally:
heartbeat_task.cancel()
try:
await heartbeat_task
except asyncio.CancelledError:
pass
manager.disconnect(websocket)
# ---------------------------------------------------------------------------
# WebSocket — /ws/vision (legacy single-frame processing)
# ---------------------------------------------------------------------------
@app.websocket("/ws/vision")
async def vision_ws(websocket: WebSocket):
"""Legacy single-frame endpoint — prefer browser track_frame + /ws camera_broadcast."""
if not _ws_auth_ok(websocket):
await websocket.close(code=1008, reason="Unauthorized")
return
logger.warning("Client connected to deprecated /ws/vision — migrate to track_frame flow")
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
count, frame_b64 = vision_engine.process_frame(data)
await websocket.send_text(json.dumps({"count": count, "frame": frame_b64}))
except WebSocketDisconnect:
pass
except Exception as e:
logger.error(f"Vision WS error: {e}")
# ---------------------------------------------------------------------------
# Background task — camera broadcast loop
# ---------------------------------------------------------------------------
async def background_vision_task():
"""Read from all cameras every ~100ms, broadcast frames + AI results."""
loop = asyncio.get_event_loop()
while True:
try:
active_results, new_events = await loop.run_in_executor(None, vision_engine.get_active_frames)
# Process AI-triggered alerts (Falls, Stampedes)
for evt in new_events:
alert = Alert(**evt)
# This will broadcast to both Tactical and Agentic via existing logic
_spawn(_process_new_alert(alert, source="VISION_ENGINE_AI"))
def to_b64(frame) -> str | None:
if frame is None:
return None
_, buf = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
return f"data:image/jpeg;base64,{base64.b64encode(buf).decode()}"
total_count = sum(count for count, _ in active_results.values())
vision_engine.last_crowd_count = total_count
vision_engine.last_total_count = total_count
prev_crowd = getattr(vision_engine, "_last_broadcast_crowd", None)
if prev_crowd != total_count:
vision_engine._last_broadcast_crowd = total_count
await manager.broadcast(json.dumps({
"type": "crowd_update",
"total": total_count,
"timestamp": datetime.datetime.now().isoformat(),
}))
cameras_payload = {}
for cam_id, (count, annotated) in active_results.items():
face_hits = vision_engine.face_results.get(cam_id, [])
cameras_payload[cam_id] = {
"count": count,
"frame": to_b64(annotated),
"faces": [
{k: v for k, v in (face or {}).items() if k != "embedding"}
for face in face_hits
],
}
# Update persistent history for known faces
async with store_locks.detections_lock:
for f in face_hits:
if f["name"] != "Unknown":
_upsert_detection_sighting(
f["name"],
confidence=float(f.get("confidence", 0.0)),
cam_id=cam_id,
thumbnail=f.get("thumbnail"),
)
global _detections_dirty
_detections_dirty = True
global _last_detection_flush
now = datetime.datetime.now()
if _detections_dirty and (now - _last_detection_flush).total_seconds() >= DETECTION_SAVE_INTERVAL_SECONDS:
save_detections()
_detections_dirty = False
_last_detection_flush = now
frame_detection_updates: list[dict] = []
for cam_id, (count, _) in active_results.items():
for f in vision_engine.face_results.get(cam_id, []):
if f.get("name") and f["name"] != "Unknown":
frame_detection_updates.append(
next(
(d for d in detections_history if d.get("name") == f["name"]),
{
"name": f["name"],
"camId": cam_id,
"confidence": f.get("confidence"),
"seen_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
},
)
)
broadcast_data = {
"type": "camera_broadcast",
"total_count": total_count,
"cameras": cameras_payload,
"active_cam_count": len(active_results),
"detection_updates": frame_detection_updates,
"timestamp": datetime.datetime.now().isoformat(),
}
await manager.broadcast(json.dumps(broadcast_data))
except Exception as e:
logger.error(f"Background vision task error: {e}")
await asyncio.sleep(0.1) # ~10 fps
def _schedule_coro(coro) -> None:
"""Schedule a coroutine on the main event loop from any thread (agent tools run in a worker thread)."""
loop = getattr(app.state, "loop", None)
if loop is None:
try:
_spawn(coro)
except RuntimeError:
logger.warning("No event loop available to schedule agent broadcast")
return
try:
asyncio.run_coroutine_threadsafe(coro, loop)
except Exception as exc: # pragma: no cover - defensive
logger.warning("Failed to schedule agent broadcast: %s", exc)
def _on_agent_issue_created(issue: dict) -> None:
"""Persist and broadcast issues created/updated by the agent orchestrator."""
save_issues()
_schedule_coro(manager.broadcast(json.dumps({"type": "issue_update", "issue": issue})))
def _on_agent_ai_model_changed(model_id: str, enabled: bool, all_status: dict) -> None:
"""Broadcast vision AI model changes triggered by the agent so the UI stays in sync."""
_schedule_coro(manager.broadcast(json.dumps({
"type": "ai_model_update",
"model_id": model_id,
"enabled": enabled,
"all": all_status,
})))
def _agent_broadcast_alert(alert_type: str, message: str, location: str, severity: str) -> dict:
"""Agent action: raise a real alert through the standard alert pipeline."""
alert = Alert(
type=alert_type or "info",
location=location or "",
message=message or "",
severity=severity or "high",
)
_schedule_coro(_process_new_alert(alert, source="AGENT"))
return alert.model_dump()
def _agent_set_signage(signage_id: str, active: bool) -> None:
"""Agent action: toggle a digital signage panel."""
_schedule_coro(_broadcast_signage_state(signage_id, bool(active)))
def _agent_update_issue(issue: dict) -> None:
"""Agent action: persist + broadcast an issue the agent mutated in place."""
save_issues()
_schedule_coro(manager.broadcast(json.dumps({"type": "issue_update", "issue": issue})))
_warmload_in_progress_lock = asyncio.Lock()
_warmload_in_progress = False
async def _safe_warmload_models() -> dict | None:
global _warmload_in_progress
if _warmload_in_progress:
logger.info("Warmload already in progress — skipping concurrent call.")
return None
_warmload_in_progress = True
try:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(_FACE_EXECUTOR, vision_engine.warmload_models)
finally:
_warmload_in_progress = False
async def _run_vision_warmload() -> None:
"""Background model warmload — idempotent, safe to call from startup or /health/live."""
global _warmload_complete
if not mark_warmload_started():
return
accel = detect_acceleration()
logger.info(
"Vision warmload starting (full_vision=%s, provider=%s, cuda=%s).",
use_full_vision_engine(),
accel.get("provider"),
accel.get("cuda_available"),
)
loop = asyncio.get_event_loop()
try:
result = await _safe_warmload_models()
if result is None:
return
mark_warmload_complete(result)
_warmload_complete = True
logger.info("[Startup] Vision warmload complete")
except Exception as exc:
mark_warmload_failed(str(exc))
logger.error("Vision warmload failed: %s", exc)
async def _keep_warm_loop() -> None:
"""CRITICAL: sleep FIRST — never run warmload at t=0."""
if not use_full_vision_engine():
return
interval = max(60, int(os.getenv("CEPHEUS_KEEP_WARM_SEC", "120")))
# Wait for startup to truly complete before first iteration
while not _warmload_complete:
await asyncio.sleep(5)
# Now sleep the full interval before first keep-warm warmload
await asyncio.sleep(interval)
logger.info("Keep-warm loop started (interval=%ss).", interval)
while True:
try:
await _safe_warmload_models()
except Exception as exc:
logger.debug("Keep-warm tick failed: %s", exc)
await asyncio.sleep(interval)
@app.on_event("startup")
async def startup_event():
app.state.loop = asyncio.get_running_loop()
security_config.validate_startup()
auth_service.validate_production_users()
load_all_data()
auth_service.init_refresh_store()
if os.getenv("CEPHEUS_PRODUCTION", "").strip() == "1":
if not os.getenv("CEPHEUS_JWT_SECRET", "").strip():
logger.critical("CEPHEUS_PRODUCTION=1 but CEPHEUS_JWT_SECRET is missing")
if os.getenv("CEPHEUS_AUTH_DEV_MODE", "").strip() == "1":
logger.critical("CEPHEUS_AUTH_DEV_MODE must be 0 in production")
logger.info("Application started. Persistent data loaded. Refresh store: %s", auth_service.refresh_store_backend())
logger.info("Face inference pool: %s workers (OMP_NUM_THREADS=%s)", _FACE_WORKERS, os.getenv("OMP_NUM_THREADS", "default"))
# Cloud engine loads InsightFace at import time; warmload + keep-warm maintain hot models.
await _run_vision_warmload()
_spawn(_keep_warm_loop())
if not CEPHEUS_CLOUD:
_spawn(background_vision_task())
if os.getenv("CEPHEUS_GOSSIP_AUTO_START", "1").strip().lower() not in ("0", "false", "no", "off"):
gossip_bridge.start_tracking()
gossip_root = os.getenv("CEPHEUS_GOSSIP_ROOT", "").strip()
enrolled_names: list[str] = []
if os.path.exists(_FACE_DB_PATH):
for person_name in os.listdir(_FACE_DB_PATH):
if person_name.startswith("unknown_"):
continue
person_dir = os.path.join(_FACE_DB_PATH, person_name)
if os.path.isdir(person_dir):
gossip_bridge.seed_known_person(person_name)
enrolled_names.append(person_name)
if gossip_root:
gossip_bridge.set_root_person(gossip_root)
elif enrolled_names:
gossip_bridge.set_root_person(sorted(enrolled_names)[0])
if CEPHEUS_CLOUD and use_full_vision_engine():
accel = detect_acceleration()
mode = (
"cloud full vision (GPU, no local camera broadcast loop)"
if accel.get("cuda_available")
else "cloud full vision (CPU, no local camera broadcast loop)"
)
elif CEPHEUS_CLOUD:
mode = "cloud stub (no ML / cameras)"
else:
mode = "full vision (local)"
logger.info("Gossip bridge ready. Root: %s. Mode: %s", gossip_bridge.get_gossip_json().get("root_person"), mode)
# Inject live context into ADK agent orchestrator
agentic_orchestrator.inject_context(
alerts=alerts_db,
sos_events=sos_events,
vision_engine=vision_engine,
rooms=[],
detections_history=detections_history,
issues=issues_db,
incident_logs=incident_logs,
signage_state=signage_state,
)
if hasattr(agentic_orchestrator, "set_issue_created_callback"):
agentic_orchestrator.set_issue_created_callback(_on_agent_issue_created)
if hasattr(agentic_orchestrator, "set_ai_model_changed_callback"):
agentic_orchestrator.set_ai_model_changed_callback(_on_agent_ai_model_changed)
if hasattr(agentic_orchestrator, "set_action_handlers"):
agentic_orchestrator.set_action_handlers(
broadcast_alert=_agent_broadcast_alert,
set_signage=_agent_set_signage,
update_issue=_agent_update_issue,
)
logger.info("Vision AI defaults: %s", vision_engine.get_ai_status())
_orch_mode = "ADK multi-agent" if getattr(agentic_orchestrator, "_ADK_AVAILABLE", False) else "native google.genai"
logger.info("Agentic orchestrator ready (%s).", _orch_mode)
if __name__ == "__main__":
import uvicorn
_port = int(os.environ.get("PORT", os.environ.get("UVICORN_PORT", "8000")))
uvicorn.run(app, host="0.0.0.0", port=_port)