agentbee / src /tools /youtube.py
mangubee's picture
fix: correct author name formatting in multiple files
e7b4937
"""
YouTube Video Analysis Tool - Extract transcripts or analyze frames from YouTube videos
Author: @mangubee
Date: 2026-01-13
Provides two modes for YouTube video analysis:
- Transcript Mode: youtube-transcript-api (instant, 1-3 seconds) or Whisper fallback
- Frame Mode: Extract video frames and analyze with vision models
Transcript Mode Workflow:
YouTube URL
├─ Has transcript? ✅ → Use youtube-transcript-api (instant)
└─ No transcript? ❌ → Download audio + Whisper (slower, but works)
Frame Mode Workflow:
YouTube URL
├─ Download video with yt-dlp
├─ Extract N frames at regular intervals
└─ Analyze frames with vision models (summarize findings)
Requirements:
- youtube-transcript-api: pip install youtube-transcript-api
- yt-dlp: pip install yt-dlp
- openai: pip install openai (via src.tools.audio)
- opencv-python: pip install opencv-python (for frame extraction)
- PIL: pip install Pillow (for image handling)
"""
import logging
import os
import re
import tempfile
from typing import Dict, Any, Optional
from pathlib import Path
# ============================================================================
# CONFIG
# ============================================================================
# YouTube URL patterns.
# Each pattern captures the 11-character video ID in group 1. Recognised forms:
#   - youtube.com/watch?v=ID, where ``v`` may appear anywhere in the query
#     string (e.g. watch?feature=share&v=ID)
#   - youtu.be/ID
#   - youtube.com/shorts/ID, youtube.com/embed/ID, youtube.com/live/ID
YOUTUBE_PATTERNS = [
    r'(?:youtube\.com\/watch\?(?:[^#\s]*&)?v=|youtu\.be\/|youtube\.com\/(?:shorts|embed|live)\/)([a-zA-Z0-9_-]{11})',
]
# Audio download settings
AUDIO_FORMAT = "mp3"
AUDIO_QUALITY = "128"  # 128 kbps (sufficient for speech)
# Frame extraction settings
FRAME_COUNT = 6  # Number of frames to extract
# Intended yt-dlp format quality for frame extraction (worst = faster download).
# NOTE(review): not referenced by the download code visible in this file - confirm
# whether it should be wired into the yt-dlp 'format' option.
FRAME_QUALITY = "worst"
# Temporary file cleanup
CLEANUP_TEMP_FILES = True
# ============================================================================
# Logging Setup
# ============================================================================
# Module-level logger named after this module. No handlers or levels are
# configured here; every function below reports progress and failures through it.
logger = logging.getLogger(__name__)
# ============================================================================
# Transcript Cache
# ============================================================================
def save_transcript_to_cache(video_id: str, text: str, source: str) -> None:
    """
    Write a transcript to ``_log/<video_id>_transcript.md`` as a debugging aid.

    The ``_log`` directory is created relative to the current working
    directory when it does not exist yet. This is best-effort: any failure is
    logged as a warning and never propagated to the caller.

    Args:
        video_id: YouTube video ID
        text: Transcript text
        source: "api" or "whisper"
    """
    try:
        from datetime import datetime

        target_dir = Path("_log")
        target_dir.mkdir(exist_ok=True)
        cache_file = target_dir / f"{video_id}_transcript.md"

        # Assemble the whole markdown document first, then write it in one go.
        document = "".join([
            "# YouTube Transcript\n\n",
            f"**Video ID:** {video_id}\n",
            f"**Source:** {source}\n",
            f"**Length:** {len(text)} characters\n",
            f"**Generated:** {datetime.now().isoformat()}\n\n",
            "## Transcript\n\n",
            f"{text}\n",
        ])
        cache_file.write_text(document, encoding="utf-8")
        logger.info(f"Transcript saved: {cache_file}")
    except Exception as e:
        logger.warning(f"Failed to save transcript: {e}")
# ============================================================================
# YouTube URL Parser
# =============================================================================
def extract_video_id(url: str) -> Optional[str]:
    """
    Pull the video ID out of a YouTube URL.

    The URL is searched against each regular expression in YOUTUBE_PATTERNS
    in order; the first capture group of the first pattern that matches is
    returned.

    Args:
        url: YouTube URL

    Returns:
        The captured video ID, or None when the URL is empty/None or no
        pattern matches.

    Examples:
        >>> extract_video_id("https://youtube.com/watch?v=dQw4w9WgXcQ")
        'dQw4w9WgXcQ'
        >>> extract_video_id("https://youtu.be/dQw4w9WgXcQ")
        'dQw4w9WgXcQ'
    """
    if not url:
        return None
    # Lazily try every pattern and stop at the first one that matches.
    attempts = (re.search(pattern, url) for pattern in YOUTUBE_PATTERNS)
    first_hit = next((hit for hit in attempts if hit), None)
    return first_hit.group(1) if first_hit else None
# ============================================================================
# Transcript Extraction (Primary Method)
# =============================================================================
def get_youtube_transcript(video_id: str) -> Dict[str, Any]:
    """
    Get a transcript using youtube-transcript-api.

    English variants ('en', 'en-US', 'en-GB') are requested. Segment texts are
    stripped, empty segments dropped, and the rest joined with single spaces.
    On success the transcript is also written to the debug cache via
    save_transcript_to_cache().

    Args:
        video_id: YouTube video ID (11 characters)

    Returns:
        Dict with structure: {
            "text": str,           # Transcript text ("" on failure)
            "video_id": str,       # Video ID
            "source": str,         # Always "api" for this function
            "success": bool,       # True if the transcript was fetched
            "error": str or None   # Error message if failed
        }
        This function never raises; every failure is reported through the dict.
    """
    def failure(message: str) -> Dict[str, Any]:
        # Uniform error payload shared by every failure path below.
        return {
            "text": "",
            "video_id": video_id,
            "source": "api",
            "success": False,
            "error": message,
        }

    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        logger.info(f"Fetching transcript for video: {video_id}")
        # Get transcript (prefer English variants).
        # Note: fetch() is an instance method in newer versions.
        api = YouTubeTranscriptApi()
        transcript_list = api.fetch(
            video_id,
            languages=['en', 'en-US', 'en-GB']
        )

        # Clean transcript: drop timing data, combine segments.
        text_parts = []
        for entry in transcript_list:
            # The instance-level fetch() yields snippet objects that expose the
            # text as an attribute, not dicts; calling .get() on them raised
            # AttributeError and forced every video onto the slow Whisper
            # fallback. Dict entries are still accepted for older payloads.
            if isinstance(entry, dict):
                raw = entry.get('text', '')
            else:
                raw = getattr(entry, 'text', '')
            segment = (raw or '').strip()
            if segment:
                text_parts.append(segment)
        text = ' '.join(text_parts)
        logger.info(f"Transcript fetched: {len(text)} characters")

        # Save to cache for debugging
        save_transcript_to_cache(video_id, text, "api")
        return {
            "text": text,
            "video_id": video_id,
            "source": "api",
            "success": True,
            "error": None
        }
    except Exception as e:
        error_msg = str(e)
        logger.error(f"YouTube transcript API failed: {error_msg}")
        # "No transcript" is expected for videos without captions, so give it
        # a friendlier message than a generic API error.
        if "No transcript found" in error_msg or "Could not retrieve a transcript" in error_msg:
            return failure("No transcript available (video may not have captions)")
        return failure(f"Transcript API error: {error_msg}")
# ============================================================================
# Audio Fallback (Secondary Method)
# =============================================================================
def download_audio(video_url: str) -> Optional[str]:
    """
    Download the audio track of a YouTube video using yt-dlp.

    The download is post-processed with FFmpegExtractAudio into AUDIO_FORMAT
    at AUDIO_QUALITY and stored in the system temp directory. The file name
    is unique per call (PID plus a random token), so repeated or concurrent
    downloads cannot pick up each other's files or a stale file left behind
    by an earlier run.

    Args:
        video_url: Full YouTube URL

    Returns:
        Path to the downloaded audio file, or None if yt-dlp is not
        installed, the download fails, or no output file can be located.
    """
    try:
        import uuid

        import yt_dlp

        logger.info(f"Downloading audio from: {video_url}")

        # Unique base name inside the temp directory for this call only.
        temp_dir = tempfile.gettempdir()
        base_name = f"youtube_audio_{os.getpid()}_{uuid.uuid4().hex}"
        base_path = os.path.join(temp_dir, base_name)
        # Where the extracted audio is expected once post-processing is done.
        expected_path = f"{base_path}.{AUDIO_FORMAT}"

        # yt-dlp options: best available audio, converted by ffmpeg
        ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': AUDIO_FORMAT,
                'preferredquality': AUDIO_QUALITY,
            }],
            # '%' is special in output templates, so escape any in the temp
            # path; %(ext)s lets yt-dlp fill in the real file extension.
            'outtmpl': base_path.replace('%', '%%') + '.%(ext)s',
            'quiet': True,
            'no_warnings': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])

        if os.path.exists(expected_path):
            logger.info(f"Audio downloaded: {expected_path} ({os.path.getsize(expected_path)} bytes)")
            return expected_path

        # Fallback: the file may carry a different extension. Only names built
        # from this call's unique base are considered.
        for file_name in sorted(os.listdir(temp_dir)):
            if file_name.startswith(f"{base_name}."):
                actual_path = os.path.join(temp_dir, file_name)
                logger.info(f"Audio downloaded: {actual_path}")
                return actual_path

        logger.error("Audio file not found after download")
        return None
    except ImportError:
        logger.error("yt-dlp not installed. Run: pip install yt-dlp")
        return None
    except Exception as e:
        logger.error(f"Audio download failed: {e}")
        return None
def transcribe_from_audio(video_url: str) -> Dict[str, Any]:
    """
    Fallback: download the audio track and transcribe it with Whisper.

    The temporary audio file is removed in a ``finally`` clause (when
    CLEANUP_TEMP_FILES is set), so it is cleaned up even if the transcription
    import or call raises.

    Args:
        video_url: Full YouTube URL

    Returns:
        Dict with structure: {
            "text": str,           # Transcript text ("" on failure)
            "video_id": str,       # Video ID ("" if the URL is invalid)
            "source": str,         # "whisper"
            "success": bool,       # True if transcription succeeded
            "error": str or None   # Error message if failed
        }
    """
    def failure(vid: str, message: str) -> Dict[str, Any]:
        # Uniform error payload shared by every failure path below.
        return {
            "text": "",
            "video_id": vid,
            "source": "whisper",
            "success": False,
            "error": message,
        }

    video_id = extract_video_id(video_url)
    if not video_id:
        return failure("", "Invalid YouTube URL")

    # Download audio
    audio_file = download_audio(video_url)
    if not audio_file:
        return failure(video_id, "Failed to download audio")

    try:
        # Imported lazily (avoid circular import)
        from src.tools.audio import transcribe_audio

        # Transcribe with Whisper
        result = transcribe_audio(audio_file)
        if result["success"]:
            # Save to cache for debugging
            save_transcript_to_cache(video_id, result["text"], "whisper")
            return {
                "text": result["text"],
                "video_id": video_id,
                "source": "whisper",
                "success": True,
                "error": None
            }
        return failure(video_id, result.get("error", "Transcription failed"))
    except Exception as e:
        logger.error(f"Whisper transcription failed: {e}")
        return failure(video_id, f"Whisper transcription failed: {str(e)}")
    finally:
        # Always drop the temp audio, including when transcription raised.
        if CLEANUP_TEMP_FILES:
            try:
                os.remove(audio_file)
                logger.info(f"Cleaned up temp file: {audio_file}")
            except Exception as e:
                logger.warning(f"Failed to cleanup temp file: {e}")
# ============================================================================
# Frame Processing (Video Analysis Mode)
# =============================================================================
def download_video(url: str) -> Optional[str]:
    """
    Download a video from YouTube using yt-dlp for frame extraction.

    The file is stored in the system temp directory under a name that is
    unique per call (PID plus a random token). With a PID-only name, a video
    left behind by an earlier failed run could be found by the directory scan
    and returned for a different URL.

    Args:
        url: Full YouTube URL

    Returns:
        Path to the downloaded video file, or None if yt-dlp is not
        installed, the download fails, or no output file can be located.
    """
    try:
        import uuid

        import yt_dlp

        logger.info(f"Downloading video from: {url}")

        # Unique base name inside the temp directory for this call only.
        temp_dir = tempfile.gettempdir()
        base_name = f"youtube_video_{os.getpid()}_{uuid.uuid4().hex}"
        base_path = os.path.join(temp_dir, base_name)

        # yt-dlp options: prefer the best mp4 format, otherwise the best
        # available format.
        ydl_opts = {
            'format': 'best[ext=mp4]/best',
            # '%' is special in output templates, so escape any in the temp
            # path; %(ext)s lets yt-dlp fill in the real file extension.
            'outtmpl': base_path.replace('%', '%%') + '.%(ext)s',
            'quiet': True,
            'no_warnings': True,
        }
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # Find the downloaded file: the extension is chosen by yt-dlp, so match
        # on this call's unique base name and ignore partial downloads.
        for file_name in sorted(os.listdir(temp_dir)):
            if file_name.startswith(f"{base_name}.") and not file_name.endswith(".part"):
                actual_path = os.path.join(temp_dir, file_name)
                size_mb = os.path.getsize(actual_path) / (1024 * 1024)
                logger.info(f"Video downloaded: {actual_path} ({size_mb:.2f}MB)")
                return actual_path

        logger.error("Video file not found after download")
        return None
    except ImportError:
        logger.error("yt-dlp not installed. Run: pip install yt-dlp")
        return None
    except Exception as e:
        logger.error(f"Video download failed: {e}")
        return None
def extract_frames(video_path: str, count: int = FRAME_COUNT) -> list:
    """
    Extract frames from a video at regular intervals.

    Frames are written as JPEG files to the system temp directory, with a
    per-call random token in the file name so concurrent extractions in the
    same process do not overwrite each other. The capture handle is always
    released, including on failure.

    Args:
        video_path: Path to video file
        count: Number of frames to extract (default: FRAME_COUNT)

    Returns:
        List of (frame_path, timestamp) tuples, timestamp in seconds (0 when
        the FPS is unknown). Empty list if OpenCV is missing, the video
        cannot be opened, ``count`` is not positive, or extraction fails.
    """
    cap = None
    frames = []
    try:
        import uuid

        import cv2

        if count <= 0:
            logger.warning(f"Invalid frame count requested: {count}")
            return []

        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            logger.error(f"Could not open video file: {video_path}")
            return []

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        duration = total_frames / fps if fps > 0 else 0
        logger.info(f"Video: {total_frames} frames, {fps:.2f} FPS, {duration:.2f}s duration")

        # Calculate frame indices at regular intervals; short videos simply
        # contribute every frame they have.
        if total_frames <= count:
            frame_indices = list(range(total_frames))
        else:
            interval = total_frames / count
            frame_indices = [int(i * interval) for i in range(count)]
        logger.info(f"Extracting {len(frame_indices)} frames at indices: {frame_indices[:3]}...")

        temp_dir = tempfile.gettempdir()
        token = uuid.uuid4().hex  # keeps file names unique per call
        for idx, frame_idx in enumerate(frame_indices):
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
            if not ret:
                logger.warning(f"Failed to extract frame at index {frame_idx}")
                continue
            timestamp = frame_idx / fps if fps > 0 else 0
            frame_path = os.path.join(temp_dir, f"frame_{os.getpid()}_{token}_{idx}.jpg")
            # imwrite signals failure through its return value, not an exception.
            if not cv2.imwrite(frame_path, frame):
                logger.warning(f"Failed to write frame {idx} to {frame_path}")
                continue
            frames.append((frame_path, timestamp))
            logger.debug(f"Frame {idx}: {timestamp:.2f}s -> {frame_path}")

        logger.info(f"Extracted {len(frames)} frames")
        return frames
    except ImportError:
        logger.error("opencv-python not installed. Run: pip install opencv-python")
        return []
    except Exception as e:
        logger.error(f"Frame extraction failed: {e}")
        # The caller receives [] and can never clean these up, so remove any
        # frames that were already written.
        for written_path, _ in frames:
            try:
                os.remove(written_path)
            except OSError:
                pass
        return []
    finally:
        if cap is not None:
            cap.release()
def analyze_frames(frames: list, question: Optional[str] = None) -> Dict[str, Any]:
    """
    Analyze video frames using vision models.

    Each frame is sent to ``analyze_image`` with a prompt that includes its
    position and timestamp. A failure on a single frame is recorded in the
    output text and does not abort the run. Frame files are deleted afterwards
    (when CLEANUP_TEMP_FILES is set), including when the analysis fails.

    Args:
        frames: List of (frame_path, timestamp) tuples
        question: Optional question to ask about frames; a generic
            "describe this frame" prompt is used when omitted

    Returns:
        Dict with structure: {
            "text": str,           # Per-frame analyses joined by blank lines
            "video_id": str,       # Always "" here (the caller fills it in)
            "source": str,         # "frames"
            "success": bool,       # True if analysis ran to completion
            "error": str or None,  # Error message if failed
            "frame_count": int,    # Number of frames passed in
        }
    """
    if not frames:
        return {
            "text": "",
            "video_id": "",
            "source": "frames",
            "success": False,
            "error": "No frames to analyze",
            "frame_count": 0,
        }

    # Default question for frame analysis
    if not question:
        question = "Describe what you see in this frame. Include any visible text, objects, people, or actions."

    try:
        # Imported lazily and inside the try so a missing vision module is
        # reported through the result dict instead of raising.
        from src.tools.vision import analyze_image

        logger.info(f"Analyzing {len(frames)} frames with vision model...")
        frame_analyses = []
        for idx, (frame_path, timestamp) in enumerate(frames):
            logger.info(f"Analyzing frame {idx + 1}/{len(frames)} at {timestamp:.2f}s...")
            # Customize question with timestamp context
            frame_question = f"This is frame {idx + 1} of {len(frames)} from a video at timestamp {timestamp:.2f} seconds. {question}"
            try:
                result = analyze_image(frame_path, frame_question)
                answer = result.get("answer", "")
                # Add timestamp context
                frame_analyses.append(f"[Frame {idx + 1} @ {timestamp:.2f}s]\n{answer}")
                logger.info(f"Frame {idx + 1} analyzed: {len(answer)} chars")
            except Exception as e:
                logger.warning(f"Frame {idx + 1} analysis failed: {e}")
                frame_analyses.append(f"[Frame {idx + 1} @ {timestamp:.2f}s]\nAnalysis failed: {str(e)}")

        # Combine all frame analyses
        combined_text = "\n\n".join(frame_analyses)
        logger.info(f"Frame analysis complete: {len(combined_text)} chars total")
        return {
            "text": combined_text,
            "video_id": "",
            "source": "frames",
            "success": True,
            "error": None,
            "frame_count": len(frames),
        }
    except Exception as e:
        logger.error(f"Frame analysis failed: {e}")
        return {
            "text": "",
            "video_id": "",
            "source": "frames",
            "success": False,
            "error": f"Frame analysis failed: {str(e)}",
            "frame_count": len(frames),
        }
    finally:
        # Cleanup frame files on both success and failure. Indexing happens
        # inside the try so a malformed entry cannot raise out of finally.
        if CLEANUP_TEMP_FILES:
            for item in frames:
                try:
                    os.remove(item[0])
                except Exception as e:
                    logger.warning(f"Failed to cleanup frame {item}: {e}")
def process_video_frames(url: str, question: Optional[str] = None, frame_count: int = FRAME_COUNT) -> Dict[str, Any]:
    """
    Download a video, extract frames, and analyze them with vision models.

    The downloaded video is removed in a ``finally`` clause (when
    CLEANUP_TEMP_FILES is set), so it is cleaned up on every exit path,
    including "no frames extracted" and unexpected exceptions.

    Args:
        url: Full YouTube URL
        question: Optional question to ask about frames
        frame_count: Number of frames to extract

    Returns:
        Dict with structure: {
            "text": str,           # Combined frame analyses
            "video_id": str,       # Video ID ("" if the URL is invalid)
            "source": str,         # "frames"
            "success": bool,       # True if processing succeeded
            "error": str or None,  # Error message if failed
            "frame_count": int     # Number of frames analyzed
        }
    """
    def failure(vid: str, message: str) -> Dict[str, Any]:
        # Uniform error payload shared by every failure path below.
        return {
            "text": "",
            "video_id": vid,
            "source": "frames",
            "success": False,
            "error": message,
            "frame_count": 0,
        }

    video_id = extract_video_id(url)
    if not video_id:
        return failure("", "Invalid YouTube URL")

    # Download video
    video_file = download_video(url)
    if not video_file:
        return failure(video_id, "Failed to download video")

    try:
        # Extract frames
        frames = extract_frames(video_file, frame_count)
        if not frames:
            return failure(video_id, "Failed to extract frames")

        # Analyze frames; analyze_frames leaves video_id blank, so add it here.
        result = analyze_frames(frames, question)
        result["video_id"] = video_id
        return result
    except Exception as e:
        logger.error(f"Video frame processing failed: {e}")
        return failure(video_id, f"Video processing failed: {str(e)}")
    finally:
        # Always drop the temp video, including on early return or exception.
        if CLEANUP_TEMP_FILES:
            try:
                os.remove(video_file)
                logger.info(f"Cleaned up temp video: {video_file}")
            except Exception as e:
                logger.warning(f"Failed to cleanup temp video: {e}")
# ============================================================================
# Main API Function
# =============================================================================
def youtube_analyze(url: str, mode: str = "transcript") -> Dict[str, Any]:
    """
    Analyze a YouTube video in transcript mode or frame mode.

    Transcript mode tries youtube-transcript-api first and falls back to
    downloading the audio and transcribing it with Whisper. Frame mode
    downloads the video and runs vision analysis on sampled frames.

    Invalid input does not raise: a bad URL or an unknown mode is reported
    through a result dict with ``"success": False`` and ``"source": "none"``.

    Args:
        url: YouTube video URL (youtube.com, youtu.be, shorts)
        mode: Analysis mode - "transcript" (default) or "frames"
            (case-insensitive)

    Returns:
        Dict with structure: {
            "text": str,           # Transcript or frame analyses
            "video_id": str,       # Video ID
            "source": str,         # "api", "whisper", "frames", or "none"
            "success": bool,       # True if analysis succeeded
            "error": str or None,  # Error message if failed
            "frame_count": int     # Number of frames (frame mode only)
        }

    Examples:
        >>> youtube_analyze("https://youtube.com/watch?v=dQw4w9WgXcQ", mode="transcript")
        {"text": "Never gonna give you up...", "video_id": "dQw4w9WgXcQ", "source": "api", "success": True, "error": None}
        >>> youtube_analyze("https://youtube.com/watch?v=dQw4w9WgXcQ", mode="frames")
        {"text": "[Frame 1 @ 0.00s]\\nA man...", "video_id": "dQw4w9WgXcQ", "source": "frames", "success": True, "frame_count": 6, "error": None}
    """
    # Guard: the URL must yield a video ID.
    video_id = extract_video_id(url)
    if not video_id:
        logger.error(f"Invalid YouTube URL: {url}")
        return {
            "text": "",
            "video_id": "",
            "source": "none",
            "success": False,
            "error": f"Invalid YouTube URL: {url}"
        }

    # Guard: the mode must be one of the supported values.
    mode = mode.lower()
    if mode not in ("transcript", "frames"):
        logger.error(f"Invalid mode: {mode}")
        return {
            "text": "",
            "video_id": video_id,
            "source": "none",
            "success": False,
            "error": f"Invalid mode: {mode}. Valid: transcript, frames"
        }

    logger.info(f"Processing YouTube video: {video_id} (mode: {mode})")

    # Frame processing mode
    if mode == "frames":
        frame_outcome = process_video_frames(url)
        if frame_outcome["success"]:
            logger.info(f"Frame analysis complete: {frame_outcome.get('frame_count', 0)} frames, {len(frame_outcome['text'])} chars")
        return frame_outcome

    # Transcript mode, step 1: the captions API.
    api_outcome = get_youtube_transcript(video_id)
    if api_outcome["success"]:
        logger.info(f"Transcript retrieved via API: {len(api_outcome['text'])} characters")
        logger.info(f"Transcript content: {api_outcome['text'][:200]}...")
        return api_outcome

    # Transcript mode, step 2: audio transcription (slow but works).
    logger.info("Transcript API failed, trying audio transcription...")
    whisper_outcome = transcribe_from_audio(url)
    if not whisper_outcome["success"]:
        logger.error(f"All transcript methods failed for video: {video_id}")
    else:
        logger.info(f"Transcript retrieved via Whisper: {len(whisper_outcome['text'])} characters")
        logger.info(f"Full transcript: {whisper_outcome['text']}")
    return whisper_outcome
# Backward-compatible entry point: same signature as before, with the analysis
# mode taken from the YOUTUBE_MODE environment variable.
def youtube_transcript(url: str) -> Dict[str, Any]:
    """
    Run youtube_analyze() with the mode chosen by the YOUTUBE_MODE env variable.

    This lets the agent switch between transcript and frame modes without
    changing the function signature used in the graph.

    Mode selection:
        - YOUTUBE_MODE env variable (set by UI): "transcript" or "frames"
        - Default when unset: "transcript" (backward compatible)

    Args:
        url: YouTube video URL

    Returns:
        Dict with structure from youtube_analyze()
    """
    # The value is lower-cased before being logged and passed on.
    selected_mode = os.environ.get("YOUTUBE_MODE", "transcript").lower()
    logger.info(f"youtube_transcript called with YOUTUBE_MODE={selected_mode}")
    return youtube_analyze(url, mode=selected_mode)