| from __future__ import annotations |
|
|
| import json |
| import os |
| from typing import Any |
| from google import genai |
| from google.genai import types |
| from pydantic import BaseModel, ValidationError, Field |
|
|
| from gemini_config import generate_with_fallback |
|
|
|
|
| class TaskItem(BaseModel): |
| title: str |
| owner: str |
| eta_min: int = Field(ge=0, le=240) |
|
|
|
|
| class AgenticPlan(BaseModel): |
| summary: str |
| priority: str |
| tasks: list[TaskItem] |
| services_called: list[str] |
| targeted_notifications: list[dict[str, Any] | str] |
|
|
|
|
| def _strip_json_fence(raw: str) -> str: |
| text = (raw or "").strip() |
| if not text.startswith("```"): |
| return text |
| lines = text.splitlines() |
| if lines and lines[0].startswith("```"): |
| lines = lines[1:] |
| if lines and lines[-1].strip() == "```": |
| lines = lines[:-1] |
| return "\n".join(lines).strip() |
|
|
|
|
| def _safe_json_loads(raw: str, fallback: dict[str, Any]) -> dict[str, Any]: |
| try: |
| return json.loads(_strip_json_fence(raw)) |
| except json.JSONDecodeError: |
| return fallback |
|
|
|
|
| def _fallback_plan(payload: dict[str, Any], reason: str) -> dict[str, Any]: |
| return { |
| "summary": f"Fallback agentic plan generated ({reason}).", |
| "priority": payload.get("severity", "medium"), |
| "tasks": [ |
| { |
| "title": "Stabilize immediate area", |
| "owner": "Security", |
| "eta_min": 2, |
| }, |
| { |
| "title": "Broadcast contextual evacuation/update", |
| "owner": "Control Room", |
| "eta_min": 3, |
| }, |
| ], |
| "services_called": ["Security Dispatch"], |
| "targeted_notifications": [], |
| } |
|
|
|
|
| def _sanitize_incident_payload(payload: dict[str, Any]) -> dict[str, Any]: |
| allowed = {"alert_id", "type", "severity", "location", "message", "lat", "lng", "crowd_count", "room_counts", "timestamp", "guest_id"} |
| sanitized = {k: payload.get(k) for k in allowed if k in payload} |
| for key in ("type", "severity", "location", "message", "guest_id"): |
| value = sanitized.get(key) |
| if isinstance(value, str): |
| sanitized[key] = value[:500] |
| return sanitized |
|
|
|
|
| def generate_agentic_plan(payload: dict[str, Any]) -> dict[str, Any]: |
| payload = _sanitize_incident_payload(payload) |
| alert_type = str(payload.get("type") or "unknown").lower() |
| severity = str(payload.get("severity") or "medium").lower() |
| location = payload.get("location") or "Unknown Location" |
| message = str(payload.get("message") or "").lower() |
|
|
| |
| priority = severity if severity in ("critical", "high", "medium", "low") else "medium" |
|
|
| |
| tasks = [] |
| services_called = [] |
| |
| if alert_type == "medical" or "medical" in message or "fall" in message or "injur" in message: |
| summary = f"Emergency medical response dispatched for {location}." |
| tasks = [ |
| {"title": "Deploy Med-Response Delta team to scene", "owner": "Med-Response Delta", "eta_min": 2}, |
| {"title": f"Secure and clear immediate vicinity at {location}", "owner": "Security Prime", "eta_min": 3}, |
| {"title": "Coordinate pathing and entrance access for ambulance", "owner": "Ops Control", "eta_min": 5} |
| ] |
| services_called = ["Medical Dispatch", "Security Dispatch"] |
| elif alert_type == "fight" or "fight" in message or "altercation" in message or "assault" in message or "violence" in message: |
| summary = f"Security intervention and de-escalation protocol at {location}." |
| tasks = [ |
| {"title": "Deploy Security Prime patrol units to scene", "owner": "Security Prime", "eta_min": 1}, |
| {"title": f"Track and isolate scene on CCTV feeds", "owner": "Ops Control", "eta_min": 2}, |
| {"title": "Prepare Med-Response standby team", "owner": "Med-Response Delta", "eta_min": 4} |
| ] |
| services_called = ["Security Dispatch", "Operations Command"] |
| elif alert_type == "stampede" or "stampede" in message or "panic" in message or "chaos" in message or "crowd" in message: |
| summary = f"Crowd control and localized evacuation routing for {location}." |
| tasks = [ |
| {"title": "Initiate emergency crowd de-congestion plan", "owner": "Security Prime", "eta_min": 2}, |
| {"title": f"Open backup exits and direct occupants at {location}", "owner": "Unit Alpha-1", "eta_min": 3}, |
| {"title": "Broadcast evacuation/safety instructions over PA", "owner": "Ops Control", "eta_min": 1} |
| ] |
| services_called = ["Evacuation Control", "Security Dispatch"] |
| else: |
| summary = f"Tactical safety check and monitoring plan for {location}." |
| tasks = [ |
| {"title": f"Deploy Unit Alpha-1 for initial assessment of {location}", "owner": "Unit Alpha-1", "eta_min": 2}, |
| {"title": "Establish direct radio link with staff on site", "owner": "Ops Control", "eta_min": 4} |
| ] |
| services_called = ["Security Dispatch"] |
|
|
| return { |
| "summary": summary, |
| "priority": priority, |
| "tasks": tasks, |
| "services_called": services_called, |
| "targeted_notifications": [] |
| } |
|
|
|
|
| def generate_chat_response(prompt: str, context: dict[str, Any] = None) -> str: |
| api_key = os.getenv("GEMINI_API_KEY", "").strip() |
|
|
| if not api_key: |
| return "Gemini API key is not configured. Please set GEMINI_API_KEY." |
|
|
| try: |
| client = genai.Client(api_key=api_key, http_options={'api_version': 'v1beta'}) |
|
|
| system_instruction = ( |
| "You are Gemini Tactical Copilot, an AI assistant integrated into the safety and intelligence platform. " |
| "You help human operators analyze live camera feeds, manage crowd anomalies, track missing persons, and coordinate emergency responses. " |
| "Keep your answers concise, tactical, and direct." |
| ) |
| if context: |
| system_instruction += f"\n\nCurrent System Context:\n{json.dumps(context)}" |
|
|
| response = generate_with_fallback( |
| client, |
| tier="default", |
| contents=prompt, |
| config=types.GenerateContentConfig( |
| system_instruction=system_instruction, |
| temperature=0.4, |
| max_output_tokens=400, |
| ), |
| ) |
| return response.text or "No response generated." |
|
|
| except Exception as exc: |
| return f"Communication with Gemini failed: {exc}" |
|
|