| from __future__ import annotations |
| from fastapi import FastAPI, WebSocket, WebSocketDisconnect, File, UploadFile, Form, Depends, Header, HTTPException, status, Request, Query, Body |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse |
| from pydantic import BaseModel, Field |
| from typing import List, Optional |
| import datetime |
| import uuid |
| import logging |
| import json |
| import cv2 |
| import base64 |
| import asyncio |
| import time |
| import math |
| import numpy as np |
| import sys |
| import os |
| from concurrent.futures import ThreadPoolExecutor |
| from vision_pipeline import VisionFramePipeline, resize_for_infer, scale_match_bboxes |
| import vision_session |
|
|
| |
|
|
| try: |
| import httpx |
| except Exception: |
| httpx = None |
| from pathlib import Path |
| from dotenv import load_dotenv |
|
|
| |
# Directory of the bundled face-recognition code, located next to this file.
# It is placed at the front of sys.path (once) so its modules can be imported
# by bare name.
_FR_PATH = os.path.join(os.path.dirname(__file__), "Face_Recognition")
if sys.path.count(_FR_PATH) == 0:
    sys.path.insert(0, _FR_PATH)
|
|
| import gossip_bridge |
| import auth_service |
| import store_locks |
| from observability import RequestContextMiddleware |
| from security_headers import SecurityHeadersMiddleware |
| from rate_limiter import login_limiter, refresh_limiter, sos_limiter |
| from agentic_service import generate_agentic_plan, generate_chat_response |
| from alert_routing import route_alert |
| import persistence as persist |
| import security_config |
| import face_metadata |
| try: |
| import face_live_search |
| except ImportError as e: |
| import traceback |
| print(f"ERROR: Failed to import face_live_search: {e}") |
| traceback.print_exc() |
| face_live_search = None |
|
|
|
|
try:
    import agentic_orchestrator
except ImportError as _orch_import_err:
    print(f"WARNING: agentic_orchestrator unavailable ({_orch_import_err}) — agent WS uses stub responses.")

    class _AgenticOrchestratorStub:
        """Stand-in used when the real ``agentic_orchestrator`` cannot be imported.

        It exposes the two entry points this module calls so the rest of the
        file can run unchanged; every agent query yields one error step.
        """

        @staticmethod
        def inject_context(**_kwargs):
            """Accept and ignore any context keywords."""
            return None

        @staticmethod
        async def run_agent_stream(prompt: str, session_id: str = "default", **_kwargs):
            """Yield a single ``error`` step explaining that the runtime is missing."""
            unavailable_step = {
                "agent": "System",
                "content": "Agent runtime unavailable. Install google-adk and set GEMINI_API_KEY for live agent queries.",
                "step_type": "error",
            }
            yield unavailable_step

    agentic_orchestrator = _AgenticOrchestratorStub()
|
|
# Read the .env file that sits next to this module; override=True is passed so
# values from that file win over variables already set in the environment.
load_dotenv(os.path.join(os.path.dirname(__file__), ".env"), override=True)
if os.getenv("GEMINI_API_KEY"):
    print("INFO: GEMINI_API_KEY loaded successfully.")
else:
    print("WARNING: GEMINI_API_KEY not found in environment!")


# True when CEPHEUS_CLOUD is set to "1", "true" or "yes" (case-insensitive).
CEPHEUS_CLOUD = os.getenv("CEPHEUS_CLOUD", "").lower() in ("1", "true", "yes")
| from vision_runtime import ( |
| detect_acceleration, |
| get_warmload_state, |
| live_status_payload, |
| mark_warmload_complete, |
| mark_warmload_failed, |
| mark_warmload_started, |
| register_face_ready_probe, |
| use_full_vision_engine, |
| ) |
|
|
| if use_full_vision_engine(): |
| from vision_engine import VisionEngine |
| else: |
| from vision_engine_cloud import VisionEngine |
|
|
|
|
| |
# Root folder for every JSON store written by this module; created on import.
DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
os.makedirs(DATA_DIR, exist_ok=True)


# --- Core operational stores -------------------------------------------------
DETECTIONS_FILE = os.path.join(DATA_DIR, "detections_history.json")
ALERTS_FILE = os.path.join(DATA_DIR, "alerts.json")
ISSUES_FILE = os.path.join(DATA_DIR, "issues.json")
LOGS_FILE = os.path.join(DATA_DIR, "incident_logs.json")
SOS_FILE = os.path.join(DATA_DIR, "sos_events.json")
AGENT_STEPS_FILE = os.path.join(DATA_DIR, "agent_steps.json")
STAFF_REQUESTS_FILE = os.path.join(DATA_DIR, "staff_requests.json")

# --- Site layout and signage -------------------------------------------------
SITE_BLUEPRINT_FILE = os.path.join(DATA_DIR, "site_blueprint.json")
SIGNAGE_PLACEMENTS_FILE = os.path.join(DATA_DIR, "signage_placements.json")
SIGNAGE_STATE_FILE = os.path.join(DATA_DIR, "signage_state.json")
SIGNAGE_RECORDS_FILE = os.path.join(DATA_DIR, "signage_records.json")

# --- Plans, dispatches and broadcast logs ------------------------------------
AGENTIC_PLANS_FILE = os.path.join(DATA_DIR, "agentic_plans.json")
TARGETED_DISPATCHES_FILE = os.path.join(DATA_DIR, "targeted_dispatches.json")
DISPATCH_LOG_FILE = os.path.join(DATA_DIR, "emergency_dispatch_log.json")
ALERTS_LOG_FILE = os.path.join(DATA_DIR, "alerts_broadcast_log.json")

# --- Staff bookkeeping -------------------------------------------------------
STAFF_ACTIVITY_FILE = os.path.join(DATA_DIR, "staff_activity.json")
STAFF_DUTY_FILE = os.path.join(DATA_DIR, "staff_duty.json")
STAFF_INBOX_ACKS_FILE = os.path.join(DATA_DIR, "staff_inbox_acks.json")

# --- Sub-directories ---------------------------------------------------------
GOSSIP_HISTORY_DIR = os.path.join(DATA_DIR, "gossip_history")
BLUEPRINT_DIR = os.path.join(DATA_DIR, "uploads", "blueprints")
|
|
# Process-local flag set once the face engine is reported usable at startup.
_warmload_complete = False


# In-memory stores. They start empty and are replaced by load_all_data() with
# whatever the matching JSON files contain.
detections_history: List[dict] = []
alerts_db: List[dict] = []
issues_db: List[dict] = []
incident_logs: List[dict] = []
sos_events: List[dict] = []
agent_steps_history: List[dict] = []
staff_requests_db: List[dict] = []

site_blueprint: dict = {}
signage_placements: dict = {}

emergency_dispatch_log: List[dict] = []
alerts_broadcast_log: List[dict] = []
staff_activity: List[dict] = []
staff_duty: dict[str, dict] = {}
staff_inbox_acks: List[dict] = []
signage_records: List[dict] = []
|
|
# Broadcast text per signage code. Each template has one ``{location}``
# placeholder to be filled with ``str.format(location=...)``.
SIGNAGE_BROADCAST_TEMPLATES = dict(
    FIRE_HERE="FIRE ALERT: Fire reported at {location}. Evacuate the area immediately. Follow E-L markers to exits.",
    DO_NOT_ENTER="AREA CLOSED: {location} is blocked. Do not enter. Seek alternate route.",
    SHELTER="SHELTER IN PLACE: Report to shelter zone at {location}. Await further instructions.",
    ALL_CLEAR="ALL CLEAR: {location} is now safe. Normal operations may resume.",
    LOCKDOWN="LOCKDOWN INITIATED: Full lockdown active at {location}. Remain in place. Do not move until further notice.",
    EL="EMERGENCY EXIT: Follow E-L route at {location} to nearest exit.",
)
|
|
def load_all_data():
    """Populate the module-level stores from their JSON files.

    Each file is read through ``persist.load_json`` with an empty list or dict
    as the default. A store is only replaced when the loaded value has the
    expected container type; otherwise the current value is left alone.
    After the list stores are loaded, every detection entry is passed through
    ``_normalize_detection_entry``. If ``signage_state`` ends up empty it is
    seeded with the default on/off flags.

    NOTE(review): ``signage_state``, ``agentic_plans`` and
    ``targeted_dispatches`` are not initialised in the visible part of this
    module — presumably they are defined elsewhere; confirm.
    """
    global detections_history, alerts_db, issues_db, incident_logs, sos_events
    global agent_steps_history, staff_requests_db, site_blueprint, signage_placements
    global signage_state, agentic_plans, targeted_dispatches
    global emergency_dispatch_log, alerts_broadcast_log, staff_activity, signage_records
    global staff_duty, staff_inbox_acks

    module_ns = globals()

    def _restore(targets: dict, kind: type, log_style=None) -> None:
        # A fresh default (kind()) is built per file so stores never share one object.
        for path, var_name in targets.items():
            loaded = persist.load_json(path, kind())
            if not isinstance(loaded, kind):
                continue
            module_ns[var_name] = loaded
            if log_style == "items":
                logger.info("Loaded %s items from %s", len(loaded), os.path.basename(path))
            elif log_style == "dict":
                logger.info("Loaded dict from %s", os.path.basename(path))

    # Primary list stores (logged with their item counts).
    _restore(
        {
            DETECTIONS_FILE: "detections_history",
            ALERTS_FILE: "alerts_db",
            ISSUES_FILE: "issues_db",
            LOGS_FILE: "incident_logs",
            SOS_FILE: "sos_events",
            AGENT_STEPS_FILE: "agent_steps_history",
            STAFF_REQUESTS_FILE: "staff_requests_db",
        },
        list,
        log_style="items",
    )

    # Bring persisted sightings to the current seen_at/timestamp shape, in place.
    detections_history[:] = [_normalize_detection_entry(d) for d in detections_history]

    # Dict stores (logged by file name only).
    _restore(
        {
            SITE_BLUEPRINT_FILE: "site_blueprint",
            SIGNAGE_PLACEMENTS_FILE: "signage_placements",
            SIGNAGE_STATE_FILE: "signage_state",
        },
        dict,
        log_style="dict",
    )

    # Secondary list stores (loaded silently).
    _restore(
        {
            AGENTIC_PLANS_FILE: "agentic_plans",
            TARGETED_DISPATCHES_FILE: "targeted_dispatches",
            DISPATCH_LOG_FILE: "emergency_dispatch_log",
            ALERTS_LOG_FILE: "alerts_broadcast_log",
            STAFF_ACTIVITY_FILE: "staff_activity",
            STAFF_INBOX_ACKS_FILE: "staff_inbox_acks",
            SIGNAGE_RECORDS_FILE: "signage_records",
        },
        list,
    )

    # Secondary dict stores (loaded silently).
    _restore({STAFF_DUTY_FILE: "staff_duty"}, dict)

    if not signage_state:
        signage_state.update({
            "s1": False, "s2": False, "s3": False,
            "s4": False, "s5": False, "s6": True, "s8": False,
        })
|
|
def save_json(path, data):
    """Write ``data`` to ``path`` by delegating to ``persist.save_json``."""
    persist.save_json(path, data)


def save_detections():
    """Persist the last 100 entries of ``detections_history``."""
    recent = detections_history[-100:]
    save_json(DETECTIONS_FILE, recent)
|
|
|
|
def _normalize_detection_entry(raw: dict) -> dict:
    """Return a copy of ``raw`` carrying both ``seen_at`` and, when derivable, ``timestamp`` (ISSUE-031).

    * A missing/empty ``seen_at`` is taken from ``timestamp`` when that value
      contains a ``"T"`` (i.e. looks like an ISO date-time), else set to ``None``.
    * A missing/empty ``timestamp`` is derived from ``seen_at`` as ``HH:MM:SS``;
      if ``seen_at`` cannot be parsed, its text is used unchanged.

    The input mapping is not modified.
    """
    normalized = dict(raw)

    if not normalized.get("seen_at"):
        legacy_ts = normalized.get("timestamp")
        looks_iso = bool(legacy_ts) and "T" in str(legacy_ts)
        normalized["seen_at"] = legacy_ts if looks_iso else None

    seen_at = normalized.get("seen_at")
    if not seen_at or normalized.get("timestamp"):
        return normalized

    iso_text = str(seen_at)
    try:
        # A trailing "Z" is rewritten to an explicit UTC offset before parsing.
        clock = datetime.datetime.fromisoformat(iso_text.replace("Z", "+00:00")).strftime("%H:%M:%S")
    except ValueError:
        clock = iso_text
    normalized["timestamp"] = clock
    return normalized
|
|
|
|
def _upsert_detection_sighting(
    name: str,
    *,
    confidence: float,
    cam_id: str,
    thumbnail: str | None = None,
) -> dict:
    """Record or refresh one person's sighting and return the stored entry.

    Args:
        name: Identity label; at most one entry per name is kept.
        confidence: Match score, stored rounded to three decimals.
        cam_id: Camera identifier, stored under ``camId``.
        thumbnail: Optional thumbnail payload; the key is omitted when ``None``.

    Returns:
        The entry placed in ``detections_history``. ``seen_at`` is an ISO-8601
        UTC timestamp; ``timestamp`` is the local wall-clock ``HH:MM:SS``.

    New names are appended at the end of the list, so when the in-memory cap is
    exceeded the *oldest* entries (front of the list) are discarded. Trimming
    the tail instead would throw away the sighting that was just recorded.
    """
    max_in_memory = 500
    entry: dict = {
        "name": name,
        "confidence": round(float(confidence), 3),
        "camId": cam_id,
        "seen_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "timestamp": datetime.datetime.now().strftime("%H:%M:%S"),
    }
    if thumbnail is not None:
        entry["thumbnail"] = thumbnail

    for idx, existing in enumerate(detections_history):
        if existing.get("name") == name:
            # Known person: replace in place, list length is unchanged.
            detections_history[idx] = entry
            return entry

    detections_history.append(entry)
    # Keep the newest `max_in_memory` entries (no-op while under the cap).
    del detections_history[:-max_in_memory]
    return entry
|
|
|
|
async def _ensure_staff_request_issue_id(alert: "Alert", extra_context: dict | None = None) -> str | None:
    """Return the issue id linked to ``alert``, creating an issue for staff requests (ISSUE-033).

    The id is looked up on ``alert.issue_id``, then ``alert.issue``, then
    ``extra_context["issue_id"]``. If none is set and the alert type is
    ``"staff_request"``, a new ONGOING issue is inserted at the front of
    ``issues_db`` under ``store_locks.issues_lock``, persisted, broadcast as an
    ``issue_update`` message, and its id returned. For any other alert type
    the (falsy) lookup result is returned unchanged.
    """
    context = extra_context or {}
    linked_id = alert.issue_id or alert.issue or context.get("issue_id")
    needs_new_issue = not linked_id and alert.type == "staff_request"
    if not needs_new_issue:
        return linked_id

    created = {
        "id": f"ISS-{uuid.uuid4().hex[:6].upper()}",
        "title": f"Staff request @ {alert.location or 'site'}",
        "desc": alert.message,
        "status": "ONGOING",
        "progress": 0,
        "priority": alert.severity or "medium",
        "timestamp": datetime.datetime.now().isoformat(),
        "staff_request_status": "pending",
        "staff_needed": 1,
    }
    async with store_locks.issues_lock:
        issues_db.insert(0, created)
        save_issues()

    payload = json.dumps({"type": "issue_update", "issue": created})
    await manager.broadcast(payload)
    return created["id"]
|
|
|
|
def _issue_context_for_staff(issue_id: str | None) -> dict:
    """Snapshot issue fields for staff request detail views.

    Args:
        issue_id: Id of the issue to look up in ``issues_db``.

    Returns:
        A dict with ``issue_title``, ``issue_priority`` (upper-cased),
        ``issue_status``, ``issue_desc``, ``instructions``, ``zone``, ``lat``,
        ``lng``, ``linked_sos_id`` and ``attachments``; or ``{}`` when
        ``issue_id`` is falsy or no issue with that id exists.

    ``lat``/``lng`` prefer the value in the issue's ``metadata`` and fall back
    to the issue itself only when the metadata value is absent (``None`` or an
    empty string). A coordinate of ``0`` is a real position and is kept rather
    than being treated as missing.
    """
    if not issue_id:
        return {}

    issue = next((iss for iss in issues_db if iss.get("id") == issue_id), None)
    if issue is None:
        return {}

    meta = issue.get("metadata") or {}

    def _coordinate(key: str):
        value = meta.get(key)
        if value is None or value == "":
            return issue.get(key)
        return value

    # Copy so the stored metadata list is never mutated by the append below.
    attachments = list(meta.get("attachments") or [])
    image = issue.get("image")
    if image and image not in attachments:
        attachments.append(image)

    description = issue.get("desc") or ""
    return {
        "issue_title": issue.get("title") or "",
        "issue_priority": str(issue.get("priority") or "MEDIUM").upper(),
        "issue_status": issue.get("status") or "ONGOING",
        "issue_desc": description,
        # Explicit instructions win; otherwise the description, capped at 800 chars.
        "instructions": meta.get("instructions") or description[:800],
        "zone": meta.get("zone") or meta.get("location") or "",
        "lat": _coordinate("lat"),
        "lng": _coordinate("lng"),
        "linked_sos_id": meta.get("sos_id") or meta.get("linked_sos_id"),
        "attachments": attachments,
    }
|
|
|
|
def _enriched_staff_request(req: dict) -> dict:
    """Return a copy of ``req`` overlaid with the current context of its linked issue.

    Keys produced by ``_issue_context_for_staff`` take precedence over keys of
    the same name already on the request. ``req`` itself is not modified.
    """
    issue_context = _issue_context_for_staff(req.get("issue_id"))
    return {**req, **issue_context}
|
|
|
|
# Progress percentage associated with each staff milestone name.
_STAFF_MILESTONE_PROGRESS = dict(
    accepted=25,
    en_route=60,
    on_scene=85,
    resolved=100,
)
|
|
|
|
async def _append_staff_activity_entry(
    name: str,
    zone: str,
    message: str,
    *,
    event_type: str = "staff_action",
    metadata: dict | None = None,
) -> dict:
    """Prepend an activity record to ``staff_activity``, persist it and broadcast it.

    Falsy ``name`` becomes ``"Staff"``; falsy ``zone``/``message`` become empty
    strings; falsy ``metadata`` becomes ``{}``. The list is newest-first and is
    capped at 100 entries (older ones at the end are dropped). The new entry
    is returned.
    """
    activity_id = f"act-{uuid.uuid4().hex[:8]}"
    recorded_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
    entry = dict(
        id=activity_id,
        timestamp=recorded_at,
        name=name or "Staff",
        zone=zone or "",
        message=message or "",
        event_type=event_type,
        metadata=metadata or {},
    )

    staff_activity.insert(0, entry)
    del staff_activity[100:]
    save_json(STAFF_ACTIVITY_FILE, staff_activity)

    outbound = json.dumps({"type": "staff_activity", "entry": entry})
    await manager.broadcast(outbound)
    return entry
|
|
|
|
# Issue statuses from which an issue may not be moved back to a non-terminal one.
_ISSUE_TERMINAL = frozenset({"RESOLVED", "CLOSED", "CANCELLED"})


def _validate_issue_patch(current: dict, patch: dict) -> None:
    """Reject invalid issue state transitions; may add a status to ``patch`` (ISSUE-041).

    Rules, with statuses compared case-insensitively:

    * A terminal issue cannot be patched to a non-terminal status (409).
    * ``progress >= 100`` with no status in the patch, on a non-terminal
      issue, sets ``patch["status"] = "RESOLVED"``.
    * ``progress < 100`` is refused (409) when the resulting status — the
      patched one if present, else the current one — is terminal.

    Raises:
        HTTPException: 409 on either refused transition.
    """
    cur_status = str(current.get("status", "ONGOING")).upper()
    currently_terminal = cur_status in _ISSUE_TERMINAL

    requested_status = patch.get("status")
    if requested_status is not None and currently_terminal:
        if str(requested_status).upper() not in _ISSUE_TERMINAL:
            raise HTTPException(
                status_code=409,
                detail=f"Cannot reopen issue from terminal status {cur_status}",
            )

    requested_progress = patch.get("progress")
    if requested_progress is None:
        return

    progress = int(requested_progress)
    if progress >= 100:
        # Completing the work implicitly resolves an open issue.
        if requested_status is None and not currently_terminal:
            patch["status"] = "RESOLVED"
        return

    effective_status = str(patch.get("status", cur_status)).upper()
    if effective_status in _ISSUE_TERMINAL:
        raise HTTPException(
            status_code=409,
            detail="Cannot set progress below 100 on a resolved/closed issue",
        )
def save_alerts():
    """Persist the last 200 alerts; non-dict entries are dumped via ``model_dump()``."""
    serialised = []
    for alert in alerts_db[-200:]:
        serialised.append(alert if isinstance(alert, dict) else alert.model_dump())
    save_json(ALERTS_FILE, serialised)


def save_issues():
    """Persist the last 100 entries of ``issues_db``."""
    save_json(ISSUES_FILE, issues_db[-100:])


def save_logs():
    """Persist the last 300 entries of ``incident_logs``."""
    save_json(LOGS_FILE, incident_logs[-300:])


def save_sos():
    """Persist the last 100 entries of ``sos_events``."""
    save_json(SOS_FILE, sos_events[-100:])


def save_agent_steps():
    """Persist the last 200 entries of ``agent_steps_history``."""
    save_json(AGENT_STEPS_FILE, agent_steps_history[-200:])


def save_staff_requests():
    """Persist the last 200 entries of ``staff_requests_db``."""
    save_json(STAFF_REQUESTS_FILE, staff_requests_db[-200:])


def save_site_blueprint():
    """Persist the whole ``site_blueprint`` mapping."""
    save_json(SITE_BLUEPRINT_FILE, site_blueprint)


def save_signage_placements():
    """Persist the whole ``signage_placements`` mapping."""
    save_json(SIGNAGE_PLACEMENTS_FILE, signage_placements)


def save_signage_state():
    """Persist the whole ``signage_state`` mapping."""
    save_json(SIGNAGE_STATE_FILE, signage_state)


def save_agentic_plans():
    """Persist the last 100 entries of ``agentic_plans``."""
    save_json(AGENTIC_PLANS_FILE, agentic_plans[-100:])


def save_targeted_dispatches():
    """Persist the last 200 entries of ``targeted_dispatches``."""
    save_json(TARGETED_DISPATCHES_FILE, targeted_dispatches[-200:])


def save_signage_records():
    """Persist the last 500 entries of ``signage_records``."""
    save_json(SIGNAGE_RECORDS_FILE, signage_records[-500:])
|
|
def _trim_memory_list(store: list, max_len: int) -> None:
    """Truncate ``store`` in place to its first ``max_len`` items.

    Items from index ``max_len`` onward (the tail) are removed; a list that is
    already short enough is left untouched.
    """
    # Deleting an empty slice is a no-op, so no length check is required.
    del store[max_len:]
|
|
|
|
def _operator_id(principal: dict | None) -> str:
    """Return the principal's ``sub`` as a string, or ``"operator"`` when absent/empty."""
    if principal:
        subject = principal.get("sub")
        if subject:
            return str(subject)
    return "operator"
|
|
|
|
async def _append_agent_step(step: dict, user_id: str, session_id: str) -> None:
    """Append one agent step to ``agent_steps_history`` and persist the history.

    Args:
        step: Step payload; its keys are copied into the stored record.
        user_id: Owner of the step (overrides any ``user_id`` key in ``step``).
        session_id: Session the step belongs to (overrides any ``session_id``
            key in ``step``).

    The record also receives a local-time ISO ``timestamp``. The update runs
    under ``store_locks.agent_steps_lock``.

    Records are appended at the end, so when the in-memory cap is exceeded the
    *oldest* records (front of the list) are discarded. Trimming the tail
    instead would delete the step that was just added.
    """
    max_in_memory = 500
    record = dict(step)
    record["user_id"] = user_id
    record["session_id"] = session_id
    record["timestamp"] = datetime.datetime.now().isoformat()

    async with store_locks.agent_steps_lock:
        agent_steps_history.append(record)
        # Keep the newest `max_in_memory` records (no-op while under the cap).
        del agent_steps_history[:-max_in_memory]
        save_agent_steps()
|
|
|
|
async def _clear_agent_steps(user_id: str, session_id: str | None = None) -> None:
    """Remove a user's agent steps and persist the result.

    With a truthy ``session_id`` only that user's steps from that session are
    removed; otherwise every step of the user is removed. The module-level
    list is rebound to a new list while holding ``store_locks.agent_steps_lock``.
    """
    global agent_steps_history

    def _should_drop(step: dict) -> bool:
        if step.get("user_id") != user_id:
            return False
        return not session_id or step.get("session_id") == session_id

    async with store_locks.agent_steps_lock:
        agent_steps_history = [s for s in agent_steps_history if not _should_drop(s)]
        save_agent_steps()
|
|
|
|
# Detection persistence bookkeeping: a dirty flag, the time of the last flush,
# and the flush interval in seconds (DETECTION_SAVE_INTERVAL_SECONDS, default 2).
_detections_dirty = False
_last_detection_flush = datetime.datetime.min
DETECTION_SAVE_INTERVAL_SECONDS = float(os.getenv("DETECTION_SAVE_INTERVAL_SECONDS", "2"))


# Upload limits; byte caps come from the environment with 5 MiB / 50 MiB defaults.
MAX_IMAGE_BYTES = int(os.getenv("MAX_IMAGE_UPLOAD_BYTES", str(5 * 1024 * 1024)))
MAX_VIDEO_BYTES = int(os.getenv("MAX_VIDEO_UPLOAD_BYTES", str(50 * 1024 * 1024)))
ALLOWED_VIDEO_EXTENSIONS = {".mp4", ".mov", ".avi", ".mkv", ".webm"}
ALLOWED_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".webp"}
ALLOWED_IMAGE_TYPES = {"image/jpeg", "image/png", "image/webp"}

# Security settings resolved once at import time by security_config.
API_KEY = security_config.resolve_api_key()
DEMO_MODE = security_config.is_demo_mode()
IS_PRODUCTION = security_config.is_production()


# Root logging configuration and the logger used throughout this module.
logging.basicConfig(
    format="%(levelname)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
|
|
def _task_done_callback(task: asyncio.Task) -> None:
    """Done-callback that logs an exception raised by a background task.

    Cancelled tasks and tasks that finished without an exception are ignored.
    """
    if task.cancelled():
        return
    failure = task.exception()
    if failure is None:
        return
    logger.error("Unhandled error in background task: %s", failure, exc_info=failure)
|
|
|
|
def _spawn(coro) -> asyncio.Task:
    """Schedule ``coro`` as a task whose failure is logged by ``_task_done_callback``.

    Returns the created task.
    """
    background = asyncio.create_task(coro)
    background.add_done_callback(_task_done_callback)
    return background
|
|
| |
| |
| |
# The ASGI application. The interactive docs, ReDoc and the OpenAPI schema
# URLs are set to None when IS_PRODUCTION is true.
app = FastAPI(
    title="Crisis Communication System",
    openapi_url="/openapi.json" if not IS_PRODUCTION else None,
    docs_url="/docs" if not IS_PRODUCTION else None,
    redoc_url="/redoc" if not IS_PRODUCTION else None,
)
|
|
|
|
def _parse_origins() -> list[str]:
    """Return the allowed CORS origins.

    When ``CORS_ORIGINS`` is set to a non-blank value it is split on commas,
    each piece is stripped and empty pieces are dropped. Otherwise the
    built-in list of deployed front-ends and local dev servers is returned.
    """
    raw = os.getenv("CORS_ORIGINS", "")
    if not raw.strip():
        return [
            "https://community-security-and-emergency-ma.vercel.app",
            "https://rapid-eec43.web.app",
            "https://rapid-eec43.firebaseapp.com",
            "http://localhost:5173",
            "http://localhost:5174",
            "http://127.0.0.1:5173",
            "http://127.0.0.1:5174",
        ]

    origins: list[str] = []
    for piece in raw.split(","):
        cleaned = piece.strip()
        if cleaned:
            origins.append(cleaned)
    return origins
|
|
|
|
def _vision_warming_response() -> JSONResponse | None:
    """Return a 503 ``warming_up`` response while the vision warmload is unfinished.

    Returns ``None`` (meaning "proceed") when the full vision engine is not in
    use, or when either the local ``_warmload_complete`` flag or the
    ``complete`` field of ``get_warmload_state()`` is truthy.
    """
    if not use_full_vision_engine():
        return None

    warmed_up = _warmload_complete or get_warmload_state().get("complete")
    if warmed_up:
        return None

    body = {"status": "warming_up", "message": "Vision model loading. Retry in 10s."}
    return JSONResponse(status_code=503, content=body)
|
|
|
|
def _api_key_valid(provided: str | None) -> bool:
    """Return True when ``provided`` equals the main API key or the read-only key.

    Comparisons go through ``security_config.safe_compare_strings``. A missing
    or empty ``provided`` value is never valid.
    """
    if not provided:
        return False

    if API_KEY and security_config.safe_compare_strings(provided, API_KEY):
        return True

    readonly_key = security_config.resolve_readonly_api_key()
    if not readonly_key:
        return False
    return bool(security_config.safe_compare_strings(provided, readonly_key))
|
|
|
|
def _api_key_role(provided: str | None) -> str:
    """Map an API key to a role name.

    Checked in order: the main key (role from ``CEPHEUS_API_KEY_ROLE``,
    default ``"service"``), the read-only key (``"readonly"``), the guest key
    from ``CEPHEUS_GUEST_API_KEY`` (``"guest"``). Anything else — including a
    missing key — yields ``"service"``, so this function does not validate the
    key by itself.
    """

    def _matches(candidate) -> bool:
        return bool(
            provided
            and candidate
            and security_config.safe_compare_strings(provided, candidate)
        )

    if _matches(API_KEY):
        return os.getenv("CEPHEUS_API_KEY_ROLE", "service")
    if _matches(security_config.resolve_readonly_api_key()):
        return "readonly"
    if _matches(os.getenv("CEPHEUS_GUEST_API_KEY", "").strip()):
        return "guest"
    return "service"
|
|
|
|
def _enforce_readonly_writes(request: Request, principal: dict) -> None:
    """Refuse non-read HTTP methods for principals whose role is ``readonly``.

    Raises:
        HTTPException: 403 when the role is ``readonly`` and the request
            method is anything other than GET, HEAD or OPTIONS.
    """
    if principal.get("role") != "readonly":
        return
    if request.method in ("GET", "HEAD", "OPTIONS"):
        return
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Readonly API key cannot perform write operations",
    )
|
|
|
|
def _guest_api_key_valid(provided: str | None) -> bool:
    """Return True when ``provided`` may be used for guest access.

    The dedicated ``CEPHEUS_GUEST_API_KEY`` is always accepted. Outside
    production, a key accepted by ``_api_key_valid`` is also accepted when its
    role is ``guest`` or ``service``.
    """
    if not provided:
        return False

    guest_key = os.getenv("CEPHEUS_GUEST_API_KEY", "").strip()
    if guest_key and security_config.safe_compare_strings(provided, guest_key):
        return True

    # Fallback to the operator keys is a non-production convenience only.
    if security_config.is_production() or not _api_key_valid(provided):
        return False
    return _api_key_role(provided) in ("guest", "service")
|
|
|
|
def _demo_login_allowed(request: Request) -> bool:
    """Return True only for loopback clients when demo mode is on outside production."""
    if security_config.is_production():
        return False
    if not DEMO_MODE:
        return False

    client_host = request.client.host if request.client else ""
    return (client_host or "") in ("127.0.0.1", "localhost", "::1")
|
|
|
|
def require_api_key(x_api_key: str | None = Header(default=None, alias="X-API-Key")) -> None:
    """Dependency that requires a valid ``X-API-Key`` header.

    Raises:
        HTTPException: 500 when the server has no API key configured;
            401 when the supplied key is not accepted by ``_api_key_valid``.
    """
    if not API_KEY:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Server API key not configured")
    if _api_key_valid(x_api_key):
        return
    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized")
|
|
|
|
def _resolve_bearer(authorization: str | None) -> dict | None:
    """Turn an ``Authorization: Bearer <token>`` header into a JWT principal.

    Returns ``None`` when the header is absent, uses another scheme, or has an
    empty token. Otherwise the token is decoded with
    ``auth_service.decode_access_token`` and a principal with ``auth``,
    ``role`` and ``sub`` is returned.

    Raises:
        HTTPException: 401 when decoding the token fails for any reason.
    """
    if not authorization:
        return None

    scheme, separator, credentials = authorization.partition(" ")
    if not separator or scheme.lower() != "bearer":
        return None

    token = credentials.strip()
    if not token:
        return None

    try:
        claims = auth_service.decode_access_token(token)
        return {"auth": "jwt", "role": claims.get("role"), "sub": claims.get("sub")}
    except Exception:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid or expired session")
|
|
|
|
def public_vision_allowed() -> bool:
    """Report whether anonymous access to vision/WS routes is switched on.

    Reads ``ALLOW_PUBLIC_VISION`` and, only if that variable is unset,
    ``CEPHEUS_PUBLIC_VISION``. The value counts as enabled when, stripped and
    lower-cased, it is one of ``1``, ``true``, ``yes`` or ``on``.
    """
    legacy_flag = os.getenv("CEPHEUS_PUBLIC_VISION", "")
    flag = os.getenv("ALLOW_PUBLIC_VISION", legacy_flag)
    normalized = str(flag).strip().lower()
    return normalized in ("1", "true", "yes", "on")


# Principal handed to anonymous callers when public vision access is enabled.
PUBLIC_VISION_PRINCIPAL = dict(auth="public", role="admin", sub="public-vision")
|
|
|
|
def require_vision_access(
    request: Request,
    x_api_key: str | None = Header(default=None, alias="X-API-Key"),
    authorization: str | None = Header(default=None),
) -> dict:
    """Dependency for vision routes.

    Returns ``PUBLIC_VISION_PRINCIPAL`` without checking credentials when
    public vision access is enabled; otherwise defers to ``require_operator``.
    """
    if not public_vision_allowed():
        return require_operator(request, x_api_key, authorization)
    return PUBLIC_VISION_PRINCIPAL
|
|
|
|
def require_operator(
    request: Request,
    x_api_key: str | None = Header(default=None, alias="X-API-Key"),
    authorization: str | None = Header(default=None),
) -> dict:
    """API key or JWT for operator routes.

    CEPHEUS_API_KEY grants service-role automation (full operator API access).
    Optional CEPHEUS_READONLY_API_KEY maps to role ``readonly`` (GET-only routes).
    Set CEPHEUS_API_KEY_SCOPE to document intended key use for operators/audit.

    Resolution order:

    1. Auth disabled: the API key is mandatory (``require_api_key``).
    2. Auth enabled: a bearer token, if present, wins.
    3. Auth enabled, no bearer: a valid API key is accepted.

    Every returned principal has passed ``_enforce_readonly_writes``.

    Raises:
        HTTPException: 401 when no credential is accepted, plus whatever the
            helpers raise (401/403/500).
    """

    def _api_key_principal() -> dict:
        key_principal = {"auth": "api_key", "role": _api_key_role(x_api_key)}
        _enforce_readonly_writes(request, key_principal)
        return key_principal

    if not auth_service.auth_enabled():
        require_api_key(x_api_key)
        return _api_key_principal()

    jwt_principal = _resolve_bearer(authorization)
    if jwt_principal:
        _enforce_readonly_writes(request, jwt_principal)
        return jwt_principal

    if _api_key_valid(x_api_key):
        return _api_key_principal()

    raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Authentication required")
|
|
|
|
def require_operator_flexible(
    request: Request,
    x_api_key: str | None = Header(default=None, alias="X-API-Key"),
    authorization: str | None = Header(default=None),
    api_key: str | None = Query(default=None),
    token: str | None = Query(default=None),
) -> dict:
    """Like require_operator; dev-only query credentials for legacy img/file tags.

    Args:
        request: Incoming request (method and query string are inspected).
        x_api_key: ``X-API-Key`` header value.
        authorization: ``Authorization`` header value.
        api_key: ``?api_key=`` query credential (non-production only).
        token: ``?token=`` JWT query credential (non-production only).

    Returns:
        The authenticated principal dict.

    Raises:
        HTTPException: 400 when any query credential is used in production;
            401 when the query token cannot be decoded; 403 when a
            ``readonly`` principal attempts a write; plus anything raised by
            ``require_operator``.

    The query-token path applies ``_enforce_readonly_writes`` just like every
    path in ``require_operator``, so a read-only principal cannot gain write
    access by moving its token into the query string.
    """
    q_api = request.query_params.get("api_key")
    q_token = request.query_params.get("token")
    in_production = security_config.is_production()

    if in_production and (q_api or q_token or api_key or token):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Query-string authentication is disabled in production",
        )

    if token and auth_service.auth_enabled() and not in_production:
        try:
            payload = auth_service.decode_access_token(token)
            principal = {"auth": "jwt", "role": payload.get("role"), "sub": payload.get("sub")}
        except Exception as exc:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid or expired session",
            ) from exc
        # Kept outside the try so a 403 here is not rewritten into a 401.
        _enforce_readonly_writes(request, principal)
        return principal

    return require_operator(request, x_api_key or api_key, authorization)
|
|
|
|
def require_operator_if_auth_enabled(
    request: Request,
    x_api_key: str | None = Header(default=None, alias="X-API-Key"),
    authorization: str | None = Header(default=None),
) -> dict:
    """Authenticate like ``require_operator``, tolerating missing credentials on public vision.

    When public vision access is enabled, a 401 from ``require_operator`` is
    replaced by ``PUBLIC_VISION_PRINCIPAL``; any other HTTP error (e.g. 403)
    still propagates. When it is disabled this is plain ``require_operator``.
    """
    if not public_vision_allowed():
        return require_operator(request, x_api_key, authorization)

    try:
        return require_operator(request, x_api_key, authorization)
    except HTTPException as exc:
        if exc.status_code != status.HTTP_401_UNAUTHORIZED:
            raise
        return PUBLIC_VISION_PRINCIPAL
|
|
|
|
def require_dashboard_read(
    request: Request,
    x_api_key: str | None = Header(default=None, alias="X-API-Key"),
    authorization: str | None = Header(default=None),
) -> dict:
    """Dependency for SOS/staff read routes.

    With public vision access enabled, callers are treated as
    ``PUBLIC_VISION_PRINCIPAL`` and credentials are not inspected at all (so a
    stale JWT cannot cause a 401). Otherwise the caller must authenticate via
    ``require_operator`` and hold one of the roles staff, admin, operator or
    service.

    Raises:
        HTTPException: 403 when the authenticated principal lacks those roles.
    """
    if public_vision_allowed():
        return PUBLIC_VISION_PRINCIPAL

    principal = require_operator(request, x_api_key, authorization)
    if auth_service.has_role(principal, "staff", "admin", "operator", "service"):
        return principal
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions")
|
|
|
|
def require_role(*roles: str):
    """Build a dependency that authenticates via ``require_operator`` and checks roles.

    The returned dependency yields the principal when
    ``auth_service.has_role(principal, *roles)`` is true and raises a 403
    ``HTTPException`` otherwise.
    """

    def _dep(principal: dict = Depends(require_operator)) -> dict:
        if auth_service.has_role(principal, *roles):
            return principal
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Insufficient permissions")

    return _dep


# Ready-made role dependencies used by the routes.
require_admin = require_role("admin")
require_staff = require_role("staff", "admin", "operator")
require_staff_read = require_role("staff", "admin", "operator", "service")
|
|
|
|
def _audit_destructive(principal: dict, action: str, detail: str = "") -> None:
    """Emit a WARNING log line recording who performed a destructive action.

    Part of the interim RBAC audit trail (ISSUE-182 mitigation). The line
    carries the action name, the principal's ``sub`` and ``role``, and the
    optional free-text ``detail``.
    """
    actor = principal.get("sub")
    actor_role = principal.get("role")
    logger.warning(
        "DESTRUCTIVE action=%s user=%s role=%s detail=%s",
        action,
        actor,
        actor_role,
        detail,
    )
|
|
|
|
def require_admin_audited(action: str):
    """Build an admin-only dependency that audit-logs ``action`` on every use.

    The returned dependency first resolves ``require_admin`` and then records
    the action through ``_audit_destructive`` before returning the principal.
    """

    def _dep(principal: dict = Depends(require_admin)) -> dict:
        _audit_destructive(principal, action)
        return principal

    return _dep
|
|
|
|
def _ws_auth_ok(websocket: WebSocket) -> bool:
    """Decide whether a WebSocket connection is allowed to proceed.

    Accepted, in order:

    1. Public vision access enabled, or ``CEPHEUS_WS_OPEN`` set to ``1``.
    2. No auth and no API key configured — allowed outside production only.
    3. A ``ticket`` query parameter accepted by ``auth_service.decode_ws_ticket``.
    4. A valid API key in the ``x-api-key`` header or ``api_key`` query parameter.
    5. Outside production only: a ``token`` query parameter accepted by
       ``auth_service.decode_ws_token`` while auth is enabled.
    """
    if public_vision_allowed() or os.getenv("CEPHEUS_WS_OPEN", "").strip() == "1":
        return True

    # Nothing is configured to authenticate against: open in dev, closed in prod.
    if not auth_service.auth_enabled() and not API_KEY:
        return not security_config.is_production()

    query = websocket.query_params

    ticket = query.get("ticket")
    if ticket and auth_service.decode_ws_ticket(ticket):
        return True

    header_key = websocket.headers.get("x-api-key")
    if header_key and _api_key_valid(header_key):
        return True

    query_key = query.get("api_key")
    if query_key and _api_key_valid(query_key):
        return True

    if security_config.is_production():
        return False

    token = query.get("token")
    return bool(token and auth_service.auth_enabled() and auth_service.decode_ws_token(token))
|
|
|
|
def _ws_principal(websocket: WebSocket) -> dict:
    """Resolve the principal behind a WebSocket connection.

    Tried in order: public vision principal; ``ticket`` query parameter;
    outside production, the ``token`` query parameter (first as a WS token,
    then as an access token, decode failures ignored); API key from the
    ``x-api-key`` header; API key from the ``api_key`` query parameter.
    When nothing matches, an ``open`` operator principal is returned.
    """
    if public_vision_allowed():
        return PUBLIC_VISION_PRINCIPAL

    def _jwt_principal(claims) -> dict:
        return {"auth": "jwt", "role": claims.get("role"), "sub": claims.get("sub")}

    def _key_principal(key: str) -> dict:
        return {"auth": "api_key", "role": _api_key_role(key), "sub": "api_key"}

    query = websocket.query_params

    ticket = query.get("ticket")
    if ticket:
        ticket_claims = auth_service.decode_ws_ticket(ticket)
        if ticket_claims:
            return _jwt_principal(ticket_claims)

    if not IS_PRODUCTION:
        token = query.get("token")
        if token and auth_service.auth_enabled():
            ws_claims = auth_service.decode_ws_token(token)
            if ws_claims:
                return _jwt_principal(ws_claims)
            try:
                return _jwt_principal(auth_service.decode_access_token(token))
            except Exception:
                # Not a usable access token either; fall through to API keys.
                pass

    header_key = websocket.headers.get("x-api-key")
    if header_key and _api_key_valid(header_key):
        return _key_principal(header_key)

    query_key = query.get("api_key")
    if query_key and _api_key_valid(query_key):
        return _key_principal(query_key)

    return {"auth": "open", "role": "operator", "sub": "operator"}
|
|
|
|
def _ws_can_mutate(principal: dict) -> bool:
    """Tell whether a WebSocket principal may perform signage/camera mutations.

    Always True when public vision access is enabled. Otherwise ``readonly``
    and ``guest`` roles are refused, ``admin``/``operator``/``staff``/
    ``service`` are allowed, and any other role is allowed only if the
    principal's ``auth`` is not ``"open"``.
    """
    if public_vision_allowed():
        return True

    role = principal.get("role")
    if role in {"readonly", "guest"}:
        return False
    if role in {"admin", "operator", "staff", "service"}:
        return True
    return principal.get("auth") != "open"
|
|
|
|
def _read_limited_bytes(data: bytes, max_bytes: int, kind: str) -> bytes:
    """Return ``data`` unchanged if it is at most ``max_bytes`` long.

    Raises:
        HTTPException: 413 when ``data`` is larger; ``kind`` names the payload
            in the error detail.
    """
    if len(data) <= max_bytes:
        return data
    raise HTTPException(
        status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
        detail=f"{kind} exceeds maximum allowed size",
    )
|
|
|
|
# Folder that receives uploaded media files.
_UPLOADS_PATH = os.path.join(DATA_DIR, "uploads")


def _safe_upload_path(original_name: str) -> tuple[str, str]:
    """Pick a server-side destination for an uploaded video.

    Only the (lower-cased) extension of ``original_name`` is reused; the file
    name itself is replaced by a random hex id. The uploads folder is created
    if needed.

    Returns:
        ``(upload_dir, full_path)``.

    Raises:
        HTTPException: 400 when the extension is not in
            ``ALLOWED_VIDEO_EXTENSIONS``.
    """
    extension = Path(original_name).suffix.lower()
    if extension not in ALLOWED_VIDEO_EXTENSIONS:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported video format")

    stored_name = f"{uuid.uuid4().hex}{extension}"
    os.makedirs(_UPLOADS_PATH, exist_ok=True)
    return _UPLOADS_PATH, os.path.join(_UPLOADS_PATH, stored_name)
|
|
|
|
def _safe_image_upload_path(original_name: str) -> tuple[str, str]:
    """Pick a server-side destination for an uploaded image.

    The extension is lower-cased, defaults to ``.jpg`` when missing, and
    ``.jpeg`` is stored as ``.jpg``. The file name is replaced by a random hex
    id and the uploads folder is created if needed.

    Returns:
        ``(upload_dir, full_path)``.

    Raises:
        HTTPException: 400 when the extension is not in
            ``ALLOWED_IMAGE_EXTENSIONS``.
    """
    extension = Path(original_name).suffix.lower() or ".jpg"
    extension = ".jpg" if extension == ".jpeg" else extension
    if extension not in ALLOWED_IMAGE_EXTENSIONS:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image format")

    stored_name = f"{uuid.uuid4().hex}{extension}"
    os.makedirs(_UPLOADS_PATH, exist_ok=True)
    return _UPLOADS_PATH, os.path.join(_UPLOADS_PATH, stored_name)
|
|
| from fastapi.responses import FileResponse |
|
|
# Middleware registration. The order of these calls is kept as-is because it
# determines how the layers wrap each other.
app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(RequestContextMiddleware)
# CORS: origins come from _parse_origins(); all methods and headers are allowed
# and credentials are permitted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=_parse_origins(),
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
|
|
| |
# Enrolled-face storage lives inside the Face_Recognition folder.
_FACE_DB_PATH = os.path.join(_FR_PATH, "face_database")

# Make sure every directory this module writes to exists before first use.
for _required_dir in (_FACE_DB_PATH, _UPLOADS_PATH, BLUEPRINT_DIR, GOSSIP_HISTORY_DIR):
    os.makedirs(_required_dir, exist_ok=True)
del _required_dir


# Single shared engine instance (full or cloud variant, chosen at import time).
vision_engine = VisionEngine()
|
|
|
|
def _face_engine_usable() -> bool:
    """Return True when ``vision_engine`` has a ``face_engine`` whose ``app`` is set."""
    face_engine = getattr(vision_engine, "face_engine", None)
    if face_engine is None:
        return False
    return getattr(face_engine, "app", None) is not None
|
|
|
|
# Let vision_runtime query face-engine readiness through this module's probe.
register_face_ready_probe(_face_engine_usable)


# If the face engine is already usable at import time, report the warmload as
# complete right away instead of waiting for a later warmload pass.
if _face_engine_usable():
    mark_warmload_complete({"insightface": True, "early_ready": True})
    _warmload_complete = True
    logger.info("Face engine ready at startup (InsightFace loaded, enrolled DB pending warmload).")
|
|
| |
# Face worker count: CEPHEUS_FACE_WORKERS when set to a non-zero integer,
# otherwise the CPU count (2 if unknown); the result is clamped to 2..4.
_FACE_WORKERS = max(
    2,
    min(
        4,
        int(os.getenv("CEPHEUS_FACE_WORKERS", "0") or 0) or (os.cpu_count() or 2),
    ),
)
# Public-vision deployments and CEPHEUS_FORCE_SINGLE_INFER=1 pin the pool to 2.
if public_vision_allowed() or os.getenv("CEPHEUS_FORCE_SINGLE_INFER", "").strip() == "1":
    _FACE_WORKERS = 2

# Dedicated thread pools for face inference and for camera work.
_FACE_EXECUTOR = ThreadPoolExecutor(thread_name_prefix="cepheus-face", max_workers=_FACE_WORKERS)
_CAMERA_EXECUTOR = ThreadPoolExecutor(thread_name_prefix="cepheus-camera", max_workers=4)


# Lazily created by _get_vision_pipeline().
_vision_pipeline: VisionFramePipeline | None = None
|
|
|
|
def _infer_matches_sync(cam_id: str, frame) -> list:
    """Run face matching on ``frame`` with the shared face engine.

    The face database is refreshed through ``_ensure_face_db`` first. Returns
    the engine's ``match_all_faces(frame)`` result, or ``[]`` when the engine
    has no such method. ``cam_id`` is accepted but not used here.
    """
    face_engine = vision_engine.face_engine
    _ensure_face_db(face_engine)
    if not hasattr(face_engine, "match_all_faces"):
        return []
    return face_engine.match_all_faces(frame)
|
|
|
|
def _get_vision_pipeline() -> VisionFramePipeline:
    """Return the shared ``VisionFramePipeline``, creating it on first call.

    The pipeline is built around ``_infer_matches_sync`` and cached in the
    module-level ``_vision_pipeline``.
    """
    global _vision_pipeline
    if _vision_pipeline is not None:
        return _vision_pipeline
    _vision_pipeline = VisionFramePipeline(_infer_matches_sync)
    return _vision_pipeline
|
|
|
|
async def _run_face_work(fn, *args):
    """Run ``fn(*args)`` on the face-inference thread pool and await its result.

    Uses ``asyncio.get_running_loop()``: this coroutine always executes inside a
    running loop, and that call is the recommended way to obtain it (the older
    ``get_event_loop()`` has deprecated fallback behaviour).
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_FACE_EXECUTOR, fn, *args)
|
|
|
|
def _ensure_face_db(fe) -> None:
    """Bring the face engine's embedding store up to date before matching.

    Calls ``fe.ensure_db()`` when the engine offers it, otherwise falls back to
    ``fe.reload_db()``. Does nothing for ``None`` or an engine with neither method.
    """
    if fe is None:
        return
    for method_name in ("ensure_db", "reload_db"):
        if hasattr(fe, method_name):
            getattr(fe, method_name)()
            return
|
|
|
|
def _invalidate_face_db(fe) -> None:
    """Tell the face engine its cached database is stale.

    Prefers ``fe.invalidate_db()``; when the engine only has ``reload_db()`` that
    is called instead. ``None`` and engines with neither method are ignored.
    """
    if fe is None:
        return
    invalidate = getattr(fe, "invalidate_db") if hasattr(fe, "invalidate_db") else None
    if invalidate is None and hasattr(fe, "reload_db"):
        invalidate = fe.reload_db
    if invalidate is not None:
        invalidate()
|
|
| |
| |
| |
|
|
class Alert(BaseModel):
    """Alert record accepted by the alert API and broadcast to WebSocket clients."""

    # Random UUID4 string unless the caller supplies an id.
    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    # Category; code in this file branches on values such as "chat", "sos",
    # "broadcast", "direct_message" and "staff_request".
    type: str
    location: str = ""
    message: str
    # Compared against "critical" when choosing the log level.
    severity: str = "medium"
    # Naive local time in ISO-8601 form (datetime.now() without a timezone).
    timestamp: str = Field(default_factory=lambda: datetime.datetime.now().isoformat())
    recipient: Optional[str] = None
    sender: Optional[str] = None
    # Two issue references exist; dispatch code falls back from issue_id to issue.
    issue_id: Optional[str] = None
    issue: Optional[str] = None
    attachment_url: Optional[str] = None
    # Optional coordinates (set for SOS alerts).
    lat: Optional[float] = None
    lng: Optional[float] = None
|
|
|
|
class IssueCreate(BaseModel):
    """Validated fields for a new issue; only ``desc`` is required."""

    title: Optional[str] = Field(default=None, max_length=200)
    desc: str = Field(..., max_length=8000)
    status: Optional[str] = Field(default="ONGOING", max_length=32)
    priority: Optional[str] = Field(default="medium", max_length=32)
    # Bounded to 0-100 by validation; defaults to 1.
    staff: Optional[int] = Field(default=1, ge=0, le=100)
    room: Optional[str] = Field(default=None, max_length=120)
|
|
|
|
class IssuePatch(BaseModel):
    """Partial issue update; every field is optional and defaults to ``None``."""

    status: Optional[str] = Field(default=None, max_length=32)
    # Bounded to 0-100 by validation.
    progress: Optional[int] = Field(default=None, ge=0, le=100)
    desc: Optional[str] = Field(default=None, max_length=8000)
    staff_request_status: Optional[str] = Field(default=None, max_length=32)
|
|
|
|
class SOSPayload(BaseModel):
    """Body of an SOS request; coordinates are mandatory."""

    guest_id: Optional[str] = None
    lat: float
    lng: float
    # Both text fields are Optional, so an explicit null from the client yields
    # None rather than the default string.
    location_label: Optional[str] = "Unknown"
    message: Optional[str] = "SOS Activated"
|
|
|
|
class GossipTrackingStart(BaseModel):
    """Optional fields supplied when gossip tracking is started (all default to ``None``)."""

    # camelCase and snake_case names are both used here and are kept as declared.
    staffId: Optional[str] = None
    personName: Optional[str] = None
    cause: Optional[str] = None
    # presumably parallel lists (names and URLs of attached documents) — TODO confirm
    doc_names: Optional[List[str]] = None
    doc_urls: Optional[List[str]] = None
|
|
|
|
class LoginPayload(BaseModel):
    """Credentials posted to ``/auth/login``; both fields are required."""

    username: str
    password: str
|
|
|
|
class RefreshPayload(BaseModel):
    """Body of ``/auth/refresh``: the refresh token to exchange."""

    refresh_token: str
|
|
|
|
class LogoutPayload(BaseModel):
    """Body of ``/auth/logout``; the refresh token to revoke may be omitted."""

    refresh_token: Optional[str] = None
|
|
|
|
class TrackingResetPayload(BaseModel):
    """Flags for a tracking reset request (the handler is outside this part of the file)."""

    broadcast: bool = False
    session_id: Optional[str] = None
    # Defaults to True, unlike the other flags.
    clear_agent_steps: bool = True
    full_reset: bool = False
|
|
|
|
class ChatPayload(BaseModel):
    """Chat request body carrying the required ``prompt`` text."""

    prompt: str
|
|
|
|
| |
| |
| |
| |
# In-memory signage flags keyed by sign id; "s6" starts active and there is no "s7".
signage_state: dict[str, bool] = {
    "s1": False, "s2": False, "s3": False,
    "s4": False, "s5": False, "s6": True,
    "s8": False,
}
# Newest-first event lists; entries are inserted at index 0 by the alert pipeline.
agentic_plans: list[dict] = []
targeted_dispatches: list[dict] = []
# Operator profile, overridable through USER_PROFILE_* environment variables.
# NOTE(review): a non-numeric USER_PROFILE_AGE makes int() raise at import time.
user_profile: dict = {
    "name": os.getenv("USER_PROFILE_NAME", "Pranav Kumar"),
    "role": os.getenv("USER_PROFILE_ROLE", "Command Operator"),
    "age": int(os.getenv("USER_PROFILE_AGE", "27")),
    "department": os.getenv("USER_PROFILE_DEPARTMENT", "Crisis Response"),
    "shift": os.getenv("USER_PROFILE_SHIFT", "Night"),
}
|
|
def normalize_files_url(path: str | None) -> str | None:
    """Map an ``/uploads/...`` path onto the ``/files`` route prefix.

    Empty or missing input gives ``None``. Paths beginning with ``/uploads/``
    get ``/files`` prepended; everything else (including paths that already
    start with ``/files/``) is returned unchanged.
    """
    if not path:
        return None
    return f"/files{path}" if path.startswith("/uploads/") else path
|
|
|
|
async def _broadcast_signage_state(sign_id: str, active: bool) -> dict:
    """Set one signage flag, persist the state and broadcast the change.

    Unknown sign ids are simply added. The broadcast message carries the changed
    id plus the complete signage map under ``"all"``.

    Returns:
        ``{"id": sign_id, "active": active}``.
    """
    async with store_locks.signage_lock:
        # Plain assignment covers both existing and new ids.
        signage_state[sign_id] = active
        save_signage_state()
    update = {
        "type": "signage_update",
        "id": sign_id,
        "active": active,
        "all": signage_state,
    }
    await manager.broadcast(json.dumps(update))
    return {"id": sign_id, "active": active}
|
|
|
|
def _sanitize_person_name(name: str) -> str:
    """Normalise a person name into a safe identifier.

    Surrounding whitespace is trimmed and inner spaces become underscores. The
    result must consist of 1-64 ASCII letters, digits, hyphens or underscores.

    Raises:
        HTTPException: 400 when the cleaned name does not match that pattern.
    """
    import re

    candidate = name.strip().replace(" ", "_")
    if re.fullmatch(r"[A-Za-z0-9_-]{1,64}", candidate):
        return candidate
    raise HTTPException(status_code=400, detail="Name must be 1-64 alphanumeric characters, spaces, hyphens, or underscores")
|
|
|
|
def _reverse_geocode_label(lat: float, lng: float) -> str:
    """Return a human-readable label for a coordinate pair.

    With ``GOOGLE_MAPS_API_KEY`` set, the Google Geocoding API is queried
    (5 second timeout) and the first result's formatted address, truncated to
    120 characters, is returned. Without a key, or on any failure or empty
    result, the label falls back to ``"Lat: <lat>, Lng: <lng>"`` with four
    decimals. This function blocks; async callers run it in an executor.
    """
    fallback = f"Lat: {lat:.4f}, Lng: {lng:.4f}"
    api_key = os.getenv("GOOGLE_MAPS_API_KEY", "").strip()
    if not api_key:
        return fallback
    try:
        import requests
        resp = requests.get(
            "https://maps.googleapis.com/maps/api/geocode/json",
            params={"latlng": f"{lat},{lng}", "key": api_key},
            timeout=5,
        )
        data = resp.json()
        results = data.get("results") or []
        if results:
            return str(results[0].get("formatted_address", ""))[:120] or fallback
    except Exception as exc:
        # Best-effort lookup: keep the fallback, but leave a trace. Only the
        # exception type is logged because the message can embed the request
        # URL, which carries the API key.
        logger.warning("Reverse geocoding failed (%s); using coordinate label", type(exc).__name__)
    return fallback
|
|
|
|
def _build_contact_network(hours: float | None = None) -> dict:
    """Aggregate gossip co-presence into contact-network nodes/edges.

    Args:
        hours: When given, an edge only counts interactions from the last
            ``hours`` hours, and links with no interaction in that window are
            dropped. ``None`` keeps every interaction.

    Returns:
        ``{"nodes": [...], "edges": [...], "updatedAt": <UTC ISO timestamp>}``.
        Nodes cover every person named in a gossip link plus every named face
        currently present in ``vision_engine.face_results``.
    """
    raw = gossip_bridge.get_gossip_json()
    links = raw.get("links") or []
    detection_by_name = {d.get("name"): d for d in detections_history if d.get("name")}
    cutoff = None
    if hours is not None:
        cutoff = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=hours)

    def _within_window(interaction: dict) -> bool:
        """True when the interaction has a parsable timestamp at or after the cutoff."""
        stamp = interaction.get("timestamp")
        if not stamp:
            return False
        try:
            parsed = datetime.datetime.fromisoformat(str(stamp).replace("Z", "+00:00"))
        except ValueError:
            # A malformed timestamp must not fail the whole request.
            return False
        if parsed.tzinfo is None:
            # Naive values cannot be compared with the aware cutoff. Treat them
            # as local time, which is what astimezone() assumes and what
            # datetime.now().isoformat() elsewhere in this file produces.
            # NOTE(review): confirm the gossip store writes local-time stamps.
            parsed = parsed.astimezone()
        return parsed >= cutoff

    # One pass over the links gives each person's link count.
    meet_counts: dict[str, int] = {}
    for link in links:
        for person in {link.get("source"), link.get("target")}:
            if person:
                meet_counts[person] = meet_counts.get(person, 0) + 1

    all_people: set[str] = set(meet_counts)
    face_results = getattr(vision_engine, "face_results", None) or {}
    for faces in face_results.values():
        for face in faces or []:
            name = face.get("name") if isinstance(face, dict) else None
            if not name:
                continue
            if name.lower() in ("unknown", "unidentified", ""):
                continue
            all_people.add(name)

    nodes = []
    for person in sorted(all_people):
        det = detection_by_name.get(person) or {}
        nodes.append({
            "id": person,
            "name": person,
            "role": "Staff",
            "photoUrl": normalize_files_url(f"/files/face/{person}/{person}.jpg"),
            "detectionCount": meet_counts.get(person, 0) + (1 if det else 0),
            "lastSeen": det.get("seen_at"),
            "lastLocation": det.get("camId") or "",
        })

    edges = []
    for link in links:
        interactions = link.get("interactions") or []
        if cutoff is not None:
            use = [i for i in interactions if _within_window(i)]
            if not use:
                continue
        else:
            use = interactions
        last_ts = max((i.get("timestamp") or "") for i in use) if use else ""
        locations = sorted({i.get("camera") or i.get("cam_id") or "Unknown" for i in use})
        edges.append({
            "source": link.get("source"),
            "target": link.get("target"),
            "meetCount": len(use) or link.get("strength", 1),
            "lastMet": last_ts,
            "locations": locations,
        })

    return {"nodes": nodes, "edges": edges, "updatedAt": datetime.datetime.now(datetime.timezone.utc).isoformat()}
|
|
|
|
async def _broadcast_signage_alert(sign_type: str, lat: float, lng: float, action: str = "deployed"):
    """Announce that a sign was deployed or removed at the given coordinates.

    The coordinates are reverse-geocoded off the event loop, a high-severity
    ``broadcast`` alert is pushed through ``_process_new_alert``, and an entry is
    prepended to the broadcast log (capped at 100 entries) and saved.

    Args:
        sign_type: Sign identifier; underscores are shown as spaces in messages.
        lat, lng: Location of the sign.
        action: ``"removed"`` produces the removal message; any other value
            uses the deployment template for ``sign_type``.

    Returns:
        The message text that was broadcast.
    """
    # get_running_loop() is the supported way to reach the loop from a coroutine.
    loop = asyncio.get_running_loop()
    location = await loop.run_in_executor(None, _reverse_geocode_label, lat, lng)
    if action == "removed":
        message = f"SIGNAGE REMOVED: {sign_type.replace('_', ' ')} at {location} has been cleared."
    else:
        template = SIGNAGE_BROADCAST_TEMPLATES.get(sign_type, "NEW SIGNAGE DEPLOYED: {type} at {location}.")
        message = template.format(location=location, type=sign_type.replace("_", " "))
    alert = Alert(
        id=f"signage-{uuid.uuid4().hex[:8]}",
        type="broadcast",
        location=location,
        message=message,
        severity="high",
        timestamp=datetime.datetime.now().isoformat(),
    )
    await _process_new_alert(alert, source="SIGNAGE")
    # NOTE(review): _process_new_alert also writes a broadcast-log entry for
    # alerts of type "broadcast", so each signage alert appears to be logged
    # twice (once per id) — confirm whether both entries are wanted.
    entry = {
        "id": f"bc-{uuid.uuid4().hex[:8]}",
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "message": message,
        "recipients": ["All Staff & Units"],
        "relatedIssue": None,
        "sentBy": "signage-system",
        "deliveryCount": 1,
    }
    alerts_broadcast_log.insert(0, entry)
    del alerts_broadcast_log[100:]
    save_json(ALERTS_LOG_FILE, alerts_broadcast_log)
    return message
|
|
|
|
async def _append_incident_log(level: str, source: str, event_type: str, message: str, metadata: dict | None = None):
    """Prepend an entry to the incident log, persist it and broadcast it.

    The log keeps at most 300 entries, newest first. The entry's ``ts`` is the
    local wall-clock time formatted as ``HH:MM:SS``; missing ``metadata``
    becomes an empty dict.
    """
    stamp = datetime.datetime.now().strftime("%H:%M:%S")
    record = dict(
        type="incident_log_entry",
        ts=stamp,
        level=level,
        source=source,
        event_type=event_type,
        msg=message,
        metadata=metadata or {},
    )
    incident_logs[:0] = [record]
    del incident_logs[300:]
    save_logs()
    serialized = json.dumps(record)
    await manager.broadcast(serialized)
|
|
|
|
def _count_live_faces() -> int:
    """Count named faces across all cameras in ``vision_engine.face_results``.

    Faces whose name (case-insensitively) is ``unknown``, ``unidentified``,
    ``none`` or empty are not counted.
    """
    placeholder_names = ("unknown", "unidentified", "none", "")
    per_camera = getattr(vision_engine, "face_results", None) or {}
    return sum(
        1
        for faces in per_camera.values()
        for face in (faces or [])
        if str(face.get("name", "")).lower() not in placeholder_names
    )
|
|
|
|
def _strip_face_embeddings(face_results: dict | None) -> dict:
    """Return a copy of the per-camera face results with every ``embedding`` key removed.

    ``None`` input, ``None`` face lists and ``None`` faces are treated as empty.
    """
    cleaned: dict = {}
    if not face_results:
        return cleaned
    for cam_id, faces in face_results.items():
        public_faces = []
        for face in faces or []:
            public_faces.append(
                {key: value for key, value in (face or {}).items() if key != "embedding"}
            )
        cleaned[cam_id] = public_faces
    return cleaned
|
|
|
|
def _normalize_dispatch_event(
    *,
    dispatch_id: str | None = None,
    alert_id: str | None = None,
    issue_id: str | None = None,
    recipient: str | None = None,
    recipients: list | None = None,
    message: str | None = None,
    location: str | None = None,
    severity: str | None = None,
    alert_type: str | None = None,
    status: str = "sent",
    timestamp: str | None = None,
    simulation: bool = False,
    routing_demo: bool = False,
) -> dict:
    """Single schema for targeted_alert_dispatch WS events and /dispatches storage.

    Defaults applied: a new UUID4 ``dispatch_id``, the current local time for
    both ``timestamp`` and ``created_at``, ``"Staff"`` as recipient when none is
    given (or the first five ``recipients`` joined by ", "), a generated
    ``"Dispatch to ..."`` message, an empty location and ``"medium"`` severity.
    """
    when = timestamp if timestamp else datetime.datetime.now().isoformat()
    recipient_list = list(recipients) if recipients else []

    if recipient:
        primary = recipient
    elif recipient_list:
        primary = ", ".join(str(r) for r in recipient_list[:5])
    else:
        primary = "Staff"
    if primary and not recipient_list:
        recipient_list = [primary]

    if message:
        text = message
    else:
        text = f"Dispatch to {primary}"
        if location:
            text += f" @ {location}"

    event = {
        "type": "targeted_alert_dispatch",
        "dispatch_id": dispatch_id or str(uuid.uuid4()),
        "timestamp": when,
        "created_at": when,
        "alert_id": alert_id,
        "issue_id": issue_id,
        "recipient": primary,
        "recipients": recipient_list,
        "message": text,
        "location": location or "",
        "severity": severity or "medium",
        "alert_type": alert_type,
        "status": status,
        "simulation": bool(simulation),
        "routing_demo": bool(routing_demo),
    }
    return event
|
|
|
|
def _build_incident_payload(alert: Alert, extra_context: dict | None = None) -> dict:
    """Assemble the incident context dict handed to plan generation and alert routing.

    Combines the alert's own fields with the vision engine's crowd count
    (``last_crowd_count``, else ``last_total_count``, else 0), the live named
    face count, per-room occupancy keyed by room label, and the five most
    recent stored alerts. Keys in ``extra_context`` override the computed ones.
    """
    room_counts = {}
    if hasattr(vision_engine, "room_stats"):
        for room in vision_engine.room_stats.values():
            room_counts[room["label"]] = room.get("occupancy", 0)

    recent_alerts = []
    if alerts_db:
        for stored in alerts_db[-5:]:
            recent_alerts.append(stored if isinstance(stored, dict) else stored.model_dump())

    crowd = getattr(vision_engine, "last_crowd_count", None)
    if crowd is None:
        crowd = getattr(vision_engine, "last_total_count", 0)

    context = {
        "alert_id": alert.id,
        "type": alert.type,
        "severity": alert.severity,
        "location": alert.location,
        "message": alert.message,
        "lat": alert.lat,
        "lng": alert.lng,
        "crowd_count": crowd,
        "live_face_count": _count_live_faces(),
        "room_counts": room_counts,
        "recent_alerts": recent_alerts,
        "timestamp": alert.timestamp,
    }
    if extra_context:
        context.update(extra_context)
    return context
|
|
|
|
async def _publish_agentic_plan(alert: Alert, extra_context: dict | None) -> tuple[dict, dict]:
    """Generate, store (newest first, max 100) and broadcast an agentic plan.

    Returns ``(incident_payload, plan_event)`` so callers can reuse both.
    """
    incident_payload = _build_incident_payload(alert, extra_context)
    plan = generate_agentic_plan(incident_payload)
    plan_event = {
        "type": "agentic_plan_update",
        "plan_id": str(uuid.uuid4()),
        "created_at": datetime.datetime.now().isoformat(),
        "incident": incident_payload,
        "plan": plan,
    }
    agentic_plans.insert(0, plan_event)
    _trim_memory_list(agentic_plans, 100)
    save_agentic_plans()
    await manager.broadcast(json.dumps(plan_event))
    return incident_payload, plan_event


def _store_dispatch(dispatch_event: dict) -> None:
    """Prepend a dispatch event to the in-memory list (max 200) and persist it."""
    targeted_dispatches.insert(0, dispatch_event)
    _trim_memory_list(targeted_dispatches, 200)
    save_targeted_dispatches()


async def _process_new_alert(alert: Alert, source: str = "SYSTEM", extra_context: dict | None = None, *, sos_mode: bool = False):
    """Store a new alert and fan out everything that follows from it.

    Steps, in order:

    1. Append the alert to ``alerts_db`` (capped at 200) under the alerts lock
       and persist it.
    2. Unless ``sos_mode`` is set, broadcast an ``alert_record`` and a
       ``log_entry`` message.
    3. Stop here for ``chat`` alerts and for SOS alerts (the SOS caller emits
       its own events).
    4. For ``direct_message`` / ``staff_request`` / ``broadcast`` alerts, send a
       targeted dispatch to the named recipient; ``broadcast`` also writes the
       broadcast log, ``staff_request`` also creates a staff request and marks
       the linked issue pending; the latter two additionally get an agentic plan.
    5. For every other alert type, generate an agentic plan, route the alert to
       recipients, dispatch it and append an incident-log entry.

    Args:
        alert: The alert to process.
        source: Label recorded as the origin in log entries.
        extra_context: Extra keys merged into the incident payload; may carry a
            ``recipient`` fallback.
        sos_mode: Suppresses the generic broadcasts and all follow-up work.
    """
    async with store_locks.alerts_lock:
        alerts_db.append(alert)
        _trim_memory_list(alerts_db, 200)
        save_alerts()
    logger.info(f"Alert: {alert.type} @ {alert.location}")

    if not sos_mode:
        await manager.broadcast(json.dumps({"type": "alert_record", "alert": alert.model_dump()}))
        log_entry = {
            "type": "log_entry",
            "ts": datetime.datetime.now().strftime("%H:%M:%S"),
            "level": "CRIT" if alert.severity == "critical" else "WARN",
            "source": source,
            "event_type": alert.type,
            "msg": alert.message,
            "metadata": {"severity": alert.severity},
        }
        await manager.broadcast(json.dumps(log_entry))

    if alert.type == "chat" or sos_mode:
        return

    if alert.type in ("direct_message", "staff_request", "broadcast"):
        recipient = alert.recipient or (extra_context or {}).get("recipient") or alert.location or "Staff"
        issue_id = await _ensure_staff_request_issue_id(alert, extra_context)
        if alert.type == "staff_request" and not issue_id:
            logger.error("Staff request dispatch blocked: could not resolve issue_id")
            return
        dispatch_event = _normalize_dispatch_event(
            issue_id=issue_id,
            recipient=recipient,
            message=alert.message,
            location=alert.location,
            severity=alert.severity,
            alert_type=alert.type,
            alert_id=alert.id,
        )
        _store_dispatch(dispatch_event)
        if alert.type == "broadcast":
            bc_entry = {
                "id": alert.id,
                "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
                "message": alert.message,
                # recipient always has a value here because of the "Staff" fallback above.
                "recipients": [recipient] if recipient else ["All Staff & Units"],
                "relatedIssue": issue_id,
                "sentBy": source or "command",
                "deliveryCount": 1,
            }
            alerts_broadcast_log.insert(0, bc_entry)
            del alerts_broadcast_log[100:]
            save_json(ALERTS_LOG_FILE, alerts_broadcast_log)
        await manager.broadcast(json.dumps(dispatch_event))
        if alert.type == "staff_request":
            staff_req = {
                "id": f"SR-{uuid.uuid4().hex[:6].upper()}",
                "issue_id": issue_id,
                "location": alert.location,
                "message": alert.message,
                "status": "pending",
                "created_at": datetime.datetime.now().isoformat(),
                "assignments": [],
                "progress": 0,
                "severity": alert.severity or "medium",
                "alert_id": alert.id,
                **_issue_context_for_staff(issue_id),
            }
            staff_requests_db.insert(0, staff_req)
            save_staff_requests()
            await manager.broadcast(json.dumps({"type": "staff_request_created", "request": staff_req}))
            if issue_id:
                # Mark the first issue with a matching id as awaiting staff.
                for iss in issues_db:
                    if iss.get("id") == issue_id:
                        iss["staff_request_status"] = "pending"
                        iss.setdefault("staff_needed", iss.get("staff", 1))
                        save_issues()
                        await manager.broadcast(json.dumps({"type": "issue_update", "issue": iss}))
                        break
        if alert.type in ("staff_request", "broadcast"):
            await _publish_agentic_plan(alert, extra_context)
        return

    incident_payload, plan_event = await _publish_agentic_plan(alert, extra_context)

    recipients = route_alert(incident_payload)
    dispatch_event = _normalize_dispatch_event(
        alert_id=alert.id,
        issue_id=alert.issue_id or alert.issue,
        location=alert.location,
        severity=alert.severity,
        recipients=recipients,
        message=alert.message,
        alert_type=alert.type,
        routing_demo=True,
    )
    _store_dispatch(dispatch_event)
    await manager.broadcast(json.dumps(dispatch_event))

    await _append_incident_log(
        "CRIT" if alert.severity == "critical" else "WARN",
        source,
        alert.type,
        f"Agentic plan generated and dispatched to {len(recipients)} recipients.",
        {"alert_id": alert.id, "plan_id": plan_event["plan_id"], "dispatch_id": dispatch_event["dispatch_id"]},
    )
|
|
|
|
| |
| |
| |
|
|
class ConnectionManager:
    """Registry of dashboard and agent WebSocket connections.

    Every connection gets its own ``asyncio.Lock`` so concurrent sends to the
    same socket are serialised. Each send is limited to ``_SEND_TIMEOUT_S``
    seconds; sockets that fail during a broadcast are dropped from the registry.
    """

    # Per-send time limit in seconds.
    _SEND_TIMEOUT_S = 2.0

    def __init__(self):
        self.active_connections: List[WebSocket] = []
        self.agent_connections: List[WebSocket] = []
        self._locks: dict[WebSocket, asyncio.Lock] = {}

    async def connect(self, websocket: WebSocket):
        """Accept a dashboard socket and start tracking it."""
        await websocket.accept()
        self.active_connections.append(websocket)
        self._locks[websocket] = asyncio.Lock()
        logger.info(f"Client connected. Active: {len(self.active_connections)}")

    async def connect_agent(self, websocket: WebSocket):
        """Accept an agent socket and start tracking it."""
        await websocket.accept()
        self.agent_connections.append(websocket)
        self._locks[websocket] = asyncio.Lock()
        logger.info(f"Agent Client connected. Active: {len(self.agent_connections)}")

    def disconnect(self, websocket: WebSocket):
        """Forget a socket in both registries and discard its lock."""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
        if websocket in self.agent_connections:
            self.agent_connections.remove(websocket)
        self._locks.pop(websocket, None)
        logger.info(f"Client disconnected. Active: {len(self.active_connections)}, Agents: {len(self.agent_connections)}")

    async def _send(self, websocket: WebSocket, message: str) -> None:
        """Send one text frame, holding the socket's lock when it has one."""
        lock = self._locks.get(websocket)
        if lock is None:
            await asyncio.wait_for(websocket.send_text(message), timeout=self._SEND_TIMEOUT_S)
            return
        async with lock:
            await asyncio.wait_for(websocket.send_text(message), timeout=self._SEND_TIMEOUT_S)

    async def _fan_out(self, connections: list, message: str) -> None:
        """Send ``message`` to each connection in turn, dropping those that fail."""
        dead = []
        # Iterate over a copy: disconnects may happen while we are awaiting.
        for ws in list(connections):
            try:
                await self._send(ws, message)
            except Exception:
                dead.append(ws)
        for ws in dead:
            self.disconnect(ws)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Best-effort send to one socket; failures are logged and never raised."""
        try:
            await self._send(websocket, message)
        except Exception as exc:
            logger.debug("Personal WebSocket send failed: %s", type(exc).__name__)

    async def broadcast(self, message: str):
        """Send ``message`` to every dashboard connection."""
        await self._fan_out(self.active_connections, message)

    async def broadcast_to_agents(self, message: str):
        """Send ``message`` to every agent connection."""
        await self._fan_out(self.agent_connections, message)


manager = ConnectionManager()
|
|
|
|
| |
| |
| |
|
|
|
|
@app.get("/")
async def root():
    """Landing response for the bare base URL, pointing at the health and WebSocket paths."""
    return dict(
        service="Cepheus API",
        status="running",
        liveness="/health",
        websocket="/ws",
    )
|
|
|
|
@app.get("/health")
async def health():
    """Health summary; in production only ``{"status": "ok"}`` is disclosed."""
    report = {"status": "ok"}
    if not IS_PRODUCTION:
        # Diagnostics are limited to non-production deployments.
        report["active_ws"] = len(manager.active_connections)
        report["cameras"] = list(vision_engine.camera_indices.keys())
        report["auth_enabled"] = auth_service.auth_enabled()
        report["refresh_store"] = auth_service.refresh_store_backend()
    return report
|
|
|
|
@app.get("/health/live")
async def health_live():
    """Liveness probe: returns ``live_status_payload()`` or a "warming" stub if that fails."""
    try:
        payload = live_status_payload()
    except Exception as exc:
        logger.warning("health/live failed: %s", exc)
        return {"status": "warming", "message": "recovering"}
    return payload
|
|
|
|
@app.get("/debug/face_status")
async def debug_face_status():
    """Diagnose face engine state on deployed instances (no auth).

    Reports warmload completion, the configured model pack and match threshold,
    whether the engine's model is loaded, the number of enrolled entries (keys
    not starting with ``unknown_``) and the size of the unknown-face cache. If
    assembling the report fails, the error text is returned instead.
    """
    fe = getattr(vision_engine, "face_engine", None)
    db = getattr(fe, "db", None) or {}
    cache = getattr(fe, "_unknown_cache", None) or {}

    def _warmload_flag() -> bool:
        return bool(_warmload_complete or get_warmload_state().get("complete"))

    try:
        report = {"warmload_complete": _warmload_flag()}
        report["model_pack"] = os.environ.get("FACE_MODEL_PACK", "buffalo_sc")
        report["model_loaded"] = fe is not None and getattr(fe, "app", None) is not None
        report["enrolled_faces"] = len([k for k in db if not str(k).startswith("unknown_")])
        report["unknown_cache_size"] = len(cache)
        report["threshold"] = float(os.environ.get("FACE_MATCH_THRESHOLD", "0.22"))
        return report
    except Exception as exc:
        return {
            "error": str(exc),
            "warmload_complete": _warmload_flag(),
        }
|
|
|
|
@app.get("/health/ready")
async def health_ready():
    """Readiness probe.

    When ``CEPHEUS_PRODUCTION`` is ``1`` the service reports 503 if
    ``CEPHEUS_JWT_SECRET`` is empty or ``CEPHEUS_AUTH_DEV_MODE`` is ``1``.
    """
    def _env(name: str) -> str:
        return os.getenv(name, "").strip()

    if _env("CEPHEUS_PRODUCTION") == "1":
        if not _env("CEPHEUS_JWT_SECRET"):
            raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="JWT secret not configured")
        if _env("CEPHEUS_AUTH_DEV_MODE") == "1":
            raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Dev auth disabled in production")
    return {"status": "ready", "auth_enabled": auth_service.auth_enabled()}
|
|
|
|
@app.get("/auth/status")
async def auth_status():
    """Describe the auth mode: ``demo`` (auth off), ``production`` (JWT secret set) or ``dev``.

    Any failure while determining the mode is logged and reported as demo mode
    with ``error: status_unavailable``.
    """
    try:
        if not auth_service.auth_enabled():
            mode = "demo"
        elif os.getenv("CEPHEUS_JWT_SECRET", "").strip():
            mode = "production"
        else:
            mode = "dev"
        return {"auth_enabled": auth_service.auth_enabled(), "mode": mode}
    except Exception as exc:
        logger.warning("auth/status failed: %s", exc)
        return {"auth_enabled": False, "mode": "demo", "error": "status_unavailable"}
|
|
|
|
@app.post("/auth/login")
async def auth_login(payload: LoginPayload, request: Request):
    """Log a user in and return either a demo session or a token pair.

    Flow:
      * The login rate limiter is consulted first.
      * Auth disabled: a demo session is returned if ``_demo_login_allowed``
        permits it (503 otherwise); no password check happens on this path.
      * Auth enabled: credentials are verified (401 on mismatch, 503 when
        ``verify_user`` raises ``RuntimeError``). When ``public_vision_allowed()``
        is true a demo session is returned, otherwise a token pair.
    """
    login_limiter.check(request, "login")
    if not auth_service.auth_enabled():
        if not _demo_login_allowed(request):
            raise HTTPException(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                detail="Authentication not configured",
            )
        # Demo role is derived purely from the username.
        role = "admin" if (payload.username or "").lower() in ("admin", "command") else "staff"
        return {
            "mode": "demo",
            "token_type": "demo",
            "user": {"username": payload.username or "operator", "role": role},
        }
    try:
        user = auth_service.verify_user(payload.username, payload.password)
    except RuntimeError as exc:
        logger.error("Auth misconfiguration: %s", exc)
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Authentication misconfigured",
        ) from exc
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials")

    # Public-vision deployments hand out a demo session even after a verified login.
    if public_vision_allowed():
        role = user.get("role") or "admin"
        return {
            "mode": "demo",
            "token_type": "demo",
            "user": {"username": user.get("username") or payload.username, "role": role},
        }
    return auth_service.create_token_pair(user["username"], user["role"])
|
|
|
|
@app.post("/auth/refresh")
async def auth_refresh(payload: RefreshPayload, request: Request):
    """Exchange a refresh token for new tokens.

    The refresh rate limiter runs first. Public-vision deployments get a demo
    response with empty tokens. With auth disabled the request is rejected
    (400).

    Raises:
        HTTPException: 400 when auth is disabled; 401 when the refresh fails
            for any reason (the original error is chained and logged).
    """
    refresh_limiter.check(request, "refresh")
    if public_vision_allowed():
        return {
            "mode": "demo",
            "token_type": "demo",
            "access_token": "",
            "refresh_token": "",
            "expires_in": 86400,
        }
    if not auth_service.auth_enabled():
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Auth not enabled")
    try:
        return auth_service.refresh_access_token(payload.refresh_token)
    except Exception as exc:
        # Clients always see a generic 401; the cause is kept for the server
        # log and the exception chain. Only the type is logged so that no token
        # material ends up in log output.
        logger.info("Token refresh rejected (%s)", type(exc).__name__)
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid refresh token") from exc
|
|
|
|
@app.post("/auth/logout")
async def auth_logout(payload: LogoutPayload):
    """Revoke the supplied refresh token, if any, and acknowledge."""
    token = payload.refresh_token
    if token:
        auth_service.revoke_refresh_token(token)
    return {"status": "ok"}
|
|
|
|
@app.post("/auth/ws-ticket")
async def auth_ws_ticket(principal: dict = Depends(require_operator_if_auth_enabled)):
    """Issue a short-lived WebSocket ticket (preferred over api_key query param).

    Rejected with 400 when the principal was resolved in "open" (auth disabled)
    mode. Missing ``sub`` / ``role`` fall back to ``operator`` / ``staff``.
    """
    if principal.get("auth") == "open":
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Auth not enabled")
    ticket = auth_service.create_ws_ticket(
        principal.get("sub") or "operator",
        principal.get("role") or "staff",
    )
    return {"ticket": ticket, "expires_in": 60}
|
|
|
|
@app.get("/files/face/{person}/{filename}")
async def serve_face_image(person: str, filename: str, _: dict = Depends(require_operator_flexible)):
    """Serve an enrolled face image from the face database directory.

    Both path parameters are reduced to their final path component, and the
    resolved file must lie inside ``_FACE_DB_PATH``.

    Raises:
        HTTPException: 404 when the file does not exist or would fall outside
            the face database directory.
    """
    safe_person = Path(person).name
    safe_name = Path(filename).name
    base = os.path.abspath(_FACE_DB_PATH)
    target = os.path.abspath(os.path.join(base, safe_person, safe_name))
    # Path("..").name is "..", so taking .name alone does not stop a
    # percent-encoded ".." segment from climbing out of the face database.
    # Check containment explicitly, as serve_upload_file does.
    if not target.startswith(base + os.sep) or not os.path.isfile(target):
        raise HTTPException(status_code=404, detail="Image not found")
    return FileResponse(target)
|
|
|
|
@app.post("/files/upload")
async def upload_image_file(file: UploadFile = File(...), _: dict = Depends(require_operator)):
    """Upload an image; returns authenticated file URL path.

    The declared content type must be in ``ALLOWED_IMAGE_TYPES`` (400
    otherwise). At most ``MAX_IMAGE_BYTES + 1`` bytes are read and passed to
    ``_read_limited_bytes`` together with the limit.
    """
    if file.content_type not in ALLOWED_IMAGE_TYPES:
        raise HTTPException(status_code=400, detail="Unsupported image type")
    raw = await file.read(MAX_IMAGE_BYTES + 1)
    contents = _read_limited_bytes(raw, MAX_IMAGE_BYTES, "Image")
    _upload_dir, destination = _safe_image_upload_path(file.filename or "upload.jpg")
    with open(destination, "wb") as out:
        out.write(contents)
    stored_name = os.path.basename(destination)
    rel = normalize_files_url(f"/uploads/{stored_name}")
    return {"url": rel, "path": rel}
|
|
|
|
@app.get("/files/uploads/{file_path:path}")
async def serve_upload_file(file_path: str, _: dict = Depends(require_operator_if_auth_enabled)):
    """Serve a previously uploaded file from the uploads directory.

    Raises:
        HTTPException: 400 when the resolved path leaves the uploads directory,
            404 when it does not point at a regular file.
    """
    uploads_root = os.path.abspath(_UPLOADS_PATH)
    candidate = os.path.abspath(os.path.join(_UPLOADS_PATH, file_path))
    inside_root = candidate == uploads_root or candidate.startswith(uploads_root + os.sep)
    if not inside_root:
        raise HTTPException(status_code=400, detail="Invalid path")
    if not os.path.isfile(candidate):
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(candidate)
|
|
|
|
@app.get("/auth/me")
async def auth_me(principal: dict = Depends(require_operator_if_auth_enabled)):
    """Describe the calling principal.

    ``public`` principals are reported as ``public-vision``; ``jwt`` principals
    expose their ``sub`` and ``role``; anything else is reported as an API-key
    caller with role ``service`` unless the principal names one.
    """
    auth_kind = principal.get("auth")
    if auth_kind == "public":
        return {"authenticated": True, "username": "public-vision", "role": principal.get("role", "admin")}
    if auth_kind == "jwt":
        return {
            "authenticated": True,
            "username": principal.get("sub"),
            "role": principal.get("role"),
        }
    return {"authenticated": True, "role": principal.get("role", "service"), "auth": "api_key"}
|
|
|
|
@app.get("/alerts", response_model=List[Alert])
async def get_alerts(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return every alert currently held in ``alerts_db``."""
    return alerts_db
|
|
|
|
@app.post("/alert", response_model=Alert)
async def create_alert(alert: Alert, _: dict = Depends(require_operator)):
    """Create an alert, ignoring submissions whose id is already stored.

    New alerts go through ``_process_new_alert`` with the alert's location (or
    ``"ALERT_API"``) as the source. The submitted alert is echoed back either way.
    """
    def _stored_id(entry):
        # alerts_db may hold plain dicts as well as Alert objects.
        if isinstance(entry, dict):
            return entry.get("id")
        return getattr(entry, "id", None)

    already_known = any(_stored_id(entry) == alert.id for entry in alerts_db)
    if not already_known:
        await _process_new_alert(alert, source=alert.location or "ALERT_API")
    return alert
|
|
|
|
async def _process_sos_event(payload: SOSPayload, *, source: str = "GUEST_APP") -> dict:
    """Record an SOS, raise the matching critical alert and notify clients.

    The event is stored (list capped at 100) under the SOS lock, a ``sos``
    alert with the same id is processed in SOS mode, a ``sos_event`` message is
    broadcast and an incident-log entry is written.

    ``payload.message`` may be ``None`` when a client sends an explicit null.
    It is replaced by the default text so that building the alert cannot fail
    validation after the event has already been stored.

    Returns:
        ``{"status": "received", "event": <stored event dict>}``.
    """
    message = payload.message or "SOS Activated"
    event = {
        "id": str(uuid.uuid4()),
        "guest_id": payload.guest_id or "unknown-guest",
        "lat": payload.lat,
        "lng": payload.lng,
        "location_label": payload.location_label,
        "message": message,
        "timestamp": datetime.datetime.now().isoformat(),
    }
    async with store_locks.sos_lock:
        sos_events.append(event)
        _trim_memory_list(sos_events, 100)
        save_sos()

    alert = Alert(
        id=event["id"],
        type="sos",
        location=payload.location_label or f"{payload.lat:.4f},{payload.lng:.4f}",
        message=message,
        severity="critical",
        lat=payload.lat,
        lng=payload.lng,
    )
    # sos_mode suppresses the generic alert broadcasts; the SOS-specific
    # broadcast and log entry follow below.
    await _process_new_alert(alert, source=source, extra_context={"guest_id": event["guest_id"]}, sos_mode=True)
    await manager.broadcast(json.dumps({"type": "sos_event", **event}))
    await _append_incident_log(
        "CRIT",
        source,
        "sos",
        f"SOS: {message}",
        {"event_id": event["id"], "lat": payload.lat, "lng": payload.lng},
    )

    logger.info(f"SOS received: {event}")
    return {"status": "received", "event": event}
|
|
|
|
@app.post("/sos")
async def handle_sos(payload: SOSPayload, request: Request, _: dict = Depends(require_operator)):
    """Operator-authenticated SOS endpoint; rate limited under the ``sos`` key."""
    sos_limiter.check(request, "sos")
    # The source label is "GUEST_APP" here as well, same as the guest endpoint.
    return await _process_sos_event(payload, source="GUEST_APP")
|
|
|
|
@app.post("/sos/guest")
async def handle_guest_sos(
    payload: SOSPayload,
    request: Request,
    x_api_key: str | None = Header(default=None, alias="X-API-Key"),
):
    """Guest-scoped SOS — uses CEPHEUS_GUEST_API_KEY (or service key in local dev)."""
    # The key is validated before the rate limiter is consulted.
    if not _guest_api_key_valid(x_api_key):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid guest API key")
    sos_limiter.check(request, "sos_guest")
    return await _process_sos_event(payload, source="GUEST_APP")
|
|
|
|
@app.get("/sos")
async def get_sos_events(_: dict = Depends(require_dashboard_read)):
    """Return the last 50 entries of ``sos_events`` (events are appended, so these are the newest)."""
    return sos_events[-50:]
|
|
|
|
def _haversine_km(lat1: float, lng1: float, lat2: float, lng2: float) -> float:
    """Return the great-circle distance in kilometres between two points.

    Coordinates are in decimal degrees; a spherical Earth of radius 6371 km is
    used (haversine formula).
    """
    radius = 6371.0
    d_lat = math.radians(lat2 - lat1)
    d_lng = math.radians(lng2 - lng1)
    a = (
        math.sin(d_lat / 2) ** 2
        + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(d_lng / 2) ** 2
    )
    # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal
    # points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, max(0.0, a))
    return radius * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
|
|
|
# Service categories returned by the nearby-emergency lookup, each with a
# display label and a one-letter icon.
_EMERGENCY_AMENITIES = {
    "hospital": {"label": "Hospitals", "icon": "H"},
    "fire_station": {"label": "Fire Stn", "icon": "F"},
    "police": {"label": "Police", "icon": "P"},
    "ambulance": {"label": "Ambulance", "icon": "A"},
    "emergency_supplies": {"label": "Supplies", "icon": "S"},
}


# Maps an OpenStreetMap tag value to one of the service categories above;
# several tag values share a category.
_OSM_AMENITY_TO_SERVICE = {
    "hospital": "hospital",
    "fire_station": "fire_station",
    "police": "police",
    "ambulance_station": "ambulance",
    "clinic": "ambulance",
    "pharmacy": "emergency_supplies",
    "medical_supply": "emergency_supplies",
}
# NOTE(review): presumably the maximum number of cached lookups — the code that
# enforces it is outside this part of the file.
_NEARBY_CACHE_MAX = 64
# cache key -> (epoch seconds when stored, lookup result)
_nearby_cache: dict[str, tuple[float, dict]] = {}
|
|
|
|
_NEARBY_CACHE_TTL_S = 600  # seconds a cached Overpass result is considered fresh


def _keep_nearest(results: dict[str, list], limit: int = 5) -> None:
    """Sort every category by ``distKm`` and keep only the ``limit`` closest entries (in place)."""
    for key in results:
        results[key] = sorted(results[key], key=lambda item: item["distKm"])[:limit]


async def _fetch_emergency_nearby_overpass(lat: float, lng: float, radius_m: int = 6000) -> dict:
    """OpenStreetMap Overpass lookup — shared by /emergency/nearby and Maps fallback.

    Returns ``{"services": {category: [entries...]}, "source": ...}`` where each
    category holds at most five entries sorted by distance from ``(lat, lng)``.

    ``source`` is one of:
      * ``"overpass"``      – live data (also what cached responses carry);
      * ``"unavailable"``   – ``httpx`` is not installed, all categories empty;
      * ``"mock_fallback"`` – every mirror failed; entries are randomly generated
        placeholders and ``error`` holds the last failure.

    Successful responses are cached per (lat, lng rounded to 3 decimals, radius)
    for ``_NEARBY_CACHE_TTL_S`` seconds; fallback responses are never cached.
    """
    results: dict[str, list] = {k: [] for k in _EMERGENCY_AMENITIES}
    cache_key = f"{round(lat, 3)},{round(lng, 3)},{radius_m}"
    cached = _nearby_cache.get(cache_key)
    now = datetime.datetime.now().timestamp()
    if cached and now - cached[0] < _NEARBY_CACHE_TTL_S:
        return cached[1]

    if httpx is None:
        return {"services": results, "source": "unavailable", "error": "httpx not installed"}

    query = f"""
    [out:json][timeout:15];
    (
      node["amenity"="hospital"](around:{radius_m},{lat},{lng});
      node["amenity"="fire_station"](around:{radius_m},{lat},{lng});
      node["amenity"="police"](around:{radius_m},{lat},{lng});
      node["emergency"="ambulance_station"](around:{radius_m},{lat},{lng});
      node["amenity"="clinic"]["emergency"="yes"](around:{radius_m},{lat},{lng});
      node["amenity"="pharmacy"](around:{radius_m},{lat},{lng});
      node["shop"="medical_supply"](around:{radius_m},{lat},{lng});
    );
    out body 120;
    """
    mirrors = [
        "https://overpass-api.de/api/interpreter",
        "https://overpass.kumi.systems/api/interpreter",
        "https://maps.mail.ru/osm/tools/overpass/api/interpreter",
    ]
    payload = None
    last_error = None
    async with httpx.AsyncClient(timeout=30.0) as client:
        for url in mirrors:
            # Two attempts per mirror, with a short pause after the first failure.
            for attempt in range(2):
                try:
                    resp = await client.post(
                        url,
                        data={"data": query},
                        headers={"User-Agent": "CepheusEmergencyConsole/1.0"},
                    )
                    resp.raise_for_status()
                    body = resp.json()
                    if not isinstance(body, dict):
                        # Treat an unexpected JSON shape like any other mirror failure
                        # instead of crashing later on ``payload.get``.
                        raise ValueError("unexpected Overpass response shape")
                    payload = body
                    break
                except Exception as exc:
                    last_error = exc
                    if attempt == 0:
                        await asyncio.sleep(1.5)
            if payload is not None:
                break
            logger.warning("emergency_nearby: Overpass mirror %s failed: %s", url, last_error)

    if payload is None:
        logger.warning("All Overpass mirrors failed, falling back to synthetic mock data.")
        import random

        # NOTE(review): these entries are fabricated placeholders, not real facilities;
        # consumers must check ``source == "mock_fallback"`` before acting on them.
        for key, meta in _EMERGENCY_AMENITIES.items():
            for i in range(random.randint(1, 3)):
                dlat = (random.random() - 0.5) * 0.05
                dlng = (random.random() - 0.5) * 0.05
                elat, elng = lat + dlat, lng + dlng
                results[key].append({
                    "id": f"mock-{key}-{i}",
                    "name": f"Local {meta['label']} {i+1}",
                    "vicinity": f"{random.randint(100, 999)} Emergency Road",
                    "phone": f"+1-555-01{random.randint(10,99)}",
                    "lat": elat,
                    "lng": elng,
                    "icon": meta["icon"],
                    "label": meta["label"],
                    "type": key,
                    "distKm": round(_haversine_km(lat, lng, elat, elng), 2),
                })
        _keep_nearest(results)
        return {"services": results, "source": "mock_fallback", "error": str(last_error)}

    for el in payload.get("elements", []):
        tags = el.get("tags", {})
        # Resolve the service category: amenity first, then the emergency/shop tags.
        service_key = _OSM_AMENITY_TO_SERVICE.get(tags.get("amenity"))
        if not service_key and tags.get("emergency") == "ambulance_station":
            service_key = "ambulance"
        if not service_key and tags.get("shop") == "medical_supply":
            service_key = "emergency_supplies"
        meta = _EMERGENCY_AMENITIES.get(service_key) if service_key else None
        if not meta:
            continue
        elat, elng = el.get("lat"), el.get("lon")
        if elat is None or elng is None:
            continue
        results[service_key].append({
            "id": str(el.get("id")),
            "name": tags.get("name") or meta["label"],
            "vicinity": tags.get("addr:full")
            or ", ".join(filter(None, [tags.get("addr:street"), tags.get("addr:city")]))
            or tags.get("operator", ""),
            "phone": tags.get("phone") or tags.get("contact:phone", ""),
            "lat": elat,
            "lng": elng,
            "icon": meta["icon"],
            "label": meta["label"],
            "type": service_key,
            "distKm": round(_haversine_km(lat, lng, elat, elng), 2),
        })

    _keep_nearest(results)

    response = {"services": results, "source": "overpass"}
    # Only evict when a new key would push the cache over its limit.
    if cache_key not in _nearby_cache and len(_nearby_cache) >= _NEARBY_CACHE_MAX:
        oldest_key = min(_nearby_cache, key=lambda k: _nearby_cache[k][0])
        _nearby_cache.pop(oldest_key, None)
    # Stamp the entry when it is stored; the fetch (retries across mirrors) can take
    # long enough that the pre-fetch timestamp would noticeably shorten the TTL.
    _nearby_cache[cache_key] = (datetime.datetime.now().timestamp(), response)
    return response
|
|
|
|
@app.get("/emergency/nearby")
async def emergency_nearby(
    lat: float,
    lng: float,
    radius_m: int = 6000,
    _: dict = Depends(require_operator_if_auth_enabled),
):
    """Nearest hospitals / fire stations / police via OpenStreetMap Overpass.

    Any exception from the lookup is logged and converted into an empty
    per-category payload that carries the error text.
    """
    try:
        nearby = await _fetch_emergency_nearby_overpass(lat, lng, radius_m)
    except Exception as exc:
        logger.warning("emergency/nearby failed: %s", exc)
        return {
            "services": {category: [] for category in _EMERGENCY_AMENITIES},
            "source": "overpass",
            "error": str(exc),
        }
    return nearby
|
|
|
|
@app.get("/emergency/dispatch-log")
async def get_emergency_dispatch_log(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return the last 100 entries of ``emergency_dispatch_log``; an empty list on any error."""
    try:
        recent = emergency_dispatch_log[-100:]
    except Exception as exc:
        logger.warning("emergency/dispatch-log failed: %s", exc)
        return []
    return recent
|
|
|
|
| |
| from emergency_maps_service import ( |
| get_directions_with_traffic, |
| get_hexagonal_coverage_data, |
| maps_configured, |
| maps_status_detail, |
| ) |
|
|
|
|
@app.get("/maps/status")
async def maps_status(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return whatever ``maps_status_detail()`` reports for the maps integration."""
    detail = maps_status_detail()
    return detail
|
|
|
|
|
|
@app.get("/maps/directions")
async def get_route(
    origin_lat: float,
    origin_lng: float,
    dest_lat: float,
    dest_lng: float,
    place_name: str = "",
    _: dict = Depends(require_operator),
):
    """Return the result of ``get_directions_with_traffic`` for the given origin and destination."""
    route = get_directions_with_traffic(
        origin_lat,
        origin_lng,
        dest_lat,
        dest_lng,
        place_name,
    )
    return route
|
|
|
|
@app.get("/maps/hex-coverage")
async def hex_coverage(
    lat: float,
    lng: float,
    radius_km: float = 5.0,
    _: dict = Depends(require_operator),
):
    """Return ``get_hexagonal_coverage_data`` for the point and radius supplied."""
    coverage = get_hexagonal_coverage_data(lat, lng, radius_km=radius_km)
    return coverage
|
|
|
|
@app.post("/maps/dispatch-route")
async def dispatch_route(body: dict, principal: dict = Depends(require_operator)):
    """Compute a traffic-aware route for a dispatch and record it in the audit trail.

    Expects ``body`` shaped like
    ``{"origin": {"lat", "lng"}, "destination": {"lat", "lng", "name"?}}``.

    Raises:
        HTTPException(400): origin/destination is not an object, or any of the
            four coordinates is missing.
    """
    # ``or {}`` also covers an explicit JSON null, which ``.get(key, {})`` would not.
    origin = body.get("origin") or {}
    dest = body.get("destination") or {}
    if not isinstance(origin, dict) or not isinstance(dest, dict):
        raise HTTPException(status_code=400, detail="origin and destination must be objects")

    coords = (origin.get("lat"), origin.get("lng"), dest.get("lat"), dest.get("lng"))
    if any(value is None for value in coords):
        raise HTTPException(status_code=400, detail="origin and destination lat/lng required")

    result = get_directions_with_traffic(*coords, dest.get("name", ""))
    # Guard the audit line against a non-dict result from the routing helper.
    eta = result.get("duration_traffic") if isinstance(result, dict) else None
    _audit_destructive(
        principal,
        "DISPATCH_ROUTE",
        f"destination={dest.get('name')} eta={eta}",
    )
    return result
|
|
|
|
@app.get("/incident/logs")
async def get_incident_logs(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return the module-level ``incident_logs`` collection unchanged."""
    return incident_logs
|
|
|
|
@app.get("/agentic/plans")
async def get_agentic_plans(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return the module-level ``agentic_plans`` collection unchanged."""
    return agentic_plans
|
|
|
|
@app.get("/issues")
async def get_issues(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return the module-level ``issues_db`` collection unchanged."""
    return issues_db
|
|
|
|
@app.get("/issues/{issue_id}")
async def get_issue_by_id(issue_id: str, _: dict = Depends(require_staff_read)):
    """Read-only issue snapshot for staff portal (linked request detail).

    Raises:
        HTTPException(404): no issue in ``issues_db`` has this id.
    """
    found = next((candidate for candidate in issues_db if candidate.get("id") == issue_id), None)
    if found is None:
        raise HTTPException(status_code=404, detail="Issue not found")
    return found
|
|
|
|
@app.get("/staff/requests")
async def list_staff_requests(status: Optional[str] = None, _: dict = Depends(require_dashboard_read)):
    """List staff requests, optionally restricted to one ``status``, each passed through
    ``_enriched_staff_request``."""
    if status:
        selected = [entry for entry in staff_requests_db if entry.get("status") == status]
    else:
        selected = staff_requests_db
    return list(map(_enriched_staff_request, selected))
|
|
|
|
@app.post("/staff/requests/{request_id}/accept")
async def accept_staff_request(request_id: str, payload: dict, principal: dict = Depends(require_staff)):
    """Mark a pending/sent staff request as accepted and notify listeners.

    Raises:
        HTTPException(403): caller is not a JWT principal with role staff/admin.
        HTTPException(404): unknown ``request_id``.
        HTTPException(409): the request is no longer pending or sent.
    """
    allowed = principal.get("auth") == "jwt" and principal.get("role") in ("staff", "admin")
    if not allowed:
        raise HTTPException(status_code=403, detail="Only staff or admin JWT accounts may accept requests")

    # Display name: payload override, then the JWT subject, then a generic label.
    staff_name = (payload.get("staff_name") or principal.get("sub") or "").strip() or "Staff"
    staff_id = (payload.get("staff_id") or staff_name).strip() or staff_name

    async with store_locks.staff_requests_lock:
        req = None
        for candidate in staff_requests_db:
            if candidate.get("id") == request_id:
                req = candidate
                break
        if not req:
            raise HTTPException(status_code=404, detail="Staff request not found")
        if req.get("status") not in ("pending", "sent"):
            raise HTTPException(status_code=409, detail=f"Request already {req.get('status')}")
        req.update({
            "status": "accepted",
            "accepted_at": datetime.datetime.now().isoformat(),
            "assignments": [{"staff_id": staff_id, "name": staff_name, "progress": 25, "stage": "accepted"}],
            "progress": 25,
        })
        save_staff_requests()
        issue_id = req.get("issue_id")

    if issue_id:
        await _broadcast_issue_update(
            issue_id,
            f"[UPDATE] {staff_name} accepted the staff request.",
            progress=25,
            staff_request_status="accepted",
            staff_assignments=req["assignments"],
        )
        if DEMO_MODE:
            # Demo deployments auto-advance the linked issue in the background.
            _spawn(simulate_issue_progress(issue_id, staff_name))

    await _append_staff_activity_entry(
        staff_name,
        req.get("location") or "",
        f"Accepted staff request {request_id}",
        event_type="staff_accept",
        metadata={"request_id": request_id, "issue_id": issue_id},
    )
    event = {"type": "staff_request_updated", "request": req}
    await manager.broadcast(json.dumps(event))
    return req
|
|
|
|
@app.post("/staff/requests/{request_id}/decline")
async def decline_staff_request(request_id: str, payload: dict, principal: dict = Depends(require_staff)):
    """Mark a pending/sent staff request as declined, storing the optional reason.

    Raises:
        HTTPException(403): caller is not a JWT principal with role staff/admin.
        HTTPException(404): unknown ``request_id``.
        HTTPException(409): the request is no longer pending or sent.
    """
    allowed = principal.get("auth") == "jwt" and principal.get("role") in ("staff", "admin")
    if not allowed:
        raise HTTPException(status_code=403, detail="Only staff or admin JWT accounts may decline requests")

    async with store_locks.staff_requests_lock:
        req = None
        for candidate in staff_requests_db:
            if candidate.get("id") == request_id:
                req = candidate
                break
        if not req:
            raise HTTPException(status_code=404, detail="Staff request not found")
        if req.get("status") not in ("pending", "sent"):
            raise HTTPException(status_code=409, detail=f"Request already {req.get('status')}")
        req.update({
            "status": "declined",
            "declined_at": datetime.datetime.now().isoformat(),
            "decline_reason": payload.get("reason", ""),
        })
        save_staff_requests()
        issue_id = req.get("issue_id")

    if issue_id:
        # Only mention the reason when one was actually supplied.
        reason_suffix = (": " + req["decline_reason"]) if req.get("decline_reason") else ""
        await _broadcast_issue_update(
            issue_id,
            f"[UPDATE] Staff declined request{reason_suffix}.",
            staff_request_status="declined",
        )

    actor = (principal.get("sub") or "Staff").strip() or "Staff"
    await _append_staff_activity_entry(
        actor,
        req.get("location") or "",
        f"Declined request {request_id}: {req.get('decline_reason') or 'no reason'}",
        event_type="staff_decline",
        metadata={"request_id": request_id, "issue_id": issue_id},
    )
    event = {"type": "staff_request_updated", "request": req}
    await manager.broadcast(json.dumps(event))
    return req
|
|
|
|
async def _sync_staff_request_progress(
    issue_id: str,
    progress: int,
    stage: str,
    *,
    status: str | None = None,
):
    """Keep staff_requests_db in sync when issue progress changes.

    Updates the first request linked to ``issue_id`` whose status is pending, sent
    or accepted, persists the store, broadcasts the change, and returns a shallow
    copy of the request. Returns ``None`` when no such request exists.
    """
    open_states = ("pending", "sent", "accepted")
    async with store_locks.staff_requests_lock:
        req = None
        for candidate in staff_requests_db:
            if candidate.get("issue_id") == issue_id and candidate.get("status") in open_states:
                req = candidate
                break
        if not req:
            return None

        req["progress"] = progress
        if status:
            req["status"] = status
            if status == "resolved":
                req["resolved_at"] = datetime.datetime.now().isoformat()
        for assignment in req.get("assignments") or []:
            assignment.update({"progress": progress, "stage": stage})
        save_staff_requests()
        snapshot = dict(req)

    event = {"type": "staff_request_updated", "request": snapshot}
    await manager.broadcast(json.dumps(event))
    return snapshot
|
|
|
|
@app.post("/staff/requests/{request_id}/complete")
async def complete_staff_request(
    request_id: str,
    payload: dict | None = None,
    principal: dict = Depends(require_staff),
):
    """Resolve an accepted staff request, update the linked issue, and log the completion.

    Raises:
        HTTPException(403): caller is not a JWT principal with role staff/admin.
        HTTPException(404): unknown ``request_id``.
        HTTPException(409): the request is not currently accepted.
    """
    payload = payload or {}
    allowed = principal.get("auth") == "jwt" and principal.get("role") in ("staff", "admin")
    if not allowed:
        raise HTTPException(status_code=403, detail="Only staff or admin JWT accounts may complete requests")
    staff_name = (principal.get("sub") or "").strip() or "Staff"
    notes = (payload.get("notes") or "").strip()

    async with store_locks.staff_requests_lock:
        req = None
        for candidate in staff_requests_db:
            if candidate.get("id") == request_id:
                req = candidate
                break
        if not req:
            raise HTTPException(status_code=404, detail="Staff request not found")
        if req.get("status") != "accepted":
            raise HTTPException(
                status_code=409,
                detail=f"Request must be accepted before completion (current: {req.get('status')})",
            )
        req.update({
            "status": "resolved",
            "progress": 100,
            "resolved_at": datetime.datetime.now().isoformat(),
        })
        if notes:
            req["completion_notes"] = notes

        existing = req.get("assignments") or []
        if not existing:
            # Nobody was recorded against the request: credit the completing staff member.
            req["assignments"] = [
                {"staff_id": staff_name, "name": staff_name, "progress": 100, "stage": "resolved"}
            ]
        for assignment in existing:
            assignment["progress"] = 100
            assignment["stage"] = "resolved"

        save_staff_requests()
        issue_id = req.get("issue_id")
        location = req.get("location") or "General"
        request_message = req.get("message") or ""

    if issue_id:
        completion_msg = f"[RESOLVED] {staff_name} marked staff request complete."
        if notes:
            completion_msg += f" Notes: {notes}"
        await _broadcast_issue_update(
            issue_id,
            completion_msg,
            progress=100,
            status="RESOLVED",
            staff_request_status="resolved",
            staff_assignments=req["assignments"],
        )

    # ``location`` is never empty here (it falls back to "General"), so it is always included.
    parts = [f"{staff_name} completed staff request {request_id}"]
    if issue_id:
        parts.append(f" for issue {issue_id}")
    parts.append(f" ({location})")
    if notes:
        parts.append(f" — {notes}")
    log_msg = "".join(parts)

    await _append_incident_log(
        "INFO",
        "staff_portal",
        "staff_request_completed",
        log_msg,
        metadata={
            "request_id": request_id,
            "issue_id": issue_id,
            "staff_name": staff_name,
            "location": location,
            "message": request_message,
            "notes": notes or None,
        },
    )
    await _append_staff_activity_entry(
        staff_name,
        location,
        log_msg,
        event_type="staff_complete",
        metadata={"request_id": request_id, "issue_id": issue_id, "notes": notes or None},
    )

    event = {"type": "staff_request_updated", "request": req}
    await manager.broadcast(json.dumps(event))
    return req
|
|
|
|
@app.post("/staff/requests/{request_id}/progress")
async def update_staff_request_progress(
    request_id: str,
    payload: dict,
    principal: dict = Depends(require_staff),
):
    """Report en route / on scene milestones back to command.

    Raises:
        HTTPException(403): caller is not a JWT principal with role staff/admin.
        HTTPException(400): ``stage`` is neither ``en_route`` nor ``on_scene``.
        HTTPException(404): unknown ``request_id``.
        HTTPException(409): the request is not currently accepted.
    """
    allowed = principal.get("auth") == "jwt" and principal.get("role") in ("staff", "admin")
    if not allowed:
        raise HTTPException(status_code=403, detail="Only staff JWT accounts may update progress")

    # Normalise e.g. "En Route" -> "en_route" before validating.
    stage = (payload.get("stage") or "").strip().lower().replace(" ", "_")
    if stage not in ("en_route", "on_scene"):
        raise HTTPException(status_code=400, detail="stage must be en_route or on_scene")
    progress = _STAFF_MILESTONE_PROGRESS[stage]
    staff_name = (principal.get("sub") or "").strip() or "Staff"
    note = (payload.get("note") or "").strip()

    async with store_locks.staff_requests_lock:
        req = None
        for candidate in staff_requests_db:
            if candidate.get("id") == request_id:
                req = candidate
                break
        if not req:
            raise HTTPException(status_code=404, detail="Staff request not found")
        if req.get("status") != "accepted":
            raise HTTPException(status_code=409, detail="Progress updates require an accepted request")
        req.update({"progress": progress, "stage": stage})
        for assignment in req.get("assignments") or []:
            assignment.update({"progress": progress, "stage": stage})
        save_staff_requests()
        issue_id = req.get("issue_id")
        location = req.get("location") or req.get("zone") or "site"

    if stage == "en_route":
        label = "En route"
    else:
        label = "On scene"
    update_msg = f"[UPDATE] {staff_name} — {label}."
    if note:
        update_msg += f" Note: {note}"

    if issue_id:
        await _broadcast_issue_update(
            issue_id,
            update_msg,
            progress=progress,
            staff_request_status="accepted",
            staff_assignments=req.get("assignments"),
        )
    await _append_staff_activity_entry(
        staff_name,
        location,
        update_msg.replace("[UPDATE] ", ""),
        event_type="staff_progress",
        metadata={"request_id": request_id, "issue_id": issue_id, "stage": stage},
    )
    event = {"type": "staff_request_updated", "request": req}
    await manager.broadcast(json.dumps(event))
    return req
|
|
|
|
@app.get("/staff/inbox")
async def get_staff_inbox(_: dict = Depends(require_staff_read)):
    """Live alerts, broadcasts, signage, and ack state for staff portal."""
    active_signage = []
    for sid, active in (signage_state or {}).items():
        if not active:
            continue
        active_signage.append({"id": sid, "active": bool(active), "label": sid.upper()})

    inbox = {
        "alerts": alerts_broadcast_log[:40],
        "dispatches": targeted_dispatches[:40],
        "signage_active": active_signage,
        "acks": staff_inbox_acks[:100],
    }
    return inbox
|
|
|
|
@app.post("/staff/inbox/ack")
async def post_staff_inbox_ack(body: dict, principal: dict = Depends(require_staff)):
    """Record that a staff member acknowledged an alert/dispatch/broadcast.

    The ack is prepended to ``staff_inbox_acks`` (capped at 200 entries), persisted,
    mirrored into the staff activity feed, and broadcast to connected clients.

    Raises:
        HTTPException(403): caller is not a JWT principal with role staff/admin.
    """
    allowed = principal.get("auth") == "jwt" and principal.get("role") in ("staff", "admin")
    if not allowed:
        raise HTTPException(status_code=403, detail="Staff JWT required")

    staff_name = (principal.get("sub") or "").strip() or "Staff"
    # The reference id may arrive under any of three keys; the first non-empty one wins.
    raw_ref = body.get("ref_id") or body.get("alert_id") or body.get("dispatch_id") or ""
    ref_id = raw_ref.strip()
    ref_type = (body.get("ref_type") or "broadcast").strip()
    message = (body.get("message") or body.get("title") or "Broadcast acknowledged").strip()

    entry = dict(
        id=f"ack-{uuid.uuid4().hex[:8]}",
        staff_name=staff_name,
        ref_id=ref_id,
        ref_type=ref_type,
        message=message,
        timestamp=datetime.datetime.now().isoformat(),
    )
    staff_inbox_acks.insert(0, entry)
    del staff_inbox_acks[200:]
    save_json(STAFF_INBOX_ACKS_FILE, staff_inbox_acks)

    await _append_staff_activity_entry(
        staff_name,
        body.get("zone") or "",
        f"Acknowledged: {message}",
        event_type="broadcast_ack",
        metadata={"ref_id": ref_id, "ref_type": ref_type},
    )
    event = {"type": "staff_inbox_ack", "ack": entry}
    await manager.broadcast(json.dumps(event))
    return entry
|
|
|
|
@app.get("/staff/duty")
async def get_staff_duty(_: dict = Depends(require_staff_read)):
    """Return the module-level ``staff_duty`` mapping unchanged."""
    return staff_duty
|
|
|
|
@app.post("/staff/duty")
async def set_staff_duty(body: dict, principal: dict = Depends(require_staff)):
    """Set the caller's duty status (available / busy / off_duty), persist and broadcast it.

    Raises:
        HTTPException(403): caller is not a JWT principal with role staff/admin.
        HTTPException(400): unsupported status value.
    """
    allowed = principal.get("auth") == "jwt" and principal.get("role") in ("staff", "admin")
    if not allowed:
        raise HTTPException(status_code=403, detail="Staff JWT required")

    name = (principal.get("sub") or "").strip() or "staff"
    duty_status = (body.get("status") or "available").strip().lower()
    if duty_status not in ("available", "busy", "off_duty"):
        raise HTTPException(status_code=400, detail="status must be available, busy, or off_duty")

    record = {
        "status": duty_status,
        "updated_at": datetime.datetime.now().isoformat(),
    }
    staff_duty[name] = record
    save_json(STAFF_DUTY_FILE, staff_duty)
    event = {"type": "staff_duty_update", "staff": name, **record}
    await manager.broadcast(json.dumps(event))
    return record
|
|
|
|
@app.post("/staff/check-in")
async def staff_check_in(body: dict, principal: dict = Depends(require_staff)):
    """Log a zone check-in and, when a request id is given, mark that request on scene.

    A 404/409 from the progress update is logged and ignored so the check-in
    itself still succeeds; any other HTTPException propagates.

    Raises:
        HTTPException(403): caller is not a JWT principal with role staff/admin.
    """
    allowed = principal.get("auth") == "jwt" and principal.get("role") in ("staff", "admin")
    if not allowed:
        raise HTTPException(status_code=403, detail="Staff JWT required")

    staff_name = (principal.get("sub") or "").strip() or "Staff"
    zone = (body.get("zone") or body.get("location") or "Unknown zone").strip()
    request_id = (body.get("request_id") or "").strip()

    entry = await _append_staff_activity_entry(
        staff_name,
        zone,
        f"Checked in at {zone}",
        event_type="check_in",
        metadata={"request_id": request_id or None, "zone": zone},
    )
    if not request_id:
        return entry

    try:
        await update_staff_request_progress(
            request_id,
            {"stage": "on_scene", "note": f"Checked in at {zone}"},
            principal,
        )
    except HTTPException as exc:
        if exc.status_code not in (404, 409):
            raise
        logger.warning("check-in progress skipped for %s: %s", request_id, exc.detail)
    return entry
|
|
|
|
@app.get("/staff/activity")
async def get_staff_activity(_: dict = Depends(require_dashboard_read)):
    """Return up to 50 staff activity entries.

    When ``staff_activity`` is empty, a feed is assembled instead from the first
    30 incident log rows (selected event types only) followed by the first 20
    targeted dispatches. Any error is logged and yields an empty list.
    """
    relevant_events = ("staff_request_completed", "staff_request_sent", "sos")
    try:
        if staff_activity:
            return staff_activity[:50]

        built: list[dict] = []
        for row in incident_logs[:30]:
            if row.get("event_type") not in relevant_events:
                continue
            meta = row.get("metadata") or {}
            built.append({
                "id": meta.get("request_id") or row.get("ts", ""),
                "timestamp": row.get("ts", ""),
                "name": meta.get("staff_name") or row.get("source", "Staff"),
                "zone": meta.get("location") or row.get("source", ""),
                "message": row.get("msg") or "",
            })
        built.extend(
            {
                "id": d.get("dispatch_id") or d.get("id", ""),
                "timestamp": d.get("timestamp") or "",
                "name": d.get("recipient") or "Staff",
                "zone": d.get("status") or "",
                "message": d.get("message") or "Dispatch update",
            }
            for d in targeted_dispatches[:20]
        )
        return built[:50]
    except Exception as exc:
        logger.warning("staff/activity failed: %s", exc)
        return []
|
|
|
|
@app.post("/staff/activity")
async def post_staff_activity(body: dict, principal: dict = Depends(require_staff)):
    """Append a free-form entry to the staff activity feed and return it.

    The author is ``body["name"]`` if given, otherwise the JWT subject; a blank
    or whitespace-only value falls back to ``"Staff"`` like the other staff
    endpoints, so entries never carry an empty author.

    Raises:
        HTTPException(403): caller is not a JWT principal with role staff/admin.
    """
    if principal.get("auth") != "jwt" or principal.get("role") not in ("staff", "admin"):
        raise HTTPException(status_code=403, detail="Staff JWT required")
    name = (body.get("name") or principal.get("sub") or "").strip() or "Staff"
    entry = await _append_staff_activity_entry(
        name,
        body.get("zone") or "",
        body.get("message") or "",
        event_type=body.get("event_type") or "staff_note",
        metadata=body.get("metadata") or {},
    )
    return entry
|
|
|
|
@app.get("/site/signage-placements")
async def get_signage_placements(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return ``signage_placements``, or an empty dict when it is falsy."""
    if signage_placements:
        return signage_placements
    return {}
|
|
|
|
@app.delete("/site/signage-placements/{sign_id}")
async def delete_signage_placement(
    sign_id: str,
    _: dict = Depends(require_admin_audited("site/signage-placements/delete")),
):
    """Remove a stored signage placement, switch the sign off if active, and notify clients.

    Raises:
        HTTPException(404): no placement is stored under ``sign_id``.
    """
    global signage_placements
    if sign_id not in signage_placements:
        raise HTTPException(status_code=404, detail="Placement not found")

    signage_placements.pop(sign_id)
    save_signage_placements()

    # A sign that is still marked active is turned off before announcing the removal.
    if signage_state.get(sign_id):
        await _broadcast_signage_state(sign_id, False)
    event = {"type": "signage_placement_removed", "id": sign_id}
    await manager.broadcast(json.dumps(event))
    return {"removed": sign_id}
|
|
|
|
@app.post("/site/signage-placements/{sign_id}/remove")
async def remove_signage_placement_post(
    sign_id: str,
    _: dict = Depends(require_admin_audited("site/signage-placements/remove")),
):
    """POST fallback for clients where DELETE preflight fails."""
    # Delegates to the DELETE handler, forwarding the already-resolved dependency value.
    outcome = await delete_signage_placement(sign_id, _)
    return outcome
|
|
|
|
@app.post("/site/signage-placements")
async def save_signage_placement(payload: dict, _: None = Depends(require_operator)):
    """Create or overwrite the stored position of a sign, persist it, and broadcast the change.

    Raises:
        HTTPException(400): ``payload`` has no ``id``.
    """
    global signage_placements
    sign_id = payload.get("id")
    if not sign_id:
        raise HTTPException(status_code=400, detail="Signage id required")

    placement = {
        "lat": payload.get("lat"),
        "lng": payload.get("lng"),
        "updatedAt": datetime.datetime.now().isoformat(),
    }
    signage_placements[sign_id] = placement
    save_signage_placements()
    event = {"type": "signage_placement", "id": sign_id, **placement}
    await manager.broadcast(json.dumps(event))
    return placement
|
|
|
|
@app.get("/api/signage")
async def list_active_signage(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return signage records whose status is ``active`` (records without a status count as active)."""
    active = []
    for record in signage_records:
        if record.get("status", "active") == "active":
            active.append(record)
    return active
|
|
|
|
@app.get("/api/signage/history")
async def signage_history(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return the last 200 entries of ``signage_records``, whatever their status."""
    history = signage_records[-200:]
    return history
|
|
|
|
@app.post("/api/signage")
async def create_signage_record(body: dict, principal: dict = Depends(require_operator)):
    """Create an active signage record at (lat, lng), persist it, and broadcast it.

    The sign type is upper-cased with spaces and hyphens turned into underscores.
    The record list is capped at 500 entries (newest first). A failed alert
    broadcast is logged and leaves ``broadcastSent`` False; it does not fail the
    request.

    Raises:
        HTTPException(400): ``type``, ``lat`` or ``lng`` is missing, or the
            coordinates are not finite numbers.
    """
    # str() keeps a non-string ``type`` (e.g. a number) from crashing on .upper().
    sign_type = str(body.get("type") or "").upper().replace(" ", "_").replace("-", "_")
    lat = body.get("lat")
    lng = body.get("lng")
    if not sign_type or lat is None or lng is None:
        raise HTTPException(status_code=400, detail="type, lat, lng required")

    # Reject malformed coordinates with a 400 instead of an unhandled ValueError (500);
    # NaN/inf are refused as well because they cannot be represented in valid JSON.
    try:
        lat_value, lng_value = float(lat), float(lng)
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="lat and lng must be numeric") from None
    if not (math.isfinite(lat_value) and math.isfinite(lng_value)):
        raise HTTPException(status_code=400, detail="lat and lng must be numeric")

    placed_by = principal.get("sub") or principal.get("username") or "admin"
    record = {
        "id": f"sign-{uuid.uuid4().hex[:8]}",
        "type": sign_type,
        "lat": lat_value,
        "lng": lng_value,
        "placedAt": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "placedBy": placed_by,
        "status": "active",
        "broadcastSent": False,
    }
    signage_records.insert(0, record)
    del signage_records[500:]
    save_signage_records()
    try:
        await _broadcast_signage_alert(sign_type, lat_value, lng_value, action="deployed")
        record["broadcastSent"] = True
        save_signage_records()
    except Exception as exc:
        logger.warning("Signage broadcast failed: %s", exc)
    await manager.broadcast(json.dumps({"type": "signage_record_created", "record": record}))
    return record
|
|
|
|
@app.delete("/api/signage/{sign_id}")
async def remove_signage_record(sign_id: str, _: dict = Depends(require_operator)):
    """Mark a signage record as removed (it stays in the list), persist, and notify clients.

    A failure while sending the removal alert is logged and otherwise ignored.

    Raises:
        HTTPException(404): no record has this id.
    """
    global signage_records
    record = None
    for candidate in signage_records:
        if candidate.get("id") == sign_id:
            record = candidate
            break
    if not record:
        raise HTTPException(status_code=404, detail="Sign not found")

    record.update({
        "status": "removed",
        "removedAt": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    })
    save_signage_records()
    try:
        await _broadcast_signage_alert(record.get("type", "SIGN"), record["lat"], record["lng"], action="removed")
    except Exception as exc:
        logger.warning("Signage removal broadcast failed: %s", exc)
    event = {"type": "signage_record_removed", "id": sign_id}
    await manager.broadcast(json.dumps(event))
    return {"removed": sign_id}
|
|
|
|
@app.get("/site/blueprint")
async def get_site_blueprint(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return ``site_blueprint``, or an empty dict when it is falsy."""
    if site_blueprint:
        return site_blueprint
    return {}
|
|
|
|
@app.post("/site/blueprint")
async def upload_site_blueprint(
    file: UploadFile = File(...),
    sw_lat: float = Form(...),
    sw_lng: float = Form(...),
    ne_lat: float = Form(...),
    ne_lng: float = Form(...),
    _: dict = Depends(require_admin_audited("site/blueprint/upload")),
):
    """Store an uploaded blueprint image and record its geographic bounds.

    The file is written as ``site_blueprint<ext>`` inside ``BLUEPRINT_DIR``; an
    extension outside ``ALLOWED_IMAGE_EXTENSIONS`` is replaced with ``.png``.

    Raises:
        HTTPException(400): the upload's content type is not an allowed image type.
    """
    global site_blueprint
    if file.content_type not in ALLOWED_IMAGE_TYPES:
        raise HTTPException(status_code=400, detail="Blueprint must be JPEG, PNG, or WebP")

    # Read one byte past the limit so the size helper can see an oversized upload.
    raw = await file.read(MAX_IMAGE_BYTES + 1)
    contents = _read_limited_bytes(raw, MAX_IMAGE_BYTES, "Blueprint")

    ext = Path(file.filename or "blueprint.png").suffix.lower()
    if ext not in ALLOWED_IMAGE_EXTENSIONS:
        ext = ".png"
    fname = f"site_blueprint{ext}"
    with open(os.path.join(BLUEPRINT_DIR, fname), "wb") as out_file:
        out_file.write(contents)

    south_west = [sw_lat, sw_lng]
    north_east = [ne_lat, ne_lng]
    site_blueprint = {
        "url": f"/files/uploads/blueprints/{fname}",
        "bounds": [south_west, north_east],
        "uploaded_at": datetime.datetime.now().isoformat(),
    }
    save_site_blueprint()
    return site_blueprint
|
|
|
|
@app.get("/agentic/steps")
async def get_agentic_steps(
    session_id: Optional[str] = Query(default=None),
    principal: dict = Depends(require_operator_if_auth_enabled),
):
    """Return the caller's last 50 agent steps, optionally limited to one session."""
    user_id = _operator_id(principal)
    async with store_locks.agent_steps_lock:
        scoped = [
            step
            for step in agent_steps_history
            if step.get("user_id") == user_id
            and (not session_id or step.get("session_id") == session_id)
        ]
    return scoped[-50:]
|
|
|
|
async def _broadcast_issue_update(issue_id: str, append_text: str | None = None, **kwargs):
    """Apply an update to one issue, persist it, and broadcast the new state.

    Args:
        issue_id: id of the issue in ``issues_db`` to update.
        append_text: optional status line. It is placed after the issue's
            original description (captured in ``original_desc`` on first use),
            replacing any previously appended line rather than accumulating.
        **kwargs: extra fields merged into the issue (e.g. ``progress``, ``status``).

    Returns:
        A shallow copy of the updated issue, or ``None`` when no issue matches.
    """
    issue_snapshot = None
    for iss in issues_db:
        # .get() so a stored record without an "id" key is skipped instead of raising
        # KeyError, matching the other issue lookups in this file.
        if iss.get("id") != issue_id:
            continue
        if append_text:
            if "original_desc" not in iss:
                iss["original_desc"] = iss.get("desc", "")
            iss["desc"] = iss["original_desc"] + f"\n\n{append_text}"
        iss.update(kwargs)
        save_issues()
        issue_snapshot = dict(iss)
        break
    if issue_snapshot:
        await manager.broadcast(json.dumps({"type": "issue_update", "issue": issue_snapshot}))
    return issue_snapshot
|
|
|
|
async def simulate_issue_progress(issue_id: str, staff_name: str = "Staff"):
    """Demo helper: walk an issue through en route -> on scene -> resolved.

    Does nothing unless ``DEMO_MODE`` is on and the deployment is not production.
    Each step waits four seconds, broadcasts the issue update, then syncs the
    linked staff request; a completion entry is written to the incident log at
    the end.
    """
    if not DEMO_MODE or security_config.is_production():
        return

    # (message, progress, stage, extra issue fields, staff-request status override)
    timeline = (
        (f"[UPDATE] {staff_name} is en route.", 60, "en_route", {}, None),
        (f"[UPDATE] {staff_name} on scene — handling incident.", 85, "on_scene", {}, None),
        (
            "[RESOLVED] Incident resolved.",
            100,
            "resolved",
            {"status": "RESOLVED", "staff_request_status": "resolved"},
            "resolved",
        ),
    )
    for message, progress, stage, issue_fields, request_status in timeline:
        await asyncio.sleep(4)
        await _broadcast_issue_update(
            issue_id,
            message,
            progress=progress,
            **issue_fields,
            staff_assignments=[{"name": staff_name, "progress": progress, "stage": stage}],
        )
        await _sync_staff_request_progress(issue_id, progress, stage, status=request_status)

    await _append_incident_log(
        "INFO",
        "staff_portal",
        "staff_request_completed",
        f"{staff_name} completed issue {issue_id} (demo auto-progress).",
        metadata={"issue_id": issue_id, "staff_name": staff_name, "demo": True},
    )
|
|
|
|
async def simulate_vivek_progress(issue_id: str):
    """Run ``simulate_issue_progress`` for ``issue_id`` with the staff name fixed to "Vivek"."""
    await simulate_issue_progress(issue_id, staff_name="Vivek")
|
|
@app.post("/issues")
async def create_issue(body: IssueCreate, _: None = Depends(require_operator)):
    """Create an issue, filling in id / progress / timestamp / title when absent.

    When the title or description mentions "missing" and a missing-person image
    has been recorded on ``app.state``, that image is attached to the issue.
    The issue is inserted at the front of ``issues_db``, persisted, and broadcast.
    """
    issue = body.model_dump()
    # model_dump() emits a key for every declared field, so optional fields arrive
    # as None; setdefault() would leave those Nones in place. Fill by value instead.
    if not issue.get("id"):
        issue["id"] = f"ISS-{uuid.uuid4().hex[:6].upper()}"
    if issue.get("progress") is None:
        issue["progress"] = 0
    if not issue.get("timestamp"):
        issue["timestamp"] = datetime.datetime.now().isoformat()

    desc = (issue.get("desc") or "").strip()
    if not issue.get("title"):
        # Default title: first line of the description, truncated to 80 characters.
        issue["title"] = desc.split("\n")[0][:80] if desc else "Incident"

    # None-safe: a null description previously raised AttributeError on .lower().
    title_text = issue.get("title") or ""
    is_missing = "missing" in title_text.lower() or "missing" in desc.lower()
    if is_missing and getattr(app.state, "last_missing_person_img", None):
        issue["image"] = normalize_files_url(app.state.last_missing_person_img)

    async with store_locks.issues_lock:
        issues_db.insert(0, issue)
        save_issues()
    await manager.broadcast(json.dumps({"type": "issue_update", "issue": issue}))
    return issue
|
|
|
|
@app.patch("/issues/{issue_id}")
async def patch_issue(issue_id: str, body: IssuePatch, _: None = Depends(require_operator)):
    """Apply the explicitly-set fields of ``body`` to an issue, persist, and broadcast it.

    Raises:
        HTTPException(404): no issue has this id.
        Whatever ``_validate_issue_patch`` raises for a rejected change.
    """
    async with store_locks.issues_lock:
        target = next((iss for iss in issues_db if iss.get("id") == issue_id), None)
        if target is None:
            raise HTTPException(status_code=404, detail="Issue not found")
        # Only fields the client actually sent are applied.
        changes = body.model_dump(exclude_unset=True)
        _validate_issue_patch(target, changes)
        target.update(changes)
        save_issues()
        snapshot = dict(target)

    event = {"type": "issue_update", "issue": snapshot}
    await manager.broadcast(json.dumps(event))
    return snapshot
|
|
|
|
@app.post("/agentic/plan")
async def create_agentic_plan(payload: dict, _: None = Depends(require_operator)):
    """Generate a plan for the incident in ``payload``, store it (newest first), and broadcast it."""
    event = dict(
        type="agentic_plan_update",
        plan_id=str(uuid.uuid4()),
        created_at=datetime.datetime.now().isoformat(),
        incident=payload,
        plan=generate_agentic_plan(payload),
    )
    async with store_locks.agentic_plans_lock:
        agentic_plans.insert(0, event)
        _trim_memory_list(agentic_plans, 100)
        save_agentic_plans()
    serialized = json.dumps(event)
    await manager.broadcast(serialized)
    return event
|
|
|
|
@app.post("/agentic/chat")
async def agentic_chat(payload: ChatPayload, _: None = Depends(require_operator)):
    """Answer an operator prompt using a snapshot of current vision/alert state as context.

    The reply is produced by ``generate_chat_response`` in the default executor so
    the event loop is not blocked while it runs.
    """
    if hasattr(vision_engine, "room_stats"):
        room_counts = {
            room["label"]: room.get("occupancy", 0)
            for room in vision_engine.room_stats.values()
        }
    else:
        room_counts = {}

    # Prefer the crowd count; fall back to the total count when it is not available.
    crowd = getattr(vision_engine, "last_crowd_count", None)
    if crowd is None:
        crowd = getattr(vision_engine, "last_total_count", 0)

    context = {
        "crowd_count": crowd,
        "live_face_count": _count_live_faces(),
        "room_counts": room_counts,
        "recent_alerts": [a if isinstance(a, dict) else a.model_dump() for a in alerts_db[-5:]] if alerts_db else [],
        "active_models": vision_engine.get_ai_status(),
    }
    # get_running_loop() is the supported way to obtain the loop from inside a coroutine.
    loop = asyncio.get_running_loop()
    reply = await loop.run_in_executor(None, generate_chat_response, payload.prompt, context)
    return {"reply": reply}
|
|
|
|
@app.get("/dispatches")
async def get_dispatches(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return the module-level ``targeted_dispatches`` store as-is."""
    dispatches = targeted_dispatches
    return dispatches
|
|
|
|
@app.get("/user/profile")
async def get_user_profile(principal: dict = Depends(require_operator_if_auth_enabled)):
    """Return a copy of ``user_profile`` personalised for the caller.

    For JWT principals with a subject, the subject becomes the profile name
    and the role (if any) is title-cased with underscores turned into spaces.
    For API-key principals with a role, the role is shown as ``API (<role>)``.
    Any other principal receives the unmodified copy.
    """
    profile = {**user_profile}
    auth_kind = principal.get("auth")
    subject = principal.get("sub")
    role_name = principal.get("role")

    if auth_kind == "jwt" and subject:
        profile["name"] = subject
        if role_name:
            profile["role"] = role_name.replace("_", " ").title()
    elif auth_kind == "api_key" and role_name:
        profile["role"] = f"API ({role_name})"
    return profile
|
|
|
|
@app.get("/signage")
async def get_signage(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return the current ``signage_state`` store unchanged."""
    current_state = signage_state
    return current_state
|
|
|
|
@app.post("/signage/{sign_id}/toggle")
async def toggle_signage(sign_id: str, payload: dict | None = Body(default=None), _: None = Depends(require_operator)):
    """Set a sign to an explicit state, or flip it when no state is given.

    Args:
        sign_id: Key of the sign in ``signage_state``.
        payload: Optional JSON body. When it contains ``"active"``, that value
            is coerced with ``bool`` and applied; otherwise the stored state
            (``False`` when the sign is unknown) is inverted.

    Returns:
        Whatever ``_broadcast_signage_state`` returns for the new state.
    """
    options = payload if payload else {}
    if "active" in options:
        desired = bool(options["active"])
    else:
        desired = not signage_state.get(sign_id, False)
    return await _broadcast_signage_state(sign_id, desired)
|
|
|
|
@app.get("/gossip/contact-network")
async def get_gossip_contact_network(
    hours: Optional[float] = Query(default=None),
    _: dict = Depends(require_operator_if_auth_enabled),
):
    """Return the contact network produced by ``_build_contact_network``.

    Args:
        hours: Optional query value forwarded verbatim to the builder
            (presumably a look-back window in hours; confirm against the helper).
    """
    network = _build_contact_network(hours)
    return network
|
|
|
|
@app.get("/gossip")
async def get_gossip(
    root: Optional[str] = None,
    date: Optional[str] = None,
    _: dict = Depends(require_operator_if_auth_enabled),
):
    """Return the gossip graph, live or from a persisted daily snapshot.

    Optional ``?date=YYYY-MM-DD`` loads the snapshot stored for that day under
    ``GOSSIP_HISTORY_DIR``; when none exists an empty graph is returned.
    Pass ``?root=PersonName`` to filter the live contact tree; note that this
    also sets the root person on the gossip bridge.

    Args:
        root: Optional root person for the live graph.
        date: Optional ISO calendar date selecting a snapshot.

    Raises:
        HTTPException: 400 when ``date`` is not a valid ISO calendar date.
    """
    if date:
        # ``date`` is client-controlled and ends up in a file name. Parsing it
        # and re-serialising guarantees the name is a plain YYYY-MM-DD string,
        # so it can never carry path separators or ".." segments.
        try:
            snapshot_day = datetime.date.fromisoformat(date).isoformat()
        except ValueError:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail="Invalid date; expected YYYY-MM-DD",
            ) from None
        snap_path = os.path.join(GOSSIP_HISTORY_DIR, f"{snapshot_day}.json")
        if os.path.isfile(snap_path):
            return persist.load_json(snap_path, {})
        return {"nodes": [], "links": [], "root_person": root or "", "is_tracking": False, "date": date, "empty": True}

    if root:
        gossip_bridge.set_root_person(root)
    # Graph assembly runs in the default executor to keep the event loop free.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, gossip_bridge.get_gossip_json, root)
|
|
|
|
@app.post("/gossip/set_root")
async def set_gossip_root(root: str, _: None = Depends(require_vision_access)):
    """Set the root person for gossip graph tracking.

    Args:
        root: Name forwarded to ``gossip_bridge.set_root_person``.

    Returns:
        ``{"status": "ok", "root_person": root}``.
    """
    gossip_bridge.set_root_person(root)
    response = {"status": "ok", "root_person": root}
    return response
|
|
|
|
@app.post("/gossip/start")
async def start_gossip_tracking(
    payload: GossipTrackingStart = GossipTrackingStart(),
    _: None = Depends(require_vision_access),
):
    """Start gossip tracking and record who and why is being tracked.

    When the payload names a person (after trimming whitespace), that person
    becomes the graph root. Tracking metadata is stored on the gossip bridge
    and announced to clients as a ``gossip_tracking_update`` message.

    Returns:
        Status, the stored tracking metadata, and the bridge's current root person.
    """
    root_name = (payload.personName or "").strip()
    if root_name:
        gossip_bridge.set_root_person(root_name)
    gossip_bridge.start_tracking()

    tracking = {
        "staffId": payload.staffId,
        "personName": payload.personName,
        "cause": payload.cause,
        "doc_names": payload.doc_names or [],
        "started_at": datetime.datetime.now().isoformat(),
    }
    gossip_bridge.set_tracking_meta(tracking)

    announcement = {"type": "gossip_tracking_update", "status": "started"}
    announcement.update(tracking)
    await manager.broadcast(json.dumps(announcement))

    current_root = gossip_bridge.get_gossip_json().get("root_person")
    return {"status": "started", "tracking": tracking, "root_person": current_root}
|
|
|
|
@app.post("/gossip/stop")
async def stop_gossip_tracking(_: None = Depends(require_vision_access)):
    """Stop gossip tracking, drop its metadata and notify connected clients."""
    gossip_bridge.stop_tracking()
    gossip_bridge.clear_tracking_meta()
    notice = {"type": "gossip_tracking_update", "status": "stopped"}
    await manager.broadcast(json.dumps(notice))
    return {"status": "stopped"}
|
|
|
|
@app.post("/gossip/clear")
async def clear_gossip_tracking(_: dict = Depends(require_admin_audited("gossip/clear"))):
    """Clear the gossip graph via ``gossip_bridge.clear_graph``.

    Access is gated by the ``require_admin_audited("gossip/clear")`` dependency.
    """
    gossip_bridge.clear_graph()
    outcome = {"status": "cleared"}
    return outcome
|
|
|
|
| @app.post("/gossip/ingest_frame") |
| async def gossip_ingest_frame( |
| file: UploadFile = File(...), |
| cam_id: str = Form("cam-01"), |
| _: None = Depends(require_vision_access), |
| ): |
| """Run face recognition on a (browser webcam) frame and feed the gossip graph. |
| 
| This is what makes contact tracing work without a server-side camera: the UI |
| streams frames here while tracking is active. Detected, enrolled people are |
| linked to the root person in the interaction graph. |
| 
| Args: |
| file: Uploaded image; its content type must be in ``ALLOWED_IMAGE_TYPES``. |
| cam_id: Camera identifier forwarded to the gossip bridge and echoed in the |
| broadcast message. |
| 
| Returns: |
| A dict with ``names`` (found matches whose name is not "Unknown"), |
| ``matches`` (all matches), ``graph`` (current gossip JSON) and |
| ``is_tracking``. |
| 
| Raises: |
| HTTPException: 400 for an unsupported content type or an undecodable image. |
| """ |
| # Validate and decode the upload; one byte more than the limit is read and |
| # handed to _read_limited_bytes (presumably so it can detect oversize input). |
| if file.content_type not in ALLOWED_IMAGE_TYPES: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type") |
| contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image") |
| nparr = np.frombuffer(contents, np.uint8) |
| frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) |
| if frame is None: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image") |
|
|
| # Make sure the face database is available before matching. |
| fe = vision_engine.face_engine |
| await _run_face_work(_ensure_face_db, fe) |
|
|
| # Prefer multi-face matching; otherwise fall back to the single-result search |
| # and wrap its hit in the same match shape with a placeholder bbox. |
| matches: list[dict] = [] |
| if hasattr(fe, "match_all_faces"): |
| matches = await _run_face_work(fe.match_all_faces, frame) |
| else: |
| single = await _run_face_work(vision_engine.search_missing_person, frame) |
| if single.get("found"): |
| matches = [{"name": single["name"], "confidence": single.get("confidence", 0.0), |
| "bbox": [0, 0, 0, 0], "found": True}] |
|
|
| # ``detected`` carries every match name (including "Unknown"); ``known_names`` |
| # keeps only matches flagged as found with a real name. |
| detected = [m["name"] for m in matches] |
| known_names = [m["name"] for m in matches if m.get("found") and m["name"] != "Unknown"] |
|
|
| # Feed all detected names into the gossip graph, then read the graph back. |
| summary = gossip_bridge.ingest_detected_names(cam_id, detected) |
| data = gossip_bridge.get_gossip_json() |
|
|
| # While tracking, persist today's graph snapshot under GOSSIP_HISTORY_DIR. |
| if summary.get("tracking"): |
| async with store_locks.gossip_lock: |
| today = datetime.datetime.now().strftime("%Y-%m-%d") |
| persist.save_json(os.path.join(GOSSIP_HISTORY_DIR, f"{today}.json"), data) |
| await manager.broadcast(json.dumps({ |
| "type": "gossip_tracking_update", |
| "status": "detections", |
| "cam_id": cam_id, |
| "names": known_names, |
| })) |
|
|
| return { |
| "names": known_names, |
| "matches": matches, |
| "graph": data, |
| "is_tracking": data.get("is_tracking", False), |
| } |
|
|
@app.get("/debug/vision")
async def debug_vision(_: None = Depends(require_vision_access)):
    """Regression detector for vision pipeline.

    Reports face-engine readiness, face DB size and the pipeline counters
    (queue depth, fps, last inference time, processed and dropped frames).
    Counters default to 0 when the pipeline exposes no ``stats``.
    """
    # NOTE(review): "/debug/vision" is registered by two handlers in this file;
    # confirm which one is meant to serve the route.
    engine = vision_engine.face_engine
    pipe = _get_vision_pipeline()
    counters = pipe.stats.as_dict() if hasattr(pipe, "stats") else {}
    db_size = len(engine.db) if hasattr(engine, "db") else 0

    report = {
        "engine_ready": getattr(engine, "ready", False),
        "db_size": db_size,
    }
    # (response key, pipeline stats key) pairs, in response order.
    stat_fields = (
        ("queue_depth", "queue_depth"),
        ("fps", "fps"),
        ("inference_ms", "last_inference_ms"),
        ("processed", "processed"),
        ("dropped", "dropped"),
    )
    for response_key, stat_key in stat_fields:
        report[response_key] = counters.get(stat_key, 0)
    return report
|
|
@app.get("/debug/backend")
async def debug_backend(_: None = Depends(require_vision_access)):
    """Regression detector for backend health.

    Returns:
        Socket counts, resident memory of this process in MiB (``None`` when
        ``psutil`` is not installed), and the sizes of the detection history
        and the alerts store.
    """
    # psutil is an optional dependency: report memory as unknown rather than
    # failing the whole diagnostics endpoint when it is missing.
    try:
        import psutil
    except ImportError:
        memory_mb = None
    else:
        rss_bytes = psutil.Process().memory_info().rss
        memory_mb = round(rss_bytes / 1024 / 1024, 2)

    return {
        "active_sockets": len(manager.active_connections),
        "agent_sockets": len(manager.agent_connections),
        "memory_mb": memory_mb,
        "detections_history_size": len(detections_history),
        # The alerts store is named ``alerts_db`` everywhere else in this file.
        "alerts_size": len(alerts_db),
    }
|
|
@app.post("/vision/track_frame")
async def vision_track_frame(
    file: UploadFile = File(...),
    cam_id: str = Form("cam-01"),
    _: None = Depends(require_vision_access),
):
    """Continuous always-on tracking from a browser-supplied camera frame.

    The frontend's global CameraTracker streams frames here as long as facial
    recognition is enabled. This recognizes every face, records sightings into
    the live detection history + per-camera face results (so the Agentic Copilot
    and every page see the same live tracking), feeds the interaction graph, and
    broadcasts a camera update to all clients.

    Args:
        file: Uploaded frame; its content type must be in ``ALLOWED_IMAGE_TYPES``.
        cam_id: Client-supplied camera identifier. It is used verbatim as a
            dictionary key, but sanitised before it is used in file names.

    Returns:
        The warming response when the vision stack is still warming up, a
        ``tracking: False`` dict when facial recognition is disabled, a
        ``skipped: True`` dict (stale or cached faces) when the pipeline skipped
        the frame, or the full tracking result for a processed frame.

    Raises:
        HTTPException: 400 for an unsupported content type or an undecodable image.
    """
    global _detections_dirty
    import re  # function-scope import: only this handler needs it

    def _fs_safe(text: str) -> str:
        """Replace characters that must not appear in a file name (e.g. path separators)."""
        return re.sub(r"[^\w.\- ]", "_", text)

    warming = _vision_warming_response()
    if warming is not None:
        return warming
    if not vision_engine.get_ai_status().get("facial", False):
        return {"tracking": False, "faces": [], "reason": "Facial recognition is disabled"}

    # Validate and decode the uploaded frame.
    if file.content_type not in ALLOWED_IMAGE_TYPES:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type")
    contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image")
    frame = cv2.imdecode(np.frombuffer(contents, np.uint8), cv2.IMREAD_COLOR)
    if frame is None:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image")

    pipeline = _get_vision_pipeline()
    infer_frame = resize_for_infer(frame)
    matches, skipped = await pipeline.infer(cam_id, frame)

    if skipped:
        # The pipeline did not run inference on this frame: report either a
        # stale camera or the faces cached from the last processed frame.
        if vision_session.is_cam_stale(cam_id):
            return {
                "tracking": True,
                "cam_id": cam_id,
                "live": False,
                "stale": True,
                "reason": "frame_timeout",
                "faces": [],
                "tracked_faces": [],
                "presence": False,
                "skipped": True,
                "known_names": [],
                "detection_updates": [],
            }
        cached = vision_engine.get_face_results().get(cam_id, [])
        # Never expose embeddings to clients.
        public_cached = [
            {k: v for k, v in (f if isinstance(f, dict) else {}).items() if k != "embedding"}
            for f in cached
        ]
        meta = vision_session.get_cam_meta(cam_id)
        return {
            "tracking": True,
            "cam_id": cam_id,
            "live": True,
            "stale": False,
            "faces": public_cached,
            "tracked_faces": public_cached,
            "presence": bool(public_cached),
            "skipped": True,
            "frame_id": meta.get("frame_id"),
            "presence_expires_at": meta.get("presence_expires_at"),
            "known_names": [
                f.get("name") for f in public_cached
                if f.get("found") and f.get("name") not in ("Unknown", "unknown", "")
            ],
            "detection_updates": [],
        }

    scale_match_bboxes(matches, infer_frame, frame)

    # cam_id comes straight from the request, so never put it in a path raw.
    safe_cam = _fs_safe(str(cam_id))

    def _face_thumbnail_url(match: dict) -> str | None:
        """Write the cropped face to the uploads folder; return its URL or None (best effort)."""
        try:
            x1, y1, x2, y2 = [int(v) for v in (match.get("bbox") or [0, 0, 0, 0])]
            h, w = frame.shape[:2]
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(w, x2), min(h, y2)
            if x2 <= x1 or y2 <= y1:
                return None
            face_img = frame[y1:y2, x1:x2]
            if face_img.size == 0:
                return None
            base_name = _fs_safe(str(match.get("name", "unknown")).strip().lower().replace(" ", "_"))
            thumb_filename = f"face_{base_name}_{safe_cam}.jpg"
            thumb_path = os.path.join(_UPLOADS_PATH, thumb_filename)
            if not cv2.imwrite(thumb_path, face_img):
                return None
            return f"/files/uploads/{thumb_filename}"
        except Exception as exc:
            # A thumbnail is optional; a failure must not fail the frame.
            logger.debug("face thumbnail skipped: %s", exc)
            return None

    # Build the per-face payload (the internal copy keeps embeddings).
    faces_payload = []
    for m in matches:
        face_payload = {
            "name": m.get("name", "Unknown"),
            "confidence": round(float(m.get("confidence") or 0.0), 3),
            "bbox": m.get("bbox", [0, 0, 0, 0]),
            "found": bool(m.get("found", True)),
        }
        if m.get("embedding") is not None:
            face_payload["embedding"] = m.get("embedding")
        thumbnail_url = _face_thumbnail_url(m)
        if thumbnail_url:
            face_payload["thumbnail"] = normalize_files_url(thumbnail_url)
        faces_payload.append(face_payload)
    public_faces_payload = [
        {k: v for k, v in face.items() if k != "embedding"}
        for face in faces_payload
    ]

    with vision_engine.lock:
        vision_engine.latest_raw_frames[cam_id] = frame.copy()
        vision_engine.face_results[cam_id] = faces_payload
    if hasattr(vision_engine, "mark_browser_feed_active"):
        vision_engine.mark_browser_feed_active(cam_id, 0)

    # Record a sighting for every found, named match.
    tracked_names: list[str] = []
    tracked_bboxes: list[tuple] = []
    frame_updates: list[dict] = []
    async with store_locks.detections_lock:
        for m in matches:
            name = str(m.get("name") or "Unknown").strip()
            if name in ("Unknown", "unknown", "") or not m.get("found"):
                continue
            tracked_names.append(name)
            tracked_bboxes.append(tuple(m.get("bbox") or [0, 0, 0, 0]))

            # The sighting thumbnail is the whole frame, one file per (name, camera).
            thumbnail_filename = f"sighting_{_fs_safe(name.lower())}_{safe_cam}.jpg"
            cv2.imwrite(os.path.join(_UPLOADS_PATH, thumbnail_filename), frame)

            entry = _upsert_detection_sighting(
                name,
                confidence=float(m.get("confidence") or 0.0),
                cam_id=cam_id,
                thumbnail=f"/files/uploads/{thumbnail_filename}",
            )
            frame_updates.append(entry)
            _detections_dirty = True

        # Persist while the lock is still held so the saved file matches memory.
        if _detections_dirty:
            save_detections()
            _detections_dirty = False

    # Feed the interaction graph; gossip problems must never fail tracking.
    try:
        if tracked_names:
            gossip_bridge.on_detections(cam_id, tracked_names, tracked_bboxes, frame.shape[1])
            gossip_bridge.ingest_detected_names(cam_id, tracked_names)
    except Exception as exc:
        logger.debug("gossip ingest from track_frame skipped: %s", exc)

    frame_id = vision_session.record_processed_frame(cam_id, had_match=bool(tracked_names))
    meta = vision_session.get_cam_meta(cam_id)
    count = len(public_faces_payload)
    vision_engine.last_face_count = count
    browser_feed_count = len(getattr(vision_engine, "browser_feeds", {}) or {})
    await manager.broadcast(json.dumps({
        "type": "camera_broadcast",
        "total_count": count,
        "cameras": {cam_id: {
            "count": count,
            "faces": public_faces_payload,
            "live": True,
            "stale": False,
            "frame_id": frame_id,
        }},
        "active_cam_count": browser_feed_count,
        "detection_updates": frame_updates,
        "timestamp": datetime.datetime.now().isoformat(),
    }))

    return {
        "tracking": True,
        "cam_id": cam_id,
        "live": True,
        "stale": False,
        "presence": bool(tracked_names),
        "faces": public_faces_payload,
        "tracked_faces": public_faces_payload,
        "frame_id": frame_id,
        "last_frame_ts": meta.get("last_frame_ts"),
        "presence_expires_at": meta.get("presence_expires_at"),
        "known_names": tracked_names,
        "detection_updates": frame_updates,
    }
|
|
|
|
| @app.post("/tracking/session/reset") |
| async def reset_tracking_session( |
| body: TrackingResetPayload = Body(default_factory=TrackingResetPayload), |
| principal: dict = Depends(require_operator), |
| ): |
| """Clear live tracking state for a fresh operator session. |
| 
| Wipes sighting history, live face results, browser feed registry, and the |
| gossip interaction graph (edges). Enrolled face DB is untouched. Gossip |
| tracking stays enabled so new co-presence events accumulate cleanly. |
| 
| Also removes today's persisted gossip snapshot file, deletes every regular |
| file directly inside the uploads folder, and resets |
| ``app.state.last_missing_person_img`` when that attribute exists. |
| 
| By default does not broadcast — the initiating client resets local UI state. |
| Pass ``broadcast: true`` only for an explicit global coordination reset (admin only). |
| Pass ``full_reset: true`` when the operator explicitly starts a new session. |
| 
| Raises: |
| HTTPException: 403 when ``broadcast`` is requested by a non-admin principal. |
| """ |
| # A global (broadcast) reset is restricted to admins; every reset is audited. |
| if body.broadcast and not auth_service.has_role(principal, "admin"): |
| raise HTTPException( |
| status_code=status.HTTP_403_FORBIDDEN, |
| detail="Global tracking reset requires admin role", |
| ) |
| _audit_destructive( |
| principal, |
| "tracking/session/reset", |
| f"broadcast={body.broadcast} full_reset={body.full_reset}", |
| ) |
| global detections_history, _detections_dirty |
| user_id = _operator_id(principal) |
|
|
| # Empty the sighting history in place and persist the empty state. |
| async with store_locks.detections_lock: |
| detections_history.clear() |
| _detections_dirty = False |
| save_detections() |
|
|
| # NOTE(review): face_results is cleared here without taking vision_engine.lock, |
| # while /vision/track_frame writes it under that lock — confirm this is safe. |
| vision_engine.face_results.clear() |
| if hasattr(vision_engine, "clear_browser_feeds"): |
| vision_engine.clear_browser_feeds() |
| # Reset the gossip graph and metadata, then re-enable tracking and re-register |
| # every enrolled (non "unknown_") identity found on disk. |
| gossip_bridge.clear_graph() |
| gossip_bridge.clear_tracking_meta() |
| gossip_bridge.start_tracking() |
| if os.path.exists(_FACE_DB_PATH): |
| enrolled = [ |
| n for n in os.listdir(_FACE_DB_PATH) |
| if os.path.isdir(os.path.join(_FACE_DB_PATH, n)) and not n.startswith("unknown_") |
| ] |
| gossip_bridge.register_enrolled_roster(enrolled) |
|
|
| # Drop today's persisted gossip snapshot; a failed removal is ignored. |
| today_path = os.path.join(GOSSIP_HISTORY_DIR, f"{datetime.datetime.now():%Y-%m-%d}.json") |
| if os.path.exists(today_path): |
| async with store_locks.gossip_lock: |
| try: |
| os.remove(today_path) |
| except OSError: |
| pass |
|
|
| # Optional agent-side cleanup for the caller's session. |
| if body.clear_agent_steps and body.session_id: |
| await _clear_agent_steps(user_id, body.session_id) |
| await agentic_orchestrator.prune_adk_sessions(user_id, keep_session_id=None) |
|
|
| # Best-effort removal of every regular file directly inside the uploads folder. |
| if os.path.exists(_UPLOADS_PATH): |
| for item in os.listdir(_UPLOADS_PATH): |
| item_path = os.path.join(_UPLOADS_PATH, item) |
| if os.path.isfile(item_path): |
| try: |
| os.remove(item_path) |
| except OSError: |
| pass |
|
|
| if hasattr(app.state, "last_missing_person_img"): |
| app.state.last_missing_person_img = None |
|
|
| # Only an explicit global reset is pushed to the other clients. |
| payload = { |
| "type": "tracking_session_reset", |
| "detections_history": [], |
| "timestamp": datetime.datetime.now().isoformat(), |
| "broadcast": body.broadcast, |
| } |
| if body.broadcast: |
| await manager.broadcast(json.dumps(payload)) |
|
|
| # Hand the (now reset) stores back to the agent orchestrator. |
| agentic_orchestrator.inject_context( |
| alerts=alerts_db, |
| sos_events=sos_events, |
| vision_engine=vision_engine, |
| rooms=[], |
| detections_history=detections_history, |
| issues=issues_db, |
| incident_logs=incident_logs, |
| signage_state=signage_state, |
| ) |
| return {"status": "reset", "message": "Live tracking session cleared", "broadcast": body.broadcast} |
|
|
|
|
@app.get("/gossip/known_people")
async def get_known_people(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return all people currently recognized by the face engine.

    The result is the sorted union of the names known to the gossip bridge
    and the keys of the face engine's ``db``.
    """
    bridge_names = set(gossip_bridge.get_known_people())
    engine_names = set(vision_engine.face_engine.db.keys())
    return {"known_people": sorted(bridge_names | engine_names)}
|
|
|
|
| |
|
|
@app.get("/ai/status")
async def ai_status(_: dict = Depends(require_vision_access)):
    """Return enabled AI models plus per-model capability metadata.

    ``capabilities`` is empty when the vision engine does not implement
    ``get_ai_capabilities``. ``cloud`` mirrors the ``CEPHEUS_CLOUD`` flag.
    """
    if hasattr(vision_engine, "get_ai_capabilities"):
        capabilities = vision_engine.get_ai_capabilities()
    else:
        capabilities = {}
    model_states = vision_engine.get_ai_status()
    return {"models": model_states, "capabilities": capabilities, "cloud": CEPHEUS_CLOUD}
|
|
|
|
@app.post("/ai/toggle/{model_id}")
async def ai_toggle(model_id: str, _: dict = Depends(require_operator)):
    """Toggle a specific AI model on/off.

    Args:
        model_id: Identifier passed to ``vision_engine.toggle_ai_model``.

    Returns:
        ``{"model_id": ..., "enabled": <new state>}``; the same change is also
        broadcast as an ``ai_model_update`` message with the full model status.

    Raises:
        HTTPException: 400 when the capability metadata marks the model as
            explicitly unsupported (``supported`` is ``False``).
    """
    if hasattr(vision_engine, "get_ai_capabilities"):
        capabilities = vision_engine.get_ai_capabilities()
    else:
        capabilities = {}
    capability = capabilities.get(model_id, {})

    # Only an explicit ``False`` blocks the toggle; a missing flag is allowed.
    if capability.get("supported") is False:
        reason = capability.get("note", f"Model '{model_id}' is not available in this deployment")
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=reason)

    enabled = vision_engine.toggle_ai_model(model_id)
    update = {
        "type": "ai_model_update",
        "model_id": model_id,
        "enabled": enabled,
        "all": vision_engine.get_ai_status(),
    }
    await manager.broadcast(json.dumps(update))
    return {"model_id": model_id, "enabled": enabled}
|
|
|
|
@app.get("/face_db/list")
async def list_face_db(_: dict = Depends(require_operator_if_auth_enabled)):
    """List enrolled persons — roster aligned with the embedding store used for search.

    People come from two sources: directories under ``_FACE_DB_PATH`` and the
    keys of the face engine's ``db``. Names starting with ``unknown_`` are
    excluded from both. A directory without a matching embedding is reported
    as ``PENDING_EMBEDDING``; everything else as ``AUTHORIZED``.

    Returns:
        A list of person dicts sorted by name. Each has ``name``, ``image``,
        ``status`` and ``role``; ``enrolledAt`` is added when the person's
        directory exists, and ``lastSeen`` / ``lastLocation`` when the person
        appears in ``detections_history``.
    """
    enrolled_keys: set[str] = set()
    fe = getattr(vision_engine, "face_engine", None)
    if fe is not None:
        # A failed reload only degrades the status column, so it is logged and ignored.
        try:
            _ensure_face_db(fe)
            db = getattr(fe, "db", {}) or {}
            enrolled_keys = {str(k) for k in db if not str(k).startswith("unknown_")}
        except Exception as exc:
            logger.debug("face_db list reload skipped: %s", exc)

    def _role_for(person: str) -> str:
        """Return the stored role for *person*, or the default role."""
        meta = face_metadata.read_person_metadata(person, _FACE_DB_PATH)
        return meta.get("role") or face_metadata.DEFAULT_ROLE

    people: list[dict] = []
    disk_names: set[str] = set()
    if os.path.exists(_FACE_DB_PATH):
        for person_name in os.listdir(_FACE_DB_PATH):
            if person_name.startswith("unknown_"):
                continue
            person_dir = os.path.join(_FACE_DB_PATH, person_name)
            if not os.path.isdir(person_dir):
                continue
            disk_names.add(person_name)
            imgs = [f for f in os.listdir(person_dir) if f.lower().endswith(('.jpg', '.jpeg', '.png'))]
            people.append({
                "name": person_name,
                "image": f"/files/face/{person_name}/{imgs[0]}" if imgs else None,
                "status": "AUTHORIZED" if person_name in enrolled_keys else "PENDING_EMBEDDING",
                "role": _role_for(person_name),
            })

    # Identities that have an embedding but no image directory on disk.
    for name in enrolled_keys - disk_names:
        people.append({
            "name": name,
            "image": None,
            "status": "AUTHORIZED",
            "role": _role_for(name),
        })

    # Index the first history entry per name once, instead of scanning the
    # whole history again for every person.
    first_detection: dict = {}
    for det in detections_history:
        first_detection.setdefault(det.get("name"), det)

    for p in people:
        person_dir = os.path.join(_FACE_DB_PATH, p["name"])
        if os.path.isdir(person_dir):
            try:
                mtime = os.path.getmtime(person_dir)
                p["enrolledAt"] = datetime.datetime.fromtimestamp(mtime, tz=datetime.timezone.utc).isoformat()
            except OSError:
                # Directory vanished or is unreadable: leave enrolledAt unset.
                pass
        det = first_detection.get(p["name"])
        if det:
            p["lastSeen"] = det.get("seen_at")
            p["lastLocation"] = det.get("camId")

    return sorted(people, key=lambda x: x["name"])
|
|
|
|
@app.delete("/face_db/{person_name}")
async def delete_face_db_person(
    person_name: str,
    _: dict = Depends(require_admin_audited("face_db/delete")),
):
    """Remove an enrolled identity: images, embedding files and in-memory entry.

    The name is first passed through ``_sanitize_person_name``. The person's
    image directory is removed, ``<name>.npy`` is deleted from the ``faces_db``
    and ``temp_faces_db`` folders next to the face database, matching keys are
    dropped from the engine's in-memory ``db``, and the face DB is invalidated.

    Returns:
        ``{"status": "deleted", "name": <sanitised name>}``, also when nothing
        existed to delete.
    """
    safe_name = _sanitize_person_name(person_name)

    image_dir = os.path.join(_FACE_DB_PATH, safe_name)
    if os.path.isdir(image_dir):
        import shutil
        shutil.rmtree(image_dir, ignore_errors=True)

    engine = getattr(vision_engine, "face_engine", None)
    recognition_root = os.path.dirname(_FACE_DB_PATH)
    embedding_files = [
        os.path.join(recognition_root, folder, f"{safe_name}.npy")
        for folder in ("faces_db", "temp_faces_db")
    ]
    for npy_path in embedding_files:
        if not os.path.exists(npy_path):
            continue
        try:
            os.remove(npy_path)
            logger.info("Deleted embedding file: %s", npy_path)
        except OSError as exc:
            logger.warning("Could not delete %s: %s", npy_path, exc)

    # The in-memory key may be stored under any of these spellings.
    if engine and hasattr(engine, "db"):
        aliases = (safe_name, safe_name.replace("_", " "), person_name.strip())
        for alias in aliases:
            engine.db.pop(alias, None)

    _invalidate_face_db(engine)

    logger.info("Deleted face identity: %s", safe_name)
    return {"status": "deleted", "name": safe_name}
|
|
|
|
@app.get("/face_db/debug")
async def face_db_debug(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return embedding shape and norm for every enrolled identity — useful for diagnosing score=0 issues.

    Each entry reports ``status`` as ``null_embedding`` (no embedding),
    ``ok`` (norm above 0.1), ``zero_or_near_zero``, or ``error: <message>``
    when the shape or norm could not be computed.
    """
    engine = getattr(vision_engine, "face_engine", None)
    if engine is None:
        return {"error": "face_engine not initialized", "db": []}
    _ensure_face_db(engine)
    store = getattr(engine, "db", {}) or {}

    entries = []
    for name, embedding in store.items():
        shape = None
        norm = None
        if embedding is None:
            state = "null_embedding"
        else:
            try:
                shape = list(embedding.shape)
                norm = float(np.linalg.norm(embedding))
                state = "ok" if norm > 0.1 else "zero_or_near_zero"
            except Exception as exc:
                # Report the failure for this identity but keep listing the others.
                shape = None
                norm = None
                state = f"error: {exc}"
        entries.append({
            "name": name,
            "shape": shape,
            "norm": None if norm is None else round(norm, 4),
            "status": state,
            "is_unknown": name.startswith("unknown_"),
        })

    named_count = sum(1 for entry in entries if not entry["is_unknown"])
    return {
        "insightface_loaded": getattr(engine, "app", None) is not None,
        "db_size": len(store),
        "enrolled_named": named_count,
        "entries": sorted(entries, key=lambda entry: entry["name"]),
    }
|
|
|
|
@app.post("/face_db/clear_temp")
async def clear_temp_faces(_: dict = Depends(require_vision_access)):
    """Clear temporary faces and embeddings from the current session.

    Deletes the regular files found directly in the ``temp_faces_db`` and
    ``temp_face_database`` folders next to the face database, except files
    whose name starts with ``unknown_``. Individual failures are logged and
    skipped. The face DB is invalidated afterwards when an engine exists.

    Returns:
        ``{"status": "success", "deleted": <number of files removed>}``.
    """
    recognition_root = os.path.dirname(_FACE_DB_PATH)
    removed = 0
    for folder_name in ("temp_faces_db", "temp_face_database"):
        folder = os.path.join(recognition_root, folder_name)
        if not os.path.exists(folder):
            continue
        for entry in os.listdir(folder):
            if entry.startswith("unknown_"):
                continue
            target = os.path.join(folder, entry)
            if not os.path.isfile(target):
                continue
            try:
                os.remove(target)
                removed += 1
            except Exception as e:
                logger.warning("Failed to delete temp face file %s: %s", target, e)

    engine = getattr(vision_engine, "face_engine", None)
    if engine:
        _invalidate_face_db(engine)
    return {"status": "success", "deleted": removed}
|
|
|
|
|
|
|
|
|
|
@app.post("/emergency/dispatch-log")
async def post_emergency_dispatch_log(body: dict, _: None = Depends(require_operator)):
    """Record an emergency dispatch entry at the front of the dispatch log.

    Missing or falsy fields are filled in: a generated ``disp-xxxxxxxx`` id,
    the current UTC timestamp, type ``general``, an empty message and status
    ``pending``. The log is capped at 200 entries and saved to
    ``DISPATCH_LOG_FILE``.

    Returns:
        The stored entry.
    """
    def _field(key, fallback):
        """Return ``body[key]`` when truthy, otherwise ``fallback()``."""
        return body.get(key) or fallback()

    entry = {
        "id": _field("id", lambda: f"disp-{uuid.uuid4().hex[:8]}"),
        "timestamp": _field("timestamp", lambda: datetime.datetime.now(datetime.timezone.utc).isoformat()),
        "type": _field("type", lambda: "general"),
        "message": _field("message", lambda: ""),
        "status": _field("status", lambda: "pending"),
    }
    emergency_dispatch_log.insert(0, entry)
    # Newest entries are at the front, so trimming the tail drops the oldest.
    del emergency_dispatch_log[200:]
    save_json(DISPATCH_LOG_FILE, emergency_dispatch_log)
    return entry
|
|
|
|
@app.get("/alerts/log")
async def get_alerts_log(_: dict = Depends(require_operator_if_auth_enabled)):
    """Return the 50 most recent alert broadcast log entries, newest first.

    ``POST /alerts/log`` inserts new entries at index 0 and trims the tail, so
    the most recent entries are at the front of ``alerts_broadcast_log``;
    slicing from the end would return the oldest ones instead.
    """
    return alerts_broadcast_log[:50]
|
|
|
|
@app.post("/alerts/log")
async def post_alerts_log(body: dict, _: None = Depends(require_operator)):
    """Record an alert broadcast at the front of the alerts broadcast log.

    The id and UTC timestamp are always generated here. ``sentBy`` defaults to
    ``admin`` and ``deliveryCount`` to the number of recipients. The log is
    capped at 100 entries and saved to ``ALERTS_LOG_FILE``.

    Returns:
        The stored entry.
    """
    recipients = body.get("recipients") or []
    delivery_count = body.get("deliveryCount") or len(recipients)
    entry = dict(
        id=f"bc-{uuid.uuid4().hex[:8]}",
        timestamp=datetime.datetime.now(datetime.timezone.utc).isoformat(),
        message=body.get("message") or "",
        recipients=recipients,
        relatedIssue=body.get("relatedIssue"),
        sentBy=body.get("sentBy") or "admin",
        deliveryCount=delivery_count,
    )
    alerts_broadcast_log.insert(0, entry)
    # Newest entries are at the front, so trimming the tail drops the oldest.
    del alerts_broadcast_log[100:]
    save_json(ALERTS_LOG_FILE, alerts_broadcast_log)
    return entry
|
|
|
|
@app.get("/face_results")
async def get_face_results(_: dict = Depends(require_vision_access)):
    """Return latest face recognition hits per camera (expired results are empty/stale).

    The raw engine results are shaped by
    ``vision_session.build_face_results_payload``, which is given
    ``_strip_face_embeddings`` as its per-face cleaner.
    """
    latest = vision_engine.get_face_results()
    response = vision_session.build_face_results_payload(latest, _strip_face_embeddings)
    return response
|
|
|
|
@app.get("/debug/vision")
async def debug_vision(_: dict = Depends(require_vision_access)):
    """Live vision pipeline diagnostics for HF debugging.

    Combines pipeline counters, websocket connection state, the presence
    summary from ``vision_session`` and the first found, named face in the
    latest face results. Pipeline counters fall back to 0 when the stats
    cannot be read.
    """
    # NOTE(review): "/debug/vision" is registered by two handlers in this file;
    # confirm which one is meant to serve the route.
    try:
        stats = _get_vision_pipeline().stats.as_dict()
    except Exception as exc:
        logger.warning("debug/vision pipeline stats failed: %s", exc)
        stats = {}

    presence = vision_session.presence_summary()
    agent_socket_count = len(manager.agent_connections)
    socket_count = len(manager.active_connections)
    agent_open = agent_socket_count > 0
    ws_open = socket_count > 0
    ws_label = "open" if ws_open else "closed"
    agent_label = "open" if agent_open else "closed"

    # Pick the first found, named face across all cameras.
    raw_faces = vision_engine.get_face_results()
    last_match = None
    last_score = None
    for faces in (raw_faces or {}).values():
        hit = next(
            (face for face in faces or [] if isinstance(face, dict) and face.get("found") and face.get("name")),
            None,
        )
        if hit is not None:
            last_match = str(hit.get("name"))
            last_score = float(hit.get("confidence") or 0)
            break

    return {
        "fps": stats.get("fps", 0),
        "queue": stats.get("queue_depth", 0),
        "socket_count": socket_count,
        "agent_socket_count": agent_socket_count,
        "camera_count": vision_session.camera_count(),
        "polling_enabled": False,
        "websocket_state": ws_label,
        "ws_connected": ws_open,
        "agent_ws_connected": agent_open,
        "agent_ws_state": agent_label,
        "ws_state": ws_label,
        "watchdog_state": "client_side",
        "inference_ms": stats.get("last_inference_ms", 0),
        "dropped_frames": stats.get("dropped", 0),
        "skipped_frames": stats.get("skipped", 0),
        "processed_frames": stats.get("processed", 0),
        "face_workers": _FACE_WORKERS,
        "public_vision": public_vision_allowed(),
        "facial_enabled": vision_engine.get_ai_status().get("facial", False),
        "presence_state": presence.get("presence_state"),
        "presence_is_stale": presence.get("presence_is_stale"),
        "last_frame_age_ms": presence.get("last_frame_age_ms"),
        "last_presence_update_age_ms": presence.get("last_presence_update_age_ms"),
        "last_match_age_ms": presence.get("last_match_age_ms"),
        "presence_ttl_s": vision_session.PRESENCE_TTL_S,
        "track_frame_ok": bool(raw_faces),
        "inference_complete": stats.get("processed", 0) > 0,
        "result_emitted": last_match is not None,
        "last_match": last_match,
        "last_score": last_score,
        "last_result_age_ms": presence.get("last_match_age_ms"),
    }
|
|
|
|
@app.get("/debug/emergency")
async def debug_emergency():
    """Emergency subsystem diagnostics.

    Reports the sizes of the emergency-related stores, whether ``httpx`` was
    importable, the nearby-lookup cache size, and whether maps are configured
    (``False`` when no ``maps_configured`` function exists in this module).
    """
    # NOTE(review): unlike the other /debug routes in this file, this handler
    # declares no auth dependency — confirm that public access is intended.
    def _maps_ready():
        """Return ``maps_configured()`` when that function exists, else False."""
        return maps_configured() if "maps_configured" in globals() else False

    httpx_ready = httpx is not None
    return {
        "dispatch_log_count": len(emergency_dispatch_log),
        "staff_activity_count": len(staff_activity),
        "sos_event_count": len(sos_events),
        "staff_request_count": len(staff_requests_db),
        "httpx_available": httpx_ready,
        "nearby_cache_entries": len(_nearby_cache),
        "maps_configured": _maps_ready(),
        "public_vision": public_vision_allowed(),
        "fallback_mode": "overpass" if httpx_ready else "unavailable",
        "service_availability": {
            "overpass": httpx_ready,
            "maps_api": _maps_ready(),
        },
    }
|
|
|
|
| @app.post("/missing_person") |
| async def missing_person_search(file: UploadFile = File(...), _: None = Depends(require_operator)): |
| """Upload an image and search for that person via ``vision_engine.search_missing_person``. |
| 
| The upload is validated and decoded, the face database is ensured, and the |
| search runs through ``_run_face_work``. The uploaded bytes are then stored via |
| ``_safe_image_upload_path`` and the file's URL is returned as ``image_url`` |
| (and remembered on ``app.state.last_missing_person_img``). |
| 
| Returns: |
| A copy of the search result with ``image_url`` added; ``search_mode`` and |
| ``reason`` may be filled in as shown in the body below. |
| 
| Raises: |
| HTTPException: 400 for an unsupported content type or an undecodable image; |
| 500 for any other failure during the search. |
| """ |
| try: |
| # Validate and decode the uploaded query image. |
| if file.content_type not in ALLOWED_IMAGE_TYPES: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type") |
| contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image") |
| nparr = np.frombuffer(contents, np.uint8) |
| query_frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) |
| if query_frame is None: |
| raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image") |
|
|
| # Ensure the face DB, then run the search through _run_face_work. |
| fe = vision_engine.face_engine |
| await _run_face_work(_ensure_face_db, fe) |
| result = await _run_face_work(vision_engine.search_missing_person, query_frame) |
| 
| 
| # Keep the uploaded query image and expose its URL; the result is copied |
| # before being modified so the engine's own dict is left untouched. |
| upload_dir, file_path = _safe_image_upload_path(file.filename or "capture.jpg") |
| with open(file_path, "wb") as f: |
| f.write(contents) |
| app.state.last_missing_person_img = normalize_files_url(f"/uploads/{os.path.basename(file_path)}") |
| result = dict(result) |
| result["image_url"] = app.state.last_missing_person_img |
| # Fill in a search mode and a human-readable reason for a miss. |
| if CEPHEUS_CLOUD: |
| result.setdefault("search_mode", result.get("search_mode") or "database_only") |
| if not result.get("found"): |
| best_score = result.get("best_score", 0) |
| enrolled = result.get("enrolled_count", 0) |
| if enrolled == 0 or best_score == 0: |
| result["reason"] = ( |
| "Face database is empty — no enrolled faces to match against. " |
| "Go to Issues → Face Database to enroll faces first." |
| ) |
| else: |
| result["reason"] = result.get("reason") or ( |
| "No match in enrolled database. Live camera search requires GPU vision backend." |
| ) |
|
|
| 
| return result |
| except HTTPException: |
| # Deliberate HTTP errors (the 400s above) pass through unchanged. |
| raise |
| except Exception as e: |
| logger.error(f"Missing person search error: {e}") |
| raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Missing person search failed") |
|
|
|
|
@app.post("/face/search_live")
async def face_search_live(file: UploadFile = File(...), _: None = Depends(require_operator)):
    """
    Search for the face in the uploaded image across ALL active live camera feeds.

    Unlike /missing_person (which searches the enrolled face DB), this endpoint
    takes the embedding of the uploaded image and compares it directly against the
    live face embeddings currently detected by the vision engine. This is the
    correct implementation of "Search Live Feed" — the uploaded image is the query,
    not the current camera frame.

    When the ``face_live_search`` module could not be imported, the request
    falls back to the database search and reports ``search_mode`` as
    ``database_only``.

    Returns:
        The warming response while the vision stack is warming up; otherwise a
        copy of the search result with ``search_mode`` and ``image_url`` added.

    Raises:
        HTTPException: 400 for an unsupported content type or an undecodable
            image; 500 for any other failure during the search.
    """
    warming = _vision_warming_response()
    if warming is not None:
        return warming
    try:
        # Validate and decode the uploaded query image.
        if file.content_type not in ALLOWED_IMAGE_TYPES:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type")
        contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image")
        query_frame = cv2.imdecode(np.frombuffer(contents, np.uint8), cv2.IMREAD_COLOR)
        if query_frame is None:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image")

        fe = vision_engine.face_engine
        await _run_face_work(_ensure_face_db, fe)

        if face_live_search is not None:
            # get_running_loop() is the documented way to obtain the loop inside a coroutine.
            loop = asyncio.get_running_loop()
            found = await loop.run_in_executor(
                None,
                face_live_search.search_query_in_live_feeds,
                query_frame,
                fe,
                vision_engine,
                None,
            )
            search_mode = "live"
        else:
            logger.info("face_live_search unavailable, falling back to database search")
            found = await _run_face_work(vision_engine.search_missing_person, query_frame)
            search_mode = "database_only"

        # Copy before annotating so a dict owned by the search layer is never mutated.
        result = dict(found)
        result["search_mode"] = search_mode

        # Keep the uploaded query image; it is the fallback for ``image_url``.
        _, file_path = _safe_image_upload_path(file.filename or "search_query.jpg")
        with open(file_path, "wb") as f:
            f.write(contents)

        result["image_url"] = result.get("match_image") or normalize_files_url(f"/uploads/{os.path.basename(file_path)}")
        return result
    except HTTPException:
        # Deliberate HTTP errors (the 400s above) pass through unchanged.
        raise
    except Exception as e:
        # logger.exception keeps the traceback, which the plain message lacks.
        logger.exception("face/search_live error: %s", e)
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Live face search failed",
        ) from e
|
|
|
|
@app.post("/register_face")
async def register_face(
    name: str = Form(...),
    role: str = Form(default="Staff"),
    file: UploadFile = File(...),
    _: dict = Depends(require_admin_audited("register_face")),
):
    """
    Register a named face from an uploaded image into the face database.

    The role is normalised once (stripped, falling back to
    ``face_metadata.DEFAULT_ROLE`` when blank) so that the value written to the
    person's metadata is the same value reported back to the caller.

    Returns:
        ``{"success": ..., "name": <sanitised name>, "role": <normalised role>}``.

    Raises:
        HTTPException: 400 for an unsupported or undecodable image, 422 when the
            face engine reports that registration did not succeed, 500 for any
            other failure.
    """
    try:
        if file.content_type not in ALLOWED_IMAGE_TYPES:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Unsupported image type")
        cleaned_name = _sanitize_person_name(name)
        normalized_role = role.strip() or face_metadata.DEFAULT_ROLE
        contents = _read_limited_bytes(await file.read(MAX_IMAGE_BYTES + 1), MAX_IMAGE_BYTES, "Image")
        frame = cv2.imdecode(np.frombuffer(contents, np.uint8), cv2.IMREAD_COLOR)
        if frame is None:
            raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not decode image")

        fe = vision_engine.face_engine
        # Face engines expose the registration entry point under one of two names.
        if hasattr(fe, "register_from_frame"):
            register = fe.register_from_frame
        else:
            register = fe.register_face_from_frame
        ok = await _run_face_work(register, cleaned_name, frame)
        if not ok:
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail="No face detected for registration",
            )

        # Persist exactly the role that is reported in the response.
        face_metadata.write_person_metadata(cleaned_name, _FACE_DB_PATH, role=normalized_role)
        _invalidate_face_db(fe)
        return {"success": ok, "name": cleaned_name, "role": normalized_role}
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("register_face error: %s", e)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Face registration failed")
|
|
|
|
@app.post("/upload_video")
async def upload_video(file: UploadFile = File(...), _: None = Depends(require_operator)):
    """
    Upload a video file to be used as a camera source.

    The payload is read and size-checked *before* the destination file is
    created, so a rejected (oversized) upload no longer leaves an empty file in
    the uploads directory.

    Returns:
        ``{"success": True, "file_path": <url>, "url": <url>}`` where the URL is
        ``/files/uploads/<stored name>``.

    Raises:
        HTTPException: whatever the path/size helpers raise is passed through;
            any other failure becomes a 500.
    """
    try:
        _, file_path = _safe_upload_path(file.filename)
        # Read one byte past the limit so the helper can detect an oversized body.
        payload = _read_limited_bytes(await file.read(MAX_VIDEO_BYTES + 1), MAX_VIDEO_BYTES, "Video")
        with open(file_path, "wb") as f:
            f.write(payload)

        rel_name = os.path.basename(file_path)
        rel_url = f"/files/uploads/{rel_name}"
        logger.info("Video uploaded: %s", rel_url)
        return {"success": True, "file_path": rel_url, "url": rel_url}
    except HTTPException:
        raise
    except Exception as e:
        logger.exception("upload_video error: %s", e)
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Video upload failed")
|
|
|
|
| |
| |
| |
|
|
| WS_HEARTBEAT_INTERVAL_S = 20.0 |
| WS_RECEIVE_TIMEOUT_S = float(os.getenv("CEPHEUS_WS_RECEIVE_TIMEOUT", "300") or "300") |
|
|
|
|
async def _ws_send_heartbeat(websocket: WebSocket) -> None:
    """Keep HF/nginx proxy from killing idle WebSocket connections.

    Sends ``{"type": "ping", "ts": <epoch seconds>}`` every
    ``WS_HEARTBEAT_INTERVAL_S`` seconds until the task is cancelled or a send
    fails. A failed send ends the heartbeat quietly (best effort); the reason is
    logged at debug level instead of being discarded.
    """
    try:
        while True:
            await asyncio.sleep(WS_HEARTBEAT_INTERVAL_S)
            await manager.send_personal_message(
                json.dumps({"type": "ping", "ts": time.time()}),
                websocket,
            )
    except Exception as exc:
        # Deliberately non-fatal: the owning endpoint handles the actual disconnect.
        logger.debug("WS heartbeat stopped: %s", exc)
|
|
|
|
def _vision_set_camera_for_client(client_id: str, cam_id: str, index) -> bool:
    """Bind camera slot *cam_id* to source *index* on behalf of *client_id*.

    Uses the engine's per-client ``set_camera_for_client`` when the engine
    provides it; otherwise falls back to the engine-wide ``set_camera`` (which
    does not receive the client id). Returns whatever the engine call returns.
    """
    try:
        per_client_setter = vision_engine.set_camera_for_client
    except AttributeError:
        return vision_engine.set_camera(cam_id, index)
    return per_client_setter(client_id, cam_id, index)
|
|
|
|
def _vision_release_camera_for_client(client_id: str, cam_id: str) -> None:
    """Release camera slot *cam_id* held by *client_id*.

    Prefers the engine's per-client ``release_camera_for_client``; engines
    without it get the engine-wide ``release_camera`` call instead.
    """
    try:
        per_client_release = vision_engine.release_camera_for_client
    except AttributeError:
        vision_engine.release_camera(cam_id)
        return
    per_client_release(client_id, cam_id)
|
|
|
|
def _vision_release_client_cameras(client_id: str) -> None:
    """Release every camera held by *client_id*, if the engine supports it.

    Engines without a ``release_client_cameras`` method are left untouched.
    """
    try:
        release_all = vision_engine.release_client_cameras
    except AttributeError:
        return
    release_all(client_id)
|
|
# Camera ids that currently have a "track_frame" job in flight. The /ws handler
# adds an id before spawning the job and the job discards it when done, so
# frames arriving for a busy camera are dropped rather than queued.
_active_track_frames: set[str] = set()


def _process_frame_sync(frame_bytes: bytes) -> dict:
    """
    Decode an encoded image and run face matching on it (thread-pool worker).

    Pure sync. No asyncio. No app.prepare().

    Returns a ``{"type": "face_results", "faces": [...]}`` message. Each face
    carries ``name``, ``confidence`` (rounded to 3 decimals), ``bbox`` and
    ``found``; an undecodable image yields an empty ``faces`` list.
    """
    decoded = cv2.imdecode(np.frombuffer(frame_bytes, np.uint8), cv2.IMREAD_COLOR)
    if decoded is None:
        return {"type": "face_results", "faces": []}

    hits = vision_engine.face_engine.match_all_faces(decoded)
    faces = [
        {
            "name": hit.get("name", "Unknown"),
            "confidence": round(float(hit.get("confidence", 0.0)), 3),
            "bbox": hit.get("bbox", [0, 0, 0, 0]),
            "found": bool(hit.get("found", True)),
        }
        for hit in hits
    ]
    return {"type": "face_results", "faces": faces}
|
|
|
|
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """
    Main dashboard WebSocket: keep-alive, browser frames, camera control,
    signage control and SOS.

    After authentication the connection is registered with ``vision_session``
    and the connection manager, a heartbeat task is started, and the current
    signage state is sent as ``signage_init``. Incoming JSON messages are then
    dispatched on their ``type``:

    * ``pong`` – ignored; ``ping`` – answered with ``pong``.
    * ``track_frame`` – hands ``cam_id``/``image`` to
      ``vision_engine.process_browser_frame`` in the background; a frame for a
      camera that is still busy is dropped.
    * ``frame`` – decodes base64 ``data``, runs face matching in the face
      executor and replies with the ``face_results`` message.
    * ``select_camera`` / ``close_camera`` – claim/open or release a camera
      (requires mutate permission); answered with ``camera_ack``.
    * ``set_signage`` / ``toggle_signage`` – set or flip a signage panel
      (requires mutate permission).
    * ``sos`` – rate limited; records the event, raises a critical alert and
      broadcasts ``sos_event``.

    The initial ``signage_init`` send now happens inside the guarded section so
    that a failure there still cancels the heartbeat and releases the client's
    cameras, session and manager slot. Raw ``frame`` payloads (base64 images)
    are no longer written to the INFO log.
    """
    if not _ws_auth_ok(websocket):
        await websocket.close(code=1008, reason="Unauthorized")
        return
    ws_principal = _ws_principal(websocket)
    client_id = str(uuid.uuid4())
    vision_session.register_client(client_id)
    try:
        await manager.connect(websocket)
    except Exception:
        # Undo the session registration if the socket could not be attached.
        vision_session.release_client(client_id)
        raise
    heartbeat_task = asyncio.create_task(_ws_send_heartbeat(websocket))

    async def _send(payload: dict) -> None:
        """Send one JSON message to this client only."""
        await manager.send_personal_message(json.dumps(payload), websocket)

    async def _process_track_frame(cid: str, img: str) -> None:
        """Run one browser frame through the vision engine, then free the camera slot."""
        try:
            await _run_face_work(vision_engine.process_browser_frame, cid, img)
        finally:
            _active_track_frames.discard(cid)

    async def _open_camera(ws, cid, idx, owner_id):
        """Open a camera in the camera executor and acknowledge the outcome."""
        try:
            loop = asyncio.get_running_loop()
            success = await loop.run_in_executor(
                _CAMERA_EXECUTOR, _vision_set_camera_for_client, owner_id, cid, idx
            )
            await manager.send_personal_message(json.dumps({
                "type": "camera_ack",
                "cam_id": cid,
                "index": idx,
                "success": success,
            }), ws)
            log_msg = f"Camera {cid} → index {idx}: {'OK' if success else 'FAILED'}"
            logger.info(log_msg)
        except Exception as e:
            logger.error(f"Camera open task error for {cid}: {e}")

    try:
        # Initial state for digital signage panels.
        await _send({"type": "signage_init", "all": signage_state})

        while True:
            try:
                data = await asyncio.wait_for(
                    websocket.receive_text(),
                    timeout=WS_RECEIVE_TIMEOUT_S,
                )
            except asyncio.TimeoutError:
                logger.debug("WS receive timeout (no client message) — keeping connection %s", client_id[:8])
                continue
            try:
                msg = json.loads(data)
                mtype = msg.get("type")

                if mtype == "pong":
                    continue

                if mtype == "ping":
                    await _send({"type": "pong", "ts": msg.get("ts", time.time())})
                    continue

                # Frame messages carry whole base64 images; never dump them into the log.
                if mtype not in ("track_frame", "frame"):
                    logger.info("Received from client: %s", data)

                if mtype == "track_frame":
                    cam_id = msg.get("cam_id")
                    image_b64 = msg.get("image")
                    if cam_id and image_b64:
                        if cam_id in _active_track_frames:
                            # Previous frame for this camera is still being processed.
                            continue
                        _active_track_frames.add(cam_id)
                        _spawn(_process_track_frame(cam_id, image_b64))
                    continue

                if mtype == "frame":
                    frame_b64 = msg.get("data", "")
                    if not frame_b64:
                        continue
                    try:
                        # Accept both data URLs ("data:...;base64,<payload>") and raw base64.
                        encoded = frame_b64.split(",")[1] if "," in frame_b64 else frame_b64
                        frame_bytes = base64.b64decode(encoded)
                        loop = asyncio.get_running_loop()
                        result = await loop.run_in_executor(
                            _FACE_EXECUTOR,
                            _process_frame_sync,
                            frame_bytes,
                        )
                        await _send(result)
                    except Exception as exc:
                        logger.error("Frame processing error: %s", exc)
                    continue

                if mtype == "select_camera":
                    if not _ws_can_mutate(ws_principal):
                        await _send({
                            "type": "error",
                            "msg": "Insufficient permissions for camera control",
                        })
                        continue
                    cam_id = msg.get("cam_id")
                    index = msg.get("index")
                    if cam_id is not None and index is not None:
                        ok_claim, claim_reason = vision_session.claim_camera(client_id, cam_id, index)
                        if not ok_claim:
                            await _send({
                                "type": "camera_ack",
                                "cam_id": cam_id,
                                "index": index,
                                "success": False,
                                "reason": claim_reason,
                            })
                            continue
                        if claim_reason == "duplicate_ignored":
                            await _send({
                                "type": "camera_ack",
                                "cam_id": cam_id,
                                "index": index,
                                "success": True,
                                "duplicate": True,
                            })
                            continue
                        # Opening a device can block; do it off the event loop and ack later.
                        _spawn(_open_camera(websocket, cam_id, index, client_id))

                elif mtype == "close_camera":
                    if not _ws_can_mutate(ws_principal):
                        await _send({
                            "type": "error",
                            "msg": "Insufficient permissions for camera control",
                        })
                        continue
                    cam_id = msg.get("cam_id")
                    if cam_id:
                        _vision_release_camera_for_client(client_id, cam_id)
                        logger.info(f"Released camera {cam_id} via WS request (client {client_id[:8]})")
                        await _send({
                            "type": "camera_ack",
                            "cam_id": cam_id,
                            "index": None,
                            "success": True,
                        })

                elif mtype in ("set_signage", "toggle_signage"):
                    if not _ws_can_mutate(ws_principal):
                        await _send({
                            "type": "error",
                            "msg": "Insufficient permissions for signage control",
                        })
                        continue
                    sign_id = msg.get("id")
                    if sign_id:
                        if "active" in msg:
                            await _broadcast_signage_state(sign_id, bool(msg["active"]))
                        else:
                            # No explicit state given: flip the current one.
                            current = signage_state.get(sign_id, False)
                            await _broadcast_signage_state(sign_id, not current)

                elif mtype == "sos":
                    try:
                        sos_limiter.check_ws(websocket, "sos")
                    except HTTPException:
                        await _send({
                            "type": "error",
                            "msg": "SOS rate limit exceeded",
                        })
                        continue
                    event = {
                        "id": str(uuid.uuid4()),
                        "guest_id": msg.get("guest_id", "unknown"),
                        "lat": msg.get("lat", 0),
                        "lng": msg.get("lng", 0),
                        "location_label": msg.get("location", "Unknown"),
                        "message": msg.get("message", "SOS"),
                        "timestamp": datetime.datetime.now().isoformat(),
                    }
                    async with store_locks.sos_lock:
                        sos_events.append(event)
                        _trim_memory_list(sos_events, 100)
                        save_sos()
                    alert = Alert(
                        id=event["id"],
                        type="sos",
                        location=event["location_label"],
                        message=event["message"],
                        severity="critical",
                        lat=event["lat"],
                        lng=event["lng"],
                    )
                    await _process_new_alert(
                        alert,
                        source="GUEST_APP",
                        extra_context={"guest_id": event["guest_id"]},
                        sos_mode=True,
                    )
                    await manager.broadcast(json.dumps({"type": "sos_event", **event}))

            except Exception as e:
                # One bad message must not end the session: report and keep reading.
                logger.error(f"Error handling WS message: {e}")
                try:
                    await _send({"type": "error", "msg": "Request failed"})
                except Exception:
                    pass

    except WebSocketDisconnect:
        logger.info("Client disconnected.")
    except Exception as e:
        logger.error(f"WS error: {e}")
    finally:
        heartbeat_task.cancel()
        try:
            await heartbeat_task
        except asyncio.CancelledError:
            pass
        _vision_release_client_cameras(client_id)
        vision_session.release_client(client_id)
        manager.disconnect(websocket)
|
|
|
|
| |
| |
| |
|
|
| def _decode_copilot_image_data(image_data: str) -> tuple[bytes | None, str]: |
| """Decode a data-URL or raw base64 image from the agent copilot UI.""" |
| if not image_data or not isinstance(image_data, str): |
| return None, "image/jpeg" |
| data = image_data.strip() |
| mime = "image/jpeg" |
| if data.startswith("data:"): |
| header, _, payload = data.partition(",") |
| if ";" in header: |
| mime = header[5:].split(";")[0].strip() or mime |
| data = payload |
| try: |
| return base64.b64decode(data, validate=False), mime |
| except Exception: |
| return None, mime |
|
|
|
|
async def _vision_context_from_copilot_image(image_bytes: bytes) -> dict | None:
    """Run missing-person vision search on a copilot-attached image.

    Returns a plain-dict copy of the search result, or ``None`` when there are
    no bytes, the image cannot be decoded, the search returns nothing, or the
    search raises (the failure is logged as a warning).
    """
    if not image_bytes:
        return None
    try:
        decoded = cv2.imdecode(np.frombuffer(image_bytes, np.uint8), cv2.IMREAD_COLOR)
        if decoded is None:
            return None
        match = await _run_face_work(vision_engine.search_missing_person, decoded)
        if not match:
            return None
        return dict(match)
    except Exception as exc:
        logger.warning("Copilot image vision search failed: %s", exc)
        return None
|
|
|
|
@app.websocket("/ws/agents")
async def agents_ws(websocket: WebSocket):
    """Streaming WebSocket for the ADK multi-agent orchestrator.

    Accepts ``agent_query`` messages (``prompt``, optional ``image_data``,
    ``session_id``, ``agent_override``), streams each orchestrator step back as
    ``agent_step`` and finishes every query with ``agent_done``. ``ping`` is
    answered with ``pong``; other message types are ignored.

    Malformed input is answered with an ``error`` message and the connection is
    kept open. This now also covers JSON that is valid but not an object (for
    example ``[]``), which previously raised outside the per-message handling
    and ended the session. Vision results embedded in the prompt are serialised
    with ``default=str`` so a non-JSON-native value cannot end the session
    either.
    """
    if not _ws_auth_ok(websocket):
        await websocket.close(code=1008, reason="Unauthorized")
        return
    try:
        await manager.connect_agent(websocket)
    except Exception as exc:
        logger.warning("Agent WS accept failed: %s", exc)
        try:
            await websocket.close(code=1011, reason="Agent socket unavailable")
        except Exception:
            pass
        return
    try:
        await manager.send_personal_message(
            json.dumps({"type": "connected", "msg": "agent_ws_ready"}),
            websocket,
        )
    except Exception:
        # Best effort greeting; a dead socket is detected by the receive loop below.
        pass
    ws_principal = _ws_principal(websocket)
    ws_user_id = _operator_id(ws_principal)
    heartbeat_task = asyncio.create_task(_ws_send_heartbeat(websocket))

    async def _send(payload: dict) -> None:
        """Send one JSON message to this client only."""
        await manager.send_personal_message(json.dumps(payload), websocket)

    try:
        while True:
            try:
                raw = await asyncio.wait_for(
                    websocket.receive_text(),
                    timeout=WS_RECEIVE_TIMEOUT_S,
                )
            except asyncio.TimeoutError:
                logger.debug("Agents WS receive timeout — keeping connection")
                continue
            try:
                msg = json.loads(raw)
            except Exception:
                msg = None
            if not isinstance(msg, dict):
                # Covers both unparsable text and valid JSON that is not an object.
                await _send({"type": "error", "msg": "Invalid JSON"})
                continue

            mtype = msg.get("type")
            if mtype == "pong":
                continue

            if mtype == "ping":
                await _send({"type": "pong", "ts": msg.get("ts", time.time())})
                continue

            if mtype != "agent_query":
                continue

            prompt = (msg.get("prompt") or "").strip()
            raw_image_data = msg.get("image_data")
            if raw_image_data:
                image_bytes, image_mime = _decode_copilot_image_data(raw_image_data)
            else:
                image_bytes, image_mime = None, "image/jpeg"
            if raw_image_data and image_bytes is None:
                await _send({"type": "error", "msg": "Could not decode attached image"})
                continue

            if image_bytes and not prompt:
                prompt = "Analyze the attached image and report tactical findings."
            elif image_bytes:
                # Enrich the user's prompt with what the vision engine sees in the image.
                vision_ctx = await _vision_context_from_copilot_image(image_bytes)
                if vision_ctx:
                    prompt = (
                        f"{prompt}\n\n[Vision engine analysis of attached image: "
                        f"{json.dumps(vision_ctx, default=str)}]"
                    )
                else:
                    prompt = f"{prompt}\n\n[User attached an image for visual analysis.]"

            if not prompt:
                await _send({"type": "error", "msg": "Prompt or image required"})
                continue

            session_id = msg.get("session_id", "default")
            agent_override = (msg.get("agent_override") or "").strip() or None

            # Hand the orchestrator the current live stores before every query.
            agentic_orchestrator.inject_context(
                alerts=alerts_db,
                sos_events=sos_events,
                vision_engine=vision_engine,
                rooms=[],
                detections_history=detections_history,
                issues=issues_db,
                incident_logs=incident_logs,
                signage_state=signage_state,
            )

            try:
                async for step in agentic_orchestrator.run_agent_stream(
                    prompt,
                    session_id,
                    user_id=ws_user_id,
                    agent_override=agent_override,
                    image_data=image_bytes,
                    image_mime=image_mime,
                ):
                    await _append_agent_step(step, ws_user_id, session_id)
                    await _send({"type": "agent_step", **step})
                save_issues()
                await _send({"type": "agent_done"})
            except Exception as exc:
                logger.error(f"Agent stream error: {exc}")
                await _send({
                    "type": "agent_step",
                    "agent": "System",
                    "content": "Agent request failed",
                    "step_type": "error",
                })
                await _send({"type": "agent_done"})

    except WebSocketDisconnect:
        logger.info("Agent client disconnected.")
    except Exception as e:
        logger.error(f"Agents WS error: {e}")
    finally:
        heartbeat_task.cancel()
        try:
            await heartbeat_task
        except asyncio.CancelledError:
            pass
        manager.disconnect(websocket)
|
|
|
|
| |
| |
| |
|
|
@app.websocket("/ws/vision")
async def vision_ws(websocket: WebSocket):
    """Legacy single-frame endpoint — prefer browser track_frame + /ws camera_broadcast.

    Each text message is passed to ``vision_engine.process_frame`` and answered
    with ``{"count": ..., "frame": ...}``. Unauthenticated sockets are closed
    with code 1008.
    """
    if not _ws_auth_ok(websocket):
        await websocket.close(code=1008, reason="Unauthorized")
        return
    logger.warning("Client connected to deprecated /ws/vision — migrate to track_frame flow")
    await websocket.accept()
    try:
        while True:
            incoming = await websocket.receive_text()
            people_count, annotated_b64 = vision_engine.process_frame(incoming)
            reply = json.dumps({"count": people_count, "frame": annotated_b64})
            await websocket.send_text(reply)
    except WebSocketDisconnect:
        # Normal client shutdown; nothing to clean up.
        return
    except Exception as exc:
        logger.error(f"Vision WS error: {exc}")
|
|
|
|
| |
| |
| |
|
|
async def background_vision_task():
    """Read from all cameras every ~100ms, broadcast frames + AI results.

    Each tick:

    1. fetches ``(active_results, new_events)`` from
       ``vision_engine.get_active_frames`` in the default executor;
    2. turns every new event into an ``Alert`` processed in the background;
    3. updates the engine's crowd totals and broadcasts ``crowd_update`` when
       the total changed;
    4. records sightings of named faces and flushes them to disk at most every
       ``DETECTION_SAVE_INTERVAL_SECONDS``;
    5. broadcasts one ``camera_broadcast`` message with per-camera count, JPEG
       frame (data URL) and faces (without embeddings).

    Any exception in a tick is logged and the loop continues.

    Fixes over the previous version: face entries that are ``None`` or have no
    usable name no longer abort the whole tick in the sighting step (the payload
    step already tolerated them); both passes use the same per-tick snapshot of
    face results; a failed JPEG encode yields ``None`` instead of raising; the
    encoder helper is created once instead of on every tick.
    """
    global _detections_dirty, _last_detection_flush
    loop = asyncio.get_running_loop()

    def to_b64(frame) -> str | None:
        """Encode a frame as a JPEG data URL (quality 70); None if absent or encode fails."""
        if frame is None:
            return None
        ok, buf = cv2.imencode(".jpg", frame, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
        if not ok:
            return None
        return f"data:image/jpeg;base64,{base64.b64encode(buf).decode()}"

    def named_faces(face_hits) -> list:
        """Faces that carry a real identity (skip None entries, blank and 'Unknown' names)."""
        return [
            face for face in face_hits
            if face and face.get("name") and face["name"] != "Unknown"
        ]

    while True:
        try:
            active_results, new_events = await loop.run_in_executor(None, vision_engine.get_active_frames)

            # Alerts raised by the vision engine are processed without blocking the tick.
            for evt in new_events:
                alert = Alert(**evt)
                _spawn(_process_new_alert(alert, source="VISION_ENGINE_AI"))

            total_count = sum(count for count, _ in active_results.values())
            vision_engine.last_crowd_count = total_count
            vision_engine.last_total_count = total_count
            prev_crowd = getattr(vision_engine, "_last_broadcast_crowd", None)
            if prev_crowd != total_count:
                # Only announce crowd size when it actually changed.
                vision_engine._last_broadcast_crowd = total_count
                await manager.broadcast(json.dumps({
                    "type": "crowd_update",
                    "total": total_count,
                    "timestamp": datetime.datetime.now().isoformat(),
                }))

            cameras_payload = {}
            named_by_cam: dict = {}
            for cam_id, (count, annotated) in active_results.items():
                face_hits = vision_engine.face_results.get(cam_id, [])
                cameras_payload[cam_id] = {
                    "count": count,
                    "frame": to_b64(annotated),
                    "faces": [
                        # Embeddings are internal and not sent to clients.
                        {k: v for k, v in (face or {}).items() if k != "embedding"}
                        for face in face_hits
                    ],
                }

                recognised = named_faces(face_hits)
                named_by_cam[cam_id] = recognised
                if recognised:
                    async with store_locks.detections_lock:
                        for f in recognised:
                            _upsert_detection_sighting(
                                f["name"],
                                confidence=float(f.get("confidence", 0.0)),
                                cam_id=cam_id,
                                thumbnail=f.get("thumbnail"),
                            )
                            _detections_dirty = True

            # Throttle disk writes: persist sightings at most once per interval.
            now = datetime.datetime.now()
            if _detections_dirty and (now - _last_detection_flush).total_seconds() >= DETECTION_SAVE_INTERVAL_SECONDS:
                save_detections()
                _detections_dirty = False
                _last_detection_flush = now

            frame_detection_updates: list[dict] = []
            for cam_id, recognised in named_by_cam.items():
                for f in recognised:
                    # Prefer the stored history record; fall back to a minimal entry.
                    frame_detection_updates.append(
                        next(
                            (d for d in detections_history if d.get("name") == f["name"]),
                            {
                                "name": f["name"],
                                "camId": cam_id,
                                "confidence": f.get("confidence"),
                                "seen_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
                            },
                        )
                    )

            broadcast_data = {
                "type": "camera_broadcast",
                "total_count": total_count,
                "cameras": cameras_payload,
                "active_cam_count": len(active_results),
                "detection_updates": frame_detection_updates,
                "timestamp": datetime.datetime.now().isoformat(),
            }
            await manager.broadcast(json.dumps(broadcast_data))

        except Exception as e:
            logger.error(f"Background vision task error: {e}")

        await asyncio.sleep(0.1)
|
|
|
|
def _schedule_coro(coro) -> None:
    """Schedule a coroutine on the main event loop from any thread (agent tools run in a worker thread).

    Uses the loop stored on ``app.state.loop`` when it is set; otherwise falls
    back to ``_spawn``. Scheduling failures are logged as warnings and never
    raised. When scheduling fails the coroutine is closed so it does not linger
    un-awaited (which would emit a "coroutine ... was never awaited" warning).
    """

    def _discard(unscheduled) -> None:
        # Only real coroutine objects have close(); anything else is left alone.
        close = getattr(unscheduled, "close", None)
        if callable(close):
            close()

    loop = getattr(app.state, "loop", None)
    if loop is None:
        try:
            _spawn(coro)
        except RuntimeError:
            _discard(coro)
            logger.warning("No event loop available to schedule agent broadcast")
        return
    try:
        asyncio.run_coroutine_threadsafe(coro, loop)
    except Exception as exc:
        _discard(coro)
        logger.warning("Failed to schedule agent broadcast: %s", exc)
|
|
|
|
def _on_agent_issue_created(issue: dict) -> None:
    """Persist and broadcast issues created/updated by the agent orchestrator.

    Saves the issue store first, then schedules an ``issue_update`` broadcast
    carrying *issue*.
    """
    save_issues()
    payload = json.dumps({"type": "issue_update", "issue": issue})
    _schedule_coro(manager.broadcast(payload))
|
|
|
|
def _on_agent_ai_model_changed(model_id: str, enabled: bool, all_status: dict) -> None:
    """Broadcast vision AI model changes triggered by the agent so the UI stays in sync.

    Schedules an ``ai_model_update`` message containing the changed model, its
    new enabled flag and the full status mapping.
    """
    update = {
        "type": "ai_model_update",
        "model_id": model_id,
        "enabled": enabled,
        "all": all_status,
    }
    _schedule_coro(manager.broadcast(json.dumps(update)))
|
|
|
|
def _agent_broadcast_alert(alert_type: str, message: str, location: str, severity: str) -> dict:
    """Agent action: raise a real alert through the standard alert pipeline.

    Blank arguments fall back to type ``"info"``, empty location/message and
    severity ``"high"``. The alert is scheduled for processing with source
    ``"AGENT"`` and its ``model_dump()`` is returned.
    """
    fields = {
        "type": alert_type or "info",
        "location": location or "",
        "message": message or "",
        "severity": severity or "high",
    }
    alert = Alert(**fields)
    _schedule_coro(_process_new_alert(alert, source="AGENT"))
    return alert.model_dump()
|
|
|
|
def _agent_set_signage(signage_id: str, active: bool) -> None:
    """Agent action: toggle a digital signage panel.

    *active* is coerced to ``bool`` before the state broadcast is scheduled.
    """
    desired_state = bool(active)
    _schedule_coro(_broadcast_signage_state(signage_id, desired_state))
|
|
|
|
def _agent_update_issue(issue: dict) -> None:
    """Agent action: persist + broadcast an issue the agent mutated in place.

    Saves the issue store, then schedules an ``issue_update`` broadcast for
    *issue*.
    """
    save_issues()
    message = json.dumps({"type": "issue_update", "issue": issue})
    _schedule_coro(manager.broadcast(message))
|
|
|
|
# NOTE(review): this lock is not acquired anywhere in this section of the file;
# the boolean flag below is what actually guards against overlapping warmloads.
_warmload_in_progress_lock = asyncio.Lock()
_warmload_in_progress = False


async def _safe_warmload_models() -> dict | None:
    """Run ``vision_engine.warmload_models`` in the face executor, one call at a time.

    Returns the warmload result, or ``None`` (after logging) when another
    warmload is already running. The in-progress flag is always cleared when
    the call finishes, whether it succeeded or raised.
    """
    global _warmload_in_progress
    if _warmload_in_progress:
        logger.info("Warmload already in progress — skipping concurrent call.")
        return None
    _warmload_in_progress = True
    try:
        running_loop = asyncio.get_running_loop()
        warmload_future = running_loop.run_in_executor(_FACE_EXECUTOR, vision_engine.warmload_models)
        return await warmload_future
    finally:
        _warmload_in_progress = False
|
|
async def _run_vision_warmload() -> None:
    """Background model warmload — idempotent, safe to call from startup or /health/live.

    Does nothing when ``mark_warmload_started()`` reports the warmload was
    already started. Otherwise logs the detected acceleration, runs the
    warmload, and records the outcome via ``mark_warmload_complete`` /
    ``mark_warmload_failed``; ``_warmload_complete`` is set on success.
    Exceptions are logged, never raised.
    """
    global _warmload_complete
    if not mark_warmload_started():
        return
    accel = detect_acceleration()
    logger.info(
        "Vision warmload starting (full_vision=%s, provider=%s, cuda=%s).",
        use_full_vision_engine(),
        accel.get("provider"),
        accel.get("cuda_available"),
    )
    try:
        result = await _safe_warmload_models()
        if result is None:
            # Another warmload was already running, so nothing is recorded here.
            # NOTE(review): this leaves the "started" mark without a matching
            # complete/failed mark — confirm that is intended.
            return
        mark_warmload_complete(result)
        _warmload_complete = True
        logger.info("[Startup] Vision warmload complete")
    except Exception as exc:
        mark_warmload_failed(str(exc))
        logger.error("Vision warmload failed: %s", exc)
|
|
|
|
async def _keep_warm_loop() -> None:
    """CRITICAL: sleep FIRST — never run warmload at t=0.

    Only active when the full vision engine is in use. Waits (polling every 5s)
    until the initial warmload has completed, sleeps one full interval, then
    re-runs the warmload every interval. The interval comes from
    ``CEPHEUS_KEEP_WARM_SEC`` (default 120) and is never shorter than 60s; a
    malformed value now falls back to the default instead of raising and
    silently ending this background task.
    """
    if not use_full_vision_engine():
        return
    try:
        configured = int(os.getenv("CEPHEUS_KEEP_WARM_SEC", "120"))
    except ValueError:
        logger.warning("Invalid CEPHEUS_KEEP_WARM_SEC; using 120s")
        configured = 120
    interval = max(60, configured)

    # Never tick before the first warmload has finished.
    while not _warmload_complete:
        await asyncio.sleep(5)

    # Models were just loaded — wait a full interval before the first tick.
    await asyncio.sleep(interval)

    logger.info("Keep-warm loop started (interval=%ss).", interval)
    while True:
        try:
            await _safe_warmload_models()
        except Exception as exc:
            logger.debug("Keep-warm tick failed: %s", exc)
        await asyncio.sleep(interval)
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Application startup: validate config, load data, warm models, start background work.

    Steps, in order:

    1. remember the running event loop on ``app.state.loop`` (used by
       ``_schedule_coro`` to schedule work from worker threads);
    2. validate security configuration and production users, load persisted
       data, initialise the refresh-token store;
    3. in production mode, log critical warnings for a missing JWT secret or an
       enabled auth dev mode;
    4. run the vision warmload, then spawn the keep-warm loop and (outside
       cloud mode) the camera broadcast loop;
    5. start gossip tracking unless disabled, seed it with every enrolled
       person directory, and pick a root person;
    6. inject live stores and callbacks into the agentic orchestrator.

    The enrolled-face scan now requires the face DB path to be a directory, so
    a stray file at that path no longer aborts startup.
    """
    app.state.loop = asyncio.get_running_loop()
    security_config.validate_startup()
    auth_service.validate_production_users()
    load_all_data()
    auth_service.init_refresh_store()
    if os.getenv("CEPHEUS_PRODUCTION", "").strip() == "1":
        if not os.getenv("CEPHEUS_JWT_SECRET", "").strip():
            logger.critical("CEPHEUS_PRODUCTION=1 but CEPHEUS_JWT_SECRET is missing")
        if os.getenv("CEPHEUS_AUTH_DEV_MODE", "").strip() == "1":
            logger.critical("CEPHEUS_AUTH_DEV_MODE must be 0 in production")
    logger.info("Application started. Persistent data loaded. Refresh store: %s", auth_service.refresh_store_backend())
    logger.info("Face inference pool: %s workers (OMP_NUM_THREADS=%s)", _FACE_WORKERS, os.getenv("OMP_NUM_THREADS", "default"))

    await _run_vision_warmload()
    _spawn(_keep_warm_loop())
    if not CEPHEUS_CLOUD:
        # The camera read/broadcast loop only runs outside cloud mode.
        _spawn(background_vision_task())

    if os.getenv("CEPHEUS_GOSSIP_AUTO_START", "1").strip().lower() not in ("0", "false", "no", "off"):
        gossip_bridge.start_tracking()
    gossip_root = os.getenv("CEPHEUS_GOSSIP_ROOT", "").strip()
    enrolled_names: list[str] = []
    if os.path.isdir(_FACE_DB_PATH):
        for person_name in os.listdir(_FACE_DB_PATH):
            # "unknown_*" entries are skipped, not treated as enrolled people.
            if person_name.startswith("unknown_"):
                continue
            person_dir = os.path.join(_FACE_DB_PATH, person_name)
            if os.path.isdir(person_dir):
                gossip_bridge.seed_known_person(person_name)
                enrolled_names.append(person_name)
    if gossip_root:
        gossip_bridge.set_root_person(gossip_root)
    elif enrolled_names:
        # No explicit root configured: use the alphabetically first enrolled person.
        gossip_bridge.set_root_person(min(enrolled_names))

    if CEPHEUS_CLOUD and use_full_vision_engine():
        accel = detect_acceleration()
        mode = (
            "cloud full vision (GPU, no local camera broadcast loop)"
            if accel.get("cuda_available")
            else "cloud full vision (CPU, no local camera broadcast loop)"
        )
    elif CEPHEUS_CLOUD:
        mode = "cloud stub (no ML / cameras)"
    else:
        mode = "full vision (local)"
    logger.info("Gossip bridge ready. Root: %s. Mode: %s", gossip_bridge.get_gossip_json().get("root_person"), mode)

    agentic_orchestrator.inject_context(
        alerts=alerts_db,
        sos_events=sos_events,
        vision_engine=vision_engine,
        rooms=[],
        detections_history=detections_history,
        issues=issues_db,
        incident_logs=incident_logs,
        signage_state=signage_state,
    )
    # The orchestrator (or its stub) may not implement every hook; register what exists.
    if hasattr(agentic_orchestrator, "set_issue_created_callback"):
        agentic_orchestrator.set_issue_created_callback(_on_agent_issue_created)
    if hasattr(agentic_orchestrator, "set_ai_model_changed_callback"):
        agentic_orchestrator.set_ai_model_changed_callback(_on_agent_ai_model_changed)
    if hasattr(agentic_orchestrator, "set_action_handlers"):
        agentic_orchestrator.set_action_handlers(
            broadcast_alert=_agent_broadcast_alert,
            set_signage=_agent_set_signage,
            update_issue=_agent_update_issue,
        )
    logger.info("Vision AI defaults: %s", vision_engine.get_ai_status())
    _orch_mode = "ADK multi-agent" if getattr(agentic_orchestrator, "_ADK_AVAILABLE", False) else "native google.genai"
    logger.info("Agentic orchestrator ready (%s).", _orch_mode)
|
|
|
|
if __name__ == "__main__":
    # Direct execution: serve on all interfaces. PORT wins over UVICORN_PORT,
    # and 8000 is used when neither is set.
    import uvicorn

    _fallback_port = os.environ.get("UVICORN_PORT", "8000")
    _port = int(os.environ.get("PORT", _fallback_port))
    uvicorn.run(app, host="0.0.0.0", port=_port)
|
|