Spaces:
Paused
Paused
| # | |
| # Copyright (C) 2021-2022 by TeamYukki@Github, < https://github.com/TeamYukki >. | |
| # | |
| # This file is part of < https://github.com/TeamYukki/YukkiMusicBot > project, | |
| # and is released under the "GNU v3.0 License Agreement". | |
| # Please see < https://github.com/TeamYukki/YukkiMusicBot/blob/master/LICENSE > | |
| # | |
| # All rights reserved. | |
| import asyncio | |
| import platform | |
| from sys import version as pyver | |
| import psutil | |
| from pyrogram import __version__ as pyrover | |
| from pyrogram import filters | |
| from pyrogram.errors import MessageIdInvalid | |
| from pyrogram.types import CallbackQuery, InputMediaPhoto, Message | |
| from pytgcalls.__version__ import __version__ as pytgver | |
| import config | |
| from config import BANNED_USERS, MUSIC_BOT_NAME | |
| from strings import get_command | |
| from YukkiMusic import YouTube, app | |
| from YukkiMusic.core.userbot import assistants | |
| from YukkiMusic.misc import SUDOERS, pymongodb | |
| from YukkiMusic.plugins import ALL_MODULES | |
| from YukkiMusic.utils.database import (get_global_tops, | |
| get_particulars, get_queries, | |
| get_served_chats, | |
| get_served_users, get_sudoers, | |
| get_top_chats, get_topp_users) | |
| from YukkiMusic.utils.decorators.language import language, languageCB | |
| from YukkiMusic.utils.inline.stats import (back_stats_buttons, | |
| back_stats_markup, | |
| get_stats_markup, | |
| overallback_stats_markup, | |
| stats_buttons, | |
| top_ten_stats_markup) | |
| loop = asyncio.get_running_loop() | |
| # Commands | |
| GSTATS_COMMAND = get_command("GSTATS_COMMAND") | |
| STATS_COMMAND = get_command("STATS_COMMAND") | |
| async def stats_global(client, message: Message, _): | |
| upl = stats_buttons( | |
| _, True if message.from_user.id in SUDOERS else False | |
| ) | |
| await message.reply_photo( | |
| photo=config.STATS_IMG_URL, | |
| caption=_["gstats_11"].format(config.MUSIC_BOT_NAME), | |
| reply_markup=upl, | |
| ) | |
| async def gstats_global(client, message: Message, _): | |
| mystic = await message.reply_text(_["gstats_1"]) | |
| stats = await get_global_tops() | |
| if not stats: | |
| await asyncio.sleep(1) | |
| return await mystic.edit(_["gstats_2"]) | |
| def get_stats(): | |
| results = {} | |
| for i in stats: | |
| top_list = stats[i]["spot"] | |
| results[str(i)] = top_list | |
| list_arranged = dict( | |
| sorted( | |
| results.items(), | |
| key=lambda item: item[1], | |
| reverse=True, | |
| ) | |
| ) | |
| if not results: | |
| return mystic.edit(_["gstats_2"]) | |
| videoid = None | |
| co = None | |
| for vidid, count in list_arranged.items(): | |
| if vidid == "telegram": | |
| continue | |
| else: | |
| videoid = vidid | |
| co = count | |
| break | |
| return videoid, co | |
| try: | |
| videoid, co = await loop.run_in_executor(None, get_stats) | |
| except Exception as e: | |
| print(e) | |
| return | |
| ( | |
| title, | |
| duration_min, | |
| duration_sec, | |
| thumbnail, | |
| vidid, | |
| ) = await YouTube.details(videoid, True) | |
| title = title.title() | |
| final = f"Top Most Played Track on {MUSIC_BOT_NAME}\n\n**Title:** {title}\n\nPlayed** {co} **times" | |
| upl = get_stats_markup( | |
| _, True if message.from_user.id in SUDOERS else False | |
| ) | |
| await app.send_photo( | |
| message.chat.id, | |
| photo=thumbnail, | |
| caption=final, | |
| reply_markup=upl, | |
| ) | |
| await mystic.delete() | |
| async def top_users_ten(client, CallbackQuery: CallbackQuery, _): | |
| chat_id = CallbackQuery.message.chat.id | |
| callback_data = CallbackQuery.data.strip() | |
| what = callback_data.split(None, 1)[1] | |
| upl = back_stats_markup(_) | |
| try: | |
| await CallbackQuery.answer() | |
| except: | |
| pass | |
| mystic = await CallbackQuery.edit_message_text( | |
| _["gstats_3"].format( | |
| f"of {CallbackQuery.message.chat.title}" | |
| if what == "Here" | |
| else what | |
| ) | |
| ) | |
| if what == "Tracks": | |
| stats = await get_global_tops() | |
| elif what == "Chats": | |
| stats = await get_top_chats() | |
| elif what == "Users": | |
| stats = await get_topp_users() | |
| elif what == "Here": | |
| stats = await get_particulars(chat_id) | |
| if not stats: | |
| await asyncio.sleep(1) | |
| return await mystic.edit(_["gstats_2"], reply_markup=upl) | |
| queries = await get_queries() | |
| def get_stats(): | |
| results = {} | |
| for i in stats: | |
| top_list = ( | |
| stats[i] | |
| if what in ["Chats", "Users"] | |
| else stats[i]["spot"] | |
| ) | |
| results[str(i)] = top_list | |
| list_arranged = dict( | |
| sorted( | |
| results.items(), | |
| key=lambda item: item[1], | |
| reverse=True, | |
| ) | |
| ) | |
| if not results: | |
| return mystic.edit(_["gstats_2"], reply_markup=upl) | |
| msg = "" | |
| limit = 0 | |
| total_count = 0 | |
| if what in ["Tracks", "Here"]: | |
| for items, count in list_arranged.items(): | |
| total_count += count | |
| if limit == 10: | |
| continue | |
| limit += 1 | |
| details = stats.get(items) | |
| title = (details["title"][:35]).title() | |
| if items == "telegram": | |
| msg += f"🔗[Telegram Files and Audios](https://t.me/telegram) ** played {count} times**\n\n" | |
| else: | |
| msg += f"🔗 [{title}](https://www.youtube.com/watch?v={items}) ** played {count} times**\n\n" | |
| temp = ( | |
| _["gstats_4"].format( | |
| queries, | |
| config.MUSIC_BOT_NAME, | |
| len(stats), | |
| total_count, | |
| limit, | |
| ) | |
| if what == "Tracks" | |
| else _["gstats_7"].format( | |
| len(stats), total_count, limit | |
| ) | |
| ) | |
| msg = temp + msg | |
| return msg, list_arranged | |
| try: | |
| msg, list_arranged = await loop.run_in_executor( | |
| None, get_stats | |
| ) | |
| except Exception as e: | |
| print(e) | |
| return | |
| limit = 0 | |
| if what in ["Users", "Chats"]: | |
| for items, count in list_arranged.items(): | |
| if limit == 10: | |
| break | |
| try: | |
| extract = ( | |
| (await app.get_users(items)).first_name | |
| if what == "Users" | |
| else (await app.get_chat(items)).title | |
| ) | |
| if extract is None: | |
| continue | |
| await asyncio.sleep(0.5) | |
| except: | |
| continue | |
| limit += 1 | |
| msg += f"🔗`{extract}` played {count} times on bot.\n\n" | |
| temp = ( | |
| _["gstats_5"].format(limit, MUSIC_BOT_NAME) | |
| if what == "Chats" | |
| else _["gstats_6"].format(limit, MUSIC_BOT_NAME) | |
| ) | |
| msg = temp + msg | |
| med = InputMediaPhoto(media=config.GLOBAL_IMG_URL, caption=msg) | |
| try: | |
| await CallbackQuery.edit_message_media( | |
| media=med, reply_markup=upl | |
| ) | |
| except MessageIdInvalid: | |
| await CallbackQuery.message.reply_photo( | |
| photo=config.GLOBAL_IMG_URL, caption=msg, reply_markup=upl | |
| ) | |
| async def overall_stats(client, CallbackQuery, _): | |
| callback_data = CallbackQuery.data.strip() | |
| what = callback_data.split(None, 1)[1] | |
| if what != "s": | |
| upl = overallback_stats_markup(_) | |
| else: | |
| upl = back_stats_buttons(_) | |
| try: | |
| await CallbackQuery.answer() | |
| except: | |
| pass | |
| await CallbackQuery.edit_message_text(_["gstats_8"]) | |
| served_chats = len(await get_served_chats()) | |
| served_users = len(await get_served_users()) | |
| total_queries = await get_queries() | |
| blocked = len(BANNED_USERS) | |
| sudoers = len(SUDOERS) | |
| mod = len(ALL_MODULES) | |
| assistant = len(assistants) | |
| playlist_limit = config.SERVER_PLAYLIST_LIMIT | |
| fetch_playlist = config.PLAYLIST_FETCH_LIMIT | |
| song = config.SONG_DOWNLOAD_DURATION | |
| play_duration = config.DURATION_LIMIT_MIN | |
| if config.AUTO_LEAVING_ASSISTANT == str(True): | |
| ass = "Yes" | |
| else: | |
| ass = "No" | |
| cm = config.CLEANMODE_DELETE_MINS | |
| text = f"""**Bot's Stats and Information:** | |
| **Imported Modules:** {mod} | |
| **Served Chats:** {served_chats} | |
| **Served Users:** {served_users} | |
| **Blocked Users:** {blocked} | |
| **Sudo Users:** {sudoers} | |
| **Total Queries:** {total_queries} | |
| **Total Assistants:** {assistant} | |
| **Auto Leaving Assistant:** {ass} | |
| **Cleanmode duration:** {cm} Mins | |
| **Play Duration Limit:** {play_duration} Mins | |
| **Song Download Limit:** {song} Mins | |
| **Bot's Server Playlist Limit:** {playlist_limit} | |
| **Playlist Play Limit:** {fetch_playlist}""" | |
| med = InputMediaPhoto(media=config.STATS_IMG_URL, caption=text) | |
| try: | |
| await CallbackQuery.edit_message_media( | |
| media=med, reply_markup=upl | |
| ) | |
| except MessageIdInvalid: | |
| await CallbackQuery.message.reply_photo( | |
| photo=config.STATS_IMG_URL, caption=text, reply_markup=upl | |
| ) | |
| async def overall_stats(client, CallbackQuery, _): | |
| if CallbackQuery.from_user.id not in SUDOERS: | |
| return await CallbackQuery.answer( | |
| "Only for Sudo Users", show_alert=True | |
| ) | |
| callback_data = CallbackQuery.data.strip() | |
| what = callback_data.split(None, 1)[1] | |
| if what != "s": | |
| upl = overallback_stats_markup(_) | |
| else: | |
| upl = back_stats_buttons(_) | |
| try: | |
| await CallbackQuery.answer() | |
| except: | |
| pass | |
| await CallbackQuery.edit_message_text(_["gstats_8"]) | |
| sc = platform.system() | |
| p_core = psutil.cpu_count(logical=False) | |
| t_core = psutil.cpu_count(logical=True) | |
| ram = ( | |
| str(round(psutil.virtual_memory().total / (1024.0**3))) | |
| + " GB" | |
| ) | |
| try: | |
| cpu_freq = psutil.cpu_freq().current | |
| if cpu_freq >= 1000: | |
| cpu_freq = f"{round(cpu_freq / 1000, 2)}GHz" | |
| else: | |
| cpu_freq = f"{round(cpu_freq, 2)}MHz" | |
| except: | |
| cpu_freq = "Unable to Fetch" | |
| hdd = psutil.disk_usage("/") | |
| total = hdd.total / (1024.0**3) | |
| total = str(total) | |
| used = hdd.used / (1024.0**3) | |
| used = str(used) | |
| free = hdd.free / (1024.0**3) | |
| free = str(free) | |
| mod = len(ALL_MODULES) | |
| db = pymongodb | |
| call = db.command("dbstats") | |
| datasize = call["dataSize"] / 1024 | |
| datasize = str(datasize) | |
| storage = call["storageSize"] / 1024 | |
| objects = call["objects"] | |
| collections = call["collections"] | |
| status = db.command("serverStatus") | |
| query = status["opcounters"]["query"] | |
| mongouptime = status["uptime"] / 86400 | |
| mongouptime = str(mongouptime) | |
| served_chats = len(await get_served_chats()) | |
| served_users = len(await get_served_users()) | |
| total_queries = await get_queries() | |
| blocked = len(BANNED_USERS) | |
| sudoers = len(await get_sudoers()) | |
| text = f""" **Bot's Stats and Information:** | |
| **Imported Modules:** {mod} | |
| **Platform:** {sc} | |
| **Ram:** {ram} | |
| **Physical Cores:** {p_core} | |
| **Total Cores:** {t_core} | |
| **Cpu Frequency:** {cpu_freq} | |
| **Python Version :** {pyver.split()[0]} | |
| **Pyrogram Version :** {pyrover} | |
| **Py-TgCalls Version :** {pytgver} | |
| **Storage Avail:** {total[:4]} GiB | |
| **Storage Used:** {used[:4]} GiB | |
| **Storage Left:** {free[:4]} GiB | |
| **Served Chats:** {served_chats} | |
| **Served Users:** {served_users} | |
| **Blocked Users:** {blocked} | |
| **Sudo Users:** {sudoers} | |
| **Mongo Uptime:** {mongouptime[:4]} Days | |
| **Total DB Size:** {datasize[:6]} Mb | |
| **Total DB Storage:** {storage} Mb | |
| **Total DB Collections:** {collections} | |
| **Total DB Keys:** {objects} | |
| **Total DB Queries:** `{query}` | |
| **Total Bot Queries:** `{total_queries} ` | |
| """ | |
| med = InputMediaPhoto(media=config.STATS_IMG_URL, caption=text) | |
| try: | |
| await CallbackQuery.edit_message_media( | |
| media=med, reply_markup=upl | |
| ) | |
| except MessageIdInvalid: | |
| await CallbackQuery.message.reply_photo( | |
| photo=config.STATS_IMG_URL, caption=text, reply_markup=upl | |
| ) | |
| async def back_buttons(client, CallbackQuery, _): | |
| try: | |
| await CallbackQuery.answer() | |
| except: | |
| pass | |
| command = CallbackQuery.matches[0].group(1) | |
| if command == "TOPMARKUPGET": | |
| upl = top_ten_stats_markup(_) | |
| med = InputMediaPhoto( | |
| media=config.GLOBAL_IMG_URL, | |
| caption=_["gstats_9"], | |
| ) | |
| try: | |
| await CallbackQuery.edit_message_media( | |
| media=med, reply_markup=upl | |
| ) | |
| except MessageIdInvalid: | |
| await CallbackQuery.message.reply_photo( | |
| photo=config.GLOBAL_IMG_URL, | |
| caption=_["gstats_9"], | |
| reply_markup=upl, | |
| ) | |
| if command == "GlobalStats": | |
| upl = get_stats_markup( | |
| _, | |
| True if CallbackQuery.from_user.id in SUDOERS else False, | |
| ) | |
| med = InputMediaPhoto( | |
| media=config.GLOBAL_IMG_URL, | |
| caption=_["gstats_10"].format(config.MUSIC_BOT_NAME), | |
| ) | |
| try: | |
| await CallbackQuery.edit_message_media( | |
| media=med, reply_markup=upl | |
| ) | |
| except MessageIdInvalid: | |
| await CallbackQuery.message.reply_photo( | |
| photo=config.GLOBAL_IMG_URL, | |
| caption=_["gstats_10"].format(config.MUSIC_BOT_NAME), | |
| reply_markup=upl, | |
| ) | |
| if command == "GETSTATS": | |
| upl = stats_buttons( | |
| _, | |
| True if CallbackQuery.from_user.id in SUDOERS else False, | |
| ) | |
| med = InputMediaPhoto( | |
| media=config.STATS_IMG_URL, | |
| caption=_["gstats_11"].format(config.MUSIC_BOT_NAME), | |
| ) | |
| try: | |
| await CallbackQuery.edit_message_media( | |
| media=med, reply_markup=upl | |
| ) | |
| except MessageIdInvalid: | |
| await CallbackQuery.message.reply_photo( | |
| photo=config.STATS_IMG_URL, | |
| caption=_["gstats_11"].format(config.MUSIC_BOT_NAME), | |
| reply_markup=upl, | |
| ) | |