Spaces:
Configuration error
Configuration error
| from abc import ABC, abstractmethod | |
| from typing import AsyncGenerator, Dict, Any, Optional, List, Tuple | |
| import asyncio | |
| import gradio as gr | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from src.internal.agents.base_agents import AgentRequest | |
| from src.internal.agents.expert_router_agent import ExpertRouterAgent | |
| from src.internal.rag.chat_template import get_chat_template | |
| class RAGChatbot: | |
| """Main RAG Chatbot class""" | |
def __init__(self, router_agent: ExpertRouterAgent, title: str = "RAG Chatbot"):
    """Store the router agent and title, then load the default stylesheet.

    Args:
        router_agent: Agent whose ``get_result`` stream is consumed when a
            reply is generated.
        title: Title kept on the instance; defaults to ``"RAG Chatbot"``.
    """
    self.router_agent, self.title = router_agent, title
    # The stylesheet comes from a method so it can be overridden in one place.
    self.css = self._get_default_css()
| def _get_default_css(self) -> str: | |
| """Get default CSS styling""" | |
| return """ | |
| .gradio-container { | |
| max-width: 900px !important; | |
| margin: auto !important; | |
| } | |
| .chat-message { | |
| padding: 10px; | |
| margin: 5px; | |
| border-radius: 10px; | |
| } | |
| #chatbot { | |
| height: 500px; | |
| } | |
| """ | |
async def _stream_response(self, message: str, chat_memory : List[Dict], ) -> AsyncGenerator[str, None]:
    """Stream the router agent's reply to ``message`` as a growing string.

    Builds an ``AgentRequest`` from ``chat_memory``, the ``"expert_router"``
    chat template and ``message``, then iterates the events produced by
    ``self.router_agent.get_result``. Each event is read as a dict with a
    ``"type"`` key and a ``"data"`` payload.

    Args:
        message: The user's question, passed to the request as ``question``.
        chat_memory: Conversation memory forwarded to the request unchanged.

    Yields:
        The accumulated response text after every ``"chunk"`` event, or a
        single ``"❌ Error: ..."`` string if any exception is raised.
    """
    try:
        # Everything lives inside the try so request-construction failures are
        # also reported to the caller as an error string.
        partial_response = ""
        router_agent_request = AgentRequest(
            chat_memory = chat_memory,
            prompt_template = get_chat_template("expert_router"),
            question = message
        )
        print("Message : ", message)
        async for stream_data in self.router_agent.get_result(router_agent_request):
            if stream_data["type"] == "chunk":
                # Yield the whole text so far (not just the delta) so the
                # consumer can simply overwrite what it displays.
                chunk = stream_data["data"]["chunk"]
                partial_response += chunk
                yield partial_response
            elif stream_data["type"] == "metadata":
                # Timing information is only printed, never yielded.
                setup_time = stream_data['data']['setup_time']
                print(f"\nSetup completed in {setup_time:.2f}s")
            elif stream_data["type"] == "complete":
                total_time = stream_data['data']['total_time']
                print(f"\nTotal time: {total_time:.2f}s")
                # NOTE(review): the source's indentation was lost; the two
                # lines below are assumed to belong to this "complete" branch
                # rather than to follow the loop -- confirm against the repo.
                # chat_memory.append({"role": "assistant", "content": partial_response })
                print("Chat Memory :", chat_memory)
    except Exception as e:
        # Deliberate best-effort: surface the failure as a chat message
        # instead of raising.
        yield f"❌ Error: {str(e)}"
| def _chatbot_response(self, message: str, history: List[Tuple[str, str]], chat_memory : List[Dict]): | |
| """Generate chatbot response with proper async handling""" | |
| try: | |
| # Create new event loop for this thread | |
| loop = asyncio.new_event_loop() | |
| asyncio.set_event_loop(loop) | |
| async_gen = self._stream_response(message = message, chat_memory = chat_memory) | |
| try: | |
| while True: | |
| result = loop.run_until_complete(async_gen.__anext__()) | |
| yield result | |
| except StopAsyncIteration: | |
| pass | |
| finally: | |
| loop.close() | |
| except Exception as e: | |
| yield f"❌ Error: {str(e)}" | |
| def _clear_chat(self) -> Tuple[List, str]: | |
| """Clear chat history""" | |
| return [], "" | |
def _user_message(self, message: str, history: List, generating: bool) -> Tuple[str, List, bool, gr.update, gr.update]:
    """Accept a submitted message unless it is blank or a reply is in progress.

    When accepted, ``[message, None]`` is appended to ``history`` in place
    (``None`` marks the reply as pending) and the returned text is cleared.

    Args:
        message: Text submitted by the user.
        history: Chat history as a list of ``[user, bot]`` pairs.
        generating: Whether a reply is currently being generated.

    Returns:
        ``(text, history, generating, update, update)``. The two
        ``gr.update`` objects toggle ``visible`` and ``interactive``
        respectively; which components receive them is decided by the
        event wiring, which is not visible from this method.
    """
    is_blank = not message.strip()
    if is_blank or generating:
        # Rejected: hand everything back untouched.
        return message, history, generating, gr.update(visible=False), gr.update(interactive=True)

    history.append([message, None])
    return "", history, True, gr.update(visible=True), gr.update(interactive=False)
def _bot_message_stream(self, history: List, generating: bool, chat_memory: List[Dict],):
    """Fill in the pending bot reply of the last history entry, streaming it.

    Args:
        history: Chat history as a list of ``[user, bot]`` pairs; the last
            pair is updated in place.
        generating: Whether a reply has been requested.
        chat_memory: Conversation memory forwarded to ``_chatbot_response``.

    Yields:
        ``(history, generating, update, update)`` tuples: one per partial
        response with ``generating`` set to ``True``, then a final one with
        it set to ``False``. If there is nothing to answer, a single tuple
        echoing the incoming ``generating`` flag is yielded instead.
    """
    reply_pending = history and history[-1][1] is None and generating
    if not reply_pending:
        # Nothing to answer: emit one idle state and stop.
        yield history, generating, gr.update(visible=False), gr.update(interactive=True)
        return

    question = history[-1][0]
    for text_so_far in self._chatbot_response(question, history, chat_memory):
        # Each partial response is the full text so far, so overwrite.
        history[-1][1] = text_so_far
        yield history, True, gr.update(visible=True), gr.update(interactive=False)

    # Stream finished: report that generation is over.
    yield history, False, gr.update(visible=False), gr.update(interactive=True)
def _stop_generation(self) -> Tuple[bool, gr.update, gr.update]:
    """Return the idle state: ``False`` plus two component updates.

    The first update sets ``visible=False`` and the second sets
    ``interactive=True``.
    """
    visibility_update = gr.update(visible=False)
    interactivity_update = gr.update(interactive=True)
    return False, visibility_update, interactivity_update