import os import shutil import subprocess from pathlib import Path from typing import List, Optional, Dict, Any from mcp.server.fastmcp import FastMCP import uvicorn BASE_DIR = Path(os.getenv("BASE_DIR", "./")).resolve() BASE_DIR.mkdir(parents=True, exist_ok=True) mcp = FastMCP(name="HF-MCP-Server", stateless_http=True, json_response=True) # Helper def _safe_path(rel_path: str) -> Path: candidate = (BASE_DIR / rel_path).resolve() if not str(candidate).startswith(str(BASE_DIR)): raise ValueError("Path escapes BASE_DIR") return candidate # Tools @mcp.tool(name="modify_file", description="Modify or edit files") def modify_file(filename: str, mode: str = "replace", content: str = "", start_line: Optional[int] = None, end_line: Optional[int] = None) -> Dict[str, Any]: p = _safe_path(filename) if not p.exists() or p.is_dir(): return {"status": "error", "error": "file not found or is a directory"} if mode == "replace": p.write_text(content) return {"status": "ok", "action": "replaced", "path": str(p.relative_to(BASE_DIR))} if mode == "line_edit": if start_line is None or end_line is None: return {"status": "error", "error": "start_line and end_line required"} lines = p.read_text().splitlines() s = max(1, start_line) - 1 e = max(0, end_line) new_lines = lines[:s] + content.splitlines() + lines[e:] p.write_text("\n".join(new_lines) + ("\n" if content.endswith("\n") else "")) return {"status": "ok", "action": "line_edit", "path": str(p.relative_to(BASE_DIR))} return {"status": "error", "error": "unknown mode"} @mcp.tool(name="exec_command", description="Execute shell commands in BASE_DIR") def exec_command(command: List[str]) -> Dict[str, Any]: allow = os.getenv("ALLOW_UNRESTRICTED_EXEC", "0") == "1" if not allow: return {"status": "forbidden", "error": "exec disabled. Set ALLOW_UNRESTRICTED_EXEC=1 to enable"} if not command: return {"status": "error", "error": "empty command"} try: proc = subprocess.run(command, cwd=str(BASE_DIR), capture_output=True, text=True, timeout=int(os.getenv("EXEC_TIMEOUT", "30"))) return {"status": "ok", "returncode": proc.returncode, "stdout": proc.stdout, "stderr": proc.stderr} except subprocess.TimeoutExpired: return {"status": "error", "error": "timeout"} except Exception as e: return {"status": "error", "error": str(e)} @mcp.tool(name="disk_usage", description="Report disk usage for BASE_DIR") def disk_usage() -> Dict[str, int]: usage = shutil.disk_usage(str(BASE_DIR)) total_size = sum(f.stat().st_size for f in BASE_DIR.rglob('*') if f.is_file()) return {"fs_total": usage.total, "fs_used": usage.used, "fs_free": usage.free, "dir_size": total_size} # Keep-alive import threading, requests, time def keep_alive(): while True: try: requests.get("http://127.0.0.1:7860/disk_usage", timeout=2) except: pass time.sleep(300) threading.Thread(target=keep_alive, daemon=True).start() # Run server if __name__ == "__main__": port = int(os.getenv("PORT", "7860")) print(f"Starting MCP server on 0.0.0.0:{port} -- BASE_DIR={BASE_DIR}") #mcp.run(transport="streamable-http") app = mcp.streamable_http_app() # ASGI app uvicorn.run(app, host="0.0.0.0", port=7860)