"""Chat commands that run Python snippets taken from messages.

SECURITY: this module passes message text to ``exec``/``eval`` inside the
bot process. No access control is visible here; it must be enforced where
these handlers are registered.
"""

from aiofiles import open as aiopen
from contextlib import redirect_stdout, suppress
from io import StringIO, BytesIO
from os import path as ospath, getcwd, chdir
from textwrap import indent
from traceback import format_exc
from re import match

from .. import LOGGER
from ..core.tg_client import TgClient
from ..helper.ext_utils.bot_utils import sync_to_async, new_task
from ..helper.telegram_helper.message_utils import send_file, send_message

# chat id -> globals dict used for every snippet executed in that chat.
namespaces = {}


def namespace_of(message):
    """Return the persistent execution namespace for the message's chat.

    The namespace is created on first use and then reused, so names a
    snippet stores in it stay available to later snippets in the same chat
    until ``clear`` is called.

    The ``message``, ``user`` and ``chat`` entries are refreshed on every
    call so that a snippet always sees the message that triggered it rather
    than the first message ever executed in the chat.
    """
    chat_id = message.chat.id
    env = namespaces.get(chat_id)
    if env is None:
        env = namespaces[chat_id] = {
            "__name__": "__main__",
            "__file__": "",
            "__builtins__": globals()["__builtins__"],
            "bot": TgClient.bot,
        }
    # Per-invocation context: must track the current message, not the one
    # that happened to create the namespace.
    env.update(
        message=message,
        user=message.from_user or message.sender_chat,
        chat=message.chat,
    )
    return env


def log_input(message):
    """Log the incoming command text together with its sender and chat ids."""
    LOGGER.info(
        f"IN: {message.text} (user={(message.from_user or message.sender_chat).id}, chat={message.chat.id})"
    )


async def send(msg, message):
    """Deliver ``msg`` as a reply to ``message``.

    Output longer than 2000 characters is sent as an ``output.txt``
    attachment; anything else is logged and sent as a text message, with
    empty output replaced by the placeholder ``"MessageEmpty"``.
    """
    if len(str(msg)) > 2000:
        # Encode via str(msg) so the length check and the payload agree even
        # when msg is not a str.
        with BytesIO(str(msg).encode()) as out_file:
            out_file.name = "output.txt"
            await send_file(message, out_file)
    else:
        LOGGER.info(f"OUT: '{msg}'")
        if not msg or msg == "\n":
            msg = "MessageEmpty"
        elif not bool(match(r"<(spoiler|b|i|code|s|u|/a)>", msg)):
            # NOTE(review): this branch leaves msg unchanged as written;
            # presumably wrapper markup was intended here -- confirm.
            msg = f"{msg}"
        await send_message(message, msg)


@new_task
async def aioexecute(_, message):
    """Handler: run the message's snippet as the body of an ``async def``."""
    await send(await do("aexec", message), message)


@new_task
async def execute(_, message):
    """Handler: run the message's snippet as the body of a plain ``def``."""
    await send(await do("exec", message), message)


def cleanup_code(code):
    """Strip Markdown code fences / stray backticks from a snippet.

    For a snippet wrapped in triple backticks, the first line (the opening
    fence and any language tag) and the closing fence are removed.  A fenced
    snippet with no newline, and unfenced input, just have surrounding
    backticks, spaces and newlines stripped.

    Handles the cases where the fenced snippet is on a single line or the
    closing fence shares a line with code; slicing whole lines off both ends
    would discard the code in those cases.
    """
    if code.startswith("```") and code.endswith("```"):
        # Work on the text between the fences; everything up to the first
        # newline is the opening-fence line.
        _, newline, rest = code[3:-3].partition("\n")
        if newline:
            # Drop the single newline that precedes a closing fence on its
            # own line.
            return rest[:-1] if rest.endswith("\n") else rest
    return code.strip("` \n")


async def do(func, message):
    """Execute the snippet carried by ``message`` and return its output.

    Args:
        func: ``"exec"`` wraps the snippet in a synchronous ``def func()``
            and runs it through ``sync_to_async``; any other value wraps it
            in ``async def func()`` and awaits it directly.
        message: the triggering message; its text is ``<command> <code>``.

    Returns:
        A string holding captured stdout plus the return value, the
        formatted traceback if the snippet raised, or a short error
        description if it failed to compile.  ``None`` when there is nothing
        to report.
    """
    log_input(message)

    # Without this guard a bare command would have its own text executed as
    # Python code.
    parts = (message.text or "").split(maxsplit=1)
    if len(parts) < 2:
        return "No code provided."

    body = cleanup_code(parts[1])
    env = namespace_of(message)

    # NOTE(review): chdir to the current directory has no visible effect;
    # kept as-is pending confirmation that nothing relies on it.
    chdir(getcwd())
    # The snippet is also written to a file under the working directory.
    async with aiopen(ospath.join(getcwd(), "bot/modules/temp.txt"), "w") as temp:
        await temp.write(body)

    stdout = StringIO()

    # Define ``func`` inside the chat namespace; syntax errors surface here.
    try:
        if func == "exec":
            exec(f"def func():\n{indent(body, ' ')}", env)
        else:
            exec(f"async def func():\n{indent(body, ' ')}", env)
    except Exception as e:
        return f"{e.__class__.__name__}: {e}"

    rfunc = env["func"]

    try:
        # redirect_stdout swaps sys.stdout for the whole process while the
        # snippet runs, so unrelated prints made meanwhile are captured too.
        with redirect_stdout(stdout):
            func_return = (
                await sync_to_async(rfunc) if func == "exec" else await rfunc()
            )
    except Exception:
        return f"{stdout.getvalue()}{format_exc()}"

    value = stdout.getvalue()
    result = None
    if func_return is not None:
        result = f"{value}{func_return}"
    elif value:
        result = value
    else:
        # No output and no return value: try the snippet as an expression to
        # show its value.  This evaluates the snippet a second time; any
        # failure (e.g. it is not an expression) is deliberately ignored.
        with suppress(Exception):
            result = repr(await sync_to_async(eval, body, env))

    return result or None


@new_task
async def clear(_, message):
    """Handler: discard the chat's execution namespace, if one exists."""
    log_input(message)
    namespaces.pop(message.chat.id, None)
    await send("Locals Cleared.", message)