Spaces:
Sleeping
Sleeping
| import requests | |
| import io | |
| import base64 | |
| import openai | |
| from openai import OpenAI | |
| from smolagents import tool | |
| import os | |
| import io, time, itertools, functools | |
| from typing import List, Optional | |
| import sys, contextlib | |
| import av | |
| from pytube import YouTube | |
| from yt_dlp import YoutubeDL | |
| from PIL import Image | |
| from tqdm import tqdm | |
| import wikipediaapi | |
| import tempfile | |
# Default OpenAI model used by the vision/chat tools in this module.
model_id = "gpt-4.1"
def read_image(query: str, img_url: str) -> str:
    """
    Use a visual question answering (VQA) model to generate a response to a query based on an image.

    Args:
        query (str): A natural language question about the image.
        img_url (str): The URL of the image to analyze.

    Returns:
        str: A response generated by the VQA model based on the provided image and question.
    """
    # Build a single user turn pairing the question text with the image.
    user_message = {
        "role": "user",
        "content": [
            {"type": "input_text", "text": query},
            {"type": "input_image", "image_url": img_url},
        ],
    }
    client = OpenAI()
    response = client.responses.create(model=model_id, input=[user_message])
    return response.output_text
def read_code(file_url: str, timeout: float = 30.0) -> str:
    """
    Read the contents of a code file such as a .py file instead of executing it.
    Use this tool to analyze a code snippet.

    Args:
        file_url (str): The URL of the code file to retrieve.
        timeout (float): Seconds to wait for the server before aborting.
            Defaults to 30.0. Without a timeout, requests can block forever
            on an unresponsive host.

    Returns:
        str: The content of the file as a string.

    Raises:
        requests.HTTPError: If the server responds with a 4xx/5xx status.
        requests.Timeout: If no response arrives within `timeout` seconds.
    """
    response = requests.get(file_url, timeout=timeout)
    response.raise_for_status()
    return response.text
def transcribe_audio(file_url: str, file_name: str) -> str:
    """
    Download and transcribe an audio file using a transcription model.

    Args:
        file_url (str): Direct URL to the audio file (e.g., .mp3, .wav).
        file_name (str): Filename including extension, used to determine format.

    Returns:
        str: The transcribed text from the audio file.

    Raises:
        requests.HTTPError: If the download fails with a 4xx/5xx status.
    """
    response = requests.get(file_url, timeout=60)
    response.raise_for_status()
    # os.path.splitext yields "" when the name has no extension, so the
    # "mp3" fallback actually triggers. The previous split(".")[-1]
    # returned the whole filename in that case and never fell back.
    extension = os.path.splitext(file_name)[1].lstrip(".").lower() or "mp3"
    audio_file = io.BytesIO(response.content)
    # The OpenAI client infers the audio format from the file-like's name.
    audio_file.name = f"audio.{extension}"
    client = OpenAI()
    transcription = client.audio.transcriptions.create(
        model="gpt-4o-transcribe", file=audio_file
    )
    return transcription.text
def _pytube_buffer(url: str) -> Optional[io.BytesIO]:
    """Fetch the best progressive MP4 for `url` via PyTube; None on any failure."""
    try:
        yt = YouTube(url)
        best_stream = (
            yt.streams.filter(progressive=True, file_extension="mp4")
            .order_by("resolution")
            .desc()
            .first()
        )
        if best_stream is None:
            raise RuntimeError("No MP4 with audio found")
        buffer = io.BytesIO()
        best_stream.stream_to_buffer(buffer)
        buffer.seek(0)
        return buffer
    except Exception as e:
        # Best-effort helper: report and signal failure so callers can fall back.
        print(f"[youtube_to_buffer] PyTube failed → {e}", file=sys.stderr)
        return None
def _ytdlp_buffer(url: str) -> io.BytesIO:
    """
    Return a BytesIO containing some MP4 video stream for `url`.
    Works whether YouTube serves a progressive file or separate A/V.
    """
    ydl_opts = {
        "quiet": True,
        # Resolve direct media URLs only; the bytes are streamed manually below.
        "skip_download": True,
        "format": "bestvideo[ext=mp4]/best[ext=mp4]/best",
    }
    with YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=False)
        # Playlist results wrap individual videos in "entries"; take the first.
        if "entries" in info:
            info = info["entries"][0]
        if "url" in info:
            # Progressive case: one direct URL with muxed audio+video.
            video_urls = [info["url"]]
        elif "requested_formats" in info:
            # Split-track case: keep only formats that actually carry video.
            video_urls = [
                fmt["url"]
                for fmt in info["requested_formats"]
                if fmt.get("vcodec") != "none"
            ]
            if not video_urls:
                raise RuntimeError("yt-dlp returned audio-only formats")
        else:
            raise RuntimeError("yt-dlp could not extract a stream URL")
    # Concatenate all selected tracks into a single in-memory buffer.
    buf = io.BytesIO()
    for direct_url in video_urls:
        with requests.get(direct_url, stream=True) as r:
            r.raise_for_status()
            for chunk in r.iter_content(chunk_size=1 << 16):  # 64 KiB chunks
                buf.write(chunk)
    buf.seek(0)  # rewind so callers can read from the start
    return buf
def youtube_to_buffer(url: str) -> io.BytesIO:
    """
    Return a BytesIO containing a single progressive MP4
    (H.264 + AAC) – the safest thing PyAV can open everywhere.
    """
    opts = {
        "quiet": True,
        "skip_download": True,
        "format": (
            "best[ext=mp4][vcodec^=avc1][acodec!=none]" "/best[ext=mp4][acodec!=none]"
        ),
    }
    with YoutubeDL(opts) as ydl:
        info = ydl.extract_info(url, download=False)
        # Unwrap playlist results down to the first video entry.
        if "entries" in info:
            info = info["entries"][0]
    direct_url = info.get("url")
    if not direct_url:
        raise RuntimeError("yt-dlp could not find a progressive MP4 track")
    # Stream the media into memory in 128 KiB chunks.
    buffer = io.BytesIO()
    with requests.get(direct_url, stream=True) as resp:
        resp.raise_for_status()
        for chunk in resp.iter_content(chunk_size=1 << 17):
            buffer.write(chunk)
    buffer.seek(0)
    return buffer
def sample_frames(video_bytes: io.BytesIO, n_frames: int = 6) -> List[Image.Image]:
    """
    Decode `n_frames` uniformly spaced RGB frames as PIL images.

    Args:
        video_bytes: In-memory video container readable by PyAV.
        n_frames: Maximum number of frames to return. Defaults to 6.

    Returns:
        Up to `n_frames` PIL images sampled at a uniform stride.
    """
    container = av.open(video_bytes, metadata_errors="ignore")
    try:
        video = container.streams.video[0]
        # Some containers do not report a frame count; fall back to a
        # fixed stride of 30 frames in that case.
        total = video.frames or 0
        step = max(1, total // n_frames) if total else 30
        frames: list[Image.Image] = []
        for i, frame in enumerate(container.decode(video=0)):
            if i % step == 0:
                frames.append(frame.to_image())
            if len(frames) >= n_frames:
                break
        return frames
    finally:
        # Close the demuxer even if decoding raises mid-stream; the original
        # leaked the container on any decode error.
        container.close()
def pil_to_data_url(img: Image.Image, quality: int = 80) -> str:
    """Encode a PIL image as a base64 `data:image/jpeg` URL string."""
    encoded = io.BytesIO()
    img.save(encoded, format="JPEG", quality=quality, optimize=True)
    payload = base64.b64encode(encoded.getvalue()).decode()
    return f"data:image/jpeg;base64,{payload}"
def save_audio_stream_to_temp_wav_file(video_bytes: io.BytesIO) -> Optional[str]:
    """
    Extracts the audio stream from video_bytes, saves it as a temporary WAV file,
    and returns the path to the file.
    Returns None if no audio stream is found or an error occurs.
    """
    try:
        video_bytes.seek(0)  # rewind: callers may have read the buffer already
        input_container = av.open(video_bytes, metadata_errors="ignore")
        if not input_container.streams.audio:
            print("No audio streams found in the video.", file=sys.stderr)
            return None
        input_audio_stream = input_container.streams.audio[0]
        # delete=False: the file must outlive this `with`; the caller (or the
        # except-branch below) is responsible for removing it.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_file:
            temp_audio_file_path = tmp_file.name
        output_container = av.open(temp_audio_file_path, mode="w", format="wav")
        # Mirror the source channel layout where PyAV exposes it;
        # default to stereo otherwise.
        channel_layout = "stereo"
        if (
            hasattr(input_audio_stream.codec_context, "layout")
            and input_audio_stream.codec_context.layout
        ):
            channel_layout = input_audio_stream.codec_context.layout.name
        elif (
            hasattr(input_audio_stream.codec_context, "channels")
            and input_audio_stream.codec_context.channels == 1
        ):
            channel_layout = "mono"
        # pcm_s16le = uncompressed 16-bit little-endian PCM, the standard WAV codec.
        output_audio_stream = output_container.add_stream(
            "pcm_s16le",
            rate=input_audio_stream.codec_context.sample_rate,
            layout=channel_layout,
        )
        # Decode source frames and re-encode them into the WAV stream.
        for frame in input_container.decode(input_audio_stream):
            for packet in output_audio_stream.encode(frame):
                output_container.mux(packet)
        # Flush the encoder: a final encode() with no frame drains buffered packets.
        for packet in output_audio_stream.encode():
            output_container.mux(packet)
        output_container.close()
        input_container.close()
        return temp_audio_file_path
    except Exception as e:
        print(f"Error extracting audio to temp WAV file: {e}", file=sys.stderr)
        # Best-effort cleanup of the temp file if it was created before the failure.
        if "temp_audio_file_path" in locals() and os.path.exists(temp_audio_file_path):
            os.remove(temp_audio_file_path)
        return None
def run_video(query: str, url: str) -> str:
    """
    Get a YouTube video from url and return an answer to a natural-language query using the video.

    Args:
        query (str): A natural-language question whose answer is expected to be found in the visual content of the video.
        url (str): Fully qualified URL of the YouTube video to analyze.

    Returns:
        str: A response generated by the VQA model based on the provided video and question.
    """
    n_frames = 4  # number of still frames sent to the vision model
    buff = youtube_to_buffer(url)
    # NOTE(review): youtube_to_buffer raises on failure rather than returning
    # None, so this guard looks unreachable — confirm before relying on it.
    if buff is None:
        return "Error: Could not download or buffer the video."
    frames = sample_frames(buff, n_frames=n_frames)
    buff.seek(0)  # rewind so the audio extraction reads from the start
    transcript = "[Audio could not be processed]"
    audio_file_path = None
    try:
        audio_file_path = save_audio_stream_to_temp_wav_file(buff)
        if audio_file_path:
            with open(audio_file_path, "rb") as audio_data:
                transcription_response = openai.audio.transcriptions.create(
                    model="gpt-4o-transcribe", file=audio_data
                )
            transcript = transcription_response.text
        else:
            transcript = "[No audio stream found or error during extraction]"
            print(
                "No audio file path returned, skipping transcription.", file=sys.stderr
            )
    except Exception as e:
        print(f"Error during audio transcription: {e}", file=sys.stderr)
        transcript = f"[Error during audio transcription: {e}]"
    finally:
        # Always remove the temporary WAV file created by the helper.
        if audio_file_path and os.path.exists(audio_file_path):
            os.remove(audio_file_path)
    # Combine the query, transcript, and sampled frames into one multimodal prompt.
    prompt_text = f"Original Query: {query}\n\nVideo Transcript:\n{transcript}\n\nKey Visual Frames (analyze these along with the transcript to answer the query):"
    content = [{"type": "text", "text": prompt_text}]
    for img in frames:
        content.append(
            {
                "type": "image_url",
                "image_url": {"url": pil_to_data_url(img)},
            }
        )
    try:
        resp = openai.chat.completions.create(
            model=model_id,
            messages=[{"role": "user", "content": content}],
            temperature=0.1,  # low temperature for factual, grounded answers
        )
        result = resp.choices[0].message.content.strip()
    except Exception as e:
        print(f"Error calling OpenAI API: {e}", file=sys.stderr)
        result = f"[Error processing with AI model: {e}]"
    return result
def search_wikipedia(query: str) -> str:
    """
    Get the contents of the Wikipedia page retrieved by a search query.

    Args:
        query (str): A search term to search within Wikipedia. Ideally it
            should be one word or a group of few words.

    Returns:
        str: The text content of the Wikipedia page, truncated to a bounded
            number of space-separated tokens.
    """
    wiki = wikipediaapi.Wikipedia(
        language="en",
        user_agent="test_tokki",
        extract_format=wikipediaapi.ExtractFormat.WIKI,
    )
    page = wiki.page(query)
    # Cap output at 25k space-separated tokens to keep prompts bounded.
    max_tokens = 25000
    tokens = page.text.split(" ")[:max_tokens]
    return " ".join(tokens)