MuhammadQASIM111 committed on
Commit
d04f615
·
verified ·
1 Parent(s): 35229a1

Update Gradio_UI.py

Browse files
Files changed (1) hide show
  1. Gradio_UI.py +44 -137
Gradio_UI.py CHANGED
@@ -9,47 +9,30 @@ from smolagents.agents import ActionStep, MultiStepAgent
9
  from smolagents.memory import MemoryStep
10
  from smolagents.utils import _is_package_available
11
 
12
-
13
- def pull_messages_from_step(
14
- step_log: MemoryStep,
15
- ):
16
- """Extract ChatMessage objects from agent steps with proper nesting"""
17
  import gradio as gr
18
 
19
  if isinstance(step_log, ActionStep):
20
- # Output the step number
21
  step_number = f"Step {step_log.step_number}" if step_log.step_number is not None else ""
22
  yield gr.ChatMessage(role="assistant", content=f"**{step_number}**")
23
 
24
- # First yield the thought/reasoning from the LLM
25
  if hasattr(step_log, "model_output") and step_log.model_output is not None:
26
- # Clean up the LLM output
27
  model_output = step_log.model_output.strip()
28
- # Remove any trailing <end_code> and extra backticks, handling multiple possible formats
29
- model_output = re.sub(r"```\s*<end_code>", "```", model_output) # handles ```<end_code>
30
- model_output = re.sub(r"<end_code>\s*```", "```", model_output) # handles <end_code>```
31
- model_output = re.sub(r"```\s*\n\s*<end_code>", "```", model_output) # handles ```\n<end_code>
32
  model_output = model_output.strip()
33
  yield gr.ChatMessage(role="assistant", content=model_output)
34
 
35
- # For tool calls, create a parent message
36
  if hasattr(step_log, "tool_calls") and step_log.tool_calls is not None:
37
  first_tool_call = step_log.tool_calls[0]
38
  used_code = first_tool_call.name == "python_interpreter"
39
  parent_id = f"call_{len(step_log.tool_calls)}"
40
-
41
- # Tool call becomes the parent message with timing info
42
- # First we will handle arguments based on type
43
  args = first_tool_call.arguments
44
- if isinstance(args, dict):
45
- content = str(args.get("answer", str(args)))
46
- else:
47
- content = str(args).strip()
48
-
49
  if used_code:
50
- # Clean up the content by removing any end code tags
51
- content = re.sub(r"```.*?\n", "", content) # Remove existing code blocks
52
- content = re.sub(r"\s*<end_code>\s*", "", content) # Remove end_code tags
53
  content = content.strip()
54
  if not content.startswith("```python"):
55
  content = f"```python\n{content}\n```"
@@ -57,28 +40,19 @@ def pull_messages_from_step(
57
  parent_message_tool = gr.ChatMessage(
58
  role="assistant",
59
  content=content,
60
- metadata={
61
- "title": f"🛠️ Used tool {first_tool_call.name}",
62
- "id": parent_id,
63
- "status": "pending",
64
- },
65
  )
66
  yield parent_message_tool
67
 
68
- # Nesting execution logs under the tool call if they exist
69
- if hasattr(step_log, "observations") and (
70
- step_log.observations is not None and step_log.observations.strip()
71
- ): # Only yield execution logs if there's actual content
72
  log_content = step_log.observations.strip()
73
- if log_content:
74
- log_content = re.sub(r"^Execution logs:\s*", "", log_content)
75
- yield gr.ChatMessage(
76
- role="assistant",
77
- content=f"{log_content}",
78
- metadata={"title": "📝 Execution Logs", "parent_id": parent_id, "status": "done"},
79
- )
80
-
81
- # Nesting any errors under the tool call
82
  if hasattr(step_log, "error") and step_log.error is not None:
83
  yield gr.ChatMessage(
84
  role="assistant",
@@ -86,46 +60,31 @@ def pull_messages_from_step(
86
  metadata={"title": "💥 Error", "parent_id": parent_id, "status": "done"},
87
  )
88
 
89
- # Update parent message metadata to done status without yielding a new message
90
  parent_message_tool.metadata["status"] = "done"
91
 
92
- # Handle standalone errors but not from tool calls
93
  elif hasattr(step_log, "error") and step_log.error is not None:
94
  yield gr.ChatMessage(role="assistant", content=str(step_log.error), metadata={"title": "💥 Error"})
95
 
96
- # Calculate duration and token information
97
  step_footnote = f"{step_number}"
98
  if hasattr(step_log, "input_token_count") and hasattr(step_log, "output_token_count"):
99
- token_str = (
100
- f" | Input-tokens:{step_log.input_token_count:,} | Output-tokens:{step_log.output_token_count:,}"
101
- )
102
  step_footnote += token_str
103
  if hasattr(step_log, "duration"):
104
  step_duration = f" | Duration: {round(float(step_log.duration), 2)}" if step_log.duration else None
105
  step_footnote += step_duration
106
- step_footnote = f"""<span style="color: #bbbbc2; font-size: 12px;">{step_footnote}</span> """
107
  yield gr.ChatMessage(role="assistant", content=f"{step_footnote}")
108
  yield gr.ChatMessage(role="assistant", content="-----")
109
 
110
-
111
- def stream_to_gradio(
112
- agent,
113
- task: str,
114
- reset_agent_memory: bool = False,
115
- additional_args: Optional[dict] = None,
116
- ):
117
- """Runs an agent with the given task and streams the messages from the agent as gradio ChatMessages."""
118
  if not _is_package_available("gradio"):
119
- raise ModuleNotFoundError(
120
- "Please install 'gradio' extra to use the GradioUI: `pip install 'smolagents[gradio]'`"
121
- )
122
  import gradio as gr
123
 
124
  total_input_tokens = 0
125
  total_output_tokens = 0
126
 
127
  for step_log in agent.run(task, stream=True, reset=reset_agent_memory, additional_args=additional_args):
128
- # Track tokens if model provides them
129
  if hasattr(agent.model, "last_input_token_count"):
130
  total_input_tokens += agent.model.last_input_token_count
131
  total_output_tokens += agent.model.last_output_token_count
@@ -133,50 +92,32 @@ def stream_to_gradio(
133
  step_log.input_token_count = agent.model.last_input_token_count
134
  step_log.output_token_count = agent.model.last_output_token_count
135
 
136
- for message in pull_messages_from_step(
137
- step_log,
138
- ):
139
  yield message
140
 
141
- final_answer = step_log # Last log is the run's final_answer
142
  final_answer = handle_agent_output_types(final_answer)
143
 
144
  if isinstance(final_answer, AgentText):
145
- yield gr.ChatMessage(
146
- role="assistant",
147
- content=f"**Final answer:**\n{final_answer.to_string()}\n",
148
- )
149
  elif isinstance(final_answer, AgentImage):
150
- yield gr.ChatMessage(
151
- role="assistant",
152
- content={"path": final_answer.to_string(), "mime_type": "image/png"},
153
- )
154
  elif isinstance(final_answer, AgentAudio):
155
- yield gr.ChatMessage(
156
- role="assistant",
157
- content={"path": final_answer.to_string(), "mime_type": "audio/wav"},
158
- )
159
  else:
160
  yield gr.ChatMessage(role="assistant", content=f"**Final answer:** {str(final_answer)}")
161
 
162
-
163
  class GradioUI:
164
- """A one-line interface to launch your agent in Gradio"""
165
-
166
  def __init__(self, agent: MultiStepAgent, file_upload_folder: str | None = None):
167
  if not _is_package_available("gradio"):
168
- raise ModuleNotFoundError(
169
- "Please install 'gradio' extra to use the GradioUI: `pip install 'smolagents[gradio]'`"
170
- )
171
  self.agent = agent
172
  self.file_upload_folder = file_upload_folder
173
- if self.file_upload_folder is not None:
174
- if not os.path.exists(file_upload_folder):
175
- os.mkdir(file_upload_folder)
176
 
177
  def interact_with_agent(self, prompt, messages):
178
  import gradio as gr
179
-
180
  messages.append(gr.ChatMessage(role="user", content=prompt))
181
  yield messages
182
  for msg in stream_to_gradio(self.agent, task=prompt, reset_agent_memory=False):
@@ -184,20 +125,10 @@ class GradioUI:
184
  yield messages
185
  yield messages
186
 
187
- def upload_file(
188
- self,
189
- file,
190
- file_uploads_log,
191
- allowed_file_types=[
192
- "application/pdf",
193
- "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
194
- "text/plain",
195
- ],
196
- ):
197
- """
198
- Handle file uploads, default allowed types are .pdf, .docx, and .txt
199
- """
200
  import gradio as gr
 
 
201
 
202
  if file is None:
203
  return gr.Textbox("No file uploaded", visible=True), file_uploads_log
@@ -210,38 +141,22 @@ class GradioUI:
210
  if mime_type not in allowed_file_types:
211
  return gr.Textbox("File type disallowed", visible=True), file_uploads_log
212
 
213
- # Sanitize file name
214
  original_name = os.path.basename(file.name)
215
- sanitized_name = re.sub(
216
- r"[^\w\-.]", "_", original_name
217
- ) # Replace any non-alphanumeric, non-dash, or non-dot characters with underscores
218
-
219
- type_to_ext = {}
220
- for ext, t in mimetypes.types_map.items():
221
- if t not in type_to_ext:
222
- type_to_ext[t] = ext
223
-
224
- # Ensure the extension correlates to the mime type
225
- sanitized_name = sanitized_name.split(".")[:-1]
226
- sanitized_name.append("" + type_to_ext[mime_type])
227
  sanitized_name = "".join(sanitized_name)
228
 
229
- # Save the uploaded file to the specified folder
230
- file_path = os.path.join(self.file_upload_folder, os.path.basename(sanitized_name))
231
  shutil.copy(file.name, file_path)
232
 
233
  return gr.Textbox(f"File uploaded: {file_path}", visible=True), file_uploads_log + [file_path]
234
 
235
  def log_user_message(self, text_input, file_uploads_log):
236
- return (
237
- text_input
238
- + (
239
- f"\nYou have been provided with these files, which might be helpful or not: {file_uploads_log}"
240
- if len(file_uploads_log) > 0
241
- else ""
242
- ),
243
- "",
244
- )
245
 
246
  def launch(self, **kwargs):
247
  import gradio as gr
@@ -259,23 +174,15 @@ class GradioUI:
259
  resizeable=True,
260
  scale=1,
261
  )
262
- # If an upload folder is provided, enable the upload feature
263
  if self.file_upload_folder is not None:
264
  upload_file = gr.File(label="Upload a file")
265
  upload_status = gr.Textbox(label="Upload Status", interactive=False, visible=False)
266
- upload_file.change(
267
- self.upload_file,
268
- [upload_file, file_uploads_log],
269
- [upload_status, file_uploads_log],
270
- )
271
  text_input = gr.Textbox(lines=1, label="Chat Message")
272
- text_input.submit(
273
- self.log_user_message,
274
- [text_input, file_uploads_log],
275
- [stored_messages, text_input],
276
- ).then(self.interact_with_agent, [stored_messages, chatbot], [chatbot])
277
 
278
  demo.launch(debug=True, share=True, **kwargs)
279
 
280
-
281
- __all__ = ["stream_to_gradio", "GradioUI"]
 
9
  from smolagents.memory import MemoryStep
10
  from smolagents.utils import _is_package_available
11
 
12
def pull_messages_from_step(step_log: MemoryStep):
    """Extract gradio ChatMessage objects from an agent memory step.

    For an ``ActionStep`` this yields, in order: a step-number header, the
    model's cleaned reasoning text (if any), a tool-call message with nested
    execution logs / errors, any standalone error, and a footnote with token
    and duration statistics. Other step types yield nothing.

    Args:
        step_log: A single memory step produced by the agent run.

    Yields:
        gr.ChatMessage: Messages ready to append to a gradio Chatbot.
    """
    import gradio as gr  # imported lazily so the module loads without gradio installed

    if isinstance(step_log, ActionStep):
        step_number = f"Step {step_log.step_number}" if step_log.step_number is not None else ""
        yield gr.ChatMessage(role="assistant", content=f"**{step_number}**")

        # Model reasoning first, with stray <end_code> markers stripped in all
        # the formats the model is known to emit.
        if hasattr(step_log, "model_output") and step_log.model_output is not None:
            model_output = step_log.model_output.strip()
            model_output = re.sub(r"```\s*<end_code>", "```", model_output)
            model_output = re.sub(r"<end_code>\s*```", "```", model_output)
            model_output = re.sub(r"```\s*\n\s*<end_code>", "```", model_output)
            model_output = model_output.strip()
            yield gr.ChatMessage(role="assistant", content=model_output)

        if hasattr(step_log, "tool_calls") and step_log.tool_calls is not None:
            first_tool_call = step_log.tool_calls[0]
            used_code = first_tool_call.name == "python_interpreter"
            parent_id = f"call_{len(step_log.tool_calls)}"

            # Arguments may arrive as a dict (prefer the "answer" key) or as a raw value.
            args = first_tool_call.arguments
            content = str(args.get("answer", str(args))) if isinstance(args, dict) else str(args).strip()

            if used_code:
                # Normalize interpreter code into a single ```python fence.
                content = re.sub(r"```.*?\n", "", content)
                content = re.sub(r"\s*<end_code>\s*", "", content)
                content = content.strip()
                if not content.startswith("```python"):
                    content = f"```python\n{content}\n```"

            # The tool call is the parent message; logs and errors nest under its id.
            parent_message_tool = gr.ChatMessage(
                role="assistant",
                content=content,
                metadata={"title": f"🛠️ Used tool {first_tool_call.name}", "id": parent_id, "status": "pending"},
            )
            yield parent_message_tool

            # Nest execution logs under the tool call when there is real content.
            if hasattr(step_log, "observations") and step_log.observations and step_log.observations.strip():
                log_content = step_log.observations.strip()
                log_content = re.sub(r"^Execution logs:\s*", "", log_content)
                yield gr.ChatMessage(
                    role="assistant",
                    content=f"{log_content}",
                    metadata={"title": "📝 Execution Logs", "parent_id": parent_id, "status": "done"},
                )

            # Nest any error under the tool call as well.
            if hasattr(step_log, "error") and step_log.error is not None:
                yield gr.ChatMessage(
                    role="assistant",
                    content=str(step_log.error),
                    metadata={"title": "💥 Error", "parent_id": parent_id, "status": "done"},
                )

            # Flip the parent to done in place — no new message is yielded.
            parent_message_tool.metadata["status"] = "done"

        # Standalone error with no tool call involved.
        elif hasattr(step_log, "error") and step_log.error is not None:
            yield gr.ChatMessage(role="assistant", content=str(step_log.error), metadata={"title": "💥 Error"})

        # Footnote with token counts and duration.
        step_footnote = f"{step_number}"
        if hasattr(step_log, "input_token_count") and hasattr(step_log, "output_token_count"):
            step_footnote += (
                f" | Input-tokens:{step_log.input_token_count:,} | Output-tokens:{step_log.output_token_count:,}"
            )
        # BUG FIX: the previous code built `step_duration = ... if step_log.duration else None`
        # and then unconditionally did `step_footnote += step_duration`, raising
        # TypeError (str + None) whenever duration was missing or falsy.
        if hasattr(step_log, "duration") and step_log.duration:
            step_footnote += f" | Duration: {round(float(step_log.duration), 2)}"
        step_footnote = f"""<span style="color: #bbbbc2; font-size: 12px;">{step_footnote}</span> """
        yield gr.ChatMessage(role="assistant", content=f"{step_footnote}")
        yield gr.ChatMessage(role="assistant", content="-----")
78
 
79
+ def stream_to_gradio(agent, task: str, reset_agent_memory: bool = False, additional_args: Optional[dict] = None):
 
 
 
 
 
 
 
80
  if not _is_package_available("gradio"):
81
+ raise ModuleNotFoundError("Please install 'gradio' extra to use the GradioUI: `pip install 'smolagents[gradio]'`")
 
 
82
  import gradio as gr
83
 
84
  total_input_tokens = 0
85
  total_output_tokens = 0
86
 
87
  for step_log in agent.run(task, stream=True, reset=reset_agent_memory, additional_args=additional_args):
 
88
  if hasattr(agent.model, "last_input_token_count"):
89
  total_input_tokens += agent.model.last_input_token_count
90
  total_output_tokens += agent.model.last_output_token_count
 
92
  step_log.input_token_count = agent.model.last_input_token_count
93
  step_log.output_token_count = agent.model.last_output_token_count
94
 
95
+ for message in pull_messages_from_step(step_log):
 
 
96
  yield message
97
 
98
+ final_answer = step_log
99
  final_answer = handle_agent_output_types(final_answer)
100
 
101
  if isinstance(final_answer, AgentText):
102
+ yield gr.ChatMessage(role="assistant", content=f"**Final answer:**\n{final_answer.to_string()}\n")
 
 
 
103
  elif isinstance(final_answer, AgentImage):
104
+ yield gr.ChatMessage(role="assistant", content={"path": final_answer.to_string(), "mime_type": "image/png"})
 
 
 
105
  elif isinstance(final_answer, AgentAudio):
106
+ yield gr.ChatMessage(role="assistant", content={"path": final_answer.to_string(), "mime_type": "audio/wav"})
 
 
 
107
  else:
108
  yield gr.ChatMessage(role="assistant", content=f"**Final answer:** {str(final_answer)}")
109
 
 
110
  class GradioUI:
 
 
111
  def __init__(self, agent: MultiStepAgent, file_upload_folder: str | None = None):
112
  if not _is_package_available("gradio"):
113
+ raise ModuleNotFoundError("Please install 'gradio' extra to use the GradioUI: `pip install 'smolagents[gradio]'`")
 
 
114
  self.agent = agent
115
  self.file_upload_folder = file_upload_folder
116
+ if self.file_upload_folder is not None and not os.path.exists(file_upload_folder):
117
+ os.mkdir(file_upload_folder)
 
118
 
119
  def interact_with_agent(self, prompt, messages):
120
  import gradio as gr
 
121
  messages.append(gr.ChatMessage(role="user", content=prompt))
122
  yield messages
123
  for msg in stream_to_gradio(self.agent, task=prompt, reset_agent_memory=False):
 
125
  yield messages
126
  yield messages
127
 
128
+ def upload_file(self, file, file_uploads_log, allowed_file_types=None):
 
 
 
 
 
 
 
 
 
 
 
 
129
  import gradio as gr
130
+ if allowed_file_types is None:
131
+ allowed_file_types = ["application/pdf", "application/vnd.openxmlformats-officedocument.wordprocessingml.document", "text/plain"]
132
 
133
  if file is None:
134
  return gr.Textbox("No file uploaded", visible=True), file_uploads_log
 
141
  if mime_type not in allowed_file_types:
142
  return gr.Textbox("File type disallowed", visible=True), file_uploads_log
143
 
 
144
  original_name = os.path.basename(file.name)
145
+ sanitized_name = re.sub(r"[^\w\-.]", "_", original_name)
146
+ ext_map = {v: k for k, v in mimetypes.types_map.items()}
147
+ sanitized_name = sanitized_name.split(".")[:-1] + [ext_map.get(mime_type, "txt")]
 
 
 
 
 
 
 
 
 
148
  sanitized_name = "".join(sanitized_name)
149
 
150
+ file_path = os.path.join(self.file_upload_folder, sanitized_name)
 
151
  shutil.copy(file.name, file_path)
152
 
153
  return gr.Textbox(f"File uploaded: {file_path}", visible=True), file_uploads_log + [file_path]
154
 
155
  def log_user_message(self, text_input, file_uploads_log):
156
+ context = text_input
157
+ if file_uploads_log:
158
+ context += f"\nAttached files: {file_uploads_log}"
159
+ return context, ""
 
 
 
 
 
160
 
161
  def launch(self, **kwargs):
162
  import gradio as gr
 
174
  resizeable=True,
175
  scale=1,
176
  )
 
177
  if self.file_upload_folder is not None:
178
  upload_file = gr.File(label="Upload a file")
179
  upload_status = gr.Textbox(label="Upload Status", interactive=False, visible=False)
180
+ upload_file.change(self.upload_file, [upload_file, file_uploads_log], [upload_status, file_uploads_log])
181
+
 
 
 
182
  text_input = gr.Textbox(lines=1, label="Chat Message")
183
+ text_input.submit(self.log_user_message, [text_input, file_uploads_log], [stored_messages, text_input])\
184
+ .then(self.interact_with_agent, [stored_messages, chatbot], [chatbot])
 
 
 
185
 
186
  demo.launch(debug=True, share=True, **kwargs)
187
 
188
+ __all__ = ["stream_to_gradio", "GradioUI"]