Kacper Łukawski
Fix passing MCP calls through the streaming queue
980b52e
import asyncio
import logging
from haystack.dataclasses import StreamingChunk
logger = logging.getLogger(__name__)
class StreamingQueue:
"""Bridges Haystack's streaming callback to an async iterator for FastAPI JSON lines streaming."""
_DONE = object()
def __init__(self) -> None:
self._queue: asyncio.Queue = asyncio.Queue()
async def callback(self, chunk: StreamingChunk) -> None:
await self._queue.put(chunk)
def sync_callback(self, chunk: StreamingChunk) -> None:
self._queue.put_nowait(chunk)
def __aiter__(self) -> "StreamingQueue":
return self
async def __anext__(self) -> StreamingChunk:
item = await self._queue.get()
if item is self._DONE:
logger.info("Stream exhausted")
raise StopAsyncIteration
if item.tool_calls:
for tc in item.tool_calls:
logger.info("Tool call [%s] %s args=%s", tc.index, tc.tool_name or "(streaming)", tc.arguments or "")
elif item.tool_call_result:
logger.info("Tool result: %s", str(item.tool_call_result.result)[:200])
elif item.finish_reason:
logger.info("Finish reason: %s", item.finish_reason)
elif item.content:
logger.info("Text chunk: %r", item.content)
return item
async def __aenter__(self) -> "StreamingQueue":
logger.info("StreamingQueue opened")
return self
async def __aexit__(self, *args) -> None:
logger.info("StreamingQueue closed, sending done sentinel")
await self._queue.put(self._DONE)