File size: 9,016 Bytes
b380004
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
from collections.abc import AsyncGenerator
from json import dumps
from typing import TYPE_CHECKING

from agno.agent import Agent, RunContentEvent, ToolCallCompletedEvent, ToolCallStartedEvent
from agno.models.message import Message
from agno.models.metrics import Metrics
from agno.models.response import ToolExecution
from agno.utils.log import configure_agno_logging, log_error, log_warning
from gradio import Audio, Blocks, ChatInterface, ChatMessage, Error, Video
from gradio.components.chatbot import MetadataDict
from qdrant_client.http.exceptions import ResponseHandlingException

from chattr.agent.agent import AgentConfiguration, setup_agent
from chattr.agent.database import setup_database
from chattr.agent.description import setup_description
from chattr.agent.instructions import setup_instructions
from chattr.agent.knowledge import setup_knowledge
from chattr.agent.model import setup_model
from chattr.agent.tools import close_mcp_tools, setup_mcp_tools
from chattr.agent.vector_database import setup_vector_database
from chattr.app.logger import setup_logger
from chattr.app.settings import Settings

if TYPE_CHECKING:
    from logging import Logger

    from agno.db.json import JsonDb
    from agno.knowledge import Knowledge
    from agno.models.openai import OpenAILike
    from agno.tools.mcp import MultiMCPTools
    from agno.vectordb.qdrant import Qdrant


class App:
    """Main application class for the Chattr Multi-agent system app."""

    def __init__(self, settings: Settings) -> None:
        """
        Keep the settings and route agno's logging through the app logger.

        Args:
            settings: Application settings. Its ``log`` section is passed to
                      ``setup_logger`` and the resulting logger is installed as
                      agno's default logger.
        """
        self.settings = settings
        configure_agno_logging(custom_default_logger=setup_logger(self.settings.log))

    def gradio_app(self) -> Blocks:
        """
        Build the Gradio chat UI for the Chattr app.

        Returns:
            A ``ChatInterface`` titled "Chattr" that delegates every user
            message to ``generate_response``, with history saving enabled and
            the full progress indicator shown.
        """
        chat_ui = ChatInterface(
            title="Chattr",
            fn=self.generate_response,
            show_progress="full",
            save_history=True,
        )
        return chat_ui

    def _response_at_run_content_event(
        self,
        history: list[ChatMessage | Audio | Video],
        response: RunContentEvent,
    ) -> list[ChatMessage | Audio | Video]:
        """
        Append the content of a run-content event to the history.

        Args:
            history: The conversation history; it is mutated in place.
            response: The streamed content event emitted by the agent.

        Returns:
            The same ``history`` list, with one assistant ``ChatMessage``
            appended when the event carries content.

        Raises:
            TypeError: If ``response`` is not a ``RunContentEvent``.
        """
        if not isinstance(response, RunContentEvent):
            _msg = "Expected RunContentEvent"
            # Logged before raising, like the sibling tool-call handlers do.
            log_error(_msg)
            raise TypeError(_msg)
        # ``str(None)`` would show a chat bubble with the literal text "None",
        # so an event without content leaves the history untouched.
        # NOTE(review): assumes agno may emit content-less events - confirm.
        if response.content is None:
            return history
        history.append(ChatMessage(role="assistant", content=str(response.content)))
        return history

    def _response_at_tool_call_started_event(
        self,
        history: list[ChatMessage | Audio | Video],
        response: ToolCallStartedEvent,
    ) -> list[ChatMessage | Audio | Video]:
        """
        Append a message announcing that a tool call has started.

        The message body is the JSON-formatted tool arguments; the metadata
        carries the tool name as title and the tool call id.

        Args:
            history: The conversation history; it is mutated in place.
            response: The tool-call-started event emitted by the agent.

        Returns:
            The same ``history`` list with the new assistant message appended.

        Raises:
            TypeError: If ``response.tool`` is not a ``ToolExecution``.
        """
        tool = response.tool
        if not isinstance(tool, ToolExecution):
            _msg = "ToolExecution expected"
            log_error(_msg)
            raise TypeError(_msg)
        # No ``duration`` here: the call has only just started, so there is no
        # elapsed time yet. ``created_at`` was previously passed, but it is a
        # creation time rather than a number of elapsed seconds
        # (NOTE(review): per agno's ToolExecution - confirm).
        call_metadata = MetadataDict(
            title=str(tool.tool_name),
            id=str(tool.tool_call_id),
        )
        history.append(
            ChatMessage(
                role="assistant",
                content=dumps(tool.tool_args, indent=4),
                metadata=call_metadata,
            ),
        )
        return history

    def _response_at_tool_call_completed_event(
        self,
        history: list[ChatMessage | Audio | Video],
        response: ToolCallCompletedEvent,
    ) -> list[ChatMessage | Audio | Video]:
        """
        Append the outcome of a finished tool call to the history.

        One assistant message is always appended: its body is the
        JSON-formatted tool arguments and its metadata holds the tool name,
        the tool call id, a success/failure log line and, when available, the
        measured duration. After a successful call the tool result is also
        appended as an ``Audio`` or ``Video`` component, depending on the
        tool name.

        Args:
            history: The conversation history; it is mutated in place.
            response: The tool-call-completed event emitted by the agent.

        Returns:
            The same ``history`` list with the new entries appended.

        Raises:
            TypeError: If ``response.tool`` is not a ``ToolExecution`` or its
                       ``metrics`` is not a ``Metrics`` instance.
            Error: If a successful call came from a tool other than
                   ``generate_audio_for_text`` or ``generate_video_mcp``.
        """
        tool = response.tool
        if not isinstance(tool, ToolExecution):
            _msg = "ToolExecution expected"
            log_error(_msg)
            raise TypeError(_msg)
        # Both outcomes need the metrics, so validate them once up front.
        if not isinstance(tool.metrics, Metrics):
            _msg = "Metrics expected"
            log_error(_msg)
            raise TypeError(_msg)

        failed = bool(tool.tool_call_error)
        outcome_metadata = MetadataDict(
            title=str(tool.tool_name),
            id=str(tool.tool_call_id),
            log="Tool Call Failed" if failed else "Tool Call Succeeded",
        )
        # A missing duration used to blow up in ``float("None")``; report the
        # outcome without a duration instead of aborting the whole response.
        elapsed = tool.metrics.duration
        if elapsed is not None:
            outcome_metadata["duration"] = float(elapsed)
        history.append(
            ChatMessage(
                role="assistant",
                content=dumps(tool.tool_args, indent=4),
                metadata=outcome_metadata,
            ),
        )
        if failed:
            return history

        # Only successful calls produce media to play back.
        if tool.tool_name == "generate_audio_for_text":
            history.append(Audio(tool.result, autoplay=True))
        elif tool.tool_name == "generate_video_mcp":
            history.append(Video(tool.result, autoplay=True))
        else:
            _msg = f"Unknown tool name: {tool.tool_name}"
            log_error(_msg)
            raise Error(_msg, print_exception=self.settings.debug)
        return history

    async def generate_response(
        self,
        message: str,
        history: list[ChatMessage | Audio | Video],
    ) -> AsyncGenerator[list[ChatMessage | Audio | Video]]:
        """
        Generate a response to a user message and update the conversation history.

        A fresh agent (MCP tools, model, database, vector database, knowledge,
        description and instructions) is set up on every call. The agent's
        event stream is then consumed: content and tool-call events are folded
        into ``history``, which is yielded after every event.

        Errors raised during setup or streaming are logged rather than
        propagated, and the history accumulated so far is yielded one last
        time. The MCP tools are closed when the generator finishes, whether it
        completed, failed, or was closed early by the consumer.

        Args:
            message: The user's input message as a string.
            history: The conversation history as a list of ChatMessage, Audio, or Video objects.

        Yields:
            The updated history after each streamed event, and once more when
            the stream has ended or an error has been logged.
        """
        # Bound before the ``try`` so the cleanup below can refer to it even
        # when setup fails before the tools were created.
        tools: MultiMCPTools | None = None
        try:
            _tools: list[MultiMCPTools] | None = None
            tools = await setup_mcp_tools(self.settings.mcp)
            model: OpenAILike = setup_model(self.settings.model)
            db: JsonDb = setup_database()
            vectordb: Qdrant = setup_vector_database(self.settings.vector_database)
            knowledge: Knowledge = setup_knowledge(vectordb, db)
            description: str = setup_description(self.settings.character.name)
            instructions: list[str] = setup_instructions(self.settings.character.name, [tools])
            if not tools or len(tools.tools) == 0:
                # The agent still runs, just without any tools attached.
                _msg = "No tools found"
                log_warning(_msg)
            else:
                _tools = [tools]
            agent: Agent = await setup_agent(
                AgentConfiguration(
                    model=model,
                    tools=_tools,
                    description=description,
                    instructions=instructions,
                    db=db,
                    knowledge=knowledge,
                    timezone=self.settings.timezone,
                    debug_mode=self.settings.debug,
                ),
            )
            async for response in agent.arun(
                Message(content=message, role="user"),
                stream=True,
                stream_events=True,
            ):
                if isinstance(response, RunContentEvent):
                    history = self._response_at_run_content_event(history, response)
                elif isinstance(response, ToolCallStartedEvent):
                    history = self._response_at_tool_call_started_event(history, response)
                elif isinstance(response, ToolCallCompletedEvent):
                    history = self._response_at_tool_call_completed_event(history, response)
                # Events of any other type leave the history unchanged.
                yield history
        except (StopAsyncIteration, StopIteration) as e:
            log_error(f"Iteration stopped. {e}")
        except ResponseHandlingException as e:
            log_error(f"Vector database is not reachable. {e}")
        except RuntimeError as e:
            log_error(f"Runtime error. {e}")
        except ValueError as e:
            log_error(f"Value error. {e}")
        except TypeError as e:
            log_error(f"Type error. {e}")
        except Exception as e:
            log_error(f"Unexpected error. {e}")
        finally:
            # Cleanup only: yielding from ``finally`` would raise RuntimeError
            # when the consumer closes the generator early.
            if tools is not None:
                try:
                    await close_mcp_tools(tools)
                except Exception as e:
                    log_error(f"Failed to close MCP tools. {e}")
        # Reached after a normal finish or a logged error, but not after an
        # early close; guarantees the caller receives the history at least once.
        yield history