solution_challenge_backend / backend /agentic_orchestrator.py
github-actions
Deploy to Hugging Face
c794b6b
Raw
History Blame Contribute Delete
73.7 kB
"""
agentic_orchestrator.py
Cepheus Multi-Agent System — Google ADK 1.32.0
Agents:
AlertResponseAgent, LocationAgent, PlanningAgent, EmergencyDispatchAgent,
MissingPersonAgent, GossipGraphAgent, OrchestratorAgent
Usage:
from agentic_orchestrator import inject_context, run_agent_stream
"""
from __future__ import annotations
import base64
import requests
from geopy.distance import geodesic
import asyncio
import logging
import os
import time
import sys
import uuid
import json
import datetime
import re
import threading
from typing import AsyncGenerator, Any
# ── Add Face_Recognition/ to path so gossip_bridge can be imported
_FR_DIR = os.path.join(os.path.dirname(__file__), "Face_Recognition")
if _FR_DIR not in sys.path:
sys.path.insert(0, _FR_DIR)
import gossip_bridge
from gemini_config import generate_with_fallback, get_model, is_rate_limit
logger = logging.getLogger(__name__)
# ── Optional Google ADK ────────────────────────────────────────────────────────
# ADK gives us the full multi-agent orchestration. When it isn't installed we fall
# back to a native google.genai function-calling loop (see _run_native_stream),
# which still exercises every tool below. This keeps the copilot fully functional
# with just google-genai + GEMINI_API_KEY.
try:
from google.adk.agents import Agent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
_ADK_AVAILABLE = True
except Exception as _adk_err: # pragma: no cover - depends on optional dep
logger.warning("google-adk unavailable (%s) — using native google.genai orchestrator.", _adk_err)
_ADK_AVAILABLE = False
class Agent: # minimal placeholder so module-level definitions don't crash
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
class Runner:
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
class InMemorySessionService:
async def get_session(self, **_kwargs):
return None
async def create_session(self, **_kwargs):
return None
try:
from google.genai import types as genai_types
except Exception: # pragma: no cover
genai_types = None
# ── Shared live context (injected from main.py at startup) ─────────────────────
_ctx: dict[str, Any] = {
"alerts": [],
"sos_events": [],
"vision_engine": None,
"rooms": [],
"detections_history": [],
"issues": [],
"incident_logs": [],
"signage_state": {},
}
_issue_created_cb = None
_ai_model_changed_cb = None
_ISSUE_DEDUP_COOLDOWN_SEC = 300
_recent_issue_keys: dict[str, float] = {}
_issue_create_lock = threading.Lock()
_RAISE_ISSUE_INTENT_PATTERNS = (
r"\braise\s+(an?\s+)?issue\b",
r"\bcreate\s+(an?\s+)?issue\b",
r"\bopen\s+(an?\s+)?incident\b",
r"\blog\s+(an?\s+)?incident\b",
r"\bfile\s+(an?\s+)?(incident|report)\b",
r"\bregister\s+(an?\s+)?incident\b",
r"\bstart\s+(an?\s+)?issue\b",
r"\btrack\s+as\s+(an?\s+)?issue\b",
)
# High-level platform actions wired by main.py (run on the main event loop):
# broadcast_alert(alert_type, message, location, severity) -> dict
# set_signage(signage_id, active) -> dict
# update_issue(issue) -> None (broadcast an already-mutated issue)
_action_handlers: dict[str, Any] = {}
def set_issue_created_callback(callback):
"""Register async/sync callback invoked when agent creates an issue (main.py wires broadcast)."""
global _issue_created_cb
_issue_created_cb = callback
def set_ai_model_changed_callback(callback):
"""Register callback invoked when the agent toggles a vision AI model (main.py wires broadcast)."""
global _ai_model_changed_cb
_ai_model_changed_cb = callback
def set_action_handlers(**handlers):
"""Register high-level platform action handlers from main.py (broadcast_alert, set_signage, update_issue)."""
_action_handlers.update({k: v for k, v in handlers.items() if v})
def inject_context(alerts, sos_events, vision_engine, rooms, detections_history,
issues=None, incident_logs=None, signage_state=None):
"""Called by main.py to give agents access to live platform data."""
_ctx["alerts"] = alerts
_ctx["sos_events"] = sos_events
_ctx["vision_engine"] = vision_engine
if rooms:
_ctx["rooms"] = rooms
elif vision_engine is not None and hasattr(vision_engine, "room_stats"):
_ctx["rooms"] = list(getattr(vision_engine, "room_stats", {}).values())
else:
_ctx["rooms"] = []
_ctx["detections_history"] = detections_history
if issues is not None:
_ctx["issues"] = issues
if incident_logs is not None:
_ctx["incident_logs"] = incident_logs
if signage_state is not None:
_ctx["signage_state"] = signage_state
def prompt_requests_issue_creation(prompt: str) -> bool:
"""True only when the operator explicitly asked to raise/create/log an issue."""
text = (prompt or "").strip().lower()
if not text:
return False
return any(re.search(pat, text) for pat in _RAISE_ISSUE_INTENT_PATTERNS)
# ════════════════════════════════════════════════════════════════════════════════
# TOOLS (plain Python functions — ADK derives JSON schema from type hints + docstrings)
# ════════════════════════════════════════════════════════════════════════════════
def get_current_alerts() -> dict:
"""Retrieve the 10 most recent active crisis alerts from the system."""
closed = {"RESOLVED", "CLOSED", "CANCELLED", "DISMISSED", "CLEARED"}
raw = _ctx["alerts"][-50:]
alerts = []
for a in raw:
if hasattr(a, "model_dump"):
item = a.model_dump()
elif isinstance(a, dict):
item = a
else:
continue
status = str(item.get("status", "")).upper()
if status and status in closed:
continue
alerts.append(item)
if len(alerts) >= 10:
break
return {"count": len(alerts), "alerts": alerts}
def get_crowd_statistics() -> dict:
"""Get current crowd statistics: total count and per-room occupancy."""
ve = _ctx["vision_engine"]
total = getattr(ve, "last_total_count", 0) if ve else 0
rooms_out = []
for r in _ctx["rooms"]:
if isinstance(r, dict):
rooms_out.append({"id": r.get("id"), "label": r.get("label"), "occupancy": r.get("occupancy", 0), "capacity": r.get("capacity", 0)})
else:
rooms_out.append({"id": getattr(r, "id", ""), "label": getattr(r, "label", ""), "occupancy": getattr(r, "occupancy", 0), "capacity": getattr(r, "capacity", 0)})
return {"total_crowd_count": total, "rooms": rooms_out}
def get_known_persons() -> dict:
"""
Return the persisted sighting history (last recorded location per person).
WARNING: This is NOT live camera data. Entries may be from earlier in the
session or from before a reset. For who is on camera RIGHT NOW, always use
get_live_camera_faces or who_is_on_camera_now instead.
"""
detections = _ctx.get("detections_history", [])
return {
"count": len(detections),
"persons": detections,
"data_type": "SIGHTING_HISTORY",
"note": "Historical records — NOT live. Use get_live_camera_faces for active detections.",
"must_not_use_for_live_presence": True,
}
def format_active_camera_report(feeds: dict) -> str:
"""Deterministic one-line report derived only from get_active_camera_feeds output."""
count = int(feeds.get("count", 0) or 0)
if count <= 0:
return "0 active camera feeds."
ids = [str(c.get("cam_id", "?")) for c in feeds.get("cameras", [])]
return f"{count} active camera feed(s): {', '.join(ids)}."
def report_active_cameras_from_tool() -> dict:
"""Authoritative camera count for agents — always mirrors get_active_camera_feeds."""
feeds = get_active_camera_feeds()
count = int(feeds.get("count", 0) or 0)
return {
"count": count,
"cameras": feeds.get("cameras", []),
"report": format_active_camera_report(feeds),
"source_tool": "get_active_camera_feeds",
}
def who_is_on_camera_now() -> dict:
"""Authoritative live-presence answer — never uses sighting history."""
live = get_live_camera_faces()
persons = live.get("live_persons", [])
return {
"count": len(persons),
"persons": persons,
"data_type": "LIVE",
"source_tool": "get_live_camera_faces",
"note": "Live camera detections only — not enrolled roster or sighting history.",
}
def _collect_live_faces() -> tuple[dict[str, list], list[dict]]:
"""Return (per_camera_faces, flat_live_person_list) from face_results only."""
ve = _ctx.get("vision_engine")
per_cam: dict[str, list] = {}
live: list[dict] = []
if ve is None or not hasattr(ve, "face_results"):
return per_cam, live
for cam_id, faces in (ve.face_results or {}).items():
known = [
f for f in (faces or [])
if str(f.get("name", "")).lower() not in ("unknown", "unidentified", "none", "")
]
if known:
per_cam[cam_id] = known
for f in known:
live.append({
"name": f.get("name"),
"confidence": f.get("confidence"),
"camera": cam_id,
"status": "LIVE_DETECTED",
})
return per_cam, live
def get_live_camera_faces() -> dict:
"""
Return ONLY persons actively visible on live camera feeds right now.
This is the authoritative source for 'how many faces can you see', 'who is
on camera now', and any question that asks for live/active (not historical)
detections. Each person has status LIVE_DETECTED.
"""
per_cam, live = _collect_live_faces()
feeds = get_active_camera_feeds()
return {
"total_live_faces": len(live),
"live_persons": live,
"cameras": {cid: {"face_count": len(faces), "faces": faces} for cid, faces in per_cam.items()},
"active_feeds": feeds.get("cameras", []),
"active_feed_count": feeds.get("count", 0),
"data_type": "LIVE",
}
def classify_alert_severity(alert_type: str, location: str, crowd_count: int) -> dict:
"""
Classify the severity and impact of a crisis alert.
Args:
alert_type: Type of alert e.g. fire, stampede, medical, sos, fight
location: Zone or room where the alert originated
crowd_count: Estimated number of people in the affected area
"""
severity_map = {"fire": "CRITICAL", "stampede": "CRITICAL", "sos": "CRITICAL",
"medical": "HIGH", "fight": "HIGH", "anomaly": "MEDIUM"}
severity = severity_map.get(alert_type.lower(), "MEDIUM")
return {
"severity": severity,
"alert_type": alert_type,
"location": location,
"evacuation_needed": severity == "CRITICAL",
"estimated_affected": crowd_count,
"recommended_action": f"Immediate evacuation of {location}" if severity == "CRITICAL" else f"Monitor and assess {location}",
}
OVERPASS_URL = "https://overpass-api.de/api/interpreter"
SERVICE_TAGS = {
"police": "police",
"fire": "fire_station",
"ambulance": "hospital"
}
# Default station location — override via CEPHEUS_STATION_LAT / CEPHEUS_STATION_LON
STATION_LAT = float(os.getenv("CEPHEUS_STATION_LAT", "12.9716"))
STATION_LON = float(os.getenv("CEPHEUS_STATION_LON", "77.5946"))
def locate_nearest_emergency_services(
service_type: str,
latitude: float = STATION_LAT,
longitude: float = STATION_LON,
radius_meters: int = 5000
) -> dict:
"""
Finds the nearest real emergency service using OpenStreetMap Overpass API.
Args:
service_type: police | fire | ambulance
latitude: Current latitude (e.g. 12.9716 for Bengaluru)
longitude: Current longitude (e.g. 77.5946 for Bengaluru)
radius_meters: Search radius in meters
"""
service_type = service_type.lower()
if service_type not in SERVICE_TAGS:
return {"success": False, "message": "service_type must be police, fire, or ambulance"}
amenity = SERVICE_TAGS[service_type]
query = f"""
[out:json];
(
node["amenity"="{amenity}"](around:{radius_meters},{latitude},{longitude});
way["amenity"="{amenity}"](around:{radius_meters},{latitude},{longitude});
relation["amenity"="{amenity}"](around:{radius_meters},{latitude},{longitude});
);
out center tags;
"""
try:
response = requests.get(OVERPASS_URL, params={"data": query}, timeout=15)
response.raise_for_status()
data = response.json()
except Exception as e:
return {"success": False, "message": f"API Error: {str(e)}"}
if not data.get("elements"):
return {"success": False, "message": f"No nearby {service_type} services found"}
nearest = None
nearest_distance = float("inf")
for element in data["elements"]:
lat = element.get("lat") or element.get("center", {}).get("lat")
lon = element.get("lon") or element.get("center", {}).get("lon")
if not lat or not lon:
continue
distance = geodesic((latitude, longitude), (lat, lon)).km
if distance < nearest_distance:
nearest_distance = distance
nearest = element
if not nearest:
return {"success": False, "message": f"No valid {service_type} locations found"}
tags = nearest.get("tags", {})
return {
"success": True,
"service_type": service_type,
"name": tags.get("name", f"Nearest {service_type.title()} Service"),
"distance_km": round(nearest_distance, 2),
"latitude": nearest.get("lat") or nearest.get("center", {}).get("lat"),
"longitude": nearest.get("lon") or nearest.get("center", {}).get("lon"),
"address": {
"street": tags.get("addr:street"),
"city": tags.get("addr:city"),
"postcode": tags.get("addr:postcode")
},
"phone": tags.get("phone") or tags.get("contact:phone"),
"osm_id": nearest.get("id")
}
def simulate_dispatch_call(service_type: str, location: str, message: str, affected_count: int) -> dict:
"""
Simulate an emergency dispatch coordination call. SIMULATED ONLY — no real calls placed.
Args:
service_type: One of: police, fire, ambulance
location: Crisis location
message: Brief description of the emergency
affected_count: Number of people affected
"""
scripts = {
"police": f"[DISPATCH] Police Control: Units dispatched to {location}. ETA 3 min. Maintain perimeter.",
"fire": f"[DISPATCH] Fire Control: Engine dispatched to {location}. {affected_count} civilians in zone. Clear evac routes. ETA 5 min.",
"ambulance": f"[DISPATCH] Medical Control: Ambulance to {location}. Prepare triage at main entrance. ETA 7 min.",
}
transcript = (
f'OPERATOR: Emergency at {location}. {message}. ~{affected_count} people affected.\n'
+ scripts.get(service_type.lower(), f"Emergency services alerted for {location}.")
)
return {
"dispatch_status": "ACKNOWLEDGED",
"service": service_type,
"location": location,
"affected_count": affected_count,
"call_transcript": transcript,
"eta_minutes": {"police": 3, "fire": 5, "ambulance": 7}.get(service_type.lower(), 5),
"simulation": True,
"dispatch_mode": "SIMULATION",
"note": "SIMULATION ONLY — no real call was placed",
}
def find_nearest_emergency_services(
lat: float,
lng: float,
radius_m: int = 10000,
top_n: int = 3,
) -> dict:
"""
Google Maps: nearest hospitals, fire stations, police, ambulances, and supply stores
with traffic-aware ETAs. Prefer this over locate_nearest_emergency_services when
GOOGLE_MAPS_API_KEY is configured.
"""
from emergency_maps_service import find_nearest_services
return find_nearest_services(lat, lng, radius_m=radius_m, top_n=top_n)
def get_traffic_aware_route(
origin_lat: float,
origin_lng: float,
dest_lat: float,
dest_lng: float,
place_name: str = "",
) -> dict:
"""Google Maps driving route with live traffic ETA and turn-by-turn steps."""
from emergency_maps_service import get_directions_with_traffic
return get_directions_with_traffic(origin_lat, origin_lng, dest_lat, dest_lng, place_name)
def recommend_emergency_dispatch(
lat: float,
lng: float,
emergency_type: str,
severity: str = "medium",
) -> dict:
"""
Rank nearest services by emergency type and traffic-aware ETA.
emergency_type: fire | medical | security | crowd | general
"""
from emergency_maps_service import recommend_emergency_dispatch as _recommend
return _recommend(lat, lng, emergency_type, severity=severity)
def generate_evacuation_plan(location: str, affected_count: int, alert_type: str) -> dict:
"""
Generate a tactical evacuation and responder assignment plan.
Args:
location: Affected zone name
affected_count: Number of people to evacuate
alert_type: Type of emergency e.g. fire, stampede
"""
routes = {
"PLATFORM 1": ["Exit via North Gate (Gate A)", "Overflow to Concourse via Stairwell B"],
"PLATFORM 2": ["Exit via South Gate (Gate D)", "Overflow to Concourse via Stairwell C"],
"CONCOURSE": ["Exit via Main Hall Gate", "Secondary exit via East Corridor"],
}
evac_routes = routes.get(location.upper(), ["Evacuate via nearest marked exit", "Assemble at muster point"])
return {
"plan_type": f"{alert_type.upper()} RESPONSE",
"affected_zone": location,
"affected_count": affected_count,
"evacuation_routes": evac_routes,
"assembly_point": "External plaza / Station forecourt",
"responder_assignments": [
{"role": "Crowd Control", "count": max(2, affected_count // 50), "zone": location},
{"role": "Medical Standby", "count": 2, "zone": "Assembly Point"},
{"role": "Communication Officer","count": 1, "zone": "Control Room"},
],
"estimated_clearance_minutes": max(3, affected_count // 100 * 2),
"verified": False,
"note": "TEMPLATE PLAN — verify routes and staffing with on-site staff before execution.",
}
def _name_matches(query: str, candidate: str) -> bool:
"""Token-aware name match — avoids substring false positives (ISSUE-042)."""
q = (query or "").strip().lower()
n = (candidate or "").strip().lower()
if not q or not n:
return False
if q == n:
return True
if re.search(rf"\b{re.escape(q)}\b", n):
return True
if re.search(rf"\b{re.escape(n)}\b", q):
return True
return False
def _enrolled_face_names() -> list[str]:
"""Return the list of enrolled (known) person names from the face database."""
ve = _ctx.get("vision_engine")
fe = getattr(ve, "face_engine", None) if ve else None
names: set[str] = set()
if fe is not None:
try:
if hasattr(fe, "reload_db"):
fe.reload_db()
db = getattr(fe, "db", {}) or {}
for key in db.keys():
if not str(key).startswith("unknown_"):
names.add(str(key))
except Exception as exc: # pragma: no cover - defensive
logger.warning("enrolled face lookup failed: %s", exc)
return sorted(names)
def list_enrolled_faces() -> dict:
"""
List every person enrolled in the facial-recognition database.
Use this to answer questions like "is <name> in the database?" before
searching the live camera feeds. Returns the authoritative enrolled roster.
"""
names = _enrolled_face_names()
return {"count": len(names), "enrolled_persons": names}
def search_person_in_camera_feeds(person_name: str) -> dict:
"""
Search for a specific person across the facial-recognition system.
This performs a real, deterministic lookup against three sources:
1. The enrolled face database (is this person known to the system?)
2. Live per-camera face-recognition results (currently visible on a feed?)
3. The recent detection history (recently sighted on a feed?)
Args:
person_name: Full or partial name of the person to locate
Returns a structured result. `found` is True only when the person is
LIVE_DETECTED on a camera feed right now — never for historical sightings.
"""
query = (person_name or "").strip()
enrolled = _enrolled_face_names()
is_enrolled = any(_name_matches(query, name) for name in enrolled)
enrolled_match = next((name for name in enrolled if _name_matches(query, name)), None)
# 2) Live camera face-recognition results (most authoritative "on camera now").
ve = _ctx.get("vision_engine")
live_hits = []
if ve is not None and hasattr(ve, "face_results"):
try:
for cam_id, faces in (ve.face_results or {}).items():
for face in faces or []:
if _name_matches(query, str(face.get("name", ""))):
live_hits.append({
"camera": cam_id,
"confidence": round(float(face.get("confidence", 0.0)), 3),
})
except Exception as exc: # pragma: no cover - defensive
logger.warning("live face_results scan failed: %s", exc)
# 3) Recent detection history (not live — reported separately).
detections = _ctx.get("detections_history", [])
history_matches = [d for d in detections if _name_matches(query, str(d.get("name", "")))]
facial_ai_on = bool(getattr(ve, "ai_models", {}).get("facial")) if ve else False
# Locate face_database profile photo if enrolled
profile_pic = None
if is_enrolled and enrolled_match:
try:
from main import _FACE_DB_PATH
person_dir = os.path.join(_FACE_DB_PATH, enrolled_match)
if os.path.isdir(person_dir):
for filename in os.listdir(person_dir):
if filename.lower().endswith((".jpg", ".jpeg", ".png")) and "occluded" not in filename.lower():
profile_pic = f"/files/face/{enrolled_match}/{filename}"
break
except Exception as exc:
logger.debug("Failed to locate profile photo: %s", exc)
if live_hits:
best = max(live_hits, key=lambda h: h["confidence"])
# Check for live thumbnail in detections history
thumbnail = None
for d in detections:
if _name_matches(query, str(d.get("name", ""))) and d.get("thumbnail"):
thumbnail = d.get("thumbnail")
break
evidence = "Matched on live camera face-recognition feed."
if thumbnail:
evidence += f" Camera Sighting: ![Live Sighting]({thumbnail})"
if profile_pic:
evidence += f" Profile Photo: ![Profile Photo]({profile_pic})"
return {
"found": True,
"person": query,
"status": "LIVE_DETECTED",
"is_enrolled": is_enrolled,
"enrolled_as": enrolled_match,
"last_seen_camera": best["camera"],
"confidence": best["confidence"],
"live_cameras": [h["camera"] for h in live_hits],
"facial_ai_active": facial_ai_on,
"evidence": evidence,
}
if history_matches:
latest = history_matches[-1]
thumbnail = latest.get("thumbnail")
evidence = "Matched in sighting history only — not currently on a live feed."
if thumbnail:
evidence += f" Last Camera Sighting: ![Historical Sighting]({thumbnail})"
if profile_pic:
evidence += f" Profile Photo: ![Profile Photo]({profile_pic})"
return {
"found": False,
"person": query,
"status": "RECENTLY_SEEN",
"is_enrolled": is_enrolled,
"enrolled_as": enrolled_match,
"last_seen_camera": latest.get("camId") or latest.get("location") or "unknown",
"last_seen_at": latest.get("seen_at") or latest.get("timestamp", "unknown"),
"confidence": round(float(latest.get("confidence", 0.0)), 3),
"total_sightings": len(history_matches),
"facial_ai_active": facial_ai_on,
"evidence": evidence,
}
evidence = (
f"'{query}' is enrolled in the database but has no live detection or recent sighting on any camera."
if is_enrolled
else f"'{query}' is not enrolled in the face database and was not detected on any camera."
)
if profile_pic:
evidence += f" Profile Photo: ![Profile Photo]({profile_pic})"
return {
"found": False,
"person": query,
"status": "NOT_FOUND",
"is_enrolled": is_enrolled,
"enrolled_as": enrolled_match,
"facial_ai_active": facial_ai_on,
"evidence": evidence,
"recommendation": (
"Person is known but not currently visible. Keep facial AI active and monitor feeds."
if is_enrolled
else "Enroll this person via Face Database, or verify the name/spelling."
),
}
def set_ai_model_state(model_id: str, enabled: bool) -> dict:
"""
Turn a vision AI model on or off across all camera feeds.
Args:
model_id: One of: facial, crowd, anomaly, medical, fight
enabled: True to activate, False to deactivate
Use this to satisfy commands like "turn on facial detection". Facial
recognition is on by default and should stay on unless explicitly disabled.
"""
ve = _ctx.get("vision_engine")
if ve is None:
return {"success": False, "message": "Vision engine not available"}
model_id = (model_id or "").strip().lower()
valid = set(getattr(ve, "ai_models", {}).keys()) or {"facial", "crowd", "anomaly", "medical", "fight"}
if model_id not in valid:
return {"success": False, "message": f"Unknown model '{model_id}'. Valid: {sorted(valid)}"}
try:
if hasattr(ve, "set_ai_model"):
ve.set_ai_model(model_id, bool(enabled))
else:
ve.ai_models[model_id] = bool(enabled)
status_now = ve.get_ai_status() if hasattr(ve, "get_ai_status") else dict(getattr(ve, "ai_models", {}))
if _ai_model_changed_cb:
try:
_ai_model_changed_cb(model_id, bool(enabled), status_now)
except Exception as exc: # pragma: no cover - defensive
logger.warning("ai_model_changed callback failed: %s", exc)
return {"success": True, "model_id": model_id, "enabled": bool(enabled), "ai_status": status_now}
except Exception as exc:
return {"success": False, "message": str(exc)}
def switch_camera_source(cam_id: str, hardware_index: int) -> dict:
"""
Switch a specific logical camera (e.g. cam-01) to a different hardware source index.
Args:
cam_id: Logical ID like 'cam-01' or 'cam-02'
hardware_index: Integer index of the physical camera (0, 1, 2, etc.)
"""
ve = _ctx.get("vision_engine")
if not ve:
return {"success": False, "msg": "Vision Engine not available"}
try:
success = ve.set_camera(cam_id, hardware_index)
return {"success": success, "cam_id": cam_id, "new_index": hardware_index}
except Exception as e:
return {"success": False, "msg": str(e)}
def close_camera_stream(cam_id: str) -> dict:
"""
Release and close a specific camera stream to save resources.
Args:
cam_id: Logical ID like 'cam-01' or 'cam-02'
"""
ve = _ctx.get("vision_engine")
if not ve:
return {"success": False, "msg": "Vision Engine not available"}
try:
ve.release_camera(cam_id)
return {"success": True, "cam_id": cam_id, "status": "closed"}
except Exception as e:
return {"success": False, "msg": str(e)}
def get_interaction_graph(person_name: str) -> dict:
"""
Retrieve the social interaction and co-presence graph for a specific person.
Args:
person_name: Name of the person to build the interaction graph for
"""
try:
gossip_bridge.set_root_person(person_name)
data = gossip_bridge.get_gossip_json(person_name)
contacts = data.get("contacts", {})
return {
"person": person_name,
"total_contacts": data.get("total_interactions", 0),
"total_people_in_network": data.get("total_people", 0),
"level_1_direct_contacts": [c["name"] for c in contacts.get("level_1", [])],
"level_2_indirect_contacts": [c["name"] for c in contacts.get("level_2", [])],
"level_3_extended_network": [c["name"] for c in contacts.get("level_3", [])],
"graph_nodes": len(data.get("nodes", [])),
"graph_links": len(data.get("links", [])),
"is_tracking_active": data.get("is_tracking", False),
}
except Exception as exc:
return {"person": person_name, "error": str(exc), "status": "GRAPH_UNAVAILABLE"}
def get_active_camera_feeds() -> dict:
"""Get the status and AI model configuration of all active camera feeds."""
ve = _ctx.get("vision_engine")
if not ve:
return {"cameras": [], "count": 0, "ai_status": {}, "error": "Vision engine not available"}
cameras = []
seen: set[str] = set()
browser_feeds = getattr(ve, "browser_feeds", {}) or {}
camera_indices = getattr(ve, "camera_indices", {}) or {}
def _append_camera(cid: str, *, index: int = 0, source: str, last_seen=None) -> None:
if not cid or cid in seen:
return
seen.add(cid)
cameras.append({
"cam_id": cid,
"index": index,
"status": "ACTIVE",
"source": source,
"last_seen": last_seen,
})
for cid, idx in camera_indices.items():
meta = browser_feeds.get(cid, {})
_append_camera(
cid,
index=idx,
source=meta.get("source", "device"),
last_seen=meta.get("last_seen"),
)
for cid, meta in browser_feeds.items():
_append_camera(
cid,
index=meta.get("index", 0),
source="browser",
last_seen=meta.get("last_seen"),
)
# Local GPU path: OpenCV captures in active_cameras (no browser_feeds entry).
for cid in getattr(ve, "active_cameras", {}) or {}:
meta = browser_feeds.get(cid, {})
_append_camera(
cid,
index=camera_indices.get(cid, 0),
source=meta.get("source", "device"),
last_seen=meta.get("last_seen"),
)
# Live face pipeline: cam_id present in face_results but feeds not yet registered.
for cid, faces in (getattr(ve, "face_results", None) or {}).items():
if not faces:
continue
meta = browser_feeds.get(cid, {})
_append_camera(
cid,
index=camera_indices.get(cid, meta.get("index", 0)),
source=meta.get("source", "live_faces"),
last_seen=meta.get("last_seen"),
)
per_cam, _ = _collect_live_faces()
for cam in cameras:
cid = cam["cam_id"]
cam["live_faces"] = [f.get("name") for f in per_cam.get(cid, [])]
cam["live_face_count"] = len(per_cam.get(cid, []))
ai_status = ve.get_ai_status() if hasattr(ve, "get_ai_status") else {}
count = len(cameras)
payload = {"cameras": cameras, "count": count, "ai_status": ai_status}
payload["report"] = format_active_camera_report(payload)
return payload
def _normalize_issue_subject(title: str, description: str, metadata: dict | None) -> str:
"""Extract a canonical person/subject key for deduplication."""
meta = metadata or {}
for key in ("person_name", "subject", "name"):
val = meta.get(key)
if val and str(val).strip():
return str(val).strip().upper()
blob = f"{title or ''} {description or ''}"
for pat in (
r"Missing Person(?:\s+Database)?\s+Match:\s*([A-Za-z0-9_\-\s]+)",
r"Missing Person:\s*([A-Za-z0-9_\-\s]+?)(?:\s*\(|$|:)",
r"Database Match:\s*([A-Za-z0-9_\-\s]+)",
r"Missing Person Found:\s*([A-Za-z0-9_\-\s]+)",
r"Person Located:\s*([A-Za-z0-9_\-\s]+)",
):
m = re.search(pat, blob, re.I)
if m:
return m.group(1).split("(")[0].strip().upper()
return re.sub(r"\s+", " ", (title or "general").strip().upper())[:48]
def _issue_category_key(title: str) -> str:
t = (title or "").lower()
if "pending" in t and "verification" in t:
return "pending_verification"
if "database match" in t or "missing person" in t or "missing person found" in t:
return "missing_person_match"
return "general"
def _is_issue_active(issue: dict) -> bool:
closed = {"RESOLVED", "CLOSED", "CANCELLED"}
status = str(issue.get("status", "")).upper()
progress = int(issue.get("progress", 0) or 0)
return status not in closed and progress < 100
def create_tactical_issue(title: str, description: str, priority: str = "MEDIUM", metadata_json: str = "{}") -> dict:
"""
Create a new tactical incident report or 'issue' in the management system.
Use this for missing persons found, unauthorized access, or resolved crises.
Args:
title: Short title of the issue
description: Detailed report
priority: LOW | MEDIUM | HIGH | CRITICAL
metadata_json: Optional JSON string containing keys like 'camera', 'timestamp', 'person_name'
"""
issues = _ctx.get("issues")
if issues is None:
return {"success": False, "message": "Issue database not connected"}
if not _ctx.get("allow_issue_creation"):
return {
"success": False,
"blocked": True,
"message": (
"Issue creation blocked — the operator did not explicitly ask to raise an issue. "
"Report the search result only, or ask the operator to click 'Raise issue' or "
"say 'raise an issue for [name]'."
),
}
try:
metadata = json.loads(metadata_json) if metadata_json else {}
except Exception:
metadata = {}
subject = _normalize_issue_subject(title, description, metadata)
category = _issue_category_key(title)
dedup_key = f"{subject}|{category}"
# Merge categories for the same missing-person subject (singleton per person).
merge_categories = {"missing_person_match", "pending_verification"}
with _issue_create_lock:
for iss in issues:
if not _is_issue_active(iss):
continue
existing_subject = _normalize_issue_subject(
iss.get("title", ""), iss.get("desc", ""), iss.get("metadata") or {},
)
existing_cat = _issue_category_key(iss.get("title", ""))
same_person = existing_subject == subject
same_type = existing_cat == category or (
same_person and existing_cat in merge_categories and category in merge_categories
)
if not (same_person and same_type):
continue
iss_meta = iss.setdefault("metadata", {})
if metadata.get("confidence") is not None:
iss_meta["confidence"] = metadata["confidence"]
if metadata.get("camera"):
iss_meta["camera"] = metadata["camera"]
if metadata.get("person_name"):
iss_meta["person_name"] = metadata["person_name"]
conf = metadata.get("confidence")
stamp = datetime.datetime.now().isoformat()
update_line = (
f"[UPDATE {stamp}] Confidence: {conf}"
if conf is not None
else f"[UPDATE {stamp}] {description[:180]}"
)
iss["desc"] = f"{iss.get('desc', '')}\n{update_line}".strip()
if _issue_created_cb:
try:
_issue_created_cb(iss)
except Exception as exc:
logger.error("issue_created callback failed: %s", exc)
return {
"success": True,
"issue": iss,
"deduplicated": True,
"message": f"Updated existing ongoing issue for {subject}.",
}
now = time.time()
last_created = _recent_issue_keys.get(dedup_key, 0)
if now - last_created < _ISSUE_DEDUP_COOLDOWN_SEC:
return {
"success": False,
"deduplicated": True,
"message": (
f"Cooldown active for {subject} ({category}). "
f"Wait {_ISSUE_DEDUP_COOLDOWN_SEC // 60} minutes before creating another."
),
}
new_issue = {
"id": f"ISS-{uuid.uuid4().hex[:6].upper()}",
"title": title,
"desc": description,
"status": "ONGOING",
"progress": 0,
"priority": priority,
"staff": 1,
"metadata": metadata,
"timestamp": datetime.datetime.now().isoformat()
}
issues.insert(0, new_issue)
_recent_issue_keys[dedup_key] = now
if _issue_created_cb:
try:
_issue_created_cb(new_issue)
except Exception as exc:
logger.error("issue_created callback failed: %s", exc)
return {"success": True, "issue": new_issue}
def dispatch_personnel_to_camera(camera_id: str, count: int = 1) -> dict:
"""
Simulate assigning tactical staff or security personnel to a specific camera location.
Use this to lock down an area once a missing person is detected.
Args:
camera_id: ID of the camera (e.g. 'cam-01')
count: Number of personnel to dispatch
"""
issues = _ctx.get("issues")
if not issues:
return {"success": False, "message": "No active issues to attach staff to. Create an issue first."}
cam = (camera_id or "").strip().lower()
closed = {"RESOLVED", "CLOSED", "CANCELLED"}
active_issues = [
iss for iss in issues
if str(iss.get("status", "")).upper() not in closed and int(iss.get("progress", 0) or 0) < 100
]
if not active_issues:
return {"success": False, "message": "No open issues to attach staff to. Create an issue first."}
# Prefer an issue explicitly tied to this camera (exact metadata match).
target_issue = None
for iss in active_issues:
meta_cam = str(iss.get("metadata", {}).get("camera", "")).strip().lower()
if cam and meta_cam == cam:
target_issue = iss
break
if target_issue is None:
cam_token = re.escape(cam)
for iss in active_issues:
blob = f"{iss.get('title','')} {iss.get('desc','')}".lower()
if cam and re.search(rf"\b{cam_token}\b", blob):
target_issue = iss
break
if target_issue is None:
return {
"success": False,
"message": (
f"No open issue linked to camera {camera_id}. "
"Call create_tactical_issue first, then dispatch personnel."
),
}
target_issue["staff"] = target_issue.get("staff", 0) + count
target_issue["desc"] = f"{target_issue.get('desc', '')}\n[UPDATE] Dispatched {count} personnel to sector {camera_id}."
if _issue_created_cb:
try:
_issue_created_cb(target_issue)
except Exception as exc: # pragma: no cover - defensive
logger.warning("issue update broadcast failed: %s", exc)
return {
"success": True,
"message": f"{count} personnel dispatched to {camera_id}; assigned to issue {target_issue['id']}",
"issue_id": target_issue["id"],
"simulation": True,
"dispatch_mode": "SIMULATION",
"note": "SIMULATION ONLY — staff assignment recorded in issue tracker; no external dispatch",
}
def get_active_issues() -> dict:
"""List open tactical issues/incidents (excludes resolved/closed)."""
issues = _ctx.get("issues") or []
closed = {"RESOLVED", "CLOSED", "CANCELLED"}
active = [
iss for iss in issues
if str(iss.get("status", "")).upper() not in closed and int(iss.get("progress", 0) or 0) < 100
]
out = []
for iss in active[:25]:
out.append({
"id": iss.get("id"),
"title": iss.get("title"),
"status": iss.get("status"),
"priority": iss.get("priority"),
"progress": iss.get("progress", 0),
"staff": iss.get("staff", 0),
})
return {"count": len(active), "issues": out}
def get_sos_events() -> dict:
"""Get active SOS / panic events raised from the guest app."""
events = _ctx.get("sos_events") or []
out = []
for e in events[-10:]:
d = e.model_dump() if hasattr(e, "model_dump") else (e if isinstance(e, dict) else {})
out.append({
"id": d.get("id"),
"location": d.get("location_label") or d.get("location"),
"message": d.get("message"),
"lat": d.get("lat"),
"lng": d.get("lng"),
"timestamp": d.get("timestamp"),
})
return {"count": len(events), "sos_events": out}
def get_recent_incident_logs(limit: int = 10) -> dict:
"""
Get the most recent incident log entries (audit trail of platform events).
Args:
limit: How many recent entries to return (max 30)
"""
logs = _ctx.get("incident_logs") or []
limit = max(1, min(int(limit or 10), 30))
recent = logs[:limit] if logs else []
return {"count": len(logs), "recent": recent}
def get_signage_status() -> dict:
"""Get the current on/off state of every digital evacuation/safety signage panel."""
state = _ctx.get("signage_state") or {}
try:
return {"count": len(state), "signage": dict(state)}
except Exception:
return {"count": 0, "signage": {}}
def get_platform_overview() -> dict:
"""
Get a single consolidated snapshot of the entire platform state: alert,
issue, SOS and tracking counts, crowd total, vision-AI status, and enrolled
roster. Use this first to understand the whole situation at a glance.
"""
ve = _ctx.get("vision_engine")
ai_status = ve.get_ai_status() if ve and hasattr(ve, "get_ai_status") else {}
_, live_persons = _collect_live_faces()
feeds = get_active_camera_feeds()
active_issues = get_active_issues()
active_alerts = get_current_alerts()
return {
"active_alerts": active_alerts.get("count", 0),
"active_issues": active_issues.get("count", 0),
"active_sos": len(_ctx.get("sos_events") or []),
"live_faces_now": [p.get("name") for p in live_persons],
"live_face_count": len(live_persons),
"active_camera_feeds": feeds.get("count", 0),
"crowd_total": getattr(ve, "last_total_count", 0) if ve else 0,
"vision_ai_status": ai_status,
"facial_recognition_active": bool(ai_status.get("facial")),
"enrolled_persons": _enrolled_face_names(),
"active_signage": [k for k, v in (_ctx.get("signage_state") or {}).items() if v],
}
def broadcast_alert(alert_type: str, message: str, location: str = "", severity: str = "high") -> dict:
"""
Raise and broadcast a live alert/notification across the platform (dashboard,
comms, logs). Use for warnings, evacuations, or operational notices.
Args:
alert_type: e.g. fire, medical, security, evacuation, info
message: Human-readable alert text
location: Affected zone/room (optional)
severity: critical | high | medium | low | info
"""
handler = _action_handlers.get("broadcast_alert")
if not handler:
return {"success": False, "message": "Alert broadcasting not available"}
try:
result = handler(alert_type, message, location, severity)
return {"success": True, "alert": result}
except Exception as exc:
return {"success": False, "message": str(exc)}
def set_signage_state(signage_id: str, active: bool) -> dict:
"""
Activate or deactivate a digital signage panel (e.g. EVACUATE, LOCKDOWN, ALL CLEAR).
Args:
signage_id: Signage panel ID (e.g. s1, s3, s6, s8)
active: True to light it up, False to turn it off
"""
handler = _action_handlers.get("set_signage")
if not handler:
return {"success": False, "message": "Signage control not available"}
try:
handler(signage_id, bool(active))
return {"success": True, "signage_id": signage_id, "active": bool(active)}
except Exception as exc:
return {"success": False, "message": str(exc)}
def update_issue(issue_id: str, status: str = "", progress: int = -1, note: str = "") -> dict:
"""
Update an existing tactical issue: change status, set progress, or append a note.
Args:
issue_id: The issue ID (e.g. ISS-1A2B3C)
status: New status (e.g. ONGOING, RESOLVED) — leave blank to keep
progress: 0-100 percent complete — use -1 to keep current
note: Optional update note appended to the description
"""
issues = _ctx.get("issues") or []
target = next((i for i in issues if str(i.get("id")) == str(issue_id)), None)
if target is None:
return {"success": False, "message": f"Issue {issue_id} not found"}
if status:
target["status"] = status
if progress is not None and progress != -1:
try:
pct = int(round(float(progress)))
if 0 <= pct <= 100:
target["progress"] = pct
except (TypeError, ValueError):
pass
if note:
target["desc"] = f"{target.get('desc', '')}\n[UPDATE] {note}"
handler = _action_handlers.get("update_issue")
if handler:
try:
handler(target)
except Exception as exc: # pragma: no cover - defensive
logger.warning("update_issue broadcast failed: %s", exc)
elif _issue_created_cb:
try:
_issue_created_cb(target)
except Exception:
pass
return {"success": True, "issue": {
"id": target.get("id"), "status": target.get("status"), "progress": target.get("progress"),
}}
# ── Tool registry (shared by ADK agents and native genai fallback) ───────────────
_ALL_TOOLS = [
get_current_alerts,
get_crowd_statistics,
get_known_persons,
get_live_camera_faces,
who_is_on_camera_now,
report_active_cameras_from_tool,
classify_alert_severity,
locate_nearest_emergency_services,
find_nearest_emergency_services,
get_traffic_aware_route,
recommend_emergency_dispatch,
simulate_dispatch_call,
generate_evacuation_plan,
search_person_in_camera_feeds,
switch_camera_source,
close_camera_stream,
get_interaction_graph,
get_active_camera_feeds,
create_tactical_issue,
dispatch_personnel_to_camera,
list_enrolled_faces,
set_ai_model_state,
get_active_issues,
get_sos_events,
get_recent_incident_logs,
get_signage_status,
get_platform_overview,
broadcast_alert,
set_signage_state,
update_issue,
]
_TOOL_MAP = {fn.__name__: fn for fn in _ALL_TOOLS}
# ════════════════════════════════════════════════════════════════════════════════
# AGENT DEFINITIONS
# ════════════════════════════════════════════════════════════════════════════════
_MODEL = get_model("default")
_MODEL_PRO = get_model("pro")
_alert_agent = Agent(
name="AlertResponseAgent",
model=_MODEL,
description="Analyzes crisis alerts, classifies severity, and coordinates immediate tactical responses.",
instruction=(
"You are the AlertResponseAgent for Cepheus. "
"When invoked: get current alerts, classify severity, identify affected zones. "
"Output a clear tactical analysis with: alert type, severity, affected count, recommended action. "
"Be concise and structured."
),
tools=[
get_current_alerts, get_crowd_statistics, classify_alert_severity,
get_active_issues, get_sos_events, get_recent_incident_logs,
get_platform_overview, broadcast_alert, set_signage_state, update_issue,
],
)
_location_agent = Agent(
name="LocationAgent",
model=_MODEL,
description="Tracks person locations, monitors zone occupancy, and identifies nearby responders.",
instruction=(
"You are the LocationAgent. Track real-time locations of persons and crowd zones. "
"Use get_live_camera_faces for who is on camera RIGHT NOW. Use get_known_persons only for "
"historical sighting records (never present history as live). Use get_crowd_statistics for zone data. "
"Report precise locations, camera IDs, and timestamps."
),
tools=[
who_is_on_camera_now, get_live_camera_faces, get_crowd_statistics,
get_active_camera_feeds, report_active_cameras_from_tool,
list_enrolled_faces, switch_camera_source, close_camera_stream,
],
)
_planning_agent = Agent(
name="PlanningAgent",
model=_MODEL,
description="Generates tactical evacuation strategies, responder assignments, and response timelines.",
instruction=(
"You are the PlanningAgent. Generate clear, step-by-step crisis response plans. "
"Always call generate_evacuation_plan with location, count, and alert type. "
"Format output with numbered steps, responder roles, and ETAs."
),
tools=[
generate_evacuation_plan, get_crowd_statistics, get_signage_status,
broadcast_alert, set_signage_state,
],
)
_dispatch_agent = Agent(
name="EmergencyDispatchAgent",
model=_MODEL,
description="Locates nearest emergency services and simulates dispatch coordination calls.",
instruction=(
"You are the EmergencyDispatchAgent. "
"Step 1: Call find_nearest_emergency_services (Google traffic ETAs) or "
"locate_nearest_emergency_services (OSM fallback) to find responders. "
"Step 2: Call recommend_emergency_dispatch for ranked dispatch choices. "
"Step 3: Call get_traffic_aware_route for turn-by-turn navigation when dispatching. "
"Step 4: Call simulate_dispatch_call to coordinate (SIMULATED ONLY). "
"IMPORTANT: Always state simulated calls are SIMULATED. "
f"Station reference: {STATION_LAT}, {STATION_LON}."
),
tools=[
find_nearest_emergency_services,
get_traffic_aware_route,
recommend_emergency_dispatch,
locate_nearest_emergency_services,
simulate_dispatch_call,
],
)
_missing_agent = Agent(
name="MissingPersonAgent",
model=_MODEL,
description="Searches for missing or specified persons across all active camera feeds.",
instruction=(
"You are the MissingPersonAgent. Search for persons using the vision system. "
"1. If a name is provided, call search_person_in_camera_feeds with the person's name. "
"2. If the user provides an image search result directly, skip step 1 and use the provided result. "
"3. ONLY call create_tactical_issue when the operator explicitly asked to raise or log an incident. "
" The tool rejects calls unless the prompt contained words like 'raise an issue' — never create "
" issues automatically after a face match. Report results only unless explicitly instructed. "
" The tool deduplicates automatically — repeated detections update the same ONGOING issue. "
" - title: 'Missing Person Database Match: [Name]' "
" - description: include confidence and camera when known "
" - metadata_json: JSON with person_name, camera, confidence, timestamp "
"4. ONLY call dispatch_personnel_to_camera when the operator requested dispatch. "
"5. Report found/not-found status, last camera, and confidence to the user. "
"6. CRITICAL: If the tool response includes an 'evidence' field with markdown image syntax (e.g., `![evidence](/files/uploads/...)`), you MUST include that exact markdown string in your final response so the user sees the image."
),
tools=[
search_person_in_camera_feeds, get_active_camera_feeds, report_active_cameras_from_tool,
who_is_on_camera_now, get_live_camera_faces, list_enrolled_faces, set_ai_model_state,
create_tactical_issue, dispatch_personnel_to_camera,
],
)
_gossip_agent = Agent(
name="GossipGraphAgent",
model=_MODEL,
description="Analyzes social interaction and co-presence graphs for contact tracing and intelligence.",
instruction=(
"You are the GossipGraphAgent. Analyze person interaction networks from camera co-presence data. "
"Call get_interaction_graph with the person's name. "
"Present: Level 1 (direct contacts), Level 2 (indirect), Level 3 (extended network). "
"Highlight strong connections and surveillance implications."
),
tools=[get_interaction_graph, get_known_persons],
)
# OrchestratorAgent routes to all sub-agents
_orchestrator = Agent(
name="OrchestratorAgent",
model=_MODEL_PRO,
description="Master tactical AI coordinator for Cepheus crisis response platform.",
instruction=(
"You are the OrchestratorAgent — the master coordinator for Cepheus. "
"Route queries to the correct specialized agent:\n"
"- Crisis alerts/incidents → AlertResponseAgent\n"
"- Person locations/tracking → LocationAgent\n"
"- Evacuation/response plans → PlanningAgent\n"
"- Emergency service dispatch → EmergencyDispatchAgent\n"
"- Missing/locate a person → MissingPersonAgent\n"
"- Social graph/interactions → GossipGraphAgent\n"
"For complex incidents (e.g. fire alert), chain multiple agents: "
"AlertResponseAgent → LocationAgent → PlanningAgent → EmergencyDispatchAgent. "
"Stream your reasoning step by step. Be decisive and tactical. "
"CRITICAL: If any tool or sub-agent returns an image markdown snippet (e.g., `![evidence](/files/...)`), you MUST output it exactly as is in your response."
),
tools=_ALL_TOOLS,
sub_agents=[_alert_agent, _location_agent, _planning_agent, _dispatch_agent, _missing_agent, _gossip_agent],
)
# Named agents + per-agent runners (agent_override from the copilot UI).
_NAMED_AGENTS: dict[str, Agent] = {
"OrchestratorAgent": _orchestrator,
"AlertResponseAgent": _alert_agent,
"LocationAgent": _location_agent,
"PlanningAgent": _planning_agent,
"EmergencyDispatchAgent": _dispatch_agent,
"MissingPersonAgent": _missing_agent,
"GossipGraphAgent": _gossip_agent,
}
# ════════════════════════════════════════════════════════════════════════════════
# RUNNER + SESSION SERVICE
# ════════════════════════════════════════════════════════════════════════════════
_session_service = InMemorySessionService()
_adk_session_keys: dict[str, float] = {}
_MAX_ADK_SESSIONS_PER_USER = 10
_ADK_SESSION_TTL_SECONDS = 3600
def _adk_registry_key(user_id: str, session_id: str) -> str:
return f"{user_id}:{session_id}"
def _touch_adk_session(user_id: str, session_id: str) -> None:
_adk_session_keys[_adk_registry_key(user_id, session_id)] = time.time()
async def prune_adk_sessions(user_id: str, keep_session_id: str | None = None) -> None:
"""Drop ADK in-memory sessions for an operator (optionally keeping one active session)."""
now = time.time()
prefix = f"{user_id}:"
for key in list(_adk_session_keys.keys()):
if not key.startswith(prefix):
continue
sid = key[len(prefix):]
if keep_session_id and sid == keep_session_id:
continue
stale = (now - _adk_session_keys.get(key, 0)) > _ADK_SESSION_TTL_SECONDS
user_count = sum(1 for k in _adk_session_keys if k.startswith(prefix))
if not stale and user_count <= _MAX_ADK_SESSIONS_PER_USER:
continue
try:
if hasattr(_session_service, "delete_session"):
await _session_service.delete_session(app_name="cepheus", user_id=user_id, session_id=sid)
except Exception as exc:
logger.debug("ADK session delete skipped (%s): %s", sid, exc)
_adk_session_keys.pop(key, None)
_RUNNERS = {
name: Runner(agent=agent, app_name="cepheus", session_service=_session_service)
for name, agent in _NAMED_AGENTS.items()
}
_runner = _RUNNERS["OrchestratorAgent"]
_NATIVE_SYSTEM_INSTRUCTION = (
"You are Cepheus Tactical Copilot, coordinating emergency management. Operate via tools: "
"alerts, crowds, cameras, face-recognition, AI controls, graph, dispatch, issues.\n\n"
"RULES:\n"
"1. NEVER invent/assume. Use ONLY tool results. If no tool was called, you don't know.\n"
"2. Report only real actions taken. create_tactical_issue ONLY if explicitly asked to 'raise/create issue'.\n"
"3. Be deterministic.\n\n"
"LIVE VS HISTORY:\n"
"- get_live_camera_faces: who is on camera RIGHT NOW.\n"
"- get_known_persons: HISTORY ONLY.\n"
"- search_person_in_camera_feeds: check 'status' (LIVE_DETECTED vs RECENTLY_SEEN).\n\n"
"PERSON SEARCH:\n"
" a. search_person_in_camera_feeds(name).\n"
" b. Evaluate 'found' (LIVE_DETECTED only).\n"
" c. If found AND user asked: create_tactical_issue + dispatch_personnel_to_camera.\n"
" d. If NOT found: state clearly, NO issue.\n"
" e. Report tool analysis and exact action taken.\n\n"
"CAPABILITIES:\n"
"- Snapshot: get_platform_overview.\n"
"- Act: create_tactical_issue, dispatch_personnel_to_camera, broadcast_alert, set_signage_state.\n"
"- Evacuate: generate_evacuation_plan.\n"
"- Dispatch: locate_nearest_emergency_services → simulate_dispatch_call (STATE SIMULATED).\n"
"- Trace: get_interaction_graph.\n"
f"Station ref: {STATION_LAT}, {STATION_LON} (override CEPHEUS_STATION_LAT/LON). Be concise/decisive."
)
_MAX_NATIVE_ITERATIONS = 8
_TOOL_RESULT_PREVIEW_LEN = 1200
def _collect_agent_tool_names(agent: Agent) -> set[str]:
"""Return all tool function names exposed by an ADK agent and its sub-agents."""
names = {fn.__name__ for fn in (getattr(agent, "tools", None) or [])}
for sub in getattr(agent, "sub_agents", None) or []:
names |= _collect_agent_tool_names(sub)
return names
def adk_tool_names() -> set[str]:
"""Tool names reachable from the OrchestratorAgent ADK tree."""
return _collect_agent_tool_names(_orchestrator)
def _decode_data_url_image(image_data: str | bytes | None) -> tuple[bytes | None, str]:
"""Decode a data-URL or raw base64 image payload from the copilot UI."""
if not image_data:
return None, "image/jpeg"
if isinstance(image_data, bytes):
return image_data, "image/jpeg"
data = str(image_data).strip()
mime = "image/jpeg"
if data.startswith("data:"):
header, _, payload = data.partition(",")
if ";" in header:
mime = header[5:].split(";")[0].strip() or mime
data = payload
try:
return base64.b64decode(data, validate=False), mime
except Exception:
return None, mime
def _build_user_parts(
prompt: str,
image_bytes: bytes | None = None,
image_mime: str = "image/jpeg",
) -> list:
"""Build multimodal user parts for Gemini (text + optional inline image)."""
if genai_types is None:
return []
parts: list = [genai_types.Part(text=prompt)]
if image_bytes:
parts.append(genai_types.Part(inline_data=genai_types.Blob(mime_type=image_mime, data=image_bytes)))
return parts
def _friendly_model_error(exc: Exception, *, chain_exhausted: bool = True) -> str:
text = str(exc)
low = text.lower()
api_key = os.getenv("GEMINI_API_KEY", "").strip()
# Auth problems first — these are the only case where the key itself is at fault.
if (
not is_rate_limit(exc)
and ("api key" in low or "api_key_invalid" in low or "permission_denied" in low or "401" in text)
):
if api_key:
return f"AI model authentication failed despite configured key: {text[:120]}"
return "AI model authentication failed — GEMINI_API_KEY is missing or invalid."
# Rate-limit text only when the full fallback chain was exhausted (genuine quota).
if is_rate_limit(exc) and chain_exhausted:
return (
"All configured Gemini models are temporarily rate-limited (quota reached on every tier). "
"Please retry in a moment."
)
if is_rate_limit(exc):
return f"AI model call failed: temporary overload — {text[:120]}"
return f"AI model call failed: {text[:200]}"
def _step_has_content(step: dict) -> bool:
content = str(step.get("content") or "").strip()
return bool(content)
def _make_text_step(agent: str, content: str | None, step_type: str) -> dict | None:
"""Generation-point filter: do not create steps with empty/whitespace text (Bug C)."""
text = str(content or "").strip()
if not text:
return None
return {"agent": agent, "content": text, "step_type": step_type}
def _normalize_stream_step(step: dict) -> dict | None:
"""Transport-layer guard for any step dict (tool_call lines always have content)."""
if not isinstance(step, dict):
return None
if not _step_has_content(step):
return None
return step
def location_agent_tool_names() -> set[str]:
"""Tool names exposed to LocationAgent (must not include sighting history for live queries)."""
return {fn.__name__ for fn in (_location_agent.tools or [])}
def agents_must_not_use_known_persons_for_live() -> set[str]:
"""Agents that answer live-presence questions — get_known_persons is forbidden."""
forbidden = set()
for agent in (_location_agent, _missing_agent):
names = {fn.__name__ for fn in (agent.tools or [])}
if "get_known_persons" in names:
forbidden.add(agent.name)
return forbidden
async def _run_native_stream(
prompt: str,
image_bytes: bytes | None = None,
image_mime: str = "image/jpeg",
) -> AsyncGenerator[dict, None]:
"""Native google.genai function-calling loop (used when ADK is unavailable)."""
if genai_types is None:
yield {"agent": "System", "content": "google-genai SDK not installed.", "step_type": "error"}
return
from google import genai
api_key = os.getenv("GEMINI_API_KEY", "").strip()
client = genai.Client(api_key=api_key, http_options={"api_version": "v1beta"})
config = genai_types.GenerateContentConfig(
system_instruction=_NATIVE_SYSTEM_INSTRUCTION,
tools=_ALL_TOOLS,
automatic_function_calling=genai_types.AutomaticFunctionCallingConfig(disable=True),
temperature=0.3,
max_output_tokens=1200,
)
contents = [genai_types.Content(role="user", parts=_build_user_parts(prompt, image_bytes, image_mime))]
async def _generate_with_retry():
return await asyncio.to_thread(
generate_with_fallback,
client,
tier="pro",
contents=contents,
config=config,
)
for _ in range(_MAX_NATIVE_ITERATIONS):
try:
response = await _generate_with_retry()
except Exception as exc:
logger.error("native genai call failed: %s", exc)
err_step = {
"agent": "System",
"content": _friendly_model_error(exc, chain_exhausted=True),
"step_type": "error",
"genuine_quota_error": bool(is_rate_limit(exc)),
}
if norm := _normalize_stream_step(err_step):
yield norm
return
candidate = (response.candidates or [None])[0]
if not candidate or not candidate.content or not candidate.content.parts:
text = getattr(response, "text", None)
if step := _make_text_step("OrchestratorAgent", text, "response"):
yield step
return
contents.append(candidate.content)
function_calls = [p.function_call for p in candidate.content.parts if getattr(p, "function_call", None)]
texts = [p.text for p in candidate.content.parts if getattr(p, "text", None)]
if not function_calls:
# Final answer turn — emit any text as the response and finish.
for text in texts:
if step := _make_text_step("OrchestratorAgent", text, "response"):
yield step
if not texts:
if norm := _normalize_stream_step({
"agent": "System",
"content": "Model returned an empty response.",
"step_type": "error",
}):
yield norm
return
# Intermediate reasoning text before tool execution (Gemini may emit "" between tool turns).
for text in texts:
if step := _make_text_step("OrchestratorAgent", text, "thinking"):
yield step
tool_response_parts = []
for fc in function_calls:
args = dict(fc.args or {})
if norm := _normalize_stream_step({
"agent": "OrchestratorAgent",
"content": f"Calling `{fc.name}` with args: {args or '()'}",
"step_type": "tool_call",
}):
yield norm
fn = _TOOL_MAP.get(fc.name)
if fn is None:
result = {"error": f"Unknown tool {fc.name}"}
else:
try:
result = await asyncio.to_thread(lambda: fn(**args))
except Exception as exc:
logger.error("tool %s failed: %s", fc.name, exc)
result = {"error": str(exc)}
preview = str(result)
if len(preview) > _TOOL_RESULT_PREVIEW_LEN:
preview = f"{preview[:_TOOL_RESULT_PREVIEW_LEN]}…"
if fc.name == "get_active_camera_feeds" and isinstance(result, dict):
result = {**result, "agent_report": format_active_camera_report(result)}
if norm := _normalize_stream_step({
"agent": "OrchestratorAgent",
"content": f"Tool `{fc.name}` returned: {preview}",
"step_type": "tool_result",
}):
yield norm
tool_response_parts.append(
genai_types.Part.from_function_response(name=fc.name, response={"result": result})
)
contents.append(genai_types.Content(role="user", parts=tool_response_parts))
yield {
"agent": "OrchestratorAgent",
"content": "Reached reasoning step limit. Provide a more specific query if needed.",
"step_type": "response",
}
async def run_agent_stream(
prompt: str,
session_id: str = "default",
user_id: str = "operator",
agent_override: str | None = None,
image_data: str | bytes | None = None,
image_mime: str = "image/jpeg",
) -> AsyncGenerator[dict, None]:
"""
Run the orchestrator with the given prompt and yield streaming step dicts.
Each yielded dict: { "agent": str, "content": str, "step_type": str }
step_type: "thinking" | "tool_call" | "tool_result" | "response" | "error"
"""
api_key = os.getenv("GEMINI_API_KEY", "").strip()
if not api_key:
yield {"agent": "System", "content": "GEMINI_API_KEY not configured.", "step_type": "error"}
return
prev_allow = bool(_ctx.get("allow_issue_creation"))
_ctx["allow_issue_creation"] = prompt_requests_issue_creation(prompt)
try:
image_bytes, resolved_mime = _decode_data_url_image(image_data)
if image_data and image_bytes is None:
yield {"agent": "System", "content": "Could not decode attached image.", "step_type": "error"}
return
if image_bytes and resolved_mime:
image_mime = resolved_mime
if not _ADK_AVAILABLE:
yield {
"agent": "System",
"content": (
"Native orchestrator active (google-adk not installed). "
"Tool parity may differ from ADK mode."
),
"step_type": "info",
}
async for step in _run_native_stream(prompt, image_bytes=image_bytes, image_mime=image_mime):
yield step
return
agent_name = (agent_override or "").strip() or "OrchestratorAgent"
runner = _RUNNERS.get(agent_name, _runner)
try:
operator_id = (user_id or "operator").strip() or "operator"
await prune_adk_sessions(operator_id, keep_session_id=session_id)
session = await _session_service.get_session(
app_name="cepheus", user_id=operator_id, session_id=session_id,
)
if session is None:
session = await _session_service.create_session(
app_name="cepheus", user_id=operator_id, session_id=session_id,
)
_touch_adk_session(operator_id, session_id)
user_content = genai_types.Content(
role="user",
parts=_build_user_parts(prompt, image_bytes, image_mime),
)
async for event in runner.run_async(
user_id=operator_id,
session_id=session.id,
new_message=user_content,
):
author = getattr(event, "author", agent_name) or agent_name
# Skip empty events
if not event.content or not event.content.parts:
continue
for part in event.content.parts:
# Tool call
if hasattr(part, "function_call") and part.function_call:
fc = part.function_call
args_str = str(dict(fc.args)) if fc.args else "()"
if norm := _normalize_stream_step({
"agent": author,
"content": f"Calling `{fc.name}` with args: {args_str}",
"step_type": "tool_call",
}):
yield norm
# Tool response
elif hasattr(part, "function_response") and part.function_response:
fr = part.function_response
result_preview = str(fr.response)
if len(result_preview) > _TOOL_RESULT_PREVIEW_LEN:
result_preview = f"{result_preview[:_TOOL_RESULT_PREVIEW_LEN]}…"
if norm := _normalize_stream_step({
"agent": author,
"content": f"Tool `{fr.name}` returned: {result_preview}",
"step_type": "tool_result",
}):
yield norm
# Text content
elif hasattr(part, "text") and part.text is not None:
step_type = "response" if event.is_final_response() else "thinking"
if step := _make_text_step(author, part.text, step_type):
yield step
except Exception as exc:
logger.error(f"Agent stream error: {exc}")
if norm := _normalize_stream_step({
"agent": "System",
"content": _friendly_model_error(exc, chain_exhausted=True),
"step_type": "error",
}):
yield norm
finally:
_ctx["allow_issue_creation"] = prev_allow