Spaces:
Runtime error
Runtime error
| import base64 | |
| import cv2 | |
| import io | |
| import openai | |
| import os | |
| import requests | |
| import whisper | |
| import wikipedia | |
| import yt_dlp | |
| from dotenv import load_dotenv | |
| from PIL import Image | |
| from smolagents import CodeAgent, DuckDuckGoSearchTool, OpenAIServerModel, Tool, VisitWebpageTool | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
# Pull variables from a local .env file (if any) into the process environment.
load_dotenv()

# OpenAI API credentials, read from the environment.
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
if OPENAI_API_KEY is not None:
    # os.environ only accepts strings: assigning None would raise TypeError
    # at import time whenever the key is not configured.
    os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY
class MockResponse:
    """Bare-bones response object that carries only a ``content`` payload."""

    def __init__(self, content: bytes) -> None:
        # Raw bytes, exposed under the same attribute name callers read
        # (``response.content``).
        self.content = content
def get_file_content(file_id: str, url: str = None):
    """Load a locally stored file whose name starts with ``file_id``.

    This simulates a download by reading from the local ``files`` folder;
    the file extension is ignored when matching.

    Args:
        file_id: Prefix of the file name to look for inside ``files``.
        url: Unused; accepted so call sites can keep passing the download URL.

    Returns:
        A ``MockResponse`` whose ``content`` holds the file's bytes.

    Raises:
        FileNotFoundError: If the ``files`` folder is missing or contains no
            regular file whose name starts with ``file_id``.
    """
    folder_path = "files"
    # Sorted so the pick is deterministic when several names share the prefix
    # (os.listdir returns entries in arbitrary order).
    for filename in sorted(os.listdir(folder_path)):
        if not filename.startswith(file_id):
            continue
        file_path = os.path.join(folder_path, filename)
        if not os.path.isfile(file_path):
            # A matching directory cannot be read as a file; keep looking.
            continue
        with open(file_path, "rb") as f:
            # Simulate response.content
            return MockResponse(f.read())
    # Fail loudly instead of returning None, which would only surface later
    # as an AttributeError on ``response.content``.
    raise FileNotFoundError(
        f"No file starting with {file_id!r} found in {folder_path!r}."
    )
class WikipediaSummaryTool(Tool):
    """Agent tool that returns the Wikipedia summary for a topic."""

    name = "wikipedia_summary"
    description = "Fetches a summary of a topic from Wikipedia."
    inputs = {
        "query": {
            "type": "string",
            "description": "The topic to search on Wikipedia."
        }
    }
    output_type = "string"

    def __init__(self):
        # Select the English-language Wikipedia for all lookups.
        wikipedia.set_lang("en")

    def is_initialized(self) -> bool:
        """Report the tool as ready; there is nothing to set up."""
        return True

    def forward(self, query: str):
        """Return whatever ``wikipedia.summary`` yields for ``query``."""
        return wikipedia.summary(query)
class WikipediaPageTool(Tool):
    """Agent tool that returns the full text content of a Wikipedia page."""

    name = "wikipedia_page"
    description = "Fetches the complete page of a topic from Wikipedia."
    inputs = {
        "query": {
            "type": "string",
            "description": "The topic to search on Wikipedia."
        }
    }
    output_type = "string"

    def __init__(self):
        # Select the English-language Wikipedia for all lookups.
        wikipedia.set_lang("en")

    def is_initialized(self) -> bool:
        """Report the tool as ready; there is nothing to set up."""
        return True

    def forward(self, query: str):
        """Look up the page for ``query`` and return its ``content``."""
        return wikipedia.page(query).content
class YouTubeVisionAnalyzer(Tool):
    """Agent tool that answers a query about a video from sampled frames.

    Pipeline: sample one frame every few seconds from a local video file,
    send the frames in batches to a vision-capable chat model, then ask the
    model to merge the per-batch answers into one final answer.
    """

    name = "youtube_vision_analyzer"
    description = "Analyzes visual content from YouTube videos by extracting and processing frames. It does not process audio or subtitles, and is best used for tasks involving objects, scenes, or visual patterns appearing in the video."
    inputs = {
        "video_url": {
            "type": "string",
            "description": "The URL of the YouTube video to process."
        },
        "user_query": {
            "type": "string",
            "description": "The user's query."
        }
    }
    output_type = "string"

    def __init__(self):
        pass

    def is_initialized(self) -> bool:
        """Report the tool as ready; there is nothing to set up."""
        return True

    # The helpers below take no ``self``; marking them static keeps the
    # existing ``YouTubeVisionAnalyzer.helper(...)`` calls working and also
    # makes them safe to call on an instance.

    @staticmethod
    def download_youtube_video(url: str):
        """Download ``url`` with yt-dlp and return the local file name.

        The video is always saved as ``youtube_video.mp4`` in the working
        directory.
        """
        ydl_opts = {
            'format': 'mp4',
            'outtmpl': 'youtube_video.mp4'
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])
        return 'youtube_video.mp4'

    @staticmethod
    def extract_frames(video_path: str, output_dir="frames", seconds_between_frames=5):
        """Save one JPEG frame every ``seconds_between_frames`` seconds.

        Args:
            video_path: Path of the video file to read.
            output_dir: Folder that receives ``frame_NNN.jpg`` files; created
                if missing. Existing files in it are not removed.
            seconds_between_frames: Sampling period in seconds (default 5).

        Returns:
            ``output_dir``.
        """
        os.makedirs(output_dir, exist_ok=True)
        cap = cv2.VideoCapture(video_path)
        try:
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:
                # The reported FPS can be 0 (or otherwise unusable) when the
                # container carries no frame-rate metadata; fall back to a
                # common rate so the modulo below never divides by zero.
                fps = 30.0
            # At least 1 so very low frame rates still yield a valid interval.
            frame_interval = max(1, int(fps * seconds_between_frames))
            frame_count = 0
            saved_count = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                if frame_count % frame_interval == 0:
                    frame_filename = os.path.join(output_dir, f"frame_{saved_count:03d}.jpg")
                    cv2.imwrite(frame_filename, frame)
                    saved_count += 1
                frame_count += 1
        finally:
            # Release the capture handle even if reading or writing fails.
            cap.release()
        return output_dir

    @staticmethod
    def encode_image(image_path: str, new_size=512):
        """Scale an image so its longer side is ``new_size`` px; return base64 JPEG.

        The aspect ratio is preserved. Images smaller than ``new_size`` are
        scaled up as well.
        """
        with Image.open(image_path) as image:
            original_width, original_height = image.size
            ratio = new_size / max(original_width, original_height)
            # Clamp to 1 px so extreme aspect ratios never produce a
            # zero-sized dimension.
            new_width = max(1, int(original_width * ratio))
            new_height = max(1, int(original_height * ratio))
            resized_image = image.resize((new_width, new_height))
            buffered = io.BytesIO()
            resized_image.save(buffered, format='JPEG')
            return base64.b64encode(buffered.getvalue()).decode('utf-8')

    @staticmethod
    def call_vision_llm(folder_path: str, user_query: str):
        """Ask the vision model ``user_query`` about every ``.jpg`` in ``folder_path``.

        Frames are processed in name order, in batches of 12 images; the
        per-batch answers are then summarized by a second model call.

        Returns:
            The content of the final, summarizing model response.
        """
        model = OpenAIServerModel(
            api_key=OPENAI_API_KEY,
            model_id='gpt-4o-mini',
            temperature=0,
        )
        encoded_images = [
            YouTubeVisionAnalyzer.encode_image(os.path.join(folder_path, filename))
            for filename in sorted(os.listdir(folder_path))
            if filename.endswith(".jpg")
        ]
        responses = []
        batch_size = 12
        for i in range(0, len(encoded_images), batch_size):
            batch = encoded_images[i:i+batch_size]
            messages = [
                {
                    "role": "system",
                    "content": [
                        {
                            "type": "text",
                            "text": "You are an assistant analyzing image frames extracted from a video. If the user query refers to a video, remember these are frames from the video. Do not provide extra information or external inference.",
                        }
                    ]
                },
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "text",
                            "text": user_query,
                        },
                        *[
                            {
                                "type": "image_url",
                                "image_url": {
                                    "url": f"data:image/jpeg;base64,{encoded_image}",
                                    "detail": "low"
                                }
                            }
                            for encoded_image in batch
                        ]
                    ]
                }
            ]
            responses.append(model(messages).content)
        # Second pass: merge the per-batch observations into one answer.
        messages = [
            {
                "role": "system",
                "content": "You are a helpful assistant that summarizes and extracts the correct answer from multiple partial observations. Each partial response comes from analyzing a batch of video frames. Given the user's query and the list of partial responses, your task is to provide the best final answer to the user's query. Be concise in the final answer."
            },
            {
                "role": "user",
                "content": f"User's query:\n{user_query}.\n\nPartial responses:\n" + "\n".join(f"- {response}" for response in responses)
            }
        ]
        final_response = model(messages).content
        return final_response

    @staticmethod
    def delete_video_file(video_path: str, folder_path: str):
        """Remove ``video_path`` and every ``.jpg`` in ``folder_path``, if present."""
        if os.path.exists(video_path):
            os.remove(video_path)
        if os.path.exists(folder_path):
            for filename in os.listdir(folder_path):
                if filename.endswith(".jpg"):
                    file_path = os.path.join(folder_path, filename)
                    os.remove(file_path)

    def forward(self, video_url: str, user_query: str):
        """Extract frames from the local video and answer ``user_query``.

        NOTE(review): downloading and cleanup are currently disabled, so
        ``video_url`` is ignored and a pre-existing ``youtube_video.mp4`` in
        the working directory is analyzed instead. With cleanup disabled,
        frames left in ``frames`` by an earlier, longer video would also be
        sent to the model once downloads are re-enabled.
        """
        # Process video: download, extract frames, call llm
        #video_path = YouTubeVisionAnalyzer.download_youtube_video(video_url)
        video_path = 'youtube_video.mp4'
        folder_path = YouTubeVisionAnalyzer.extract_frames(video_path)
        response = YouTubeVisionAnalyzer.call_vision_llm(folder_path, user_query)
        #YouTubeVisionAnalyzer.delete_video_file(video_path, folder_path)
        return response
class YouTubeTranscriptTool(Tool):
    """Agent tool that returns the caption text of a YouTube video.

    The live transcript lookup is currently commented out, so ``forward``
    returns a fixed, canned transcript regardless of the URL.
    """

    name = "youtube_transcript_tool"
    description = "Extracts textual transcripts (captions) from YouTube videos to analyze spoken content. This tool is useful for identifying what is said in the video, such as dialogue, spoken instructions, or narration. It does not analyze visual elements like scenes or objects. Pay attention because transcriptions may be truncated."
    inputs = {
        "video_url": {
            "type": "string",
            "description": "The YouTube video URL."
        }
    }
    output_type = "string"

    def __init__(self):
        pass

    def is_initialized(self) -> bool:
        """Report the tool as ready; there is nothing to set up."""
        return True

    def forward(self, video_url: str):
        """Return the transcript as the string form of a list of caption lines."""
        # Disabled live path, kept for reference:
        # video_id = video_url.split("v=")[-1]
        try:
            # transcript = YouTubeTranscriptApi.get_transcript(video_id)
            # transcript = str([element['text'] for element in transcript])
            canned_transcript = """["Wow this coffee\'s great I was just", \'thinking that\', \'yeah is that cinnamon chicory\', \'tea oak\', \'[Music]\', "isn\'t that hot", \'extremely\']"""
            return canned_transcript
        except Exception as e:
            # Hand the error text back to the agent rather than raising.
            return str(e)
class AudioFileTranscriptTool(Tool):
    """Agent tool that transcribes a user-provided audio file to text.

    Transcription is done through the OpenAI transcription API. A local
    Whisper model stays available through ``whisper_model`` but is loaded
    only on first access, because ``forward`` does not use it.
    """

    name = "audio_file_transcript_tool"
    description = "Extracts text transcripts from uploaded audio files (e.g., MP3, WAV). Use this tool to analyze spoken content from user-provided files, not from YouTube or video links. It only processes audio, not visual information."
    inputs = {
        "file_id": {
            "type": "string",
            "description": "Metadata required to download the audio."
        },
        "file_url": {
            "type": "string",
            "description": "Metadata required to download the audio."
        },
    }
    output_type = "string"

    def __init__(self):
        # Deferred: loading the Whisper model at construction costs startup
        # time and memory for a model forward() never touches.
        self._whisper_model = None

    @property
    def whisper_model(self):
        """Local Whisper "base" model on CPU, loaded on first access."""
        if self._whisper_model is None:
            self._whisper_model = whisper.load_model("base", device="cpu")
        return self._whisper_model

    @whisper_model.setter
    def whisper_model(self, model):
        # Keep the attribute assignable, as it was when set eagerly.
        self._whisper_model = model

    def is_initialized(self) -> bool:
        """Report the tool as ready; there is nothing to set up."""
        return True

    def forward(self, file_id: str, file_url: str):
        """Fetch the audio for ``file_id`` and return its English transcript.

        The audio bytes are written to ``audio.mp3`` in the working
        directory before being sent for transcription.

        Raises:
            FileNotFoundError: If no file is available for ``file_id``.
        """
        # Real download, currently replaced by the local lookup below:
        #questions_files = f"{file_url}/files"
        #response = requests.get(f"{questions_files}/{file_id}", timeout=15)
        response = get_file_content(file_id, file_url)
        if response is None:
            # The local lookup found nothing; fail with a clear message
            # instead of an AttributeError on ``response.content``.
            raise FileNotFoundError(f"No audio file available for file_id {file_id!r}.")
        # Save MP3 bytes to a file
        with open("audio.mp3", "wb") as f:
            f.write(response.content)
        # Transcribe the audio
        client = openai.OpenAI(api_key=OPENAI_API_KEY)
        with open("audio.mp3", "rb") as f:
            transcript = client.audio.transcriptions.create(
                model="gpt-4o-mini-transcribe",
                file=f,
                language="en"
            )
        return transcript.text
class PythonFileDownloader(Tool):
    """Agent tool that stores a task's Python file locally as ``code.py``."""

    name = "python_file_downloader"
    description = "Downloads and stores a Python (.py) file locally as 'code.py' so it can be programmatically analyzed by the agent. This tool does not interpret or summarize the data itself — it only ensures the file is available in the environment."
    inputs = {
        "file_id": {
            "type": "string",
            "description": "Metadata required to download the file."
        },
        "file_url": {
            "type": "string",
            "description": "Metadata required to download the file."
        },
    }
    output_type = "string"

    def __init__(self):
        pass

    def is_initialized(self) -> bool:
        """Report the tool as ready; there is nothing to set up."""
        return True

    def forward(self, file_id: str, file_url: str):
        """Fetch the file for ``file_id`` and write its bytes to ``code.py``.

        The bytes are written unchanged; nothing is decoded or executed.

        Returns:
            A fixed message telling the agent where the file is.

        Raises:
            FileNotFoundError: If no file is available for ``file_id``.
        """
        # Real download, currently replaced by the local lookup below:
        #questions_files = f"{file_url}/files"
        #response = requests.get(f"{questions_files}/{file_id}", timeout=15)
        response = get_file_content(file_id, file_url)
        if response is None:
            # The local lookup found nothing; fail with a clear message
            # instead of an AttributeError on ``response.content``.
            raise FileNotFoundError(f"No Python file available for file_id {file_id!r}.")
        # Save bytes to a Python file
        with open("code.py", "wb") as f:
            f.write(response.content)
        return "The file is available as 'code.py'."
class ExcelFileLoader(Tool):
    """Agent tool that stores a task's spreadsheet locally as ``sheet.xlsx``."""

    name = "excel_file_loader"
    description = "Downloads and stores an Excel spreadsheet (.xlsx) locally as 'sheet.xlsx' so it can be programmatically analyzed by the agent using tools like pandas. This tool does not interpret or summarize the data itself — it only ensures the file is available in the environment."
    inputs = {
        "file_id": {
            "type": "string",
            "description": "Metadata required to download the file."
        },
        "file_url": {
            "type": "string",
            "description": "Metadata required to download the file."
        },
    }
    output_type = "string"

    def __init__(self):
        pass

    def is_initialized(self) -> bool:
        """Report the tool as ready; there is nothing to set up."""
        return True

    def forward(self, file_id: str, file_url: str):
        """Fetch the file for ``file_id`` and write its bytes to ``sheet.xlsx``.

        The bytes are written unchanged; the spreadsheet is not parsed here.

        Returns:
            A fixed message telling the agent where the file is.

        Raises:
            FileNotFoundError: If no file is available for ``file_id``.
        """
        # Real download, currently replaced by the local lookup below:
        #questions_files = f"{file_url}/files"
        #response = requests.get(f"{questions_files}/{file_id}", timeout=15)
        response = get_file_content(file_id, file_url)
        if response is None:
            # The local lookup found nothing; fail with a clear message
            # instead of an AttributeError on ``response.content``.
            raise FileNotFoundError(f"No spreadsheet available for file_id {file_id!r}.")
        # Save bytes to a spreadsheet file
        with open("sheet.xlsx", "wb") as f:
            f.write(response.content)
        return "The file is available as 'sheet.xlsx'."