@woai
🧹 Major code cleanup and internationalization - Remove Russian comments/strings, translate UI to English, clean linter errors, remove hardcoded tokens, delete test files. Ready for production deployment
e775565
| import asyncio | |
| import logging | |
| from typing import Optional | |
| import aiohttp | |
| from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup | |
| from telegram.ext import Application, CommandHandler, MessageHandler, CallbackQueryHandler, filters, ContextTypes | |
| from telegram.constants import ParseMode | |
| import os | |
| from dotenv import load_dotenv | |
| import warnings | |
# Silence the SSL/connection warnings that would otherwise clutter the output.
for _noisy_message in (
    "SSL shutdown timed out",
    "Certificate verification failed",
    "SSL handshake failed",
    "Connection lost",
):
    warnings.filterwarnings("ignore", message=_noisy_message)

# Raise the threshold of the chatty HTTP / asyncio loggers.
for _logger_name, _threshold in (
    ("httpx", logging.WARNING),
    ("httpcore", logging.WARNING),
    ("httpcore.connection", logging.ERROR),
    ("httpcore.http11", logging.ERROR),
    ("asyncio", logging.WARNING),
):
    logging.getLogger(_logger_name).setLevel(_threshold)

# Do not leave the loop variables behind as module attributes.
del _noisy_message, _logger_name, _threshold
| # Create custom filter to suppress SSL errors | |
class SSLErrorFilter(logging.Filter):
    """Logging filter that drops records mentioning known SSL/connection noise."""

    # Lower-case fragments; a record whose message contains any of them is dropped.
    SUPPRESSED_PHRASES = (
        'ssl shutdown timed out',
        'connection lost',
        'ssl handshake failed',
        'certificate verification failed',
    )

    def filter(self, record):
        """Return False for records to suppress, True for everything else.

        The comparison is case-insensitive and is made against the fully
        formatted message (``record.getMessage()``).
        """
        lowered = record.getMessage().lower()
        for phrase in self.SUPPRESSED_PHRASES:
            if phrase in lowered:
                return False
        return True
# Load environment variables
load_dotenv()

# Configuration
TELEGRAM_TOKEN = os.getenv("TELEGRAM_TOKEN")
if not TELEGRAM_TOKEN:
    raise ValueError("TELEGRAM_TOKEN environment variable is required")

# MCP endpoint; override it with the MCP_BASE_URL environment variable.
# (The previous nested lookup read the same variable twice, so it never
# supported a second name.)
MCP_BASE_URL = os.getenv("MCP_BASE_URL", "https://youtube-bot.tuttech.net/api/mcp")

# Set up logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)

# Apply the SSL-noise filter.  A filter attached to a *logger* is only
# consulted for records created on that very logger; records propagated from
# child loggers (httpx, httpcore, ...) bypass it.  Attaching the filter to the
# root *handlers* makes it effective for every record that reaches them, which
# is why this runs after basicConfig() has installed the handlers.
_ssl_error_filter = SSLErrorFilter()
_root_logger = logging.getLogger()
_root_logger.addFilter(_ssl_error_filter)
for _handler in _root_logger.handlers:
    _handler.addFilter(_ssl_error_filter)

logger = logging.getLogger(__name__)
def escape_markdown(text: str) -> str:
    """Return *text* with every Markdown special character backslash-escaped."""
    special_chars = '_*[]()~`>#+-=|{}.!'
    # One translation pass; the backslash itself is not in the set, so this is
    # equivalent to replacing the characters one after another.
    escape_table = str.maketrans({char: '\\' + char for char in special_chars})
    return text.translate(escape_table)
| class TubeMetaBot: | |
def __init__(self):
    """Build the Telegram application from TELEGRAM_TOKEN and register handlers."""
    builder = Application.builder().token(TELEGRAM_TOKEN)
    self.app = builder.build()
    self.setup_handlers()
def setup_handlers(self):
    """Register the command, text-message and callback-query handlers on the app."""
    handlers = (
        CommandHandler("start", self.start_command),
        CommandHandler("help", self.help_command),
        CommandHandler("search", self.search_command),
        CommandHandler("analyze", self.analyze_command),
        # Any non-command text goes to the generic message handler.
        MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message),
        CallbackQueryHandler(self.handle_callback_query),
    )
    # Registration order is preserved.
    for handler in handlers:
        self.app.add_handler(handler)
async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /start with the welcome text (legacy Markdown parse mode)."""
    # The empty first and last items reproduce the leading and trailing
    # newline of the message.
    welcome_lines = (
        "",
        "π¬ **Welcome to TubeMeta Bot!**",
        "I can help you with YouTube videos:",
        "β’ π Search for videos",
        "β’ π Get video metadata",
        "β’ π Extract transcripts",
        "β’ β° Generate AI timecodes with Gemini 2.0",
        "**How to use:**",
        "β’ Send me a YouTube URL for full analysis",
        "β’ Use `/search <query>` to find videos",
        "β’ Send any text to search YouTube",
        "Type `/help` for more information!",
        "",
    )
    welcome_text = "\n".join(welcome_lines)
    await update.message.reply_text(welcome_text, parse_mode=ParseMode.MARKDOWN)
async def help_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /help with the command and feature overview (legacy Markdown)."""
    # The empty first and last items reproduce the leading and trailing
    # newline of the message.
    help_lines = (
        "",
        "π€ **TubeMeta Bot Help**",
        "**Commands:**",
        "β’ `/start` - Welcome message",
        "β’ `/help` - Show this help",
        "β’ `/search <query>` - Search YouTube videos",
        "β’ `/analyze` - Analyze YouTube video (send after this command)",
        "**Features:**",
        "β’ π **Video Search** - Find YouTube videos by keywords",
        "β’ π **Video Analysis** - Get detailed metadata (title, duration, views, etc.)",
        "β’ π **Transcripts** - Extract video transcripts/subtitles",
        "β’ β° **AI Timecodes** - Generate smart timecodes with Gemini 2.0",
        "**Usage Examples:**",
        "β’ Search: `/search machine learning tutorial`",
        "β’ Analysis: `/analyze` then send YouTube URL",
        "β’ Or just send: `python programming` for search",
        "**Supported Languages:**",
        "πΊπ¦ Ukrainian | π·πΊ Russian | π¬π§ English",
        "Powered by Gemini 2.0 AI π§",
        "",
    )
    help_text = "\n".join(help_lines)
    await update.message.reply_text(help_text, parse_mode=ParseMode.MARKDOWN)
async def search_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /search: run a YouTube search for the words after the command.

    With no arguments a usage hint is sent instead.  Updates that carry no
    ``message`` are ignored, the same guard ``analyze_command`` uses.
    """
    if not update.message:
        return
    if not context.args:
        # The hint contains backticks, so it must be sent with a parse mode;
        # otherwise the backticks are shown literally.
        await update.message.reply_text(
            "Please provide a search query. Example: `/search python tutorial`",
            parse_mode=ParseMode.MARKDOWN,
        )
        return
    await self.handle_search(update, " ".join(context.args))
async def analyze_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle /analyze: analyze the URL given as argument, or ask for one.

    Both replies contain backtick-quoted examples, so both are sent with the
    Markdown parse mode (the invalid-URL reply previously was not, which made
    the backticks appear literally).
    """
    if not update.message:
        return  # Skip if no message (shouldn't happen in command handlers)

    if not context.args:
        # No URL supplied: ask for one.
        await update.message.reply_text(
            "πΊ Please send me a YouTube URL to analyze.\n\nExample: `https://youtu.be/dQw4w9WgXcQ`",
            parse_mode=ParseMode.MARKDOWN,
        )
        return

    # URL provided with command
    url = " ".join(context.args)
    if self.is_youtube_url(url):
        await self.handle_youtube_url(update, url)
    else:
        await update.message.reply_text(
            "β Please provide a valid YouTube URL. Example: `/analyze https://youtu.be/dQw4w9WgXcQ`",
            parse_mode=ParseMode.MARKDOWN,
        )
async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle plain text: analyze YouTube URLs, search for everything else.

    A bare YouTube URL is analyzed directly.  Previously the bot answered
    "use /analyze" here, while /analyze without arguments answers "send me a
    URL" and no state links the two steps, so the flow advertised by /start
    and /help could never complete.

    Updates without a message or without text (for example edited messages,
    which arrive as ``update.edited_message``) are ignored instead of raising
    AttributeError.
    """
    message = update.message
    if not message or not message.text:
        return

    text = message.text.strip()
    if not text:
        return

    if self.is_youtube_url(text):
        await self.handle_youtube_url(update, text)
    else:
        # Treat as search query
        await self.handle_search(update, text)
def is_youtube_url(self, text: str) -> bool:
    """Return True when *text* contains a youtube.com / youtu.be host name.

    The host is matched as a whole domain (optionally with sub-domains such
    as ``www.``, ``m.`` or ``music.``), case-insensitively.  Plain substring
    matching accepted look-alikes such as ``notyoutube.com`` or
    ``youtube.com.evil.com``; those are rejected now.
    """
    import re

    youtube_host = re.compile(
        r"(?<![\w.-])"                      # not in the middle of another host name
        r"(?:[a-z0-9-]+\.)*"                # optional sub-domains
        r"(?:youtube\.com|youtu\.be)"
        r"(?![\w-]|\.[\w-])",               # host must end here (a trailing '.' is fine)
        re.IGNORECASE,
    )
    return youtube_host.search(text) is not None
async def handle_youtube_url(self, update: Update, url: str):
    """Fetch video info for *url* and show it with transcript/timecode/search buttons.

    A progress message is sent first and then edited in place with either the
    formatted video info or an error text.
    """
    # Send initial message
    processing_msg = await update.message.reply_text("π Analyzing YouTube video...")
    try:
        # Get basic video info first
        video_info_response = await self.call_mcp_action("video_info", {"video_id": url})

        if not video_info_response:
            await processing_msg.edit_text("β Could not analyze this YouTube video. Please check the URL.")
            return

        if video_info_response.get("error"):
            await processing_msg.edit_text(f"β Error: {video_info_response['error']}")
            return

        video_data = video_info_response.get("data")
        if not video_data:
            await processing_msg.edit_text("β Could not retrieve video information. Please check the URL.")
            return

        info_text = self.format_video_info_from_data(video_data)

        # Telegram limits callback_data to 64 *bytes*, not characters.
        video_id = video_data.get("video_id") or url
        safe_video_id = str(video_id)[:30]

        # A missing or null title falls back to a generic search phrase.
        video_title = str(video_data.get('title') or 'related videos')
        # "search:" takes 7 bytes, leaving 57 for the title.  Cut to 30
        # characters first (as before), then to 57 UTF-8 bytes so non-ASCII
        # titles (e.g. Cyrillic) cannot overflow; "ignore" drops a character
        # split by the byte cut.
        safe_title = video_title[:30].encode("utf-8")[:57].decode("utf-8", "ignore")

        keyboard = [
            [
                InlineKeyboardButton("π Get Transcript", callback_data=f"transcript:{safe_video_id}"),
                InlineKeyboardButton("β° AI Timecodes", callback_data=f"timecodes:{safe_video_id}")
            ],
            [
                InlineKeyboardButton("π Search Similar", callback_data=f"search:{safe_title}")
            ]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await processing_msg.edit_text(info_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
    except Exception as e:
        # Top-level boundary of the handler: log with traceback, tell the user.
        logger.exception(f"Error handling YouTube URL: {e}")
        await processing_msg.edit_text("β An error occurred while analyzing the video.")
def format_video_info_from_data(self, video_data: dict) -> str:
    """Format MCP video data as a Telegram HTML message.

    Args:
        video_data: Mapping read for ``title``, ``channel_title``,
            ``duration``, ``view_count``, ``published_at``, ``like_count``
            and ``comment_count``.  Missing or null values are shown as
            "Unknown" (or "Unknown Title" / "Unknown Channel").

    Returns:
        HTML text.  Every interpolated value is HTML-escaped, because in
        Telegram's HTML parse mode a raw ``<``, ``>`` or ``&`` in a title
        makes the whole message fail to parse.
    """
    import html

    def field(key: str, default: str = "Unknown") -> str:
        value = video_data.get(key)
        if value is None:
            value = default
        return html.escape(str(value), quote=False)

    title = field("title", "Unknown Title")
    channel = field("channel_title", "Unknown Channel")
    duration = field("duration")
    view_count = field("view_count")
    upload_date = field("published_at")
    like_count = field("like_count")
    comment_count = field("comment_count")

    info_text = f"""π¬ <b>{title}</b>
π€ <b>Channel:</b> {channel}
β±οΈ <b>Duration:</b> {duration}
ποΈ <b>Views:</b> {view_count}
π <b>Likes:</b> {like_count}
π¬ <b>Comments:</b> {comment_count}
π <b>Uploaded:</b> {upload_date}
Choose an action below:"""
    return info_text
async def handle_search(self, update: Update, query: str):
    """Search YouTube through the MCP server and reply with up to 5 results.

    The reply uses Telegram's HTML parse mode.  Telegram does not escape
    anything by itself, so the query and every video field are HTML-escaped
    here; a raw ``<`` or ``&`` would make the message fail to parse.
    """
    import html

    def esc(value) -> str:
        return html.escape(str(value), quote=False)

    safe_query = esc(query)
    processing_msg = await update.message.reply_text(
        f"π Searching for: <b>{safe_query}</b>", parse_mode=ParseMode.HTML
    )
    try:
        results = await self.call_mcp_action("search", {"query": query, "max_results": 5})
        if not results or (isinstance(results, dict) and "error" in results):
            await processing_msg.edit_text("β No results found for your search.")
            return

        # New format: a list of results.  Old format: a dict with "videos".
        videos = results if isinstance(results, list) else results.get("videos", [])

        # Unwrap {"data": {...}} items and drop anything that is not a mapping.
        entries = []
        for video_obj in videos:
            if isinstance(video_obj, dict) and "data" in video_obj:
                video_obj = video_obj["data"]
            if isinstance(video_obj, dict):
                entries.append(video_obj)

        if not entries:
            await processing_msg.edit_text("β No results found for your search.")
            return

        parts = [f"π <b>Search Results for:</b> {safe_query}\n\n"]
        for i, video in enumerate(entries, 1):
            title = video.get('title', 'Unknown Title')
            channel = video.get('channel_title', video.get('channel', 'Unknown Channel'))
            duration = str(video.get('duration', 'Unknown'))
            view_count = str(video.get('view_count', 'Unknown'))
            video_id = video.get('video_id', '')

            parts.append(f"<b>{i}. {esc(title)}</b>\n")
            parts.append(f"π€ {esc(channel)}\n")
            if duration != 'Unknown':
                parts.append(f"β±οΈ {esc(duration)}\n")
            if view_count != 'Unknown':
                parts.append(f"ποΈ {esc(view_count)} views\n")
            parts.append(f"π https://www.youtube.com/watch?v={esc(video_id)}\n\n")
        search_text = "".join(parts)

        # Add search refinement buttons
        keyboard = [
            [InlineKeyboardButton("π New Search", callback_data="new_search")]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await processing_msg.edit_text(search_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
    except Exception as e:
        # Top-level boundary of the handler: log with traceback, tell the user.
        logger.exception(f"Error handling search: {e}")
        await processing_msg.edit_text("β An error occurred during search.")
async def handle_callback_query(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Dispatch an inline-keyboard press to the matching action.

    Callback data has the form ``"<action>:<payload>"`` (or the bare word
    ``"new_search"``).  The payload is everything after the FIRST colon.
    The earlier ``str.replace`` removed every occurrence of the prefix, so a
    payload that itself contained e.g. ``"search:"`` was corrupted.
    """
    query = update.callback_query
    await query.answer()

    data = query.data or ""
    logger.info(f"Callback query: {data}")

    if data == "new_search":
        await query.edit_message_text(
            "π **Send me a new search query!**\n\nJust type your search terms and I'll find YouTube videos for you.",
            parse_mode=ParseMode.MARKDOWN
        )
        return

    action, separator, payload = data.partition(":")

    # action name -> coroutine taking (query, payload)
    routes = {
        "transcript": self.get_transcript,
        "timecodes": self.get_timecodes,
        "search": self.handle_search_callback,
        "back": self.handle_back_to_video,
        "full_transcript": self.send_full_transcript,
        "full_timecodes": self.send_full_timecodes,
        "back_to_analysis": self.handle_back_to_video,
    }

    if separator and action == "analyze":
        # The payload is a bare video id; the analysis expects a URL.
        await self.analyze_video(query, f"https://www.youtube.com/watch?v={payload}")
    elif separator and action in routes:
        await routes[action](query, payload)
    else:
        await query.edit_message_text("β Unknown action")
async def get_transcript(self, query, url: str):
    """Fetch the transcript for *url* and show it, or a preview if it is long.

    The message is sent in HTML parse mode, so the transcript text is
    HTML-escaped first (a raw ``<`` or ``&`` would make Telegram reject the
    message) and the preview header uses ``<b>`` instead of Markdown ``**``,
    which HTML mode shows literally.
    """
    import html

    await query.edit_message_text("π Extracting transcript...")
    try:
        transcript_response = await self.call_mcp_action("transcript", {"video_id": url})

        if not transcript_response:
            await query.edit_message_text("β Could not extract transcript. Please try again later.")
            return

        if transcript_response.get("error"):
            await query.edit_message_text(f"β {transcript_response['error']}")
            return

        if transcript_response.get("type") not in ["youtube_transcript"]:
            await query.edit_message_text("β Invalid transcript response format.")
            return

        # The server delivers the transcript as markdown text.
        transcript_text = transcript_response.get("markdown", "")
        if not transcript_text:
            await query.edit_message_text("β Transcript is empty or unavailable.")
            return

        back_button = InlineKeyboardButton("β¬ οΈ Back", callback_data=f"back:{url}")
        max_length = 4000  # Leave room for buttons and formatting

        if len(transcript_text) > max_length:
            # Cut BEFORE escaping so an HTML entity is never split in half.
            preview = transcript_text[:max_length - 200]
            summary_text = (
                f"π <b>Transcript Preview</b> (showing first {len(preview)} characters)\n\n"
            )
            summary_text += html.escape(preview, quote=False) + "...\n\n"
            summary_text += f"<i>π Full transcript: {len(transcript_text)} characters total</i>\n"
            summary_text += "<i>β οΈ Transcript is too long for Telegram. Showing preview only.</i>"

            keyboard = [
                [
                    InlineKeyboardButton("π Get Full Text", callback_data=f"full_transcript:{url}"),
                    back_button
                ]
            ]
            await query.edit_message_text(
                summary_text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode=ParseMode.HTML
            )
        else:
            # Short enough to display fully
            await query.edit_message_text(
                html.escape(transcript_text, quote=False),
                reply_markup=InlineKeyboardMarkup([[back_button]]),
                parse_mode=ParseMode.HTML,
            )
    except Exception as e:
        # Top-level boundary of the handler: log with traceback, tell the user.
        logger.exception(f"Error getting transcript: {e}")
        await query.edit_message_text("β An error occurred while extracting transcript.")
async def get_timecodes(self, query, url: str):
    """Generate AI timecodes for *url* and show them, or a preview if long.

    Everything is sent in HTML parse mode, therefore:
    * server-provided text is HTML-escaped before it is embedded;
    * the summary header uses ``<b>`` instead of Markdown ``**``;
    * the long branch never sends the raw, over-long markdown text (it used
      to when all timecodes happened to fit in the preview).
    """
    import html

    def esc(value) -> str:
        return html.escape(str(value), quote=False)

    await query.edit_message_text("β° Generating AI timecodes with Gemini 2.0...")
    try:
        timecodes_response = await self.call_mcp_action("gemini_timecodes", {
            "video_id": url,
            "format": "youtube"
        })

        if not timecodes_response:
            await query.edit_message_text("β Could not generate timecodes. Please try again later.")
            return

        if timecodes_response.get("error"):
            await query.edit_message_text(f"β {timecodes_response['error']}")
            return

        if timecodes_response.get("type") not in ["youtube_gemini_timecodes"]:
            await query.edit_message_text("β Invalid timecodes response format.")
            return

        # The server delivers the timecodes as markdown text.
        timecodes_text = timecodes_response.get("markdown", "")
        if not timecodes_text:
            await query.edit_message_text("β No timecodes were generated.")
            return

        back_button = InlineKeyboardButton("β¬ οΈ Back", callback_data=f"back:{url}")
        max_length = 4000  # Leave room for buttons and formatting

        if len(timecodes_text) <= max_length:
            # Short enough to display fully.  Escape first, then convert the
            # markdown markers to HTML tags for proper code block rendering.
            html_timecodes = self.convert_markdown_to_html(esc(timecodes_text))
            await query.edit_message_text(
                html_timecodes,
                reply_markup=InlineKeyboardMarkup([[back_button]]),
                parse_mode=ParseMode.HTML,
            )
            return

        # Too long: build a summary with as many timecodes as fit.
        data = timecodes_response.get("data") or {}
        timecodes_list = data.get("timecodes") or []
        detected_language = data.get("detected_language", "unknown")

        summary_text = "β° <b>AI Timecodes Generated</b>\n\n"
        summary_text += f"π€ <b>Model:</b> {esc(data.get('model', 'Gemini AI'))}\n"
        summary_text += f"π <b>Language:</b> {esc(detected_language)}\n"
        summary_text += f"π <b>Total timecodes:</b> {len(timecodes_list)}\n\n"

        # Reserve space for buttons and footer
        available_space = max_length - len(summary_text) - 300

        shown_lines = []
        used = 0
        for tc in timecodes_list:
            tc_line = f"{tc}\n"
            if used + len(tc_line) >= available_space:
                break
            shown_lines.append(tc_line)
            used += len(tc_line)

        if shown_lines:
            summary_text += "<b>Timecodes Preview:</b>\n<pre>"
            summary_text += esc("".join(shown_lines))
            summary_text += "</pre>\n\n"
        else:
            # No structured list available: show the head of the markdown text.
            summary_text += esc(timecodes_text[:max(available_space, 0)]) + "...\n\n"

        if len(shown_lines) < len(timecodes_list):
            summary_text += f"<i>π Showing {len(shown_lines)} of {len(timecodes_list)} timecodes</i>\n"
            summary_text += "<i>πΎ Download full file for complete list</i>"

        keyboard = [
            [
                InlineKeyboardButton("π Get Full List", callback_data=f"full_timecodes:{url}"),
                back_button
            ]
        ]
        await query.edit_message_text(
            summary_text, reply_markup=InlineKeyboardMarkup(keyboard), parse_mode=ParseMode.HTML
        )
    except Exception as e:
        # Top-level boundary of the handler: log with traceback, tell the user.
        logger.exception(f"Error generating timecodes: {e}")
        await query.edit_message_text("β An error occurred while generating timecodes.")
async def send_full_transcript(self, query, url: str):
    """Send the complete transcript for *url* as a ``.txt`` document.

    The file is posted as a new message; the message that carried the button
    is then edited to confirm delivery.  The file name is derived from the
    video title when it can be fetched and falls back to "transcript" when
    the title is missing, null or empty after cleaning (a null title used to
    raise inside ``re.sub`` and abort the whole operation).
    """
    from io import BytesIO
    import re

    await query.edit_message_text("π Preparing full transcript file...")
    try:
        transcript_response = await self.call_mcp_action("transcript", {"video_id": url})
        if not transcript_response or transcript_response.get("error"):
            await query.edit_message_text("β Could not extract full transcript.")
            return

        full_transcript = transcript_response.get("markdown", "")
        if not full_transcript:
            await query.edit_message_text("β Transcript is empty.")
            return

        # Create a simple text version (without markdown formatting)
        simple_text = full_transcript.replace("# π Transcript\n\n", "")
        simple_text = simple_text.replace("**[", "[").replace("]**", "]")

        # Look up the video title for the file name and caption.
        title = "transcript"
        video_info = await self.call_mcp_action("video_info", {"video_id": url})
        if video_info and video_info.get("data"):
            title = str(video_info["data"].get("title") or "transcript")
        # Replace characters that are not allowed in file names; cap the length.
        title = re.sub(r'[<>:"/\\|?*]', '_', title)[:50] or "transcript"

        transcript_file = BytesIO(simple_text.encode('utf-8'))
        transcript_file.name = f"{title}_transcript.txt"

        back_markup = InlineKeyboardMarkup([[
            InlineKeyboardButton("πΉ Back to Video", callback_data=f"back_to_analysis:{url}")
        ]])

        # Send file and create a new message instead of editing
        await query.message.reply_document(
            document=transcript_file,
            caption=f"π Full transcript for: {title}",
            reply_markup=back_markup
        )
        # Edit original message to show completion
        await query.edit_message_text(
            "β Transcript file sent!\n\nπ Check the file above for the complete transcript.",
            reply_markup=back_markup
        )
    except Exception as e:
        # Top-level boundary of the handler: log with traceback, tell the user.
        logger.exception(f"Error sending full transcript: {e}")
        await query.edit_message_text("β An error occurred while preparing transcript file.")
async def send_full_timecodes(self, query, url: str):
    """Send the complete AI timecode list for *url* as a ``.txt`` document.

    The file is posted as a new message; the message that carried the button
    is then edited to confirm delivery.  Timecode entries are converted with
    ``str()`` before joining, so non-string items no longer abort the export,
    and a missing, null or empty title falls back to "timecodes".
    """
    from io import BytesIO
    import re

    await query.edit_message_text("π Preparing full timecodes file...")
    try:
        timecodes_response = await self.call_mcp_action("gemini_timecodes", {
            "video_id": url,
            "format": "youtube"
        })
        if not timecodes_response or timecodes_response.get("error"):
            await query.edit_message_text("β Could not generate full timecodes.")
            return

        data = timecodes_response.get("data") or {}
        timecodes_list = data.get("timecodes") or []
        if not timecodes_list:
            await query.edit_message_text("β No timecodes available.")
            return

        # Header lines followed by one timecode per line.
        header_lines = [
            "AI Generated Timecodes",
            f"Model: {data.get('model', 'Gemini AI')}",
            f"Language: {data.get('detected_language', 'auto-detected')}",
            f"Total: {len(timecodes_list)} timecodes",
            "",
        ]
        content = "\n".join(header_lines) + "\n" + "\n".join(str(tc) for tc in timecodes_list)

        # Look up the video title for the file name and caption.
        title = "timecodes"
        video_info = await self.call_mcp_action("video_info", {"video_id": url})
        if video_info and video_info.get("data"):
            title = str(video_info["data"].get("title") or "timecodes")
        # Replace characters that are not allowed in file names; cap the length.
        title = re.sub(r'[<>:"/\\|?*]', '_', title)[:50] or "timecodes"

        timecodes_file = BytesIO(content.encode('utf-8'))
        timecodes_file.name = f"{title}_timecodes.txt"

        back_markup = InlineKeyboardMarkup([[
            InlineKeyboardButton("πΉ Back to Video", callback_data=f"back_to_analysis:{url}")
        ]])

        # Send file and create a new message instead of editing
        await query.message.reply_document(
            document=timecodes_file,
            caption=f"β° AI Timecodes for: {title}",
            reply_markup=back_markup
        )
        # Edit original message to show completion
        await query.edit_message_text(
            "β Timecodes file sent!\n\nπ Check the file above for all timecodes.",
            reply_markup=back_markup
        )
    except Exception as e:
        # Top-level boundary of the handler: log with traceback, tell the user.
        logger.exception(f"Error sending full timecodes: {e}")
        await query.edit_message_text("β An error occurred while preparing timecodes file.")
async def handle_back_to_video(self, query, url: str):
    """Show the video analysis view again for *url*.

    When the message that carried the button has text, it is edited in
    place.  When it has none (e.g. the button sits under a document), the
    result is sent as a new reply, because there is no text to edit.

    Both paths share one flow, so they report errors the same way and build
    the same keyboard.
    """
    try:
        current_message = query.message
        if not current_message or not current_message.text:
            # Nothing editable: notify via the callback answer and reply instead.
            await query.answer("π Loading video information...")
            respond = current_message.reply_text
        else:
            await query.edit_message_text("π Loading video information...")
            respond = query.edit_message_text

        video_info_response = await self.call_mcp_action("video_info", {"video_id": url})

        if not video_info_response:
            await respond("β Could not analyze this YouTube video. Please check the URL.")
            return

        if video_info_response.get("error"):
            await respond(f"β Error: {video_info_response['error']}")
            return

        video_data = video_info_response.get("data")
        if not video_data:
            await respond("β Could not retrieve video information. Please check the URL.")
            return

        info_text = self.format_video_info_from_data(video_data)

        # Telegram limits callback_data to 64 *bytes*, not characters.
        video_id = video_data.get("video_id") or url
        safe_video_id = str(video_id)[:30]

        # A missing or null title falls back to a generic search phrase.
        video_title = str(video_data.get('title') or 'related videos')
        # "search:" takes 7 bytes, leaving 57 for the title.  Cut to 30
        # characters first, then to 57 UTF-8 bytes so non-ASCII titles cannot
        # overflow; "ignore" drops a character split by the byte cut.
        safe_title = video_title[:30].encode("utf-8")[:57].decode("utf-8", "ignore")

        keyboard = [
            [
                InlineKeyboardButton("π Get Transcript", callback_data=f"transcript:{safe_video_id}"),
                InlineKeyboardButton("β° AI Timecodes", callback_data=f"timecodes:{safe_video_id}")
            ],
            [
                InlineKeyboardButton("π Search Similar", callback_data=f"search:{safe_title}")
            ]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)
        await respond(info_text, reply_markup=reply_markup, parse_mode=ParseMode.HTML)
    except Exception as e:
        logger.exception(f"Error returning to video: {e}")
        # Best effort: if editing/replying failed, try to tell the user anyway.
        try:
            await query.answer("β Could not load video information.")
            await query.message.reply_text("β Could not load video information. Please try again.")
        except Exception as notify_error:
            # Still best effort, but leave a trace instead of failing silently.
            logger.warning(f"Could not notify user about the failure: {notify_error}")
async def analyze_video(self, query, url: str):
    """Show the analysis view for *url*; delegates to ``handle_back_to_video``."""
    show_video = self.handle_back_to_video
    await show_video(query, url)
async def handle_search_callback(self, query, search_query: str):
    """Run a search triggered from an inline button and list up to 5 results.

    Each result gets an "Analyze Video N" button.  The result list is sent in
    HTML parse mode with every title/channel escaped.  With the legacy
    Markdown mode used before, a title containing ``_``, ``*``, ``[`` or a
    backtick made Telegram reject the whole message.
    """
    import html

    def esc(value) -> str:
        return html.escape(str(value), quote=False)

    await query.edit_message_text(f"π Searching for: {search_query}...")
    try:
        search_response = await self.call_mcp_action("search", {
            "query": search_query,
            "max_results": 5
        })

        # Only a non-empty list is a usable answer here.
        if not isinstance(search_response, list) or not search_response:
            await query.edit_message_text("β No results found for your search.")
            return

        parts = [f"π <b>Search Results for:</b> {esc(search_query)}\n\n"]
        keyboard = []
        for i, result in enumerate(search_response[:5], 1):
            video_data = result.get("data") if isinstance(result, dict) else None
            if not video_data:
                continue

            title = str(video_data.get("title") or "Unknown Title")[:50]
            channel = video_data.get("channel_title") or "Unknown Channel"
            video_id = video_data.get("video_id", "")

            parts.append(f"<b>{i}.</b> {esc(title)}\n")
            parts.append(f"π€ {esc(channel)}\n\n")

            # Add analyze button for each video
            if video_id:
                keyboard.append([InlineKeyboardButton(
                    f"πΉ Analyze Video {i}",
                    callback_data=f"analyze:{video_id}"
                )])

        # Add new search button
        keyboard.append([InlineKeyboardButton("π New Search", callback_data="new_search")])

        await query.edit_message_text(
            "".join(parts),
            reply_markup=InlineKeyboardMarkup(keyboard),
            parse_mode=ParseMode.HTML,
        )
    except Exception as e:
        # Top-level boundary of the handler: log with traceback, tell the user.
        logger.exception(f"Error in search callback: {e}")
        await query.edit_message_text("β An error occurred during search.")
async def call_mcp_action(self, action: str, params: dict) -> Optional[dict]:
    """POST ``{"action": action, "parameters": params}`` to the MCP server.

    Args:
        action: Name of the MCP action (e.g. "search", "video_info").
        params: Parameters forwarded as the "parameters" field.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None`` (non-200
        status, connection problem, timeout or any other error; all are
        logged).  NOTE(review): ``handle_search`` also accepts a list from
        this method, so the body is not always a dict.

    SECURITY: TLS certificate verification is disabled by default, which
    keeps the previous behaviour but leaves the connection open to
    man-in-the-middle attacks.  Set the environment variable
    ``MCP_VERIFY_SSL=1`` to enable verification.
    """
    verify_tls = os.getenv("MCP_VERIFY_SSL", "").strip().lower() in ("1", "true", "yes", "on")

    connector_options = {
        "limit": 10,
        "limit_per_host": 5,
        "ttl_dns_cache": 300,
        "use_dns_cache": True,
        "enable_cleanup_closed": False,  # Disable cleanup to prevent errors
    }
    request_options = {}
    if not verify_tls:
        # Pass ssl=False only when bypassing verification; leaving the
        # argument out keeps aiohttp's default (verifying) behaviour.
        connector_options["ssl"] = False
        request_options["ssl"] = False

    payload = {
        "action": action,
        "parameters": params
    }
    headers = {
        'Content-Type': 'application/json',
        'User-Agent': 'TubeMetaBot/1.0'
    }

    try:
        timeout = aiohttp.ClientTimeout(total=30, connect=10)
        connector = aiohttp.TCPConnector(**connector_options)
        async with aiohttp.ClientSession(
            timeout=timeout,
            connector=connector,
            trust_env=True,
            skip_auto_headers={'User-Agent'}
        ) as session:
            async with session.post(
                MCP_BASE_URL,
                json=payload,
                headers=headers,
                **request_options
            ) as response:
                if response.status != 200:
                    logger.error(f"MCP server error: {response.status}")
                    return None
                result = await response.json()
                logger.info(f"MCP request successful: {action}")
                return result
    except aiohttp.ClientConnectorError as e:
        logger.error(f"Connection error: {e}")
    except asyncio.TimeoutError:
        # str(TimeoutError()) is empty, so name the action instead.
        logger.error(f"Timeout error: MCP action '{action}' did not complete in time")
    except Exception as e:
        logger.error(f"Error calling MCP server: {e}")
    return None
def format_video_info(self, video_info: dict) -> str:
    """Format video information (including description) as Telegram HTML.

    Args:
        video_info: Mapping read for ``title``, ``channel``, ``duration``,
            ``view_count``, ``upload_date`` and ``description``.  Missing or
            null values are shown as "Unknown"; a missing or null description
            becomes an empty string.

    Returns:
        HTML text.  The description is cut to 200 characters (plus "...")
        and every interpolated value is HTML-escaped, because a raw ``<``,
        ``>`` or ``&`` breaks Telegram's HTML parse mode.
    """
    import html

    def field(key: str, default: str = "Unknown") -> str:
        value = video_info.get(key)
        if value is None:
            value = default
        return html.escape(str(value), quote=False)

    title = field("title", "Unknown Title")
    channel = field("channel", "Unknown Channel")
    duration = field("duration")
    view_count = field("view_count")
    upload_date = field("upload_date")

    description = str(video_info.get("description") or "")
    # Truncate BEFORE escaping so an HTML entity is never cut in half.
    if len(description) > 200:
        description = description[:200] + "..."
    description = html.escape(description, quote=False)

    info_text = f"""π¬ <b>{title}</b>
π€ <b>Channel:</b> {channel}
β±οΈ <b>Duration:</b> {duration}
ποΈ <b>Views:</b> {view_count}
π <b>Uploaded:</b> {upload_date}
π <b>Description:</b>
{description}
Choose an action below:"""
    return info_text
async def handle_analyze(self, update: Update):
    """Ask the user to send a YouTube URL to analyze (Markdown-formatted prompt)."""
    prompt = "πΊ Please send me a YouTube URL to analyze.\n\nExample: `https://youtu.be/dQw4w9WgXcQ`"
    message = update.message
    await message.reply_text(prompt, parse_mode=ParseMode.MARKDOWN)
async def run(self):
    """Start the bot, poll for updates until interrupted, then shut down.

    Shutdown only stops the components that are actually running.  Calling
    ``updater.stop()`` / ``app.stop()`` after a failed start-up raised, and
    that error skipped ``app.shutdown()``, so the resources acquired by
    ``initialize()`` were never released.
    """
    logger.info("Starting TubeMeta Bot...")
    try:
        await self.app.initialize()
        await self.app.start()
        await self.app.updater.start_polling(drop_pending_updates=True)
        logger.info(f"Bot successfully started! MCP URL: {MCP_BASE_URL}")
        # Keep the bot running
        await asyncio.Event().wait()
    except KeyboardInterrupt:
        logger.info("Shutting down bot due to keyboard interrupt...")
    except Exception as e:
        logger.error(f"Error in bot operation: {e}")
    finally:
        # Graceful shutdown
        logger.info("Shutting down bot...")
        try:
            updater = self.app.updater
            if updater is not None and updater.running:
                await updater.stop()
            if self.app.running:
                await self.app.stop()
            await self.app.shutdown()
            logger.info("Bot shutdown complete")
        except Exception as e:
            logger.error(f"Error during shutdown: {e}")
def convert_markdown_to_html(self, text: str) -> str:
    """Translate a small subset of markdown into Telegram HTML tags.

    Rules are applied in this order: fenced code blocks -> ``<pre>``,
    ``**bold**`` -> ``<b>``, ``*italic*`` -> ``<i>``, and inline code
    -> ``<code>``.
    """
    import re

    # (pattern, replacement, flags); bold must run before italic, which
    # would otherwise consume the doubled asterisks.
    conversion_rules = (
        (r'```\n(.*?)\n```', r'<pre>\1</pre>', re.DOTALL),
        (r'\*\*(.*?)\*\*', r'<b>\1</b>', 0),
        (r'\*(.*?)\*', r'<i>\1</i>', 0),
        (r'`(.*?)`', r'<code>\1</code>', 0),
    )
    converted = text
    for pattern, replacement, flags in conversion_rules:
        converted = re.sub(pattern, replacement, converted, flags=flags)
    return converted
async def main():
    """Create the bot and run it until it stops."""
    await TubeMetaBot().run()


# Script entry point.
if __name__ == "__main__":
    asyncio.run(main())