File size: 3,419 Bytes
fa353c8
d28fe3e
fa353c8
 
d28fe3e
 
81dba3c
fa353c8
d28fe3e
fa353c8
 
d28fe3e
fa353c8
d28fe3e
 
 
 
 
 
fa353c8
d28fe3e
 
 
 
 
 
fa353c8
d28fe3e
 
 
fa353c8
d28fe3e
 
 
 
 
 
 
 
 
fa353c8
d28fe3e
fa353c8
d28fe3e
 
 
 
 
 
 
fa353c8
d28fe3e
 
fa353c8
d28fe3e
 
 
fa353c8
d28fe3e
 
fa353c8
 
d28fe3e
fa353c8
d28fe3e
 
 
 
fa353c8
 
 
 
 
 
 
d28fe3e
 
 
 
 
 
3c4cb20
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import os
import shutil
import subprocess
from pathlib import Path
from typing import List, Optional, Dict, Any
from mcp.server.fastmcp import FastMCP
import uvicorn

# Root directory that the file tools operate in. It is taken from the BASE_DIR
# environment variable (falling back to the current directory), made absolute,
# and created up front if it does not exist yet.
BASE_DIR = Path(os.environ.get("BASE_DIR", "./")).resolve()
BASE_DIR.mkdir(exist_ok=True, parents=True)

# Server instance on which the tools below are registered via @mcp.tool.
mcp = FastMCP(
    name="HF-MCP-Server",
    json_response=True,
    stateless_http=True,
)

# Helper
def _safe_path(rel_path: str) -> Path:
    """Resolve *rel_path* against BASE_DIR and refuse anything outside it.

    Containment is checked on path components, not on string prefixes, so a
    sibling directory whose name merely starts with BASE_DIR's name (for
    example ``/srv/data-evil`` next to ``/srv/data``) is rejected.

    Args:
        rel_path: Caller-supplied path, joined onto BASE_DIR. An absolute
            path replaces BASE_DIR in the join, so it is only accepted when
            it already points inside BASE_DIR.

    Returns:
        The resolved absolute path (``..`` segments and symlinks collapsed).

    Raises:
        ValueError: If the resolved path is neither BASE_DIR itself nor one
            of its descendants.
    """
    candidate = (BASE_DIR / rel_path).resolve()
    try:
        # relative_to() only succeeds when BASE_DIR is a true ancestor
        # (or equal); a plain str.startswith() would also match siblings.
        candidate.relative_to(BASE_DIR)
    except ValueError:
        raise ValueError("Path escapes BASE_DIR") from None
    return candidate

# Tools
@mcp.tool(name="modify_file", description="Modify or edit files")
def modify_file(filename: str, mode: str = "replace", content: str = "", start_line: Optional[int] = None, end_line: Optional[int] = None) -> Dict[str, Any]:
    """Overwrite an existing file, or replace a range of its lines.

    Args:
        filename: Path of the file, relative to BASE_DIR.
        mode: ``"replace"`` overwrites the whole file with ``content``;
            ``"line_edit"`` replaces lines ``start_line``..``end_line``.
        content: New text. In ``line_edit`` mode it is split into lines and
            spliced in place of the selected range (empty content deletes
            the range).
        start_line: 1-based first line to replace (values below 1 are
            treated as 1). Required for ``line_edit``.
        end_line: 1-based last line to replace, inclusive. Passing
            ``start_line - 1`` selects an empty range, i.e. a pure insertion
            before ``start_line``. Required for ``line_edit``.

    Returns:
        ``{"status": "ok", "action": ..., "path": ...}`` on success, or
        ``{"status": "error", "error": ...}`` when the file is missing, is a
        directory, the mode is unknown, or the line range is invalid.

    Raises:
        ValueError: Propagated from ``_safe_path`` when ``filename`` resolves
            outside BASE_DIR.
    """
    p = _safe_path(filename)
    if not p.exists() or p.is_dir():
        return {"status": "error", "error": "file not found or is a directory"}

    if mode == "replace":
        p.write_text(content)
        return {"status": "ok", "action": "replaced", "path": str(p.relative_to(BASE_DIR))}

    if mode == "line_edit":
        if start_line is None or end_line is None:
            return {"status": "error", "error": "start_line and end_line required"}
        original = p.read_text()
        lines = original.splitlines()
        s = max(1, start_line) - 1
        e = max(0, end_line)
        if e < s:
            # An inverted range would make lines[:s] and lines[e:] overlap
            # and silently duplicate the lines in between.
            return {"status": "error", "error": "end_line must not be less than start_line - 1"}
        new_lines = lines[:s] + content.splitlines() + lines[e:]
        # Keep the file's own trailing newline. The newline at the end of
        # `content` only decides the end of the file when the edit actually
        # reaches the end of the file.
        reaches_eof = e >= len(lines)
        wants_newline = original.endswith("\n") or (reaches_eof and content.endswith("\n"))
        text = "\n".join(new_lines)
        if new_lines and wants_newline:
            text += "\n"
        p.write_text(text)
        return {"status": "ok", "action": "line_edit", "path": str(p.relative_to(BASE_DIR))}

    return {"status": "error", "error": "unknown mode"}

@mcp.tool(name="exec_command", description="Execute shell commands in BASE_DIR")
def exec_command(command: List[str]) -> Dict[str, Any]:
    """Run ``command`` as a subprocess with BASE_DIR as working directory.

    Execution is opt-in: unless the environment variable
    ``ALLOW_UNRESTRICTED_EXEC`` equals ``"1"`` the call is refused. The
    argument list is handed to ``subprocess.run`` as-is and output is
    captured as text. The time limit in seconds comes from ``EXEC_TIMEOUT``
    (default 30).

    Args:
        command: Program and arguments, one list item each.

    Returns:
        ``{"status": "ok", "returncode", "stdout", "stderr"}`` once the
        process has finished (whatever its exit code),
        ``{"status": "forbidden", ...}`` when execution is disabled, or
        ``{"status": "error", "error": ...}`` for an empty command, a
        timeout, or any other failure while launching the process.
    """
    if os.getenv("ALLOW_UNRESTRICTED_EXEC", "0") != "1":
        return {"status": "forbidden", "error": "exec disabled. Set ALLOW_UNRESTRICTED_EXEC=1 to enable"}
    if not command:
        return {"status": "error", "error": "empty command"}

    try:
        # Parsed inside the try so that a malformed EXEC_TIMEOUT is reported
        # as an error result instead of propagating.
        limit_seconds = int(os.getenv("EXEC_TIMEOUT", "30"))
        completed = subprocess.run(
            command,
            cwd=str(BASE_DIR),
            capture_output=True,
            text=True,
            timeout=limit_seconds,
        )
    except subprocess.TimeoutExpired:
        return {"status": "error", "error": "timeout"}
    except Exception as exc:
        return {"status": "error", "error": str(exc)}

    return {
        "status": "ok",
        "returncode": completed.returncode,
        "stdout": completed.stdout,
        "stderr": completed.stderr,
    }

@mcp.tool(name="disk_usage", description="Report disk usage for BASE_DIR")
def disk_usage() -> Dict[str, int]:
    """Report filesystem totals and the summed size of files under BASE_DIR.

    Returns:
        A dict with ``fs_total``, ``fs_used`` and ``fs_free`` (bytes, from
        ``shutil.disk_usage`` for the filesystem holding BASE_DIR) and
        ``dir_size`` (sum of ``st_size`` over every regular file found
        recursively below BASE_DIR).
    """
    usage = shutil.disk_usage(str(BASE_DIR))
    dir_size = 0
    for entry in BASE_DIR.rglob('*'):
        try:
            if entry.is_file():
                dir_size += entry.stat().st_size
        except OSError:
            # The entry was removed or became unreadable while the tree was
            # being walked; skip it rather than failing the whole report.
            continue
    return {"fs_total": usage.total, "fs_used": usage.used, "fs_free": usage.free, "dir_size": dir_size}

# Keep-alive
import threading, requests, time

def _server_port() -> int:
    """Return the port from the PORT environment variable (default 7860)."""
    return int(os.getenv("PORT", "7860"))

def keep_alive():
    """Send a local HTTP request to this server every five minutes, forever.

    The request targets the same port the server is started on, so the loop
    keeps working when PORT is overridden. The response, including its
    status code, is ignored; only the traffic matters.
    NOTE(review): presumably this stops the hosting platform from idling the
    process -- confirm that loopback traffic is actually counted.
    """
    url = f"http://127.0.0.1:{_server_port()}/disk_usage"
    while True:
        try:
            requests.get(url, timeout=2)
        except requests.RequestException:
            # Best effort: the server may not be up yet or may be busy.
            # Only request failures are ignored, so that unrelated errors
            # are no longer hidden by a bare except.
            pass
        time.sleep(300)

# Started at import time; daemon=True so the thread never blocks shutdown.
threading.Thread(target=keep_alive, daemon=True).start()

# Run server
if __name__ == "__main__":
    port = _server_port()
    print(f"Starting MCP server on 0.0.0.0:{port} -- BASE_DIR={BASE_DIR}")
    app = mcp.streamable_http_app()  # ASGI app
    # Bind to the configured port (the one just printed), not a fixed 7860.
    uvicorn.run(app, host="0.0.0.0", port=port)