arcticaurora committed on
Commit
a4ac4ba
·
verified ·
1 Parent(s): afdc948

Update tools/youtube.py

Browse files
Files changed (1) hide show
  1. tools/youtube.py +87 -42
tools/youtube.py CHANGED
@@ -4,6 +4,7 @@ import random
4
  import time
5
  import uuid
6
  import re
 
7
 
8
  mcp = FastMCP("Youtube")
9
 
@@ -55,6 +56,9 @@ def extract_video_id(url_or_id: str) -> str:
55
  """Extract video ID from YouTube URL or return if already an ID."""
56
  id_pattern = r'^[\w-]{11}$'
57
 
 
 
 
58
  if re.match(id_pattern, url_or_id) and not ('youtube.com' in url_or_id or 'youtu.be' in url_or_id):
59
  return url_or_id
60
 
@@ -76,42 +80,27 @@ def extract_video_id(url_or_id: str) -> str:
76
 
77
  raise ValueError(f"Invalid YouTube URL or video ID: {url_or_id}")
78
 
79
- @mcp.tool()
80
- def get_youtube_video_transcript(video_url_or_id: str, include_timestamps: bool = False):
81
- """Get transcript text from a YouTube video.
82
-
83
- Args:
84
- video_url_or_id: YouTube URL or 11-char video ID
85
- include_timestamps: Include timestamps (default: False)
86
-
87
- """
88
- video_id = extract_video_id(video_url_or_id)
89
-
90
  api_url = 'https://notegpt.io/api/v2/video-transcript'
91
- params = {
92
- 'platform': 'youtube',
93
- 'video_id': video_id,
94
- }
95
-
96
- headers = generate_random_headers()
97
 
98
  try:
 
 
99
  response = requests.get(api_url, params=params, headers=headers, timeout=30)
100
  response.raise_for_status()
101
 
102
  data = response.json()
103
 
104
  if data.get('code') != 100000:
105
- raise ValueError(f"API error: {data.get('message', 'Unknown error')}")
106
 
107
- # Extract video info
108
  video_info = data.get('data', {}).get('videoInfo', {})
109
  video_title = video_info.get('name', 'Unknown Title')
110
  channel_name = video_info.get('author', 'Unknown Channel')
111
 
112
- # Extract transcripts
113
  transcripts = data.get('data', {}).get('transcripts', {})
114
-
115
  transcript_entries = None
116
  for lang_code in ['en', 'en_auto']:
117
  if lang_code in transcripts:
@@ -123,36 +112,92 @@ def get_youtube_video_transcript(video_url_or_id: str, include_timestamps: bool
123
  transcript_entries = first_lang.get('custom', [])
124
 
125
  if not transcript_entries:
126
- raise ValueError("No transcript available")
127
-
128
- # Format the transcript
129
- result_parts = [
130
- f"Title: {video_title}",
131
- f"Channel: {channel_name}",
132
- "\n---\n"
133
- ]
134
 
135
  if include_timestamps:
136
- formatted_transcript = []
137
- for entry in transcript_entries:
138
- timestamp = f"[{entry['start']}]"
139
- text = entry['text']
140
- formatted_transcript.append(f"{timestamp} {text}")
141
- result_parts.append("\n\n".join(formatted_transcript))
142
  else:
143
  result_parts.append(" ".join(entry['text'] for entry in transcript_entries))
144
 
145
  return "\n".join(result_parts)
146
-
147
  except requests.exceptions.HTTPError as e:
148
- if e.response.status_code == 404:
149
- raise ValueError(f"Video not found: {video_id}")
150
- else:
151
- raise ValueError(f"HTTP error: {e}")
152
  except requests.exceptions.RequestException as e:
153
- raise ValueError(f"Network error: {e}")
154
  except Exception as e:
155
- raise ValueError(f"Error: {e}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
156
 
157
  if __name__ == "__main__":
158
  mcp.run()
 
4
  import time
5
  import uuid
6
  import re
7
+ import concurrent.futures
8
 
9
  mcp = FastMCP("Youtube")
10
 
 
56
  """Extract video ID from YouTube URL or return if already an ID."""
57
  id_pattern = r'^[\w-]{11}$'
58
 
59
+ # Clean up input before matching
60
+ url_or_id = url_or_id.strip()
61
+
62
  if re.match(id_pattern, url_or_id) and not ('youtube.com' in url_or_id or 'youtu.be' in url_or_id):
63
  return url_or_id
64
 
 
80
 
81
  raise ValueError(f"Invalid YouTube URL or video ID: {url_or_id}")
82
 
83
+ def _fetch_single_transcript(video_id: str, include_timestamps: bool) -> str:
84
+ """Helper function to fetch and format a single video transcript."""
 
 
 
 
 
 
 
 
 
85
  api_url = 'https://notegpt.io/api/v2/video-transcript'
86
+ params = {'platform': 'youtube', 'video_id': video_id}
 
 
 
 
 
87
 
88
  try:
89
+ # Each call gets its own unique headers
90
+ headers = generate_random_headers()
91
  response = requests.get(api_url, params=params, headers=headers, timeout=30)
92
  response.raise_for_status()
93
 
94
  data = response.json()
95
 
96
  if data.get('code') != 100000:
97
+ return f"Error for video {video_id}: API error - {data.get('message', 'Unknown error')}"
98
 
 
99
  video_info = data.get('data', {}).get('videoInfo', {})
100
  video_title = video_info.get('name', 'Unknown Title')
101
  channel_name = video_info.get('author', 'Unknown Channel')
102
 
 
103
  transcripts = data.get('data', {}).get('transcripts', {})
 
104
  transcript_entries = None
105
  for lang_code in ['en', 'en_auto']:
106
  if lang_code in transcripts:
 
112
  transcript_entries = first_lang.get('custom', [])
113
 
114
  if not transcript_entries:
115
+ return f"Error for video {video_id}: No transcript available."
116
+
117
+ result_parts = [f"Title: {video_title}", f"Channel: {channel_name}", f"Video ID: {video_id}", "\n---"]
 
 
 
 
 
118
 
119
  if include_timestamps:
120
+ formatted_transcript = "\n\n".join([f"[{entry['start']}] {entry['text']}" for entry in transcript_entries])
121
+ result_parts.append(formatted_transcript)
 
 
 
 
122
  else:
123
  result_parts.append(" ".join(entry['text'] for entry in transcript_entries))
124
 
125
  return "\n".join(result_parts)
126
+
127
  except requests.exceptions.HTTPError as e:
128
+ return f"Error for video {video_id}: HTTP error - {e}"
 
 
 
129
  except requests.exceptions.RequestException as e:
130
+ return f"Error for video {video_id}: Network error - {e}"
131
  except Exception as e:
132
+ return f"Error for video {video_id}: An unexpected error occurred - {e}"
133
+
134
@mcp.tool()
def get_youtube_video_transcript(video_urls_or_ids: str, include_timestamps: bool = False) -> str:
    """Get transcript text from one or more YouTube videos in parallel.

    Args:
        video_urls_or_ids: A single YouTube URL or 11-character video ID,
            OR a comma-separated string of multiple URLs or IDs. Blank
            items (e.g. from a trailing comma) are ignored, and a video
            that is listed more than once is fetched only once.
        include_timestamps: Include timestamps in the output (default: False).

    Returns:
        A string containing the formatted transcript(s). If multiple videos
        are processed, their transcripts are concatenated in the same order
        as the inputs and separated by a clear delimiter. Inputs that cannot
        be parsed are listed first under an "INPUT ERRORS" heading; errors
        for individual videos are reported inline.

    Examples:
        # 1. Single URL
        get_youtube_video_transcript(video_urls_or_ids="https://www.youtube.com/watch?v=dQw4w9WgXcQ")

        # 2. Multiple URLs (comma-separated)
        get_youtube_video_transcript(
            video_urls_or_ids="https://www.youtube.com/watch?v=dQw4w9WgXcQ,https://youtu.be/L_Guz73e6fw"
        )

        # 3. Multiple Video IDs (comma-separated)
        get_youtube_video_transcript(
            video_urls_or_ids="dQw4w9WgXcQ,L_Guz73e6fw,QH2-TGUlwu4"
        )

        # 4. Mix of URLs and IDs with timestamps
        get_youtube_video_transcript(
            video_urls_or_ids="https://www.youtube.com/watch?v=dQw4w9WgXcQ,L_Guz73e6fw",
            include_timestamps=True
        )
    """
    # Skip blank items so stray or trailing commas do not show up as
    # "Invalid YouTube URL or video ID: " errors.
    stripped_items = (item.strip() for item in video_urls_or_ids.split(','))
    inputs = [item for item in stripped_items if item]

    video_ids = []
    errors = []
    for item in inputs:
        try:
            video_id = extract_video_id(item)
        except ValueError as e:
            errors.append(str(e))
            continue
        # The same video may be given as both a URL and a bare ID; request it once.
        if video_id not in video_ids:
            video_ids.append(video_id)

    all_results = []
    if errors:
        all_results.append("--- INPUT ERRORS ---\n" + "\n".join(errors))

    if not video_ids:
        if not errors:
            return "Error: No valid video URLs or IDs were provided."
        return "\n".join(all_results)

    # Fetch the transcripts concurrently, one task per video.
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(_fetch_single_transcript, video_id, include_timestamps)
            for video_id in video_ids
        ]
        # Read the futures in submission order (not completion order) so the
        # output order is deterministic and matches the caller's input order.
        all_results.extend(future.result() for future in futures)

    # Join all individual results with a clear separator
    return "\n\n--- --- ---\n\n".join(all_results)
201
 
202
  if __name__ == "__main__":
203
  mcp.run()