"""YouTube transcript extraction module.""" import os import re import urllib.request import json import requests from typing import Optional, Dict, Any, List # Try to import youtube_transcript_api (may fail on HF Spaces due to network restrictions) try: from youtube_transcript_api import YouTubeTranscriptApi YOUTUBE_TRANSCRIPT_API_AVAILABLE = True except ImportError: YOUTUBE_TRANSCRIPT_API_AVAILABLE = False print("Warning: youtube_transcript_api not available. Using external API only.") # Default external API URL for transcript extraction (works on HF Spaces) # You can use services like: # - RapidAPI YouTube Transcript API # - Your own proxy server TRANSCRIPT_API_URL = os.getenv("TRANSCRIPT_API_URL", "") RAPIDAPI_KEY = os.getenv("RAPIDAPI_KEY", "") RAPIDAPI_HOST = os.getenv("RAPIDAPI_HOST", "youtube-transcriptor.p.rapidapi.com") class YouTubeExtractor: """Extract transcripts from YouTube videos.""" def get_video_title(self, video_id: str) -> str: """ Fetch the video title from YouTube using oEmbed. Note: This may fail on HF Spaces due to network restrictions. Falls back to generic title. Args: video_id: YouTube video ID Returns: Video title or fallback title if not available """ try: # Use YouTube's oEmbed API to get video info oembed_url = f"https://www.youtube.com/oembed?url=https://www.youtube.com/watch?v={video_id}&format=json" with urllib.request.urlopen(oembed_url, timeout=10) as response: data = json.loads(response.read().decode()) return data.get('title', f"YouTube Video {video_id}") except Exception as e: # Fallback to generic title if oEmbed fails (common on HF Spaces) print(f"Could not fetch video title: {e}") return f"YouTube Video {video_id}" def extract_video_id(self, url: str) -> Optional[str]: """Extract video ID from various YouTube URL formats.""" patterns = [ r'(?:v=|\/)([0-9A-Za-z_-]{11}).*', r'(?:youtu\.be\/)([0-9A-Za-z_-]{11})', r'(?:youtube\.com/shorts/)([0-9A-Za-z_-]{11})', r'(?:youtube\.com/embed/)([0-9A-Za-z_-]{11})', ] for pattern in patterns: match = re.search(pattern, url) if match: return match.group(1) return None def get_transcript_via_rapidapi(self, video_id: str, language: str = "en") -> Dict[str, Any]: """ Get transcript using RapidAPI YouTube Transcriptor. This works on HF Spaces as it uses a proxy service. Args: video_id: YouTube video ID language: Language code (default: "en") Returns: Dict containing transcript and metadata """ if not RAPIDAPI_KEY: return { 'success': False, 'error': 'RAPIDAPI_KEY not set. Set it in your HF Space secrets.', 'transcript': None, 'metadata': {} } try: url = f"https://{RAPIDAPI_HOST}/transcript" querystring = {"video_id": video_id} headers = { "x-rapidapi-key": RAPIDAPI_KEY, "x-rapidapi-host": RAPIDAPI_HOST } response = requests.get(url, headers=headers, params=querystring, timeout=30) response.raise_for_status() data = response.json() # Parse the response - structure depends on the API if isinstance(data, dict) and 'transcript' in data: # Handle different response formats transcript_data = data['transcript'] if isinstance(transcript_data, list): # List of segments with text text_parts = [] for entry in transcript_data: if isinstance(entry, dict): text_parts.append(entry.get('text', '')) else: text_parts.append(str(entry)) transcript_text = ' '.join(text_parts) else: transcript_text = str(transcript_data) elif isinstance(data, list): # Direct list of segments text_parts = [] for entry in data: if isinstance(entry, dict): text_parts.append(entry.get('text', '')) transcript_text = ' '.join(text_parts) else: transcript_text = str(data) return { 'success': True, 'transcript': transcript_text, 'metadata': { 'video_id': video_id, 'title': self.get_video_title(video_id), 'language': language, 'transcript_length': len(transcript_text) } } except requests.exceptions.RequestException as e: return { 'success': False, 'error': f"RapidAPI request failed: {str(e)}", 'transcript': None, 'metadata': {} } except Exception as e: return { 'success': False, 'error': f"Error parsing transcript: {str(e)}", 'transcript': None, 'metadata': {} } def get_transcript_via_custom_api(self, video_id: str, language: str = "en", api_url: str = None) -> Dict[str, Any]: """ Get transcript via a custom API server. Args: video_id: YouTube video ID language: Language code api_url: Custom API base URL Returns: Dict containing transcript and metadata """ base_url = api_url or TRANSCRIPT_API_URL if not base_url: return { 'success': False, 'error': 'No transcript API URL configured. Set TRANSCRIPT_API_URL in your secrets.', 'transcript': None, 'metadata': {} } try: response = requests.get( f"{base_url}/transcript", params={"video_id": video_id, "language": language}, timeout=30 ) response.raise_for_status() data = response.json() if data.get("status") == "success" or data.get("transcript"): transcript_text = data.get("transcript", "") return { 'success': True, 'transcript': transcript_text, 'metadata': { 'video_id': video_id, 'title': data.get("title") or self.get_video_title(video_id), 'language': data.get("language", language), 'transcript_length': len(transcript_text) } } else: return { 'success': False, 'error': data.get("message", "Unknown error from API"), 'transcript': None, 'metadata': {} } except requests.exceptions.RequestException as e: return { 'success': False, 'error': f"API request failed: {str(e)}", 'transcript': None, 'metadata': {} } def get_transcript_direct(self, url: str, language: str = "en") -> Dict[str, Any]: """ Get transcript directly using youtube_transcript_api. Note: This may fail on HF Spaces due to network restrictions. Args: url: YouTube video URL language: Preferred language code (default: "en") Returns: Dict containing transcript text and metadata """ if not YOUTUBE_TRANSCRIPT_API_AVAILABLE: return { 'success': False, 'error': 'youtube_transcript_api not available', 'transcript': None, 'metadata': {} } video_id = self.extract_video_id(url) if not video_id: raise ValueError(f"Could not extract video ID from URL: {url}") try: # Use the API instance method api = YouTubeTranscriptApi() transcript_list = api.list(video_id) # Try to find transcript in the requested language transcript = None try: transcript = transcript_list.find_manually_created_transcript([language]) except: try: transcript = transcript_list.find_generated_transcript([language]) except: # Get any available transcript transcripts = list(transcript_list) if transcripts: transcript = transcripts[0] else: raise ValueError(f"No transcripts available for video: {url}") # Fetch the transcript data transcript_data = transcript.fetch() # Combine text - handle both dict and object formats text_parts = [] for entry in transcript_data: if isinstance(entry, dict): text_parts.append(entry.get("text", "")) else: # Handle FetchedTranscriptSnippet object text_parts.append(entry.text) transcript_text = " ".join(text_parts) # Get detected language detected_language = transcript.language_code if hasattr(transcript, 'language_code') else language # Get video title video_title = self.get_video_title(video_id) return { 'transcript': transcript_text, 'video_id': video_id, 'title': video_title, 'language': detected_language, 'transcript_length': len(transcript_text), 'success': True } except Exception as e: raise ValueError(f"Could not retrieve transcript: {str(e)}") def get_transcript(self, url: str, language: str = "en") -> Dict[str, Any]: """ Get transcript from a YouTube video using the best available method. Tries in order: 1. RapidAPI (if RAPIDAPI_KEY is set) - works on HF Spaces 2. Custom API (if TRANSCRIPT_API_URL is set) - works on HF Spaces 3. Direct youtube_transcript_api (may fail on HF Spaces) Args: url: YouTube video URL language: Preferred language code (default: "en") Returns: Dict containing transcript text and metadata """ video_id = self.extract_video_id(url) if not video_id: return { 'success': False, 'error': f"Could not extract video ID from URL: {url}", 'transcript': None, 'metadata': {} } # Method 1: Try RapidAPI (works on HF Spaces) if RAPIDAPI_KEY: result = self.get_transcript_via_rapidapi(video_id, language) if result['success']: return result print(f"RapidAPI failed: {result['error']}") # Method 2: Try custom API (works on HF Spaces) if TRANSCRIPT_API_URL: result = self.get_transcript_via_custom_api(video_id, language) if result['success']: return result print(f"Custom API failed: {result['error']}") # Method 3: Try direct access (may fail on HF Spaces) if YOUTUBE_TRANSCRIPT_API_AVAILABLE: try: result = self.get_transcript_direct(url, language) result['success'] = True return result except Exception as e: return { 'success': False, 'error': f"Direct access failed (HF Spaces may block YouTube): {str(e)}", 'transcript': None, 'metadata': {} } # No method worked return { 'success': False, 'error': 'No transcript extraction method available. Set RAPIDAPI_KEY or TRANSCRIPT_API_URL in your HF Space secrets.', 'transcript': None, 'metadata': {} } def extract_transcript(url: str, language: str = "en") -> Dict[str, Any]: """ Convenience function to extract transcript. Args: url: YouTube video URL language: Preferred language code Returns: Dict containing transcript and metadata """ extractor = YouTubeExtractor() try: result = extractor.get_transcript(url, language) if result.get('success'): return { 'success': True, 'transcript': result.get('transcript'), 'metadata': { 'video_id': result.get('video_id'), 'title': result.get('title'), 'language': result.get('language'), 'transcript_length': result.get('transcript_length') } } return result except Exception as e: return { 'success': False, 'error': str(e), 'transcript': None, 'metadata': {} } def get_transcript_via_api(video_id: str, language: str = "en", api_url: str = None) -> Dict[str, Any]: """ Get transcript via a custom API server. Args: video_id: YouTube video ID language: Language code (default: "en") api_url: API base URL Returns: Dict containing transcript and metadata """ extractor = YouTubeExtractor() return extractor.get_transcript_via_custom_api(video_id, language, api_url) def extract_transcript_via_api(url: str, language: str = "en", api_url: str = None) -> Dict[str, Any]: """ Extract transcript via API (for HF Spaces deployment). Args: url: YouTube video URL language: Preferred language code api_url: API base URL Returns: Dict containing transcript and metadata """ extractor = YouTubeExtractor() video_id = extractor.extract_video_id(url) if not video_id: return { 'success': False, 'error': f"Could not extract video ID from URL: {url}", 'transcript': None, 'metadata': {} } # Try RapidAPI first if RAPIDAPI_KEY: result = extractor.get_transcript_via_rapidapi(video_id, language) if result['success']: return result # Try custom API result = extractor.get_transcript_via_custom_api(video_id, language, api_url) if result['success']: # Add video title if not present if not result['metadata'].get('title'): result['metadata']['title'] = extractor.get_video_title(video_id) return result if __name__ == "__main__": # Test the extractor import sys if len(sys.argv) > 1: url = sys.argv[1] result = extract_transcript(url) if result['success']: print(f"Successfully extracted transcript") print(f" Video: {result['metadata']['title']}") print(f" Length: {result['metadata']['transcript_length']:,} characters") print(f"\nFirst 500 characters:") print(result['transcript'][:500] + "...") else: print(f"Error: {result['error']}") else: print("Usage: python youtube_transcript.py ") print("\nEnvironment variables:") print(" RAPIDAPI_KEY - Your RapidAPI key for YouTube Transcriptor") print(" TRANSCRIPT_API_URL - Custom transcript API URL")