Spaces:
Configuration error
Configuration error
File size: 4,681 Bytes
05269f9 bc216e0 05269f9 bc216e0 05269f9 c663573 05269f9 bc216e0 05269f9 df01f5b c663573 05269f9 c663573 05269f9 c663573 05269f9 c663573 05269f9 c663573 05269f9 c663573 05269f9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 | from abc import ABC, abstractmethod
from typing import AsyncGenerator, Dict, Any, Optional, List, Tuple
import asyncio
import gradio as gr
from dataclasses import dataclass
from enum import Enum
from src.internal.agents.base_agents import AgentRequest
from src.internal.agents.expert_router_agent import ExpertRouterAgent
from src.internal.rag.chat_template import get_chat_template
class RAGChatbot:
"""Main RAG Chatbot class"""
def __init__(self, router_agent : ExpertRouterAgent, title: str = "RAG Chatbot"):
self.router_agent = router_agent
self.title = title
self.css = self._get_default_css()
def _get_default_css(self) -> str:
"""Get default CSS styling"""
return """
.gradio-container {
max-width: 900px !important;
margin: auto !important;
}
.chat-message {
padding: 10px;
margin: 5px;
border-radius: 10px;
}
#chatbot {
height: 500px;
}
"""
async def _stream_response(self, message: str, chat_memory : List[Dict], ) -> AsyncGenerator[str, None]:
"""Internal method untuk streaming response"""
try:
partial_response = ""
router_agent_request = AgentRequest(
chat_memory = chat_memory,
prompt_template = get_chat_template("expert_router"),
question = message
)
print("Message : ", message)
async for stream_data in self.router_agent.get_result(router_agent_request):
if stream_data["type"] == "chunk":
chunk = stream_data["data"]["chunk"]
partial_response += chunk
yield partial_response
elif stream_data["type"] == "metadata":
setup_time = stream_data['data']['setup_time']
print(f"\nSetup completed in {setup_time:.2f}s")
elif stream_data["type"] == "complete":
total_time = stream_data['data']['total_time']
print(f"\nTotal time: {total_time:.2f}s")
# chat_memory.append({"role": "assistant", "content": partial_response })
print("Chat Memory :", chat_memory)
except Exception as e:
yield f"❌ Error: {str(e)}"
def _chatbot_response(self, message: str, history: List[Tuple[str, str]], chat_memory : List[Dict]):
"""Generate chatbot response with proper async handling"""
try:
# Create new event loop for this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async_gen = self._stream_response(message = message, chat_memory = chat_memory)
try:
while True:
result = loop.run_until_complete(async_gen.__anext__())
yield result
except StopAsyncIteration:
pass
finally:
loop.close()
except Exception as e:
yield f"❌ Error: {str(e)}"
def _clear_chat(self) -> Tuple[List, str]:
"""Clear chat history"""
return [], ""
def _user_message(self, message: str, history: List, generating: bool) -> Tuple[str, List, bool, gr.update, gr.update]:
"""Handle user message input"""
if message.strip() and not generating:
history.append([message, None])
return "", history, True, gr.update(visible=True), gr.update(interactive=False)
return message, history, generating, gr.update(visible=False), gr.update(interactive=True)
def _bot_message_stream(self, history: List, generating: bool, chat_memory : List[Dict],):
"""Handle streaming bot response"""
if history and history[-1][1] is None and generating:
user_msg = history[-1][0]
for partial_response in self._chatbot_response(user_msg, history, chat_memory):
history[-1][1] = partial_response
yield history, True, gr.update(visible=True), gr.update(interactive=False)
yield history, False, gr.update(visible=False), gr.update(interactive=True)
else:
yield history, generating, gr.update(visible=False), gr.update(interactive=True)
def _stop_generation(self) -> Tuple[bool, gr.update, gr.update]:
"""Stop the generation process"""
return False, gr.update(visible=False), gr.update(interactive=True)
|