Spaces:
Sleeping
Sleeping
| import asyncio | |
| import logging | |
| from haystack.dataclasses import StreamingChunk | |
| logger = logging.getLogger(__name__) | |
class StreamingQueue:
    """Bridges Haystack's streaming callback to an async iterator for FastAPI JSON lines streaming.

    Producers push chunks with ``callback`` (from a coroutine) or
    ``sync_callback`` (from plain synchronous code). A single consumer reads
    them back with ``async for``. Leaving the ``async with`` block enqueues an
    end-of-stream sentinel, which terminates the iteration.

    Once the sentinel has been consumed the iterator stays exhausted: every
    further ``__anext__`` raises ``StopAsyncIteration`` immediately instead of
    waiting forever on an empty queue.

    NOTE(review): ``sync_callback`` hands chunks over with
    ``loop.call_soon_threadsafe`` when it is invoked outside the consumer's
    event loop. This assumes synchronous producers may run in worker threads
    (e.g. via ``run_in_executor``) -- confirm against the callers.
    """

    # Unique end-of-stream marker; always compared by identity.
    _DONE = object()

    def __init__(self) -> None:
        """Create an empty, unbounded queue and remember the running loop, if any."""
        self._queue: asyncio.Queue = asyncio.Queue()
        # Set once the sentinel has been consumed so later reads cannot hang.
        self._exhausted = False
        # Loop that owns the queue; None when constructed outside a running loop.
        self._loop = self._current_loop()

    @staticmethod
    def _current_loop():
        """Return the event loop running in this thread, or None if there is none."""
        try:
            return asyncio.get_running_loop()
        except RuntimeError:
            return None

    @staticmethod
    def _log_chunk(chunk) -> None:
        """Log a one-line summary of ``chunk``; only the first matching category is logged."""
        if chunk.tool_calls:
            for tc in chunk.tool_calls:
                logger.info("Tool call [%s] %s args=%s", tc.index, tc.tool_name or "(streaming)", tc.arguments or "")
        elif chunk.tool_call_result:
            # Truncate so a large tool result does not flood the log.
            logger.info("Tool result: %s", str(chunk.tool_call_result.result)[:200])
        elif chunk.finish_reason:
            logger.info("Finish reason: %s", chunk.finish_reason)
        elif chunk.content:
            logger.info("Text chunk: %r", chunk.content)

    async def callback(self, chunk: "StreamingChunk") -> None:
        """Enqueue ``chunk`` from a coroutine running on the consumer's event loop."""
        await self._queue.put(chunk)

    def sync_callback(self, chunk: "StreamingChunk") -> None:
        """Enqueue ``chunk`` from synchronous code.

        ``asyncio.Queue`` is not thread-safe. When the caller is not running
        on the loop that owns the queue, the put is scheduled on that loop so
        a consumer blocked in ``get()`` is reliably woken up.
        """
        owner = self._loop
        if owner is None or owner.is_closed() or self._current_loop() is owner:
            # Same thread as the loop (or no usable loop known): direct put.
            self._queue.put_nowait(chunk)
        else:
            owner.call_soon_threadsafe(self._queue.put_nowait, chunk)

    def __aiter__(self) -> "StreamingQueue":
        """Return ``self``; the queue is its own async iterator."""
        return self

    async def __anext__(self) -> "StreamingChunk":
        """Wait for and return the next chunk.

        Raises:
            StopAsyncIteration: when the end-of-stream sentinel is received,
                and on every call after that.
        """
        if self._exhausted:
            raise StopAsyncIteration
        # The consumer's loop is the one cross-thread producers must target.
        self._loop = asyncio.get_running_loop()
        item = await self._queue.get()
        if item is self._DONE:
            self._exhausted = True
            logger.info("Stream exhausted")
            raise StopAsyncIteration
        self._log_chunk(item)
        return item

    async def __aenter__(self) -> "StreamingQueue":
        """Open the stream and bind it to the currently running event loop."""
        self._loop = asyncio.get_running_loop()
        logger.info("StreamingQueue opened")
        return self

    async def __aexit__(self, *args) -> None:
        """Close the stream by enqueuing the sentinel; exceptions are not suppressed."""
        logger.info("StreamingQueue closed, sending done sentinel")
        await self._queue.put(self._DONE)