| """ |
| Streaming Engine — token-by-token output streaming. |
| |
| Enables real-time streaming of agent outputs as they're generated. |
| Optimized for minimal latency with chunked transfer encoding. |
| """ |
| import asyncio |
| import json |
| import time |
| from typing import AsyncGenerator, Optional |
| from schemas.agent import AgentEvent |
|
|
|
|
class StreamingEngine:
    """
    Format agent output as Server-Sent Events (SSE) and track send statistics.

    Each streaming method wraps an async generator and yields ``data: ...``
    frames, each terminated by a blank line, followed by one final
    ``data: [DONE]`` frame once the source generator is exhausted.

    Counters live on the engine instance and accumulate across every stream
    served by that instance. The terminating ``[DONE]`` frame is not counted
    as an event and does not contribute to the byte total.

    Batch compression and event prioritization are not implemented here.
    """

    # Sentinel frame that tells the client the stream is complete.
    _DONE_FRAME = "data: [DONE]\n\n"

    def __init__(self):
        """Initialise zeroed counters and remember when the engine was created."""
        self._events_sent = 0
        self._bytes_sent = 0
        # Monotonic clock: elapsed time must not be distorted by wall-clock
        # adjustments (NTP corrections, manual changes).
        self._start_time = time.monotonic()

    def _record(self, frame: str) -> str:
        """Account for one emitted frame and return it unchanged.

        Bytes are measured on the UTF-8 encoding of the complete frame
        (``data: `` prefix and trailing blank line included), so both
        streaming methods report the same quantity.
        """
        self._events_sent += 1
        self._bytes_sent += len(frame.encode("utf-8"))
        return frame

    # The event type is quoted so the annotation is not evaluated when the
    # class is defined; plain dicts are accepted as events as well.
    async def stream_events(self, event_generator: "AsyncGenerator[AgentEvent, None]",
                            include_metadata: bool = True) -> AsyncGenerator[str, None]:
        """
        Convert agent events to SSE-formatted strings.

        Args:
            event_generator: Async generator of events. An event exposing
                ``model_dump()`` is converted through it; anything else is
                passed to ``json.dumps`` as is.
            include_metadata: When False, the top-level ``metadata`` key is
                removed from dict payloads. Non-dict payloads are left
                untouched.

        Yields:
            One ``data: <json>`` frame per event, then ``data: [DONE]``.

        Raises:
            TypeError: Propagated from ``json.dumps`` if a payload is not
                JSON serializable.
        """
        async for event in event_generator:
            payload = event.model_dump() if hasattr(event, "model_dump") else event

            # Only dicts have a "metadata" key to strip. Without this guard a
            # list or string event containing "metadata" would pass the
            # membership test and then fail on .items().
            if not include_metadata and isinstance(payload, dict):
                payload = {key: value for key, value in payload.items() if key != "metadata"}

            yield self._record(f"data: {json.dumps(payload)}\n\n")

        yield self._DONE_FRAME

    async def stream_text(self, text_generator: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
        """
        Stream text content token by token.

        Args:
            text_generator: Async generator of text tokens.

        Yields:
            One ``data: {"type": "token", "content": <token>}`` frame per
            token, then ``data: [DONE]``.
        """
        async for token in text_generator:
            # Count the frame that is actually emitted, not the bare token,
            # so the byte total matches what stream_events reports.
            yield self._record(f"data: {json.dumps({'type': 'token', 'content': token})}\n\n")

        yield self._DONE_FRAME

    def get_stats(self) -> dict:
        """
        Get streaming statistics.

        Returns:
            A dict with ``events_sent``, ``bytes_sent``, ``elapsed_seconds``
            (time since the engine was constructed, rounded to 2 decimals)
            and ``events_per_second`` (rounded to 1 decimal).
        """
        elapsed = time.monotonic() - self._start_time
        return {
            "events_sent": self._events_sent,
            "bytes_sent": self._bytes_sent,
            "elapsed_seconds": round(elapsed, 2),
            # Floor the divisor so a freshly created engine cannot divide by zero.
            "events_per_second": round(self._events_sent / max(elapsed, 0.01), 1),
        }
|
|