@woai
🧹 Major code cleanup and internationalization - Remove Russian comments/strings, translate UI to English, clean linter errors, remove hardcoded tokens, delete test files. Ready for production deployment
e775565
| import os | |
| from fastapi import FastAPI, HTTPException, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from pydantic import BaseModel | |
| from typing import Dict, List, Optional, Any, Union | |
| import httpx | |
| from googleapiclient.discovery import build | |
| from googleapiclient.errors import HttpError | |
| import json | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| from youtube_transcript_api.formatters import JSONFormatter | |
| from dotenv import load_dotenv | |
| from utils import format_timestamp, extract_video_id | |
| from models import MCPResponse | |
| import re | |
# Load environment variables
load_dotenv()

# YouTube Data API key read from the environment; None when the variable is unset.
YOUTUBE_API_KEY = os.environ.get("YOUTUBE_API_KEY")
app = FastAPI(
    title="YouTube MCP API",
    description="Model Context Protocol (MCP) server for interacting with YouTube API",
    version="0.1.0",
)

# Configure CORS
# NOTE(review): every origin, method and header is allowed while credentials
# are also enabled. Confirm this is intended before deploying; an explicit
# origin list is the usual production setting.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Initialize YouTube API client
def get_youtube_client():
    """Build a YouTube Data API v3 client from the configured API key.

    Returns:
        The object returned by ``build("youtube", "v3", ...)``.

    Raises:
        HTTPException: Status 500 when ``YOUTUBE_API_KEY`` is empty or unset,
            or when building the client fails.
    """
    if not YOUTUBE_API_KEY:
        raise HTTPException(status_code=500, detail="YouTube API key is not configured")
    try:
        return build("youtube", "v3", developerKey=YOUTUBE_API_KEY)
    except Exception as e:
        # Chain the original failure so the root cause stays in the traceback.
        raise HTTPException(
            status_code=500,
            detail=f"YouTube API initialization error: {str(e)}",
        ) from e
# Base data models for standard API requests
class SearchRequest(BaseModel):
    # Search terms, forwarded as the `q` parameter of the search call.
    query: str
    # Forwarded as `maxResults`.
    max_results: Optional[int] = 10
    # Forwarded as `order`.
    order: Optional[str] = "relevance"
    # Forwarded as `videoDuration`; a falsy value is sent as None.
    video_duration: Optional[str] = None
class VideoInfoRequest(BaseModel):
    # Video ID or URL; handlers pass it through extract_video_id().
    video_id: str
class TranscriptRequest(BaseModel):
    # Video ID or URL; handlers pass it through extract_video_id().
    video_id: str
    # Preferred transcript language; None means "use the first available one".
    language_code: Optional[str] = None
class MCPRequestData(BaseModel):
    # Name of the MCP action to run (e.g. "search", "video_info", "transcript").
    action: str
    # Keyword arguments used to build the action-specific request model.
    parameters: Dict[str, Any]
# Request model for listing the transcript languages available for a video
class TranscriptLanguagesRequest(BaseModel):
    # Video ID or URL; handlers pass it through extract_video_id().
    video_id: str
# Model for timecode requests
class TimecodeRequest(BaseModel):
    # Video ID or URL; handlers pass it through extract_video_id().
    video_id: str
    # Preferred transcript language; None lets the handler pick one.
    language_code: Optional[str] = None
    # Segment length in seconds
    segment_length: Optional[int] = 60
    # Output style: "youtube" or "markdown"
    format: Optional[str] = "youtube"
| # Load gemini_helper module only after defining base models | |
| from gemini_helper import generate_timecodes_with_gemini, DEFAULT_MODEL | |
# Model for Gemini timecode requests
class GeminiTimecodeRequest(BaseModel):
    # Video ID or URL; handlers pass it through extract_video_id().
    video_id: str
    # Preferred transcript language; None lets the handler pick one.
    language_code: Optional[str] = None
    # Output style: "youtube" or "markdown"
    format: Optional[str] = "youtube"
    # Gemini model (if None, uses default model)
    model: Optional[str] = DEFAULT_MODEL
| # Now we can load mcp_handlers | |
| from mcp_handlers import ( | |
| MCPQueryRequest, | |
| MCPVideoRequest, | |
| MCPTranscriptRequest, | |
| MCPTimecodeRequest, | |
| MCPGeminiRequest, | |
| process_mcp_search, | |
| process_mcp_video_info, | |
| process_mcp_transcript, | |
| process_mcp_timecodes, | |
| process_mcp_gemini_timecodes, | |
| create_text_response, | |
| create_error_response | |
| ) | |
# Commonly typed non-standard codes mapped to the code used for transcript lookups.
_LANGUAGE_ALIASES = {
    "ua": "uk",  # Ukrainian
}


def normalize_language_code(language_code: Optional[str]) -> Optional[str]:
    """Normalize a language code, converting common aliases to standard codes.

    Args:
        language_code: Code supplied by the caller; may be None or empty.

    Returns:
        The falsy input unchanged (None or ""), otherwise the lower-cased,
        whitespace-stripped code with known aliases replaced (e.g. "ua" -> "uk").
    """
    if not language_code:
        return language_code
    cleaned = language_code.lower().strip()
    return _LANGUAGE_ALIASES.get(cleaned, cleaned)
# Standard API routes
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app`.
async def search_videos(request: SearchRequest):
    """Search YouTube for videos matching the request.

    Args:
        request: Search parameters (query, result count, ordering, duration filter).

    Returns:
        ``{"content": [...]}`` with one dict per video (id, title, description,
        thumbnail URL, channel title, publish date), or ``{"error": "..."}``
        when the API call or response handling fails.
    """
    try:
        youtube = get_youtube_client()
        search_response = youtube.search().list(
            q=request.query,
            part="snippet",
            maxResults=request.max_results,
            type="video",
            order=request.order,
            videoDuration=request.video_duration or None,
        ).execute()

        results = []
        for item in search_response.get("items", []):
            snippet = item["snippet"]
            # Not every item is guaranteed to carry a "high" thumbnail; fall
            # back to smaller sizes instead of failing the whole search.
            thumbnails = snippet.get("thumbnails") or {}
            thumbnail = (
                thumbnails.get("high")
                or thumbnails.get("medium")
                or thumbnails.get("default")
                or {}
            )
            results.append({
                "video_id": item["id"]["videoId"],
                "title": snippet["title"],
                "description": snippet["description"],
                "thumbnail": thumbnail.get("url"),
                "channel_title": snippet["channelTitle"],
                "published_at": snippet["publishedAt"],
            })
        return {"content": results}
    except HttpError as e:
        return {"error": f"YouTube API error: {str(e)}"}
    except Exception as e:
        return {"error": f"Unexpected error: {str(e)}"}
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app`.
async def get_video_info(request: VideoInfoRequest):
    """Return metadata for a single video.

    Args:
        request: Carries the video ID or URL.

    Returns:
        ``{"content": {...}}`` with title, description, channel, publish date,
        duration, counters and tags; ``{"error": "Video not found"}`` when the
        API returns no items; ``{"error": "..."}`` on any failure.
    """
    try:
        # Accept either a bare ID or a full URL.
        video_id = extract_video_id(request.video_id)

        client = get_youtube_client()
        lookup = client.videos().list(
            part="snippet,contentDetails,statistics",
            id=video_id,
        )
        payload = lookup.execute()

        items = payload.get("items")
        if not items:
            return {"error": "Video not found"}

        video = items[0]
        snippet = video["snippet"]
        content_details = video["contentDetails"]
        stats = video["statistics"]

        details = {
            "video_id": video_id,
            "title": snippet["title"],
            "description": snippet["description"],
            "channel_title": snippet["channelTitle"],
            "published_at": snippet["publishedAt"],
            "duration": content_details["duration"],
            "view_count": stats.get("viewCount", "0"),
            "like_count": stats.get("likeCount", "0"),
            "comment_count": stats.get("commentCount", "0"),
            "tags": snippet.get("tags", []),
        }
        return {"content": details}
    except HttpError as e:
        return {"error": f"YouTube API error: {str(e)}"}
    except Exception as e:
        return {"error": f"Unexpected error: {str(e)}"}
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app`.
async def get_transcript(request: TranscriptRequest):
    """Fetch a video's transcript, preferring the requested language.

    The requested language (after alias normalization) is tried first. If that
    fails or no language was requested, the first language reported for the
    video is used instead.

    Args:
        request: Carries the video ID or URL and an optional language code.

    Returns:
        ``{"content": [{"text", "start", "duration"}, ...]}`` plus
        ``"used_language"`` when known, or ``{"error": "..."}`` on failure.
    """
    try:
        # Accept either a bare ID or a full URL.
        video_id = extract_video_id(request.video_id)
        # Normalize language code (ua -> uk)
        wanted_language = normalize_language_code(request.language_code)

        # Discover which transcript languages exist for this video.
        try:
            available_languages = [
                {
                    "language": track.language,
                    "language_code": track.language_code,
                    "is_generated": track.is_generated,
                    "is_translatable": track.is_translatable,
                }
                for track in YouTubeTranscriptApi.list_transcripts(video_id)
            ]
        except Exception as e:
            print(f"Error getting language list: {str(e)}")
            return {"error": f"Video not found or no transcripts available: {str(e)}"}

        print(f"Available languages for video {video_id}: {[lang['language_code'] for lang in available_languages]}")

        entries = None
        final_language = None

        # First choice: the language the caller asked for.
        if wanted_language:
            try:
                print(f"Trying to get transcript in language: {wanted_language}")
                entries = YouTubeTranscriptApi.get_transcript(video_id, languages=[wanted_language])
                print(f"Successfully obtained transcript in language: {wanted_language}")
                final_language = wanted_language
            except Exception as e:
                print(f"Failed to get transcript in language {wanted_language}: {str(e)}")

        # Fallback: the first language the video actually offers.
        if entries is None and available_languages:
            first_language = available_languages[0]['language_code']
            try:
                print(f"Trying to use first available language: {first_language}")
                entries = YouTubeTranscriptApi.get_transcript(video_id, languages=[first_language])
                print(f"Successfully obtained transcript in language: {first_language}")
                final_language = first_language
            except Exception as e:
                print(f"Failed to get transcript in language {first_language}: {str(e)}")
                return {"error": f"Failed to get transcript in any available language: {str(e)}"}

        if not entries:
            return {"error": "Transcript for this video is unavailable"}

        response = {
            "content": [
                {
                    "text": entry.get("text", ""),
                    "start": entry.get("start", 0),
                    "duration": entry.get("duration", 0),
                }
                for entry in entries
            ]
        }
        if final_language:
            response["used_language"] = final_language
        return response
    except Exception as e:
        return {"error": f"Error getting transcript: {str(e)}"}
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app`.
async def get_transcript_languages(request: TranscriptLanguagesRequest):
    """List the transcript languages available for a video.

    Args:
        request: Carries the video ID or URL.

    Returns:
        ``{"content": [{"language_code", "language", "is_generated"}, ...]}``
        or ``{"error": "..."}`` when the lookup fails.
    """
    try:
        # Accept either a bare ID or a full URL.
        video_id = extract_video_id(request.video_id)

        try:
            print(f"Getting language list for ID: {video_id}")
            languages = [
                {
                    "language_code": track.language_code,
                    "language": track.language,
                    "is_generated": track.is_generated,
                }
                for track in YouTubeTranscriptApi.list_transcripts(video_id)
            ]
        except Exception as transcript_error:
            return {"error": f"Failed to get language list. Details: {str(transcript_error)}"}

        return {"content": languages}
    except Exception as e:
        return {"error": f"Error getting language list: {str(e)}"}
# MCP endpoints
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app`.
async def mcp_endpoint(request: MCPRequestData):
    """Dispatch an MCP request to the handler for its action.

    Supported actions: "search", "video_info", "transcript", "timecodes",
    "gemini_timecodes".

    Args:
        request: The action name plus the parameters used to build the
            action-specific request model.

    Returns:
        Whatever the action handler returns, or an error response built with
        ``create_error_response`` for unknown actions and for any exception.
    """
    try:
        action = request.action
        params = request.parameters

        if action == "transcript":
            # This handler is not given a YouTube Data API client, so the
            # action must keep working when no API key is configured.
            return await process_mcp_transcript(MCPTranscriptRequest(**params))

        # The remaining actions receive the client; build it only when needed.
        if action == "search":
            return await process_mcp_search(get_youtube_client(), MCPQueryRequest(**params))
        if action == "video_info":
            return await process_mcp_video_info(get_youtube_client(), MCPVideoRequest(**params))
        if action == "timecodes":
            return await process_mcp_timecodes(get_youtube_client(), MCPTimecodeRequest(**params))
        if action == "gemini_timecodes":
            return await process_mcp_gemini_timecodes(get_youtube_client(), MCPGeminiRequest(**params))

        return create_error_response(f"Unknown action: {action}")
    except Exception as e:
        return create_error_response(f"Error processing request: {str(e)}")
# Route for health check
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app`.
async def health_check():
    """Report that the service is running.

    Returns:
        The constant payload ``{"status": "ok"}``.
    """
    return {"status": "ok"}
# Information route, describing API capabilities
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app`.
async def root():
    """Describe the API: name, version, endpoints and supported MCP actions.

    Returns:
        A static dict. ``actions`` lists every action name the MCP dispatcher
        in this module handles.
    """
    return {
        "name": "YouTube MCP API",
        "version": "0.1.0",
        "description": "Model Context Protocol (MCP) server for interacting with YouTube API",
        "endpoints": {
            "standard": [
                "/api/search - Search videos on YouTube",
                "/api/video_info - Get video information",
                "/api/transcript - Get video transcript",
            ],
            "mcp": [
                "/api/mcp - Model Context Protocol endpoint",
            ],
        },
        "actions": {
            "search": "Search videos on YouTube",
            "video_info": "Get video information",
            "transcript": "Get video transcript",
            # These two are handled by the MCP dispatcher but were not advertised.
            "timecodes": "Generate timecodes from video transcript",
            "gemini_timecodes": "Generate timecodes from video transcript with Gemini",
        },
    }
# Output styles generate_timecodes knows how to render.
_SUPPORTED_TIMECODE_FORMATS = ("youtube", "markdown")


def _group_transcript_segments(entries, segment_length):
    """Group consecutive transcript entries into segments of about segment_length seconds."""
    segments = []
    current = None
    for entry in entries:
        start_time = entry["start"]
        end_time = start_time + entry["duration"]
        if current is not None and (start_time - current["start"]) <= segment_length:
            # Entry starts within the window opened by the segment's first entry.
            current["text"].append(entry["text"])
            current["end"] = end_time
        else:
            # Close the running segment (if any) and open a new one here.
            if current is not None:
                segments.append(current)
            current = {"start": start_time, "end": end_time, "text": [entry["text"]]}
    if current is not None:
        segments.append(current)
    return segments


def _format_timecode_lines(segments, video_id, format_type):
    """Render each segment as one timecode line in the requested style."""
    lines = []
    for segment in segments:
        start_formatted = format_timestamp(segment["start"])
        # Summary text of segment (at most 100 characters)
        text_summary = " ".join(segment["text"])
        if len(text_summary) > 100:
            text_summary = text_summary[:97] + "..."
        if format_type == "markdown":
            youtube_link = f"https://www.youtube.com/watch?v={video_id}&t={int(segment['start'])}"
            lines.append(f"- [{start_formatted}]({youtube_link}) {text_summary}")
        else:
            # "youtube": plain lines suitable for a video description
            lines.append(f"{start_formatted} {text_summary}")
    return lines


# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app`.
async def generate_timecodes(request: TimecodeRequest):
    """Build timecodes for a video by chunking its transcript into segments.

    The transcript is taken from the requested language when possible, then
    from the first language listed for the video, then from the transcript
    library's default lookup.

    Args:
        request: Video ID or URL, optional language code, segment length in
            seconds (None falls back to 60) and output format ("youtube" or
            "markdown"; None falls back to "youtube").

    Returns:
        ``{"content": {"video_id", "timecodes", "format", "segment_length",
        "total_segments"[, "used_language"]}}`` or ``{"error": "..."}``.
    """
    try:
        # Validate the output options first so bad input fails before any
        # network call. An unknown format used to yield an empty result.
        format_type = (request.format or "youtube").lower()
        if format_type not in _SUPPORTED_TIMECODE_FORMATS:
            return {"error": f"Unsupported timecode format: '{format_type}'. Use 'youtube' or 'markdown'."}
        segment_length = request.segment_length if request.segment_length is not None else 60

        # Extract video ID from URL if it's a URL
        video_id = extract_video_id(request.video_id)
        print(f"Generating timecodes for ID: {video_id}")

        # Normalize language code (ua -> uk), as the other transcript handlers do.
        requested_language = normalize_language_code(request.language_code)

        # Best effort: a failure here only disables the "first available" fallback.
        available_codes = []
        try:
            available_codes = [
                track.language_code
                for track in YouTubeTranscriptApi.list_transcripts(video_id)
            ]
            print(f"Available languages for video {video_id}: {available_codes}")
        except Exception as e:
            print(f"Failed to get language list: {str(e)}")

        transcript_list = None
        used_language = None

        # If language is specified, try to use it
        if requested_language:
            try:
                print(f"Trying to get transcript in language: {requested_language}")
                transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[requested_language])
                used_language = requested_language
                print(f"Successfully obtained transcript in language: {requested_language}")
            except Exception as e:
                print(f"Failed to get transcript in language {requested_language}: {str(e)}")

        # If transcript not obtained and there are available languages, use first available
        if not transcript_list and available_codes:
            first_language = available_codes[0]
            try:
                print(f"Trying to use first available language: {first_language}")
                transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[first_language])
                used_language = first_language
                print(f"Successfully obtained transcript in language: {first_language}")
            except Exception as e:
                print(f"Failed to get transcript in language {first_language}: {str(e)}")

        # Last resort: let the library choose the language.
        if not transcript_list:
            try:
                print("Trying to get transcript in any available language")
                transcript_list = YouTubeTranscriptApi.get_transcript(video_id)
                print("Transcript successfully obtained")
            except Exception as e:
                return {"error": f"Transcript not found. Details: {str(e)}"}

        if not transcript_list:
            return {"error": "Transcript for this video is unavailable"}

        segments = _group_transcript_segments(transcript_list, segment_length)
        timecodes = _format_timecode_lines(segments, video_id, format_type)

        # Return timecodes and additional information
        response = {
            "content": {
                "video_id": video_id,
                "timecodes": timecodes,
                "format": format_type,
                "segment_length": segment_length,
                "total_segments": len(segments),
            }
        }
        if used_language:
            response["content"]["used_language"] = used_language
        return response
    except Exception as e:
        return {"error": f"Error generating timecodes: {str(e)}"}
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app`.
async def generate_gemini_timecodes(request: GeminiTimecodeRequest):
    """Generate timecodes for a video by sending its transcript to Gemini.

    The transcript is taken from the requested language (after alias
    normalization) when possible, otherwise from the first language listed
    for the video. The video title is looked up on a best-effort basis and
    defaults to "YouTube Video".

    Args:
        request: Video ID or URL, optional language code, output format and
            Gemini model name.

    Returns:
        ``{"content": <helper result>}`` (with ``"used_language"`` added when
        known) or ``{"error": "..."}``.
    """
    try:
        # Extract video ID if URL is provided
        video_id = extract_video_id(request.video_id)
        print(f"Generating Gemini timecodes for ID: {video_id}")

        # Normalize language code (ua -> uk)
        normalized_language = normalize_language_code(request.language_code)

        # Get list of available languages for the video
        try:
            available_languages = [
                {
                    "language": track.language,
                    "language_code": track.language_code,
                    "is_generated": track.is_generated,
                    "is_translatable": track.is_translatable,
                }
                for track in YouTubeTranscriptApi.list_transcripts(video_id)
            ]
        except Exception as e:
            print(f"Error getting language list: {str(e)}")
            return {"error": f"Video not found or no transcripts available: {str(e)}"}

        print(f"Available languages for video {video_id}: {[lang['language_code'] for lang in available_languages]}")

        transcript_list = None
        used_language = None

        # Try to get transcript in requested language
        if normalized_language:
            try:
                print(f"Trying to get transcript in language: {normalized_language}")
                transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[normalized_language])
                used_language = normalized_language
                print(f"Successfully obtained transcript in language: {normalized_language}")
            except Exception as e:
                print(f"Failed to get transcript in language {normalized_language}: {str(e)}")

        # If specific language failed or not requested, try first available
        if transcript_list is None and available_languages:
            first_language = available_languages[0]["language_code"]
            try:
                print(f"Trying to use first available language: {first_language}")
                transcript_list = YouTubeTranscriptApi.get_transcript(video_id, languages=[first_language])
                used_language = first_language
                print(f"Successfully obtained transcript in language: {first_language}")
            except Exception as e:
                print(f"Failed to get transcript in language {first_language}: {str(e)}")
                return {"error": f"Failed to get transcript in any available language: {str(e)}"}

        if not transcript_list:
            return {"error": "Transcript for this video is unavailable"}

        # The title is optional context. Client creation sits inside the
        # best-effort block so a missing API key falls back to the default
        # title instead of aborting the whole request.
        video_title = "YouTube Video"
        try:
            youtube = get_youtube_client()
            video_response = youtube.videos().list(
                part="snippet",
                id=video_id,
            ).execute()
            if video_response.get("items"):
                video_title = video_response["items"][0]["snippet"]["title"]
        except Exception as e:
            print(f"Failed to get video information: {str(e)}")

        # Send request to Gemini with language specified
        result = await generate_timecodes_with_gemini(
            transcript_entries=transcript_list,
            video_title=video_title,
            format_type=request.format,
            model_name=request.model,
            language=used_language,
        )

        if "error" in result:
            return {"error": result["error"]}

        # Add transcript language information
        if used_language:
            result["used_language"] = used_language
        return {"content": result}
    except Exception as e:
        return {"error": f"Error generating timecodes with Gemini: {str(e)}"}
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when this file is run as a script.
    import uvicorn

    # Serve the app on the local loopback interface only.
    uvicorn.run(app, host="127.0.0.1", port=8080)