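# Spaces: Sleeping  (page status text; appears to be residue from the page this file was copied from, not part of the program)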
import os
import subprocess
import threading
import time
from pathlib import Path
from typing import List, Optional

from fastapi import FastAPI, HTTPException, Query
from pydantic import BaseModel
# Absolute root directory for every file operation in this module; it is
# created at import time if it does not exist yet.
BASE_DIR = Path("./public_files").resolve()
BASE_DIR.mkdir(parents=True, exist_ok=True)

# The FastAPI application object for this service.
app = FastAPI(title="Minimal MCP File Manager")
class ModifyRequest(BaseModel):
    """Request body for modify_file: which file to change and how."""

    # File name, joined onto BASE_DIR by modify_file.
    filename: str
    # Either 'replace' (overwrite the whole file) or 'line_edit'
    # (replace a range of lines).
    mode: str
    # New file text ('replace') or replacement lines ('line_edit').
    content: str
    # 1-based, inclusive line range; only consulted in 'line_edit' mode.
    # Declared Optional so that the None default matches the annotation and
    # an explicit JSON null is accepted as "not provided".
    start_line: Optional[int] = None
    end_line: Optional[int] = None
class ExecRequest(BaseModel):
    """Request body for exec_command: the command as an argument vector."""

    # Passed unchanged to subprocess.run as the argument list
    # (executable first, then its arguments).
    command: List[str]
# ----------------- Endpoints -----------------
def _splice_lines(text, content, start_line, end_line):
    """Return *text* with lines start_line..end_line replaced by *content*.

    Line numbers are 1-based and inclusive. ``end_line == start_line - 1``
    inserts *content* before ``start_line`` without removing anything.

    Raises:
        ValueError: if the range is inverted (``end_line < start_line - 1``),
            which would otherwise duplicate lines in the result.
    """
    lines = text.splitlines()
    start = max(1, start_line) - 1
    stop = max(0, end_line)
    if stop < start:
        raise ValueError("end_line precedes start_line")

    replacement = content.splitlines()
    merged = lines[:start] + replacement + lines[stop:]
    if not merged:
        return ""

    # The last line of the result comes from the replacement only when the
    # edit reaches the end of the file; otherwise it comes from the file, so
    # the file's own trailing-newline state must be kept.
    if replacement and stop >= len(lines):
        newline_source = content
    else:
        newline_source = text
    return "\n".join(merged) + ("\n" if newline_source.endswith("\n") else "")


def modify_file(req: ModifyRequest):
    """Overwrite an existing file under BASE_DIR, or replace a range of its lines.

    Modes (``req.mode``):
        'replace'   -- write ``req.content`` as the whole file.
        'line_edit' -- replace lines ``req.start_line``..``req.end_line``
                       (1-based, inclusive) with ``req.content``.

    Returns:
        dict with ``status`` ('replaced' or 'line_edited') and ``path``.

    Raises:
        HTTPException 400: filename resolves outside BASE_DIR, unknown mode,
            missing line numbers, or an inverted line range.
        HTTPException 404: the target does not exist or is a directory.
    """
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file -- confirm it is registered on `app`.
    path = BASE_DIR / req.filename

    # Reject '../' segments, absolute paths and symlinks that would let a
    # request touch files outside the served directory.
    try:
        path.resolve().relative_to(BASE_DIR)
    except ValueError:
        raise HTTPException(
            status_code=400, detail="Ruta fuera del directorio permitido"
        ) from None

    if not path.exists() or path.is_dir():
        raise HTTPException(status_code=404, detail="Archivo no encontrado o es un directorio")

    if req.mode == "replace":
        # Explicit encoding so the result does not depend on the host locale.
        path.write_text(req.content, encoding="utf-8")
        return {"status": "replaced", "path": str(path)}

    if req.mode == "line_edit":
        if req.start_line is None or req.end_line is None:
            raise HTTPException(status_code=400, detail="start_line y end_line requeridos para line_edit")
        try:
            updated = _splice_lines(
                path.read_text(encoding="utf-8"),
                req.content,
                req.start_line,
                req.end_line,
            )
        except ValueError:
            raise HTTPException(
                status_code=400,
                detail="Rango inválido: end_line debe ser >= start_line - 1",
            ) from None
        path.write_text(updated, encoding="utf-8")
        return {"status": "line_edited", "path": str(path)}

    raise HTTPException(status_code=400, detail="mode desconocido")
def exec_command(req: ExecRequest):
    """Run ``req.command`` with BASE_DIR as working directory and report the result.

    The command is passed to subprocess.run as an argument list (no shell),
    with output captured and a 15-second timeout.

    Returns:
        dict with ``returncode``, ``stdout`` and ``stderr``; output bytes that
        cannot be decoded are dropped.

    Raises:
        HTTPException 400: empty command, or the command could not be started
            (e.g. executable not found, not executable, NUL byte in an argument).
        HTTPException 500: the command exceeded the timeout.
    """
    # SECURITY NOTE(review): this executes a caller-supplied argument vector
    # as-is. Nothing in this function restricts which programs may be run;
    # confirm that access to it is restricted elsewhere.
    # NOTE(review): no route decorator is visible on this function in this
    # view of the file -- confirm it is registered on `app`.
    if not req.command:
        raise HTTPException(status_code=400, detail="Comando vacío")

    try:
        res = subprocess.run(req.command, cwd=str(BASE_DIR), capture_output=True, timeout=15)
    except subprocess.TimeoutExpired:
        raise HTTPException(status_code=500, detail="Execution timeout")
    except (OSError, ValueError) as exc:
        # subprocess.run raises instead of returning when the process cannot
        # be spawned at all; report that as a client error rather than
        # letting it escape as an unhandled exception.
        raise HTTPException(
            status_code=400, detail=f"No se pudo ejecutar el comando: {exc}"
        ) from exc

    return {
        "returncode": res.returncode,
        "stdout": res.stdout.decode(errors="ignore"),
        "stderr": res.stderr.decode(errors="ignore"),
    }
def disk_usage():
    """Report filesystem usage for BASE_DIR plus the total size of its files.

    Returns:
        dict with ``fs_total``, ``fs_used`` and ``fs_free`` (from
        shutil.disk_usage on BASE_DIR) and ``dir_size`` (sum of the sizes of
        all regular files found recursively under BASE_DIR).
    """
    # NOTE(review): keep_alive_thread requests GET /disk_usage, but no route
    # decorator is visible on this function in this view -- confirm it is
    # registered on `app`.
    import shutil

    fs = shutil.disk_usage(str(BASE_DIR))

    dir_bytes = 0
    for entry in BASE_DIR.rglob('*'):
        if entry.is_file():
            dir_bytes += entry.stat().st_size

    report = {
        "fs_total": fs.total,
        "fs_used": fs.used,
        "fs_free": fs.free,
        "dir_size": dir_bytes,
    }
    return report
# ----------------- Keep-alive -----------------
def keep_alive_thread(url="http://127.0.0.1:7860/disk_usage", interval=300, timeout=2):
    """Request *url* forever, sleeping *interval* seconds between attempts.

    Intended to run as the target of a daemon thread; it never returns.

    Args:
        url: address requested with HTTP GET on every iteration.
        interval: seconds to sleep after each attempt.
        timeout: per-request timeout in seconds.
    """
    import logging

    import requests

    log = logging.getLogger(__name__)
    while True:
        try:
            requests.get(url, timeout=timeout)
        except Exception as exc:
            # Best effort: a failed ping must not stop the loop. Unlike a bare
            # `except:`, this lets SystemExit/KeyboardInterrupt propagate and
            # leaves a trace of what went wrong.
            log.debug("keep-alive request to %s failed: %s", url, exc)
        time.sleep(interval)
# Start the keep-alive loop at import time. The thread is a daemon so it
# does not keep the process alive on shutdown.
_keep_alive_worker = threading.Thread(target=keep_alive_thread, daemon=True)
_keep_alive_worker.start()