FairRelay / brain /app /core /events.py
MouleeswaranM's picture
Upload folder using huggingface_hub
fcf8749 verified
raw
history blame
4.55 kB
"""
Agent Event Bus for real-time SSE synchronization.
Provides pub/sub mechanism for agent events that can be consumed
by multiple SSE clients (visualization UI, demo page, etc.).
"""
from typing import AsyncIterator, Dict, Any, List, Optional
from uuid import UUID
import asyncio
import time
class AgentEventBus:
"""
Simple in-process pub/sub for agent events.
Multiple listeners (SSE connections) can subscribe and receive
events published by LangGraph nodes during allocation runs.
"""
def __init__(self) -> None:
self._subscribers: List[asyncio.Queue] = []
self._lock = asyncio.Lock()
self._recent_events: List[Dict[str, Any]] = []
self._max_recent = 100 # Keep last 100 events for late joiners
async def subscribe(self) -> AsyncIterator[Dict[str, Any]]:
"""
Async generator yielding events. Callers iterate and send as SSE.
Yields:
Agent event dictionaries
"""
queue: asyncio.Queue = asyncio.Queue()
async with self._lock:
self._subscribers.append(queue)
# Send recent events to catch up
for event in self._recent_events[-20:]:
await queue.put(event)
try:
while True:
event = await queue.get()
yield event
finally:
async with self._lock:
if queue in self._subscribers:
self._subscribers.remove(queue)
async def publish(self, event: Dict[str, Any]) -> None:
"""
Publish an event to all subscribers.
Args:
event: Agent event dictionary
"""
async with self._lock:
# Store in recent events
self._recent_events.append(event)
if len(self._recent_events) > self._max_recent:
self._recent_events = self._recent_events[-self._max_recent:]
# Broadcast to all subscribers
for queue in self._subscribers:
try:
await queue.put(event)
except Exception:
pass # Ignore failed subscribers
def get_recent_events(
self,
allocation_run_id: Optional[str] = None,
limit: int = 50
) -> List[Dict[str, Any]]:
"""
Get recent events, optionally filtered by allocation run.
Args:
allocation_run_id: Filter by specific run (optional)
limit: Maximum events to return
Returns:
List of recent events
"""
events = self._recent_events
if allocation_run_id:
events = [
e for e in events
if e.get("allocation_run_id") == allocation_run_id
]
return events[-limit:]
# Global singleton
agent_event_bus = AgentEventBus()
def make_agent_event(
allocation_run_id: str,
agent_name: str,
step_type: str,
state: str,
payload: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
"""
Create a standardized agent event dictionary.
Args:
allocation_run_id: UUID string of the allocation run
agent_name: Agent identifier (e.g., "ML_EFFORT", "ROUTE_PLANNER")
step_type: Step identifier (e.g., "MATRIX_GENERATION", "PROPOSAL_1")
state: Event state - "STARTED", "COMPLETED", or "ERROR"
payload: Optional additional data for the event
Returns:
Formatted event dictionary
"""
return {
"allocation_run_id": str(allocation_run_id),
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"agent_name": agent_name,
"step_type": step_type,
"state": state,
"payload": payload or {},
}
async def publish_agent_event(
allocation_run_id: str,
agent_name: str,
step_type: str,
state: str,
payload: Optional[Dict[str, Any]] = None,
) -> None:
"""
Convenience function to publish an agent event.
Args:
allocation_run_id: UUID string of the allocation run
agent_name: Agent identifier
step_type: Step identifier
state: Event state
payload: Optional additional data
"""
event = make_agent_event(
allocation_run_id=allocation_run_id,
agent_name=agent_name,
step_type=step_type,
state=state,
payload=payload,
)
await agent_event_bus.publish(event)