Spaces:
Running
Running
import os
import subprocess
import sys
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests
# Directory one level above the folder that contains this file; child
# servers are launched with this as their working directory.
BASE_DIR = Path(__file__).resolve().parents[1]

# One "<server name>.log" file per child server is written here. The
# directory is created at import time so later opens cannot fail on a
# missing parent.
LOGS_DIR = BASE_DIR.joinpath("logs")
os.makedirs(LOGS_DIR, exist_ok=True)
@dataclass
class MCPConfig:
    """Static description of one MCP server subprocess.

    The ``@dataclass`` decorator generates the keyword constructor that
    ``MCPManager`` relies on (``MCPConfig(name=..., command=..., url=...)``);
    without it the class has no ``__init__`` accepting these fields.
    """

    # Identifier of the server; also used as the log file stem and as the
    # key under which the running process is tracked.
    name: str
    # Argument vector handed to ``subprocess.Popen``.
    command: List[str]
    # Base URL the server listens on, e.g. "http://127.0.0.1:8081".
    url: str
    # Path appended to ``url`` for the health probe.
    health_path: str = "/health"
    # Maximum time to wait for the server to report healthy, in seconds.
    start_timeout: float = 20.0
class MCPManager:
    """
    Manages lifecycle of local MCP servers for MVP Agent.

    Responsibilities:
    - Start required MCP servers as subprocesses.
    - Wait for them to become healthy via HTTP checks.
    - Provide structured errors on failure.
    - Terminate all subprocesses on shutdown.

    Designed for:
    - Local dev: single `python app.py` starts everything.
    - Hugging Face Spaces: single-process entrypoint spawning child MCP servers.
    """

    def __init__(self, configs: Optional[List[MCPConfig]] = None) -> None:
        """Create a manager; nothing is spawned until ``start_all`` is called.

        Args:
            configs: Servers to manage. When omitted, the three built-in
                servers (file manager, Google search, markdownify) on
                ports 8081-8083 are used.
        """
        if configs is None:
            # Reuse the interpreter running this process so the children
            # see the same environment/site-packages.
            python_exe = sys.executable or "python"
            configs = [
                MCPConfig(
                    name="file-manager-mcp",
                    command=[python_exe, "-u", "tools/file_manager_mcp/run.py"],
                    url="http://127.0.0.1:8081",
                ),
                MCPConfig(
                    name="google-search-mcp",
                    command=[python_exe, "-u", "tools/google_search_mcp/run.py"],
                    url="http://127.0.0.1:8082",
                ),
                MCPConfig(
                    name="markdownify-mcp",
                    command=[python_exe, "-u", "tools/markdownify_mcp/run.py"],
                    url="http://127.0.0.1:8083",
                ),
            ]
        self.configs: List[MCPConfig] = list(configs)
        # name -> subprocess.Popen
        self.procs: Dict[str, subprocess.Popen] = {}
        self.started: bool = False

    def start_all(self) -> None:
        """
        Start all MCP servers and wait for them to become healthy.

        Does nothing if a previous call already succeeded. On any failure
        every spawned server is stopped again before raising.

        Raises:
            RuntimeError: if any server fails to start or become healthy;
                the message joins all individual failures with "; ".
        """
        if self.started:
            return

        errors: List[str] = []
        for cfg in self.configs:
            try:
                self._start_one(cfg)
            except Exception as e:
                errors.append(f"{cfg.name} failed to start: {e}")

        # If any failed at spawn time, bail out immediately
        if errors:
            self.stop_all()
            raise RuntimeError("; ".join(errors))

        # Wait for health for each; collect every failure, not just the first
        for cfg in self.configs:
            ok, msg = self._wait_healthy(cfg)
            if not ok:
                errors.append(f"{cfg.name} unhealthy: {msg}")

        if errors:
            self.stop_all()
            raise RuntimeError("; ".join(errors))

        self.started = True

    def _start_one(self, cfg: MCPConfig) -> None:
        """Spawn the server described by ``cfg`` unless it is already running.

        The child's stdout and stderr are appended to
        ``LOGS_DIR/<cfg.name>.log``.
        """
        existing = self.procs.get(cfg.name)
        if existing is not None and existing.poll() is None:
            # Already running
            return

        log_path = LOGS_DIR / f"{cfg.name}.log"
        # The child receives its own copy of the descriptor, so the parent's
        # handle is closed as soon as Popen returns instead of leaking one
        # open file per start.
        with log_path.open("ab", buffering=0) as log_file:
            # Start subprocess in BASE_DIR so relative paths in the command
            # work. No ``env`` is passed: the child inherits this process's
            # environment.
            proc = subprocess.Popen(
                cfg.command,
                cwd=str(BASE_DIR),
                stdout=log_file,
                stderr=subprocess.STDOUT,
            )
        self.procs[cfg.name] = proc

    def _wait_healthy(self, cfg: MCPConfig) -> Tuple[bool, str]:
        """
        Poll server until healthy or timeout.

        Returns:
            ``(True, message)`` once the health URL answers 200, or once the
            root URL answers 200/404 after the health path returned 404.
            ``(False, message)`` if the process exits or ``cfg.start_timeout``
            elapses; the message includes the tail of the server's log.
        """
        # Monotonic clock: the deadline must not move if the wall clock does.
        deadline = time.monotonic() + cfg.start_timeout
        health_url = cfg.url.rstrip("/") + cfg.health_path
        log_path = LOGS_DIR / f"{cfg.name}.log"
        # If /health 404s, we fall back to root once just to confirm it's listening.
        tried_root = False

        while time.monotonic() < deadline:
            proc = self.procs.get(cfg.name)
            if proc is None or proc.poll() is not None:
                # Process exited - read last lines of log for debugging
                error_details = self._read_log_tail(log_path, lines=10)
                return False, f"process exited during startup. Check {log_path}\nLast log lines:\n{error_details}"

            try:
                resp = requests.get(health_url, timeout=1)
            except requests.RequestException:
                # Not ready yet (connection refused, timeout, ...)
                resp = None

            if resp is not None:
                if resp.status_code == 200:
                    return True, "ok"
                if resp.status_code == 404 and not tried_root:
                    tried_root = True
                    try:
                        root_resp = requests.get(cfg.url, timeout=1)
                    except requests.RequestException:
                        root_resp = None
                    if root_resp is not None and root_resp.status_code in (200, 404):
                        # Listening; consider healthy for our purposes
                        return True, "ok (no /health, but port open)"

            # Pause between every attempt, not only after exceptions, so a
            # server that keeps answering non-200 is not polled in a tight
            # loop. Never sleep past the deadline.
            time.sleep(min(0.5, max(0.0, deadline - time.monotonic())))

        # Timeout - include log tail for debugging
        error_details = self._read_log_tail(log_path, lines=10)
        return False, f"timeout after {cfg.start_timeout}s waiting for {health_url}. Check {log_path}\nLast log lines:\n{error_details}"

    def _read_log_tail(self, log_path: Path, lines: int = 10) -> str:
        """
        Read last N lines of log file for error reporting.

        Never raises for I/O problems: a missing, empty, or unreadable file
        yields a parenthesised placeholder string instead.
        """
        from collections import deque

        try:
            with open(log_path, "r", encoding="utf-8", errors="ignore") as f:
                # A bounded deque keeps only the tail while streaming the
                # file; clamping makes lines <= 0 yield nothing rather than
                # the whole file.
                tail = deque(f, maxlen=max(lines, 0))
        except FileNotFoundError:
            return "(log file not found)"
        except OSError as e:
            return f"(could not read log: {e})"
        return "".join(tail).strip() or "(empty log)"

    def stop_all(self, grace_period: float = 5.0) -> None:
        """
        Terminate all MCP subprocesses gracefully.

        Every live process is asked to terminate; the processes then share
        one ``grace_period`` (seconds) to exit. Any process still alive
        after that is killed. Finally the process table is cleared and
        ``started`` is reset.
        """
        procs = list(self.procs.values())

        for proc in procs:
            try:
                if proc.poll() is None:
                    proc.terminate()
            except OSError:
                # Best effort: the process may have exited in between.
                pass

        # Give them a moment to exit, then kill if needed. The deadline is
        # shared, so total shutdown time is bounded by grace_period.
        deadline = time.monotonic() + grace_period
        for proc in procs:
            remaining = max(0.0, deadline - time.monotonic())
            try:
                proc.wait(timeout=remaining)
            except subprocess.TimeoutExpired:
                try:
                    proc.kill()
                    # Reap the killed child so it does not linger as a zombie.
                    proc.wait(timeout=1)
                except (OSError, subprocess.TimeoutExpired):
                    pass
            except OSError:
                pass

        self.procs.clear()
        self.started = False