DenviWorking committed on
Commit
c5ed555
·
verified ·
1 Parent(s): 7254638

Create tools.py

Browse files
Files changed (1) hide show
  1. tools.py +355 -0
tools.py ADDED
@@ -0,0 +1,355 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import requests
2
+ import io
3
+ import base64
4
+ import openai
5
+ from openai import OpenAI
6
+ from smolagents import tool
7
+ import os
8
+
9
+ import io, time, itertools, functools
10
+ from typing import List, Optional
11
+ import sys, contextlib
12
+
13
+ import av
14
+ from pytube import YouTube
15
+ from yt_dlp import YoutubeDL
16
+
17
+ from PIL import Image
18
+ from tqdm import tqdm
19
+ import wikipediaapi
20
+ import tempfile
21
+
22
# Identifier of the OpenAI model used by the VQA and chat tools below.
model_id = "gpt-4.1"
23
+
24
+
25
@tool
def read_image(query: str, img_url: str) -> str:
    """
    Use a visual question answering (VQA) model to generate a response to a query based on an image.
    Args:
        query (str): A natural language question about the image.
        img_url (str): The URL of the image to analyze.
    Returns:
        str: A response generated by the VQA model based on the provided image and question.
    """
    vqa_client = OpenAI()
    # Single user turn carrying both the question and the image reference.
    user_message = {
        "role": "user",
        "content": [
            {"type": "input_text", "text": query},
            {"type": "input_image", "image_url": img_url},
        ],
    }
    response = vqa_client.responses.create(model=model_id, input=[user_message])
    return response.output_text
52
+
53
+
54
@tool
def read_code(file_url: str) -> str:
    """
    Read the contents of a code file such as py file instead of executing it. Use this tool to analyze a code snippet.
    Args:
        file_url (str): The URL of the code file to retrieve.
    Returns:
        str: The content of the file as a string.
    Raises:
        requests.HTTPError: If the server responds with a non-2xx status.
    """
    # Fix: requests.get without a timeout can block the agent forever on an
    # unresponsive host; 30 s is generous for fetching a text file.
    response = requests.get(file_url, timeout=30)
    response.raise_for_status()
    return response.text
66
+
67
+
68
@tool
def transcribe_audio(file_url: str, file_name: str) -> str:
    """
    Download and transcribe an audio file using transcription model.
    Args:
        file_url (str): Direct URL to the audio file (e.g., .mp3, .wav).
        file_name (str): Filename including extension, used to determine format.
    Returns:
        str: The transcribed text from the audio file.
    """
    # Timeout added so a stalled download cannot hang the tool indefinitely.
    response = requests.get(file_url, timeout=60)
    response.raise_for_status()

    # Fix: the original `file_name.split(".")[-1].lower() or "mp3"` could never
    # fall back — str.split always returns a non-empty last element, so a name
    # without a dot became its own "extension". Only trust a real suffix.
    if "." in file_name:
        extension = file_name.rsplit(".", 1)[-1].lower()
    else:
        extension = "mp3"

    audio_file = io.BytesIO(response.content)
    # The OpenAI SDK infers the upload format from the file-like object's name.
    audio_file.name = f"audio.{extension}"

    client = OpenAI()
    transcription = client.audio.transcriptions.create(
        model="gpt-4o-transcribe", file=audio_file
    )

    return transcription.text
92
+
93
+
94
def _pytube_buffer(url: str) -> Optional[io.BytesIO]:
    """Best-effort PyTube download of the highest-resolution progressive MP4.

    Returns a rewound BytesIO on success, or None on any failure (logged to
    stderr) so callers can fall back to another downloader.
    """
    try:
        from pytube import YouTube

        # Progressive streams bundle audio+video in one MP4 file.
        best_stream = (
            YouTube(url)
            .streams.filter(progressive=True, file_extension="mp4")
            .order_by("resolution")
            .desc()
            .first()
        )
        if best_stream is None:
            raise RuntimeError("No MP4 with audio found")
        buffer = io.BytesIO()
        best_stream.stream_to_buffer(buffer)
        buffer.seek(0)
        return buffer
    except Exception as exc:
        print(f"[youtube_to_buffer] PyTube failed → {exc}", file=sys.stderr)
        return None
114
+
115
+
116
def _ytdlp_buffer(url: str) -> io.BytesIO:
    """
    Return a BytesIO containing some MP4 video stream for `url`.
    Works whether YouTube serves a progressive file or separate A/V.
    """
    options = {
        "quiet": True,
        "skip_download": True,
        "format": "bestvideo[ext=mp4]/best[ext=mp4]/best",
    }
    with YoutubeDL(options) as downloader:
        info = downloader.extract_info(url, download=False)
        # Playlists come back as a list of entries; take the first video.
        if "entries" in info:
            info = info["entries"][0]

        if "url" in info:
            # Progressive: one direct URL carrying the whole file.
            stream_urls = [info["url"]]
        elif "requested_formats" in info:
            # Split A/V: keep only the formats that actually contain video.
            stream_urls = [
                fmt["url"]
                for fmt in info["requested_formats"]
                if fmt.get("vcodec") != "none"
            ]
            if not stream_urls:
                raise RuntimeError("yt-dlp returned audio-only formats")
        else:
            raise RuntimeError("yt-dlp could not extract a stream URL")

    buffer = io.BytesIO()
    for stream_url in stream_urls:
        with requests.get(stream_url, stream=True) as response:
            response.raise_for_status()
            for chunk in response.iter_content(chunk_size=1 << 16):
                buffer.write(chunk)

    buffer.seek(0)
    return buffer
155
+
156
+
157
@functools.lru_cache(maxsize=8)
def _youtube_mp4_bytes(url: str) -> bytes:
    """Download and cache the raw bytes of a progressive MP4 for `url`."""
    ydl_opts = {
        "quiet": True,
        "skip_download": True,
        "format": (
            "best[ext=mp4][vcodec^=avc1][acodec!=none]" "/best[ext=mp4][acodec!=none]"
        ),
    }

    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        if "entries" in info:
            info = info["entries"][0]

    direct_url = info.get("url")
    if not direct_url:
        raise RuntimeError("yt-dlp could not find a progressive MP4 track")

    buf = io.BytesIO()
    with requests.get(direct_url, stream=True) as r:
        r.raise_for_status()
        for chunk in r.iter_content(chunk_size=1 << 17):
            buf.write(chunk)

    return buf.getvalue()


def youtube_to_buffer(url: str) -> io.BytesIO:
    """
    Return a BytesIO containing a single progressive MP4
    (H.264 + AAC) – the safest thing PyAV can open everywhere.

    Fix: the original put `lru_cache` directly on this function, so cache
    hits returned the SAME BytesIO object whose read position had already
    been consumed/seeked by the previous caller. The immutable bytes are
    cached instead, and every call gets a fresh, rewound stream.
    """
    return io.BytesIO(_youtube_mp4_bytes(url))
188
+
189
+
190
def sample_frames(video_bytes: io.BytesIO, n_frames: int = 6) -> List[Image.Image]:
    """Decode `n_frames` uniformly spaced RGB frames as PIL images.

    Args:
        video_bytes: Seekable stream holding a video PyAV can open.
        n_frames: Maximum number of frames to return.

    Returns:
        Up to `n_frames` PIL images sampled evenly across the video.
    """
    container = av.open(video_bytes, metadata_errors="ignore")
    try:
        video = container.streams.video[0]
        total = video.frames or 0

        # Some containers don't report a frame count (total == 0); fall back
        # to taking every 30th decoded frame in that case.
        step = max(1, total // n_frames) if total else 30

        frames: List[Image.Image] = []
        for i, frame in enumerate(container.decode(video=0)):
            if i % step == 0:
                frames.append(frame.to_image())
            if len(frames) >= n_frames:
                break
        return frames
    finally:
        # Fix: the original leaked the container if decoding raised;
        # close it unconditionally.
        container.close()
206
+
207
+
208
def pil_to_data_url(img: Image.Image, quality: int = 80) -> str:
    """Encode a PIL image as a JPEG `data:` URL for chat-API image inputs."""
    jpeg_buffer = io.BytesIO()
    img.save(jpeg_buffer, format="JPEG", quality=quality, optimize=True)
    encoded = base64.b64encode(jpeg_buffer.getvalue()).decode()
    return f"data:image/jpeg;base64,{encoded}"
213
+
214
+
215
def save_audio_stream_to_temp_wav_file(video_bytes: io.BytesIO) -> Optional[str]:
    """
    Extracts the audio stream from video_bytes, saves it as a temporary WAV file,
    and returns the path to the file.
    Returns None if no audio stream is found or an error occurs.
    """
    try:
        # Rewind first: the caller may already have consumed part of the stream.
        video_bytes.seek(0)
        input_container = av.open(video_bytes, metadata_errors="ignore")

        if not input_container.streams.audio:
            print("No audio streams found in the video.", file=sys.stderr)
            return None
        input_audio_stream = input_container.streams.audio[0]
        # Create the temp file only to reserve a unique path; PyAV reopens it
        # below. delete=False because the CALLER is responsible for removal.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            temp_audio_file_path = tmp_file.name

        output_container = av.open(temp_audio_file_path, mode="w", format="wav")

        # Mirror the source channel layout when PyAV exposes one; otherwise
        # default to stereo, or mono when the source reports a single channel.
        channel_layout = "stereo"
        if (
            hasattr(input_audio_stream.codec_context, "layout")
            and input_audio_stream.codec_context.layout
        ):
            channel_layout = input_audio_stream.codec_context.layout.name
        elif (
            hasattr(input_audio_stream.codec_context, "channels")
            and input_audio_stream.codec_context.channels == 1
        ):
            channel_layout = "mono"

        # 16-bit little-endian PCM at the source sample rate — plain WAV audio.
        output_audio_stream = output_container.add_stream(
            "pcm_s16le",
            rate=input_audio_stream.codec_context.sample_rate,
            layout=channel_layout,
        )

        # Decode each source frame and re-encode/mux it into the WAV container.
        for frame in input_container.decode(input_audio_stream):
            for packet in output_audio_stream.encode(frame):
                output_container.mux(packet)

        # Flush any frames still buffered inside the encoder (encode() with no
        # argument drains the codec).
        for packet in output_audio_stream.encode():
            output_container.mux(packet)

        output_container.close()
        input_container.close()
        return temp_audio_file_path

    except Exception as e:
        print(f"Error extracting audio to temp WAV file: {e}", file=sys.stderr)
        # Best-effort cleanup: remove the temp file if it was already created
        # before the failure, so we don't leave orphans behind.
        if "temp_audio_file_path" in locals() and os.path.exists(temp_audio_file_path):
            os.remove(temp_audio_file_path)
        return None
268
+
269
+
270
@tool
def run_video(query: str, url: str) -> str:
    """
    Get a YouTube video from url and return an answer to a natural-language query using the video.
    Args:
        query (str): A natural-language question whose answer is expected to be found in the visual content of the video.
        url (str): Fully qualified URL of the YouTube video to analyze.
    Returns:
        str: A response generated by the VQA model based on the provided video and question.
    """
    # Small frame budget keeps the multimodal prompt within model limits.
    n_frames = 4
    buff = youtube_to_buffer(url)
    if buff is None:
        return "Error: Could not download or buffer the video."

    frames = sample_frames(buff, n_frames=n_frames)
    # sample_frames consumed the stream; rewind before audio extraction.
    buff.seek(0)

    transcript = "[Audio could not be processed]"
    audio_file_path = None
    try:
        audio_file_path = save_audio_stream_to_temp_wav_file(buff)
        if audio_file_path:
            with open(audio_file_path, "rb") as audio_data:
                # NOTE(review): uses the module-level `openai` client here,
                # unlike the other tools which instantiate OpenAI().
                transcription_response = openai.audio.transcriptions.create(
                    model="gpt-4o-transcribe", file=audio_data
                )
                transcript = transcription_response.text
        else:
            transcript = "[No audio stream found or error during extraction]"
            print(
                "No audio file path returned, skipping transcription.", file=sys.stderr
            )
    except Exception as e:
        print(f"Error during audio transcription: {e}", file=sys.stderr)
        transcript = f"[Error during audio transcription: {e}]"
    finally:
        # Always remove the temp WAV created by save_audio_stream_to_temp_wav_file.
        if audio_file_path and os.path.exists(audio_file_path):
            os.remove(audio_file_path)

    prompt_text = f"Original Query: {query}\n\nVideo Transcript:\n{transcript}\n\nKey Visual Frames (analyze these along with the transcript to answer the query):"

    # Build a multimodal message: the text prompt followed by each sampled
    # frame as an inline base64 data URL.
    content = [{"type": "text", "text": prompt_text}]

    for img in frames:
        content.append(
            {
                "type": "image_url",
                "image_url": {"url": pil_to_data_url(img)},
            }
        )

    try:
        resp = openai.chat.completions.create(
            model=model_id,
            messages=[{"role": "user", "content": content}],
            temperature=0.1,
        )
        result = resp.choices[0].message.content.strip()
    except Exception as e:
        # Degrade to an error string rather than raising, so the agent loop
        # receives a usable observation.
        print(f"Error calling OpenAI API: {e}", file=sys.stderr)
        result = f"[Error processing with AI model: {e}]"

    return result
334
+
335
+
336
@tool
def search_wikipedia(query: str) -> str:
    """
    get the contents of wikipedia page retrieved by search query.
    Args:
        query (str): A search term to search within wikipedia. Ideally it should be one word or a group of few words.
    Returns:
        str: The text content of wikipedia page
    """
    wiki_client = wikipediaapi.Wikipedia(
        language="en",
        user_agent="test_tokki",
        extract_format=wikipediaapi.ExtractFormat.WIKI,
    )
    full_text = wiki_client.page(query).text

    # Bound the context size: keep at most the first 25 000
    # space-separated tokens of the article.
    cutoff = 25000
    return " ".join(full_text.split(" ")[:cutoff])