Spaces:
Runtime error
Runtime error
File size: 13,714 Bytes
ecbc0b3 5e739b3 ecbc0b3 e739e08 ecbc0b3 e739e08 ecbc0b3 4e998d8 ecbc0b3 4e998d8 ecbc0b3 4e998d8 ecbc0b3 4e998d8 ecbc0b3 5e739b3 ecbc0b3 5e739b3 ecbc0b3 0b6bcde ecbc0b3 0b6bcde ecbc0b3 0b6bcde ecbc0b3 0b6bcde | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 | import base64
import cv2
import io
import openai
import os
import requests
import whisper
import wikipedia
import yt_dlp
from dotenv import load_dotenv
from PIL import Image
from smolagents import CodeAgent, DuckDuckGoSearchTool, OpenAIServerModel, Tool, VisitWebpageTool
from youtube_transcript_api import YouTubeTranscriptApi
# Pull variables from a local .env file (if any) into the process environment.
load_dotenv()

# OpenAI API credentials, shared by every tool in this module that calls OpenAI.
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
if OPENAI_API_KEY is not None:
    # os.environ only accepts strings: assigning None raises TypeError and
    # would abort the import of this module, so only re-export a real value.
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
class MockResponse:
    """Bare-bones response object that only carries a body.

    Used by the local file loader so that callers can keep reading the
    payload through a ``content`` attribute.
    """

    def __init__(self, content: bytes):
        # Raw bytes of the file, stored under the attribute name callers read.
        self.content = content
def get_file_content(file_id: str, url: str = None):
    """Load a file from the local ``files`` folder instead of downloading it.

    Stand-in for the real download, used while downloads are being throttled.
    The first file (in sorted name order) whose name starts with ``file_id``
    is read, whatever its extension.

    Args:
        file_id: Prefix identifying the file inside the ``files`` folder.
        url: Unused; accepted so call sites keep the download-style signature.

    Returns:
        A ``MockResponse`` whose ``content`` attribute holds the file's bytes.

    Raises:
        FileNotFoundError: If the ``files`` folder is missing or no file name
            in it starts with ``file_id``.
    """
    folder_path = "files"
    # os.listdir returns names in arbitrary order; sorting makes the choice
    # reproducible when several files share the same prefix.
    for filename in sorted(os.listdir(folder_path)):
        if filename.startswith(file_id):
            with open(os.path.join(folder_path, filename), "rb") as f:
                # Mimic response.content of the real HTTP response.
                return MockResponse(f.read())
    # Fail loudly here rather than returning None, which would only surface
    # later as an AttributeError on ``.content`` in the caller.
    raise FileNotFoundError(
        f"No file starting with {file_id!r} found in {folder_path!r}."
    )
class WikipediaSummaryTool(Tool):
    """Agent tool that returns the Wikipedia summary of a topic."""

    name = "wikipedia_summary"
    description = "Fetches a summary of a topic from Wikipedia."
    inputs = {
        "query": {
            "type": "string",
            "description": "The topic to search on Wikipedia."
        }
    }
    output_type = "string"

    def __init__(self):
        # Make every lookup go to the English Wikipedia.
        wikipedia.set_lang("en")

    def is_initialized(self) -> bool:
        """Always report the tool as ready."""
        return True

    def forward(self, query: str):
        """Return the summary for ``query``, or a readable message on failure.

        Args:
            query: Topic to look up on Wikipedia.

        Returns:
            The summary text. When the title is ambiguous or no page exists,
            a short explanation is returned instead of raising, so the agent
            receives something it can act on.
        """
        try:
            return wikipedia.summary(query)
        except wikipedia.exceptions.DisambiguationError as e:
            # Show a handful of candidate titles so the query can be refined.
            options = ", ".join(str(option) for option in e.options[:10])
            return (
                f"'{query}' is ambiguous on Wikipedia. "
                f"Try a more specific query, e.g.: {options}"
            )
        except wikipedia.exceptions.PageError:
            return f"No Wikipedia page was found for '{query}'. Try a different query."
class WikipediaPageTool(Tool):
    """Agent tool that returns the full text of a Wikipedia page."""

    name = "wikipedia_page"
    description = "Fetches the complete page of a topic from Wikipedia."
    inputs = {
        "query": {
            "type": "string",
            "description": "The topic to search on Wikipedia."
        }
    }
    output_type = "string"

    def __init__(self):
        # Make every lookup go to the English Wikipedia.
        wikipedia.set_lang("en")

    def is_initialized(self) -> bool:
        """Always report the tool as ready."""
        return True

    def forward(self, query: str):
        """Return the page content for ``query``, or a readable message on failure.

        Args:
            query: Topic to look up on Wikipedia.

        Returns:
            The page's text content. When the title is ambiguous or no page
            exists, a short explanation is returned instead of raising, so the
            agent receives something it can act on.
        """
        try:
            return wikipedia.page(query).content
        except wikipedia.exceptions.DisambiguationError as e:
            # Show a handful of candidate titles so the query can be refined.
            options = ", ".join(str(option) for option in e.options[:10])
            return (
                f"'{query}' is ambiguous on Wikipedia. "
                f"Try a more specific query, e.g.: {options}"
            )
        except wikipedia.exceptions.PageError:
            return f"No Wikipedia page was found for '{query}'. Try a different query."
class YouTubeVisionAnalyzer(Tool):
    """Agent tool that answers a question from frames sampled out of a video.

    Pipeline: sample one frame roughly every five seconds, send the frames to
    a vision model in batches, then ask the model to merge the per-batch
    answers into one final answer.
    """

    name = "youtube_vision_analyzer"
    description = "Analyzes visual content from YouTube videos by extracting and processing frames. It does not process audio or subtitles, and is best used for tasks involving objects, scenes, or visual patterns appearing in the video."
    inputs = {
        "video_url": {
            "type": "string",
            "description": "The URL of the YouTube video to process."
        },
        "user_query": {
            "type": "string",
            "description": "The user's query."
        }
    }
    output_type = "string"

    def __init__(self):
        pass

    def is_initialized(self) -> bool:
        """Always report the tool as ready."""
        return True

    @staticmethod
    def download_youtube_video(url: str):
        """Download ``url`` with yt-dlp and return the local file name.

        The video is always saved as ``youtube_video.mp4``.
        """
        ydl_opts = {
            'format': 'mp4',
            'outtmpl': 'youtube_video.mp4'
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        return 'youtube_video.mp4'

    @staticmethod
    def extract_frames(video_path: str, output_dir="frames"):
        """Save one frame about every five seconds of the video as a JPEG.

        Args:
            video_path: Path of the video file to read.
            output_dir: Folder receiving ``frame_NNN.jpg`` files; created if
                missing. Frame files left there by an earlier call are removed
                first.

        Returns:
            ``output_dir``.
        """
        os.makedirs(output_dir, exist_ok=True)
        # Frames from a previous (longer) video would otherwise survive next
        # to the new ones and be analysed as if they belonged to this video.
        for stale_name in os.listdir(output_dir):
            if stale_name.startswith("frame_") and stale_name.endswith(".jpg"):
                os.remove(os.path.join(output_dir, stale_name))

        cap = cv2.VideoCapture(video_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            # int(fps * 5) is 0 when the reported frame rate is 0 or very low,
            # which would make the modulo below divide by zero.
            frame_interval = max(1, int(fps * 5))  # 5 seconds
            frame_count = 0
            saved_count = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % frame_interval == 0:
                    frame_filename = os.path.join(output_dir, f"frame_{saved_count:03d}.jpg")
                    cv2.imwrite(frame_filename, frame)
                    saved_count += 1
                frame_count += 1
        finally:
            # Release the capture even if reading or writing a frame fails.
            cap.release()
        return output_dir

    @staticmethod
    def encode_image(image_path: str, new_size=512):
        """Shrink or grow an image so its longer side is ``new_size`` pixels.

        The aspect ratio is kept. Returns the result as a base64-encoded JPEG
        string.
        """
        with Image.open(image_path) as image:
            original_width, original_height = image.size
            ratio = new_size / max(original_width, original_height)
            # Clamp to 1 so extremely thin images never get a zero dimension.
            new_width = max(1, int(original_width * ratio))
            new_height = max(1, int(original_height * ratio))
            resized_image = image.resize((new_width, new_height))
            # JPEG cannot store palette or alpha modes; convert those to RGB.
            if resized_image.mode in ("RGBA", "LA", "P", "PA"):
                resized_image = resized_image.convert("RGB")
            buffered = io.BytesIO()
            resized_image.save(buffered, format='JPEG')
            return base64.b64encode(buffered.getvalue()).decode('utf-8')

    @staticmethod
    def call_vision_llm(folder_path: str, user_query: str):
        """Ask the vision model about every ``.jpg`` in ``folder_path``.

        Frames are sent in name order, in batches of 12. Each batch yields a
        partial answer; a final model call condenses the partial answers into
        one reply.

        Args:
            folder_path: Folder holding the extracted frames.
            user_query: Question to answer from the frames.

        Returns:
            The model's final answer, or a short notice when the folder holds
            no frames.
        """
        encoded_images = [
            YouTubeVisionAnalyzer.encode_image(os.path.join(folder_path, filename))
            for filename in sorted(os.listdir(folder_path))
            if filename.endswith(".jpg")
        ]
        if not encoded_images:
            # Without frames the summarising call would have nothing to go on
            # and could only guess; say so instead.
            return "No frames could be extracted from the video."

        model = OpenAIServerModel(
            api_key=OPENAI_API_KEY,
            model_id='gpt-4o-mini',
            temperature=0,
        )
        responses = []
        batch_size = 12
        for i in range(0, len(encoded_images), batch_size):
            batch = encoded_images[i:i+batch_size]
            messages = [
                {
                    "role": "system",
                    "content": [
                        {
                            "type": "text",
                            "text": "You are an assistant analyzing image frames extracted from a video. If the user query refers to a video, remember these are frames from the video. Do not provide extra information or external inference.",
                        }
                    ]
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": user_query,
                        },
                        *[
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/jpeg;base64,{encoded_image}",
                                    "detail": "low"
                                }
                            }
                            for encoded_image in batch
                        ]
                    ]
                }
            ]
            responses.append(model(messages).content)

        # Second pass: merge the per-batch answers into a single reply.
        messages = [
            {
                "role": "system",
                "content": "You are a helpful assistant that summarizes and extracts the correct answer from multiple partial observations. Each partial response comes from analyzing a batch of video frames. Given the user's query and the list of partial responses, your task is to provide the best final answer to the user's query. Be concise in the final answer."
            },
            {
                "role": "user",
                "content": f"User's query:\n{user_query}.\n\nPartial responses:\n" + "\n".join(f"- {response}" for response in responses)
            }
        ]
        final_response = model(messages).content
        return final_response

    @staticmethod
    def delete_video_file(video_path: str, folder_path: str):
        """Remove the video file and every ``.jpg`` in ``folder_path``, if present."""
        if os.path.exists(video_path):
            os.remove(video_path)
        if os.path.exists(folder_path):
            for filename in os.listdir(folder_path):
                if filename.endswith(".jpg"):
                    file_path = os.path.join(folder_path, filename)
                    os.remove(file_path)

    def forward(self, video_url: str, user_query: str):
        """Answer ``user_query`` from the frames of the local video file.

        NOTE: downloading and cleanup are currently switched off (see the
        commented-out calls), so ``video_url`` is ignored and the tool reads
        an existing ``youtube_video.mp4`` from the working directory.
        """
        # Process video: download, extract frames, call llm
        #video_path = YouTubeVisionAnalyzer.download_youtube_video(video_url)
        video_path = 'youtube_video.mp4'
        folder_path = YouTubeVisionAnalyzer.extract_frames(video_path)
        response = YouTubeVisionAnalyzer.call_vision_llm(folder_path, user_query)
        #YouTubeVisionAnalyzer.delete_video_file(video_path, folder_path)
        return response
class YouTubeTranscriptTool(Tool):
    """Agent tool that hands back the caption text of a YouTube video."""

    name = "youtube_transcript_tool"
    description = "Extracts textual transcripts (captions) from YouTube videos to analyze spoken content. This tool is useful for identifying what is said in the video, such as dialogue, spoken instructions, or narration. It does not analyze visual elements like scenes or objects. Pay attention because transcriptions may be truncated."
    inputs = {
        "video_url": {
            "type": "string",
            "description": "The YouTube video URL."
        }
    }
    output_type = "string"

    def __init__(self):
        pass

    def is_initialized(self) -> bool:
        """Always report the tool as ready."""
        return True

    def forward(self, video_url: str):
        """Return the transcript as the string form of a list of caption lines.

        NOTE: the live lookup is commented out below, so ``video_url`` is
        currently ignored and a fixed, canned transcript is returned.
        """
        # Live implementation, kept for when the API can be called again:
        # video_id = video_url.split("v=")[-1]
        # transcript = YouTubeTranscriptApi.get_transcript(video_id)
        # transcript = str([element['text'] for element in transcript])
        try:
            return """["Wow this coffee\'s great I was just", \'thinking that\', \'yeah is that cinnamon chicory\', \'tea oak\', \'[Music]\', "isn\'t that hot", \'extremely\']"""
        except Exception as e:
            # Errors are reported to the agent as text rather than raised.
            return str(e)
class AudioFileTranscriptTool(Tool):
    """Agent tool that turns a user-provided audio file into text."""

    name = "audio_file_transcript_tool"
    description = "Extracts text transcripts from uploaded audio files (e.g., MP3, WAV). Use this tool to analyze spoken content from user-provided files, not from YouTube or video links. It only processes audio, not visual information."
    inputs = {
        "file_id": {
            "type": "string",
            "description": "Metadata required to download the audio."
        },
        "file_url": {
            "type": "string",
            "description": "Metadata required to download the audio."
        },
    }
    output_type = "string"

    def __init__(self):
        # Local Whisper "base" model on CPU.
        # NOTE(review): forward() below transcribes through the OpenAI API and
        # never reads this attribute; confirm whether the load is still needed.
        self.whisper_model = whisper.load_model("base", device="cpu")

    def is_initialized(self) -> bool:
        """Always report the tool as ready."""
        return True

    def forward(self, file_id: str, file_url: str):
        """Fetch the audio, store it as ``audio.mp3`` and return its transcript.

        Args:
            file_id: Identifier of the audio file to fetch.
            file_url: Base URL of the file service (passed through to the loader).

        Returns:
            The transcribed text.
        """
        # Real download, currently replaced by the local loader:
        #questions_files = f"{file_url}/files"
        #response = requests.get(f"{questions_files}/{file_id}", timeout=15)
        payload = get_file_content(file_id, file_url)

        audio_path = "audio.mp3"
        # Persist the bytes so the file can be reopened for the API upload.
        with open(audio_path, "wb") as audio_out:
            audio_out.write(payload.content)

        client = openai.OpenAI(api_key=OPENAI_API_KEY)
        with open(audio_path, "rb") as audio_in:
            result = client.audio.transcriptions.create(
                model="gpt-4o-mini-transcribe",
                file=audio_in,
                language="en"
            )
        return result.text
class PythonFileDownloader(Tool):
    """Agent tool that places a Python source file in the working directory."""

    name = "python_file_downloader"
    description = "Downloads and stores a Python (.py) file locally as 'code.py' so it can be programmatically analyzed by the agent. This tool does not interpret or summarize the data itself — it only ensures the file is available in the environment."
    inputs = {
        "file_id": {
            "type": "string",
            "description": "Metadata required to download the file."
        },
        "file_url": {
            "type": "string",
            "description": "Metadata required to download the file."
        },
    }
    output_type = "string"

    def __init__(self):
        pass

    def is_initialized(self) -> bool:
        """Always report the tool as ready."""
        return True

    def forward(self, file_id: str, file_url: str):
        """Fetch the file and write its bytes to ``code.py``.

        Args:
            file_id: Identifier of the file to fetch.
            file_url: Base URL of the file service (passed through to the loader).

        Returns:
            A fixed message telling the agent where the file was stored.
        """
        # Real download, currently replaced by the local loader:
        #questions_files = f"{file_url}/files"
        #response = requests.get(f"{questions_files}/{file_id}", timeout=15)
        payload = get_file_content(file_id, file_url)

        # Written as raw bytes; no decoding is attempted here.
        with open("code.py", "wb") as target:
            target.write(payload.content)
        return "The file is available as 'code.py'."
class ExcelFileLoader(Tool):
    """Agent tool that places an Excel spreadsheet in the working directory."""

    name = "excel_file_loader"
    description = "Downloads and stores an Excel spreadsheet (.xlsx) locally as 'sheet.xlsx' so it can be programmatically analyzed by the agent using tools like pandas. This tool does not interpret or summarize the data itself — it only ensures the file is available in the environment."
    inputs = {
        "file_id": {
            "type": "string",
            "description": "Metadata required to download the file."
        },
        "file_url": {
            "type": "string",
            "description": "Metadata required to download the file."
        },
    }
    output_type = "string"

    def __init__(self):
        pass

    def is_initialized(self) -> bool:
        """Always report the tool as ready."""
        return True

    def forward(self, file_id: str, file_url: str):
        """Fetch the spreadsheet and write its bytes to ``sheet.xlsx``.

        Args:
            file_id: Identifier of the file to fetch.
            file_url: Base URL of the file service (passed through to the loader).

        Returns:
            A fixed message telling the agent where the file was stored.
        """
        # Real download, currently replaced by the local loader:
        #questions_files = f"{file_url}/files"
        #response = requests.get(f"{questions_files}/{file_id}", timeout=15)
        payload = get_file_content(file_id, file_url)

        # Written as raw bytes; parsing is left to the agent.
        with open("sheet.xlsx", "wb") as target:
            target.write(payload.content)
        return "The file is available as 'sheet.xlsx'."