"""Gradio-facing RAG chatbot that streams answers from an expert router agent."""

from abc import ABC, abstractmethod
import asyncio
from dataclasses import dataclass
from enum import Enum
from typing import AsyncGenerator, Dict, Any, Optional, List, Tuple

import gradio as gr

from src.internal.agents.base_agents import AgentRequest
from src.internal.agents.expert_router_agent import ExpertRouterAgent
from src.internal.rag.chat_template import get_chat_template


class RAGChatbot:
    """Main RAG Chatbot class.

    Wraps an ``ExpertRouterAgent`` and exposes the callbacks used to drive a
    chat UI: accepting a user message, streaming the bot reply, clearing the
    chat and stopping generation.
    """

    def __init__(self, router_agent: ExpertRouterAgent, title: str = "RAG Chatbot"):
        """Store the router agent, the UI title and the default CSS.

        Args:
            router_agent: Agent whose ``get_result`` stream produces replies.
            title: Title kept on the instance for the UI.
        """
        self.router_agent = router_agent
        self.title = title
        self.css = self._get_default_css()

    def _get_default_css(self) -> str:
        """Get default CSS styling."""
        return """
        .gradio-container {
            max-width: 900px !important;
            margin: auto !important;
        }
        .chat-message {
            padding: 10px;
            margin: 5px;
            border-radius: 10px;
        }
        #chatbot {
            height: 500px;
        }
        """

    async def _stream_response(
        self,
        message: str,
        chat_memory: List[Dict],
    ) -> AsyncGenerator[str, None]:
        """Stream the router agent's reply for ``message``.

        Each "chunk" event is appended to the text received so far and the
        accumulated text (not the individual chunk) is yielded. "metadata"
        and "complete" events only print timing information.

        Args:
            message: The user's question.
            chat_memory: Prior conversation passed to the agent request.

        Yields:
            The accumulated response text after each chunk, or a single
            ``"❌ Error: ..."`` string if anything raises.
        """
        try:
            partial_response = ""
            router_agent_request = AgentRequest(
                chat_memory=chat_memory,
                prompt_template=get_chat_template("expert_router"),
                question=message,
            )
            print("Message : ", message)

            async for stream_data in self.router_agent.get_result(router_agent_request):
                if stream_data["type"] == "chunk":
                    chunk = stream_data["data"]["chunk"]
                    partial_response += chunk
                    yield partial_response
                elif stream_data["type"] == "metadata":
                    setup_time = stream_data['data']['setup_time']
                    print(f"\nSetup completed in {setup_time:.2f}s")
                elif stream_data["type"] == "complete":
                    total_time = stream_data['data']['total_time']
                    print(f"\nTotal time: {total_time:.2f}s")
                    # NOTE(review): appending the reply to chat_memory is
                    # currently disabled; confirm who owns memory updates.
                    # chat_memory.append({"role": "assistant", "content": partial_response })
                    print("Chat Memory :", chat_memory)
        except Exception as e:
            # Surface the failure in the chat window instead of crashing the UI.
            yield f"❌ Error: {str(e)}"

    @staticmethod
    def _close_stream(loop: asyncio.AbstractEventLoop, async_gen: AsyncGenerator) -> None:
        """Finalize ``async_gen`` and any nested async generators, then close ``loop``.

        Cleanup is best effort: a failure while finalizing is printed rather
        than raised, because this runs while the calling generator is being
        torn down and must not replace the exception already in flight.
        """
        try:
            # Run pending ``finally`` blocks of the (possibly suspended) stream.
            loop.run_until_complete(async_gen.aclose())
            # Also finalize generators it was iterating (e.g. the agent stream).
            loop.run_until_complete(loop.shutdown_asyncgens())
        except Exception as e:
            print(f"Stream cleanup failed: {e}")
        finally:
            loop.close()

    def _chatbot_response(
        self,
        message: str,
        history: List[Tuple[str, str]],
        chat_memory: List[Dict],
    ):
        """Generate chatbot response with proper async handling.

        Bridges the async ``_stream_response`` generator into a synchronous
        generator by stepping it on a dedicated event loop.

        Args:
            message: The user's question.
            history: Chat history; accepted for interface compatibility and
                not used here.
            chat_memory: Prior conversation forwarded to ``_stream_response``.

        Yields:
            Each accumulated partial response, or an ``"❌ Error: ..."``
            string if driving the stream fails.
        """
        try:
            # Create new event loop for this thread
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            async_gen = self._stream_response(message=message, chat_memory=chat_memory)

            try:
                while True:
                    result = loop.run_until_complete(async_gen.__anext__())
                    yield result
            except StopAsyncIteration:
                pass
            finally:
                # Also reached when the consumer stops early (GeneratorExit):
                # the async generators must be finalized before the loop is
                # closed, otherwise their cleanup code never runs.
                self._close_stream(loop, async_gen)
        except Exception as e:
            yield f"❌ Error: {str(e)}"

    def _clear_chat(self) -> Tuple[List, str]:
        """Clear chat history.

        Returns:
            An empty history list and an empty message string.
        """
        return [], ""

    def _user_message(
        self,
        message: str,
        history: List,
        generating: bool,
    ) -> Tuple[str, List, bool, gr.update, gr.update]:
        """Handle user message input.

        A non-blank message submitted while nothing is generating is appended
        to ``history`` (in place) as ``[message, None]`` and the generating
        flag is raised. Otherwise the inputs are passed back unchanged.

        Returns:
            ``(textbox value, history, generating flag, visibility update,
            interactivity update)``. Which components the two updates target
            is decided by the caller's wiring, not visible here.
        """
        if message.strip() and not generating:
            history.append([message, None])
            return "", history, True, gr.update(visible=True), gr.update(interactive=False)
        return message, history, generating, gr.update(visible=False), gr.update(interactive=True)

    def _bot_message_stream(
        self,
        history: List,
        generating: bool,
        chat_memory: List[Dict],
    ):
        """Handle streaming bot response.

        When the last history entry is still unanswered and generation is
        active, stream the reply into that entry, yielding after every
        partial response, then yield a final state with generation off.
        Otherwise yield the current state once.

        Yields:
            ``(history, generating flag, visibility update, interactivity
            update)`` tuples.
        """
        if history and history[-1][1] is None and generating:
            user_msg = history[-1][0]

            for partial_response in self._chatbot_response(user_msg, history, chat_memory):
                history[-1][1] = partial_response
                yield history, True, gr.update(visible=True), gr.update(interactive=False)

            yield history, False, gr.update(visible=False), gr.update(interactive=True)
        else:
            yield history, generating, gr.update(visible=False), gr.update(interactive=True)

    def _stop_generation(self) -> Tuple[bool, gr.update, gr.update]:
        """Stop the generation process.

        Returns:
            ``(False, visibility update, interactivity update)``, i.e. the
            generating flag lowered and the UI controls reset.
        """
        return False, gr.update(visible=False), gr.update(interactive=True)