"""
Streaming Engine — token-by-token output streaming.

Formats agent events and raw text tokens as Server-Sent Events (SSE) frames so
they can be forwarded to clients as soon as they are produced. This module only
builds the frames; transport concerns (HTTP chunked transfer encoding, flushing,
compression) belong to the web layer that consumes these generators.
"""

import asyncio
import json
import time
from typing import AsyncGenerator, Optional

from schemas.agent import AgentEvent


class StreamingEngine:
    """
    Manages streaming of agent events to clients as SSE frames.

    Each public stream method yields one ``data: <json>`` frame per item from
    its source generator and, once the source is exhausted, a terminating
    ``data: [DONE]`` frame. Running totals are available via ``get_stats()``.

    NOTE(review): earlier documentation advertised batch compression and event
    prioritization; neither is implemented in this class — items are forwarded
    one at a time, in the order the source generator yields them.
    """

    # Terminal frame emitted after a source generator is exhausted.
    _DONE_FRAME = "data: [DONE]\n\n"

    def __init__(self):
        self._events_sent = 0  # data frames emitted (the [DONE] frame is not counted)
        self._bytes_sent = 0  # UTF-8 size of every frame yielded, including [DONE]
        # Monotonic clock so elapsed time is immune to wall-clock adjustments.
        self._start_time = time.monotonic()

    def _record(self, frame: str, *, is_event: bool = True) -> str:
        """Add *frame* to the stats counters and return it unchanged."""
        if is_event:
            self._events_sent += 1
        # Count encoded bytes of the exact frame sent, not characters of the payload.
        self._bytes_sent += len(frame.encode("utf-8"))
        return frame

    async def stream_events(self, event_generator: AsyncGenerator[AgentEvent, None],
                            include_metadata: bool = True) -> AsyncGenerator[str, None]:
        """
        Convert agent events to SSE-formatted strings.

        Args:
            event_generator: Async source of events. Objects exposing
                ``model_dump()`` are dumped to a dict first; anything else is
                handed to ``json.dumps`` as-is.
            include_metadata: When False, a top-level ``metadata`` key is
                removed from each event dict before serialization.

        Yields:
            One ``data: <json>`` frame (terminated by a blank line) per event,
            followed by a final ``data: [DONE]`` frame.
        """
        async for event in event_generator:
            # NOTE(review): model_dump() defaults to Python mode; if AgentEvent has
            # non-JSON-native fields (datetime, UUID, ...) json.dumps will raise —
            # consider model_dump(mode="json"). Confirm against the schema.
            event_dict = event.model_dump() if hasattr(event, 'model_dump') else event
            if not include_metadata and 'metadata' in event_dict:
                event_dict = {k: v for k, v in event_dict.items() if k != 'metadata'}
            # Serialize first so an event that fails to encode is not counted as sent.
            frame = f"data: {json.dumps(event_dict)}\n\n"
            yield self._record(frame)

        # Send done signal
        yield self._record(self._DONE_FRAME, is_event=False)

    async def stream_text(self, text_generator: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
        """
        Stream text content token by token.

        Each token is wrapped as ``{"type": "token", "content": <token>}`` in its
        own SSE frame; a final ``data: [DONE]`` frame follows the last token.
        """
        async for token in text_generator:
            frame = f"data: {json.dumps({'type': 'token', 'content': token})}\n\n"
            # Record the framed, JSON-escaped size rather than len(token): the raw
            # token length ignores the SSE/JSON envelope and under-counts non-ASCII
            # characters, which json.dumps escapes to 6-byte \uXXXX sequences.
            yield self._record(frame)

        yield self._record(self._DONE_FRAME, is_event=False)

    def get_stats(self) -> dict:
        """
        Return a snapshot of streaming statistics since construction.

        Keys: ``events_sent``, ``bytes_sent``, ``elapsed_seconds`` (rounded to
        2 decimals) and ``events_per_second`` (rounded to 1 decimal; elapsed time
        is floored at 10 ms to avoid dividing by zero right after construction).
        """
        elapsed = time.monotonic() - self._start_time
        return {
            "events_sent": self._events_sent,
            "bytes_sent": self._bytes_sent,
            "elapsed_seconds": round(elapsed, 2),
            "events_per_second": round(self._events_sent / max(elapsed, 0.01), 1),
        }