Spaces:
Sleeping
Sleeping
| from mcp.server.fastmcp import FastMCP | |
| import requests | |
| import random | |
| import time | |
| import uuid | |
| import re | |
| import concurrent.futures | |
# Server instance for the Model Context Protocol framework.
# NOTE(review): no @mcp.tool decorators are visible in this chunk — confirm
# the tool functions below are actually registered with this server elsewhere.
mcp = FastMCP("Youtube")
def generate_random_headers():
    """Build a fresh set of browser-like HTTP headers.

    Randomizes the User-Agent and fabricates the session cookies the
    notegpt endpoint expects, so every request appears to come from a
    brand-new anonymous visitor.

    Returns:
        dict: Header names mapped to values, including a ``Cookie``
        string assembled from randomized identifiers.
    """
    browser_agents = (
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
    )
    chosen_agent = random.choice(browser_agents)

    now_ms = int(time.time() * 1000)
    anon_id = uuid.uuid4().hex
    # Fabricated tracking identifiers, loosely shaped like the real ones.
    tracker_id = f"G-{now_ms + random.randint(1, 1000)}"
    collina = f"{now_ms}{random.randint(10**12, 10**13 - 1)}"
    sbox = f"MTc1MDQyNjM3OXw{random.randint(100, 999)}|{random.randint(100000000, 999999999)}"
    google_state = f'{{"i_p":{now_ms + random.randint(1, 1000)},"i_l":1}}'

    cookie = "; ".join([
        f"sbox-guid={sbox}",
        f"_uab_collina={collina}",
        f"_trackUserId={tracker_id}",
        f"anonymous_user_id={anon_id}",
        "is_first_visit=true",
        f"g_state={google_state}",
    ])

    return {
        'User-Agent': chosen_agent,
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate, br, zstd',
        'Connection': 'keep-alive',
        'Referer': 'https://notegpt.io/youtube-transcript-generator',
        'Cookie': cookie,
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'Priority': 'u=0',
        'TE': 'trailers',
    }
def extract_video_id(url_or_id: str) -> str:
    """Extract the 11-character video ID from a YouTube URL or bare ID.

    Args:
        url_or_id: A YouTube watch/embed/v/shorts/youtu.be URL, or an
            11-character video ID on its own. Surrounding whitespace is
            ignored.

    Returns:
        The 11-character video ID.

    Raises:
        ValueError: If no video ID can be extracted from the input.
    """
    url_or_id = url_or_id.strip()

    # A bare ID is exactly 11 chars of [A-Za-z0-9_-]. Such a string can
    # never contain '.', so no "is this actually a URL?" guard is needed
    # (the original's 'youtube.com' check here was dead code).
    if re.fullmatch(r'[\w-]{11}', url_or_id):
        return url_or_id

    # Known URL shapes; each captures the 11-character ID.
    patterns = (
        r'youtube\.com/watch\?v=([\w-]{11})',
        r'youtube\.com/embed/([\w-]{11})',
        r'youtu\.be/([\w-]{11})',
        r'youtube\.com/v/([\w-]{11})',
        r'youtube\.com/shorts/([\w-]{11})',
    )
    for pattern in patterns:
        match = re.search(pattern, url_or_id)
        if match:
            return match.group(1)

    raise ValueError(f"Invalid YouTube URL or video ID: {url_or_id}")
def _fetch_single_transcript(video_id: str, include_timestamps: bool) -> str:
    """Fetch one video's transcript from the notegpt API and format it.

    Never raises: every failure mode is converted into an inline
    ``"Error for video ..."`` string so parallel callers can collect
    results without per-future exception handling.

    Args:
        video_id: The 11-character YouTube video ID.
        include_timestamps: If True, prefix each transcript entry with
            its start time; otherwise join all text into one line.

    Returns:
        A formatted transcript (title/channel header plus body) or an
        error message string.
    """
    endpoint = 'https://notegpt.io/api/v2/video-transcript'
    query = {'platform': 'youtube', 'video_id': video_id}
    try:
        # A fresh randomized identity is generated for every request.
        resp = requests.get(
            endpoint,
            params=query,
            headers=generate_random_headers(),
            timeout=30,
        )
        resp.raise_for_status()
        payload = resp.json()

        if payload.get('code') != 100000:
            return f"Error for video {video_id}: API error - {payload.get('message', 'Unknown error')}"

        body = payload.get('data', {})
        info = body.get('videoInfo', {})
        transcripts = body.get('transcripts', {})

        # Prefer English tracks; otherwise fall back to the first
        # language the API returned.
        entries = None
        for lang in ('en', 'en_auto'):
            if lang in transcripts:
                entries = transcripts[lang].get('custom', [])
                break
        if not entries and transcripts:
            entries = next(iter(transcripts.values())).get('custom', [])
        if not entries:
            return f"Error for video {video_id}: No transcript available."

        parts = [
            f"Title: {info.get('name', 'Unknown Title')}",
            f"Channel: {info.get('author', 'Unknown Channel')}",
            f"Video ID: {video_id}",
            "\n---",
        ]
        if include_timestamps:
            parts.append("\n\n".join(f"[{entry['start']}] {entry['text']}" for entry in entries))
        else:
            parts.append(" ".join(entry['text'] for entry in entries))
        return "\n".join(parts)
    except requests.exceptions.HTTPError as e:
        return f"Error for video {video_id}: HTTP error - {e}"
    except requests.exceptions.RequestException as e:
        return f"Error for video {video_id}: Network error - {e}"
    except Exception as e:
        return f"Error for video {video_id}: An unexpected error occurred - {e}"
def get_youtube_video_transcript(video_urls_or_ids: str, include_timestamps: bool = False):
    """Get transcript text from one or more YouTube videos in parallel.

    Args:
        video_urls_or_ids: A single YouTube URL or 11-character video ID,
            OR a comma-separated string of multiple URLs or IDs.
        include_timestamps: Include timestamps in the output (default: False).

    Returns:
        A string containing the formatted transcript(s). If multiple videos
        are processed, their transcripts are concatenated in input order and
        separated by a clear delimiter. Errors for individual videos are
        reported inline.

    Examples:
        # 1. Single URL
        get_youtube_video_transcript(video_urls_or_ids="https://www.youtube.com/watch?v=dQw4w9WgXcQ")

        # 2. Multiple URLs (comma-separated)
        get_youtube_video_transcript(
            video_urls_or_ids="https://www.youtube.com/watch?v=dQw4w9WgXcQ,https://youtu.be/L_Guz73e6fw"
        )

        # 3. Multiple Video IDs (comma-separated)
        get_youtube_video_transcript(
            video_urls_or_ids="dQw4w9WgXcQ,L_Guz73e6fw,QH2-TGUlwu4"
        )

        # 4. Mix of URLs and IDs with timestamps
        get_youtube_video_transcript(
            video_urls_or_ids="https://www.youtube.com/watch?v=dQw4w9WgXcQ,L_Guz73e6fw",
            include_timestamps=True
        )
    """
    # Ignore empty tokens (trailing or doubled commas) instead of
    # reporting them as invalid IDs.
    inputs = [item.strip() for item in video_urls_or_ids.split(',') if item.strip()]

    video_ids = []
    errors = []
    for item in inputs:
        try:
            video_ids.append(extract_video_id(item))
        except ValueError as e:
            errors.append(str(e))

    all_results = []
    if errors:
        all_results.append("--- INPUT ERRORS ---\n" + "\n".join(errors))

    if not video_ids:
        if not errors:
            return "Error: No valid video URLs or IDs were provided."
        return "\n".join(all_results)

    # Fetch transcripts in parallel, but collect results in submission
    # order so the output is deterministic and matches the input order
    # (as_completed would interleave results arbitrarily).
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(_fetch_single_transcript, vid, include_timestamps)
            for vid in video_ids
        ]
        # _fetch_single_transcript returns error strings instead of
        # raising, so .result() is safe to call directly.
        all_results.extend(future.result() for future in futures)

    # Join all individual results with a clear separator.
    return "\n\n--- --- ---\n\n".join(all_results)
| if __name__ == "__main__": | |
| mcp.run() |