from html import escape
from time import monotonic, time
from uuid import uuid4
from re import match
from aiofiles import open as aiopen
from cloudscraper import create_scraper
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from .. import LOGGER, user_data
from ..core.config_manager import Config
from ..core.tg_client import TgClient
from ..helper.ext_utils.bot_utils import new_task, update_user_ldata
from ..helper.ext_utils.links_utils import decode_slink
from ..helper.ext_utils.status_utils import get_readable_time
from ..helper.ext_utils.db_handler import database
from ..helper.languages import Language
from ..helper.telegram_helper.bot_commands import BotCommands
from ..helper.telegram_helper.button_build import ButtonMaker
from ..helper.telegram_helper.filters import CustomFilters
from ..helper.telegram_helper.message_utils import (
delete_message,
edit_message,
edit_reply_markup,
send_file,
send_message,
)
@new_task
async def start(_, message):
userid = message.from_user.id
lang = Language()
buttons = ButtonMaker()
buttons.url_button(
lang.START_BUTTON1, "https://www.github.com/SilentDemonSD/WZML-X"
)
buttons.url_button(lang.START_BUTTON2, "https://t.me/WZML_X")
reply_markup = buttons.build_menu(2)
if len(message.command) > 1 and message.command[1] == "wzmlx":
await delete_message(message)
elif len(message.command) > 1 and message.command[1] != "start":
decrypted_url = decode_slink(message.command[1])
if Config.MEDIA_STORE and decrypted_url.startswith("file"):
decrypted_url = decrypted_url.replace("file", "")
chat_id, msg_id = decrypted_url.split("&&")
LOGGER.info(f"Copying message from {chat_id} & {msg_id} to {userid}")
return await TgClient.bot.copy_message( # TODO: make it function
chat_id=userid,
from_chat_id=int(chat_id) if match(r"\d+", chat_id) else chat_id,
message_id=int(msg_id),
disable_notification=True,
)
elif Config.VERIFY_TIMEOUT:
input_token, pre_uid = decrypted_url.split("&&")
if int(pre_uid) != userid:
return await send_message(
message,
"Access Token is not yours!\n\nKindly generate your own to use.",
)
data = user_data.get(userid, {})
if "VERIFY_TOKEN" not in data or data["VERIFY_TOKEN"] != input_token:
return await send_message(
message,
"Access Token already used!\n\nKindly generate a new one.",
)
elif (
Config.LOGIN_PASS
and data["VERIFY_TOKEN"].casefold() == Config.LOGIN_PASS.casefold()
):
return await send_message(
message,
"Bot Already Logged In via Password\n\nNo Need to Accept Temp Tokens.",
)
buttons.data_button(
"Activate Access Token", f"start pass {input_token}", "header"
)
reply_markup = buttons.build_menu(2)
msg = f"""⌬ Access Login Token :
│
┟ Status → Generated Successfully
┟ Access Token → {input_token}
┃
┖ Validity: {get_readable_time(int(Config.VERIFY_TIMEOUT))}"""
return await send_message(message, msg, reply_markup)
if await CustomFilters.authorized(_, message):
start_string = lang.START_MSG.format(
cmd=BotCommands.HelpCommand[0],
)
await send_message(message, start_string, reply_markup)
elif Config.BOT_PM:
await send_message(
message,
"Now, Bot will send you all your files and links here. Start Using Now...",
reply_markup,
)
else:
await send_message(
message,
"Bot can mirror/leech from links|tgfiles|torrents|nzb|rclone-cloud to any rclone cloud, Google Drive or to telegram.\n\n⚠️ You Are not authorized user! Deploy your own WZML-X bot",
reply_markup,
)
await database.set_pm_users(userid)
@new_task
async def start_cb(_, query):
user_id = query.from_user.id
input_token = query.data.split()[2]
data = user_data.get(user_id, {})
if input_token == "activated":
return await query.answer("Already Activated!", show_alert=True)
elif "VERIFY_TOKEN" not in data or data["VERIFY_TOKEN"] != input_token:
return await query.answer("Already Used, Generate New One", show_alert=True)
update_user_ldata(user_id, "VERIFY_TOKEN", str(uuid4()))
update_user_ldata(user_id, "VERIFY_TIME", time())
if Config.DATABASE_URL:
await database.update_user_data(user_id)
await query.answer("Activated Access Login Token!", show_alert=True)
kb = query.message.reply_markup.inline_keyboard[1:]
kb.insert(
0,
[InlineKeyboardButton("✅️ Activated ✅", callback_data="start pass activated")],
)
await edit_reply_markup(query.message, InlineKeyboardMarkup(kb))
@new_task
async def login(_, message):
if Config.LOGIN_PASS is None:
return await send_message(message, "Login is not enabled !")
elif len(message.command) > 1:
user_id = message.from_user.id
input_pass = message.command[1]
if user_data.get(user_id, {}).get("VERIFY_TOKEN", "") == Config.LOGIN_PASS:
return await send_message(
message, "Already Bot Login In!\n\nNo Need to Login Again"
)
if input_pass.casefold() != Config.LOGIN_PASS.casefold():
return await send_message(
message, "Wrong Password!\n\nKindly check and try again"
)
update_user_ldata(user_id, "VERIFY_TOKEN", Config.LOGIN_PASS)
if Config.DATABASE_URL:
await database.update_user_data(user_id)
return await send_message(
message, "Bot Permanent Logged In!\n\nNow you can use the bot"
)
else:
await send_message(
message, "Bot Login Usage :\n\n/login [password]"
)
@new_task
async def ping(_, message):
start_time = monotonic()
reply = await send_message(message, "Starting Ping..")
end_time = monotonic()
await edit_message(
reply, f"Pong!\n {int((end_time - start_time) * 1000)} ms"
)
@new_task
async def log(_, message):
uid = message.from_user.id
buttons = ButtonMaker()
buttons.data_button("Log Disp", f"log {uid} disp")
buttons.data_button("Web Log", f"log {uid} web")
buttons.data_button("Close", f"log {uid} close")
await send_file(message, "log.txt", buttons=buttons.build_menu(2))
@new_task
async def log_cb(_, query):
data = query.data.split()
message = query.message
user_id = query.from_user.id
if user_id != int(data[1]):
await query.answer("Not Yours!", show_alert=True)
elif data[2] == "close":
await query.answer()
await delete_message(message, message.reply_to_message)
elif data[2] == "disp":
await query.answer("Fetching Log..")
async with aiopen("log.txt", "r") as f:
content = await f.read()
def parse(line):
parts = line.split("] [", 1)
return f"[{parts[1]}" if len(parts) > 1 else line
try:
res, total = [], 0
for line in reversed(content.splitlines()):
line = parse(line)
res.append(line)
total += len(line) + 1
if total > 3500:
break
text = f"Showing Last {len(res)} Lines from log.txt: \n\n----------START LOG----------\n\n
{escape('\n'.join(reversed(res)))}\n----------END LOG----------" btn = ButtonMaker() btn.data_button("Close", f"log {user_id} close") await send_message(message, text, btn.build_menu(1)) await edit_reply_markup(message, None) except Exception as err: LOGGER.error(f"TG Log Display : {str(err)}") elif data[2] == "web": boundary = "R1eFDeaC554BUkLF" headers = { "Content-Type": f"multipart/form-data; boundary=----WebKitFormBoundary{boundary}", "Origin": "https://spaceb.in", "Referer": "https://spaceb.in/", "sec-ch-ua": '"Not-A.Brand";v="99", "Chromium";v="124"', "sec-ch-ua-mobile": "?1", "sec-ch-ua-platform": '"Android"', "Sec-Fetch-Dest": "document", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Site": "same-origin", "Sec-Fetch-User": "?1", "Upgrade-Insecure-Requests": "1", "User-Agent": "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36", } async with aiopen("log.txt", "r") as f: content = await f.read() data = ( f"------WebKitFormBoundary{boundary}\r\n" f'Content-Disposition: form-data; name="content"\r\n\r\n' f"{content}\r\n" f"------WebKitFormBoundary{boundary}--\r\n" ) cget = create_scraper().request resp = cget("POST", "https://spaceb.in/", headers=headers, data=data) if resp.status_code == 200: await query.answer("Generating..") btn = ButtonMaker() btn.url_button("📨 Web Paste (SB)", resp.url) await edit_reply_markup(message, btn.build_menu(1)) else: await query.answer("Web Paste Failed ! Check Logs", show_alert=True)