File size: 4,681 Bytes
05269f9
 
 
 
 
 
bc216e0
 
 
05269f9
 
 
bc216e0
 
05269f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c663573
05269f9
 
 
bc216e0
 
 
 
 
 
 
05269f9
 
 
 
 
 
 
 
 
 
 
 
df01f5b
c663573
05269f9
 
 
c663573
05269f9
 
 
 
 
 
c663573
05269f9
 
 
 
 
 
 
 
c663573
05269f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c663573
05269f9
 
 
 
c663573
05269f9
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
from abc import ABC, abstractmethod
from typing import AsyncGenerator, Dict, Any, Optional, List, Tuple
import asyncio
import gradio as gr
from dataclasses import dataclass
from enum import Enum
from src.internal.agents.base_agents import AgentRequest
from src.internal.agents.expert_router_agent import ExpertRouterAgent
from src.internal.rag.chat_template import get_chat_template
class RAGChatbot:
    """Main RAG Chatbot class"""
    
    def __init__(self, router_agent : ExpertRouterAgent,  title: str = "RAG Chatbot"):
        self.router_agent = router_agent
        self.title = title
        self.css = self._get_default_css()

    def _get_default_css(self) -> str:
        """Get default CSS styling"""
        return """
        .gradio-container {
            max-width: 900px !important;
            margin: auto !important;
        }
        .chat-message {
            padding: 10px;
            margin: 5px;
            border-radius: 10px;
        }
        #chatbot {
            height: 500px;
        }
        """
    
    async def _stream_response(self, message: str, chat_memory: List[Dict]) -> AsyncGenerator[str, None]:
        """Stream the router agent's reply, yielding the accumulated text so far.

        An ``AgentRequest`` is built from *message*, *chat_memory* and the
        "expert_router" chat template and handed to
        ``self.router_agent.get_result``. Each streamed event is dispatched on
        its ``"type"`` key:

        * ``"chunk"``: the chunk text is appended and the whole text so far
          is yielded;
        * ``"metadata"``: the reported setup time is printed;
        * ``"complete"``: the reported total time is printed.

        Events of any other type are ignored.

        Args:
            message: The user's question.
            chat_memory: Conversation memory forwarded to the agent request.

        Yields:
            The reply text accumulated so far after every chunk. If anything
            raises, a single "❌ Error: ..." string is yielded instead of the
            exception propagating.
        """
        try:
            answer_so_far = ""
            request = AgentRequest(
                chat_memory=chat_memory,
                prompt_template=get_chat_template("expert_router"),
                question=message,
            )
            print("Message : ", message)

            async for event in self.router_agent.get_result(request):
                event_type = event["type"]

                if event_type == "chunk":
                    answer_so_far += event["data"]["chunk"]
                    yield answer_so_far
                    continue

                if event_type == "metadata":
                    setup_seconds = event["data"]["setup_time"]
                    print(f"\nSetup completed in {setup_seconds:.2f}s")
                    continue

                if event_type == "complete":
                    total_seconds = event["data"]["total_time"]
                    print(f"\nTotal time: {total_seconds:.2f}s")

            # NOTE: the assistant reply is not appended to chat_memory here.
            print("Chat Memory :", chat_memory)
        except Exception as error:
            yield f"❌ Error: {str(error)}"
    
    def _chatbot_response(self, message: str, history: List[Tuple[str, str]], chat_memory : List[Dict]):
        """Generate chatbot response with proper async handling"""
        try:
            # Create new event loop for this thread
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            
            async_gen = self._stream_response(message = message, chat_memory = chat_memory)
            
            try:
                while True:
                    result = loop.run_until_complete(async_gen.__anext__())
                    yield result
            except StopAsyncIteration:
                pass
            finally:
                
                loop.close()
                
        except Exception as e:
            yield f"❌ Error: {str(e)}"
    
    def _clear_chat(self) -> Tuple[List, str]:
        """Clear chat history"""
        return [], ""
    
    def _user_message(self, message: str, history: List, generating: bool) -> Tuple[str, List, bool, gr.update, gr.update]:
        """Record a submitted message as a new, unanswered turn.

        Args:
            message: Text the user submitted.
            history: Chat history as a list of ``[user, bot]`` pairs; mutated
                in place when the message is accepted.
            generating: Whether a reply is currently being produced.

        Returns:
            A 5-tuple ``(message, history, generating, visibility_update,
            interactivity_update)``. When the message is non-blank and nothing
            is generating, the message slot is emptied, ``[message, None]`` is
            appended to the history, the flag becomes ``True`` and the updates
            are ``visible=True`` / ``interactive=False``. Otherwise the inputs
            are passed back unchanged with ``visible=False`` /
            ``interactive=True``.
            NOTE(review): which UI components receive the two updates is wired
            up outside this method; confirm against the event bindings.
        """
        # Blank input or a reply already in flight: hand the state back untouched.
        if not message.strip() or generating:
            visibility_update = gr.update(visible=False)
            interactivity_update = gr.update(interactive=True)
            return message, history, generating, visibility_update, interactivity_update

        # Open a new turn whose bot half is filled in later.
        history.append([message, None])
        visibility_update = gr.update(visible=True)
        interactivity_update = gr.update(interactive=False)
        return "", history, True, visibility_update, interactivity_update
    
    def _bot_message_stream(self, history: List, generating: bool, chat_memory: List[Dict]):
        """Fill in the pending bot reply of the last turn as it streams in.

        Args:
            history: Chat history as a list of ``[user, bot]`` pairs.
            generating: Whether a reply is expected to be produced now.
            chat_memory: Conversation memory forwarded to ``_chatbot_response``.

        Yields:
            ``(history, generating, visibility_update, interactivity_update)``
            tuples. While streaming, the flag is ``True`` and the updates are
            ``visible=True`` / ``interactive=False``; a final tuple resets the
            flag to ``False`` with ``visible=False`` / ``interactive=True``.
            If there is no unanswered last turn, or ``generating`` is false, a
            single tuple carrying the incoming state is yielded.
        """
        # Nothing to answer: emit the current state once and stop.
        if not history or history[-1][1] is not None or not generating:
            yield history, generating, gr.update(visible=False), gr.update(interactive=True)
            return

        question = history[-1][0]
        for reply_so_far in self._chatbot_response(question, history, chat_memory):
            # Overwrite the bot half of the last turn with the latest partial text.
            history[-1][1] = reply_so_far
            yield history, True, gr.update(visible=True), gr.update(interactive=False)

        # Streaming finished: drop the generating flag and restore the controls.
        yield history, False, gr.update(visible=False), gr.update(interactive=True)
    
    def _stop_generation(self) -> Tuple[bool, gr.update, gr.update]:
        """Return the idle state: flag ``False``, ``visible=False``, ``interactive=True``."""
        visibility_update = gr.update(visible=False)
        interactivity_update = gr.update(interactive=True)
        return False, visibility_update, interactivity_update