# wkos / bot.py
# Author: Mohammad Shahid
# Last commit: 8a1c2b8 ("fixed a typo")
import asyncio
import logging
import os
import random
import time
# NEW: Import psutil for system statistics
import psutil
import feedparser
import yt_dlp
from pymongo import MongoClient
from pyrogram import Client as PyrogramClient
from pyrogram.errors import FloodWait
# NEW: Import Inline Keyboard classes
from telegram import Bot, Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.constants import ParseMode
from telegram.ext import (
Application, CallbackContext, CommandHandler, ConversationHandler, MessageHandler, filters,
# NEW: CallbackQueryHandler for button presses
CallbackQueryHandler
)
# --- Basic Setup ---
# Timestamped INFO-level logging for the bot itself, while the chatty HTTP
# (httpx) and Pyrogram client libraries are limited to warnings and above.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("pyrogram").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
# --- Configuration ---
def _required_int_env(name: str) -> int:
    """Return the environment variable *name* parsed as an ``int``.

    Raises:
        RuntimeError: if the variable is unset/empty or is not an integer. The
            message names the variable, unlike the bare ``int(None)`` TypeError.
    """
    raw = os.environ.get(name, "").strip()
    if not raw:
        raise RuntimeError(f"Required environment variable {name} is not set.")
    try:
        return int(raw)
    except ValueError as err:
        raise RuntimeError(f"Environment variable {name} must be an integer, got {raw!r}.") from err


BOT_TOKEN = os.environ.get("BOT_TOKEN")
API_ID = _required_int_env("API_ID")
API_HASH = os.environ.get("API_HASH")
PYROGRAM_SESSION_STRING = os.environ.get("PYROGRAM_SESSION_STRING")
OWNER_ID = _required_int_env("OWNER_ID")
# Supergroup IDs are negative (e.g. -100...); int() handles the sign.
SUPERGROUP_ID = _required_int_env("SUPERGROUP_ID")
CHECK_INTERVAL_SECONDS = int(os.environ.get("CHECK_INTERVAL_SECONDS", 1800))
MONGO_URI = os.environ.get("MONGO_URI", "").strip()
# Scratch directory for yt-dlp downloads; each file is deleted after its upload attempt.
DOWNLOAD_PATH = "/tmp/app/downloads"
# yt-dlp is given this path as its cookie file whenever YOUTUBE_COOKIES is set.
COOKIE_FILE_PATH = "/tmp/cookies.txt"
YOUTUBE_COOKIES = os.environ.get("YOUTUBE_COOKIES")
# --- Global Status Tracker ---
# One shared, mutable snapshot of what the bot is doing right now: the
# download routine writes it and the /status command reads it.
GLOBAL_STATUS = dict(
    is_active=False,      # True while a video is being downloaded or uploaded
    current_task="Idle",  # "Idle", "Downloading" or "Uploading"
    video_title="N/A",
    channel_title="N/A",
    start_time=0,         # time.time() at task start; reset to 0 when idle
)
# --- Initialize Clients & DB ---
os.makedirs(DOWNLOAD_PATH, exist_ok=True)

# Pyrogram client logged in with a user session string (used for uploads);
# it is created with no_updates=True so it does not receive updates.
user_bot = PyrogramClient(
    name="user_session",
    api_id=API_ID,
    api_hash=API_HASH,
    session_string=PYROGRAM_SESSION_STRING,
    no_updates=True,
)

# MongoDB handles: archived videos and tracked channels.
client = MongoClient(MONGO_URI)
db = client["youtube_bot"]
video_collection = db["downloaded_videos"]
channel_collection = db["tracked_channels"]
# --- Helper & DB Functions ---
def escape_markdown_v2(text: str) -> str:
    """Backslash-escape *text* for Telegram's MarkdownV2 parse mode.

    The value is passed through ``str()`` first, so non-string input is accepted.
    Every backslash and every MarkdownV2 special character
    (``_ * [ ] ( ) ~ ` > # + - = | { } . !``) gets a preceding backslash;
    all other characters are copied unchanged.
    """
    specials = frozenset('\\_*[]()~`>#+-=|{}.!')
    return "".join("\\" + ch if ch in specials else ch for ch in str(text))
def get_channel_info(url_or_id: str) -> dict | None:
    """Resolve a YouTube channel URL, handle or ID to its ``UC...`` channel ID and title.

    A ``ytsearch1:`` lookup is tried first. If it yields no entries, the input
    is handed to yt-dlp directly. This function blocks on network I/O.

    Returns:
        ``{'id': <UC... channel id>, 'title': <channel title>}`` on success, or
        ``None`` when the input cannot be resolved (the reason is logged).
    """
    ydl_opts = {'quiet': True, 'extract_flat': True, 'cachedir': False, 'no_check_certificate': True, 'extractor_args': {'youtubetab': {'skip': ['authcheck']}}, 'playlist_items': '0'}
    if YOUTUBE_COOKIES:
        ydl_opts['cookiefile'] = COOKIE_FILE_PATH
    try:
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(f"ytsearch1:{url_or_id}", download=False)
            if not info or not info.get('entries'):
                info = ydl.extract_info(url_or_id, download=False)
    except Exception as e:
        logger.error(f"yt-dlp failed to resolve {url_or_id}: {e}")
        return None

    if not info:
        logger.warning(f"Could not resolve '{url_or_id}' to a valid 'UC...' channel ID.")
        return None

    first_entry = (info.get('entries') or [None])[0] or {}
    # Newer yt-dlp versions put the @handle in 'uploader_id' and the UC... ID in
    # 'channel_id'. Try 'channel_id' first and accept the first UC... candidate.
    candidates = (info.get('channel_id'), info.get('uploader_id'), first_entry.get('channel_id'))
    channel_id = next((c for c in candidates if isinstance(c, str) and c.startswith('UC')), None)
    channel_title = (info.get('uploader') or info.get('channel') or info.get('title')
                     or first_entry.get('channel'))
    if channel_id:
        logger.info(f"Resolved URL to Channel ID: {channel_id}, Title: {channel_title}")
        return {'id': channel_id, 'title': channel_title}
    logger.warning(f"Could not resolve '{url_or_id}' to a valid 'UC...' channel ID.")
    return None
def add_channel_to_db(channel_id: str, channel_title: str, topic_id: int, quality: str):
    """Insert or update the tracking record for *channel_id*.

    Title, forum topic ID and quality are always overwritten. ``video_counter``
    is set to 0 only when the record is first created, so re-adding an existing
    channel keeps its numbering.
    """
    update_spec = {
        "$set": {"channel_title": channel_title, "topic_id": topic_id, "quality": quality},
        "$setOnInsert": {"video_counter": 0},
    }
    channel_collection.update_one({"channel_id": channel_id}, update_spec, upsert=True)
def get_channel_data(channel_id):
    """Return the stored document for *channel_id*, or ``None`` if it is not tracked."""
    query = {"channel_id": channel_id}
    return channel_collection.find_one(query)
def get_all_channels_from_db():
    """Return every tracked-channel document as a list."""
    return [doc for doc in channel_collection.find()]
def remove_channel_from_db(channel_id: str):
    """Delete the tracking record for *channel_id*; return the number of documents removed."""
    result = channel_collection.delete_one({"channel_id": channel_id})
    return result.deleted_count
def is_video_downloaded(video_id: str):
    """Return True if *video_id* is already recorded in the archived-videos collection."""
    existing = video_collection.find_one({"video_id": video_id})
    return existing is not None
def mark_video_as_downloaded(video_id: str, channel_id: str):
    """Record *video_id* as archived and bump the channel's ``video_counter``.

    The record is upserted on ``video_id``, so a video processed twice (for
    example by an overlapping scheduled check and backfill) neither creates a
    duplicate document nor advances the counter a second time.
    NOTE(review): without a unique index on ``video_id``, truly simultaneous
    calls can still race.
    """
    result = video_collection.update_one(
        {"video_id": video_id},
        {"$setOnInsert": {"channel_id": channel_id}},
        upsert=True,
    )
    # upserted_id is only set when this call inserted a new document.
    if result.upserted_id is not None:
        channel_collection.update_one({'channel_id': channel_id}, {'$inc': {'video_counter': 1}})
# --- Core Bot Logic ---
def _download_video_sync(video_url: str, video_format: str) -> tuple[dict, str]:
    """Download *video_url* with yt-dlp using *video_format*; return ``(info, file_path)``.

    Blocking (network and disk I/O), so callers should run it in a worker thread.
    ``file_path`` is the path yt-dlp records for the finished download in
    ``requested_downloads``, falling back to ``prepare_filename()``.
    """
    ydl_opts = {
        'outtmpl': os.path.join(DOWNLOAD_PATH, '%(id)s.%(ext)s'),
        'noplaylist': True,
        'quiet': True,
        'format': video_format,
        'cachedir': False,
        'no_check_certificate': True,
    }
    if YOUTUBE_COOKIES:
        ydl_opts['cookiefile'] = COOKIE_FILE_PATH
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        info = ydl.extract_info(video_url, download=True)
        requested = info.get('requested_downloads') or []
        recorded_path = requested[0].get('filepath') if requested else None
        return info, recorded_path or ydl.prepare_filename(info)


async def download_and_send_video(bot: Bot, channel_data: dict, video_info: dict):
    """Download one video, upload it to its channel's forum topic, and record it.

    Args:
        bot: Telegram bot used to notify OWNER_ID about failures.
        channel_data: Tracked-channel document; reads 'channel_id',
            'channel_title', 'topic_id' and optionally 'quality'.
        video_info: Dict with at least 'title' and 'webpage_url'.

    GLOBAL_STATUS tracks progress for /status. Errors are logged and reported to
    the owner instead of raised. A Pyrogram FloodWait is slept off without a
    retry; the video stays unmarked, so a later check picks it up again. The
    local file is always removed, and each attempt is followed by a random
    10-30 s pause.
    """
    # Local imports keep the module header unchanged; the alias avoids clashing
    # with telegram's ParseMode.
    import html
    from pyrogram.enums import ParseMode as PyrogramParseMode

    downloaded_file_path = None
    try:
        GLOBAL_STATUS.update({
            "is_active": True, "current_task": "Downloading",
            "video_title": video_info['title'], "channel_title": channel_data['channel_title'],
            "start_time": time.time(),
        })
        logger.info(f"Starting download for: {video_info['title']}")
        video_format = channel_data.get('quality') or 'bestvideo[height<=1080]+bestaudio/best[height<=1080]'
        # yt-dlp blocks for the whole download. Running it in a worker thread
        # keeps the event loop (and commands such as /status) responsive.
        full_info, downloaded_file_path = await asyncio.to_thread(
            _download_video_sync, video_info['webpage_url'], video_format
        )
        if not os.path.exists(downloaded_file_path):
            raise FileNotFoundError("Downloaded file not found.")

        GLOBAL_STATUS["current_task"] = "Uploading"
        # Re-read the counter: the caller's channel_data may be a snapshot taken
        # before earlier uploads in the same run were counted.
        latest_channel_data = get_channel_data(channel_data['channel_id']) or channel_data
        video_number = latest_channel_data.get('video_counter', 0) + 1
        title = full_info.get('title', 'N/A')
        # Pyrogram's Markdown dialect is not Bot API MarkdownV2 (bold is **...**),
        # so MarkdownV2 escapes would appear literally. Send an HTML caption instead.
        caption = f"<b>{video_number}.</b> {html.escape(str(title))}"
        logger.info(f"Uploading '{title}' via user account...")
        await user_bot.send_video(
            chat_id=SUPERGROUP_ID,
            video=downloaded_file_path,
            reply_to_message_id=channel_data['topic_id'],
            caption=caption,
            parse_mode=PyrogramParseMode.HTML,
        )
        logger.info(f"Successfully uploaded {title}.")
        mark_video_as_downloaded(full_info['id'], channel_data['channel_id'])
    except FloodWait as e:
        logger.warning(f"Flood wait of {e.value} seconds. Sleeping...")
        await asyncio.sleep(e.value + 5)
    except Exception as e:
        logger.error(f"Error processing {video_info['webpage_url']}: {e}", exc_info=True)
        try:
            await bot.send_message(chat_id=OWNER_ID, text=f"Failed to process '{video_info['title']}'.\nReason: {str(e)[:1000]}")
        except Exception:
            # A failed notification must not abort the caller's loop over videos.
            logger.exception("Could not notify the owner about the failure.")
    finally:
        # Always reset the status and remove the local file, even on errors.
        GLOBAL_STATUS.update({"is_active": False, "current_task": "Idle", "start_time": 0})
        if downloaded_file_path and os.path.exists(downloaded_file_path):
            os.remove(downloaded_file_path)
            logger.info(f"Cleaned up file: {downloaded_file_path}")
    delay = random.randint(10, 30)
    logger.info(f"Waiting for {delay} seconds before next task...")
    await asyncio.sleep(delay)
async def check_youtube_channels(context: CallbackContext):
    """Scheduled job: archive unseen videos from every tracked channel's RSS feed.

    Skips the run if a task is already active, and stops early once another
    task (e.g. a backfill) is seen to be active. Feed entries are processed in
    reverse feed order. Each channel record is re-read before every upload, so
    the video counter is current and removed channels are noticed.
    """
    logger.info("Running scheduled check for new videos...")
    if GLOBAL_STATUS["is_active"]:
        logger.warning("Skipping check, a task is already running.")
        return
    all_channels = get_all_channels_from_db()
    for channel_data in all_channels:
        # Check again in case a backfill was started
        if GLOBAL_STATUS["is_active"]:
            logger.info(f"Stopping check for {channel_data['channel_title']} because another task started.")
            break
        logger.info(f"Checking channel: {channel_data['channel_title']}")
        rss_url = f"https://www.youtube.com/feeds/videos.xml?channel_id={channel_data['channel_id']}"
        # feedparser fetches the feed synchronously; keep that off the event loop.
        feed = await asyncio.to_thread(feedparser.parse, rss_url)
        if feed.bozo:
            logger.warning(f"Could not parse RSS feed for {channel_data['channel_title']}.")
            continue
        for entry in reversed(feed.entries):
            if GLOBAL_STATUS["is_active"]:
                break  # another task started; stop processing this feed
            video_id = entry.get('yt_videoid')
            if not video_id or is_video_downloaded(video_id):
                continue
            current_channel_data = get_channel_data(channel_data['channel_id'])
            if current_channel_data is None:
                logger.info(f"{channel_data['channel_title']} is no longer tracked; skipping the rest of its feed.")
                break
            title = entry.get('title', 'N/A')
            logger.info(f"New RSS video found: {title}")
            video_info = {
                'id': video_id,
                'title': title,
                'webpage_url': entry.get('link') or f"https://www.youtube.com/watch?v={video_id}",
            }
            await download_and_send_video(context.bot, current_channel_data, video_info)
    logger.info("Scheduled check finished.")
async def backfill_channel_job(context: CallbackContext):
    """One-off job: archive all existing videos of ``context.job.data['channel_id']``.

    Refuses to start while another task is active. The channel's /videos tab is
    listed with yt-dlp in a worker thread, because the call blocks. Entries are
    processed in reverse listing order and already-archived videos are skipped.
    The backfill stops if the channel is removed meanwhile. The owner receives
    either a failure message or a completion message, never both.
    """
    if GLOBAL_STATUS["is_active"]:
        logger.warning("Cannot start backfill, a task is already running.")
        await context.bot.send_message(chat_id=OWNER_ID, text="A task is already in progress. Please wait for it to finish before starting a backfill.")
        return
    channel_id = context.job.data['channel_id']
    logger.info(f"Starting backfill job for channel: {channel_id}")
    ydl_opts = {'quiet': True, 'extract_flat': 'in_playlist', 'cachedir': False, 'no_check_certificate': True, 'extractor_args': {'youtubetab': {'skip': ['authcheck']}}}
    if YOUTUBE_COOKIES:
        ydl_opts['cookiefile'] = COOKIE_FILE_PATH
    url_to_fetch = f"https://www.youtube.com/channel/{channel_id}/videos"

    def _fetch_listing():
        # Blocking yt-dlp call; run via asyncio.to_thread below.
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            return ydl.extract_info(url_to_fetch, download=False)

    try:
        info = await asyncio.to_thread(_fetch_listing)
        if not info:
            logger.error(f"Backfill failed: Could not retrieve info for {channel_id}")
            await context.bot.send_message(chat_id=OWNER_ID, text=f"❌ Backfill for {channel_id} failed.")
            return
        video_entries = list(info.get('entries') or [])
        logger.info(f"Found {len(video_entries)} videos for backfilling channel {channel_id}.")
        for entry in reversed(video_entries):
            video_id = entry.get('id') if entry else None
            if not video_id or is_video_downloaded(video_id):
                continue
            # Re-read per video so the counter is current and removal is noticed.
            channel_data = get_channel_data(channel_id)
            if channel_data is None:
                logger.info(f"Channel {channel_id} is no longer tracked; stopping backfill.")
                break
            logger.info(f"Backfilling video: {entry.get('title')}")
            video_info = {'id': video_id, 'title': entry.get('title'), 'webpage_url': f"https://www.youtube.com/watch?v={video_id}"}
            await download_and_send_video(context.bot, channel_data, video_info)
    except Exception as e:
        logger.error(f"Error during backfill for {channel_id}: {e}", exc_info=True)
        await context.bot.send_message(chat_id=OWNER_ID, text=f"❌ An error occurred during backfill for channel {channel_id}.\n`{e}`")
        return
    await context.bot.send_message(chat_id=OWNER_ID, text=f"✅ Finished backfilling for channel: {channel_id}.")
# --- Command Handlers ---
async def start(update: Update, context: CallbackContext):
    """Handle /start: send the owner the command overview (MarkdownV2).

    Like every other command, it ignores users other than OWNER_ID. The reply
    goes to ``effective_message``, so edited commands (where
    ``update.message`` is None) do not crash.
    """
    if update.effective_user.id != OWNER_ID:
        return
    start_message = (
        "👋 *Welcome, Owner\\!*\n\n"
        "I'm your YouTube Archiver bot\\. Here are the commands:\n\n"
        "`/addchannel` \\- Add a new YouTube channel\\.\n"
        "`/removechannel <URL>` \\- Remove a channel\\.\n"
        "`/listchannels` \\- Show tracked channels\\.\n"
        "`/status` \\- See what I'm currently doing\\.\n"
        "`/stats` \\- View server resource usage\\.\n"
        "`/cancel` \\- Cancel the `/addchannel` process\\."
    )
    await update.effective_message.reply_text(start_message, parse_mode=ParseMode.MARKDOWN_V2)
# --- NEW: /status and /stats commands ---
async def status(update: Update, context: CallbackContext):
    """Owner-only /status: describe the active download/upload task, or report idleness."""
    if update.effective_user.id != OWNER_ID:
        return
    snapshot = GLOBAL_STATUS
    if snapshot["is_active"]:
        seconds_running = time.time() - snapshot['start_time']
        elapsed_time = time.strftime('%H:%M:%S', time.gmtime(seconds_running))
        lines = [
            "🏃 *Active Task*\n",
            f"Status: `{snapshot['current_task']}`",
            f"Channel: `{escape_markdown_v2(snapshot['channel_title'])}`",
            f"Video: `{escape_markdown_v2(snapshot['video_title'])}`",
            f"Running for: `{elapsed_time}`",
        ]
        await update.message.reply_text("\n".join(lines), parse_mode=ParseMode.MARKDOWN_V2)
    else:
        await update.message.reply_text("STATUS: I'm currently idle and waiting for new videos.")
async def stats(update: Update, context: CallbackContext):
    """Owner-only /stats: report CPU, RAM and root-filesystem usage (MarkdownV2)."""
    if update.effective_user.id != OWNER_ID:
        return
    # psutil.cpu_percent() with no interval compares against the previous call
    # and returns a meaningless 0.0 the first time. Sample over a short interval
    # instead, in a worker thread because that call blocks for the interval.
    cpu_usage = await asyncio.to_thread(psutil.cpu_percent, interval=0.5)
    ram_usage = psutil.virtual_memory().percent
    disk = psutil.disk_usage('/')
    disk_usage = disk.percent

    def format_bytes(byte_count):
        """Render *byte_count* with a 1024-based unit suffix, e.g. '1.50 GB'; None -> 'N/A'."""
        if byte_count is None:
            return "N/A"
        power = 1024
        n = 0
        power_labels = {0: '', 1: 'K', 2: 'M', 3: 'G', 4: 'T'}
        while byte_count >= power and n < len(power_labels) - 1:
            byte_count /= power
            n += 1
        return f"{byte_count:.2f} {power_labels[n]}B"

    stats_msg = (
        f"💻 *System Stats*\n\n"
        f"CPU Usage: `{cpu_usage}%`\n"
        f"RAM Usage: `{ram_usage}%`\n"
        f"Disk Usage: `{disk_usage}%`\n"
        f" Total: `{format_bytes(disk.total)}`\n"
        f" Used: `{format_bytes(disk.used)}`"
    )
    await update.effective_message.reply_text(stats_msg, parse_mode=ParseMode.MARKDOWN_V2)
# --- /addchannel conversation states ---
# The dialog first waits for a channel URL, then for a quality button press.
GET_URL = 0
GET_QUALITY = 1
async def add_channel_start(update: Update, context: CallbackContext):
    """Entry point of /addchannel (owner only): ask for the channel URL, then wait in GET_URL."""
    is_owner = update.effective_user.id == OWNER_ID
    if not is_owner:
        return ConversationHandler.END
    prompt = "Please send the URL of the YouTube channel."
    await update.message.reply_text(prompt)
    return GET_URL
async def get_url(update: Update, context: CallbackContext):
    """GET_URL state: resolve the sent channel reference and offer quality buttons.

    Stays in GET_URL when resolution fails so the owner can try again.
    Otherwise it stores the result in ``context.user_data['channel_info']`` and
    moves to GET_QUALITY. Each button's callback data is a yt-dlp format string.
    """
    message = update.effective_message
    await message.reply_text("Resolving URL, please wait...")
    # get_channel_info makes blocking yt-dlp network calls; run it in a worker
    # thread so the bot keeps handling other updates meanwhile.
    channel_info = await asyncio.to_thread(get_channel_info, message.text)
    if not channel_info:
        await message.reply_text("❌ Could not resolve this URL. Please send a valid YouTube channel URL, handle, or ID.")
        return GET_URL
    context.user_data['channel_info'] = channel_info
    keyboard = [
        [InlineKeyboardButton("Best (up to 4K)", callback_data='bestvideo+bestaudio/best')],
        [InlineKeyboardButton("1080p", callback_data='bestvideo[height<=1080]+bestaudio/best[height<=1080]')],
        [InlineKeyboardButton("720p", callback_data='bestvideo[height<=720]+bestaudio/best[height<=720]')],
        [InlineKeyboardButton("480p", callback_data='bestvideo[height<=480]+bestaudio/best[height<=480]')],
    ]
    reply_markup = InlineKeyboardMarkup(keyboard)
    await message.reply_text(
        f"✅ Channel found: *{escape_markdown_v2(channel_info['title'])}*\n\nPlease select the desired video quality:",
        reply_markup=reply_markup,
        parse_mode=ParseMode.MARKDOWN_V2
    )
    return GET_QUALITY
async def get_quality_from_button(update: Update, context: CallbackContext):
    """GET_QUALITY state: save the chosen format, create the forum topic and schedule a backfill.

    ``query.data`` is the yt-dlp format string of the pressed button. On
    failure the error is shown in the same message. In all cases
    ``context.user_data`` is cleared and the conversation ends.
    """
    query = update.callback_query
    await query.answer()  # acknowledge the button press
    quality_val = query.data
    channel_info = context.user_data['channel_info']
    # Every format string contains "best" (e.g. "bestvideo[height<=720]+..."),
    # so the height markers must be checked before the generic "best" label.
    if '1080' in quality_val:
        quality_name = '1080p'
    elif '720' in quality_val:
        quality_name = '720p'
    elif '480' in quality_val:
        quality_name = '480p'
    elif 'best' in quality_val:
        quality_name = 'Best'
    else:
        quality_name = 'Custom'
    await query.edit_message_text(text=f"✅ Quality set to *{quality_name}* for channel *{escape_markdown_v2(channel_info['title'])}*\\.\n\nCreating topic and starting backfill\\, please wait\\.", parse_mode=ParseMode.MARKDOWN_V2)
    try:
        topic = await context.bot.create_forum_topic(chat_id=SUPERGROUP_ID, name=channel_info['title'])
        add_channel_to_db(channel_info['id'], channel_info['title'], topic.message_thread_id, quality_val)
        await context.bot.send_message(chat_id=OWNER_ID, text=f"Starting backfill for {channel_info['title']}. I will notify you when it's done.")
        context.application.job_queue.run_once(backfill_channel_job, when=1, data={'channel_id': channel_info['id']})
    except Exception as e:
        logger.error(f"Failed to create topic or add to DB: {e}")
        # Escape both the exception text and the static hint: an unescaped '.'
        # or '`' makes Telegram reject the MarkdownV2 message outright.
        hint = "Please ensure the bot is an administrator in the supergroup and that 'Topics' are enabled."
        error_text = f"❌ *Error:*\n`{escape_markdown_v2(e)}`\n\n{escape_markdown_v2(hint)}"
        try:
            await query.edit_message_text(text=error_text, parse_mode=ParseMode.MARKDOWN_V2)
        except Exception:
            logger.exception("Could not report the add-channel failure to the owner.")
    context.user_data.clear()
    return ConversationHandler.END
async def cancel_conversation(update: Update, context: CallbackContext):
    """/cancel fallback: confirm the abort, discard stored dialog data and end the conversation."""
    notice = "Add channel process cancelled."
    await update.message.reply_text(notice)
    context.user_data.clear()
    return ConversationHandler.END
async def remove_channel(update: Update, context: CallbackContext):
    """Owner-only /removechannel <URL, handle or ID>: stop tracking a channel.

    Only the tracked-channel record is deleted; this handler does not touch the
    forum topic or the archived-video records.
    """
    if update.effective_user.id != OWNER_ID:
        return
    message = update.effective_message
    if not context.args:
        await message.reply_text("Usage: /removechannel <URL, Handle or ID>")
        return
    await message.reply_text("Resolving and removing...")
    # Resolution makes blocking yt-dlp network calls; run it off the event loop.
    channel_info = await asyncio.to_thread(get_channel_info, ' '.join(context.args))
    if not channel_info:
        await message.reply_text("Could not resolve this channel.")
        return
    if remove_channel_from_db(channel_info['id']) > 0:
        await message.reply_text(f"✅ Successfully removed '{channel_info['title']}'.")
    else:
        await message.reply_text("That channel was not in the tracking list.")
async def list_channels(update: Update, context: CallbackContext):
    """Owner-only /listchannels: list tracked channels with their quality labels (MarkdownV2).

    Long lists are split across several messages, because Telegram rejects a
    message longer than 4096 characters.
    """
    if update.effective_user.id != OWNER_ID:
        return
    message = update.effective_message
    channels = get_all_channels_from_db()
    if not channels:
        await message.reply_text("The tracking list is currently empty.")
        return
    max_chunk_len = 4000  # stay a little under Telegram's 4096-character limit
    chunk = "📄 *Currently Tracked Channels:*\n\n"
    for channel in channels:
        title = escape_markdown_v2(channel.get('channel_title') or 'N/A')
        # `or` also covers a stored None, which would break the substring checks.
        quality_val = channel.get('quality') or 'best'
        quality_name = 'Best'
        if '1080' in quality_val:
            quality_name = '1080p'
        elif '720' in quality_val:
            quality_name = '720p'
        elif '480' in quality_val:
            quality_name = '480p'
        line = f"▪️ *{title}*\n `Quality: {quality_name}`\n"
        if len(chunk) + len(line) > max_chunk_len:
            await message.reply_text(chunk, parse_mode=ParseMode.MARKDOWN_V2)
            chunk = ""
        chunk += line
    await message.reply_text(chunk, parse_mode=ParseMode.MARKDOWN_V2)
# --- Main Application Runner ---
async def main():
    """Start the bot: write the cookie file, register handlers and jobs, then poll until cancelled.

    Both the Pyrogram user client and the Telegram application run inside one
    ``async with``. The updater and application are stopped before that block
    exits, so ``Application.shutdown()`` can complete cleanly.
    """
    # --- Pre-flight checks ---
    if YOUTUBE_COOKIES:
        # Every yt-dlp call sets cookiefile=COOKIE_FILE_PATH when YOUTUBE_COOKIES
        # is set, so the file must exist. Assumes the variable holds the
        # Netscape cookie-file contents verbatim; TODO confirm for the deployment.
        with open(COOKIE_FILE_PATH, "w", encoding="utf-8") as cookie_file:
            cookie_file.write(YOUTUBE_COOKIES)
        logger.info(f"Wrote YouTube cookies to {COOKIE_FILE_PATH}.")

    application = Application.builder().token(BOT_TOKEN).build()
    if application.job_queue is None:
        raise RuntimeError("The JobQueue is unavailable; install python-telegram-bot[job-queue].")

    # /addchannel dialog: text URL first, then an inline-button quality choice.
    add_channel_conv = ConversationHandler(
        entry_points=[CommandHandler("addchannel", add_channel_start)],
        states={
            GET_URL: [MessageHandler(filters.TEXT & ~filters.COMMAND, get_url)],
            GET_QUALITY: [CallbackQueryHandler(get_quality_from_button)],
        },
        fallbacks=[CommandHandler("cancel", cancel_conversation)],
    )
    application.add_handler(CommandHandler("start", start))
    application.add_handler(add_channel_conv)
    application.add_handler(CommandHandler("removechannel", remove_channel))
    application.add_handler(CommandHandler("listchannels", list_channels))
    application.add_handler(CommandHandler("status", status))
    application.add_handler(CommandHandler("stats", stats))
    # Recurring RSS check; the first run is 10 s after startup.
    application.job_queue.run_repeating(check_youtube_channels, interval=CHECK_INTERVAL_SECONDS, first=10)

    async with user_bot, application:
        # Entering `application` as an async context manager already initializes it.
        logger.info("Starting all clients...")
        await application.start()
        await application.updater.start_polling()
        logger.info("Bot is now fully operational with new features.")
        try:
            await asyncio.Event().wait()
        finally:
            # Application.shutdown() (run on leaving the `async with`) raises while
            # the updater/application are still running, so stop them first.
            if application.updater.running:
                await application.updater.stop()
            if application.running:
                await application.stop()
if __name__ == "__main__":
    # Run the bot until it is interrupted (Ctrl+C) or asked to exit.
    try:
        asyncio.run(main())
    except (SystemExit, KeyboardInterrupt):
        logger.info("Bot stopped manually.")