from __future__ import annotations import json import os from typing import Any from google import genai from google.genai import types from pydantic import BaseModel, ValidationError, Field from gemini_config import generate_with_fallback class TaskItem(BaseModel): title: str owner: str eta_min: int = Field(ge=0, le=240) class AgenticPlan(BaseModel): summary: str priority: str tasks: list[TaskItem] services_called: list[str] targeted_notifications: list[dict[str, Any] | str] def _strip_json_fence(raw: str) -> str: text = (raw or "").strip() if not text.startswith("```"): return text lines = text.splitlines() if lines and lines[0].startswith("```"): lines = lines[1:] if lines and lines[-1].strip() == "```": lines = lines[:-1] return "\n".join(lines).strip() def _safe_json_loads(raw: str, fallback: dict[str, Any]) -> dict[str, Any]: try: return json.loads(_strip_json_fence(raw)) except json.JSONDecodeError: return fallback def _fallback_plan(payload: dict[str, Any], reason: str) -> dict[str, Any]: return { "summary": f"Fallback agentic plan generated ({reason}).", "priority": payload.get("severity", "medium"), "tasks": [ { "title": "Stabilize immediate area", "owner": "Security", "eta_min": 2, }, { "title": "Broadcast contextual evacuation/update", "owner": "Control Room", "eta_min": 3, }, ], "services_called": ["Security Dispatch"], "targeted_notifications": [], } def _sanitize_incident_payload(payload: dict[str, Any]) -> dict[str, Any]: allowed = {"alert_id", "type", "severity", "location", "message", "lat", "lng", "crowd_count", "room_counts", "timestamp", "guest_id"} sanitized = {k: payload.get(k) for k in allowed if k in payload} for key in ("type", "severity", "location", "message", "guest_id"): value = sanitized.get(key) if isinstance(value, str): sanitized[key] = value[:500] return sanitized def generate_agentic_plan(payload: dict[str, Any]) -> dict[str, Any]: payload = _sanitize_incident_payload(payload) alert_type = str(payload.get("type") or "unknown").lower() severity = str(payload.get("severity") or "medium").lower() location = payload.get("location") or "Unknown Location" message = str(payload.get("message") or "").lower() # Determine priority priority = severity if severity in ("critical", "high", "medium", "low") else "medium" # Define tasks and services based on alert type/content keywords tasks = [] services_called = [] if alert_type == "medical" or "medical" in message or "fall" in message or "injur" in message: summary = f"Emergency medical response dispatched for {location}." tasks = [ {"title": "Deploy Med-Response Delta team to scene", "owner": "Med-Response Delta", "eta_min": 2}, {"title": f"Secure and clear immediate vicinity at {location}", "owner": "Security Prime", "eta_min": 3}, {"title": "Coordinate pathing and entrance access for ambulance", "owner": "Ops Control", "eta_min": 5} ] services_called = ["Medical Dispatch", "Security Dispatch"] elif alert_type == "fight" or "fight" in message or "altercation" in message or "assault" in message or "violence" in message: summary = f"Security intervention and de-escalation protocol at {location}." tasks = [ {"title": "Deploy Security Prime patrol units to scene", "owner": "Security Prime", "eta_min": 1}, {"title": f"Track and isolate scene on CCTV feeds", "owner": "Ops Control", "eta_min": 2}, {"title": "Prepare Med-Response standby team", "owner": "Med-Response Delta", "eta_min": 4} ] services_called = ["Security Dispatch", "Operations Command"] elif alert_type == "stampede" or "stampede" in message or "panic" in message or "chaos" in message or "crowd" in message: summary = f"Crowd control and localized evacuation routing for {location}." tasks = [ {"title": "Initiate emergency crowd de-congestion plan", "owner": "Security Prime", "eta_min": 2}, {"title": f"Open backup exits and direct occupants at {location}", "owner": "Unit Alpha-1", "eta_min": 3}, {"title": "Broadcast evacuation/safety instructions over PA", "owner": "Ops Control", "eta_min": 1} ] services_called = ["Evacuation Control", "Security Dispatch"] else: summary = f"Tactical safety check and monitoring plan for {location}." tasks = [ {"title": f"Deploy Unit Alpha-1 for initial assessment of {location}", "owner": "Unit Alpha-1", "eta_min": 2}, {"title": "Establish direct radio link with staff on site", "owner": "Ops Control", "eta_min": 4} ] services_called = ["Security Dispatch"] return { "summary": summary, "priority": priority, "tasks": tasks, "services_called": services_called, "targeted_notifications": [] } def generate_chat_response(prompt: str, context: dict[str, Any] = None) -> str: api_key = os.getenv("GEMINI_API_KEY", "").strip() if not api_key: return "Gemini API key is not configured. Please set GEMINI_API_KEY." try: client = genai.Client(api_key=api_key, http_options={'api_version': 'v1beta'}) system_instruction = ( "You are Gemini Tactical Copilot, an AI assistant integrated into the safety and intelligence platform. " "You help human operators analyze live camera feeds, manage crowd anomalies, track missing persons, and coordinate emergency responses. " "Keep your answers concise, tactical, and direct." ) if context: system_instruction += f"\n\nCurrent System Context:\n{json.dumps(context)}" response = generate_with_fallback( client, tier="default", contents=prompt, config=types.GenerateContentConfig( system_instruction=system_instruction, temperature=0.4, max_output_tokens=400, ), ) return response.text or "No response generated." except Exception as exc: return f"Communication with Gemini failed: {exc}"