# GAIA-Agent/tools/media_tools.py
"""
Media processing tools for the GAIA Agent.
Includes YouTube transcript, audio transcription, and video analysis.
"""
import os
import json
import math
import shutil
import tempfile
import subprocess
from typing import Optional, List, Dict
from urllib.parse import urlparse, parse_qs
import requests
import openai
import base64
from langchain_core.tools import tool
from dotenv import load_dotenv
load_dotenv()
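# Shared OpenAI client; picks up the OPENAI_API_KEY loaded from .env above.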
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def _extract_youtube_id(url: str) -> Optional[str]:
"""Extract YouTube video ID from various URL formats."""
try:
u = urlparse(url)
host = (u.netloc or "").lower()
path = u.path or ""
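        # Canonical YouTube video IDs are always 11 characters long.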
# watch?v=VIDEO_ID
qs = parse_qs(u.query)
if "v" in qs and qs["v"]:
vid = qs["v"][0]
if len(vid) == 11:
return vid
# youtu.be/VIDEO_ID
if "youtu.be" in host:
seg = path.strip("/").split("/")
if seg and len(seg[0]) == 11:
return seg[0]
# /embed/VIDEO_ID, /shorts/VIDEO_ID, /live/VIDEO_ID
parts = path.strip("/").split("/")
for i, p in enumerate(parts[:-1]):
if p in {"embed", "shorts", "live"}:
vid = parts[i + 1]
if len(vid) == 11:
return vid
return None
except Exception:
return None
def _transcribe_audio_file(path: str) -> str:
"""Shared helper to transcribe a local audio file with Whisper-1."""
with open(path, "rb") as audio_file:
transcript = client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
response_format="text"
)
return transcript
def _encode_image_to_data_url(path: str) -> str:
"""Helper to turn a local image into a data: URL for GPT-4o vision."""
with open(path, "rb") as img_file:
image_data = base64.b64encode(img_file.read()).decode("utf-8")
ext = path.lower().split('.')[-1]
media_type = {
"png": "image/png",
"jpg": "image/jpeg",
"jpeg": "image/jpeg",
"gif": "image/gif",
"webp": "image/webp",
}.get(ext, "image/png")
return f"data:{media_type};base64,{image_data}"
@tool
def youtube_transcript(video_url: str, languages: str = "en") -> str:
"""Get the transcript/captions from a YouTube video.
First tries to get existing captions. If captions are disabled,
falls back to downloading audio and transcribing with Whisper.
Args:
video_url: The YouTube video URL
languages: Comma-separated language codes to prefer (default: "en")
"""
video_id = _extract_youtube_id(video_url)
if not video_id:
return f"Could not extract video ID from: {video_url}"
lang_list = [l.strip() for l in languages.split(",") if l.strip()]
# First, try to get existing captions
caption_error = None
try:
from youtube_transcript_api import YouTubeTranscriptApi
api = YouTubeTranscriptApi()
        # list() returns a searchable TranscriptList; fetch() would return one
        # transcript directly and has no find_* methods.
        tlist = api.list(video_id)
transcript = None
# Prefer manually created captions
for lang in lang_list:
try:
transcript = tlist.find_manually_created_transcript([lang])
break
except Exception:
pass
# Otherwise try auto-generated captions
if transcript is None:
for lang in lang_list:
try:
transcript = tlist.find_generated_transcript([lang])
break
except Exception:
pass
# Otherwise fall back to whatever exists for those languages
if transcript is None:
for lang in lang_list:
try:
transcript = tlist.find_transcript([lang])
break
except Exception:
pass
if transcript is not None:
            # fetch() yields snippet objects exposing a .text attribute, not dicts.
            items = transcript.fetch()
            text = " ".join(snippet.text for snippet in items).strip()
if text:
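                # Truncate so very long transcripts don't blow up the agent's context.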
return text[:8000]
except Exception as e:
# Captions might be disabled - we'll try fallback
caption_error = f"{type(e).__name__}: {e}"
# Fallback: Download audio and transcribe with Whisper
try:
return youtube_audio_transcribe.invoke({"video_url": video_url})
except Exception as whisper_error:
return (
f"Transcript error: Captions unavailable and audio transcription failed.\n"
f"Caption error: {caption_error or 'Unknown'}\n"
f"Whisper error: {whisper_error}\n\n"
f"Suggestion: Try using web_search to find information about this video instead."
)
@tool
def youtube_audio_transcribe(video_url: str) -> str:
"""Download YouTube audio and transcribe with Whisper-1.
Use when captions are unavailable or you want an audio-based transcript.
Args:
video_url: The YouTube video URL
"""
video_id = _extract_youtube_id(video_url)
if not video_id:
return f"Could not extract video ID from: {video_url}"
# Create temp directory for audio
with tempfile.TemporaryDirectory() as tmpdir:
        # Let yt-dlp keep the real container extension: Whisper infers the format
        # from the file extension, so a hard-coded ".webm" can mislabel m4a/opus
        # downloads.
        out_template = os.path.join(tmpdir, f"{video_id}.%(ext)s")
        # Download audio using yt-dlp. The 25M cap matches Whisper's upload
        # limit; note that yt-dlp skips (rather than truncates) larger files.
        result = subprocess.run(
            [
                "yt-dlp",
                "-f", "bestaudio/best",
                "-o", out_template,
                "--no-playlist",
                "--max-filesize", "25M",
                video_url,
            ],
            capture_output=True,
            text=True,
            timeout=120,
        )
        downloaded = [
            os.path.join(tmpdir, f)
            for f in os.listdir(tmpdir)
            if f.startswith(video_id) and not f.endswith(".part")
        ]
        if result.returncode != 0 or not downloaded:
            raise RuntimeError(f"yt-dlp failed.\nSTDERR:\n{result.stderr}\nSTDOUT:\n{result.stdout}")
        return _transcribe_audio_file(downloaded[0])
@tool
def audio_transcribe(file_path: str) -> str:
"""Transcribe an audio file to text using speech recognition.
Args:
file_path: Path to the audio file (.mp3, .wav, .m4a, etc.) or an http/https URL
"""
try:
# If it's a URL, download first
if file_path.lower().startswith(("http://", "https://")):
            # Keep the source extension so Whisper can recognize the format
            # (a generic suffix like ".audio" is rejected); fall back to ".mp3".
            suffix = os.path.splitext(urlparse(file_path).path)[1] or ".mp3"
            # delete=False lets us reopen the file by name; removed in the finally block.
            with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as tmp:
r = requests.get(file_path, timeout=120)
r.raise_for_status()
tmp.write(r.content)
tmp_path = tmp.name
try:
return _transcribe_audio_file(tmp_path)
finally:
os.unlink(tmp_path)
else:
return _transcribe_audio_file(file_path)
except Exception as e:
return f"Transcription error: {str(e)}"
@tool
def video_metadata(video_url: str) -> str:
"""Fetch coarse metadata for a video (duration, resolution, fps, title) using yt-dlp.
Args:
video_url: The video URL (YouTube or direct link)
"""
try:
result = subprocess.run(
[
"yt-dlp",
"--dump-single-json",
"--no-playlist",
"--no-warnings",
video_url,
],
capture_output=True,
text=True,
timeout=90,
)
if result.returncode != 0:
return f"Metadata error: yt-dlp failed.\nStdout: {result.stdout[:4000]}\nStderr: {result.stderr[:4000]}"
data = json.loads(result.stdout)
core = {
"title": data.get("title"),
"uploader": data.get("uploader"),
"duration_seconds": data.get("duration"),
"width": data.get("width"),
"height": data.get("height"),
"fps": data.get("fps"),
"url": video_url,
}
return json.dumps(core, indent=2)
except Exception as e:
return f"Metadata error: {str(e)}"
@tool
def video_frame_analyze(
video_url: str,
vision_task_prompt: str,
scene_threshold: Optional[float] = None,
scene_threshold_low: float = 0.2,
scene_threshold_high: float = 0.4,
max_frames: int = 120,
batch_size: int = 6,
) -> str:
"""Download a video, extract scene-change frames, and run GPT-4o vision batches.
Args:
video_url: URL to the video (YouTube or direct)
vision_task_prompt: Task for the vision model (e.g., count bird species per frame)
        scene_threshold: Optional ffmpeg scene threshold; clamped to
            [scene_threshold_low, scene_threshold_high]. If None, the midpoint of low/high is used.
scene_threshold_low: Lower bound for threshold (default 0.2)
scene_threshold_high: Upper bound for threshold (default 0.4)
max_frames: Cap on frames to send to vision (downsamples if exceeded).
batch_size: Number of frames per GPT-4o call (keep modest to control context size).
"""
tmpdir = tempfile.mkdtemp(prefix="video_analyze_")
try:
video_path = os.path.join(tmpdir, "video.mp4")
frame_dir = os.path.join(tmpdir, "frames")
os.makedirs(frame_dir, exist_ok=True)
# Step 1: obtain video (URL via yt-dlp, or local path copy)
if video_url.lower().startswith(("http://", "https://")):
# Ensure ffmpeg exists for merging
if shutil.which("ffmpeg") is None and shutil.which("avconv") is None:
return "Download error: ffmpeg/avconv not found in PATH; required for muxing."
# Use an AVC/H.264 + m4a combination to avoid unsupported codecs, cap at 1080p.
out_template = os.path.join(tmpdir, "video.%(ext)s")
dl = subprocess.run(
[
"yt-dlp",
"-f",
"bestvideo[ext=mp4][vcodec^=avc1][height<=1080]+bestaudio[ext=m4a]/best[ext=mp4]/best",
"--merge-output-format",
"mp4",
"--recode-video",
"mp4",
"--no-keep-video",
"--no-playlist",
"--no-warnings",
"-o",
out_template,
video_url,
],
capture_output=True,
text=True,
timeout=240,
)
if dl.returncode != 0:
return f"Download error: {dl.stderr[:4000] or dl.stdout[:4000]}"
# Locate the merged/re-encoded mp4
candidates = [
os.path.join(tmpdir, f)
for f in os.listdir(tmpdir)
if f.lower().endswith(".mp4")
]
if not candidates:
return (
"Download error: no mp4 produced after merge/recode. "
f"yt-dlp stdout: {dl.stdout[:2000]} stderr: {dl.stderr[:2000]}"
)
# Pick the largest mp4 (most likely the merged one)
best_mp4 = max(candidates, key=lambda p: os.path.getsize(p))
if os.path.getsize(best_mp4) < 1024:
return (
"Download error: merged file is empty or too small. "
f"yt-dlp stdout: {dl.stdout[:2000]} stderr: {dl.stderr[:2000]}"
)
shutil.move(best_mp4, video_path)
else:
if not os.path.exists(video_url):
return f"Video path not found: {video_url}"
shutil.copy2(video_url, video_path)
# Step 2: choose scene threshold
thr_low = max(0.0, min(1.0, scene_threshold_low))
thr_high = max(thr_low, min(1.0, scene_threshold_high))
if scene_threshold is not None:
thr = max(thr_low, min(thr_high, scene_threshold))
else:
thr = (thr_low + thr_high) / 2.0
# Step 3: extract frames on scene changes
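        # select='gt(scene,thr)' keeps only frames whose scene-change score
        # exceeds thr; -vsync vfr stops ffmpeg from duplicating frames to
        # maintain a constant frame rate.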
ffmpeg_cmd = [
"ffmpeg",
"-i",
video_path,
"-vf",
f"select='gt(scene,{thr})',showinfo",
"-vsync",
"vfr",
os.path.join(frame_dir, "frame_%05d.jpg"),
]
ff = subprocess.run(
ffmpeg_cmd,
capture_output=True,
text=True,
timeout=180,
)
frames = sorted(
[
os.path.join(frame_dir, f)
for f in os.listdir(frame_dir)
if f.lower().endswith((".jpg", ".jpeg", ".png", ".webp"))
]
)
if not frames:
return f"No frames extracted with scene threshold {thr}. ffmpeg stderr: {ff.stderr[:2000]}"
total_frames = len(frames)
if total_frames > max_frames:
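            # Uniform stride keeps the sampled frames spread across the whole video.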
step = math.ceil(total_frames / max_frames)
frames = frames[::step]
# Step 4: batch frames and call GPT-4o vision
batches = [frames[i : i + batch_size] for i in range(0, len(frames), batch_size)]
        batch_outputs: List[Dict[str, object]] = []  # values include ints and lists, not just strings
for idx, batch in enumerate(batches, start=1):
content = [
{
"type": "text",
"text": (
"You are a vision assistant. "
"For each image, run the requested task and return a compact JSON array "
"with objects: {frame_id, result}. "
"frame_id should match the filename. "
"Task:\n"
f"{vision_task_prompt}"
),
}
]
for p in batch:
data_url = _encode_image_to_data_url(p)
content.append(
{
"type": "image_url",
"image_url": {"url": data_url},
}
)
resp = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": content}],
max_tokens=1200,
)
batch_outputs.append(
{
"batch_index": idx,
"frames": [os.path.basename(p) for p in batch],
"response": resp.choices[0].message.content,
}
)
summary = {
"scene_threshold_used": thr,
"frames_extracted": total_frames,
"frames_sent": len(frames),
"batch_size": batch_size,
"batches": batch_outputs,
}
return json.dumps(summary, indent=2)
except Exception as e:
return f"Video frame analyze error: {str(e)}"
finally:
shutil.rmtree(tmpdir, ignore_errors=True)
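

if __name__ == "__main__":
    # Hedged smoke test, not part of the agent wiring: it assumes OPENAI_API_KEY
    # is set and yt-dlp is on PATH. The URL is a placeholder; substitute any
    # public video. LangChain tools take a dict of their arguments via .invoke(),
    # as the Whisper fallback call above demonstrates.
    demo_url = "https://www.youtube.com/watch?v=dQw4w9WgXcQ"  # placeholder URL
    print(video_metadata.invoke({"video_url": demo_url}))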