# api/services/stream_service.py
import asyncio
import json
import re
from typing import Any, AsyncGenerator, Dict, Optional

from api.config import settings, logger

# One run of non-whitespace characters together with the whitespace that
# follows it.  Splitting on this (instead of ``str.split``) keeps newlines,
# blank lines and indentation, so the client can rebuild the reply verbatim.
_TOKEN_PATTERN = re.compile(r"\S+\s*")


def _sse_event(payload: Dict[str, Any]) -> str:
    """Return ``payload`` JSON-encoded in the ``data: ...\\n\\n`` framing."""
    return f"data: {json.dumps(payload)}\n\n"


def _pending_interrupt_event(session_id: str) -> Optional[str]:
    """Return an ``interrupt`` event for ``session_id``, or ``None``.

    ``None`` means the chat manager reports no pending interrupt.  When an
    interrupt is pending, the event's ``data`` field is the ``value``
    attribute of the stored ``interrupt_data`` object, or ``{}`` when that
    object is missing or has no ``value`` attribute.
    """
    chat_manager = settings.chat_manager
    if not chat_manager.has_pending_interrupt(session_id):
        return None

    interrupt_info = chat_manager.pending_interrupts.get(session_id, {})
    interrupt_data_obj = interrupt_info.get("interrupt_data")
    interrupt_value = getattr(interrupt_data_obj, "value", {})
    return _sse_event({"type": "interrupt", "data": interrupt_value})


class StreamService:
    """Service for handling streaming responses"""

    @staticmethod
    async def stream_chat_response(
        message: str,
        session_id: str
    ) -> AsyncGenerator[str, None]:
        """Stream a chat response as a sequence of ``data:`` events.

        Events are JSON objects with a ``type`` field:

        * ``interrupt`` -- an interrupt is pending for the session, either
          before the message is processed or after processing it.  It is the
          only event emitted; no ``done`` event follows.
        * ``token`` -- one word of the response plus the whitespace that
          followed it in the original text, in ``content``.
        * ``done`` -- emitted once after the last token, with ``session_id``.
        * ``error`` -- processing raised; ``message`` is ``str(exception)``.

        Args:
            message: The user message passed to the chat manager.
            session_id: Session identifier used for the interrupt lookup and
                passed to the chat manager.

        Yields:
            Event strings of the form ``data: <json>`` followed by a blank
            line.
        """
        try:
            # Check for pending interrupt first: the message is not processed
            # while an interrupt is still outstanding.
            interrupt_event = _pending_interrupt_event(session_id)
            if interrupt_event is not None:
                yield interrupt_event
                return

            # Process message
            response = await settings.chat_manager.chat(
                message=message,
                session_id=session_id,
                interrupt_handler=None
            )

            # Check if interrupt occurred during processing
            interrupt_event = _pending_interrupt_event(session_id)
            if interrupt_event is not None:
                yield interrupt_event
                return

            # Stream response token by token.  Surrounding whitespace is
            # stripped (the first token never starts, and the last never
            # ends, with whitespace); inner whitespace is kept exactly so
            # line breaks and indentation survive reassembly.
            for token in _TOKEN_PATTERN.findall(response.strip()):
                yield _sse_event({"type": "token", "content": token})
                await asyncio.sleep(settings.STREAM_DELAY)

            # Send completion event
            yield _sse_event({"type": "done", "session_id": session_id})

        except Exception as e:
            logger.exception(f"Error in streaming: {e}")
            # NOTE(review): str(e) is sent to the client as-is and may expose
            # internal details -- confirm this is acceptable for this API.
            yield _sse_event({"type": "error", "message": str(e)})