# One-MCP / tools/youtube.py
# (source: arcticaurora's update to tools/youtube.py, commit a4ac4ba)
from mcp.server.fastmcp import FastMCP
import requests
import random
import time
import uuid
import re
import concurrent.futures
# Single FastMCP server instance; the tool below registers against it.
mcp = FastMCP("Youtube")
def generate_random_headers():
    """Build browser-like HTTP headers that mimic a fresh, anonymous visitor.

    Every call picks a random desktop User-Agent and fabricates a brand-new
    set of tracking cookies, so consecutive requests do not share identity.

    Returns:
        dict: Header name -> value, ready to pass to ``requests``.
    """
    browser_pool = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/109.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/16.1 Safari/605.1.15',
        'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36'
    ]
    chosen_agent = random.choice(browser_pool)
    now_ms = int(time.time() * 1000)
    # Fabricated per-visitor identifiers (formats match what the site sets).
    anon_id = uuid.uuid4().hex
    tracker_id = f"G-{now_ms + random.randint(1, 1000)}"
    collina = f"{now_ms}{random.randint(10**12, 10**13 - 1)}"
    guid = f"MTc1MDQyNjM3OXw{random.randint(100, 999)}|{random.randint(100000000, 999999999)}"
    state = f'{{"i_p":{now_ms + random.randint(1, 1000)},"i_l":1}}'
    cookie_string = "; ".join([
        f"sbox-guid={guid}",
        f"_uab_collina={collina}",
        f"_trackUserId={tracker_id}",
        f"anonymous_user_id={anon_id}",
        "is_first_visit=true",
        f"g_state={state}",
    ])
    return {
        'User-Agent': chosen_agent,
        'Accept': 'application/json, text/plain, */*',
        'Accept-Language': 'en-US,en;q=0.5',
        'Accept-Encoding': 'gzip, deflate, br, zstd',
        'Connection': 'keep-alive',
        'Referer': 'https://notegpt.io/youtube-transcript-generator',
        'Cookie': cookie_string,
        'Sec-Fetch-Dest': 'empty',
        'Sec-Fetch-Mode': 'cors',
        'Sec-Fetch-Site': 'same-origin',
        'Priority': 'u=0',
        'TE': 'trailers',
    }
def extract_video_id(url_or_id: str) -> str:
    """Extract the 11-character video ID from a YouTube URL.

    Accepts watch, embed, youtu.be, /v/, shorts, and live URLs, or a bare
    11-character video ID (returned unchanged).

    Args:
        url_or_id: A YouTube URL in any supported form, or a bare video ID.

    Returns:
        The 11-character video ID.

    Raises:
        ValueError: If no video ID can be extracted from the input.
    """
    url_or_id = url_or_id.strip()
    # A bare ID is exactly 11 word chars/hyphens; such a string cannot also
    # contain a hostname, so no extra "is it a URL" guard is needed.
    if re.fullmatch(r'[\w-]{11}', url_or_id):
        return url_or_id
    patterns = [
        # Allow v= anywhere in the query string, not only as the first param.
        r'(?:youtube\.com\/watch\?(?:[^#\s]*&)?v=)([\w-]{11})',
        r'(?:youtube\.com\/embed\/)([\w-]{11})',
        r'(?:youtu\.be\/)([\w-]{11})',
        r'(?:youtube\.com\/v\/)([\w-]{11})',
        r'(?:youtube\.com\/shorts\/)([\w-]{11})',
        r'(?:youtube\.com\/live\/)([\w-]{11})',
    ]
    for pattern in patterns:
        match = re.search(pattern, url_or_id)
        if match:
            return match.group(1)
    raise ValueError(f"Invalid YouTube URL or video ID: {url_or_id}")
def _fetch_single_transcript(video_id: str, include_timestamps: bool) -> str:
    """Fetch and format one video's transcript via the NoteGPT API.

    Args:
        video_id: The 11-character YouTube video ID.
        include_timestamps: If True, prefix each transcript line with its
            start time; otherwise return the text as a single flow.

    Returns:
        The formatted transcript, or an inline message starting with
        "Error for video ..." when the request or parsing fails.
    """
    endpoint = 'https://notegpt.io/api/v2/video-transcript'
    query = {'platform': 'youtube', 'video_id': video_id}
    try:
        # Fresh randomized headers per request so every call looks like a
        # brand-new visitor to the API.
        response = requests.get(
            endpoint, params=query, headers=generate_random_headers(), timeout=30
        )
        response.raise_for_status()
        payload = response.json()
        if payload.get('code') != 100000:
            return f"Error for video {video_id}: API error - {payload.get('message', 'Unknown error')}"
        info = payload.get('data', {}).get('videoInfo', {})
        title = info.get('name', 'Unknown Title')
        channel = info.get('author', 'Unknown Channel')
        available = payload.get('data', {}).get('transcripts', {})
        # Prefer English (manual first, then auto-generated); otherwise fall
        # back to whichever language the API listed first.
        entries = None
        for preferred in ('en', 'en_auto'):
            if preferred in available:
                entries = available[preferred].get('custom', [])
                break
        if not entries and available:
            entries = next(iter(available.values())).get('custom', [])
        if not entries:
            return f"Error for video {video_id}: No transcript available."
        parts = [f"Title: {title}", f"Channel: {channel}", f"Video ID: {video_id}", "\n---"]
        if include_timestamps:
            parts.append("\n\n".join(f"[{item['start']}] {item['text']}" for item in entries))
        else:
            parts.append(" ".join(item['text'] for item in entries))
        return "\n".join(parts)
    except requests.exceptions.HTTPError as e:
        return f"Error for video {video_id}: HTTP error - {e}"
    except requests.exceptions.RequestException as e:
        return f"Error for video {video_id}: Network error - {e}"
    except Exception as e:
        return f"Error for video {video_id}: An unexpected error occurred - {e}"
@mcp.tool()
def get_youtube_video_transcript(video_urls_or_ids: str, include_timestamps: bool = False):
    """Get transcript text from one or more YouTube videos in parallel.

    Args:
        video_urls_or_ids: A single YouTube URL or 11-character video ID,
            OR a comma-separated string of multiple URLs or IDs.
        include_timestamps: Include timestamps in the output (default: False).

    Returns:
        A string containing the formatted transcript(s). Multiple transcripts
        are concatenated in the order the videos were supplied, separated by
        a clear delimiter. Errors for individual videos are reported inline.

    Examples:
        # 1. Single URL
        get_youtube_video_transcript(video_urls_or_ids="https://www.youtube.com/watch?v=dQw4w9WgXcQ")
        # 2. Multiple URLs (comma-separated)
        get_youtube_video_transcript(
            video_urls_or_ids="https://www.youtube.com/watch?v=dQw4w9WgXcQ,https://youtu.be/L_Guz73e6fw"
        )
        # 3. Multiple Video IDs (comma-separated)
        get_youtube_video_transcript(
            video_urls_or_ids="dQw4w9WgXcQ,L_Guz73e6fw,QH2-TGUlwu4"
        )
        # 4. Mix of URLs and IDs with timestamps
        get_youtube_video_transcript(
            video_urls_or_ids="https://www.youtube.com/watch?v=dQw4w9WgXcQ,L_Guz73e6fw",
            include_timestamps=True
        )
    """
    # Ignore empty segments so trailing/doubled commas don't produce
    # spurious "Invalid YouTube URL" errors.
    inputs = [item.strip() for item in video_urls_or_ids.split(',') if item.strip()]
    video_ids = []
    errors = []
    for item in inputs:
        try:
            video_ids.append(extract_video_id(item))
        except ValueError as e:
            errors.append(str(e))
    all_results = []
    if errors:
        all_results.append("--- INPUT ERRORS ---\n" + "\n".join(errors))
    if not video_ids:
        if not errors:
            return "Error: No valid video URLs or IDs were provided."
        return "\n".join(all_results)
    # Fetch transcripts concurrently. executor.map yields results in input
    # order (unlike as_completed), so the output always matches the order
    # the caller supplied the videos in.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        all_results.extend(
            executor.map(
                lambda vid: _fetch_single_transcript(vid, include_timestamps),
                video_ids,
            )
        )
    # Join all individual results with a clear separator.
    return "\n\n--- --- ---\n\n".join(all_results)
# Start the MCP server when this file is executed directly.
if __name__ == "__main__":
    mcp.run()