@woai committed on
Commit
d619c43
·
0 Parent(s):

Prepare for Hugging Face Spaces deployment

Files changed (12)
  1. .gitignore +45 -0
  2. README.md +157 -0
  3. api_server.py +559 -0
  4. app.py +401 -0
  5. gemini_helper.py +297 -0
  6. gradio_app.py +383 -0
  7. main.py +83 -0
  8. mcp_handlers.py +478 -0
  9. models.py +10 -0
  10. pyproject.toml +17 -0
  11. requirements.txt +9 -0
  12. utils.py +57 -0
.gitignore ADDED
@@ -0,0 +1,45 @@
+ # Python-generated files
+ __pycache__/
+ *.py[oc]
+ *.pyc
+ *.pyo
+ *.pyd
+ build/
+ dist/
+ wheels/
+ *.egg-info/
+ *.egg
+
+ # Virtual environments
+ .venv/
+ venv/
+ env/
+
+ # Environment files
+ .env
+ .env.local
+ .env.development.local
+ .env.test.local
+ .env.production.local
+
+ # IDE files
+ .vscode/
+ .idea/
+ *.swp
+ *.swo
+ *~
+
+ # OS files
+ .DS_Store
+ Thumbs.db
+
+ # Logs
+ *.log
+
+ # Temporary files
+ *.tmp
+ *.temp
+
+ # Test files
+ test_*.py
+ debug_*.py
README.md ADDED
@@ -0,0 +1,157 @@
1
+ # 🎬 YouTube Creator MetaData Extractor
2
+
3
+ AI-powered tool for content creators to analyze YouTube videos and generate professional metadata using advanced language models.
4
+
5
+ ## 🚀 Features
6
+
7
+ - **🔍 Video Search**: Search YouTube videos by keywords with advanced filters
8
+ - **📊 Video Analysis**: Extract comprehensive video metadata (views, likes, duration, etc.)
9
+ - **📝 Transcript Extraction**: Get video transcripts in multiple languages
10
+ - **⏱️ Smart Timecodes**: AI-generated timecodes for better video navigation
11
+ - **🤖 Gemini AI Integration**: Advanced timecode generation using Google's Gemini 2.0
12
+ - **🌐 Multi-language Support**: Works with videos in Ukrainian, Russian, English, and more
13
+ - **📱 URL Flexibility**: Supports all YouTube URL formats (regular, shorts, embed links)
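The URL flexibility listed above depends on `extract_video_id` from `utils.py`, whose body is not shown in this part of the diff. As a rough sketch of what such a helper might look like (the name `extract_video_id_sketch`, the accepted host list, and the 11-character ID pattern are assumptions for illustration, not the project's actual code):

```python
import re
from urllib.parse import parse_qs, urlparse

# Assumption: YouTube video IDs are 11 characters from this alphabet.
_ID_RE = re.compile(r"^[A-Za-z0-9_-]{11}$")
# Assumption: hosts accepted by this sketch; the real helper may accept more.
_YOUTUBE_HOSTS = {"youtube.com", "www.youtube.com", "m.youtube.com"}


def extract_video_id_sketch(value: str) -> str:
    """Return the video ID found in a YouTube URL, or the input unchanged."""
    value = value.strip()
    if _ID_RE.match(value):
        return value  # already a bare ID

    parsed = urlparse(value)
    host = (parsed.hostname or "").lower()
    path_parts = [part for part in parsed.path.split("/") if part]

    # Short links: https://youtu.be/<id>
    if host == "youtu.be" and path_parts:
        return path_parts[0]

    if host in _YOUTUBE_HOSTS:
        # Regular links: https://www.youtube.com/watch?v=<id>
        if parsed.path == "/watch":
            ids = parse_qs(parsed.query).get("v")
            if ids:
                return ids[0]
        # Shorts and embeds: /shorts/<id> or /embed/<id>
        if len(path_parts) >= 2 and path_parts[0] in {"shorts", "embed"}:
            return path_parts[1]

    # Unknown shape: hand the value back and let the API call report the error.
    return value
```

Returning the input unchanged for unrecognized values keeps the helper safe to call on every request, since the endpoints accept either an ID or a full URL in the same field.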
14
+
15
+ ## 🛠️ Setup
16
+
17
+ ### Required API Keys
18
+
19
+ To use this tool, you need two API keys:
20
+
21
+ 1. **YouTube Data API v3 Key**
22
+ - Go to [Google Cloud Console](https://console.developers.google.com/)
23
+ - Create a new project or select an existing one
24
+ - Enable "YouTube Data API v3"
25
+ - Create credentials (API Key)
26
+
27
+ 2. **Gemini API Key** (for AI features)
28
+ - Visit [Google AI Studio](https://ai.google.dev/)
29
+ - Get your free API key for Gemini
30
+
31
+ ### Environment Variables
32
+
33
+ Set these in your Hugging Face Space settings:
34
+
35
+ ```
36
+ YOUTUBE_API_KEY=your_youtube_api_key_here
37
+ GEMINI_API_KEY=your_gemini_api_key_here
38
+ ```
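A small startup check can make a missing key obvious before the first request fails. The sketch below assumes both variables are read from the process environment; `api_server.py` does this for `YOUTUBE_API_KEY`, while the handling of `GEMINI_API_KEY` lives in `gemini_helper.py`, which is not shown here. The helper name `missing_api_keys` is hypothetical.

```python
import os

# The two variable names come from the README; everything else is illustrative.
REQUIRED_KEYS = ("YOUTUBE_API_KEY", "GEMINI_API_KEY")


def missing_api_keys(env=None):
    """Return the names of required keys that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_KEYS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_api_keys()
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All required API keys are set.")
```

Passing a plain dictionary as `env` makes the check easy to exercise without touching the real environment.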
39
+
40
+ ## 📖 How to Use
41
+
42
+ ### 1. Video Search
43
+ - Enter keywords to find YouTube videos
44
+ - Filter by upload date, view count, duration
45
+ - Get detailed metadata for any video
46
+
47
+ ### 2. Transcript Analysis
48
+ - Extract transcripts from videos with subtitles
49
+ - Support for auto-generated and manual captions
50
+ - Multiple language detection and support
51
+
52
+ ### 3. Timecode Generation
53
+
54
+ **Basic Timecodes**: Algorithmic segmentation based on transcript timing
55
+ **AI Timecodes**: Intelligent topic-based segmentation using Gemini AI
56
+
57
+ **Supported Formats**:
58
+ - **YouTube**: Ready for video descriptions (e.g., `05:30 Topic description`)
59
+ - **Markdown**: Clickable links with timestamps (e.g., `- [05:30](link) Topic`)
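To make the two output formats concrete, here is a minimal sketch of how one timecode line could be rendered. The link shape matches the one built in `api_server.py`; `format_mmss` is a stand-in for the project's `format_timestamp` (defined in `utils.py`, not shown here), and `render_timecode` is a hypothetical name.

```python
def format_mmss(seconds: float) -> str:
    """Format seconds as MM:SS, or HH:MM:SS once the value reaches an hour."""
    total = int(seconds)
    hours, remainder = divmod(total, 3600)
    minutes, secs = divmod(remainder, 60)
    if hours:
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"
    return f"{minutes:02d}:{secs:02d}"


def render_timecode(start: float, text: str, fmt: str, video_id: str) -> str:
    """Render one timecode line in the 'youtube' or 'markdown' format."""
    stamp = format_mmss(start)
    if fmt == "youtube":
        # Plain line, ready to paste into a video description.
        return f"{stamp} {text}"
    if fmt == "markdown":
        # Clickable link that jumps to the timestamp.
        link = f"https://www.youtube.com/watch?v={video_id}&t={int(start)}"
        return f"- [{stamp}]({link}) {text}"
    raise ValueError(f"Unsupported format: {fmt!r}")
```

Unlike the endpoint in this commit, which emits no lines for an unrecognized format, the sketch raises an error; that is a design choice of the example rather than the project's behavior.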
60
+
61
+ **Language Codes**:
62
+ - `uk` - Ukrainian
63
+ - `ru` - Russian
64
+ - `en` - English
65
+ - And many others (ISO 639-1 standard)
66
+
67
+ ## 🔧 API Reference
68
+
69
+ This application provides both a web interface and REST API endpoints:
70
+
71
+ ### Search Videos
72
+ ```http
73
+ POST /api/search
74
+ {
75
+ "query": "your search query",
76
+ "max_results": 10,
77
+ "order": "relevance"
78
+ }
79
+ ```
80
+
81
+ ### Get Video Info
82
+ ```http
83
+ POST /api/video_info
84
+ {
85
+ "video_id": "video_id_or_full_url"
86
+ }
87
+ ```
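The `duration` field returned by this endpoint is passed through unchanged from the YouTube Data API's `contentDetails.duration`, which is an ISO 8601 duration string such as `PT5M30S`. A client that needs seconds could convert it along these lines; the function name is hypothetical, and the sketch only covers the days, hours, minutes, and seconds components.

```python
import re

# Matches durations such as PT5M30S, PT1H2M5S, or P1DT1S.
_DURATION_RE = re.compile(
    r"^P(?:(?P<days>\d+)D)?"
    r"(?:T(?:(?P<hours>\d+)H)?(?:(?P<minutes>\d+)M)?(?:(?P<seconds>\d+)S)?)?$"
)


def iso8601_duration_to_seconds(value: str) -> int:
    """Convert an ISO 8601 duration string to a whole number of seconds."""
    match = _DURATION_RE.match(value)
    if not match:
        raise ValueError(f"Unsupported duration: {value!r}")
    # Components that are absent from the string count as zero.
    parts = {name: int(text) if text else 0 for name, text in match.groupdict().items()}
    return (
        parts["days"] * 86400
        + parts["hours"] * 3600
        + parts["minutes"] * 60
        + parts["seconds"]
    )
```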
88
+
89
+ ### Extract Transcript
90
+ ```http
91
+ POST /api/transcript
92
+ {
93
+ "video_id": "video_id_or_full_url",
94
+ "language_code": "uk"
95
+ }
96
+ ```
97
+
98
+ ### Generate AI Timecodes
99
+ ```http
100
+ POST /api/gemini_timecodes
101
+ {
102
+ "video_id": "video_id_or_full_url",
103
+ "language_code": "uk",
104
+ "format": "youtube",
105
+ "model": "gemini-2.0-flash-001"
106
+ }
107
+ ```
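The endpoints above all take a JSON body over `POST`. A minimal client sketch using only the standard library follows; the base URL mirrors `API_URL` in `app.py` and is an assumption for any other deployment, and the helper names `build_api_request` and `call_api` are hypothetical.

```python
import json
import urllib.request

# Assumption: same base URL as API_URL in app.py; adjust for your deployment.
BASE_URL = "http://localhost:7860/api"


def build_api_request(endpoint: str, payload: dict) -> urllib.request.Request:
    """Build a JSON POST request for one of the /api/* endpoints."""
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(
        url=f"{BASE_URL}/{endpoint}",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def call_api(endpoint: str, payload: dict, timeout: float = 30.0) -> dict:
    """Send the request and decode the JSON response (needs a running server)."""
    request = build_api_request(endpoint, payload)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


if __name__ == "__main__":
    result = call_api("search", {"query": "python tutorial", "max_results": 5})
    # The handlers in api_server.py return {"error": ...} on failure and
    # {"content": ...} on success, so check for the error key first.
    if "error" in result:
        print("Request failed:", result["error"])
    else:
        print(len(result["content"]), "videos found")
```

Because the handlers in this commit return errors as an `error` key in the body rather than by raising, a caller should inspect the payload and not rely on the HTTP status code alone.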
108
+
109
+ ## 🏗️ Architecture
110
+
111
+ - **Frontend**: Gradio web interface with responsive design
112
+ - **Backend**: FastAPI server with async processing
113
+ - **AI Integration**: Google Gemini 2.0 for intelligent content analysis
114
+ - **APIs**: YouTube Data API v3 for video metadata
115
+ - **Transcript**: YouTube Transcript API for subtitle extraction
116
+
117
+ ## 📁 Project Structure
118
+
119
+ ```
120
+ ├── app.py # Main Gradio application (HF Spaces entry point)
121
+ ├── api_server.py # FastAPI backend server
122
+ ├── gemini_helper.py # Gemini AI integration
123
+ ├── utils.py # Utility functions
124
+ ├── models.py # Data models
125
+ ├── mcp_handlers.py # Model Context Protocol handlers
126
+ ├── requirements.txt # Python dependencies
127
+ └── README.md # This file
128
+ ```
129
+
130
+ ## 🔬 Technology Stack
131
+
132
+ - **Python 3.13+**
133
+ - **Gradio** - Web interface framework
134
+ - **FastAPI** - High-performance API framework
135
+ - **Google Gemini 2.0** - Advanced language model for content analysis
136
+ - **YouTube APIs** - Official Google APIs for video data
137
+ - **AsyncIO** - Asynchronous processing for better performance
138
+
139
+ ## 🌟 Use Cases
140
+
141
+ - **Content Creators**: Generate professional timecodes for YouTube videos
142
+ - **Educators**: Extract and analyze educational content structure
143
+ - **Researchers**: Analyze video metadata and transcripts at scale
144
+ - **Marketers**: Research competitor content and trends
145
+ - **Accessibility**: Create better navigation for long-form content
146
+
147
+ ## 📄 License
148
+
149
+ MIT License - feel free to use in your projects!
150
+
151
+ ## 🤝 Contributing
152
+
153
+ Contributions welcome! This project is designed to help content creators worldwide.
154
+
155
+ ---
156
+
157
+ **Made with ❤️ for the YouTube creator community**
api_server.py ADDED
@@ -0,0 +1,559 @@
1
+ import os
2
+ from fastapi import FastAPI, HTTPException, Request
3
+ from fastapi.middleware.cors import CORSMiddleware
4
+ from pydantic import BaseModel
5
+ from typing import Dict, List, Optional, Any, Union
6
+ import httpx
7
+ from googleapiclient.discovery import build
8
+ from googleapiclient.errors import HttpError
9
+ import json
10
+ from youtube_transcript_api import YouTubeTranscriptApi
11
+ from youtube_transcript_api.formatters import JSONFormatter
12
+ from dotenv import load_dotenv
13
+ from utils import format_timestamp, extract_video_id
14
+ from models import MCPResponse
15
+ import re
16
+
17
+ # Load environment variables
18
+ load_dotenv()
19
+
20
+ # Read the YouTube API key from the environment
21
+ YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")
22
+
23
+ app = FastAPI(
24
+ title="YouTube MCP API",
25
+ description="Model Context Protocol (MCP) server for interacting with YouTube API",
26
+ version="0.1.0",
27
+ )
28
+
29
+ # Configure CORS
30
+ app.add_middleware(
31
+ CORSMiddleware,
32
+ allow_origins=["*"],
33
+ allow_credentials=True,
34
+ allow_methods=["*"],
35
+ allow_headers=["*"],
36
+ )
37
+
38
+ # Initialize the YouTube API client
39
+ def get_youtube_client():
40
+ if not YOUTUBE_API_KEY:
41
+ raise HTTPException(status_code=500, detail="YouTube API key is not configured")
42
+
43
+ try:
44
+ return build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
45
+ except Exception as e:
46
+ raise HTTPException(status_code=500, detail=f"YouTube API initialization error: {str(e)}")
47
+
48
+ # Base data models for the standard API requests
49
+ class SearchRequest(BaseModel):
50
+ query: str
51
+ max_results: Optional[int] = 10
52
+ order: Optional[str] = "relevance"
53
+ video_duration: Optional[str] = None
54
+
55
+ class VideoInfoRequest(BaseModel):
56
+ video_id: str
57
+
58
+ class TranscriptRequest(BaseModel):
59
+ video_id: str
60
+ language_code: Optional[str] = None
61
+
62
+ class MCPRequestData(BaseModel):
63
+ action: str
64
+ parameters: Dict[str, Any]
65
+
66
+ # Request model for the route that returns available transcript languages
67
+ class TranscriptLanguagesRequest(BaseModel):
68
+ video_id: str
69
+
70
+ # Request model for timecodes
71
+ class TimecodeRequest(BaseModel):
72
+ video_id: str
73
+ language_code: Optional[str] = None
74
+ segment_length: Optional[int] = 60 # Segment length in seconds
75
+ format: Optional[str] = "youtube" # youtube, markdown
76
+
77
+ # Import gemini_helper only after the base models are defined
78
+ from gemini_helper import generate_timecodes_with_gemini, DEFAULT_MODEL
79
+
80
+ # Request model for Gemini-generated timecodes
81
+ class GeminiTimecodeRequest(BaseModel):
82
+ video_id: str
83
+ language_code: Optional[str] = None
84
+ format: Optional[str] = "youtube" # youtube, markdown
85
+ model: Optional[str] = DEFAULT_MODEL # Gemini model (if None, the default model is used)
86
+
87
+ # mcp_handlers can now be imported
88
+ from mcp_handlers import (
89
+ MCPQueryRequest,
90
+ MCPVideoRequest,
91
+ MCPTranscriptRequest,
92
+ MCPTimecodeRequest,
93
+ MCPGeminiRequest,
94
+ process_mcp_search,
95
+ process_mcp_video_info,
96
+ process_mcp_transcript,
97
+ process_mcp_timecodes,
98
+ process_mcp_gemini_timecodes,
99
+ create_text_response,
100
+ create_error_response
101
+ )
102
+
103
+ def normalize_language_code(language_code: str) -> str:
104
+ """Normalize language codes, converting common aliases to standard codes."""
105
+ if not language_code:
106
+ return language_code
107
+
108
+ language_code = language_code.lower().strip()
109
+
110
+ # Convert 'ua' to 'uk' for Ukrainian
111
+ if language_code == 'ua':
112
+ return 'uk'
113
+
114
+ return language_code
115
+
116
+ # Standard API routes
117
+ @app.post("/api/search")
118
+ async def search_videos(request: SearchRequest):
119
+ try:
120
+ youtube = get_youtube_client()
121
+ search_response = youtube.search().list(
122
+ q=request.query,
123
+ part="snippet",
124
+ maxResults=request.max_results,
125
+ type="video",
126
+ order=request.order,
127
+ videoDuration=request.video_duration if request.video_duration else None
128
+ ).execute()
129
+
130
+ results = []
131
+ for item in search_response.get("items", []):
132
+ video_id = item["id"]["videoId"]
133
+ snippet = item["snippet"]
134
+
135
+ results.append({
136
+ "video_id": video_id,
137
+ "title": snippet["title"],
138
+ "description": snippet["description"],
139
+ "thumbnail": snippet["thumbnails"]["high"]["url"],
140
+ "channel_title": snippet["channelTitle"],
141
+ "published_at": snippet["publishedAt"]
142
+ })
143
+
144
+ return {"content": results}
145
+ except HttpError as e:
146
+ return {"error": f"YouTube API error: {str(e)}"}
147
+ except Exception as e:
148
+ return {"error": f"Unexpected error: {str(e)}"}
149
+
150
+ @app.post("/api/video_info")
151
+ async def get_video_info(request: VideoInfoRequest):
152
+ try:
153
+ # Extract the video ID if a URL was provided
154
+ video_id = extract_video_id(request.video_id)
155
+
156
+ youtube = get_youtube_client()
157
+ video_response = youtube.videos().list(
158
+ part="snippet,contentDetails,statistics",
159
+ id=video_id
160
+ ).execute()
161
+
162
+ if not video_response.get("items"):
163
+ return {"error": "Video not found"}
164
+
165
+ video = video_response["items"][0]
166
+ snippet = video["snippet"]
167
+ content_details = video["contentDetails"]
168
+ statistics = video["statistics"]
169
+
170
+ return {"content": {
171
+ "video_id": video_id,
172
+ "title": snippet["title"],
173
+ "description": snippet["description"],
174
+ "channel_title": snippet["channelTitle"],
175
+ "published_at": snippet["publishedAt"],
176
+ "duration": content_details["duration"],
177
+ "view_count": statistics.get("viewCount", "0"),
178
+ "like_count": statistics.get("likeCount", "0"),
179
+ "comment_count": statistics.get("commentCount", "0"),
180
+ "tags": snippet.get("tags", [])
181
+ }}
182
+ except HttpError as e:
183
+ return {"error": f"YouTube API error: {str(e)}"}
184
+ except Exception as e:
185
+ return {"error": f"Unexpected error: {str(e)}"}
186
+
187
+ @app.post("/api/transcript")
188
+ async def get_transcript(request: TranscriptRequest):
189
+ try:
190
+ # Extract video ID if URL is provided
191
+ video_id = extract_video_id(request.video_id)
192
+
193
+ # Normalize language code (ua -> uk)
194
+ normalized_language = normalize_language_code(request.language_code)
195
+
196
+ # Get list of available languages for the video
197
+ try:
198
+ available_languages = []
199
+ transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
200
+ for transcript in transcript_list:
201
+ available_languages.append({
202
+ "language": transcript.language,
203
+ "language_code": transcript.language_code,
204
+ "is_generated": transcript.is_generated,
205
+ "is_translatable": transcript.is_translatable
206
+ })
207
+ except Exception as e:
208
+ print(f"Error getting language list: {str(e)}")
209
+ return {"error": f"Video not found or no transcripts available: {str(e)}"}
210
+
211
+ print(f"Available languages for video {video_id}: {[lang['language_code'] for lang in available_languages]}")
212
+
213
+ # Try to get transcript in requested language
214
+ final_language = None
215
+ transcript_list = None
216
+
217
+ if normalized_language:
218
+ try:
219
+ print(f"Trying to get transcript in language: {normalized_language}")
220
+ transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[normalized_language])
221
+ print(f"Successfully obtained transcript in language: {normalized_language}")
222
+ final_language = normalized_language
223
+ except Exception as e:
224
+ print(f"Failed to get transcript in language {normalized_language}: {str(e)}")
225
+
226
+ # If specific language failed or not requested, try first available
227
+ if transcript_list is None and available_languages:
228
+ try:
229
+ first_language = available_languages[0]['language_code']
230
+ print(f"Trying to use first available language: {first_language}")
231
+ transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[first_language])
232
+ print(f"Successfully obtained transcript in language: {first_language}")
233
+ final_language = first_language
234
+ except Exception as e:
235
+ print(f"Failed to get transcript in language {first_language}: {str(e)}")
236
+ return {"error": f"Failed to get transcript in any available language: {str(e)}"}
237
+
238
+ if not transcript_list:
239
+ return {"error": "Transcript for this video is unavailable"}
240
+
241
+ formatted_transcript = []
242
+ for entry in transcript_list:
243
+ formatted_transcript.append({
244
+ "text": entry.get("text", ""),
245
+ "start": entry.get("start", 0),
246
+ "duration": entry.get("duration", 0)
247
+ })
248
+
249
+ response = {"content": formatted_transcript}
250
+ if final_language:
251
+ response["used_language"] = final_language
252
+
253
+ return response
254
+ except Exception as e:
255
+ return {"error": f"Error getting transcript: {str(e)}"}
256
+
257
+ @app.post("/api/transcript_languages")
258
+ async def get_transcript_languages(request: TranscriptLanguagesRequest):
259
+ try:
260
+ # Extract the video ID if a URL was provided
261
+ video_id = extract_video_id(request.video_id)
262
+
263
+ try:
264
+ print(f"Getting language list for ID: {video_id}")
265
+ transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
266
+
267
+ languages = []
268
+ for transcript in transcript_list:
269
+ languages.append({
270
+ "language_code": transcript.language_code,
271
+ "language": transcript.language,
272
+ "is_generated": transcript.is_generated
273
+ })
274
+
275
+ return {"content": languages}
276
+ except Exception as transcript_error:
277
+ return {"error": f"Failed to get language list. Details: {str(transcript_error)}"}
278
+ except Exception as e:
279
+ return {"error": f"Error getting language list: {str(e)}"}
280
+
281
+ # MCP endpoints
282
+ @app.post("/api/mcp")
283
+ async def mcp_endpoint(request: MCPRequestData):
284
+ try:
285
+ youtube = get_youtube_client()
286
+
287
+ if request.action == "search":
288
+ search_req = MCPQueryRequest(**request.parameters)
289
+ result = await process_mcp_search(youtube, search_req)
290
+ return result
291
+ elif request.action == "video_info":
292
+ video_req = MCPVideoRequest(**request.parameters)
293
+ result = await process_mcp_video_info(youtube, video_req)
294
+ return result
295
+ elif request.action == "transcript":
296
+ transcript_req = MCPTranscriptRequest(**request.parameters)
297
+ result = await process_mcp_transcript(transcript_req)
298
+ return result
299
+ elif request.action == "timecodes":
300
+ timecode_req = MCPTimecodeRequest(**request.parameters)
301
+ result = await process_mcp_timecodes(youtube, timecode_req)
302
+ return result
303
+ elif request.action == "gemini_timecodes":
304
+ gemini_req = MCPGeminiRequest(**request.parameters)
305
+ result = await process_mcp_gemini_timecodes(youtube, gemini_req)
306
+ return result
307
+ else:
308
+ return create_error_response(f"Unknown action: {request.action}")
309
+ except Exception as e:
310
+ return create_error_response(f"Error processing request: {str(e)}")
311
+
312
+ # Health-check route
313
+ @app.get("/health")
314
+ async def health_check():
315
+ return {"status": "ok"}
316
+
317
+ # Informational route describing the API's capabilities
318
+ @app.get("/")
319
+ async def root():
320
+ return {
321
+ "name": "YouTube MCP API",
322
+ "version": "0.1.0",
323
+ "description": "Model Context Protocol (MCP) server for interacting with YouTube API",
324
+ "endpoints": {
325
+ "standard": [
326
+ "/api/search - Search videos on YouTube",
327
+ "/api/video_info - Get video information",
328
+ "/api/transcript - Get video transcript"
329
+ ],
330
+ "mcp": [
331
+ "/api/mcp - Model Context Protocol endpoint"
332
+ ]
333
+ },
334
+ "actions": {
335
+ "search": "Search videos on YouTube",
336
+ "video_info": "Get video information",
337
+ "transcript": "Get video transcript"
338
+ }
339
+ }
340
+
341
+ @app.post("/api/timecodes")
342
+ async def generate_timecodes(request: TimecodeRequest):
343
+ try:
344
+ # Extract the video ID if a URL was provided
345
+ video_id = extract_video_id(request.video_id)
346
+ print(f"Generating timecodes for ID: {video_id}")
347
+
348
+ # Try to get the list of available languages
349
+ available_languages = []
350
+ try:
351
+ transcript_list_obj = YouTubeTranscriptApi.list_transcripts(video_id)
352
+ for transcript in transcript_list_obj:
353
+ available_languages.append({
354
+ "language_code": transcript.language_code,
355
+ "language": transcript.language,
356
+ "is_generated": transcript.is_generated
357
+ })
358
+ print(f"Available languages for video {video_id}: {[lang['language_code'] for lang in available_languages]}")
359
+ except Exception as e:
360
+ print(f"Failed to get language list: {str(e)}")
361
+
362
+ # Fetch the transcript
363
+ transcript_list = None
364
+ used_language = None
365
+
366
+ # If a language was specified, try it first
367
+ if request.language_code:
368
+ try:
369
+ print(f"Trying to get transcript in language: {request.language_code}")
370
+ transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[request.language_code])
371
+ used_language = request.language_code
372
+ print(f"Successfully obtained transcript in language: {request.language_code}")
373
+ except Exception as e:
374
+ print(f"Failed to get transcript in language {request.language_code}: {str(e)}")
375
+
376
+ # If no transcript was obtained and languages are available, use the first one
377
+ if not transcript_list and available_languages:
378
+ try:
379
+ first_language = available_languages[0]["language_code"]
380
+ print(f"Trying to use first available language: {first_language}")
381
+ transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[first_language])
382
+ used_language = first_language
383
+ print(f"Successfully obtained transcript in language: {first_language}")
384
+ except Exception as e:
385
+ print(f"Failed to get transcript in language {first_language}: {str(e)}")
386
+
387
+ # If there is still no transcript, try any language
388
+ if not transcript_list:
389
+ try:
390
+ print("Trying to get transcript in any available language")
391
+ transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
392
+ print("Transcript successfully obtained")
393
+ except Exception as e:
394
+ return {"error": f"Transcript not found. Details: {str(e)}"}
395
+
396
+ if not transcript_list:
397
+ return {"error": "Transcript for this video is unavailable"}
398
+
399
+ # Group the transcript into segments
400
+ segments = []
401
+ current_segment = {
402
+ "start": transcript_list[0]["start"],
403
+ "end": 0,
404
+ "text": []
405
+ }
406
+
407
+ segment_length = request.segment_length
408
+
409
+ for entry in transcript_list:
410
+ start_time = entry["start"]
411
+
412
+ # If the current segment is empty or the entry falls within the segment length
413
+ if not current_segment["text"] or (start_time - current_segment["start"]) <= segment_length:
414
+ current_segment["text"].append(entry["text"])
415
+ current_segment["end"] = start_time + entry["duration"]
416
+ else:
417
+ # Close the current segment and start a new one
418
+ segments.append(dict(current_segment))
419
+ current_segment = {
420
+ "start": start_time,
421
+ "end": start_time + entry["duration"],
422
+ "text": [entry["text"]]
423
+ }
424
+
425
+ # Append the last segment
426
+ if current_segment["text"]:
427
+ segments.append(current_segment)
428
+
429
+ # Format the timecodes according to the selected format
430
+ format_type = request.format.lower()
431
+ timecodes = []
432
+
433
+ for segment in segments:
434
+ start_formatted = format_timestamp(segment["start"])
435
+
436
+ # Segment text summary (first 100 characters)
437
+ text_summary = " ".join(segment["text"])
438
+ if len(text_summary) > 100:
439
+ text_summary = text_summary[:97] + "..."
440
+
441
+ if format_type == "youtube":
442
+ # YouTube format (for pasting into the description)
443
+ timecodes.append(f"{start_formatted} {text_summary}")
444
+ elif format_type == "markdown":
445
+ # Markdown format
446
+ youtube_link = f"https://www.youtube.com/watch?v={video_id}&t={int(segment['start'])}"
447
+ timecodes.append(f"- [{start_formatted}]({youtube_link}) {text_summary}")
448
+
449
+ # Return the timecodes and additional information
450
+ response = {
451
+ "content": {
452
+ "video_id": video_id,
453
+ "timecodes": timecodes,
454
+ "format": format_type,
455
+ "segment_length": segment_length,
456
+ "total_segments": len(segments)
457
+ }
458
+ }
459
+
460
+ if used_language:
461
+ response["content"]["used_language"] = used_language
462
+
463
+ return response
464
+ except Exception as e:
465
+ return {"error": f"Error generating timecodes: {str(e)}"}
466
+
467
+ @app.post("/api/gemini_timecodes")
468
+ async def generate_gemini_timecodes(request: GeminiTimecodeRequest):
469
+ try:
470
+ # Extract video ID if URL is provided
471
+ video_id = extract_video_id(request.video_id)
472
+ print(f"Generating Gemini timecodes for ID: {video_id}")
473
+
474
+ # Normalize language code (ua -> uk)
475
+ normalized_language = normalize_language_code(request.language_code)
476
+
477
+ # Get list of available languages for the video
478
+ try:
479
+ available_languages = []
480
+ transcript_list_obj = YouTubeTranscriptApi.list_transcripts(video_id)
481
+ for transcript in transcript_list_obj:
482
+ available_languages.append({
483
+ "language": transcript.language,
484
+ "language_code": transcript.language_code,
485
+ "is_generated": transcript.is_generated,
486
+ "is_translatable": transcript.is_translatable
487
+ })
488
+ except Exception as e:
489
+ print(f"Error getting language list: {str(e)}")
490
+ return {"error": f"Video not found or no transcripts available: {str(e)}"}
491
+
492
+ print(f"Available languages for video {video_id}: {[lang['language_code'] for lang in available_languages]}")
493
+
494
+ # Try to get transcript in requested language
495
+ transcript_list = None
496
+ used_language = None
497
+
498
+ if normalized_language:
499
+ try:
500
+ print(f"Trying to get transcript in language: {normalized_language}")
501
+ transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[normalized_language])
502
+ used_language = normalized_language
503
+ print(f"Successfully obtained transcript in language: {normalized_language}")
504
+ except Exception as e:
505
+ print(f"Failed to get transcript in language {normalized_language}: {str(e)}")
506
+
507
+ # If specific language failed or not requested, try first available
508
+ if transcript_list is None and available_languages:
509
+ try:
510
+ first_language = available_languages[0]["language_code"]
511
+ print(f"Trying to use first available language: {first_language}")
512
+ transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[first_language])
513
+ used_language = first_language
514
+ print(f"Successfully obtained transcript in language: {first_language}")
515
+ except Exception as e:
516
+ print(f"Failed to get transcript in language {first_language}: {str(e)}")
517
+ return {"error": f"Failed to get transcript in any available language: {str(e)}"}
518
+
519
+ if not transcript_list:
520
+ return {"error": "Transcript for this video is unavailable"}
521
+
522
+ # Fetch video information for the title
523
+ youtube = get_youtube_client()
524
+ video_title = "YouTube Video"
525
+
526
+ try:
527
+ video_response = youtube.videos().list(
528
+ part="snippet",
529
+ id=video_id
530
+ ).execute()
531
+
532
+ if video_response.get("items"):
533
+ video_title = video_response["items"][0]["snippet"]["title"]
534
+ except Exception as e:
535
+ print(f"Failed to get video information: {str(e)}")
536
+
537
+ # Send the request to Gemini, specifying the language
538
+ result = await generate_timecodes_with_gemini(
539
+ transcript_entries=transcript_list,
540
+ video_title=video_title,
541
+ format_type=request.format,
542
+ model_name=request.model,
543
+ language=used_language
544
+ )
545
+
546
+ if "error" in result:
547
+ return {"error": result["error"]}
548
+
549
+ # Add the transcript language information
550
+ if used_language:
551
+ result["used_language"] = used_language
552
+
553
+ return {"content": result}
554
+ except Exception as e:
555
+ return {"error": f"Error generating timecodes with Gemini: {str(e)}"}
556
+
557
+ if __name__ == "__main__":
558
+ import uvicorn
559
+ uvicorn.run(app, host="127.0.0.1", port=8080)
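The segment grouping inside `generate_timecodes` is easier to follow, and to test, when lifted out of the request handler. The sketch below restates that loop as a pure function over transcript entries shaped like `{"text", "start", "duration"}`; the name `group_into_segments` is hypothetical, and the behavior is intended to match the handler above rather than replace it.

```python
def group_into_segments(entries, segment_length=60):
    """Group transcript entries into segments by start-time distance.

    An entry joins the current segment while its start time is within
    `segment_length` seconds of that segment's start; otherwise the
    current segment is closed and a new one begins at the entry.
    """
    segments = []
    current = None
    for entry in entries:
        start = entry["start"]
        end = start + entry["duration"]
        if current is not None and (start - current["start"]) <= segment_length:
            # Still inside the current window: extend the segment.
            current["text"].append(entry["text"])
            current["end"] = end
        else:
            # Window exceeded (or first entry): close and start a new segment.
            if current is not None:
                segments.append(current)
            current = {"start": start, "end": end, "text": [entry["text"]]}
    if current is not None:
        segments.append(current)
    return segments


if __name__ == "__main__":
    sample = [
        {"text": "intro", "start": 0.0, "duration": 5.0},
        {"text": "setup", "start": 30.0, "duration": 5.0},
        {"text": "demo", "start": 61.0, "duration": 4.0},
    ]
    for segment in group_into_segments(sample):
        print(segment["start"], " ".join(segment["text"]))
```

Note that the window is measured from the start of each segment, so a segment can run slightly longer than `segment_length` when its last entry begins just inside the window.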
app.py ADDED
@@ -0,0 +1,401 @@
1
+ import gradio as gr
2
+ import json
3
+ import httpx
4
+ import os
5
+ import traceback
6
+ import asyncio
7
+ import threading
8
+ import uvicorn
9
+ from fastapi import FastAPI, HTTPException
10
+ from fastapi.middleware.cors import CORSMiddleware
11
+ from dotenv import load_dotenv
12
+ from utils import format_timestamp, extract_video_id
13
+
14
+ # Load environment variables
15
+ load_dotenv()
16
+
17
+ # Import API server components
18
+ from api_server import app as fastapi_app
19
+
20
+ # Start FastAPI server in background
21
+ def start_fastapi_server():
22
+ uvicorn.run(fastapi_app, host="0.0.0.0", port=7860)
23
+
24
+ # Start FastAPI server in a separate thread
25
+ server_thread = threading.Thread(target=start_fastapi_server, daemon=True)
26
+ server_thread.start()
27
+
28
+ # Wait a moment for server to start
29
+ import time
30
+ time.sleep(2)
31
+
32
+ # API URL for Hugging Face Spaces
33
+ API_URL = "http://localhost:7860/api"
34
+
35
+ async def search_youtube(query, max_results, order, video_duration):
36
+ """Function for searching videos on YouTube."""
37
+ try:
38
+ async with httpx.AsyncClient() as client:
39
+ response = await client.post(
40
+ f"{API_URL}/search",
41
+ json={
42
+ "query": query,
43
+ "max_results": max_results,
44
+ "order": order,
45
+ "video_duration": video_duration if video_duration != "any" else None
46
+ }
47
+ )
48
+ data = response.json()
49
+
50
+ if "error" in data and data["error"]:
51
+ return f"Error: {data['error']}", None
52
+
53
+ results = data.get("content", [])
54
+ formatted_results = []
55
+
56
+ for video in results:
57
+ formatted_results.append(
58
+ f"**{video['title']}**\n"
59
+ f"ID: {video['video_id']}\n"
60
+ f"Channel: {video['channel_title']}\n"
61
+ f"Published: {video['published_at']}\n"
62
+ f"[Thumbnail]({video['thumbnail']})\n\n"
63
+ f"{video['description'][:200]}...\n\n"
64
+ f"---\n"
65
+ )
66
+
67
+ return "\n".join(formatted_results), json.dumps(results, indent=2, ensure_ascii=False)
68
+ except Exception as e:
69
+ return f"Error: {str(e)}", None
70
+
71
+ async def get_video_info(video_id):
72
+ """Function for getting video information."""
73
+ try:
74
+ async with httpx.AsyncClient() as client:
75
+ response = await client.post(
76
+ f"{API_URL}/video_info",
77
+ json={"video_id": video_id}
78
+ )
79
+ data = response.json()
80
+
81
+ if "error" in data and data["error"]:
82
+ return f"Error: {data['error']}", None
83
+
84
+ video_info = data.get("content", {})
85
+
86
+ formatted_info = (
87
+ f"**{video_info.get('title')}**\n\n"
88
+ f"Channel: {video_info.get('channel_title')}\n"
89
+ f"Published: {video_info.get('published_at')}\n"
90
+ f"Views: {video_info.get('view_count')}\n"
91
+ f"Likes: {video_info.get('like_count')}\n"
92
+ f"Comments: {video_info.get('comment_count')}\n"
93
+ f"Duration: {video_info.get('duration')}\n\n"
94
+ f"**Description:**\n{video_info.get('description')}\n\n"
95
+ f"**Tags:**\n{', '.join(video_info.get('tags', []))}"
96
+ )
97
+
98
+ return formatted_info, json.dumps(video_info, indent=2, ensure_ascii=False)
99
+ except Exception as e:
100
+ return f"Error: {str(e)}", None
101
+
102
+ async def get_transcript(video_id, language_code):
103
+ """Function for getting video transcript."""
104
+ try:
105
+ async with httpx.AsyncClient() as client:
106
+ response = await client.post(
107
+ f"{API_URL}/transcript",
108
+ json={
109
+ "video_id": video_id,
110
+ "language_code": language_code if language_code else None
111
+ }
112
+ )
113
+ data = response.json()
114
+
115
+ if "error" in data and data["error"]:
116
+ return f"Error: {data['error']}", None
117
+
118
+ transcript = data.get("content", [])
119
+
120
+ formatted_transcript = ""
121
+ for entry in transcript:
122
+ start_time = entry.get("start", 0)
123
+ duration = entry.get("duration", 0)
124
+ end_time = start_time + duration
125
+
126
+ # Format time to hours:minutes:seconds format
127
+ start_formatted = format_timestamp(start_time)
128
+ end_formatted = format_timestamp(end_time)
129
+
130
+ formatted_transcript += f"[{start_formatted} - {end_formatted}] {entry.get('text', '')}\n\n"
131
+
132
+ return formatted_transcript, json.dumps(transcript, indent=2, ensure_ascii=False)
133
+ except Exception as e:
134
+ return f"Error: {str(e)}", None
135
+
136
+ async def get_available_languages(video_id):
137
+ """Function for getting available transcript languages."""
138
+ try:
139
+ async with httpx.AsyncClient() as client:
140
+ response = await client.post(
141
+ f"{API_URL}/transcript_languages",
142
+ json={"video_id": video_id}
143
+ )
144
+ data = response.json()
145
+
146
+ if "error" in data and data["error"]:
147
+ return f"Error: {data['error']}", None
148
+
149
+ languages = data.get("content", [])
150
+
151
+ formatted_languages = []
152
+ for lang in languages:
153
+ status = "Auto-generated" if lang.get("is_generated") else "Official subtitles"
154
+ translatable = "Translation available" if lang.get("is_translatable") else "Translation not available"
155
+ formatted_languages.append(
156
+ f"{lang.get('language')} ({lang.get('language_code')}): {status}, {translatable}"
157
+ )
158
+
159
+ return "\n".join(formatted_languages), json.dumps(languages, indent=2, ensure_ascii=False)
160
+ except Exception as e:
161
+ return f"Error: {str(e)}", None
162
+
163
+ async def generate_timecodes(video_id, language_code, segment_length, format_type):
164
+ """Function for generating timecodes."""
165
+ try:
166
+ async with httpx.AsyncClient() as client:
167
+ response = await client.post(
168
+ f"{API_URL}/timecodes",
169
+ json={
170
+ "video_id": video_id,
171
+ "language_code": language_code if language_code else None,
172
+ "segment_length": segment_length,
173
+ "format": format_type
174
+ }
175
+ )
176
+ data = response.json()
177
+
178
+ if "error" in data and data["error"]:
179
+ return f"Error: {data['error']}", None
180
+
181
+ timecodes = data.get("content", {}).get("timecodes", [])
182
+
183
+ if format_type == "markdown":
184
+ formatted_timecodes = "\n".join(timecodes)
185
+ else:
186
+ formatted_timecodes = "```\n" + "\n".join(timecodes) + "\n```"
187
+
188
+ return formatted_timecodes, json.dumps(data.get("content", {}), indent=2, ensure_ascii=False)
189
+ except Exception as e:
190
+ return f"Error: {str(e)}", None
191
+
192
+ async def generate_gemini_timecodes(video_id, language_code, format_type, model):
193
+ """Function for generating timecodes using Gemini."""
194
+ try:
195
+ print(f"Sending request to {API_URL}/gemini_timecodes")
196
+ print(f"Parameters: video_id={video_id}, language_code={language_code}, format={format_type}, model={model}")
197
+
198
+ # Send request to API
199
+ async with httpx.AsyncClient() as client:
200
+ response = await client.post(
201
+ f"{API_URL}/gemini_timecodes",
202
+ json={
203
+ "video_id": video_id,
204
+ "language_code": language_code if language_code else None,
205
+ "format": format_type,
206
+ "model": model
207
+ },
208
+ timeout=120 # Increase timeout for Gemini API
209
+ )
210
+
211
+ print(f"Response status: {response.status_code}")
212
+
213
+ # Parse response
214
+ data = response.json()
215
+
216
+ if data.get("error"):
217
+ print(f"Error in API response: {data['error']}")
218
+ return f"⚠️ Error: {data['error']}", {"error": data['error']}
219
+
220
+ # Extract timecodes from response
221
+ content = data.get("content", {})
222
+ timecodes = content.get("timecodes", [])
223
+
224
+ print(f"Received {len(timecodes)} timecodes")
225
+
226
+ # Format timecodes for display
227
+ if timecodes:
228
+ timecodes_text = "\n".join(timecodes)
229
+
230
+ # Model and language information
231
+ model_info = content.get("model", "Unknown")
232
+ language_info = content.get("detected_language", "Unknown")
233
+ duration_info = content.get("video_duration_minutes", "Unknown")
234
+
235
+ summary = f"🤖 Model: {model_info}\n🗣️ Language: {language_info}\n⏱️ Duration: {duration_info} min\n📝 Timecodes: {len(timecodes)}"
236
+
237
+ return summary, content # Return content object instead of timecodes_text
238
+ else:
239
+ return "⚠️ No timecodes generated", {"message": "No timecodes generated"}
240
+
241
+ except Exception as e:
242
+ print(f"Exception during timecode generation: {str(e)}")
243
+ traceback.print_exc()
244
+ return f"Error: {str(e)}", {"error": str(e)}
245
+
246
+ # Create Gradio interface
247
+ with gr.Blocks(title="YouTube MCP", theme=gr.themes.Soft()) as demo:
248
+ gr.Markdown("# 🎬 YouTube Creator MetaData Extractor")
249
+ gr.Markdown("This tool helps content creators analyze YouTube videos and generate metadata using AI")
250
+ gr.Markdown("### Supports all YouTube URL formats: regular links, short links, shorts and embedded videos")
251
+ gr.Markdown("**💡 Language codes:** uk = Ukrainian, ru = Russian, en = English (ISO 639-1 standard)")
252
+ gr.Markdown("---")
253
+
254
+ with gr.Tab("🔍 Video Search"):
255
+ with gr.Row():
256
+ with gr.Column():
257
+ search_query = gr.Textbox(label="Search Query", placeholder="Enter your search query...")
258
+ with gr.Row():
259
+ max_results = gr.Slider(minimum=1, maximum=50, value=10, step=1, label="Max Results")
260
+ order = gr.Dropdown(
261
+ choices=["relevance", "date", "viewCount", "rating", "title"],
262
+ value="relevance",
263
+ label="Sort By"
264
+ )
265
+ video_duration = gr.Dropdown(
266
+ choices=["any", "short", "medium", "long"],
267
+ value="any",
268
+ label="Duration"
269
+ )
270
+ search_button = gr.Button("🔍 Search", variant="primary")
271
+
272
+ with gr.Column():
273
+ search_results = gr.Markdown(label="Search Results")
274
+ search_json = gr.JSON(label="JSON Data")
275
+
276
+ search_button.click(
277
+ search_youtube,
278
+ inputs=[search_query, max_results, order, video_duration],
279
+ outputs=[search_results, search_json]
280
+ )
281
+
282
+ with gr.Tab("ℹ️ Video Info"):
283
+ with gr.Row():
284
+ with gr.Column():
285
+ video_id_input = gr.Textbox(
286
+ label="Video ID or URL",
287
+ placeholder="Enter video ID or full URL (youtube.com, youtu.be, shorts, embed)..."
288
+ )
289
+ get_info_button = gr.Button("📊 Get Info", variant="primary")
290
+
291
+ with gr.Column():
292
+ video_info_output = gr.Markdown(label="Video Information")
293
+ video_info_json = gr.JSON(label="JSON Data")
294
+
295
+ get_info_button.click(
296
+ get_video_info,
297
+ inputs=[video_id_input],
298
+ outputs=[video_info_output, video_info_json]
299
+ )
300
+
301
+ with gr.Tab("📝 Transcript"):
302
+ with gr.Row():
303
+ with gr.Column():
304
+ transcript_video_id = gr.Textbox(
305
+ label="Video ID or URL",
306
+ placeholder="Enter video ID or full URL..."
307
+ )
308
+ language_code = gr.Textbox(label="Language Code (optional)", placeholder="uk (Ukrainian), ru (Russian), en (English), etc...")
309
+ with gr.Row():
310
+ get_transcript_button = gr.Button("📝 Get Transcript", variant="primary")
311
+ get_languages_button = gr.Button("🌐 Available Languages")
312
+
313
+ with gr.Column():
314
+ transcript_output = gr.Markdown(label="Transcript")
315
+ transcript_json = gr.JSON(label="JSON Data")
316
+
317
+ get_transcript_button.click(
318
+ get_transcript,
319
+ inputs=[transcript_video_id, language_code],
320
+ outputs=[transcript_output, transcript_json]
321
+ )
322
+
323
+ get_languages_button.click(
324
+ get_available_languages,
325
+ inputs=[transcript_video_id],
326
+ outputs=[transcript_output, transcript_json]
327
+ )
328
+
329
+ with gr.Tab("⏱️ Basic Timecodes"):
330
+ with gr.Row():
331
+ with gr.Column():
332
+ timecode_video_id = gr.Textbox(
333
+ label="Video ID or URL",
334
+ placeholder="Enter video ID or full URL..."
335
+ )
336
+ timecode_language = gr.Textbox(label="Language Code (optional)", placeholder="uk (Ukrainian), ru (Russian), en (English), etc...")
337
+ segment_length = gr.Slider(minimum=30, maximum=300, value=60, step=30, label="Segment Length (seconds)")
338
+ format_type = gr.Dropdown(
339
+ choices=["youtube", "markdown"],
340
+ value="youtube",
341
+ label="Format"
342
+ )
343
+ generate_timecodes_button = gr.Button("⏱️ Generate Timecodes", variant="primary")
344
+
345
+ with gr.Column():
346
+ timecodes_output = gr.Markdown(label="Timecodes")
347
+ timecodes_json = gr.JSON(label="JSON Data")
348
+
349
+ generate_timecodes_button.click(
350
+ generate_timecodes,
351
+ inputs=[timecode_video_id, timecode_language, segment_length, format_type],
352
+ outputs=[timecodes_output, timecodes_json]
353
+ )
354
+
355
+ with gr.Tab("🤖 AI Timecodes"):
356
+ with gr.Row():
357
+ with gr.Column():
358
+ gemini_video_id = gr.Textbox(
359
+ label="Video ID or URL",
360
+ placeholder="Enter video ID or full URL..."
361
+ )
362
+ gemini_language = gr.Textbox(label="Language Code (optional)", placeholder="uk (Ukrainian), ru (Russian), en (English), etc...")
363
+ gemini_format = gr.Dropdown(
364
+ choices=["youtube", "markdown"],
365
+ value="youtube",
366
+ label="Format"
367
+ )
368
+ gemini_model = gr.Dropdown(
369
+ choices=["gemini-2.0-flash-001", "gemini-2.0-pro-001", "gemini-2.0-pro-vision-001"],
370
+ value="gemini-2.0-flash-001",
371
+ label="AI Model"
372
+ )
373
+ generate_gemini_button = gr.Button("🤖 Generate AI Timecodes", variant="primary")
374
+
375
+ with gr.Column():
376
+ gemini_output = gr.Markdown(label="Generation Info")
377
+ gemini_timecodes = gr.Textbox(label="AI Timecodes", lines=10, max_lines=20, show_copy_button=True)
378
+ gemini_json = gr.JSON(label="JSON Data")
379
+
380
+ async def process_gemini_result(video_id, language_code, format_type, model):
381
+ result = await generate_gemini_timecodes(video_id, language_code, format_type, model)
382
+ if result is None:
383
+ return "Error occurred", "", {}
384
+
385
+ summary, json_data = result
386
+
387
+ # Extract timecodes from json_data
388
+ timecodes = json_data.get("timecodes", [])
389
+ timecodes_text = "\n".join(timecodes) if timecodes else "No timecodes generated"
390
+
391
+ return summary, timecodes_text, json_data
392
+
393
+ generate_gemini_button.click(
394
+ process_gemini_result,
395
+ inputs=[gemini_video_id, gemini_language, gemini_format, gemini_model],
396
+ outputs=[gemini_output, gemini_timecodes, gemini_json]
397
+ )
398
+
399
+ # Launch the app
400
+ if __name__ == "__main__":
401
+ demo.launch()
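The `time.sleep(2)` call near the top of `app.py` assumes the FastAPI thread is already listening after two seconds. A minimal sketch of an alternative is to poll the port until it accepts a TCP connection. The helper name `wait_for_port` and its parameters are hypothetical and not part of this commit; only the standard library is used.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0, interval: float = 0.1) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, False on timeout.

    Hypothetical helper: it only checks that something is listening,
    not that the API routes are ready to serve requests.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means the server socket is accepting connections.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet (refused or timed out): wait briefly and retry.
            time.sleep(interval)
    return False
```

Under these assumptions, `server_thread.start()` could be followed by `wait_for_port("localhost", 7860)` in place of the fixed sleep, with the caller deciding what to do when it returns `False`.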
gemini_helper.py ADDED
@@ -0,0 +1,297 @@
1
+ import os
2
+ from google import genai
3
+ from google.genai import types
4
+ from dotenv import load_dotenv
5
+ from typing import List, Dict, Any, Optional
6
+ import traceback
7
+
8
+ # Load environment variables
9
+ load_dotenv()
10
+
11
+ # Get Gemini API key from environment variables
12
+ GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
13
+ print(f"GEMINI_API_KEY is set: {'Yes' if GEMINI_API_KEY else 'No'}")
14
+
15
+ # Initialize Gemini API
16
+ client = None
17
+ if GEMINI_API_KEY:
18
+ try:
19
+ client = genai.Client(api_key=GEMINI_API_KEY)
20
+ print("Gemini client successfully initialized")
21
+ except Exception as e:
22
+ print(f"Error initializing Gemini client: {str(e)}")
23
+ traceback.print_exc()
24
+ else:
25
+ print("WARNING: Gemini API key not configured. LLM timecode generation functions will be unavailable.")
26
+
27
+ # Default Gemini model
28
+ DEFAULT_MODEL = "gemini-2.0-flash-001"
29
+ # Alternative models if main one doesn't work
30
+ ALTERNATIVE_MODELS = ["gemini-1.5-flash-001"]
31
+
32
+ def format_transcript_for_prompt(transcript_entries: List[Dict[str, Any]], video_duration_seconds: Optional[float] = None) -> str:
33
+ """Formats transcript for passing to prompt."""
34
+ formatted_transcript = ""
35
+
36
+ # Determine maximum time in transcript if video duration is not provided
37
+ if video_duration_seconds is None:
38
+ if transcript_entries:
39
+ last_entry = transcript_entries[-1]
40
+ max_time = last_entry.get("start", 0) + last_entry.get("duration", 0)
41
+ video_duration_seconds = int(max_time) + 10 # Add small buffer
42
+
43
+ for entry in transcript_entries:
44
+ start_time = entry.get("start", 0)
45
+ text = entry.get("text", "")
46
+
47
+ # Check that time doesn't exceed total video duration
48
+ if video_duration_seconds and start_time > video_duration_seconds:
49
+ continue
50
+
51
+ # Format time in hours:minutes:seconds format
52
+ time_str = format_time_hms(start_time)
53
+
54
+ formatted_transcript += f"[{time_str}] {text}\n"
55
+
56
+ return formatted_transcript
57
+
58
+ def format_time_hms(seconds: float) -> str:
59
+ """
60
+ Formats time in seconds to hours:minutes:seconds format.
61
+ For videos shorter than an hour, uses minutes:seconds format.
62
+ """
63
+ hours = int(seconds // 3600)
64
+ minutes = int((seconds % 3600) // 60)
65
+ secs = int(seconds % 60)
66
+
67
+ if hours > 0:
68
+ return f"{hours:02d}:{minutes:02d}:{secs:02d}"
69
+ else:
70
+ return f"{minutes:02d}:{secs:02d}"
71
+
72
+ def get_timecode_prompt(video_title: str, transcript: str, format_type: str = "youtube", language: Optional[str] = None, video_duration_minutes: Optional[int] = None) -> str:
73
+ """Creates prompt for generating timecodes based on transcript."""
74
+
75
+ # Determine prompt language based on video language
76
+ if language and (language.lower().startswith('uk') or language.lower().startswith('ua')):
77
+ target_language = "Ukrainian"
78
+ example_description = "Discussion of main principles"
79
+ elif language and language.lower().startswith('ru'):
80
+ target_language = "Russian"
81
+ example_description = "Обсуждение основных принципов"
82
+ else:
83
+ target_language = "the same language as the video transcript"
84
+ example_description = "Discussion of main principles"
85
+
86
+ # Determine number of timecodes based on video duration
87
+ if video_duration_minutes:
88
+ if video_duration_minutes <= 30:
89
+ timecode_count = "10-15"
90
+ elif video_duration_minutes <= 60:
91
+ timecode_count = "15-20"
92
+ else:
93
+ timecode_count = "20-30"
94
+ else:
95
+ timecode_count = "15-25"
96
+
97
+ if format_type == "youtube":
98
+ format_instructions = (
99
+ f"Format should be: MM:SS Topic description for videos under 1 hour, or HH:MM:SS Topic description for longer videos\n"
100
+ f"Example: 05:30 {example_description} or 1:05:30 {example_description}\n"
101
+ f"This format is suitable for YouTube video descriptions."
102
+ )
103
+ elif format_type == "markdown":
104
+ format_instructions = (
105
+ f"Format should be Markdown: - [MM:SS](link) Topic description for videos under 1 hour, or - [HH:MM:SS](link) Topic description for longer videos\n"
106
+ f"Example: - [05:30](https://youtu.be/VIDEOID?t=330) {example_description} or - [1:05:30](https://youtu.be/VIDEOID?t=3930) {example_description}\n"
107
+ f"This format creates clickable links in Markdown."
108
+ )
109
+ else: # txt
110
+ format_instructions = (
111
+ f"Format should be: MM:SS - Topic description for videos under 1 hour, or HH:MM:SS - Topic description for longer videos\n"
112
+ f"Example: 05:30 - {example_description} or 1:05:30 - {example_description}\n"
113
+ f"This format is suitable for plain text representation."
114
+ )
115
+
116
+ prompt = f"""
117
+ You are an expert at creating timestamps for YouTube videos. You have been provided with a transcript of the video "{video_title}".
118
+
119
+ Your task is to create timestamps for the main themes and segments of the video based on the provided transcript.
120
+ Create timestamp descriptions in {target_language}.
121
+
122
+ {format_instructions}
123
+
124
+ Rules for creating timestamps:
125
+ 1. Select {timecode_count} key video segments
126
+ 2. Use the time markers provided in the transcript to determine the start of each segment
127
+ 3. Create brief (3-7 words) descriptions for each segment that reflect its main theme, using appropriate terminology and style
128
+ 4. Distribute timestamps approximately evenly throughout the video length
129
+ 5. Use MM:SS format for videos under 1 hour (example: 05:30, 45:20), and HH:MM:SS format for videos 1 hour or longer (example: 1:05:30, 1:45:20)
130
+ 6. DO NOT include standard markers like "Video start" or "Video end"
131
+ 7. Ensure a clear structure so viewers can easily navigate through the video
132
+ 8. The first timestamp does NOT have to be 00:00, start with the first meaningful topic
133
+
134
+ Here is the video transcript:
135
+
136
+ {transcript}
137
+
138
+ Create a list of timestamps in the specified format. Reply with ONLY the list of timestamps, without introduction or conclusion.
139
+ """
140
+
141
+ return prompt
142
+
143
+ async def generate_timecodes_with_gemini(
144
+ transcript_entries: List[Dict[str, Any]],
145
+ video_title: str,
146
+ format_type: str = "youtube",
147
+ model_name: Optional[str] = None,
148
+ language: Optional[str] = None
149
+ ) -> Dict[str, Any]:
150
+ """
151
+ Generates timecodes using Gemini based on transcript.
152
+
153
+ Args:
154
+ transcript_entries: List of transcript entries
155
+ video_title: Video title
156
+ format_type: Timecode format (youtube, markdown)
157
+ model_name: Gemini model name (defaults to DEFAULT_MODEL)
158
+ language: Transcript language (if known)
159
+
160
+ Returns:
161
+ Dictionary with generation results
162
+ """
163
+ if not GEMINI_API_KEY or client is None:
164
+ return {
165
+ "error": "Gemini API key is not configured. Please add GEMINI_API_KEY to .env file"
166
+ }
167
+
168
+ try:
169
+ print(f"Starting timecode generation with model: {model_name or DEFAULT_MODEL}")
170
+
171
+ # Determine transcript language if not provided
172
+ detected_language = language
173
+ if not detected_language:
174
+ # Simple heuristic for language detection from first 10 segments
175
+ text_sample = " ".join([entry.get("text", "") for entry in transcript_entries[:10]])
176
+
177
+ # Set of Ukrainian letters that differ from Russian alphabet
178
+ ukrainian_specific = set("ґєії")
179
+ # If there's at least one specific Ukrainian letter
180
+ if any(char in ukrainian_specific for char in text_sample.lower()):
181
+ detected_language = "uk"
182
+ print("Detected transcript language: Ukrainian")
183
+ # Check for Cyrillic in general
184
+ elif any(ord('а') <= ord(char) <= ord('я') for char in text_sample.lower()):
185
+ detected_language = "ru"
186
+ print("Detected transcript language: Russian")
187
+ else:
188
+ detected_language = "en"
189
+ print("Detected transcript language: English (or other)")
190
+
191
+ # Determine video duration (in seconds and minutes)
192
+ video_duration_seconds = 0
193
+ if transcript_entries:
194
+ last_entry = transcript_entries[-1]
195
+ video_duration_seconds = last_entry.get("start", 0) + last_entry.get("duration", 0)
196
+ video_duration_minutes = int(video_duration_seconds / 60)
197
+ print(f"Determined video duration: {video_duration_minutes} minutes ({video_duration_seconds} seconds)")
198
+ else:
199
+ video_duration_minutes = None
200
+
201
+ # Format transcript for prompt
202
+ formatted_transcript = format_transcript_for_prompt(transcript_entries, video_duration_seconds)
203
+
204
+ # Create prompt considering language and duration
205
+ prompt = get_timecode_prompt(
206
+ video_title,
207
+ formatted_transcript,
208
+ format_type,
209
+ detected_language,
210
+ video_duration_minutes
211
+ )
212
+ print(f"Prompt prepared, length: {len(prompt)} characters")
213
+
214
+ # List of models to try
215
+ models_to_try = [model_name or DEFAULT_MODEL] + [m for m in ALTERNATIVE_MODELS if m != (model_name or DEFAULT_MODEL)]
216
+
217
+ last_error = None
218
+ for current_model in models_to_try:
219
+ try:
220
+ # Use async API client for content generation
221
+ print(f"Making request to Gemini API with model {current_model}...")
222
+ response = await client.aio.models.generate_content(
223
+ model=current_model,
224
+ contents=prompt,
225
+ config=types.GenerateContentConfig(
226
+ temperature=0.2, # Low temperature for more deterministic results
227
+ max_output_tokens=2048, # Enough for timecode list
228
+ )
229
+ )
230
+ print(f"Response received: {type(response)}")
231
+
232
+ # Get response text
233
+ timecodes_text = response.text
234
+ print(f"Response text length: {len(timecodes_text)}")
235
+
236
+ # Split into lines and clean
237
+ timecodes = [line.strip() for line in timecodes_text.split('\n') if line.strip()]
238
+
239
+ # Filter timecodes to remove "video start" and "video end"
240
+ filtered_timecodes = []
241
+ for tc in timecodes:
242
+ # Extract description (everything after time)
243
+ parts = tc.split(" ", 1)
244
+ if len(parts) > 1:
245
+ time_part, description = parts
246
+ # Skip timecodes with "video start" or "video end"
247
+ lowercase_desc = description.lower()
248
+ if any(phrase in lowercase_desc for phrase in [
249
+ "начало видео", "конец видео", "початок відео", "кінець відео",
250
+ "start of video", "end of video", "video start", "video end",
251
+ "beginning", "conclusion", "intro", "outro"
252
+ ]):
253
+ continue
254
+ filtered_timecodes.append(tc)
255
+
256
+ # If too many timecodes, select evenly distributed ones
257
+ max_timecodes = 25 # Maximum recommended number of timecodes
258
+ if len(filtered_timecodes) > max_timecodes:
259
+ print(f"Too many timecodes ({len(filtered_timecodes)}), reducing to {max_timecodes}")
260
+ # Calculate step for selecting timecodes evenly
261
+ step = len(filtered_timecodes) / max_timecodes
262
+ # Select indices for timecodes
263
+ indices = [int(i * step) for i in range(max_timecodes)]
264
+ # Ensure we have first and last timecode
265
+ if indices[-1] != len(filtered_timecodes) - 1:
266
+ indices[-1] = len(filtered_timecodes) - 1
267
+ # Select timecodes by indices
268
+ final_timecodes = [filtered_timecodes[i] for i in indices]
269
+ else:
270
+ final_timecodes = filtered_timecodes
271
+
272
+ print(f"Final timecodes count after processing: {len(final_timecodes)}")
273
+
274
+ return {
275
+ "timecodes": final_timecodes,
276
+ "format": format_type,
277
+ "model": current_model,
278
+ "video_title": video_title,
279
+ "detected_language": detected_language,
280
+ "video_duration_minutes": video_duration_minutes
281
+ }
282
+ except Exception as api_error:
283
+ print(f"Error with model {current_model}: {str(api_error)}")
284
+ traceback.print_exc()
285
+ last_error = api_error
286
+ continue
287
+
288
+ # If all models failed
289
+ return {
290
+ "error": f"Failed to execute request with any model. Last error: {str(last_error)}"
291
+ }
292
+ except Exception as e:
293
+ print(f"General error: {str(e)}")
294
+ traceback.print_exc()
295
+ return {
296
+ "error": f"Error generating timecodes with Gemini: {str(e)}"
297
+ }
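The reduction step in `generate_timecodes_with_gemini` (keeping at most `max_timecodes` entries, spread evenly, with the last entry forced in) can be read in isolation. The sketch below mirrors that index arithmetic as a standalone function. The name `select_evenly` is hypothetical and the function is an illustration, not code from this commit.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def select_evenly(items: Sequence[T], max_count: int) -> List[T]:
    """Keep at most max_count items, spaced evenly, always ending on the last item."""
    if max_count <= 0:
        return []
    if len(items) <= max_count:
        return list(items)
    # Fractional step between picks; it is > 1 here, so indices strictly increase.
    step = len(items) / max_count
    indices = [int(i * step) for i in range(max_count)]
    # Force the final pick to be the last item, as the diff does.
    indices[-1] = len(items) - 1
    return [items[i] for i in indices]


print(select_evenly(list(range(10)), 4))
```

One consequence of this scheme is that the second-to-last gap can be wider than the others, because the final index is moved to the end rather than recomputed.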
gradio_app.py ADDED
@@ -0,0 +1,383 @@
1
+ import gradio as gr
2
+ import json
3
+ import httpx
4
+ import os
5
+ import traceback
6
+ from dotenv import load_dotenv
7
+ from utils import format_timestamp, extract_video_id
8
+
9
+ # Load environment variables
10
+ load_dotenv()
11
+
12
+ # API URL for local development
13
+ API_URL = "http://127.0.0.1:8080/api"
14
+ # API URL for Hugging Face Spaces
15
+ # API_URL = "https://your-huggingface-space-url/api"
16
+
17
+ async def search_youtube(query, max_results, order, video_duration):
18
+ """Function for searching videos on YouTube."""
19
+ try:
20
+ async with httpx.AsyncClient() as client:
21
+ response = await client.post(
22
+ f"{API_URL}/search",
23
+ json={
24
+ "query": query,
25
+ "max_results": max_results,
26
+ "order": order,
27
+ "video_duration": video_duration if video_duration != "any" else None
28
+ }
29
+ )
30
+ data = response.json()
31
+
32
+ if "error" in data and data["error"]:
33
+ return f"Error: {data['error']}", None
34
+
35
+ results = data.get("content", [])
36
+ formatted_results = []
37
+
38
+ for video in results:
39
+ formatted_results.append(
40
+ f"**{video['title']}**\n"
41
+ f"ID: {video['video_id']}\n"
42
+ f"Channel: {video['channel_title']}\n"
43
+ f"Published: {video['published_at']}\n"
44
+ f"[Thumbnail]({video['thumbnail']})\n\n"
45
+ f"{video['description'][:200]}...\n\n"
46
+ f"---\n"
47
+ )
48
+
49
+ return "\n".join(formatted_results), json.dumps(results, indent=2, ensure_ascii=False)
50
+ except Exception as e:
51
+ return f"Error: {str(e)}", None
52
+
53
+ async def get_video_info(video_id):
54
+ """Function for getting video information."""
55
+ try:
56
+ # No need to extract video ID here, it is done on the server
57
+ async with httpx.AsyncClient() as client:
58
+ response = await client.post(
59
+ f"{API_URL}/video_info",
60
+ json={"video_id": video_id}
61
+ )
62
+ data = response.json()
63
+
64
+ if "error" in data and data["error"]:
65
+ return f"Error: {data['error']}", None
66
+
67
+ video_info = data.get("content", {})
68
+
69
+ formatted_info = (
70
+ f"**{video_info.get('title')}**\n\n"
71
+ f"Channel: {video_info.get('channel_title')}\n"
72
+ f"Published: {video_info.get('published_at')}\n"
73
+ f"Views: {video_info.get('view_count')}\n"
74
+ f"Likes: {video_info.get('like_count')}\n"
75
+ f"Comments: {video_info.get('comment_count')}\n"
76
+ f"Duration: {video_info.get('duration')}\n\n"
77
+ f"**Description:**\n{video_info.get('description')}\n\n"
78
+ f"**Tags:**\n{', '.join(video_info.get('tags', []))}"
79
+ )
80
+
81
+ return formatted_info, json.dumps(video_info, indent=2, ensure_ascii=False)
82
+ except Exception as e:
83
+ return f"Error: {str(e)}", None
84
+
85
+ async def get_transcript(video_id, language_code):
86
+ """Function for getting video transcript."""
87
+ try:
88
+ async with httpx.AsyncClient() as client:
89
+ response = await client.post(
90
+ f"{API_URL}/transcript",
91
+ json={
92
+ "video_id": video_id,
93
+ "language_code": language_code if language_code else None
94
+ }
95
+ )
96
+ data = response.json()
97
+
98
+ if "error" in data and data["error"]:
99
+ return f"Error: {data['error']}", None
100
+
101
+ transcript = data.get("content", [])
102
+
103
+ formatted_transcript = ""
104
+ for entry in transcript:
105
+ start_time = entry.get("start", 0)
106
+ duration = entry.get("duration", 0)
107
+ end_time = start_time + duration
108
+
109
+ # Format time to hours:minutes:seconds format
110
+ start_formatted = format_timestamp(start_time)
111
+ end_formatted = format_timestamp(end_time)
112
+
113
+ formatted_transcript += f"[{start_formatted} - {end_formatted}] {entry.get('text', '')}\n\n"
114
+
115
+ return formatted_transcript, json.dumps(transcript, indent=2, ensure_ascii=False)
116
+ except Exception as e:
117
+ return f"Error: {str(e)}", None
118
+
119
+ async def get_available_languages(video_id):
120
+ """Function for getting available transcript languages."""
121
+ try:
122
+ async with httpx.AsyncClient() as client:
123
+ response = await client.post(
124
+ f"{API_URL}/transcript_languages",
125
+ json={"video_id": video_id}
126
+ )
127
+ data = response.json()
128
+
129
+ if "error" in data and data["error"]:
130
+ return f"Error: {data['error']}", None
131
+
132
+ languages = data.get("content", [])
133
+
134
+ formatted_languages = []
135
+ for lang in languages:
136
+ status = "Auto-generated" if lang.get("is_generated") else "Official subtitles"
137
+ translatable = "Translation available" if lang.get("is_translatable") else "Translation not available"
138
+ formatted_languages.append(
139
+ f"{lang.get('language')} ({lang.get('language_code')}): {status}, {translatable}"
140
+ )
141
+
142
+ return "\n".join(formatted_languages), json.dumps(languages, indent=2, ensure_ascii=False)
143
+ except Exception as e:
144
+ return f"Error: {str(e)}", None
145
+
146
+ async def generate_timecodes(video_id, language_code, segment_length, format_type):
147
+ """Function for generating timecodes."""
148
+ try:
149
+ async with httpx.AsyncClient() as client:
150
+ response = await client.post(
151
+ f"{API_URL}/timecodes",
152
+ json={
153
+ "video_id": video_id,
154
+ "language_code": language_code if language_code else None,
155
+ "segment_length": segment_length,
156
+ "format": format_type
157
+ }
158
+ )
159
+ data = response.json()
160
+
161
+ if "error" in data and data["error"]:
162
+ return f"Error: {data['error']}", None
163
+
164
+ timecodes = data.get("content", {}).get("timecodes", [])
165
+
166
+ if format_type == "youtube":
167
+ formatted_timecodes = "```\n" + "\n".join(timecodes) + "\n```"
168
+ elif format_type == "markdown":
169
+ formatted_timecodes = "\n".join(timecodes)
170
+ else:
171
+ formatted_timecodes = "```\n" + "\n".join(timecodes) + "\n```"
172
+
173
+ return formatted_timecodes, json.dumps(data.get("content", {}), indent=2, ensure_ascii=False)
174
+ except Exception as e:
175
+ return f"Error: {str(e)}", None
176
+
177
+ async def generate_gemini_timecodes(video_id, language_code, format_type, model):
178
+ """Function for generating timecodes using Gemini."""
179
+ try:
180
+ print(f"Sending request to {API_URL}/gemini_timecodes")
181
+ print(f"Parameters: video_id={video_id}, language_code={language_code}, format={format_type}, model={model}")
182
+
183
+ # Send request to API
184
+ async with httpx.AsyncClient() as client:
185
+ response = await client.post(
186
+ f"{API_URL}/gemini_timecodes",
187
+ json={
188
+ "video_id": video_id,
189
+ "language_code": language_code if language_code else None,
190
+ "format": format_type,
191
+ "model": model
192
+ },
193
+ timeout=120 # Increase timeout for Gemini API
194
+ )
195
+
196
+ print(f"Response status: {response.status_code}")
197
+
198
+ # Parse response
199
+ data = response.json()
200
+
201
+ if "error" in data:
202
+ print(f"Error in API response: {data['error']}")
203
+ return f"⚠️ Error: {data['error']}", {"error": data['error']}
204
+
205
+ # Extract timecodes from response
206
+ content = data.get("content", {})
207
+ timecodes = content.get("timecodes", [])
208
+
209
+ print(f"Received {len(timecodes)} timecodes")
210
+
211
+ # Format timecodes for display
212
+ if timecodes:
213
+ timecodes_text = "\n".join(timecodes)
214
+
215
+ # Model and language information
216
+ model_info = content.get("model", "Unknown")
217
+ language_info = content.get("detected_language", "Unknown")
218
+ duration_info = content.get("video_duration_minutes", "Unknown")
219
+
220
+ summary = f"🤖 Model: {model_info}\n🗣️ Language: {language_info}\n⏱️ Duration: {duration_info} min\n📝 Timecodes: {len(timecodes)}"
221
+
222
+ return summary, content # Return content object instead of timecodes_text
223
+ else:
224
+ return "⚠️ No timecodes generated", {"message": "No timecodes generated"}
225
+
226
+ except Exception as e:
227
+ print(f"Exception during timecode generation: {str(e)}")
228
+ traceback.print_exc()
229
+ return f"Error: {str(e)}", {"error": str(e)}
230
+
+ # Create Gradio interface
+ with gr.Blocks(title="YouTube MCP") as demo:
+     gr.Markdown("# YouTube Model Context Protocol (MCP)")
+     gr.Markdown("This interface lets you interact with the YouTube API through the Model Context Protocol (MCP).")
+
+     with gr.Tab("Video Search"):
+         with gr.Row():
+             with gr.Column():
+                 search_query = gr.Textbox(label="Search query", placeholder="Enter a query...")
+                 with gr.Row():
+                     max_results = gr.Slider(minimum=1, maximum=50, value=10, step=1, label="Number of results")
+                     order = gr.Dropdown(
+                         choices=["relevance", "date", "viewCount", "rating", "title"],
+                         value="relevance",
+                         label="Sort order"
+                     )
+                     video_duration = gr.Dropdown(
+                         choices=["any", "short", "medium", "long"],
+                         value="any",
+                         label="Duration"
+                     )
+                 search_button = gr.Button("Search")
+
+             with gr.Column():
+                 search_results = gr.Markdown(label="Results")
+                 search_json = gr.JSON(label="JSON data")
+
+         search_button.click(
+             search_youtube,
+             inputs=[search_query, max_results, order, video_duration],
+             outputs=[search_results, search_json]
+         )
+
+     with gr.Tab("Video Info"):
+         with gr.Row():
+             with gr.Column():
+                 video_id_input = gr.Textbox(
+                     label="Video ID or video URL",
+                     placeholder="Enter a video ID or full URL (youtube.com, youtu.be, shorts, embed)..."
+                 )
+                 get_info_button = gr.Button("Get info")
+
+             with gr.Column():
+                 video_info_output = gr.Markdown(label="Video info")
+                 video_info_json = gr.JSON(label="JSON data")
+
+         get_info_button.click(
+             get_video_info,
+             inputs=[video_id_input],
+             outputs=[video_info_output, video_info_json]
+         )
+
+     with gr.Tab("Video Transcript"):
+         with gr.Row():
+             with gr.Column():
+                 transcript_video_id = gr.Textbox(
+                     label="Video ID or video URL",
+                     placeholder="Enter a video ID or full URL (youtube.com, youtu.be, shorts, embed)..."
+                 )
+                 language_code = gr.Textbox(label="Language code (optional)", placeholder="ru, en, etc...")
+                 with gr.Row():
+                     get_transcript_button = gr.Button("Get transcript")
+                     get_languages_button = gr.Button("Get available languages")
+
+             with gr.Column():
+                 transcript_output = gr.Markdown(label="Transcript")
+                 transcript_json = gr.JSON(label="JSON data")
+
+         get_transcript_button.click(
+             get_transcript,
+             inputs=[transcript_video_id, language_code],
+             outputs=[transcript_output, transcript_json]
+         )
+
+         get_languages_button.click(
+             get_available_languages,
+             inputs=[transcript_video_id],
+             outputs=[transcript_output, transcript_json]
+         )
+
+     with gr.Tab("Timecodes"):
+         with gr.Row():
+             with gr.Column():
+                 timecode_video_id = gr.Textbox(
+                     label="Video ID or video URL",
+                     placeholder="Enter a video ID or full URL (youtube.com, youtu.be, shorts, embed)..."
+                 )
+                 timecode_language = gr.Textbox(label="Language code (optional)", placeholder="ru, en, etc...")
+                 segment_length = gr.Slider(minimum=30, maximum=300, value=60, step=30, label="Segment length (seconds)")
+                 format_type = gr.Dropdown(
+                     choices=["youtube", "markdown"],
+                     value="youtube",
+                     label="Timecode format"
+                 )
+                 generate_timecodes_button = gr.Button("Generate timecodes")
+
+             with gr.Column():
+                 timecodes_output = gr.Markdown(label="Timecodes")
+                 timecodes_json = gr.JSON(label="JSON data")
+
+         generate_timecodes_button.click(
+             generate_timecodes,
+             inputs=[timecode_video_id, timecode_language, segment_length, format_type],
+             outputs=[timecodes_output, timecodes_json]
+         )
+
+     with gr.Tab("Gemini Timecodes"):
+         with gr.Row():
+             with gr.Column():
+                 gemini_video_id = gr.Textbox(
+                     label="Video ID or video URL",
+                     placeholder="Enter a video ID or full URL (youtube.com, youtu.be, shorts, embed)..."
+                 )
+                 gemini_language = gr.Textbox(label="Language code (optional)", placeholder="ru, en, etc...")
+                 gemini_format = gr.Dropdown(
+                     choices=["youtube", "markdown"],
+                     value="youtube",
+                     label="Timecode format"
+                 )
+                 gemini_model = gr.Dropdown(
+                     choices=["gemini-2.0-flash-001", "gemini-2.0-pro-001", "gemini-2.0-pro-vision-001"],
+                     value="gemini-2.0-flash-001",
+                     label="Gemini model"
+                 )
+                 generate_gemini_button = gr.Button("Generate timecodes with Gemini")
+
+             with gr.Column():
+                 gemini_output = gr.Markdown(label="Generation info")
+                 gemini_timecodes = gr.Textbox(label="Timecodes", lines=10, max_lines=20, show_copy_button=True)
+                 gemini_json = gr.JSON(label="JSON data")
+
+         async def process_gemini_result(video_id, language_code, format_type, model):
+             result = await generate_gemini_timecodes(video_id, language_code, format_type, model)
+             if result is None:
+                 return "Error occurred", "", {}
+
+             summary, json_data = result
+
+             # Extract timecodes from json_data
+             timecodes = json_data.get("timecodes", [])
+             timecodes_text = "\n".join(timecodes) if timecodes else "No timecodes generated"
+
+             return summary, timecodes_text, json_data
+
+         generate_gemini_button.click(
+             process_gemini_result,
+             inputs=[gemini_video_id, gemini_language, gemini_format, gemini_model],
+             outputs=[gemini_output, gemini_timecodes, gemini_json]
+         )
+
+ # Launch the application
+ if __name__ == "__main__":
+     demo.launch()
main.py ADDED
@@ -0,0 +1,83 @@
+ #!/usr/bin/env python3
+ """
+ Unified launcher for YouTube MCP application.
+ Provides options to run API server, Gradio UI, or both.
+ """
+
+ import argparse
+ import asyncio
+ import uvicorn
+ import threading
+ import time
+ from dotenv import load_dotenv
+
+ # Load environment variables
+ load_dotenv()
+
+ def start_api_server(host="127.0.0.1", port=8080):
+     """Start FastAPI server."""
+     from api_server import app
+     print(f"Starting API server on http://{host}:{port}")
+     uvicorn.run(app, host=host, port=port)
+
+ def start_gradio_ui(host="127.0.0.1", port=8081):
+     """Start Gradio UI."""
+     import gradio_app
+     print(f"Starting Gradio UI on http://{host}:{port}")
+     gradio_app.demo.launch(server_name=host, server_port=port, share=False)
+
+ def start_both(api_host="127.0.0.1", api_port=8080, ui_host="127.0.0.1", ui_port=8081):
+     """Start both API server and Gradio UI."""
+     print(f"Starting API server on http://{api_host}:{api_port}")
+     print(f"Starting Gradio UI on http://{ui_host}:{ui_port}")
+
+     # Start API server in a separate thread
+     api_thread = threading.Thread(
+         target=start_api_server,
+         args=(api_host, api_port),
+         daemon=True
+     )
+     api_thread.start()
+
+     # Wait a moment for API server to start
+     time.sleep(2)
+
+     # Start Gradio UI in main thread
+     start_gradio_ui(ui_host, ui_port)
+
+ def main():
+     parser = argparse.ArgumentParser(description="YouTube MCP Application Launcher")
+     parser.add_argument(
+         "--mode",
+         choices=["api", "ui", "both"],
+         default="both",
+         help="Launch mode: 'api' for FastAPI server only, 'ui' for Gradio UI only, 'both' for both services"
+     )
+     parser.add_argument(
+         "--host",
+         default="127.0.0.1",
+         help="Host address (default: 127.0.0.1)"
+     )
+     parser.add_argument(
+         "--port",
+         type=int,
+         default=8080,
+         help="Port number (default: 8080 for API, 8081 for UI in 'both' mode)"
+     )
+
+     args = parser.parse_args()
+
+     try:
+         if args.mode == "api":
+             start_api_server(args.host, args.port)
+         elif args.mode == "ui":
+             start_gradio_ui(args.host, args.port)
+         elif args.mode == "both":
+             start_both(args.host, args.port, args.host, args.port + 1)
+     except KeyboardInterrupt:
+         print("\nKeyboard interruption in main thread... closing server.")
+     except Exception as e:
+         print(f"Error starting application: {e}")
+
+ if __name__ == "__main__":
+     main()
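Review note on `start_both`: the launcher pauses for a fixed two seconds before starting the UI, which may be too short on a slow host and longer than needed on a fast one. A readiness probe is one possible alternative. The sketch below is not part of this commit; `wait_for_port` is a hypothetical helper, and it assumes only that the API server accepts TCP connections on the host and port passed to `uvicorn.run`.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 10.0, interval: float = 0.1) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet (or refused): back off briefly and retry.
            time.sleep(interval)
    return False
```

In `start_both`, a call such as `wait_for_port(api_host, api_port)` could stand in for `time.sleep(2)`, with the launcher printing a warning if it returns `False`.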
mcp_handlers.py ADDED
@@ -0,0 +1,478 @@
+ from fastapi import Request, HTTPException
+ from typing import Dict, List, Any, Optional, Union
+ from pydantic import BaseModel
+ import json
+ import httpx
+ from googleapiclient.discovery import build
+ from googleapiclient.errors import HttpError
+ from youtube_transcript_api import YouTubeTranscriptApi
+ from youtube_transcript_api.formatters import JSONFormatter
+ from gemini_helper import generate_timecodes_with_gemini, DEFAULT_MODEL
+ from models import MCPResponse
+ from utils import format_timestamp, extract_video_id
+
+ # Data models for MCP
+ class MCPQueryRequest(BaseModel):
+     query: str
+     max_results: Optional[int] = 10
+
+ class MCPVideoRequest(BaseModel):
+     video_id: str
+
+ class MCPTranscriptRequest(BaseModel):
+     video_id: str
+     language_code: Optional[str] = None
+
+ # Model for timecode requests through MCP
+ class MCPTimecodeRequest(BaseModel):
+     video_id: str
+     language_code: Optional[str] = None
+     segment_length: Optional[int] = 60  # Segment length in seconds
+     format: Optional[str] = "youtube"  # youtube, markdown
+
+ # Model for Gemini timecode requests through MCP
+ class MCPGeminiRequest(BaseModel):
+     video_id: str
+     language_code: Optional[str] = None
+     format: Optional[str] = "youtube"  # youtube, markdown
+     model: Optional[str] = DEFAULT_MODEL  # Gemini model
+
+ # Functions for processing MCP requests
+ async def process_mcp_search(youtube_client, request: MCPQueryRequest) -> List[MCPResponse]:
+     """Process MCP request for video search."""
+     try:
+         search_response = youtube_client.search().list(
+             q=request.query,
+             part="snippet",
+             maxResults=request.max_results,
+             type="video"
+         ).execute()
+
+         results = []
+         for item in search_response.get("items", []):
+             video_id = item["id"]["videoId"]
+             snippet = item["snippet"]
+
+             # Create MCP format response
+             video_data = {
+                 "video_id": video_id,
+                 "title": snippet["title"],
+                 "description": snippet["description"],
+                 "thumbnail": snippet["thumbnails"]["high"]["url"],
+                 "channel_title": snippet["channelTitle"],
+                 "published_at": snippet["publishedAt"]
+             }
+
+             # Format markdown for video display
+             markdown_text = (
+                 f"## {snippet['title']}\n"
+                 f"**Channel:** {snippet['channelTitle']}\n"
+                 f"**Published:** {snippet['publishedAt']}\n\n"
+                 f"[![Thumbnail]({snippet['thumbnails']['high']['url']})](https://www.youtube.com/watch?v={video_id})\n\n"
+                 f"{snippet['description'][:300]}...\n\n"
+                 f"[Watch on YouTube](https://www.youtube.com/watch?v={video_id})"
+             )
+
+             results.append(MCPResponse(
+                 type="youtube_video",
+                 markdown=markdown_text,
+                 data=video_data
+             ))
+
+         return results
+     except HttpError as e:
+         raise HTTPException(status_code=500, detail=f"YouTube API error: {str(e)}")
+     except Exception as e:
+         raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
+
+ async def process_mcp_video_info(youtube_client, request: MCPVideoRequest) -> MCPResponse:
+     """Process MCP request for video information."""
+     try:
+         # Extract video ID from URL if it's a URL
+         video_id = extract_video_id(request.video_id)
+
+         video_response = youtube_client.videos().list(
+             part="snippet,contentDetails,statistics",
+             id=video_id
+         ).execute()
+
+         if not video_response.get("items"):
+             return MCPResponse(
+                 type="error",
+                 error="Video not found"
+             )
+
+         video_item = video_response["items"][0]["snippet"]
+         content_details = video_response["items"][0].get("contentDetails", {})
+         statistics = video_response["items"][0].get("statistics", {})
+
+         # Get detailed video information
+         video_data = {
+             "video_id": video_id,
+             "title": video_item.get("title"),
+             "channel_title": video_item.get("channelTitle"),
+             "published_at": video_item.get("publishedAt"),
+             "view_count": statistics.get("viewCount"),
+             "like_count": statistics.get("likeCount"),
+             "comment_count": statistics.get("commentCount"),
+             "duration": content_details.get("duration"),
+             "thumbnail": video_item.get("thumbnails", {}).get("high", {}).get("url")
+         }
+
+         return MCPResponse(
+             type="text",
+             text=f"Video information:\n{json.dumps(video_data, indent=2, ensure_ascii=False)}"  # MCPResponse has a `text` field, not `content`
+         )
+     except HttpError as e:
+         return MCPResponse(
+             type="error",
+             error=f"YouTube API error: {str(e)}"
+         )
+     except Exception as e:
+         return MCPResponse(
+             type="error",
+             error=f"Unexpected error: {str(e)}"
+         )
+
+ async def process_mcp_transcript(request: MCPTranscriptRequest) -> MCPResponse:
+     """Process MCP request for video transcript."""
+     try:
+         # Extract video ID from URL if it's a URL
+         video_id = extract_video_id(request.video_id)
+
+         try:
+             lang_kwargs = {"languages": [request.language_code]} if request.language_code else {}  # Omit `languages` when unset so the library default applies
+             transcript_list = YouTubeTranscriptApi.get_transcript(video_id, **lang_kwargs)
+         except Exception as transcript_error:
+             if request.language_code:
+                 try:
+                     print(f"Failed to get transcript in language {request.language_code}, trying to get available transcripts")
+                     transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
+                 except Exception as fallback_error:
+                     return MCPResponse(
+                         type="error",
+                         error=f"Transcript not found. Details: {str(fallback_error)}"
+                     )
+             else:
+                 return MCPResponse(
+                     type="error",
+                     error=f"Failed to get transcript. Details: {str(transcript_error)}"
+                 )
+
+         if not transcript_list:
+             return MCPResponse(
+                 type="error",
+                 error="Transcript for this video is unavailable"
+             )
+
+         formatted_transcript = []
+         for entry in transcript_list:
+             formatted_transcript.append({
+                 "text": entry.get("text", ""),
+                 "start": entry.get("start", 0),
+                 "duration": entry.get("duration", 0)
+             })
+
+         # Format markdown for transcript display
+         markdown_text = "# Transcript\n\n"
+         for entry in formatted_transcript:
+             start_time = entry.get("start")
+             duration = entry.get("duration")
+             end_time = start_time + duration
+             text = entry.get("text")
+
+             # Convert time to hours:minutes:seconds format
+             start_formatted = format_timestamp(start_time)
+             end_formatted = format_timestamp(end_time)
+
+             markdown_text += f"[{start_formatted} - {end_formatted}] {text}\n\n"
+
+         return MCPResponse(
+             type="youtube_transcript",
+             markdown=markdown_text,
+             data={
+                 "video_id": video_id,
+                 "transcript": formatted_transcript
+             }
+         )
+     except Exception as e:
+         return MCPResponse(
+             type="error",
+             error=f"Error getting transcript: {str(e)}"
+         )
+
+ # Function for creating text response in MCP format
+ def create_text_response(text: str) -> MCPResponse:
+     """Creates text response in MCP format."""
+     return MCPResponse(
+         type="text",
+         text=text
+     )
+
+ # Function for creating error response in MCP format
+ def create_error_response(error_message: str) -> MCPResponse:
+     """Creates error response in MCP format."""
+     return MCPResponse(
+         type="error",
+         error=error_message
+     )
+
+ # Function for formatting time to hours:minutes:seconds format
+ def format_timestamp(seconds):
+     """Formats time in seconds to hours:minutes:seconds format."""
+     hours = int(seconds // 3600)
+     minutes = int((seconds % 3600) // 60)
+     secs = int(seconds % 60)
+
+     if hours > 0:
+         return f"{hours:02d}:{minutes:02d}:{secs:02d}"
+     else:
+         return f"{minutes:02d}:{secs:02d}"
+
+ async def process_mcp_timecodes(youtube_client, request: MCPTimecodeRequest) -> MCPResponse:
+     """Process MCP request for timecode generation."""
+     try:
+         # Extract video ID from URL if it's a URL
+         video_id = extract_video_id(request.video_id)
+
+         # Get transcript
+         try:
+             lang_kwargs = {"languages": [request.language_code]} if request.language_code else {}  # Omit `languages` when unset so the library default applies
+             transcript_list = YouTubeTranscriptApi.get_transcript(video_id, **lang_kwargs)
+         except Exception as transcript_error:
+             if request.language_code:
+                 try:
+                     print(f"Failed to get transcript in language {request.language_code}, trying to get available transcripts")
+                     transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
+                 except Exception as fallback_error:
+                     return MCPResponse(
+                         type="error",
+                         error=f"Transcript not found. Details: {str(fallback_error)}"
+                     )
+             else:
+                 return MCPResponse(
+                     type="error",
+                     error=f"Failed to get transcript. Details: {str(transcript_error)}"
+                 )
+
+         if not transcript_list:
+             return MCPResponse(
+                 type="error",
+                 error="Transcript for this video is unavailable"
+             )
+
+         # Group transcript into segments
+         segments = []
+         current_segment = {
+             "start": transcript_list[0]["start"],
+             "end": 0,
+             "text": []
+         }
+
+         segment_length = request.segment_length
+
+         for entry in transcript_list:
+             start_time = entry["start"]
+
+             # If current segment is empty or entry is within segment length
+             if not current_segment["text"] or (start_time - current_segment["start"]) <= segment_length:
+                 current_segment["text"].append(entry["text"])
+                 current_segment["end"] = start_time + entry["duration"]
+             else:
+                 # Close current segment and start new
+                 segments.append(dict(current_segment))
+                 current_segment = {
+                     "start": start_time,
+                     "end": start_time + entry["duration"],
+                     "text": [entry["text"]]
+                 }
+
+         # Add last segment
+         if current_segment["text"]:
+             segments.append(current_segment)
+
+         # Format timecodes according to selected format
+         format_type = request.format.lower()
+         timecodes = []
+
+         for segment in segments:
+             start_formatted = format_timestamp(segment["start"])
+
+             # Summary text of segment (first 100 characters)
+             text_summary = " ".join(segment["text"])
+             if len(text_summary) > 100:
+                 text_summary = text_summary[:97] + "..."
+
+             if format_type == "youtube":
+                 # Format for YouTube (for embedding in description)
+                 timecodes.append(f"{start_formatted} {text_summary}")
+             elif format_type == "markdown":
+                 # Format for Markdown
+                 youtube_link = f"https://www.youtube.com/watch?v={video_id}&t={int(segment['start'])}"
+                 timecodes.append(f"- [{start_formatted}]({youtube_link}) {text_summary}")
+
+         # Create markdown with timecodes
+         markdown_text = "# Timecodes for Video\n\n"
+
+         if format_type == "youtube":
+             markdown_text += "```\n"
+             markdown_text += "\n".join(timecodes)
+             markdown_text += "\n```"
+         elif format_type == "markdown":
+             markdown_text += "\n".join(timecodes)
+
+         # Get video information for title
+         try:
+             video_response = youtube_client.videos().list(
+                 part="snippet",
+                 id=video_id
+             ).execute()
+
+             if video_response.get("items"):
+                 video_title = video_response["items"][0]["snippet"]["title"]
+                 markdown_text = f"# Timecodes for Video: {video_title}\n\n" + markdown_text[markdown_text.find('\n\n') + 2:]
+         except Exception as e:
+             print(f"Failed to get video information: {str(e)}")
+             video_title = "YouTube Video"
+
+         return MCPResponse(
+             type="youtube_timecodes",
+             markdown=markdown_text,
+             data={
+                 "video_id": video_id,
+                 "timecodes": timecodes,
+                 "format": format_type,
+                 "segment_length": segment_length,
+                 "total_segments": len(segments)
+             }
+         )
+     except Exception as e:
+         return MCPResponse(
+             type="error",
+             error=f"Error generating timecodes: {str(e)}"
+         )
+
+ async def process_mcp_gemini_timecodes(youtube_client, request: MCPGeminiRequest) -> MCPResponse:
+     """Process MCP request for Gemini timecode generation."""
+     try:
+         video_id = extract_video_id(request.video_id)  # Accept full URLs as well as bare IDs, like the other handlers
+         try:
+             lang_kwargs = {"languages": [request.language_code]} if request.language_code else {}  # Omit `languages` when unset so the library default applies
+             transcript_list = YouTubeTranscriptApi.get_transcript(video_id, **lang_kwargs)
+         except Exception as transcript_error:
+             if request.language_code:
+                 try:
+                     print(f"Failed to get transcript in language {request.language_code}, trying to get available transcripts")
+                     transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
+                 except Exception as fallback_error:
+                     return MCPResponse(
+                         type="error",
+                         error=f"Transcript not found. Details: {str(fallback_error)}"
+                     )
+             else:
+                 return MCPResponse(
+                     type="error",
+                     error=f"Failed to get transcript. Details: {str(transcript_error)}"
+                 )
+
+         if not transcript_list:
+             return MCPResponse(
+                 type="error",
+                 error="Transcript for this video is unavailable"
+             )
+
+         video_title = "YouTube Video"  # Fallback title if the lookup below fails or returns no items
+         try:
+             video_response = youtube_client.videos().list(
+                 part="snippet",
+                 id=video_id
+             ).execute()
+
+             if video_response.get("items"):
+                 video_title = video_response["items"][0]["snippet"]["title"]
+         except Exception as e:
+             print(f"Failed to get video information: {str(e)}")
+             video_title = "YouTube Video"
+
+         # Send request to Gemini
+         result = await generate_timecodes_with_gemini(
+             transcript_entries=transcript_list,
+             video_title=video_title,
+             format_type=request.format,
+             model_name=request.model
+         )
+
+         if "error" in result:
+             return MCPResponse(
+                 type="error",
+                 error=result["error"]
+             )
+
+         # Create markdown with timecodes
+         timecodes = result.get("timecodes", [])
+         format_type = result.get("format", "youtube")
+
+         markdown_text = f"# Timecodes for Video: {video_title}\n\n"
+
+         if format_type == "youtube":
+             markdown_text += "```\n"
+             markdown_text += "\n".join(timecodes)
+             markdown_text += "\n```"
+         elif format_type == "markdown":
+             markdown_text += "\n".join(timecodes)
+
+         return MCPResponse(
+             type="youtube_gemini_timecodes",
+             markdown=markdown_text,
+             data=result
+         )
+     except Exception as e:
+         return MCPResponse(
+             type="error",
+             error=f"Error generating timecodes with Gemini: {str(e)}"
+         )
+
+ async def process_mcp_transcript_languages(request: MCPVideoRequest) -> MCPResponse:
+     """Process MCP request for getting list of available transcript languages."""
+     try:
+         # Extract video ID from URL if it's a URL
+         video_id = extract_video_id(request.video_id)
+
+         try:
+             transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
+
+             languages = []
+             for transcript in transcript_list:
+                 languages.append({
+                     "language_code": transcript.language_code,
+                     "language": transcript.language,
+                     "is_generated": transcript.is_generated,
+                     "is_translatable": transcript.is_translatable
+                 })
+
+             # Format markdown for displaying language list
+             markdown_text = "# Available Transcript Languages\n\n"
+             for language in languages:
+                 lang_type = "Auto-generated" if language["is_generated"] else "Manually added"
+                 translatable = "Available for translation" if language.get("is_translatable", False) else "Not available for translation"
+
+                 markdown_text += f"- **{language['language']}** ({language['language_code']}): {lang_type}, {translatable}\n"
+
+             return MCPResponse(
+                 type="youtube_transcript_languages",
+                 markdown=markdown_text,
+                 data={
+                     "video_id": video_id,
+                     "languages": languages
+                 }
+             )
+         except Exception as transcript_error:
+             return MCPResponse(
+                 type="error",
+                 error=f"Failed to get language list. Details: {str(transcript_error)}"
+             )
+     except Exception as e:
+         return MCPResponse(
+             type="error",
+             error=f"Error getting language list: {str(e)}"
+         )
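Review note on `process_mcp_timecodes`: the segment-grouping loop is easier to check in isolation. The sketch below is not part of this commit; `group_segments` is a hypothetical pure function that mirrors the loop's rule, under the assumption that each transcript entry is a dict with `text`, `start`, and `duration` keys, as the handler above uses them. An entry joins the current segment while its start lies within `segment_length` seconds of that segment's start; otherwise it opens a new segment.

```python
from typing import Dict, List


def group_segments(entries: List[Dict], segment_length: float) -> List[Dict]:
    """Group transcript entries into segments of roughly `segment_length` seconds."""
    segments: List[Dict] = []
    current = None
    for entry in entries:
        start = entry["start"]
        end = start + entry["duration"]
        if current is not None and (start - current["start"]) <= segment_length:
            # Entry starts inside the current window: extend the segment.
            current["text"].append(entry["text"])
            current["end"] = end
        else:
            # Entry falls outside the window (or is the first one): open a new segment.
            if current is not None:
                segments.append(current)
            current = {"start": start, "end": end, "text": [entry["text"]]}
    if current is not None:
        segments.append(current)
    return segments
```

Unlike the inline loop, this version returns an empty list for an empty transcript instead of indexing `entries[0]`, so it does not depend on an earlier emptiness check.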
models.py ADDED
@@ -0,0 +1,10 @@
+ from typing import Dict, Any, Optional
+ from pydantic import BaseModel
+
+ class MCPResponse(BaseModel):
+     """Response model for MCP API."""
+     type: str
+     text: Optional[str] = None
+     markdown: Optional[str] = None
+     data: Optional[Dict[str, Any]] = None
+     error: Optional[str] = None
pyproject.toml ADDED
@@ -0,0 +1,17 @@
+ [project]
+ name = "youtube"
+ version = "0.1.0"
+ description = "YouTube API integration for Model Context Protocol (MCP)"
+ readme = "README.md"
+ requires-python = ">=3.13"
+ dependencies = [
+     "fastapi>=0.104.0",
+     "uvicorn>=0.23.2",
+     "pydantic>=2.4.2",
+     "httpx>=0.25.0",
+     "python-dotenv>=1.0.0",
+     "google-api-python-client>=2.122.0",
+     "gradio>=4.4.0",
+     "youtube-transcript-api>=0.6.1",
+     "google-genai>=0.3.0"
+ ]
requirements.txt ADDED
@@ -0,0 +1,9 @@
+ fastapi>=0.104.0
+ uvicorn>=0.23.2
+ pydantic>=2.4.2
+ httpx>=0.25.0
+ python-dotenv>=1.0.0
+ google-api-python-client>=2.122.0
+ gradio>=4.4.0
+ youtube-transcript-api>=0.6.1
+ google-genai>=0.3.0
utils.py ADDED
@@ -0,0 +1,57 @@
+ import re
+
+ def format_timestamp(seconds):
+     """Formats time in seconds to hours:minutes:seconds format."""
+     hours = int(seconds // 3600)
+     minutes = int((seconds % 3600) // 60)
+     secs = int(seconds % 60)
+
+     if hours > 0:
+         return f"{hours:02d}:{minutes:02d}:{secs:02d}"
+     else:
+         return f"{minutes:02d}:{secs:02d}"
+
+ def extract_video_id(video_id_or_url):
+     """
+     Extracts video ID from a string that can be either an ID or full YouTube URL.
+
+     Supported formats:
+     - Simple ID (e.g., dQw4w9WgXcQ)
+     - https://www.youtube.com/watch?v=dQw4w9WgXcQ
+     - https://youtu.be/dQw4w9WgXcQ
+     - https://youtube.com/shorts/dQw4w9WgXcQ
+     - https://www.youtube.com/embed/dQw4w9WgXcQ
+     - https://youtube.com/live/dQw4w9WgXcQ
+
+     Returns:
+     - Video ID or original string if ID not found
+     """
+     print(f"Processing input value: {video_id_or_url}")
+
+     # If input string is empty or None, return empty string
+     if not video_id_or_url:
+         print("Empty video ID")
+         return ""
+
+     # Check for simple ID (without special characters)
+     if re.match(r'^[a-zA-Z0-9_-]{11}$', video_id_or_url):
+         print(f"Found simple ID: {video_id_or_url}")
+         return video_id_or_url
+
+     # Check for nested URLs (when URL is part of another URL)
+     inner_url_match = re.search(r'https?://(?:www\.)?(?:youtube\.com|youtu\.be).*?(?=&|$|\s)', video_id_or_url)
+     if inner_url_match:
+         inner_url = inner_url_match.group(0)
+         print(f"Found nested URL: {inner_url}")
+         video_id_or_url = inner_url
+
+     # Check for standard youtube.com/watch?v= link
+     match = re.search(r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/shorts/|youtube\.com/embed/|youtube\.com/live/)([a-zA-Z0-9_-]{11})', video_id_or_url)
+     if match:
+         video_id = match.group(1)
+         print(f"Extracted ID from URL: {video_id}")
+         return video_id
+
+     # If failed to extract ID, return original string
+     print(f"Failed to extract ID, returning original value: {video_id_or_url}")
+     return video_id_or_url
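Review note on `extract_video_id`: the regex expects `v=` to follow `watch?` directly, so a link such as `watch?feature=share&v=...` would likely fall through and be returned unchanged. One possible alternative is to parse the URL with the standard library. The sketch below is not part of this commit; `video_id_from_url` is a hypothetical name, the `m.youtube.com` host is an added assumption, and it returns an empty string rather than the original input when no ID is found.

```python
import re
from urllib.parse import parse_qs, urlparse

# YouTube video IDs are treated here as exactly 11 URL-safe characters,
# matching the pattern used in utils.py.
_ID_RE = re.compile(r"^[A-Za-z0-9_-]{11}$")


def video_id_from_url(value: str) -> str:
    """Hypothetical helper: return an 11-character video ID, or "" if none is found."""
    value = (value or "").strip()
    if _ID_RE.match(value):
        return value  # Already a bare ID.
    parsed = urlparse(value)
    host = (parsed.hostname or "").lower()
    if host.startswith("www."):
        host = host[4:]
    candidate = ""
    if host == "youtu.be":
        # Short links carry the ID as the first path component.
        candidate = parsed.path.lstrip("/").split("/")[0]
    elif host in ("youtube.com", "m.youtube.com"):
        if parsed.path == "/watch":
            # The query parser finds `v` regardless of parameter order.
            candidate = parse_qs(parsed.query).get("v", [""])[0]
        else:
            parts = parsed.path.strip("/").split("/")
            if len(parts) >= 2 and parts[0] in ("shorts", "embed", "live"):
                candidate = parts[1]
    return candidate if _ID_RE.match(candidate) else ""
```

A limitation of this sketch is that scheme-less input such as `youtube.com/watch?v=...` is not recognised, because `urlparse` reports no hostname for it.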