import os, asyncio, collections, shutil, urllib.request, json, time, re, threading
from pathlib import Path
from fastapi import FastAPI, WebSocket, Form, UploadFile, File, HTTPException, WebSocketDisconnect
from fastapi.responses import HTMLResponse, Response, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
# ─── CONFIG ──────────────────────────────────────────────────────────────────
app = FastAPI()
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True,
allow_methods=["*"], allow_headers=["*"])
BASE_DIR = os.environ.get("SERVER_DIR", os.path.abspath("/app"))
PLUGINS_DIR = os.path.join(BASE_DIR, "plugins")
PANEL_CFG = os.path.join(BASE_DIR, ".panel_config.json")
mc_process = None
output_history = collections.deque(maxlen=500)
connected_clients: set = set()
server_start_time: float | None = None
# ─── HTML FRONTEND ────────────────────────────────────────────────────────────
HTML_CONTENT = r"""
Orbit Panel
"""
# ─── PATH HELPER ─────────────────────────────────────────────────────────────
def safe_path(p: str) -> str:
clean = (p or "").strip("/").replace("..", "")
full = os.path.abspath(os.path.join(BASE_DIR, clean))
if not full.startswith(os.path.abspath(BASE_DIR)):
raise HTTPException(403, "Access denied")
return full
# ─── MC PROCESS MANAGEMENT ───────────────────────────────────────────────────
async def stream_output(pipe):
while True:
line = await pipe.readline()
if not line:
break
txt = line.decode("utf-8", errors="replace").rstrip()
output_history.append(txt)
dead = set()
for c in connected_clients:
try:
await c.send_text(txt)
except:
dead.add(c)
connected_clients.difference_update(dead)
async def boot_mc():
global mc_process, server_start_time
# Prefer purpur.jar → paper.jar → server.jar
jar = None
for candidate in ("purpur.jar", "paper.jar", "server.jar"):
p = os.path.join(BASE_DIR, candidate)
if os.path.exists(p):
jar = p
break
if not jar:
output_history.append("\x1b[33m[System] No server jar found in /app. Upload one via Files or install via Software tab.\x1b[0m")
return
server_start_time = time.time()
mc_process = await asyncio.create_subprocess_exec(
"java", "-Xmx4G", "-Xms1G", "-Dfile.encoding=UTF-8",
"-XX:+UseG1GC", "-XX:+ParallelRefProcEnabled",
"-jar", jar, "--nogui",
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.STDOUT,
cwd=BASE_DIR
)
asyncio.create_task(stream_output(mc_process.stdout))
await mc_process.wait()
server_start_time = None
output_history.append("[System] Server process exited.")
@app.on_event("startup")
async def on_start():
os.makedirs(PLUGINS_DIR, exist_ok=True)
asyncio.create_task(boot_mc())
# ─── ROUTES ───────────────────────────────────────────────────────────────────
@app.get("/")
def index():
return HTMLResponse(HTML_CONTENT)
@app.websocket("/ws")
async def ws_endpoint(ws: WebSocket):
await ws.accept()
connected_clients.add(ws)
# Replay history
for line in output_history:
try:
await ws.send_text(line)
except:
break
try:
while True:
cmd = await ws.receive_text()
if mc_process and mc_process.stdin and not mc_process.stdin.is_closing():
mc_process.stdin.write((cmd + "\n").encode())
await mc_process.stdin.drain()
except (WebSocketDisconnect, Exception):
connected_clients.discard(ws)
@app.get("/api/console/history")
def console_history():
return list(output_history)
# ─── SERVER STATUS ────────────────────────────────────────────────────────────
@app.get("/api/server/status")
def server_status():
running = mc_process is not None and mc_process.returncode is None
# Uptime
uptime_str = "—"
if server_start_time and running:
secs = int(time.time() - server_start_time)
h, r = divmod(secs, 3600)
m, s = divmod(r, 60)
uptime_str = f"{h}h {m}m {s}s" if h else f"{m}m {s}s"
# Memory (from /proc/meminfo if available)
ram_total, ram_used, ram_pct = 0, 0, 0
try:
with open("/proc/meminfo") as f:
mem = {}
for line in f:
k, v = line.split(":", 1)
mem[k.strip()] = int(v.strip().split()[0])
ram_total = mem.get("MemTotal", 0)
ram_free = mem.get("MemAvailable", mem.get("MemFree", 0))
ram_used = ram_total - ram_free
ram_pct = round(ram_used / ram_total * 100) if ram_total else 0
except:
pass
# CPU (simple /proc/stat delta approximation)
cpu_pct = 0
try:
with open("/proc/stat") as f:
vals = list(map(int, f.readline().split()[1:]))
idle, total = vals[3], sum(vals)
cpu_pct = max(0, round(100 - idle / total * 100)) if total else 0
except:
pass
# Disk
disk_total, disk_used, disk_pct = 0, 0, 0
try:
st = os.statvfs(BASE_DIR)
disk_total = st.f_blocks * st.f_frsize
disk_free = st.f_bfree * st.f_frsize
disk_used = disk_total - disk_free
disk_pct = round(disk_used / disk_total * 100) if disk_total else 0
except:
pass
def mb(b): return f"{b//1048576} MB" if b else "—"
def gb(b): return f"{b/1073741824:.1f} GB" if b else "—"
# Active jar
active_jar = None
for c in ("purpur.jar", "paper.jar", "server.jar"):
if os.path.exists(os.path.join(BASE_DIR, c)):
active_jar = c; break
return {
"running": running,
"uptime": uptime_str,
"cpu_pct": cpu_pct,
"cpu_sub": f"{cpu_pct}% utilization",
"ram_pct": ram_pct,
"ram_sub": f"{mb(ram_used)} / {mb(ram_total)}",
"disk_pct": disk_pct,
"disk_sub": f"{gb(disk_used)} / {gb(disk_total)}",
"tps": "—",
"players": "—",
"mc_version": "1.20.4",
"active_jar": active_jar,
"address": ""
}
@app.post("/api/server/{action}")
async def server_control(action: str):
global mc_process
if action == "stop":
if mc_process and mc_process.returncode is None:
try:
mc_process.stdin.write(b"stop\n")
await mc_process.stdin.drain()
except:
mc_process.terminate()
elif action == "start":
if mc_process is None or mc_process.returncode is not None:
asyncio.create_task(boot_mc())
elif action == "restart":
if mc_process and mc_process.returncode is None:
try:
mc_process.stdin.write(b"stop\n")
await mc_process.stdin.drain()
await asyncio.sleep(3)
except:
pass
asyncio.create_task(boot_mc())
return {"ok": True}
# ─── FILE SYSTEM API ──────────────────────────────────────────────────────────
@app.get("/api/fs/list")
def fs_list(path: str = ""):
target = safe_path(path)
if not os.path.isdir(target):
raise HTTPException(404, "Not a directory")
items = []
for name in os.listdir(target):
fp = os.path.join(target, name)
st = os.stat(fp)
items.append({
"name": name,
"is_dir": os.path.isdir(fp),
"size": st.st_size if not os.path.isdir(fp) else -1,
"mtime": int(st.st_mtime)
})
return sorted(items, key=lambda x: (not x["is_dir"], x["name"].lower()))
@app.get("/api/fs/read")
def fs_read(path: str):
target = safe_path(path)
if not os.path.isfile(target):
raise HTTPException(404, "File not found")
try:
with open(target, "r", encoding="utf-8", errors="replace") as f:
content = f.read()
if path.endswith(".json"):
try:
return json.loads(content)
except:
pass
return Response(content, media_type="text/plain; charset=utf-8")
except:
raise HTTPException(500, "Cannot read file")
@app.get("/api/fs/download")
def fs_download(path: str):
target = safe_path(path)
if not os.path.isfile(target):
raise HTTPException(404, "File not found")
from fastapi.responses import FileResponse
return FileResponse(target, filename=os.path.basename(target))
@app.post("/api/fs/write")
async def fs_write(path: str = Form(...), content: str = Form(...)):
target = safe_path(path)
os.makedirs(os.path.dirname(target), exist_ok=True)
with open(target, "w", encoding="utf-8") as f:
f.write(content)
return {"ok": True}
@app.post("/api/fs/upload")
async def fs_upload(path: str = Form(""), file: UploadFile = File(...)):
target_dir = safe_path(path)
os.makedirs(target_dir, exist_ok=True)
dest = os.path.join(target_dir, file.filename)
with open(dest, "wb") as f:
shutil.copyfileobj(file.file, f)
return {"ok": True}
@app.post("/api/fs/delete")
def fs_delete(path: str = Form(...)):
target = safe_path(path)
if not os.path.exists(target):
raise HTTPException(404, "Not found")
if os.path.isdir(target):
shutil.rmtree(target)
else:
os.remove(target)
return {"ok": True}
@app.post("/api/fs/rename")
def fs_rename(old_path: str = Form(...), new_path: str = Form(...)):
src = safe_path(old_path)
dst = safe_path(new_path)
if not os.path.exists(src):
raise HTTPException(404, "Source not found")
os.makedirs(os.path.dirname(dst), exist_ok=True)
shutil.move(src, dst)
return {"ok": True}
@app.post("/api/fs/create")
def fs_create(path: str = Form(...), is_dir: str = Form("0")):
target = safe_path(path)
if is_dir == "1":
os.makedirs(target, exist_ok=True)
else:
os.makedirs(os.path.dirname(target), exist_ok=True)
if not os.path.exists(target):
open(target, "w").close()
return {"ok": True}
# ─── PLUGIN INSTALLER ─────────────────────────────────────────────────────────
@app.post("/api/plugins/install")
def plugins_install(
url: str = Form(...),
filename: str = Form(...),
project_id: str = Form(...),
version_id: str = Form(...),
name: str = Form(...)
):
dest = os.path.join(PLUGINS_DIR, filename)
try:
req = urllib.request.Request(url, headers={"User-Agent": "OrbitPanel/2.0"})
with urllib.request.urlopen(req, timeout=120) as resp, open(dest, "wb") as f:
shutil.copyfileobj(resp, f)
except Exception as e:
raise HTTPException(500, f"Download failed: {e}")
record_path = os.path.join(PLUGINS_DIR, "plugins.json")
data = {}
if os.path.exists(record_path):
try:
with open(record_path) as f:
data = json.load(f)
except:
pass
data[project_id] = {
"name": name,
"filename": filename,
"version_id": version_id,
"installed_at": time.time()
}
with open(record_path, "w") as f:
json.dump(data, f, indent=2)
return {"ok": True}
# ─── SOFTWARE INSTALLER ───────────────────────────────────────────────────────
@app.post("/api/software/install")
async def software_install(type: str = Form(...), version: str = Form(...)):
"""Download and install a server jar from official sources."""
dest = os.path.join(BASE_DIR, "server.jar")
# Rename existing jar as backup
for candidate in ("purpur.jar", "paper.jar", "server.jar"):
p = os.path.join(BASE_DIR, candidate)
if os.path.exists(p):
shutil.copy2(p, p + ".bak")
try:
dl_url = None
if type == "paper":
# Get latest build for version
builds_url = f"https://api.papermc.io/v2/projects/paper/versions/{version}/builds"
with urllib.request.urlopen(builds_url, timeout=15) as r:
builds_data = json.loads(r.read())
latest_build = builds_data["builds"][-1]["build"]
jar_name = f"paper-{version}-{latest_build}.jar"
dl_url = f"https://api.papermc.io/v2/projects/paper/versions/{version}/builds/{latest_build}/downloads/{jar_name}"
elif type == "purpur":
dl_url = f"https://api.purpurmc.org/v2/purpur/{version}/latest/download"
elif type == "vanilla":
with urllib.request.urlopen("https://launchermeta.mojang.com/mc/game/version_manifest.json", timeout=15) as r:
manifest = json.loads(r.read())
ver_info = next((v for v in manifest["versions"] if v["id"] == version), None)
if not ver_info:
raise HTTPException(404, f"Version {version} not found in manifest")
with urllib.request.urlopen(ver_info["url"], timeout=15) as r:
ver_data = json.loads(r.read())
dl_url = ver_data["downloads"]["server"]["url"]
elif type == "fabric":
# Get latest loader + installer
with urllib.request.urlopen("https://meta.fabricmc.net/v2/versions/loader", timeout=10) as r:
loaders = json.loads(r.read())
with urllib.request.urlopen("https://meta.fabricmc.net/v2/versions/installer", timeout=10) as r:
installers = json.loads(r.read())
loader_ver = loaders[0]["version"]
installer_ver = installers[0]["version"]
dl_url = f"https://meta.fabricmc.net/v2/versions/loader/{version}/{loader_ver}/{installer_ver}/server/jar"
else:
raise HTTPException(400, f"Unsupported type: {type}")
def do_download():
req = urllib.request.Request(dl_url, headers={"User-Agent": "OrbitPanel/2.0"})
with urllib.request.urlopen(req, timeout=300) as resp, open(dest, "wb") as f:
shutil.copyfileobj(resp, f)
# Run blocking download in thread
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, do_download)
output_history.append(f"[System] Installed {type} {version} → server.jar")
return {"ok": True}
except HTTPException:
raise
except Exception as e:
raise HTTPException(500, str(e))
# ─── SETTINGS API ─────────────────────────────────────────────────────────────
def _parse_properties(path: str) -> dict:
props = {}
if not os.path.isfile(path):
return props
with open(path, "r", encoding="utf-8", errors="replace") as f:
for line in f:
line = line.strip()
if line.startswith("#") or "=" not in line:
continue
k, _, v = line.partition("=")
props[k.strip()] = v.strip()
return props
def _write_properties(path: str, props: dict):
lines = [f"# Managed by Orbit Panel\n"]
for k, v in sorted(props.items()):
lines.append(f"{k}={v}\n")
with open(path, "w", encoding="utf-8") as f:
f.writelines(lines)
@app.get("/api/settings/properties")
def get_properties():
path = os.path.join(BASE_DIR, "server.properties")
return _parse_properties(path)
@app.post("/api/settings/properties")
async def save_properties(data: str = Form(...)):
path = os.path.join(BASE_DIR, "server.properties")
try:
props = json.loads(data)
_write_properties(path, props)
return {"ok": True}
except Exception as e:
raise HTTPException(500, str(e))
@app.get("/api/settings/panel")
def get_panel_config():
if os.path.isfile(PANEL_CFG):
try:
with open(PANEL_CFG) as f:
return json.load(f)
except:
pass
return {"accentColor": "#00ff88", "bgColor": "#0a0a0a", "textSize": "14", "serverAddress": ""}
@app.post("/api/settings/panel")
async def save_panel_config(data: str = Form(...)):
try:
cfg = json.loads(data)
with open(PANEL_CFG, "w") as f:
json.dump(cfg, f, indent=2)
return {"ok": True}
except Exception as e:
raise HTTPException(500, str(e))
# ─── ENTRY POINT ─────────────────────────────────────────────────────────────
if __name__ == "__main__":
uvicorn.run(
app,
host="0.0.0.0",
port=int(os.environ.get("PORT", 7860)),
log_level="warning"
)