cs-ai-sakura-dev / src /internal /chatbot /chatbot_handler.py
lifedebugger's picture
Deploy files from GitHub repository
df01f5b
from abc import ABC, abstractmethod
from typing import AsyncGenerator, Dict, Any, Optional, List, Tuple
import asyncio
import gradio as gr
from dataclasses import dataclass
from enum import Enum
from src.internal.agents.base_agents import AgentRequest
from src.internal.agents.expert_router_agent import ExpertRouterAgent
from src.internal.rag.chat_template import get_chat_template
class RAGChatbot:
"""Main RAG Chatbot class"""
def __init__(self, router_agent : ExpertRouterAgent, title: str = "RAG Chatbot"):
self.router_agent = router_agent
self.title = title
self.css = self._get_default_css()
def _get_default_css(self) -> str:
"""Get default CSS styling"""
return """
.gradio-container {
max-width: 900px !important;
margin: auto !important;
}
.chat-message {
padding: 10px;
margin: 5px;
border-radius: 10px;
}
#chatbot {
height: 500px;
}
"""
async def _stream_response(self, message: str, chat_memory : List[Dict], ) -> AsyncGenerator[str, None]:
"""Internal method untuk streaming response"""
try:
partial_response = ""
router_agent_request = AgentRequest(
chat_memory = chat_memory,
prompt_template = get_chat_template("expert_router"),
question = message
)
print("Message : ", message)
async for stream_data in self.router_agent.get_result(router_agent_request):
if stream_data["type"] == "chunk":
chunk = stream_data["data"]["chunk"]
partial_response += chunk
yield partial_response
elif stream_data["type"] == "metadata":
setup_time = stream_data['data']['setup_time']
print(f"\nSetup completed in {setup_time:.2f}s")
elif stream_data["type"] == "complete":
total_time = stream_data['data']['total_time']
print(f"\nTotal time: {total_time:.2f}s")
# chat_memory.append({"role": "assistant", "content": partial_response })
print("Chat Memory :", chat_memory)
except Exception as e:
yield f"❌ Error: {str(e)}"
def _chatbot_response(self, message: str, history: List[Tuple[str, str]], chat_memory : List[Dict]):
"""Generate chatbot response with proper async handling"""
try:
# Create new event loop for this thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
async_gen = self._stream_response(message = message, chat_memory = chat_memory)
try:
while True:
result = loop.run_until_complete(async_gen.__anext__())
yield result
except StopAsyncIteration:
pass
finally:
loop.close()
except Exception as e:
yield f"❌ Error: {str(e)}"
def _clear_chat(self) -> Tuple[List, str]:
"""Clear chat history"""
return [], ""
def _user_message(self, message: str, history: List, generating: bool) -> Tuple[str, List, bool, gr.update, gr.update]:
"""Handle user message input"""
if message.strip() and not generating:
history.append([message, None])
return "", history, True, gr.update(visible=True), gr.update(interactive=False)
return message, history, generating, gr.update(visible=False), gr.update(interactive=True)
def _bot_message_stream(self, history: List, generating: bool, chat_memory : List[Dict],):
"""Handle streaming bot response"""
if history and history[-1][1] is None and generating:
user_msg = history[-1][0]
for partial_response in self._chatbot_response(user_msg, history, chat_memory):
history[-1][1] = partial_response
yield history, True, gr.update(visible=True), gr.update(interactive=False)
yield history, False, gr.update(visible=False), gr.update(interactive=True)
else:
yield history, generating, gr.update(visible=False), gr.update(interactive=True)
def _stop_generation(self) -> Tuple[bool, gr.update, gr.update]:
"""Stop the generation process"""
return False, gr.update(visible=False), gr.update(interactive=True)