"""YouTube archiver bot.

Watches the RSS feeds of tracked YouTube channels, downloads new videos with
yt-dlp and uploads them through a Pyrogram user session into per-channel forum
topics of a Telegram supergroup. The owner controls the bot through
python-telegram-bot command handlers.
"""
import asyncio
import html
import logging
import os
import random
import time

import feedparser
# psutil provides the system statistics shown by /stats.
import psutil
import yt_dlp
from pymongo import MongoClient
from pyrogram import Client as PyrogramClient
from pyrogram import enums as pyrogram_enums
from pyrogram.errors import FloodWait
from telegram import Bot, InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.constants import ParseMode
from telegram.ext import (
    Application,
    CallbackContext,
    CallbackQueryHandler,
    CommandHandler,
    ConversationHandler,
    MessageHandler,
    filters,
)

# --- Basic Setup ---
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("pyrogram").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)


def _require_int_env(name: str) -> int:
    """Return the integer value of environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset, blank, or not an integer.
    """
    raw = os.environ.get(name)
    if raw is None or not raw.strip():
        raise RuntimeError(f"Required environment variable {name} is not set.")
    try:
        return int(raw)
    except ValueError as err:
        raise RuntimeError(
            f"Environment variable {name} must be an integer, got {raw!r}."
        ) from err


# --- Configuration ---
BOT_TOKEN = os.environ.get("BOT_TOKEN")
API_ID = _require_int_env("API_ID")
API_HASH = os.environ.get("API_HASH")
PYROGRAM_SESSION_STRING = os.environ.get("PYROGRAM_SESSION_STRING")
OWNER_ID = _require_int_env("OWNER_ID")
SUPERGROUP_ID = _require_int_env("SUPERGROUP_ID")
CHECK_INTERVAL_SECONDS = int(os.environ.get("CHECK_INTERVAL_SECONDS", 1800))
MONGO_URI = os.environ.get("MONGO_URI", "").strip()
DOWNLOAD_PATH = "/tmp/app/downloads"  # Scratch directory for in-flight downloads
COOKIE_FILE_PATH = "/tmp/cookies.txt"
YOUTUBE_COOKIES = os.environ.get("YOUTUBE_COOKIES")

# yt-dlp format selector used when a channel document has no stored quality.
DEFAULT_QUALITY = 'bestvideo[height<=1080]+bestaudio/best[height<=1080]'

# --- Global Status Tracker ---
# Holds the bot's current state for the /status command.
GLOBAL_STATUS = {
    "is_active": False,
    "current_task": "Idle",
    "video_title": "N/A",
    "channel_title": "N/A",
    "start_time": 0,
}

# Serialises download/upload cycles. The blocking yt-dlp work runs in worker
# threads, so two jobs could otherwise overlap and clobber GLOBAL_STATUS.
_TRANSFER_LOCK = asyncio.Lock()

# --- Initialize Clients & DB ---
os.makedirs(DOWNLOAD_PATH, exist_ok=True)
user_bot = PyrogramClient(
    name="user_session",
    api_id=API_ID,
    api_hash=API_HASH,
    session_string=PYROGRAM_SESSION_STRING,
    no_updates=True,
)
client = MongoClient(MONGO_URI)
db = client.youtube_bot
video_collection = db.downloaded_videos
channel_collection = db.tracked_channels

# --- Helper & DB Functions ---
# Backslash is included so that it is escaped in the same single pass.
_MARKDOWN_V2_ESCAPES = str.maketrans(
    {char: f"\\{char}" for char in '\\' + r'_*[]()~`>#+-=|{}.!'}
)


def escape_markdown_v2(text: str) -> str:
    """Backslash-escape every Telegram MarkdownV2 reserved character in *text*.

    Non-string input is converted with ``str()`` first.
    """
    return str(text).translate(_MARKDOWN_V2_ESCAPES)


def _quality_label(format_selector: str) -> str:
    """Map a yt-dlp format selector to the label shown to the owner.

    The height caps are tested before falling back to 'Best' because every
    selector offered by the bot contains the substring 'best'.
    """
    for height in ('1080', '720', '480'):
        if height in format_selector:
            return f"{height}p"
    return 'Best'


def _write_cookie_file() -> None:
    """Write YOUTUBE_COOKIES to COOKIE_FILE_PATH so yt-dlp can read it.

    Does nothing when YOUTUBE_COOKIES is unset.
    """
    if not YOUTUBE_COOKIES:
        return
    with open(COOKIE_FILE_PATH, "w", encoding="utf-8") as cookie_file:
        cookie_file.write(YOUTUBE_COOKIES)


def get_channel_info(url_or_id: str) -> dict | None:
    """Resolve a channel URL, handle or ID to ``{'id': ..., 'title': ...}``.

    A one-result search is tried first and, when it yields no entries, the
    input is extracted directly. This function blocks on the network;
    coroutines should call it through ``asyncio.to_thread``.

    Returns:
        The resolved channel, or ``None`` when no 'UC...' channel ID could be
        determined or yt-dlp raised.
    """
    ydl_opts = {
        'quiet': True,
        'extract_flat': True,
        'cachedir': False,
        'no_check_certificate': True,
        'extractor_args': {'youtubetab': {'skip': ['authcheck']}},
        'playlist_items': '0',
    }
    if YOUTUBE_COOKIES:
        ydl_opts['cookiefile'] = COOKIE_FILE_PATH
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(f"ytsearch1:{url_or_id}", download=False)
            if not info or not info.get('entries'):
                info = ydl.extract_info(url_or_id, download=False)
        if info:
            entries = list(info.get('entries') or [])
            first_entry = (entries[0] if entries else None) or {}
            # uploader_id may hold a handle rather than a 'UC...' ID, so take
            # the first candidate that actually looks like a channel ID.
            candidates = (
                info.get('channel_id'),
                info.get('uploader_id'),
                first_entry.get('channel_id'),
            )
            channel_id = next(
                (c for c in candidates if isinstance(c, str) and c.startswith('UC')),
                None,
            )
            channel_title = (
                info.get('uploader')
                or first_entry.get('channel')
                or info.get('title')
            )
            if channel_id:
                logger.info(f"Resolved URL to Channel ID: {channel_id}, Title: {channel_title}")
                return {'id': channel_id, 'title': channel_title}
        logger.warning(f"Could not resolve '{url_or_id}' to a valid 'UC...' channel ID.")
        return None
    except Exception as e:
        logger.error(f"yt-dlp failed to resolve {url_or_id}: {e}")
        return None


def add_channel_to_db(channel_id: str, channel_title: str, topic_id: int, quality: str):
    """Insert or update a tracked channel; new documents start at counter 0."""
    channel_collection.update_one(
        {'channel_id': channel_id},
        {
            "$set": {'channel_title': channel_title, 'topic_id': topic_id, 'quality': quality},
            "$setOnInsert": {'video_counter': 0},
        },
        upsert=True,
    )


def get_channel_data(channel_id):
    """Return the tracked-channel document for *channel_id*, or ``None``."""
    return channel_collection.find_one({'channel_id': channel_id})


def get_all_channels_from_db():
    """Return every tracked-channel document as a list."""
    return list(channel_collection.find())


def remove_channel_from_db(channel_id: str):
    """Delete the tracked channel and return the number of documents removed."""
    return channel_collection.delete_one({'channel_id': channel_id}).deleted_count


def is_video_downloaded(video_id: str):
    """Return ``True`` when *video_id* is already recorded as downloaded."""
    return video_collection.find_one({"video_id": video_id}) is not None


def mark_video_as_downloaded(video_id: str, channel_id: str):
    """Record *video_id* as downloaded and bump the channel's video counter."""
    video_collection.insert_one({"video_id": video_id})
    channel_collection.update_one({'channel_id': channel_id}, {'$inc': {'video_counter': 1}})


async def _notify_owner(bot: Bot, text: str) -> None:
    """Send *text* to the owner, logging instead of raising on failure.

    Used from background jobs so a failed notification cannot abort the job.
    """
    try:
        await bot.send_message(chat_id=OWNER_ID, text=text)
    except Exception:
        logger.exception("Could not deliver a notification to the owner.")


def _download_video(url: str, ydl_opts: dict) -> tuple[dict, str]:
    """Download *url* with yt-dlp and return ``(info_dict, file_path)``.

    Blocking; intended to be run in a worker thread.
    """
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(url, download=True)
        # After merging separate video/audio streams the final file can differ
        # from what prepare_filename() predicts, so prefer the path yt-dlp
        # reports and fall back to the prediction.
        downloads = info.get('requested_downloads') or []
        reported_path = downloads[0].get('filepath') if downloads else None
        return info, reported_path or ydl.prepare_filename(info)


def _list_channel_videos(url: str, ydl_opts: dict) -> dict | None:
    """Return yt-dlp's flat listing for *url* without downloading (blocking)."""
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        return ydl.extract_info(url, download=False)


def _cleanup_downloads(video_id: str | None, final_path: str | None) -> None:
    """Remove the finished file and any leftover files named after *video_id*.

    Downloads are written as '<id>.<ext>', so a failed download may leave
    files with that prefix behind in DOWNLOAD_PATH.
    """
    targets = set()
    if final_path:
        targets.add(final_path)
    if video_id:
        prefix = f"{video_id}."
        targets.update(
            os.path.join(DOWNLOAD_PATH, name)
            for name in os.listdir(DOWNLOAD_PATH)
            if name.startswith(prefix)
        )
    for path in targets:
        try:
            os.remove(path)
        except FileNotFoundError:
            continue
        except OSError as err:
            logger.warning(f"Could not remove {path}: {err}")
        else:
            logger.info(f"Cleaned up file: {path}")


# --- Core Bot Logic ---
async def download_and_send_video(bot: Bot, channel_data: dict, video_info: dict):
    """Download one video, upload it to the channel's topic and record it.

    Args:
        bot: Bot used to notify the owner about failures.
        channel_data: Tracked-channel document; 'channel_title', 'channel_id'
            and 'topic_id' are read, 'quality' and 'video_counter' are optional.
        video_info: Mapping with 'id', 'title' and 'webpage_url'.

    Failures are reported to the owner and logged, never raised. The video is
    only marked as downloaded after a successful upload, so a failed or
    flood-limited video stays eligible for a later run. The status is reset,
    files are removed and a 10-30 second pause is taken in every case.
    """
    async with _TRANSFER_LOCK:
        downloaded_file_path = None
        try:
            GLOBAL_STATUS.update({
                "is_active": True,
                "current_task": "Downloading",
                "video_title": video_info['title'],
                "channel_title": channel_data['channel_title'],
                "start_time": time.time(),
            })
            logger.info(f"Starting download for: {video_info['title']}")
            ydl_opts = {
                'outtmpl': os.path.join(DOWNLOAD_PATH, '%(id)s.%(ext)s'),
                'noplaylist': True,
                'quiet': True,
                'format': channel_data.get('quality', DEFAULT_QUALITY),
                'cachedir': False,
                'no_check_certificate': True,
            }
            if YOUTUBE_COOKIES:
                ydl_opts['cookiefile'] = COOKIE_FILE_PATH
            # Run the blocking download in a thread so the event loop (and
            # therefore /status) stays responsive.
            full_info, downloaded_file_path = await asyncio.to_thread(
                _download_video, video_info['webpage_url'], ydl_opts
            )
            if not os.path.exists(downloaded_file_path):
                raise FileNotFoundError("Downloaded file not found.")

            GLOBAL_STATUS["current_task"] = "Uploading"
            video_number = channel_data.get('video_counter', 0) + 1
            # The upload goes through Pyrogram, which does not parse Telegram
            # MarkdownV2, so the caption is sent as escaped HTML instead.
            caption = f"<b>{video_number}.</b> {html.escape(str(full_info.get('title', 'N/A')))}"
            logger.info(f"Uploading '{full_info['title']}' via user account...")
            await user_bot.send_video(
                chat_id=SUPERGROUP_ID,
                video=downloaded_file_path,
                reply_to_message_id=channel_data['topic_id'],
                caption=caption,
                parse_mode=pyrogram_enums.ParseMode.HTML,
            )
            logger.info(f"Successfully uploaded {full_info['title']}.")
            mark_video_as_downloaded(full_info['id'], channel_data['channel_id'])
        except FloodWait as e:
            logger.warning(f"Flood wait of {e.value} seconds. Sleeping...")
            await asyncio.sleep(e.value + 5)
        except Exception as e:
            logger.error(f"Error processing {video_info['webpage_url']}: {e}", exc_info=True)
            await _notify_owner(
                bot,
                f"Failed to process '{video_info['title']}'.\nReason: {str(e)[:1000]}",
            )
        finally:
            # Critical: reset the status and clean up files on every path.
            GLOBAL_STATUS.update({"is_active": False, "current_task": "Idle", "start_time": 0})
            _cleanup_downloads(video_info.get('id'), downloaded_file_path)
            delay = random.randint(10, 30)
            logger.info(f"Waiting for {delay} seconds before next task...")
            await asyncio.sleep(delay)


async def check_youtube_channels(context: CallbackContext):
    """Scheduled job: process videos in each channel's RSS feed not yet archived."""
    logger.info("Running scheduled check for new videos...")
    if GLOBAL_STATUS["is_active"]:
        logger.warning("Skipping check, a task is already running.")
        return
    for channel_data in get_all_channels_from_db():
        # Check again in case a backfill was started.
        if GLOBAL_STATUS["is_active"]:
            logger.info(f"Stopping check for {channel_data['channel_title']} because another task started.")
            break
        logger.info(f"Checking channel: {channel_data['channel_title']}")
        rss_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={channel_data['channel_id']}"
        feed = await asyncio.to_thread(feedparser.parse, rss_url)
        # bozo can be set for feeds that still parsed, so only give up when
        # nothing usable came back.
        if feed.bozo and not feed.entries:
            logger.warning(f"Could not parse RSS feed for {channel_data['channel_title']}.")
            continue
        for entry in reversed(feed.entries):
            video_id = entry.get('yt_videoid')
            if not video_id or is_video_downloaded(video_id):
                continue
            # Re-read the channel so video_counter reflects uploads made
            # earlier in this same pass.
            fresh_channel = get_channel_data(channel_data['channel_id'])
            if fresh_channel is None:
                logger.info(f"Channel {channel_data['channel_title']} is no longer tracked; skipping.")
                break
            logger.info(f"New RSS video found: {entry.title}")
            video_info = {'id': video_id, 'title': entry.title, 'webpage_url': entry.link}
            await download_and_send_video(context.bot, fresh_channel, video_info)
    logger.info("Scheduled check finished.")


async def backfill_channel_job(context: CallbackContext):
    """One-off job: archive the videos listed on a channel's /videos page.

    The channel ID is read from ``context.job.data['channel_id']``. The listing
    is processed in reverse order, skipping videos already recorded.
    """
    if GLOBAL_STATUS["is_active"]:
        logger.warning("Cannot start backfill, a task is already running.")
        await _notify_owner(
            context.bot,
            "A task is already in progress. Please wait for it to finish before starting a backfill.",
        )
        return
    channel_id = context.job.data['channel_id']
    logger.info(f"Starting backfill job for channel: {channel_id}")
    ydl_opts = {
        'quiet': True,
        'extract_flat': 'in_playlist',
        'cachedir': False,
        'no_check_certificate': True,
        'extractor_args': {'youtubetab': {'skip': ['authcheck']}},
    }
    if YOUTUBE_COOKIES:
        ydl_opts['cookiefile'] = COOKIE_FILE_PATH
    try:
        url_to_fetch = f"https://www.youtube.com/channel/{channel_id}/videos"
        info = await asyncio.to_thread(_list_channel_videos, url_to_fetch, ydl_opts)
        if not info:
            logger.error(f"Backfill failed: Could not retrieve info for {channel_id}")
            await _notify_owner(context.bot, f"❌ Backfill for {channel_id} failed.")
            return
        video_entries = list(info.get('entries') or [])
        logger.info(f"Found {len(video_entries)} videos for backfilling channel {channel_id}.")
        for entry in reversed(video_entries):
            if not entry:
                continue
            video_id = entry.get('id')
            # Re-read each time: the counter changes after every upload and
            # the channel may be removed while the backfill is running.
            channel_data = get_channel_data(channel_id)
            if video_id and channel_data and not is_video_downloaded(video_id):
                logger.info(f"Backfilling video: {entry.get('title')}")
                video_info = {
                    'id': video_id,
                    'title': entry.get('title'),
                    'webpage_url': f"https://www.youtube.com/watch?v={video_id}",
                }
                await download_and_send_video(context.bot, channel_data, video_info)
    except Exception as e:
        logger.error(f"Error during backfill for {channel_id}: {e}", exc_info=True)
        await _notify_owner(
            context.bot,
            f"❌ An error occurred during backfill for channel {channel_id}.\n`{e}`",
        )
    else:
        # Only report completion when the backfill actually ran through.
        await _notify_owner(context.bot, f"✅ Finished backfilling for channel: {channel_id}.")


# --- Command Handlers ---
async def start(update: Update, context: CallbackContext):
    """Reply to /start with the command overview (owner only)."""
    if update.effective_user.id != OWNER_ID:
        return
    start_message = (
        "👋 *Welcome, Owner\\!*\n\n"
        "I'm your YouTube Archiver bot\\. Here are the commands:\n\n"
        "`/addchannel` \\- Add a new YouTube channel\\.\n"
        "`/removechannel ` \\- Remove a channel\\.\n"
        "`/listchannels` \\- Show tracked channels\\.\n"
        "`/status` \\- See what I'm currently doing\\.\n"
        "`/stats` \\- View server resource usage\\.\n"
        "`/cancel` \\- Cancel the `/addchannel` process\\."
    )
    await update.message.reply_text(start_message, parse_mode=ParseMode.MARKDOWN_V2)


# --- /status and /stats commands ---
async def status(update: Update, context: CallbackContext):
    """Reply to /status with the active task, or an idle notice (owner only)."""
    if update.effective_user.id != OWNER_ID:
        return
    if not GLOBAL_STATUS["is_active"]:
        await update.message.reply_text("STATUS: I'm currently idle and waiting for new videos.")
        return
    elapsed_time = time.strftime('%H:%M:%S', time.gmtime(time.time() - GLOBAL_STATUS['start_time']))
    status_msg = (
        f"🏃 *Active Task*\n\n"
        f"Status: `{GLOBAL_STATUS['current_task']}`\n"
        f"Channel: `{escape_markdown_v2(GLOBAL_STATUS['channel_title'])}`\n"
        f"Video: `{escape_markdown_v2(GLOBAL_STATUS['video_title'])}`\n"
        f"Running for: `{elapsed_time}`"
    )
    await update.message.reply_text(status_msg, parse_mode=ParseMode.MARKDOWN_V2)


async def stats(update: Update, context: CallbackContext):
    """Reply to /stats with CPU, RAM and root-disk usage (owner only)."""
    if update.effective_user.id != OWNER_ID:
        return
    # Sample over a short interval (in a thread) because the first
    # interval-less cpu_percent() call returns a meaningless 0.0.
    cpu_usage = await asyncio.to_thread(psutil.cpu_percent, 0.5)
    ram_usage = psutil.virtual_memory().percent
    disk = psutil.disk_usage('/')
    disk_usage = disk.percent

    def format_bytes(byte_count):
        """Format a byte count with a binary unit suffix up to TB."""
        if byte_count is None:
            return "N/A"
        power = 1024
        power_labels = ('', 'K', 'M', 'G', 'T')
        n = 0
        while byte_count >= power and n < len(power_labels) - 1:
            byte_count /= power
            n += 1
        return f"{byte_count:.2f} {power_labels[n]}B"

    stats_msg = (
        f"💻 *System Stats*\n\n"
        f"CPU Usage: `{cpu_usage}%`\n"
        f"RAM Usage: `{ram_usage}%`\n"
        f"Disk Usage: `{disk_usage}%`\n"
        f" Total: `{format_bytes(disk.total)}`\n"
        f" Used: `{format_bytes(disk.used)}`"
    )
    await update.message.reply_text(stats_msg, parse_mode=ParseMode.MARKDOWN_V2)


# --- Conversation Handler with Buttons ---
GET_URL, GET_QUALITY = range(2)


async def add_channel_start(update: Update, context: CallbackContext):
    """Entry point of /addchannel: ask the owner for a channel URL."""
    if update.effective_user.id != OWNER_ID:
        return ConversationHandler.END
    await update.message.reply_text("Please send the URL of the YouTube channel.")
    return GET_URL


async def get_url(update: Update, context: CallbackContext):
    """Resolve the submitted channel and offer the quality buttons."""
    await update.message.reply_text("Resolving URL, please wait...")
    channel_info = await asyncio.to_thread(get_channel_info, update.message.text)
    if not channel_info:
        await update.message.reply_text(
            "❌ Could not resolve this URL. Please send a valid YouTube channel URL, handle, or ID."
        )
        return GET_URL
    context.user_data['channel_info'] = channel_info
    # Each button carries the yt-dlp format selector as its callback data.
    keyboard = [
        [InlineKeyboardButton("Best (up to 4K)", callback_data='bestvideo+bestaudio/best')],
        [InlineKeyboardButton("1080p", callback_data='bestvideo[height<=1080]+bestaudio/best[height<=1080]')],
        [InlineKeyboardButton("720p", callback_data='bestvideo[height<=720]+bestaudio/best[height<=720]')],
        [InlineKeyboardButton("480p", callback_data='bestvideo[height<=480]+bestaudio/best[height<=480]')],
    ]
    await update.message.reply_text(
        f"✅ Channel found: *{escape_markdown_v2(channel_info['title'])}*\n\nPlease select the desired video quality:",
        reply_markup=InlineKeyboardMarkup(keyboard),
        parse_mode=ParseMode.MARKDOWN_V2,
    )
    return GET_QUALITY


async def get_quality_from_button(update: Update, context: CallbackContext):
    """Handle the quality button: create the topic, store the channel, backfill."""
    query = update.callback_query
    await query.answer()  # Acknowledge the button press
    quality_val = query.data
    channel_info = context.user_data.get('channel_info')
    if not channel_info:
        await query.edit_message_text(text="This selection has expired. Please run /addchannel again.")
        context.user_data.clear()
        return ConversationHandler.END

    quality_name = _quality_label(quality_val)
    await query.edit_message_text(
        text=(
            f"✅ Quality set to *{quality_name}* for channel *{escape_markdown_v2(channel_info['title'])}*\\.\n\n"
            "Creating topic and starting backfill\\, please wait\\."
        ),
        parse_mode=ParseMode.MARKDOWN_V2,
    )
    try:
        # Forum topic names are limited to 128 characters.
        topic = await context.bot.create_forum_topic(
            chat_id=SUPERGROUP_ID, name=str(channel_info['title'])[:128]
        )
        add_channel_to_db(channel_info['id'], channel_info['title'], topic.message_thread_id, quality_val)
        await context.bot.send_message(
            chat_id=OWNER_ID,
            text=f"Starting backfill for {channel_info['title']}. I will notify you when it's done.",
        )
        context.application.job_queue.run_once(
            backfill_channel_job, when=1, data={'channel_id': channel_info['id']}
        )
    except Exception as e:
        logger.error(f"Failed to create topic or add to DB: {e}")
        # Everything outside the markup must be escaped for MarkdownV2,
        # otherwise Telegram rejects the error report itself.
        await query.edit_message_text(
            text=(
                f"❌ *Error:*\n`{escape_markdown_v2(e)}`\n\n"
                "Please ensure the bot is an administrator in the supergroup and that 'Topics' are enabled\\."
            ),
            parse_mode=ParseMode.MARKDOWN_V2,
        )
    context.user_data.clear()
    return ConversationHandler.END


async def cancel_conversation(update: Update, context: CallbackContext):
    """Abort the /addchannel conversation and discard its stored data."""
    await update.message.reply_text("Add channel process cancelled.")
    context.user_data.clear()
    return ConversationHandler.END


async def remove_channel(update: Update, context: CallbackContext):
    """Handle /removechannel: resolve the argument and stop tracking it (owner only)."""
    if update.effective_user.id != OWNER_ID:
        return
    if not context.args:
        await update.message.reply_text("Usage: /removechannel ")
        return
    await update.message.reply_text("Resolving and removing...")
    channel_info = await asyncio.to_thread(get_channel_info, ' '.join(context.args))
    if not channel_info:
        await update.message.reply_text("Could not resolve this channel.")
        return
    if remove_channel_from_db(channel_info['id']) > 0:
        await update.message.reply_text(f"✅ Successfully removed '{channel_info['title']}'.")
    else:
        await update.message.reply_text("That channel was not in the tracking list.")


async def list_channels(update: Update, context: CallbackContext):
    """Handle /listchannels: show every tracked channel with its quality (owner only)."""
    if update.effective_user.id != OWNER_ID:
        return
    channels = get_all_channels_from_db()
    if not channels:
        await update.message.reply_text("The tracking list is currently empty.")
        return
    lines = ["📄 *Currently Tracked Channels:*\n\n"]
    for channel in channels:
        title = escape_markdown_v2(channel.get('channel_title', 'N/A'))
        quality_name = _quality_label(channel.get('quality', DEFAULT_QUALITY))
        lines.append(f"▪️ *{title}*\n `Quality: {quality_name}`\n")
    await update.message.reply_text("".join(lines), parse_mode=ParseMode.MARKDOWN_V2)


# --- Main Application Runner ---
async def main():
    """Build the application, register handlers and run until cancelled."""
    # Pre-flight: yt-dlp is pointed at COOKIE_FILE_PATH whenever
    # YOUTUBE_COOKIES is set, so the file has to exist before any job runs.
    _write_cookie_file()

    application = Application.builder().token(BOT_TOKEN).build()

    # The quality step is answered with inline buttons, hence the
    # CallbackQueryHandler in the GET_QUALITY state.
    add_channel_conv = ConversationHandler(
        entry_points=[CommandHandler("addchannel", add_channel_start)],
        states={
            GET_URL: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_url)],
            GET_QUALITY: [CallbackQueryHandler(get_quality_from_button)],
        },
        fallbacks=[CommandHandler("cancel", cancel_conversation)],
    )

    # Add all handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(add_channel_conv)
    application.add_handler(CommandHandler("removechannel", remove_channel))
    application.add_handler(CommandHandler("listchannels", list_channels))
    application.add_handler(CommandHandler("status", status))
    application.add_handler(CommandHandler("stats", stats))

    # Schedule the recurring job
    application.job_queue.run_repeating(check_youtube_channels, interval=CHECK_INTERVAL_SECONDS, first=10)

    # Entering the application context initialises it; leaving it shuts it
    # down, which requires the updater and application to be stopped first.
    async with user_bot, application:
        logger.info("Starting all clients...")
        await application.start()
        try:
            await application.updater.start_polling()
            logger.info("Bot is now fully operational with new features.")
            await asyncio.Event().wait()
        finally:
            if application.updater.running:
                await application.updater.stop()
            if application.running:
                await application.stop()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except (KeyboardInterrupt, SystemExit):
        logger.info("Bot stopped manually.")