| """ |
| agentic_orchestrator.py |
| Cepheus Multi-Agent System — Google ADK 1.32.0 |
| |
| Agents: |
| AlertResponseAgent, LocationAgent, PlanningAgent, EmergencyDispatchAgent, |
| MissingPersonAgent, GossipGraphAgent, OrchestratorAgent |
| |
| Usage: |
| from agentic_orchestrator import inject_context, run_agent_stream |
| |
| """ |
| from __future__ import annotations |
| import base64 |
| import requests |
| from geopy.distance import geodesic |
| import asyncio |
| import logging |
| import os |
| import time |
| import sys |
| import uuid |
| import json |
| import datetime |
| import re |
| import threading |
| from typing import AsyncGenerator, Any |
|
|
| |
# Put the sibling "Face_Recognition" directory at the front of sys.path so the
# modules inside it can be imported by bare name further down.
_FR_DIR = os.path.join(os.path.dirname(__file__), "Face_Recognition")
if sys.path.count(_FR_DIR) == 0:
    sys.path.insert(0, _FR_DIR)
|
|
| import gossip_bridge |
| from gemini_config import generate_with_fallback, get_model, is_rate_limit |
|
|
logger = logging.getLogger(__name__)


# Prefer the real Google ADK classes. If importing them fails for any reason,
# record that in _ADK_AVAILABLE and define inert placeholders under the same
# names so later references to Agent / Runner / InMemorySessionService resolve.
try:
    from google.adk.agents import Agent
    from google.adk.runners import Runner
    from google.adk.sessions import InMemorySessionService
except Exception as _adk_err:
    logger.warning("google-adk unavailable (%s) — using native google.genai orchestrator.", _adk_err)
    _ADK_AVAILABLE = False

    class Agent:
        """Placeholder: stores every keyword argument as an instance attribute."""

        def __init__(self, **kwargs):
            vars(self).update(kwargs)

    class Runner:
        """Placeholder: stores every keyword argument as an instance attribute."""

        def __init__(self, **kwargs):
            vars(self).update(kwargs)

    class InMemorySessionService:
        """Placeholder session service whose lookups and creations yield None."""

        async def get_session(self, **_kwargs):
            return None

        async def create_session(self, **_kwargs):
            return None
else:
    _ADK_AVAILABLE = True


# google.genai types are optional too; None signals "not importable".
try:
    from google.genai import types as genai_types
except Exception:
    genai_types = None
|
|
| |
# Shared platform state read by the tool functions; filled in by inject_context().
_ctx: dict[str, Any] = {
    "alerts": [],
    "sos_events": [],
    "vision_engine": None,
    "rooms": [],
    "detections_history": [],
    "issues": [],
    "incident_logs": [],
    "signage_state": {},
}

# Hooks registered through set_issue_created_callback / set_ai_model_changed_callback.
_issue_created_cb = None
_ai_model_changed_cb = None

# Issue de-duplication: creation time per "<subject>|<category>" key, the
# cooldown window in seconds, and the lock guarding issue creation.
_ISSUE_DEDUP_COOLDOWN_SEC = 300
_recent_issue_keys: dict[str, float] = {}
_issue_create_lock = threading.Lock()

# Regexes (matched against lower-cased operator text) that count as an explicit
# request to raise an issue.
_RAISE_ISSUE_INTENT_PATTERNS = (
    r"\braise\s+(an?\s+)?issue\b",
    r"\bcreate\s+(an?\s+)?issue\b",
    r"\bopen\s+(an?\s+)?incident\b",
    r"\blog\s+(an?\s+)?incident\b",
    r"\bfile\s+(an?\s+)?(incident|report)\b",
    r"\bregister\s+(an?\s+)?incident\b",
    r"\bstart\s+(an?\s+)?issue\b",
    r"\btrack\s+as\s+(an?\s+)?issue\b",
)

# Platform action handlers registered through set_action_handlers().
_action_handlers: dict[str, Any] = {}
|
|
|
|
def set_issue_created_callback(callback):
    """Register the callback invoked when the agent creates an issue.

    The callable is stored in the module-level ``_issue_created_cb`` slot
    (main.py wires its broadcast here). Passing None clears the slot.
    """
    global _issue_created_cb
    _issue_created_cb = callback
|
|
|
|
def set_ai_model_changed_callback(callback):
    """Register the callback invoked when the agent toggles a vision AI model.

    The callable is stored in the module-level ``_ai_model_changed_cb`` slot
    (main.py wires its broadcast here). Passing None clears the slot.
    """
    global _ai_model_changed_cb
    _ai_model_changed_cb = callback
|
|
|
|
def set_action_handlers(**handlers):
    """Register high-level platform action handlers from main.py.

    Each keyword names an action (e.g. broadcast_alert, set_signage,
    update_issue). Falsy values are ignored, so an already-registered handler
    is never replaced by None.
    """
    usable = {}
    for action_name, handler in handlers.items():
        if handler:
            usable[action_name] = handler
    _action_handlers.update(usable)
|
|
|
|
def inject_context(alerts, sos_events, vision_engine, rooms, detections_history,
                   issues=None, incident_logs=None, signage_state=None):
    """Called by main.py to give agents access to live platform data.

    ``rooms`` is used when truthy; otherwise the values of
    ``vision_engine.room_stats`` are used when that attribute exists, else an
    empty list. ``issues``, ``incident_logs`` and ``signage_state`` overwrite
    the stored value only when they are not None.
    """
    _ctx.update(alerts=alerts, sos_events=sos_events, vision_engine=vision_engine)

    if rooms:
        room_list = rooms
    elif vision_engine is not None and hasattr(vision_engine, "room_stats"):
        room_list = list(getattr(vision_engine, "room_stats", {}).values())
    else:
        room_list = []
    _ctx["rooms"] = room_list
    _ctx["detections_history"] = detections_history

    optional_updates = (
        ("issues", issues),
        ("incident_logs", incident_logs),
        ("signage_state", signage_state),
    )
    for key, value in optional_updates:
        if value is not None:
            _ctx[key] = value
|
|
|
|
def prompt_requests_issue_creation(prompt: str) -> bool:
    """True only when the operator explicitly asked to raise/create/log an issue.

    The prompt is stripped and lower-cased, then searched with each regex in
    ``_RAISE_ISSUE_INTENT_PATTERNS``. An empty or None prompt yields False.
    """
    normalized = (prompt or "").strip().lower()
    if not normalized:
        return False
    for pattern in _RAISE_ISSUE_INTENT_PATTERNS:
        if re.search(pattern, normalized):
            return True
    return False
|
|
|
|
| |
| |
| |
|
|
def get_current_alerts() -> dict:
    """Retrieve the 10 most recent active crisis alerts from the system.

    Only the last 50 stored alerts are considered. Entries exposing
    ``model_dump()`` are converted with it, dicts are used as-is, anything else
    is skipped. Alerts whose status (case-insensitive) is RESOLVED, CLOSED,
    CANCELLED, DISMISSED or CLEARED are excluded.

    Returns:
        ``{"count": int, "alerts": list}`` with at most 10 alerts, listed
        oldest to newest (the same relative order as the stored list).
    """
    closed = {"RESOLVED", "CLOSED", "CANCELLED", "DISMISSED", "CLEARED"}
    window = (_ctx.get("alerts") or [])[-50:]
    recent = []
    # Walk newest-first so the 10-item cap keeps the most recent alerts; a
    # forward walk would stop after the 10 oldest entries of the window.
    for entry in reversed(window):
        if hasattr(entry, "model_dump"):
            item = entry.model_dump()
        elif isinstance(entry, dict):
            item = entry
        else:
            continue
        status = str(item.get("status", "")).upper()
        if status in closed:
            continue
        recent.append(item)
        if len(recent) >= 10:
            break
    recent.reverse()  # back to chronological order
    return {"count": len(recent), "alerts": recent}
|
|
|
|
def get_crowd_statistics() -> dict:
    """Get current crowd statistics: total count and per-room occupancy.

    The total is the vision engine's ``last_total_count`` (0 when there is no
    engine or no such attribute). Rooms may be dicts or attribute-style
    objects; each is reduced to id / label / occupancy / capacity.
    """
    engine = _ctx["vision_engine"]
    total = getattr(engine, "last_total_count", 0) if engine else 0

    room_summaries = [
        {
            "id": room.get("id"),
            "label": room.get("label"),
            "occupancy": room.get("occupancy", 0),
            "capacity": room.get("capacity", 0),
        }
        if isinstance(room, dict)
        else {
            "id": getattr(room, "id", ""),
            "label": getattr(room, "label", ""),
            "occupancy": getattr(room, "occupancy", 0),
            "capacity": getattr(room, "capacity", 0),
        }
        for room in _ctx["rooms"]
    ]
    return {"total_crowd_count": total, "rooms": room_summaries}
|
|
|
|
def get_known_persons() -> dict:
    """
    Return the persisted sighting history (last recorded location per person).

    WARNING: This is NOT live camera data. Entries may be from earlier in the
    session or from before a reset. For who is on camera RIGHT NOW, always use
    get_live_camera_faces or who_is_on_camera_now instead.
    """
    history = _ctx.get("detections_history", [])
    result = {"count": len(history), "persons": history}
    # Explicit markers so a consumer cannot mistake this for live presence data.
    result["data_type"] = "SIGHTING_HISTORY"
    result["note"] = "Historical records — NOT live. Use get_live_camera_faces for active detections."
    result["must_not_use_for_live_presence"] = True
    return result
|
|
|
|
def format_active_camera_report(feeds: dict) -> str:
    """Deterministic one-line report derived only from get_active_camera_feeds output.

    Uses ``feeds["count"]`` for the number and the ``cam_id`` of every entry in
    ``feeds["cameras"]`` for the list ("?" when an entry has no cam_id).
    """
    total = int(feeds.get("count", 0) or 0)
    if total > 0:
        labels = ", ".join(str(cam.get("cam_id", "?")) for cam in feeds.get("cameras", []))
        return f"{total} active camera feed(s): {labels}."
    return "0 active camera feeds."
|
|
|
|
def report_active_cameras_from_tool() -> dict:
    """Authoritative camera count for agents — always mirrors get_active_camera_feeds.

    Returns the count, the camera list, a one-line report built by
    format_active_camera_report, and the name of the source tool.
    """
    snapshot = get_active_camera_feeds()
    return {
        "count": int(snapshot.get("count", 0) or 0),
        "cameras": snapshot.get("cameras", []),
        "report": format_active_camera_report(snapshot),
        "source_tool": "get_active_camera_feeds",
    }
|
|
|
|
def who_is_on_camera_now() -> dict:
    """Authoritative live-presence answer — never uses sighting history.

    Wraps get_live_camera_faces and exposes its ``live_persons`` list together
    with a count and provenance markers.
    """
    visible = get_live_camera_faces().get("live_persons", [])
    return {
        "count": len(visible),
        "persons": visible,
        "data_type": "LIVE",
        "source_tool": "get_live_camera_faces",
        "note": "Live camera detections only — not enrolled roster or sighting history.",
    }
|
|
|
|
def _collect_live_faces() -> tuple[dict[str, list], list[dict]]:
    """Return (per_camera_faces, flat_live_person_list) from face_results only.

    Faces whose name is missing or is one of the placeholders "unknown",
    "unidentified", "none" or "" (case-insensitive) are dropped. Cameras left
    with no named face are omitted from the per-camera mapping.
    """
    engine = _ctx.get("vision_engine")
    by_camera: dict[str, list] = {}
    flat: list[dict] = []
    if engine is None or not hasattr(engine, "face_results"):
        return by_camera, flat

    placeholder_names = ("unknown", "unidentified", "none", "")
    for cam_id, faces in (engine.face_results or {}).items():
        named = []
        for face in faces or []:
            if str(face.get("name", "")).lower() in placeholder_names:
                continue
            named.append(face)
        if not named:
            continue
        by_camera[cam_id] = named
        for face in named:
            flat.append({
                "name": face.get("name"),
                "confidence": face.get("confidence"),
                "camera": cam_id,
                "status": "LIVE_DETECTED",
            })
    return by_camera, flat
|
|
|
|
def get_live_camera_faces() -> dict:
    """
    Return ONLY persons actively visible on live camera feeds right now.

    This is the authoritative source for 'how many faces can you see', 'who is
    on camera now', and any question that asks for live/active (not historical)
    detections. Each person has status LIVE_DETECTED.
    """
    faces_by_camera, live_people = _collect_live_faces()
    feed_info = get_active_camera_feeds()

    camera_summary = {}
    for cam_id, faces in faces_by_camera.items():
        camera_summary[cam_id] = {"face_count": len(faces), "faces": faces}

    return {
        "total_live_faces": len(live_people),
        "live_persons": live_people,
        "cameras": camera_summary,
        "active_feeds": feed_info.get("cameras", []),
        "active_feed_count": feed_info.get("count", 0),
        "data_type": "LIVE",
    }
|
|
|
|
def classify_alert_severity(alert_type: str, location: str, crowd_count: int) -> dict:
    """
    Classify the severity and impact of a crisis alert.

    Args:
        alert_type: Type of alert e.g. fire, stampede, medical, sos, fight
        location: Zone or room where the alert originated
        crowd_count: Estimated number of people in the affected area

    Returns:
        Dict with severity (CRITICAL / HIGH / MEDIUM), the inputs echoed back,
        whether evacuation is needed (CRITICAL only) and a recommended action.
        Unrecognised or missing alert types are classified as MEDIUM.
    """
    severity_map = {"fire": "CRITICAL", "stampede": "CRITICAL", "sos": "CRITICAL",
                    "medical": "HIGH", "fight": "HIGH", "anomaly": "MEDIUM"}
    # Tolerate None and padded input (" Fire ") instead of crashing or silently
    # downgrading a critical alert to MEDIUM.
    normalized_type = (alert_type or "").strip().lower()
    severity = severity_map.get(normalized_type, "MEDIUM")
    is_critical = severity == "CRITICAL"
    return {
        "severity": severity,
        "alert_type": alert_type,
        "location": location,
        "evacuation_needed": is_critical,
        "estimated_affected": crowd_count,
        "recommended_action": (
            f"Immediate evacuation of {location}" if is_critical else f"Monitor and assess {location}"
        ),
    }
|
|
|
|
# Overpass (OpenStreetMap) query endpoint.
OVERPASS_URL = "https://overpass-api.de/api/interpreter"

# Supported service types mapped to the OSM "amenity" value that is searched.
SERVICE_TAGS = {"police": "police", "fire": "fire_station", "ambulance": "hospital"}

# Default search origin; overridable through environment variables.
STATION_LAT = float(os.environ.get("CEPHEUS_STATION_LAT", "12.9716"))
STATION_LON = float(os.environ.get("CEPHEUS_STATION_LON", "77.5946"))
|
|
def _osm_element_coords(element: dict) -> tuple[float, float] | None:
    """Return (lat, lon) for an Overpass element, or None when it has no position.

    The element's own lat/lon are preferred; otherwise its "center" is used.
    Values are compared against None so a legitimate 0.0 coordinate is kept.
    """
    lat, lon = element.get("lat"), element.get("lon")
    if lat is None or lon is None:
        center = element.get("center") or {}
        lat, lon = center.get("lat"), center.get("lon")
    if lat is None or lon is None:
        return None
    return lat, lon


def locate_nearest_emergency_services(
    service_type: str,
    latitude: float = STATION_LAT,
    longitude: float = STATION_LON,
    radius_meters: int = 5000,
) -> dict:
    """
    Finds the nearest real emergency service using OpenStreetMap Overpass API.

    Args:
        service_type: police | fire | ambulance
        latitude: Current latitude (e.g. 12.9716 for Bengaluru)
        longitude: Current longitude (e.g. 77.5946 for Bengaluru)
        radius_meters: Search radius in meters

    Returns:
        On success a dict with success=True plus name, distance_km, latitude,
        longitude, address, phone and osm_id of the closest match. Otherwise
        ``{"success": False, "message": ...}``; request/parse failures are
        reported this way rather than raised.
    """
    service_type = (service_type or "").strip().lower()
    if service_type not in SERVICE_TAGS:
        return {"success": False, "message": "service_type must be police, fire, or ambulance"}

    amenity = SERVICE_TAGS[service_type]
    around = f"(around:{radius_meters},{latitude},{longitude})"
    query = f"""
    [out:json];
    (
      node["amenity"="{amenity}"]{around};
      way["amenity"="{amenity}"]{around};
      relation["amenity"="{amenity}"]{around};
    );
    out center tags;
    """
    try:
        response = requests.get(OVERPASS_URL, params={"data": query}, timeout=15)
        response.raise_for_status()
        data = response.json()
    except Exception as e:  # deliberate catch-all: the failure is returned, not raised
        return {"success": False, "message": f"API Error: {str(e)}"}

    elements = data.get("elements")
    if not elements:
        return {"success": False, "message": f"No nearby {service_type} services found"}

    nearest = None
    nearest_coords = None
    nearest_distance = float("inf")
    for element in elements:
        coords = _osm_element_coords(element)
        if coords is None:
            continue
        distance = geodesic((latitude, longitude), coords).km
        if distance < nearest_distance:
            nearest_distance = distance
            nearest = element
            nearest_coords = coords

    if nearest is None:
        return {"success": False, "message": f"No valid {service_type} locations found"}

    tags = nearest.get("tags", {})
    return {
        "success": True,
        "service_type": service_type,
        "name": tags.get("name", f"Nearest {service_type.title()} Service"),
        "distance_km": round(nearest_distance, 2),
        "latitude": nearest_coords[0],
        "longitude": nearest_coords[1],
        "address": {
            "street": tags.get("addr:street"),
            "city": tags.get("addr:city"),
            "postcode": tags.get("addr:postcode"),
        },
        "phone": tags.get("phone") or tags.get("contact:phone"),
        "osm_id": nearest.get("id"),
    }
|
|
|
|
def simulate_dispatch_call(service_type: str, location: str, message: str, affected_count: int) -> dict:
    """
    Simulate an emergency dispatch coordination call. SIMULATED ONLY — no real calls placed.

    Args:
        service_type: One of: police, fire, ambulance
        location: Crisis location
        message: Brief description of the emergency
        affected_count: Number of people affected
    """
    service_key = service_type.lower()
    # Per-service canned reply and ETA; any other service type gets the generic pair.
    canned = {
        "police": (3, f"[DISPATCH] Police Control: Units dispatched to {location}. ETA 3 min. Maintain perimeter."),
        "fire": (5, f"[DISPATCH] Fire Control: Engine dispatched to {location}. {affected_count} civilians in zone. Clear evac routes. ETA 5 min."),
        "ambulance": (7, f"[DISPATCH] Medical Control: Ambulance to {location}. Prepare triage at main entrance. ETA 7 min."),
    }
    eta, reply = canned.get(service_key, (5, f"Emergency services alerted for {location}."))
    opening = f'OPERATOR: Emergency at {location}. {message}. ~{affected_count} people affected.\n'
    return {
        "dispatch_status": "ACKNOWLEDGED",
        "service": service_type,
        "location": location,
        "affected_count": affected_count,
        "call_transcript": opening + reply,
        "eta_minutes": eta,
        "simulation": True,
        "dispatch_mode": "SIMULATION",
        "note": "SIMULATION ONLY — no real call was placed",
    }
|
|
|
|
def find_nearest_emergency_services(
    lat: float,
    lng: float,
    radius_m: int = 10000,
    top_n: int = 3,
) -> dict:
    """
    Google Maps: nearest hospitals, fire stations, police, ambulances, and supply stores
    with traffic-aware ETAs. Prefer this over locate_nearest_emergency_services when
    GOOGLE_MAPS_API_KEY is configured.

    Thin wrapper: the lookup itself is done by
    ``emergency_maps_service.find_nearest_services``.
    """
    # Imported at call time, so the maps module is only loaded when this tool runs.
    from emergency_maps_service import find_nearest_services as _lookup_services

    return _lookup_services(lat, lng, radius_m=radius_m, top_n=top_n)
|
|
|
|
def get_traffic_aware_route(
    origin_lat: float,
    origin_lng: float,
    dest_lat: float,
    dest_lng: float,
    place_name: str = "",
) -> dict:
    """Google Maps driving route with live traffic ETA and turn-by-turn steps.

    Thin wrapper: the routing itself is done by
    ``emergency_maps_service.get_directions_with_traffic``.
    """
    # Imported at call time, so the maps module is only loaded when this tool runs.
    from emergency_maps_service import get_directions_with_traffic as _route

    return _route(origin_lat, origin_lng, dest_lat, dest_lng, place_name)
|
|
|
|
def recommend_emergency_dispatch(
    lat: float,
    lng: float,
    emergency_type: str,
    severity: str = "medium",
) -> dict:
    """
    Rank nearest services by emergency type and traffic-aware ETA.
    emergency_type: fire | medical | security | crowd | general

    Thin wrapper: the ranking itself is done by
    ``emergency_maps_service.recommend_emergency_dispatch``.
    """
    # Aliased on import because the helper shares this function's name.
    from emergency_maps_service import recommend_emergency_dispatch as _rank_services

    return _rank_services(lat, lng, emergency_type, severity=severity)
|
|
|
|
def generate_evacuation_plan(location: str, affected_count: int, alert_type: str) -> dict:
    """
    Generate a tactical evacuation and responder assignment plan.

    Args:
        location: Affected zone name
        affected_count: Number of people to evacuate
        alert_type: Type of emergency e.g. fire, stampede

    The result is a template (``verified`` is always False): routes come from a
    fixed per-zone table with a generic fallback for unlisted zones.
    """
    default_routes = ["Evacuate via nearest marked exit", "Assemble at muster point"]
    zone_routes = {
        "PLATFORM 1": ["Exit via North Gate (Gate A)", "Overflow to Concourse via Stairwell B"],
        "PLATFORM 2": ["Exit via South Gate (Gate D)", "Overflow to Concourse via Stairwell C"],
        "CONCOURSE": ["Exit via Main Hall Gate", "Secondary exit via East Corridor"],
    }
    zone_key = location.upper()
    chosen_routes = zone_routes[zone_key] if zone_key in zone_routes else default_routes

    # One crowd-control responder per 50 people, never fewer than two.
    crowd_control = max(2, affected_count // 50)
    # Two minutes per full hundred people, never less than three minutes.
    clearance_minutes = max(3, affected_count // 100 * 2)

    assignments = [
        {"role": "Crowd Control", "count": crowd_control, "zone": location},
        {"role": "Medical Standby", "count": 2, "zone": "Assembly Point"},
        {"role": "Communication Officer", "count": 1, "zone": "Control Room"},
    ]
    return {
        "plan_type": f"{alert_type.upper()} RESPONSE",
        "affected_zone": location,
        "affected_count": affected_count,
        "evacuation_routes": chosen_routes,
        "assembly_point": "External plaza / Station forecourt",
        "responder_assignments": assignments,
        "estimated_clearance_minutes": clearance_minutes,
        "verified": False,
        "note": "TEMPLATE PLAN — verify routes and staffing with on-site staff before execution.",
    }
|
|
|
|
def _name_matches(query: str, candidate: str) -> bool:
    """Token-aware name match — avoids substring false positives (ISSUE-042).

    Both strings are stripped and lower-cased. They match when equal, or when
    either one occurs inside the other delimited by word boundaries.
    """
    wanted = (query or "").strip().lower()
    name = (candidate or "").strip().lower()
    if not (wanted and name):
        return False
    if wanted == name:
        return True

    def _occurs_as_words(part: str, whole: str) -> bool:
        return re.search(rf"\b{re.escape(part)}\b", whole) is not None

    return _occurs_as_words(wanted, name) or _occurs_as_words(name, wanted)
|
|
|
|
def _enrolled_face_names() -> list[str]:
    """Return the list of enrolled (known) person names from the face database.

    Reads the keys of ``vision_engine.face_engine.db`` (after calling
    ``reload_db()`` when the face engine offers it), skips keys that start with
    "unknown_", and returns the rest sorted. Any lookup failure is logged and
    whatever was collected so far is returned.
    """
    engine = _ctx.get("vision_engine")
    face_engine = getattr(engine, "face_engine", None) if engine else None
    collected: set[str] = set()
    if face_engine is None:
        return sorted(collected)
    try:
        if hasattr(face_engine, "reload_db"):
            face_engine.reload_db()
        database = getattr(face_engine, "db", {}) or {}
        for key in database.keys():
            label = str(key)
            if label.startswith("unknown_"):
                continue
            collected.add(label)
    except Exception as exc:
        logger.warning("enrolled face lookup failed: %s", exc)
    return sorted(collected)
|
|
|
|
def list_enrolled_faces() -> dict:
    """
    List every person enrolled in the facial-recognition database.

    Use this to answer questions like "is <name> in the database?" before
    searching the live camera feeds. Returns the authoritative enrolled roster.
    """
    roster = _enrolled_face_names()
    return {"count": len(roster), "enrolled_persons": roster}
|
|
|
|
def _person_search_confidence(value) -> float:
    """Coerce a recognition confidence to a float rounded to 3 places (0.0 if unusable)."""
    try:
        return round(float(value), 3)
    except (TypeError, ValueError):
        return 0.0


def _person_search_profile_photo(enrolled_name: str) -> str | None:
    """Return the /files/face/... URL of the first non-occluded enrolment photo, if any."""
    try:
        # Imported lazily: the path constant lives in main.py.
        from main import _FACE_DB_PATH
        person_dir = os.path.join(_FACE_DB_PATH, enrolled_name)
        if os.path.isdir(person_dir):
            # Sorted so the same photo is chosen on every call.
            for filename in sorted(os.listdir(person_dir)):
                lowered = filename.lower()
                if lowered.endswith((".jpg", ".jpeg", ".png")) and "occluded" not in lowered:
                    return f"/files/face/{enrolled_name}/{filename}"
    except Exception as exc:
        logger.debug("Failed to locate profile photo: %s", exc)
    return None


def search_person_in_camera_feeds(person_name: str) -> dict:
    """
    Search for a specific person across the facial-recognition system.

    This performs a real, deterministic lookup against three sources:
    1. The enrolled face database (is this person known to the system?)
    2. Live per-camera face-recognition results (currently visible on a feed?)
    3. The recent detection history (recently sighted on a feed?)

    Args:
        person_name: Full or partial name of the person to locate

    Returns a structured result. `found` is True only when the person is
    LIVE_DETECTED on a camera feed right now — never for historical sightings.
    `status` is one of LIVE_DETECTED, RECENTLY_SEEN or NOT_FOUND.
    """
    query = (person_name or "").strip()

    # 1. Enrolled roster.
    enrolled_match = next(
        (name for name in _enrolled_face_names() if _name_matches(query, name)), None
    )
    is_enrolled = enrolled_match is not None

    # 2. Live per-camera recognition results.
    ve = _ctx.get("vision_engine")
    live_hits = []
    if ve is not None and hasattr(ve, "face_results"):
        try:
            for cam_id, faces in (ve.face_results or {}).items():
                for face in faces or []:
                    if _name_matches(query, str(face.get("name", ""))):
                        live_hits.append({
                            "camera": cam_id,
                            # Coerced safely so one bad value cannot abort the scan.
                            "confidence": _person_search_confidence(face.get("confidence")),
                        })
        except Exception as exc:
            logger.warning("live face_results scan failed: %s", exc)

    # 3. Sighting history (the last matching entry is treated as the latest).
    detections = _ctx.get("detections_history") or []
    history_matches = [
        d for d in detections
        if isinstance(d, dict) and _name_matches(query, str(d.get("name", "")))
    ]

    facial_ai_on = bool((getattr(ve, "ai_models", None) or {}).get("facial")) if ve else False
    profile_pic = _person_search_profile_photo(enrolled_match) if is_enrolled else None

    # NOTE(review): the original evidence strings ended in a bare label
    # ("Camera Sighting: ") with nothing after it although the image reference
    # had been looked up. Embedding it as a markdown image is an assumption
    # about the intended format — confirm against the UI renderer.
    profile_suffix = f" Profile Photo: ![Profile Photo]({profile_pic})" if profile_pic else ""

    if live_hits:
        best = max(live_hits, key=lambda h: h["confidence"])
        # Use the newest stored thumbnail, consistent with the history branch.
        thumbnail = next(
            (d.get("thumbnail") for d in reversed(history_matches) if d.get("thumbnail")), None
        )
        evidence = "Matched on live camera face-recognition feed."
        if thumbnail:
            evidence += f" Camera Sighting: ![Camera Sighting]({thumbnail})"
        evidence += profile_suffix
        return {
            "found": True,
            "person": query,
            "status": "LIVE_DETECTED",
            "is_enrolled": is_enrolled,
            "enrolled_as": enrolled_match,
            "last_seen_camera": best["camera"],
            "confidence": best["confidence"],
            "live_cameras": [h["camera"] for h in live_hits],
            "facial_ai_active": facial_ai_on,
            "evidence": evidence,
        }

    if history_matches:
        latest = history_matches[-1]
        thumbnail = latest.get("thumbnail")
        evidence = "Matched in sighting history only — not currently on a live feed."
        if thumbnail:
            evidence += f" Last Camera Sighting: ![Last Camera Sighting]({thumbnail})"
        evidence += profile_suffix
        return {
            "found": False,
            "person": query,
            "status": "RECENTLY_SEEN",
            "is_enrolled": is_enrolled,
            "enrolled_as": enrolled_match,
            "last_seen_camera": latest.get("camId") or latest.get("location") or "unknown",
            "last_seen_at": latest.get("seen_at") or latest.get("timestamp", "unknown"),
            "confidence": _person_search_confidence(latest.get("confidence")),
            "total_sightings": len(history_matches),
            "facial_ai_active": facial_ai_on,
            "evidence": evidence,
        }

    if is_enrolled:
        evidence = f"'{query}' is enrolled in the database but has no live detection or recent sighting on any camera."
        recommendation = "Person is known but not currently visible. Keep facial AI active and monitor feeds."
    else:
        evidence = f"'{query}' is not enrolled in the face database and was not detected on any camera."
        recommendation = "Enroll this person via Face Database, or verify the name/spelling."
    evidence += profile_suffix

    return {
        "found": False,
        "person": query,
        "status": "NOT_FOUND",
        "is_enrolled": is_enrolled,
        "enrolled_as": enrolled_match,
        "facial_ai_active": facial_ai_on,
        "evidence": evidence,
        "recommendation": recommendation,
    }
|
|
|
|
def set_ai_model_state(model_id: str, enabled: bool) -> dict:
    """
    Turn a vision AI model on or off across all camera feeds.

    Args:
        model_id: One of: facial, crowd, anomaly, medical, fight
        enabled: True to activate, False to deactivate

    Use this to satisfy commands like "turn on facial detection". Facial
    recognition is on by default and should stay on unless explicitly disabled.

    Returns a dict with ``success``; on success also the model id, the new
    state and the engine's AI status snapshot, otherwise a ``message``.
    """
    engine = _ctx.get("vision_engine")
    if engine is None:
        return {"success": False, "message": "Vision engine not available"}

    model_id = (model_id or "").strip().lower()
    # Valid ids come from the engine when it lists any, else the built-in set.
    known_models = set(getattr(engine, "ai_models", {}).keys())
    if not known_models:
        known_models = {"facial", "crowd", "anomaly", "medical", "fight"}
    if model_id not in known_models:
        return {"success": False, "message": f"Unknown model '{model_id}'. Valid: {sorted(known_models)}"}

    try:
        desired = bool(enabled)
        if hasattr(engine, "set_ai_model"):
            engine.set_ai_model(model_id, desired)
        else:
            engine.ai_models[model_id] = desired

        if hasattr(engine, "get_ai_status"):
            snapshot = engine.get_ai_status()
        else:
            snapshot = dict(getattr(engine, "ai_models", {}))

        if _ai_model_changed_cb:
            # A failing listener must not turn a successful toggle into an error.
            try:
                _ai_model_changed_cb(model_id, desired, snapshot)
            except Exception as exc:
                logger.warning("ai_model_changed callback failed: %s", exc)
        return {"success": True, "model_id": model_id, "enabled": desired, "ai_status": snapshot}
    except Exception as exc:
        return {"success": False, "message": str(exc)}
|
|
|
|
def switch_camera_source(cam_id: str, hardware_index: int) -> dict:
    """
    Switch a specific logical camera (e.g. cam-01) to a different hardware source index.

    Args:
        cam_id: Logical ID like 'cam-01' or 'cam-02'
        hardware_index: Integer index of the physical camera (0, 1, 2, etc.)

    Returns ``success`` as reported by the engine's ``set_camera`` call, or
    ``success: False`` with ``msg`` when the engine is missing or the call raises.
    """
    engine = _ctx.get("vision_engine")
    if not engine:
        return {"success": False, "msg": "Vision Engine not available"}
    try:
        switched = engine.set_camera(cam_id, hardware_index)
    except Exception as err:
        return {"success": False, "msg": str(err)}
    return {"success": switched, "cam_id": cam_id, "new_index": hardware_index}
|
|
|
|
def close_camera_stream(cam_id: str) -> dict:
    """
    Release and close a specific camera stream to save resources.

    Args:
        cam_id: Logical ID like 'cam-01' or 'cam-02'

    Returns ``success: True`` with status "closed" once the engine's
    ``release_camera`` call returns, or ``success: False`` with ``msg`` when the
    engine is missing or the call raises.
    """
    engine = _ctx.get("vision_engine")
    if not engine:
        return {"success": False, "msg": "Vision Engine not available"}
    try:
        engine.release_camera(cam_id)
    except Exception as err:
        return {"success": False, "msg": str(err)}
    return {"success": True, "cam_id": cam_id, "status": "closed"}
|
|
|
|
def get_interaction_graph(person_name: str) -> dict:
    """
    Retrieve the social interaction and co-presence graph for a specific person.

    Args:
        person_name: Name of the person to build the interaction graph for

    Sets the person as the gossip-bridge root, reads the graph JSON and returns
    a summary of it. Any failure yields a dict with ``error`` and status
    GRAPH_UNAVAILABLE instead of raising.
    """
    try:
        gossip_bridge.set_root_person(person_name)
        graph = gossip_bridge.get_gossip_json(person_name)
        contacts = graph.get("contacts", {})

        def _names_at(level: str) -> list:
            return [contact["name"] for contact in contacts.get(level, [])]

        summary = {
            "person": person_name,
            "total_contacts": graph.get("total_interactions", 0),
            "total_people_in_network": graph.get("total_people", 0),
            "level_1_direct_contacts": _names_at("level_1"),
            "level_2_indirect_contacts": _names_at("level_2"),
            "level_3_extended_network": _names_at("level_3"),
            "graph_nodes": len(graph.get("nodes", [])),
            "graph_links": len(graph.get("links", [])),
            "is_tracking_active": graph.get("is_tracking", False),
        }
        return summary
    except Exception as exc:
        return {"person": person_name, "error": str(exc), "status": "GRAPH_UNAVAILABLE"}
|
|
|
|
def get_active_camera_feeds() -> dict:
    """Get the status and AI model configuration of all active camera feeds.

    Cameras are gathered, in this order, from the engine's ``camera_indices``,
    ``browser_feeds``, ``active_cameras`` and any camera with non-empty
    ``face_results``; the first source to mention a camera id wins. Each entry
    is then annotated with the named faces currently seen on it.
    """
    engine = _ctx.get("vision_engine")
    if not engine:
        return {"cameras": [], "count": 0, "ai_status": {}, "error": "Vision engine not available"}

    browser = getattr(engine, "browser_feeds", {}) or {}
    indices = getattr(engine, "camera_indices", {}) or {}
    # Insertion-ordered registry keyed by camera id; doubles as the "seen" set.
    registry: dict[str, dict] = {}

    def _register(cam_id: str, *, index: int = 0, source: str, last_seen=None) -> None:
        if not cam_id or cam_id in registry:
            return
        registry[cam_id] = {
            "cam_id": cam_id,
            "index": index,
            "status": "ACTIVE",
            "source": source,
            "last_seen": last_seen,
        }

    for cam_id, hw_index in indices.items():
        info = browser.get(cam_id, {})
        _register(
            cam_id,
            index=hw_index,
            source=info.get("source", "device"),
            last_seen=info.get("last_seen"),
        )

    for cam_id, info in browser.items():
        _register(
            cam_id,
            index=info.get("index", 0),
            source="browser",
            last_seen=info.get("last_seen"),
        )

    for cam_id in getattr(engine, "active_cameras", {}) or {}:
        info = browser.get(cam_id, {})
        _register(
            cam_id,
            index=indices.get(cam_id, 0),
            source=info.get("source", "device"),
            last_seen=info.get("last_seen"),
        )

    for cam_id, faces in (getattr(engine, "face_results", None) or {}).items():
        if faces:
            info = browser.get(cam_id, {})
            _register(
                cam_id,
                index=indices.get(cam_id, info.get("index", 0)),
                source=info.get("source", "live_faces"),
                last_seen=info.get("last_seen"),
            )

    cameras = list(registry.values())
    faces_by_camera, _ = _collect_live_faces()
    for entry in cameras:
        faces_here = faces_by_camera.get(entry["cam_id"], [])
        entry["live_faces"] = [face.get("name") for face in faces_here]
        entry["live_face_count"] = len(faces_here)

    if hasattr(engine, "get_ai_status"):
        ai_status = engine.get_ai_status()
    else:
        ai_status = {}

    payload = {"cameras": cameras, "count": len(cameras), "ai_status": ai_status}
    payload["report"] = format_active_camera_report(payload)
    return payload
|
|
|
|
def _normalize_issue_subject(title: str, description: str, metadata: dict | None) -> str:
    """Extract a canonical person/subject key for deduplication.

    Resolution order: the first non-blank of metadata ``person_name`` /
    ``subject`` / ``name``; else a name captured from the title + description
    by one of the known "…Match: <name>" style patterns; else the title
    (or "general") with whitespace collapsed, truncated to 48 characters.
    The result is always upper-cased.
    """
    details = metadata or {}
    for field in ("person_name", "subject", "name"):
        raw = details.get(field)
        if not raw:
            continue
        cleaned = str(raw).strip()
        if cleaned:
            return cleaned.upper()

    haystack = f"{title or ''} {description or ''}"
    subject_patterns = (
        r"Missing Person(?:\s+Database)?\s+Match:\s*([A-Za-z0-9_\-\s]+)",
        r"Missing Person:\s*([A-Za-z0-9_\-\s]+?)(?:\s*\(|$|:)",
        r"Database Match:\s*([A-Za-z0-9_\-\s]+)",
        r"Missing Person Found:\s*([A-Za-z0-9_\-\s]+)",
        r"Person Located:\s*([A-Za-z0-9_\-\s]+)",
    )
    for pattern in subject_patterns:
        found = re.search(pattern, haystack, re.I)
        if found is None:
            continue
        captured = found.group(1)
        return captured.split("(")[0].strip().upper()

    fallback = (title or "general").strip().upper()
    return re.sub(r"\s+", " ", fallback)[:48]
|
|
|
|
def _issue_category_key(title: str) -> str:
    """Bucket an issue title into a coarse category used for deduplication.

    Returns "pending_verification", "missing_person_match" or "general",
    decided by case-insensitive keyword checks on the title.
    """
    lowered = (title or "").lower()
    if "pending" in lowered and "verification" in lowered:
        return "pending_verification"
    match_markers = ("database match", "missing person", "missing person found")
    if any(marker in lowered for marker in match_markers):
        return "missing_person_match"
    return "general"
|
|
|
|
def _is_issue_active(issue: dict) -> bool:
    """Return True while an issue is still open.

    An issue is inactive once its status (case-insensitive) is RESOLVED,
    CLOSED or CANCELLED, or its progress has reached 100.
    """
    state = str(issue.get("status", "")).upper()
    percent = int(issue.get("progress", 0) or 0)
    if state in ("RESOLVED", "CLOSED", "CANCELLED"):
        return False
    return percent < 100
|
|
|
|
def _fire_issue_created_cb(issue: dict) -> None:
    """Invoke the registered issue callback, logging (never raising) a failure."""
    if not _issue_created_cb:
        return
    try:
        _issue_created_cb(issue)
    except Exception as exc:
        logger.error("issue_created callback failed: %s", exc)


def create_tactical_issue(title: str, description: str, priority: str = "MEDIUM", metadata_json: str = "{}") -> dict:
    """
    Create a new tactical incident report or 'issue' in the management system.
    Use this for missing persons found, unauthorized access, or resolved crises.

    Args:
        title: Short title of the issue
        description: Detailed report
        priority: LOW | MEDIUM | HIGH | CRITICAL
        metadata_json: Optional JSON string containing keys like 'camera', 'timestamp', 'person_name'

    Returns:
        ``{"success": True, "issue": ...}`` for a new issue; the same plus
        ``"deduplicated": True`` when an active issue for the same subject was
        updated instead; ``success: False`` when the issue store is missing,
        creation was not explicitly requested (``"blocked": True``), or the
        per-subject cooldown is still running.
    """
    issues = _ctx.get("issues")
    if issues is None:
        return {"success": False, "message": "Issue database not connected"}

    if not _ctx.get("allow_issue_creation"):
        return {
            "success": False,
            "blocked": True,
            "message": (
                "Issue creation blocked — the operator did not explicitly ask to raise an issue. "
                "Report the search result only, or ask the operator to click 'Raise issue' or "
                "say 'raise an issue for [name]'."
            ),
        }

    # Metadata is best-effort: malformed JSON, or JSON that is not an object
    # (e.g. a list or bare string), is treated as "no metadata".
    try:
        metadata = json.loads(metadata_json) if metadata_json else {}
    except (TypeError, ValueError, RecursionError):
        metadata = {}
    if not isinstance(metadata, dict):
        metadata = {}
    description = description or ""

    subject = _normalize_issue_subject(title, description, metadata)
    category = _issue_category_key(title)
    dedup_key = f"{subject}|{category}"

    # These two categories describe the same underlying event for a person, so
    # an issue in one absorbs a report in the other.
    merge_categories = {"missing_person_match", "pending_verification"}

    changed_issue = None
    with _issue_create_lock:
        for iss in issues:
            if not _is_issue_active(iss):
                continue
            existing_subject = _normalize_issue_subject(
                iss.get("title", ""), iss.get("desc", ""), iss.get("metadata") or {},
            )
            if existing_subject != subject:
                continue
            existing_cat = _issue_category_key(iss.get("title", ""))
            mergeable = existing_cat in merge_categories and category in merge_categories
            if existing_cat != category and not mergeable:
                continue

            # Fold the new report into the existing active issue.
            iss_meta = iss.get("metadata")
            if not isinstance(iss_meta, dict):
                # Covers both a missing key and an explicit None value.
                iss_meta = iss["metadata"] = {}
            conf = metadata.get("confidence")
            if conf is not None:
                iss_meta["confidence"] = conf
            if metadata.get("camera"):
                iss_meta["camera"] = metadata["camera"]
            if metadata.get("person_name"):
                iss_meta["person_name"] = metadata["person_name"]
            stamp = datetime.datetime.now().isoformat()
            update_line = (
                f"[UPDATE {stamp}] Confidence: {conf}"
                if conf is not None
                else f"[UPDATE {stamp}] {description[:180]}"
            )
            iss["desc"] = f"{iss.get('desc', '')}\n{update_line}".strip()
            changed_issue = iss
            result = {
                "success": True,
                "issue": iss,
                "deduplicated": True,
                "message": f"Updated existing ongoing issue for {subject}.",
            }
            break

        if changed_issue is None:
            now = time.time()
            # Drop expired cooldown entries so the table cannot grow without bound.
            expired = [
                key for key, created in _recent_issue_keys.items()
                if now - created >= _ISSUE_DEDUP_COOLDOWN_SEC
            ]
            for key in expired:
                del _recent_issue_keys[key]

            last_created = _recent_issue_keys.get(dedup_key, 0)
            if now - last_created < _ISSUE_DEDUP_COOLDOWN_SEC:
                return {
                    "success": False,
                    "deduplicated": True,
                    "message": (
                        f"Cooldown active for {subject} ({category}). "
                        f"Wait {_ISSUE_DEDUP_COOLDOWN_SEC // 60} minutes before creating another."
                    ),
                }

            changed_issue = {
                "id": f"ISS-{uuid.uuid4().hex[:6].upper()}",
                "title": title,
                "desc": description,
                "status": "ONGOING",
                "progress": 0,
                "priority": priority,
                "staff": 1,
                "metadata": metadata,
                "timestamp": datetime.datetime.now().isoformat(),
            }
            issues.insert(0, changed_issue)
            _recent_issue_keys[dedup_key] = now
            result = {"success": True, "issue": changed_issue}

    # Notify after releasing the lock: the lock is not re-entrant, so a callback
    # that re-enters this function would otherwise deadlock.
    _fire_issue_created_cb(changed_issue)
    return result
|
|
|
|
def dispatch_personnel_to_camera(camera_id: str, count: int = 1) -> dict:
    """
    Simulate assigning tactical staff or security personnel to a specific camera location.
    Use this to lock down an area once a missing person is detected.

    Args:
        camera_id: ID of the camera (e.g. 'cam-01')
        count: Number of personnel to dispatch
    """
    # Maintainer notes:
    # - Nothing external is contacted: the function only bumps "staff" and appends
    #   a note on the matching open issue, then invokes the issue callback.
    # - Returns {"success": False, "message": ...} on any failure.
    issues = _ctx.get("issues")
    if not issues:
        return {"success": False, "message": "No active issues to attach staff to. Create an issue first."}

    # Tool arguments arrive from a model and may be floats, strings, zero or
    # negative; a non-positive count would silently reduce the staff total.
    try:
        count = int(count)
    except (TypeError, ValueError):
        count = 0
    if count < 1:
        return {"success": False, "message": "count must be a positive whole number of personnel."}

    cam = (camera_id or "").strip().lower()
    closed = {"RESOLVED", "CLOSED", "CANCELLED"}
    active_issues = [
        iss for iss in issues
        if str(iss.get("status", "")).upper() not in closed and int(iss.get("progress", 0) or 0) < 100
    ]
    if not active_issues:
        return {"success": False, "message": "No open issues to attach staff to. Create an issue first."}

    target_issue = None
    if cam:
        # First choice: an issue whose metadata names this camera exactly.
        # "metadata" may be absent or None, hence the `or {}`.
        for iss in active_issues:
            meta_cam = str((iss.get("metadata") or {}).get("camera", "")).strip().lower()
            if meta_cam == cam:
                target_issue = iss
                break
        # Fallback: the camera id appears as a whole word in the title/description.
        if target_issue is None:
            cam_pattern = re.compile(rf"\b{re.escape(cam)}\b")
            for iss in active_issues:
                blob = f"{iss.get('title','')} {iss.get('desc','')}".lower()
                if cam_pattern.search(blob):
                    target_issue = iss
                    break

    if target_issue is None:
        return {
            "success": False,
            "message": (
                f"No open issue linked to camera {camera_id}. "
                "Call create_tactical_issue first, then dispatch personnel."
            ),
        }

    # `or 0` guards against a stored staff value of None.
    target_issue["staff"] = (target_issue.get("staff", 0) or 0) + count
    target_issue["desc"] = f"{target_issue.get('desc', '')}\n[UPDATE] Dispatched {count} personnel to sector {camera_id}."
    if _issue_created_cb:
        try:
            _issue_created_cb(target_issue)
        except Exception as exc:
            logger.warning("issue update broadcast failed: %s", exc)
    return {
        "success": True,
        "message": f"{count} personnel dispatched to {camera_id}; assigned to issue {target_issue['id']}",
        "issue_id": target_issue["id"],
        "simulation": True,
        "dispatch_mode": "SIMULATION",
        "note": "SIMULATION ONLY — staff assignment recorded in issue tracker; no external dispatch",
    }
|
|
|
|
def get_active_issues() -> dict:
    """List open tactical issues/incidents (excludes resolved/closed)."""
    terminal_states = {"RESOLVED", "CLOSED", "CANCELLED"}

    def _still_open(issue) -> bool:
        # Open = status not terminal AND progress below 100.
        if str(issue.get("status", "")).upper() in terminal_states:
            return False
        return int(issue.get("progress", 0) or 0) < 100

    open_issues = [issue for issue in (_ctx.get("issues") or []) if _still_open(issue)]

    # Only the first 25 open issues are summarised; "count" still reports all of them.
    summaries = [
        {
            "id": issue.get("id"),
            "title": issue.get("title"),
            "status": issue.get("status"),
            "priority": issue.get("priority"),
            "progress": issue.get("progress", 0),
            "staff": issue.get("staff", 0),
        }
        for issue in open_issues[:25]
    ]
    return {"count": len(open_issues), "issues": summaries}
|
|
|
|
def get_sos_events() -> dict:
    """Get active SOS / panic events raised from the guest app."""
    all_events = _ctx.get("sos_events") or []

    def _as_mapping(event) -> dict:
        # Objects exposing model_dump() are dumped; plain dicts pass through;
        # anything else is treated as an empty record.
        if hasattr(event, "model_dump"):
            return event.model_dump()
        if isinstance(event, dict):
            return event
        return {}

    summaries = []
    # Only the last ten events are detailed; "count" covers the whole list.
    for event in all_events[-10:]:
        record = _as_mapping(event)
        summaries.append({
            "id": record.get("id"),
            "location": record.get("location_label") or record.get("location"),
            "message": record.get("message"),
            "lat": record.get("lat"),
            "lng": record.get("lng"),
            "timestamp": record.get("timestamp"),
        })
    return {"count": len(all_events), "sos_events": summaries}
|
|
|
|
def get_recent_incident_logs(limit: int = 10) -> dict:
    """
    Get the most recent incident log entries (audit trail of platform events).

    Args:
        limit: How many recent entries to return (max 30)
    """
    entries = _ctx.get("incident_logs") or []
    # A falsy limit (0/None) means "use the default of 10"; the result is then
    # clamped into the range 1..30.
    wanted = int(limit or 10)
    if wanted > 30:
        wanted = 30
    if wanted < 1:
        wanted = 1
    # The head of the list is returned — presumably newest-first storage; verify
    # against whatever appends to incident_logs.
    return {"count": len(entries), "recent": entries[:wanted]}
|
|
|
|
def get_signage_status() -> dict:
    """Get the current on/off state of every digital evacuation/safety signage panel."""
    panels = _ctx.get("signage_state") or {}
    try:
        total = len(panels)
        # Copy so callers cannot mutate the shared context mapping.
        snapshot = dict(panels)
    except Exception:
        # Anything that is not sized / dict-convertible is reported as "no signage".
        return {"count": 0, "signage": {}}
    return {"count": total, "signage": snapshot}
|
|
|
|
def get_platform_overview() -> dict:
    """
    Get a single consolidated snapshot of the entire platform state: alert,
    issue, SOS and tracking counts, crowd total, vision-AI status, and enrolled
    roster. Use this first to understand the whole situation at a glance.
    """
    engine = _ctx.get("vision_engine")
    # The AI status is optional: engines without get_ai_status() report {}.
    if engine and hasattr(engine, "get_ai_status"):
        ai_status = engine.get_ai_status()
    else:
        ai_status = {}

    # Gather the sub-reports from the sibling tools (same order as before).
    _, live_persons = _collect_live_faces()
    feeds = get_active_camera_feeds()
    issues_report = get_active_issues()
    alerts_report = get_current_alerts()

    overview = {}
    overview["active_alerts"] = alerts_report.get("count", 0)
    overview["active_issues"] = issues_report.get("count", 0)
    overview["active_sos"] = len(_ctx.get("sos_events") or [])
    overview["live_faces_now"] = [person.get("name") for person in live_persons]
    overview["live_face_count"] = len(live_persons)
    overview["active_camera_feeds"] = feeds.get("count", 0)
    overview["crowd_total"] = getattr(engine, "last_total_count", 0) if engine else 0
    overview["vision_ai_status"] = ai_status
    overview["facial_recognition_active"] = bool(ai_status.get("facial"))
    overview["enrolled_persons"] = _enrolled_face_names()
    # Only panels whose state is truthy are listed.
    overview["active_signage"] = [
        panel_id for panel_id, is_on in (_ctx.get("signage_state") or {}).items() if is_on
    ]
    return overview
|
|
|
|
def broadcast_alert(alert_type: str, message: str, location: str = "", severity: str = "high") -> dict:
    """
    Raise and broadcast a live alert/notification across the platform (dashboard,
    comms, logs). Use for warnings, evacuations, or operational notices.

    Args:
        alert_type: e.g. fire, medical, security, evacuation, info
        message: Human-readable alert text
        location: Affected zone/room (optional)
        severity: critical | high | medium | low | info
    """
    # The real work is delegated to a handler registered under "broadcast_alert";
    # without one the tool reports that broadcasting is unavailable.
    handler = _action_handlers.get("broadcast_alert")
    if handler:
        try:
            return {"success": True, "alert": handler(alert_type, message, location, severity)}
        except Exception as exc:
            # Handler errors are returned to the caller rather than raised.
            return {"success": False, "message": str(exc)}
    return {"success": False, "message": "Alert broadcasting not available"}
|
|
|
|
def set_signage_state(signage_id: str, active: bool) -> dict:
    """
    Activate or deactivate a digital signage panel (e.g. EVACUATE, LOCKDOWN, ALL CLEAR).

    Args:
        signage_id: Signage panel ID (e.g. s1, s3, s6, s8)
        active: True to light it up, False to turn it off
    """
    # Delegates to the handler registered under "set_signage", if any.
    handler = _action_handlers.get("set_signage")
    if handler:
        try:
            handler(signage_id, bool(active))
            outcome = {"success": True, "signage_id": signage_id, "active": bool(active)}
        except Exception as exc:
            # Handler errors are surfaced in the result instead of propagating.
            outcome = {"success": False, "message": str(exc)}
        return outcome
    return {"success": False, "message": "Signage control not available"}
|
|
|
|
def update_issue(issue_id: str, status: str = "", progress: int = -1, note: str = "") -> dict:
    """
    Update an existing tactical issue: change status, set progress, or append a note.

    Args:
        issue_id: The issue ID (e.g. ISS-1A2B3C)
        status: New status (e.g. ONGOING, RESOLVED) — leave blank to keep
        progress: 0-100 percent complete — use -1 to keep current
        note: Optional update note appended to the description
    """
    # Maintainer notes:
    # - Returns {"success": False, ...} when the id is unknown, otherwise a
    #   compact {"id", "status", "progress"} view of the updated issue.
    # - Progress values that are not numeric or fall outside 0..100 are ignored.
    issues = _ctx.get("issues") or []
    target = next((i for i in issues if str(i.get("id")) == str(issue_id)), None)
    if target is None:
        return {"success": False, "message": f"Issue {issue_id} not found"}

    if status:
        target["status"] = status

    if progress is not None and progress != -1:
        try:
            pct = int(round(float(progress)))
        except (TypeError, ValueError, OverflowError):
            # ValueError covers NaN and non-numeric text; OverflowError covers
            # +/-inf, which round() cannot convert to an int.
            pct = None
        if pct is not None and 0 <= pct <= 100:
            target["progress"] = pct

    if note:
        target["desc"] = f"{target.get('desc', '')}\n[UPDATE] {note}"

    # Prefer the dedicated "update_issue" handler; otherwise fall back to the
    # generic issue callback. Either way a notification failure must not undo
    # the update, but it is logged so it cannot go unnoticed.
    handler = _action_handlers.get("update_issue")
    if handler:
        try:
            handler(target)
        except Exception as exc:
            logger.warning("update_issue broadcast failed: %s", exc)
    elif _issue_created_cb:
        try:
            _issue_created_cb(target)
        except Exception as exc:
            logger.warning("update_issue callback failed: %s", exc)

    return {"success": True, "issue": {
        "id": target.get("id"), "status": target.get("status"), "progress": target.get("progress"),
    }}
|
|
|
|
| |
# Every tool function exposed to the orchestrator and to the native
# function-calling loop. Order is preserved exactly as declared.
_ALL_TOOLS = [
    get_current_alerts, get_crowd_statistics, get_known_persons,
    get_live_camera_faces, who_is_on_camera_now, report_active_cameras_from_tool,
    classify_alert_severity,
    locate_nearest_emergency_services, find_nearest_emergency_services,
    get_traffic_aware_route, recommend_emergency_dispatch, simulate_dispatch_call,
    generate_evacuation_plan,
    search_person_in_camera_feeds, switch_camera_source, close_camera_stream,
    get_interaction_graph, get_active_camera_feeds,
    create_tactical_issue, dispatch_personnel_to_camera,
    list_enrolled_faces, set_ai_model_state,
    get_active_issues, get_sos_events, get_recent_incident_logs,
    get_signage_status, get_platform_overview,
    broadcast_alert, set_signage_state, update_issue,
]
# Name -> callable lookup used to dispatch model-requested function calls.
_TOOL_MAP = {tool.__name__: tool for tool in _ALL_TOOLS}
|
|
| |
| |
| |
|
|
# Model identifiers resolved once at import time: the "default" tier is used by
# the specialist agents below, the "pro" tier by the OrchestratorAgent.
_MODEL, _MODEL_PRO = get_model("default"), get_model("pro")
|
|
# Specialist: alert triage — reads alerts/issues/SOS/logs and may broadcast
# alerts, toggle signage and update issues.
_alert_agent = Agent(
    name="AlertResponseAgent",
    model=_MODEL,
    description="Analyzes crisis alerts, classifies severity, and coordinates immediate tactical responses.",
    instruction=(
        "You are the AlertResponseAgent for Cepheus. "
        "When invoked: get current alerts, classify severity, identify affected zones. "
        "Output a clear tactical analysis with: alert type, severity, affected count, recommended action. "
        "Be concise and structured."
    ),
    tools=[
        get_current_alerts,
        get_crowd_statistics,
        classify_alert_severity,
        get_active_issues,
        get_sos_events,
        get_recent_incident_logs,
        get_platform_overview,
        broadcast_alert,
        set_signage_state,
        update_issue,
    ],
)
|
|
# Specialist: live location tracking. get_known_persons (sighting history) is
# deliberately absent from this tool list.
_location_agent = Agent(
    name="LocationAgent",
    model=_MODEL,
    description="Tracks person locations, monitors zone occupancy, and identifies nearby responders.",
    instruction=(
        "You are the LocationAgent. Track real-time locations of persons and crowd zones. "
        "Use get_live_camera_faces for who is on camera RIGHT NOW. Use get_known_persons only for "
        "historical sighting records (never present history as live). Use get_crowd_statistics for zone data. "
        "Report precise locations, camera IDs, and timestamps."
    ),
    tools=[
        who_is_on_camera_now,
        get_live_camera_faces,
        get_crowd_statistics,
        get_active_camera_feeds,
        report_active_cameras_from_tool,
        list_enrolled_faces,
        switch_camera_source,
        close_camera_stream,
    ],
)
|
|
# Specialist: evacuation / response planning; may also broadcast alerts and
# switch signage panels.
_planning_agent = Agent(
    name="PlanningAgent",
    model=_MODEL,
    description="Generates tactical evacuation strategies, responder assignments, and response timelines.",
    instruction=(
        "You are the PlanningAgent. Generate clear, step-by-step crisis response plans. "
        "Always call generate_evacuation_plan with location, count, and alert type. "
        "Format output with numbered steps, responder roles, and ETAs."
    ),
    tools=[
        generate_evacuation_plan,
        get_crowd_statistics,
        get_signage_status,
        broadcast_alert,
        set_signage_state,
    ],
)
|
|
# Specialist: emergency-service lookup, routing and (simulated) dispatch calls.
# The station coordinates are baked into the prompt at import time.
_dispatch_agent = Agent(
    name="EmergencyDispatchAgent",
    model=_MODEL,
    description="Locates nearest emergency services and simulates dispatch coordination calls.",
    instruction=(
        "You are the EmergencyDispatchAgent. "
        "Step 1: Call find_nearest_emergency_services (Google traffic ETAs) or "
        "locate_nearest_emergency_services (OSM fallback) to find responders. "
        "Step 2: Call recommend_emergency_dispatch for ranked dispatch choices. "
        "Step 3: Call get_traffic_aware_route for turn-by-turn navigation when dispatching. "
        "Step 4: Call simulate_dispatch_call to coordinate (SIMULATED ONLY). "
        "IMPORTANT: Always state simulated calls are SIMULATED. "
        f"Station reference: {STATION_LAT}, {STATION_LON}."
    ),
    tools=[
        find_nearest_emergency_services,
        get_traffic_aware_route,
        recommend_emergency_dispatch,
        locate_nearest_emergency_services,
        simulate_dispatch_call,
    ],
)
|
|
# Specialist: person search across camera feeds. Issue creation and personnel
# dispatch are available as tools but the prompt restricts them to explicit
# operator requests.
_missing_agent = Agent(
    name="MissingPersonAgent",
    model=_MODEL,
    description="Searches for missing or specified persons across all active camera feeds.",
    instruction=(
        "You are the MissingPersonAgent. Search for persons using the vision system. "
        "1. If a name is provided, call search_person_in_camera_feeds with the person's name. "
        "2. If the user provides an image search result directly, skip step 1 and use the provided result. "
        "3. ONLY call create_tactical_issue when the operator explicitly asked to raise or log an incident. "
        "   The tool rejects calls unless the prompt contained words like 'raise an issue' — never create "
        "   issues automatically after a face match. Report results only unless explicitly instructed. "
        "   The tool deduplicates automatically — repeated detections update the same ONGOING issue. "
        "   - title: 'Missing Person Database Match: [Name]' "
        "   - description: include confidence and camera when known "
        "   - metadata_json: JSON with person_name, camera, confidence, timestamp "
        "4. ONLY call dispatch_personnel_to_camera when the operator requested dispatch. "
        "5. Report found/not-found status, last camera, and confidence to the user. "
        "6. CRITICAL: If the tool response includes an 'evidence' field with markdown image syntax (e.g., ``), you MUST include that exact markdown string in your final response so the user sees the image."
    ),
    tools=[
        search_person_in_camera_feeds,
        get_active_camera_feeds,
        report_active_cameras_from_tool,
        who_is_on_camera_now,
        get_live_camera_faces,
        list_enrolled_faces,
        set_ai_model_state,
        create_tactical_issue,
        dispatch_personnel_to_camera,
    ],
)
|
|
# Specialist: co-presence / interaction graph analysis.
_gossip_agent = Agent(
    name="GossipGraphAgent",
    model=_MODEL,
    description="Analyzes social interaction and co-presence graphs for contact tracing and intelligence.",
    instruction=(
        "You are the GossipGraphAgent. Analyze person interaction networks from camera co-presence data. "
        "Call get_interaction_graph with the person's name. "
        "Present: Level 1 (direct contacts), Level 2 (indirect), Level 3 (extended network). "
        "Highlight strong connections and surveillance implications."
    ),
    tools=[
        get_interaction_graph,
        get_known_persons,
    ],
)
|
|
| |
# Root agent: runs on the "pro" model, owns every tool directly and can hand
# off to the six specialist agents declared above.
_orchestrator = Agent(
    name="OrchestratorAgent",
    model=_MODEL_PRO,
    description="Master tactical AI coordinator for Cepheus crisis response platform.",
    instruction=(
        "You are the OrchestratorAgent — the master coordinator for Cepheus. "
        "Route queries to the correct specialized agent:\n"
        "- Crisis alerts/incidents → AlertResponseAgent\n"
        "- Person locations/tracking → LocationAgent\n"
        "- Evacuation/response plans → PlanningAgent\n"
        "- Emergency service dispatch → EmergencyDispatchAgent\n"
        "- Missing/locate a person → MissingPersonAgent\n"
        "- Social graph/interactions → GossipGraphAgent\n"
        "For complex incidents (e.g. fire alert), chain multiple agents: "
        "AlertResponseAgent → LocationAgent → PlanningAgent → EmergencyDispatchAgent. "
        "Stream your reasoning step by step. Be decisive and tactical. "
        "CRITICAL: If any tool or sub-agent returns an image markdown snippet (e.g., ``), you MUST output it exactly as is in your response."
    ),
    tools=_ALL_TOOLS,
    sub_agents=[
        _alert_agent,
        _location_agent,
        _planning_agent,
        _dispatch_agent,
        _missing_agent,
        _gossip_agent,
    ],
)
|
|
| |
# Lookup of every agent by its declared name (orchestrator first). Each agent
# above is constructed with name=<its key>, so keying on `.name` yields the
# same mapping in the same order.
_NAMED_AGENTS: dict[str, Agent] = {
    registered.name: registered
    for registered in (
        _orchestrator,
        _alert_agent,
        _location_agent,
        _planning_agent,
        _dispatch_agent,
        _missing_agent,
        _gossip_agent,
    )
}
|
|
| |
| |
| |
|
|
# Limits applied by prune_adk_sessions(): per-operator session cap and idle TTL.
_MAX_ADK_SESSIONS_PER_USER = 10
_ADK_SESSION_TTL_SECONDS = 60 * 60  # one hour

# One shared session store for every runner, plus a local registry mapping
# "<user_id>:<session_id>" to the time the session was last touched.
_session_service = InMemorySessionService()
_adk_session_keys: dict[str, float] = {}
|
|
|
|
| def _adk_registry_key(user_id: str, session_id: str) -> str: |
| return f"{user_id}:{session_id}" |
|
|
|
|
def _touch_adk_session(user_id: str, session_id: str) -> None:
    """Record 'now' as the last-used time of the given operator session."""
    registry_key = _adk_registry_key(user_id, session_id)
    _adk_session_keys[registry_key] = time.time()
|
|
|
|
async def prune_adk_sessions(user_id: str, keep_session_id: str | None = None) -> None:
    """Drop ADK in-memory sessions for an operator (optionally keeping one active session).

    A session is dropped when it has not been touched for longer than
    ``_ADK_SESSION_TTL_SECONDS``, or while the operator still holds more than
    ``_MAX_ADK_SESSIONS_PER_USER`` registered sessions. Candidates are visited
    least-recently-touched first, so the cap evicts the idlest sessions rather
    than whichever happened to be registered first.

    Args:
        user_id: Operator whose sessions are examined.
        keep_session_id: Session that must never be dropped (it still counts
            toward the cap).
    """
    now = time.time()
    prefix = f"{user_id}:"

    # Snapshot this operator's entries once instead of rescanning the registry
    # on every iteration.
    owned = [(key, seen) for key, seen in _adk_session_keys.items() if key.startswith(prefix)]
    remaining = len(owned)
    delete_session = getattr(_session_service, "delete_session", None)

    for key, last_seen in sorted(owned, key=lambda item: item[1]):
        sid = key[len(prefix):]
        if keep_session_id and sid == keep_session_id:
            continue
        stale = (now - last_seen) > _ADK_SESSION_TTL_SECONDS
        if not stale and remaining <= _MAX_ADK_SESSIONS_PER_USER:
            continue
        try:
            # The session service may not implement deletion; the registry
            # entry is removed regardless.
            if delete_session is not None:
                await delete_session(app_name="cepheus", user_id=user_id, session_id=sid)
        except Exception as exc:
            logger.debug("ADK session delete skipped (%s): %s", sid, exc)
        _adk_session_keys.pop(key, None)
        remaining -= 1
|
|
|
|
# One Runner per named agent; all of them share the same app name and the same
# session service.
_RUNNERS = {
    agent_key: Runner(agent=agent_obj, app_name="cepheus", session_service=_session_service)
    for agent_key, agent_obj in _NAMED_AGENTS.items()
}
# Default runner, used when no (or an unknown) agent override is requested.
_runner = _RUNNERS["OrchestratorAgent"]
|
|
# System prompt for the native google.genai loop (_run_native_stream). The
# literals are concatenated verbatim; comments only mark the prompt sections.
_NATIVE_SYSTEM_INSTRUCTION = (
    # Role
    "You are Cepheus Tactical Copilot, coordinating emergency management. Operate via tools: "
    "alerts, crowds, cameras, face-recognition, AI controls, graph, dispatch, issues.\n\n"
    # Ground rules
    "RULES:\n"
    "1. NEVER invent/assume. Use ONLY tool results. If no tool was called, you don't know.\n"
    "2. Report only real actions taken. create_tactical_issue ONLY if explicitly asked to 'raise/create issue'.\n"
    "3. Be deterministic.\n\n"
    # Live data vs. sighting history
    "LIVE VS HISTORY:\n"
    "- get_live_camera_faces: who is on camera RIGHT NOW.\n"
    "- get_known_persons: HISTORY ONLY.\n"
    "- search_person_in_camera_feeds: check 'status' (LIVE_DETECTED vs RECENTLY_SEEN).\n\n"
    # Person-search procedure
    "PERSON SEARCH:\n"
    "  a. search_person_in_camera_feeds(name).\n"
    "  b. Evaluate 'found' (LIVE_DETECTED only).\n"
    "  c. If found AND user asked: create_tactical_issue + dispatch_personnel_to_camera.\n"
    "  d. If NOT found: state clearly, NO issue.\n"
    "  e. Report tool analysis and exact action taken.\n\n"
    # Capability map
    "CAPABILITIES:\n"
    "- Snapshot: get_platform_overview.\n"
    "- Act: create_tactical_issue, dispatch_personnel_to_camera, broadcast_alert, set_signage_state.\n"
    "- Evacuate: generate_evacuation_plan.\n"
    "- Dispatch: locate_nearest_emergency_services → simulate_dispatch_call (STATE SIMULATED).\n"
    "- Trace: get_interaction_graph.\n"
    # Station coordinates are interpolated once, at import time.
    f"Station ref: {STATION_LAT}, {STATION_LON} (override CEPHEUS_STATION_LAT/LON). Be concise/decisive."
)
|
|
# Longest tool-result text echoed into a streamed "tool_result" step before it
# is cut and suffixed with an ellipsis.
_TOOL_RESULT_PREVIEW_LEN = 1200
# Upper bound on model round-trips in the native function-calling loop.
_MAX_NATIVE_ITERATIONS = 8
|
|
|
|
| def _collect_agent_tool_names(agent: Agent) -> set[str]: |
| """Return all tool function names exposed by an ADK agent and its sub-agents.""" |
| names = {fn.__name__ for fn in (getattr(agent, "tools", None) or [])} |
| for sub in getattr(agent, "sub_agents", None) or []: |
| names |= _collect_agent_tool_names(sub) |
| return names |
|
|
|
|
def adk_tool_names() -> set[str]:
    """Tool names reachable from the OrchestratorAgent ADK tree."""
    reachable = _collect_agent_tool_names(_orchestrator)
    return reachable
|
|
|
|
| def _decode_data_url_image(image_data: str | bytes | None) -> tuple[bytes | None, str]: |
| """Decode a data-URL or raw base64 image payload from the copilot UI.""" |
| if not image_data: |
| return None, "image/jpeg" |
| if isinstance(image_data, bytes): |
| return image_data, "image/jpeg" |
| data = str(image_data).strip() |
| mime = "image/jpeg" |
| if data.startswith("data:"): |
| header, _, payload = data.partition(",") |
| if ";" in header: |
| mime = header[5:].split(";")[0].strip() or mime |
| data = payload |
| try: |
| return base64.b64decode(data, validate=False), mime |
| except Exception: |
| return None, mime |
|
|
|
|
def _build_user_parts(
    prompt: str,
    image_bytes: bytes | None = None,
    image_mime: str = "image/jpeg",
) -> list:
    """Build multimodal user parts for Gemini (text + optional inline image)."""
    # Without the google-genai types there is nothing to build.
    if genai_types is None:
        return []
    text_part = genai_types.Part(text=prompt)
    if not image_bytes:
        return [text_part]
    # Attach the image as an inline blob after the text part.
    image_blob = genai_types.Blob(mime_type=image_mime, data=image_bytes)
    return [text_part, genai_types.Part(inline_data=image_blob)]
|
|
|
|
def _friendly_model_error(exc: Exception, *, chain_exhausted: bool = True) -> str:
    """Turn a model-call exception into a short operator-facing message.

    Rate-limit errors (as judged by ``is_rate_limit``) get a quota/overload
    message; errors whose text looks like an authentication failure get an
    API-key message; everything else gets a generic, truncated message.
    """
    text = str(exc)

    if is_rate_limit(exc):
        if chain_exhausted:
            return (
                "All configured Gemini models are temporarily rate-limited (quota reached on every tier). "
                "Please retry in a moment."
            )
        return f"AI model call failed: temporary overload — {text[:120]}"

    # Not a rate limit: look for authentication markers in the error text.
    low = text.lower()
    auth_markers = ("api key", "api_key_invalid", "permission_denied")
    looks_like_auth = any(marker in low for marker in auth_markers) or "401" in text
    if looks_like_auth:
        if os.getenv("GEMINI_API_KEY", "").strip():
            return f"AI model authentication failed despite configured key: {text[:120]}"
        return "AI model authentication failed — GEMINI_API_KEY is missing or invalid."

    return f"AI model call failed: {text[:200]}"
|
|
|
|
| def _step_has_content(step: dict) -> bool: |
| content = str(step.get("content") or "").strip() |
| return bool(content) |
|
|
|
|
| def _make_text_step(agent: str, content: str | None, step_type: str) -> dict | None: |
| """Generation-point filter: do not create steps with empty/whitespace text (Bug C).""" |
| text = str(content or "").strip() |
| if not text: |
| return None |
| return {"agent": agent, "content": text, "step_type": step_type} |
|
|
|
|
def _normalize_stream_step(step: dict) -> dict | None:
    """Transport-layer guard for any step dict (tool_call lines always have content)."""
    # Pass the step through unchanged only if it is a dict with real content.
    if isinstance(step, dict) and _step_has_content(step):
        return step
    return None
|
|
|
|
def location_agent_tool_names() -> set[str]:
    """Tool names exposed to LocationAgent (must not include sighting history for live queries)."""
    exposed: set[str] = set()
    for tool in _location_agent.tools or []:
        exposed.add(tool.__name__)
    return exposed
|
|
|
|
def agents_must_not_use_known_persons_for_live() -> set[str]:
    """Agents that answer live-presence questions — get_known_persons is forbidden."""
    # Returns the names of the live-presence agents that DO expose
    # get_known_persons, i.e. the violators; an empty set means all is well.
    live_presence_agents = (_location_agent, _missing_agent)
    return {
        agent.name
        for agent in live_presence_agents
        if "get_known_persons" in {tool.__name__ for tool in (agent.tools or [])}
    }
|
|
|
|
async def _run_native_stream(
    prompt: str,
    image_bytes: bytes | None = None,
    image_mime: str = "image/jpeg",
) -> AsyncGenerator[dict, None]:
    """Native google.genai function-calling loop (used when ADK is unavailable).

    Repeatedly asks the model for a response, executes any function calls it
    requests through ``_TOOL_MAP`` and feeds the results back, for at most
    ``_MAX_NATIVE_ITERATIONS`` rounds.

    Args:
        prompt: Operator prompt text.
        image_bytes: Optional image attached to the prompt.
        image_mime: MIME type of ``image_bytes``.

    Yields:
        Step dicts with "agent", "content" and "step_type" ("thinking",
        "tool_call", "tool_result", "response" or "error"). Every terminal path
        yields at least one step, so the consumer never sees a silent end.
    """
    if genai_types is None:
        yield {"agent": "System", "content": "google-genai SDK not installed.", "step_type": "error"}
        return

    from google import genai

    api_key = os.getenv("GEMINI_API_KEY", "").strip()
    client = genai.Client(api_key=api_key, http_options={"api_version": "v1beta"})
    config = genai_types.GenerateContentConfig(
        system_instruction=_NATIVE_SYSTEM_INSTRUCTION,
        tools=_ALL_TOOLS,
        # Function calls are executed by this loop, not by the SDK.
        automatic_function_calling=genai_types.AutomaticFunctionCallingConfig(disable=True),
        temperature=0.3,
        max_output_tokens=1200,
    )
    contents = [genai_types.Content(role="user", parts=_build_user_parts(prompt, image_bytes, image_mime))]

    def _empty_response_step() -> dict:
        """Error step reported when the model produced nothing usable."""
        return {
            "agent": "System",
            "content": "Model returned an empty response.",
            "step_type": "error",
        }

    async def _generate_with_retry():
        # generate_with_fallback is blocking; run it off the event loop.
        # `contents` is read at call time, so it includes the turns appended below.
        return await asyncio.to_thread(
            generate_with_fallback,
            client,
            tier="pro",
            contents=contents,
            config=config,
        )

    for _ in range(_MAX_NATIVE_ITERATIONS):
        try:
            response = await _generate_with_retry()
        except Exception as exc:
            logger.error("native genai call failed: %s", exc)
            err_step = {
                "agent": "System",
                "content": _friendly_model_error(exc, chain_exhausted=True),
                "step_type": "error",
                "genuine_quota_error": bool(is_rate_limit(exc)),
            }
            if norm := _normalize_stream_step(err_step):
                yield norm
            return

        candidate = (response.candidates or [None])[0]
        if not candidate or not candidate.content or not candidate.content.parts:
            # No structured parts: fall back to the response-level text, and say
            # so explicitly when there is none instead of ending silently.
            text = getattr(response, "text", None)
            if step := _make_text_step("OrchestratorAgent", text, "response"):
                yield step
            elif norm := _normalize_stream_step(_empty_response_step()):
                yield norm
            return

        contents.append(candidate.content)
        function_calls = [p.function_call for p in candidate.content.parts if getattr(p, "function_call", None)]
        texts = [p.text for p in candidate.content.parts if getattr(p, "text", None)]

        if not function_calls:
            # Final answer. Whitespace-only text parts produce no step, so track
            # whether anything was actually emitted.
            emitted = False
            for text in texts:
                if step := _make_text_step("OrchestratorAgent", text, "response"):
                    emitted = True
                    yield step
            if not emitted:
                if norm := _normalize_stream_step(_empty_response_step()):
                    yield norm
            return

        # Text that accompanies function calls is intermediate reasoning.
        for text in texts:
            if step := _make_text_step("OrchestratorAgent", text, "thinking"):
                yield step

        tool_response_parts = []
        for fc in function_calls:
            args = dict(fc.args or {})
            if norm := _normalize_stream_step({
                "agent": "OrchestratorAgent",
                "content": f"Calling `{fc.name}` with args: {args or '()'}",
                "step_type": "tool_call",
            }):
                yield norm
            fn = _TOOL_MAP.get(fc.name)
            if fn is None:
                result = {"error": f"Unknown tool {fc.name}"}
            else:
                try:
                    # Tools are synchronous; run them in a worker thread.
                    result = await asyncio.to_thread(fn, **args)
                except Exception as exc:
                    logger.error("tool %s failed: %s", fc.name, exc)
                    result = {"error": str(exc)}
            # The preview shown to the operator is taken before the camera
            # report is attached, and is truncated to keep the stream compact.
            preview = str(result)
            if len(preview) > _TOOL_RESULT_PREVIEW_LEN:
                preview = f"{preview[:_TOOL_RESULT_PREVIEW_LEN]}…"
            if fc.name == "get_active_camera_feeds" and isinstance(result, dict):
                result = {**result, "agent_report": format_active_camera_report(result)}
            if norm := _normalize_stream_step({
                "agent": "OrchestratorAgent",
                "content": f"Tool `{fc.name}` returned: {preview}",
                "step_type": "tool_result",
            }):
                yield norm
            tool_response_parts.append(
                genai_types.Part.from_function_response(name=fc.name, response={"result": result})
            )
        contents.append(genai_types.Content(role="user", parts=tool_response_parts))

    # The model kept requesting tools for every allowed round.
    yield {
        "agent": "OrchestratorAgent",
        "content": "Reached reasoning step limit. Provide a more specific query if needed.",
        "step_type": "response",
    }
|
|
|
|
async def run_agent_stream(
    prompt: str,
    session_id: str = "default",
    user_id: str = "operator",
    agent_override: str | None = None,
    image_data: str | bytes | None = None,
    image_mime: str = "image/jpeg",
) -> AsyncGenerator[dict, None]:
    """
    Run the orchestrator with the given prompt and yield streaming step dicts.
    Each yielded dict: { "agent": str, "content": str, "step_type": str }
    step_type: "thinking" | "tool_call" | "tool_result" | "response" | "error" | "info"

    Args:
        prompt: Operator prompt text.
        session_id: Conversation/session identifier for the ADK session store.
        user_id: Operator identifier; blank values fall back to "operator".
        agent_override: Name of a specific agent to run instead of the
            OrchestratorAgent; unknown names fall back to the orchestrator.
        image_data: Optional image as raw bytes, a data URL, or bare base64.
        image_mime: MIME type of the image. Used as given unless ``image_data``
            is a data URL, whose header then takes precedence.
    """
    api_key = os.getenv("GEMINI_API_KEY", "").strip()
    if not api_key:
        yield {"agent": "System", "content": "GEMINI_API_KEY not configured.", "step_type": "error"}
        return

    # Issue creation is enabled for the duration of this request only when the
    # prompt asks for it; the previous value is restored in `finally`.
    # NOTE(review): the flag lives in the shared _ctx dict, so overlapping
    # streams can observe each other's value — confirm requests are serialized.
    prev_allow = bool(_ctx.get("allow_issue_creation"))
    _ctx["allow_issue_creation"] = prompt_requests_issue_creation(prompt)
    try:
        image_bytes, resolved_mime = _decode_data_url_image(image_data)
        if image_data and image_bytes is None:
            yield {"agent": "System", "content": "Could not decode attached image.", "step_type": "error"}
            return
        # The decoder returns a default MIME type for raw bytes / bare base64, so
        # only a data URL is allowed to override what the caller passed in.
        is_data_url = isinstance(image_data, str) and image_data.strip().startswith("data:")
        if image_bytes and is_data_url and resolved_mime:
            image_mime = resolved_mime

        if not _ADK_AVAILABLE:
            yield {
                "agent": "System",
                "content": (
                    "Native orchestrator active (google-adk not installed). "
                    "Tool parity may differ from ADK mode."
                ),
                "step_type": "info",
            }
            async for step in _run_native_stream(prompt, image_bytes=image_bytes, image_mime=image_mime):
                yield step
            return

        # Resolve the agent; an unknown override runs (and is labelled as) the
        # orchestrator rather than being attributed to a non-existent agent.
        agent_name = (agent_override or "").strip() or "OrchestratorAgent"
        if agent_name not in _RUNNERS:
            logger.warning("Unknown agent override %r — using OrchestratorAgent.", agent_name)
            agent_name = "OrchestratorAgent"
        runner = _RUNNERS.get(agent_name, _runner)

        try:
            operator_id = (user_id or "operator").strip() or "operator"
            await prune_adk_sessions(operator_id, keep_session_id=session_id)

            # Reuse the operator's session when it exists, otherwise create it.
            session = await _session_service.get_session(
                app_name="cepheus", user_id=operator_id, session_id=session_id,
            )
            if session is None:
                session = await _session_service.create_session(
                    app_name="cepheus", user_id=operator_id, session_id=session_id,
                )
            _touch_adk_session(operator_id, session_id)

            user_content = genai_types.Content(
                role="user",
                parts=_build_user_parts(prompt, image_bytes, image_mime),
            )

            async for event in runner.run_async(
                user_id=operator_id,
                session_id=session.id,
                new_message=user_content,
            ):
                author = getattr(event, "author", agent_name) or agent_name

                # Events without content carry nothing to show.
                if not event.content or not event.content.parts:
                    continue

                for part in event.content.parts:
                    # Tool invocation requested by the model.
                    if hasattr(part, "function_call") and part.function_call:
                        fc = part.function_call
                        args_str = str(dict(fc.args)) if fc.args else "()"
                        if norm := _normalize_stream_step({
                            "agent": author,
                            "content": f"Calling `{fc.name}` with args: {args_str}",
                            "step_type": "tool_call",
                        }):
                            yield norm
                    # Tool result, truncated for display.
                    elif hasattr(part, "function_response") and part.function_response:
                        fr = part.function_response
                        result_preview = str(fr.response)
                        if len(result_preview) > _TOOL_RESULT_PREVIEW_LEN:
                            result_preview = f"{result_preview[:_TOOL_RESULT_PREVIEW_LEN]}…"
                        if norm := _normalize_stream_step({
                            "agent": author,
                            "content": f"Tool `{fr.name}` returned: {result_preview}",
                            "step_type": "tool_result",
                        }):
                            yield norm
                    # Plain text: final answer or intermediate reasoning.
                    elif hasattr(part, "text") and part.text is not None:
                        step_type = "response" if event.is_final_response() else "thinking"
                        if step := _make_text_step(author, part.text, step_type):
                            yield step

        except Exception as exc:
            logger.error("Agent stream error: %s", exc)
            if norm := _normalize_stream_step({
                "agent": "System",
                "content": _friendly_model_error(exc, chain_exhausted=True),
                "step_type": "error",
            }):
                yield norm
    finally:
        _ctx["allow_issue_creation"] = prev_allow
|
|