@woai commited on
Commit
e775565
·
1 Parent(s): 81339cb

🧹 Major code cleanup and internationalization - Remove Russian comments/strings, translate UI to English, clean linter errors, remove hardcoded tokens, delete test files. Ready for production deployment

Browse files
README.md CHANGED
@@ -148,14 +148,22 @@ POST /api/gemini_timecodes
148
  ## 📁 Project Structure
149
 
150
  ```
151
- ├── app.py # Main Gradio application (HF Spaces entry point)
152
- ├── api_server.py # FastAPI backend server
153
- ├── gemini_helper.py # Gemini AI integration
154
- ├── utils.py # Utility functions
155
- ├── models.py # Data models
156
- ├── mcp_handlers.py # Model Context Protocol handlers
157
- ├── requirements.txt # Python dependencies
158
- └── README.md # This file
 
 
 
 
 
 
 
 
159
  ```
160
 
161
  ## 🔬 Technology Stack
 
148
  ## 📁 Project Structure
149
 
150
  ```
151
+ ├── main.py # Unified launcher (API/UI/both modes)
152
+ ├── run_telegram_bot.py # Telegram bot launcher
153
+ ├── api_server.py # FastAPI backend server
154
+ ├── telegram_bot.py # Telegram bot implementation
155
+ ├── mcp_handlers.py # Model Context Protocol handlers
156
+ ├── gemini_helper.py # Gemini AI integration
157
+ ├── utils.py # Utility functions
158
+ ├── models.py # Data models
159
+ ├── app.py # Gradio app (HF Spaces entry point)
160
+ ├── gradio_app.py # Extended Gradio interface
161
+ ├── requirements.txt # Python dependencies
162
+ ├── telegram_requirements.txt # Telegram bot dependencies
163
+ ├── cloudflare-config.yml # Cloudflare tunnel configuration
164
+ ├── TUNNEL_SOLUTIONS.md # Tunnel troubleshooting guide
165
+ ├── youtube-content-metagen-agent.ipynb # Kaggle reference notebook
166
+ └── README.md # This file
167
  ```
168
 
169
  ## 🔬 Technology Stack
api_server.py CHANGED
@@ -14,10 +14,10 @@ from utils import format_timestamp, extract_video_id
14
  from models import MCPResponse
15
  import re
16
 
17
- # Загрузка переменных окружения
18
  load_dotenv()
19
 
20
- # Получение API ключа YouTube из переменных окружения
21
  YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")
22
 
23
  app = FastAPI(
@@ -26,7 +26,7 @@ app = FastAPI(
26
  version="0.1.0",
27
  )
28
 
29
- # Настройка CORS
30
  app.add_middleware(
31
  CORSMiddleware,
32
  allow_origins=["*"],
@@ -35,7 +35,7 @@ app.add_middleware(
35
  allow_headers=["*"],
36
  )
37
 
38
- # Инициализация YouTube API клиента
39
  def get_youtube_client():
40
  if not YOUTUBE_API_KEY:
41
  raise HTTPException(status_code=500, detail="YouTube API key is not configured")
@@ -45,7 +45,7 @@ def get_youtube_client():
45
  except Exception as e:
46
  raise HTTPException(status_code=500, detail=f"YouTube API initialization error: {str(e)}")
47
 
48
- # Базовые модели данных для стандартных API запросов
49
  class SearchRequest(BaseModel):
50
  query: str
51
  max_results: Optional[int] = 10
@@ -63,28 +63,28 @@ class MCPRequestData(BaseModel):
63
  action: str
64
  parameters: Dict[str, Any]
65
 
66
- # Добавим новый маршрут для получения доступных языков транскрипта
67
  class TranscriptLanguagesRequest(BaseModel):
68
  video_id: str
69
 
70
- # Модель для запроса тайм-кодов
71
  class TimecodeRequest(BaseModel):
72
  video_id: str
73
  language_code: Optional[str] = None
74
- segment_length: Optional[int] = 60 # Длина сегмента в секундах
75
  format: Optional[str] = "youtube" # youtube, markdown
76
 
77
- # Загрузим модуль gemini_helper только после определения базовых моделей
78
  from gemini_helper import generate_timecodes_with_gemini, DEFAULT_MODEL
79
 
80
- # Модель для запроса тайм-кодов с помощью Gemini
81
  class GeminiTimecodeRequest(BaseModel):
82
  video_id: str
83
  language_code: Optional[str] = None
84
  format: Optional[str] = "youtube" # youtube, markdown
85
- model: Optional[str] = DEFAULT_MODEL # модель Gemini (если None, используется модель по умолчанию)
86
 
87
- # Теперь можно загрузить mcp_handlers
88
  from mcp_handlers import (
89
  MCPQueryRequest,
90
  MCPVideoRequest,
@@ -113,7 +113,7 @@ def normalize_language_code(language_code: str) -> str:
113
 
114
  return language_code
115
 
116
- # Стандартные API маршруты
117
  @app.post("/api/search")
118
  async def search_videos(request: SearchRequest):
119
  try:
@@ -150,7 +150,7 @@ async def search_videos(request: SearchRequest):
150
  @app.post("/api/video_info")
151
  async def get_video_info(request: VideoInfoRequest):
152
  try:
153
- # Извлекаем ID видео из ссылки, если это ссылка
154
  video_id = extract_video_id(request.video_id)
155
 
156
  youtube = get_youtube_client()
@@ -257,7 +257,7 @@ async def get_transcript(request: TranscriptRequest):
257
  @app.post("/api/transcript_languages")
258
  async def get_transcript_languages(request: TranscriptLanguagesRequest):
259
  try:
260
- # Извлекаем ID видео из ссылки, если это ссылка
261
  video_id = extract_video_id(request.video_id)
262
 
263
  try:
@@ -278,7 +278,7 @@ async def get_transcript_languages(request: TranscriptLanguagesRequest):
278
  except Exception as e:
279
  return {"error": f"Error getting language list: {str(e)}"}
280
 
281
- # MCP эндпоинты
282
  @app.post("/api/mcp")
283
  async def mcp_endpoint(request: MCPRequestData):
284
  try:
@@ -309,12 +309,12 @@ async def mcp_endpoint(request: MCPRequestData):
309
  except Exception as e:
310
  return create_error_response(f"Error processing request: {str(e)}")
311
 
312
- # Маршрут для проверки здоровья сервера
313
  @app.get("/health")
314
  async def health_check():
315
  return {"status": "ok"}
316
 
317
- # Информационный маршрут, описывающий возможности API
318
  @app.get("/")
319
  async def root():
320
  return {
@@ -341,11 +341,11 @@ async def root():
341
  @app.post("/api/timecodes")
342
  async def generate_timecodes(request: TimecodeRequest):
343
  try:
344
- # Извлекаем ID видео из ссылки, если это ссылка
345
  video_id = extract_video_id(request.video_id)
346
  print(f"Generating timecodes for ID: {video_id}")
347
 
348
- # Пытаемся получить список доступных языков
349
  available_languages = []
350
  try:
351
  transcript_list_obj = YouTubeTranscriptApi.list_transcripts(video_id)
@@ -359,11 +359,11 @@ async def generate_timecodes(request: TimecodeRequest):
359
  except Exception as e:
360
  print(f"Failed to get language list: {str(e)}")
361
 
362
- # Получаем транскрипт
363
  transcript_list = None
364
  used_language = None
365
 
366
- # Если указан язык, пробуем его использовать
367
  if request.language_code:
368
  try:
369
  print(f"Trying to get transcript in language: {request.language_code}")
@@ -373,7 +373,7 @@ async def generate_timecodes(request: TimecodeRequest):
373
  except Exception as e:
374
  print(f"Failed to get transcript in language {request.language_code}: {str(e)}")
375
 
376
- # Если транскрипт не получен и есть доступные языки, используем первый доступный
377
  if not transcript_list and available_languages:
378
  try:
379
  first_language = available_languages[0]["language_code"]
@@ -384,7 +384,7 @@ async def generate_timecodes(request: TimecodeRequest):
384
  except Exception as e:
385
  print(f"Failed to get transcript in language {first_language}: {str(e)}")
386
 
387
- # Если все еще нет транскрипта, пробуем получить на любом языке
388
  if not transcript_list:
389
  try:
390
  print("Trying to get transcript in any available language")
@@ -396,7 +396,7 @@ async def generate_timecodes(request: TimecodeRequest):
396
  if not transcript_list:
397
  return {"error": "Transcript for this video is unavailable"}
398
 
399
- # Группируем транскрипт по сегментам
400
  segments = []
401
  current_segment = {
402
  "start": transcript_list[0]["start"],
@@ -409,12 +409,12 @@ async def generate_timecodes(request: TimecodeRequest):
409
  for entry in transcript_list:
410
  start_time = entry["start"]
411
 
412
- # Если текущий сегмент пустой или запись находится в пределах длины сегмента
413
  if not current_segment["text"] or (start_time - current_segment["start"]) <= segment_length:
414
  current_segment["text"].append(entry["text"])
415
  current_segment["end"] = start_time + entry["duration"]
416
  else:
417
- # Закрываем текущий сегмент и начинаем новый
418
  segments.append(dict(current_segment))
419
  current_segment = {
420
  "start": start_time,
@@ -422,31 +422,31 @@ async def generate_timecodes(request: TimecodeRequest):
422
  "text": [entry["text"]]
423
  }
424
 
425
- # Добавляем последний сегмент
426
  if current_segment["text"]:
427
  segments.append(current_segment)
428
 
429
- # Форматируем тайм-коды в соответствии с выбранным форматом
430
  format_type = request.format.lower()
431
  timecodes = []
432
 
433
  for segment in segments:
434
  start_formatted = format_timestamp(segment["start"])
435
 
436
- # Суммарный текст сегмента (первые 100 символов)
437
  text_summary = " ".join(segment["text"])
438
  if len(text_summary) > 100:
439
  text_summary = text_summary[:97] + "..."
440
 
441
  if format_type == "youtube":
442
- # Формат для YouTube (для вставки в описание)
443
  timecodes.append(f"{start_formatted} {text_summary}")
444
  elif format_type == "markdown":
445
- # Формат для Markdown
446
  youtube_link = f"https://www.youtube.com/watch?v={video_id}&t={int(segment['start'])}"
447
  timecodes.append(f"- [{start_formatted}]({youtube_link}) {text_summary}")
448
 
449
- # Возвращаем тайм-коды и дополнительную информацию
450
  response = {
451
  "content": {
452
  "video_id": video_id,
@@ -519,7 +519,7 @@ async def generate_gemini_timecodes(request: GeminiTimecodeRequest):
519
  if not transcript_list:
520
  return {"error": "Transcript for this video is unavailable"}
521
 
522
- # Получаем информацию о видео для заголовка
523
  youtube = get_youtube_client()
524
  video_title = "YouTube Video"
525
 
@@ -534,7 +534,7 @@ async def generate_gemini_timecodes(request: GeminiTimecodeRequest):
534
  except Exception as e:
535
  print(f"Failed to get video information: {str(e)}")
536
 
537
- # Отправляем запрос в Gemini с указанием языка
538
  result = await generate_timecodes_with_gemini(
539
  transcript_entries=transcript_list,
540
  video_title=video_title,
@@ -546,7 +546,7 @@ async def generate_gemini_timecodes(request: GeminiTimecodeRequest):
546
  if "error" in result:
547
  return {"error": result["error"]}
548
 
549
- # Добавляем информацию о языке транскрипта
550
  if used_language:
551
  result["used_language"] = used_language
552
 
 
14
  from models import MCPResponse
15
  import re
16
 
17
+ # Load environment variables
18
  load_dotenv()
19
 
20
+ # Get YouTube API key from environment variables
21
  YOUTUBE_API_KEY = os.getenv("YOUTUBE_API_KEY")
22
 
23
  app = FastAPI(
 
26
  version="0.1.0",
27
  )
28
 
29
+ # Configure CORS
30
  app.add_middleware(
31
  CORSMiddleware,
32
  allow_origins=["*"],
 
35
  allow_headers=["*"],
36
  )
37
 
38
+ # Initialize YouTube API client
39
  def get_youtube_client():
40
  if not YOUTUBE_API_KEY:
41
  raise HTTPException(status_code=500, detail="YouTube API key is not configured")
 
45
  except Exception as e:
46
  raise HTTPException(status_code=500, detail=f"YouTube API initialization error: {str(e)}")
47
 
48
+ # Base data models for standard API requests
49
  class SearchRequest(BaseModel):
50
  query: str
51
  max_results: Optional[int] = 10
 
63
  action: str
64
  parameters: Dict[str, Any]
65
 
66
+ # Add new endpoint for getting available transcript languages
67
  class TranscriptLanguagesRequest(BaseModel):
68
  video_id: str
69
 
70
+ # Model for timecode requests
71
  class TimecodeRequest(BaseModel):
72
  video_id: str
73
  language_code: Optional[str] = None
74
+ segment_length: Optional[int] = 60 # Segment length in seconds
75
  format: Optional[str] = "youtube" # youtube, markdown
76
 
77
+ # Load gemini_helper module only after defining base models
78
  from gemini_helper import generate_timecodes_with_gemini, DEFAULT_MODEL
79
 
80
+ # Model for Gemini timecode requests
81
  class GeminiTimecodeRequest(BaseModel):
82
  video_id: str
83
  language_code: Optional[str] = None
84
  format: Optional[str] = "youtube" # youtube, markdown
85
+ model: Optional[str] = DEFAULT_MODEL # Gemini model (if None, uses default model)
86
 
87
+ # Now we can load mcp_handlers
88
  from mcp_handlers import (
89
  MCPQueryRequest,
90
  MCPVideoRequest,
 
113
 
114
  return language_code
115
 
116
+ # Standard API routes
117
  @app.post("/api/search")
118
  async def search_videos(request: SearchRequest):
119
  try:
 
150
  @app.post("/api/video_info")
151
  async def get_video_info(request: VideoInfoRequest):
152
  try:
153
+ # Extract video ID from URL if it's a URL
154
  video_id = extract_video_id(request.video_id)
155
 
156
  youtube = get_youtube_client()
 
257
  @app.post("/api/transcript_languages")
258
  async def get_transcript_languages(request: TranscriptLanguagesRequest):
259
  try:
260
+ # Extract video ID from URL if it's a URL
261
  video_id = extract_video_id(request.video_id)
262
 
263
  try:
 
278
  except Exception as e:
279
  return {"error": f"Error getting language list: {str(e)}"}
280
 
281
+ # MCP endpoints
282
  @app.post("/api/mcp")
283
  async def mcp_endpoint(request: MCPRequestData):
284
  try:
 
309
  except Exception as e:
310
  return create_error_response(f"Error processing request: {str(e)}")
311
 
312
+ # Route for health check
313
  @app.get("/health")
314
  async def health_check():
315
  return {"status": "ok"}
316
 
317
+ # Information route, describing API capabilities
318
  @app.get("/")
319
  async def root():
320
  return {
 
341
  @app.post("/api/timecodes")
342
  async def generate_timecodes(request: TimecodeRequest):
343
  try:
344
+ # Extract video ID from URL if it's a URL
345
  video_id = extract_video_id(request.video_id)
346
  print(f"Generating timecodes for ID: {video_id}")
347
 
348
+ # Try to get list of available languages
349
  available_languages = []
350
  try:
351
  transcript_list_obj = YouTubeTranscriptApi.list_transcripts(video_id)
 
359
  except Exception as e:
360
  print(f"Failed to get language list: {str(e)}")
361
 
362
+ # Get transcript
363
  transcript_list = None
364
  used_language = None
365
 
366
+ # If language is specified, try to use it
367
  if request.language_code:
368
  try:
369
  print(f"Trying to get transcript in language: {request.language_code}")
 
373
  except Exception as e:
374
  print(f"Failed to get transcript in language {request.language_code}: {str(e)}")
375
 
376
+ # If transcript not obtained and there are available languages, use first available
377
  if not transcript_list and available_languages:
378
  try:
379
  first_language = available_languages[0]["language_code"]
 
384
  except Exception as e:
385
  print(f"Failed to get transcript in language {first_language}: {str(e)}")
386
 
387
+ # If still no transcript, try to get on any language
388
  if not transcript_list:
389
  try:
390
  print("Trying to get transcript in any available language")
 
396
  if not transcript_list:
397
  return {"error": "Transcript for this video is unavailable"}
398
 
399
+ # Group transcript by segments
400
  segments = []
401
  current_segment = {
402
  "start": transcript_list[0]["start"],
 
409
  for entry in transcript_list:
410
  start_time = entry["start"]
411
 
412
+ # If current segment is empty or entry is within segment length
413
  if not current_segment["text"] or (start_time - current_segment["start"]) <= segment_length:
414
  current_segment["text"].append(entry["text"])
415
  current_segment["end"] = start_time + entry["duration"]
416
  else:
417
+ # Close current segment and start new
418
  segments.append(dict(current_segment))
419
  current_segment = {
420
  "start": start_time,
 
422
  "text": [entry["text"]]
423
  }
424
 
425
+ # Add last segment
426
  if current_segment["text"]:
427
  segments.append(current_segment)
428
 
429
+ # Format timecodes according to selected format
430
  format_type = request.format.lower()
431
  timecodes = []
432
 
433
  for segment in segments:
434
  start_formatted = format_timestamp(segment["start"])
435
 
436
+ # Summary text of segment (first 100 characters)
437
  text_summary = " ".join(segment["text"])
438
  if len(text_summary) > 100:
439
  text_summary = text_summary[:97] + "..."
440
 
441
  if format_type == "youtube":
442
+ # Format for YouTube (for embedding in description)
443
  timecodes.append(f"{start_formatted} {text_summary}")
444
  elif format_type == "markdown":
445
+ # Format for Markdown
446
  youtube_link = f"https://www.youtube.com/watch?v={video_id}&t={int(segment['start'])}"
447
  timecodes.append(f"- [{start_formatted}]({youtube_link}) {text_summary}")
448
 
449
+ # Return timecodes and additional information
450
  response = {
451
  "content": {
452
  "video_id": video_id,
 
519
  if not transcript_list:
520
  return {"error": "Transcript for this video is unavailable"}
521
 
522
+ # Get video information for video title
523
  youtube = get_youtube_client()
524
  video_title = "YouTube Video"
525
 
 
534
  except Exception as e:
535
  print(f"Failed to get video information: {str(e)}")
536
 
537
+ # Send request to Gemini with language specified
538
  result = await generate_timecodes_with_gemini(
539
  transcript_entries=transcript_list,
540
  video_title=video_title,
 
546
  if "error" in result:
547
  return {"error": result["error"]}
548
 
549
+ # Add transcript language information
550
  if used_language:
551
  result["used_language"] = used_language
552
 
cloudflare-config.yml ADDED
@@ -0,0 +1,20 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ tunnel: 3b196473-a231-4909-b8dd-0a1e34e2a9c5
2
+ credentials-file: C:\Users\belov\.cloudflared\3b196473-a231-4909-b8dd-0a1e34e2a9c5.json
3
+
4
+ # Protocol and connection settings
5
+ protocol: http2
6
+ no-autoupdate: true
7
+ grace-period: 30s
8
+ retries: 5
9
+ loglevel: info
10
+
11
+ ingress:
12
+ - hostname: youtube-bot.tuttech.net
13
+ service: http://127.0.0.1:8080
14
+ originRequest:
15
+ noTLSVerify: true
16
+ connectTimeout: 30s
17
+ tlsTimeout: 10s
18
+ keepAliveTimeout: 90s
19
+ keepAliveConnections: 10
20
+ - service: http_status:404
deploy_changes.py DELETED
@@ -1,53 +0,0 @@
1
- #!/usr/bin/env python3
2
- """
3
- Скрипт для быстрого деплоя изменений
4
- Usage: python deploy_changes.py "commit message"
5
- """
6
-
7
- import sys
8
- import subprocess
9
- import os
10
-
11
- def run_command(command, description):
12
- """Выполнить команду с описанием"""
13
- print(f"🔄 {description}...")
14
- try:
15
- result = subprocess.run(command, shell=True, check=True, capture_output=True, text=True)
16
- print(f"✅ {description} - успешно")
17
- return True
18
- except subprocess.CalledProcessError as e:
19
- print(f"❌ {description} - ошибка: {e.stderr}")
20
- return False
21
-
22
- def main():
23
- if len(sys.argv) != 2:
24
- print("Usage: python deploy_changes.py \"commit message\"")
25
- print("Example: python deploy_changes.py \"Fix telegram bot MCP parameters\"")
26
- sys.exit(1)
27
-
28
- commit_message = sys.argv[1]
29
-
30
- print("🚀 Начинаем деплой изменений...")
31
-
32
- # 1. Git add
33
- if not run_command("git add .", "Добавление файлов в git"):
34
- return
35
-
36
- # 2. Git commit
37
- if not run_command(f'git commit -m "{commit_message}"', "Создание коммита"):
38
- return
39
-
40
- # 3. Git push
41
- if not run_command("git push", "Отправка в удаленный репозиторий"):
42
- return
43
-
44
- print("\n✅ Изменения успешно отправлены!")
45
- print("\n📋 Следующие шаги на удаленном сервере:")
46
- print("1. git pull")
47
- print("2. Перезапустить MCP сервер: python main.py --mode api --host 0.0.0.0 --port 8080")
48
- print("3. Перезапустить Telegram бота: python run_telegram_bot.py")
49
- print("\n🔗 Или используйте команду:")
50
- print("ssh your-server 'cd /path/to/project && git pull && pkill -f main.py && python main.py --mode api --host 0.0.0.0 --port 8080 &'")
51
-
52
- if __name__ == "__main__":
53
- main()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
gemini_helper.py CHANGED
@@ -1,6 +1,7 @@
1
  import os
2
  from google import genai
3
  from google.genai import types
 
4
  from dotenv import load_dotenv
5
  from typing import List, Dict, Any, Optional
6
  import traceback
@@ -18,6 +19,24 @@ if GEMINI_API_KEY:
18
  try:
19
  client = genai.Client(api_key=GEMINI_API_KEY)
20
  print("Gemini client successfully initialized")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21
  except Exception as e:
22
  print(f"Error initializing Gemini client: {str(e)}")
23
  traceback.print_exc()
@@ -32,27 +51,52 @@ ALTERNATIVE_MODELS = ["gemini-1.5-flash-001"]
32
  def format_transcript_for_prompt(transcript_entries: List[Dict[str, Any]], video_duration_seconds: int = None) -> str:
33
  """Formats transcript for passing to prompt."""
34
  formatted_transcript = ""
35
-
36
  # Determine maximum time in transcript if video duration is not provided
37
  if video_duration_seconds is None:
38
  if transcript_entries:
39
  last_entry = transcript_entries[-1]
40
- max_time = last_entry.get("start", 0) + last_entry.get("duration", 0)
 
 
 
 
 
 
41
  video_duration_seconds = int(max_time) + 10 # Add small buffer
42
-
43
- for entry in transcript_entries:
44
- start_time = entry.get("start", 0)
45
- text = entry.get("text", "")
46
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
  # Check that time doesn't exceed total video duration
48
  if video_duration_seconds and start_time > video_duration_seconds:
49
  continue
50
-
51
  # Format time in hours:minutes:seconds format
52
  time_str = format_time_hms(start_time)
53
-
54
  formatted_transcript += f"[{time_str}] {text}\n"
55
-
56
  return formatted_transcript
57
 
58
  def format_time_hms(seconds: float) -> str:
@@ -63,13 +107,13 @@ def format_time_hms(seconds: float) -> str:
63
  hours = int(seconds // 3600)
64
  minutes = int((seconds % 3600) // 60)
65
  secs = int(seconds % 60)
66
-
67
  if hours > 0:
68
  return f"{hours:02d}:{minutes:02d}:{secs:02d}"
69
  else:
70
  return f"{minutes:02d}:{secs:02d}"
71
 
72
- def get_timecode_prompt(video_title: str, transcript: str, format_type: str = "youtube", language: str = None, video_duration_minutes: int = None) -> str:
73
  """Creates prompt for generating timecodes based on transcript."""
74
 
75
  # Determine prompt language based on video language
@@ -78,64 +122,33 @@ def get_timecode_prompt(video_title: str, transcript: str, format_type: str = "y
78
  example_description = "Discussion of main principles"
79
  elif language and language.lower().startswith('ru'):
80
  target_language = "Russian"
81
- example_description = "Обсуждение основных принципов"
82
  else:
83
  target_language = "the same language as the video transcript"
84
  example_description = "Discussion of main principles"
85
 
86
- # Determine number of timecodes based on video duration
87
- if video_duration_minutes:
88
- if video_duration_minutes <= 30:
89
- timecode_count = "10-15"
90
- elif video_duration_minutes <= 60:
91
- timecode_count = "15-20"
92
- else:
93
- timecode_count = "20-30"
94
- else:
95
- timecode_count = "15-25"
96
-
97
- if format_type == "youtube":
98
- format_instructions = (
99
- f"Format should be: MM:SS Topic description for videos under 1 hour, or HH:MM:SS Topic description for longer videos\n"
100
- f"Example: 05:30 {example_description} or 1:05:30 {example_description}\n"
101
- f"This format is suitable for YouTube video descriptions."
102
- )
103
- elif format_type == "markdown":
104
- format_instructions = (
105
- f"Format should be Markdown: - [MM:SS](link) Topic description for videos under 1 hour, or - [HH:MM:SS](link) Topic description for longer videos\n"
106
- f"Example: - [05:30](https://youtu.be/VIDEOID?t=330) {example_description} or - [1:05:30](https://youtu.be/VIDEOID?t=3930) {example_description}\n"
107
- f"This format creates clickable links in Markdown."
108
- )
109
- else: # txt
110
- format_instructions = (
111
- f"Format should be: MM:SS - Topic description for videos under 1 hour, or HH:MM:SS - Topic description for longer videos\n"
112
- f"Example: 05:30 - {example_description} or 1:05:30 - {example_description}\n"
113
- f"This format is suitable for plain text representation."
114
- )
115
-
116
  prompt = f"""
117
- You are an expert at creating timestamps for YouTube videos. You have been provided with a transcript of the video "{video_title}".
118
-
119
- Your task is to create timestamps for the main themes and segments of the video based on the provided transcript.
120
- Create timestamp descriptions in {target_language}.
121
-
122
- {format_instructions}
123
-
124
- Rules for creating timestamps:
125
- 1. Select {timecode_count} key video segments
126
- 2. Use the time markers provided in the transcript to determine the start of each segment
127
- 3. Create brief (3-7 words) descriptions for each segment that reflect its main theme, using appropriate terminology and style
128
- 4. Distribute timestamps approximately evenly throughout the video length
129
- 5. Use MM:SS format for videos under 1 hour (example: 05:30, 45:20), and HH:MM:SS format for videos 1 hour or longer (example: 1:05:30, 1:45:20)
130
- 6. DO NOT include standard markers like "Video start" or "Video end"
131
- 7. Ensure a clear structure so viewers can easily navigate through the video
132
- 8. The first timestamp does NOT have to be 00:00, start with the first meaningful topic
133
-
134
- Here is the video transcript:
135
-
136
- {transcript}
137
-
138
- Create a list of timestamps in the specified format. Reply with ONLY the list of timestamps, without introduction or conclusion.
139
  """
140
 
141
  return prompt
@@ -149,14 +162,14 @@ async def generate_timecodes_with_gemini(
149
  ) -> Dict[str, Any]:
150
  """
151
  Generates timecodes using Gemini based on transcript.
152
-
153
  Args:
154
  transcript_entries: List of transcript entries
155
  video_title: Video title
156
  format_type: Timecode format (youtube, markdown)
157
  model_name: Gemini model name (defaults to DEFAULT_MODEL)
158
  language: Transcript language (if known)
159
-
160
  Returns:
161
  Dictionary with generation results
162
  """
@@ -164,16 +177,24 @@ async def generate_timecodes_with_gemini(
164
  return {
165
  "error": "Gemini API key is not configured. Please add GEMINI_API_KEY to .env file"
166
  }
167
-
168
  try:
169
  print(f"Starting timecode generation with model: {model_name or DEFAULT_MODEL}")
170
-
171
  # Determine transcript language if not provided
172
  detected_language = language
173
  if not detected_language:
174
  # Simple heuristic for language detection from first 10 segments
175
- text_sample = " ".join([entry.get("text", "") for entry in transcript_entries[:10]])
176
-
 
 
 
 
 
 
 
 
177
  # Set of Ukrainian letters that differ from Russian alphabet
178
  ukrainian_specific = set("ґєії")
179
  # If there's at least one specific Ukrainian letter
@@ -187,33 +208,76 @@ async def generate_timecodes_with_gemini(
187
  else:
188
  detected_language = "en"
189
  print("Detected transcript language: English (or other)")
190
-
191
  # Determine video duration (in seconds and minutes)
192
  video_duration_seconds = 0
 
193
  if transcript_entries:
194
  last_entry = transcript_entries[-1]
195
- video_duration_seconds = last_entry.get("start", 0) + last_entry.get("duration", 0)
 
 
 
 
196
  video_duration_minutes = int(video_duration_seconds / 60)
197
  print(f"Determined video duration: {video_duration_minutes} minutes ({video_duration_seconds} seconds)")
 
 
 
 
 
 
 
 
 
 
198
  else:
199
  video_duration_minutes = None
200
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
201
  # Format transcript for prompt
202
  formatted_transcript = format_transcript_for_prompt(transcript_entries, video_duration_seconds)
203
-
204
  # Create prompt considering language and duration
 
 
 
 
 
 
 
 
205
  prompt = get_timecode_prompt(
206
- video_title,
207
- formatted_transcript,
208
- format_type,
209
- detected_language,
210
- video_duration_minutes
 
 
211
  )
212
  print(f"Prompt prepared, length: {len(prompt)} characters")
213
-
214
  # List of models to try
215
  models_to_try = [model_name or DEFAULT_MODEL] + [m for m in ALTERNATIVE_MODELS if m != (model_name or DEFAULT_MODEL)]
216
-
217
  last_error = None
218
  for current_model in models_to_try:
219
  try:
@@ -228,14 +292,14 @@ async def generate_timecodes_with_gemini(
228
  )
229
  )
230
  print(f"Response received: {type(response)}")
231
-
232
  # Get response text
233
  timecodes_text = response.text
234
  print(f"Response text length: {len(timecodes_text)}")
235
-
236
  # Split into lines and clean
237
  timecodes = [line.strip() for line in timecodes_text.split('\n') if line.strip()]
238
-
239
  # Filter timecodes to remove "video start" and "video end"
240
  filtered_timecodes = []
241
  for tc in timecodes:
@@ -246,15 +310,13 @@ async def generate_timecodes_with_gemini(
246
  # Skip timecodes with "video start" or "video end"
247
  lowercase_desc = description.lower()
248
  if any(phrase in lowercase_desc for phrase in [
249
- "начало видео", "конец видео", "початок відео", "кінець відео",
250
- "start of video", "end of video", "video start", "video end",
251
  "beginning", "conclusion", "intro", "outro"
252
  ]):
253
  continue
254
  filtered_timecodes.append(tc)
255
-
256
  # If too many timecodes, select evenly distributed ones
257
- max_timecodes = 25 # Maximum recommended number of timecodes
258
  if len(filtered_timecodes) > max_timecodes:
259
  print(f"Too many timecodes ({len(filtered_timecodes)}), reducing to {max_timecodes}")
260
  # Calculate step for selecting timecodes evenly
@@ -268,9 +330,9 @@ async def generate_timecodes_with_gemini(
268
  final_timecodes = [filtered_timecodes[i] for i in indices]
269
  else:
270
  final_timecodes = filtered_timecodes
271
-
272
  print(f"Final timecodes count after processing: {len(final_timecodes)}")
273
-
274
  return {
275
  "timecodes": final_timecodes,
276
  "format": format_type,
@@ -284,7 +346,7 @@ async def generate_timecodes_with_gemini(
284
  traceback.print_exc()
285
  last_error = api_error
286
  continue
287
-
288
  # If all models failed
289
  return {
290
  "error": f"Failed to execute request with any model. Last error: {str(last_error)}"
@@ -294,4 +356,4 @@ async def generate_timecodes_with_gemini(
294
  traceback.print_exc()
295
  return {
296
  "error": f"Error generating timecodes with Gemini: {str(e)}"
297
- }
 
1
  import os
2
  from google import genai
3
  from google.genai import types
4
+ from google.api_core import retry
5
  from dotenv import load_dotenv
6
  from typing import List, Dict, Any, Optional
7
  import traceback
 
19
  try:
20
  client = genai.Client(api_key=GEMINI_API_KEY)
21
  print("Gemini client successfully initialized")
22
+
23
+ # Configure retry logic for API errors
24
+ def is_retriable(e):
25
+ return (isinstance(e, Exception) and
26
+ (hasattr(e, 'code') and e.code in {429, 503}))
27
+
28
+ # Apply retry to generate_content method
29
+ if hasattr(client.aio.models, 'generate_content'):
30
+ original_method = client.aio.models.generate_content
31
+ client.aio.models.generate_content = retry.Retry(
32
+ predicate=is_retriable,
33
+ initial=1.0, # Initial delay in seconds
34
+ maximum=60.0, # Maximum delay in seconds
35
+ multiplier=2.0, # Backoff multiplier
36
+ deadline=300.0 # Total timeout in seconds
37
+ )(original_method)
38
+ print("Retry logic configured for Gemini API")
39
+
40
  except Exception as e:
41
  print(f"Error initializing Gemini client: {str(e)}")
42
  traceback.print_exc()
 
51
  def format_transcript_for_prompt(transcript_entries: List[Dict[str, Any]], video_duration_seconds: int = None) -> str:
52
  """Formats transcript for passing to prompt."""
53
  formatted_transcript = ""
54
+
55
  # Determine maximum time in transcript if video duration is not provided
56
  if video_duration_seconds is None:
57
  if transcript_entries:
58
  last_entry = transcript_entries[-1]
59
+ # Handle both dict format and FetchedTranscriptSnippet objects
60
+ if hasattr(last_entry, 'start'): # FetchedTranscriptSnippet object
61
+ max_time = last_entry.start + last_entry.duration
62
+ elif isinstance(last_entry, dict): # Dict format
63
+ max_time = last_entry.get("start", 0) + last_entry.get("duration", 0)
64
+ else:
65
+ max_time = 0
66
  video_duration_seconds = int(max_time) + 10 # Add small buffer
67
+
68
+ # For very long videos (>60 min), sample transcript to ensure full coverage
69
+ if video_duration_seconds and video_duration_seconds > 3600: # More than 60 minutes
70
+ # Sample every 3rd entry to reduce size but maintain coverage
71
+ sampled_entries = transcript_entries[::3]
72
+ print(f"Sampled transcript: {len(sampled_entries)} entries from {len(transcript_entries)} total")
73
+ elif video_duration_seconds and video_duration_seconds > 1800: # More than 30 minutes
74
+ # Sample every 2nd entry
75
+ sampled_entries = transcript_entries[::2]
76
+ print(f"Sampled transcript: {len(sampled_entries)} entries from {len(transcript_entries)} total")
77
+ else:
78
+ sampled_entries = transcript_entries
79
+
80
+ for entry in sampled_entries:
81
+ # Handle both dict format and FetchedTranscriptSnippet objects
82
+ if hasattr(entry, 'start'): # FetchedTranscriptSnippet object
83
+ start_time = entry.start
84
+ text = entry.text
85
+ elif isinstance(entry, dict): # Dict format
86
+ start_time = entry.get("start", 0)
87
+ text = entry.get("text", "")
88
+ else:
89
+ continue # Skip invalid entries
90
+
91
  # Check that time doesn't exceed total video duration
92
  if video_duration_seconds and start_time > video_duration_seconds:
93
  continue
94
+
95
  # Format time in hours:minutes:seconds format
96
  time_str = format_time_hms(start_time)
97
+
98
  formatted_transcript += f"[{time_str}] {text}\n"
99
+
100
  return formatted_transcript
101
 
102
  def format_time_hms(seconds: float) -> str:
 
107
  hours = int(seconds // 3600)
108
  minutes = int((seconds % 3600) // 60)
109
  secs = int(seconds % 60)
110
+
111
  if hours > 0:
112
  return f"{hours:02d}:{minutes:02d}:{secs:02d}"
113
  else:
114
  return f"{minutes:02d}:{secs:02d}"
115
 
116
+ def get_timecode_prompt(video_title: str, transcript: str, format_type: str = "youtube", language: str = None, video_duration_minutes: int = None, timecode_count: str = None, interval_text: str = None) -> str:
117
  """Creates prompt for generating timecodes based on transcript."""
118
 
119
  # Determine prompt language based on video language
 
122
  example_description = "Discussion of main principles"
123
  elif language and language.lower().startswith('ru'):
124
  target_language = "Russian"
125
+ example_description = "Discussion of main principles"
126
  else:
127
  target_language = "the same language as the video transcript"
128
  example_description = "Discussion of main principles"
129
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
130
  prompt = f"""
131
+ You are a YouTube assistant. Analyze the FULL TRANSCRIPT below and identify all major topic shifts or sections.
132
+
133
+ Your task:
134
+ - Generate timestamps that cover the ENTIRE {video_duration_minutes}-minute video
135
+ - Each timestamp must be paired with a precise time from the transcript
136
+ - Timestamps must reflect the actual content flow throughout the video
137
+
138
+ Format requirements:
139
+ - Plain text output ONLY
140
+ - Each line format: MM:SS Topic description (or HH:MM:SS for longer videos)
141
+ - Use {target_language} for descriptions (3-6 words each)
142
+ - Start with early timestamp (first few minutes)
143
+ - End with late timestamp (last 10-15 minutes of video)
144
+ - NO explanations, NO numbering, NO extra text
145
+
146
+ CRITICAL: The transcript below spans {video_duration_minutes} minutes. You MUST create timestamps that span from beginning to end, not just the first portion.
147
+
148
+ Full transcript to analyze:
149
+ {transcript}
150
+
151
+ Generate {timecode_count} timestamps covering the complete {video_duration_minutes}-minute duration:
 
152
  """
153
 
154
  return prompt
 
162
  ) -> Dict[str, Any]:
163
  """
164
  Generates timecodes using Gemini based on transcript.
165
+
166
  Args:
167
  transcript_entries: List of transcript entries
168
  video_title: Video title
169
  format_type: Timecode format (youtube, markdown)
170
  model_name: Gemini model name (defaults to DEFAULT_MODEL)
171
  language: Transcript language (if known)
172
+
173
  Returns:
174
  Dictionary with generation results
175
  """
 
177
  return {
178
  "error": "Gemini API key is not configured. Please add GEMINI_API_KEY to .env file"
179
  }
180
+
181
  try:
182
  print(f"Starting timecode generation with model: {model_name or DEFAULT_MODEL}")
183
+
184
  # Determine transcript language if not provided
185
  detected_language = language
186
  if not detected_language:
187
  # Simple heuristic for language detection from first 10 segments
188
+ # Handle both dict format and FetchedTranscriptSnippet objects
189
+ text_sample_parts = []
190
+ for entry in transcript_entries[:10]:
191
+ if hasattr(entry, 'text'): # FetchedTranscriptSnippet object
192
+ text_sample_parts.append(entry.text)
193
+ elif isinstance(entry, dict): # Dict format
194
+ text_sample_parts.append(entry.get("text", ""))
195
+
196
+ text_sample = " ".join(text_sample_parts)
197
+
198
  # Set of Ukrainian letters that differ from Russian alphabet
199
  ukrainian_specific = set("ґєії")
200
  # If there's at least one specific Ukrainian letter
 
208
  else:
209
  detected_language = "en"
210
  print("Detected transcript language: English (or other)")
211
+
212
  # Determine video duration (in seconds and minutes)
213
  video_duration_seconds = 0
214
+ max_timecodes = 30 # Default value
215
  if transcript_entries:
216
  last_entry = transcript_entries[-1]
217
+ # Handle both dict format and FetchedTranscriptSnippet objects
218
+ if hasattr(last_entry, 'start'): # FetchedTranscriptSnippet object
219
+ video_duration_seconds = last_entry.start + last_entry.duration
220
+ elif isinstance(last_entry, dict): # Dict format
221
+ video_duration_seconds = last_entry.get("start", 0) + last_entry.get("duration", 0)
222
  video_duration_minutes = int(video_duration_seconds / 60)
223
  print(f"Determined video duration: {video_duration_minutes} minutes ({video_duration_seconds} seconds)")
224
+
225
+ # Set max_timecodes based on video duration
226
+ if video_duration_minutes <= 30:
227
+ max_timecodes = 20
228
+ elif video_duration_minutes <= 60:
229
+ max_timecodes = 35
230
+ elif video_duration_minutes <= 120:
231
+ max_timecodes = 50
232
+ else:
233
+ max_timecodes = 60
234
  else:
235
  video_duration_minutes = None
236
+
237
+ # Determine number of timecodes based on video duration
238
+ if video_duration_minutes:
239
+ if video_duration_minutes <= 30:
240
+ timecode_count = "8-12"
241
+ max_timecodes = 15
242
+ elif video_duration_minutes <= 60:
243
+ timecode_count = "12-18"
244
+ max_timecodes = 20
245
+ elif video_duration_minutes <= 120:
246
+ timecode_count = "18-25"
247
+ max_timecodes = 30
248
+ else:
249
+ timecode_count = "25-35"
250
+ max_timecodes = 40
251
+ else:
252
+ timecode_count = "10-15"
253
+ max_timecodes = 20
254
+
255
  # Format transcript for prompt
256
  formatted_transcript = format_transcript_for_prompt(transcript_entries, video_duration_seconds)
257
+
258
  # Create prompt considering language and duration
259
+ # Calculate recommended interval for timestamps
260
+ if video_duration_minutes and timecode_count:
261
+ target_count = int(timecode_count.split('-')[0]) if timecode_count.split('-')[0].isdigit() else 20
262
+ interval_minutes = video_duration_minutes // target_count
263
+ interval_text = f"approximately every {interval_minutes}-{interval_minutes + 2} minutes"
264
+ else:
265
+ interval_text = "evenly throughout the video"
266
+
267
  prompt = get_timecode_prompt(
268
+ video_title,
269
+ formatted_transcript,
270
+ format_type,
271
+ detected_language,
272
+ video_duration_minutes,
273
+ timecode_count,
274
+ interval_text
275
  )
276
  print(f"Prompt prepared, length: {len(prompt)} characters")
277
+
278
  # List of models to try
279
  models_to_try = [model_name or DEFAULT_MODEL] + [m for m in ALTERNATIVE_MODELS if m != (model_name or DEFAULT_MODEL)]
280
+
281
  last_error = None
282
  for current_model in models_to_try:
283
  try:
 
292
  )
293
  )
294
  print(f"Response received: {type(response)}")
295
+
296
  # Get response text
297
  timecodes_text = response.text
298
  print(f"Response text length: {len(timecodes_text)}")
299
+
300
  # Split into lines and clean
301
  timecodes = [line.strip() for line in timecodes_text.split('\n') if line.strip()]
302
+
303
  # Filter timecodes to remove "video start" and "video end"
304
  filtered_timecodes = []
305
  for tc in timecodes:
 
310
  # Skip timecodes with "video start" or "video end"
311
  lowercase_desc = description.lower()
312
  if any(phrase in lowercase_desc for phrase in [
313
+ "video start", "video end", "start of video", "end of video",
 
314
  "beginning", "conclusion", "intro", "outro"
315
  ]):
316
  continue
317
  filtered_timecodes.append(tc)
318
+
319
  # If too many timecodes, select evenly distributed ones
 
320
  if len(filtered_timecodes) > max_timecodes:
321
  print(f"Too many timecodes ({len(filtered_timecodes)}), reducing to {max_timecodes}")
322
  # Calculate step for selecting timecodes evenly
 
330
  final_timecodes = [filtered_timecodes[i] for i in indices]
331
  else:
332
  final_timecodes = filtered_timecodes
333
+
334
  print(f"Final timecodes count after processing: {len(final_timecodes)}")
335
+
336
  return {
337
  "timecodes": final_timecodes,
338
  "format": format_type,
 
346
  traceback.print_exc()
347
  last_error = api_error
348
  continue
349
+
350
  # If all models failed
351
  return {
352
  "error": f"Failed to execute request with any model. Last error: {str(last_error)}"
 
356
  traceback.print_exc()
357
  return {
358
  "error": f"Error generating timecodes with Gemini: {str(e)}"
359
+ }
gradio_app.py CHANGED
@@ -233,27 +233,27 @@ with gr.Blocks(title="YouTube MCP") as demo:
233
  gr.Markdown("# YouTube Model Context Protocol (MCP)")
234
  gr.Markdown("This interface allows interaction with YouTube API through MCP protocol")
235
 
236
- with gr.Tab("Поиск видео"):
237
  with gr.Row():
238
  with gr.Column():
239
- search_query = gr.Textbox(label="Поисковый запрос", placeholder="Введите запрос...")
240
  with gr.Row():
241
- max_results = gr.Slider(minimum=1, maximum=50, value=10, step=1, label="Количество результатов")
242
  order = gr.Dropdown(
243
  choices=["relevance", "date", "viewCount", "rating", "title"],
244
  value="relevance",
245
- label="Сортировка"
246
  )
247
  video_duration = gr.Dropdown(
248
  choices=["any", "short", "medium", "long"],
249
  value="any",
250
- label="Длительность"
251
  )
252
- search_button = gr.Button("Поиск")
253
 
254
  with gr.Column():
255
- search_results = gr.Markdown(label="Результаты")
256
- search_json = gr.JSON(label="JSON данные")
257
 
258
  search_button.click(
259
  search_youtube,
@@ -261,18 +261,18 @@ with gr.Blocks(title="YouTube MCP") as demo:
261
  outputs=[search_results, search_json]
262
  )
263
 
264
- with gr.Tab("Информация о видео"):
265
  with gr.Row():
266
  with gr.Column():
267
  video_id_input = gr.Textbox(
268
- label="ID видео или ссылка на видео",
269
- placeholder="Введите ID видео или полную ссылку (youtube.com, youtu.be, shorts, embed)..."
270
  )
271
- get_info_button = gr.Button("Получить информацию")
272
 
273
  with gr.Column():
274
- video_info_output = gr.Markdown(label="Информация о видео")
275
- video_info_json = gr.JSON(label="JSON данные")
276
 
277
  get_info_button.click(
278
  get_video_info,
@@ -280,21 +280,21 @@ with gr.Blocks(title="YouTube MCP") as demo:
280
  outputs=[video_info_output, video_info_json]
281
  )
282
 
283
- with gr.Tab("Транскрипт видео"):
284
  with gr.Row():
285
  with gr.Column():
286
  transcript_video_id = gr.Textbox(
287
- label="ID видео или ссылка на видео",
288
- placeholder="Введите ID видео или полную ссылку (youtube.com, youtu.be, shorts, embed)..."
289
  )
290
- language_code = gr.Textbox(label="Код языка (опционально)", placeholder="ru, en, etc...")
291
  with gr.Row():
292
- get_transcript_button = gr.Button("Получить транскрипт")
293
- get_languages_button = gr.Button("Получить доступные языки")
294
 
295
  with gr.Column():
296
- transcript_output = gr.Markdown(label="Транскрипт")
297
- transcript_json = gr.JSON(label="JSON данные")
298
 
299
  get_transcript_button.click(
300
  get_transcript,
@@ -308,25 +308,25 @@ with gr.Blocks(title="YouTube MCP") as demo:
308
  outputs=[transcript_output, transcript_json]
309
  )
310
 
311
- with gr.Tab("Тайм-коды"):
312
  with gr.Row():
313
  with gr.Column():
314
  timecode_video_id = gr.Textbox(
315
- label="ID видео или ссылка на видео",
316
- placeholder="Введите ID видео или полную ссылку (youtube.com, youtu.be, shorts, embed)..."
317
  )
318
- timecode_language = gr.Textbox(label="Код языка (опционально)", placeholder="ru, en, etc...")
319
- segment_length = gr.Slider(minimum=30, maximum=300, value=60, step=30, label="Длина сегмента (секунды)")
320
  format_type = gr.Dropdown(
321
  choices=["youtube", "markdown"],
322
  value="youtube",
323
- label="Формат тайм-кодов"
324
  )
325
- generate_timecodes_button = gr.Button("Сгенерировать тайм-коды")
326
 
327
  with gr.Column():
328
- timecodes_output = gr.Markdown(label="Тайм-коды")
329
- timecodes_json = gr.JSON(label="JSON данные")
330
 
331
  generate_timecodes_button.click(
332
  generate_timecodes,
@@ -334,30 +334,30 @@ with gr.Blocks(title="YouTube MCP") as demo:
334
  outputs=[timecodes_output, timecodes_json]
335
  )
336
 
337
- with gr.Tab("Gemini Тайм-коды"):
338
  with gr.Row():
339
  with gr.Column():
340
  gemini_video_id = gr.Textbox(
341
- label="ID видео или ссылка на видео",
342
- placeholder="Введите ID видео или полную ссылку (youtube.com, youtu.be, shorts, embed)..."
343
  )
344
- gemini_language = gr.Textbox(label="Код языка (опционально)", placeholder="ru, en, etc...")
345
  gemini_format = gr.Dropdown(
346
  choices=["youtube", "markdown"],
347
  value="youtube",
348
- label="Формат тайм-кодов"
349
  )
350
  gemini_model = gr.Dropdown(
351
  choices=["gemini-2.0-flash-001", "gemini-2.0-pro-001", "gemini-2.0-pro-vision-001"],
352
  value="gemini-2.0-flash-001",
353
- label="Модель Gemini"
354
  )
355
- generate_gemini_button = gr.Button("Сгенерировать тайм-коды с Gemini")
356
 
357
  with gr.Column():
358
- gemini_output = gr.Markdown(label="Информация о генерации")
359
- gemini_timecodes = gr.Textbox(label="Тайм-коды", lines=10, max_lines=20, show_copy_button=True)
360
- gemini_json = gr.JSON(label="JSON данные")
361
 
362
  async def process_gemini_result(video_id, language_code, format_type, model):
363
  result = await generate_gemini_timecodes(video_id, language_code, format_type, model)
@@ -378,6 +378,6 @@ with gr.Blocks(title="YouTube MCP") as demo:
378
  outputs=[gemini_output, gemini_timecodes, gemini_json]
379
  )
380
 
381
- # Запуск приложения
382
  if __name__ == "__main__":
383
  demo.launch()
 
233
  gr.Markdown("# YouTube Model Context Protocol (MCP)")
234
  gr.Markdown("This interface allows interaction with YouTube API through MCP protocol")
235
 
236
+ with gr.Tab("Video Search"):
237
  with gr.Row():
238
  with gr.Column():
239
+ search_query = gr.Textbox(label="Search Query", placeholder="Enter search query...")
240
  with gr.Row():
241
+ max_results = gr.Slider(minimum=1, maximum=50, value=10, step=1, label="Number of Results")
242
  order = gr.Dropdown(
243
  choices=["relevance", "date", "viewCount", "rating", "title"],
244
  value="relevance",
245
+ label="Sort By"
246
  )
247
  video_duration = gr.Dropdown(
248
  choices=["any", "short", "medium", "long"],
249
  value="any",
250
+ label="Duration"
251
  )
252
+ search_button = gr.Button("Search")
253
 
254
  with gr.Column():
255
+ search_results = gr.Markdown(label="Results")
256
+ search_json = gr.JSON(label="JSON Data")
257
 
258
  search_button.click(
259
  search_youtube,
 
261
  outputs=[search_results, search_json]
262
  )
263
 
264
+ with gr.Tab("Video Information"):
265
  with gr.Row():
266
  with gr.Column():
267
  video_id_input = gr.Textbox(
268
+ label="Video ID or URL",
269
+ placeholder="Enter video ID or full URL (youtube.com, youtu.be, shorts, embed)..."
270
  )
271
+ get_info_button = gr.Button("Get Information")
272
 
273
  with gr.Column():
274
+ video_info_output = gr.Markdown(label="Video Information")
275
+ video_info_json = gr.JSON(label="JSON Data")
276
 
277
  get_info_button.click(
278
  get_video_info,
 
280
  outputs=[video_info_output, video_info_json]
281
  )
282
 
283
+ with gr.Tab("Video Transcript"):
284
  with gr.Row():
285
  with gr.Column():
286
  transcript_video_id = gr.Textbox(
287
+ label="Video ID or URL",
288
+ placeholder="Enter video ID or full URL (youtube.com, youtu.be, shorts, embed)..."
289
  )
290
+ language_code = gr.Textbox(label="Language Code (optional)", placeholder="ru, en, etc...")
291
  with gr.Row():
292
+ get_transcript_button = gr.Button("Get Transcript")
293
+ get_languages_button = gr.Button("Get Available Languages")
294
 
295
  with gr.Column():
296
+ transcript_output = gr.Markdown(label="Transcript")
297
+ transcript_json = gr.JSON(label="JSON Data")
298
 
299
  get_transcript_button.click(
300
  get_transcript,
 
308
  outputs=[transcript_output, transcript_json]
309
  )
310
 
311
+ with gr.Tab("Timecodes"):
312
  with gr.Row():
313
  with gr.Column():
314
  timecode_video_id = gr.Textbox(
315
+ label="Video ID or URL",
316
+ placeholder="Enter video ID or full URL (youtube.com, youtu.be, shorts, embed)..."
317
  )
318
+ timecode_language = gr.Textbox(label="Language Code (optional)", placeholder="ru, en, etc...")
319
+ segment_length = gr.Slider(minimum=30, maximum=300, value=60, step=30, label="Segment Length (seconds)")
320
  format_type = gr.Dropdown(
321
  choices=["youtube", "markdown"],
322
  value="youtube",
323
+ label="Timecode Format"
324
  )
325
+ generate_timecodes_button = gr.Button("Generate Timecodes")
326
 
327
  with gr.Column():
328
+ timecodes_output = gr.Markdown(label="Timecodes")
329
+ timecodes_json = gr.JSON(label="JSON Data")
330
 
331
  generate_timecodes_button.click(
332
  generate_timecodes,
 
334
  outputs=[timecodes_output, timecodes_json]
335
  )
336
 
337
+ with gr.Tab("Gemini Timecodes"):
338
  with gr.Row():
339
  with gr.Column():
340
  gemini_video_id = gr.Textbox(
341
+ label="Video ID or URL",
342
+ placeholder="Enter video ID or full URL (youtube.com, youtu.be, shorts, embed)..."
343
  )
344
+ gemini_language = gr.Textbox(label="Language Code (optional)", placeholder="ru, en, etc...")
345
  gemini_format = gr.Dropdown(
346
  choices=["youtube", "markdown"],
347
  value="youtube",
348
+ label="Timecode Format"
349
  )
350
  gemini_model = gr.Dropdown(
351
  choices=["gemini-2.0-flash-001", "gemini-2.0-pro-001", "gemini-2.0-pro-vision-001"],
352
  value="gemini-2.0-flash-001",
353
+ label="Gemini Model"
354
  )
355
+ generate_gemini_button = gr.Button("Generate Timecodes with Gemini")
356
 
357
  with gr.Column():
358
+ gemini_output = gr.Markdown(label="Generation Information")
359
+ gemini_timecodes = gr.Textbox(label="Timecodes", lines=10, max_lines=20, show_copy_button=True)
360
+ gemini_json = gr.JSON(label="JSON Data")
361
 
362
  async def process_gemini_result(video_id, language_code, format_type, model):
363
  result = await generate_gemini_timecodes(video_id, language_code, format_type, model)
 
378
  outputs=[gemini_output, gemini_timecodes, gemini_json]
379
  )
380
 
381
+ # Launch the application
382
  if __name__ == "__main__":
383
  demo.launch()
mcp_handlers.py CHANGED
@@ -41,6 +41,7 @@ class MCPGeminiRequest(BaseModel):
41
  async def process_mcp_search(youtube_client, request: MCPQueryRequest) -> List[MCPResponse]:
42
  """Process MCP request for video search."""
43
  try:
 
44
  search_response = youtube_client.search().list(
45
  q=request.query,
46
  part="snippet",
@@ -48,41 +49,70 @@ async def process_mcp_search(youtube_client, request: MCPQueryRequest) -> List[M
48
  type="video"
49
  ).execute()
50
 
 
 
51
  results = []
52
- for item in search_response.get("items", []):
53
- video_id = item["id"]["videoId"]
54
- snippet = item["snippet"]
55
-
56
- # Create MCP format response
57
- video_data = {
58
- "video_id": video_id,
59
- "title": snippet["title"],
60
- "description": snippet["description"],
61
- "thumbnail": snippet["thumbnails"]["high"]["url"],
62
- "channel_title": snippet["channelTitle"],
63
- "published_at": snippet["publishedAt"]
64
- }
65
-
66
- # Format markdown for video display
67
- markdown_text = (
68
- f"## {snippet['title']}\n"
69
- f"**Channel:** {snippet['channelTitle']}\n"
70
- f"**Published:** {snippet['publishedAt']}\n\n"
71
- f"[![Thumbnail]({snippet['thumbnails']['high']['url']})](https://www.youtube.com/watch?v={video_id})\n\n"
72
- f"{snippet['description'][:300]}...\n\n"
73
- f"[Watch on YouTube](https://www.youtube.com/watch?v={video_id})"
74
- )
75
-
76
- results.append(MCPResponse(
77
- type="youtube_video",
78
- markdown=markdown_text,
79
- data=video_data
80
- ))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
81
 
 
82
  return results
83
  except HttpError as e:
 
84
  raise HTTPException(status_code=500, detail=f"YouTube API error: {str(e)}")
85
  except Exception as e:
 
86
  raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
87
 
88
  async def process_mcp_video_info(youtube_client, request: MCPVideoRequest) -> MCPResponse:
@@ -120,8 +150,20 @@ async def process_mcp_video_info(youtube_client, request: MCPVideoRequest) -> MC
120
  }
121
 
122
  return MCPResponse(
123
- type="text",
124
- content=f"Video information:\n{json.dumps(video_data, indent=2, ensure_ascii=False)}"
 
 
 
 
 
 
 
 
 
 
 
 
125
  )
126
  except HttpError as e:
127
  return MCPResponse(
@@ -139,53 +181,99 @@ async def process_mcp_transcript(request: MCPTranscriptRequest) -> MCPResponse:
139
  try:
140
  # Extract video ID from URL if it's a URL
141
  video_id = extract_video_id(request.video_id)
 
 
 
142
 
143
  try:
144
- languages = [request.language_code] if request.language_code else None
145
- transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=languages)
146
- except Exception as transcript_error:
 
147
  if request.language_code:
 
148
  try:
149
- print(f"Failed to get transcript in language {request.language_code}, trying to get available transcripts")
150
- transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
151
- except Exception as fallback_error:
152
- return MCPResponse(
153
- type="error",
154
- error=f"Transcript not found. Details: {str(fallback_error)}"
155
- )
156
- else:
157
- return MCPResponse(
158
- type="error",
159
- error=f"Failed to get transcript. Details: {str(transcript_error)}"
160
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
161
 
162
- if not transcript_list:
 
 
163
  return MCPResponse(
164
  type="error",
165
- error="Transcript for this video is unavailable"
166
  )
167
 
 
 
 
168
  formatted_transcript = []
169
  for entry in transcript_list:
170
- formatted_transcript.append({
171
- "text": entry.get("text", ""),
172
- "start": entry.get("start", 0),
173
- "duration": entry.get("duration", 0)
174
- })
 
 
 
 
 
 
 
 
 
 
 
 
 
 
175
 
176
  # Format markdown for transcript display
177
- markdown_text = "# Transcript\n\n"
178
  for entry in formatted_transcript:
179
- start_time = entry.get("start")
180
- duration = entry.get("duration")
181
  end_time = start_time + duration
182
- text = entry.get("text")
183
 
184
  # Convert time to hours:minutes:seconds format
185
  start_formatted = format_timestamp(start_time)
186
  end_formatted = format_timestamp(end_time)
187
 
188
- markdown_text += f"[{start_formatted} - {end_formatted}] {text}\n\n"
189
 
190
  return MCPResponse(
191
  type="youtube_transcript",
@@ -196,6 +284,7 @@ async def process_mcp_transcript(request: MCPTranscriptRequest) -> MCPResponse:
196
  }
197
  )
198
  except Exception as e:
 
199
  return MCPResponse(
200
  type="error",
201
  error=f"Error getting transcript: {str(e)}"
@@ -236,29 +325,57 @@ async def process_mcp_timecodes(youtube_client, request: MCPTimecodeRequest) ->
236
  video_id = extract_video_id(request.video_id)
237
 
238
  # Get transcript
 
 
239
  try:
240
- languages = [request.language_code] if request.language_code else None
241
- transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=languages)
242
- except Exception as transcript_error:
 
243
  if request.language_code:
 
244
  try:
245
- print(f"Failed to get transcript in language {request.language_code}, trying to get available transcripts")
246
- transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
247
- except Exception as fallback_error:
248
- return MCPResponse(
249
- type="error",
250
- error=f"Transcript not found. Details: {str(fallback_error)}"
251
- )
252
- else:
253
- return MCPResponse(
254
- type="error",
255
- error=f"Failed to get transcript. Details: {str(transcript_error)}"
256
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
257
 
258
- if not transcript_list:
 
 
259
  return MCPResponse(
260
  type="error",
261
- error="Transcript for this video is unavailable"
262
  )
263
 
264
  # Group transcript into segments
@@ -356,36 +473,65 @@ async def process_mcp_gemini_timecodes(youtube_client, request: MCPGeminiRequest
356
  """Process MCP request for Gemini timecode generation."""
357
  try:
358
  # Get transcript
 
 
 
359
  try:
360
- languages = [request.language_code] if request.language_code else None
361
- transcript_list = YouTubeTranscriptApi.get_transcript(request.video_id, languages=languages)
362
- except Exception as transcript_error:
 
363
  if request.language_code:
 
364
  try:
365
- print(f"Failed to get transcript in language {request.language_code}, trying to get available transcripts")
366
- transcript_list = YouTubeTranscriptApi.get_transcript(request.video_id)
367
- except Exception as fallback_error:
368
- return MCPResponse(
369
- type="error",
370
- error=f"Transcript not found. Details: {str(fallback_error)}"
371
- )
372
- else:
373
- return MCPResponse(
374
- type="error",
375
- error=f"Failed to get transcript. Details: {str(transcript_error)}"
376
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
377
 
378
- if not transcript_list:
 
 
379
  return MCPResponse(
380
  type="error",
381
- error="Transcript for this video is unavailable"
382
  )
383
 
384
  # Get video information for title
385
  try:
386
  video_response = youtube_client.videos().list(
387
  part="snippet",
388
- id=request.video_id
389
  ).execute()
390
 
391
  if video_response.get("items"):
 
41
  async def process_mcp_search(youtube_client, request: MCPQueryRequest) -> List[MCPResponse]:
42
  """Process MCP request for video search."""
43
  try:
44
+ print(f"Starting search for query: {request.query}")
45
  search_response = youtube_client.search().list(
46
  q=request.query,
47
  part="snippet",
 
49
  type="video"
50
  ).execute()
51
 
52
+ print(f"Search response received. Items count: {len(search_response.get('items', []))}")
53
+
54
  results = []
55
+ for i, item in enumerate(search_response.get("items", [])):
56
+ try:
57
+ print(f"Processing item {i}: {type(item)}")
58
+ print(f"Item keys: {list(item.keys()) if isinstance(item, dict) else 'Not a dict'}")
59
+
60
+ # Check if 'id' exists and has the right structure
61
+ if 'id' not in item:
62
+ print(f"Warning: 'id' key not found in item {i}")
63
+ continue
64
+
65
+ item_id = item["id"]
66
+ print(f"Item id type: {type(item_id)}, value: {item_id}")
67
+
68
+ if isinstance(item_id, dict) and "videoId" in item_id:
69
+ video_id = item_id["videoId"]
70
+ elif isinstance(item_id, str):
71
+ video_id = item_id
72
+ else:
73
+ print(f"Warning: Unexpected id structure in item {i}: {item_id}")
74
+ continue
75
+
76
+ snippet = item["snippet"]
77
+
78
+ # Create MCP format response
79
+ video_data = {
80
+ "video_id": video_id,
81
+ "title": snippet["title"],
82
+ "description": snippet["description"],
83
+ "thumbnail": snippet["thumbnails"]["high"]["url"],
84
+ "channel_title": snippet["channelTitle"],
85
+ "published_at": snippet["publishedAt"]
86
+ }
87
+
88
+ # Format markdown for video display
89
+ markdown_text = (
90
+ f"## {snippet['title']}\n"
91
+ f"**Channel:** {snippet['channelTitle']}\n"
92
+ f"**Published:** {snippet['publishedAt']}\n\n"
93
+ f"[![Thumbnail]({snippet['thumbnails']['high']['url']})](https://www.youtube.com/watch?v={video_id})\n\n"
94
+ f"{snippet['description'][:300]}...\n\n"
95
+ f"[Watch on YouTube](https://www.youtube.com/watch?v={video_id})"
96
+ )
97
+
98
+ results.append(MCPResponse(
99
+ type="youtube_video",
100
+ markdown=markdown_text,
101
+ data=video_data
102
+ ))
103
+ print(f"Successfully processed item {i}")
104
+ except Exception as item_error:
105
+ print(f"Error processing item {i}: {str(item_error)}")
106
+ print(f"Item data: {item}")
107
+ continue
108
 
109
+ print(f"Search completed. Total results: {len(results)}")
110
  return results
111
  except HttpError as e:
112
+ print(f"YouTube API HttpError: {str(e)}")
113
  raise HTTPException(status_code=500, detail=f"YouTube API error: {str(e)}")
114
  except Exception as e:
115
+ print(f"Unexpected error in search: {str(e)}")
116
  raise HTTPException(status_code=500, detail=f"Unexpected error: {str(e)}")
117
 
118
  async def process_mcp_video_info(youtube_client, request: MCPVideoRequest) -> MCPResponse:
 
150
  }
151
 
152
  return MCPResponse(
153
+ type="youtube_video_info",
154
+ data=video_data,
155
+ markdown=f"""# 📹 Video Information
156
+
157
+ **🎬 Title:** {video_data['title']}
158
+ **👤 Channel:** {video_data['channel_title']}
159
+ **📅 Published:** {video_data['published_at']}
160
+ **👁️ Views:** {video_data.get('view_count', 'N/A')}
161
+ **👍 Likes:** {video_data.get('like_count', 'N/A')}
162
+ **💬 Comments:** {video_data.get('comment_count', 'N/A')}
163
+ **⏱️ Duration:** {video_data.get('duration', 'N/A')}
164
+
165
+ [🔗 Watch on YouTube](https://www.youtube.com/watch?v={video_id})
166
+ """
167
  )
168
  except HttpError as e:
169
  return MCPResponse(
 
181
  try:
182
  # Extract video ID from URL if it's a URL
183
  video_id = extract_video_id(request.video_id)
184
+ print(f"Getting transcript for video: {video_id}")
185
+
186
+ transcript_list = None
187
 
188
  try:
189
+ # First, try to get list of available transcripts
190
+ available_transcripts = YouTubeTranscriptApi.list_transcripts(video_id)
191
+
192
+ # Try to get transcript with specified language first
193
  if request.language_code:
194
+ print(f"Trying to get transcript in requested language: {request.language_code}")
195
  try:
196
+ transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[request.language_code])
197
+ except Exception as lang_error:
198
+ print(f"Requested language {request.language_code} failed: {str(lang_error)}")
199
+ # Fall through to try other languages
200
+
201
+ # If no transcript yet, try to find any available transcript
202
+ if not transcript_list:
203
+ print("Trying to get any available transcript")
204
+ for transcript in available_transcripts:
205
+ try:
206
+ print(f"Trying language: {transcript.language_code} ({transcript.language})")
207
+ # Try direct fetch first
208
+ transcript_list = transcript.fetch()
209
+ print(f"Successfully got transcript in {transcript.language_code}")
210
+ break
211
+ except Exception as lang_error:
212
+ print(f"Direct fetch failed for {transcript.language_code}: {str(lang_error)}")
213
+ # Try alternative method for problematic transcripts (usually Russian/Ukrainian)
214
+ try:
215
+ print(f"Trying alternative method for {transcript.language_code}")
216
+ # Use direct API call with language code
217
+ transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[transcript.language_code])
218
+ print(f"Alternative method succeeded for {transcript.language_code}")
219
+ break
220
+ except Exception as alt_error:
221
+ print(f"Alternative method also failed for {transcript.language_code}: {str(alt_error)}")
222
+ continue
223
+
224
+ except Exception as transcript_error:
225
+ print(f"Failed to get transcript list: {str(transcript_error)}")
226
+ return MCPResponse(
227
+ type="error",
228
+ error="Transcript not available for this video. The video may not have subtitles or may be restricted."
229
+ )
230
 
231
+ # Check if we got a valid transcript
232
+ if not transcript_list or len(transcript_list) == 0:
233
+ print("No transcript data received")
234
  return MCPResponse(
235
  type="error",
236
+ error="No transcript available for this video. The video may not have subtitles."
237
  )
238
 
239
+ print(f"Successfully got transcript with {len(transcript_list)} entries")
240
+
241
+ # Format transcript entries
242
  formatted_transcript = []
243
  for entry in transcript_list:
244
+ # Handle both dict format (from direct API) and FetchedTranscriptSnippet objects
245
+ if hasattr(entry, 'text'): # FetchedTranscriptSnippet object
246
+ formatted_transcript.append({
247
+ "text": entry.text,
248
+ "start": entry.start,
249
+ "duration": entry.duration
250
+ })
251
+ elif isinstance(entry, dict): # Dict format
252
+ formatted_transcript.append({
253
+ "text": entry.get("text", ""),
254
+ "start": entry.get("start", 0),
255
+ "duration": entry.get("duration", 0)
256
+ })
257
+
258
+ if not formatted_transcript:
259
+ return MCPResponse(
260
+ type="error",
261
+ error="Transcript data is invalid or empty."
262
+ )
263
 
264
  # Format markdown for transcript display
265
+ markdown_text = "# 📝 Transcript\n\n"
266
  for entry in formatted_transcript:
267
+ start_time = entry.get("start", 0)
268
+ duration = entry.get("duration", 0)
269
  end_time = start_time + duration
270
+ text = entry.get("text", "")
271
 
272
  # Convert time to hours:minutes:seconds format
273
  start_formatted = format_timestamp(start_time)
274
  end_formatted = format_timestamp(end_time)
275
 
276
+ markdown_text += f"**[{start_formatted} - {end_formatted}]** {text}\n\n"
277
 
278
  return MCPResponse(
279
  type="youtube_transcript",
 
284
  }
285
  )
286
  except Exception as e:
287
+ print(f"Unexpected error in transcript processing: {str(e)}")
288
  return MCPResponse(
289
  type="error",
290
  error=f"Error getting transcript: {str(e)}"
 
325
  video_id = extract_video_id(request.video_id)
326
 
327
  # Get transcript
328
+ transcript_list = None
329
+
330
  try:
331
+ # First, try to get list of available transcripts
332
+ available_transcripts = YouTubeTranscriptApi.list_transcripts(video_id)
333
+
334
+ # Try to get transcript with specified language first
335
  if request.language_code:
336
+ print(f"Trying to get transcript for timecodes in requested language: {request.language_code}")
337
  try:
338
+ transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[request.language_code])
339
+ except Exception as lang_error:
340
+ print(f"Requested language {request.language_code} failed: {str(lang_error)}")
341
+ # Fall through to try other languages
342
+
343
+ # If no transcript yet, try to find any available transcript
344
+ if not transcript_list:
345
+ print("Trying to get any available transcript for timecodes")
346
+ for transcript in available_transcripts:
347
+ try:
348
+ print(f"Trying language: {transcript.language_code} ({transcript.language})")
349
+ # Try direct fetch first
350
+ transcript_list = transcript.fetch()
351
+ print(f"Successfully got transcript in {transcript.language_code}")
352
+ break
353
+ except Exception as lang_error:
354
+ print(f"Direct fetch failed for {transcript.language_code}: {str(lang_error)}")
355
+ # Try alternative method for problematic transcripts (usually Russian/Ukrainian)
356
+ try:
357
+ print(f"Trying alternative method for {transcript.language_code}")
358
+ # Use direct API call with language code
359
+ transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[transcript.language_code])
360
+ print(f"Alternative method succeeded for {transcript.language_code}")
361
+ break
362
+ except Exception as alt_error:
363
+ print(f"Alternative method also failed for {transcript.language_code}: {str(alt_error)}")
364
+ continue
365
+
366
+ except Exception as transcript_error:
367
+ print(f"Failed to get transcript list for timecodes: {str(transcript_error)}")
368
+ return MCPResponse(
369
+ type="error",
370
+ error="Transcript not available for timecode generation. The video may not have subtitles or may be restricted."
371
+ )
372
 
373
+ # Check if we got a valid transcript
374
+ if not transcript_list or len(transcript_list) == 0:
375
+ print("No transcript data received for timecodes")
376
  return MCPResponse(
377
  type="error",
378
+ error="No transcript available for timecode generation. The video may not have subtitles."
379
  )
380
 
381
  # Group transcript into segments
 
473
  """Process MCP request for Gemini timecode generation."""
474
  try:
475
  # Get transcript
476
+ transcript_list = None
477
+ video_id = extract_video_id(request.video_id)
478
+
479
  try:
480
+ # First, try to get list of available transcripts
481
+ available_transcripts = YouTubeTranscriptApi.list_transcripts(video_id)
482
+
483
+ # Try to get transcript with specified language first
484
  if request.language_code:
485
+ print(f"Trying to get transcript for Gemini timecodes in requested language: {request.language_code}")
486
  try:
487
+ transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[request.language_code])
488
+ except Exception as lang_error:
489
+ print(f"Requested language {request.language_code} failed: {str(lang_error)}")
490
+ # Fall through to try other languages
491
+
492
+ # If no transcript yet, try to find any available transcript
493
+ if not transcript_list:
494
+ print("Trying to get any available transcript for Gemini timecodes")
495
+ for transcript in available_transcripts:
496
+ try:
497
+ print(f"Trying language: {transcript.language_code} ({transcript.language})")
498
+ # Try direct fetch first
499
+ transcript_list = transcript.fetch()
500
+ print(f"Successfully got transcript in {transcript.language_code}")
501
+ break
502
+ except Exception as lang_error:
503
+ print(f"Direct fetch failed for {transcript.language_code}: {str(lang_error)}")
504
+ # Try alternative method for problematic transcripts (usually Russian/Ukrainian)
505
+ try:
506
+ print(f"Trying alternative method for {transcript.language_code}")
507
+ # Use direct API call with language code
508
+ transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[transcript.language_code])
509
+ print(f"Alternative method succeeded for {transcript.language_code}")
510
+ break
511
+ except Exception as alt_error:
512
+ print(f"Alternative method also failed for {transcript.language_code}: {str(alt_error)}")
513
+ continue
514
+
515
+ except Exception as transcript_error:
516
+ print(f"Failed to get transcript list for Gemini timecodes: {str(transcript_error)}")
517
+ return MCPResponse(
518
+ type="error",
519
+ error="Transcript not available for AI timecode generation. The video may not have subtitles or may be restricted."
520
+ )
521
 
522
+ # Check if we got a valid transcript
523
+ if not transcript_list or len(transcript_list) == 0:
524
+ print("No transcript data received for Gemini timecodes")
525
  return MCPResponse(
526
  type="error",
527
+ error="No transcript available for AI timecode generation. The video may not have subtitles."
528
  )
529
 
530
  # Get video information for title
531
  try:
532
  video_response = youtube_client.videos().list(
533
  part="snippet",
534
+ id=video_id
535
  ).execute()
536
 
537
  if video_response.get("items"):
run_telegram_bot.py CHANGED
@@ -22,7 +22,7 @@ def setup_logging():
22
 
23
  if __name__ == "__main__":
24
  print("🤖 Starting TubeMeta Telegram Bot...")
25
- print("📋 Make sure your MCP server is running at: https://ag-source-knowledge-internal.trycloudflare.com")
26
  print("📱 Bot username: @tubemeta_bot")
27
  print("🔗 Bot link: https://t.me/tubemeta_bot")
28
  print("⏹️ Press Ctrl+C to stop\n")
 
22
 
23
  if __name__ == "__main__":
24
  print("🤖 Starting TubeMeta Telegram Bot...")
25
+ print("📋 Make sure your MCP server is running at: https://youtube-bot.tuttech.net")
26
  print("📱 Bot username: @tubemeta_bot")
27
  print("🔗 Bot link: https://t.me/tubemeta_bot")
28
  print("⏹️ Press Ctrl+C to stop\n")
telegram_bot.py CHANGED
@@ -1,5 +1,4 @@
1
  import asyncio
2
- import json
3
  import logging
4
  from typing import Optional
5
  import aiohttp
@@ -8,13 +7,45 @@ from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQu
8
  from telegram.constants import ParseMode
9
  import os
10
  from dotenv import load_dotenv
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
 
12
  # Load environment variables
13
  load_dotenv()
14
 
15
  # Configuration
16
- TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN", "8168377961:AAHG-9KyFczCTkBo92ZS9OYKIY62s-NmuVo")
17
- MCP_BASE_URL = os.getenv("MCP_BASE_URL", "https://ag-source-knowledge-internal.trycloudflare.com/api/mcp")
 
 
 
 
18
 
19
  # Set up logging
20
  logging.basicConfig(
@@ -23,19 +54,27 @@ logging.basicConfig(
23
  )
24
  logger = logging.getLogger(__name__)
25
 
 
 
 
 
 
 
 
26
  class TubeMetaBot:
27
  def __init__(self):
28
  self.app = Application.builder().token(TELEGRAM_TOKEN).build()
29
  self.setup_handlers()
30
-
31
  def setup_handlers(self):
32
  """Set up command and message handlers"""
33
  self.app.add_handler(CommandHandler("start", self.start_command))
34
  self.app.add_handler(CommandHandler("help", self.help_command))
35
  self.app.add_handler(CommandHandler("search", self.search_command))
 
36
  self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
37
- self.app.add_handler(CallbackQueryHandler(self.handle_callback))
38
-
39
  async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
40
  """Handle /start command"""
41
  welcome_text = """
@@ -43,7 +82,7 @@ class TubeMetaBot:
43
 
44
  I can help you with YouTube videos:
45
  • 🔍 Search for videos
46
- • 📊 Get video metadata
47
  • 📝 Extract transcripts
48
  • ⏰ Generate AI timecodes with Gemini 2.0
49
 
@@ -55,7 +94,7 @@ I can help you with YouTube videos:
55
  Type `/help` for more information!
56
  """
57
  await update.message.reply_text(welcome_text, parse_mode=ParseMode.MARKDOWN)
58
-
59
  async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
60
  """Handle /help command"""
61
  help_text = """
@@ -65,17 +104,18 @@ Type `/help` for more information!
65
  • `/start` - Welcome message
66
  • `/help` - Show this help
67
  • `/search <query>` - Search YouTube videos
 
68
 
69
  **Features:**
70
  • 🔍 **Video Search** - Find YouTube videos by keywords
71
- • 📊 **Video Info** - Get detailed metadata (title, duration, views, etc.)
72
  • 📝 **Transcripts** - Extract video transcripts/subtitles
73
  • ⏰ **AI Timecodes** - Generate smart timecodes with Gemini 2.0
74
 
75
  **Usage Examples:**
76
- • Send YouTube URL: `https://youtu.be/dQw4w9WgXcQ`
77
  • Search: `/search machine learning tutorial`
78
- Or just send: `python programming`
 
79
 
80
  **Supported Languages:**
81
  🇺🇦 Ukrainian | 🇷🇺 Russian | 🇬🇧 English
@@ -83,109 +123,196 @@ Type `/help` for more information!
83
  Powered by Gemini 2.0 AI 🧠
84
  """
85
  await update.message.reply_text(help_text, parse_mode=ParseMode.MARKDOWN)
86
-
87
  async def search_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
88
  """Handle /search command"""
89
  if not context.args:
90
  await update.message.reply_text("Please provide a search query. Example: `/search python tutorial`")
91
  return
92
-
93
  query = " ".join(context.args)
94
  await self.handle_search(update, query)
95
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96
  async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
97
  """Handle regular text messages"""
98
  text = update.message.text.strip()
99
-
100
  # Check if it's a YouTube URL
101
  if self.is_youtube_url(text):
102
- await self.handle_youtube_url(update, text)
 
 
 
 
 
103
  else:
104
  # Treat as search query
105
  await self.handle_search(update, text)
106
-
107
  def is_youtube_url(self, text: str) -> bool:
108
  """Check if text contains a YouTube URL"""
109
  youtube_domains = [
110
- 'youtube.com', 'youtu.be', 'www.youtube.com',
111
  'm.youtube.com', 'music.youtube.com'
112
  ]
113
  return any(domain in text.lower() for domain in youtube_domains)
114
-
115
  async def handle_youtube_url(self, update: Update, url: str):
116
  """Handle YouTube URL - provide full analysis options"""
117
  # Send initial message
118
  processing_msg = await update.message.reply_text("🔍 Analyzing YouTube video...")
119
-
120
  try:
121
  # Get basic video info first
122
- video_info = await self.call_mcp_action("video_info", {"video_id": url})
123
-
124
- if not video_info or "error" in video_info:
 
125
  await processing_msg.edit_text("❌ Could not analyze this YouTube video. Please check the URL.")
126
  return
127
-
128
- # Format video info
129
- info_text = self.format_video_info(video_info)
130
-
 
 
 
 
 
 
 
 
 
 
 
131
  # Create action buttons
 
 
 
 
 
 
 
 
132
  keyboard = [
133
  [
134
- InlineKeyboardButton("📝 Get Transcript", callback_data=f"transcript:{url}"),
135
- InlineKeyboardButton("⏰ AI Timecodes", callback_data=f"timecodes:{url}")
136
  ],
137
  [
138
- InlineKeyboardButton("🔍 Search Similar", callback_data=f"search:{video_info.get('title', 'related videos')}")
139
  ]
140
  ]
141
  reply_markup = InlineKeyboardMarkup(keyboard)
142
-
143
- await processing_msg.edit_text(info_text, reply_markup=reply_markup, parse_mode=ParseMode.MARKDOWN)
144
-
145
  except Exception as e:
146
  logger.error(f"Error handling YouTube URL: {e}")
147
  await processing_msg.edit_text("❌ An error occurred while analyzing the video.")
148
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
149
  async def handle_search(self, update: Update, query: str):
150
  """Handle search query"""
151
- processing_msg = await update.message.reply_text(f"🔍 Searching for: *{query}*", parse_mode=ParseMode.MARKDOWN)
152
-
153
  try:
154
  results = await self.call_mcp_action("search", {"query": query, "max_results": 5})
155
-
156
- if not results or "error" in results:
157
  await processing_msg.edit_text("❌ No results found for your search.")
158
  return
159
-
160
  # Format search results
161
- search_text = f"🔍 **Search Results for:** {query}\n\n"
162
-
163
- for i, video in enumerate(results.get("videos", []), 1):
164
- search_text += f"**{i}. {video.get('title', 'Unknown Title')}**\n"
165
- search_text += f"👤 {video.get('channel', 'Unknown Channel')}\n"
166
- search_text += f"⏱️ {video.get('duration', 'Unknown')}\n"
167
- search_text += f"👁️ {video.get('view_count', 'Unknown')} views\n"
168
- search_text += f"🔗 {video.get('url', '')}\n\n"
169
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
170
  # Add search refinement buttons
171
  keyboard = [
172
  [InlineKeyboardButton("🔍 New Search", callback_data="new_search")]
173
  ]
174
  reply_markup = InlineKeyboardMarkup(keyboard)
175
-
176
- await processing_msg.edit_text(search_text, reply_markup=reply_markup, parse_mode=ParseMode.MARKDOWN)
177
-
178
  except Exception as e:
179
  logger.error(f"Error handling search: {e}")
180
  await processing_msg.edit_text("❌ An error occurred during search.")
181
-
182
- async def handle_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
183
- """Handle inline keyboard callbacks"""
184
  query = update.callback_query
185
  await query.answer()
186
-
187
  data = query.data
188
-
 
189
  if data.startswith("transcript:"):
190
  url = data.replace("transcript:", "")
191
  await self.get_transcript(query, url)
@@ -195,143 +322,601 @@ Powered by Gemini 2.0 AI 🧠
195
  elif data.startswith("search:"):
196
  search_query = data.replace("search:", "")
197
  await self.handle_search_callback(query, search_query)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
198
  elif data == "new_search":
199
- await query.edit_message_text("🔍 Send me a search query or YouTube URL!")
200
-
 
 
 
 
 
201
  async def get_transcript(self, query, url: str):
202
  """Get video transcript"""
203
  await query.edit_message_text("📝 Extracting transcript...")
204
-
205
  try:
206
- transcript = await self.call_mcp_action("transcript", {"video_id": url})
207
-
208
- if not transcript or "error" in transcript:
209
- await query.edit_message_text("❌ Could not extract transcript. Video may not have subtitles or may be restricted.")
 
210
  return
211
-
212
- # Format transcript
213
- transcript_text = f"📝 **Transcript**\n\n{transcript.get('text', 'No transcript available')}"
214
-
215
- # Telegram message limit is 4096 characters
216
- if len(transcript_text) > 4000:
217
- transcript_text = transcript_text[:4000] + "...\n\n*Transcript truncated due to length*"
218
-
219
- # Add back button
220
- keyboard = [[InlineKeyboardButton("⬅️ Back", callback_data=f"back:{url}")]]
221
- reply_markup = InlineKeyboardMarkup(keyboard)
222
-
223
- await query.edit_message_text(transcript_text, reply_markup=reply_markup, parse_mode=ParseMode.MARKDOWN)
224
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
225
  except Exception as e:
226
  logger.error(f"Error getting transcript: {e}")
227
  await query.edit_message_text("❌ An error occurred while extracting transcript.")
228
-
229
  async def get_timecodes(self, query, url: str):
230
  """Generate AI timecodes"""
231
  await query.edit_message_text("⏰ Generating AI timecodes with Gemini 2.0...")
232
-
233
  try:
234
- timecodes = await self.call_mcp_action("gemini_timecodes", {
235
  "video_id": url,
236
- "language_code": "en",
237
  "format": "youtube"
238
  })
239
-
240
- if not timecodes or "error" in timecodes:
241
- await query.edit_message_text("❌ Could not generate timecodes. Video may not have transcript or may be restricted.")
 
242
  return
243
-
244
- # Format timecodes
245
- timecodes_text = f"⏰ **AI Generated Timecodes**\n\n{timecodes.get('timecodes', 'No timecodes generated')}"
246
-
247
- # Telegram message limit
248
- if len(timecodes_text) > 4000:
249
- timecodes_text = timecodes_text[:4000] + "...\n\n*Timecodes truncated due to length*"
250
-
251
- # Add back button
252
- keyboard = [[InlineKeyboardButton("⬅️ Back", callback_data=f"back:{url}")]]
253
- reply_markup = InlineKeyboardMarkup(keyboard)
254
-
255
- await query.edit_message_text(timecodes_text, reply_markup=reply_markup, parse_mode=ParseMode.MARKDOWN)
256
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
257
  except Exception as e:
258
  logger.error(f"Error generating timecodes: {e}")
259
  await query.edit_message_text("❌ An error occurred while generating timecodes.")
260
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
261
  async def handle_search_callback(self, query, search_query: str):
262
- """Handle search from callback"""
263
- await query.edit_message_text(f"🔍 Searching for: *{search_query}*", parse_mode=ParseMode.MARKDOWN)
264
-
265
- # Simulate update object for reuse of search logic
266
- class FakeUpdate:
267
- def __init__(self, message):
268
- self.message = message
269
-
270
- fake_update = FakeUpdate(query.message)
271
- await self.handle_search(fake_update, search_query)
272
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
273
  async def call_mcp_action(self, action: str, params: dict) -> Optional[dict]:
274
  """Call MCP server action"""
275
  try:
276
- async with aiohttp.ClientSession() as session:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
277
  payload = {
278
  "action": action,
279
  "parameters": params
280
  }
281
-
282
- async with session.post(MCP_BASE_URL, json=payload, timeout=30) as response:
283
- if response.status == 200:
284
- return await response.json()
285
- else:
286
- logger.error(f"MCP server error: {response.status}")
287
- return None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
288
  except Exception as e:
289
  logger.error(f"Error calling MCP server: {e}")
290
  return None
291
-
292
  def format_video_info(self, video_info: dict) -> str:
293
- """Format video information for display"""
294
  title = video_info.get("title", "Unknown Title")
295
  channel = video_info.get("channel", "Unknown Channel")
296
  duration = video_info.get("duration", "Unknown")
297
  view_count = video_info.get("view_count", "Unknown")
298
  upload_date = video_info.get("upload_date", "Unknown")
299
  description = video_info.get("description", "")
300
-
301
  # Truncate description if too long
302
  if len(description) > 200:
303
  description = description[:200] + "..."
304
-
305
- info_text = f"""🎬 **{title}**
306
 
307
- 👤 **Channel:** {channel}
308
- ⏱️ **Duration:** {duration}
309
- 👁️ **Views:** {view_count}
310
- 📅 **Uploaded:** {upload_date}
 
 
311
 
312
- 📝 **Description:**
313
  {description}
314
 
315
  Choose an action below:"""
316
-
317
  return info_text
318
-
 
 
 
 
319
  async def run(self):
320
  """Start the bot"""
321
  logger.info("Starting TubeMeta Bot...")
322
- await self.app.initialize()
323
- await self.app.start()
324
- await self.app.updater.start_polling()
325
-
326
  try:
 
 
 
 
 
 
327
  # Keep the bot running
328
  await asyncio.Event().wait()
329
  except KeyboardInterrupt:
330
- logger.info("Shutting down bot...")
 
 
331
  finally:
332
- await self.app.updater.stop()
333
- await self.app.stop()
334
- await self.app.shutdown()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
335
 
336
  async def main():
337
  """Main function"""
@@ -339,4 +924,4 @@ async def main():
339
  await bot.run()
340
 
341
  if __name__ == "__main__":
342
- asyncio.run(main())
 
1
  import asyncio
 
2
  import logging
3
  from typing import Optional
4
  import aiohttp
 
7
  from telegram.constants import ParseMode
8
  import os
9
  from dotenv import load_dotenv
10
+ import warnings
11
+
12
+ # Ignore standard warnings
13
+ warnings.filterwarnings("ignore", message="SSL shutdown timed out")
14
+ warnings.filterwarnings("ignore", message="Certificate verification failed")
15
+ warnings.filterwarnings("ignore", message="SSL handshake failed")
16
+ warnings.filterwarnings("ignore", message="Connection lost")
17
+
18
+ # Disable SSL error logging from all possible sources
19
+ logging.getLogger("httpx").setLevel(logging.WARNING)
20
+ logging.getLogger("httpcore").setLevel(logging.WARNING)
21
+ logging.getLogger("httpcore.connection").setLevel(logging.ERROR)
22
+ logging.getLogger("httpcore.http11").setLevel(logging.ERROR)
23
+ logging.getLogger("asyncio").setLevel(logging.WARNING)
24
+
25
+ # Create custom filter to suppress SSL errors
26
+ class SSLErrorFilter(logging.Filter):
27
+ def filter(self, record):
28
+ message = record.getMessage()
29
+ return not any(phrase in message.lower() for phrase in [
30
+ 'ssl shutdown timed out',
31
+ 'connection lost',
32
+ 'ssl handshake failed',
33
+ 'certificate verification failed'
34
+ ])
35
+
36
+ # Apply filter to root logger
37
+ logging.getLogger().addFilter(SSLErrorFilter())
38
 
39
  # Load environment variables
40
  load_dotenv()
41
 
42
  # Configuration
43
+ TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
44
+ if not TELEGRAM_TOKEN:
45
+ raise ValueError("TELEGRAM_TOKEN environment variable is required")
46
+
47
+ # Support both variable name variants
48
+ MCP_BASE_URL = os.getenv("MCP_BASE_URL", os.getenv("MCP_BASE_URL", "https://youtube-bot.tuttech.net/api/mcp"))
49
 
50
  # Set up logging
51
  logging.basicConfig(
 
54
  )
55
  logger = logging.getLogger(__name__)
56
 
57
+ def escape_markdown(text: str) -> str:
58
+ """Escape markdown special characters."""
59
+ escape_chars = ['_', '*', '[', ']', '(', ')', '~', '`', '>', '#', '+', '-', '=', '|', '{', '}', '.', '!']
60
+ for char in escape_chars:
61
+ text = text.replace(char, '\\' + char)
62
+ return text
63
+
64
  class TubeMetaBot:
65
  def __init__(self):
66
  self.app = Application.builder().token(TELEGRAM_TOKEN).build()
67
  self.setup_handlers()
68
+
69
  def setup_handlers(self):
70
  """Set up command and message handlers"""
71
  self.app.add_handler(CommandHandler("start", self.start_command))
72
  self.app.add_handler(CommandHandler("help", self.help_command))
73
  self.app.add_handler(CommandHandler("search", self.search_command))
74
+ self.app.add_handler(CommandHandler("analyze", self.analyze_command))
75
  self.app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message))
76
+ self.app.add_handler(CallbackQueryHandler(self.handle_callback_query))
77
+
78
  async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
79
  """Handle /start command"""
80
  welcome_text = """
 
82
 
83
  I can help you with YouTube videos:
84
  • 🔍 Search for videos
85
+ • 📊 Get video metadata
86
  • 📝 Extract transcripts
87
  • ⏰ Generate AI timecodes with Gemini 2.0
88
 
 
94
  Type `/help` for more information!
95
  """
96
  await update.message.reply_text(welcome_text, parse_mode=ParseMode.MARKDOWN)
97
+
98
  async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
99
  """Handle /help command"""
100
  help_text = """
 
104
  • `/start` - Welcome message
105
  • `/help` - Show this help
106
  • `/search <query>` - Search YouTube videos
107
+ • `/analyze` - Analyze YouTube video (send after this command)
108
 
109
  **Features:**
110
  • 🔍 **Video Search** - Find YouTube videos by keywords
111
+ • 📊 **Video Analysis** - Get detailed metadata (title, duration, views, etc.)
112
  • 📝 **Transcripts** - Extract video transcripts/subtitles
113
  • ⏰ **AI Timecodes** - Generate smart timecodes with Gemini 2.0
114
 
115
  **Usage Examples:**
 
116
  • Search: `/search machine learning tutorial`
117
+ Analysis: `/analyze` then send YouTube URL
118
+ • Or just send: `python programming` for search
119
 
120
  **Supported Languages:**
121
  🇺🇦 Ukrainian | 🇷🇺 Russian | 🇬🇧 English
 
123
  Powered by Gemini 2.0 AI 🧠
124
  """
125
  await update.message.reply_text(help_text, parse_mode=ParseMode.MARKDOWN)
126
+
127
  async def search_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
128
  """Handle /search command"""
129
  if not context.args:
130
  await update.message.reply_text("Please provide a search query. Example: `/search python tutorial`")
131
  return
132
+
133
  query = " ".join(context.args)
134
  await self.handle_search(update, query)
135
+
136
+ async def analyze_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
137
+ """Handle /analyze command"""
138
+ if not update.message:
139
+ return # Skip if no message (shouldn't happen in command handlers)
140
+
141
+ if context.args:
142
+ # URL provided with command
143
+ url = " ".join(context.args)
144
+ if self.is_youtube_url(url):
145
+ await self.handle_youtube_url(update, url)
146
+ else:
147
+ await update.message.reply_text("❌ Please provide a valid YouTube URL. Example: `/analyze https://youtu.be/dQw4w9WgXcQ`")
148
+ else:
149
+ # Ask for URL
150
+ await update.message.reply_text("📺 Please send me a YouTube URL to analyze.\n\nExample: `https://youtu.be/dQw4w9WgXcQ`", parse_mode=ParseMode.MARKDOWN)
151
+
152
  async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
153
  """Handle regular text messages"""
154
  text = update.message.text.strip()
155
+
156
  # Check if it's a YouTube URL
157
  if self.is_youtube_url(text):
158
+ await update.message.reply_text(
159
+ "📺 I see you sent a YouTube URL! Use the `/analyze` command to analyze it.\n\n"
160
+ "Example: `/analyze https://youtu.be/dQw4w9WgXcQ`\n"
161
+ "Or just type `/analyze` and then send the URL.",
162
+ parse_mode=ParseMode.MARKDOWN
163
+ )
164
  else:
165
  # Treat as search query
166
  await self.handle_search(update, text)
167
+
168
  def is_youtube_url(self, text: str) -> bool:
169
  """Check if text contains a YouTube URL"""
170
  youtube_domains = [
171
+ 'youtube.com', 'youtu.be', 'www.youtube.com',
172
  'm.youtube.com', 'music.youtube.com'
173
  ]
174
  return any(domain in text.lower() for domain in youtube_domains)
175
+
176
  async def handle_youtube_url(self, update: Update, url: str):
177
  """Handle YouTube URL - provide full analysis options"""
178
  # Send initial message
179
  processing_msg = await update.message.reply_text("🔍 Analyzing YouTube video...")
180
+
181
  try:
182
  # Get basic video info first
183
+ video_info_response = await self.call_mcp_action("video_info", {"video_id": url})
184
+
185
+ # Check if we got a valid response
186
+ if not video_info_response:
187
  await processing_msg.edit_text("❌ Could not analyze this YouTube video. Please check the URL.")
188
  return
189
+
190
+ # Check for error in response
191
+ if video_info_response.get("error"):
192
+ await processing_msg.edit_text(f"❌ Error: {video_info_response['error']}")
193
+ return
194
+
195
+ # Check if we have video data
196
+ video_data = video_info_response.get("data")
197
+ if not video_data:
198
+ await processing_msg.edit_text("❌ Could not retrieve video information. Please check the URL.")
199
+ return
200
+
201
+ # Format video info for display
202
+ info_text = self.format_video_info_from_data(video_data)
203
+
204
  # Create action buttons
205
+ video_id = video_data.get("video_id", url)
206
+
207
+ # Limit callback data to avoid Button_data_invalid error (Telegram limit is 64 bytes)
208
+ safe_video_id = video_id[:30] if video_id else url[:30] # Limit video ID
209
+ video_title = video_data.get('title', 'related videos')
210
+ # Truncate title for search callback to fit in 64 byte limit
211
+ safe_title = video_title[:30] if len(video_title) > 30 else video_title
212
+
213
  keyboard = [
214
  [
215
+ InlineKeyboardButton("📝 Get Transcript", callback_data=f"transcript:{safe_video_id}"),
216
+ InlineKeyboardButton("⏰ AI Timecodes", callback_data=f"timecodes:{safe_video_id}")
217
  ],
218
  [
219
+ InlineKeyboardButton("🔍 Search Similar", callback_data=f"search:{safe_title}")
220
  ]
221
  ]
222
  reply_markup = InlineKeyboardMarkup(keyboard)
223
+
224
+ await processing_msg.edit_text(info_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
225
+
226
  except Exception as e:
227
  logger.error(f"Error handling YouTube URL: {e}")
228
  await processing_msg.edit_text("❌ An error occurred while analyzing the video.")
229
+
230
+ def format_video_info_from_data(self, video_data: dict) -> str:
231
+ """Format video information from MCP response data for display using HTML"""
232
+ title = video_data.get("title", "Unknown Title")
233
+ channel = video_data.get("channel_title", "Unknown Channel")
234
+ duration = video_data.get("duration", "Unknown")
235
+ view_count = video_data.get("view_count", "Unknown")
236
+ upload_date = video_data.get("published_at", "Unknown")
237
+ like_count = video_data.get("like_count", "Unknown")
238
+ comment_count = video_data.get("comment_count", "Unknown")
239
+
240
+ info_text = f"""🎬 <b>{title}</b>
241
+
242
+ 👤 <b>Channel:</b> {channel}
243
+ ⏱️ <b>Duration:</b> {duration}
244
+ 👁️ <b>Views:</b> {view_count}
245
+ 👍 <b>Likes:</b> {like_count}
246
+ 💬 <b>Comments:</b> {comment_count}
247
+ 📅 <b>Uploaded:</b> {upload_date}
248
+
249
+ Choose an action below:"""
250
+
251
+ return info_text
252
+
253
  async def handle_search(self, update: Update, query: str):
254
  """Handle search query"""
255
+ processing_msg = await update.message.reply_text(f"🔍 Searching for: <b>{query}</b>", parse_mode=ParseMode.HTML)
256
+
257
  try:
258
  results = await self.call_mcp_action("search", {"query": query, "max_results": 5})
259
+
260
+ if not results or (isinstance(results, dict) and "error" in results):
261
  await processing_msg.edit_text("❌ No results found for your search.")
262
  return
263
+
264
  # Format search results
265
+ search_text = f"🔍 <b>Search Results for:</b> {query}\n\n"
266
+
267
+ # Handle the case where results is a list (new format)
268
+ if isinstance(results, list):
269
+ videos = results
270
+ else:
271
+ # Fallback for old format
272
+ videos = results.get("videos", [])
273
+
274
+ for i, video_obj in enumerate(videos, 1):
275
+ # Extract video data from the response object
276
+ if isinstance(video_obj, dict) and "data" in video_obj:
277
+ video = video_obj["data"]
278
+ else:
279
+ video = video_obj
280
+
281
+ # Build video info (HTML auto-escapes dangerous chars)
282
+ title = video.get('title', 'Unknown Title')
283
+ channel = video.get('channel_title', video.get('channel', 'Unknown Channel'))
284
+ duration = str(video.get('duration', 'Unknown'))
285
+ view_count = str(video.get('view_count', 'Unknown'))
286
+ video_id = video.get('video_id', '')
287
+
288
+ search_text += f"<b>{i}. {title}</b>\n"
289
+ search_text += f"👤 {channel}\n"
290
+ if duration != 'Unknown':
291
+ search_text += f"⏱️ {duration}\n"
292
+ if view_count != 'Unknown':
293
+ search_text += f"👁️ {view_count} views\n"
294
+ search_text += f"🔗 https://www.youtube.com/watch?v={video_id}\n\n"
295
+
296
  # Add search refinement buttons
297
  keyboard = [
298
  [InlineKeyboardButton("🔍 New Search", callback_data="new_search")]
299
  ]
300
  reply_markup = InlineKeyboardMarkup(keyboard)
301
+
302
+ await processing_msg.edit_text(search_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
303
+
304
  except Exception as e:
305
  logger.error(f"Error handling search: {e}")
306
  await processing_msg.edit_text("❌ An error occurred during search.")
307
+
308
+ async def handle_callback_query(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
309
+ """Handle inline keyboard button presses"""
310
  query = update.callback_query
311
  await query.answer()
312
+
313
  data = query.data
314
+ logger.info(f"Callback query: {data}")
315
+
316
  if data.startswith("transcript:"):
317
  url = data.replace("transcript:", "")
318
  await self.get_transcript(query, url)
 
322
  elif data.startswith("search:"):
323
  search_query = data.replace("search:", "")
324
  await self.handle_search_callback(query, search_query)
325
+ elif data.startswith("back:"):
326
+ url = data.replace("back:", "")
327
+ await self.handle_back_to_video(query, url)
328
+ elif data.startswith("full_transcript:"):
329
+ url = data.replace("full_transcript:", "")
330
+ await self.send_full_transcript(query, url)
331
+ elif data.startswith("full_timecodes:"):
332
+ url = data.replace("full_timecodes:", "")
333
+ await self.send_full_timecodes(query, url)
334
+ elif data.startswith("analyze:"):
335
+ video_id = data.replace("analyze:", "")
336
+ await self.analyze_video(query, f"https://www.youtube.com/watch?v={video_id}")
337
+ elif data.startswith("back_to_analysis:"):
338
+ url = data.replace("back_to_analysis:", "")
339
+ await self.handle_back_to_video(query, url)
340
  elif data == "new_search":
341
+ await query.edit_message_text(
342
+ "🔍 **Send me a new search query!**\n\nJust type your search terms and I'll find YouTube videos for you.",
343
+ parse_mode=ParseMode.MARKDOWN
344
+ )
345
+ else:
346
+ await query.edit_message_text("❌ Unknown action")
347
+
348
  async def get_transcript(self, query, url: str):
349
  """Get video transcript"""
350
  await query.edit_message_text("📝 Extracting transcript...")
351
+
352
  try:
353
+ transcript_response = await self.call_mcp_action("transcript", {"video_id": url})
354
+
355
+ # Check if we got a valid response
356
+ if not transcript_response:
357
+ await query.edit_message_text("❌ Could not extract transcript. Please try again later.")
358
  return
359
+
360
+ # Check for error in response
361
+ if transcript_response.get("error"):
362
+ await query.edit_message_text(f"❌ {transcript_response['error']}")
363
+ return
364
+
365
+ # Check if we have transcript data
366
+ if transcript_response.get("type") not in ["youtube_transcript"]:
367
+ await query.edit_message_text("❌ Invalid transcript response format.")
368
+ return
369
+
370
+ # Get the markdown formatted transcript
371
+ transcript_text = transcript_response.get("markdown", "")
372
+ if not transcript_text:
373
+ await query.edit_message_text("❌ Transcript is empty or unavailable.")
374
+ return
375
+
376
+ # Handle long transcripts more intelligently
377
+ max_length = 4000 # Leave room for buttons and formatting
378
+
379
+ if len(transcript_text) > max_length:
380
+ # Create a summary message with first part
381
+ summary_text = "📝 **Transcript Preview** (showing first {} characters)\n\n".format(max_length)
382
+ summary_text += transcript_text[:max_length-200] + "...\n\n"
383
+ summary_text += f"<i>📄 Full transcript: {len(transcript_text)} characters total</i>\n"
384
+ summary_text += "<i>⚠️ Transcript is too long for Telegram. Showing preview only.</i>"
385
+
386
+ # Add back button and full transcript button
387
+ keyboard = [
388
+ [
389
+ InlineKeyboardButton("📄 Get Full Text", callback_data=f"full_transcript:{url}"),
390
+ InlineKeyboardButton("⬅️ Back", callback_data=f"back:{url}")
391
+ ]
392
+ ]
393
+ reply_markup = InlineKeyboardMarkup(keyboard)
394
+
395
+ await query.edit_message_text(summary_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
396
+ else:
397
+ # Short enough to display fully
398
+ # Add back button
399
+ keyboard = [[InlineKeyboardButton("⬅️ Back", callback_data=f"back:{url}")]]
400
+ reply_markup = InlineKeyboardMarkup(keyboard)
401
+
402
+ await query.edit_message_text(transcript_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
403
+
404
  except Exception as e:
405
  logger.error(f"Error getting transcript: {e}")
406
  await query.edit_message_text("❌ An error occurred while extracting transcript.")
407
+
408
  async def get_timecodes(self, query, url: str):
409
  """Generate AI timecodes"""
410
  await query.edit_message_text("⏰ Generating AI timecodes with Gemini 2.0...")
411
+
412
  try:
413
+ timecodes_response = await self.call_mcp_action("gemini_timecodes", {
414
  "video_id": url,
 
415
  "format": "youtube"
416
  })
417
+
418
+ # Check if we got a valid response
419
+ if not timecodes_response:
420
+ await query.edit_message_text("❌ Could not generate timecodes. Please try again later.")
421
  return
422
+
423
+ # Check for error in response
424
+ if timecodes_response.get("error"):
425
+ await query.edit_message_text(f"❌ {timecodes_response['error']}")
426
+ return
427
+
428
+ # Check if we have timecodes data
429
+ if timecodes_response.get("type") not in ["youtube_gemini_timecodes"]:
430
+ await query.edit_message_text("❌ Invalid timecodes response format.")
431
+ return
432
+
433
+ # Get the markdown formatted timecodes
434
+ timecodes_text = timecodes_response.get("markdown", "")
435
+ if not timecodes_text:
436
+ await query.edit_message_text("❌ No timecodes were generated.")
437
+ return
438
+
439
+ # Handle long timecodes more intelligently
440
+ max_length = 4000 # Leave room for buttons and formatting
441
+
442
+ if len(timecodes_text) > max_length:
443
+ # Create a summary message with preview
444
+ data = timecodes_response.get("data", {})
445
+ timecodes_list = data.get("timecodes", [])
446
+ detected_language = data.get("detected_language", "unknown")
447
+
448
+ summary_text = "⏰ **AI Timecodes Generated**\n\n"
449
+ summary_text += f"🤖 **Model:** {data.get('model', 'Gemini AI')}\n"
450
+ summary_text += f"🌐 **Language:** {detected_language}\n"
451
+ summary_text += f"📊 **Total timecodes:** {len(timecodes_list)}\n\n"
452
+
453
+ # Calculate how many timecodes we can show
454
+ available_space = max_length - len(summary_text) - 300 # Reserve space for buttons and footer
455
+
456
+ # Show as many timecodes as possible within space limit
457
+ preview_text = "<b>Timecodes Preview:</b>\n<pre>"
458
+ current_length = 0
459
+ shown_count = 0
460
+
461
+ for tc in timecodes_list:
462
+ tc_line = f"{tc}\n"
463
+ if current_length + len(tc_line) < available_space:
464
+ preview_text += tc_line
465
+ current_length += len(tc_line)
466
+ shown_count += 1
467
+ else:
468
+ break
469
+
470
+ preview_text += "</pre>\n\n"
471
+
472
+ if shown_count < len(timecodes_list):
473
+ summary_text += preview_text
474
+ summary_text += f"<i>📄 Showing {shown_count} of {len(timecodes_list)} timecodes</i>\n"
475
+ summary_text += "<i>💾 Download full file for complete list</i>"
476
+ else:
477
+ # All timecodes fit, show them directly
478
+ summary_text = timecodes_text
479
+
480
+ # Add buttons for full timecodes and back
481
+ keyboard = [
482
+ [
483
+ InlineKeyboardButton("📄 Get Full List", callback_data=f"full_timecodes:{url}"),
484
+ InlineKeyboardButton("⬅️ Back", callback_data=f"back:{url}")
485
+ ]
486
+ ]
487
+ reply_markup = InlineKeyboardMarkup(keyboard)
488
+
489
+ await query.edit_message_text(summary_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
490
+ else:
491
+ # Short enough to display fully
492
+ # Add back button
493
+ keyboard = [[InlineKeyboardButton("⬅️ Back", callback_data=f"back:{url}")]]
494
+ reply_markup = InlineKeyboardMarkup(keyboard)
495
+
496
+ # Convert markdown to HTML for proper code block rendering
497
+ html_timecodes = self.convert_markdown_to_html(timecodes_text)
498
+ await query.edit_message_text(html_timecodes, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
499
+
500
  except Exception as e:
501
  logger.error(f"Error generating timecodes: {e}")
502
  await query.edit_message_text("❌ An error occurred while generating timecodes.")
503
+
504
+ async def send_full_transcript(self, query, url: str):
505
+ """Send full transcript as a text file"""
506
+ await query.edit_message_text("📄 Preparing full transcript file...")
507
+
508
+ try:
509
+ transcript_response = await self.call_mcp_action("transcript", {"video_id": url})
510
+
511
+ if not transcript_response or transcript_response.get("error"):
512
+ await query.edit_message_text("❌ Could not extract full transcript.")
513
+ return
514
+
515
+ # Get full transcript text
516
+ full_transcript = transcript_response.get("markdown", "")
517
+ if not full_transcript:
518
+ await query.edit_message_text("❌ Transcript is empty.")
519
+ return
520
+
521
+ # Create a simple text version (without markdown formatting)
522
+ simple_text = full_transcript.replace("# 📝 Transcript\n\n", "")
523
+ simple_text = simple_text.replace("**[", "[").replace("]**", "]")
524
+
525
+ # Send as document
526
+ from io import BytesIO
527
+ import re
528
+
529
+ # Extract video title from URL for filename
530
+ video_info = await self.call_mcp_action("video_info", {"video_id": url})
531
+ title = "transcript"
532
+ if video_info and video_info.get("data"):
533
+ title = video_info["data"].get("title", "transcript")
534
+ # Clean title for filename
535
+ title = re.sub(r'[<>:"/\\|?*]', '_', title)[:50]
536
+
537
+ # Create file
538
+ transcript_file = BytesIO(simple_text.encode('utf-8'))
539
+ transcript_file.name = f"{title}_transcript.txt"
540
+
541
+ # Send file and create a new message instead of editing
542
+ await query.message.reply_document(
543
+ document=transcript_file,
544
+ caption=f"📄 Full transcript for: {title}",
545
+ reply_markup=InlineKeyboardMarkup([[
546
+ InlineKeyboardButton("📹 Back to Video", callback_data=f"back_to_analysis:{url}")
547
+ ]])
548
+ )
549
+
550
+ # Edit original message to show completion
551
+ await query.edit_message_text(
552
+ "✅ Transcript file sent!\n\n📄 Check the file above for the complete transcript.",
553
+ reply_markup=InlineKeyboardMarkup([[
554
+ InlineKeyboardButton("📹 Back to Video", callback_data=f"back_to_analysis:{url}")
555
+ ]])
556
+ )
557
+
558
+ except Exception as e:
559
+ logger.error(f"Error sending full transcript: {e}")
560
+ await query.edit_message_text("❌ An error occurred while preparing transcript file.")
561
+
562
+ async def send_full_timecodes(self, query, url: str):
563
+ """Send full timecodes as a text file"""
564
+ await query.edit_message_text("📄 Preparing full timecodes file...")
565
+
566
+ try:
567
+ timecodes_response = await self.call_mcp_action("gemini_timecodes", {
568
+ "video_id": url,
569
+ "format": "youtube"
570
+ })
571
+
572
+ if not timecodes_response or timecodes_response.get("error"):
573
+ await query.edit_message_text("❌ Could not generate full timecodes.")
574
+ return
575
+
576
+ # Get full timecodes
577
+ data = timecodes_response.get("data", {})
578
+ timecodes_list = data.get("timecodes", [])
579
+
580
+ if not timecodes_list:
581
+ await query.edit_message_text("❌ No timecodes available.")
582
+ return
583
+
584
+ # Create text content
585
+ content = "AI Generated Timecodes\n"
586
+ content += f"Model: {data.get('model', 'Gemini AI')}\n"
587
+ content += f"Language: {data.get('detected_language', 'auto-detected')}\n"
588
+ content += f"Total: {len(timecodes_list)} timecodes\n\n"
589
+ content += "\n".join(timecodes_list)
590
+
591
+ # Send as document
592
+ from io import BytesIO
593
+ import re
594
+
595
+ # Extract video title for filename
596
+ video_info = await self.call_mcp_action("video_info", {"video_id": url})
597
+ title = "timecodes"
598
+ if video_info and video_info.get("data"):
599
+ title = video_info["data"].get("title", "timecodes")
600
+ # Clean title for filename
601
+ title = re.sub(r'[<>:"/\\|?*]', '_', title)[:50]
602
+
603
+ # Create file
604
+ timecodes_file = BytesIO(content.encode('utf-8'))
605
+ timecodes_file.name = f"{title}_timecodes.txt"
606
+
607
+ # Send file and create a new message instead of editing
608
+ await query.message.reply_document(
609
+ document=timecodes_file,
610
+ caption=f"⏰ AI Timecodes for: {title}",
611
+ reply_markup=InlineKeyboardMarkup([[
612
+ InlineKeyboardButton("📹 Back to Video", callback_data=f"back_to_analysis:{url}")
613
+ ]])
614
+ )
615
+
616
+ # Edit original message to show completion
617
+ await query.edit_message_text(
618
+ "✅ Timecodes file sent!\n\n📄 Check the file above for all timecodes.",
619
+ reply_markup=InlineKeyboardMarkup([[
620
+ InlineKeyboardButton("📹 Back to Video", callback_data=f"back_to_analysis:{url}")
621
+ ]])
622
+ )
623
+
624
+ except Exception as e:
625
+ logger.error(f"Error sending full timecodes: {e}")
626
+ await query.edit_message_text("❌ An error occurred while preparing timecodes file.")
627
+
628
+ async def handle_back_to_video(self, query, url: str):
629
+ """Return to video analysis view"""
630
+ try:
631
+ # Check if the current message has text that can be edited
632
+ current_message = query.message
633
+ if not current_message or not current_message.text:
634
+ # If no text to edit, send a new message instead of editing
635
+ await query.answer("🔄 Loading video information...")
636
+
637
+ # Get basic video info
638
+ video_info_response = await self.call_mcp_action("video_info", {"video_id": url})
639
+
640
+ if not video_info_response or video_info_response.get("error"):
641
+ await current_message.reply_text("❌ Could not analyze this YouTube video. Please check the URL.")
642
+ return
643
+
644
+ video_data = video_info_response.get("data")
645
+ if not video_data:
646
+ await current_message.reply_text("❌ Could not retrieve video information. Please check the URL.")
647
+ return
648
+
649
+ # Format video info for display
650
+ info_text = self.format_video_info_from_data(video_data)
651
+
652
+ # Create action buttons
653
+ video_id = video_data.get("video_id", url)
654
+ safe_video_id = video_id[:30] if video_id else url[:30]
655
+ video_title = video_data.get('title', 'related videos')
656
+ safe_title = video_title[:30] if len(video_title) > 30 else video_title
657
+
658
+ keyboard = [
659
+ [
660
+ InlineKeyboardButton("📝 Get Transcript", callback_data=f"transcript:{safe_video_id}"),
661
+ InlineKeyboardButton("⏰ AI Timecodes", callback_data=f"timecodes:{safe_video_id}")
662
+ ],
663
+ [
664
+ InlineKeyboardButton("🔍 Search Similar", callback_data=f"search:{safe_title}")
665
+ ]
666
+ ]
667
+ reply_markup = InlineKeyboardMarkup(keyboard)
668
+
669
+ await current_message.reply_text(info_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
670
+ return
671
+
672
+ # If message has text, proceed with normal editing
673
+ await query.edit_message_text("🔄 Loading video information...")
674
+
675
+ # Re-analyze the video by calling handle_youtube_url logic
676
+ # Get basic video info first
677
+ video_info_response = await self.call_mcp_action("video_info", {"video_id": url})
678
+
679
+ # Check if we got a valid response
680
+ if not video_info_response:
681
+ await query.edit_message_text("❌ Could not analyze this YouTube video. Please check the URL.")
682
+ return
683
+
684
+ # Check for error in response
685
+ if video_info_response.get("error"):
686
+ await query.edit_message_text(f"❌ Error: {video_info_response['error']}")
687
+ return
688
+
689
+ # Check if we have video data
690
+ video_data = video_info_response.get("data")
691
+ if not video_data:
692
+ await query.edit_message_text("❌ Could not retrieve video information. Please check the URL.")
693
+ return
694
+
695
+ # Format video info for display
696
+ info_text = self.format_video_info_from_data(video_data)
697
+
698
+ # Create action buttons
699
+ video_id = video_data.get("video_id", url)
700
+
701
+ # Limit callback data to avoid Button_data_invalid error (Telegram limit is 64 bytes)
702
+ safe_video_id = video_id[:30] if video_id else url[:30] # Limit video ID
703
+ video_title = video_data.get('title', 'related videos')
704
+ # Truncate title for search callback to fit in 64 byte limit
705
+ safe_title = video_title[:30] if len(video_title) > 30 else video_title
706
+
707
+ keyboard = [
708
+ [
709
+ InlineKeyboardButton("📝 Get Transcript", callback_data=f"transcript:{safe_video_id}"),
710
+ InlineKeyboardButton("⏰ AI Timecodes", callback_data=f"timecodes:{safe_video_id}")
711
+ ],
712
+ [
713
+ InlineKeyboardButton("🔍 Search Similar", callback_data=f"search:{safe_title}")
714
+ ]
715
+ ]
716
+ reply_markup = InlineKeyboardMarkup(keyboard)
717
+
718
+ await query.edit_message_text(info_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
719
+
720
+ except Exception as e:
721
+ logger.error(f"Error returning to video: {e}")
722
+ # If edit fails, try to send a new message
723
+ try:
724
+ await query.answer("❌ Could not load video information.")
725
+ await query.message.reply_text("❌ Could not load video information. Please try again.")
726
+ except Exception:
727
+ pass # Ignore if this also fails
728
+
729
+ async def analyze_video(self, query, url: str):
730
+ """Analyze video for callback queries (alias for handle_back_to_video)"""
731
+ await self.handle_back_to_video(query, url)
732
+
733
  async def handle_search_callback(self, query, search_query: str):
734
+ """Handle search callback from inline keyboard"""
735
+ await query.edit_message_text(f"🔍 Searching for: {search_query}...")
736
+
737
+ try:
738
+ # Call MCP search action
739
+ search_response = await self.call_mcp_action("search", {
740
+ "query": search_query,
741
+ "max_results": 5
742
+ })
743
+
744
+ if not search_response:
745
+ await query.edit_message_text("❌ No results found for your search.")
746
+ return
747
+
748
+ # Format search results
749
+ if isinstance(search_response, list) and len(search_response) > 0:
750
+ results_text = f"🔍 **Search Results for:** {search_query}\n\n"
751
+
752
+ keyboard = []
753
+ for i, result in enumerate(search_response[:5], 1):
754
+ if result.get("data"):
755
+ video_data = result["data"]
756
+ title = video_data.get("title", "Unknown Title")[:50]
757
+ channel = video_data.get("channel_title", "Unknown Channel")
758
+ video_id = video_data.get("video_id", "")
759
+
760
+ results_text += f"**{i}.** {title}\n"
761
+ results_text += f"👤 {channel}\n\n"
762
+
763
+ # Add analyze button for each video
764
+ if video_id:
765
+ keyboard.append([InlineKeyboardButton(
766
+ f"📹 Analyze Video {i}",
767
+ callback_data=f"analyze:{video_id}"
768
+ )])
769
+
770
+ # Add new search button
771
+ keyboard.append([InlineKeyboardButton("🔍 New Search", callback_data="new_search")])
772
+
773
+ reply_markup = InlineKeyboardMarkup(keyboard)
774
+ await query.edit_message_text(results_text, reply_markup=reply_markup, parse_mode=ParseMode.MARKDOWN)
775
+ else:
776
+ await query.edit_message_text("❌ No results found for your search.")
777
+
778
+ except Exception as e:
779
+ logger.error(f"Error in search callback: {e}")
780
+ await query.edit_message_text("❌ An error occurred during search.")
781
+
782
  async def call_mcp_action(self, action: str, params: dict) -> Optional[dict]:
783
  """Call MCP server action"""
784
  try:
785
+ # Configure timeout and connection settings
786
+ timeout = aiohttp.ClientTimeout(total=30, connect=10)
787
+
788
+ # Create connector without SSL for full SSL verification bypass
789
+ connector = aiohttp.TCPConnector(
790
+ limit=10,
791
+ limit_per_host=5,
792
+ ttl_dns_cache=300,
793
+ use_dns_cache=True,
794
+ enable_cleanup_closed=False, # Disable cleanup to prevent errors
795
+ ssl=False
796
+ )
797
+
798
+ # Create session with full SSL bypass
799
+ async with aiohttp.ClientSession(
800
+ timeout=timeout,
801
+ connector=connector,
802
+ trust_env=True,
803
+ skip_auto_headers={'User-Agent'}
804
+ ) as session:
805
  payload = {
806
  "action": action,
807
  "parameters": params
808
  }
809
+
810
+ headers = {
811
+ 'Content-Type': 'application/json',
812
+ 'User-Agent': 'TubeMetaBot/1.0'
813
+ }
814
+
815
+ try:
816
+ # Execute request with full SSL bypass
817
+ async with session.post(
818
+ MCP_BASE_URL,
819
+ json=payload,
820
+ headers=headers,
821
+ ssl=False
822
+ ) as response:
823
+ if response.status == 200:
824
+ result = await response.json()
825
+ logger.info(f"MCP request successful: {action}")
826
+ return result
827
+ else:
828
+ logger.error(f"MCP server error: {response.status}")
829
+ return None
830
+ except aiohttp.ClientConnectorError as e:
831
+ logger.error(f"Connection error: {e}")
832
+ return None
833
+ except asyncio.TimeoutError as e:
834
+ logger.error(f"Timeout error: {e}")
835
+ return None
836
+ except Exception as e:
837
+ logger.error(f"Request error: {e}")
838
+ return None
839
  except Exception as e:
840
  logger.error(f"Error calling MCP server: {e}")
841
  return None
842
+
843
  def format_video_info(self, video_info: dict) -> str:
844
+ """Format video information for display using HTML"""
845
  title = video_info.get("title", "Unknown Title")
846
  channel = video_info.get("channel", "Unknown Channel")
847
  duration = video_info.get("duration", "Unknown")
848
  view_count = video_info.get("view_count", "Unknown")
849
  upload_date = video_info.get("upload_date", "Unknown")
850
  description = video_info.get("description", "")
851
+
852
  # Truncate description if too long
853
  if len(description) > 200:
854
  description = description[:200] + "..."
 
 
855
 
856
+ info_text = f"""🎬 <b>{title}</b>
857
+
858
+ 👤 <b>Channel:</b> {channel}
859
+ ⏱️ <b>Duration:</b> {duration}
860
+ 👁️ <b>Views:</b> {view_count}
861
+ 📅 <b>Uploaded:</b> {upload_date}
862
 
863
+ 📝 <b>Description:</b>
864
  {description}
865
 
866
  Choose an action below:"""
867
+
868
  return info_text
869
+
870
+ async def handle_analyze(self, update: Update):
871
+ """Handle analyze request"""
872
+ await update.message.reply_text("📺 Please send me a YouTube URL to analyze.\n\nExample: `https://youtu.be/dQw4w9WgXcQ`", parse_mode=ParseMode.MARKDOWN)
873
+
874
  async def run(self):
875
  """Start the bot"""
876
  logger.info("Starting TubeMeta Bot...")
877
+
 
 
 
878
  try:
879
+ await self.app.initialize()
880
+ await self.app.start()
881
+ await self.app.updater.start_polling(drop_pending_updates=True)
882
+
883
+ logger.info(f"Bot successfully started! MCP URL: {MCP_BASE_URL}")
884
+
885
  # Keep the bot running
886
  await asyncio.Event().wait()
887
  except KeyboardInterrupt:
888
+ logger.info("Shutting down bot due to keyboard interrupt...")
889
+ except Exception as e:
890
+ logger.error(f"Error in bot operation: {e}")
891
  finally:
892
+ # Graceful shutdown
893
+ logger.info("Shutting down bot...")
894
+ try:
895
+ await self.app.updater.stop()
896
+ await self.app.stop()
897
+ await self.app.shutdown()
898
+ logger.info("Bot shutdown complete")
899
+ except Exception as e:
900
+ logger.error(f"Error during shutdown: {e}")
901
+
902
+ def convert_markdown_to_html(self, text: str) -> str:
903
+ """Convert markdown formatting to HTML for Telegram."""
904
+ # Convert code blocks (```) to HTML
905
+ import re
906
+
907
+ # Replace triple backticks with HTML pre tags
908
+ text = re.sub(r'```\n(.*?)\n```', r'<pre>\1</pre>', text, flags=re.DOTALL)
909
+
910
+ # Convert **bold** to HTML
911
+ text = re.sub(r'\*\*(.*?)\*\*', r'<b>\1</b>', text)
912
+
913
+ # Convert *italic* to HTML
914
+ text = re.sub(r'\*(.*?)\*', r'<i>\1</i>', text)
915
+
916
+ # Convert inline code `code` to HTML
917
+ text = re.sub(r'`(.*?)`', r'<code>\1</code>', text)
918
+
919
+ return text
920
 
921
  async def main():
922
  """Main function"""
 
924
  await bot.run()
925
 
926
  if __name__ == "__main__":
927
+ asyncio.run(main())
update_server.bat DELETED
@@ -1,43 +0,0 @@
1
- @echo off
2
- rem Batch скрипт для обновления Windows сервера
3
- echo 🔄 Обновление сервера...
4
-
5
- rem 1. Остановка текущих процессов
6
- echo ⏹️ Остановка текущих процессов...
7
- taskkill /f /im python.exe >nul 2>&1
8
-
9
- rem 2. Обновление кода
10
- echo 📥 Получение последних изменений...
11
- git pull
12
- if %ERRORLEVEL% NEQ 0 (
13
- echo ❌ Ошибка при git pull
14
- pause
15
- exit /b 1
16
- )
17
-
18
- rem 3. Проверка и установка зависимостей
19
- echo 📦 Проверка зависимостей...
20
- pip install -r requirements.txt --quiet
21
- pip install -r telegram_requirements.txt --quiet
22
-
23
- rem 4. Запуск MCP сервера в фоне
24
- echo 🚀 Запуск MCP сервера...
25
- start /B python main.py --mode api --host 0.0.0.0 --port 8080 > mcp_server.log 2>&1
26
-
27
- rem 5. Ждем 3 секунды
28
- timeout /t 3 /nobreak >nul
29
-
30
- rem 6. Запуск Telegram бота в фоне
31
- echo 🤖 Запуск Telegram бота...
32
- start /B python run_telegram_bot.py > telegram_bot.log 2>&1
33
-
34
- echo.
35
- echo ✅ Сервер обновлен и перезапущен!
36
- echo 📋 Процессы запущены в фоне
37
- echo.
38
- echo 📊 Для просмотра процессов: tasklist | findstr python
39
- echo 📊 Для просмотра логов:
40
- echo type mcp_server.log
41
- echo type telegram_bot.log
42
- echo.
43
- pause
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
update_server.ps1 DELETED
@@ -1,54 +0,0 @@
1
- # PowerShell скрипт для обновления Windows сервера
2
- # Usage: .\update_server.ps1
3
-
4
- Write-Host "🔄 Обновление сервера..." -ForegroundColor Yellow
5
-
6
- # 1. Остановка текущих процессов
7
- Write-Host "⏹️ Остановка текущих процессов..." -ForegroundColor Blue
8
- Get-Process | Where-Object {$_.ProcessName -like "*python*" -and $_.CommandLine -like "*main.py*"} | Stop-Process -Force -ErrorAction SilentlyContinue
9
- Get-Process | Where-Object {$_.ProcessName -like "*python*" -and $_.CommandLine -like "*run_telegram_bot.py*"} | Stop-Process -Force -ErrorAction SilentlyContinue
10
-
11
- # Альтернативный способ остановки процессов
12
- taskkill /f /im python.exe 2>$null
13
-
14
- Write-Host "✅ Процессы остановлены" -ForegroundColor Green
15
-
16
- # 2. Обновление кода
17
- Write-Host "📥 Получение последних изменений..." -ForegroundColor Blue
18
- git pull
19
-
20
- if ($LASTEXITCODE -ne 0) {
21
- Write-Host "❌ Ошибка при git pull" -ForegroundColor Red
22
- exit 1
23
- }
24
-
25
- # 3. Проверка и установка зависимостей
26
- Write-Host "📦 Проверка зависимостей..." -ForegroundColor Blue
27
- pip install -r requirements.txt --quiet
28
- pip install -r telegram_requirements.txt --quiet
29
-
30
- # 4. Запуск MCP сервера в фоне
31
- Write-Host "🚀 Запуск MCP сервера..." -ForegroundColor Blue
32
- $mcpJob = Start-Process python -ArgumentList "main.py --mode api --host 0.0.0.0 --port 8080" -PassThru -WindowStyle Hidden -RedirectStandardOutput "mcp_server.log" -RedirectStandardError "mcp_server_error.log"
33
- Write-Host "MCP сервер запущен с PID: $($mcpJob.Id)" -ForegroundColor Green
34
-
35
- # 5. Ждем 3 секунды для запуска MCP сервера
36
- Start-Sleep -Seconds 3
37
-
38
- # 6. Запуск Telegram бота в фоне
39
- Write-Host "🤖 Запуск Telegram бота..." -ForegroundColor Blue
40
- $botJob = Start-Process python -ArgumentList "run_telegram_bot.py" -PassThru -WindowStyle Hidden -RedirectStandardOutput "telegram_bot.log" -RedirectStandardError "telegram_bot_error.log"
41
- Write-Host "Telegram бот запущен с PID: $($botJob.Id)" -ForegroundColor Green
42
-
43
- Write-Host ""
44
- Write-Host "✅ Сервер обновлен и перезапущен!" -ForegroundColor Green
45
- Write-Host "📋 Процессы:" -ForegroundColor Yellow
46
- Write-Host " MCP сервер PID: $($mcpJob.Id)" -ForegroundColor White
47
- Write-Host " Telegram бот PID: $($botJob.Id)" -ForegroundColor White
48
- Write-Host ""
49
- Write-Host "📊 Для просмотра логов:" -ForegroundColor Yellow
50
- Write-Host " MCP сервер: Get-Content mcp_server.log -Wait" -ForegroundColor White
51
- Write-Host " Telegram бот: Get-Content telegram_bot.log -Wait" -ForegroundColor White
52
- Write-Host ""
53
- Write-Host "🔍 Для проверки процессов:" -ForegroundColor Yellow
54
- Write-Host " Get-Process python" -ForegroundColor White
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
update_server.sh DELETED
@@ -1,42 +0,0 @@
1
- #!/bin/bash
2
- # Скрипт для обновления сервера после git push
3
-
4
- echo "🔄 Обновление сервера..."
5
-
6
- # 1. Остановка текущих процессов
7
- echo "⏹️ Остановка текущих процессов..."
8
- pkill -f "main.py" || echo "MCP сервер не был запущен"
9
- pkill -f "run_telegram_bot.py" || echo "Telegram бот не был запущен"
10
-
11
- # 2. Обновление кода
12
- echo "📥 Получение последних изменений..."
13
- git pull
14
-
15
- # 3. Проверка и установка зависимостей
16
- echo "📦 Проверка зависимостей..."
17
- pip install -r requirements.txt --quiet
18
- pip install -r telegram_requirements.txt --quiet
19
-
20
- # 4. Запуск MCP сервера в фоне
21
- echo "🚀 Запуск MCP сервера..."
22
- nohup python main.py --mode api --host 0.0.0.0 --port 8080 > mcp_server.log 2>&1 &
23
- MCP_PID=$!
24
- echo "MCP сервер запущен с PID: $MCP_PID"
25
-
26
- # 5. Ждем 3 секунды для запуска MCP сервера
27
- sleep 3
28
-
29
- # 6. Запуск Telegram бота в фоне
30
- echo "🤖 Запуск Telegram бота..."
31
- nohup python run_telegram_bot.py > telegram_bot.log 2>&1 &
32
- BOT_PID=$!
33
- echo "Telegram бот запущен с PID: $BOT_PID"
34
-
35
- echo "✅ Сервер обновлен и перезапущен!"
36
- echo "📋 Процессы:"
37
- echo " MCP сервер PID: $MCP_PID"
38
- echo " Telegram бот PID: $BOT_PID"
39
- echo ""
40
- echo "📊 Для просмотра логов:"
41
- echo " MCP сервер: tail -f mcp_server.log"
42
- echo " Telegram бот: tail -f telegram_bot.log"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
update_tunnel.py DELETED
@@ -1,56 +0,0 @@
1
- #!/usr/bin/env python3
2
- """
3
- Script to update MCP server URL when LocalTunnel changes
4
- Usage: python update_tunnel.py <new_tunnel_url>
5
- """
6
-
7
- import sys
8
- import re
9
-
10
- def update_tunnel_url(new_url):
11
- """Update the MCP_BASE_URL in telegram_config.env"""
12
-
13
- # Validate URL format
14
- if not new_url.startswith('https://') or '.loca.lt' not in new_url:
15
- print("❌ Invalid URL format. Expected: https://xxx.loca.lt")
16
- return False
17
-
18
- # Add /api/mcp if not present
19
- if not new_url.endswith('/api/mcp'):
20
- if new_url.endswith('/'):
21
- new_url = new_url + 'api/mcp'
22
- else:
23
- new_url = new_url + '/api/mcp'
24
-
25
- try:
26
- # Read current config
27
- with open('telegram_config.env', 'r', encoding='utf-8') as f:
28
- content = f.read()
29
-
30
- # Update MCP_BASE_URL
31
- updated_content = re.sub(
32
- r'MCP_BASE_URL=.*',
33
- f'MCP_BASE_URL={new_url}',
34
- content
35
- )
36
-
37
- # Write back
38
- with open('telegram_config.env', 'w', encoding='utf-8') as f:
39
- f.write(updated_content)
40
-
41
- print(f"✅ Updated MCP_BASE_URL to: {new_url}")
42
- print("🔄 Please restart the Telegram bot to apply changes")
43
- return True
44
-
45
- except Exception as e:
46
- print(f"❌ Error updating config: {e}")
47
- return False
48
-
49
- if __name__ == "__main__":
50
- if len(sys.argv) != 2:
51
- print("Usage: python update_tunnel.py <new_tunnel_url>")
52
- print("Example: python update_tunnel.py https://abc-def-ghi.loca.lt")
53
- sys.exit(1)
54
-
55
- new_url = sys.argv[1]
56
- update_tunnel_url(new_url)