File size: 5,985 Bytes
deafbd7
 
ed46220
deafbd7
 
 
 
 
 
 
 
 
 
 
ed46220
deafbd7
 
 
 
 
 
 
 
 
ed46220
 
 
deafbd7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed46220
 
deafbd7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed46220
deafbd7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ed46220
 
deafbd7
ed46220
deafbd7
 
ed46220
deafbd7
 
ed46220
deafbd7
 
ed46220
deafbd7
ed46220
 
 
deafbd7
ed46220
deafbd7
 
 
 
 
ed46220
 
deafbd7
 
 
 
 
ed46220
deafbd7
 
 
 
ed46220
deafbd7
 
 
 
 
 
ed46220
deafbd7
ed46220
 
 
 
deafbd7
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python
# coding=utf-8

import mimetypes
import os
import re
import shutil
from typing import Optional

from smolagents.agent_types import AgentAudio, AgentImage, AgentText, handle_agent_output_types
from smolagents.agents import ActionStep, MultiStepAgent
from smolagents.memory import MemoryStep
from smolagents.utils import _is_package_available

def pull_messages_from_step(step_log: MemoryStep):
    """Yield the Gradio chat messages that render a single agent step.

    Only ``ActionStep`` instances produce output; any other step type yields
    nothing. For an action step the messages are emitted in this order:

    1. a bold step header,
    2. the model output with ``<end_code>`` markers removed (if present),
    3. a tool-call message for the first tool call, followed by nested
       "Execution Logs" and "Error" messages attached to it through
       ``parent_id`` -- or, when the step has no tool call, a standalone
       error message,
    4. a footnote carrying the step label and a separator line.

    Args:
        step_log: The memory step to render.

    Yields:
        ``gr.ChatMessage`` objects with ``role="assistant"``.
    """
    import gradio as gr

    if not isinstance(step_log, ActionStep):
        return

    step_number = f"Step {step_log.step_number}" if step_log.step_number is not None else ""
    yield gr.ChatMessage(role="assistant", content=f"**{step_number}**")

    model_output = getattr(step_log, "model_output", None)
    if model_output is not None:
        # Drop the <end_code> marker wherever it sits relative to the closing
        # code fence so that only a plain fence is left for rendering.
        model_output = model_output.strip()
        model_output = re.sub(r"```\s*<end_code>", "```", model_output)
        model_output = re.sub(r"<end_code>\s*```", "```", model_output)
        model_output = re.sub(r"```\s*\n\s*<end_code>", "```", model_output)
        model_output = model_output.strip()
        yield gr.ChatMessage(role="assistant", content=model_output)

    tool_calls = getattr(step_log, "tool_calls", None)
    error = getattr(step_log, "error", None)

    # Truthiness (rather than ``is not None``) also covers an empty list,
    # which would otherwise raise IndexError on ``tool_calls[0]``.
    if tool_calls:
        # Only the first tool call of the step is rendered.
        first_tool_call = tool_calls[0]
        used_code = first_tool_call.name == "python_interpreter"
        parent_id = f"call_{len(tool_calls)}"

        args = first_tool_call.arguments
        if isinstance(args, dict):
            content = str(args.get("answer", str(args)))
        else:
            content = str(args).strip()

        if used_code:
            # Strip fence-opening lines and end markers, then wrap the
            # remaining code in a python fence.
            content = re.sub(r"```.*?\n", "", content)
            content = re.sub(r"\s*<end_code>\s*", "", content)
            content = content.strip()
            if not content.startswith("```python"):
                content = f"```python\n{content}\n```"

        parent_message_tool = gr.ChatMessage(
            role="assistant",
            content=content,
            metadata={
                "title": f"🛠️ Used tool {first_tool_call.name}",
                "id": parent_id,
                "status": "pending",
            },
        )
        yield parent_message_tool

        observations = getattr(step_log, "observations", None)
        if observations is not None:
            log_content = observations.strip()
            if log_content:
                log_content = re.sub(r"^Execution logs:\s*", "", log_content)
                yield gr.ChatMessage(
                    role="assistant",
                    content=f"{log_content}",
                    metadata={"title": "📝 Execution Logs", "parent_id": parent_id, "status": "done"},
                )

        if error is not None:
            yield gr.ChatMessage(
                role="assistant",
                content=str(error),
                metadata={"title": "💥 Error", "parent_id": parent_id, "status": "done"},
            )

        # The parent message object was already yielded; its status is
        # updated in place once the nested messages have been emitted.
        parent_message_tool.metadata["status"] = "done"

    elif error is not None:
        # No tool call to nest under: report the error as its own message.
        yield gr.ChatMessage(role="assistant", content=str(error), metadata={"title": "💥 Error"})

    step_footnote = f"<span style='color: #bbbbc2; font-size: 12px;'>{step_number}</span> "
    yield gr.ChatMessage(role="assistant", content=f"{step_footnote}\n-----")

def stream_to_gradio(agent, task: str, reset_agent_memory: bool = False, additional_args: Optional[dict] = None):
    """Run ``agent`` on ``task`` and stream the run as Gradio chat messages.

    Every step produced by ``agent.run(..., stream=True)`` is converted with
    ``pull_messages_from_step``. The last item produced by the run is then
    passed through ``handle_agent_output_types`` and rendered as the final
    answer (text, image, audio, or a string fallback).

    Args:
        agent: Object exposing ``run(task, stream=..., reset=..., additional_args=...)``.
        task: The prompt handed to the agent.
        reset_agent_memory: Forwarded to ``agent.run`` as ``reset``.
        additional_args: Forwarded unchanged to ``agent.run``.

    Yields:
        ``gr.ChatMessage`` objects. Nothing is yielded when the run produces
        no steps at all.
    """
    # Sentinel so that "the run produced nothing" is distinguishable from any
    # real step value; without it the name below would be unbound after an
    # empty run.
    no_step = object()
    step_log = no_step

    for step_log in agent.run(task, stream=True, reset=reset_agent_memory, additional_args=additional_args):
        yield from pull_messages_from_step(step_log)

    if step_log is no_step:
        return

    # Imported lazily: gradio is only needed here to build the final message.
    import gradio as gr

    final_answer = handle_agent_output_types(step_log)

    if isinstance(final_answer, AgentText):
        yield gr.ChatMessage(role="assistant", content=f"**Final answer:**\n{final_answer.to_string()}")
    elif isinstance(final_answer, AgentImage):
        yield gr.ChatMessage(role="assistant", content=gr.Image(final_answer.to_raw()))
    elif isinstance(final_answer, AgentAudio):
        yield gr.ChatMessage(role="assistant", content={"path": final_answer.to_string(), "mime_type": "audio/wav"})
    else:
        # Same label as the text branch (the previous literal rendered an
        # empty bold marker instead of a label).
        yield gr.ChatMessage(role="assistant", content=f"**Final answer:** {str(final_answer)}")

class GradioUI:
    """Small Gradio chat interface that drives an agent.

    Args:
        agent: The agent whose run is streamed into the chat window.
        file_upload_folder: Optional directory path. When given and not yet
            present on disk, it is created (including missing parents).
    """

    def __init__(self, agent: MultiStepAgent, file_upload_folder: Optional[str] = None):
        self.agent = agent
        self.file_upload_folder = file_upload_folder
        if self.file_upload_folder and not os.path.exists(self.file_upload_folder):
            # makedirs handles nested paths; exist_ok tolerates the directory
            # appearing between the existence check and the creation.
            os.makedirs(self.file_upload_folder, exist_ok=True)

    def interact_with_agent(self, prompt, messages):
        """Append the user prompt and stream the agent's reply into ``messages``.

        Args:
            prompt: The user's message text.
            messages: The chat history list; it is mutated in place.

        Yields:
            The same ``messages`` list after each appended message.
        """
        import gradio as gr

        messages.append(gr.ChatMessage(role="user", content=prompt))
        yield messages
        for msg in stream_to_gradio(self.agent, task=prompt):
            messages.append(msg)
            yield messages

    def log_user_message(self, text_input, file_uploads_log):
        """Build the prompt from the textbox content and clear the textbox.

        Args:
            text_input: Text typed by the user.
            file_uploads_log: Upload log; when non-empty it is appended to the
                prompt on a ``Files:`` line.

        Returns:
            A ``(prompt, "")`` tuple; the empty string is the new textbox value.
        """
        files_suffix = f"\nFiles: {file_uploads_log}" if file_uploads_log else ""
        return text_input + files_suffix, ""

    def launch(self, **kwargs):
        """Build the Blocks app and launch it.

        Args:
            **kwargs: Forwarded to ``demo.launch``. ``debug`` defaults to
                ``True`` and may be overridden by the caller.
        """
        import gradio as gr

        with gr.Blocks(fill_height=True) as demo:
            stored_messages = gr.State([])
            file_uploads_log = gr.State([])
            chatbot = gr.Chatbot(label="Agent", type="messages", resizeable=True, scale=1)
            text_input = gr.Textbox(lines=1, label="Chat Message")
            # On submit: store the prompt and clear the textbox, then feed the
            # stored prompt plus the current chat history to the agent.
            text_input.submit(self.log_user_message, [text_input, file_uploads_log], [stored_messages, text_input]).then(
                self.interact_with_agent, [stored_messages, chatbot], [chatbot]
            )

        # Merge instead of passing debug=True next to **kwargs, which raised
        # TypeError whenever the caller supplied ``debug`` explicitly.
        launch_options = {"debug": True, **kwargs}
        demo.launch(**launch_options)

# Names exported by ``from <module> import *``.
__all__ = ["stream_to_gradio", "GradioUI"]