File size: 2,595 Bytes
f37bf1d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# api/services/stream_service.py
import asyncio
import json
import re
from typing import AsyncGenerator

from api.config import settings, logger


class StreamService:
    """Service for handling streaming responses.

    Every event is emitted as one string of the form
    ``data: <json payload>`` followed by a blank line (the framing used by
    Server-Sent Events ``data:`` lines). Each payload carries a ``type`` key:
    ``interrupt``, ``token``, ``done`` or ``error``.
    """

    # A token is a run of non-whitespace together with the whitespace that
    # follows it; a leading whitespace run becomes a token of its own.
    # Concatenating all tokens therefore reproduces the input exactly, so
    # newlines and indentation in the response survive streaming.
    _TOKEN_PATTERN = re.compile(r"\S+\s*|\s+")

    @staticmethod
    def _format_event(payload: dict) -> str:
        """Serialize *payload* to JSON and wrap it in the ``data:`` framing."""
        return f"data: {json.dumps(payload)}\n\n"

    @staticmethod
    def _split_tokens(text: str) -> list:
        """Split *text* into chunks whose concatenation equals *text*.

        For ordinary single-spaced text this yields each word followed by one
        space (no trailing space on the last word). Returns an empty list for
        an empty string.
        """
        return StreamService._TOKEN_PATTERN.findall(text)

    @staticmethod
    def _pending_interrupt_event(session_id: str):
        """Return the ``interrupt`` event for *session_id*, or ``None``.

        ``None`` is returned when the chat manager reports no pending
        interrupt. Otherwise the event's ``data`` is the ``value`` attribute
        of the stored ``interrupt_data`` object, or ``{}`` when that object
        is missing or has no ``value`` attribute.
        """
        manager = settings.chat_manager
        if not manager.has_pending_interrupt(session_id):
            return None

        interrupt_info = manager.pending_interrupts.get(session_id, {})
        interrupt_data = interrupt_info.get("interrupt_data")
        interrupt_value = getattr(interrupt_data, "value", {})
        return StreamService._format_event(
            {'type': 'interrupt', 'data': interrupt_value}
        )

    @staticmethod
    async def stream_chat_response(
        message: str,
        session_id: str
    ) -> AsyncGenerator[str, None]:
        """Stream chat responses token by token.

        Args:
            message: Text forwarded to ``settings.chat_manager.chat``.
            session_id: Session identifier used for the interrupt lookup,
                the chat call and the final ``done`` event.

        Yields:
            Framed event strings, in one of these sequences:

            * a single ``interrupt`` event, when an interrupt is pending
              before the message is processed or right after processing;
            * zero or more ``token`` events followed by one ``done`` event;
            * an ``error`` event carrying ``str(exception)`` if anything in
              the steps above raises ``Exception`` (it is logged, not
              re-raised).
        """
        try:
            # A pending interrupt takes priority: report it and do not
            # process the new message.
            interrupt_event = StreamService._pending_interrupt_event(session_id)
            if interrupt_event is not None:
                yield interrupt_event
                return

            # Process message
            response = await settings.chat_manager.chat(
                message=message,
                session_id=session_id,
                interrupt_handler=None
            )

            # The chat call itself may have registered an interrupt; in that
            # case the interrupt is reported instead of the response text.
            interrupt_event = StreamService._pending_interrupt_event(session_id)
            if interrupt_event is not None:
                yield interrupt_event
                return

            # Stream the response chunk by chunk, preserving its original
            # whitespace so the client can rebuild the text by concatenation.
            for token in StreamService._split_tokens(response):
                yield StreamService._format_event(
                    {'type': 'token', 'content': token}
                )
                await asyncio.sleep(settings.STREAM_DELAY)

            # Send completion event
            yield StreamService._format_event(
                {'type': 'done', 'session_id': session_id}
            )

        except Exception as e:
            logger.exception(f"Error in streaming: {e}")
            yield StreamService._format_event(
                {'type': 'error', 'message': str(e)}
            )