| """ |
| Agent Event Bus for real-time SSE synchronization. |
| |
| Provides pub/sub mechanism for agent events that can be consumed |
| by multiple SSE clients (visualization UI, demo page, etc.). |
| """ |
|
|
| from typing import AsyncIterator, Dict, Any, List, Optional |
| from uuid import UUID |
| import asyncio |
| import time |
|
|
|
|
| class AgentEventBus: |
| """ |
| Simple in-process pub/sub for agent events. |
| |
| Multiple listeners (SSE connections) can subscribe and receive |
| events published by LangGraph nodes during allocation runs. |
| """ |
|
|
| def __init__(self) -> None: |
| self._subscribers: List[asyncio.Queue] = [] |
| self._lock = asyncio.Lock() |
| self._recent_events: List[Dict[str, Any]] = [] |
| self._max_recent = 100 |
|
|
| async def subscribe(self) -> AsyncIterator[Dict[str, Any]]: |
| """ |
| Async generator yielding events. Callers iterate and send as SSE. |
| |
| Yields: |
| Agent event dictionaries |
| """ |
| queue: asyncio.Queue = asyncio.Queue() |
| |
| async with self._lock: |
| self._subscribers.append(queue) |
| |
| for event in self._recent_events[-20:]: |
| await queue.put(event) |
|
|
| try: |
| while True: |
| event = await queue.get() |
| yield event |
| finally: |
| async with self._lock: |
| if queue in self._subscribers: |
| self._subscribers.remove(queue) |
|
|
| async def publish(self, event: Dict[str, Any]) -> None: |
| """ |
| Publish an event to all subscribers. |
| |
| Args: |
| event: Agent event dictionary |
| """ |
| async with self._lock: |
| |
| self._recent_events.append(event) |
| if len(self._recent_events) > self._max_recent: |
| self._recent_events = self._recent_events[-self._max_recent:] |
| |
| |
| for queue in self._subscribers: |
| try: |
| await queue.put(event) |
| except Exception: |
| pass |
|
|
| def get_recent_events( |
| self, |
| allocation_run_id: Optional[str] = None, |
| limit: int = 50 |
| ) -> List[Dict[str, Any]]: |
| """ |
| Get recent events, optionally filtered by allocation run. |
| |
| Args: |
| allocation_run_id: Filter by specific run (optional) |
| limit: Maximum events to return |
| |
| Returns: |
| List of recent events |
| """ |
| events = self._recent_events |
| if allocation_run_id: |
| events = [ |
| e for e in events |
| if e.get("allocation_run_id") == allocation_run_id |
| ] |
| return events[-limit:] |
|
|
|
|
| |
| agent_event_bus = AgentEventBus() |
|
|
|
|
| def make_agent_event( |
| allocation_run_id: str, |
| agent_name: str, |
| step_type: str, |
| state: str, |
| payload: Optional[Dict[str, Any]] = None, |
| ) -> Dict[str, Any]: |
| """ |
| Create a standardized agent event dictionary. |
| |
| Args: |
| allocation_run_id: UUID string of the allocation run |
| agent_name: Agent identifier (e.g., "ML_EFFORT", "ROUTE_PLANNER") |
| step_type: Step identifier (e.g., "MATRIX_GENERATION", "PROPOSAL_1") |
| state: Event state - "STARTED", "COMPLETED", or "ERROR" |
| payload: Optional additional data for the event |
| |
| Returns: |
| Formatted event dictionary |
| """ |
| return { |
| "allocation_run_id": str(allocation_run_id), |
| "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), |
| "agent_name": agent_name, |
| "step_type": step_type, |
| "state": state, |
| "payload": payload or {}, |
| } |
|
|
|
|
| async def publish_agent_event( |
| allocation_run_id: str, |
| agent_name: str, |
| step_type: str, |
| state: str, |
| payload: Optional[Dict[str, Any]] = None, |
| ) -> None: |
| """ |
| Convenience function to publish an agent event. |
| |
| Args: |
| allocation_run_id: UUID string of the allocation run |
| agent_name: Agent identifier |
| step_type: Step identifier |
| state: Event state |
| payload: Optional additional data |
| """ |
| event = make_agent_event( |
| allocation_run_id=allocation_run_id, |
| agent_name=agent_name, |
| step_type=step_type, |
| state=state, |
| payload=payload, |
| ) |
| await agent_event_bus.publish(event) |
|
|