# kuttuxd / bot.py
# JerryCoder's picture
# Update bot.py
# fe1d6aa verified
import os
import io
import zipfile
import base64
import uuid
import tempfile
import shutil
from pathlib import Path
from fastapi import FastAPI, UploadFile, File
import aiohttp
import uvicorn
# ASGI application object; the route decorators in this file register on it.
app = FastAPI()

# Largest accepted upload, in bytes (5 MiB).
MAX_FILE_SIZE = 5 * 1024 ** 2
# -------------------------------
# 🔐 Pure Python Encryptor
# -------------------------------
def make_zip_bytes(filename: str, content: bytes) -> bytes:
    """Build an in-memory ZIP archive holding a single deflated member.

    Args:
        filename: Name of the archive member.
        content: Raw bytes stored under that name.

    Returns:
        The complete ZIP archive as bytes.
    """
    with io.BytesIO() as buffer:
        # The archive must be closed before reading the buffer so that the
        # central directory has been written out.
        with zipfile.ZipFile(
            buffer, mode="w", compression=zipfile.ZIP_DEFLATED
        ) as archive:
            archive.writestr(filename, content)
        return buffer.getvalue()
def rand_bytes(n: int) -> bytes:
    """Return ``n`` bytes read from the operating system's random source."""
    random_chunk = os.urandom(n)
    return random_chunk
def xor_bytes(data: bytes, key: bytes) -> bytes:
    """XOR ``data`` with ``key``, repeating the key as often as needed.

    Applying the function twice with the same key restores the input.

    Args:
        data: Bytes to transform.
        key: Key bytes, cycled over the length of ``data``.

    Returns:
        A bytes object of the same length as ``data``.
    """
    key_length = len(key)
    mixed = bytearray()
    for position, value in enumerate(data):
        mixed.append(value ^ key[position % key_length])
    return bytes(mixed)
def generate_junk_data(min_kb: int = 50, max_kb: int = 70) -> bytes:
    """Return a random-length run of printable ASCII filler bytes.

    The length is drawn from ``[min_kb * 1024, max_kb * 1024)``. When both
    bounds are equal, exactly ``min_kb * 1024`` bytes are returned; bounds
    given in reverse order are swapped.

    Args:
        min_kb: Lower bound of the output size, in KiB (inclusive).
        max_kb: Upper bound of the output size, in KiB (exclusive).

    Returns:
        Random bytes restricted to letters, digits and a few punctuation
        characters.
    """
    chars = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_()-./,;:!?"
    low, high = sorted((min_kb * 1024, max_kb * 1024))
    span = high - low
    # Eight random bytes cover any realistic span (two bytes would cap the
    # offset at 64 KiB); an empty span must not reach the modulo at all.
    offset = int.from_bytes(os.urandom(8), "big") % span if span else 0
    # Map every possible byte value onto the alphabet once, then translate
    # the whole random buffer in a single pass.
    table = bytes(chars[value % len(chars)] for value in range(256))
    return os.urandom(low + offset).translate(table)
def build_wrapper(enc_b64: str, key_b64: str):
    """Return the source of a self-extracting launcher script, as bytes.

    The generated script base64-decodes the embedded payload and key, XORs
    them back into a ZIP archive, extracts that archive into a fresh
    temporary directory and runs the first ``.py`` file found at the top
    level of that directory with ``runpy``. The script does not remove the
    temporary directory afterwards.

    Args:
        enc_b64: Base64 text of the XOR-masked ZIP archive.
        key_b64: Base64 text of the XOR key.

    Returns:
        The launcher source encoded with the default (UTF-8) codec.
    """
    # Both arguments are interpolated inside single-quoted literals; base64
    # text cannot contain a quote or a backslash, so no escaping is needed.
    wrapper = f"""# Auto Encrypted by JerryCoder Python Encryptor
import base64, zipfile, io, tempfile, os, runpy
D = base64.b64decode('{enc_b64}')
K = base64.b64decode('{key_b64}')
O = bytes([b ^ K[i % len(K)] for i, b in enumerate(D)])
tmpdir = tempfile.mkdtemp()
zipfile.ZipFile(io.BytesIO(O)).extractall(tmpdir)
py_file = [f for f in os.listdir(tmpdir) if f.endswith('.py')][0]
runpy.run_path(os.path.join(tmpdir, py_file))
"""
    return wrapper.encode()
def encode_bytes(file_bytes: bytes, filename: str):
    """Pack a Python source file into a self-extracting launcher script.

    The source is zipped, XOR-masked with a fresh 16-byte random key and
    embedded (base64) in the launcher together with that key. Because the
    key travels with the payload, this is obfuscation rather than
    confidentiality.

    Args:
        file_bytes: Contents of the file to wrap.
        filename: Client-supplied name of the file. Only its final path
            component is used.

    Returns:
        A ``(wrapper_bytes, wrapper_name)`` tuple: the launcher source and a
        unique file name of the form ``<stem>_<uuid hex>_enc.py``.
    """
    # The name comes from the client. Drop any directory part (either
    # separator style) so the archive member lands at the top level of the
    # extraction directory, where the launcher looks for it, and so the
    # returned name can never point outside a directory it is joined to.
    safe_name = os.path.basename(filename.replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        safe_name = "file.py"

    zipped = make_zip_bytes(safe_name, file_bytes)
    key = rand_bytes(16)
    enc = xor_bytes(zipped, key)
    enc_b64 = base64.b64encode(enc).decode()
    key_b64 = base64.b64encode(key).decode()
    wrapper_bytes = build_wrapper(enc_b64, key_b64)
    wrapper_name = f"{os.path.splitext(safe_name)[0]}_{uuid.uuid4().hex}_enc.py"
    return wrapper_bytes, wrapper_name
# -------------------------------
# POST /api/encode
# -------------------------------
async def _upload_wrapper(wrapper_bytes: bytes, wrapper_name: str) -> dict:
    """Upload a generated wrapper to AR Hosting and build the API response.

    The payload is streamed from memory, so no temporary file is written and
    no file handle can be left open if the request fails.

    Args:
        wrapper_bytes: Launcher source to upload.
        wrapper_name: File name announced in the multipart form.

    Returns:
        The response dict of the endpoints: ``ok``/``result_url``/``filename``
        on success, ``ok``/``error`` when the host answers with a non-200
        status.
    """
    form = aiohttp.FormData()
    form.add_field(
        "file",
        io.BytesIO(wrapper_bytes),
        filename=wrapper_name,
        content_type="application/octet-stream",
    )
    async with aiohttp.ClientSession() as session:
        async with session.post("https://ar-hosting.pages.dev/upload", data=form) as resp:
            if resp.status != 200:
                return {"ok": False, "error": f"AR Hosting failed with status {resp.status}"}
            result = await resp.json()
    return {"ok": True, "result_url": result.get("url"), "filename": wrapper_name}


@app.post("/api/encode")
async def encode_post(file: UploadFile = File(...)):
    """Wrap an uploaded ``.py`` file and publish the result on AR Hosting.

    Args:
        file: The uploaded file; its name must end in ``.py`` and its size
            must not exceed ``MAX_FILE_SIZE``.

    Returns:
        ``{"ok": True, "result_url": ..., "filename": ...}`` on success, or
        ``{"ok": False, "error": ...}`` describing the failure.
    """
    # The client may omit the file name entirely; treat that as "not a .py".
    if not file.filename or not file.filename.endswith(".py"):
        return {"ok": False, "error": "Only Python (.py) files are allowed"}
    try:
        # Read one byte past the limit at most: enough to detect an oversized
        # upload without pulling all of it into memory.
        file_bytes = await file.read(MAX_FILE_SIZE + 1)
        if len(file_bytes) > MAX_FILE_SIZE:
            return {"ok": False, "error": "File too large (max 5MB)"}
        wrapper_bytes, wrapper_name = encode_bytes(file_bytes, file.filename)
        return await _upload_wrapper(wrapper_bytes, wrapper_name)
    except Exception as e:
        # Endpoint boundary: report every failure in the JSON envelope.
        return {"ok": False, "error": f"Internal error: {str(e)}"}
# -------------------------------
# GET /encode?url=...&filename=...
# -------------------------------
@app.get("/encode")
async def encode_get(url: str, filename: str = "file.py"):
    """Fetch a ``.py`` file from ``url``, wrap it and publish the result.

    Args:
        url: Location of the source file to download.
        filename: Name to give the downloaded file; must end in ``.py``
            because the generated launcher only runs ``.py`` members.

    Returns:
        ``{"ok": True, "result_url": ..., "filename": ...}`` on success, or
        ``{"ok": False, "error": ...}`` describing the failure.
    """
    if not filename.endswith(".py"):
        return {"ok": False, "error": "Only Python (.py) files are allowed"}
    try:
        # NOTE(security): ``url`` is caller-controlled and fetched from the
        # server, so this endpoint can be pointed at internal addresses
        # (SSRF). Restrict allowed hosts/schemes before exposing it publicly.
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    return {"ok": False, "error": f"Failed to fetch file ({resp.status})"}
                # Stream the body so the same size cap as the upload endpoint
                # applies before the whole response is buffered.
                chunks = []
                received = 0
                async for chunk in resp.content.iter_chunked(64 * 1024):
                    received += len(chunk)
                    if received > MAX_FILE_SIZE:
                        return {"ok": False, "error": "File too large (max 5MB)"}
                    chunks.append(chunk)
        file_bytes = b"".join(chunks)
        wrapper_bytes, wrapper_name = encode_bytes(file_bytes, filename)
        return await _upload_wrapper(wrapper_bytes, wrapper_name)
    except Exception as e:
        # Endpoint boundary: report every failure in the JSON envelope.
        return {"ok": False, "error": f"Internal error: {str(e)}"}
# -------------------------------
# Run App
# -------------------------------
if __name__ == "__main__":
    # Listen on every interface; the port comes from the PORT environment
    # variable, with 7860 as the fallback.
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)