MuhammadQASIM111 committed on
Commit
29a299d
·
verified ·
1 Parent(s): 5c31fa1

Create Gradio_UI.py

Browse files
Files changed (1) hide show
  1. Gradio_UI.py +281 -0
Gradio_UI.py ADDED
@@ -0,0 +1,281 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import mimetypes
2
+ import os
3
+ import re
4
+ import shutil
5
+ from typing import Optional
6
+
7
+ from smolagents.agent_types import AgentAudio, AgentImage, AgentText, handle_agent_output_types
8
+ from smolagents.agents import ActionStep, MultiStepAgent
9
+ from smolagents.memory import MemoryStep
10
+ from smolagents.utils import _is_package_available
11
+
12
+
13
def pull_messages_from_step(
    step_log: MemoryStep,
):
    """Extract gr.ChatMessage objects from one agent memory step, with proper nesting.

    Only ``ActionStep`` instances produce messages; any other step type yields
    nothing. Gradio is imported lazily so this module can be imported without it.

    Args:
        step_log: One step of the agent's memory (model output, tool calls,
            observations, errors, and optional token/duration info).

    Yields:
        gr.ChatMessage: a step header, the model's reasoning, the first tool
        call (with execution logs and errors nested under it via ``parent_id``),
        and a small HTML footnote with token counts and duration.
    """
    import gradio as gr

    if isinstance(step_log, ActionStep):
        # Output the step number
        step_number = f"Step {step_log.step_number}" if step_log.step_number is not None else ""
        yield gr.ChatMessage(role="assistant", content=f"**{step_number}**")

        # First yield the thought/reasoning from the LLM
        if getattr(step_log, "model_output", None) is not None:
            # Clean up the LLM output: remove trailing <end_code> markers and extra
            # backticks, handling the several formats models emit.
            model_output = step_log.model_output.strip()
            model_output = re.sub(r"```\s*<end_code>", "```", model_output)  # handles ```<end_code>
            model_output = re.sub(r"<end_code>\s*```", "```", model_output)  # handles <end_code>```
            model_output = re.sub(r"```\s*\n\s*<end_code>", "```", model_output)  # handles ```\n<end_code>
            model_output = model_output.strip()
            yield gr.ChatMessage(role="assistant", content=model_output)

        # For tool calls, create a parent message.
        # BUG FIX: require a non-empty list — the original only checked
        # `is not None` and raised IndexError on an empty tool_calls list.
        if getattr(step_log, "tool_calls", None):
            first_tool_call = step_log.tool_calls[0]
            used_code = first_tool_call.name == "python_interpreter"
            parent_id = f"call_{len(step_log.tool_calls)}"

            # Tool call becomes the parent message with timing info.
            # First we handle arguments based on type.
            args = first_tool_call.arguments
            if isinstance(args, dict):
                content = str(args.get("answer", str(args)))
            else:
                content = str(args).strip()

            if used_code:
                # Clean up the content by removing any end-code tags
                content = re.sub(r"```.*?\n", "", content)  # Remove existing code blocks
                content = re.sub(r"\s*<end_code>\s*", "", content)  # Remove end_code tags
                content = content.strip()
                if not content.startswith("```python"):
                    content = f"```python\n{content}\n```"

            parent_message_tool = gr.ChatMessage(
                role="assistant",
                content=content,
                metadata={
                    "title": f"🛠️ Used tool {first_tool_call.name}",
                    "id": parent_id,
                    "status": "pending",
                },
            )
            yield parent_message_tool

            # Nest execution logs under the tool call if there is actual content
            if getattr(step_log, "observations", None) is not None and step_log.observations.strip():
                log_content = step_log.observations.strip()
                if log_content:
                    log_content = re.sub(r"^Execution logs:\s*", "", log_content)
                    yield gr.ChatMessage(
                        role="assistant",
                        content=f"{log_content}",
                        metadata={"title": "📝 Execution Logs", "parent_id": parent_id, "status": "done"},
                    )

            # Nest any errors under the tool call
            if getattr(step_log, "error", None) is not None:
                yield gr.ChatMessage(
                    role="assistant",
                    content=str(step_log.error),
                    metadata={"title": "💥 Error", "parent_id": parent_id, "status": "done"},
                )

            # Update parent message metadata to done status without yielding a new message
            parent_message_tool.metadata["status"] = "done"

        # Handle standalone errors (not attached to a tool call)
        elif getattr(step_log, "error", None) is not None:
            yield gr.ChatMessage(role="assistant", content=str(step_log.error), metadata={"title": "💥 Error"})

        # Calculate duration and token information for the footnote
        step_footnote = f"{step_number}"
        if hasattr(step_log, "input_token_count") and hasattr(step_log, "output_token_count"):
            step_footnote += (
                f" | Input-tokens:{step_log.input_token_count:,} | Output-tokens:{step_log.output_token_count:,}"
            )
        # BUG FIX: the original computed `step_duration = ... if step_log.duration else None`
        # and then unconditionally did `step_footnote += step_duration`, raising
        # TypeError whenever duration was falsy. Only append when a duration exists.
        if getattr(step_log, "duration", None):
            step_footnote += f" | Duration: {round(float(step_log.duration), 2)}"
        step_footnote = f"""<span style="color: #bbbbc2; font-size: 12px;">{step_footnote}</span> """
        yield gr.ChatMessage(role="assistant", content=f"{step_footnote}")
        yield gr.ChatMessage(role="assistant", content="-----")
109
+
110
+
111
def stream_to_gradio(
    agent,
    task: str,
    reset_agent_memory: bool = False,
    additional_args: Optional[dict] = None,
):
    """Runs an agent with the given task and streams the messages from the agent as gradio ChatMessages.

    Args:
        agent: A smolagents agent exposing ``.run(..., stream=True)`` and ``.model``.
        task: The task/prompt to run the agent on.
        reset_agent_memory: Whether to clear the agent's memory before running.
        additional_args: Extra arguments forwarded to ``agent.run``.

    Yields:
        gr.ChatMessage: intermediate step messages, then the final answer
        rendered as text, image, or audio depending on the agent output type.

    Raises:
        ModuleNotFoundError: If gradio is not installed.
    """
    if not _is_package_available("gradio"):
        raise ModuleNotFoundError(
            "Please install 'gradio' extra to use the GradioUI: `pip install 'smolagents[gradio]'`"
        )
    import gradio as gr

    total_input_tokens = 0
    total_output_tokens = 0

    for step_log in agent.run(task, stream=True, reset=reset_agent_memory, additional_args=additional_args):
        # Track tokens if the model provides them.
        # BUG FIX: some models expose these attributes but report None until usage
        # info arrives; `int += None` raised TypeError, so default to 0.
        if hasattr(agent.model, "last_input_token_count"):
            total_input_tokens += agent.model.last_input_token_count or 0
            total_output_tokens += agent.model.last_output_token_count or 0
            if isinstance(step_log, ActionStep):
                # Store 0 rather than None so downstream `:,` formatting cannot fail.
                step_log.input_token_count = agent.model.last_input_token_count or 0
                step_log.output_token_count = agent.model.last_output_token_count or 0

        for message in pull_messages_from_step(
            step_log,
        ):
            yield message

    final_answer = step_log  # Last log is the run's final_answer
    final_answer = handle_agent_output_types(final_answer)

    if isinstance(final_answer, AgentText):
        yield gr.ChatMessage(
            role="assistant",
            content=f"**Final answer:**\n{final_answer.to_string()}\n",
        )
    elif isinstance(final_answer, AgentImage):
        yield gr.ChatMessage(
            role="assistant",
            content={"path": final_answer.to_string(), "mime_type": "image/png"},
        )
    elif isinstance(final_answer, AgentAudio):
        yield gr.ChatMessage(
            role="assistant",
            content={"path": final_answer.to_string(), "mime_type": "audio/wav"},
        )
    else:
        yield gr.ChatMessage(role="assistant", content=f"**Final answer:** {str(final_answer)}")
161
+
162
+
163
class GradioUI:
    """A one-line interface to launch your agent in Gradio."""

    def __init__(self, agent: MultiStepAgent, file_upload_folder: str | None = None):
        """Wrap *agent* in a Gradio chat UI.

        Args:
            agent: The smolagents agent to expose in the UI.
            file_upload_folder: Optional folder path; if given, a file-upload
                widget is shown and uploads are saved there.

        Raises:
            ModuleNotFoundError: If gradio is not installed.
        """
        if not _is_package_available("gradio"):
            raise ModuleNotFoundError(
                "Please install 'gradio' extra to use the GradioUI: `pip install 'smolagents[gradio]'`"
            )
        self.agent = agent
        self.file_upload_folder = file_upload_folder
        if self.file_upload_folder is not None:
            # BUG FIX: os.mkdir cannot create nested paths and raises if the folder
            # appears between the exists() check and the call; makedirs(exist_ok=True)
            # handles both cases.
            os.makedirs(self.file_upload_folder, exist_ok=True)

    def interact_with_agent(self, prompt, messages):
        """Append the user prompt, then stream agent messages, re-yielding the history each time."""
        import gradio as gr

        messages.append(gr.ChatMessage(role="user", content=prompt))
        yield messages
        for msg in stream_to_gradio(self.agent, task=prompt, reset_agent_memory=False):
            messages.append(msg)
            yield messages
        yield messages

    def upload_file(
        self,
        file,
        file_uploads_log,
        allowed_file_types=(
            "application/pdf",
            "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            "text/plain",
        ),
    ):
        """
        Handle file uploads; default allowed types are .pdf, .docx, and .txt.

        Args:
            file: Gradio file object exposing a ``.name`` path, or None.
            file_uploads_log: Running list of saved upload paths (Gradio state).
            allowed_file_types: MIME types accepted for upload. (Changed from a
                mutable list default to a tuple; membership semantics unchanged.)

        Returns:
            tuple: (gr.Textbox status message, updated file_uploads_log).
        """
        import gradio as gr

        if file is None:
            return gr.Textbox("No file uploaded", visible=True), file_uploads_log

        try:
            mime_type, _ = mimetypes.guess_type(file.name)
        except Exception as e:
            return gr.Textbox(f"Error: {e}", visible=True), file_uploads_log

        if mime_type not in allowed_file_types:
            return gr.Textbox("File type disallowed", visible=True), file_uploads_log

        # Sanitize file name: replace anything but word chars, dashes, and dots
        # with underscores.
        original_name = os.path.basename(file.name)
        sanitized_name = re.sub(r"[^\w\-.]", "_", original_name)

        # Ensure the extension matches the detected MIME type.
        # BUG FIX: the original built a reverse lookup over mimetypes.types_map and
        # indexed it directly (KeyError for MIME types not registered on the host),
        # and its `"".join(name.split(".")[:-1])` silently dropped interior dots
        # from multi-dot names. splitext + guess_extension avoids both.
        stem, original_ext = os.path.splitext(sanitized_name)
        ext = mimetypes.guess_extension(mime_type) or original_ext
        sanitized_name = stem + ext

        # Save the uploaded file to the specified folder
        file_path = os.path.join(self.file_upload_folder, os.path.basename(sanitized_name))
        shutil.copy(file.name, file_path)

        return gr.Textbox(f"File uploaded: {file_path}", visible=True), file_uploads_log + [file_path]

    def log_user_message(self, text_input, file_uploads_log):
        """Build the message passed to the agent (mentioning any uploaded files) and clear the textbox."""
        return (
            text_input
            + (
                f"\nYou have been provided with these files, which might be helpful or not: {file_uploads_log}"
                if len(file_uploads_log) > 0
                else ""
            ),
            "",
        )

    def launch(self, **kwargs):
        """Build and launch the Gradio Blocks app; *kwargs* are forwarded to ``demo.launch``."""
        import gradio as gr

        with gr.Blocks(fill_height=True) as demo:
            stored_messages = gr.State([])
            file_uploads_log = gr.State([])
            chatbot = gr.Chatbot(
                label="Agent",
                type="messages",
                avatar_images=(
                    None,
                    "https://huggingface.co/datasets/agents-course/course-images/resolve/main/en/communication/Alfred.png",
                ),
                # NOTE(review): recent Gradio releases spell this parameter
                # "resizable" — confirm against the pinned gradio version.
                resizeable=True,
                scale=1,
            )
            # If an upload folder is provided, enable the upload feature
            if self.file_upload_folder is not None:
                upload_file = gr.File(label="Upload a file")
                upload_status = gr.Textbox(label="Upload Status", interactive=False, visible=False)
                upload_file.change(
                    self.upload_file,
                    [upload_file, file_uploads_log],
                    [upload_status, file_uploads_log],
                )
            text_input = gr.Textbox(lines=1, label="Chat Message")
            text_input.submit(
                self.log_user_message,
                [text_input, file_uploads_log],
                [stored_messages, text_input],
            ).then(self.interact_with_agent, [stored_messages, chatbot], [chatbot])

        demo.launch(debug=True, share=True, **kwargs)
279
+
280
+
281
+ __all__ = ["stream_to_gradio", "GradioUI"]