# mcp-server / app.py
# Hugging Face file-view header (page text, not code):
# leinier310 - "Create app.py" - commit fa353c8 (verified) - 2.77 kB
import os
import subprocess
import threading
import time
from pathlib import Path
from typing import List, Optional

from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
# Directory the endpoints in this file operate on. The relative path is
# resolved to an absolute one at import time, and the directory (with any
# missing parents) is created if it does not exist yet.
BASE_DIR = Path("./public_files").resolve()
BASE_DIR.mkdir(parents=True, exist_ok=True)
# Application object on which the route decorators in this file register.
app = FastAPI(title="Minimal MCP File Manager")
class ModifyRequest(BaseModel):
    """Request body for ``POST /modify``.

    ``start_line`` and ``end_line`` are only consulted when ``mode`` is
    ``'line_edit'``; they may be omitted (or sent as ``null``) for
    ``'replace'``.
    """

    # Name of the file to modify, joined onto BASE_DIR by the handler.
    filename: str
    # Edit mode: 'replace' | 'line_edit'. Kept as a plain ``str`` so that an
    # unknown mode is reported by the handler rather than by validation.
    mode: str
    # New file content ('replace') or replacement lines ('line_edit').
    content: str
    # 1-based, inclusive line range for 'line_edit'. Declared Optional so the
    # annotation agrees with the ``None`` default and an explicit JSON null
    # validates instead of being rejected as "not an int".
    start_line: Optional[int] = None
    end_line: Optional[int] = None
class ExecRequest(BaseModel):
    """Request body for ``POST /exec``."""

    # Argument vector handed to ``subprocess.run`` as a list (no shell):
    # the first element is the program, the rest are its arguments.
    command: List[str]
# ----------------- Endpoints -----------------
def _splice_lines(original: str, replacement: str, first: int, last: int) -> str:
    """Return *original* with the line slice ``[first:last]`` replaced.

    ``first`` and ``last`` are zero-based slice bounds over
    ``original.splitlines()``. The result ends with a newline when the
    original text did, or when *replacement* does, so editing a line no longer
    strips the file's final newline.
    """
    kept = original.splitlines()
    merged = kept[:first] + replacement.splitlines() + kept[last:]
    text = "\n".join(merged)
    if merged and (original.endswith("\n") or replacement.endswith("\n")):
        text += "\n"
    return text


@app.post("/modify")
def modify_file(req: ModifyRequest):
    """Overwrite an existing file under ``BASE_DIR`` or edit a range of lines.

    Modes:
        ``replace``   -- the whole file is replaced by ``req.content``.
        ``line_edit`` -- lines ``start_line``..``end_line`` (1-based,
            inclusive) are replaced by the lines of ``req.content``.
            ``end_line == start_line - 1`` inserts before ``start_line``
            without removing anything; bounds past the end of the file append.

    Returns:
        ``{"status": "replaced" | "line_edited", "path": <absolute path>}``.

    Raises:
        HTTPException 403: ``filename`` resolves outside ``BASE_DIR``.
        HTTPException 404: the target does not exist or is a directory.
        HTTPException 400: unknown mode, missing line bounds, or an inverted
            line range.
    """
    # ``filename`` is client-controlled: "../x" or an absolute path would
    # otherwise escape BASE_DIR. Resolve first, then require containment.
    # ValueError also covers names that cannot be resolved at all (e.g. an
    # embedded NUL byte).
    try:
        path = (BASE_DIR / req.filename).resolve()
        path.relative_to(BASE_DIR)
    except ValueError:
        raise HTTPException(
            status_code=403, detail="Ruta fuera del directorio permitido"
        ) from None

    if not path.exists() or path.is_dir():
        raise HTTPException(
            status_code=404, detail="Archivo no encontrado o es un directorio"
        )

    if req.mode == "replace":
        # Explicit encoding: request content is arbitrary Unicode and must not
        # depend on the server's locale.
        path.write_text(req.content, encoding="utf-8")
        return {"status": "replaced", "path": str(path)}

    if req.mode == "line_edit":
        if req.start_line is None or req.end_line is None:
            raise HTTPException(
                status_code=400,
                detail="start_line y end_line requeridos para line_edit",
            )
        first = max(1, req.start_line) - 1
        last = max(0, req.end_line)
        # With last < first the two kept slices overlap and the lines between
        # them would be written twice, so reject the range instead.
        if last < first:
            raise HTTPException(
                status_code=400,
                detail="end_line no puede ser menor que start_line - 1",
            )
        current = path.read_text(encoding="utf-8")
        path.write_text(
            _splice_lines(current, req.content, first, last), encoding="utf-8"
        )
        return {"status": "line_edited", "path": str(path)}

    raise HTTPException(status_code=400, detail="mode desconocido")
@app.post("/exec")
def exec_command(req: ExecRequest):
    """Run ``req.command`` inside ``BASE_DIR`` and return its result.

    The command is executed without a shell, with a 15 second timeout, and
    its output is captured.

    Returns:
        ``{"returncode": int, "stdout": str, "stderr": str}``; bytes that
        cannot be decoded are dropped from the two text fields.

    Raises:
        HTTPException 400: empty command, or the program could not be
            started (not found, not executable, invalid argument).
        HTTPException 500: the command did not finish within the timeout.
    """
    if not req.command:
        raise HTTPException(status_code=400, detail="Comando vacío")

    # SECURITY(review): this executes a caller-chosen program with the
    # server's privileges. ``cwd`` only sets the working directory; it does
    # not sandbox the process. Confirm the endpoint is never exposed to
    # untrusted clients.
    try:
        res = subprocess.run(
            req.command, cwd=str(BASE_DIR), capture_output=True, timeout=15
        )
    except subprocess.TimeoutExpired as exc:
        raise HTTPException(status_code=500, detail="Execution timeout") from exc
    except (OSError, ValueError) as exc:
        # OSError: executable missing or not runnable; ValueError: e.g. a NUL
        # byte in an argument. Both are bad requests, not server faults.
        raise HTTPException(
            status_code=400, detail=f"No se pudo ejecutar el comando: {exc}"
        ) from exc

    return {
        "returncode": res.returncode,
        "stdout": res.stdout.decode(errors="ignore"),
        "stderr": res.stderr.decode(errors="ignore"),
    }
@app.get("/disk_usage")
def disk_usage():
    """Report filesystem usage and the total size of files under ``BASE_DIR``.

    Returns:
        A dict with ``fs_total``, ``fs_used`` and ``fs_free`` (as reported by
        ``shutil.disk_usage`` for ``BASE_DIR``) and ``dir_size``, the summed
        ``st_size`` of every regular file found recursively under
        ``BASE_DIR``.
    """
    import shutil

    fs_total, fs_used, fs_free = shutil.disk_usage(str(BASE_DIR))

    # Walk the tree once, adding up the size of each regular file.
    dir_size = 0
    for entry in BASE_DIR.rglob("*"):
        if entry.is_file():
            dir_size += entry.stat().st_size

    return {
        "fs_total": fs_total,
        "fs_used": fs_used,
        "fs_free": fs_free,
        "dir_size": dir_size,
    }
# ----------------- Keep-alive -----------------
def keep_alive_thread(
    url: str = "http://127.0.0.1:7860/disk_usage", interval: float = 300
) -> None:
    """Ping *url* forever, sleeping *interval* seconds between requests.

    The first request is sent immediately. The ping is best-effort: a failed
    request is logged at debug level and the loop carries on. The function
    never returns, so it is meant to run in a background thread.

    Args:
        url: Address requested on every iteration.
        interval: Pause between requests, in seconds.
    """
    import logging

    import requests

    log = logging.getLogger(__name__)
    while True:
        try:
            requests.get(url, timeout=2)
        except Exception as exc:
            # Not a bare ``except``: SystemExit / KeyboardInterrupt must still
            # propagate. Any ordinary failure is recorded, not hidden.
            log.debug("keep-alive ping to %s failed: %s", url, exc)
        time.sleep(interval)
# Start the keep-alive loop as soon as this module is imported. The thread is
# a daemon, so it does not keep the interpreter alive at shutdown.
threading.Thread(target=keep_alive_thread, daemon=True).start()