from asyncio import gather, iscoroutinefunction
from html import escape
from re import findall
from time import time
from psutil import cpu_percent, disk_usage, virtual_memory
from ... import (
DOWNLOAD_DIR,
bot_cache,
bot_start_time,
status_dict,
task_dict,
task_dict_lock,
)
from ...core.config_manager import Config
from ..telegram_helper.button_build import ButtonMaker
# Size suffixes in ascending order. The helpers below rely on the ordering:
# a suffix's position in this list is used as its power of 1024.
SIZE_UNITS = ["B", "KB", "MB", "GB", "TB", "PB"]
class MirrorStatus:
    """Namespace of the status strings a task can report.

    The values are plain strings; they are compared against task statuses
    and shown verbatim in the status message.
    """

    # Transfer
    STATUS_UPLOAD = "Upload"
    STATUS_DOWNLOAD = "Download"
    STATUS_CLONE = "Clone"

    # Waiting / suspended
    STATUS_QUEUEDL = "QueueDl"
    STATUS_QUEUEUP = "QueueUp"
    STATUS_PAUSED = "Pause"

    # Processing
    STATUS_ARCHIVE = "Archive"
    STATUS_EXTRACT = "Extract"
    STATUS_SPLIT = "Split"
    STATUS_CHECK = "CheckUp"
    STATUS_SEED = "Seed"
    STATUS_SAMVID = "SamVid"
    STATUS_CONVERT = "Convert"
    STATUS_FFMPEG = "FFmpeg"
    STATUS_YT = "YouTube"
    STATUS_METADATA = "Metadata"
class EngineStatus:
    """Per-instance engine labels, some carrying a version suffix.

    Versions are read from ``bot_cache["eng_versions"]`` at construction
    time; an engine with no entry there is labelled ``N/A``.
    """

    def __init__(self):
        versions = bot_cache.get("eng_versions", {})

        def labelled(title, key):
            # "<title> v<version>", falling back to N/A when the key is absent.
            return f"{title} v{versions.get(key, 'N/A')}"

        # Engines whose version is looked up in the cache.
        self.STATUS_ARIA2 = labelled("Aria2", "aria2")
        self.STATUS_AIOHTTP = labelled("AioHttp", "aiohttp")
        self.STATUS_GDAPI = labelled("Google-API", "gapi")
        self.STATUS_QBIT = labelled("qBit", "qBittorrent")
        self.STATUS_TGRAM = labelled("Pyro", "pyrotgfork")
        self.STATUS_MEGA = labelled("MegaCMD", "mega")
        self.STATUS_YTDLP = labelled("yt-dlp", "yt-dlp")
        self.STATUS_FFMPEG = labelled("ffmpeg", "ffmpeg")
        self.STATUS_7Z = labelled("7z", "7z")
        self.STATUS_RCLONE = labelled("RClone", "rclone")
        self.STATUS_SABNZBD = labelled("SABnzbd+", "SABnzbd+")

        # Engines with a fixed label.
        self.STATUS_QUEUE = "QSystem v2"
        self.STATUS_JD = "JDownloader v2"
        self.STATUS_YT = "Youtube-Api"
        self.STATUS_METADATA = "Metadata"
        self.STATUS_UPHOSTER = "Uphoster"
# Short filter label -> status string.
# get_readable_message iterates this mapping in insertion order to create one
# filter button per entry (label as button text, value in the callback data),
# and get_specific_tasks treats its values as the set of "known" statuses.
# NOTE(review): MirrorStatus.STATUS_YT and STATUS_METADATA have no entry here,
# so get_specific_tasks returns tasks in those states under the Download
# filter - confirm that this is intended.
STATUSES = {
    "ALL": "All",
    "DL": MirrorStatus.STATUS_DOWNLOAD,
    "UP": MirrorStatus.STATUS_UPLOAD,
    "QD": MirrorStatus.STATUS_QUEUEDL,
    "QU": MirrorStatus.STATUS_QUEUEUP,
    "AR": MirrorStatus.STATUS_ARCHIVE,
    "EX": MirrorStatus.STATUS_EXTRACT,
    "SD": MirrorStatus.STATUS_SEED,
    "CL": MirrorStatus.STATUS_CLONE,
    "CM": MirrorStatus.STATUS_CONVERT,
    "SP": MirrorStatus.STATUS_SPLIT,
    "SV": MirrorStatus.STATUS_SAMVID,
    "FF": MirrorStatus.STATUS_FFMPEG,
    "PA": MirrorStatus.STATUS_PAUSED,
    "CK": MirrorStatus.STATUS_CHECK,
}
async def get_task_by_gid(gid: str):
    """Return the first task in ``task_dict`` whose ``gid()`` equals *gid*.

    The scan runs while holding ``task_dict_lock``. Any task that exposes a
    ``seeding`` attribute has ``update()`` awaited before its gid is compared.

    Returns:
        The matching task object, or ``None`` when no task matches.
    """
    async with task_dict_lock:
        for candidate in task_dict.values():
            needs_refresh = hasattr(candidate, "seeding")
            if needs_refresh:
                await candidate.update()
            if candidate.gid() != gid:
                continue
            return candidate
    return None
async def get_specific_tasks(status, user_id):
    """Return the tasks in ``task_dict`` matching *status* and *user_id*.

    The caller is expected to hold ``task_dict_lock`` if it needs a
    consistent view; this function does not acquire it itself.

    Args:
        status: ``"All"`` to skip status filtering, otherwise a status string
            to match against each task's ``status()``.
        user_id: When truthy, only tasks whose ``listener.user_id`` equals it
            are considered; otherwise every task is considered.

    Returns:
        A list of task objects in ``task_dict`` iteration order. When
        *status* is ``MirrorStatus.STATUS_DOWNLOAD``, tasks whose status is
        not one of the ``STATUSES`` values are included as well.
    """
    # The user filter is the same for every status, so apply it once.
    candidates = (
        [tk for tk in task_dict.values() if tk.listener.user_id == user_id]
        if user_id
        else list(task_dict.values())
    )
    if status == "All":
        return candidates

    # Remember per task whether status() is a coroutine function, instead of
    # searching a side list for every task (which was quadratic).
    is_async = [iscoroutinefunction(tk.status) for tk in candidates]

    # Coroutine statuses are awaited together; gather() returns results in the
    # order the awaitables were passed, so they can be consumed sequentially.
    awaited_statuses = iter(
        await gather(
            *[tk.status() for tk, flag in zip(candidates, is_async) if flag]
        )
    )

    known_statuses = STATUSES.values()
    wants_download = status == MirrorStatus.STATUS_DOWNLOAD
    result = []
    for tk, flag in zip(candidates, is_async):
        st = next(awaited_statuses) if flag else tk.status()
        if st == status or (wants_download and st not in known_statuses):
            result.append(tk)
    return result
async def get_all_tasks(req_status: str, user_id):
    """Run ``get_specific_tasks`` while holding ``task_dict_lock``.

    Args:
        req_status: Status filter forwarded unchanged.
        user_id: User filter forwarded unchanged.

    Returns:
        Whatever ``get_specific_tasks`` returns for the same arguments.
    """
    async with task_dict_lock:
        matching = await get_specific_tasks(req_status, user_id)
    return matching
def get_raw_file_size(size):
    """Convert a human-readable size string to a whole number of bytes.

    Accepts both the whitespace-separated form (``"1.5 GB"``) and the compact
    form produced by ``get_readable_file_size`` (``"1.50GB"``), so the two
    helpers round-trip.

    Args:
        size: Text made of a number followed by one of ``SIZE_UNITS``.

    Returns:
        The size in bytes, truncated to ``int``.

    Raises:
        ValueError: If the number cannot be parsed or the unit is not in
            ``SIZE_UNITS``.
    """
    text = size.strip()
    # Peel the trailing alphabetic run off as the unit; the rest is the number.
    split_at = len(text)
    while split_at and text[split_at - 1].isalpha():
        split_at -= 1
    number, unit = text[:split_at].strip(), text[split_at:]
    return int(float(number) * (1024 ** SIZE_UNITS.index(unit)))
def get_readable_file_size(size_in_bytes):
    """Format a byte count as a two-decimal number plus a ``SIZE_UNITS`` suffix.

    Args:
        size_in_bytes: Number of bytes; any falsy value yields ``"0B"``.

    Returns:
        A string such as ``"1.50GB"``. Values are divided by 1024 until they
        drop below 1024 or the largest unit is reached.
    """
    if not size_in_bytes:
        return "0B"
    value = size_in_bytes
    for unit in SIZE_UNITS[:-1]:
        if not value >= 1024:
            break
        value /= 1024
    else:
        # Every smaller unit was exhausted, so the largest one applies.
        unit = SIZE_UNITS[-1]
    return f"{value:.2f}{unit}"
def get_readable_time(seconds: int):
    """Format a duration in seconds as a compact ``1d2h3m4s`` string.

    Units whose value would be zero are omitted. Fractional seconds are
    truncated.

    Args:
        seconds: Duration in seconds (int or float).

    Returns:
        The formatted duration. Durations shorter than one second (including
        zero and negative values) yield ``"0s"`` rather than an empty string,
        so callers never render a blank field.
    """
    periods = (("d", 86400), ("h", 3600), ("m", 60), ("s", 1))
    pieces = []
    for suffix, length in periods:
        if seconds >= length:
            amount, seconds = divmod(seconds, length)
            pieces.append(f"{int(amount)}{suffix}")
    return "".join(pieces) or "0s"
def get_raw_time(time_str: str) -> int:
    """Parse a ``1d2h3m4s``-style string back into a number of seconds.

    Every ``<digits><d|h|m|s>`` pair found in *time_str* contributes to the
    total; any other text is ignored.

    Returns:
        The summed duration in seconds, ``0`` when nothing matches.
    """
    seconds_per_unit = {"d": 86400, "h": 3600, "m": 60, "s": 1}
    total = 0
    for amount, unit in findall(r"(\d+)([dhms])", time_str):
        total += int(amount) * seconds_per_unit[unit]
    return total
def time_to_seconds(time_duration):
    """Convert ``[[HH:]MM:]SS`` text to seconds.

    Args:
        time_duration: Colon-separated duration with one to three numeric
            fields (each parsed with ``float``).

    Returns:
        The duration in seconds, or ``0`` when the input has more than three
        fields or cannot be processed at all.
    """
    try:
        fields = time_duration.split(":")
        if not 1 <= len(fields) <= 3:
            return 0
        values = [float(field) for field in fields]
        # Left-pad the missing leading fields (hours, minutes) with zeros.
        hours, minutes, seconds = [0] * (3 - len(values)) + values
        return hours * 3600 + minutes * 60 + seconds
    except Exception:
        return 0
def speed_string_to_bytes(size_text: str):
    """Convert text such as ``"10KB/s"`` into a number of bytes.

    The text is lower-cased and checked for the letters ``k``, ``m``, ``g``,
    ``t`` and ``b`` in that order; the first letter present selects the
    multiplier and everything before it is parsed as the number.

    Returns:
        The value as a float, or ``0`` when none of the letters is present.
    """
    lowered = size_text.lower()
    # Order matters: "b" is last because it also appears in kb/mb/gb/tb.
    multipliers = (
        ("k", 1024),
        ("m", 1048576),
        ("g", 1073741824),
        ("t", 1099511627776),
        ("b", 1),
    )
    total = 0
    for marker, factor in multipliers:
        if marker in lowered:
            total += float(lowered.split(marker)[0]) * factor
            break
    return total
def get_progress_bar_string(pct):
    """Render a percentage as a 12-cell bar, e.g. ``[⬢⬢⬢⬡⬡⬡⬡⬡⬡⬡⬡⬡]``.

    Args:
        pct: A number, or text with an optional ``%`` at either end.

    Returns:
        The bar wrapped in square brackets. The value is clamped to 0-100 and
        one cell is filled for every full 8 percentage points.
    """
    value = float(str(pct).strip("%"))
    if value < 0:
        value = 0
    elif value > 100:
        value = 100
    filled = int(value // 8)
    empty = 12 - filled
    return "[" + "⬢" * filled + "⬡" * empty + "]"
def _build_task_entry(task, tstatus, position, cancel_cmd):
    """Render the status-message text for a single task.

    Args:
        task: Task status object; its ``listener`` supplies the message,
            user and mode details.
        tstatus: Status string to display and to branch on.
        position: 1-based number shown in front of the task name.
        cancel_cmd: Command name used to build the per-task stop command.

    Returns:
        The text for this task, ending with a blank line.
    """
    listener = task.listener
    sender = listener.message.from_user

    lines = [f"{position}. {escape(f'{task.name()}')}"]
    if listener.subname:
        # Escaped like the task name above: the text is presumably sent as
        # HTML (the mention below is rendered with style='html'), so a raw
        # '<' or '&' in a file name must not reach the markup.
        lines.append(f"\n┖ Sub Name → {escape(f'{listener.subname}')}")
    elapsed = time() - listener.message.date.timestamp()
    lines.append(f"\n\nTask By {sender.mention(style='html')} ( #ID{sender.id} )")
    if listener.is_super_chat:
        lines.append(" [Link]")

    if (
        tstatus not in (MirrorStatus.STATUS_SEED, MirrorStatus.STATUS_QUEUEUP)
        and listener.progress
    ):
        progress = task.progress()
        lines.append(f"\n┟ {get_progress_bar_string(progress)} {progress}")
        if listener.subname:
            subsize = f" / {get_readable_file_size(listener.subsize)}"
            total_files = len(listener.files_to_proceed)
            count = f"( {listener.proceed_count} / {total_files or '?'} )"
        else:
            subsize = ""
            count = ""
        lines.append(
            f"\n┠ Processed → {task.processed_bytes()}{subsize} of {task.size()}"
        )
        if count:
            lines.append(f"\n┠ Count: → {count}")
        lines.append(f"\n┠ Status → {tstatus}")
        lines.append(f"\n┠ Speed → {task.speed()}")
        # Read the ETA once so the displayed value and the projected total
        # are derived from the same reading.
        eta = task.eta()
        lines.append(
            f"\n┠ Time → {eta} of {get_readable_time(elapsed + get_raw_time(eta))} ( {get_readable_time(elapsed)} )"
        )
        if tstatus == MirrorStatus.STATUS_DOWNLOAD and (
            listener.is_torrent or listener.is_qbit
        ):
            try:
                lines.append(
                    f"\n┠ Seeders → {task.seeders_num()} | Leechers → {task.leechers_num()}"
                )
            except Exception:
                # Best effort: the peer line is simply left out when the task
                # cannot report seeders/leechers.
                pass
        # TODO: Add Connected Peers
    elif tstatus == MirrorStatus.STATUS_SEED:
        lines.append(f"\n┠ Size → {task.size()} | Uploaded → {task.uploaded_bytes()}")
        lines.append(f"\n┠ Status → {tstatus}")
        lines.append(f"\n┠ Speed → {task.seed_speed()}")
        lines.append(f"\n┠ Ratio → {task.ratio()}")
        lines.append(
            f"\n┠ Time → {task.seeding_time()} | Elapsed → {get_readable_time(elapsed)}"
        )
    else:
        lines.append(f"\n┠ Size → {task.size()}")

    lines.append(f"\n┠ Engine → {task.engine}")
    lines.append(f"\n┠ In Mode → {listener.mode[0]}")
    lines.append(f"\n┠ Out Mode → {listener.mode[1]}")
    # TODO: Add Bt Sel
    lines.append(f"\n┖ Stop → /{cancel_cmd}_{task.gid()}\n\n")
    return "".join(lines)


async def get_readable_message(sid, is_user, page_no=1, status="All", page_step=1):
    """Build the status message text and its button menu.

    Args:
        sid: Key into ``status_dict`` and identifier embedded in every
            button's callback data. When *is_user* is true it is also used
            as the user id to filter tasks by.
        is_user: Whether to restrict the listing to tasks of user *sid*.
        page_no: Requested page; out-of-range values wrap around and the
            corrected value is written to ``status_dict[sid]["page_no"]``.
        status: ``"All"`` or a status string to filter by.
        page_step: Current paging step; only displayed.

    Returns:
        ``(text, menu)``, or ``(None, None)`` when *status* is ``"All"`` and
        there is nothing to show.
    """
    # Kept function-local as in the original (presumably to avoid an import
    # cycle at module load - confirm), but resolved once per call rather than
    # once per task.
    from ..telegram_helper.bot_commands import BotCommands

    tasks = await get_specific_tasks(status, sid if is_user else None)

    limit = Config.STATUS_LIMIT
    tasks_no = len(tasks)
    pages = (max(tasks_no, 1) + limit - 1) // limit
    if page_no > pages:
        page_no = (page_no - 1) % pages + 1
        status_dict[sid]["page_no"] = page_no
    elif page_no < 1:
        page_no = pages - (abs(page_no) % pages)
        status_dict[sid]["page_no"] = page_no
    start_position = (page_no - 1) * limit

    parts = []
    for position, task in enumerate(
        tasks[start_position : limit + start_position], start=start_position + 1
    ):
        if status != "All":
            tstatus = status
        elif iscoroutinefunction(task.status):
            tstatus = await task.status()
        else:
            tstatus = task.status()
        parts.append(
            _build_task_entry(
                task, tstatus, position, BotCommands.CancelTaskCommand[1]
            )
        )

    if not parts:
        if status == "All":
            return None, None
        parts.append(f"No Active {status} Tasks!\n\n")

    parts.append("⌬ Bot Stats")
    buttons = ButtonMaker()
    if not is_user:
        buttons.data_button("📜 TStats", f"status {sid} ov", position="header")
    if tasks_no > limit:
        # The newline leads the line instead of trailing it; previously this
        # text was glued directly onto the "Bot Stats" header.
        parts.append(
            f"\nPage: {page_no}/{pages} | Tasks: {tasks_no} | Step: {page_step}"
        )
        buttons.data_button("<<", f"status {sid} pre", position="header")
        buttons.data_button(">>", f"status {sid} nex", position="header")
        if tasks_no > 30:
            for step in (1, 2, 4, 6, 8, 10, 15):
                buttons.data_button(step, f"status {sid} ps {step}", position="footer")
    if status != "All" or tasks_no > 20:
        for label, status_value in STATUSES.items():
            if status_value != status:
                buttons.data_button(label, f"status {sid} st {status_value}")
    buttons.data_button("♻️ Refresh", f"status {sid} ref", position="header")
    button = buttons.build_menu(8)

    # One disk_usage() call so free space and percentage come from the same
    # snapshot.
    disk = disk_usage(DOWNLOAD_DIR)
    parts.append(
        f"\n┟ CPU → {cpu_percent()}% | F → {get_readable_file_size(disk.free)} [{round(100 - disk.percent, 1)}%]"
    )
    parts.append(
        f"\n┖ RAM → {virtual_memory().percent}% | UP → {get_readable_time(time() - bot_start_time)}"
    )
    return "".join(parts), button