Spaces:
Runtime error
Runtime error
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| import traceback | |
| from inspect import getfullargspec | |
| from io import StringIO | |
| from time import time | |
| from pyrogram import filters | |
| from pyrogram.types import InlineKeyboardButton, InlineKeyboardMarkup, Message | |
| from Devine import app | |
| from config import OWNER_ID | |
async def aexec(code, client, message):
    """Run ``code`` as the body of a coroutine and return its result.

    The text is wrapped in ``async def __aexec(client, message)`` so the
    snippet may use ``await`` and ``return`` and can refer to the ``client``
    and ``message`` arguments by name.

    SECURITY: this executes arbitrary Python through ``exec``. It must only
    be reachable by fully trusted users.

    Args:
        code: Python source text; each line becomes one line of the body.
        client: Passed through to the generated coroutine as ``client``.
        message: Passed through to the generated coroutine as ``message``.

    Returns:
        Whatever the snippet returns (``None`` if it has no ``return``).

    Raises:
        SyntaxError: If ``code`` does not form a valid function body.
        Exception: Anything raised by the snippet itself.
    """
    source = "async def __aexec(client, message): " + "".join(
        f"\n {line}" for line in code.split("\n")
    )
    # Use an explicit namespace instead of reading the definition back via
    # ``locals()``: on newer interpreters ``locals()`` inside a function
    # returns an independent snapshot, so names created by ``exec`` are not
    # visible through it.  Module globals are still passed so the snippet
    # resolves global names exactly as before.
    namespace = {}
    exec(source, globals(), namespace)
    return await namespace["__aexec"](client, message)
async def edit_or_reply(msg: Message, **kwargs):
    """Edit ``msg`` when it was sent by this account, otherwise reply to it.

    Only the keyword arguments that the chosen method actually declares are
    forwarded; the rest are silently dropped, so callers can pass one set of
    options regardless of which method ends up being used.

    Args:
        msg: The message to edit or reply to.
        **kwargs: Candidate keyword arguments for ``edit_text`` / ``reply``.
    """
    sender = msg.from_user
    # ``from_user`` may be absent (NOTE(review): e.g. channel posts — confirm
    # against the Pyrogram docs); in that case the message cannot be ours,
    # so fall back to replying instead of raising AttributeError.
    func = msg.edit_text if sender is not None and sender.is_self else msg.reply
    # Inspect the undecorated function when a wrapper exposes it, otherwise
    # inspect the callable itself rather than failing on a missing attribute.
    target = getattr(func, "__wrapped__", func)
    spec = getfullargspec(target)
    accepted = set(spec.args) | set(spec.kwonlyargs)
    await func(**{k: v for k, v in kwargs.items() if k in accepted})
async def executor(client: app, message: Message):
    """Evaluate the Python snippet that follows the command and report it.

    The snippet is run through ``aexec`` with ``sys.stdout`` / ``sys.stderr``
    temporarily redirected into in-memory buffers. The reported result is, in
    order of preference: the traceback of a raised exception, captured
    stderr, captured stdout, or the literal text ``Success``.

    Results longer than 4096 characters are sent as an ``output.txt``
    document and the command message is deleted; shorter results are sent via
    ``edit_or_reply`` with buttons showing the runtime and a close control.

    Args:
        client: The client handling the update; passed on to the snippet.
        message: The command message.
    """
    if len(message.command) < 2:
        return await edit_or_reply(message, text="<b>ᴡʜᴀᴛ ʏᴏᴜ ᴡᴀɴɴᴀ ᴇxᴇᴄᴜᴛᴇ ᴍʏ ʟᴏʀᴅ ?</b>")
    try:
        cmd = message.text.split(" ", maxsplit=1)[1]
    except IndexError:
        return await message.delete()

    t1 = time()
    old_stderr = sys.stderr
    old_stdout = sys.stdout
    redirected_output = sys.stdout = StringIO()
    redirected_error = sys.stderr = StringIO()
    exc = None
    try:
        await aexec(cmd, client, message)
    except Exception:
        exc = traceback.format_exc()
    finally:
        # Restore the real streams even when something that is not an
        # ``Exception`` (e.g. task cancellation) escapes, so the process is
        # never left writing to a discarded buffer.
        sys.stdout = old_stdout
        sys.stderr = old_stderr
    stdout = redirected_output.getvalue()
    stderr = redirected_error.getvalue()

    evaluation = "\n"
    if exc:
        evaluation += exc
    elif stderr:
        evaluation += stderr
    elif stdout:
        evaluation += stdout
    else:
        evaluation += "Success"

    final_output = f"<b>⥤ ʀᴇsᴜʟᴛ :</b>\n<pre language='python'>{evaluation}</pre>"
    if len(final_output) > 4096:
        # Too long for a single text message: ship it as a file instead.
        filename = "output.txt"
        with open(filename, "w+", encoding="utf8") as out_file:
            out_file.write(str(evaluation))
        t2 = time()
        keyboard = InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        text="⏳",
                        # Rounded like the short-output branch below.
                        callback_data=f"runtime {round(t2-t1, 3)} Seconds",
                    )
                ]
            ]
        )
        try:
            await message.reply_document(
                document=filename,
                caption=f"<b>⥤ ᴇᴠᴀʟ :</b>\n<code>{cmd[0:980]}</code>\n\n<b>⥤ ʀᴇsᴜʟᴛ :</b>\nAttached Document",
                quote=False,
                reply_markup=keyboard,
            )
            await message.delete()
        finally:
            # Remove the temporary file even if sending or deleting failed.
            os.remove(filename)
    else:
        t2 = time()
        keyboard = InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        text="⏳",
                        callback_data=f"runtime {round(t2-t1, 3)} Seconds",
                    ),
                    InlineKeyboardButton(
                        text="🗑",
                        callback_data=f"forceclose abc|{message.from_user.id}",
                    ),
                ]
            ]
        )
        await edit_or_reply(message, text=final_output, reply_markup=keyboard)
async def runtime_func_cq(_, cq):
    """Answer a callback query with the text that follows its first word.

    The callback data is split once on whitespace; everything after the
    leading keyword is shown to the user as an alert.
    """
    pieces = cq.data.split(maxsplit=1)
    elapsed = pieces[1]
    await cq.answer(elapsed, show_alert=True)
async def forceclose_command(_, CallbackQuery):
    """Delete the message a close button is attached to, for its owner only.

    The callback data has the form ``"<keyword> <query>|<user_id>"``. When
    the user pressing the button is not ``user_id`` they get an alert and the
    message is left alone; otherwise the message is deleted and the query is
    acknowledged.

    Failures while answering the query are deliberately ignored (best
    effort), but only ordinary exceptions: cancellation and interpreter-exit
    signals are allowed to propagate.
    """
    callback_data = CallbackQuery.data.strip()
    callback_request = callback_data.split(None, 1)[1]
    _query, user_id = callback_request.split("|")
    if CallbackQuery.from_user.id != int(user_id):
        try:
            return await CallbackQuery.answer(
                "<b>‣ ɪᴛ'ʟʟ ʙᴇ ʙᴇᴛᴛᴇʀ ɪғ ʏᴏᴜ sᴛᴀʏ ɪɴ ʏᴏᴜʀ ʟɪᴍɪᴛs ʙᴀʙʏe.</b>", show_alert=True
            )
        except Exception:
            return
    await CallbackQuery.message.delete()
    try:
        await CallbackQuery.answer()
    except Exception:
        return
def _split_shell_args(line):
    """Split ``line`` on spaces outside quotes and drop double-quote marks."""
    parts = re.split(""" (?=(?:[^'"]|'[^']*'|"[^"]*")*$)""", line)
    return [part.replace('"', "") for part in parts]


def _run_shell(args):
    """Run ``args`` (no shell) and return its stdout without one final newline."""
    process = subprocess.Popen(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    # communicate() drains both pipes and waits for the child, so a chatty
    # stderr cannot block the command and no unreaped process is left behind.
    raw_stdout, _ = process.communicate()
    text = raw_stdout.decode("utf-8", errors="replace")
    # Drop exactly one trailing newline, and only if there is one.
    return text[:-1] if text.endswith("\n") else text


async def shellrunner(_, message: Message):
    """Run the command(s) following the bot command and report their stdout.

    A single-line argument is run as one command. A multi-line argument runs
    each non-blank line as its own command and concatenates the results, each
    preceded by the command line in bold. Commands are started without a
    shell; only stdout is reported.

    Output longer than 4096 characters is sent as an ``output.txt`` document;
    otherwise it is sent through ``edit_or_reply``.

    NOTE(review): the subprocess calls are blocking, so the event loop is
    stalled while a command runs.

    Args:
        _: Unused first handler argument.
        message: The command message.
    """
    if len(message.command) < 2:
        return await edit_or_reply(message, text="<b>ᴇxᴀᴍᴩʟᴇ :</b>\n/sh git pull")
    text = message.text.split(None, 1)[1]
    if "\n" in text:
        output = ""
        for line in text.split("\n"):
            if not line.strip():
                # A blank line is not a command; skip it instead of failing.
                continue
            try:
                result = _run_shell(_split_shell_args(line))
            except Exception as err:
                # Stop here: there is no process to read output from.
                return await edit_or_reply(
                    message, text=f"<b>ERROR :</b>\n<pre>{err}</pre>"
                )
            # Label each chunk with the command line that produced it.
            output += f"<b>{line}</b>\n"
            output += result
            output += "\n"
    else:
        try:
            output = _run_shell(_split_shell_args(text))
        except Exception as err:
            print(err)
            # format_exc() avoids the keyword form of format_exception(),
            # whose ``etype`` keyword is rejected by newer Python versions.
            errors = traceback.format_exc()
            return await edit_or_reply(
                message, text=f"<b>ERROR :</b>\n<pre>{errors}</pre>"
            )
    if str(output) == "\n":
        output = None
    if output:
        if len(output) > 4096:
            with open("output.txt", "w+", encoding="utf8") as file:
                file.write(output)
            try:
                await app.send_document(
                    message.chat.id,
                    "output.txt",
                    reply_to_message_id=message.id,
                    caption="<code>Output</code>",
                )
            finally:
                # Remove the temporary file even if the upload failed.
                os.remove("output.txt")
            return
        await edit_or_reply(message, text=f"<b>OUTPUT :</b>\n<pre>{output}</pre>")
    else:
        await edit_or_reply(message, text="<b>OUTPUT :</b>\n<code>None</code>")
    await message.stop_propagation()