Spaces:
Paused
Paused
| #!/usr/local/bin/python3 | |
| # coding: utf-8 | |
| # ytdlbot - new.py | |
| # 8/14/21 14:37 | |
| # | |
| __author__ = "Benny <benny.think@gmail.com>" | |
| import logging | |
| import os | |
| import re | |
| import threading | |
| import time | |
| import typing | |
| from io import BytesIO | |
| from typing import Any | |
| import psutil | |
| import pyrogram.errors | |
| import yt_dlp | |
| from apscheduler.schedulers.background import BackgroundScheduler | |
| from pyrogram import Client, enums, filters, types | |
| from config import ( | |
| APP_HASH, | |
| APP_ID, | |
| AUTHORIZED_USER, | |
| BOT_TOKEN, | |
| ENABLE_ARIA2, | |
| ENABLE_FFMPEG, | |
| M3U8_SUPPORT, | |
| ENABLE_VIP, | |
| OWNER, | |
| PROVIDER_TOKEN, | |
| TOKEN_PRICE, | |
| BotText, | |
| ) | |
| from database.model import ( | |
| credit_account, | |
| get_format_settings, | |
| get_free_quota, | |
| get_paid_quota, | |
| get_quality_settings, | |
| init_user, | |
| reset_free, | |
| set_user_settings, | |
| ) | |
| from engine import direct_entrance, youtube_entrance, special_download_entrance | |
| from utils import extract_url_and_name, sizeof_fmt, timeof_fmt | |
| logging.info("Authorized users are %s", AUTHORIZED_USER) | |
| logging.getLogger("apscheduler.executors.default").propagate = False | |
| def create_app(name: str = "main", workers: int = 64) -> Client: | |
| """Create and configure Pyrogram client instance""" | |
| return Client( | |
| name, | |
| api_id=APP_ID, | |
| api_hash=APP_HASH, | |
| bot_token=BOT_TOKEN, | |
| workers=workers | |
| # plugins=dict(root="plugins") | |
| ) | |
| app = create_app("main") | |
| def private_use(func): | |
| def wrapper(client: Client, message: types.Message): | |
| chat_id = getattr(message.from_user, "id", None) | |
| # message type check | |
| if message.chat.type != enums.ChatType.PRIVATE and not getattr(message, "text", "").lower().startswith("/ytdl"): | |
| logging.debug("%s, it's annoying me...ποΈ ", message.text) | |
| return | |
| # authorized users check | |
| if AUTHORIZED_USER: | |
| users = [int(i) for i in AUTHORIZED_USER.split(",")] | |
| else: | |
| users = [] | |
| if users and chat_id and chat_id not in users: | |
| message.reply_text("BotText.private", quote=True) | |
| return | |
| return func(client, message) | |
| return wrapper | |
| def start_handler(client: Client, message: types.Message): | |
| from_id = message.chat.id | |
| init_user(from_id) | |
| logging.info("%s welcome to youtube-dl bot!", message.from_user.id) | |
| client.send_chat_action(from_id, enums.ChatAction.TYPING) | |
| free, paid = get_free_quota(from_id), get_paid_quota(from_id) | |
| client.send_message( | |
| from_id, | |
| BotText.start + f"You have {free} free and {paid} paid quota.", | |
| disable_web_page_preview=True, | |
| ) | |
| def help_handler(client: Client, message: types.Message): | |
| chat_id = message.chat.id | |
| init_user(chat_id) | |
| client.send_chat_action(chat_id, enums.ChatAction.TYPING) | |
| client.send_message(chat_id, BotText.help, disable_web_page_preview=True) | |
| def about_handler(client: Client, message: types.Message): | |
| chat_id = message.chat.id | |
| init_user(chat_id) | |
| client.send_chat_action(chat_id, enums.ChatAction.TYPING) | |
| client.send_message(chat_id, BotText.about) | |
| def ping_handler(client: Client, message: types.Message): | |
| chat_id = message.chat.id | |
| init_user(chat_id) | |
| client.send_chat_action(chat_id, enums.ChatAction.TYPING) | |
| def send_message_and_measure_ping(): | |
| start_time = int(round(time.time() * 1000)) | |
| reply: types.Message | typing.Any = client.send_message(chat_id, "Starting Ping...") | |
| end_time = int(round(time.time() * 1000)) | |
| ping_time = int(round(end_time - start_time)) | |
| message_sent = True | |
| if message_sent: | |
| message.reply_text(f"Ping: {ping_time:.2f} ms", quote=True) | |
| time.sleep(0.5) | |
| client.edit_message_text(chat_id=reply.chat.id, message_id=reply.id, text="Ping Calculation Complete.") | |
| time.sleep(1) | |
| client.delete_messages(chat_id=reply.chat.id, message_ids=reply.id) | |
| thread = threading.Thread(target=send_message_and_measure_ping) | |
| thread.start() | |
| def buy(client: Client, message: types.Message): | |
| markup = types.InlineKeyboardMarkup( | |
| [ | |
| [ # First row | |
| types.InlineKeyboardButton("10-$1", callback_data="buy-10-1"), | |
| types.InlineKeyboardButton("20-$2", callback_data="buy-20-2"), | |
| types.InlineKeyboardButton("40-$3.5", callback_data="buy-40-3.5"), | |
| ], | |
| [ # second row | |
| types.InlineKeyboardButton("50-$4", callback_data="buy-50-4"), | |
| types.InlineKeyboardButton("75-$6", callback_data="buy-75-6"), | |
| types.InlineKeyboardButton("100-$8", callback_data="buy-100-8"), | |
| ], | |
| ] | |
| ) | |
| message.reply_text("Please choose the amount you want to buy.", reply_markup=markup) | |
| def send_invoice(client: Client, callback_query: types.CallbackQuery): | |
| chat_id = callback_query.message.chat.id | |
| data = callback_query.data | |
| _, count, price = data.split("-") | |
| price = int(float(price) * 100) | |
| client.send_invoice( | |
| chat_id, | |
| f"{count} permanent download quota", | |
| "Please make a payment via Stripe", | |
| f"{count}", | |
| "USD", | |
| [types.LabeledPrice(label="VIP", amount=price)], | |
| provider_token=os.getenv("PROVIDER_TOKEN"), | |
| protect_content=True, | |
| start_parameter="no-forward-placeholder", | |
| ) | |
| def pre_checkout(client: Client, query: types.PreCheckoutQuery): | |
| client.answer_pre_checkout_query(query.id, ok=True) | |
| def successful_payment(client: Client, message: types.Message): | |
| who = message.chat.id | |
| amount = message.successful_payment.total_amount # in cents | |
| quota = int(message.successful_payment.invoice_payload) | |
| ch = message.successful_payment.provider_payment_charge_id | |
| free, paid = credit_account(who, amount, quota, ch) | |
| if paid > 0: | |
| message.reply_text(f"Payment successful! You now have {free} free and {paid} paid quota.") | |
| else: | |
| message.reply_text("Something went wrong. Please contact the admin.") | |
| message.delete() | |
| def stats_handler(client: Client, message: types.Message): | |
| chat_id = message.chat.id | |
| init_user(chat_id) | |
| client.send_chat_action(chat_id, enums.ChatAction.TYPING) | |
| cpu_usage = psutil.cpu_percent() | |
| total, used, free, disk = psutil.disk_usage("/") | |
| swap = psutil.swap_memory() | |
| memory = psutil.virtual_memory() | |
| boot_time = psutil.boot_time() | |
| owner_stats = ( | |
| "\n\nβ¬βββββγ Stats γββββββ¬\n\n" | |
| f"<b>βπ₯οΈ **CPU Usage Β»**</b> __{cpu_usage}%__\n" | |
| f"<b>βπΎ **RAM Usage Β»**</b> __{memory.percent}%__\n" | |
| f"<b>β°ποΈ **DISK Usage Β»**</b> __{disk}%__\n\n" | |
| f"<b>βπ€Upload:</b> {sizeof_fmt(psutil.net_io_counters().bytes_sent)}\n" | |
| f"<b>β°π₯Download:</b> {sizeof_fmt(psutil.net_io_counters().bytes_recv)}\n\n\n" | |
| f"<b>Memory Total:</b> {sizeof_fmt(memory.total)}\n" | |
| f"<b>Memory Free:</b> {sizeof_fmt(memory.available)}\n" | |
| f"<b>Memory Used:</b> {sizeof_fmt(memory.used)}\n" | |
| f"<b>SWAP Total:</b> {sizeof_fmt(swap.total)} | <b>SWAP Usage:</b> {swap.percent}%\n\n" | |
| f"<b>Total Disk Space:</b> {sizeof_fmt(total)}\n" | |
| f"<b>Used:</b> {sizeof_fmt(used)} | <b>Free:</b> {sizeof_fmt(free)}\n\n" | |
| f"<b>Physical Cores:</b> {psutil.cpu_count(logical=False)}\n" | |
| f"<b>Total Cores:</b> {psutil.cpu_count(logical=True)}\n\n" | |
| f"<b>π€Bot Uptime:</b> {timeof_fmt(time.time() - botStartTime)}\n" | |
| f"<b>β²οΈOS Uptime:</b> {timeof_fmt(time.time() - boot_time)}\n" | |
| ) | |
| user_stats = ( | |
| "\n\nβ¬βββββγ Stats γββββββ¬\n\n" | |
| f"<b>βπ₯οΈ **CPU Usage Β»**</b> __{cpu_usage}%__\n" | |
| f"<b>βπΎ **RAM Usage Β»**</b> __{memory.percent}%__\n" | |
| f"<b>β°ποΈ **DISK Usage Β»**</b> __{disk}%__\n\n" | |
| f"<b>βπ€Upload:</b> {sizeof_fmt(psutil.net_io_counters().bytes_sent)}\n" | |
| f"<b>β°π₯Download:</b> {sizeof_fmt(psutil.net_io_counters().bytes_recv)}\n\n\n" | |
| f"<b>Memory Total:</b> {sizeof_fmt(memory.total)}\n" | |
| f"<b>Memory Free:</b> {sizeof_fmt(memory.available)}\n" | |
| f"<b>Memory Used:</b> {sizeof_fmt(memory.used)}\n" | |
| f"<b>Total Disk Space:</b> {sizeof_fmt(total)}\n" | |
| f"<b>Used:</b> {sizeof_fmt(used)} | <b>Free:</b> {sizeof_fmt(free)}\n\n" | |
| f"<b>π€Bot Uptime:</b> {timeof_fmt(time.time() - botStartTime)}\n" | |
| ) | |
| if message.from_user.id in OWNER: | |
| message.reply_text(owner_stats, quote=True) | |
| else: | |
| message.reply_text(user_stats, quote=True) | |
| def settings_handler(client: Client, message: types.Message): | |
| chat_id = message.chat.id | |
| init_user(chat_id) | |
| client.send_chat_action(chat_id, enums.ChatAction.TYPING) | |
| markup = types.InlineKeyboardMarkup( | |
| [ | |
| [ # First row | |
| types.InlineKeyboardButton("send as document", callback_data="document"), | |
| types.InlineKeyboardButton("send as video", callback_data="video"), | |
| types.InlineKeyboardButton("send as audio", callback_data="audio"), | |
| ], | |
| [ # second row | |
| types.InlineKeyboardButton("High Quality", callback_data="high"), | |
| types.InlineKeyboardButton("Medium Quality", callback_data="medium"), | |
| types.InlineKeyboardButton("Low Quality", callback_data="low"), | |
| ], | |
| ] | |
| ) | |
| quality = get_quality_settings(chat_id) | |
| send_type = get_format_settings(chat_id) | |
| client.send_message(chat_id, BotText.settings.format(quality, send_type), reply_markup=markup) | |
| def direct_download(client: Client, message: types.Message): | |
| chat_id = message.chat.id | |
| init_user(chat_id) | |
| client.send_chat_action(chat_id, enums.ChatAction.TYPING) | |
| message_text = message.text | |
| url, new_name = extract_url_and_name(message_text) | |
| logging.info("Direct download using aria2/requests start %s", url) | |
| if url is None or not re.findall(r"^https?://", url.lower()): | |
| message.reply_text("Send me a correct LINK.", quote=True) | |
| return | |
| bot_msg = message.reply_text("Direct download request received.", quote=True) | |
| try: | |
| direct_entrance(client, bot_msg, url) | |
| except ValueError as e: | |
| message.reply_text(e.__str__(), quote=True) | |
| bot_msg.delete() | |
| return | |
| def spdl_handler(client: Client, message: types.Message): | |
| chat_id = message.chat.id | |
| init_user(chat_id) | |
| client.send_chat_action(chat_id, enums.ChatAction.TYPING) | |
| message_text = message.text | |
| url, new_name = extract_url_and_name(message_text) | |
| logging.info("spdl start %s", url) | |
| if url is None or not re.findall(r"^https?://", url.lower()): | |
| message.reply_text("Something wrong π€.\nCheck your URL and send me again.", quote=True) | |
| return | |
| bot_msg = message.reply_text("SPDL request received.", quote=True) | |
| try: | |
| special_download_entrance(client, bot_msg, url) | |
| except ValueError as e: | |
| message.reply_text(e.__str__(), quote=True) | |
| bot_msg.delete() | |
| return | |
| def ytdl_handler(client: Client, message: types.Message): | |
| # for group only | |
| init_user(message.from_user.id) | |
| client.send_chat_action(message.chat.id, enums.ChatAction.TYPING) | |
| message_text = message.text | |
| url, new_name = extract_url_and_name(message_text) | |
| logging.info("ytdl start %s", url) | |
| if url is None or not re.findall(r"^https?://", url.lower()): | |
| message.reply_text("Check your URL.", quote=True) | |
| return | |
| bot_msg = message.reply_text("Group download request received.", quote=True) | |
| try: | |
| youtube_entrance(client, bot_msg, url) | |
| except ValueError as e: | |
| message.reply_text(e.__str__(), quote=True) | |
| bot_msg.delete() | |
| return | |
| def check_link(url: str): | |
| ytdl = yt_dlp.YoutubeDL() | |
| if re.findall(r"^https://www\.youtube\.com/channel/", url) or "list" in url: | |
| # TODO maybe using ytdl.extract_info | |
| raise ValueError("Playlist or channel download are not supported at this moment.") | |
| if not M3U8_SUPPORT and (re.findall(r"m3u8|\.m3u8|\.m3u$", url.lower())): | |
| return "m3u8 links are disabled." | |
| def download_handler(client: Client, message: types.Message): | |
| chat_id = message.from_user.id | |
| init_user(chat_id) | |
| client.send_chat_action(chat_id, enums.ChatAction.TYPING) | |
| url = message.text | |
| logging.info("start %s", url) | |
| try: | |
| check_link(url) | |
| # raise pyrogram.errors.exceptions.FloodWait(10) | |
| bot_msg: types.Message | Any = message.reply_text("**__Task received.__**", quote=True) | |
| client.send_chat_action(chat_id, enums.ChatAction.UPLOAD_VIDEO) | |
| youtube_entrance(client, bot_msg, url) | |
| except pyrogram.errors.Flood as e: | |
| f = BytesIO() | |
| f.write(str(e).encode()) | |
| f.write(b"Your job will be done soon. Just wait!") | |
| f.name = "Please wait.txt" | |
| message.reply_document(f, caption=f"Flood wait! Please wait {e} seconds...", quote=True) | |
| f.close() | |
| client.send_message(OWNER, f"Flood wait! π {e} seconds....") | |
| time.sleep(e.value) | |
| except ValueError as e: | |
| message.reply_text(e.__str__(), quote=True) | |
| except Exception as e: | |
| logging.error("Download failed", exc_info=True) | |
| message.reply_text(f"β Download failed: {e}", quote=True) | |
| def format_callback(client: Client, callback_query: types.CallbackQuery): | |
| chat_id = callback_query.message.chat.id | |
| data = callback_query.data | |
| logging.info("Setting %s file type to %s", chat_id, data) | |
| callback_query.answer(f"Your send type was set to {callback_query.data}") | |
| set_user_settings(chat_id, "format", data) | |
| def quality_callback(client: Client, callback_query: types.CallbackQuery): | |
| chat_id = callback_query.message.chat.id | |
| data = callback_query.data | |
| logging.info("Setting %s download quality to %s", chat_id, data) | |
| callback_query.answer(f"Your default engine quality was set to {callback_query.data}") | |
| set_user_settings(chat_id, "quality", data) | |
| #=======EXEC============ | |
| import asyncio | |
| import logging | |
| from io import BytesIO | |
| MAX_MESSAGE_LENGTH = 4096 | |
| COMMAND_TIMEOUT = 60 # Timeout in seconds | |
| COMMAND_ALIASES = { | |
| "update": "git pull", | |
| "restart": "systemctl restart mybot.service", | |
| } | |
| command_history = [] | |
| logging.basicConfig(filename="bash_logs.txt", level=logging.INFO) | |
| async def execution(_, message): | |
| status_message = await message.reply_text("`Processing ...`") | |
| cmd = message.text.split(" ", maxsplit=1)[1] | |
| # Replace command with alias if it exists | |
| cmd = COMMAND_ALIASES.get(cmd, cmd) | |
| reply_to_ = message | |
| if message.reply_to_message: | |
| reply_to_ = message.reply_to_message | |
| try: | |
| # Log the command | |
| logging.info(f"Command executed by {message.from_user.id}: {cmd}") | |
| # Run the command with a timeout | |
| process = await asyncio.create_subprocess_shell( | |
| cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE | |
| ) | |
| try: | |
| stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=COMMAND_TIMEOUT) | |
| except asyncio.TimeoutError: | |
| await process.kill() # Kill the process if it times out | |
| await status_message.edit_text("β **Timeout**: The command took too long to execute.") | |
| return | |
| e = stderr.decode().strip() if stderr else "π" | |
| o = stdout.decode().strip() if stdout else "π" | |
| OUTPUT = "" | |
| OUTPUT += f"<b>QUERY:</b>\n<u>Command:</u>\n<code>{cmd}</code> \n" | |
| OUTPUT += f"<u>PID</u>: <code>{process.pid}</code>\n\n" | |
| OUTPUT += f"<b>stderr</b>: \n<code>{e}</code>\n\n" | |
| OUTPUT += f"<b>stdout</b>: \n<code>{o}</code>" | |
| if len(OUTPUT) > MAX_MESSAGE_LENGTH: | |
| with BytesIO(str.encode(OUTPUT)) as out_file: | |
| out_file.name = "exec.txt" | |
| await reply_to_.reply_document( | |
| document=out_file, | |
| caption=cmd[: MAX_MESSAGE_LENGTH // 4 - 1], | |
| disable_notification=True, | |
| quote=True, | |
| ) | |
| os.remove("exec.txt") | |
| else: | |
| await reply_to_.reply_text(OUTPUT, quote=True) | |
| # Add command to history | |
| command_history.append(cmd) | |
| if len(command_history) > 25: # Keep only the last 10 commands | |
| command_history.pop(0) | |
| except Exception as ex: | |
| await reply_to_.reply_text(f"β **Error**: {str(ex)}", quote=True) | |
| finally: | |
| await status_message.delete() | |
| async def show_history(_, message): | |
| # Add numbering to each command and wrap in <code> tags | |
| formatted_history = "\n".join(f"<b>{i + 1}.</b> <code>{cmd}</code>" for i, cmd in enumerate(reversed(command_history))) | |
| # Check if the message exceeds Telegram's character limit | |
| if len(formatted_history) > MAX_MESSAGE_LENGTH: | |
| # Send as a text file | |
| with BytesIO(str.encode(formatted_history)) as out_file: | |
| out_file.name = "command_history.txt" | |
| await message.reply_document( | |
| document=out_file, | |
| caption="__Limit exceeded, so sending as file__", | |
| quote=True, | |
| ) | |
| os.remove("command_history.txt") | |
| else: | |
| # Send as a regular message | |
| await message.reply_text(f"<b>Command History:</b>\n{formatted_history}", quote=True) | |
| if __name__ == "__main__": | |
| scheduler = BackgroundScheduler() | |
| scheduler.add_job(reset_free, "cron", hour=0, minute=0) | |
| scheduler.start() | |
| banner = """ | |
| β β βββ β βββ β β | |
| ββ βββ β β β β β βββ βββ β β βββ β β βββ β βββ βββ βββ | |
| β β β β β β β β β β ββ β β β β βββ β β β β β βββ β β | |
| β ββ βββ β βββ ββ βββ ββ ββ ββ β β β ββ βββ βββ | |
| By @BennyThink, VIP Mode: {vip_status}""".format(vip_status=ENABLE_VIP) | |
| print(banner) | |
| app.run() |