Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import sys | |
| import zipfile | |
| import base64 | |
| import random | |
| import shutil | |
| import subprocess | |
| import tempfile | |
| from pathlib import Path | |
| from fastapi import FastAPI, Request | |
| from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup | |
| from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| import uvicorn | |
| from config import BOT_TOKEN, ADMIN_ID, MONGO_URI, DB_NAME | |
# --- MongoDB Setup ---
# A single async client is created at import time; MONGO_URI and DB_NAME come from config.
mongo_client = AsyncIOMotorClient(MONGO_URI)
db = mongo_client[DB_NAME]

# Collection handles, resolved once. Only stats_col is referenced elsewhere in this file.
users_col, bans_col, stats_col = (
    db[collection_name] for collection_name in ("users", "banned_users", "stats")
)
async def init_stats():
    """Ensure the ``encryption_count`` counter document exists in ``stats_col``.

    If the document is missing it is created with ``count`` set to 0; an
    existing document (and its current count) is left untouched.

    The work is done as one upsert rather than a ``find_one`` followed by an
    ``insert_one``, so two processes starting at the same time cannot both see
    "missing" and then race each other on the insert.
    """
    await stats_col.update_one(
        {"_id": "encryption_count"},
        # $setOnInsert only applies when the upsert actually creates the document.
        {"$setOnInsert": {"count": 0}},
        upsert=True,
    )
| # --- Helpers --- | |
def make_zip_bytes(filename: str, content: bytes) -> bytes:
    """Build an in-memory zip archive with a single member and return its bytes.

    Args:
        filename: Name of the member inside the archive.
        content: Raw bytes stored under ``filename``.

    Returns:
        The complete archive (DEFLATE-compressed) as a bytes object.
    """
    buffer = io.BytesIO()
    archive = zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED)
    try:
        archive.writestr(filename, content)
    finally:
        # Closing finalizes the archive; it must happen before the buffer is read.
        archive.close()
    return buffer.getvalue()
def rand_bytes(n):
    """Return ``n`` random bytes obtained from ``os.urandom``."""
    return os.urandom(n)
def bytes_to_c_array_literal(b: bytes) -> str:
    """Render ``b`` as comma-separated decimal byte values, e.g. ``b"\\x00\\xff"`` -> ``"0,255"``.

    The result has no surrounding braces; an empty input yields an empty string.
    """
    return ",".join(map(str, b))
def xor_string(s: str, key: int = None) -> str:
    """Return Python source for an expression that evaluates back to ``s``.

    Each character's code point is XOR-ed with ``key`` and the masked integers
    are embedded in a lambda call that reverses the XOR when evaluated.

    Args:
        s: The string to hide.
        key: XOR mask; when ``None`` a random value in 1..255 is chosen.

    Returns:
        A string containing a self-contained Python expression.
    """
    if key is None:
        key = random.randint(1, 255)
    masked = ",".join(str(ord(ch) ^ key) for ch in s)
    return f"(lambda s:''.join(chr(B^{key}) for B in s))([{masked}])"
def generate_junk_data(min_kb=50, max_kb=70):
    """Return a random filler string between ``min_kb`` and ``max_kb`` KiB long.

    The length is drawn uniformly (inclusive) from ``min_kb*1024`` to
    ``max_kb*1024`` characters; each character is picked independently from a
    fixed ASCII alphabet of letters, digits and a few punctuation marks.
    """
    length = random.randint(min_kb * 1024, max_kb * 1024)
    alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_()-./,;:!?"
    pick = random.choice
    return "".join([pick(alphabet) for _ in range(length)])
# --- C Template ---
# Text that process_file() turns into aes_helper.c by substituting the
# /*KEY_BYTES*/ marker (it also substitutes /*IV_BYTES*/, which this value does
# not contain). NOTE(review): looks like a placeholder for the real C source.
C_TEMPLATE = "/*KEY_BYTES*/"

# Text that compile_c_extension() writes verbatim to setup.py in the build dir.
# NOTE(review): also appears to be a placeholder.
SETUP_PY = "# Your setup.py content"
def compile_c_extension(build_dir: Path):
    """Build the extension in ``build_dir`` and return the path of the resulting ``.so``.

    Writes ``SETUP_PY`` to ``build_dir/setup.py`` and runs
    ``setup.py build_ext --inplace`` with the current interpreter, capturing
    stdout and stderr together.

    Args:
        build_dir: Directory used as the working directory for the build.

    Returns:
        Path of the first match for ``aes_helper*.so`` in ``build_dir`` or
        ``build/lib.*/*.so`` beneath it.

    Raises:
        RuntimeError: The build exited with a non-zero status, or finished
            without producing a matching ``.so``. The message carries the
            captured build output.
        subprocess.TimeoutExpired: The build ran longer than 30 seconds.
    """
    (build_dir / "setup.py").write_text(SETUP_PY)
    proc = subprocess.run(
        [sys.executable, "setup.py", "build_ext", "--inplace"],
        cwd=str(build_dir),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        timeout=30,
    )
    # Compiler output is not guaranteed to be valid UTF-8; never let a decode
    # error replace the build error we are trying to report.
    build_log = proc.stdout.decode(errors="replace")

    # A failed build must not be mistaken for success just because some .so
    # happens to match the glob below.
    if proc.returncode != 0:
        raise RuntimeError("Compile fail:\n" + build_log)

    candidates = [
        *build_dir.glob("aes_helper*.so"),
        *build_dir.glob("build/lib.*/*.so"),
    ]
    if not candidates:
        raise RuntimeError("Compile fail:\n" + build_log)
    return candidates[0]
def build_wrapper(j_b64, so_b64, so_filename):
    """Assemble the source text of the self-extracting wrapper script.

    Args:
        j_b64: Base64 text embedded in the script as ``WL`` (the payload the
            script later base64-decodes and passes to ``mod.decrypt``).
        so_b64: Base64 text embedded as ``M`` (decoded and written to disk as
            the extension module).
        so_filename: File name the extension is written under; embedded via
            ``xor_string`` rather than as a plain literal.

    Returns:
        The wrapper script as a single newline-joined string.

    The generated script writes the extension into a directory under the
    system temp dir, loads it as module ``aes_helper``, decrypts ``WL``,
    unzips the result into a fresh temp directory and runs the first ``.py``
    file found there as ``__main__``. Errors are printed with the prefix
    ``ninja_error`` and the extension file is removed afterwards. A random
    ``JUNK`` assignment is appended as the final line.
    """
    # Part 1: embedded data plus code that materializes and loads the extension.
    bootstrap = [
        "wulcan = '.WulcanPy'",
        "import os as O, sys as S, base64 as B, tempfile as T",
        f"WL='{j_b64}'",
        f"M={repr(so_b64)}",
        f"D=O.path.join(T.gettempdir(),{xor_string('ninja_tmp')});O.makedirs(D,exist_ok=True)",
        f"P=O.path.join(D,{xor_string(so_filename)})",
        "open(P,'wb').write(B.b64decode(M))",
        "import importlib.machinery as L, importlib.util as U",
        "loader=L.ExtensionFileLoader('aes_helper',P);spec=U.spec_from_loader(loader.name,loader);mod=U.module_from_spec(spec);loader.exec_module(mod)",
    ]

    # Part 2: decrypt, unpack and execute the payload, then clean up the extension.
    runner = [
        "import sys, io, zipfile, runpy",
        "try:",
        "\tc=B.b64decode(WL);z=mod.decrypt(c)",
        "\tt=T.mkdtemp(prefix='nin_');zipfile.ZipFile(io.BytesIO(z)).extractall(t)",
        "\tS.path.insert(0,t)",
        "\te=[f for f in O.listdir(t) if f.endswith('.py')][0]",
        "\tep=O.path.join(t,e)",
        "\tS.argv=[ep]+S.argv[1:]",
        "\trunpy.run_path(ep,run_name='__main__')",
        "except Exception as E: print('ninja_error',E)",
        "finally: O.remove(P) if O.path.exists(P) else None",
    ]

    # Part 3: random filler appended as the last line of the script.
    padding = [f"JUNK={repr(generate_junk_data())}"]

    return "\n".join(bootstrap + runner + padding)
| # --- Process File --- | |
async def process_file(input_file: Path, chat_id, bot):
    """Wrap ``input_file`` into a ``*_enc.py`` script and send it to ``chat_id``.

    Steps, all performed inside a fresh temporary directory:

    1. Copy the input file and zip its bytes in memory.
    2. Generate a random 32-byte key and 16-byte IV, substitute them into
       ``C_TEMPLATE`` and write the result as ``aes_helper.c``.
    3. Compile the extension, load it and call its ``encrypt`` on the zip bytes.
    4. Build the wrapper script embedding the base64 ciphertext and the
       base64 extension binary, and write it as ``<stem>_enc.py``.
    5. Send that file with ``bot.send_document`` and increment the
       ``encryption_count`` counter in ``stats_col``.

    Args:
        input_file: Path of the Python file to process.
        chat_id: Chat the resulting document is sent to.
        bot: Object providing an awaitable ``send_document``.

    Raises:
        RuntimeError: Propagated from ``compile_c_extension`` when the build fails.

    The temporary directory is removed whether or not processing succeeds.
    """
    import asyncio
    import importlib.util

    tmpdir = Path(tempfile.mkdtemp())
    try:
        py_bytes = input_file.read_bytes()
        # Keep a copy of the input next to the build artifacts, as before.
        (tmpdir / input_file.name).write_bytes(py_bytes)
        zip_b = make_zip_bytes(input_file.name, py_bytes)

        key, iv = rand_bytes(32), rand_bytes(16)
        c_src = C_TEMPLATE.replace(
            "/*KEY_BYTES*/", bytes_to_c_array_literal(key)
        ).replace(
            "/*IV_BYTES*/", bytes_to_c_array_literal(iv)
        )
        (tmpdir / "aes_helper.c").write_text(c_src)

        # The build is a blocking subprocess (up to 30 s); run it in the default
        # executor so the event loop keeps serving other requests meanwhile.
        loop = asyncio.get_running_loop()
        so_path = await loop.run_in_executor(None, compile_c_extension, tmpdir)

        spec = importlib.util.spec_from_file_location("aes_helper", so_path)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)

        cipher = mod.encrypt(zip_b)
        cipher_b64 = base64.b64encode(cipher).decode()
        so_b64 = base64.b64encode(so_path.read_bytes()).decode()
        wrapper = build_wrapper(cipher_b64, so_b64, so_path.name)

        out_name = input_file.stem + "_enc.py"
        out_path = tmpdir / out_name
        out_path.write_text(wrapper)

        # Close the handle deterministically once the upload has finished.
        with open(out_path, "rb") as document:
            await bot.send_document(chat_id=chat_id, document=document, filename=out_name)
        await stats_col.update_one({"_id": "encryption_count"}, {"$inc": {"count": 1}})
    finally:
        # Runs on failure too, so a failed build or upload does not leak the directory.
        shutil.rmtree(tmpdir, ignore_errors=True)
# --- FastAPI App ---
app = FastAPI()
bot_app = Application.builder().token(BOT_TOKEN).build()


async def _handle_start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Answer the /start command with a fixed greeting."""
    # effective_message is used because update.message is None for edited
    # messages and channel posts, which these handlers can also receive.
    message = update.effective_message
    if message is not None:
        await message.reply_text("Bot started!")


async def _handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Acknowledge any received document."""
    message = update.effective_message
    if message is not None:
        await message.reply_text("File received")


# Register handlers (for PHP polling)
bot_app.add_handler(CommandHandler("start", _handle_start))
bot_app.add_handler(MessageHandler(filters.Document.ALL, _handle_document))


@app.on_event("startup")
async def _start_bot() -> None:
    """Initialize and start the Application so queued updates get dispatched.

    Without this, updates placed on ``bot_app.update_queue`` are never consumed.
    """
    await bot_app.initialize()
    await bot_app.start()


@app.on_event("shutdown")
async def _stop_bot() -> None:
    """Stop the Application and release its resources when the server exits."""
    await bot_app.stop()
    await bot_app.shutdown()
@app.post("/poll")
async def poll_endpoint(request: Request):
    """Accept one Telegram update from the PHP poller and queue it for the bot.

    The request body is JSON, either wrapped as
    ``{"update": telegram_update_json}`` or the bare Telegram update object.
    Both forms are accepted.

    Returns:
        ``{"ok": True}`` once the update is on ``bot_app.update_queue``, or
        ``{"ok": False, "error": ...}`` when the body is not a JSON object
        holding an update.
    """
    data = await request.json()

    # Unwrap the documented {"update": ...} envelope; fall back to the body
    # itself so callers that post the bare update keep working.
    payload = data.get("update", data) if isinstance(data, dict) else None
    if not isinstance(payload, dict):
        return {"ok": False, "error": "expected a JSON object containing a Telegram update"}

    update = Update.de_json(payload, bot_app.bot)
    await bot_app.update_queue.put(update)
    return {"ok": True}
if __name__ == "__main__":
    # Run init_stats on the server's own event loop at startup. Calling it via
    # asyncio.run() beforehand would drive the module-level Motor client on a
    # temporary loop that is closed before uvicorn starts its own, which can
    # leave the client tied to a dead loop for all later database calls.
    app.add_event_handler("startup", init_stats)
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))