import asyncio
import logging

from haystack.dataclasses import StreamingChunk

logger = logging.getLogger(__name__)


class StreamingQueue:
    """Bridges Haystack's streaming callback to an async iterator for FastAPI JSON lines streaming.

    Chunks are pushed in through ``callback`` (async) or ``sync_callback``
    (sync) and pulled out with ``async for``. Leaving the ``async with`` block
    enqueues an end-of-stream sentinel, which terminates the iteration once
    every chunk queued before it has been consumed.
    """

    # Unique marker enqueued by ``__aexit__``; compared by identity in
    # ``__anext__`` so it can never be confused with a real chunk.
    _DONE = object()

    def __init__(self) -> None:
        """Create an empty, unbounded queue and mark the stream as not exhausted."""
        self._queue: asyncio.Queue = asyncio.Queue()
        # Set once the sentinel has been consumed, so that later ``__anext__``
        # calls stop immediately instead of waiting on an empty queue.
        self._exhausted = False

    async def callback(self, chunk: StreamingChunk) -> None:
        """Enqueue ``chunk`` from a coroutine."""
        await self._queue.put(chunk)

    def sync_callback(self, chunk: StreamingChunk) -> None:
        """Enqueue ``chunk`` from synchronous code without awaiting."""
        # NOTE(review): asyncio.Queue is documented as not thread-safe. If the
        # caller invokes this from a worker thread rather than from the event
        # loop thread, the put should be scheduled with
        # ``loop.call_soon_threadsafe`` -- confirm how the callback is invoked.
        self._queue.put_nowait(chunk)

    def __aiter__(self) -> "StreamingQueue":
        """Return ``self``; the queue is its own async iterator."""
        return self

    async def __anext__(self) -> StreamingChunk:
        """Wait for the next chunk, log a summary of it and return it.

        Raises:
            StopAsyncIteration: when the end-of-stream sentinel is dequeued,
                and on every later call until the queue is re-opened with
                ``async with``.
        """
        # An exhausted iterator must keep signalling exhaustion; without this
        # guard a second ``async for`` would block forever on the empty queue.
        if self._exhausted:
            raise StopAsyncIteration

        item = await self._queue.get()
        if item is self._DONE:
            self._exhausted = True
            logger.info("Stream exhausted")
            raise StopAsyncIteration

        self._log_chunk(item)
        return item

    @staticmethod
    def _log_chunk(item: StreamingChunk) -> None:
        """Log one INFO summary for the first populated aspect of ``item``."""
        # Only the first matching category is logged: tool calls, then tool
        # result, then finish reason, then plain text content.
        if item.tool_calls:
            for tc in item.tool_calls:
                logger.info(
                    "Tool call [%s] %s args=%s",
                    tc.index,
                    tc.tool_name or "(streaming)",
                    tc.arguments or "",
                )
        elif item.tool_call_result:
            # Truncate so a large tool result does not flood the log.
            logger.info("Tool result: %s", str(item.tool_call_result.result)[:200])
        elif item.finish_reason:
            logger.info("Finish reason: %s", item.finish_reason)
        elif item.content:
            logger.info("Text chunk: %r", item.content)

    async def __aenter__(self) -> "StreamingQueue":
        """Open the stream for a producer and return ``self``."""
        # Re-arm the iterator so one instance can serve consecutive streams.
        self._exhausted = False
        logger.info("StreamingQueue opened")
        return self

    async def __aexit__(self, *args) -> None:
        """Enqueue the end-of-stream sentinel so the consumer stops iterating.

        The sentinel is sent whether or not the ``async with`` body raised.
        Returns ``None``, so any such exception still propagates.
        """
        logger.info("StreamingQueue closed, sending done sentinel")
        await self._queue.put(self._DONE)