kuttuxd / bot-not.txt
JerryCoder's picture
Rename bot.py to bot-not.txt
fc69f00 verified
import os
import io
import sys
import zipfile
import base64
import random
import shutil
import subprocess
import tempfile
from pathlib import Path
from fastapi import FastAPI, Request
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, MessageHandler, filters, ContextTypes
from motor.motor_asyncio import AsyncIOMotorClient
import uvicorn
from config import BOT_TOKEN, ADMIN_ID, MONGO_URI, DB_NAME
# --- MongoDB Setup ---
# One Motor client is created at import time and shared by the whole module.
mongo_client = AsyncIOMotorClient(MONGO_URI)
db = mongo_client[DB_NAME]
# Collection handles, looked up by name on the configured database.
users_col, bans_col, stats_col = (
    db[collection_name]
    for collection_name in ("users", "banned_users", "stats")
)
async def init_stats():
    """Make sure the ``encryption_count`` counter document exists in ``stats_col``.

    The document is created with ``count`` set to 0 when it is missing and is
    left untouched when it already exists.
    """
    # A single upsert replaces the previous find-then-insert pair, so two
    # processes starting together cannot both decide the document is missing
    # between the check and the insert.
    await stats_col.update_one(
        {"_id": "encryption_count"},
        {"$setOnInsert": {"count": 0}},
        upsert=True,
    )
# --- Helpers ---
def make_zip_bytes(filename: str, content: bytes) -> bytes:
    """Return an in-memory ZIP archive with ``content`` stored as ``filename``.

    The single member is written with DEFLATE compression; nothing touches
    the disk.
    """
    buffer = io.BytesIO()
    archive = zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED)
    try:
        archive.writestr(filename, content)
    finally:
        # Closing the archive is what flushes the central directory into
        # the buffer, so it must happen before the bytes are read back.
        archive.close()
    return buffer.getvalue()
def rand_bytes(n):
    """Return ``n`` random bytes obtained from ``os.urandom``."""
    random_block = os.urandom(n)
    return random_block
def bytes_to_c_array_literal(b: bytes) -> str:
    """Render ``b`` as comma-separated decimal byte values.

    The bytes 0, 1 and 255 become ``"0,1,255"``; empty input gives an empty
    string. No braces or spaces are added.
    """
    # Iterating a bytes object yields ints, which map straight to decimals.
    return ",".join(map(str, b))
def xor_string(s: str, key: int = None) -> str:
    """Return Python source for an expression that evaluates back to ``s``.

    Each character's code point is XOR-ed with ``key`` and emitted as an
    integer list; the generated lambda XORs with the same key to rebuild the
    text. When ``key`` is omitted, a random value from 1 to 255 is used.
    """
    if key is None:
        key = random.randint(1, 255)
    masked_codes = ",".join(str(ord(ch) ^ key) for ch in s)
    decoder = f"(lambda s:''.join(chr(B^{key}) for B in s))"
    return f"{decoder}([{masked_codes}])"
def generate_junk_data(min_kb=50,max_kb=70):
    """Return a random filler string between ``min_kb`` and ``max_kb`` KiB long.

    The length is drawn uniformly (inclusive) from that range, and every
    character is picked independently from a fixed set of letters, digits
    and punctuation.
    """
    alphabet = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_()-./,;:!?"
    # The size is drawn first, then one character per position.
    length = random.randint(min_kb * 1024, max_kb * 1024)
    picked = [random.choice(alphabet) for _ in range(length)]
    return "".join(picked)
# --- C Template ---
# C source template for the helper extension. process_file substitutes the
# /*KEY_BYTES*/ and /*IV_BYTES*/ markers before writing it to aes_helper.c.
# NOTE(review): only the /*KEY_BYTES*/ marker is present here; this looks
# like a placeholder for the real template - confirm.
C_TEMPLATE = "/*KEY_BYTES*/"
# Contents written to setup.py by compile_c_extension before building.
# NOTE(review): currently a placeholder comment, not a working build script.
SETUP_PY = "# Your setup.py content"
def compile_c_extension(build_dir: Path):
    """Build the helper extension inside ``build_dir`` and return the ``.so`` path.

    Writes ``SETUP_PY`` to ``build_dir/setup.py`` and runs
    ``setup.py build_ext --inplace`` with the current interpreter, capturing
    stdout and stderr together. The run is limited to 30 seconds.

    Returns:
        The first match of ``aes_helper*.so`` in ``build_dir``, or failing
        that the first ``.so`` under ``build/lib.*/``.

    Raises:
        RuntimeError: no shared object was produced; the message carries the
            captured build output.
        subprocess.TimeoutExpired: the build exceeded the time limit.
    """
    setup_script = build_dir / "setup.py"
    setup_script.write_text(SETUP_PY)

    build_cmd = [sys.executable, "setup.py", "build_ext", "--inplace"]
    proc = subprocess.run(
        build_cmd,
        cwd=str(build_dir),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        timeout=30,
    )

    # In-place builds land next to setup.py; the build/lib.* tree is a fallback.
    built = [
        *build_dir.glob("aes_helper*.so"),
        *build_dir.glob("build/lib.*/*.so"),
    ]
    if built:
        return built[0]
    raise RuntimeError("Compile fail:\n" + proc.stdout.decode())
def build_wrapper(j_b64, so_b64, so_filename):
    """Return the source text of the self-contained wrapper script.

    The generated script embeds ``j_b64`` (base64 of the encrypted payload)
    and ``so_b64`` (base64 of the compiled helper). When run it:

    * writes the helper into a directory under the system temp dir, using
      ``so_filename`` as the file name, and loads it as ``aes_helper``;
    * base64-decodes the payload, passes it to the helper's ``decrypt``, and
      extracts the resulting ZIP into a fresh temp directory;
    * runs the first ``.py`` file found there as ``__main__``;
    * prints ``ninja_error`` plus the exception on failure, and finally
      removes the helper file it wrote.

    The temp-directory name and helper file name are embedded through
    ``xor_string``, and a trailing ``JUNK`` assignment holds random filler
    from ``generate_junk_data``.
    """
    # Evaluated in the same order as the emitted lines that use them, so the
    # sequence of random draws is unchanged.
    tmp_dir_expr = xor_string('ninja_tmp')
    so_name_expr = xor_string(so_filename)

    # Embedded data plus materialising the helper library on disk.
    prologue = [
        "wulcan = '.WulcanPy'",
        "import os as O, sys as S, base64 as B, tempfile as T",
        f"WL='{j_b64}'",
        f"M={so_b64!r}",
        f"D=O.path.join(T.gettempdir(),{tmp_dir_expr});O.makedirs(D,exist_ok=True)",
        f"P=O.path.join(D,{so_name_expr})",
        "open(P,'wb').write(B.b64decode(M))",
    ]

    # Loading the helper as an extension module named aes_helper.
    load_helper = [
        "import importlib.machinery as L, importlib.util as U",
        "loader=L.ExtensionFileLoader('aes_helper',P);spec=U.spec_from_loader(loader.name,loader);mod=U.module_from_spec(spec);loader.exec_module(mod)",
    ]

    # Decrypt, unpack and run the original program; tabs indent the try body.
    run_payload = [
        "import sys, io, zipfile, runpy",
        "try:",
        "\tc=B.b64decode(WL);z=mod.decrypt(c)",
        "\tt=T.mkdtemp(prefix='nin_');zipfile.ZipFile(io.BytesIO(z)).extractall(t)",
        "\tS.path.insert(0,t)",
        "\te=[f for f in O.listdir(t) if f.endswith('.py')][0]",
        "\tep=O.path.join(t,e)",
        "\tS.argv=[ep]+S.argv[1:]",
        "\trunpy.run_path(ep,run_name='__main__')",
        "except Exception as E: print('ninja_error',E)",
        "finally: O.remove(P) if O.path.exists(P) else None",
    ]

    filler = [f"JUNK={generate_junk_data()!r}"]

    return "\n".join(prologue + load_helper + run_payload + filler)
# --- Process File ---
async def process_file(input_file: Path, chat_id, bot):
    """Encrypt ``input_file`` into a wrapper script and send it to ``chat_id``.

    Steps:
      1. Zip the input file's bytes in memory.
      2. Generate a random 32-byte key and 16-byte IV, substitute them into
         ``C_TEMPLATE`` and compile the helper extension in a scratch dir.
      3. Load the compiled helper and call its ``encrypt`` on the ZIP bytes.
      4. Build the wrapper script embedding the ciphertext and the helper,
         and send it as ``<stem>_enc.py`` through ``bot.send_document``.
      5. Increment the ``encryption_count`` counter in ``stats_col``.

    Args:
        input_file: Path of the Python file to process.
        chat_id: Telegram chat to send the result to.
        bot: Object exposing an awaitable ``send_document``.

    Raises:
        RuntimeError: propagated from ``compile_c_extension`` when the build
            produces no shared object.
    """
    import asyncio
    import importlib.util

    tmpdir = Path(tempfile.mkdtemp())
    try:
        # Read the input once; a copy is still placed in the scratch dir as before.
        py_bytes = input_file.read_bytes()
        (tmpdir / input_file.name).write_bytes(py_bytes)
        zip_b = make_zip_bytes(input_file.name, py_bytes)

        key, iv = rand_bytes(32), rand_bytes(16)
        c_src = (
            C_TEMPLATE
            .replace("/*KEY_BYTES*/", bytes_to_c_array_literal(key))
            .replace("/*IV_BYTES*/", bytes_to_c_array_literal(iv))
        )
        (tmpdir / "aes_helper.c").write_text(c_src)

        # The build is a blocking subprocess (up to 30 s); run it in the
        # default executor so the event loop keeps serving other requests.
        loop = asyncio.get_running_loop()
        so_path = await loop.run_in_executor(None, compile_c_extension, tmpdir)

        spec = importlib.util.spec_from_file_location("aes_helper", so_path)
        mod = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(mod)

        cipher = mod.encrypt(zip_b)
        cipher_b64 = base64.b64encode(cipher).decode()
        so_b64 = base64.b64encode(so_path.read_bytes()).decode()
        wrapper = build_wrapper(cipher_b64, so_b64, so_path.name)

        out_name = input_file.stem + "_enc.py"
        out_path = tmpdir / out_name
        out_path.write_text(wrapper)

        # Close the upload handle deterministically once the send completes.
        with open(out_path, "rb") as out_fh:
            await bot.send_document(chat_id=chat_id, document=out_fh, filename=out_name)

        await stats_col.update_one({"_id": "encryption_count"}, {"$inc": {"count": 1}})
    finally:
        # Remove the scratch directory on failure too, not only on success.
        shutil.rmtree(tmpdir, ignore_errors=True)
# --- FastAPI App ---
app = FastAPI()
bot_app = Application.builder().token(BOT_TOKEN).build()


async def _on_start(update, context):
    """Reply to the /start command."""
    # effective_message also covers edited messages, for which
    # update.message is None and the old lambda raised AttributeError.
    message = update.effective_message
    if message is not None:
        await message.reply_text("Bot started!")


async def _on_document(update, context):
    """Acknowledge an incoming document."""
    # Document filters also match channel posts and edits, where
    # update.message is None; effective_message handles those too.
    message = update.effective_message
    if message is not None:
        await message.reply_text("File received")


# Register handlers (updates are delivered by the PHP caller of /poll)
bot_app.add_handler(CommandHandler("start", _on_start))
bot_app.add_handler(MessageHandler(filters.Document.ALL, _on_document))
@app.post("/poll")
async def poll_endpoint(request: Request):
    """Receive one Telegram update from the PHP poller and queue it.

    Expected JSON body: ``{"update": telegram_update_json}``. A bare update
    object (without the ``"update"`` wrapper) is accepted as well, since that
    is what this endpoint parsed previously.

    Returns:
        ``{"ok": True}`` once the update is on ``bot_app.update_queue``;
        ``{"ok": False}`` when the body is not a JSON object or does not
        yield an update.
    """
    data = await request.json()
    if not isinstance(data, dict):
        return {"ok": False}

    # Unwrap the documented envelope; fall back to the raw body.
    payload = data.get("update", data)
    if not isinstance(payload, dict):
        return {"ok": False}

    update = Update.de_json(payload, bot_app.bot)
    if update is None:
        # Never enqueue None: the queue consumer expects real updates.
        return {"ok": False}

    await bot_app.update_queue.put(update)
    return {"ok": True}
async def _serve():
    """Run the stats init, the Telegram application and the web server together.

    Everything runs in one event loop. Previously ``init_stats`` ran in its
    own short-lived loop, and ``bot_app`` was never initialized or started,
    so updates placed on ``bot_app.update_queue`` were never dispatched to
    the registered handlers.
    """
    await init_stats()

    config = uvicorn.Config(
        app, host="0.0.0.0", port=int(os.environ.get("PORT", 7860))
    )
    server = uvicorn.Server(config)

    # ``async with`` initializes the application and shuts it down on exit;
    # start() launches the task that consumes update_queue.
    async with bot_app:
        await bot_app.start()
        try:
            await server.serve()
        finally:
            await bot_app.stop()


if __name__ == "__main__":
    import asyncio
    asyncio.run(_serve())