# panelx / main.py
# Aqso's picture
# Update main.py
# f1cb440 verified
import asyncio
import base64
import codecs
import logging
import os
import pty
import select
import shutil
import socket
import sys
import threading
import zipfile
from urllib.parse import parse_qs

import aiosqlite
import bcrypt
import psutil
import socketio
import telebot
from fastapi import FastAPI, UploadFile, File, Request, Response, Form
from fastapi.responses import HTMLResponse, JSONResponse

import database
import ui_templates
# Expected core key; it is kept base64-encoded in the source and decoded at import time.
K_I = base64.b64decode("R1VTSU9OTFlfUFJPXzIwMjY=").decode()
# Core key supplied by the deployment through the AQSO_CORE_KEY environment variable.
C_V = os.getenv("AQSO_CORE_KEY", "").strip()
# Telegram bot token, panel shared secret and admin Telegram id, all read from the environment.
B_T = os.getenv("BOT_TOKEN")
S_K = os.getenv("PANEL_SECRET")
A_T = os.getenv("ADMIN_TELE_ID")
# Refuse to start when the supplied core key does not match the expected one.
if C_V != K_I: sys.exit(1)
# Refuse to start when any of the three required settings is missing or empty.
if not all([B_T, S_K, A_T]): sys.exit(1)
app = FastAPI()
# Socket.IO server in ASGI mode; cross-origin requests are accepted from any origin.
sio = socketio.AsyncServer(async_mode='asgi', cors_allowed_origins="*")
bot = telebot.TeleBot(B_T)
# Absolute path of the "data" directory under the current working directory.
W_R = os.path.abspath(os.path.join(os.getcwd(), "data"))
# Admin id as an integer; int() raises ValueError at import time if ADMIN_TELE_ID is not numeric.
A_I = int(A_T)
# --- BOT HANDLERS ---
@bot.message_handler(commands=['start'])
def _s(m):
    """Answer /start with the list of available bot commands.

    Args:
        m: Incoming Telegram message that triggered the handler.
    """
    menu = (
        "⚡ **AQSO CORE v18** ⚡\n\n"
        "├ `/daftar [user] [pass]`\n"
        "├ `/hapus [user]`\n"
        "└ `/list`"
    )
    bot.reply_to(m, menu, parse_mode="Markdown")
@bot.message_handler(commands=['daftar'])
def _r(m):
    """Handle `/daftar <user> <pass>`: register a panel user.

    The message text is split on whitespace; the second and third tokens are
    passed to ``database.add_user``. A confirmation is sent only when that
    call returns a truthy value. Messages with fewer than three tokens are
    ignored.

    Args:
        m: Incoming Telegram message that triggered the handler.
    """
    # NOTE(review): unlike /hapus and /list, this command is not restricted to
    # the admin id, so any chat can create a user — confirm this is intended.

    async def _do():
        args = m.text.split()
        if len(args) < 3:
            return
        name, password = args[1], args[2]
        # The upper-cased user name becomes a directory name under the data
        # root, so it must be a single, ordinary path component.
        has_sep = os.sep in name or bool(os.altsep and os.altsep in name)
        if name in (".", "..") or has_sep:
            return
        try:
            if await database.add_user(name, password):
                bot.reply_to(m, f"✅ {name.upper()} ADDED.")
        except Exception:
            # Still best-effort (the handler never raises), but the failure
            # is recorded instead of disappearing.
            logging.getLogger(__name__).exception("Failed to register user %r", name)

    # Runs the coroutine on a fresh event loop for this handler call.
    asyncio.run(_do())
@bot.message_handler(commands=['hapus'])
def _d(m):
    """Handle `/hapus <user>`: delete a panel user and their data folder.

    Only messages whose sender id equals the admin id are processed. The user
    is removed through ``database.del_user`` and the matching folder directly
    below the data root is deleted.

    Args:
        m: Incoming Telegram message that triggered the handler.
    """
    if m.from_user.id != A_I:
        return

    async def _do():
        parts = m.text.split()
        if len(parts) < 2:
            # No user name given; nothing to delete.
            return
        u = parts[1]
        try:
            await database.del_user(u)
            # Also remove the user's folder so nothing is left behind.
            p = os.path.abspath(os.path.join(W_R, u.upper()))
            # Only ever delete a direct child of the data root: a name such as
            # ".." or "a/../.." must not make rmtree climb out of it.
            if os.path.dirname(p) == W_R and os.path.isdir(p):
                shutil.rmtree(p)
            bot.reply_to(m, f"🗑️ {u.upper()} PURGED.")
        except Exception:
            # Still best-effort, but the failure is recorded instead of hidden.
            logging.getLogger(__name__).exception("Failed to delete user %r", u)

    # Runs the coroutine on a fresh event loop for this handler call.
    asyncio.run(_do())
@bot.message_handler(commands=['list'])
def _l(m):
    """Handle `/list`: send the admin the result of ``database.list_users``.

    Messages from any sender other than the admin id are ignored.

    Args:
        m: Incoming Telegram message that triggered the handler.
    """
    if m.from_user.id != A_I:
        return

    async def _do():
        names = await database.list_users()
        if names:
            text = "👥 **USERS:**\n" + "\n".join(names)
        else:
            text = "EMPTY"
        bot.reply_to(m, text)

    asyncio.run(_do())
def _bt():
    """Run the Telegram bot's polling loop (used as a thread target).

    Removes any configured webhook and then polls indefinitely. The function
    never raises: a failure ends the loop and is written to the log so that a
    dead bot thread is visible to the operator.
    """
    try:
        bot.remove_webhook()
        bot.infinity_polling(timeout=20)
    except Exception:
        logging.getLogger(__name__).exception("Telegram polling stopped unexpectedly")
@app.on_event("startup")
async def _os():
    """Startup hook: prepare the data root, the database and the bot thread."""
    data_root_missing = not os.path.exists(W_R)
    if data_root_missing:
        os.makedirs(W_R, exist_ok=True)
    await database.init_db()
    # The bot polls in a daemon thread so it never blocks interpreter exit.
    poller = threading.Thread(target=_bt, daemon=True)
    poller.start()
def _gw(u, sub=""):
    """Resolve a path inside a user's workspace, creating it if it is missing.

    The workspace root is ``<W_R>/<USER>`` where ``<USER>`` is the upper-cased
    user name, or ``GUEST`` when the name is empty or is not a single plain
    path component.

    Args:
        u: User name; falsy values select the ``GUEST`` workspace.
        sub: Path relative to the workspace root. Leading slashes are
            stripped. A path that resolves outside the workspace falls back
            to the workspace root.

    Returns:
        The absolute resolved path. If nothing exists there yet, it is created
        as a directory (including parents); an existing file path is returned
        unchanged.
    """
    name = u.upper() if u else "GUEST"
    # A user name containing a separator, or equal to "." / "..", would move
    # the workspace root outside (or onto) the data directory.
    separators = tuple(s for s in (os.sep, os.altsep) if s)
    if name in (".", "..") or any(s in name for s in separators):
        name = "GUEST"
    r = os.path.abspath(os.path.join(W_R, name))
    p = os.path.abspath(os.path.join(r, sub.lstrip("/")))
    # Compare on a path-component boundary: a bare prefix test would accept
    # the sibling "<root>BBY" as being inside "<root>".
    if p != r and not p.startswith(r + os.sep):
        p = r
    if not os.path.exists(p):
        os.makedirs(p, exist_ok=True)
    return p
# Verify the user against the database on every request (hard auth).
async def _au(r):
    """Authenticate a request from its headers.

    Args:
        r: Incoming request; ``X-User-ID`` and ``Authorization`` are read.

    Returns:
        The ``X-User-ID`` value when ``Authorization`` equals the panel secret
        and ``database.check_user`` accepts the user; otherwise ``None``.
    """
    user = r.headers.get("X-User-ID")
    token = r.headers.get("Authorization")
    if token != S_K or not user:
        return None
    if await database.check_user(user):
        return user
    return None
@app.get("/")
def _id():
    """Serve the panel's HTML page."""
    page = ui_templates.PANEL_UI
    return HTMLResponse(page)
@app.post("/login_api")
async def _lg(request: Request):
    """Check form credentials against the stored bcrypt hash.

    The user name is taken from the form field ``username``, ``user`` or
    ``u`` and the password from ``password``, ``pass`` or ``p``.

    Returns:
        A success JSON carrying the panel token and the upper-cased user name,
        or a 401 JSON with status ``failed``.
    """
    form = await request.form()
    raw_name = form.get("username", form.get("user", form.get("u", "")))
    name = raw_name.upper().strip()
    raw_password = form.get("password", form.get("pass", form.get("p", "")))
    password = raw_password.strip()

    async with aiosqlite.connect(database.D_P) as db:
        async with db.execute("SELECT p FROM u_tab WHERE u = ?", (name,)) as cur:
            row = await cur.fetchone()

    if not row or not bcrypt.checkpw(password.encode(), row[0].encode()):
        return JSONResponse({"status": "failed"}, status_code=401)
    return JSONResponse({"status": "success", "token": S_K, "user": name})
@app.get("/api/files")
async def _fl(r: Request, p: str = ""):
    """List the entries of a workspace folder.

    Args:
        r: Incoming request, used for authentication.
        p: Folder path relative to the user's workspace.

    Returns:
        A name-sorted list of ``{"n": name, "d": is_directory}`` items, or an
        empty list with status 401 when authentication fails.
    """
    u = await _au(r)
    if not u:
        return JSONResponse([], 401)
    folder = _gw(u, p)
    listing = []
    for entry in sorted(os.listdir(folder)):
        is_dir = os.path.isdir(os.path.join(folder, entry))
        listing.append({"n": entry, "d": is_dir})
    return listing
@app.get("/api/get")
async def _gt(r: Request, n: str):
    """Return the text content of a workspace file.

    Args:
        r: Incoming request, used for authentication.
        n: File path relative to the user's workspace.

    Returns:
        ``{"c": <text>}``; the text is empty when the path is not an existing
        regular file inside the workspace. Status 401 on failed
        authentication.
    """
    u = await _au(r)
    if not u:
        return JSONResponse({"c": ""}, 401)
    root = _gw(u)
    # Resolve the path here rather than through _gw(u, n): _gw creates a
    # directory at any path that does not exist, so reading a missing file
    # would leave a directory with that name behind.
    p = os.path.abspath(os.path.join(root, n.lstrip("/")))
    if not p.startswith(root + os.sep) or not os.path.isfile(p):
        return {"c": ""}
    with open(p, "r", encoding="utf-8", errors="ignore") as f:
        return {"c": f.read()}
@app.post("/api/save")
async def _sv(r: Request, d: dict):
    """Write text content to a workspace file, creating it if needed.

    Args:
        r: Incoming request, used for authentication.
        d: JSON body with ``n`` (file path relative to the workspace) and
            ``c`` (text to write).

    Returns:
        ``None`` on success; an empty JSON object with status 401 on failed
        authentication, or 400 when the body is malformed or the path is not
        a writable file location inside the workspace.
    """
    u = await _au(r)
    if not u:
        return JSONResponse({}, 401)
    name, content = d.get('n'), d.get('c')
    if not isinstance(name, str) or not isinstance(content, str):
        return JSONResponse({}, 400)
    root = _gw(u)
    # Resolve the path here rather than through _gw(u, name): _gw creates a
    # directory at any missing path, which made saving a new file fail.
    p = os.path.abspath(os.path.join(root, name.lstrip("/")))
    if not p.startswith(root + os.sep) or os.path.isdir(p):
        return JSONResponse({}, 400)
    os.makedirs(os.path.dirname(p), exist_ok=True)
    with open(p, "w", encoding="utf-8") as f:
        f.write(content)
@app.post("/api/upload")
async def _ul(r: Request, f: UploadFile = File(...), p: str = Form("")):
    """Store an uploaded file in a workspace folder.

    Args:
        r: Incoming request, used for authentication.
        f: The uploaded file; only the base name of its filename is used.
        p: Destination folder relative to the user's workspace.

    Returns:
        ``{"s": "ok"}`` on success, ``{"s": "fail"}`` with status 401 when
        authentication fails.
    """
    u = await _au(r)
    if not u:
        return JSONResponse({"s": "fail"}, 401)
    folder = _gw(u, p)
    file_name = os.path.basename(f.filename)
    destination = os.path.join(folder, file_name)
    with open(destination, "wb") as out:
        shutil.copyfileobj(f.file, out)
    return JSONResponse({"s": "ok"})
@app.post("/api/unzip")
async def _uz(r: Request, d: dict):
    """Extract a ``.zip`` archive into its workspace folder.

    Args:
        r: Incoming request, used for authentication.
        d: JSON body with ``n`` (archive name) and optional ``p`` (folder
            relative to the workspace; the archive is extracted there).

    Returns:
        ``{"s": "ok"}`` on success; ``{"s": "fail"}`` with status 400 when the
        archive is missing, not a ``.zip`` file or outside the workspace, and
        with status 500 when extraction fails. Status 401 on failed
        authentication.
    """
    u = await _au(r)
    if not u:
        return JSONResponse({}, 401)
    name = d.get('n')
    if not isinstance(name, str):
        return JSONResponse({"s": "fail"}, 400)
    root = _gw(u)
    w = _gw(u, d.get('p', ''))
    f = os.path.abspath(os.path.join(w, name))
    # The archive itself must live inside the caller's workspace.
    inside = f.startswith(root + os.sep)
    if not inside or not f.endswith(".zip") or not os.path.isfile(f):
        return JSONResponse({"s": "fail"}, 400)
    try:
        with zipfile.ZipFile(f, 'r') as z:
            z.extractall(w)
    except Exception:
        # Endpoint boundary: any extraction error (corrupt archive, I/O,
        # unsupported compression) becomes a logged "fail" response.
        logging.getLogger(__name__).exception("Failed to extract %s", f)
        return JSONResponse({"s": "fail"}, 500)
    return JSONResponse({"s": "ok"})
@app.post("/api/mkdir")
async def _md(r: Request, d: dict):
    """Create a folder inside the user's workspace.

    Args:
        r: Incoming request, used for authentication.
        d: JSON body whose ``p`` entry is the folder path relative to the
            workspace.

    Returns:
        ``None`` on success; an empty JSON object with status 401 when
        authentication fails.
    """
    u = await _au(r)
    if u:
        # Resolving the path is what creates the directory.
        _gw(u, d['p'])
        return None
    return JSONResponse({}, 401)
@app.post("/api/mkfile")
async def _mf(r: Request, d: dict):
    """Create an empty file inside the user's workspace.

    Args:
        r: Incoming request, used for authentication.
        d: JSON body whose ``p`` entry is the file path relative to the
            workspace.

    Returns:
        ``None`` on success (an existing path is left untouched); an empty
        JSON object with status 401 on failed authentication, or 400 when the
        path is missing or not inside the workspace.
    """
    u = await _au(r)
    if not u:
        return JSONResponse({}, 401)
    rel = d.get('p')
    if not isinstance(rel, str):
        return JSONResponse({}, 400)
    root = _gw(u)
    # Resolve the path here rather than through _gw(u, rel): _gw creates a
    # directory at any missing path, so the file was never created.
    p = os.path.abspath(os.path.join(root, rel.lstrip("/")))
    if not p.startswith(root + os.sep):
        return JSONResponse({}, 400)
    os.makedirs(os.path.dirname(p), exist_ok=True)
    if not os.path.exists(p):
        with open(p, 'w'):
            pass
@app.post("/api/move")
async def _mv(r: Request, d: dict):
    """Move or rename an entry inside the user's workspace.

    Args:
        r: Incoming request, used for authentication.
        d: JSON body with ``src`` and ``dst``, both relative to the
            workspace root.

    Returns:
        ``None`` (a missing source is ignored); an empty JSON object with
        status 401 on failed authentication, or 400 when a path is missing or
        resolves outside the workspace.
    """
    u = await _au(r)
    if not u:
        return JSONResponse({}, 401)
    src, dst = d.get('src'), d.get('dst')
    if not isinstance(src, str) or not isinstance(dst, str):
        return JSONResponse({}, 400)
    w = _gw(u)
    # Leading slashes are stripped so os.path.join cannot discard the
    # workspace root, and both ends are confined to the workspace.
    s = os.path.abspath(os.path.join(w, src.lstrip("/")))
    t = os.path.abspath(os.path.join(w, dst.lstrip("/")))
    inside = w + os.sep
    source_ok = s.startswith(inside)  # the workspace root itself cannot be moved
    target_ok = t == w or t.startswith(inside)
    if not source_ok or not target_ok:
        return JSONResponse({}, 400)
    if os.path.exists(s):
        shutil.move(s, t)
@app.delete("/api/delete")
async def _dl(r: Request, n: str, p: str = ""):
    """Delete a file or folder from the user's workspace.

    Args:
        r: Incoming request, used for authentication.
        n: Name of the entry to delete.
        p: Folder containing the entry, relative to the workspace.

    Returns:
        ``None`` (a missing entry is ignored); an empty JSON object with
        status 401 on failed authentication, or 400 when the target is the
        workspace root or lies outside it.
    """
    u = await _au(r)
    if not u:
        return JSONResponse({}, 401)
    root = _gw(u)
    t = os.path.abspath(os.path.join(root, os.path.join(p, n).lstrip("/")))
    # Refuse the workspace root and anything outside it: an empty or escaping
    # path used to resolve to the root and wipe the whole workspace.
    if not t.startswith(root + os.sep):
        return JSONResponse({}, 400)
    if os.path.isdir(t) and not os.path.islink(t):
        shutil.rmtree(t)
    elif os.path.lexists(t):
        # Regular files and symlinks (including links to directories).
        os.remove(t)
# Maps a Socket.IO session id to the PTY file descriptor of that session's shell.
_fm = {}
@sio.on('connect')
async def _cn(sid, env):
    """Authenticate a Socket.IO client and attach it to a new bash PTY.

    The ``token`` and ``user`` values are read from the connection's query
    string. On success a bash process is forked on a pseudo-terminal with the
    user's workspace as working directory and ``HOME``; its file descriptor is
    stored under ``sid`` and two background tasks start: one emitting
    ``stats`` (CPU/RAM percentages) every 3 seconds and one forwarding shell
    output as ``o`` events.

    Args:
        sid: Socket.IO session id.
        env: Connection environ; only ``QUERY_STRING`` is read.

    Returns:
        ``False`` to reject the connection when authentication fails,
        otherwise ``None``.
    """
    # parse_qs URL-decodes values and keeps any "=" inside them, which manual
    # splitting on "=" did not.
    query = parse_qs(env.get('QUERY_STRING', ''))
    t = query.get('token', [''])[-1]
    u = query.get('user', [''])[-1]
    # Check the database on the socket path as well.
    if t != S_K or not u or not await database.check_user(u):
        return False

    # Resolve the workspace before forking so a failure surfaces in the
    # server process, not inside the child.
    w = _gw(u)
    # The user's shell must not inherit the panel's own secrets.
    private = ("AQSO_CORE_KEY", "BOT_TOKEN", "PANEL_SECRET", "ADMIN_TELE_ID")
    shell_env = {k: v for k, v in os.environ.items() if k not in private}
    shell_env.update({"HOME": w, "TERM": "xterm-256color", "USER": u})

    pid, fd = pty.fork()
    if pid == 0:
        try:
            os.chdir(w)
            os.execvpe("bash", ["bash"], shell_env)
        finally:
            # Only reached when chdir/exec failed: the child must never fall
            # through into the server's code.
            os._exit(127)
    # NOTE(review): the child pid is not stored, so the shell is never
    # explicitly reaped after the session ends.
    _fm[sid] = fd

    async def _st():
        while sid in _fm:
            await asyncio.sleep(3)
            await sio.emit('stats', {'cpu': psutil.cpu_percent(), 'ram': psutil.virtual_memory().percent}, room=sid)

    async def _rd():
        # Incremental decoding keeps multi-byte characters intact when they
        # are split across two reads.
        decoder = codecs.getincrementaldecoder("utf-8")(errors="ignore")
        while _fm.get(sid) == fd:
            await asyncio.sleep(0.01)
            if _fm.get(sid) != fd:
                # Session ended while sleeping.
                break
            try:
                if not select.select([fd], [], [], 0)[0]:
                    continue
                chunk = os.read(fd, 2048)
            except (OSError, ValueError):
                # The PTY was closed (shell exited or client disconnected).
                break
            if not chunk:
                break
            text = decoder.decode(chunk)
            if text:
                await sio.emit('o', text, room=sid)

    sio.start_background_task(_st)
    sio.start_background_task(_rd)
@sio.on('i')
async def _in(sid, d):
    """Forward client input to the session's shell.

    Input whose lower-cased text contains "node", "npm", "npx" or "yarn" is
    dropped.

    Args:
        sid: Socket.IO session id.
        d: Text sent by the client.
    """
    for blocked in ("node", "npm", "npx", "yarn"):
        if blocked in d.lower():
            return
    if sid not in _fm:
        return
    os.write(_fm[sid], d.encode())
@sio.on('run')
async def _rn(sid, f):
    """Type a ``python <f>`` command line into the session's shell.

    Args:
        sid: Socket.IO session id.
        f: Value inserted after ``python`` in the command line.
    """
    if sid not in _fm:
        return
    command = f"python {f}\n"
    os.write(_fm[sid], command.encode())
@sio.on('stop')
async def _sp(sid):
    """Send Ctrl-C (byte 0x03) to the session's shell.

    Args:
        sid: Socket.IO session id.
    """
    if sid not in _fm:
        return
    os.write(_fm[sid], b'\x03')
@sio.on('disconnect')
async def _dc(sid):
    """Release the PTY file descriptor of a disconnected session.

    Args:
        sid: Socket.IO session id.
    """
    fd = _fm.pop(sid, None)
    if fd is None:
        return
    try:
        os.close(fd)
    except OSError:
        # Already closed; nothing left to release.
        pass
# ASGI application: the Socket.IO server wrapped around the FastAPI app.
_sa = socketio.ASGIApp(sio, app)

if __name__ == "__main__":
    import uvicorn

    # Listen on every interface, port 7860, using the asyncio event loop.
    server_options = {"host": "0.0.0.0", "port": 7860, "loop": "asyncio"}
    uvicorn.run(_sa, **server_options)