import os
import subprocess
import asyncio
import psutil
import httpx
import shutil
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, UploadFile, File, Body
from fastapi.responses import HTMLResponse, JSONResponse, FileResponse
app = FastAPI()
# Password expected as the first websocket message; read from the environment.
# NOTE(review): falls back to the literal "admin" when ACCESS_PASSWORD is unset
# -- confirm that this default is acceptable for the deployment.
ACCESS_PASSWORD = os.getenv("ACCESS_PASSWORD", "admin")
# Directory in which terminal commands are executed; starts at the filesystem
# root and is reassigned by the websocket handler when it processes "cd".
current_working_dir = "/"
@app.get("/")
async def get():
    """Serve the front-end page.

    Reads ``index.html`` (resolved against the process working directory)
    as UTF-8 text and returns it unchanged as an HTML response.
    """
    with open("index.html", mode="r", encoding="utf-8") as page:
        markup = page.read()
    return HTMLResponse(content=markup)
# --- File manager API (Root Access Enabled) ---
@app.get("/api/files")
async def list_files(path: str = "/"):
    """List the entries of a directory for the file manager.

    Args:
        path: Absolute directory to list. Any value that does not start with
            "/" (including the empty string) falls back to "/".

    Returns:
        A list of dicts with the keys ``name``, ``is_dir``, ``path`` and
        ``size``, directories first. ``size`` is a "<n> KB" string for regular
        files and "-" for everything else. If the directory itself cannot be
        read, a 500 JSON response of the form ``{"error": "<message>"}``.
    """
    # NOTE(review): this endpoint performs no authentication and accepts any
    # absolute path -- confirm that this exposure is intended.
    target_path = path if path.startswith("/") else "/"
    files = []
    try:
        # The context manager closes the scandir handle deterministically,
        # including when an entry raises part-way through the iteration.
        with os.scandir(target_path) as entries:
            for entry in entries:
                try:
                    is_dir = entry.is_dir()
                    if entry.is_file():
                        size = f"{entry.stat().st_size / 1024:.1f} KB"
                    else:
                        size = "-"
                except OSError:
                    # Unreadable or vanished entry (PermissionError included):
                    # skip it rather than failing the whole listing.
                    continue
                files.append({
                    "name": entry.name,
                    "is_dir": is_dir,
                    "path": os.path.abspath(entry.path),
                    "size": size,
                })
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
    # False sorts before True, so directories come first; the sort is stable,
    # which keeps the scan order within each group.
    return sorted(files, key=lambda item: not item["is_dir"])
@app.get("/api/read")
async def read_file(path: str):
    """Return the UTF-8 text of ``path`` wrapped as ``{"content": ...}``.

    Failures are reported in-band rather than as an HTTP error: when the file
    cannot be opened or decoded, ``content`` holds "Error: <message>".
    """
    try:
        with open(path, mode="r", encoding="utf-8") as source:
            payload = source.read()
    except Exception as problem:
        payload = f"Error: {str(problem)}"
    return {"content": payload}
@app.post("/api/save")
async def save_file(path: str = Body(...), content: str = Body(...)):
    """Overwrite the file at ``path`` with ``content``, encoded as UTF-8.

    Both values are taken from the request body. Returns
    ``{"status": "saved"}`` once the write has completed; errors from
    opening or writing the file propagate to the framework.
    """
    sink = open(path, mode="w", encoding="utf-8")
    with sink:
        sink.write(content)
    acknowledgement = {"status": "saved"}
    return acknowledgement
@app.post("/api/create")
async def create_item(name: str = Body(...), is_dir: bool = Body(...), path: str = Body(...)):
    """Create an empty file or a directory named ``name`` inside ``path``.

    Args:
        name: Name of the new entry, joined onto ``path``.
        is_dir: Create a directory (with any missing parents) when true,
            otherwise an empty file.
        path: Directory in which the entry is created.

    Returns:
        ``{"status": "created"}``. The call is idempotent for both kinds: an
        existing directory or file is left untouched.
    """
    target = os.path.join(path, name)
    if is_dir:
        os.makedirs(target, exist_ok=True)
    else:
        # Append mode creates a missing file but leaves an existing one
        # intact; "w" would silently truncate a file that already exists.
        with open(target, "a"):
            pass
    return {"status": "created"}
@app.post("/api/upload")
async def upload_file(file: UploadFile = File(...), path: str = "/"):
    """Store an uploaded file inside the directory ``path``.

    Args:
        file: The multipart upload; only the final component of its
            client-supplied filename is used.
        path: Destination directory (query parameter, defaults to "/").

    Returns:
        ``{"message": "Uploaded"}`` on success, or a 400 JSON response when
        the upload carries no usable file name.
    """
    # The filename is client-controlled. Without basename() an absolute name
    # or one containing "../" would make os.path.join() escape ``path``.
    filename = os.path.basename(file.filename or "")
    if filename in ("", ".", ".."):
        return JSONResponse(status_code=400, content={"error": "Invalid file name"})
    target_path = os.path.join(path, filename)
    with open(target_path, "wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    return {"message": "Uploaded"}
@app.get("/api/download")
async def download_file(path: str):
    """Return the file located at ``path`` wrapped in a ``FileResponse``."""
    attachment = FileResponse(path)
    return attachment
@app.delete("/api/delete")
async def delete_file(path: str):
    """Delete the file, symlink or directory tree at ``path``.

    Real directories are removed recursively; everything else, including a
    symlink that points at a directory, is unlinked.

    Returns:
        ``{"message": "Deleted"}``. Errors from the removal itself (missing
        path, permissions) propagate to the framework.
    """
    # os.path.isdir() follows symlinks, but shutil.rmtree() refuses to operate
    # on one, so a link to a directory must be unlinked instead.
    if os.path.isdir(path) and not os.path.islink(path):
        shutil.rmtree(path)
    else:
        os.remove(path)
    return {"message": "Deleted"}
# --- Proxy ---
@app.get("/proxy")
async def proxy(url: str = "http://localhost:8080"):
    """Fetch ``url`` server-side and relay its body as an HTML response.

    Args:
        url: Address to request; defaults to a service on localhost:8080.

    Returns:
        The fetched body as HTML, or a red error ``<div>`` describing the
        failure. The request times out after five seconds.
    """
    # NOTE(review): the target URL is caller-controlled, so this endpoint can
    # reach anything the server can -- confirm that this is intended.
    import html  # local import: only needed to escape the error text

    async with httpx.AsyncClient() as client:
        try:
            response = await client.get(url, timeout=5.0)
            return HTMLResponse(content=response.text)
        except Exception as e:
            # The exception text can include the caller-supplied URL, so it is
            # escaped before being embedded in markup.
            detail = html.escape(str(e))
            return HTMLResponse(content=f"<div style='color:red;padding:20px;'>Error: {detail}</div>")
# --- Terminal session ---
def _resolve_directory(base: str, argument: str) -> str:
    """Resolve a ``cd`` argument against ``base`` without changing the process cwd.

    Args:
        base: Directory the terminal session is currently in.
        argument: Text following ``cd``; ``~`` is expanded and relative paths
            are interpreted against ``base``.

    Returns:
        The symlink-resolved absolute path of the target directory.

    Raises:
        FileNotFoundError, NotADirectoryError, PermissionError: With the same
            errno-style message that ``os.chdir`` would have produced.
    """
    import errno  # local import: only needed to build the OSError messages

    expanded = os.path.expanduser(argument)
    candidate = os.path.realpath(os.path.join(base, expanded))
    if not expanded or not os.path.exists(candidate):
        raise FileNotFoundError(errno.ENOENT, os.strerror(errno.ENOENT), expanded)
    if not os.path.isdir(candidate):
        raise NotADirectoryError(errno.ENOTDIR, os.strerror(errno.ENOTDIR), expanded)
    if not os.access(candidate, os.X_OK):
        raise PermissionError(errno.EACCES, os.strerror(errno.EACCES), expanded)
    return candidate


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Password-protected shell session over a websocket.

    Protocol: the first text message must equal ``ACCESS_PASSWORD``; the
    server answers "AUTH_SUCCESS", or "AUTH_FAILED" followed by a close.
    Every later message is a command line. ``cd <dir>`` updates the tracked
    working directory and is answered with "CWD: <dir>" (or the error text);
    anything else is run through the shell in that directory and answered
    with its stdout, else its stderr, else "Done.".

    The tracked directory is the module-level ``current_working_dir``, so it
    is shared by every connection.
    """
    global current_working_dir
    import secrets  # local import: constant-time comparison of the password

    await websocket.accept()
    try:
        auth = await websocket.receive_text()
        # Compare as bytes in constant time so response timing does not leak
        # how much of the password matched.
        if not secrets.compare_digest(auth.encode("utf-8"), ACCESS_PASSWORD.encode("utf-8")):
            await websocket.send_text("AUTH_FAILED")
            await websocket.close()
            return
        await websocket.send_text("AUTH_SUCCESS")
        while True:
            command = await websocket.receive_text()
            if command.startswith("cd "):
                new_p = command[3:].strip()
                try:
                    # Only the tracked directory changes. Calling os.chdir()
                    # would move the whole server process and break every
                    # relative path it uses, e.g. open("index.html").
                    current_working_dir = _resolve_directory(current_working_dir, new_p)
                    await websocket.send_text(f"CWD: {current_working_dir}")
                except Exception as e:
                    await websocket.send_text(str(e))
                continue
            # Commands are deliberately run through the shell: this endpoint
            # is a remote terminal for the authenticated user.
            try:
                process = await asyncio.create_subprocess_shell(
                    command,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    cwd=current_working_dir,
                )
            except (OSError, ValueError) as e:
                # E.g. the tracked directory was removed in the meantime;
                # report it instead of dropping the session.
                await websocket.send_text(str(e))
                continue
            stdout, stderr = await process.communicate()
            # errors="replace" keeps binary or non-UTF-8 output from raising
            # UnicodeDecodeError and killing the session.
            output = (
                stdout.decode(errors="replace").strip()
                or stderr.decode(errors="replace").strip()
                or "Done."
            )
            await websocket.send_text(output)
    except WebSocketDisconnect:
        # Client went away; nothing to clean up.
        pass
|