from collections.abc import AsyncGenerator
from json import dumps
from typing import TYPE_CHECKING
from agno.agent import Agent, RunContentEvent, ToolCallCompletedEvent, ToolCallStartedEvent
from agno.models.message import Message
from agno.models.metrics import Metrics
from agno.models.response import ToolExecution
from agno.utils.log import configure_agno_logging, log_error, log_warning
from gradio import Audio, Blocks, ChatInterface, ChatMessage, Error, Video
from gradio.components.chatbot import MetadataDict
from qdrant_client.http.exceptions import ResponseHandlingException
from chattr.agent.agent import AgentConfiguration, setup_agent
from chattr.agent.database import setup_database
from chattr.agent.description import setup_description
from chattr.agent.instructions import setup_instructions
from chattr.agent.knowledge import setup_knowledge
from chattr.agent.model import setup_model
from chattr.agent.tools import close_mcp_tools, setup_mcp_tools
from chattr.agent.vector_database import setup_vector_database
from chattr.app.logger import setup_logger
from chattr.app.settings import Settings
if TYPE_CHECKING:
from logging import Logger
from agno.db.json import JsonDb
from agno.knowledge import Knowledge
from agno.models.openai import OpenAILike
from agno.tools.mcp import MultiMCPTools
from agno.vectordb.qdrant import Qdrant
class App:
    """Main application class for the Chattr Multi-agent system app."""

    def __init__(self, settings: Settings) -> None:
        """Initialize the Chattr app.

        Args:
            settings: Application-wide configuration (model, MCP, logging, etc.).
        """
        self.settings = settings
        logger: Logger = setup_logger(self.settings.log)
        # Route agno's internal logging through the app-wide logger.
        configure_agno_logging(custom_default_logger=logger)

    def gradio_app(self) -> Blocks:
        """Create and return the main Gradio Blocks interface for the Chattr app."""
        return ChatInterface(
            fn=self.generate_response,
            save_history=True,
            title="Chattr",
            show_progress="full",
        )

    def _response_at_run_content_event(
        self,
        history: list[ChatMessage | Audio | Video],
        response: RunContentEvent,
    ) -> list[ChatMessage | Audio | Video]:
        """Append the streamed assistant text content to the chat history.

        Raises:
            TypeError: If *response* is not a ``RunContentEvent``.
        """
        if not isinstance(response, RunContentEvent):
            _msg = "Expected RunContentEvent"
            raise TypeError(_msg)
        history.append(ChatMessage(role="assistant", content=str(response.content)))
        return history

    def _response_at_tool_call_started_event(
        self,
        history: list[ChatMessage | Audio | Video],
        response: ToolCallStartedEvent,
    ) -> list[ChatMessage | Audio | Video]:
        """Record a just-started tool call (name, id, args) in the chat history.

        Raises:
            TypeError: If the event carries no ``ToolExecution`` payload.
        """
        if not isinstance(response.tool, ToolExecution):
            _msg = "ToolExecution expected"
            log_error(_msg)
            raise TypeError(_msg)
        history.append(
            ChatMessage(
                role="assistant",
                content=dumps(response.tool.tool_args, indent=4),
                metadata=MetadataDict(
                    title=str(response.tool.tool_name),
                    id=str(response.tool.tool_call_id),
                    duration=response.tool.created_at,
                ),
            ),
        )
        return history

    def _response_at_tool_call_completed_event(
        self,
        history: list[ChatMessage | Audio | Video],
        response: ToolCallCompletedEvent,
    ) -> list[ChatMessage | Audio | Video]:
        """Record a completed tool call and attach any media it produced.

        On success, known media tools append an ``Audio``/``Video`` component;
        an unknown tool name raises a user-visible Gradio ``Error``.

        Raises:
            TypeError: If the event lacks a ``ToolExecution`` or ``Metrics``.
            Error: If the tool name is not one of the known media tools.
        """
        if not isinstance(response.tool, ToolExecution):
            _msg = "ToolExecution expected"
            log_error(_msg)
            raise TypeError(_msg)
        # Both outcome branches need metrics for the duration field, so
        # validate once up front instead of once per branch.
        if not isinstance(response.tool.metrics, Metrics):
            _msg = "Metrics expected"
            log_error(_msg)
            raise TypeError(_msg)
        failed = bool(response.tool.tool_call_error)
        history.append(
            ChatMessage(
                role="assistant",
                content=dumps(response.tool.tool_args, indent=4),
                metadata=MetadataDict(
                    title=str(response.tool.tool_name),
                    id=str(response.tool.tool_call_id),
                    log="Tool Call Failed" if failed else "Tool Call Succeeded",
                    duration=float(str(response.tool.metrics.duration)),
                ),
            ),
        )
        if failed:
            # A failed call produces no media; nothing more to attach.
            return history
        if response.tool.tool_name == "generate_audio_for_text":
            history.append(Audio(response.tool.result, autoplay=True))
        elif response.tool.tool_name == "generate_video_mcp":
            history.append(Video(response.tool.result, autoplay=True))
        else:
            _msg = f"Unknown tool name: {response.tool.tool_name}"
            log_error(_msg)
            raise Error(_msg, print_exception=self.settings.debug)
        return history

    async def generate_response(
        self,
        message: str,
        history: list[ChatMessage | Audio | Video],
    ) -> AsyncGenerator[list[ChatMessage | Audio | Video]]:
        """
        Generate a response to a user message and update the conversation history.

        This asynchronous method streams responses from the agent and
        yields the updated history (text, tool-call records, and any
        generated audio/video) after each event.

        Args:
            message: The user's input message as a string.
            history: The conversation history as a list of ChatMessage, Audio, or Video objects.

        Returns:
            AsyncGenerator: Yields a list of the updated history containing
                ChatMessage, Audio, and Video objects if it existed.
        """
        tools: MultiMCPTools | None = None
        try:
            _tools: list[MultiMCPTools] | None = None
            tools = await setup_mcp_tools(self.settings.mcp)
            model: OpenAILike = setup_model(self.settings.model)
            db: JsonDb = setup_database()
            vectordb: Qdrant = setup_vector_database(self.settings.vector_database)
            knowledge: Knowledge = setup_knowledge(vectordb, db)
            description: str = setup_description(self.settings.character.name)
            # NOTE(review): [tools] may be [None] when MCP setup yields no
            # tools — verify setup_instructions tolerates that.
            instructions: list[str] = setup_instructions(self.settings.character.name, [tools])
            if not tools or len(tools.tools) == 0:
                _msg = "No tools found"
                log_warning(_msg)
            else:
                _tools = [tools]
            agent: Agent = await setup_agent(
                AgentConfiguration(
                    model=model,
                    tools=_tools,
                    description=description,
                    instructions=instructions,
                    db=db,
                    knowledge=knowledge,
                    timezone=self.settings.timezone,
                    debug_mode=self.settings.debug,
                ),
            )
            async for response in agent.arun(
                Message(content=message, role="user"),
                stream=True,
                stream_events=True,
            ):
                if isinstance(response, RunContentEvent):
                    history = self._response_at_run_content_event(history, response)
                elif isinstance(response, ToolCallStartedEvent):
                    history = self._response_at_tool_call_started_event(history, response)
                elif isinstance(response, ToolCallCompletedEvent):
                    history = self._response_at_tool_call_completed_event(history, response)
                yield history
        except (StopAsyncIteration, StopIteration) as e:
            log_error(f"Iteration stopped. {e}")
        except ResponseHandlingException as e:
            log_error(f"Vector database is not reachable. {e}")
        except RuntimeError as e:
            log_error(f"Runtime error. {e}")
        except ValueError as e:
            log_error(f"Value error. {e}")
        except TypeError as e:
            log_error(f"Type error. {e}")
        except Exception as e:
            log_error(f"Unexpected error. {e}")
        finally:
            # Release MCP connections on every exit path — previously they
            # leaked whenever the stream raised before the success-path close.
            if tools is not None:
                await close_mcp_tools(tools)
            yield history