# # Copyright (C) 2021-2022 by TeamYukki@Github, < https://github.com/TeamYukki >. # # This file is part of < https://github.com/TeamYukki/YukkiMusicBot > project, # and is released under the "GNU v3.0 License Agreement". # Please see < https://github.com/TeamYukki/YukkiMusicBot/blob/master/LICENSE > # # All rights reserved. # # This aeval and sh module is taken from < https://github.com/TheHamkerCat/WilliamButcherBot > # Credit goes to TheHamkerCat. # import os import re import subprocess import sys import traceback from inspect import getfullargspec from io import StringIO from time import time from pyrogram import filters from pyrogram.types import (InlineKeyboardButton, InlineKeyboardMarkup, Message) from YukkiMusic import app from YukkiMusic.misc import SUDOERS async def aexec(code, client, message): exec( "async def __aexec(client, message): " + "".join(f"\n {a}" for a in code.split("\n")) ) return await locals()["__aexec"](client, message) async def edit_or_reply(msg: Message, **kwargs): func = msg.edit_text if msg.from_user.is_self else msg.reply spec = getfullargspec(func.__wrapped__).args await func(**{k: v for k, v in kwargs.items() if k in spec}) @app.on_message( filters.command("eval") & SUDOERS & ~filters.forwarded & ~filters.via_bot ) async def executor(client, message): if len(message.command) < 2: return await edit_or_reply( message, text="__Nigga Give me some command to execute.__" ) try: cmd = message.text.split(" ", maxsplit=1)[1] except IndexError: return await message.delete() t1 = time() old_stderr = sys.stderr old_stdout = sys.stdout redirected_output = sys.stdout = StringIO() redirected_error = sys.stderr = StringIO() stdout, stderr, exc = None, None, None try: await aexec(cmd, client, message) except Exception: exc = traceback.format_exc() stdout = redirected_output.getvalue() stderr = redirected_error.getvalue() sys.stdout = old_stdout sys.stderr = old_stderr evaluation = "" if exc: evaluation = exc elif stderr: evaluation = stderr elif stdout: evaluation = stdout else: evaluation = "Success" final_output = f"**OUTPUT**:\n```{evaluation.strip()}```" if len(final_output) > 4096: filename = "output.txt" with open(filename, "w+", encoding="utf8") as out_file: out_file.write(str(evaluation.strip())) t2 = time() keyboard = InlineKeyboardMarkup( [ [ InlineKeyboardButton( text="⏳", callback_data=f"runtime {t2-t1} Seconds", ) ] ] ) await message.reply_document( document=filename, caption=f"**INPUT:**\n`{cmd[0:980]}`\n\n**OUTPUT:**\n`Attached Document`", quote=False, reply_markup=keyboard, ) await message.delete() os.remove(filename) else: t2 = time() keyboard = InlineKeyboardMarkup( [ [ InlineKeyboardButton( text="⏳", callback_data=f"runtime {round(t2-t1, 3)} Seconds", ), InlineKeyboardButton( text="🗑", callback_data=f"forceclose abc|{message.from_user.id}", ), ] ] ) await edit_or_reply( message, text=final_output, reply_markup=keyboard ) @app.on_callback_query(filters.regex(r"runtime")) async def runtime_func_cq(_, cq): runtime = cq.data.split(None, 1)[1] await cq.answer(runtime, show_alert=True) @app.on_callback_query(filters.regex("forceclose")) async def forceclose_command(_, CallbackQuery): callback_data = CallbackQuery.data.strip() callback_request = callback_data.split(None, 1)[1] query, user_id = callback_request.split("|") if CallbackQuery.from_user.id != int(user_id): try: return await CallbackQuery.answer( "You're not allowed to close this.", show_alert=True ) except: return await CallbackQuery.message.delete() try: await CallbackQuery.answer() except: return @app.on_message( filters.command("sh") & SUDOERS & ~filters.forwarded & ~filters.via_bot ) async def shellrunner(client, message): if len(message.command) < 2: return await edit_or_reply( message, text="**Usage:**\n/sh git pull" ) text = message.text.split(None, 1)[1] if "\n" in text: code = text.split("\n") output = "" for x in code: shell = re.split( """ (?=(?:[^'"]|'[^']*'|"[^"]*")*$)""", x ) try: process = subprocess.Popen( shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) except Exception as err: print(err) await edit_or_reply( message, text=f"**ERROR:**\n```{err}```" ) output += f"**{code}**\n" output += process.stdout.read()[:-1].decode("utf-8") output += "\n" else: shell = re.split(""" (?=(?:[^'"]|'[^']*'|"[^"]*")*$)""", text) for a in range(len(shell)): shell[a] = shell[a].replace('"', "") try: process = subprocess.Popen( shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) except Exception as err: print(err) exc_type, exc_obj, exc_tb = sys.exc_info() errors = traceback.format_exception( etype=exc_type, value=exc_obj, tb=exc_tb, ) return await edit_or_reply( message, text=f"**ERROR:**\n```{''.join(errors)}```" ) output = process.stdout.read()[:-1].decode("utf-8") if str(output) == "\n": output = None if output: if len(output) > 4096: with open("output.txt", "w+") as file: file.write(output) await client.send_document( message.chat.id, "output.txt", reply_to_message_id=message.message_id, caption="`Output`", ) return os.remove("output.txt") await edit_or_reply( message, text=f"**OUTPUT:**\n```{output}```" ) else: await edit_or_reply(message, text="**OUTPUT: **\n`No output`")