"""
YouTube Video Analysis Tool - Extract transcripts or analyze frames from YouTube videos

Author: @mangubee
Date: 2026-01-13

Provides two modes for YouTube video analysis:
- Transcript Mode: youtube-transcript-api (instant, 1-3 seconds) or Whisper fallback
- Frame Mode: Extract video frames and analyze with vision models

Transcript Mode Workflow:
    YouTube URL
    ├─ Has transcript? ✅ → Use youtube-transcript-api (instant)
    └─ No transcript? ❌ → Download audio + Whisper (slower, but works)

Frame Mode Workflow:
    YouTube URL
    ├─ Download video with yt-dlp
    ├─ Extract N frames at regular intervals
    └─ Analyze frames with vision models (summarize findings)

Requirements:
- youtube-transcript-api: pip install youtube-transcript-api
- yt-dlp: pip install yt-dlp
- openai: pip install openai (via src.tools.audio)
- opencv-python: pip install opencv-python (for frame extraction)
- PIL: pip install Pillow (for image handling)
"""

import logging
import os
import re
import tempfile
import uuid
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# ============================================================================
# CONFIG
# ============================================================================

# YouTube URL patterns. Each pattern captures the 11-character video ID in
# group 1. Patterns are tried in order; the first match wins.
YOUTUBE_PATTERNS = [
    # watch?v=ID (v is the first query parameter), youtu.be/ID, shorts/ID
    r'(?:youtube\.com\/watch\?v=|youtu\.be\/|youtube\.com\/shorts\/)([a-zA-Z0-9_-]{11})',
    # watch?...&v=ID (v is not the first query parameter)
    r'youtube\.com\/watch\?[^#\s]*?&v=([a-zA-Z0-9_-]{11})',
    # embed/ID and live/ID
    r'youtube\.com\/(?:embed|live)\/([a-zA-Z0-9_-]{11})',
]

# Audio download settings
AUDIO_FORMAT = "mp3"
AUDIO_QUALITY = "128"  # 128 kbps (sufficient for speech)

# Frame extraction settings
FRAME_COUNT = 6  # Number of frames to extract
# NOTE(review): FRAME_QUALITY is not referenced anywhere in this module;
# download_video() requests 'best[ext=mp4]/best'. Confirm which is intended.
FRAME_QUALITY = "worst"

# Temporary file cleanup
CLEANUP_TEMP_FILES = True

# File-name suffixes treated as incomplete downloads and never returned.
# NOTE(review): list chosen from common yt-dlp leftovers — extend if needed.
_PARTIAL_SUFFIXES = (".part", ".ytdl", ".temp")

# ============================================================================
# Logging Setup
# ============================================================================

logger = logging.getLogger(__name__)


# ============================================================================
# Internal Helpers
# ============================================================================

def _make_result(
    text: str,
    video_id: str,
    source: str,
    success: bool,
    error: Optional[str],
    frame_count: Optional[int] = None,
) -> Dict[str, Any]:
    """Build the result dict shared by every public function in this module.

    The "frame_count" key is only added when *frame_count* is not None, so
    transcript results keep their original five-key shape.
    """
    result: Dict[str, Any] = {
        "text": text,
        "video_id": video_id,
        "source": source,
        "success": success,
        "error": error,
    }
    if frame_count is not None:
        result["frame_count"] = frame_count
    return result


def _unique_temp_stem(prefix: str) -> str:
    """Return an extension-less path in the temp dir that is unique per call.

    The process ID alone is not unique enough: PIDs are reused across runs and
    two calls in the same process would share a name, so a random token is
    appended.
    """
    name = f"{prefix}_{os.getpid()}_{uuid.uuid4().hex}"
    return os.path.join(tempfile.gettempdir(), name)


def _find_downloaded_file(stem: str, preferred_ext: Optional[str] = None) -> Optional[str]:
    """Locate the finished file written for *stem* (a path without extension).

    Only a file named exactly like the stem, or the stem followed by a dot and
    an extension, is accepted. A bare prefix match would also pick up files of
    another stem that merely starts with the same characters.

    Args:
        stem: Path without extension that was handed to the downloader
        preferred_ext: Extension (without dot) to try first, if any

    Returns:
        Path of the matching file, or None if nothing complete was found
    """
    if preferred_ext:
        expected = f"{stem}.{preferred_ext}"
        if os.path.isfile(expected):
            return expected

    directory, base = os.path.split(stem)
    for name in sorted(os.listdir(directory or ".")):
        if name != base and not name.startswith(base + "."):
            continue
        if name.endswith(_PARTIAL_SUFFIXES):
            continue  # incomplete download, not usable
        candidate = os.path.join(directory, name)
        if os.path.isfile(candidate):
            return candidate
    return None


def _cleanup_temp_file(path: str, label: str) -> None:
    """Best-effort removal of a temp file; honours CLEANUP_TEMP_FILES.

    Failures are logged as warnings and never raised, because cleanup must not
    mask the result of the actual work.
    """
    if not CLEANUP_TEMP_FILES:
        return
    try:
        os.remove(path)
        logger.info(f"Cleaned up {label}: {path}")
    except OSError as e:
        logger.warning(f"Failed to cleanup {label}: {e}")


def _snippet_text(entry: Any) -> str:
    """Return the stripped text of one transcript entry.

    Accepts both dict entries ({"text": ...}) and objects exposing a ``text``
    attribute, so the caller works with either shape of transcript item.
    """
    if isinstance(entry, dict):
        raw = entry.get("text", "")
    else:
        raw = getattr(entry, "text", "")
    return str(raw or "").strip()


def _frame_indices(total_frames: int, count: int) -> List[int]:
    """Return up to *count* frame indices spread evenly over the video.

    If the video has no more frames than requested, every frame index is
    returned. Non-positive inputs yield an empty list instead of dividing by
    zero.
    """
    if total_frames <= 0 or count <= 0:
        return []
    if total_frames <= count:
        return list(range(total_frames))
    interval = total_frames / count
    return [int(i * interval) for i in range(count)]


# ============================================================================
# Transcript Cache
# ============================================================================

def save_transcript_to_cache(video_id: str, text: str, source: str) -> None:
    """
    Save transcript to _log/ folder for debugging.

    The file is written to "_log/<video_id>_transcript.md" relative to the
    current working directory. Any failure is logged as a warning and
    swallowed: caching is a debugging aid and must not break transcription.

    Args:
        video_id: YouTube video ID
        text: Transcript text
        source: "api" or "whisper"
    """
    try:
        log_dir = Path("_log")
        log_dir.mkdir(parents=True, exist_ok=True)
        cache_file = log_dir / f"{video_id}_transcript.md"

        lines = [
            "# YouTube Transcript\n\n",
            f"**Video ID:** {video_id}\n",
            f"**Source:** {source}\n",
            f"**Length:** {len(text)} characters\n",
            f"**Generated:** {datetime.now().isoformat()}\n\n",
            "## Transcript\n\n",
            f"{text}\n",
        ]
        with open(cache_file, "w", encoding="utf-8") as f:
            f.writelines(lines)

        logger.info(f"Transcript saved: {cache_file}")
    except Exception as e:
        logger.warning(f"Failed to save transcript: {e}")


# ============================================================================
# YouTube URL Parser
# ============================================================================

def extract_video_id(url: str) -> Optional[str]:
    """
    Extract video ID from various YouTube URL formats.

    Supports:
    - youtube.com/watch?v=VIDEO_ID (also when v is not the first parameter)
    - youtu.be/VIDEO_ID
    - youtube.com/shorts/VIDEO_ID
    - youtube.com/embed/VIDEO_ID
    - youtube.com/live/VIDEO_ID

    Args:
        url: YouTube URL

    Returns:
        Video ID (11 characters) or None if not found

    Examples:
        >>> extract_video_id("https://youtube.com/watch?v=dQw4w9WgXcQ")
        'dQw4w9WgXcQ'
        >>> extract_video_id("https://youtu.be/dQw4w9WgXcQ")
        'dQw4w9WgXcQ'
    """
    if not url:
        return None

    for pattern in YOUTUBE_PATTERNS:
        match = re.search(pattern, url)
        if match:
            return match.group(1)

    return None


# ============================================================================
# Transcript Extraction (Primary Method)
# ============================================================================

def get_youtube_transcript(video_id: str) -> Dict[str, Any]:
    """
    Get transcript using youtube-transcript-api.

    Never raises: every failure (missing package, no captions, network error)
    is reported through the returned dict.

    Args:
        video_id: YouTube video ID (11 characters)

    Returns:
        Dict with structure: {
            "text": str,          # Transcript text
            "video_id": str,      # Video ID
            "source": str,        # "api"
            "success": bool,      # True if transcription succeeded
            "error": str or None  # Error message if failed
        }
    """
    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        logger.info(f"Fetching transcript for video: {video_id}")

        # Get transcript (prefer English variants).
        # Note: fetch() is an instance method in newer versions.
        api = YouTubeTranscriptApi()
        transcript = api.fetch(video_id, languages=['en', 'en-US', 'en-GB'])

        # Clean transcript: drop timestamps and empty segments, join the rest.
        # _snippet_text handles both dict entries and snippet objects; calling
        # .get() on a snippet object would raise AttributeError here.
        text = ' '.join(part for part in map(_snippet_text, transcript) if part)

        logger.info(f"Transcript fetched: {len(text)} characters")

        # Save to cache for debugging
        save_transcript_to_cache(video_id, text, "api")

        return _make_result(text, video_id, "api", True, None)

    except Exception as e:
        error_msg = str(e)
        logger.error(f"YouTube transcript API failed: {error_msg}")

        # "No transcript found" is expected for videos without captions, so it
        # gets a friendlier message than other failures.
        if "No transcript found" in error_msg or "Could not retrieve a transcript" in error_msg:
            return _make_result(
                "", video_id, "api", False,
                "No transcript available (video may not have captions)",
            )

        return _make_result("", video_id, "api", False, f"Transcript API error: {error_msg}")


# ============================================================================
# Audio Fallback (Secondary Method)
# ============================================================================

def download_audio(video_url: str) -> Optional[str]:
    """
    Download audio from YouTube using yt-dlp.

    The audio is converted to AUDIO_FORMAT at AUDIO_QUALITY by the
    FFmpegExtractAudio post-processor and stored under a unique name in the
    system temp directory. The caller owns the returned file.

    Args:
        video_url: Full YouTube URL

    Returns:
        Path to downloaded audio file or None if failed
    """
    try:
        import yt_dlp

        logger.info(f"Downloading audio from: {video_url}")

        stem = _unique_temp_stem("youtube_audio")

        # yt-dlp options: audio only, best quality
        ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': AUDIO_FORMAT,
                'preferredquality': AUDIO_QUALITY,
            }],
            # '%' is the template escape character, so literal ones in the
            # temp path are doubled; %(ext)s lets yt-dlp set the extension.
            'outtmpl': stem.replace('%', '%%') + '.%(ext)s',
            'quiet': True,
            'no_warnings': True,
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])

        audio_path = _find_downloaded_file(stem, preferred_ext=AUDIO_FORMAT)
        if audio_path is None:
            logger.error("Audio file not found after download")
            return None

        logger.info(f"Audio downloaded: {audio_path} ({os.path.getsize(audio_path)} bytes)")
        return audio_path

    except ImportError:
        logger.error("yt-dlp not installed. Run: pip install yt-dlp")
        return None
    except Exception as e:
        logger.error(f"Audio download failed: {e}")
        return None


def transcribe_from_audio(video_url: str) -> Dict[str, Any]:
    """
    Fallback: Download audio and transcribe with Whisper.

    The downloaded audio file is removed afterwards (when CLEANUP_TEMP_FILES
    is set) regardless of whether transcription succeeded or raised.

    Args:
        video_url: Full YouTube URL

    Returns:
        Dict with structure: {
            "text": str,          # Transcript text
            "video_id": str,      # Video ID
            "source": str,        # "whisper"
            "success": bool,      # True if transcription succeeded
            "error": str or None  # Error message if failed
        }
    """
    video_id = extract_video_id(video_url)
    if not video_id:
        return _make_result("", "", "whisper", False, "Invalid YouTube URL")

    # Download audio
    audio_file = download_audio(video_url)
    if not audio_file:
        return _make_result("", video_id, "whisper", False, "Failed to download audio")

    try:
        # Imported here to avoid a circular import at module load time
        from src.tools.audio import transcribe_audio

        # Transcribe with Whisper
        result = transcribe_audio(audio_file)

        if result["success"]:
            # Save to cache for debugging
            save_transcript_to_cache(video_id, result["text"], "whisper")
            return _make_result(result["text"], video_id, "whisper", True, None)

        return _make_result(
            "", video_id, "whisper", False,
            result.get("error", "Transcription failed"),
        )

    except Exception as e:
        logger.error(f"Whisper transcription failed: {e}")
        return _make_result(
            "", video_id, "whisper", False,
            f"Whisper transcription failed: {str(e)}",
        )
    finally:
        # In finally so the temp file is not leaked when transcription raises
        _cleanup_temp_file(audio_file, "temp file")


# ============================================================================
# Frame Processing (Video Analysis Mode)
# ============================================================================

def download_video(url: str) -> Optional[str]:
    """
    Download video from YouTube using yt-dlp for frame extraction.

    The file is stored under a unique name in the system temp directory. The
    caller owns the returned file.

    Args:
        url: Full YouTube URL

    Returns:
        Path to downloaded video file or None if failed
    """
    try:
        import yt_dlp

        logger.info(f"Downloading video from: {url}")

        stem = _unique_temp_stem("youtube_video")

        # yt-dlp options: prefer a single-file mp4, otherwise the best
        # single-file format available.
        ydl_opts = {
            'format': 'best[ext=mp4]/best',
            # '%' is the template escape character, so literal ones in the
            # temp path are doubled; %(ext)s lets yt-dlp set the extension.
            'outtmpl': stem.replace('%', '%%') + '.%(ext)s',
            'quiet': True,
            'no_warnings': True,
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        video_path = _find_downloaded_file(stem)
        if video_path is None:
            logger.error("Video file not found after download")
            return None

        size_mb = os.path.getsize(video_path) / (1024 * 1024)
        logger.info(f"Video downloaded: {video_path} ({size_mb:.2f}MB)")
        return video_path

    except ImportError:
        logger.error("yt-dlp not installed. Run: pip install yt-dlp")
        return None
    except Exception as e:
        logger.error(f"Video download failed: {e}")
        return None


def extract_frames(video_path: str, count: int = FRAME_COUNT) -> List[Tuple[str, float]]:
    """
    Extract frames from video at regular intervals.

    Frames are written as JPEG files to the system temp directory. Frames
    that cannot be read or written are skipped with a warning.

    Args:
        video_path: Path to video file
        count: Number of frames to extract (default: FRAME_COUNT)

    Returns:
        List of (frame_path, timestamp) tuples; empty on any failure
    """
    try:
        import cv2
    except ImportError:
        logger.error("opencv-python not installed. Run: pip install opencv-python")
        return []

    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            logger.error(f"Frame extraction failed: could not open video {video_path}")
            return []

        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        duration = total_frames / fps if fps > 0 else 0

        logger.info(f"Video: {total_frames} frames, {fps:.2f} FPS, {duration:.2f}s duration")

        # Calculate frame indices at regular intervals
        frame_indices = _frame_indices(total_frames, count)

        logger.info(f"Extracting {len(frame_indices)} frames at indices: {frame_indices[:3]}...")

        frames: List[Tuple[str, float]] = []
        temp_dir = tempfile.gettempdir()
        # Random token so two extractions in the same process cannot
        # overwrite each other's frame files.
        run_token = uuid.uuid4().hex[:8]

        for idx, frame_idx in enumerate(frame_indices):
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()
            if not ret:
                logger.warning(f"Failed to extract frame at index {frame_idx}")
                continue

            timestamp = frame_idx / fps if fps > 0 else 0
            frame_path = os.path.join(temp_dir, f"frame_{os.getpid()}_{run_token}_{idx}.jpg")
            # imwrite reports failure through its return value, not an exception
            if not cv2.imwrite(frame_path, frame):
                logger.warning(f"Failed to extract frame at index {frame_idx}")
                continue

            frames.append((frame_path, timestamp))
            logger.debug(f"Frame {idx}: {timestamp:.2f}s -> {frame_path}")

        logger.info(f"Extracted {len(frames)} frames")
        return frames

    except Exception as e:
        logger.error(f"Frame extraction failed: {e}")
        return []
    finally:
        # Always release the capture handle, including on the error paths
        if cap is not None:
            cap.release()


def analyze_frames(frames: list, question: Optional[str] = None) -> Dict[str, Any]:
    """
    Analyze video frames using vision models.

    Each frame is sent to src.tools.vision.analyze_image together with the
    question. A frame whose analysis raises is recorded as
    "Analysis failed: ..." in the output and the remaining frames are still
    processed. Frame files are deleted afterwards when CLEANUP_TEMP_FILES is
    set, whatever the outcome.

    Args:
        frames: List of (frame_path, timestamp) tuples
        question: Optional question to ask about frames

    Returns:
        Dict with structure: {
            "text": str,          # Per-frame analyses joined by blank lines
            "video_id": str,      # Always "" here; the caller fills it in
            "source": str,        # "frames"
            "success": bool,      # True if at least one frame was analyzed
            "error": str or None, # Error message if failed
            "frame_count": int,   # Number of frames passed in
        }
    """
    if not frames:
        return _make_result("", "", "frames", False, "No frames to analyze", frame_count=0)

    # Default question for frame analysis
    if not question:
        question = "Describe what you see in this frame. Include any visible text, objects, people, or actions."

    total = len(frames)
    try:
        # Imported inside the try so a missing vision module is reported as a
        # failed result instead of an exception.
        from src.tools.vision import analyze_image

        logger.info(f"Analyzing {total} frames with vision model...")

        frame_analyses = []
        analyzed = 0

        for number, (frame_path, timestamp) in enumerate(frames, start=1):
            logger.info(f"Analyzing frame {number}/{total} at {timestamp:.2f}s...")

            # Customize question with timestamp context
            frame_question = (
                f"This is frame {number} of {total} from a video at timestamp "
                f"{timestamp:.2f} seconds. {question}"
            )

            try:
                result = analyze_image(frame_path, frame_question)
                # presumably analyze_image returns a dict with an "answer"
                # key — verify against src.tools.vision
                answer = result.get("answer", "")
            except Exception as e:
                logger.warning(f"Frame {number} analysis failed: {e}")
                frame_analyses.append(
                    f"[Frame {number} @ {timestamp:.2f}s]\nAnalysis failed: {str(e)}"
                )
                continue

            # Add timestamp context
            frame_analyses.append(f"[Frame {number} @ {timestamp:.2f}s]\n{answer}")
            analyzed += 1
            logger.info(f"Frame {number} analyzed: {len(answer)} chars")

        # Combine all frame analyses
        combined_text = "\n\n".join(frame_analyses)
        logger.info(f"Frame analysis complete: {len(combined_text)} chars total")

        if analyzed == 0:
            # Nothing usable was produced; keep the per-frame messages in
            # "text" but do not report success.
            return _make_result(
                combined_text, "", "frames", False,
                f"All {total} frame analyses failed",
                frame_count=total,
            )

        return _make_result(combined_text, "", "frames", True, None, frame_count=total)

    except Exception as e:
        logger.error(f"Frame analysis failed: {e}")
        return _make_result(
            "", "", "frames", False,
            f"Frame analysis failed: {str(e)}",
            frame_count=total,
        )
    finally:
        # Cleanup frame files
        if CLEANUP_TEMP_FILES:
            for frame_path, _ in frames:
                try:
                    os.remove(frame_path)
                except OSError as e:
                    logger.warning(f"Failed to cleanup frame {frame_path}: {e}")


def process_video_frames(
    url: str,
    question: Optional[str] = None,
    frame_count: int = FRAME_COUNT,
) -> Dict[str, Any]:
    """
    Download video, extract frames, and analyze with vision models.

    The downloaded video is removed afterwards (when CLEANUP_TEMP_FILES is
    set) on every path, including when frame extraction fails.

    Args:
        url: Full YouTube URL
        question: Optional question to ask about frames
        frame_count: Number of frames to extract

    Returns:
        Dict with structure: {
            "text": str,          # Combined frame analyses
            "video_id": str,      # Video ID
            "source": str,        # "frames"
            "success": bool,      # True if processing succeeded
            "error": str or None, # Error message if failed
            "frame_count": int    # Number of frames analyzed
        }
    """
    video_id = extract_video_id(url)
    if not video_id:
        return _make_result("", "", "frames", False, "Invalid YouTube URL", frame_count=0)

    # Download video
    video_file = download_video(url)
    if not video_file:
        return _make_result(
            "", video_id, "frames", False, "Failed to download video", frame_count=0
        )

    try:
        # Extract frames
        frames = extract_frames(video_file, frame_count)
        if not frames:
            return _make_result(
                "", video_id, "frames", False, "Failed to extract frames", frame_count=0
            )

        # Analyze frames
        result = analyze_frames(frames, question)

        # Add video_id to result (analyze_frames does not know it)
        result["video_id"] = video_id
        return result

    except Exception as e:
        logger.error(f"Video frame processing failed: {e}")
        return _make_result(
            "", video_id, "frames", False,
            f"Video processing failed: {str(e)}",
            frame_count=0,
        )
    finally:
        # In finally so the video is also removed on the early-return and
        # exception paths.
        _cleanup_temp_file(video_file, "temp video")


# ============================================================================
# Main API Function
# ============================================================================

def youtube_analyze(
    url: str,
    mode: str = "transcript",
    question: Optional[str] = None,
    frame_count: int = FRAME_COUNT,
) -> Dict[str, Any]:
    """
    Analyze YouTube video using transcript or frame processing mode.

    Transcript Mode: Extract transcript (youtube-transcript-api or Whisper)
    Frame Mode: Extract frames and analyze with vision models

    Invalid input does not raise: an invalid URL or mode is reported through
    a result dict with "success": False and "source": "none".

    Args:
        url: YouTube video URL (youtube.com, youtu.be, shorts)
        mode: Analysis mode - "transcript" (default) or "frames"
            (case-insensitive)
        question: Optional question for frame mode; ignored in transcript mode
        frame_count: Number of frames for frame mode; ignored in transcript mode

    Returns:
        Dict with structure: {
            "text": str,          # Transcript or frame analyses
            "video_id": str,      # Video ID
            "source": str,        # "api", "whisper", "frames" or "none"
            "success": bool,      # True if analysis succeeded
            "error": str or None, # Error message if failed
            "frame_count": int    # Number of frames (frame mode only)
        }

    Examples (illustrative output shapes, not runnable doctests):
        youtube_analyze("https://youtube.com/watch?v=dQw4w9WgXcQ", mode="transcript")
        -> {"text": "Never gonna give you up...", "video_id": "dQw4w9WgXcQ",
            "source": "api", "success": True, "error": None}

        youtube_analyze("https://youtube.com/watch?v=dQw4w9WgXcQ", mode="frames")
        -> {"text": "[Frame 1 @ 0.00s]\\nA man...", "video_id": "dQw4w9WgXcQ",
            "source": "frames", "success": True, "error": None, "frame_count": 6}
    """
    # Validate URL and extract video ID
    video_id = extract_video_id(url)
    if not video_id:
        logger.error(f"Invalid YouTube URL: {url}")
        return _make_result("", "", "none", False, f"Invalid YouTube URL: {url}")

    # Validate mode. Non-string values (e.g. None) are reported as invalid
    # instead of crashing on .lower().
    mode = mode.strip().lower() if isinstance(mode, str) else str(mode)
    if mode not in ("transcript", "frames"):
        logger.error(f"Invalid mode: {mode}")
        return _make_result(
            "", video_id, "none", False,
            f"Invalid mode: {mode}. Valid: transcript, frames",
        )

    logger.info(f"Processing YouTube video: {video_id} (mode: {mode})")

    # Route to appropriate processing mode
    if mode == "frames":
        # Frame processing mode
        result = process_video_frames(url, question=question, frame_count=frame_count)
        if result["success"]:
            logger.info(
                f"Frame analysis complete: {result.get('frame_count', 0)} frames, "
                f"{len(result['text'])} chars"
            )
        return result

    # Transcript mode: Try API first, fallback to Whisper
    result = get_youtube_transcript(video_id)
    if result["success"]:
        logger.info(f"Transcript retrieved via API: {len(result['text'])} characters")
        logger.info(f"Transcript content: {result['text'][:200]}...")
        return result

    # Fallback to audio transcription (slow but works)
    logger.info("Transcript API failed, trying audio transcription...")
    result = transcribe_from_audio(url)

    if result["success"]:
        logger.info(f"Transcript retrieved via Whisper: {len(result['text'])} characters")
        logger.info(f"Full transcript: {result['text']}")
    else:
        logger.error(f"All transcript methods failed for video: {video_id}")

    return result


# Backward compatibility wrapper that respects YOUTUBE_MODE environment variable
def youtube_transcript(url: str) -> Dict[str, Any]:
    """
    Wrapper for youtube_analyze that respects YOUTUBE_MODE environment variable.

    This allows the agent to switch between transcript and frame modes without
    changing the function signature used in the graph.

    Mode selection:
    - YOUTUBE_MODE env variable (set by UI): "transcript" or "frames"
    - Default: "transcript" (backward compatible); also used when the
      variable is set but empty

    Args:
        url: YouTube video URL

    Returns:
        Dict with structure from youtube_analyze()
    """
    # Read mode from environment variable (set by app.py UI). An empty or
    # whitespace-only value falls back to the default.
    mode = (os.getenv("YOUTUBE_MODE") or "").strip().lower() or "transcript"
    logger.info(f"youtube_transcript called with YOUTUBE_MODE={mode}")

    return youtube_analyze(url, mode=mode)