from asyncio import gather, sleep, wait_for, TimeoutError from platform import platform, version from re import search as research from time import time from aiofiles.os import path as aiopath from psutil import ( Process, boot_time, cpu_count, cpu_freq, cpu_percent, disk_io_counters, disk_usage, getloadavg, net_io_counters, swap_memory, virtual_memory, process_iter, NoSuchProcess, AccessDenied, ) from .. import LOGGER, bot_cache, bot_start_time, bot_loop from ..core.config_manager import Config, BinConfig from ..helper.ext_utils.bot_utils import cmd_exec, compare_versions, new_task from ..helper.ext_utils.status_utils import ( get_progress_bar_string, get_readable_file_size, get_readable_time, ) from ..helper.telegram_helper.filters import CustomFilters from ..helper.telegram_helper.button_build import ButtonMaker from ..helper.telegram_helper.message_utils import ( delete_message, edit_message, send_message, ) from ..version import get_version commands = { "aria2": ([BinConfig.ARIA2_NAME, "--version"], r"aria2 version ([\d.]+)"), "qBittorrent": ([BinConfig.QBIT_NAME, "--version"], r"qBittorrent v([\d.]+)"), "SABnzbd+": ( [BinConfig.SABNZBD_NAME, "--version"], rf"{BinConfig.SABNZBD_NAME}-([\d.]+)", ), "python": (["python3", "--version"], r"Python ([\d.]+)"), "rclone": ([BinConfig.RCLONE_NAME, "--version"], r"rclone v([\d.]+)"), "yt-dlp": (["yt-dlp", "--version"], r"([\d.]+)"), "ffmpeg": ( [BinConfig.FFMPEG_NAME, "-version"], r"ffmpeg version ([\d.]+(-\w+)?).*", ), "7z": (["7z", "i"], r"7-Zip ([\d.]+)"), "aiohttp": (["uv", "pip", "show", "aiohttp"], r"Version: ([\d.]+)"), "pyrotgfork": (["uv", "pip", "show", "pyrotgfork"], r"Version: ([\d.]+)"), "gapi": (["uv", "pip", "show", "google-api-python-client"], r"Version: ([\d.]+)"), "mega": (["mega-version"], r"version: ([\d.]+)"), } async def get_stats(event, key="home"): user_id = event.from_user.id btns = ButtonMaker() if key == "home": btns = ButtonMaker() btns.data_button("Bot Stats", f"stats {user_id} stbot") btns.data_button("OS Stats", f"stats {user_id} stsys") btns.data_button("Repo Stats", f"stats {user_id} strepo") btns.data_button("Pkgs Stats", f"stats {user_id} stpkgs") btns.data_button("Task Limits", f"stats {user_id} tlimits") btns.data_button("Sys Tasks", f"stats {user_id} systasks") msg = "⌬ Bot & OS Statistics!" elif key == "stbot": total, used, free, disk = disk_usage("/") swap = swap_memory() memory = virtual_memory() disk_io = disk_io_counters() msg = f"""⌬ BOT STATISTICS :Bot Uptime : {get_readable_time(time() - bot_start_time)} ┎ RAM ( MEMORY ) : ┃ {get_progress_bar_string(memory.percent)} {memory.percent}% ┖ U : {get_readable_file_size(memory.used)} | F : {get_readable_file_size(memory.available)} | T : {get_readable_file_size(memory.total)} ┎ SWAP MEMORY : ┃ {get_progress_bar_string(swap.percent)} {swap.percent}% ┖ U : {get_readable_file_size(swap.used)} | F : {get_readable_file_size(swap.free)} | T : {get_readable_file_size(swap.total)} ┎ DISK : ┃ {get_progress_bar_string(disk)} {disk}% ┃ Total Disk Read : {f"{get_readable_file_size(disk_io.read_bytes)} ({get_readable_time(disk_io.read_time / 1000)})" if disk_io else "Access Denied"} ┃ Total Disk Write : {f"{get_readable_file_size(disk_io.write_bytes)} ({get_readable_time(disk_io.write_time / 1000)})" if disk_io else "Access Denied"} ┖ U : {get_readable_file_size(used)} | F : {get_readable_file_size(free)} | T : {get_readable_file_size(total)} """ elif key == "stsys": cpu_usage = cpu_percent(interval=0.5) msg = f"""⌬ OS SYSTEM :OS Uptime : {get_readable_time(time() - boot_time())} ┠ OS Version : {version()} ┖ OS Arch : {platform()} ⌬ NETWORK STATS :Upload Data: {get_readable_file_size(net_io_counters().bytes_sent)} ┠ Download Data: {get_readable_file_size(net_io_counters().bytes_recv)} ┠ Pkts Sent: {str(net_io_counters().packets_sent)[:-3]}k ┠ Pkts Received: {str(net_io_counters().packets_recv)[:-3]}k ┖ Total I/O Data: {get_readable_file_size(net_io_counters().bytes_recv + net_io_counters().bytes_sent)} ┎ CPU : ┃ {get_progress_bar_string(cpu_usage)} {cpu_usage}% ┠ CPU Frequency : {f"{cpu_freq().current / 1000:.2f} GHz" if cpu_freq() else "Access Denied"} ┠ System Avg Load : {"%, ".join(str(round((x / cpu_count() * 100), 2)) for x in getloadavg())}%, (1m, 5m, 15m) ┠ P-Core(s) : {cpu_count(logical=False)} | V-Core(s) : {cpu_count(logical=True) - cpu_count(logical=False)} ┠ Total Core(s) : {cpu_count(logical=True)} ┖ Usable CPU(s) : {len(Process().cpu_affinity())} """ elif key == "strepo": last_commit, changelog = "No Data", "N/A" if await aiopath.exists(".git"): last_commit = ( await cmd_exec( "git log -1 --pretty='%cd ( %cr )' --date=format-local:'%d/%m/%Y'", True, ) )[0] changelog = ( await cmd_exec( "git log -1 --pretty=format:'%s By %an'", True ) )[0] official_v = ( await cmd_exec( f"curl -o latestversion.py https://raw.githubusercontent.com/SilentDemonSD/WZML-X/{Config.UPSTREAM_BRANCH}/bot/version.py -s && python3 latestversion.py && rm latestversion.py", True, ) )[0] msg = f"""⌬ Repo Statistics : │ ┟ Bot Updated : {last_commit} ┠ Current Version : {get_version()} ┠ Latest Version : {official_v} ┖ Last ChangeLog : {changelog} ⌬ REMARKS : {compare_versions(get_version(), official_v)} """ elif key == "stpkgs": ver = bot_cache.get("eng_versions", {}) msg = f"""⌬ Packages Statistics : │ ┟ python: {ver.get("python", "N/A")} ┠ aria2: {ver.get("aria2", "N/A")} ┠ qBittorrent: {ver.get("qBittorrent", "N/A")} ┠ SABnzbd+: {ver.get("SABnzbd+", "N/A")} ┠ rclone: {ver.get("rclone", "N/A")} ┠ yt-dlp: {ver.get("yt-dlp", "N/A")} ┠ ffmpeg: {ver.get("ffmpeg", "N/A")} ┠ 7z: {ver.get("7z", "N/A")} ┠ Aiohttp: {ver.get("aiohttp", "N/A")} ┠ PyroTgFork: {ver.get("pyrotgfork", "N/A")} ┠ Google API: {ver.get("gapi", "N/A")} ┖ Mega CMD: {ver.get("mega", "N/A")} """ elif key == "tlimits": msg = f"""⌬ Bot Task Limits : │ ┟ Direct Limit : {Config.DIRECT_LIMIT or "∞"} GB ┠ Torrent Limit : {Config.TORRENT_LIMIT or "∞"} GB ┠ GDriveDL Limit : {Config.GD_DL_LIMIT or "∞"} GB ┠ RCloneDL Limit : {Config.RC_DL_LIMIT or "∞"} GB ┠ Clone Limit : {Config.CLONE_LIMIT or "∞"} GB ┠ JDown Limit : {Config.JD_LIMIT or "∞"} GB ┠ NZB Limit : {Config.NZB_LIMIT or "∞"} GB ┠ YT-DLP Limit : {Config.YTDLP_LIMIT or "∞"} GB ┠ Playlist Limit : {Config.PLAYLIST_LIMIT or "∞"} ┠ Mega Limit : {Config.MEGA_LIMIT or "∞"} GB ┠ Leech Limit : {Config.LEECH_LIMIT or "∞"} GB ┠ Archive Limit : {Config.ARCHIVE_LIMIT or "∞"} GB ┠ Extract Limit : {Config.EXTRACT_LIMIT or "∞"} GB ┞ Threshold Storage : {Config.STORAGE_LIMIT or "∞"} GB │ ┟ Token Validity : {get_readable_time(Config.VERIFY_TIMEOUT) if Config.VERIFY_TIMEOUT else "Disabled"} ┠ User Time Limit : {Config.USER_TIME_INTERVAL or "0"}s / task ┠ User Max Tasks : {Config.USER_MAX_TASKS or "∞"} ┖ Bot Max Tasks : {Config.BOT_MAX_TASKS or "∞"} """ elif key == "systasks": try: processes = [] for proc in process_iter( ["pid", "name", "cpu_percent", "memory_percent", "username"] ): try: info = proc.info if ( info.get("cpu_percent", 0) > 1.0 or info.get("memory_percent", 0) > 1.0 ): processes.append(info) except (NoSuchProcess, AccessDenied): continue processes.sort( key=lambda x: x.get("cpu_percent", 0) + x.get("memory_percent", 0), reverse=True, ) processes = processes[:15] except Exception: processes = [] msg = "⌬ System Tasks (High Usage)\n│\n" if processes: for i, proc in enumerate(processes, 1): name = proc.get("name", "Unknown")[:20] cpu = proc.get("cpu_percent", 0) mem = proc.get("memory_percent", 0) user = proc.get("username", "Unknown")[:10] msg += f"┠ {i:2d}. {name}\n┃ 🔹 CPU: {cpu:.1f}% | MEM: {mem:.1f}%\n┃ 👤 User: {user} | PID: {proc['pid']}\n" btns.data_button(f"{i}", f"stats {user_id} killproc {proc['pid']}") msg += "┃\n┖ Click serial number to terminate process" else: msg += "┃\n┖ No high usage processes found" btns.data_button("🔄 Refresh", f"stats {user_id} systasks", "header") btns.data_button("Back", f"stats {user_id} home", "footer") btns.data_button("Close", f"stats {user_id} close", "footer") return msg, btns.build_menu(8 if key == "systasks" else 2) @new_task async def bot_stats(_, message): msg, btns = await get_stats(message) await send_message(message, msg, btns) @new_task async def stats_pages(_, query): data = query.data.split() message = query.message user_id = query.from_user.id if user_id != int(data[1]): await query.answer("Not Yours!", show_alert=True) elif data[2] == "close": await query.answer() await delete_message(message, message.reply_to_message) elif data[2] == "killproc": if data[2] == "systasks" and not await CustomFilters.owner(_, query): await query.answer("Sorry! You cannot Kill System Tasks!", show_alert=True) return pid = int(data[3]) try: process = Process(pid) proc_name = process.name() process.terminate() await sleep(2) if process.is_running(): process.kill() status = "🔥 Force killed" else: status = "✅ Terminated" await query.answer(f"{status}: {proc_name} (PID: {pid})", show_alert=True) except NoSuchProcess: await query.answer( "❌ Process not found or already terminated!", show_alert=True ) except AccessDenied: await query.answer( "❌ Access denied! Cannot kill this process.", show_alert=True ) except Exception as e: await query.answer(f"❌ Error: {str(e)}", show_alert=True) msg, btns = await get_stats(query, "systasks") await edit_message(message, msg, btns) else: if data[2] == "systasks" and not await CustomFilters.sudo(_, query): await query.answer("Sorry! You cannot open System Tasks!", show_alert=True) return await query.answer() msg, btns = await get_stats(query, data[2]) await edit_message(message, msg, btns) async def get_version_async(command, regex, timeout=5): try: out, err, code = await wait_for(cmd_exec(command), timeout=timeout) if code != 0: return f"Error: {err}" match = research(regex, out) return match.group(1) if match else "-" except TimeoutError: return "Timeout" except Exception as e: return f"Exception: {str(e)}" async def retry_mega_version(): await sleep(60) command, regex = commands["mega"] version = await get_version_async(command, regex, timeout=10) if version != "Timeout" and not version.startswith("Exception"): bot_cache["eng_versions"]["mega"] = version LOGGER.info(f"MegaCMD Version Fetched: {version}") else: LOGGER.warning(f"Failed to fetch MegaCMD Version: {version}") @new_task async def get_packages_version(): tasks = [get_version_async(command, regex) for command, regex in commands.values()] versions = await gather(*tasks) bot_cache["eng_versions"] = {} for tool, ver in zip(commands.keys(), versions): bot_cache["eng_versions"][tool] = ver if await aiopath.exists(".git"): last_commit = await cmd_exec( "git log -1 --date=short --pretty=format:'%cd From %cr'", True ) last_commit = last_commit[0] else: last_commit = "No UPSTREAM_REPO" bot_cache["commit"] = last_commit if bot_cache["eng_versions"]["mega"] in ["Timeout", "N/A"] or bot_cache[ "eng_versions" ]["mega"].startswith("Exception"): bot_loop.create_task(retry_mega_version()) LOGGER.info("Fetched Package Versions!")