solution_challenge_backend / backend /agentic_service.py
github-actions
Deploy to Hugging Face
c794b6b
Raw
History Blame Contribute Delete
6.55 kB
from __future__ import annotations
import json
import os
from typing import Any
from google import genai
from google.genai import types
from pydantic import BaseModel, ValidationError, Field
from gemini_config import generate_with_fallback
class TaskItem(BaseModel):
title: str
owner: str
eta_min: int = Field(ge=0, le=240)
class AgenticPlan(BaseModel):
summary: str
priority: str
tasks: list[TaskItem]
services_called: list[str]
targeted_notifications: list[dict[str, Any] | str]
def _strip_json_fence(raw: str) -> str:
text = (raw or "").strip()
if not text.startswith("```"):
return text
lines = text.splitlines()
if lines and lines[0].startswith("```"):
lines = lines[1:]
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
return "\n".join(lines).strip()
def _safe_json_loads(raw: str, fallback: dict[str, Any]) -> dict[str, Any]:
try:
return json.loads(_strip_json_fence(raw))
except json.JSONDecodeError:
return fallback
def _fallback_plan(payload: dict[str, Any], reason: str) -> dict[str, Any]:
return {
"summary": f"Fallback agentic plan generated ({reason}).",
"priority": payload.get("severity", "medium"),
"tasks": [
{
"title": "Stabilize immediate area",
"owner": "Security",
"eta_min": 2,
},
{
"title": "Broadcast contextual evacuation/update",
"owner": "Control Room",
"eta_min": 3,
},
],
"services_called": ["Security Dispatch"],
"targeted_notifications": [],
}
def _sanitize_incident_payload(payload: dict[str, Any]) -> dict[str, Any]:
allowed = {"alert_id", "type", "severity", "location", "message", "lat", "lng", "crowd_count", "room_counts", "timestamp", "guest_id"}
sanitized = {k: payload.get(k) for k in allowed if k in payload}
for key in ("type", "severity", "location", "message", "guest_id"):
value = sanitized.get(key)
if isinstance(value, str):
sanitized[key] = value[:500]
return sanitized
def generate_agentic_plan(payload: dict[str, Any]) -> dict[str, Any]:
payload = _sanitize_incident_payload(payload)
alert_type = str(payload.get("type") or "unknown").lower()
severity = str(payload.get("severity") or "medium").lower()
location = payload.get("location") or "Unknown Location"
message = str(payload.get("message") or "").lower()
# Determine priority
priority = severity if severity in ("critical", "high", "medium", "low") else "medium"
# Define tasks and services based on alert type/content keywords
tasks = []
services_called = []
if alert_type == "medical" or "medical" in message or "fall" in message or "injur" in message:
summary = f"Emergency medical response dispatched for {location}."
tasks = [
{"title": "Deploy Med-Response Delta team to scene", "owner": "Med-Response Delta", "eta_min": 2},
{"title": f"Secure and clear immediate vicinity at {location}", "owner": "Security Prime", "eta_min": 3},
{"title": "Coordinate pathing and entrance access for ambulance", "owner": "Ops Control", "eta_min": 5}
]
services_called = ["Medical Dispatch", "Security Dispatch"]
elif alert_type == "fight" or "fight" in message or "altercation" in message or "assault" in message or "violence" in message:
summary = f"Security intervention and de-escalation protocol at {location}."
tasks = [
{"title": "Deploy Security Prime patrol units to scene", "owner": "Security Prime", "eta_min": 1},
{"title": f"Track and isolate scene on CCTV feeds", "owner": "Ops Control", "eta_min": 2},
{"title": "Prepare Med-Response standby team", "owner": "Med-Response Delta", "eta_min": 4}
]
services_called = ["Security Dispatch", "Operations Command"]
elif alert_type == "stampede" or "stampede" in message or "panic" in message or "chaos" in message or "crowd" in message:
summary = f"Crowd control and localized evacuation routing for {location}."
tasks = [
{"title": "Initiate emergency crowd de-congestion plan", "owner": "Security Prime", "eta_min": 2},
{"title": f"Open backup exits and direct occupants at {location}", "owner": "Unit Alpha-1", "eta_min": 3},
{"title": "Broadcast evacuation/safety instructions over PA", "owner": "Ops Control", "eta_min": 1}
]
services_called = ["Evacuation Control", "Security Dispatch"]
else:
summary = f"Tactical safety check and monitoring plan for {location}."
tasks = [
{"title": f"Deploy Unit Alpha-1 for initial assessment of {location}", "owner": "Unit Alpha-1", "eta_min": 2},
{"title": "Establish direct radio link with staff on site", "owner": "Ops Control", "eta_min": 4}
]
services_called = ["Security Dispatch"]
return {
"summary": summary,
"priority": priority,
"tasks": tasks,
"services_called": services_called,
"targeted_notifications": []
}
def generate_chat_response(prompt: str, context: dict[str, Any] = None) -> str:
api_key = os.getenv("GEMINI_API_KEY", "").strip()
if not api_key:
return "Gemini API key is not configured. Please set GEMINI_API_KEY."
try:
client = genai.Client(api_key=api_key, http_options={'api_version': 'v1beta'})
system_instruction = (
"You are Gemini Tactical Copilot, an AI assistant integrated into the safety and intelligence platform. "
"You help human operators analyze live camera feeds, manage crowd anomalies, track missing persons, and coordinate emergency responses. "
"Keep your answers concise, tactical, and direct."
)
if context:
system_instruction += f"\n\nCurrent System Context:\n{json.dumps(context)}"
response = generate_with_fallback(
client,
tier="default",
contents=prompt,
config=types.GenerateContentConfig(
system_instruction=system_instruction,
temperature=0.4,
max_output_tokens=400,
),
)
return response.text or "No response generated."
except Exception as exc:
return f"Communication with Gemini failed: {exc}"