File size: 9,993 Bytes
db78256 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 |
from html import escape
from time import monotonic, time
from uuid import uuid4
from re import match
from aiofiles import open as aiopen
from cloudscraper import create_scraper
from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup
from .. import LOGGER, user_data
from ..core.config_manager import Config
from ..core.tg_client import TgClient
from ..helper.ext_utils.bot_utils import new_task, update_user_ldata
from ..helper.ext_utils.links_utils import decode_slink
from ..helper.ext_utils.status_utils import get_readable_time
from ..helper.ext_utils.db_handler import database
from ..helper.languages import Language
from ..helper.telegram_helper.bot_commands import BotCommands
from ..helper.telegram_helper.button_build import ButtonMaker
from ..helper.telegram_helper.filters import CustomFilters
from ..helper.telegram_helper.message_utils import (
delete_message,
edit_message,
edit_reply_markup,
send_file,
send_message,
)
@new_task
async def start(_, message):
    """Handle ``/start`` and its optional deep-link payload.

    The payload is ``message.command[1]`` when present:

    * ``"wzmlx"``: the incoming message is deleted, then the greeting below
      is sent as usual.
    * any other value except ``"start"`` is decoded with ``decode_slink``:
      - if ``Config.MEDIA_STORE`` is set and the decoded text starts with
        ``"file"``, the rest is read as ``<chat>&&<message id>`` and that
        message is copied to the user (the copy call's result is returned);
      - else if ``Config.VERIFY_TIMEOUT`` is set, the decoded text is read as
        ``<token>&&<user id>`` and, after ownership/validity checks, an
        activation prompt for the token is sent.
    * otherwise a greeting is chosen from ``CustomFilters.authorized`` and
      ``Config.BOT_PM``, and the user id is passed to
      ``database.set_pm_users``.
    """
    # NOTE(review): the listing this was taken from had lost its indentation;
    # the nesting here is rebuilt from the control flow (early returns and
    # elif chains) -- confirm against the upstream file.
    userid = message.from_user.id
    lang = Language()
    buttons = ButtonMaker()
    buttons.url_button(
        lang.START_BUTTON1, "https://www.github.com/SilentDemonSD/WZML-X"
    )
    buttons.url_button(lang.START_BUTTON2, "https://t.me/WZML_X")
    reply_markup = buttons.build_menu(2)

    payload = message.command[1] if len(message.command) > 1 else None
    if payload == "wzmlx":
        await delete_message(message)
    elif payload is not None and payload != "start":
        decrypted_url = decode_slink(payload)
        if Config.MEDIA_STORE and decrypted_url.startswith("file"):
            # Strip only the leading marker; a blanket replace() would also
            # eat "file" inside a chat username.
            chat_id, msg_id = decrypted_url.removeprefix("file").split("&&")
            LOGGER.info(f"Copying message from {chat_id} & {msg_id} to {userid}")
            return await TgClient.bot.copy_message(  # TODO: make it function
                chat_id=userid,
                # Whole-string, sign-aware check: negative chat ids become
                # ints, and anything that is not purely numeric is passed
                # through as a string instead of crashing int().
                from_chat_id=int(chat_id) if match(r"-?\d+$", chat_id) else chat_id,
                message_id=int(msg_id),
                disable_notification=True,
            )
        elif Config.VERIFY_TIMEOUT:
            input_token, pre_uid = decrypted_url.split("&&")
            if int(pre_uid) != userid:
                return await send_message(
                    message,
                    "<b>Access Token is not yours!</b>\n\n<i>Kindly generate your own to use.</i>",
                )
            data = user_data.get(userid, {})
            if "VERIFY_TOKEN" not in data or data["VERIFY_TOKEN"] != input_token:
                return await send_message(
                    message,
                    "<b>Access Token already used!</b>\n\n<i>Kindly generate a new one.</i>",
                )
            elif (
                Config.LOGIN_PASS
                and data["VERIFY_TOKEN"].casefold() == Config.LOGIN_PASS.casefold()
            ):
                return await send_message(
                    message,
                    "<b>Bot Already Logged In via Password</b>\n\n<i>No Need to Accept Temp Tokens.</i>",
                )
            buttons.data_button(
                "Activate Access Token", f"start pass {input_token}", "header"
            )
            reply_markup = buttons.build_menu(2)
            # NOTE(review): the decorative glyphs in this template look
            # mis-decoded (encoding damage) and cannot be recovered with
            # certainty from here -- restore them from the upstream file.
            msg = f"""β¬ Access Login Token :
β
β <b>Status</b> β <code>Generated Successfully</code>
β <b>Access Token</b> β <code>{input_token}</code>
β
β <b>Validity:</b> {get_readable_time(int(Config.VERIFY_TIMEOUT))}"""
            return await send_message(message, msg, reply_markup)

    if await CustomFilters.authorized(_, message):
        start_string = lang.START_MSG.format(
            cmd=BotCommands.HelpCommand[0],
        )
        await send_message(message, start_string, reply_markup)
    elif Config.BOT_PM:
        await send_message(
            message,
            "<i>Now, Bot will send you all your files and links here. Start Using Now...</i>",
            reply_markup,
        )
    else:
        # The warning sign is written as escapes (U+26A0 U+FE0F) so it
        # survives re-encoding of this file.
        await send_message(
            message,
            "<i>Bot can mirror/leech from links|tgfiles|torrents|nzb|rclone-cloud to any rclone cloud, Google Drive or to telegram.\n\n\u26a0\ufe0f You Are not authorized user! Deploy your own WZML-X bot</i>",
            reply_markup,
        )
    await database.set_pm_users(userid)
@new_task
async def start_cb(_, query):
    """Handle the ``start pass <token>`` callback that activates a token.

    The third whitespace-separated field of ``query.data`` is the token.
    ``"activated"`` (the data carried by the replacement button below) and
    tokens that do not match the user's stored ``VERIFY_TOKEN`` are answered
    with an alert and nothing else happens.  Otherwise the stored token is
    replaced by a fresh ``uuid4`` (so the same token cannot be activated
    again), ``VERIFY_TIME`` is set to the current time, the user data is
    persisted when ``Config.DATABASE_URL`` is set, and the first keyboard row
    is swapped for an "Activated" button.
    """
    user_id = query.from_user.id
    input_token = query.data.split()[2]
    data = user_data.get(user_id, {})
    if input_token == "activated":
        return await query.answer("Already Activated!", show_alert=True)
    elif "VERIFY_TOKEN" not in data or data["VERIFY_TOKEN"] != input_token:
        return await query.answer("Already Used, Generate New One", show_alert=True)

    update_user_ldata(user_id, "VERIFY_TOKEN", str(uuid4()))
    update_user_ldata(user_id, "VERIFY_TIME", time())
    if Config.DATABASE_URL:
        await database.update_user_data(user_id)
    await query.answer("Activated Access Login Token!", show_alert=True)

    # Drop the first row (the activation button) and put the marker in its place.
    kb = query.message.reply_markup.inline_keyboard[1:]
    # The check-mark emoji (U+2705, with U+FE0F after the first one) is written
    # as escapes: the raw characters had been mis-decoded into a line break
    # that split this string literal and made the module unparsable.
    activated_label = "\u2705\ufe0f Activated \u2705"
    kb.insert(
        0,
        [InlineKeyboardButton(activated_label, callback_data="start pass activated")],
    )
    await edit_reply_markup(query.message, InlineKeyboardMarkup(kb))
@new_task
async def login(_, message):
    """Handle the password login command (``/login [password]``).

    Replies that login is disabled when ``Config.LOGIN_PASS`` is unset or
    empty, shows the usage text when no password argument is given, and
    otherwise compares the argument with ``Config.LOGIN_PASS``
    case-insensitively.  On success the password itself is stored as the
    user's ``VERIFY_TOKEN`` and persisted when ``Config.DATABASE_URL`` is
    set.
    """
    # Truthiness rather than ``is None``: an empty password means "disabled"
    # too, matching how ``start`` tests ``Config.LOGIN_PASS``.
    if not Config.LOGIN_PASS:
        return await send_message(message, "<i>Login is not enabled !</i>")

    if len(message.command) <= 1:
        await send_message(
            message, "<b>Bot Login Usage :</b>\n\n<code>/login [password]</code>"
        )
        return

    user_id = message.from_user.id
    input_pass = message.command[1]
    # A stored token equal to the password marks an existing permanent login.
    if user_data.get(user_id, {}).get("VERIFY_TOKEN", "") == Config.LOGIN_PASS:
        return await send_message(
            message, "<b>Already Bot Login In!</b>\n\n<i>No Need to Login Again</i>"
        )
    if input_pass.casefold() != Config.LOGIN_PASS.casefold():
        return await send_message(
            message, "<b>Wrong Password!</b>\n\n<i>Kindly check and try again</i>"
        )

    update_user_ldata(user_id, "VERIFY_TOKEN", Config.LOGIN_PASS)
    if Config.DATABASE_URL:
        await database.update_user_data(user_id)
    return await send_message(
        message, "<b>Bot Permanent Logged In!</b>\n\n<i>Now you can use the bot</i>"
    )
@new_task
async def ping(_, message):
    """Report how long sending a probe message took, in whole milliseconds."""
    began = monotonic()
    probe = await send_message(message, "<i>Starting Ping..</i>")
    # Monotonic clock difference around the send, truncated to an int.
    elapsed_ms = int((monotonic() - began) * 1000)
    await edit_message(probe, f"<i>Pong!</i>\n <code>{elapsed_ms} ms</code>")
@new_task
async def log(_, message):
    """Send ``log.txt`` with display / web-paste / close buttons.

    Each button's callback data is ``log <sender id> <action>``, embedding
    the id of the user who issued the command.
    """
    requester = message.from_user.id
    menu = ButtonMaker()
    actions = (("Log Disp", "disp"), ("Web Log", "web"), ("Close", "close"))
    for label, action in actions:
        menu.data_button(label, f"log {requester} {action}")
    await send_file(message, "log.txt", buttons=menu.build_menu(2))
def _strip_log_prefix(line):
    """Drop everything up to the first ``"] ["`` in ``line``, keeping a leading ``[``."""
    _, sep, rest = line.partition("] [")
    return f"[{rest}" if sep else line


def _tail_log_lines(content, limit=3500):
    """Return the newest lines of ``content`` in original order.

    Lines are taken from the end (prefix-stripped) until their combined
    length, counting one separator per line, first exceeds ``limit``; the
    line that crosses the limit is still included.
    """
    picked, total = [], 0
    for raw in reversed(content.splitlines()):
        line = _strip_log_prefix(raw)
        picked.append(line)
        total += len(line) + 1
        if total > limit:
            break
    picked.reverse()
    return picked


def _post_log_paste(content):
    """Upload ``content`` to spaceb.in as a multipart form (blocking); return the response."""
    boundary = "R1eFDeaC554BUkLF"
    headers = {
        "Content-Type": f"multipart/form-data; boundary=----WebKitFormBoundary{boundary}",
        "Origin": "https://spaceb.in",
        "Referer": "https://spaceb.in/",
        "sec-ch-ua": '"Not-A.Brand";v="99", "Chromium";v="124"',
        "sec-ch-ua-mobile": "?1",
        "sec-ch-ua-platform": '"Android"',
        "Sec-Fetch-Dest": "document",
        "Sec-Fetch-Mode": "navigate",
        "Sec-Fetch-Site": "same-origin",
        "Sec-Fetch-User": "?1",
        "Upgrade-Insecure-Requests": "1",
        "User-Agent": "Mozilla/5.0 (Linux; Android 10; K) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Mobile Safari/537.36",
    }
    body = (
        f"------WebKitFormBoundary{boundary}\r\n"
        f'Content-Disposition: form-data; name="content"\r\n\r\n'
        f"{content}\r\n"
        f"------WebKitFormBoundary{boundary}--\r\n"
    )
    # Encode explicitly so the bytes on the wire do not depend on how the
    # HTTP stack implicitly encodes a ``str`` body (log text may be non-ASCII).
    return create_scraper().request(
        "POST", "https://spaceb.in/", headers=headers, data=body.encode("utf-8")
    )


@new_task
async def log_cb(_, query):
    """Handle the ``log <uid> <action>`` callbacks attached to the log file.

    Only the user whose id is embedded in the callback data may act; anyone
    else gets a "Not Yours!" alert.  Actions:

    * ``close``: delete the log message and the message it replies to.
    * ``disp``: send the tail of ``log.txt`` as an expandable block quote and
      clear the original keyboard; failures are logged, not raised.
    * ``web``: upload ``log.txt`` to spaceb.in and replace the keyboard with
      a link to the paste, or show a failure alert.
    """
    parts = query.data.split()
    message = query.message
    user_id = query.from_user.id
    action = parts[2]

    if user_id != int(parts[1]):
        await query.answer("Not Yours!", show_alert=True)
    elif action == "close":
        await query.answer()
        await delete_message(message, message.reply_to_message)
    elif action == "disp":
        await query.answer("Fetching Log..")
        async with aiopen("log.txt", "r") as f:
            content = await f.read()
        try:
            res = _tail_log_lines(content)
            # Joined outside the f-string: a backslash inside an f-string
            # expression is a SyntaxError before Python 3.12.
            escaped = escape("\n".join(res))
            text = (
                f"<b>Showing Last {len(res)} Lines from log.txt:</b> \n\n"
                "----------<b>START LOG</b>----------\n\n"
                f"<blockquote expandable>{escaped}</blockquote>\n"
                "----------<b>END LOG</b>----------"
            )
            btn = ButtonMaker()
            btn.data_button("Close", f"log {user_id} close")
            await send_message(message, text, btn.build_menu(1))
            await edit_reply_markup(message, None)
        except Exception as err:
            LOGGER.error(f"TG Log Display : {str(err)}")
    elif action == "web":
        from asyncio import to_thread

        async with aiopen("log.txt", "r") as f:
            content = await f.read()
        try:
            # The scraper is synchronous; run it in a worker thread so the
            # event loop keeps serving other updates during the upload.
            resp = await to_thread(_post_log_paste, content)
        except Exception as err:
            # Without this the callback would never be answered on a network
            # error; log it and fall through to the failure alert.
            LOGGER.error(f"Web Log Paste : {err}")
            resp = None
        if resp is not None and resp.status_code == 200:
            await query.answer("Generating..")
            btn = ButtonMaker()
            # NOTE(review): the leading glyph of this label looks mis-decoded
            # (probably an emoji); kept verbatim -- restore from upstream.
            btn.url_button("π¨ Web Paste (SB)", resp.url)
            await edit_reply_markup(message, btn.build_menu(1))
        else:
            await query.answer("Web Paste Failed ! Check Logs", show_alert=True)
|