Spaces:
Paused
Paused
| import asyncio | |
| import logging | |
| import os | |
| import random | |
| import time | |
| # NEW: Import psutil for system statistics | |
| import psutil | |
| import feedparser | |
| import yt_dlp | |
| from pymongo import MongoClient | |
| from pyrogram import Client as PyrogramClient | |
| from pyrogram.errors import FloodWait | |
| # NEW: Import Inline Keyboard classes | |
| from telegram import Bot, Update, InlineKeyboardButton, InlineKeyboardMarkup | |
| from telegram.constants import ParseMode | |
| from telegram.ext import ( | |
| Application, CallbackContext, CommandHandler, ConversationHandler, MessageHandler, filters, | |
| # NEW: CallbackQueryHandler for button presses | |
| CallbackQueryHandler | |
| ) | |
# --- Basic Setup ---
# Root logger: timestamped "<time> - <logger> - <level> - <message>" records at INFO level.
logging.basicConfig(format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO)
# Only WARNING and above are emitted by the "httpx" and "pyrogram" loggers.
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("pyrogram").setLevel(logging.WARNING)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
# --- Configuration ---
# Every setting is read from the environment when the module is imported.
# API_ID, OWNER_ID and SUPERGROUP_ID have no default, so int(None) raises
# TypeError here if one of them is unset.
BOT_TOKEN = os.environ.get("BOT_TOKEN")
API_ID = int(os.environ.get("API_ID"))
API_HASH = os.environ.get("API_HASH")
PYROGRAM_SESSION_STRING = os.environ.get("PYROGRAM_SESSION_STRING")
OWNER_ID = int(os.environ.get("OWNER_ID"))
SUPERGROUP_ID = int(os.environ.get("SUPERGROUP_ID"))
# Interval, in seconds, between scheduled channel checks (default 1800).
CHECK_INTERVAL_SECONDS = int(os.environ.get("CHECK_INTERVAL_SECONDS", 1800))
MONGO_URI = os.environ.get("MONGO_URI", "").strip()
DOWNLOAD_PATH = "/tmp/app/downloads"  # Directory yt-dlp downloads into; files are removed after each upload attempt
COOKIE_FILE_PATH = "/tmp/cookies.txt"
# When YOUTUBE_COOKIES is set, yt-dlp is given COOKIE_FILE_PATH as its 'cookiefile'.
# NOTE(review): nothing visible in this file writes YOUTUBE_COOKIES to COOKIE_FILE_PATH - confirm where that happens.
YOUTUBE_COOKIES = os.environ.get("YOUTUBE_COOKIES")
# --- Global Status Tracker ---
# Shared, mutable snapshot of what the bot is doing. download_and_send_video
# writes it; /status reads it; the scheduled check and the backfill job read
# "is_active" to avoid starting while another task is running.
GLOBAL_STATUS = {
    "is_active": False,       # True while a video is being downloaded or uploaded
    "current_task": "Idle",   # "Idle", "Downloading" or "Uploading"
    "video_title": "N/A",     # title of the video being processed
    "channel_title": "N/A",   # title of the channel that video belongs to
    "start_time": 0           # time.time() when the current task started, 0 when idle
}
# --- Initialize Clients & DB ---
# Make sure the download directory exists before any download is attempted.
os.makedirs(DOWNLOAD_PATH, exist_ok=True)
# Pyrogram user-account client (session string login, update handling disabled); used to upload videos.
user_bot = PyrogramClient(name="user_session", api_id=API_ID, api_hash=API_HASH, session_string=PYROGRAM_SESSION_STRING, no_updates=True)
client = MongoClient(MONGO_URI)
db = client.youtube_bot
# One document per processed video: {"video_id": ...}.
video_collection = db.downloaded_videos
# One document per tracked channel: channel_id, channel_title, topic_id, quality, video_counter.
channel_collection = db.tracked_channels
| # --- Helper & DB Functions --- | |
def escape_markdown_v2(text: str) -> str:
    """Return *text* with every MarkdownV2-reserved character backslash-escaped.

    The argument is converted with ``str()`` first, so non-string values are
    accepted. A literal backslash is escaped as well.
    """
    # One translation pass: each special character maps to "\" + itself.
    special = '\\' + r'_*[]()~`>#+-=|{}.!'
    table = str.maketrans({symbol: '\\' + symbol for symbol in special})
    return str(text).translate(table)
def get_channel_info(url_or_id: str) -> dict | None:
    """Resolve a channel URL, handle or ID to its ``UC...`` channel ID and title.

    A one-result YouTube search is tried first; if it yields no entries the
    argument is extracted directly. This call is synchronous and performs
    network I/O.

    Args:
        url_or_id: Whatever the user supplied to identify the channel.

    Returns:
        ``{'id': <UC... id>, 'title': <channel title>}`` on success, ``None``
        when yt-dlp fails or no ``UC...`` ID can be found. Never raises.
    """
    ydl_opts = {
        'quiet': True,
        'extract_flat': True,
        'cachedir': False,
        'no_check_certificate': True,
        'extractor_args': {'youtubetab': {'skip': ['authcheck']}},
        'playlist_items': '0',
    }
    if YOUTUBE_COOKIES:
        ydl_opts['cookiefile'] = COOKIE_FILE_PATH
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            # NOTE(review): a search hit is not guaranteed to belong to the
            # channel the user meant - confirm search-first is intended.
            info = ydl.extract_info(f"ytsearch1:{url_or_id}", download=False)
            if not info or not info.get('entries'):
                info = ydl.extract_info(url_or_id, download=False)
        if not info:
            logger.warning(f"Could not resolve '{url_or_id}' to a valid 'UC...' channel ID.")
            return None

        entries = info.get('entries')
        first_entry = (entries[0] if entries else None) or {}

        # 'uploader_id' may hold an @handle rather than the UC... ID, so it must
        # not short-circuit the lookup: take the first candidate that really is
        # a UC... ID, preferring the dedicated 'channel_id' field.
        candidates = (
            info.get('channel_id'),
            info.get('uploader_id'),
            first_entry.get('channel_id'),
        )
        channel_id = next(
            (c for c in candidates if isinstance(c, str) and c.startswith('UC')),
            None,
        )
        channel_title = info.get('uploader') or info.get('title') or first_entry.get('channel')

        if channel_id:
            logger.info(f"Resolved URL to Channel ID: {channel_id}, Title: {channel_title}")
            return {'id': channel_id, 'title': channel_title}
        logger.warning(f"Could not resolve '{url_or_id}' to a valid 'UC...' channel ID.")
        return None
    except Exception as e:
        # Best-effort resolver: any yt-dlp or parsing failure means "not resolved".
        logger.error(f"yt-dlp failed to resolve {url_or_id}: {e}")
        return None
def add_channel_to_db(channel_id: str, channel_title: str, topic_id: int, quality: str):
    """Create or update the tracking record of a channel.

    Title, topic and quality are always overwritten; ``video_counter`` is set
    to 0 only when the record is first inserted.
    """
    selector = {'channel_id': channel_id}
    changes = {
        "$set": {'channel_title': channel_title, 'topic_id': topic_id, 'quality': quality},
        "$setOnInsert": {'video_counter': 0},
    }
    channel_collection.update_one(selector, changes, upsert=True)
def get_channel_data(channel_id):
    """Return the tracking record for *channel_id*, or ``None`` if it is not tracked."""
    selector = {'channel_id': channel_id}
    return channel_collection.find_one(selector)
def get_all_channels_from_db():
    """Return every tracked-channel record as a list."""
    cursor = channel_collection.find()
    return list(cursor)
def remove_channel_from_db(channel_id: str):
    """Delete the tracking record for *channel_id* and return the number of records deleted."""
    outcome = channel_collection.delete_one({'channel_id': channel_id})
    return outcome.deleted_count
def is_video_downloaded(video_id: str):
    """Return ``True`` when *video_id* is already recorded in the downloaded-videos collection."""
    record = video_collection.find_one({"video_id": video_id})
    return record is not None
def mark_video_as_downloaded(video_id: str, channel_id: str):
    """Record *video_id* as processed and add one to the channel's ``video_counter``."""
    video_collection.insert_one({"video_id": video_id})
    bump_counter = {'$inc': {'video_counter': 1}}
    channel_collection.update_one({'channel_id': channel_id}, bump_counter)
| # --- Core Bot Logic --- | |
async def download_and_send_video(bot: Bot, channel_data: dict, video_info: dict):
    """Download one video with yt-dlp and upload it to the channel's forum topic.

    GLOBAL_STATUS is updated while the task runs and reset afterwards. The
    downloaded file is always removed, and the coroutine always ends with a
    random 10-30 second pause before returning.

    Args:
        bot: Bot used to notify the owner when processing fails.
        channel_data: Tracking record of the channel; reads 'channel_title',
            'channel_id', 'topic_id' and optionally 'quality' / 'video_counter'.
        video_info: Dict with 'title' and 'webpage_url' of the video.

    On FloodWait the coroutine sleeps for the requested time and returns
    without marking the video as downloaded. Any other error is logged and
    reported to the owner.
    """
    downloaded_file_path = None
    try:
        GLOBAL_STATUS.update({
            "is_active": True, "current_task": "Downloading",
            "video_title": video_info['title'], "channel_title": channel_data['channel_title'],
            "start_time": time.time()
        })
        logger.info(f"Starting download for: {video_info['title']}")
        ydl_opts = {
            'outtmpl': os.path.join(DOWNLOAD_PATH, '%(id)s.%(ext)s'),
            'noplaylist': True,
            'quiet': True,
            'format': channel_data.get('quality', 'bestvideo[height<=1080]+bestaudio/best[height<=1080]'),
            'cachedir': False,
            'no_check_certificate': True,
        }
        if YOUTUBE_COOKIES:
            ydl_opts['cookiefile'] = COOKIE_FILE_PATH

        def _download() -> tuple[dict, str]:
            """Run the blocking yt-dlp download; return (info dict, final file path)."""
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(video_info['webpage_url'], download=True)
                # For merged formats (video+audio) the final container can differ
                # from what prepare_filename() predicts, so prefer the path
                # yt-dlp reports for the finished download.
                finished = info.get('requested_downloads') or []
                final_path = finished[0].get('filepath') if finished else None
                return info, final_path or ydl.prepare_filename(info)

        # yt-dlp is synchronous; running it in a worker thread keeps the event
        # loop (and therefore /status and the other commands) responsive.
        full_info, downloaded_file_path = await asyncio.to_thread(_download)
        if not os.path.exists(downloaded_file_path):
            raise FileNotFoundError("Downloaded file not found.")

        GLOBAL_STATUS["current_task"] = "Uploading"
        video_number = channel_data.get('video_counter', 0) + 1
        escaped_title = escape_markdown_v2(full_info.get('title', 'N/A'))
        # NOTE(review): this caption uses Telegram MarkdownV2 escaping but is sent
        # through Pyrogram without an explicit parse mode - confirm it renders as intended.
        caption = f"*{video_number}\\.* {escaped_title}"
        logger.info(f"Uploading '{full_info['title']}' via user account...")
        await user_bot.send_video(chat_id=SUPERGROUP_ID, video=downloaded_file_path, reply_to_message_id=channel_data['topic_id'], caption=caption)
        logger.info(f"Successfully uploaded {full_info['title']}.")
        mark_video_as_downloaded(full_info['id'], channel_data['channel_id'])
    except FloodWait as e:
        logger.warning(f"Flood wait of {e.value} seconds. Sleeping...")
        await asyncio.sleep(e.value + 5)
    except Exception as e:
        logger.error(f"Error processing {video_info['webpage_url']}: {e}", exc_info=True)
        await bot.send_message(chat_id=OWNER_ID, text=f"Failed to process '{video_info['title']}'.\nReason: {str(e)[:1000]}")
    finally:
        # Always reset the status and clean up the file, whatever happened above.
        GLOBAL_STATUS.update({"is_active": False, "current_task": "Idle", "start_time": 0})
        if downloaded_file_path and os.path.exists(downloaded_file_path):
            os.remove(downloaded_file_path)
            logger.info(f"Cleaned up file: {downloaded_file_path}")
        delay = random.randint(10, 30)
        logger.info(f"Waiting for {delay} seconds before next task...")
        await asyncio.sleep(delay)
async def check_youtube_channels(context: CallbackContext):
    """Scheduled job: fetch each tracked channel's RSS feed and process new videos.

    The job does nothing when another task is already active, and stops early
    if one becomes active while it is running. Feed entries are processed in
    reverse feed order.
    """
    logger.info("Running scheduled check for new videos...")
    if GLOBAL_STATUS["is_active"]:
        logger.warning("Skipping check, a task is already running.")
        return

    for channel_data in get_all_channels_from_db():
        # Check again in case a backfill was started
        if GLOBAL_STATUS["is_active"]:
            logger.info(f"Stopping check for {channel_data['channel_title']} because another task started.")
            break

        logger.info(f"Checking channel: {channel_data['channel_title']}")
        rss_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={channel_data['channel_id']}"
        # feedparser does blocking network I/O; keep it off the event loop.
        feed = await asyncio.to_thread(feedparser.parse, rss_url)
        if feed.bozo:
            logger.warning(f"Could not parse RSS feed for {channel_data['channel_title']}.")
            continue

        for entry in reversed(feed.entries):
            video_id = getattr(entry, 'yt_videoid', None)
            # An entry without a video id cannot be processed; skip it instead
            # of letting AttributeError abort the whole job.
            if not video_id or is_video_downloaded(video_id):
                continue
            # Another task may have started while the feed was being fetched;
            # the check at the top of the outer loop then ends the run.
            if GLOBAL_STATUS["is_active"]:
                break
            # Re-read the record so 'video_counter' reflects videos uploaded
            # earlier in this run; otherwise they would all get the same number.
            current_data = get_channel_data(channel_data['channel_id'])
            if not current_data:
                # The channel was removed from tracking in the meantime.
                break
            logger.info(f"New RSS video found: {entry.title}")
            video_info = {'id': video_id, 'title': entry.title, 'webpage_url': entry.link}
            await download_and_send_video(context.bot, current_data, video_info)
    logger.info("Scheduled check finished.")
async def backfill_channel_job(context: CallbackContext):
    """One-off job: download and upload every not-yet-processed video of a channel.

    The channel ID is read from ``context.job.data['channel_id']``. The job
    refuses to start while another task is active. The owner is told when the
    backfill finishes, or what went wrong when it fails.
    """
    if GLOBAL_STATUS["is_active"]:
        logger.warning("Cannot start backfill, a task is already running.")
        await context.bot.send_message(chat_id=OWNER_ID, text="A task is already in progress. Please wait for it to finish before starting a backfill.")
        return

    channel_id = context.job.data['channel_id']
    logger.info(f"Starting backfill job for channel: {channel_id}")
    ydl_opts = {
        'quiet': True,
        'extract_flat': 'in_playlist',
        'cachedir': False,
        'no_check_certificate': True,
        'extractor_args': {'youtubetab': {'skip': ['authcheck']}},
    }
    if YOUTUBE_COOKIES:
        ydl_opts['cookiefile'] = COOKIE_FILE_PATH
    url_to_fetch = f"https://www.youtube.com/channel/{channel_id}/videos"

    def _list_videos():
        """Blocking yt-dlp call that lists the channel's videos without downloading."""
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            return ydl.extract_info(url_to_fetch, download=False)

    try:
        # Listing a channel can take a while; run it off the event loop.
        info = await asyncio.to_thread(_list_videos)
        if not info:
            logger.error(f"Backfill failed: Could not retrieve info for {channel_id}")
            await context.bot.send_message(chat_id=OWNER_ID, text=f"❌ Backfill for {channel_id} failed.")
            return

        video_entries = list(info.get('entries') or [])
        logger.info(f"Found {len(video_entries)} videos for backfilling channel {channel_id}.")
        for entry in reversed(video_entries):
            if not entry:
                continue
            video_id = entry.get('id')
            # Re-read the record on every iteration so 'video_counter' is current
            # and the loop stops uploading if the channel was removed meanwhile.
            channel_data = get_channel_data(channel_id)
            if video_id and channel_data and not is_video_downloaded(video_id):
                logger.info(f"Backfilling video: {entry.get('title')}")
                video_info = {'id': video_id, 'title': entry.get('title'), 'webpage_url': f"https://www.youtube.com/watch?v={video_id}"}
                await download_and_send_video(context.bot, channel_data, video_info)
    except Exception as e:
        logger.error(f"Error during backfill for {channel_id}: {e}", exc_info=True)
        await context.bot.send_message(chat_id=OWNER_ID, text=f"❌ An error occurred during backfill for channel {channel_id}.\n`{e}`")
        # Do not follow an error report with a success message.
        return
    await context.bot.send_message(chat_id=OWNER_ID, text=f"✅ Finished backfilling for channel: {channel_id}.")
| # --- Command Handlers --- | |
async def start(update: Update, context: CallbackContext):
    """Handle /start: send the owner the list of available commands.

    Like every other command in this file, it is silently ignored for anyone
    but the owner; the greeting addresses the owner explicitly.
    """
    if update.effective_user.id != OWNER_ID:
        return
    start_message = (
        "👋 *Welcome, Owner\\!*\n\n"
        "I'm your YouTube Archiver bot\\. Here are the commands:\n\n"
        "`/addchannel` \\- Add a new YouTube channel\\.\n"
        "`/removechannel <URL>` \\- Remove a channel\\.\n"
        "`/listchannels` \\- Show tracked channels\\.\n"
        "`/status` \\- See what I'm currently doing\\.\n"
        "`/stats` \\- View server resource usage\\.\n"
        "`/cancel` \\- Cancel the `/addchannel` process\\."
    )
    await update.message.reply_text(start_message, parse_mode=ParseMode.MARKDOWN_V2)
| # --- NEW: /status and /stats commands --- | |
async def status(update: Update, context: CallbackContext):
    """Handle /status (owner only): report the task recorded in GLOBAL_STATUS."""
    if update.effective_user.id != OWNER_ID:
        return

    if GLOBAL_STATUS["is_active"]:
        seconds_running = time.time() - GLOBAL_STATUS['start_time']
        elapsed_time = time.strftime('%H:%M:%S', time.gmtime(seconds_running))
        report_lines = [
            "🏃 *Active Task*",
            "",
            f"Status: `{GLOBAL_STATUS['current_task']}`",
            f"Channel: `{escape_markdown_v2(GLOBAL_STATUS['channel_title'])}`",
            f"Video: `{escape_markdown_v2(GLOBAL_STATUS['video_title'])}`",
            f"Running for: `{elapsed_time}`",
        ]
        await update.message.reply_text("\n".join(report_lines), parse_mode=ParseMode.MARKDOWN_V2)
        return

    await update.message.reply_text("STATUS: I'm currently idle and waiting for new videos.")
def _format_bytes(byte_count) -> str:
    """Render a byte count with a binary-scaled unit (B, KB, MB, GB, TB), or "N/A" for None."""
    if byte_count is None:
        return "N/A"
    units = ('', 'K', 'M', 'G', 'T')
    unit_index = 0
    while byte_count >= 1024 and unit_index < len(units) - 1:
        byte_count /= 1024
        unit_index += 1
    return f"{byte_count:.2f} {units[unit_index]}B"


async def stats(update: Update, context: CallbackContext):
    """Handle /stats (owner only): report CPU, RAM and root-filesystem usage."""
    if update.effective_user.id != OWNER_ID:
        return

    # cpu_percent() without an interval compares against the previous call and
    # returns a meaningless 0.0 the first time. Sample over a short window
    # instead, in a worker thread because the call blocks for that window.
    cpu_usage = await asyncio.to_thread(psutil.cpu_percent, interval=0.5)
    ram_usage = psutil.virtual_memory().percent
    disk = psutil.disk_usage('/')
    disk_usage = disk.percent

    stats_msg = (
        f"💻 *System Stats*\n\n"
        f"CPU Usage: `{cpu_usage}%`\n"
        f"RAM Usage: `{ram_usage}%`\n"
        f"Disk Usage: `{disk_usage}%`\n"
        f" Total: `{_format_bytes(disk.total)}`\n"
        f" Used: `{_format_bytes(disk.used)}`"
    )
    await update.message.reply_text(stats_msg, parse_mode=ParseMode.MARKDOWN_V2)
| # --- NEW: Conversation Handler with Buttons --- | |
# States of the /addchannel conversation: GET_URL waits for the channel URL
# as a text message, GET_QUALITY waits for a quality button press.
GET_URL, GET_QUALITY = range(2)
async def add_channel_start(update: Update, context: CallbackContext):
    """Entry point of /addchannel: ask the owner for a channel URL.

    Returns GET_URL for the owner; ends the conversation immediately for
    anyone else.
    """
    requester_is_owner = update.effective_user.id == OWNER_ID
    if not requester_is_owner:
        return ConversationHandler.END

    prompt = "Please send the URL of the YouTube channel."
    await update.message.reply_text(prompt)
    return GET_URL
async def get_url(update: Update, context: CallbackContext):
    """GET_URL state: resolve the text the user sent and offer the quality choices.

    Returns GET_URL again when the text cannot be resolved to a channel,
    otherwise stores the channel in ``context.user_data['channel_info']`` and
    returns GET_QUALITY.
    """
    await update.message.reply_text("Resolving URL, please wait...")
    # get_channel_info runs yt-dlp synchronously; a worker thread keeps the
    # event loop free for other updates and jobs while it resolves.
    channel_info = await asyncio.to_thread(get_channel_info, update.message.text.strip())
    if not channel_info:
        await update.message.reply_text("❌ Could not resolve this URL. Please send a valid YouTube channel URL, handle, or ID.")
        return GET_URL

    context.user_data['channel_info'] = channel_info
    # Each button carries the yt-dlp format selector as its callback data.
    keyboard = [
        [InlineKeyboardButton("Best (up to 4K)", callback_data='bestvideo+bestaudio/best')],
        [InlineKeyboardButton("1080p", callback_data='bestvideo[height<=1080]+bestaudio/best[height<=1080]')],
        [InlineKeyboardButton("720p", callback_data='bestvideo[height<=720]+bestaudio/best[height<=720]')],
        [InlineKeyboardButton("480p", callback_data='bestvideo[height<=480]+bestaudio/best[height<=480]')],
    ]
    reply_markup = InlineKeyboardMarkup(keyboard)
    await update.message.reply_text(
        f"✅ Channel found: *{escape_markdown_v2(channel_info['title'])}*\n\nPlease select the desired video quality:",
        reply_markup=reply_markup,
        parse_mode=ParseMode.MARKDOWN_V2
    )
    return GET_QUALITY
def _quality_label(quality_val: str) -> str:
    """Map a yt-dlp format selector to the label shown to the user."""
    # Heights must be tested before 'best': every selector offered by the
    # keyboard contains the substring 'best'.
    for height in ('1080', '720', '480'):
        if height in quality_val:
            return f"{height}p"
    return 'Best' if 'best' in quality_val else 'Custom'


async def get_quality_from_button(update: Update, context: CallbackContext):
    """GET_QUALITY state: store the chosen quality, create the topic, start the backfill.

    The pressed button's callback data is the yt-dlp format selector. A forum
    topic named after the channel is created in the supergroup, the channel is
    saved, and a one-off backfill job is scheduled. Always ends the
    conversation.
    """
    query = update.callback_query
    await query.answer()  # Acknowledge the button press
    quality_val = query.data
    channel_info = context.user_data.get('channel_info')
    if not channel_info:
        # Nothing to act on (e.g. the stored data was lost); fail gracefully.
        await query.edit_message_text(text="❌ This selection is no longer valid. Please run /addchannel again.")
        context.user_data.clear()
        return ConversationHandler.END

    quality_name = _quality_label(quality_val)
    await query.edit_message_text(text=f"✅ Quality set to *{quality_name}* for channel *{escape_markdown_v2(channel_info['title'])}*\\.\n\nCreating topic and starting backfill\\, please wait\\.", parse_mode=ParseMode.MARKDOWN_V2)
    try:
        # Telegram limits forum topic names to 128 characters.
        topic = await context.bot.create_forum_topic(chat_id=SUPERGROUP_ID, name=str(channel_info['title'])[:128])
        add_channel_to_db(channel_info['id'], channel_info['title'], topic.message_thread_id, quality_val)
        await context.bot.send_message(chat_id=OWNER_ID, text=f"Starting backfill for {channel_info['title']}. I will notify you when it's done.")
        context.application.job_queue.run_once(backfill_channel_job, when=1, data={'channel_id': channel_info['id']})
    except Exception as e:
        logger.error(f"Failed to create topic or add to DB: {e}")
        # Under MarkdownV2 the exception text and the final '.' must be escaped,
        # otherwise Telegram rejects the error report itself.
        await query.edit_message_text(text=f"❌ *Error:*\n`{escape_markdown_v2(e)}`\n\nPlease ensure the bot is an administrator in the supergroup and that 'Topics' are enabled\\.", parse_mode=ParseMode.MARKDOWN_V2)
    context.user_data.clear()
    return ConversationHandler.END
async def cancel_conversation(update: Update, context: CallbackContext):
    """Fallback for /cancel: confirm the cancellation, drop stored data, end the conversation."""
    incoming = update.message
    await incoming.reply_text("Add channel process cancelled.")
    # Forget the channel remembered by the earlier conversation step.
    context.user_data.clear()
    return ConversationHandler.END
async def remove_channel(update: Update, context: CallbackContext):
    """Handle /removechannel <URL, Handle or ID> (owner only): stop tracking a channel.

    The argument is resolved to a channel ID first. If resolution fails but
    the argument is itself the ID of a tracked channel, that record is
    removed directly, so a channel that can no longer be resolved can still
    be untracked.
    """
    if update.effective_user.id != OWNER_ID:
        return
    if not context.args:
        await update.message.reply_text("Usage: /removechannel <URL, Handle or ID>")
        return

    await update.message.reply_text("Resolving and removing...")
    target = ' '.join(context.args)
    # get_channel_info blocks on network I/O; run it off the event loop.
    channel_info = await asyncio.to_thread(get_channel_info, target)
    if not channel_info and target.startswith('UC'):
        tracked = get_channel_data(target)
        if tracked:
            channel_info = {'id': target, 'title': tracked.get('channel_title', target)}
    if not channel_info:
        await update.message.reply_text("Could not resolve this channel.")
        return

    if remove_channel_from_db(channel_info['id']) > 0:
        await update.message.reply_text(f"✅ Successfully removed '{channel_info['title']}'.")
    else:
        await update.message.reply_text("That channel was not in the tracking list.")
async def list_channels(update: Update, context: CallbackContext):
    """Handle /listchannels (owner only): list every tracked channel with its quality."""
    if update.effective_user.id != OWNER_ID:
        return

    tracked = get_all_channels_from_db()
    if not tracked:
        await update.message.reply_text("The tracking list is currently empty.")
        return

    def describe(record) -> str:
        """Build the two-line entry for one channel record."""
        shown_title = escape_markdown_v2(record.get('channel_title', 'N/A'))
        selector = record.get('quality', 'best')
        # The first height found in the selector names the quality; otherwise "Best".
        label = next((f"{h}p" for h in ('1080', '720', '480') if h in selector), 'Best')
        return f"▪️ *{shown_title}*\n `Quality: {label}`\n"

    body = "".join([describe(record) for record in tracked])
    await update.message.reply_text("📄 *Currently Tracked Channels:*\n\n" + body, parse_mode=ParseMode.MARKDOWN_V2)
| # --- Main Application Runner --- | |
async def main():
    """Build the bot application, register handlers and jobs, and run until cancelled.

    The Pyrogram user client and the telegram application are both entered as
    async context managers. Polling runs until the task is cancelled (for
    example by Ctrl-C), after which the updater and the application are
    stopped before the context managers shut them down.
    """
    # ... (Pre-flight checks remain the same) ...
    # NOTE(review): no code visible here writes YOUTUBE_COOKIES to COOKIE_FILE_PATH -
    # confirm the elided pre-flight checks do so.
    application = Application.builder().token(BOT_TOKEN).build()

    # /addchannel conversation: text message with the URL, then a button press for quality.
    add_channel_conv = ConversationHandler(
        entry_points=[CommandHandler("addchannel", add_channel_start)],
        states={
            GET_URL: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_url)],
            GET_QUALITY: [CallbackQueryHandler(get_quality_from_button)],
        },
        fallbacks=[CommandHandler("cancel", cancel_conversation)],
    )

    # Add all handlers
    application.add_handler(CommandHandler("start", start))
    application.add_handler(add_channel_conv)
    application.add_handler(CommandHandler("removechannel", remove_channel))
    application.add_handler(CommandHandler("listchannels", list_channels))
    application.add_handler(CommandHandler("status", status))
    application.add_handler(CommandHandler("stats", stats))

    # Schedule the recurring job
    application.job_queue.run_repeating(check_youtube_channels, interval=CHECK_INTERVAL_SECONDS, first=10)

    # Entering `application` initializes it, so no explicit initialize() call is needed.
    async with user_bot, application:
        logger.info("Starting all clients...")
        try:
            await application.start()
            await application.updater.start_polling()
            logger.info("Bot is now fully operational with new features.")
            # Block forever; cancellation of this task is the shutdown signal.
            await asyncio.Event().wait()
        finally:
            # Leaving `async with application` shuts the application down, which
            # requires that it is no longer running - stop polling and the
            # application first.
            if application.updater.running:
                await application.updater.stop()
            if application.running:
                await application.stop()
# Script entry point: run main() on a new event loop. KeyboardInterrupt and
# SystemExit are logged as a manual stop instead of surfacing as a traceback.
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except (KeyboardInterrupt, SystemExit):
        logger.info("Bot stopped manually.")