| import os, pty, select, socket, asyncio, shutil, zipfile, bcrypt, aiosqlite, telebot, psutil, sys, threading, base64 |
| from fastapi import FastAPI, UploadFile, File, Request, Response, Form |
| from fastapi.responses import HTMLResponse, JSONResponse |
| import socketio |
| import database, ui_templates |
|
|
| K_I = base64.b64decode("R1VTSU9OTFlfUFJPXzIwMjY=").decode() |
| C_V = os.getenv("AQSO_CORE_KEY", "").strip() |
| B_T = os.getenv("BOT_TOKEN") |
| S_K = os.getenv("PANEL_SECRET") |
| A_T = os.getenv("ADMIN_TELE_ID") |
|
|
| if C_V != K_I: sys.exit(1) |
| if not all([B_T, S_K, A_T]): sys.exit(1) |
|
|
| app = FastAPI() |
| sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins="*") |
| bot = telebot.TeleBot(B_T) |
| W_R = os.path.abspath(os.path.join(os.getcwd(), "data")) |
| A_I = int(A_T) |
|
|
| |
| @bot.message_handler(commands=['start']) |
| def _s(m): |
| l = "⚡ **AQSO CORE v18** ⚡\n\n├ `/daftar [user] [pass]`\n├ `/hapus [user]`\n└ `/list`" |
| bot.reply_to(m, l, parse_mode="Markdown") |
|
|
| @bot.message_handler(commands=['daftar']) |
| def _r(m): |
| async def _do(): |
| try: |
| a = m.text.split() |
| if len(a) < 3: return |
| if await database.add_user(a[1], a[2]): bot.reply_to(m, f"✅ {a[1].upper()} ADDED.") |
| except: pass |
| asyncio.run(_do()) |
|
|
| @bot.message_handler(commands=['hapus']) |
| def _d(m): |
| if m.from_user.id != A_I: return |
| async def _do(): |
| try: |
| u = m.text.split()[1] |
| await database.del_user(u) |
| |
| p = os.path.join(W_R, u.upper()) |
| if os.path.exists(p): shutil.rmtree(p) |
| bot.reply_to(m, f"🗑️ {u.upper()} PURGED.") |
| except: pass |
| asyncio.run(_do()) |
|
|
| @bot.message_handler(commands=['list']) |
| def _l(m): |
| if m.from_user.id != A_I: return |
| async def _do(): |
| u = await database.list_users() |
| bot.reply_to(m, "👥 **USERS:**\n" + "\n".join(u) if u else "EMPTY") |
| asyncio.run(_do()) |
|
|
| def _bt(): |
| try: |
| bot.remove_webhook() |
| bot.infinity_polling(timeout=20) |
| except: pass |
|
|
| @app.on_event("startup") |
| async def _os(): |
| if not os.path.exists(W_R): os.makedirs(W_R, exist_ok=True) |
| await database.init_db() |
| threading.Thread(target=_bt, daemon=True).start() |
|
|
| def _gw(u, sub=""): |
| r = os.path.abspath(os.path.join(W_R, u.upper() if u else "GUEST")) |
| p = os.path.abspath(os.path.join(r, sub.lstrip("/"))) |
| if not p.startswith(r): p = r |
| if not os.path.exists(p): os.makedirs(p, exist_ok=True) |
| return p |
|
|
| |
| async def _au(r): |
| u = r.headers.get("X-User-ID") |
| t = r.headers.get("Authorization") |
| if t == S_K and u: |
| if await database.check_user(u): return u |
| return None |
|
|
| @app.get("/") |
| def _id(): return HTMLResponse(ui_templates.PANEL_UI) |
|
|
| @app.post("/login_api") |
| async def _lg(request: Request): |
| f = await request.form() |
| u = f.get("username", f.get("user", f.get("u", ""))).upper().strip() |
| p = f.get("password", f.get("pass", f.get("p", ""))).strip() |
| async with aiosqlite.connect(database.D_P) as db: |
| async with db.execute("SELECT p FROM u_tab WHERE u = ?", (u,)) as cur: |
| row = await cur.fetchone() |
| if row and bcrypt.checkpw(p.encode(), row[0].encode()): |
| return JSONResponse({"status": "success", "token": S_K, "user": u}) |
| return JSONResponse({"status": "failed"}, status_code=401) |
|
|
| @app.get("/api/files") |
| async def _fl(r: Request, p: str = ""): |
| u = await _au(r) |
| if not u: return JSONResponse([], 401) |
| w = _gw(u, p) |
| return [{"n": f, "d": os.path.isdir(os.path.join(w, f))} for f in sorted(os.listdir(w))] |
|
|
| @app.get("/api/get") |
| async def _gt(r: Request, n: str): |
| u = await _au(r) |
| if not u: return JSONResponse({"c": ""}, 401) |
| p = _gw(u, n) |
| if os.path.isdir(p) or not os.path.exists(p): return {"c": ""} |
| with open(p, "r", encoding="utf-8", errors="ignore") as f: return {"c": f.read()} |
|
|
| @app.post("/api/save") |
| async def _sv(r: Request, d: dict): |
| u = await _au(r) |
| if not u: return JSONResponse({}, 401) |
| p = _gw(u, d['n']) |
| with open(p, "w", encoding="utf-8") as f: f.write(d['c']) |
|
|
| @app.post("/api/upload") |
| async def _ul(r: Request, f: UploadFile = File(...), p: str = Form("")): |
| u = await _au(r) |
| if not u: return JSONResponse({"s": "fail"}, 401) |
| w = _gw(u, p) |
| d = os.path.join(w, os.path.basename(f.filename)) |
| with open(d, "wb") as o: shutil.copyfileobj(f.file, o) |
| return JSONResponse({"s": "ok"}) |
|
|
| @app.post("/api/unzip") |
| async def _uz(r: Request, d: dict): |
| u = await _au(r) |
| if not u: return JSONResponse({}, 401) |
| w = _gw(u, d.get('p', '')) |
| f = os.path.join(w, d['n']) |
| if os.path.exists(f) and f.endswith(".zip"): |
| try: |
| with zipfile.ZipFile(f, 'r') as z: z.extractall(w) |
| return JSONResponse({"s": "ok"}) |
| except: return JSONResponse({"s": "fail"}, 500) |
|
|
| @app.post("/api/mkdir") |
| async def _md(r: Request, d: dict): |
| u = await _au(r) |
| if not u: return JSONResponse({}, 401) |
| _gw(u, d['p']) |
|
|
| @app.post("/api/mkfile") |
| async def _mf(r: Request, d: dict): |
| u = await _au(r) |
| if not u: return JSONResponse({}, 401) |
| p = _gw(u, d['p']) |
| if not os.path.exists(p): open(p, 'w').close() |
|
|
| @app.post("/api/move") |
| async def _mv(r: Request, d: dict): |
| u = await _au(r) |
| if not u: return JSONResponse({}, 401) |
| w = _gw(u) |
| s, t = os.path.join(w, d['src']), os.path.join(w, d['dst']) |
| if os.path.exists(s): shutil.move(s, t) |
|
|
| @app.delete("/api/delete") |
| async def _dl(r: Request, n: str, p: str = ""): |
| u = await _au(r) |
| if not u: return JSONResponse({}, 401) |
| t = _gw(u, os.path.join(p, n)) |
| if os.path.isdir(t): shutil.rmtree(t) |
| else: os.remove(t) |
|
|
| _fm = {} |
|
|
| @sio.on('connect') |
| async def _cn(sid, env): |
| q = env.get('QUERY_STRING', '') |
| t, u = "", "" |
| for v in q.split('&'): |
| if 'token=' in v: t = v.split('=')[1] |
| if 'user=' in v: u = v.split('=')[1] |
| |
| if t != S_K or not u or not await database.check_user(u): return False |
| (pid, fd) = pty.fork() |
| if pid == 0: |
| w = _gw(u) |
| os.chdir(w) |
| os.execvpe("bash", ["bash"], {**os.environ, "HOME": w, "TERM": "xterm-256color", "USER": u}) |
| _fm[sid] = fd |
| async def _st(): |
| while sid in _fm: |
| await asyncio.sleep(3) |
| await sio.emit('stats', {'cpu': psutil.cpu_percent(), 'ram': psutil.virtual_memory().percent}, room=sid) |
| async def _rd(): |
| while sid in _fm: |
| await asyncio.sleep(0.01) |
| if _fm[sid] and select.select([_fm[sid]], [], [], 0)[0]: |
| await sio.emit('o', os.read(_fm[sid], 2048).decode(errors='ignore'), room=sid) |
| sio.start_background_task(_st) |
| sio.start_background_task(_rd) |
|
|
| @sio.on('i') |
| async def _in(sid, d): |
| if any(c in d.lower() for c in ["node", "npm", "npx", "yarn"]): return |
| if sid in _fm: os.write(_fm[sid], d.encode()) |
|
|
| @sio.on('run') |
| async def _rn(sid, f): |
| if sid in _fm: os.write(_fm[sid], f"python {f}\n".encode()) |
|
|
| @sio.on('stop') |
| async def _sp(sid): |
| if sid in _fm: os.write(_fm[sid], b'\x03') |
|
|
| @sio.on('disconnect') |
| async def _dc(sid): |
| if sid in _fm: |
| try: os.close(_fm[sid]) |
| except: pass |
| del _fm[sid] |
|
|
| _sa = socketio.ASGIApp(sio, app) |
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(_sa, host="0.0.0.0", port=7860, loop="asyncio") |
| |