Spaces:
Sleeping
Sleeping
| import os, io, sys, zipfile, base64, random, shutil, subprocess, tempfile, asyncio | |
| from pathlib import Path | |
| from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup | |
| from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| from config import BOT_TOKEN, ADMIN_ID, MONGO_URI, DB_NAME | |
| # --- MongoDB Setup --- | |
| mongo_client = AsyncIOMotorClient(MONGO_URI) | |
| db = mongo_client[DB_NAME] | |
| users_col = db["users"] | |
| bans_col = db["banned_users"] | |
| stats_col = db["stats"] | |
| async def init_stats(): | |
| if not await stats_col.find_one({"_id":"encryption_count"}): | |
| await stats_col.insert_one({"_id":"encryption_count","count":0}) | |
| # --- Helpers --- | |
| def make_zip_bytes(filename: str, content: bytes) -> bytes: | |
| bio = io.BytesIO() | |
| with zipfile.ZipFile(bio, "w", compression=zipfile.ZIP_DEFLATED) as zf: | |
| zf.writestr(filename, content) | |
| return bio.getvalue() | |
| def rand_bytes(n): return os.urandom(n) | |
| def bytes_to_c_array_literal(b: bytes) -> str: | |
| return ",".join(str(x) for x in b) | |
| def xor_string(s: str, key: int = None) -> str: | |
| if key is None: key = random.randint(1,255) | |
| arr = [ord(c)^key for c in s] | |
| return f"(lambda s:''.join(chr(B^{key}) for B in s))([{','.join(str(x) for x in arr)}])" | |
| def generate_junk_data(min_kb=50,max_kb=70): | |
| junk_size = random.randint(min_kb*1024,max_kb*1024) | |
| chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_()-./,;:!?" | |
| return ''.join(random.choice(chars) for _ in range(junk_size)) | |
| # --- C Template --- | |
| C_TEMPLATE = """...""" # Keep original C_TEMPLATE | |
| SETUP_PY = """...""" # Keep original SETUP_PY | |
| def compile_c_extension(build_dir: Path): | |
| (build_dir/"setup.py").write_text(SETUP_PY) | |
| proc = subprocess.run([sys.executable,"setup.py","build_ext","--inplace"], | |
| cwd=str(build_dir), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, timeout=30) | |
| candidates = list(build_dir.glob("aes_helper*.so")) + list(build_dir.glob("build/lib.*/*.so")) | |
| if not candidates: | |
| raise RuntimeError("Compile fail:\n"+proc.stdout.decode()) | |
| return candidates[0] | |
| def build_wrapper(j_b64, so_b64, so_filename): | |
| lines = [ | |
| "wulcan = '.WulcanPy'", | |
| "import os as O, sys as S, base64 as B, tempfile as T", | |
| f"WL='{j_b64}'", | |
| f"M={repr(so_b64)}", | |
| f"D=O.path.join(T.gettempdir(),{xor_string('ninja_tmp')});O.makedirs(D,exist_ok=True)", | |
| f"P=O.path.join(D,{xor_string(so_filename)})", | |
| "open(P,'wb').write(B.b64decode(M))", | |
| "import importlib.machinery as L, importlib.util as U", | |
| "loader=L.ExtensionFileLoader('aes_helper',P);spec=U.spec_from_loader(loader.name,loader);mod=U.module_from_spec(spec);loader.exec_module(mod)", | |
| "import sys, io, zipfile, runpy", | |
| "try:", | |
| "\tc=B.b64decode(WL);z=mod.decrypt(c)", | |
| "\tt=T.mkdtemp(prefix='nin_');zipfile.ZipFile(io.BytesIO(z)).extractall(t)", | |
| "\tS.path.insert(0,t)", | |
| "\te=[f for f in O.listdir(t) if f.endswith('.py')][0]", | |
| "\tep=O.path.join(t,e)", | |
| "\tS.argv=[ep]+S.argv[1:]", | |
| "\trunpy.run_path(ep,run_name='__main__')", | |
| "except Exception as E: print('ninja_error',E)", | |
| "finally: O.remove(P) if O.path.exists(P) else None", | |
| f"JUNK={repr(generate_junk_data())}" | |
| ] | |
| return "\n".join(lines) | |
| # --- Process File --- | |
| async def process_file(input_file: Path, update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| user = update.effective_user | |
| tmpdir = Path(tempfile.mkdtemp()) | |
| tmp_file = tmpdir/input_file.name | |
| tmp_file.write_bytes(input_file.read_bytes()) | |
| py_bytes = tmp_file.read_bytes() | |
| zip_b = make_zip_bytes(input_file.name, py_bytes) | |
| key, iv = rand_bytes(32), rand_bytes(16) | |
| c_src = C_TEMPLATE.replace("/*KEY_BYTES*/", bytes_to_c_array_literal(key)).replace("/*IV_BYTES*/", bytes_to_c_array_literal(iv)) | |
| (tmpdir/"aes_helper.c").write_text(c_src) | |
| so_path = compile_c_extension(tmpdir) | |
| import importlib.util | |
| spec = importlib.util.spec_from_file_location("aes_helper",so_path) | |
| mod = importlib.util.module_from_spec(spec) | |
| spec.loader.exec_module(mod) | |
| cipher = mod.encrypt(zip_b) | |
| cipher_b64 = base64.b64encode(cipher).decode() | |
| so_b64 = base64.b64encode(so_path.read_bytes()).decode() | |
| wrapper = build_wrapper(cipher_b64, so_b64, so_path.name) | |
| out_name = input_file.stem+"_enc.py" | |
| out_path = tmpdir/out_name | |
| out_path.write_text(wrapper) | |
| with open(out_path,'rb') as f: | |
| await update.message.reply_document(document=f,filename=out_name,caption=f"✅ File encoded: {out_name}") | |
| await stats_col.update_one({"_id":"encryption_count"},{"$inc":{"count":1}}) | |
| await context.bot.send_message(chat_id=ADMIN_ID, text=f"New encryption by {user.full_name} ({user.id}) file: {input_file.name}") | |
| shutil.rmtree(tmpdir, ignore_errors=True) | |
| # --- Handlers --- | |
| async def start(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| user = update.effective_user | |
| if not await users_col.find_one({"user_id":user.id}): | |
| await users_col.insert_one({"user_id":user.id,"name":user.full_name,"username":user.username or "None","language":user.language_code or "Unknown"}) | |
| await context.bot.send_message(chat_id=ADMIN_ID, text=f"New user: {user.full_name} ({user.id})") | |
| keyboard=[[InlineKeyboardButton("▶ Join Channel",url="https://t.me/kuttuxd17")]] | |
| await update.message.reply_text("Welcome to Secure File Encryptor Bot!",reply_markup=InlineKeyboardMarkup(keyboard)) | |
| async def encode(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| user = update.effective_user | |
| if await bans_col.find_one({"user_id":user.id}): | |
| await update.message.reply_text("You are banned.") | |
| return | |
| await update.message.reply_text("Please upload a .py file.") | |
| async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| user = update.effective_user | |
| if await bans_col.find_one({"user_id":user.id}): | |
| await update.message.reply_text("You are banned.") | |
| return | |
| document = update.message.document | |
| if not document.file_name.endswith(".py"): | |
| await update.message.reply_text("Upload .py file only.") | |
| return | |
| file = await document.get_file() | |
| tmpdir = Path(tempfile.mkdtemp()) | |
| input_path = tmpdir/document.file_name | |
| await file.download_to_drive(str(input_path)) | |
| await process_file(input_path, update, context) | |
| # --- Admin Commands --- | |
| async def total_users(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| if update.effective_user.id != ADMIN_ID: | |
| await update.message.reply_text("Unauthorized") | |
| return | |
| count = await users_col.count_documents({}) | |
| await update.message.reply_text(f"Total users: {count}") | |
| async def total_enc(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| if update.effective_user.id != ADMIN_ID: | |
| await update.message.reply_text("Unauthorized") | |
| return | |
| stats = await stats_col.find_one({"_id":"encryption_count"}) | |
| await update.message.reply_text(f"Total encryptions: {stats['count'] if stats else 0}") | |
| async def broadcast(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| if update.effective_user.id != ADMIN_ID: | |
| await update.message.reply_text("Unauthorized") | |
| return | |
| msg=" ".join(context.args) | |
| users=users_col.find({}) | |
| count=0 | |
| async for u in users: | |
| try: await context.bot.send_message(chat_id=u['user_id'], text=msg); count+=1 | |
| except: pass | |
| await update.message.reply_text(f"Broadcast sent to {count} users") | |
| async def ban(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| if update.effective_user.id != ADMIN_ID: return | |
| try: user_id=int(context.args[0]); await bans_col.update_one({"user_id":user_id},{"$set":{"user_id":user_id}},upsert=True); await update.message.reply_text(f"Banned {user_id}") | |
| except: await update.message.reply_text("Provide user ID") | |
| async def unban(update: Update, context: ContextTypes.DEFAULT_TYPE): | |
| if update.effective_user.id != ADMIN_ID: return | |
| try: user_id=int(context.args[0]); await bans_col.delete_one({"user_id":user_id}); await update.message.reply_text(f"Unbanned {user_id}") | |
| except: await update.message.reply_text("Provide user ID") | |
| # --- Main --- | |
| async def main(): | |
| await init_stats() | |
| app = Application.builder().token(BOT_TOKEN).build() | |
| app.add_handler(CommandHandler("start", start)) | |
| app.add_handler(CommandHandler("encode", encode)) | |
| app.add_handler(MessageHandler(filters.Document.ALL, handle_document)) | |
| app.add_handler(CommandHandler("totalusers", total_users)) | |
| app.add_handler(CommandHandler("totalenc", total_enc)) | |
| app.add_handler(CommandHandler("broadcast", broadcast)) | |
| app.add_handler(CommandHandler("ban", ban)) | |
| app.add_handler(CommandHandler("unban", unban)) | |
| await app.run_polling() | |
| # New | |
| if __name__ == "__main__": | |
| import asyncio | |
| import nest_asyncio | |
| nest_asyncio.apply() # Allow nested event loops | |
| asyncio.get_event_loop().run_until_complete(main()) | |