from collections.abc import AsyncGenerator
from json import dumps
from typing import TYPE_CHECKING

from agno.agent import Agent, RunContentEvent, ToolCallCompletedEvent, ToolCallStartedEvent
from agno.models.message import Message
from agno.models.metrics import Metrics
from agno.models.response import ToolExecution
from agno.utils.log import configure_agno_logging, log_error, log_warning
from gradio import Audio, Blocks, ChatInterface, ChatMessage, Error, Video
from gradio.components.chatbot import MetadataDict
from qdrant_client.http.exceptions import ResponseHandlingException

from chattr.agent.agent import AgentConfiguration, setup_agent
from chattr.agent.database import setup_database
from chattr.agent.description import setup_description
from chattr.agent.instructions import setup_instructions
from chattr.agent.knowledge import setup_knowledge
from chattr.agent.model import setup_model
from chattr.agent.tools import close_mcp_tools, setup_mcp_tools
from chattr.agent.vector_database import setup_vector_database
from chattr.app.logger import setup_logger
from chattr.app.settings import Settings
if TYPE_CHECKING:
    from logging import Logger

    from agno.db.json import JsonDb
    from agno.knowledge import Knowledge
    from agno.models.openai import OpenAILike
    from agno.tools.mcp import MultiMCPTools
    from agno.vectordb.qdrant import Qdrant
class App:
    """Main application class for the Chattr multi-agent system app."""

    def __init__(self, settings: Settings) -> None:
        """Initialize the Chattr app."""
        self.settings = settings
        # Route Agno's internal logging through the app-level logger.
        logger: Logger = setup_logger(self.settings.log)
        configure_agno_logging(custom_default_logger=logger)
    def gradio_app(self) -> Blocks:
        """Create and return the main Gradio Blocks interface for the Chattr app."""
        # ChatInterface subclasses Blocks, so the return annotation holds.
        return ChatInterface(
            fn=self.generate_response,
            save_history=True,
            title="Chattr",
            show_progress="full",
        )
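
    # The handlers below translate Agno streaming events into Gradio chat
    # components; each returns the updated history so the caller can yield it.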
    def _response_at_run_content_event(
        self,
        history: list[ChatMessage | Audio | Video],
        response: RunContentEvent,
    ) -> list[ChatMessage | Audio | Video]:
        """Handle the response run content event."""
        if not isinstance(response, RunContentEvent):
            _msg = "Expected RunContentEvent"
            raise TypeError(_msg)
        history.append(ChatMessage(role="assistant", content=str(response.content)))
        return history
    def _response_at_tool_call_started_event(
        self,
        history: list[ChatMessage | Audio | Video],
        response: ToolCallStartedEvent,
    ) -> list[ChatMessage | Audio | Video]:
        """Handle the response tool call started event."""
        if not isinstance(response.tool, ToolExecution):
            _msg = "ToolExecution expected"
            log_error(_msg)
            raise TypeError(_msg)
        # No duration exists yet at tool start; `created_at` is a Unix
        # timestamp, not a duration, so it is not passed to the metadata.
        history.append(
            ChatMessage(
                role="assistant",
                content=dumps(response.tool.tool_args, indent=4),
                metadata=MetadataDict(
                    title=str(response.tool.tool_name),
                    id=str(response.tool.tool_call_id),
                ),
            ),
        )
        return history
    def _response_at_tool_call_completed_event(
        self,
        history: list[ChatMessage | Audio | Video],
        response: ToolCallCompletedEvent,
    ) -> list[ChatMessage | Audio | Video]:
        """Handle the response tool call completed event."""
        if not isinstance(response.tool, ToolExecution):
            _msg = "ToolExecution expected"
            log_error(_msg)
            raise TypeError(_msg)
        if not isinstance(response.tool.metrics, Metrics):
            _msg = "Metrics expected"
            log_error(_msg)
            raise TypeError(_msg)
        # The success and failure paths differ only in the log label.
        log_label = "Tool Call Failed" if response.tool.tool_call_error else "Tool Call Succeeded"
        history.append(
            ChatMessage(
                role="assistant",
                content=dumps(response.tool.tool_args, indent=4),
                metadata=MetadataDict(
                    title=str(response.tool.tool_name),
                    id=str(response.tool.tool_call_id),
                    log=log_label,
                    duration=float(str(response.tool.metrics.duration)),
                ),
            ),
        )
        if not response.tool.tool_call_error:
            # Successful media tools append a playable component to the chat;
            # any other tool name is surfaced to the user as a Gradio Error.
            if response.tool.tool_name == "generate_audio_for_text":
                history.append(Audio(response.tool.result, autoplay=True))
            elif response.tool.tool_name == "generate_video_mcp":
                history.append(Video(response.tool.result, autoplay=True))
            else:
                _msg = f"Unknown tool name: {response.tool.tool_name}"
                log_error(_msg)
                raise Error(_msg, print_exception=self.settings.debug)
        return history
    async def generate_response(
        self,
        message: str,
        history: list[ChatMessage | Audio | Video],
    ) -> AsyncGenerator[list[ChatMessage | Audio | Video]]:
        """
        Generate a response to a user message and update the conversation history.

        This asynchronous method streams events from the Agno agent and
        yields the updated history after each one.

        Args:
            message: The user's input message as a string.
            history: The conversation history as a list of ChatMessage, Audio, or Video objects.

        Returns:
            AsyncGenerator: Yields the updated history, a list of
                ChatMessage, Audio, and Video objects.
        """
        # Bind `tools` before the try block so the finally clause can always
        # close it, even if setup fails part-way through.
        tools: MultiMCPTools | None = None
        _tools: list[MultiMCPTools] | None = None
        try:
            tools = await setup_mcp_tools(self.settings.mcp)
            model: OpenAILike = setup_model(self.settings.model)
            db: JsonDb = setup_database()
            vectordb: Qdrant = setup_vector_database(self.settings.vector_database)
            knowledge: Knowledge = setup_knowledge(vectordb, db)
            description: str = setup_description(self.settings.character.name)
            instructions: list[str] = setup_instructions(self.settings.character.name, [tools])
            if not tools or len(tools.tools) == 0:
                _msg = "No tools found"
                log_warning(_msg)
            else:
                _tools = [tools]
            agent: Agent = await setup_agent(
                AgentConfiguration(
                    model=model,
                    tools=_tools,
                    description=description,
                    instructions=instructions,
                    db=db,
                    knowledge=knowledge,
                    timezone=self.settings.timezone,
                    debug_mode=self.settings.debug,
                ),
            )
            async for response in agent.arun(
                Message(content=message, role="user"),
                stream=True,
                stream_events=True,
            ):
                if isinstance(response, RunContentEvent):
                    history = self._response_at_run_content_event(history, response)
                elif isinstance(response, ToolCallStartedEvent):
                    history = self._response_at_tool_call_started_event(history, response)
                elif isinstance(response, ToolCallCompletedEvent):
                    history = self._response_at_tool_call_completed_event(history, response)
                yield history
        except (StopAsyncIteration, StopIteration) as e:
            log_error(f"Iteration stopped. {e}")
        except ResponseHandlingException as e:
            log_error(f"Vector database is not reachable. {e}")
        except RuntimeError as e:
            log_error(f"Runtime error. {e}")
        except ValueError as e:
            log_error(f"Value error. {e}")
        except TypeError as e:
            log_error(f"Type error. {e}")
        except Exception as e:
            log_error(f"Unexpected error. {e}")
        finally:
            # Release MCP sessions even when streaming fails part-way through,
            # then surface the latest history to the UI.
            if tools is not None:
                await close_mcp_tools(tools)
            yield history
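
# Minimal launch sketch (an assumption, not part of the original file): build
# the App from Settings and serve the Gradio interface. The no-argument
# Settings() call assumes configuration is read from the environment or
# defaults; Blocks.launch() is the standard Gradio entry point.
if __name__ == "__main__":
    app = App(Settings())  # Settings() construction is assumed
    app.gradio_app().launch()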