Spaces:
Sleeping
Sleeping
| """ | |
| Media processing tools for the GAIA Agent. | |
| Includes YouTube transcript, audio transcription, and video analysis. | |
| """ | |
| import os | |
| import json | |
| import math | |
| import shutil | |
| import tempfile | |
| import subprocess | |
| from typing import Optional, List, Dict | |
| from urllib.parse import urlparse, parse_qs | |
| import requests | |
| import openai | |
| import base64 | |
| from langchain_core.tools import tool | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY")) | |
| def _extract_youtube_id(url: str) -> Optional[str]: | |
| """Extract YouTube video ID from various URL formats.""" | |
| try: | |
| u = urlparse(url) | |
| host = (u.netloc or "").lower() | |
| path = u.path or "" | |
| # watch?v=VIDEO_ID | |
| qs = parse_qs(u.query) | |
| if "v" in qs and qs["v"]: | |
| vid = qs["v"][0] | |
| if len(vid) == 11: | |
| return vid | |
| # youtu.be/VIDEO_ID | |
| if "youtu.be" in host: | |
| seg = path.strip("/").split("/") | |
| if seg and len(seg[0]) == 11: | |
| return seg[0] | |
| # /embed/VIDEO_ID, /shorts/VIDEO_ID, /live/VIDEO_ID | |
| parts = path.strip("/").split("/") | |
| for i, p in enumerate(parts[:-1]): | |
| if p in {"embed", "shorts", "live"}: | |
| vid = parts[i + 1] | |
| if len(vid) == 11: | |
| return vid | |
| return None | |
| except Exception: | |
| return None | |
def _transcribe_audio_file(path: str) -> str:
    """Transcribe the local audio file at *path* via OpenAI Whisper-1.

    Returns the plain-text transcript (response_format="text" makes the
    API return a bare string rather than a JSON object).
    """
    with open(path, "rb") as fh:
        return client.audio.transcriptions.create(
            model="whisper-1",
            file=fh,
            response_format="text",
        )
| def _encode_image_to_data_url(path: str) -> str: | |
| """Helper to turn a local image into a data: URL for GPT-4o vision.""" | |
| with open(path, "rb") as img_file: | |
| image_data = base64.b64encode(img_file.read()).decode("utf-8") | |
| ext = path.lower().split('.')[-1] | |
| media_type = { | |
| "png": "image/png", | |
| "jpg": "image/jpeg", | |
| "jpeg": "image/jpeg", | |
| "gif": "image/gif", | |
| "webp": "image/webp", | |
| }.get(ext, "image/png") | |
| return f"data:{media_type};base64,{image_data}" | |
def youtube_transcript(video_url: str, languages: str = "en") -> str:
    """Get the transcript/captions from a YouTube video.

    First tries existing captions (manual, then auto-generated, then any
    language match). If captions are unavailable, falls back to downloading
    the audio and transcribing it with Whisper.

    Args:
        video_url: The YouTube video URL
        languages: Comma-separated language codes to prefer (default: "en")

    Returns:
        Up to 8000 characters of transcript text, or an error message string.
    """
    video_id = _extract_youtube_id(video_url)
    if not video_id:
        return f"Could not extract video ID from: {video_url}"
    lang_list = [l.strip() for l in languages.split(",") if l.strip()]
    # First, try to get existing captions
    caption_error = None
    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        api = YouTubeTranscriptApi()
        # BUG FIX: the find_* methods live on the transcript *list*, which
        # comes from list() (v1.x) / list_transcripts() (older releases) —
        # not from fetch(), which returns a single downloaded transcript.
        # The old code therefore always fell through to the Whisper path.
        if hasattr(api, "list"):
            tlist = api.list(video_id)
        else:
            tlist = YouTubeTranscriptApi.list_transcripts(video_id)

        transcript = None
        # Preference order: manually created captions, then auto-generated,
        # then whatever exists for the requested languages.
        for finder_name in (
            "find_manually_created_transcript",
            "find_generated_transcript",
            "find_transcript",
        ):
            for lang in lang_list:
                try:
                    transcript = getattr(tlist, finder_name)([lang])
                    break
                except Exception:
                    pass
            if transcript is not None:
                break

        if transcript is not None:
            items = transcript.fetch()
            # BUG FIX: v1.x yields snippet objects with a .text attribute;
            # older releases yield plain dicts. Support both shapes.
            pieces = []
            for item in items:
                if isinstance(item, dict):
                    pieces.append(item.get("text", ""))
                else:
                    pieces.append(getattr(item, "text", ""))
            text = " ".join(pieces).strip()
            if text:
                return text[:8000]
    except Exception as e:
        # Captions might be disabled - we'll try fallback
        caption_error = f"{type(e).__name__}: {e}"
    # Fallback: Download audio and transcribe with Whisper
    try:
        # BUG FIX: youtube_audio_transcribe is a plain function in this
        # module; only use .invoke() when it has been wrapped as a
        # LangChain tool (the old unconditional .invoke raised
        # AttributeError, which was silently reported as a Whisper error).
        fallback = youtube_audio_transcribe
        if hasattr(fallback, "invoke"):
            return fallback.invoke({"video_url": video_url})
        return fallback(video_url)
    except Exception as whisper_error:
        return (
            f"Transcript error: Captions unavailable and audio transcription failed.\n"
            f"Caption error: {caption_error or 'Unknown'}\n"
            f"Whisper error: {whisper_error}\n\n"
            f"Suggestion: Try using web_search to find information about this video instead."
        )
def youtube_audio_transcribe(video_url: str) -> str:
    """Download a YouTube video's audio track and transcribe it with Whisper-1.

    Use when captions are unavailable or an audio-based transcript is wanted.

    Args:
        video_url: The YouTube video URL

    Raises:
        RuntimeError: if yt-dlp exits non-zero or produces no output file.
    """
    video_id = _extract_youtube_id(video_url)
    if not video_id:
        return f"Could not extract video ID from: {video_url}"

    # Work inside a throwaway directory so the downloaded audio is cleaned up.
    with tempfile.TemporaryDirectory() as workdir:
        target = f"{workdir}/{video_id}.webm"
        command = [
            "yt-dlp",
            "-f", "bestaudio/best",
            "-o", target,
            "--no-playlist",
            "--max-filesize", "25M",  # stay under the Whisper API size limit
            video_url,
        ]
        proc = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=120,
        )
        # A skipped download (e.g. over the size cap) can exit 0 with no file,
        # so check for the output file as well as the return code.
        if proc.returncode != 0 or not os.path.exists(target):
            raise RuntimeError(f"yt-dlp failed.\nSTDERR:\n{proc.stderr}\nSTDOUT:\n{proc.stdout}")
        return _transcribe_audio_file(target)
def audio_transcribe(file_path: str) -> str:
    """Transcribe an audio file to text using OpenAI Whisper-1.

    Args:
        file_path: Path to the audio file (.mp3, .wav, .m4a, etc.) or an
            http/https URL pointing at one.

    Returns:
        The transcript text, or a string starting with "Transcription error:"
        on any failure.
    """
    try:
        # If it's a URL, download first
        if file_path.lower().startswith(("http://", "https://")):
            # BUG FIX: the Whisper API infers the audio format from the file
            # extension, so a generic ".audio" suffix is rejected. Keep the
            # extension from the URL path when it looks like one; otherwise
            # default to ".mp3".
            url_ext = os.path.splitext(urlparse(file_path).path)[1]
            suffix = url_ext if 0 < len(url_ext) <= 5 else ".mp3"
            with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
                r = requests.get(file_path, timeout=120)
                r.raise_for_status()
                tmp.write(r.content)
                tmp_path = tmp.name
            try:
                return _transcribe_audio_file(tmp_path)
            finally:
                # NamedTemporaryFile(delete=False) leaves the file behind;
                # remove it ourselves once transcription is done.
                os.unlink(tmp_path)
        else:
            return _transcribe_audio_file(file_path)
    except Exception as e:
        return f"Transcription error: {str(e)}"
def video_metadata(video_url: str) -> str:
    """Fetch coarse metadata for a video (duration, resolution, fps, title) using yt-dlp.

    Args:
        video_url: The video URL (YouTube or direct link)

    Returns:
        A pretty-printed JSON string of selected fields, or a string
        starting with "Metadata error:" on failure.
    """
    try:
        probe = subprocess.run(
            [
                "yt-dlp",
                "--dump-single-json",
                "--no-playlist",
                "--no-warnings",
                video_url,
            ],
            capture_output=True,
            text=True,
            timeout=90,
        )
        if probe.returncode != 0:
            return f"Metadata error: yt-dlp failed.\nStdout: {probe.stdout[:4000]}\nStderr: {probe.stderr[:4000]}"
        info = json.loads(probe.stdout)
        # Map our output keys to the yt-dlp JSON fields they come from.
        field_map = (
            ("title", "title"),
            ("uploader", "uploader"),
            ("duration_seconds", "duration"),
            ("width", "width"),
            ("height", "height"),
            ("fps", "fps"),
        )
        summary = {out_key: info.get(src_key) for out_key, src_key in field_map}
        summary["url"] = video_url
        return json.dumps(summary, indent=2)
    except Exception as e:
        return f"Metadata error: {str(e)}"
def video_frame_analyze(
    video_url: str,
    vision_task_prompt: str,
    scene_threshold: Optional[float] = None,
    scene_threshold_low: float = 0.2,
    scene_threshold_high: float = 0.4,
    max_frames: int = 120,
    batch_size: int = 6,
) -> str:
    """Download a video, extract scene-change frames, and run GPT-4o vision batches.

    Args:
        video_url: URL to the video (YouTube or direct), or a local file path.
        vision_task_prompt: Task for the vision model (e.g., count bird species per frame)
        scene_threshold: Optional direct ffmpeg scene threshold (0-1). If None, use mid of low/high.
        scene_threshold_low: Lower bound for threshold (default 0.2)
        scene_threshold_high: Upper bound for threshold (default 0.4)
        max_frames: Cap on frames to send to vision (downsamples if exceeded).
        batch_size: Number of frames per GPT-4o call (keep modest to control context size).

    Returns:
        Pretty-printed JSON summary (threshold used, frame counts, per-batch
        vision responses), or an error message string on any failure.
    """
    # mkdtemp (not TemporaryDirectory) so cleanup is explicit in the finally
    # block even when we return early from inside the try.
    tmpdir = tempfile.mkdtemp(prefix="video_analyze_")
    try:
        video_path = os.path.join(tmpdir, "video.mp4")
        frame_dir = os.path.join(tmpdir, "frames")
        os.makedirs(frame_dir, exist_ok=True)
        # Step 1: obtain video (URL via yt-dlp, or local path copy)
        if video_url.lower().startswith(("http://", "https://")):
            # Ensure ffmpeg exists for merging
            if shutil.which("ffmpeg") is None and shutil.which("avconv") is None:
                return "Download error: ffmpeg/avconv not found in PATH; required for muxing."
            # Use an AVC/H.264 + m4a combination to avoid unsupported codecs, cap at 1080p.
            out_template = os.path.join(tmpdir, "video.%(ext)s")
            dl = subprocess.run(
                [
                    "yt-dlp",
                    "-f",
                    "bestvideo[ext=mp4][vcodec^=avc1][height<=1080]+bestaudio[ext=m4a]/best[ext=mp4]/best",
                    "--merge-output-format",
                    "mp4",
                    "--recode-video",
                    "mp4",
                    "--no-keep-video",
                    "--no-playlist",
                    "--no-warnings",
                    "-o",
                    out_template,
                    video_url,
                ],
                capture_output=True,
                text=True,
                timeout=240,
            )
            if dl.returncode != 0:
                return f"Download error: {dl.stderr[:4000] or dl.stdout[:4000]}"
            # Locate the merged/re-encoded mp4 (yt-dlp decides the final name,
            # so scan the temp dir rather than assuming a fixed path).
            candidates = [
                os.path.join(tmpdir, f)
                for f in os.listdir(tmpdir)
                if f.lower().endswith(".mp4")
            ]
            if not candidates:
                return (
                    "Download error: no mp4 produced after merge/recode. "
                    f"yt-dlp stdout: {dl.stdout[:2000]} stderr: {dl.stderr[:2000]}"
                )
            # Pick the largest mp4 (most likely the merged one)
            best_mp4 = max(candidates, key=lambda p: os.path.getsize(p))
            if os.path.getsize(best_mp4) < 1024:
                return (
                    "Download error: merged file is empty or too small. "
                    f"yt-dlp stdout: {dl.stdout[:2000]} stderr: {dl.stderr[:2000]}"
                )
            shutil.move(best_mp4, video_path)
        else:
            # Not a URL: treat video_url as a local file path and copy it in.
            if not os.path.exists(video_url):
                return f"Video path not found: {video_url}"
            shutil.copy2(video_url, video_path)
        # Step 2: choose scene threshold, clamped into [thr_low, thr_high]
        thr_low = max(0.0, min(1.0, scene_threshold_low))
        thr_high = max(thr_low, min(1.0, scene_threshold_high))
        if scene_threshold is not None:
            thr = max(thr_low, min(thr_high, scene_threshold))
        else:
            thr = (thr_low + thr_high) / 2.0
        # Step 3: extract frames on scene changes (ffmpeg select filter keeps
        # only frames whose scene-change score exceeds the threshold)
        ffmpeg_cmd = [
            "ffmpeg",
            "-i",
            video_path,
            "-vf",
            f"select='gt(scene,{thr})',showinfo",
            "-vsync",
            "vfr",
            os.path.join(frame_dir, "frame_%05d.jpg"),
        ]
        ff = subprocess.run(
            ffmpeg_cmd,
            capture_output=True,
            text=True,
            timeout=180,
        )
        frames = sorted(
            [
                os.path.join(frame_dir, f)
                for f in os.listdir(frame_dir)
                if f.lower().endswith((".jpg", ".jpeg", ".png", ".webp"))
            ]
        )
        if not frames:
            return f"No frames extracted with scene threshold {thr}. ffmpeg stderr: {ff.stderr[:2000]}"
        total_frames = len(frames)
        # Downsample evenly if there are more frames than we want to send.
        if total_frames > max_frames:
            step = math.ceil(total_frames / max_frames)
            frames = frames[::step]
        # Step 4: batch frames and call GPT-4o vision once per batch
        batches = [frames[i : i + batch_size] for i in range(0, len(frames), batch_size)]
        batch_outputs: List[Dict[str, object]] = []
        for idx, batch in enumerate(batches, start=1):
            # One text instruction followed by the batch's images as data URLs.
            content = [
                {
                    "type": "text",
                    "text": (
                        "You are a vision assistant. "
                        "For each image, run the requested task and return a compact JSON array "
                        "with objects: {frame_id, result}. "
                        "frame_id should match the filename. "
                        "Task:\n"
                        f"{vision_task_prompt}"
                    ),
                }
            ]
            for p in batch:
                data_url = _encode_image_to_data_url(p)
                content.append(
                    {
                        "type": "image_url",
                        "image_url": {"url": data_url},
                    }
                )
            resp = client.chat.completions.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": content}],
                max_tokens=1200,
            )
            batch_outputs.append(
                {
                    "batch_index": idx,
                    "frames": [os.path.basename(p) for p in batch],
                    "response": resp.choices[0].message.content,
                }
            )
        summary = {
            "scene_threshold_used": thr,
            "frames_extracted": total_frames,
            "frames_sent": len(frames),
            "batch_size": batch_size,
            "batches": batch_outputs,
        }
        return json.dumps(summary, indent=2)
    except Exception as e:
        return f"Video frame analyze error: {str(e)}"
    finally:
        # Always remove the working directory, even on early return or error.
        shutil.rmtree(tmpdir, ignore_errors=True)