import subprocess import shlex import os import asyncio from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi.staticfiles import StaticFiles from fastapi.responses import FileResponse, JSONResponse from pydantic import BaseModel app = FastAPI(title="PowerShell Terminal") # ─── Blocked dangerous commands ─────────────────────────────────────────────── BLOCKED = [ "rm -rf /", "rm -rf /*", ":(){ :|:& };:", "mkfs", "dd if=/dev/zero", "chmod -R 777 /", "> /dev/sda", "shutdown", "reboot", "halt", "passwd", "su ", "sudo su", "curl | bash", "wget | bash", ] def is_safe(cmd: str) -> bool: lower = cmd.strip().lower() for bad in BLOCKED: if bad in lower: return False return True class CommandRequest(BaseModel): command: str cwd: str = "/app" @app.post("/execute") async def execute(req: CommandRequest): cmd = req.command.strip() cwd = req.cwd if os.path.isdir(req.cwd) else "/app" if not cmd: return JSONResponse({"output": "", "cwd": cwd, "error": False}) # Handle built-in cd if cmd.startswith("cd ") or cmd == "cd": parts = cmd.split(None, 1) target = parts[1] if len(parts) > 1 else os.path.expanduser("~") target = os.path.join(cwd, target) if not os.path.isabs(target) else target target = os.path.normpath(target) if os.path.isdir(target): return JSONResponse({"output": "", "cwd": target, "error": False}) else: return JSONResponse({"output": f"cd: no such file or directory: {target}", "cwd": cwd, "error": True}) if not is_safe(cmd): return JSONResponse({ "output": "🚫 Command blocked for security reasons.", "cwd": cwd, "error": True }) try: proc = await asyncio.create_subprocess_shell( cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, env={**os.environ, "TERM": "xterm-256color"}, ) try: stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=15) except asyncio.TimeoutError: proc.kill() return JSONResponse({"output": "⏱ Command timed out (15s limit).", "cwd": cwd, "error": True}) output = stdout.decode("utf-8", errors="replace") err = stderr.decode("utf-8", errors="replace") combined = (output + err).rstrip() return JSONResponse({ "output": combined, "cwd": cwd, "error": proc.returncode != 0 }) except Exception as e: return JSONResponse({"output": f"Error: {str(e)}", "cwd": cwd, "error": True}) # ─── Serve static frontend ───────────────────────────────────────────────────── app.mount("/static", StaticFiles(directory="static"), name="static") @app.get("/") async def root(): return FileResponse("static/index.html")