File size: 2,718 Bytes
fcf8749 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 | """
Agent Events SSE Endpoint.
Provides Server-Sent Events stream for real-time agent status updates
to visualization frontends.
"""
from fastapi import APIRouter, Query
from fastapi.responses import StreamingResponse
from typing import Optional
import json
import asyncio
from app.core.events import agent_event_bus
router = APIRouter(prefix="/api/v1", tags=["agent-events"])
@router.get("/agent-events/stream")
async def agent_events_stream(
run_id: Optional[str] = Query(None, description="Filter by allocation run ID")
):
"""
Server-Sent Events endpoint for agent events.
Returns a continuous stream of agent events for real-time visualization.
Optionally filter by allocation_run_id via query parameter.
Args:
run_id: Optional allocation run ID to filter events
Returns:
SSE stream of agent events
"""
async def event_generator():
# Send initial connection event
init_event = {
"type": "connected",
"message": "SSE connection established",
"filter_run_id": run_id,
}
yield f"data: {json.dumps(init_event)}\n\n"
# Send any recent events for this run
if run_id:
recent = agent_event_bus.get_recent_events(allocation_run_id=run_id)
for event in recent:
yield f"data: {json.dumps(event)}\n\n"
# Stream live events
async for event in agent_event_bus.subscribe():
# Filter by run_id if specified
if run_id and event.get("allocation_run_id") != run_id:
continue
# SSE format: "data: {...}\n\n"
yield f"data: {json.dumps(event)}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
},
)
@router.get("/agent-events/recent")
async def get_recent_events(
run_id: Optional[str] = Query(None, description="Filter by allocation run ID"),
limit: int = Query(50, ge=1, le=200, description="Maximum events to return"),
):
"""
Get recent agent events (non-streaming).
Useful for initial page load or debugging.
Args:
run_id: Optional allocation run ID to filter events
limit: Maximum number of events to return
Returns:
List of recent agent events
"""
events = agent_event_bus.get_recent_events(
allocation_run_id=run_id,
limit=limit,
)
return {"events": events, "count": len(events)}
|