ReliefLensDemo / backend /agents /dispatch_agent.py
copilot-swe-agent[bot]
feat: build complete ReliefLensAI backend
d0f3cbe unverified
Raw
History Blame Contribute Delete
1.33 kB
from __future__ import annotations
import logging
from typing import Any, List
from schemas.dispatch import DispatchMessage
from schemas.incident import Incident
from schemas.resource import ResourceRecommendation
from skills.generate_dispatch_message import generate_dispatch_message
logger = logging.getLogger(__name__)
class DispatchAgent:
def __init__(self, vllm_client: Any = None) -> None:
self.vllm_client = vllm_client
async def run(
self,
incidents: List[Incident],
all_resources: List[ResourceRecommendation],
) -> List[DispatchMessage]:
logger.info("DispatchAgent: generating messages for %d incidents", len(incidents))
messages: List[DispatchMessage] = []
resource_by_incident = {}
for r in all_resources:
resource_by_incident.setdefault(r.incident_id, []).append(r)
for incident in incidents:
incident_resources = resource_by_incident.get(incident.id, [])
msg = await generate_dispatch_message(
incident,
incident_resources,
channel="radio",
vllm_client=self.vllm_client,
)
messages.append(msg)
logger.info("DispatchAgent: generated %d dispatch messages", len(messages))
return messages