RalphThings committed on
Commit
aab5620
·
verified ·
1 Parent(s): 7817058

Upload 3 files

Browse files
Files changed (3) hide show
  1. text_inspector_tool.py +124 -0
  2. text_web_browser.py +567 -0
  3. visual_qa.py +189 -0
text_inspector_tool.py ADDED
@@ -0,0 +1,124 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from smolagents import Tool
2
+ from smolagents.models import Model
3
+
4
+
5
class TextInspectorTool(Tool):
    """Convert a file to markdown text and optionally ask `self.model` a question about it.

    The heavy lifting is done by a MarkdownConverter instance; when a question is
    supplied, the converted text (truncated to `text_limit` characters) is sent to
    the model together with the question.
    """

    name = "inspect_file_as_text"
    description = """
You cannot load files yourself: instead call this tool to read a file as markdown text and ask questions about it.
This tool handles the following file extensions: [".html", ".htm", ".xlsx", ".pptx", ".wav", ".mp3", ".m4a", ".flac", ".pdf", ".docx"], and all other types of text files. IT DOES NOT HANDLE IMAGES."""

    inputs = {
        "file_path": {
            "description": "The path to the file you want to read as text. Must be a '.something' file, like '.pdf'. If it is an image, use the visualizer tool instead! DO NOT use this tool for an HTML webpage: use the web_search tool instead!",
            "type": "string",
        },
        "question": {
            "description": "[Optional]: Your question, as a natural language sentence. Provide as much context as possible. Do not pass this parameter if you just want to directly return the content of the file.",
            "type": "string",
            "nullable": True,
        },
    }
    output_type = "string"

    def __init__(self, model: Model = None, text_limit: int = 100000):
        """Store the model used for question answering and the truncation limit.

        Args:
            model: Chat model called with the converted document; may be None if
                only raw content extraction is needed.
            text_limit: Maximum number of document characters forwarded to the model.
        """
        super().__init__()
        self.model = model
        self.text_limit = text_limit
        # Imported lazily to avoid a hard dependency at module import time.
        from .mdconvert import MarkdownConverter

        self.md_converter = MarkdownConverter()

    def forward_initial_exam_mode(self, file_path, question):
        """Return the document content, or (for long documents) model-written notes
        that could help answer `question` — without answering it."""
        from smolagents.models import MessageRole

        # Reject images BEFORE attempting conversion: previously the converter ran
        # first and could fail on binary image data, masking this actionable error.
        if file_path[-4:] in [".png", ".jpg"]:
            raise Exception("Cannot use inspect_file_as_text tool with images: use visualizer instead!")

        result = self.md_converter.convert(file_path)

        if ".zip" in file_path:
            return result.text_content

        if not question:
            return result.text_content

        # Short documents are returned verbatim; no need to involve the model.
        if len(result.text_content) < 4000:
            return "Document content: " + result.text_content

        messages = [
            {
                "role": MessageRole.SYSTEM,
                "content": [
                    {
                        "type": "text",
                        "text": "Here is a file:\n### "
                        + str(result.title)
                        + "\n\n"
                        + result.text_content[: self.text_limit],
                    }
                ],
            },
            {
                "role": MessageRole.USER,
                "content": [
                    {
                        "type": "text",
                        "text": "Now please write a short, 5 sentence caption for this document, that could help someone asking this question: "
                        + question
                        + "\n\nDon't answer the question yourself! Just provide useful notes on the document",
                    }
                ],
            },
        ]
        return self.model(messages).content

    def forward(self, file_path, question: str | None = None) -> str:
        """Convert `file_path` to text; if `question` is given, answer it with the model.

        Returns the raw converted text when no question is passed (or for .zip
        archives), otherwise the model's structured answer.
        """
        from smolagents.models import MessageRole

        # Reject images BEFORE attempting conversion (see forward_initial_exam_mode).
        if file_path[-4:] in [".png", ".jpg"]:
            raise Exception("Cannot use inspect_file_as_text tool with images: use visualizer instead!")

        result = self.md_converter.convert(file_path)

        if ".zip" in file_path:
            return result.text_content

        if not question:
            return result.text_content

        messages = [
            {
                "role": MessageRole.SYSTEM,
                "content": [
                    {
                        "type": "text",
                        "text": "You will have to write a short caption for this file, then answer this question:"
                        + question,
                    }
                ],
            },
            {
                "role": MessageRole.USER,
                "content": [
                    {
                        "type": "text",
                        "text": "Here is the complete file:\n### "
                        + str(result.title)
                        + "\n\n"
                        + result.text_content[: self.text_limit],
                    }
                ],
            },
            {
                "role": MessageRole.USER,
                "content": [
                    {
                        "type": "text",
                        "text": "Now answer the question below. Use these three headings: '1. Short answer', '2. Extremely detailed answer', '3. Additional Context on the document and question asked'."
                        + question,
                    }
                ],
            },
        ]
        return self.model(messages).content
text_web_browser.py ADDED
@@ -0,0 +1,567 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Shamelessly stolen from Microsoft Autogen team: thanks to them for this great resource!
2
+ # https://github.com/microsoft/autogen/blob/gaia_multiagent_v01_march_1st/autogen/browser_utils.py
3
+ import mimetypes
4
+ import os
5
+ import pathlib
6
+ import re
7
+ import time
8
+ import uuid
9
+ from typing import Any
10
+ from urllib.parse import unquote, urljoin, urlparse
11
+
12
+ import pathvalidate
13
+ import requests
14
+ from serpapi import GoogleSearch
15
+
16
+ from smolagents import Tool
17
+
18
+ from .cookies import COOKIES
19
+ from .mdconvert import FileConversionException, MarkdownConverter, UnsupportedFormatException
20
+
21
+
22
class SimpleTextBrowser:
    """(In preview) An extremely simple text-based web browser comparable to Lynx. Suitable for Agentic use.

    Pages are converted to markdown text and exposed through a paged "viewport";
    navigation, Ctrl+F-style search, SerpAPI Google search, and file downloads are
    supported.
    """

    def __init__(
        self,
        start_page: str | None = None,
        viewport_size: int | None = 1024 * 8,
        downloads_folder: str | None = None,
        serpapi_key: str | None = None,
        request_kwargs: dict[str, Any] | None = None,
    ):
        self.start_page: str = start_page if start_page else "about:blank"
        self.viewport_size = viewport_size  # Applies only to the standard uri types
        self.downloads_folder = downloads_folder
        self.history: list[tuple[str, float]] = list()  # (url, visit-timestamp) pairs
        self.page_title: str | None = None
        self.viewport_current_page = 0
        self.viewport_pages: list[tuple[int, int]] = list()
        self.set_address(self.start_page)
        self.serpapi_key = serpapi_key
        # Fix: default to an empty dict — the cookie assignment below used to
        # raise TypeError when request_kwargs was left at its None default.
        self.request_kwargs = request_kwargs if request_kwargs is not None else {}
        self.request_kwargs["cookies"] = COOKIES
        self._mdconvert = MarkdownConverter()
        self._page_content: str = ""

        self._find_on_page_query: str | None = None
        self._find_on_page_last_result: int | None = None  # Location of the last result

    @property
    def address(self) -> str:
        """Return the address of the current page."""
        return self.history[-1][0]

    def set_address(self, uri_or_path: str, filter_year: int | None = None) -> None:
        """Navigate to a URI, 'google:' query, or local path; updates history and content."""
        # TODO: Handle anchors
        self.history.append((uri_or_path, time.time()))

        # Handle special URIs
        if uri_or_path == "about:blank":
            self._set_page_content("")
        elif uri_or_path.startswith("google:"):
            self._serpapi_search(uri_or_path[len("google:") :].strip(), filter_year=filter_year)
        else:
            if (
                not uri_or_path.startswith("http:")
                and not uri_or_path.startswith("https:")
                and not uri_or_path.startswith("file:")
            ):
                if len(self.history) > 1:
                    prior_address = self.history[-2][0]
                    uri_or_path = urljoin(prior_address, uri_or_path)
                    # Update the address with the fully-qualified path
                    self.history[-1] = (uri_or_path, self.history[-1][1])
            self._fetch_page(uri_or_path)

        self.viewport_current_page = 0
        # Fix: reset the actual find-state attributes. The original assigned to
        # misnamed attributes (find_on_page_query / find_on_page_viewport), so
        # stale Ctrl+F state leaked across navigations.
        self._find_on_page_query = None
        self._find_on_page_last_result = None

    @property
    def viewport(self) -> str:
        """Return the content of the current viewport."""
        bounds = self.viewport_pages[self.viewport_current_page]
        return self.page_content[bounds[0] : bounds[1]]

    @property
    def page_content(self) -> str:
        """Return the full contents of the current page."""
        return self._page_content

    def _set_page_content(self, content: str) -> None:
        """Sets the text content of the current page and recomputes viewport bounds."""
        self._page_content = content
        self._split_pages()
        if self.viewport_current_page >= len(self.viewport_pages):
            self.viewport_current_page = len(self.viewport_pages) - 1

    def page_down(self) -> None:
        """Advance one viewport page, clamping at the last page."""
        self.viewport_current_page = min(self.viewport_current_page + 1, len(self.viewport_pages) - 1)

    def page_up(self) -> None:
        """Go back one viewport page, clamping at the first page."""
        self.viewport_current_page = max(self.viewport_current_page - 1, 0)

    def find_on_page(self, query: str) -> str | None:
        """Searches for the query from the current viewport forward, looping back to the start if necessary."""

        # Did we get here via a previous find_on_page search with the same query?
        # If so, map to find_next
        if query == self._find_on_page_query and self.viewport_current_page == self._find_on_page_last_result:
            return self.find_next()

        # Ok it's a new search start from the current viewport
        self._find_on_page_query = query
        viewport_match = self._find_next_viewport(query, self.viewport_current_page)
        if viewport_match is None:
            self._find_on_page_last_result = None
            return None
        else:
            self.viewport_current_page = viewport_match
            self._find_on_page_last_result = viewport_match
            return self.viewport

    def find_next(self) -> str | None:
        """Scroll to the next viewport that matches the query"""

        if self._find_on_page_query is None:
            return None

        starting_viewport = self._find_on_page_last_result
        if starting_viewport is None:
            starting_viewport = 0
        else:
            starting_viewport += 1
            if starting_viewport >= len(self.viewport_pages):
                starting_viewport = 0

        viewport_match = self._find_next_viewport(self._find_on_page_query, starting_viewport)
        if viewport_match is None:
            self._find_on_page_last_result = None
            return None
        else:
            self.viewport_current_page = viewport_match
            self._find_on_page_last_result = viewport_match
            return self.viewport

    def _find_next_viewport(self, query: str, starting_viewport: int) -> int | None:
        """Search for matches between the starting viewport looping when reaching the end."""

        if query is None:
            return None

        # Normalize the query, and convert to a regular expression: '*' becomes
        # a wildcard ('.*'), all other non-word characters become spaces.
        nquery = re.sub(r"\*", "__STAR__", query)
        nquery = " " + (" ".join(re.split(r"\W+", nquery))).strip() + " "
        nquery = nquery.replace(" __STAR__ ", "__STAR__ ")  # Merge isolated stars with prior word
        nquery = nquery.replace("__STAR__", ".*").lower()

        if nquery.strip() == "":
            return None

        # Visit viewports starting at `starting_viewport`, wrapping around.
        idxs = list()
        idxs.extend(range(starting_viewport, len(self.viewport_pages)))
        idxs.extend(range(0, starting_viewport))

        for i in idxs:
            bounds = self.viewport_pages[i]
            content = self.page_content[bounds[0] : bounds[1]]

            # TODO: Remove markdown links and images
            ncontent = " " + (" ".join(re.split(r"\W+", content))).strip().lower() + " "
            if re.search(nquery, ncontent):
                return i

        return None

    def visit_page(self, path_or_uri: str, filter_year: int | None = None) -> str:
        """Update the address, visit the page, and return the content of the viewport."""
        self.set_address(path_or_uri, filter_year=filter_year)
        return self.viewport

    def _split_pages(self) -> None:
        """Partition the page content into viewport-sized (start, end) bounds."""
        # Do not split search results
        if self.address.startswith("google:"):
            self.viewport_pages = [(0, len(self._page_content))]
            return

        # Handle empty pages
        if len(self._page_content) == 0:
            self.viewport_pages = [(0, 0)]
            return

        # Break the viewport into pages
        self.viewport_pages = []
        start_idx = 0
        while start_idx < len(self._page_content):
            end_idx = min(start_idx + self.viewport_size, len(self._page_content))  # type: ignore[operator]
            # Adjust to end on a space
            while end_idx < len(self._page_content) and self._page_content[end_idx - 1] not in [" ", "\t", "\r", "\n"]:
                end_idx += 1
            self.viewport_pages.append((start_idx, end_idx))
            start_idx = end_idx

    def _serpapi_search(self, query: str, filter_year: int | None = None) -> None:
        """Run a SerpAPI Google search and render the snippets as the page content.

        Raises:
            ValueError: if no SerpAPI key was configured.
            Exception: if the search returned no organic results.
        """
        if self.serpapi_key is None:
            raise ValueError("Missing SerpAPI key.")

        params = {
            "engine": "google",
            "q": query,
            "api_key": self.serpapi_key,
        }
        # SerpAPI 'tbs' custom-date-range syntax restricts results to one year.
        if filter_year is not None:
            params["tbs"] = f"cdr:1,cd_min:01/01/{filter_year},cd_max:12/31/{filter_year}"

        search = GoogleSearch(params)
        results = search.get_dict()
        self.page_title = f"{query} - Search"
        if "organic_results" not in results.keys():
            raise Exception(f"No results found for query: '{query}'. Use a less specific query.")
        if len(results["organic_results"]) == 0:
            year_filter_message = f" with filter year={filter_year}" if filter_year is not None else ""
            self._set_page_content(
                f"No results found for '{query}'{year_filter_message}. Try with a more general query, or remove the year filter."
            )
            return

        def _prev_visit(url):
            # Annotate results we have already visited with how long ago that was.
            for i in range(len(self.history) - 1, -1, -1):
                if self.history[i][0] == url:
                    return f"You previously visited this page {round(time.time() - self.history[i][1])} seconds ago.\n"
            return ""

        web_snippets: list[str] = list()
        idx = 0
        if "organic_results" in results:
            for page in results["organic_results"]:
                idx += 1
                date_published = ""
                if "date" in page:
                    date_published = "\nDate published: " + page["date"]

                source = ""
                if "source" in page:
                    source = "\nSource: " + page["source"]

                snippet = ""
                if "snippet" in page:
                    snippet = "\n" + page["snippet"]

                redacted_version = f"{idx}. [{page['title']}]({page['link']}){date_published}{source}\n{_prev_visit(page['link'])}{snippet}"

                redacted_version = redacted_version.replace("Your browser can't play this video.", "")
                web_snippets.append(redacted_version)

        content = (
            f"A Google search for '{query}' found {len(web_snippets)} results:\n\n## Web Results\n"
            + "\n\n".join(web_snippets)
        )

        self._set_page_content(content)

    def _fetch_page(self, url: str) -> None:
        """Fetch a remote or file:// URL: render text/HTML as markdown, or download
        other content types to `downloads_folder` and re-open the saved file."""
        download_path = ""
        try:
            if url.startswith("file://"):
                download_path = os.path.normcase(os.path.normpath(unquote(url[7:])))
                res = self._mdconvert.convert_local(download_path)
                self.page_title = res.title
                self._set_page_content(res.text_content)
            else:
                # Prepare the request parameters
                request_kwargs = self.request_kwargs.copy() if self.request_kwargs is not None else {}
                request_kwargs["stream"] = True

                # Send a HTTP request to the URL
                response = requests.get(url, **request_kwargs)
                response.raise_for_status()

                # If the HTTP request was successful
                content_type = response.headers.get("content-type", "")

                # Text or HTML
                if "text/" in content_type.lower():
                    res = self._mdconvert.convert_response(response)
                    self.page_title = res.title
                    self._set_page_content(res.text_content)
                # A download
                else:
                    # Try producing a safe filename
                    fname = None
                    download_path = None
                    try:
                        fname = pathvalidate.sanitize_filename(os.path.basename(urlparse(url).path)).strip()
                        download_path = os.path.abspath(os.path.join(self.downloads_folder, fname))

                        # Avoid clobbering existing files by appending __N suffixes.
                        suffix = 0
                        while os.path.exists(download_path) and suffix < 1000:
                            suffix += 1
                            base, ext = os.path.splitext(fname)
                            new_fname = f"{base}__{suffix}{ext}"
                            download_path = os.path.abspath(os.path.join(self.downloads_folder, new_fname))

                    except NameError:
                        pass

                    # No suitable name, so make one
                    if fname is None:
                        extension = mimetypes.guess_extension(content_type)
                        if extension is None:
                            extension = ".download"
                        fname = str(uuid.uuid4()) + extension
                        download_path = os.path.abspath(os.path.join(self.downloads_folder, fname))

                    # Open a file for writing
                    with open(download_path, "wb") as fh:
                        for chunk in response.iter_content(chunk_size=512):
                            fh.write(chunk)

                    # Render it
                    local_uri = pathlib.Path(download_path).as_uri()
                    self.set_address(local_uri)

        except UnsupportedFormatException as e:
            print(e)
            # Fix: was a one-element tuple ("Download complete.",) due to a stray comma.
            self.page_title = "Download complete."
            self._set_page_content(f"# Download complete\n\nSaved file to '{download_path}'")
        except FileConversionException as e:
            print(e)
            # Fix: same stray-comma tuple bug as above.
            self.page_title = "Download complete."
            self._set_page_content(f"# Download complete\n\nSaved file to '{download_path}'")
        except FileNotFoundError:
            self.page_title = "Error 404"
            self._set_page_content(f"## Error 404\n\nFile not found: {download_path}")
        except requests.exceptions.RequestException as request_exception:
            try:
                self.page_title = f"Error {response.status_code}"

                # If the error was rendered in HTML we might as well render it
                content_type = response.headers.get("content-type", "")
                if content_type is not None and "text/html" in content_type.lower():
                    res = self._mdconvert.convert(response)
                    self.page_title = f"Error {response.status_code}"
                    self._set_page_content(f"## Error {response.status_code}\n\n{res.text_content}")
                else:
                    text = ""
                    for chunk in response.iter_content(chunk_size=512, decode_unicode=True):
                        text += chunk
                    self.page_title = f"Error {response.status_code}"
                    self._set_page_content(f"## Error {response.status_code}\n\n{text}")
            except NameError:
                # `response` may not exist if the request itself failed (e.g. DNS error).
                self.page_title = "Error"
                self._set_page_content(f"## Error\n\n{str(request_exception)}")

    def _state(self) -> tuple[str, str]:
        """Return (header, viewport) where the header describes address, title,
        prior-visit info, and the viewport position."""
        header = f"Address: {self.address}\n"
        if self.page_title is not None:
            header += f"Title: {self.page_title}\n"

        current_page = self.viewport_current_page
        total_pages = len(self.viewport_pages)

        address = self.address
        for i in range(len(self.history) - 2, -1, -1):  # Start from the second last
            if self.history[i][0] == address:
                header += f"You previously visited this page {round(time.time() - self.history[i][1])} seconds ago.\n"
                break

        header += f"Viewport position: Showing page {current_page + 1} of {total_pages}.\n"
        return (header, self.viewport)
371
+
372
+
373
class SearchInformationTool(Tool):
    """Tool wrapper that routes a Google (SerpAPI) query through the shared browser."""

    name = "web_search"
    description = "Perform a web search query (think a google search) and returns the search results."
    inputs = {
        "query": {"type": "string", "description": "The web search query to perform."},
        "filter_year": {
            "type": "string",
            "description": "[Optional parameter]: filter the search results to only include pages from a specific year. For example, '2020' will only include pages from 2020. Make sure to use this parameter if you're trying to search for articles from a specific date!",
            "nullable": True,
        },
    }
    output_type = "string"

    def __init__(self, browser):
        super().__init__()
        self.browser = browser

    def forward(self, query: str, filter_year: int | None = None) -> str:
        # 'google:' is the browser's magic scheme for SerpAPI searches.
        self.browser.visit_page(f"google: {query}", filter_year=filter_year)
        state_header, state_content = self.browser._state()
        return "\n=======================\n".join([state_header.strip(), state_content])
392
+
393
+
394
class VisitTool(Tool):
    """Navigate the shared browser to a URL and return the resulting viewport."""

    name = "visit_page"
    description = "Visit a webpage at a given URL and return its text. Given a url to a YouTube video, this returns the transcript."
    inputs = {"url": {"type": "string", "description": "The relative or absolute url of the webpage to visit."}}
    output_type = "string"

    def __init__(self, browser=None):
        super().__init__()
        self.browser = browser

    def forward(self, url: str) -> str:
        self.browser.visit_page(url)
        state_header, state_content = self.browser._state()
        return "\n=======================\n".join([state_header.strip(), state_content])
408
+
409
+
410
class DownloadTool(Tool):
    """Download a binary file into ./downloads and return the saved path."""

    name = "download_file"
    description = """
Download a file at a given URL. The file should be of this format: [".xlsx", ".pptx", ".wav", ".mp3", ".m4a", ".png", ".docx"]
After using this tool, for further inspection of this page you should return the download path to your manager via final_answer, and they will be able to inspect it.
DO NOT use this tool for .pdf or .txt or .htm files: for these types of files use visit_page with the file url instead."""
    inputs = {"url": {"type": "string", "description": "The relative or absolute url of the file to be downloaded."}}
    output_type = "string"

    def __init__(self, browser):
        super().__init__()
        self.browser = browser

    def forward(self, url: str) -> str:
        """Fetch `url`, infer an extension from the response MIME type, and save it.

        Raises:
            Exception: for pdf/txt/html content, which visit_page handles better.
        """
        import requests

        # arXiv abstract pages are HTML; rewrite to the direct PDF link.
        if "arxiv" in url:
            url = url.replace("abs", "pdf")
        response = requests.get(url)
        content_type = response.headers.get("content-type", "")
        extension = mimetypes.guess_extension(content_type)
        if extension and isinstance(extension, str):
            new_path = f"./downloads/file{extension}"
        else:
            new_path = "./downloads/file.object"

        # Reject text formats BEFORE writing anything to disk. The None guard
        # also fixes a crash: guess_extension can return None, and the original
        # `"pdf" in extension` then raised TypeError.
        if extension is not None and ("pdf" in extension or "txt" in extension or "htm" in extension):
            raise Exception("Do not use this tool for pdf or txt or html files: use visit_page instead.")

        # Make sure the target directory exists before writing.
        os.makedirs("./downloads", exist_ok=True)
        with open(new_path, "wb") as f:
            f.write(response.content)

        return f"File was downloaded and saved under path {new_path}."
443
+
444
+
445
class ArchiveSearchTool(Tool):
    """Look up a URL on the Wayback Machine and open the snapshot closest to a date."""

    name = "find_archived_url"
    description = "Given a url, searches the Wayback Machine and returns the archived version of the url that's closest in time to the desired date."
    inputs = {
        "url": {"type": "string", "description": "The url you need the archive for."},
        "date": {
            "type": "string",
            "description": "The date that you want to find the archive for. Give this date in the format 'YYYYMMDD', for instance '27 June 2008' is written as '20080627'.",
        },
    }
    output_type = "string"

    def __init__(self, browser=None):
        super().__init__()
        self.browser = browser

    def forward(self, url, date) -> str:
        import requests

        # Query the availability API twice: once pinned to the requested date,
        # once without a timestamp as a fallback.
        no_timestamp_url = f"https://archive.org/wayback/available?url={url}"
        archive_url = no_timestamp_url + f"&timestamp={date}"
        response = requests.get(archive_url).json()
        response_notimestamp = requests.get(no_timestamp_url).json()

        closest = None
        for candidate in (response, response_notimestamp):
            snapshots = candidate.get("archived_snapshots", {})
            if "closest" in snapshots:
                closest = snapshots["closest"]
                print("Archive found!", closest)
                break
        if closest is None:
            raise Exception(f"Your {url=} was not archived on Wayback Machine, try a different url.")

        target_url = closest["url"]
        self.browser.visit_page(target_url)
        header, content = self.browser._state()
        return (
            f"Web archive for url {url}, snapshot taken at date {closest['timestamp'][:8]}:\n"
            + header.strip()
            + "\n=======================\n"
            + content
        )
486
+
487
+
488
class PageUpTool(Tool):
    """Scroll the shared browser up one page and report the new viewport."""

    name = "page_up"
    description = "Scroll the viewport UP one page-length in the current webpage and return the new viewport content."
    inputs = {}
    output_type = "string"

    def __init__(self, browser=None):
        super().__init__()
        self.browser = browser

    def forward(self) -> str:
        self.browser.page_up()
        state_header, state_content = self.browser._state()
        return "\n=======================\n".join([state_header.strip(), state_content])
502
+
503
+
504
class PageDownTool(Tool):
    """Scroll the shared browser down one page and report the new viewport."""

    name = "page_down"
    description = "Scroll the viewport DOWN one page-length in the current webpage and return the new viewport content."
    inputs = {}
    output_type = "string"

    def __init__(self, browser=None):
        super().__init__()
        self.browser = browser

    def forward(self) -> str:
        self.browser.page_down()
        state_header, state_content = self.browser._state()
        return "\n=======================\n".join([state_header.strip(), state_content])
520
+
521
+
522
class FinderTool(Tool):
    """Ctrl+F equivalent: jump the viewport to the first occurrence of a string."""

    name = "find_on_page_ctrl_f"
    description = "Scroll the viewport to the first occurrence of the search string. This is equivalent to Ctrl+F."
    inputs = {
        "search_string": {
            "type": "string",
            "description": "The string to search for on the page. This search string supports wildcards like '*'",
        }
    }
    output_type = "string"

    def __init__(self, browser=None):
        super().__init__()
        self.browser = browser

    def forward(self, search_string: str) -> str:
        match = self.browser.find_on_page(search_string)
        header, content = self.browser._state()
        prefix = header.strip() + "\n=======================\n"

        # A miss returns an explanatory message instead of the viewport.
        if match is None:
            return prefix + f"The search string '{search_string}' was not found on this page."
        return prefix + content
548
+
549
+
550
class FindNextTool(Tool):
    """Jump the viewport to the next occurrence of the active Ctrl+F query."""

    name = "find_next"
    description = "Scroll the viewport to next occurrence of the search string. This is equivalent to finding the next match in a Ctrl+F search."
    inputs = {}
    output_type = "string"

    def __init__(self, browser=None):
        super().__init__()
        self.browser = browser

    def forward(self) -> str:
        match = self.browser.find_next()
        header, content = self.browser._state()
        prefix = header.strip() + "\n=======================\n"

        # A miss returns an explanatory message instead of the viewport.
        if match is None:
            return prefix + "The search string was not found on this page."
        return prefix + content
visual_qa.py ADDED
@@ -0,0 +1,189 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import base64
2
+ import json
3
+ import mimetypes
4
+ import os
5
+ import uuid
6
+ from io import BytesIO
7
+
8
+ import PIL.Image
9
+ import requests
10
+ from dotenv import load_dotenv
11
+ from huggingface_hub import InferenceClient
12
+
13
+ from smolagents import Tool, tool
14
+
15
+
16
+ load_dotenv(override=True)
17
+
18
+
19
def process_images_and_text(image_path, query, client):
    """Ask the idefics2-8b-chatty endpoint a question about a local image.

    Builds an idefics chat prompt with the model's own processor, inlines the
    image as a base64 data URI, and posts the result to `client`.

    Args:
        image_path: Path to a local image file readable by PIL.
        query: The text question/instruction about the image.
        client: Endpoint client exposing `.post(json=...)` returning raw bytes
            of a JSON list (e.g. huggingface_hub.InferenceClient).

    Returns:
        The first element of the JSON-decoded endpoint response (exact schema
        depends on the deployed endpoint — confirm against its docs).
    """
    # Imported lazily so importing this module does not require transformers.
    from transformers import AutoProcessor

    messages = [
        {
            "role": "user",
            "content": [
                {"type": "image"},
                {"type": "text", "text": query},
            ],
        },
    ]
    # NOTE(review): the processor is loaded (and possibly downloaded) on every
    # call — consider caching it at module level if this becomes a hot path.
    idefics_processor = AutoProcessor.from_pretrained("HuggingFaceM4/idefics2-8b-chatty")
    prompt_with_template = idefics_processor.apply_chat_template(messages, add_generation_prompt=True)

    # load images from local directory

    # encode images to strings which can be sent to the endpoint
    def encode_local_image(image_path):
        # load image
        image = PIL.Image.open(image_path).convert("RGB")

        # Convert the image to a base64 string
        buffer = BytesIO()
        image.save(buffer, format="JPEG")  # Use the appropriate format (e.g., JPEG, PNG)
        base64_image = base64.b64encode(buffer.getvalue()).decode("utf-8")

        # add string formatting required by the endpoint
        image_string = f"data:image/jpeg;base64,{base64_image}"

        return image_string

    image_string = encode_local_image(image_path)
    # Replace the processor's "<image>" placeholder with a markdown image tag
    # whose target is the data URI (the inline format the endpoint expects).
    prompt_with_images = prompt_with_template.replace("<image>", "![]({}) ").format(image_string)

    payload = {
        "inputs": prompt_with_images,
        "parameters": {
            "return_full_text": False,
            "max_new_tokens": 200,
        },
    }

    return json.loads(client.post(json=payload).decode())[0]
63
+
64
+
65
+ # Function to encode the image
66
def encode_image(image_path):
    """Return the contents of an image as a base64-encoded UTF-8 string.

    If `image_path` is an http(s) URL, the image is first downloaded into the
    local "downloads" directory (created if missing) and encoded from disk.

    Args:
        image_path: Local file path or http(s) URL of the image.

    Returns:
        Base64 encoding of the raw image bytes.
    """
    if image_path.startswith("http"):
        # Use a browser-like User-Agent: some hosts reject default HTTP clients.
        user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
        request_kwargs = {
            "headers": {"User-Agent": user_agent},
            "stream": True,
        }

        # Send a HTTP request to the URL
        response = requests.get(image_path, **request_kwargs)
        response.raise_for_status()
        content_type = response.headers.get("content-type", "")

        # Derive a file extension from the response MIME type.
        extension = mimetypes.guess_extension(content_type)
        if extension is None:
            extension = ".download"

        fname = str(uuid.uuid4()) + extension
        # Fix: the write below crashed when ./downloads did not already exist.
        os.makedirs("downloads", exist_ok=True)
        download_path = os.path.abspath(os.path.join("downloads", fname))

        with open(download_path, "wb") as fh:
            for chunk in response.iter_content(chunk_size=512):
                fh.write(chunk)

        image_path = download_path

    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode("utf-8")
94
+
95
+
96
def resize_image(image_path):
    """Save a half-resolution copy of an image and return the new path.

    Args:
        image_path: Path to the source image.

    Returns:
        Path of the resized copy, named "resized_<original filename>" in the
        same directory as the source.
    """
    img = PIL.Image.open(image_path)
    width, height = img.size
    img = img.resize((int(width / 2), int(height / 2)))
    # Fix: prefix only the basename. The original f"resized_{image_path}"
    # produced invalid paths for any input with a directory component
    # (e.g. "/tmp/x.png" -> "resized_/tmp/x.png").
    directory, filename = os.path.split(image_path)
    new_image_path = os.path.join(directory, f"resized_{filename}")
    img.save(new_image_path)
    return new_image_path
103
+
104
+
105
class VisualQATool(Tool):
    """Answer questions about an image via the idefics2-8b-chatty endpoint."""

    name = "visualizer"
    description = "A tool that can answer questions about attached images."
    inputs = {
        "image_path": {
            "description": "The path to the image on which to answer the question",
            "type": "string",
        },
        "question": {"description": "the question to answer", "type": "string", "nullable": True},
    }
    output_type = "string"

    client = InferenceClient("HuggingFaceM4/idefics2-8b-chatty")

    def forward(self, image_path: str, question: str | None = None) -> str:
        # With no question, fall back to captioning and flag that in the output.
        add_note = not question
        if add_note:
            question = "Please write a detailed caption for this image."

        result = ""
        try:
            result = process_images_and_text(image_path, question, self.client)
        except Exception as err:
            print(err)
            # Oversized payloads are retried once at half resolution; any other
            # failure leaves the (empty) result unchanged.
            if "Payload Too Large" in str(err):
                result = process_images_and_text(resize_image(image_path), question, self.client)

        if add_note:
            result = f"You did not provide a particular question, so here is a detailed caption for the image: {result}"

        return result
139
+
140
+
141
@tool
def visualizer(image_path: str, question: str | None = None) -> str:
    """A tool that can answer questions about attached images.

    Args:
        image_path: The path to the image on which to answer the question. This should be a local path to downloaded image.
        question: The question to answer.
    """
    # Local imports keep this function self-contained for the @tool wrapper.
    import mimetypes
    import os

    import requests

    from .visual_qa import encode_image

    # With no question, fall back to captioning; add_note flags the output so
    # the caller knows a caption (not an answer) is being returned.
    add_note = False
    if not question:
        add_note = True
        question = "Please write a detailed caption for this image."
    if not isinstance(image_path, str):
        raise Exception("You should provide at least `image_path` string argument to this tool!")

    # NOTE(review): mime_type may be None for unknown extensions, yielding a
    # "data:None;base64,..." URL — confirm the API tolerates this.
    mime_type, _ = mimetypes.guess_type(image_path)
    base64_image = encode_image(image_path)

    # OpenAI chat-completions payload with the image inlined as a data URI.
    payload = {
        "model": "gpt-4o",
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": question},
                    {"type": "image_url", "image_url": {"url": f"data:{mime_type};base64,{base64_image}"}},
                ],
            }
        ],
        "max_tokens": 1000,
    }
    headers = {"Content-Type": "application/json", "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}"}
    response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
    try:
        output = response.json()["choices"][0]["message"]["content"]
    except Exception:
        # Surface the raw response body so API errors are debuggable.
        raise Exception(f"Response format unexpected: {response.json()}")

    if add_note:
        output = f"You did not provide a particular question, so here is a detailed caption for the image: {output}"

    return output