Spaces:
Running
Running
File size: 6,740 Bytes
184b62a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
import os
import subprocess
import sys
import time
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests
# Project root: the directory one level above the one containing this file.
BASE_DIR = Path(__file__).resolve().parents[1]
# Directory that receives one log file per MCP server; created at import
# time so subprocess output always has somewhere to go.
LOGS_DIR = BASE_DIR.joinpath("logs")
LOGS_DIR.mkdir(parents=True, exist_ok=True)
@dataclass
class MCPConfig:
    """Launch and health-check settings for one MCP server subprocess."""

    # Identifier for the server; MCPManager also uses it as the key of its
    # process table and as the stem of the server's log file name.
    name: str
    # Argument vector handed to subprocess.Popen.
    command: List[str]
    # Base URL the server is expected to listen on.
    url: str
    # Path appended to `url` when polling for readiness.
    health_path: str = "/health"
    # How long to wait for the server to report healthy.
    start_timeout: float = 20.0  # seconds
class MCPManager:
    """
    Manages lifecycle of local MCP servers for MVP Agent.

    Responsibilities:
    - Start required MCP servers as subprocesses.
    - Wait for them to become healthy via HTTP checks.
    - Provide structured errors on failure.
    - Terminate all subprocesses on shutdown.

    Designed for:
    - Local dev: single `python app.py` starts everything.
    - Hugging Face Spaces: single-process entrypoint spawning child MCP servers.
    """

    # Seconds to pause between health-check attempts while a server boots.
    POLL_INTERVAL: float = 0.5
    # Seconds granted for a graceful exit after terminate() before kill().
    STOP_GRACE_PERIOD: float = 5.0

    def __init__(self, configs: "Optional[List[MCPConfig]]" = None) -> None:
        """
        Create a manager; no subprocess is started until `start_all()`.

        Args:
            configs: Servers to manage. When omitted, the three built-in
                servers (file-manager, google-search, markdownify) are used.
        """
        self.configs: "List[MCPConfig]" = (
            list(configs) if configs is not None else self._default_configs()
        )
        # name -> subprocess.Popen
        self.procs: Dict[str, subprocess.Popen] = {}
        self.started: bool = False

    @staticmethod
    def _default_configs() -> "List[MCPConfig]":
        """Return the built-in server list, launched with the current interpreter."""
        python_exe = sys.executable or "python"
        return [
            MCPConfig(
                name="file-manager-mcp",
                command=[python_exe, "-u", "tools/file_manager_mcp/run.py"],
                url="http://127.0.0.1:8081",
            ),
            MCPConfig(
                name="google-search-mcp",
                command=[python_exe, "-u", "tools/google_search_mcp/run.py"],
                url="http://127.0.0.1:8082",
            ),
            MCPConfig(
                name="markdownify-mcp",
                command=[python_exe, "-u", "tools/markdownify_mcp/run.py"],
                url="http://127.0.0.1:8083",
            ),
        ]

    def start_all(self) -> None:
        """
        Start all MCP servers and wait for them to become healthy.

        Does nothing if a previous call already succeeded. On any failure
        every subprocess spawned so far is stopped before the error is raised.

        Raises:
            RuntimeError: if any server fails to start or become healthy.
        """
        if self.started:
            return

        errors: List[str] = []
        try:
            for cfg in self.configs:
                try:
                    self._start_one(cfg)
                except Exception as e:
                    errors.append(f"{cfg.name} failed to start: {e}")

            # Only poll for health when every server could be spawned.
            if not errors:
                for cfg in self.configs:
                    ok, msg = self._wait_healthy(cfg)
                    if not ok:
                        errors.append(f"{cfg.name} unhealthy: {msg}")
        except BaseException:
            # Unexpected error or Ctrl-C mid-startup: do not orphan children.
            self.stop_all()
            raise

        if errors:
            self.stop_all()
            raise RuntimeError("; ".join(errors))
        self.started = True

    def _start_one(self, cfg: "MCPConfig") -> None:
        """
        Spawn the subprocess for `cfg` unless one is already running.

        The child runs with BASE_DIR as its working directory (so relative
        paths in run.py work), inherits this process's environment, and has
        stdout and stderr appended to LOGS_DIR/<name>.log.
        """
        existing = self.procs.get(cfg.name)
        if existing is not None and existing.poll() is None:
            # Already running
            return

        log_path = LOGS_DIR / f"{cfg.name}.log"
        # The child receives its own copy of the descriptor, so the parent's
        # handle can be closed as soon as Popen returns (or raises).
        with log_path.open("ab", buffering=0) as log_file:
            proc = subprocess.Popen(
                cfg.command,
                cwd=str(BASE_DIR),
                stdout=log_file,
                stderr=subprocess.STDOUT,
            )
        self.procs[cfg.name] = proc

    def _wait_healthy(self, cfg: "MCPConfig") -> Tuple[bool, str]:
        """
        Poll server until healthy or timeout.

        Returns:
            (True, message) once the health URL answers 200, or once it
            answers 404 and the base URL answers 200/404 (checked one time
            only, to confirm the port is listening).
            (False, message) if the process exits or `cfg.start_timeout`
            elapses; the message includes the tail of the server's log.
        """
        # Monotonic clock: the deadline must not move with wall-clock changes.
        deadline = time.monotonic() + cfg.start_timeout
        health_url = cfg.url.rstrip("/") + cfg.health_path
        log_path = LOGS_DIR / f"{cfg.name}.log"
        # If /health 404s, we fall back to root once to confirm it's listening.
        tried_root = False

        while time.monotonic() < deadline:
            proc = self.procs.get(cfg.name)
            if proc is None or proc.poll() is not None:
                # Process exited - read last lines of log for debugging
                error_details = self._read_log_tail(log_path, lines=10)
                return False, f"process exited during startup. Check {log_path}\nLast log lines:\n{error_details}"

            try:
                resp = requests.get(health_url, timeout=1)
            except (requests.RequestException, OSError):
                resp = None  # Not accepting connections yet

            if resp is not None:
                if resp.status_code == 200:
                    return True, "ok"
                if resp.status_code == 404 and not tried_root:
                    tried_root = True
                    if self._root_responds(cfg.url):
                        # Listening; consider healthy for our purposes
                        return True, "ok (no /health, but port open)"

            # Pause after every unsuccessful attempt, not just connection
            # errors, so a server answering e.g. 503 is not hammered.
            time.sleep(self.POLL_INTERVAL)

        # Timeout - include log tail for debugging
        error_details = self._read_log_tail(log_path, lines=10)
        return False, f"timeout after {cfg.start_timeout}s waiting for {health_url}. Check {log_path}\nLast log lines:\n{error_details}"

    @staticmethod
    def _root_responds(url: str) -> bool:
        """Return True if a GET on `url` answers with status 200 or 404."""
        try:
            root_resp = requests.get(url, timeout=1)
        except (requests.RequestException, OSError):
            return False
        return root_resp.status_code in (200, 404)

    def _read_log_tail(self, log_path: Path, lines: int = 10) -> str:
        """
        Read last N lines of log file for error reporting.

        Never raises for I/O problems: a placeholder string is returned when
        the file is missing, unreadable, or has no content. A non-positive
        `lines` selects nothing and therefore also yields "(empty log)".
        """
        try:
            with open(log_path, "r", encoding="utf-8", errors="ignore") as f:
                # A bounded deque keeps only the tail while streaming the file.
                tail_lines = deque(f, maxlen=max(lines, 0))
        except FileNotFoundError:
            return "(log file not found)"
        except OSError as e:
            return f"(could not read log: {e})"
        return "".join(tail_lines).strip() or "(empty log)"

    def stop_all(self) -> None:
        """
        Terminate all MCP subprocesses gracefully.

        Every live process is asked to terminate, then all of them share one
        STOP_GRACE_PERIOD window to exit; any still alive afterwards is
        killed. Each process is waited on so no zombie is left behind.
        """
        procs = list(self.procs.values())

        for proc in procs:
            try:
                if proc.poll() is None:
                    proc.terminate()
            except OSError:
                pass  # Could not signal it; the wait/kill pass below decides.

        deadline = time.monotonic() + self.STOP_GRACE_PERIOD
        for proc in procs:
            remaining = max(0.0, deadline - time.monotonic())
            try:
                # wait() also reaps processes that have already exited.
                proc.wait(timeout=remaining)
            except subprocess.TimeoutExpired:
                self._force_kill(proc)

        self.procs.clear()
        self.started = False

    @staticmethod
    def _force_kill(proc: subprocess.Popen) -> None:
        """Kill `proc` and reap it, ignoring errors from an already-gone process."""
        try:
            proc.kill()
            proc.wait(timeout=1)
        except (OSError, subprocess.TimeoutExpired):
            pass
|