Spaces:
Sleeping
Sleeping
| """YouTube transcript extraction module.""" | |
| import os | |
| import re | |
| import urllib.request | |
| import json | |
| import requests | |
| from typing import Optional, Dict, Any, List | |
# Optional dependency: when youtube_transcript_api cannot be imported, record
# that in YOUTUBE_TRANSCRIPT_API_AVAILABLE and print a warning so the rest of
# the module can rely on the external transcript APIs instead.
try:
    from youtube_transcript_api import YouTubeTranscriptApi
except ImportError:
    YOUTUBE_TRANSCRIPT_API_AVAILABLE = False
    print("Warning: youtube_transcript_api not available. Using external API only.")
else:
    YOUTUBE_TRANSCRIPT_API_AVAILABLE = True
# Configuration for the external transcript services, read once from the
# environment at import time.
#
# TRANSCRIPT_API_URL: base URL of a transcript service (for example a RapidAPI
#                     YouTube transcript API or your own proxy server);
#                     empty string when unset.
# RAPIDAPI_KEY:       RapidAPI key; empty string when unset.
# RAPIDAPI_HOST:      RapidAPI host name used for the request URL and the
#                     x-rapidapi-host header.
TRANSCRIPT_API_URL = os.environ.get("TRANSCRIPT_API_URL", "")
RAPIDAPI_KEY = os.environ.get("RAPIDAPI_KEY", "")
RAPIDAPI_HOST = os.environ.get(
    "RAPIDAPI_HOST",
    "youtube-transcriptor.p.rapidapi.com",
)
class YouTubeExtractor:
    """Extract transcripts from YouTube videos.

    Three back-ends are supported: a RapidAPI-hosted service, a custom HTTP
    API, and the ``youtube_transcript_api`` package.  ``get_transcript`` and
    the ``get_transcript_via_*`` methods report problems through a result
    dict with the keys ``success``, ``error``, ``transcript`` and
    ``metadata``; ``get_transcript_direct`` raises ``ValueError`` instead.
    """

    # A video ID is exactly 11 characters from this alphabet; the negative
    # lookahead rejects longer runs so arbitrary path segments are not
    # truncated into something that merely looks like an ID.
    _ID_CHARS = r'[0-9A-Za-z_-]'
    _VIDEO_ID = r'(' + _ID_CHARS + r'{11})(?!' + _ID_CHARS + r')'
    _VIDEO_ID_PATTERNS = (
        # watch?v=<id>, also when "v" is not the first query parameter
        re.compile(r'(?:^|[?&])v=' + _VIDEO_ID),
        # youtu.be/<id>
        re.compile(r'youtu\.be/' + _VIDEO_ID),
        # youtube.com/shorts/<id>, /embed/<id>, /live/<id>, /v/<id>, /e/<id>
        re.compile(r'youtube(?:-nocookie)?\.com/(?:shorts|embed|live|v|e)/' + _VIDEO_ID),
    )

    @staticmethod
    def _failure(error: str) -> Dict[str, Any]:
        """Build the failure result dict shared by all methods."""
        return {
            'success': False,
            'error': error,
            'transcript': None,
            'metadata': {}
        }

    @staticmethod
    def _success(transcript_text: str, video_id: str, title: str, language: str) -> Dict[str, Any]:
        """Build the success result dict shared by the API-based methods."""
        return {
            'success': True,
            'transcript': transcript_text,
            'metadata': {
                'video_id': video_id,
                'title': title,
                'language': language,
                'transcript_length': len(transcript_text)
            }
        }

    @classmethod
    def _payload_to_text(cls, payload: Any) -> str:
        """Flatten a decoded JSON transcript payload into plain text.

        Returns an empty string when no transcript text can be found, so
        callers can treat unknown payload shapes as a failure instead of
        returning a stringified error object as the transcript.
        """
        if isinstance(payload, str):
            return payload
        if isinstance(payload, dict):
            # Both container key spellings are accepted because provider
            # response schemas differ.
            # NOTE(review): confirm the schema of the configured RAPIDAPI_HOST.
            for key in ('transcript', 'transcription'):
                if key in payload:
                    return cls._payload_to_text(payload[key])
            return ''
        if isinstance(payload, list):
            parts = []
            for entry in payload:
                if isinstance(entry, dict):
                    text = entry.get('text') or entry.get('subtitle')
                    if not text:
                        # The entry may be a wrapper object holding segments.
                        text = cls._payload_to_text(entry)
                elif isinstance(entry, str):
                    text = entry
                else:
                    text = ''
                if text:
                    parts.append(str(text))
            return ' '.join(parts)
        return ''

    def get_video_title(self, video_id: str) -> str:
        """
        Fetch the video title from YouTube using oEmbed.

        Any failure (network error, bad JSON, missing title) is printed and
        the generic fallback title is returned; this method does not raise.

        Args:
            video_id: YouTube video ID
        Returns:
            Video title or fallback title if not available
        """
        from urllib.parse import urlencode

        fallback = f"YouTube Video {video_id}"
        try:
            # Encode the nested watch URL so its own "?" and "=" cannot be
            # read as parameters of the oEmbed request itself.
            query = urlencode({
                "url": f"https://www.youtube.com/watch?v={video_id}",
                "format": "json",
            })
            oembed_url = f"https://www.youtube.com/oembed?{query}"
            with urllib.request.urlopen(oembed_url, timeout=10) as response:
                data = json.loads(response.read().decode())
            title = data.get('title') if isinstance(data, dict) else None
            return title or fallback
        except Exception as e:
            # Best effort only: the title is cosmetic, so never fail on it.
            print(f"Could not fetch video title: {e}")
            return fallback

    def extract_video_id(self, url: str) -> Optional[str]:
        """
        Extract video ID from various YouTube URL formats.

        Recognises ``watch?v=``, ``youtu.be/``, ``/shorts/``, ``/embed/``,
        ``/live/``, ``/v/`` and ``/e/`` URLs.

        Args:
            url: YouTube video URL
        Returns:
            The 11-character video ID, or None when ``url`` is not a string
            or contains no recognisable video ID
        """
        if not isinstance(url, str):
            return None
        for pattern in self._VIDEO_ID_PATTERNS:
            match = pattern.search(url)
            if match:
                return match.group(1)
        return None

    def get_transcript_via_rapidapi(self, video_id: str, language: str = "en") -> Dict[str, Any]:
        """
        Get transcript using the RapidAPI host configured in RAPIDAPI_HOST.

        Note: ``language`` is only recorded in the returned metadata; it is
        not sent to the service.

        Args:
            video_id: YouTube video ID
            language: Language code (default: "en")
        Returns:
            Dict containing transcript and metadata; ``success`` is False
            when the key is missing, the request fails, or the response
            holds no transcript text
        """
        if not RAPIDAPI_KEY:
            return self._failure('RAPIDAPI_KEY not set. Set it in your HF Space secrets.')
        try:
            response = requests.get(
                f"https://{RAPIDAPI_HOST}/transcript",
                headers={
                    "x-rapidapi-key": RAPIDAPI_KEY,
                    "x-rapidapi-host": RAPIDAPI_HOST
                },
                params={"video_id": video_id},
                timeout=30
            )
            response.raise_for_status()
            transcript_text = self._payload_to_text(response.json())
        except requests.exceptions.RequestException as e:
            return self._failure(f"RapidAPI request failed: {str(e)}")
        except Exception as e:
            return self._failure(f"Error parsing transcript: {str(e)}")
        if not transcript_text.strip():
            # Reporting success with an empty transcript would stop callers
            # from falling back to the other extraction methods.
            return self._failure("RapidAPI response contained no transcript text")
        return self._success(
            transcript_text, video_id, self.get_video_title(video_id), language
        )

    def get_transcript_via_custom_api(self, video_id: str, language: str = "en", api_url: Optional[str] = None) -> Dict[str, Any]:
        """
        Get transcript via a custom API server.

        Sends ``GET <base>/transcript`` with ``video_id`` and ``language``
        query parameters and expects a JSON object with a ``transcript``
        field; ``title``, ``language`` and ``message`` are used when present.

        Args:
            video_id: YouTube video ID
            language: Language code
            api_url: Custom API base URL (defaults to TRANSCRIPT_API_URL)
        Returns:
            Dict containing transcript and metadata
        """
        base_url = api_url or TRANSCRIPT_API_URL
        if not base_url:
            return self._failure(
                'No transcript API URL configured. Set TRANSCRIPT_API_URL in your secrets.'
            )
        try:
            response = requests.get(
                # Tolerate a trailing slash in the configured base URL.
                f"{base_url.rstrip('/')}/transcript",
                params={"video_id": video_id, "language": language},
                timeout=30
            )
            response.raise_for_status()
            data = response.json()
        except requests.exceptions.RequestException as e:
            return self._failure(f"API request failed: {str(e)}")
        except Exception as e:
            # e.g. a JSON decoding error that is not a RequestException
            return self._failure(f"Error parsing transcript: {str(e)}")
        if not isinstance(data, dict):
            return self._failure("Unexpected response format from API")
        transcript_text = self._payload_to_text(data.get("transcript"))
        if not transcript_text.strip():
            return self._failure(data.get("message") or "Unknown error from API")
        return self._success(
            transcript_text,
            video_id,
            data.get("title") or self.get_video_title(video_id),
            data.get("language", language)
        )

    def _select_transcript(self, transcript_list: Any, language: str, url: str) -> Any:
        """Pick a transcript: manual in ``language``, then generated, then any."""
        for finder_name in ('find_manually_created_transcript', 'find_generated_transcript'):
            try:
                return getattr(transcript_list, finder_name)([language])
            except Exception:
                # Not available in this flavour; try the next option.
                continue
        for candidate in transcript_list:
            return candidate
        raise ValueError(f"No transcripts available for video: {url}")

    def get_transcript_direct(self, url: str, language: str = "en") -> Dict[str, Any]:
        """
        Get transcript directly using youtube_transcript_api.

        Args:
            url: YouTube video URL
            language: Preferred language code (default: "en")
        Returns:
            Dict containing transcript text and metadata.  The metadata
            fields are available both at the top level (the original shape)
            and under ``metadata`` (the shape used by the other methods).
            When youtube_transcript_api is not installed a failure dict is
            returned instead.
        Raises:
            ValueError: if no video ID can be extracted from ``url`` or the
                transcript cannot be retrieved
        """
        if not YOUTUBE_TRANSCRIPT_API_AVAILABLE:
            return self._failure('youtube_transcript_api not available')
        video_id = self.extract_video_id(url)
        if not video_id:
            raise ValueError(f"Could not extract video ID from URL: {url}")
        try:
            api = YouTubeTranscriptApi()
            transcript = self._select_transcript(api.list(video_id), language, url)
            transcript_data = transcript.fetch()
            # Entries may be plain dicts or snippet objects with a .text attribute.
            text_parts = [
                entry.get("text", "") if isinstance(entry, dict) else entry.text
                for entry in transcript_data
            ]
            transcript_text = " ".join(text_parts)
            metadata = {
                'video_id': video_id,
                'title': self.get_video_title(video_id),
                # Report the language actually fetched when the object exposes it.
                'language': getattr(transcript, 'language_code', language),
                'transcript_length': len(transcript_text)
            }
            result = {
                'success': True,
                'transcript': transcript_text,
                'metadata': metadata
            }
            # Keep the flat keys for callers written against the old shape.
            result.update(metadata)
            return result
        except Exception as e:
            raise ValueError(f"Could not retrieve transcript: {str(e)}") from e

    def get_transcript(self, url: str, language: str = "en") -> Dict[str, Any]:
        """
        Get transcript from a YouTube video using the best available method.

        Tries in order:
        1. RapidAPI (if RAPIDAPI_KEY is set)
        2. Custom API (if TRANSCRIPT_API_URL is set)
        3. Direct youtube_transcript_api (if the package is importable)

        Args:
            url: YouTube video URL
            language: Preferred language code (default: "en")
        Returns:
            Dict containing transcript text and metadata.  On failure the
            ``error`` field lists the reason from every method that was tried.
        """
        video_id = self.extract_video_id(url)
        if not video_id:
            return self._failure(f"Could not extract video ID from URL: {url}")

        errors = []

        # Method 1: RapidAPI
        if RAPIDAPI_KEY:
            result = self.get_transcript_via_rapidapi(video_id, language)
            if result['success']:
                return result
            print(f"RapidAPI failed: {result['error']}")
            errors.append(f"RapidAPI: {result['error']}")

        # Method 2: custom API
        if TRANSCRIPT_API_URL:
            result = self.get_transcript_via_custom_api(video_id, language)
            if result['success']:
                return result
            print(f"Custom API failed: {result['error']}")
            errors.append(f"Custom API: {result['error']}")

        # Method 3: direct access
        if YOUTUBE_TRANSCRIPT_API_AVAILABLE:
            try:
                result = self.get_transcript_direct(url, language)
                if result.get('success'):
                    return result
                errors.append(f"Direct access failed: {result.get('error')}")
            except Exception as e:
                errors.append(
                    f"Direct access failed (HF Spaces may block YouTube): {str(e)}"
                )

        if errors:
            # At least one method ran; report why each failed rather than
            # claiming that nothing is configured.
            return self._failure('; '.join(errors))
        return self._failure(
            'No transcript extraction method available. Set RAPIDAPI_KEY or TRANSCRIPT_API_URL in your HF Space secrets.'
        )
def extract_transcript(url: str, language: str = "en") -> Dict[str, Any]:
    """
    Convenience function to extract transcript.

    Runs ``YouTubeExtractor.get_transcript`` and normalises a successful
    result into ``{'success', 'transcript', 'metadata'}``.  The extractor
    may report metadata either nested under ``metadata`` or as top-level
    keys, so both locations are consulted.

    Args:
        url: YouTube video URL
        language: Preferred language code
    Returns:
        Dict containing transcript and metadata; failures (including
        unexpected exceptions) are returned as a dict with ``success`` False
    """
    extractor = YouTubeExtractor()
    try:
        result = extractor.get_transcript(url, language)
    except Exception as e:
        return {
            'success': False,
            'error': str(e),
            'transcript': None,
            'metadata': {}
        }
    if not result.get('success'):
        return result

    nested = result.get('metadata')
    if not isinstance(nested, dict):
        nested = {}
    transcript = result.get('transcript')

    metadata = {}
    for key in ('video_id', 'title', 'language', 'transcript_length'):
        # Prefer the nested value; fall back to the flat top-level key.
        value = nested.get(key)
        metadata[key] = result.get(key) if value is None else value
    if metadata['transcript_length'] is None and transcript is not None:
        metadata['transcript_length'] = len(transcript)

    return {
        'success': True,
        'transcript': transcript,
        'metadata': metadata
    }
def get_transcript_via_api(video_id: str, language: str = "en", api_url: str = None) -> Dict[str, Any]:
    """
    Fetch a transcript from a custom API server.

    Module-level shortcut that creates a YouTubeExtractor and forwards all
    arguments to its ``get_transcript_via_custom_api`` method.

    Args:
        video_id: YouTube video ID
        language: Language code (default: "en")
        api_url: API base URL
    Returns:
        Dict containing transcript and metadata
    """
    return YouTubeExtractor().get_transcript_via_custom_api(video_id, language, api_url)
def extract_transcript_via_api(url: str, language: str = "en", api_url: Optional[str] = None) -> Dict[str, Any]:
    """
    Extract transcript via API (for HF Spaces deployment).

    Tries RapidAPI first when RAPIDAPI_KEY is set, then the custom API.
    When both fail, the returned ``error`` names both failure reasons so
    the RapidAPI problem is not hidden behind the custom-API message.

    Args:
        url: YouTube video URL
        language: Preferred language code
        api_url: API base URL
    Returns:
        Dict containing transcript and metadata
    """
    extractor = YouTubeExtractor()
    video_id = extractor.extract_video_id(url)
    if not video_id:
        return {
            'success': False,
            'error': f"Could not extract video ID from URL: {url}",
            'transcript': None,
            'metadata': {}
        }

    # Try RapidAPI first, remembering why it failed
    rapidapi_error = None
    if RAPIDAPI_KEY:
        result = extractor.get_transcript_via_rapidapi(video_id, language)
        if result['success']:
            return result
        rapidapi_error = result.get('error')

    # Try custom API
    result = extractor.get_transcript_via_custom_api(video_id, language, api_url)
    if result['success']:
        # Add video title if not present
        if not result['metadata'].get('title'):
            result['metadata']['title'] = extractor.get_video_title(video_id)
    elif rapidapi_error:
        result['error'] = f"RapidAPI: {rapidapi_error}; Custom API: {result.get('error')}"
    return result
if __name__ == "__main__":
    # Command-line smoke test: extract the transcript for the URL given as
    # the first argument and print a short summary.
    import sys

    if len(sys.argv) > 1:
        url = sys.argv[1]
        result = extract_transcript(url)
        if result['success']:
            transcript = result.get('transcript') or ""
            metadata = result.get('metadata') or {}
            title = metadata.get('title') or "(unknown title)"
            # Metadata may be incomplete; the ":," format spec fails on None,
            # so fall back to measuring the transcript itself.
            length = metadata.get('transcript_length')
            if length is None:
                length = len(transcript)
            print("Successfully extracted transcript")
            print(f" Video: {title}")
            print(f" Length: {length:,} characters")
            print("\nFirst 500 characters:")
            # Only show an ellipsis when the preview is actually truncated.
            print(transcript[:500] + ("..." if len(transcript) > 500 else ""))
        else:
            print(f"Error: {result['error']}")
    else:
        print("Usage: python youtube_transcript.py <YouTube_URL>")
        print("\nEnvironment variables:")
        print(" RAPIDAPI_KEY - Your RapidAPI key for YouTube Transcriptor")
        print(" TRANSCRIPT_API_URL - Custom transcript API URL")