"""Telegram bot backend that wraps a Python file in an encrypted loader.

The module exposes a FastAPI app with a single ``/poll`` endpoint that
receives Telegram updates as JSON and forwards them to a
python-telegram-bot ``Application``. ``process_file`` builds the
encrypted wrapper script: it zips the input, compiles a per-file C helper
with a random key/IV baked in, encrypts the zip with that helper and
emits a self-extracting ``*_enc.py`` file.
"""

import base64
import importlib.util
import io
import os
import random
import shutil
import subprocess
import sys
import tempfile
import zipfile
from pathlib import Path
from typing import Optional

import uvicorn
from fastapi import FastAPI, HTTPException, Request
from motor.motor_asyncio import AsyncIOMotorClient
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes

from config import BOT_TOKEN, ADMIN_ID, MONGO_URI, DB_NAME

# --- MongoDB Setup ---
mongo_client = AsyncIOMotorClient(MONGO_URI)
db = mongo_client[DB_NAME]
users_col = db["users"]
bans_col = db["banned_users"]
stats_col = db["stats"]


async def init_stats():
    """Ensure the ``encryption_count`` stats document exists.

    Uses a single upsert with ``$setOnInsert`` so the counter is created
    with ``count == 0`` only when missing, and an existing counter is never
    reset even if several processes start at the same time.
    """
    await stats_col.update_one(
        {"_id": "encryption_count"},
        {"$setOnInsert": {"count": 0}},
        upsert=True,
    )


# --- Helpers ---
def make_zip_bytes(filename: str, content: bytes) -> bytes:
    """Return an in-memory deflate-compressed zip holding one member.

    Args:
        filename: Name of the member inside the archive.
        content: Raw bytes stored under ``filename``.
    """
    bio = io.BytesIO()
    with zipfile.ZipFile(bio, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        zf.writestr(filename, content)
    return bio.getvalue()


def rand_bytes(n):
    """Return ``n`` bytes from the operating system's random source."""
    return os.urandom(n)


def bytes_to_c_array_literal(b: bytes) -> str:
    """Render ``b`` as comma-separated decimal values for a C initializer."""
    return ",".join(str(x) for x in b)


def xor_string(s: str, key: Optional[int] = None) -> str:
    """Return Python source for an expression that evaluates to ``s``.

    Each character of ``s`` is XOR-ed with ``key`` and the generated
    expression reverses that at runtime, so the plain text does not appear
    literally in the emitted code.

    Args:
        s: Text to hide.
        key: XOR key; a random value in 1..255 is chosen when ``None``.
    """
    if key is None:
        key = random.randint(1, 255)
    encoded = ",".join(str(ord(c) ^ key) for c in s)
    return f"(lambda s:''.join(chr(B^{key}) for B in s))([{encoded}])"


def generate_junk_data(min_kb=50, max_kb=70):
    """Return a random filler string between ``min_kb`` and ``max_kb`` KiB."""
    junk_size = random.randint(min_kb * 1024, max_kb * 1024)
    chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_()-./,;:!?"
    # random.choices draws the whole sample in one call instead of one
    # Python-level call per character.
    return "".join(random.choices(chars, k=junk_size))


# --- C Template ---
C_TEMPLATE = """/*KEY_BYTES*/"""
SETUP_PY = """# Your setup.py content"""


def compile_c_extension(build_dir: Path):
    """Build the ``aes_helper`` extension inside ``build_dir``.

    Writes ``setup.py`` from ``SETUP_PY`` and runs ``build_ext --inplace``
    with the current interpreter (30 second timeout).

    Returns:
        Path of the first shared object found in ``build_dir`` or under
        ``build/lib.*``.

    Raises:
        RuntimeError: No shared object was produced; the message carries
            the captured build output.
        subprocess.TimeoutExpired: The build exceeded the timeout.
    """
    (build_dir / "setup.py").write_text(SETUP_PY)
    proc = subprocess.run(
        [sys.executable, "setup.py", "build_ext", "--inplace"],
        cwd=str(build_dir),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        timeout=30,
    )
    candidates = list(build_dir.glob("aes_helper*.so")) + list(build_dir.glob("build/lib.*/*.so"))
    if not candidates:
        # Compiler output is not guaranteed to be valid UTF-8; never let the
        # decode mask the real failure.
        raise RuntimeError("Compile fail:\n" + proc.stdout.decode(errors="replace"))
    return candidates[0]


def build_wrapper(j_b64, so_b64, so_filename):
    """Return the source text of the self-extracting wrapper script.

    Args:
        j_b64: Base64 text of the encrypted zip payload.
        so_b64: Base64 text of the compiled helper shared object.
        so_filename: File name the helper is written under at runtime.

    The emitted script writes the helper to a temp directory, loads it as
    ``aes_helper``, decrypts and unzips the payload, runs the first ``.py``
    file found as ``__main__`` and finally removes the helper file. A block
    of random junk is appended as a ``JUNK`` assignment.
    """
    lines = [
        "wulcan = '.WulcanPy'",
        "import os as O, sys as S, base64 as B, tempfile as T",
        f"WL='{j_b64}'",
        f"M={repr(so_b64)}",
        f"D=O.path.join(T.gettempdir(),{xor_string('ninja_tmp')});O.makedirs(D,exist_ok=True)",
        f"P=O.path.join(D,{xor_string(so_filename)})",
        "open(P,'wb').write(B.b64decode(M))",
        "import importlib.machinery as L, importlib.util as U",
        "loader=L.ExtensionFileLoader('aes_helper',P);spec=U.spec_from_loader(loader.name,loader);mod=U.module_from_spec(spec);loader.exec_module(mod)",
        "import sys, io, zipfile, runpy",
        "try:",
        "\tc=B.b64decode(WL);z=mod.decrypt(c)",
        "\tt=T.mkdtemp(prefix='nin_');zipfile.ZipFile(io.BytesIO(z)).extractall(t)",
        "\tS.path.insert(0,t)",
        "\te=[f for f in O.listdir(t) if f.endswith('.py')][0]",
        "\tep=O.path.join(t,e)",
        "\tS.argv=[ep]+S.argv[1:]",
        "\trunpy.run_path(ep,run_name='__main__')",
        "except Exception as E: print('ninja_error',E)",
        "finally: O.remove(P) if O.path.exists(P) else None",
        f"JUNK={repr(generate_junk_data())}",
    ]
    return "\n".join(lines)


# --- Process File ---
async def process_file(input_file: Path, chat_id, bot):
    """Encrypt ``input_file`` and send the wrapper to ``chat_id``.

    Args:
        input_file: Path of the Python file to protect.
        chat_id: Telegram chat that receives the result.
        bot: Object providing an awaitable ``send_document``.

    The work happens in a fresh temporary directory which is removed on
    every exit path, including failures. On success the
    ``encryption_count`` stats counter is incremented.

    NOTE(review): ``compile_c_extension`` runs a blocking subprocess (up to
    30 s) on the event loop; consider moving it to an executor.
    """
    tmpdir = Path(tempfile.mkdtemp())
    try:
        tmp_file = tmpdir / input_file.name
        tmp_file.write_bytes(input_file.read_bytes())
        py_bytes = tmp_file.read_bytes()
        zip_b = make_zip_bytes(input_file.name, py_bytes)

        # Fresh key material per file, baked into the C source.
        key, iv = rand_bytes(32), rand_bytes(16)
        c_src = C_TEMPLATE.replace(
            "/*KEY_BYTES*/", bytes_to_c_array_literal(key)
        ).replace(
            "/*IV_BYTES*/", bytes_to_c_array_literal(iv)
        )
        (tmpdir / "aes_helper.c").write_text(c_src)
        so_path = compile_c_extension(tmpdir)

        # Load the freshly built helper to encrypt with the same key/IV the
        # wrapper will later decrypt with.
        spec = importlib.util.spec_from_file_location("aes_helper", so_path)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)
        cipher = mod.encrypt(zip_b)

        cipher_b64 = base64.b64encode(cipher).decode()
        so_b64 = base64.b64encode(so_path.read_bytes()).decode()
        wrapper = build_wrapper(cipher_b64, so_b64, so_path.name)

        out_name = input_file.stem + "_enc.py"
        out_path = tmpdir / out_name
        out_path.write_text(wrapper)

        # Keep the handle open only for the duration of the upload.
        with open(out_path, "rb") as document:
            await bot.send_document(chat_id=chat_id, document=document, filename=out_name)
        await stats_col.update_one({"_id": "encryption_count"}, {"$inc": {"count": 1}})
    finally:
        shutil.rmtree(tmpdir, ignore_errors=True)


# --- FastAPI App ---
app = FastAPI()
bot_app = Application.builder().token(BOT_TOKEN).build()


async def _start_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Reply to /start."""
    # effective_message also covers edited messages, where update.message
    # is None.
    message = update.effective_message
    if message is not None:
        await message.reply_text("Bot started!")


async def _document_received(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Acknowledge an uploaded document."""
    message = update.effective_message
    if message is not None:
        await message.reply_text("File received")


# Register handlers (for PHP polling)
bot_app.add_handler(CommandHandler("start", _start_command))
bot_app.add_handler(MessageHandler(filters.Document.ALL, _document_received))


@app.on_event("startup")
async def _on_startup():
    """Prepare the stats document and start the bot's update processing.

    Running this inside the server's event loop keeps the Motor client and
    the bot application on the same loop that later serves requests, and
    also covers launches via ``uvicorn module:app``.
    """
    await init_stats()
    # Without initialize()/start() nothing consumes bot_app.update_queue.
    await bot_app.initialize()
    await bot_app.start()


@app.on_event("shutdown")
async def _on_shutdown():
    """Stop the bot application cleanly when the server exits."""
    if bot_app.running:
        await bot_app.stop()
    await bot_app.shutdown()


@app.post("/poll")
async def poll_endpoint(request: Request):
    """Queue one Telegram update for processing.

    PHP calls this endpoint with JSON: either ``{"update": <update>}`` or
    the raw Telegram update object itself.

    Returns:
        ``{"ok": True}`` once the update has been queued.

    Raises:
        HTTPException: 400 when the body is not JSON or does not contain a
            Telegram update object.
    """
    try:
        data = await request.json()
    except ValueError as exc:
        raise HTTPException(status_code=400, detail="Request body is not valid JSON") from exc

    # Unwrap the documented {"update": ...} envelope; a raw Telegram update
    # has no top-level "update" key, so this stays backward-compatible.
    if isinstance(data, dict) and isinstance(data.get("update"), dict):
        data = data["update"]
    if not isinstance(data, dict) or "update_id" not in data:
        raise HTTPException(status_code=400, detail="Payload is not a Telegram update")

    update = Update.de_json(data, bot_app.bot)
    await bot_app.update_queue.put(update)
    return {"ok": True}


if __name__ == "__main__":
    # init_stats() and the bot lifecycle run in the startup hook so that they
    # share uvicorn's event loop.
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860)))