|
|
""" |
|
|
YouTube Video Analysis Tool - Extract transcripts or analyze frames from YouTube videos |
|
|
Author: @mangubee |
|
|
Date: 2026-01-13 |
|
|
|
|
|
Provides two modes for YouTube video analysis: |
|
|
- Transcript Mode: youtube-transcript-api (instant, 1-3 seconds) or Whisper fallback |
|
|
- Frame Mode: Extract video frames and analyze with vision models |
|
|
|
|
|
Transcript Mode Workflow: |
|
|
YouTube URL |
|
|
├─ Has transcript? ✅ → Use youtube-transcript-api (instant) |
|
|
└─ No transcript? ❌ → Download audio + Whisper (slower, but works) |
|
|
|
|
|
Frame Mode Workflow: |
|
|
YouTube URL |
|
|
├─ Download video with yt-dlp |
|
|
├─ Extract N frames at regular intervals |
|
|
└─ Analyze frames with vision models (summarize findings) |
|
|
|
|
|
Requirements: |
|
|
- youtube-transcript-api: pip install youtube-transcript-api |
|
|
- yt-dlp: pip install yt-dlp |
|
|
- openai: pip install openai (via src.tools.audio) |
|
|
- opencv-python: pip install opencv-python (for frame extraction) |
|
|
- PIL: pip install Pillow (for image handling) |
|
|
""" |
|
|
|
|
|
import logging |
|
|
import os |
|
|
import re |
|
|
import tempfile |
|
|
from typing import Dict, Any, Optional |
|
|
from pathlib import Path |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Regular expressions used to pull the 11-character video ID out of a URL.
# Each pattern captures the ID in group 1; patterns are tried in order and the
# first match wins.
YOUTUBE_PATTERNS = [
    # watch?v=ID (v as first query parameter), youtu.be/ID and /shorts/ID.
    r'(?:youtube\.com\/watch\?v=|youtu\.be\/|youtube\.com\/shorts\/)([a-zA-Z0-9_-]{11})',
    # watch URLs where "v" is NOT the first query parameter,
    # e.g. youtube.com/watch?feature=share&v=ID
    r'youtube\.com/watch\?(?:[^#\s]*?&)?v=([a-zA-Z0-9_-]{11})',
    # Embedded-player and live-stream URLs: /embed/ID and /live/ID.
    r'youtube(?:-nocookie)?\.com/(?:embed|live)/([a-zA-Z0-9_-]{11})',
]

# Audio extraction settings handed to yt-dlp's FFmpegExtractAudio postprocessor.
AUDIO_FORMAT = "mp3"
AUDIO_QUALITY = "128"

# Default number of frames sampled from a video in frame mode.
FRAME_COUNT = 6
# NOTE(review): FRAME_QUALITY is not referenced by any function visible in this
# file - confirm whether the video download format was meant to use it.
FRAME_QUALITY = "worst"

# When True, downloaded audio/video files and extracted frames are deleted
# after they have been processed.
CLEANUP_TEMP_FILES = True

logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def save_transcript_to_cache(video_id: str, text: str, source: str) -> None:
    """
    Save transcript to _log/ folder for debugging.

    Writes ``_log/<video_id>_transcript.md`` relative to the current working
    directory, creating the ``_log`` directory if needed. This is best-effort:
    any failure is logged as a warning and never propagated to the caller.

    Args:
        video_id: YouTube video ID (used in the file name)
        text: Transcript text
        source: "api" or "whisper"
    """
    # Imported locally so the block does not depend on a module-level import.
    from datetime import datetime

    try:
        log_dir = Path("_log")
        log_dir.mkdir(exist_ok=True)

        cache_file = log_dir / f"{video_id}_transcript.md"

        # Build the whole document first so a formatting error cannot leave a
        # half-written file behind.
        document = (
            "# YouTube Transcript\n\n"
            f"**Video ID:** {video_id}\n"
            f"**Source:** {source}\n"
            f"**Length:** {len(text)} characters\n"
            f"**Generated:** {datetime.now().isoformat()}\n\n"
            "## Transcript\n\n"
            f"{text}\n"
        )

        with open(cache_file, "w", encoding="utf-8") as f:
            f.write(document)

        logger.info(f"Transcript saved: {cache_file}")
    except Exception as e:
        logger.warning(f"Failed to save transcript: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_video_id(url: str) -> Optional[str]:
    """
    Return the 11-character YouTube video ID found in ``url``.

    Every regular expression in YOUTUBE_PATTERNS is searched against the URL
    in order; capture group 1 of the first one that matches is returned.

    Args:
        url: YouTube URL (e.g. youtube.com/watch?v=ID, youtu.be/ID,
            youtube.com/shorts/ID)

    Returns:
        Video ID (11 characters) or None if the URL is empty or no pattern
        matches

    Examples:
        >>> extract_video_id("https://youtube.com/watch?v=dQw4w9WgXcQ")
        'dQw4w9WgXcQ'

        >>> extract_video_id("https://youtu.be/dQw4w9WgXcQ")
        'dQw4w9WgXcQ'
    """
    # Empty string / None: nothing to search.
    if not url:
        return None

    # Lazily evaluate the searches so we stop at the first pattern that hits.
    attempts = (re.search(candidate, url) for candidate in YOUTUBE_PATTERNS)
    hit = next((found for found in attempts if found), None)

    return hit.group(1) if hit else None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_youtube_transcript(video_id: str) -> Dict[str, Any]:
    """
    Get transcript using youtube-transcript-api.

    Requests an English transcript ('en', 'en-US', 'en-GB'), joins the
    non-empty snippet texts with single spaces, and saves the result to the
    debug cache. All failures (including the library not being installed) are
    caught and reported through the returned dict.

    Args:
        video_id: YouTube video ID (11 characters)

    Returns:
        Dict with structure: {
            "text": str,          # Transcript text ("" on failure)
            "video_id": str,      # Video ID
            "source": str,        # Always "api" for this function
            "success": bool,      # True if the transcript was fetched
            "error": str or None  # Error message if failed
        }
    """
    try:
        from youtube_transcript_api import YouTubeTranscriptApi

        logger.info(f"Fetching transcript for video: {video_id}")

        api = YouTubeTranscriptApi()
        transcript = api.fetch(
            video_id,
            languages=['en', 'en-US', 'en-GB']
        )

        text_parts = []
        for entry in transcript:
            # The instance-based fetch() yields snippet objects exposing a
            # ``text`` attribute (no dict ``.get``); plain dicts are still
            # accepted in case raw transcript data is returned.
            if isinstance(entry, dict):
                raw = entry.get('text', '')
            else:
                raw = getattr(entry, 'text', '')

            piece = (raw or '').strip()
            if piece:
                text_parts.append(piece)

        text = ' '.join(text_parts)

        logger.info(f"Transcript fetched: {len(text)} characters")

        save_transcript_to_cache(video_id, text, "api")

        return {
            "text": text,
            "video_id": video_id,
            "source": "api",
            "success": True,
            "error": None
        }

    except Exception as e:
        error_msg = str(e)
        logger.error(f"YouTube transcript API failed: {error_msg}")

        # Map the library's "no captions" messages to a friendlier error.
        if "No transcript found" in error_msg or "Could not retrieve a transcript" in error_msg:
            return {
                "text": "",
                "video_id": video_id,
                "source": "api",
                "success": False,
                "error": "No transcript available (video may not have captions)"
            }

        return {
            "text": "",
            "video_id": video_id,
            "source": "api",
            "success": False,
            "error": f"Transcript API error: {error_msg}"
        }
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def download_audio(video_url: str) -> Optional[str]:
    """
    Download audio from YouTube using yt-dlp.

    The best available audio stream is downloaded into the system temp
    directory and converted to AUDIO_FORMAT at AUDIO_QUALITY by yt-dlp's
    FFmpegExtractAudio postprocessor. Each call uses a unique file name so a
    leftover file from an earlier download can never be returned by mistake.

    Args:
        video_url: Full YouTube URL

    Returns:
        Path to downloaded audio file or None if failed (yt-dlp missing,
        download error, or no output file produced). The caller owns the file
        and is responsible for deleting it.
    """
    try:
        import uuid

        import yt_dlp

        logger.info(f"Downloading audio from: {video_url}")

        temp_dir = tempfile.gettempdir()
        # PID alone is not unique across calls within one process; add a
        # random token so repeated/concurrent downloads never share a path.
        stem = f"youtube_audio_{os.getpid()}_{uuid.uuid4().hex}"
        base_path = os.path.join(temp_dir, stem)
        output_path = f"{base_path}.{AUDIO_FORMAT}"

        ydl_opts = {
            'format': 'bestaudio/best',
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': AUDIO_FORMAT,
                'preferredquality': AUDIO_QUALITY,
            }],
            # No extension here: the postprocessor appends ".<AUDIO_FORMAT>".
            'outtmpl': base_path,
            'quiet': True,
            'no_warnings': True,
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([video_url])

        if os.path.exists(output_path):
            logger.info(f"Audio downloaded: {output_path} ({os.path.getsize(output_path)} bytes)")
            return output_path

        # Fallback: the expected ".<AUDIO_FORMAT>" file is missing, so accept
        # any completed file that belongs to this download (exact stem, or
        # stem plus an extension), ignoring yt-dlp's partial-download files.
        for name in sorted(os.listdir(temp_dir)):
            belongs = name == stem or name.startswith(f"{stem}.")
            if belongs and not name.endswith((".part", ".ytdl")):
                actual_path = os.path.join(temp_dir, name)
                logger.info(f"Audio downloaded: {actual_path}")
                return actual_path

        logger.error("Audio file not found after download")
        return None

    except ImportError:
        logger.error("yt-dlp not installed. Run: pip install yt-dlp")
        return None
    except Exception as e:
        logger.error(f"Audio download failed: {e}")
        return None
|
|
|
|
|
|
|
|
def transcribe_from_audio(video_url: str) -> Dict[str, Any]:
    """
    Fallback: Download audio and transcribe with Whisper.

    Downloads the audio track, passes the file to
    ``src.tools.audio.transcribe_audio`` and, on success, saves the text to
    the debug cache. The temporary audio file is removed afterwards (when
    CLEANUP_TEMP_FILES is True) whether transcription succeeded, failed, or
    raised.

    Args:
        video_url: Full YouTube URL

    Returns:
        Dict with structure: {
            "text": str,          # Transcript text ("" on failure)
            "video_id": str,      # Video ID ("" if the URL is invalid)
            "source": str,        # "whisper"
            "success": bool,      # True if transcription succeeded
            "error": str or None  # Error message if failed
        }
    """
    video_id = extract_video_id(video_url)

    if not video_id:
        return {
            "text": "",
            "video_id": "",
            "source": "whisper",
            "success": False,
            "error": "Invalid YouTube URL"
        }

    audio_file = download_audio(video_url)

    if not audio_file:
        return {
            "text": "",
            "video_id": video_id,
            "source": "whisper",
            "success": False,
            "error": "Failed to download audio"
        }

    try:
        # Imported lazily so the module loads even without the audio tool.
        from src.tools.audio import transcribe_audio

        result = transcribe_audio(audio_file)

        if result["success"]:
            save_transcript_to_cache(video_id, result["text"], "whisper")

            return {
                "text": result["text"],
                "video_id": video_id,
                "source": "whisper",
                "success": True,
                "error": None
            }

        return {
            "text": "",
            "video_id": video_id,
            "source": "whisper",
            "success": False,
            "error": result.get("error", "Transcription failed")
        }

    except Exception as e:
        logger.error(f"Whisper transcription failed: {e}")
        return {
            "text": "",
            "video_id": video_id,
            "source": "whisper",
            "success": False,
            "error": f"Whisper transcription failed: {str(e)}"
        }

    finally:
        # Runs on every exit path so the download never leaks, even when the
        # import or the transcription itself raises.
        if CLEANUP_TEMP_FILES:
            try:
                os.remove(audio_file)
                logger.info(f"Cleaned up temp file: {audio_file}")
            except Exception as e:
                logger.warning(f"Failed to cleanup temp file: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def download_video(url: str) -> Optional[str]:
    """
    Download video from YouTube using yt-dlp for frame extraction.

    The video is written to the system temp directory under a name that is
    unique per call, so a stale file from an earlier download (or another
    process whose PID shares a prefix) can never be returned by mistake.

    Args:
        url: Full YouTube URL

    Returns:
        Path to downloaded video file or None if failed (yt-dlp missing,
        download error, or no output file produced). The caller owns the file
        and is responsible for deleting it.
    """
    try:
        import uuid

        import yt_dlp

        logger.info(f"Downloading video from: {url}")

        temp_dir = tempfile.gettempdir()
        # PID alone is not unique across calls within one process.
        stem = f"youtube_video_{os.getpid()}_{uuid.uuid4().hex}"
        output_path = os.path.join(temp_dir, stem)

        ydl_opts = {
            'format': 'best[ext=mp4]/best',
            'outtmpl': output_path,
            'quiet': True,
            'no_warnings': True,
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([url])

        # Prefer the exact output path; otherwise accept "<stem>.<ext>" while
        # skipping yt-dlp's partial-download artifacts.
        candidates = [output_path]
        candidates.extend(
            os.path.join(temp_dir, name)
            for name in sorted(os.listdir(temp_dir))
            if name.startswith(f"{stem}.") and not name.endswith((".part", ".ytdl"))
        )

        for actual_path in candidates:
            if os.path.isfile(actual_path):
                size_mb = os.path.getsize(actual_path) / (1024 * 1024)
                logger.info(f"Video downloaded: {actual_path} ({size_mb:.2f}MB)")
                return actual_path

        logger.error("Video file not found after download")
        return None

    except ImportError:
        logger.error("yt-dlp not installed. Run: pip install yt-dlp")
        return None
    except Exception as e:
        logger.error(f"Video download failed: {e}")
        return None
|
|
|
|
|
|
|
|
def extract_frames(video_path: str, count: int = FRAME_COUNT) -> list:
    """
    Extract frames from video at regular intervals.

    Frames are sampled at evenly spaced frame indices starting at index 0
    (every frame is used when the video has ``count`` frames or fewer) and
    written as JPEG files to the system temp directory.

    Args:
        video_path: Path to video file
        count: Number of frames to extract (default: FRAME_COUNT)

    Returns:
        List of (frame_path, timestamp) tuples, where timestamp is in seconds
        (0 when the FPS is unknown). Empty list if OpenCV is missing, ``count``
        is not positive, or extraction fails.
    """
    try:
        import cv2
    except ImportError:
        logger.error("opencv-python not installed. Run: pip install opencv-python")
        return []

    # A non-positive count would otherwise end in a division by zero below.
    if count <= 0:
        logger.warning(f"Invalid frame count requested: {count}")
        return []

    cap = None
    try:
        cap = cv2.VideoCapture(video_path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        duration = total_frames / fps if fps > 0 else 0

        logger.info(f"Video: {total_frames} frames, {fps:.2f} FPS, {duration:.2f}s duration")

        if total_frames <= count:
            frame_indices = list(range(total_frames))
        else:
            interval = total_frames / count
            frame_indices = [int(i * interval) for i in range(count)]

        logger.info(f"Extracting {len(frame_indices)} frames at indices: {frame_indices[:3]}...")

        frames = []
        temp_dir = tempfile.gettempdir()

        for idx, frame_idx in enumerate(frame_indices):
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
            ret, frame = cap.read()

            if not ret:
                logger.warning(f"Failed to extract frame at index {frame_idx}")
                continue

            timestamp = frame_idx / fps if fps > 0 else 0
            frame_path = os.path.join(temp_dir, f"frame_{os.getpid()}_{idx}.jpg")

            # imwrite reports failure through its return value; only record
            # frames that were actually written.
            if not cv2.imwrite(frame_path, frame):
                logger.warning(f"Failed to write frame {idx} to {frame_path}")
                continue

            frames.append((frame_path, timestamp))
            logger.debug(f"Frame {idx}: {timestamp:.2f}s -> {frame_path}")

        logger.info(f"Extracted {len(frames)} frames")
        return frames

    except Exception as e:
        logger.error(f"Frame extraction failed: {e}")
        return []

    finally:
        # Always release the capture handle, including on the error path.
        if cap is not None:
            cap.release()
|
|
|
|
|
|
|
|
def analyze_frames(frames: list, question: Optional[str] = None) -> Dict[str, Any]:
    """
    Analyze video frames using vision models.

    Each frame is sent to ``src.tools.vision.analyze_image`` together with a
    prompt that states the frame's position and timestamp; the per-frame
    answers are concatenated into one text. A failure on an individual frame
    is recorded in the text and does not stop the remaining frames. Frame
    files are deleted afterwards (when CLEANUP_TEMP_FILES is True) on every
    exit path that received frames.

    Args:
        frames: List of (frame_path, timestamp) tuples
        question: Optional question to ask about frames (a generic
            "describe this frame" prompt is used when omitted)

    Returns:
        Dict with structure: {
            "text": str,          # Combined per-frame analyses
            "video_id": str,      # Always "" here; callers fill it in
            "source": str,        # "frames"
            "success": bool,      # True if at least one frame was analyzed
            "error": str or None  # Error message if failed
            "frame_count": int,   # Number of frames received
        }
    """
    if not frames:
        return {
            "text": "",
            "video_id": "",
            "source": "frames",
            "success": False,
            "error": "No frames to analyze",
            "frame_count": 0,
        }

    if not question:
        question = "Describe what you see in this frame. Include any visible text, objects, people, or actions."

    try:
        # Imported inside the try so a missing vision tool yields an error
        # dict (like every other failure) instead of an uncaught exception.
        from src.tools.vision import analyze_image

        logger.info(f"Analyzing {len(frames)} frames with vision model...")

        frame_analyses = []
        failed_count = 0

        for idx, (frame_path, timestamp) in enumerate(frames):
            logger.info(f"Analyzing frame {idx + 1}/{len(frames)} at {timestamp:.2f}s...")

            frame_question = f"This is frame {idx + 1} of {len(frames)} from a video at timestamp {timestamp:.2f} seconds. {question}"

            try:
                # NOTE(review): assumes analyze_image returns a mapping with
                # an "answer" key - confirm against src.tools.vision.
                result = analyze_image(frame_path, frame_question)
                answer = result.get("answer", "")

                frame_analyses.append(f"[Frame {idx + 1} @ {timestamp:.2f}s]\n{answer}")

                logger.info(f"Frame {idx + 1} analyzed: {len(answer)} chars")

            except Exception as e:
                failed_count += 1
                logger.warning(f"Frame {idx + 1} analysis failed: {e}")
                frame_analyses.append(f"[Frame {idx + 1} @ {timestamp:.2f}s]\nAnalysis failed: {str(e)}")

        combined_text = "\n\n".join(frame_analyses)

        logger.info(f"Frame analysis complete: {len(combined_text)} chars total")

        # Do not report success when not a single frame could be analyzed;
        # the per-frame failure details stay available in "text".
        if failed_count == len(frames):
            return {
                "text": combined_text,
                "video_id": "",
                "source": "frames",
                "success": False,
                "error": f"All {len(frames)} frame analyses failed",
                "frame_count": len(frames),
            }

        return {
            "text": combined_text,
            "video_id": "",
            "source": "frames",
            "success": True,
            "error": None,
            "frame_count": len(frames),
        }

    except Exception as e:
        logger.error(f"Frame analysis failed: {e}")
        return {
            "text": "",
            "video_id": "",
            "source": "frames",
            "success": False,
            "error": f"Frame analysis failed: {str(e)}",
            "frame_count": len(frames),
        }

    finally:
        # Remove the extracted frame images on success and on failure alike.
        if CLEANUP_TEMP_FILES:
            for frame_path, _ in frames:
                try:
                    os.remove(frame_path)
                except Exception as e:
                    logger.warning(f"Failed to cleanup frame {frame_path}: {e}")
|
|
|
|
|
|
|
|
def process_video_frames(url: str, question: Optional[str] = None, frame_count: int = FRAME_COUNT) -> Dict[str, Any]:
    """
    Download video, extract frames, and analyze with vision models.

    The downloaded video file is removed afterwards (when CLEANUP_TEMP_FILES
    is True) on every exit path: success, "no frames extracted", and
    unexpected errors.

    Args:
        url: Full YouTube URL
        question: Optional question to ask about frames
        frame_count: Number of frames to extract

    Returns:
        Dict with structure: {
            "text": str,          # Combined frame analyses
            "video_id": str,      # Video ID ("" if the URL is invalid)
            "source": str,        # "frames"
            "success": bool,      # True if processing succeeded
            "error": str or None  # Error message if failed
            "frame_count": int    # Number of frames analyzed
        }
    """
    video_id = extract_video_id(url)

    if not video_id:
        return {
            "text": "",
            "video_id": "",
            "source": "frames",
            "success": False,
            "error": "Invalid YouTube URL",
            "frame_count": 0,
        }

    video_file = download_video(url)

    if not video_file:
        return {
            "text": "",
            "video_id": video_id,
            "source": "frames",
            "success": False,
            "error": "Failed to download video",
            "frame_count": 0,
        }

    try:
        frames = extract_frames(video_file, frame_count)

        if not frames:
            return {
                "text": "",
                "video_id": video_id,
                "source": "frames",
                "success": False,
                "error": "Failed to extract frames",
                "frame_count": 0,
            }

        result = analyze_frames(frames, question)

        # analyze_frames does not know the video ID; fill it in here.
        result["video_id"] = video_id

        return result

    except Exception as e:
        logger.error(f"Video frame processing failed: {e}")
        return {
            "text": "",
            "video_id": video_id,
            "source": "frames",
            "success": False,
            "error": f"Video processing failed: {str(e)}",
            "frame_count": 0,
        }

    finally:
        # In a finally block so the (potentially large) download is removed
        # even when frame extraction fails or an exception is raised.
        if CLEANUP_TEMP_FILES:
            try:
                os.remove(video_file)
                logger.info(f"Cleaned up temp video: {video_file}")
            except Exception as e:
                logger.warning(f"Failed to cleanup temp video: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def youtube_analyze(url: str, mode: str = "transcript") -> Dict[str, Any]:
    """
    Analyze YouTube video using transcript or frame processing mode.

    Transcript Mode: Extract transcript (youtube-transcript-api first, then
        audio download + Whisper if the API path fails)
    Frame Mode: Extract frames and analyze with vision models

    Invalid input is reported through the returned dict (``success`` False,
    ``source`` "none"); this function does not raise for a bad URL or mode.

    Args:
        url: YouTube video URL (youtube.com, youtu.be, shorts)
        mode: Analysis mode - "transcript" (default) or "frames".
            Case-insensitive; surrounding whitespace is ignored and
            None/empty falls back to "transcript".

    Returns:
        Dict with structure: {
            "text": str,          # Transcript or frame analyses
            "video_id": str,      # Video ID
            "source": str,        # "api", "whisper", "frames", or "none"
            "success": bool,      # True if analysis succeeded
            "error": str or None  # Error message if failed
            "frame_count": int    # Number of frames (frame mode only)
        }

    Examples:
        >>> youtube_analyze("https://youtube.com/watch?v=dQw4w9WgXcQ", mode="transcript")
        {"text": "Never gonna give you up...", "video_id": "dQw4w9WgXcQ", "source": "api", "success": True, "error": None}

        >>> youtube_analyze("https://youtube.com/watch?v=dQw4w9WgXcQ", mode="frames")
        {"text": "[Frame 1 @ 0.00s]\\nA man...", "video_id": "dQw4w9WgXcQ", "source": "frames", "success": True, "frame_count": 6, "error": None}
    """
    video_id = extract_video_id(url)

    if not video_id:
        logger.error(f"Invalid YouTube URL: {url}")
        return {
            "text": "",
            "video_id": "",
            "source": "none",
            "success": False,
            "error": f"Invalid YouTube URL: {url}"
        }

    # Tolerate None/empty and stray whitespace (e.g. from env vars or UI).
    mode = (mode or "transcript").strip().lower()
    if mode not in ("transcript", "frames"):
        logger.error(f"Invalid mode: {mode}")
        return {
            "text": "",
            "video_id": video_id,
            "source": "none",
            "success": False,
            "error": f"Invalid mode: {mode}. Valid: transcript, frames"
        }

    logger.info(f"Processing YouTube video: {video_id} (mode: {mode})")

    if mode == "frames":
        result = process_video_frames(url)
        if result["success"]:
            logger.info(f"Frame analysis complete: {result.get('frame_count', 0)} frames, {len(result['text'])} chars")
        return result

    # Transcript mode: fast path via the transcript API.
    result = get_youtube_transcript(video_id)

    if result["success"]:
        logger.info(f"Transcript retrieved via API: {len(result['text'])} characters")
        logger.info(f"Transcript content: {result['text'][:200]}...")
        return result

    # Slow path: download the audio and transcribe it.
    logger.info("Transcript API failed, trying audio transcription...")
    result = transcribe_from_audio(url)

    if result["success"]:
        logger.info(f"Transcript retrieved via Whisper: {len(result['text'])} characters")
        # Log a preview only, matching the API branch; dumping an entire
        # transcript at INFO level floods the log.
        logger.info(f"Transcript content: {result['text'][:200]}...")
    else:
        logger.error(f"All transcript methods failed for video: {video_id}")

    return result
|
|
|
|
|
|
|
|
|
|
|
def youtube_transcript(url: str) -> Dict[str, Any]:
    """
    Run youtube_analyze() in the mode named by the YOUTUBE_MODE environment
    variable.

    Keeps a single-argument entry point while letting the environment choose
    between transcript and frame processing.

    Mode selection:
        - YOUTUBE_MODE env variable: "transcript" or "frames" (lower-cased
          before use)
        - Default when the variable is unset: "transcript"

    Args:
        url: YouTube video URL

    Returns:
        Dict with structure from youtube_analyze()
    """
    # Read the requested mode, falling back to transcript when unset.
    selected_mode = os.environ.get("YOUTUBE_MODE", "transcript")
    selected_mode = selected_mode.lower()

    logger.info(f"youtube_transcript called with YOUTUBE_MODE={selected_mode}")

    return youtube_analyze(url, mode=selected_mode)
|
|
|