| | import os |
| | import re |
| | import subprocess |
| | import sys |
| | import traceback |
| | from inspect import getfullargspec |
| | from io import StringIO |
| | from time import time |
| |
|
| | from pyrogram import filters |
| | from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message |
| |
|
| | from Devine import app |
| | from config import OWNER_ID |
| |
|
| |
|
async def aexec(code, client, message):
    """Run *code* as the body of an async function and return its result.

    The source text is wrapped in ``async def __aexec(client, message)`` so
    the snippet may use ``await`` and ``return`` and can refer to ``client``
    and ``message`` directly.

    SECURITY: this executes arbitrary Python through ``exec``; only pass
    code that comes from a trusted user.

    Args:
        code: Python source to execute; may span several lines.
        client: Object exposed to the snippet as ``client``.
        message: Object exposed to the snippet as ``message``.

    Returns:
        Whatever the snippet returns (``None`` when it has no ``return``).
    """
    # Indent every source line by one space so it nests under the def header.
    body = "".join(f"\n {line}" for line in code.split("\n"))
    # Collect the generated function in an explicit mapping: since Python 3.13
    # names bound by exec() inside a function are not visible through a later
    # locals() call, so looking the function up via locals() raises KeyError.
    namespace = {}
    # Module globals are passed so the snippet still sees this module's names.
    exec(f"async def __aexec(client, message): {body}", globals(), namespace)
    return await namespace["__aexec"](client, message)
| |
|
| |
|
async def edit_or_reply(msg: Message, **kwargs):
    """Edit *msg* when the client itself sent it, otherwise reply to it.

    Keyword arguments that the chosen method does not declare are silently
    dropped, so callers can pass one set of arguments for both cases.

    Args:
        msg: The message to edit or to reply to.
        **kwargs: Arguments forwarded to ``msg.edit_text`` or ``msg.reply``.
    """
    sender = msg.from_user
    # A message without a sender cannot be the client's own, so it gets a
    # reply instead of failing on ``None.is_self``.
    func = msg.edit_text if sender is not None and sender.is_self else msg.reply
    # Inspect the undecorated function when the method is wrapped; fall back
    # to the method itself when no ``__wrapped__`` attribute is present.
    accepted = getfullargspec(getattr(func, "__wrapped__", func)).args
    await func(**{k: v for k, v in kwargs.items() if k in accepted})
| |
|
| |
|
@app.on_edited_message(
    filters.command("eval")
    & filters.user(OWNER_ID)
    & ~filters.forwarded
    & ~filters.via_bot
)
@app.on_message(
    filters.command("eval")
    & filters.user(OWNER_ID)
    & ~filters.forwarded
    & ~filters.via_bot
)
async def executor(client: app, message: Message):
    """Handle ``/eval``: run the supplied Python code and report the outcome.

    Registered for new and edited messages from ``OWNER_ID`` that are neither
    forwarded nor sent via a bot. The code is executed through ``aexec`` with
    ``sys.stdout`` and ``sys.stderr`` captured. The reported result is, in
    order of preference, the traceback, captured stderr, captured stdout, or
    the word ``Success``. Results too long for one message are sent as an
    ``output.txt`` document instead.

    SECURITY: this runs arbitrary code; the ``filters.user(OWNER_ID)`` filter
    is the only access control.

    Args:
        client: The client passed in by the dispatcher; forwarded to the code.
        message: The message carrying the command.
    """
    import html  # local import: only needed to escape text placed in HTML

    if len(message.command) < 2:
        return await edit_or_reply(message, text="<b>ᴡʜᴀᴛ ʏᴏᴜ ᴡᴀɴɴᴀ ᴇxᴇᴄᴜᴛᴇ ᴍʏ ʟᴏʀᴅ ?</b>")
    # Split on any whitespace so code that starts on the line after the
    # command is accepted too (a plain " " split found nothing to run there).
    parts = (message.text or message.caption or "").split(None, 1)
    if len(parts) < 2:
        return await message.delete()
    cmd = parts[1]

    t1 = time()
    old_stdout, old_stderr = sys.stdout, sys.stderr
    redirected_output = sys.stdout = StringIO()
    redirected_error = sys.stderr = StringIO()
    exc = None
    try:
        await aexec(cmd, client, message)
    except Exception:
        exc = traceback.format_exc()
    finally:
        # Always put the real streams back, even when the evaluated code
        # raises something that ``except Exception`` does not catch.
        sys.stdout, sys.stderr = old_stdout, old_stderr
    stdout = redirected_output.getvalue()
    stderr = redirected_error.getvalue()
    runtime = round(time() - t1, 3)

    evaluation = "\n" + (exc or stderr or stdout or "Success")
    # Escape the result so text such as ``<module>`` in a traceback is shown
    # literally instead of being interpreted as markup.
    final_output = (
        f"<b>⥤ ʀᴇsᴜʟᴛ :</b>\n<pre language='python'>"
        f"{html.escape(evaluation, quote=False)}</pre>"
    )

    if len(final_output) > 4096:
        filename = "output.txt"
        with open(filename, "w+", encoding="utf8") as out_file:
            out_file.write(str(evaluation))
        keyboard = InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        text="⏳",
                        callback_data=f"runtime {runtime} Seconds",
                    )
                ]
            ]
        )
        try:
            await message.reply_document(
                document=filename,
                caption=f"<b>⥤ ᴇᴠᴀʟ :</b>\n<code>{html.escape(cmd[0:980], quote=False)}</code>\n\n<b>⥤ ʀᴇsᴜʟᴛ :</b>\nAttached Document",
                quote=False,
                reply_markup=keyboard,
            )
        finally:
            # Remove the temporary file even when the upload fails.
            os.remove(filename)
        await message.delete()
    else:
        keyboard = InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        text="⏳",
                        callback_data=f"runtime {runtime} Seconds",
                    ),
                    InlineKeyboardButton(
                        text="🗑",
                        callback_data=f"forceclose abc|{message.from_user.id}",
                    ),
                ]
            ]
        )
        await edit_or_reply(message, text=final_output, reply_markup=keyboard)
| |
|
| |
|
@app.on_callback_query(filters.regex(r"runtime"))
async def runtime_func_cq(_, cq):
    """Answer a ``runtime`` button press with the text stored in its data.

    The callback data is split once on whitespace; everything after the
    first word is shown to the user as an alert.
    """
    pieces = cq.data.split(None, 1)
    elapsed_text = pieces[1]
    await cq.answer(elapsed_text, show_alert=True)
| |
|
| |
|
@app.on_callback_query(filters.regex("forceclose"))
async def forceclose_command(_, CallbackQuery):
    """Delete the message a ``forceclose`` button belongs to.

    The callback data has the form ``forceclose <tag>|<user_id>``. Only the
    user whose id is embedded in the data may close the message; anyone else
    gets an alert and the message is left in place.

    Args:
        _: The client passed in by the dispatcher (unused).
        CallbackQuery: The callback query produced by the button press.
    """
    callback_data = CallbackQuery.data.strip()
    callback_request = callback_data.split(None, 1)[1]
    # The part before "|" is not used here; only the owner id matters.
    _tag, user_id = callback_request.split("|")
    if CallbackQuery.from_user.id != int(user_id):
        # NOTE(review): the <b> tags are sent as part of the alert text;
        # confirm whether alerts render markup or show the tags literally.
        try:
            return await CallbackQuery.answer(
                "<b>‣ ɪᴛ'ʟʟ ʙᴇ ʙᴇᴛᴛᴇʀ ɪғ ʏᴏᴜ sᴛᴀʏ ɪɴ ʏᴏᴜʀ ʟɪᴍɪᴛs ʙᴀʙʏe.</b>", show_alert=True
            )
        except Exception:
            # Answering is best-effort; ``Exception`` (rather than a bare
            # except) lets task cancellation and interpreter exit through.
            return
    await CallbackQuery.message.delete()
    try:
        await CallbackQuery.answer()
    except Exception:
        # Best-effort acknowledgement, same reasoning as above.
        return
| |
|
| |
|
def _split_shell_command(line):
    """Split *line* on spaces outside quotes and drop double-quote characters."""
    parts = re.split(""" (?=(?:[^'"]|'[^']*'|"[^"]*")*$)""", line)
    return [part.replace('"', "") for part in parts]


def _run_shell_command(argv):
    """Run *argv* without a shell and return its stdout as text.

    Exactly one trailing newline is removed, if present.
    """
    process = subprocess.Popen(
        argv,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() drains both pipes and waits for the process; reading only
    # stdout can block forever once the stderr pipe fills up.
    stdout, _stderr = process.communicate()
    text = stdout.decode("utf-8", errors="replace")
    return text[:-1] if text.endswith("\n") else text


@app.on_edited_message(
    filters.command("sh")
    & filters.user(OWNER_ID)
    & ~filters.forwarded
    & ~filters.via_bot
)
@app.on_message(
    filters.command("sh")
    & filters.user(OWNER_ID)
    & ~filters.forwarded
    & ~filters.via_bot
)
async def shellrunner(_, message: Message):
    """Handle ``/sh``: run one command per line and report the stdout.

    Registered for new and edited messages from ``OWNER_ID`` that are neither
    forwarded nor sent via a bot. With several lines, each non-empty line is
    run in turn and its output is listed under the line that produced it.
    Output longer than 4096 characters is sent as an ``output.txt`` document.

    SECURITY: this runs arbitrary commands; the ``filters.user(OWNER_ID)``
    filter is the only access control.

    NOTE(review): the commands run synchronously inside this coroutine, and
    the command output is placed into the HTML reply without escaping.

    Args:
        _: The client passed in by the dispatcher (unused).
        message: The message carrying the command.
    """
    if len(message.command) < 2:
        return await edit_or_reply(message, text="<b>ᴇxᴀᴍᴩʟᴇ :</b>\n/sh git pull")
    text = (message.text or message.caption).split(None, 1)[1]
    if "\n" in text:
        output = ""
        for line in text.split("\n"):
            if not line.strip():
                # A blank line is not a command; skip it.
                continue
            try:
                result = _run_shell_command(_split_shell_command(line))
            except Exception as err:
                # Stop at the first failing command: there is no process
                # output to report for it.
                return await edit_or_reply(
                    message, text=f"<b>ERROR :</b>\n<pre>{err}</pre>"
                )
            # Label each output with the line that produced it.
            output += f"<b>{line}</b>\n"
            output += result
            output += "\n"
    else:
        try:
            output = _run_shell_command(_split_shell_command(text))
        except Exception as err:
            print(err)
            # format_exc() formats the active exception on every supported
            # Python; the ``etype=`` keyword form fails on Python 3.10+.
            errors = traceback.format_exc()
            return await edit_or_reply(
                message, text=f"<b>ERROR :</b>\n<pre>{errors}</pre>"
            )
    if str(output) == "\n":
        output = None
    if output:
        if len(output) > 4096:
            with open("output.txt", "w+", encoding="utf-8") as file:
                file.write(output)
            try:
                await app.send_document(
                    message.chat.id,
                    "output.txt",
                    reply_to_message_id=message.id,
                    caption="<code>Output</code>",
                )
            finally:
                # Remove the temporary file even when the upload fails.
                os.remove("output.txt")
            return
        await edit_or_reply(message, text=f"<b>OUTPUT :</b>\n<pre>{output}</pre>")
    else:
        await edit_or_reply(message, text="<b>OUTPUT :</b>\n<code>None</code>")
    await message.stop_propagation()
| |
|