Chattr / src /chattr /app /app.py
MH0386's picture
Add HuggingFace sync
b380004 verified
from collections.abc import AsyncGenerator
from json import dumps
from typing import TYPE_CHECKING
from agno.agent import Agent, RunContentEvent, ToolCallCompletedEvent, ToolCallStartedEvent
from agno.models.message import Message
from agno.models.metrics import Metrics
from agno.models.response import ToolExecution
from agno.utils.log import configure_agno_logging, log_error, log_warning
from gradio import Audio, Blocks, ChatInterface, ChatMessage, Error, Video
from gradio.components.chatbot import MetadataDict
from qdrant_client.http.exceptions import ResponseHandlingException
from chattr.agent.agent import AgentConfiguration, setup_agent
from chattr.agent.database import setup_database
from chattr.agent.description import setup_description
from chattr.agent.instructions import setup_instructions
from chattr.agent.knowledge import setup_knowledge
from chattr.agent.model import setup_model
from chattr.agent.tools import close_mcp_tools, setup_mcp_tools
from chattr.agent.vector_database import setup_vector_database
from chattr.app.logger import setup_logger
from chattr.app.settings import Settings
if TYPE_CHECKING:
from logging import Logger
from agno.db.json import JsonDb
from agno.knowledge import Knowledge
from agno.models.openai import OpenAILike
from agno.tools.mcp import MultiMCPTools
from agno.vectordb.qdrant import Qdrant
class App:
"""Main application class for the Chattr Multi-agent system app."""
def __init__(self, settings: Settings) -> None:
"""Initialize the Chattr app."""
self.settings = settings
logger: Logger = setup_logger(self.settings.log)
configure_agno_logging(custom_default_logger=logger)
def gradio_app(self) -> Blocks:
"""Create and return the main Gradio Blocks interface for the Chattr app."""
return ChatInterface(
fn=self.generate_response,
save_history=True,
title="Chattr",
show_progress="full",
)
def _response_at_run_content_event(
self,
history: list[ChatMessage | Audio | Video],
response: RunContentEvent,
) -> list[ChatMessage | Audio | Video]:
"""Handle the response run content event."""
if not isinstance(response, RunContentEvent):
_msg = "Expected RunContentEvent"
raise TypeError(_msg)
history.append(ChatMessage(role="assistant", content=str(response.content)))
return history
def _response_at_tool_call_started_event(
self,
history: list[ChatMessage | Audio | Video],
response: ToolCallStartedEvent,
) -> list[ChatMessage | Audio | Video]:
"""Handle the response tool call started event."""
if not isinstance(response.tool, ToolExecution):
_msg = "ToolExecution expected"
log_error(_msg)
raise TypeError(_msg)
history.append(
ChatMessage(
role="assistant",
content=dumps(response.tool.tool_args, indent=4),
metadata=MetadataDict(
title=str(response.tool.tool_name),
id=str(response.tool.tool_call_id),
duration=response.tool.created_at,
),
),
)
return history
def _response_at_tool_call_completed_event(
self,
history: list[ChatMessage | Audio | Video],
response: ToolCallCompletedEvent,
) -> list[ChatMessage | Audio | Video]:
"""Handle the response tool call completed event."""
if not isinstance(response.tool, ToolExecution):
_msg = "ToolExecution expected"
log_error(_msg)
raise TypeError(_msg)
if response.tool.tool_call_error:
if not isinstance(response.tool.metrics, Metrics):
_msg = "Metrics expected"
log_error(_msg)
raise TypeError(_msg)
history.append(
ChatMessage(
role="assistant",
content=dumps(response.tool.tool_args, indent=4),
metadata=MetadataDict(
title=str(response.tool.tool_name),
id=str(response.tool.tool_call_id),
log="Tool Call Failed",
duration=float(str(response.tool.metrics.duration)),
),
),
)
else:
if not isinstance(response.tool.metrics, Metrics):
_msg = "Metrics expected"
log_error(_msg)
raise TypeError(_msg)
history.append(
ChatMessage(
role="assistant",
content=dumps(response.tool.tool_args, indent=4),
metadata=MetadataDict(
title=str(response.tool.tool_name),
id=str(response.tool.tool_call_id),
log="Tool Call Succeeded",
duration=float(str(response.tool.metrics.duration)),
),
),
)
if response.tool.tool_name == "generate_audio_for_text":
history.append(Audio(response.tool.result, autoplay=True))
elif response.tool.tool_name == "generate_video_mcp":
history.append(Video(response.tool.result, autoplay=True))
else:
_msg = f"Unknown tool name: {response.tool.tool_name}"
log_error(_msg)
raise Error(_msg, print_exception=self.settings.debug)
return history
async def generate_response(
self,
message: str,
history: list[ChatMessage | Audio | Video],
) -> AsyncGenerator[list[ChatMessage | Audio | Video]]:
"""
Generate a response to a user message and update the conversation history.
This asynchronous method streams responses from the state graph and
yields updated history and audio file paths as needed.
Args:
message: The user's input message as a string.
history: The conversation history as a list of ChatMessage, Audio, or Video objects.
Returns:
AsyncGenerator: Yields a list of the updated history containing
ChatMessage, Audio, and Video objects if it existed.
"""
try:
_tools: list[MultiMCPTools] | None = None
tools: MultiMCPTools | None = await setup_mcp_tools(self.settings.mcp)
model: OpenAILike = setup_model(self.settings.model)
db: JsonDb = setup_database()
vectordb: Qdrant = setup_vector_database(self.settings.vector_database)
knowledge: Knowledge = setup_knowledge(vectordb, db)
description: str = setup_description(self.settings.character.name)
instructions: list[str] = setup_instructions(self.settings.character.name, [tools])
if not tools or len(tools.tools) == 0:
_msg = "No tools found"
log_warning(_msg)
else:
_tools = [tools]
agent: Agent = await setup_agent(
AgentConfiguration(
model=model,
tools=_tools,
description=description,
instructions=instructions,
db=db,
knowledge=knowledge,
timezone=self.settings.timezone,
debug_mode=self.settings.debug,
),
)
async for response in agent.arun(
Message(content=message, role="user"),
stream=True,
stream_events=True,
):
# pprint(response)
if isinstance(response, RunContentEvent):
history = self._response_at_run_content_event(history, response)
elif isinstance(response, ToolCallStartedEvent):
history = self._response_at_tool_call_started_event(history, response)
elif isinstance(response, ToolCallCompletedEvent):
history = self._response_at_tool_call_completed_event(history, response)
print(f"---------- {history}")
yield history
await close_mcp_tools(tools)
except (StopAsyncIteration, StopIteration) as e:
log_error(f"Iteration stopped. {e}")
except ResponseHandlingException as e:
log_error(f"Vector database is not reachable. {e}")
except RuntimeError as e:
log_error(f"Runtime error. {e}")
except ValueError as e:
log_error(f"Value error. {e}")
except TypeError as e:
log_error(f"Type error. {e}")
except Exception as e:
log_error(f"Unexpected error. {e}")
finally:
yield history