Karim0111 committed on
Commit
514151b
·
verified ·
1 Parent(s): 8c4d2b1

Create tools.py

Browse files
Files changed (1) hide show
  1. tools.py +431 -0
tools.py ADDED
@@ -0,0 +1,431 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import tempfile
2
+ import requests
3
+ import os
4
+
5
+ from time import sleep
6
+ from urllib.parse import urlparse
7
+ from typing import Optional, List
8
+ import yt_dlp
9
+ import imageio
10
+ from google.genai import types
11
+
12
+ from PIL import Image
13
+ from smolagents import CodeAgent, tool, OpenAIServerModel, LiteLLMModel
14
+ from google import genai
15
+ from dotenv import load_dotenv
16
+
17
+ load_dotenv()
18
+
19
+ # def save_screenshot(memory_step: ActionStep, agent: CodeAgent) -> None:
20
+ # sleep(1.0) # Let JavaScript animations happen before taking the screenshot
21
+ # driver = helium.get_driver()
22
+ # current_step = memory_step.step_number
23
+ # if driver is not None:
24
+ # for previous_memory_step in agent.memory.steps: # Remove previous screenshots from logs for lean processing
25
+ # if isinstance(previous_memory_step, ActionStep) and previous_memory_step.step_number <= current_step - 2:
26
+ # previous_memory_step.observations_images = None
27
+ # png_bytes = driver.get_screenshot_as_png()
28
+ # image = Image.open(BytesIO(png_bytes))
29
+ # print(f"Captured a browser screenshot: {image.size} pixels")
30
+ # memory_step.observations_images = [image.copy()] # Create a copy to ensure it persists, important!
31
+
32
+ # # Update observations with current URL
33
+ # url_info = f"Current url: {driver.current_url}"
34
+ # memory_step.observations = (
35
+ # url_info if memory_step.observations is None else memory_step.observations + "\n" + url_info
36
+ # )
37
+ # return
38
+
39
+ # def initialize_driver():
40
+ # """Initialize the Selenium WebDriver."""
41
+ # chrome_options = webdriver.ChromeOptions()
42
+ # chrome_options.add_argument("--force-device-scale-factor=1")
43
+ # chrome_options.add_argument("--window-size=1000,1350")
44
+ # chrome_options.add_argument("--disable-pdf-viewer")
45
+ # chrome_options.add_argument("--window-position=0,0")
46
+ # return helium.start_chrome(headless=False, options=chrome_options)
47
+
48
+ # @tool
49
+ # def search_item_ctrl_f(text: str, nth_result: int = 1) -> str:
50
+ # """
51
+ # Searches for text on the current page via Ctrl + F and jumps to the nth occurrence.
52
+ # Args:
53
+ # text: The text to search for
54
+ # nth_result: Which occurrence to jump to (default: 1)
55
+ # """
56
+ # elements = driver.find_elements(By.XPATH, f"//*[contains(text(), '{text}')]")
57
+ # if nth_result > len(elements):
58
+ # raise Exception(f"Match n°{nth_result} not found (only {len(elements)} matches found)")
59
+ # result = f"Found {len(elements)} matches for '{text}'."
60
+ # elem = elements[nth_result - 1]
61
+ # driver.execute_script("arguments[0].scrollIntoView(true);", elem)
62
+ # result += f"Focused on element {nth_result} of {len(elements)}"
63
+ # return result
64
+
65
+
66
+ # @tool
67
+ # def go_back() -> None:
68
+ # """Used when navigating web pages using Helium. Goes back to previous page."""
69
+ # driver.back()
70
+
71
+
72
+ # @tool
73
+ # def close_popups() -> str:
74
+ # """
75
+ # Closes any visible modal or pop-up on the page. Use this to dismiss pop-up windows! This does not work on cookie consent banners.
76
+ # """
77
+ # webdriver.ActionChains(driver).send_keys(Keys.ESCAPE).perform()
78
+
79
@tool
def use_vision_model(question: str, images: List[Image.Image]) -> str:
    """
    Use a Vision Model to answer a question about a set of images.
    Always use this tool to ask questions about a set of images you have been provided.
    This function uses an image-to-text AI model.
    You can ask a question about a list of one image or a list of multiple images.
    So, if you have multiple images that you want to ask the same question of, pass the entire list of images to the model.
    Ensure your prompt is specific enough to retrieve the exact information you are looking for.

    Args:
        question: The question to ask about the images. Type: str
        images: The list of images to ask the question about. Type: List[PIL.Image.Image]
    """
    image_model_name = "gemini/gemini-1.5-flash"

    print(f'Leveraging model {image_model_name}')
    image_model = LiteLLMModel(
        model_id=image_model_name,
        api_key=os.getenv("GEMINI_KEY"),
        temperature=0.2,
    )

    # Callers sometimes hand over a single image instead of a list; wrap it so
    # len() and iteration below behave the same in both cases.
    if isinstance(images, Image.Image):
        images = [images]

    # The message content is one text part (the question) followed by one
    # image part per PIL image, passed directly without any wrapping.
    content = [{"type": "text", "text": question}]
    print(f"Asking model a question about {len(images)} images")
    content.extend({"type": "image", "image": image} for image in images)

    messages = [{"role": "user", "content": content}]

    output = image_model(messages).content
    print(f'Model returned: {output}')
    return output
131
+
132
@tool
def review_youtube_video(url: str, question: str) -> str:
    """
    Reviews a YouTube video and answers a specific question about that video.

    Args:
        url (str): the URL to the YouTube video. Should be like this format: https://www.youtube.com/watch?v=9hE5-98ZeCg
        question (str): The question you are asking about the video
    """
    # Bound before the try block: the except handler interpolates it, so it
    # must exist even when client construction itself is what fails.
    model = 'gemini-2.0-flash-lite'
    try:
        client = genai.Client(api_key=os.getenv('GEMINI_KEY'))
        # The video is passed by URI as file data, followed by the question text.
        response = client.models.generate_content(
            model=model,
            contents=types.Content(
                parts=[
                    types.Part(file_data=types.FileData(file_uri=url)),
                    types.Part(text=question),
                ]
            ),
        )
        return response.text
    except Exception as e:
        # Errors are reported back as text rather than raised.
        return f"Error asking {model} about video: {str(e)}"
158
+
159
@tool
def youtube_frames_to_images(url: str, sample_interval_seconds: int = 5) -> List[Image.Image]:
    """
    Reviews a YouTube video and returns a List of PIL Images (List[PIL.Image.Image]), which can then be reviewed by a vision model.
    Only use this tool if you have been given a YouTube video that you need to analyze.
    This will generate a list of images, and you can use the use_vision_model tool to analyze those images

    Args:
        url: The Youtube URL
        sample_interval_seconds: The sampling interval (default is 5 seconds). Must be greater than zero.
    """
    if sample_interval_seconds <= 0:
        raise ValueError("sample_interval_seconds must be greater than zero")

    with tempfile.TemporaryDirectory() as tmpdir:
        # Download the video locally, merged into an mp4 container
        ydl_opts = {
            'format': 'bestvideo[height<=1080]+bestaudio/best[height<=1080]/best',
            'outtmpl': os.path.join(tmpdir, 'video.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'merge_output_format': 'mp4',
            'force_ipv4': True,  # Avoid IPv6 issues
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.extract_info(url, download=True)

        # Find the downloaded file (first .mp4 in the temporary directory)
        video_path = next(
            (os.path.join(tmpdir, name) for name in os.listdir(tmpdir) if name.endswith('.mp4')),
            None,
        )
        if not video_path:
            raise RuntimeError("Failed to download video as mp4")

        reader = imageio.get_reader(video_path)
        try:
            fps = reader.get_meta_data().get('fps')
            if fps is None:
                raise RuntimeError("Unable to determine FPS from video metadata")

            # Clamp to at least 1 so a very low fps * interval product cannot
            # produce a zero modulus (ZeroDivisionError) below.
            frame_interval = max(1, int(fps * sample_interval_seconds))

            # Keep every frame_interval-th frame, starting with the first one
            return [
                Image.fromarray(frame)
                for idx, frame in enumerate(reader)
                if idx % frame_interval == 0
            ]
        finally:
            # Always release the reader, including when decoding fails, so the
            # temporary directory can be removed afterwards.
            reader.close()
211
+
212
@tool
def read_file(filepath: str) -> str:
    """
    Used to read the content of a file. Returns the content as a string.
    Will only work for text-based files, such as .txt files or code files.
    Do not use for audio or visual files.

    Args:
        filepath (str): The path to the file to be read.

    Returns:
        str: Content of the file as a string, or a message describing why the file could not be read.
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as file:
            content = file.read()
        print(content)
        return content
    except FileNotFoundError:
        message = f"File not found: {filepath}"
    except (IOError, UnicodeDecodeError) as e:
        # UnicodeDecodeError covers files that are not valid UTF-8 text.
        message = f"Error reading file: {str(e)}"
    # Return the failure text as well as printing it, so the caller always
    # receives a string instead of an implicit None.
    print(message)
    return message
237
+
238
@tool
def download_file_from_url(url: str, filename: Optional[str] = None) -> str:
    """
    Download a file from a URL and save it to a temporary location.
    Use this tool when you are asked a question and told that there is a file or image provided.

    Args:
        url: The URL to download from
        filename: Optional filename, will generate one based on URL if not provided

    Returns:
        Path to the downloaded file
    """
    try:
        print(f"Downloading file from {url}")
        if not filename:
            # Derive the name from the last segment of the URL path
            filename = os.path.basename(urlparse(url).path)

        # Keep only the final path component: a name containing directories or
        # an absolute path would otherwise escape the temporary directory.
        filename = os.path.basename(filename) if filename else ""
        if not filename or filename in (".", ".."):
            # Generate a random name if we couldn't extract a usable one
            import uuid
            filename = f"downloaded_{uuid.uuid4().hex[:8]}"

        filepath = os.path.join(tempfile.gettempdir(), filename)

        # Stream the body to disk in chunks. The timeout keeps a stalled server
        # from blocking forever, and the context manager releases the connection.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        return f"File downloaded to {filepath}. You can now process this file."
    except Exception as e:
        # Failures are reported back as text rather than raised.
        return f"Error downloading file: {str(e)}"
279
+
280
@tool
def extract_text_from_image(image_path: str) -> str:
    """
    Extract text from an image using pytesseract (if available).

    Args:
        image_path: Path to the image file

    Returns:
        Extracted text or error message
    """
    try:
        # Imported lazily so a missing pytesseract yields the message below
        # instead of breaking the import of this module.
        import pytesseract
        from PIL import Image

        # The context manager closes the underlying file handle once OCR is done
        with Image.open(image_path) as image:
            text = pytesseract.image_to_string(image)

        return f"Extracted text from image:\n\n{text}"
    except ImportError:
        return "Error: pytesseract is not installed. Please install it with 'pip install pytesseract' and ensure Tesseract OCR is installed on your system."
    except Exception as e:
        return f"Error extracting text from image: {str(e)}"
307
+
308
@tool
def analyze_csv_file(file_path: str, query: str) -> str:
    """
    Load a CSV file using pandas and return an overview of it: row and column counts, column names and summary statistics.
    To use this file you need to have saved it in a location and pass that location to the function.
    The download_file_from_url tool will save it by name to tempfile.gettempdir()

    Args:
        file_path: Path to the CSV file
        query: Question about the data. It is not used to filter or compute anything; the same overview is returned for every query.

    Returns:
        Analysis result or error message
    """
    try:
        # Imported lazily so a missing pandas produces the message below
        import pandas as pd

        df = pd.read_csv(file_path)

        # Column labels are converted with str() so non-string labels cannot
        # make the join raise TypeError.
        column_names = ', '.join(map(str, df.columns))
        result = f"CSV file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        result += f"Columns: {column_names}\n\n"

        # Add summary statistics
        result += "Summary statistics:\n"
        result += str(df.describe())

        return result
    except ImportError:
        return "Error: pandas is not installed. Please install it with 'pip install pandas'."
    except Exception as e:
        return f"Error analyzing CSV file: {str(e)}"
341
+
342
@tool
def analyze_excel_file(file_path: str, query: str) -> str:
    """
    Load an Excel file using pandas and return an overview of it: row and column counts, column names and summary statistics.
    To use this file you need to have saved it in a location and pass that location to the function.
    The download_file_from_url tool will save it by name to tempfile.gettempdir()

    Args:
        file_path: Path to the Excel file
        query: Question about the data. It is not used to filter or compute anything; the same overview is returned for every query.

    Returns:
        Analysis result or error message
    """
    try:
        # Imported lazily so a missing pandas or Excel engine produces the
        # message below.
        import pandas as pd

        df = pd.read_excel(file_path)

        # Excel headers are frequently numbers or dates; str() keeps the join
        # from raising TypeError on such labels.
        column_names = ', '.join(map(str, df.columns))
        result = f"Excel file loaded with {len(df)} rows and {len(df.columns)} columns.\n"
        result += f"Columns: {column_names}\n\n"

        # Add summary statistics
        result += "Summary statistics:\n"
        result += str(df.describe())

        return result
    except ImportError:
        return "Error: pandas and openpyxl are not installed. Please install them with 'pip install pandas openpyxl'."
    except Exception as e:
        return f"Error analyzing Excel file: {str(e)}"
375
+
376
+ import whisper
377
+
378
@tool
def youtube_transcribe(url: str) -> str:
    """
    Transcribes a YouTube video. Use when you need to process the audio from a YouTube video into Text.

    Args:
        url: Url of the YouTube video
    """
    # The Whisper checkpoint is loaded first, before any download is attempted.
    whisper_model = whisper.load_model("small")

    with tempfile.TemporaryDirectory() as workdir:
        # Fetch the best audio stream and have ffmpeg convert it to WAV.
        download_options = {
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(workdir, 'audio.%(ext)s'),
            'quiet': True,
            'noplaylist': True,
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
                'preferredquality': '192',
            }],
            'force_ipv4': True,
        }
        with yt_dlp.YoutubeDL(download_options) as downloader:
            downloader.extract_info(url, download=True)

        # Use the first .wav file found in the working directory.
        wav_names = [name for name in os.listdir(workdir) if name.endswith('.wav')]
        if not wav_names:
            raise RuntimeError("Failed to find audio")

        # Transcribe while the temporary directory still exists.
        transcription = whisper_model.transcribe(os.path.join(workdir, wav_names[0]))
        return transcription['text']
413
+
414
@tool
def transcribe_audio(audio_file_path: str) -> str:
    """
    Transcribes an audio file. Use when you need to process audio data.
    DO NOT use this tool for YouTube video; use the youtube_transcribe tool to process audio data from YouTube.
    Use this tool when you have an audio file in .mp3, .wav, .aac, .ogg, .flac, .m4a, .alac or .wma

    Args:
        audio_file_path: Filepath to the audio file (str)
    """
    # Load the "small" Whisper checkpoint and run it over the given file;
    # only the text field of the transcription result is returned.
    transcription = whisper.load_model("small").transcribe(audio_file_path)
    return transcription['text']
429
+
430
+ # global driver
431
+ # driver = initialize_driver()