#!/usr/local/bin/python3
# coding: utf-8

# ytdlbot - new.py
# 8/14/21 14:37
#

__author__ = "Benny "

import functools
import logging
import os
import re
import threading
import time
import typing
from io import BytesIO
from typing import Any

import psutil
import pyrogram.errors
import yt_dlp
from apscheduler.schedulers.background import BackgroundScheduler
from pyrogram import Client, enums, filters, types

from config import (
    APP_HASH,
    APP_ID,
    AUTHORIZED_USER,
    BOT_TOKEN,
    ENABLE_ARIA2,
    ENABLE_FFMPEG,
    M3U8_SUPPORT,
    ENABLE_VIP,
    OWNER,
    PROVIDER_TOKEN,
    TOKEN_PRICE,
    BotText,
)
from database.model import (
    credit_account,
    get_format_settings,
    get_free_quota,
    get_paid_quota,
    get_quality_settings,
    init_user,
    reset_free,
    set_user_settings,
)
from engine import direct_entrance, youtube_entrance, special_download_entrance
from utils import extract_url_and_name, sizeof_fmt, timeof_fmt

logging.info("Authorized users are %s", AUTHORIZED_USER)
logging.getLogger("apscheduler.executors.default").propagate = False

# Moment this module was loaded; stats_handler reports bot uptime relative to it.
botStartTime = time.time()

# Case-insensitive check that a string starts with an http(s) scheme.
_HTTP_URL = re.compile(r"^https?://", re.IGNORECASE)

# NOTE(review): several user-facing literals in this file contain characters that
# look mis-encoded (UTF-8 read through a legacy code page). They are kept exactly
# as found; confirm against the original file before changing them.


def create_app(name: str = "main", workers: int = 64) -> Client:
    """Create and configure Pyrogram client instance"""
    return Client(
        name,
        api_id=APP_ID,
        api_hash=APP_HASH,
        bot_token=BOT_TOKEN,
        workers=workers,
        # plugins=dict(root="plugins")
    )


app = create_app("main")


def _authorized_user_ids() -> typing.Set[int]:
    """Parse the comma-separated ``AUTHORIZED_USER`` setting into a set of ids.

    Blank entries (for example a trailing comma) are skipped instead of
    raising ``ValueError``. An empty/falsy setting yields an empty set.
    """
    if not AUTHORIZED_USER:
        return set()
    return {int(part) for part in str(AUTHORIZED_USER).split(",") if part.strip()}


def _is_owner(user) -> bool:
    """Return True when *user* matches the configured ``OWNER``.

    ``OWNER`` is passed elsewhere in this file as a single chat identifier
    (``filters.user(OWNER)``, ``client.send_message(OWNER, ...)``), so a plain
    ``user.id in OWNER`` raises ``TypeError`` for a scalar value. This helper
    accepts a scalar (id or username, optionally comma-separated) as well as a
    collection of ids/usernames.
    """
    if user is None:
        return False
    if isinstance(OWNER, str):
        owners = OWNER.split(",")
    elif isinstance(OWNER, (list, tuple, set, frozenset)):
        owners = OWNER
    else:
        owners = [OWNER]

    identities = {str(user.id)}
    username = getattr(user, "username", None)
    if username:
        identities.add(username.lower())
    return any(str(owner).strip().lstrip("@").lower() in identities for owner in owners)


def private_use(func):
    """Decorator restricting a message handler to allowed chats and users.

    The wrapped handler is skipped (returns ``None``) when:

    * the chat is not private and the text does not start with ``/ytdl``; or
    * ``AUTHORIZED_USER`` is configured and the sender is not listed, in which
      case the sender is told so.
    """

    @functools.wraps(func)
    def wrapper(client: Client, message: types.Message):
        chat_id = getattr(message.from_user, "id", None)
        # ``text`` can be None on non-text messages; normalise before .lower().
        text = getattr(message, "text", None) or ""

        # message type check
        if message.chat.type != enums.ChatType.PRIVATE and not text.lower().startswith("/ytdl"):
            logging.debug("%s, it's annoying me...๐Ÿ™„๏ธ ", message.text)
            return

        # authorized users check
        users = _authorized_user_ids()
        if users and chat_id and chat_id not in users:
            # Use the configured text when BotText defines it; otherwise fall back
            # to the literal this handler has always sent.
            message.reply_text(getattr(BotText, "private", "BotText.private"), quote=True)
            return

        return func(client, message)

    return wrapper


@app.on_message(filters.command(["start"]))
def start_handler(client: Client, message: types.Message):
    """Register the chat and send the welcome text with the current quota."""
    from_id = message.chat.id
    init_user(from_id)
    # from_user may be absent (e.g. channel posts); fall back to the chat id.
    logging.info("%s welcome to youtube-dl bot!", getattr(message.from_user, "id", from_id))
    client.send_chat_action(from_id, enums.ChatAction.TYPING)
    free, paid = get_free_quota(from_id), get_paid_quota(from_id)
    client.send_message(
        from_id,
        BotText.start + f"You have {free} free and {paid} paid quota.",
        disable_web_page_preview=True,
    )


@app.on_message(filters.command(["help"]))
def help_handler(client: Client, message: types.Message):
    """Send the help text."""
    chat_id = message.chat.id
    init_user(chat_id)
    client.send_chat_action(chat_id, enums.ChatAction.TYPING)
    client.send_message(chat_id, BotText.help, disable_web_page_preview=True)


@app.on_message(filters.command(["about"]))
def about_handler(client: Client, message: types.Message):
    """Send the about text."""
    chat_id = message.chat.id
    init_user(chat_id)
    client.send_chat_action(chat_id, enums.ChatAction.TYPING)
    client.send_message(chat_id, BotText.about)


@app.on_message(filters.command(["ping"]))
def ping_handler(client: Client, message: types.Message):
    """Measure how long sending one message takes and report it.

    The measurement runs in a separate thread because it sleeps between the
    follow-up edit and delete of the probe message.
    """
    chat_id = message.chat.id
    init_user(chat_id)
    client.send_chat_action(chat_id, enums.ChatAction.TYPING)

    def send_message_and_measure_ping():
        started = time.perf_counter()
        reply = client.send_message(chat_id, "Starting Ping...")
        ping_time = (time.perf_counter() - started) * 1000  # milliseconds

        message.reply_text(f"Ping: {ping_time:.2f} ms", quote=True)
        time.sleep(0.5)
        client.edit_message_text(chat_id=reply.chat.id, message_id=reply.id, text="Ping Calculation Complete.")
        time.sleep(1)
        client.delete_messages(chat_id=reply.chat.id, message_ids=reply.id)

    threading.Thread(target=send_message_and_measure_ping).start()


@app.on_message(filters.command(["buy"]))
def buy(client: Client, message: types.Message):
    """Offer the purchasable quota packages as inline buttons.

    Each button's callback data is ``buy-<count>-<price in USD>``, which
    send_invoice parses.
    """
    markup = types.InlineKeyboardMarkup(
        [
            [  # First row
                types.InlineKeyboardButton("10-$1", callback_data="buy-10-1"),
                types.InlineKeyboardButton("20-$2", callback_data="buy-20-2"),
                types.InlineKeyboardButton("40-$3.5", callback_data="buy-40-3.5"),
            ],
            [  # second row
                types.InlineKeyboardButton("50-$4", callback_data="buy-50-4"),
                types.InlineKeyboardButton("75-$6", callback_data="buy-75-6"),
                types.InlineKeyboardButton("100-$8", callback_data="buy-100-8"),
            ],
        ]
    )
    message.reply_text("Please choose the amount you want to buy.", reply_markup=markup)


@app.on_callback_query(filters.regex(r"^buy-"))
def send_invoice(client: Client, callback_query: types.CallbackQuery):
    """Send a payment invoice for the package encoded in the callback data."""
    chat_id = callback_query.message.chat.id
    _, count, price = callback_query.data.split("-")
    # round() rather than int(): float products such as 4.35 * 100 can land just
    # below the intended whole number of cents.
    price = round(float(price) * 100)
    client.send_invoice(
        chat_id,
        f"{count} permanent download quota",
        "Please make a payment via Stripe",
        f"{count}",  # invoice payload; successful_payment reads it back as the quota
        "USD",
        [types.LabeledPrice(label="VIP", amount=price)],
        provider_token=os.getenv("PROVIDER_TOKEN"),
        protect_content=True,
        start_parameter="no-forward-placeholder",
    )


@app.on_pre_checkout_query()
def pre_checkout(client: Client, query: types.PreCheckoutQuery):
    """Approve every pre-checkout query."""
    client.answer_pre_checkout_query(query.id, ok=True)


@app.on_message(filters.successful_payment)
def successful_payment(client: Client, message: types.Message):
    """Credit the purchased quota and confirm the payment to the user."""
    payment = message.successful_payment
    who = message.chat.id
    amount = payment.total_amount  # in cents
    quota = int(payment.invoice_payload)
    ch = payment.provider_payment_charge_id
    free, paid = credit_account(who, amount, quota, ch)
    if paid > 0:
        message.reply_text(f"Payment successful! You now have {free} free and {paid} paid quota.")
    else:
        message.reply_text("Something went wrong. Please contact the admin.")
    message.delete()


@app.on_message(filters.command(["stats"]))
def stats_handler(client: Client, message: types.Message):
    """Reply with host resource statistics.

    The owner additionally sees swap usage, core counts and OS uptime.
    """
    chat_id = message.chat.id
    init_user(chat_id)
    client.send_chat_action(chat_id, enums.ChatAction.TYPING)

    cpu_usage = psutil.cpu_percent()
    total, used, free, disk = psutil.disk_usage("/")  # disk is the usage percentage
    swap = psutil.swap_memory()
    memory = psutil.virtual_memory()
    boot_time = psutil.boot_time()
    net = psutil.net_io_counters()  # one snapshot for both counters
    now = time.time()

    # Sections shared by the owner and the regular-user report.
    overview = (
        "\n\nโŒฌโ”€โ”€โ”€โ”€โ”€ใ€Œ Stats ใ€โ”€โ”€โ”€โ”€โ”€โŒฌ\n\n"
        f"โ•ญ๐Ÿ–ฅ๏ธ **CPU Usage ยป** __{cpu_usage}%__\n"
        f"โ”œ๐Ÿ’พ **RAM Usage ยป** __{memory.percent}%__\n"
        f"โ•ฐ๐Ÿ—ƒ๏ธ **DISK Usage ยป** __{disk}%__\n\n"
        f"โ•ญ๐Ÿ“คUpload: {sizeof_fmt(net.bytes_sent)}\n"
        f"โ•ฐ๐Ÿ“ฅDownload: {sizeof_fmt(net.bytes_recv)}\n\n\n"
        f"Memory Total: {sizeof_fmt(memory.total)}\n"
        f"Memory Free: {sizeof_fmt(memory.available)}\n"
        f"Memory Used: {sizeof_fmt(memory.used)}\n"
    )
    disk_section = (
        f"Total Disk Space: {sizeof_fmt(total)}\n"
        f"Used: {sizeof_fmt(used)} | Free: {sizeof_fmt(free)}\n\n"
    )
    bot_uptime = f"๐Ÿค–Bot Uptime: {timeof_fmt(now - botStartTime)}\n"

    if _is_owner(message.from_user):
        text = (
            overview
            + f"SWAP Total: {sizeof_fmt(swap.total)} | SWAP Usage: {swap.percent}%\n\n"
            + disk_section
            + f"Physical Cores: {psutil.cpu_count(logical=False)}\n"
            + f"Total Cores: {psutil.cpu_count(logical=True)}\n\n"
            + bot_uptime
            + f"โฒ๏ธOS Uptime: {timeof_fmt(now - boot_time)}\n"
        )
    else:
        text = overview + disk_section + bot_uptime

    message.reply_text(text, quote=True)


@app.on_message(filters.command(["settings"]))
def settings_handler(client: Client, message: types.Message):
    """Show the current send type / quality and buttons to change them."""
    chat_id = message.chat.id
    init_user(chat_id)
    client.send_chat_action(chat_id, enums.ChatAction.TYPING)
    markup = types.InlineKeyboardMarkup(
        [
            [  # First row
                types.InlineKeyboardButton("send as document", callback_data="document"),
                types.InlineKeyboardButton("send as video", callback_data="video"),
                types.InlineKeyboardButton("send as audio", callback_data="audio"),
            ],
            [  # second row
                types.InlineKeyboardButton("High Quality", callback_data="high"),
                types.InlineKeyboardButton("Medium Quality", callback_data="medium"),
                types.InlineKeyboardButton("Low Quality", callback_data="low"),
            ],
        ]
    )

    quality = get_quality_settings(chat_id)
    send_type = get_format_settings(chat_id)
    client.send_message(chat_id, BotText.settings.format(quality, send_type), reply_markup=markup)


def _run_url_command(client: Client, message: types.Message, entrance, *, log_format, invalid_text, ack_text):
    """Shared body of the /direct, /spdl and /ytdl commands.

    Extracts the URL from the command, validates that it is http(s),
    acknowledges the request and hands it to *entrance*. A ``ValueError``
    raised by *entrance* is reported to the user and the acknowledgement
    message is removed.
    """
    # A command can arrive as a media caption, in which case ``text`` is None.
    message_text = message.text or message.caption or ""
    url, _new_name = extract_url_and_name(message_text)
    logging.info(log_format, url)
    if url is None or not _HTTP_URL.match(url):
        message.reply_text(invalid_text, quote=True)
        return

    bot_msg = message.reply_text(ack_text, quote=True)
    try:
        entrance(client, bot_msg, url)
    except ValueError as e:
        message.reply_text(str(e), quote=True)
        bot_msg.delete()


@app.on_message(filters.command(["direct"]))
def direct_download(client: Client, message: types.Message):
    """Handle ``/direct <url>`` via direct_entrance."""
    chat_id = message.chat.id
    init_user(chat_id)
    client.send_chat_action(chat_id, enums.ChatAction.TYPING)
    _run_url_command(
        client,
        message,
        direct_entrance,
        log_format="Direct download using aria2/requests start %s",
        invalid_text="Send me a correct LINK.",
        ack_text="Direct download request received.",
    )


@app.on_message(filters.command(["spdl"]))
def spdl_handler(client: Client, message: types.Message):
    """Handle ``/spdl <url>`` via special_download_entrance."""
    chat_id = message.chat.id
    init_user(chat_id)
    client.send_chat_action(chat_id, enums.ChatAction.TYPING)
    _run_url_command(
        client,
        message,
        special_download_entrance,
        log_format="spdl start %s",
        invalid_text="Something wrong ๐Ÿค”.\nCheck your URL and send me again.",
        ack_text="SPDL request received.",
    )


@app.on_message(filters.command(["ytdl"]) & filters.group)
def ytdl_handler(client: Client, message: types.Message):
    """Handle ``/ytdl <url>`` via youtube_entrance."""
    # for group only
    # from_user can be None in groups (e.g. messages sent on behalf of a chat).
    if message.from_user is not None:
        init_user(message.from_user.id)
    client.send_chat_action(message.chat.id, enums.ChatAction.TYPING)
    _run_url_command(
        client,
        message,
        youtube_entrance,
        log_format="ytdl start %s",
        invalid_text="Check your URL.",
        ack_text="Group download request received.",
    )


def check_link(url: str):
    """Reject links the bot does not handle.

    Raises:
        ValueError: for YouTube channel links and anything containing
            ``list``, and - when ``M3U8_SUPPORT`` is off - for m3u8/m3u links.
            The m3u8 case used to *return* its message, which the caller in
            this file ignores, so the restriction was never enforced.
    """
    if re.findall(r"^https://www\.youtube\.com/channel/", url) or "list" in url:
        # TODO maybe using ytdl.extract_info
        raise ValueError("Playlist or channel download are not supported at this moment.")

    if not M3U8_SUPPORT and re.findall(r"m3u8|\.m3u8|\.m3u$", url.lower()):
        raise ValueError("m3u8 links are disabled.")
@app.on_message(filters.incoming & filters.text)
@private_use
def download_handler(client: Client, message: types.Message):
    """Treat an incoming text message as a link and start a download.

    Failures are reported back to the sender: link rejections and other
    ``ValueError``s verbatim, Telegram flood errors with a wait notice
    (followed by sleeping for the requested time), anything else as a
    generic failure message after logging the traceback.
    """
    # from_user can be absent; fall back to the chat the message arrived in.
    chat_id = message.from_user.id if message.from_user else message.chat.id
    init_user(chat_id)
    client.send_chat_action(chat_id, enums.ChatAction.TYPING)
    url = message.text
    logging.info("start %s", url)

    try:
        # check_link may signal a rejection either by raising ValueError or by
        # returning a message; treat both the same so neither is silently ignored.
        rejection = check_link(url)
        if rejection:
            raise ValueError(rejection)
        # raise pyrogram.errors.exceptions.FloodWait(10)
        bot_msg: types.Message | Any = message.reply_text("**__Task received.__**", quote=True)
        client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_VIDEO)
        youtube_entrance(client, bot_msg, url)
    except pyrogram.errors.Flood as e:
        # Only sleep on a numeric wait; other values would make time.sleep raise.
        wait_seconds = e.value if isinstance(e.value, (int, float)) else 0
        with BytesIO() as f:
            f.write(str(e).encode())
            f.write(b"Your job will be done soon. Just wait!")
            f.name = "Please wait.txt"
            message.reply_document(f, caption=f"Flood wait! Please wait {wait_seconds} seconds...", quote=True)
        client.send_message(OWNER, f"Flood wait! ๐Ÿ™ {wait_seconds} seconds....")
        time.sleep(wait_seconds)
    except ValueError as e:
        message.reply_text(str(e), quote=True)
    except Exception as e:
        logging.error("Download failed", exc_info=True)
        message.reply_text(f"โŒ Download failed: {e}", quote=True)


@app.on_callback_query(filters.regex(r"document|video|audio"))
def format_callback(client: Client, callback_query: types.CallbackQuery):
    """Store the send type chosen from the settings keyboard."""
    chat_id = callback_query.message.chat.id
    data = callback_query.data
    logging.info("Setting %s file type to %s", chat_id, data)
    # Persist first so the confirmation is only shown once the value is saved.
    set_user_settings(chat_id, "format", data)
    callback_query.answer(f"Your send type was set to {data}")


@app.on_callback_query(filters.regex(r"high|medium|low"))
def quality_callback(client: Client, callback_query: types.CallbackQuery):
    """Store the download quality chosen from the settings keyboard."""
    chat_id = callback_query.message.chat.id
    data = callback_query.data
    logging.info("Setting %s download quality to %s", chat_id, data)
    # Persist first so the confirmation is only shown once the value is saved.
    set_user_settings(chat_id, "quality", data)
    callback_query.answer(f"Your default engine quality was set to {data}")


#=======EXEC============

import asyncio
import logging
from io import BytesIO

MAX_MESSAGE_LENGTH = 4096  # longer command output is sent as a file instead of a message
COMMAND_TIMEOUT = 60  # Timeout in seconds
# Shorthand names that /bash expands to full shell commands.
COMMAND_ALIASES = {
    "update": "git pull",
    "restart": "systemctl restart mybot.service",
}

# Most recent successfully executed commands, oldest first.
command_history = []
MAX_HISTORY = 25  # number of commands kept in command_history

# NOTE(review): logging.basicConfig() does nothing when the root logger already
# has handlers, and logging calls are made earlier in this module - confirm the
# file target below actually takes effect.
logging.basicConfig(filename="bash_logs.txt", level=logging.INFO)


@app.on_message(filters.command("bash") & filters.user(OWNER))
async def execution(_, message):
    """Run a shell command for the owner and reply with its output.

    The text after ``/bash`` is looked up in ``COMMAND_ALIASES`` and then run
    through the shell with a ``COMMAND_TIMEOUT`` second limit. Output longer
    than ``MAX_MESSAGE_LENGTH`` is sent as an ``exec.txt`` document. Commands
    that complete are appended to ``command_history``.

    SECURITY: this executes an arbitrary shell string; the only protection is
    the ``filters.user(OWNER)`` restriction on the handler.
    """
    parts = (message.text or message.caption or "").split(maxsplit=1)
    if len(parts) < 2 or not parts[1].strip():
        # Nothing to run; answer instead of failing on the missing argument.
        await message.reply_text("Usage: `/bash <command>`", quote=True)
        return

    cmd = parts[1].strip()
    # Replace command with alias if it exists
    cmd = COMMAND_ALIASES.get(cmd, cmd)

    status_message = await message.reply_text("`Processing ...`")
    reply_to_ = message.reply_to_message or message

    try:
        # Log the command
        logging.info("Command executed by %s: %s", getattr(message.from_user, "id", None), cmd)

        # Run the command with a timeout
        process = await asyncio.create_subprocess_shell(
            cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        try:
            stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=COMMAND_TIMEOUT)
        except asyncio.TimeoutError:
            # kill() is a plain method (not awaitable); wait() then reaps the process.
            try:
                process.kill()
            except ProcessLookupError:
                pass  # already exited between the timeout and the kill
            await process.wait()
            # Sent as a reply: status_message is deleted in ``finally`` below, so
            # text edited into it would disappear immediately.
            await reply_to_.reply_text("โŒ **Timeout**: The command took too long to execute.", quote=True)
            return

        # errors="replace": command output is not guaranteed to be valid UTF-8.
        err_text = stderr.decode(errors="replace").strip() if stderr else "๐Ÿ˜‚"
        out_text = stdout.decode(errors="replace").strip() if stdout else "๐Ÿ˜"

        output = (
            f"QUERY:\nCommand:\n{cmd} \n"
            f"PID: {process.pid}\n\n"
            f"stderr: \n{err_text}\n\n"
            f"stdout: \n{out_text}"
        )

        if len(output) > MAX_MESSAGE_LENGTH:
            # The document lives only in memory; there is no file on disk to remove.
            with BytesIO(output.encode()) as out_file:
                out_file.name = "exec.txt"
                await reply_to_.reply_document(
                    document=out_file,
                    caption=cmd[: MAX_MESSAGE_LENGTH // 4 - 1],
                    disable_notification=True,
                    quote=True,
                )
        else:
            await reply_to_.reply_text(output, quote=True)

        # Add command to history, keeping only the newest MAX_HISTORY entries
        command_history.append(cmd)
        del command_history[:-MAX_HISTORY]

    except Exception as ex:
        await reply_to_.reply_text(f"โŒ **Error**: {ex}", quote=True)
    finally:
        await status_message.delete()


@app.on_message(filters.command("bhis") & filters.user(OWNER))
async def show_history(_, message):
    """Reply with the recorded /bash commands, newest first, numbered from 1.

    The list is sent as a ``command_history.txt`` document when the message
    would exceed ``MAX_MESSAGE_LENGTH``.
    """
    # Add numbering to each command
    formatted_history = "\n".join(f"{i + 1}. {cmd}" for i, cmd in enumerate(reversed(command_history)))
    text = f"Command History:\n{formatted_history}"

    # Check the text that is actually sent (heading included) against the limit
    if len(text) > MAX_MESSAGE_LENGTH:
        # Send as a text file; it is an in-memory buffer, nothing to clean up on disk
        with BytesIO(formatted_history.encode()) as out_file:
            out_file.name = "command_history.txt"
            await message.reply_document(
                document=out_file,
                caption="__Limit exceeded, so sending as file__",
                quote=True,
            )
    else:
        # Send as a regular message
        await message.reply_text(text, quote=True)


if __name__ == "__main__":
    # Reset the free quota every day at 00:00 (scheduler's timezone).
    scheduler = BackgroundScheduler()
    scheduler.add_job(reset_free, "cron", hour=0, minute=0)
    scheduler.start()
    # NOTE(review): the banner's original line breaks and spacing were lost in this
    # copy of the file; the row layout below is reconstructed - confirm it.
    banner = """
โ–Œ โ–Œ โ–€โ–›โ–˜ โ–Œ โ–›โ–€โ–– โ–œ โ–Œ
โ–โ–ž โ–žโ–€โ–– โ–Œ โ–Œ โ–Œ โ–Œ โ–Œ โ–›โ–€โ–– โ–žโ–€โ–– โ–Œ โ–Œ โ–žโ–€โ–– โ–Œ โ–Œ โ–›โ–€โ–– โ– โ–žโ–€โ–– โ–โ–€โ–– โ–žโ–€โ–Œ
โ–Œ โ–Œ โ–Œ โ–Œ โ–Œ โ–Œ โ–Œ โ–Œ โ–Œ โ–Œ โ–›โ–€ โ–Œ โ–Œ โ–Œ โ–Œ โ–โ–โ– โ–Œ โ–Œ โ– โ–Œ โ–Œ โ–žโ–€โ–Œ โ–Œ โ–Œ
โ–˜ โ–โ–€ โ–โ–€โ–˜ โ–˜ โ–โ–€โ–˜ โ–€โ–€ โ–โ–€โ–˜ โ–€โ–€ โ–โ–€ โ–˜โ–˜ โ–˜ โ–˜ โ–˜ โ–โ–€ โ–โ–€โ–˜ โ–โ–€โ–˜

By @BennyThink, VIP Mode: {vip_status}""".format(vip_status=ENABLE_VIP)
    print(banner)
    app.run()