import base64 import cv2 import io import openai import os import requests import whisper import wikipedia import yt_dlp from dotenv import load_dotenv from PIL import Image from smolagents import CodeAgent, DuckDuckGoSearchTool, OpenAIServerModel, Tool, VisitWebpageTool from youtube_transcript_api import YouTubeTranscriptApi load_dotenv() # database credentials OPENAI_API_KEY = os.getenv('OPENAI_API_KEY') os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY class MockResponse: def __init__(self, content: bytes): self.content = content def get_file_content(file_id: str, url: str = None): # Simulates download, I am using this because currently I am blocked from downloading too much # Look for any file with that ID regardless of extension folder_path = "files" for filename in os.listdir(folder_path): if filename.startswith(file_id): file_path = os.path.join(folder_path, filename) with open(file_path, "rb") as f: content = f.read() # Simulate response.content return MockResponse(content) class WikipediaSummaryTool(Tool): name = "wikipedia_summary" description = "Fetches a summary of a topic from Wikipedia." inputs = { "query": { "type": "string", "description": "The topic to search on Wikipedia." } } output_type = "string" def __init__(self): wikipedia.set_lang("en") def is_initialized(self) -> bool: return True def forward(self, query: str): # Calls wikipedia api response = wikipedia.summary(query) return response class WikipediaPageTool(Tool): name = "wikipedia_page" description = "Fetches the complete page of a topic from Wikipedia." inputs = { "query": { "type": "string", "description": "The topic to search on Wikipedia." } } output_type = "string" def __init__(self): wikipedia.set_lang("en") def is_initialized(self) -> bool: return True def forward(self, query: str): # Calls wikipedia api page = wikipedia.page(query) return page.content class YouTubeVisionAnalyzer(Tool): name = "youtube_vision_analyzer" description = "Analyzes visual content from YouTube videos by extracting and processing frames. It does not process audio or subtitles, and is best used for tasks involving objects, scenes, or visual patterns appearing in the video." inputs = { "video_url": { "type": "string", "description": "The URL of the YouTube video to process." }, "user_query": { "type": "string", "description": "The user's query." } } output_type = "string" def __init__(self): pass def is_initialized(self) -> bool: return True @staticmethod def download_youtube_video(url: str): # Download the video using yt-dlp (saves as youtube_video.mp4) ydl_opts = { 'format': 'mp4', 'outtmpl': 'youtube_video.mp4' } with yt_dlp.YoutubeDL(ydl_opts) as ydl: ydl.download([url]) return 'youtube_video.mp4' @staticmethod def extract_frames(video_path: str, output_dir="frames"): os.makedirs(output_dir, exist_ok=True) cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) frame_interval = int(fps * 5) # 5 seconds frame_count = 0 saved_count = 0 while cap.isOpened(): ret, frame = cap.read() if not ret: break if frame_count % frame_interval == 0: frame_filename = os.path.join(output_dir, f"frame_{saved_count:03d}.jpg") cv2.imwrite(frame_filename, frame) saved_count += 1 frame_count += 1 cap.release() return output_dir @staticmethod def encode_image(image_path:str, new_size=512): # Resize image to upper 512 pixels and return in base64 format with Image.open(image_path) as image: original_width, original_height = image.size if original_width > original_height: ratio = new_size / original_width else: ratio = new_size / original_height new_width = int(original_width * ratio) new_height = int(original_height * ratio) resized_image = image.resize((new_width, new_height)) buffered = io.BytesIO() resized_image.save(buffered, format='JPEG') return base64.b64encode(buffered.getvalue()).decode('utf-8') @staticmethod def call_vision_llm(folder_path: str, user_query: str): encoded_images = [] responses = [] model = OpenAIServerModel( api_key=OPENAI_API_KEY, model_id='gpt-4o-mini', temperature=0, ) for filename in sorted(os.listdir(folder_path)): if filename.endswith(".jpg"): img_path = os.path.join(folder_path, filename) encoded_image = YouTubeVisionAnalyzer.encode_image(img_path) encoded_images.append(encoded_image) batch_size = 12 for i in range(0, len(encoded_images), batch_size): batch = encoded_images[i:i+batch_size] messages = [ { "role": "system", "content": [ { "type": "text", "text": "You are an assistant analyzing image frames extracted from a video. If the user query refers to a video, remember these are frames from the video. Do not provide extra information or external inference.", } ] }, { "role": "user", "content": [ { "type": "text", "text": user_query, }, *[ { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{encoded_image}", "detail": "low" } } for encoded_image in batch ] ] } ] responses.append(model(messages).content) messages = [ { "role": "system", "content": "You are a helpful assistant that summarizes and extracts the correct answer from multiple partial observations. Each partial response comes from analyzing a batch of video frames. Given the user's query and the list of partial responses, your task is to provide the best final answer to the user's query. Be concise in the final answer." }, { "role": "user", "content": f"User's query:\n{user_query}.\n\nPartial responses:\n" + "\n".join(f"- {response}" for response in responses) } ] final_response = model(messages).content return final_response @staticmethod def delete_video_file(video_path: str, folder_path: str): if os.path.exists(video_path): os.remove(video_path) if os.path.exists(folder_path): for filename in os.listdir(folder_path): if filename.endswith(".jpg"): file_path = os.path.join(folder_path, filename) os.remove(file_path) def forward(self, video_url: str, user_query: str): # Process video: download, extract frames, detect objects, call llm #video_path = YouTubeVisionAnalyzer.download_youtube_video(video_url) video_path = 'youtube_video.mp4' folder_path = YouTubeVisionAnalyzer.extract_frames(video_path) response = YouTubeVisionAnalyzer.call_vision_llm(folder_path, user_query) #YouTubeVisionAnalyzer.delete_video_file(video_path, folder_path) return response class YouTubeTranscriptTool(Tool): name = "youtube_transcript_tool" description = "Extracts textual transcripts (captions) from YouTube videos to analyze spoken content. This tool is useful for identifying what is said in the video, such as dialogue, spoken instructions, or narration. It does not analyze visual elements like scenes or objects. Pay attention because transcriptions may be truncated." inputs = { "video_url": { "type": "string", "description": "The YouTube video URL." } } output_type = "string" def __init__(self): pass def is_initialized(self) -> bool: return True def forward(self, video_url: str): # Extract the video ID from the URL # video_id = video_url.split("v=")[-1] try: # Fetch the transcript using YouTubeTranscriptApi # transcript = YouTubeTranscriptApi.get_transcript(video_id) # transcript = str([element['text'] for element in transcript]) transcript = """["Wow this coffee\'s great I was just", \'thinking that\', \'yeah is that cinnamon chicory\', \'tea oak\', \'[Music]\', "isn\'t that hot", \'extremely\']""" return transcript except Exception as e: return str(e) class AudioFileTranscriptTool(Tool): name = "audio_file_transcript_tool" description = "Extracts text transcripts from uploaded audio files (e.g., MP3, WAV). Use this tool to analyze spoken content from user-provided files, not from YouTube or video links. It only processes audio, not visual information." inputs = { "file_id": { "type": "string", "description": "Metadata required to download the audio." }, "file_url": { "type": "string", "description": "Metadata required to download the audio." }, } output_type = "string" def __init__(self): # Load Whisper model self.whisper_model = whisper.load_model("base", device="cpu") def is_initialized(self) -> bool: return True def forward(self, file_id: str, file_url: str): # Downloads an audio file and transcript it to text #questions_files = f"{file_url}/files" #response = requests.get(f"{questions_files}/{file_id}", timeout=15) response = get_file_content(file_id, file_url) # Save MP3 bytes to a file with open("audio.mp3", "wb") as f: f.write(response.content) # Transcribe the audio client = openai.OpenAI(api_key=OPENAI_API_KEY) with open("audio.mp3", "rb") as f: transcript = client.audio.transcriptions.create( model="gpt-4o-mini-transcribe", file=f, language="en" ) return transcript.text class PythonFileDownloader(Tool): name = "python_file_downloader" description = "Downloads and stores a Python (.py) file locally as 'code.py' so it can be programmatically analyzed by the agent. This tool does not interpret or summarize the data itself — it only ensures the file is available in the environment." inputs = { "file_id": { "type": "string", "description": "Metadata required to download the file." }, "file_url": { "type": "string", "description": "Metadata required to download the file." }, } output_type = "string" def __init__(self): pass def is_initialized(self) -> bool: return True def forward(self, file_id: str, file_url: str): # Downloads a python file and decode it #questions_files = f"{file_url}/files" #response = requests.get(f"{questions_files}/{file_id}", timeout=15) response = get_file_content(file_id, file_url) # Save bytes to a Python file with open("code.py", "wb") as f: f.write(response.content) return "The file is available as 'code.py'." class ExcelFileLoader(Tool): name = "excel_file_loader" description = "Downloads and stores an Excel spreadsheet (.xlsx) locally as 'sheet.xlsx' so it can be programmatically analyzed by the agent using tools like pandas. This tool does not interpret or summarize the data itself — it only ensures the file is available in the environment." inputs = { "file_id": { "type": "string", "description": "Metadata required to download the file." }, "file_url": { "type": "string", "description": "Metadata required to download the file." }, } output_type = "string" def __init__(self): pass def is_initialized(self) -> bool: return True def forward(self, file_id: str, file_url: str): # Downloads a spreadsheet and saves it #questions_files = f"{file_url}/files" #response = requests.get(f"{questions_files}/{file_id}", timeout=15) response = get_file_content(file_id, file_url) # Save bytes to a spreadsheet file with open("sheet.xlsx", "wb") as f: f.write(response.content) return "The file is available as 'sheet.xlsx'."