|
|
from asyncio import ( |
|
|
create_subprocess_exec, |
|
|
create_subprocess_shell, |
|
|
run_coroutine_threadsafe, |
|
|
sleep, |
|
|
) |
|
|
from asyncio.subprocess import PIPE |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
from functools import partial, wraps |
|
|
|
|
|
from httpx import AsyncClient |
|
|
|
|
|
from ... import bot_loop, user_data |
|
|
from ...core.config_manager import Config |
|
|
from ..telegram_helper.button_build import ButtonMaker |
|
|
from .help_messages import ( |
|
|
CLONE_HELP_DICT, |
|
|
MIRROR_HELP_DICT, |
|
|
YT_HELP_DICT, |
|
|
) |
|
|
from .telegraph_helper import telegraph |
|
|
|
|
|
COMMAND_USAGE = {} |
|
|
|
|
|
THREAD_POOL = ThreadPoolExecutor(max_workers=500) |
|
|
|
|
|
|
|
|
class SetInterval: |
|
|
def __init__(self, interval, action, *args, **kwargs): |
|
|
self.interval = interval |
|
|
self.action = action |
|
|
self.task = bot_loop.create_task(self._set_interval(*args, **kwargs)) |
|
|
|
|
|
async def _set_interval(self, *args, **kwargs): |
|
|
while True: |
|
|
await sleep(self.interval) |
|
|
await self.action(*args, **kwargs) |
|
|
|
|
|
def cancel(self): |
|
|
self.task.cancel() |
|
|
|
|
|
|
|
|
def _build_command_usage(help_dict, command_key): |
|
|
buttons = ButtonMaker() |
|
|
cmd_list = list(help_dict.keys())[1:] |
|
|
temp_store = [] |
|
|
cmd_pages = [cmd_list[i : i + 10] for i in range(0, len(cmd_list), 10)] |
|
|
for i in range(1, len(cmd_pages) + 1): |
|
|
for name in cmd_pages[i]: |
|
|
buttons.data_button(name, f"help {command_key} {name}") |
|
|
buttons.data_button("Prev", f"help pre {command_key} {i - 1}") |
|
|
buttons.data_button("Next", f"help nex {command_key} {i + 1}") |
|
|
buttons.data_button("Close", "help close", "footer") |
|
|
temp_store.append(buttons.build_menu(2)) |
|
|
COMMAND_USAGE[command_key] = [help_dict["main"], *temp_store] |
|
|
buttons.reset() |
|
|
|
|
|
|
|
|
def _build_command_usage(help_dict, command_key): |
|
|
buttons = ButtonMaker() |
|
|
cmd_list = list(help_dict.keys())[1:] |
|
|
cmd_pages = [cmd_list[i : i + 10] for i in range(0, len(cmd_list), 10)] |
|
|
temp_store = [] |
|
|
|
|
|
for i, page in enumerate(cmd_pages): |
|
|
for name in page: |
|
|
buttons.data_button(name, f"help {command_key} {name} {i}") |
|
|
if len(cmd_pages) > 1: |
|
|
if i > 0: |
|
|
buttons.data_button("⫷", f"help pre {command_key} {i - 1}") |
|
|
if i < len(cmd_pages) - 1: |
|
|
buttons.data_button("⫸", f"help nex {command_key} {i + 1}") |
|
|
buttons.data_button("Close", "help close", "footer") |
|
|
temp_store.append(buttons.build_menu(2)) |
|
|
buttons.reset() |
|
|
|
|
|
COMMAND_USAGE[command_key] = [help_dict["main"], *temp_store] |
|
|
|
|
|
|
|
|
def create_help_buttons(): |
|
|
_build_command_usage(MIRROR_HELP_DICT, "mirror") |
|
|
_build_command_usage(YT_HELP_DICT, "yt") |
|
|
_build_command_usage(CLONE_HELP_DICT, "clone") |
|
|
|
|
|
|
|
|
def compare_versions(v1, v2): |
|
|
v1, v2 = (list(map(int, v.split("-")[0][1:].split("."))) for v in (v1, v2)) |
|
|
return ( |
|
|
"New Version Update is Available! Check Now!" |
|
|
if v1 < v2 |
|
|
else ( |
|
|
"More Updated! Kindly Contribute in Official" |
|
|
if v1 > v2 |
|
|
else "Already up to date with latest version" |
|
|
) |
|
|
) |
|
|
|
|
|
|
|
|
def bt_selection_buttons(id_): |
|
|
gid = id_[:12] if len(id_) > 25 else id_ |
|
|
pin = "".join([n for n in id_ if n.isdigit()][:4]) |
|
|
buttons = ButtonMaker() |
|
|
if Config.WEB_PINCODE: |
|
|
buttons.url_button("Select Files", f"{Config.BASE_URL}/app/files?gid={id_}") |
|
|
buttons.data_button("Pincode", f"sel pin {gid} {pin}") |
|
|
else: |
|
|
buttons.url_button( |
|
|
"Select Files", f"{Config.BASE_URL}/app/files?gid={id_}&pin={pin}" |
|
|
) |
|
|
buttons.data_button("Done Selecting", f"sel done {gid} {id_}") |
|
|
buttons.data_button("Cancel", f"sel cancel {gid}") |
|
|
return buttons.build_menu(2) |
|
|
|
|
|
|
|
|
async def get_telegraph_list(telegraph_content): |
|
|
path = [ |
|
|
( |
|
|
await telegraph.create_page( |
|
|
title="Mirror-Leech-Bot Drive Search", content=content |
|
|
) |
|
|
)["path"] |
|
|
for content in telegraph_content |
|
|
] |
|
|
if len(path) > 1: |
|
|
await telegraph.edit_telegraph(path, telegraph_content) |
|
|
buttons = ButtonMaker() |
|
|
buttons.url_button("🔎 VIEW", f"https://telegra.ph/{path[0]}") |
|
|
return buttons.build_menu(1) |
|
|
|
|
|
|
|
|
def arg_parser(items, arg_base): |
|
|
if not items: |
|
|
return |
|
|
|
|
|
arg_start = -1 |
|
|
i = 0 |
|
|
total = len(items) |
|
|
|
|
|
bool_arg_set = { |
|
|
"-b", |
|
|
"-e", |
|
|
"-z", |
|
|
"-s", |
|
|
"-j", |
|
|
"-d", |
|
|
"-sv", |
|
|
"-ss", |
|
|
"-f", |
|
|
"-fd", |
|
|
"-fu", |
|
|
"-sync", |
|
|
"-hl", |
|
|
"-doc", |
|
|
"-med", |
|
|
"-ut", |
|
|
"-bt", |
|
|
"-yt", |
|
|
} |
|
|
if Config.DISABLE_BULK and "-b" in items: |
|
|
arg_base["-b"] = False |
|
|
|
|
|
if Config.DISABLE_MULTI and "-i" in items: |
|
|
arg_base["-i"] = 0 |
|
|
|
|
|
if Config.DISABLE_SEED and "-d" in items: |
|
|
arg_base["-d"] = False |
|
|
|
|
|
while i < total: |
|
|
part = items[i] |
|
|
|
|
|
if part in arg_base: |
|
|
if arg_start == -1: |
|
|
arg_start = i |
|
|
|
|
|
if ( |
|
|
i + 1 == total |
|
|
and part in bool_arg_set |
|
|
or part |
|
|
in [ |
|
|
"-s", |
|
|
"-j", |
|
|
"-f", |
|
|
"-fd", |
|
|
"-fu", |
|
|
"-sync", |
|
|
"-hl", |
|
|
"-doc", |
|
|
"-med", |
|
|
"-ut", |
|
|
"-bt", |
|
|
"-yt", |
|
|
] |
|
|
): |
|
|
arg_base[part] = True |
|
|
else: |
|
|
sub_list = [] |
|
|
for j in range(i + 1, total): |
|
|
if items[j] in arg_base: |
|
|
if part == "-c" and items[j] == "-c": |
|
|
sub_list.append(items[j]) |
|
|
continue |
|
|
if part in bool_arg_set and not sub_list: |
|
|
arg_base[part] = True |
|
|
break |
|
|
if not sub_list: |
|
|
break |
|
|
check = " ".join(sub_list).strip() |
|
|
if check.startswith("[") and check.endswith("]"): |
|
|
break |
|
|
elif not check.startswith("["): |
|
|
break |
|
|
sub_list.append(items[j]) |
|
|
if sub_list: |
|
|
value = " ".join(sub_list) |
|
|
if part == "-ff" and not value.strip().startswith("["): |
|
|
arg_base[part].add(value) |
|
|
else: |
|
|
arg_base[part] = value |
|
|
i += len(sub_list) |
|
|
|
|
|
i += 1 |
|
|
|
|
|
if "link" in arg_base: |
|
|
link_items = items[:arg_start] if arg_start != -1 else items |
|
|
if link_items: |
|
|
arg_base["link"] = " ".join(link_items) |
|
|
|
|
|
|
|
|
def get_size_bytes(size): |
|
|
size = size.lower() |
|
|
if "k" in size: |
|
|
size = int(float(size.split("k")[0]) * 1024) |
|
|
elif "m" in size: |
|
|
size = int(float(size.split("m")[0]) * 1048576) |
|
|
elif "g" in size: |
|
|
size = int(float(size.split("g")[0]) * 1073741824) |
|
|
elif "t" in size: |
|
|
size = int(float(size.split("t")[0]) * 1099511627776) |
|
|
else: |
|
|
size = 0 |
|
|
return size |
|
|
|
|
|
|
|
|
async def get_content_type(url): |
|
|
try: |
|
|
async with AsyncClient() as client: |
|
|
response = await client.get(url, allow_redirects=True, verify=False) |
|
|
return response.headers.get("Content-Type") |
|
|
except Exception: |
|
|
return None |
|
|
|
|
|
|
|
|
def update_user_ldata(id_, key, value): |
|
|
user_data.setdefault(id_, {}) |
|
|
user_data[id_][key] = value |
|
|
|
|
|
|
|
|
async def cmd_exec(cmd, shell=False): |
|
|
if shell: |
|
|
proc = await create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE) |
|
|
else: |
|
|
proc = await create_subprocess_exec(*cmd, stdout=PIPE, stderr=PIPE) |
|
|
stdout, stderr = await proc.communicate() |
|
|
try: |
|
|
stdout = stdout.decode().strip() |
|
|
except Exception: |
|
|
stdout = "Unable to decode the response!" |
|
|
try: |
|
|
stderr = stderr.decode().strip() |
|
|
except Exception: |
|
|
stderr = "Unable to decode the error!" |
|
|
return stdout, stderr, proc.returncode |
|
|
|
|
|
|
|
|
def new_task(func): |
|
|
@wraps(func) |
|
|
async def wrapper(*args, **kwargs): |
|
|
task = bot_loop.create_task(func(*args, **kwargs)) |
|
|
return task |
|
|
|
|
|
return wrapper |
|
|
|
|
|
|
|
|
async def sync_to_async(func, *args, wait=True, **kwargs): |
|
|
pfunc = partial(func, *args, **kwargs) |
|
|
future = bot_loop.run_in_executor(THREAD_POOL, pfunc) |
|
|
return await future if wait else future |
|
|
|
|
|
|
|
|
def async_to_sync(func, *args, wait=True, **kwargs): |
|
|
future = run_coroutine_threadsafe(func(*args, **kwargs), bot_loop) |
|
|
return future.result() if wait else future |
|
|
|
|
|
|
|
|
def loop_thread(func): |
|
|
@wraps(func) |
|
|
def wrapper(*args, wait=False, **kwargs): |
|
|
future = run_coroutine_threadsafe(func(*args, **kwargs), bot_loop) |
|
|
return future.result() if wait else future |
|
|
|
|
|
return wrapper |
|
|
|
|
|
|
|
|
def safe_int(value, default=0): |
|
|
try: |
|
|
return int(value) |
|
|
except (ValueError, TypeError): |
|
|
return default |
|
|
|