from mcp.server.fastmcp import FastMCP
import requests
import random
import time
import uuid
import re
import concurrent.futures

mcp = FastMCP("Youtube")


def generate_random_headers():
    """Generate randomized HTTP headers (and cookies) to simulate a new user.

    Each call picks a random desktop User-Agent and fabricates the tracking
    cookies the notegpt.io site expects, so repeated requests look like
    independent first-time visitors.

    Returns:
        dict: A header mapping suitable for ``requests.get(..., headers=...)``.
    """
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36',
    ]
    random_user_agent = random.choice(user_agents)

    current_timestamp_ms = int(time.time() * 1000)

    # Synthetic tracking-cookie values; the formats mimic what the site sets
    # for real visitors so the request is not flagged as a bot.
    anonymous_user_id = uuid.uuid4().hex
    track_user_id = f"G-{current_timestamp_ms + random.randint(1, 1000)}"
    uab_collina = f"{current_timestamp_ms}{random.randint(10**12, 10**13 - 1)}"
    sbox_guid = f"MTc1MDQyNjM3OXw{random.randint(100, 999)}|{random.randint(100000000, 999999999)}"
    g_state = f'{{"i_p":{current_timestamp_ms + random.randint(1, 1000)},"i_l":1}}'

    cookie_string = (
        f"sbox-guid={sbox_guid}; "
        f"_uab_collina={uab_collina}; "
        f"_trackUserId={track_user_id}; "
        f"anonymous_user_id={anonymous_user_id}; "
        f"is_first_visit=true; "
        f"g_state={g_state}"
    )

    headers = {
        'User-Agent': random_user_agent,
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate, br, zstd',
        'Connection': 'keep-alive',
        'Referer': 'https://notegpt.io/youtube-transcript-generator',
        'Cookie': cookie_string,
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'Priority': 'u=0',
        'TE': 'trailers',
    }
    return headers


def extract_video_id(url_or_id: str) -> str:
    """Extract the 11-character video ID from a YouTube URL, or return the
    input unchanged if it is already a bare video ID.

    Args:
        url_or_id: A YouTube watch/embed/shorts/``youtu.be`` URL, or a bare
            11-character video ID. Surrounding whitespace is tolerated.

    Returns:
        The 11-character video ID.

    Raises:
        ValueError: If no video ID can be extracted from the input.
    """
    url_or_id = url_or_id.strip()

    # A bare ID is exactly 11 word-or-dash characters. Such a string can
    # never contain a '.'-bearing hostname, so no URL check is needed here.
    if re.fullmatch(r'[\w-]{11}', url_or_id):
        return url_or_id

    patterns = [
        r'(?:youtube\.com\/watch\?v=)([\w-]{11})',
        r'(?:youtube\.com\/embed\/)([\w-]{11})',
        r'(?:youtu\.be\/)([\w-]{11})',
        r'(?:youtube\.com\/v\/)([\w-]{11})',
        r'(?:youtube\.com\/shorts\/)([\w-]{11})',
    ]
    for pattern in patterns:
        match = re.search(pattern, url_or_id)
        if match:
            return match.group(1)

    raise ValueError(f"Invalid YouTube URL or video ID: {url_or_id}")


def _fetch_single_transcript(video_id: str, include_timestamps: bool) -> str:
    """Fetch and format the transcript of a single video.

    Queries the notegpt.io transcript API, preferring an English transcript
    ('en', then 'en_auto') and falling back to the first available language.
    Never raises: every failure mode is folded into the returned string so
    one bad video cannot abort a parallel batch.

    Args:
        video_id: 11-character YouTube video ID.
        include_timestamps: If True, prefix each transcript segment with its
            start time; otherwise join all segment text into one paragraph.

    Returns:
        A formatted transcript (title/channel/ID header plus text), or a
        human-readable error string beginning with "Error for video ...".
    """
    api_url = 'https://notegpt.io/api/v2/video-transcript'
    params = {'platform': 'youtube', 'video_id': video_id}
    try:
        # Each call gets its own unique headers so parallel requests look
        # like distinct visitors.
        headers = generate_random_headers()
        response = requests.get(api_url, params=params, headers=headers, timeout=30)
        response.raise_for_status()
        data = response.json()

        # 100000 is the API's "success" code; anything else is an error.
        if data.get('code') != 100000:
            return f"Error for video {video_id}: API error - {data.get('message', 'Unknown error')}"

        video_info = data.get('data', {}).get('videoInfo', {})
        video_title = video_info.get('name', 'Unknown Title')
        channel_name = video_info.get('author', 'Unknown Channel')

        transcripts = data.get('data', {}).get('transcripts', {})
        transcript_entries = None
        # Prefer manually-created English, then auto-generated English.
        for lang_code in ['en', 'en_auto']:
            if lang_code in transcripts:
                transcript_entries = transcripts[lang_code].get('custom', [])
                break
        # Fall back to whatever language the API returned first.
        if not transcript_entries and transcripts:
            first_lang = next(iter(transcripts.values()))
            transcript_entries = first_lang.get('custom', [])

        if not transcript_entries:
            return f"Error for video {video_id}: No transcript available."

        result_parts = [f"Title: {video_title}", f"Channel: {channel_name}", f"Video ID: {video_id}", "\n---"]
        if include_timestamps:
            formatted_transcript = "\n\n".join([f"[{entry['start']}] {entry['text']}" for entry in transcript_entries])
            result_parts.append(formatted_transcript)
        else:
            result_parts.append(" ".join(entry['text'] for entry in transcript_entries))
        return "\n".join(result_parts)

    except requests.exceptions.HTTPError as e:
        return f"Error for video {video_id}: HTTP error - {e}"
    except requests.exceptions.RequestException as e:
        return f"Error for video {video_id}: Network error - {e}"
    except Exception as e:
        return f"Error for video {video_id}: An unexpected error occurred - {e}"


@mcp.tool()
def get_youtube_video_transcript(video_urls_or_ids: str, include_timestamps: bool = False):
    """Get transcript text from one or more YouTube videos in parallel.

    Args:
        video_urls_or_ids: A single YouTube URL or 11-character video ID, OR
            a comma-separated string of multiple URLs or IDs.
        include_timestamps: Include timestamps in the output (default: False).

    Returns:
        A string containing the formatted transcript(s). If multiple videos
        are processed, their transcripts are concatenated in the same order
        as the input and separated by a clear delimiter. Errors for
        individual videos are reported inline.

    Examples:
        # 1. Single URL
        get_youtube_video_transcript(video_urls_or_ids="https://www.youtube.com/watch?v=dQw4w9WgXcQ")

        # 2. Multiple URLs (comma-separated)
        get_youtube_video_transcript(
            video_urls_or_ids="https://www.youtube.com/watch?v=dQw4w9WgXcQ,https://youtu.be/L_Guz73e6fw"
        )

        # 3. Multiple Video IDs (comma-separated)
        get_youtube_video_transcript(
            video_urls_or_ids="dQw4w9WgXcQ,L_Guz73e6fw,QH2-TGUlwu4"
        )

        # 4. Mix of URLs and IDs with timestamps
        get_youtube_video_transcript(
            video_urls_or_ids="https://www.youtube.com/watch?v=dQw4w9WgXcQ,L_Guz73e6fw",
            include_timestamps=True
        )
    """
    inputs = [item.strip() for item in video_urls_or_ids.split(',')]

    # Validate all inputs up front; collect parse failures instead of aborting
    # so one bad entry doesn't block the rest of the batch.
    video_ids = []
    errors = []
    for item in inputs:
        try:
            video_ids.append(extract_video_id(item))
        except ValueError as e:
            errors.append(str(e))

    all_results = []
    if errors:
        all_results.append("--- INPUT ERRORS ---\n" + "\n".join(errors))

    if not video_ids:
        if not errors:
            return "Error: No valid video URLs or IDs were provided."
        return "\n".join(all_results)

    # Fetch transcripts in parallel, but collect results in submission order
    # so the output always matches the order of the caller's input.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(_fetch_single_transcript, vid, include_timestamps)
            for vid in video_ids
        ]
        for future in futures:
            # _fetch_single_transcript never raises; result() is safe here.
            all_results.append(future.result())

    # Join all individual results with a clear separator.
    return "\n\n--- --- ---\n\n".join(all_results)


if __name__ == "__main__":
    mcp.run()