silasyl's picture
switched model back to gpt-4o-mini due to poor performance / adding ytb videos due to block
4e998d8
Raw
History Blame Contribute Delete
13.7 kB
import base64
import cv2
import io
import openai
import os
import requests
import whisper
import wikipedia
import yt_dlp
from dotenv import load_dotenv
from PIL import Image
from smolagents import CodeAgent, DuckDuckGoSearchTool, OpenAIServerModel, Tool, VisitWebpageTool
from youtube_transcript_api import YouTubeTranscriptApi
# Load variables from a local .env file (if present) into the environment.
load_dotenv()

# OpenAI credentials; None when the key is not configured anywhere.
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
# os.environ only accepts str values: assigning None would raise TypeError at
# import time, so the key is re-exported only when it is actually set.
if OPENAI_API_KEY is not None:
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
class MockResponse:
    """Minimal stand-in for a download response; exposes only ``content``."""

    def __init__(self, content: bytes) -> None:
        # Raw payload bytes, read by callers through ``.content``.
        self.content = content
def get_file_content(file_id: str, url: str = None):
    """Load a task file from the local ``files`` folder instead of downloading it.

    This simulates a download (the real endpoint is currently rate-limited for
    the author): it looks in ``files`` for a file whose name starts with
    ``file_id``, regardless of extension.

    Args:
        file_id: Prefix of the file name to look for.
        url: Unused here; kept so the signature matches the real download call.

    Returns:
        A ``MockResponse`` whose ``content`` holds the file's bytes.

    Raises:
        FileNotFoundError: If the ``files`` folder does not exist or contains
            no file whose name starts with ``file_id``.
    """
    folder_path = "files"
    # Sorted so the pick is deterministic when several names share the prefix
    # (os.listdir returns entries in arbitrary order).
    for filename in sorted(os.listdir(folder_path)):
        if filename.startswith(file_id):
            with open(os.path.join(folder_path, filename), "rb") as f:
                # Simulate response.content
                return MockResponse(f.read())
    # Fail loudly instead of returning None, which would only surface later
    # as an AttributeError on ``.content`` in the caller.
    raise FileNotFoundError(
        f"No file starting with {file_id!r} found in {folder_path!r}."
    )
class WikipediaSummaryTool(Tool):
    """Agent tool that returns the Wikipedia summary of a topic."""

    name = "wikipedia_summary"
    description = "Fetches a summary of a topic from Wikipedia."
    inputs = {
        "query": {
            "type": "string",
            "description": "The topic to search on Wikipedia."
        }
    }
    output_type = "string"

    def __init__(self):
        # All lookups go to the English-language Wikipedia.
        wikipedia.set_lang("en")

    def is_initialized(self) -> bool:
        """Report the tool as ready; always returns True."""
        return True

    def forward(self, query: str):
        """Return the Wikipedia summary text for ``query``.

        Args:
            query: The topic to search on Wikipedia.
        """
        return wikipedia.summary(query)
class WikipediaPageTool(Tool):
    """Agent tool that returns the full text of a Wikipedia page."""

    name = "wikipedia_page"
    description = "Fetches the complete page of a topic from Wikipedia."
    inputs = {
        "query": {
            "type": "string",
            "description": "The topic to search on Wikipedia."
        }
    }
    output_type = "string"

    def __init__(self):
        # All lookups go to the English-language Wikipedia.
        wikipedia.set_lang("en")

    def is_initialized(self) -> bool:
        """Report the tool as ready; always returns True."""
        return True

    def forward(self, query: str):
        """Return the ``content`` of the Wikipedia page found for ``query``.

        Args:
            query: The topic to search on Wikipedia.
        """
        return wikipedia.page(query).content
class YouTubeVisionAnalyzer(Tool):
    """Agent tool that answers a query from frames sampled out of a video.

    Pipeline: sample one frame every 5 seconds, send the frames to a vision
    LLM in batches, then ask the LLM to merge the per-batch answers.

    NOTE: downloading is currently disabled in ``forward``; it always reads
    the local file ``youtube_video.mp4`` and ignores ``video_url``.
    """

    name = "youtube_vision_analyzer"
    description = "Analyzes visual content from YouTube videos by extracting and processing frames. It does not process audio or subtitles, and is best used for tasks involving objects, scenes, or visual patterns appearing in the video."
    inputs = {
        "video_url": {
            "type": "string",
            "description": "The URL of the YouTube video to process."
        },
        "user_query": {
            "type": "string",
            "description": "The user's query."
        }
    }
    output_type = "string"

    def __init__(self):
        pass

    def is_initialized(self) -> bool:
        """Report the tool as ready; always returns True."""
        return True

    @staticmethod
    def download_youtube_video(url: str):
        """Download ``url`` with yt-dlp and return the saved file's path.

        The video is always saved as ``youtube_video.mp4`` in the current
        working directory.
        """
        ydl_opts = {
            'format': 'mp4',
            'outtmpl': 'youtube_video.mp4'
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        return 'youtube_video.mp4'

    @staticmethod
    def extract_frames(video_path: str, output_dir="frames"):
        """Save one frame per 5 seconds of video as JPEGs in ``output_dir``.

        Frames are written as ``frame_000.jpg``, ``frame_001.jpg``, ... If the
        video cannot be opened, nothing is written or removed.

        Args:
            video_path: Path of the video file to sample.
            output_dir: Folder receiving the frames; created if missing.

        Returns:
            ``output_dir``.
        """
        os.makedirs(output_dir, exist_ok=True)
        cap = cv2.VideoCapture(video_path)
        try:
            if not cap.isOpened():
                return output_dir

            # Drop frames left over from a previous run: a longer earlier video
            # would otherwise leave extra frames that get analysed as if they
            # belonged to this one.
            for stale in os.listdir(output_dir):
                if stale.startswith("frame_") and stale.endswith(".jpg"):
                    os.remove(os.path.join(output_dir, stale))

            fps = cap.get(cv2.CAP_PROP_FPS)
            if not (fps > 0):
                # FPS unknown (0 or NaN). Assumes a typical 30 fps so sampling
                # stays near one frame per 5 seconds — TODO confirm.
                fps = 30.0
            # At least 1: a zero interval would raise ZeroDivisionError in the
            # modulo below (happens whenever fps < 0.2).
            frame_interval = max(1, int(fps * 5))  # 5 seconds

            frame_count = 0
            saved_count = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % frame_interval == 0:
                    frame_filename = os.path.join(output_dir, f"frame_{saved_count:03d}.jpg")
                    cv2.imwrite(frame_filename, frame)
                    saved_count += 1
                frame_count += 1
        finally:
            # Release the capture even if reading or writing a frame fails.
            cap.release()
        return output_dir

    @staticmethod
    def encode_image(image_path: str, new_size=512):
        """Resize an image so its longer side is ``new_size`` and base64-encode it.

        The aspect ratio is preserved and the result is re-encoded as JPEG.

        Args:
            image_path: Path of the image to encode.
            new_size: Target length, in pixels, of the longer side.

        Returns:
            The resized JPEG as a base64 string.
        """
        with Image.open(image_path) as image:
            original_width, original_height = image.size
            ratio = new_size / max(original_width, original_height)
            # Clamp to 1 px so extreme aspect ratios never yield a 0-sized side.
            new_width = max(1, int(original_width * ratio))
            new_height = max(1, int(original_height * ratio))
            resized_image = image.resize((new_width, new_height))
            buffered = io.BytesIO()
            resized_image.save(buffered, format='JPEG')
        return base64.b64encode(buffered.getvalue()).decode('utf-8')

    @staticmethod
    def call_vision_llm(folder_path: str, user_query: str):
        """Answer ``user_query`` from the ``.jpg`` frames in ``folder_path``.

        Frames are sent to the model in name order, 12 per request. The
        per-batch answers are then merged by a final text-only request.

        Args:
            folder_path: Folder holding the extracted frames.
            user_query: The question to answer about the video.

        Returns:
            The model's final answer, or an explanatory message when the
            folder contains no frames.
        """
        model = OpenAIServerModel(
            api_key=OPENAI_API_KEY,
            model_id='gpt-4o-mini',
            temperature=0,
        )

        encoded_images = [
            YouTubeVisionAnalyzer.encode_image(os.path.join(folder_path, filename))
            for filename in sorted(os.listdir(folder_path))
            if filename.endswith(".jpg")
        ]
        if not encoded_images:
            # Without frames the summarising call would have nothing to ground
            # its answer on, so say so instead of asking the model to guess.
            return "No frames could be extracted from the video, so the query cannot be answered from its visual content."

        responses = []
        batch_size = 12
        for i in range(0, len(encoded_images), batch_size):
            batch = encoded_images[i:i+batch_size]
            messages = [
                {
                    "role": "system",
                    "content": [
                        {
                            "type": "text",
                            "text": "You are an assistant analyzing image frames extracted from a video. If the user query refers to a video, remember these are frames from the video. Do not provide extra information or external inference.",
                        }
                    ]
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": user_query,
                        },
                        *[
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/jpeg;base64,{encoded_image}",
                                    "detail": "low"
                                }
                            }
                            for encoded_image in batch
                        ]
                    ]
                }
            ]
            responses.append(model(messages).content)

        # Merge the per-batch observations into a single answer.
        messages = [
            {
                "role": "system",
                "content": "You are a helpful assistant that summarizes and extracts the correct answer from multiple partial observations. Each partial response comes from analyzing a batch of video frames. Given the user's query and the list of partial responses, your task is to provide the best final answer to the user's query. Be concise in the final answer."
            },
            {
                "role": "user",
                "content": f"User's query:\n{user_query}.\n\nPartial responses:\n" + "\n".join(f"- {response}" for response in responses)
            }
        ]
        final_response = model(messages).content
        return final_response

    @staticmethod
    def delete_video_file(video_path: str, folder_path: str):
        """Remove the video file and every ``.jpg`` in ``folder_path``, if present."""
        if os.path.exists(video_path):
            os.remove(video_path)
        if os.path.exists(folder_path):
            for filename in os.listdir(folder_path):
                if filename.endswith(".jpg"):
                    file_path = os.path.join(folder_path, filename)
                    os.remove(file_path)

    def forward(self, video_url: str, user_query: str):
        """Extract frames from the video and answer ``user_query`` from them.

        Args:
            video_url: The URL of the YouTube video to process. Currently
                unused: the download step is disabled and the local file
                ``youtube_video.mp4`` is read instead.
            user_query: The user's query.
        """
        # Download and cleanup are disabled for now; the video is expected to
        # be present locally already.
        #video_path = YouTubeVisionAnalyzer.download_youtube_video(video_url)
        video_path = 'youtube_video.mp4'
        folder_path = YouTubeVisionAnalyzer.extract_frames(video_path)
        response = YouTubeVisionAnalyzer.call_vision_llm(folder_path, user_query)
        #YouTubeVisionAnalyzer.delete_video_file(video_path, folder_path)
        return response
class YouTubeTranscriptTool(Tool):
    """Agent tool that returns the caption transcript of a YouTube video.

    NOTE: live fetching is currently disabled; ``forward`` returns a fixed,
    hard-coded transcript regardless of ``video_url``.
    """

    name = "youtube_transcript_tool"
    description = "Extracts textual transcripts (captions) from YouTube videos to analyze spoken content. This tool is useful for identifying what is said in the video, such as dialogue, spoken instructions, or narration. It does not analyze visual elements like scenes or objects. Pay attention because transcriptions may be truncated."
    inputs = {
        "video_url": {
            "type": "string",
            "description": "The YouTube video URL."
        }
    }
    output_type = "string"

    def __init__(self):
        pass

    def is_initialized(self) -> bool:
        """Report the tool as ready; always returns True."""
        return True

    def forward(self, video_url: str):
        """Return the transcript as the string form of a list of caption lines.

        Args:
            video_url: The YouTube video URL. Currently unused, see below.
        """
        # Disabled live implementation, kept for when fetching works again:
        # video_id = video_url.split("v=")[-1]
        # transcript = YouTubeTranscriptApi.get_transcript(video_id)
        # transcript = str([element['text'] for element in transcript])
        try:
            # Canned transcript standing in for the API result.
            return """["Wow this coffee\'s great I was just", \'thinking that\', \'yeah is that cinnamon chicory\', \'tea oak\', \'[Music]\', "isn\'t that hot", \'extremely\']"""
        except Exception as e:
            return str(e)
class AudioFileTranscriptTool(Tool):
    """Agent tool that transcribes a task's audio file to text.

    Transcription is done through the OpenAI transcription API. A local
    Whisper model remains available through ``whisper_model`` but is loaded
    only on first access, because ``forward`` does not use it.
    """

    name = "audio_file_transcript_tool"
    description = "Extracts text transcripts from uploaded audio files (e.g., MP3, WAV). Use this tool to analyze spoken content from user-provided files, not from YouTube or video links. It only processes audio, not visual information."
    inputs = {
        "file_id": {
            "type": "string",
            "description": "Metadata required to download the audio."
        },
        "file_url": {
            "type": "string",
            "description": "Metadata required to download the audio."
        },
    }
    output_type = "string"

    def __init__(self):
        # The Whisper model is loaded lazily (see ``whisper_model``): loading
        # it here cost time and memory on every construction for nothing.
        self._whisper_model = None

    @property
    def whisper_model(self):
        """Local Whisper "base" model on CPU, loaded on first access."""
        if self._whisper_model is None:
            self._whisper_model = whisper.load_model("base", device="cpu")
        return self._whisper_model

    def is_initialized(self) -> bool:
        """Report the tool as ready; always returns True."""
        return True

    def forward(self, file_id: str, file_url: str):
        """Fetch the audio file and return its English transcript.

        The audio bytes are written to ``audio.mp3`` in the current working
        directory before being sent for transcription.

        Args:
            file_id: Metadata required to download the audio.
            file_url: Metadata required to download the audio.

        Raises:
            FileNotFoundError: If no file is available for ``file_id``.
        """
        # Real download, disabled while the endpoint is rate-limited:
        #questions_files = f"{file_url}/files"
        #response = requests.get(f"{questions_files}/{file_id}", timeout=15)
        response = get_file_content(file_id, file_url)
        if response is None:
            # Clear error instead of an AttributeError on ``.content`` below.
            raise FileNotFoundError(f"No local file found for file_id {file_id!r}.")

        # Save MP3 bytes to a file
        with open("audio.mp3", "wb") as f:
            f.write(response.content)

        # Transcribe the audio
        client = openai.OpenAI(api_key=OPENAI_API_KEY)
        with open("audio.mp3", "rb") as f:
            transcript = client.audio.transcriptions.create(
                model="gpt-4o-mini-transcribe",
                file=f,
                language="en"
            )
        return transcript.text
class PythonFileDownloader(Tool):
    """Agent tool that stores a task's Python file locally as ``code.py``."""

    name = "python_file_downloader"
    description = "Downloads and stores a Python (.py) file locally as 'code.py' so it can be programmatically analyzed by the agent. This tool does not interpret or summarize the data itself — it only ensures the file is available in the environment."
    inputs = {
        "file_id": {
            "type": "string",
            "description": "Metadata required to download the file."
        },
        "file_url": {
            "type": "string",
            "description": "Metadata required to download the file."
        },
    }
    output_type = "string"

    def __init__(self):
        pass

    def is_initialized(self) -> bool:
        """Report the tool as ready; always returns True."""
        return True

    def forward(self, file_id: str, file_url: str):
        """Fetch the file and write its bytes to ``code.py``.

        Args:
            file_id: Metadata required to download the file.
            file_url: Metadata required to download the file.

        Returns:
            A message telling the agent where the file was saved.

        Raises:
            FileNotFoundError: If no file is available for ``file_id``.
        """
        # Real download, disabled while the endpoint is rate-limited:
        #questions_files = f"{file_url}/files"
        #response = requests.get(f"{questions_files}/{file_id}", timeout=15)
        response = get_file_content(file_id, file_url)
        if response is None:
            # Clear error instead of an AttributeError on ``.content`` below.
            raise FileNotFoundError(f"No local file found for file_id {file_id!r}.")

        # Save bytes to a Python file
        with open("code.py", "wb") as f:
            f.write(response.content)
        return "The file is available as 'code.py'."
class ExcelFileLoader(Tool):
    """Agent tool that stores a task's spreadsheet locally as ``sheet.xlsx``."""

    name = "excel_file_loader"
    description = "Downloads and stores an Excel spreadsheet (.xlsx) locally as 'sheet.xlsx' so it can be programmatically analyzed by the agent using tools like pandas. This tool does not interpret or summarize the data itself — it only ensures the file is available in the environment."
    inputs = {
        "file_id": {
            "type": "string",
            "description": "Metadata required to download the file."
        },
        "file_url": {
            "type": "string",
            "description": "Metadata required to download the file."
        },
    }
    output_type = "string"

    def __init__(self):
        pass

    def is_initialized(self) -> bool:
        """Report the tool as ready; always returns True."""
        return True

    def forward(self, file_id: str, file_url: str):
        """Fetch the spreadsheet and write its bytes to ``sheet.xlsx``.

        Args:
            file_id: Metadata required to download the file.
            file_url: Metadata required to download the file.

        Returns:
            A message telling the agent where the file was saved.

        Raises:
            FileNotFoundError: If no file is available for ``file_id``.
        """
        # Real download, disabled while the endpoint is rate-limited:
        #questions_files = f"{file_url}/files"
        #response = requests.get(f"{questions_files}/{file_id}", timeout=15)
        response = get_file_content(file_id, file_url)
        if response is None:
            # Clear error instead of an AttributeError on ``.content`` below.
            raise FileNotFoundError(f"No local file found for file_id {file_id!r}.")

        # Save bytes to a spreadsheet file
        with open("sheet.xlsx", "wb") as f:
            f.write(response.content)
        return "The file is available as 'sheet.xlsx'."