Spaces:
Sleeping
Sleeping
import base64
import io
import os
import secrets
import shutil
import tempfile
import uuid
import zipfile
from pathlib import Path

import aiohttp
import uvicorn
from fastapi import FastAPI, UploadFile, File
# ASGI application object; handed to uvicorn when this module runs as a script.
app = FastAPI()

# Size cap, in bytes, applied to files submitted for encoding.
MAX_FILE_SIZE = 5 * 1024 ** 2  # 5 MB
| # ------------------------------- | |
# Pure Python Encryptor
| # ------------------------------- | |
def make_zip_bytes(filename: str, content: bytes) -> bytes:
    """Return an in-memory ZIP archive holding one deflate-compressed member.

    Args:
        filename: Name given to the archive member.
        content: Raw bytes stored under that name.

    Returns:
        The complete archive serialized as bytes.
    """
    buffer = io.BytesIO()
    archive = zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED)
    try:
        archive.writestr(filename, content)
    finally:
        # Closing writes the central directory; the archive is unreadable without it.
        archive.close()
    return buffer.getvalue()
def rand_bytes(n: int) -> bytes:
    """Return ``n`` bytes obtained from ``os.urandom``."""
    random_block = os.urandom(n)
    return random_block
def xor_bytes(data: bytes, key: bytes) -> bytes:
    """XOR ``data`` with ``key``, repeating the key as often as needed.

    Applying the function twice with the same key returns the original data.

    Args:
        data: Bytes to transform.
        key: Key bytes, reused cyclically across ``data``.

    Returns:
        A new bytes object of the same length as ``data``.

    Raises:
        ZeroDivisionError: If ``key`` is empty while ``data`` is not.
    """
    key_len = len(key)
    out = bytearray(len(data))
    for pos, value in enumerate(data):
        out[pos] = value ^ key[pos % key_len]
    return bytes(out)
def generate_junk_data(min_kb=50, max_kb=70):
    """Return a random-length run of random printable ASCII bytes.

    The length is drawn uniformly from ``[min_kb * 1024, max_kb * 1024)``.
    When both bounds are equal the result is exactly ``min_kb * 1024`` bytes
    long (this used to raise ``ZeroDivisionError``). The bounds may be given
    in either order.

    Args:
        min_kb: Lower size bound in KiB (inclusive).
        max_kb: Upper size bound in KiB (exclusive).

    Returns:
        Bytes drawn from letters, digits and ``_()-./,;:!?``.
    """
    chars = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789_()-./,;:!?"
    low_kb, high_kb = sorted((min_kb, max_kb))
    span = (high_kb - low_kb) * 1024
    # randbelow() covers any span without modulo bias; the previous 2-byte
    # draw could never select an offset of 65536 or more.
    offset = secrets.randbelow(span) if span > 0 else 0
    junk_size = low_kb * 1024 + offset
    # One 256-entry lookup table applied in a single C-level pass; it is the
    # same byte -> chars[byte % len(chars)] mapping as before.
    table = bytes(chars[value % len(chars)] for value in range(256))
    # os.urandom is called directly (it is what rand_bytes wraps) so the
    # function has no dependency on sibling helpers.
    return os.urandom(junk_size).translate(table)
def build_wrapper(enc_b64: str, key_b64: str) -> bytes:
    """Build the self-extracting Python script that carries the payload.

    When run, the generated script base64-decodes the payload and key, undoes
    the XOR, unpacks the resulting ZIP archive into a fresh temporary
    directory and executes the first top-level ``.py`` file found there.

    The payload is executed with ``run_name='__main__'`` so that an
    ``if __name__ == "__main__":`` block in the original script still runs;
    ``runpy.run_path`` otherwise sets ``__name__`` to ``'<run_path>'``.

    Args:
        enc_b64: Base64 text of the XOR-obfuscated ZIP archive.
        key_b64: Base64 text of the XOR key.

    Returns:
        The wrapper script source, encoded as bytes.
    """
    # Both arguments are base64 text, which contains no quote characters, so
    # they can be embedded in the single-quoted literals below unescaped.
    wrapper = f"""# Auto Encrypted by JerryCoder Python Encryptor
import base64, zipfile, io, tempfile, os, runpy
D = base64.b64decode('{enc_b64}')
K = base64.b64decode('{key_b64}')
O = bytes([b ^ K[i % len(K)] for i, b in enumerate(D)])
tmpdir = tempfile.mkdtemp()
zipfile.ZipFile(io.BytesIO(O)).extractall(tmpdir)
py_file = [f for f in os.listdir(tmpdir) if f.endswith('.py')][0]
runpy.run_path(os.path.join(tmpdir, py_file), run_name='__main__')
"""
    return wrapper.encode()
def encode_bytes(file_bytes: bytes, filename: str):
    """Obfuscate a Python source file into a self-extracting wrapper script.

    The source is zipped, XOR-ed with a fresh 16-byte random key, and both
    are embedded as base64 in the script produced by ``build_wrapper``.

    ``filename`` is treated as untrusted. Only its final path component is
    used, so the returned name never contains a directory separator and the
    archive member always sits at the top level of the ZIP, where the
    generated wrapper looks for it. A name without a ``.py`` suffix is stored
    as ``<stem>.py``, because the wrapper selects the script by that suffix.

    Args:
        file_bytes: Contents of the Python file to wrap.
        filename: Original file name; may contain path components.

    Returns:
        A ``(wrapper_bytes, wrapper_name)`` tuple: the wrapper script source
        and a unique ``<stem>_<uuid>_enc.py`` file name for it.
    """
    # Normalise Windows separators first: on POSIX, basename() does not
    # treat "\\" as a separator.
    safe_name = os.path.basename(filename.replace("\\", "/")) or "file.py"
    stem = os.path.splitext(safe_name)[0]
    member_name = safe_name if safe_name.endswith(".py") else f"{stem}.py"

    zipped = make_zip_bytes(member_name, file_bytes)
    key = rand_bytes(16)
    enc = xor_bytes(zipped, key)
    enc_b64 = base64.b64encode(enc).decode()
    key_b64 = base64.b64encode(key).decode()
    wrapper_bytes = build_wrapper(enc_b64, key_b64)
    wrapper_name = f"{stem}_{uuid.uuid4().hex}_enc.py"
    return wrapper_bytes, wrapper_name
| # ------------------------------- | |
| # POST /api/encode | |
| # ------------------------------- | |
@app.post("/api/encode")
async def encode_post(file: UploadFile = File(...)):
    """Encode an uploaded ``.py`` file and publish the wrapper on AR Hosting.

    Args:
        file: The multipart upload; its name must end with ``.py`` and its
            size must not exceed ``MAX_FILE_SIZE``.

    Returns:
        ``{"ok": True, "result_url": ..., "filename": ...}`` on success, or
        ``{"ok": False, "error": ...}`` describing the failure.
    """
    # An upload may arrive without a file name, so guard before calling
    # endswith() on it.
    if not file.filename or not file.filename.endswith(".py"):
        return {"ok": False, "error": "Only Python (.py) files are allowed"}
    try:
        file_bytes = await file.read()
        if len(file_bytes) > MAX_FILE_SIZE:
            return {"ok": False, "error": "File too large (max 5MB)"}
        wrapper_bytes, wrapper_name = encode_bytes(file_bytes, file.filename)
        return await _upload_wrapper(wrapper_bytes, wrapper_name)
    except Exception as e:
        # Top-level boundary: report the failure in the response body.
        return {"ok": False, "error": f"Internal error: {str(e)}"}


async def _upload_wrapper(wrapper_bytes: bytes, wrapper_name: str) -> dict:
    """Upload a wrapper script to AR Hosting and build the API response.

    The bytes are sent straight from memory. No temporary file is written,
    so nothing is left behind on disk, no file handle can leak when the
    request fails, and ``wrapper_name`` is never used as a local path.
    """
    form = aiohttp.FormData()
    form.add_field(
        "file",
        wrapper_bytes,
        filename=wrapper_name,
        content_type="application/octet-stream",
    )
    async with aiohttp.ClientSession() as session:
        async with session.post("https://ar-hosting.pages.dev/upload", data=form) as resp:
            if resp.status != 200:
                return {"ok": False, "error": f"AR Hosting failed with status {resp.status}"}
            result = await resp.json()
    return {"ok": True, "result_url": result.get("url"), "filename": wrapper_name}


# -------------------------------
# GET /encode?url=...&filename=...
# -------------------------------
@app.get("/encode")
async def encode_get(url: str, filename: str = "file.py"):
    """Fetch a Python file from ``url``, encode it and publish the wrapper.

    The same limits as the POST route apply: ``filename`` must end with
    ``.py`` and the download is abandoned once it exceeds ``MAX_FILE_SIZE``.

    Args:
        url: Location of the Python source to download.
        filename: Name to record for the downloaded source.

    Returns:
        ``{"ok": True, "result_url": ..., "filename": ...}`` on success, or
        ``{"ok": False, "error": ...}`` describing the failure.
    """
    if not filename.endswith(".py"):
        return {"ok": False, "error": "Only Python (.py) files are allowed"}
    try:
        # NOTE(security): ``url`` is caller-controlled and is fetched from
        # this server, so internal addresses are reachable (SSRF). Restrict
        # the allowed hosts before exposing this route to untrusted clients.
        downloaded = bytearray()
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as resp:
                if resp.status != 200:
                    return {"ok": False, "error": f"Failed to fetch file ({resp.status})"}
                # Stream the body so an oversized response is rejected
                # without first being buffered in full.
                async for chunk in resp.content.iter_chunked(64 * 1024):
                    downloaded.extend(chunk)
                    if len(downloaded) > MAX_FILE_SIZE:
                        return {"ok": False, "error": "File too large (max 5MB)"}
        wrapper_bytes, wrapper_name = encode_bytes(bytes(downloaded), filename)
        return await _upload_wrapper(wrapper_bytes, wrapper_name)
    except Exception as e:
        # Top-level boundary: report the failure in the response body.
        return {"ok": False, "error": f"Internal error: {str(e)}"}
| # ------------------------------- | |
| # Run App | |
| # ------------------------------- | |
if __name__ == "__main__":
    # Listen on all interfaces; the port comes from $PORT, falling back to 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)