# MVP-Agent / src/mcp_process_manager.py
# Furqan Ahmad Rao
# Add input validation and file management tools for MVP Agent
# 184b62a
import os
import subprocess
import sys
import time
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests
# Parent of the directory that contains this module; child servers are
# launched with this directory as their working directory.
BASE_DIR = Path(__file__).resolve().parents[1]

# Directory holding one "<server name>.log" file per MCP server. It is created
# at import time so the log files can be opened without further checks.
LOGS_DIR = BASE_DIR.joinpath("logs")
LOGS_DIR.mkdir(parents=True, exist_ok=True)
@dataclass
class MCPConfig:
    """
    Launch and health-check settings for a single local MCP server.

    Attributes:
        name: Identifier of the server; also used as the stem of its log file.
        command: Argument list used to spawn the server process.
        url: Base URL the server is expected to answer on.
        health_path: Path appended to `url` to form the health-check URL.
        start_timeout: Seconds to wait for the server to become healthy.
    """

    name: str
    command: List[str]
    url: str
    health_path: str = "/health"
    start_timeout: float = 20.0
class MCPManager:
    """
    Manages lifecycle of local MCP servers for MVP Agent.

    Responsibilities:
    - Start required MCP servers as subprocesses.
    - Wait for them to become healthy via HTTP checks.
    - Provide structured errors on failure.
    - Terminate all subprocesses on shutdown.

    Designed for:
    - Local dev: single `python app.py` starts everything.
    - Hugging Face Spaces: single-process entrypoint spawning child MCP servers.
    """

    # Pause between health probes while a server is still coming up.
    _POLL_INTERVAL: float = 0.5  # seconds
    # Total time stop_all() lets terminated children exit before killing them.
    _STOP_GRACE: float = 5.0  # seconds

    def __init__(self) -> None:
        """
        Build the list of MCP servers to manage; nothing is started yet.
        """
        python_exe = sys.executable or "python"
        self.configs: List[MCPConfig] = [
            MCPConfig(
                name="file-manager-mcp",
                command=[python_exe, "-u", "tools/file_manager_mcp/run.py"],
                url="http://127.0.0.1:8081",
            ),
            MCPConfig(
                name="google-search-mcp",
                command=[python_exe, "-u", "tools/google_search_mcp/run.py"],
                url="http://127.0.0.1:8082",
            ),
            MCPConfig(
                name="markdownify-mcp",
                command=[python_exe, "-u", "tools/markdownify_mcp/run.py"],
                url="http://127.0.0.1:8083",
            ),
        ]
        # name -> subprocess.Popen
        self.procs: Dict[str, subprocess.Popen] = {}
        self.started: bool = False

    def start_all(self) -> None:
        """
        Start all MCP servers and wait for them to become healthy.

        Does nothing if a previous call already succeeded. On any failure all
        spawned servers are stopped again before the error is raised.

        Raises:
            RuntimeError if any server fails to start or become healthy.
        """
        if self.started:
            return

        errors: List[str] = []
        for cfg in self.configs:
            try:
                self._start_one(cfg)
            except Exception as e:
                errors.append(f"{cfg.name} failed to start: {e}")

        # If any failed at spawn time, bail out immediately
        if errors:
            self.stop_all()
            raise RuntimeError("; ".join(errors))

        # Wait for health for each; collect every failure so the error
        # message names all unhealthy servers, not just the first one.
        for cfg in self.configs:
            ok, msg = self._wait_healthy(cfg)
            if not ok:
                errors.append(f"{cfg.name} unhealthy: {msg}")

        if errors:
            self.stop_all()
            raise RuntimeError("; ".join(errors))

        self.started = True

    def _start_one(self, cfg: "MCPConfig") -> None:
        """
        Spawn the server described by `cfg` unless it is already running.

        The child's stdout and stderr are appended to LOGS_DIR/<name>.log.
        """
        existing = self.procs.get(cfg.name)
        if existing is not None and existing.poll() is None:
            # Already running
            return

        log_path = LOGS_DIR / f"{cfg.name}.log"
        # The child receives its own copy of the log descriptor, so the
        # parent's handle is closed as soon as Popen returns instead of
        # leaking one open file per spawn.
        with log_path.open("ab", buffering=0) as log_file:
            # Start subprocess in BASE_DIR so relative paths in run.py work.
            # The parent's environment is inherited (Popen's default).
            proc = subprocess.Popen(
                cfg.command,
                cwd=str(BASE_DIR),
                stdout=log_file,
                stderr=subprocess.STDOUT,
            )
        self.procs[cfg.name] = proc

    def _wait_healthy(self, cfg: "MCPConfig") -> Tuple[bool, str]:
        """
        Poll server until healthy or timeout.

        Returns:
            (True, message) once the health URL answers 200, or once the
            health URL answers 404 and the root URL answers 200 or 404.
            (False, message) if the process exits or `cfg.start_timeout`
            elapses; the message includes the tail of the server's log.
        """
        # Monotonic clock: the deadline must not move if the wall clock does.
        deadline = time.monotonic() + cfg.start_timeout
        health_url = cfg.url.rstrip("/") + cfg.health_path
        log_path = LOGS_DIR / f"{cfg.name}.log"
        # If /health 404s, we fallback to root just to confirm it's listening.
        tried_root = False

        while time.monotonic() < deadline:
            proc = self.procs.get(cfg.name)
            if proc is None or proc.poll() is not None:
                # Process exited - read last lines of log for debugging
                error_details = self._read_log_tail(log_path, lines=10)
                return False, (
                    f"process exited during startup. Check {log_path}\n"
                    f"Last log lines:\n{error_details}"
                )

            try:
                resp = requests.get(health_url, timeout=1)
            except requests.RequestException:
                # Not ready yet
                resp = None

            if resp is not None:
                if resp.status_code == 200:
                    return True, "ok"
                # If /health not found, fall back once to /
                if resp.status_code == 404 and not tried_root:
                    tried_root = True
                    try:
                        root_resp = requests.get(cfg.url, timeout=1)
                    except requests.RequestException:
                        root_resp = None
                    if root_resp is not None and root_resp.status_code in (200, 404):
                        # Listening; consider healthy for our purposes
                        return True, "ok (no /health, but port open)"

            # Pause after every unsuccessful probe, including non-200
            # responses, so the server is not hammered in a busy loop.
            time.sleep(self._POLL_INTERVAL)

        # Timeout - include log tail for debugging
        error_details = self._read_log_tail(log_path, lines=10)
        return False, (
            f"timeout after {cfg.start_timeout}s waiting for {health_url}. "
            f"Check {log_path}\nLast log lines:\n{error_details}"
        )

    def _read_log_tail(self, log_path: Path, lines: int = 10) -> str:
        """
        Read last N lines of log file for error reporting.

        Never raises: problems are reported in the returned text. A missing
        file yields "(log file not found)"; an empty selection (empty file or
        non-positive `lines`) yields "(empty log)".
        """
        try:
            if not log_path.exists():
                return "(log file not found)"
            with open(log_path, "r", encoding="utf-8", errors="ignore") as f:
                # A bounded deque keeps only the last `lines` lines in memory.
                tail_lines = deque(f, maxlen=max(lines, 0))
            return "".join(tail_lines).strip() or "(empty log)"
        except Exception as e:
            # Deliberately broad: this runs while reporting another failure
            # and must not replace that failure with one of its own.
            return f"(could not read log: {e})"

    def stop_all(self) -> None:
        """
        Terminate all MCP subprocesses gracefully.

        Every running child is asked to terminate, then all of them share one
        grace period to exit. Children still alive after that are killed and
        reaped. The process table is cleared and `started` reset.
        """
        procs = list(self.procs.values())

        for proc in procs:
            try:
                if proc.poll() is None:
                    proc.terminate()
            except OSError:
                # Process went away between poll() and terminate().
                pass

        # Give them a moment to exit, then kill if needed
        deadline = time.monotonic() + self._STOP_GRACE
        for proc in procs:
            remaining = max(0.0, deadline - time.monotonic())
            try:
                proc.wait(timeout=remaining)
            except subprocess.TimeoutExpired:
                try:
                    proc.kill()
                    # Reap the killed child so it does not linger as a zombie.
                    proc.wait(timeout=1)
                except (OSError, subprocess.TimeoutExpired):
                    pass
            except OSError:
                pass

        self.procs.clear()
        self.started = False