File size: 6,740 Bytes
184b62a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
import os
import subprocess
import sys
import time
from collections import deque
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests


# Directory two levels above this file; MCPManager launches every child
# server with this as its working directory.
BASE_DIR = Path(__file__).resolve().parents[1]
# Destination for per-server "<name>.log" files. Created eagerly at import.
LOGS_DIR = BASE_DIR.joinpath("logs")
LOGS_DIR.mkdir(exist_ok=True, parents=True)


@dataclass
class MCPConfig:
    """
    Static description of one MCP server that MCPManager launches.

    Fields:
        name: Label for the server; keys the process table and names its log file.
        command: argv list handed unchanged to subprocess.Popen.
        url: Base URL the server is expected to listen on.
        health_path: Path appended to ``url`` when probing readiness.
        start_timeout: Seconds to wait for the server to report healthy.
    """

    name: str  # should be unique: running processes are stored by this key
    command: List[str]
    url: str
    health_path: str = "/health"
    start_timeout: float = 20.0  # seconds


class MCPManager:
    """
    Manages lifecycle of local MCP servers for MVP Agent.

    Responsibilities:
    - Start required MCP servers as subprocesses.
    - Wait for them to become healthy via HTTP checks.
    - Provide structured errors on failure.
    - Terminate all subprocesses on shutdown.

    Designed for:
    - Local dev: single `python app.py` starts everything.
    - Hugging Face Spaces: single-process entrypoint spawning child MCP servers.
    """

    # Pause between health-check attempts, so a server that answers with an
    # error status is not re-polled in a tight loop.
    POLL_INTERVAL: float = 0.5
    # Time all children share to exit after terminate() before kill() is used.
    STOP_GRACE_PERIOD: float = 5.0

    def __init__(self, configs: Optional[List["MCPConfig"]] = None) -> None:
        """
        Args:
            configs: Servers to manage. When omitted, the built-in
                file-manager, google-search and markdownify MCP servers
                (ports 8081-8083, launched with the current interpreter)
                are used.
        """
        if configs is None:
            python_exe = sys.executable or "python"
            configs = [
                MCPConfig(
                    name="file-manager-mcp",
                    command=[python_exe, "-u", "tools/file_manager_mcp/run.py"],
                    url="http://127.0.0.1:8081",
                ),
                MCPConfig(
                    name="google-search-mcp",
                    command=[python_exe, "-u", "tools/google_search_mcp/run.py"],
                    url="http://127.0.0.1:8082",
                ),
                MCPConfig(
                    name="markdownify-mcp",
                    command=[python_exe, "-u", "tools/markdownify_mcp/run.py"],
                    url="http://127.0.0.1:8083",
                ),
            ]
        self.configs: List[MCPConfig] = list(configs)

        # name -> subprocess.Popen
        self.procs: Dict[str, subprocess.Popen] = {}
        self.started: bool = False

    def start_all(self) -> None:
        """
        Start all MCP servers and wait for them to become healthy.

        Does nothing if a previous call already succeeded. On any failure,
        every spawned process is stopped before the error is raised.

        Raises:
            RuntimeError: if any server fails to spawn or become healthy; the
                message joins the per-server errors with "; ".
        """
        if self.started:
            return

        errors: List[str] = []

        for cfg in self.configs:
            try:
                self._start_one(cfg)
            except Exception as e:
                # Collected and re-raised below as a single RuntimeError.
                errors.append(f"{cfg.name} failed to start: {e}")

        # If any failed at spawn time, bail out before waiting on the others.
        if errors:
            self.stop_all()
            raise RuntimeError("; ".join(errors))

        for cfg in self.configs:
            ok, msg = self._wait_healthy(cfg)
            if not ok:
                errors.append(f"{cfg.name} unhealthy: {msg}")

        if errors:
            self.stop_all()
            raise RuntimeError("; ".join(errors))

        self.started = True

    @staticmethod
    def _log_path(cfg: "MCPConfig") -> Path:
        """Return the file that receives ``cfg``'s stdout and stderr."""
        return LOGS_DIR / f"{cfg.name}.log"

    def _start_one(self, cfg: "MCPConfig") -> None:
        """
        Spawn the process for ``cfg`` unless it is already running.

        The child runs with BASE_DIR as its working directory, so relative
        paths in run.py work. Its stdout and stderr are appended to its log file.
        """
        existing = self.procs.get(cfg.name)
        if existing is not None and existing.poll() is None:
            # Already running
            return

        # Pass the parent's environment through explicitly; this is the place
        # to add per-server overrides.
        env = os.environ.copy()
        # The child gets its own duplicate of the log descriptor, so the
        # parent's handle is closed as soon as Popen returns (or raises)
        # instead of leaking one fd per start.
        with self._log_path(cfg).open("ab", buffering=0) as log_file:
            proc = subprocess.Popen(
                cfg.command,
                cwd=str(BASE_DIR),
                env=env,
                stdout=log_file,
                stderr=subprocess.STDOUT,
            )
        self.procs[cfg.name] = proc

    def _wait_healthy(self, cfg: "MCPConfig") -> Tuple[bool, str]:
        """
        Poll the server until healthy, its process exits, or the timeout elapses.

        A 200 from ``health_path`` counts as healthy. If ``health_path``
        returns 404, the base URL is probed once, and a 200 or 404 there is
        accepted as proof that the server is listening.

        Returns:
            ``(True, reason)`` when healthy. Otherwise ``(False, message)``,
            where the message names the log file and includes its last lines.
        """
        # Monotonic clock: a wall-clock jump must not shorten or stretch the wait.
        deadline = time.monotonic() + cfg.start_timeout
        health_url = cfg.url.rstrip("/") + cfg.health_path

        # If /health 404s, we fall back (once) to root just to confirm it's listening.
        tried_root = False

        while time.monotonic() < deadline:
            proc = self.procs.get(cfg.name)
            if proc is None or proc.poll() is not None:
                return False, f"process exited during startup. {self._log_hint(cfg)}"

            try:
                resp = requests.get(health_url, timeout=1)
                if resp.status_code == 200:
                    return True, "ok"
                if resp.status_code == 404 and not tried_root:
                    tried_root = True
                    root_resp = requests.get(cfg.url, timeout=1)
                    if root_resp.status_code in (200, 404):
                        # Listening; consider healthy for our purposes
                        return True, "ok (no /health, but port open)"
            except requests.RequestException:
                pass  # Not ready yet (connection refused, timeout, ...).

            # Back off after every unsuccessful attempt, including non-200
            # replies, which previously re-polled immediately.
            time.sleep(self.POLL_INTERVAL)

        return False, (
            f"timeout after {cfg.start_timeout}s waiting for {health_url}. "
            f"{self._log_hint(cfg)}"
        )

    def _log_hint(self, cfg: "MCPConfig", lines: int = 10) -> str:
        """Return a 'Check <log path>' pointer followed by the log's last ``lines`` lines."""
        log_path = self._log_path(cfg)
        tail = self._read_log_tail(log_path, lines=lines)
        return f"Check {log_path}\nLast log lines:\n{tail}"

    def _read_log_tail(self, log_path: Path, lines: int = 10) -> str:
        """
        Read the last ``lines`` lines of ``log_path`` for error reporting.

        The file is streamed through a bounded deque, so a log that has been
        appended to across many runs is never loaded into memory whole.
        Never raises: problems are described in the returned string.
        """
        try:
            if not log_path.exists():
                return "(log file not found)"

            with open(log_path, "r", encoding="utf-8", errors="ignore") as f:
                tail_lines = deque(f, maxlen=max(lines, 0))
            return "".join(tail_lines).strip() or "(empty log)"
        except Exception as e:
            # Only used while building an error message; failing here would
            # hide the original startup error.
            return f"(could not read log: {e})"

    def stop_all(self) -> None:
        """
        Terminate all MCP subprocesses gracefully, killing any that linger.

        Every child is asked to exit with terminate(). All of them then share
        one STOP_GRACE_PERIOD deadline before kill() is used. Children are
        waited on, so they are reaped rather than left behind. Safe to call
        repeatedly; always clears ``procs`` and resets ``started``.
        """
        procs = list(self.procs.values())
        try:
            for proc in procs:
                if proc.poll() is None:
                    try:
                        proc.terminate()
                    except OSError:
                        pass  # Exited between poll() and terminate().

            deadline = time.monotonic() + self.STOP_GRACE_PERIOD
            for proc in procs:
                try:
                    proc.wait(timeout=max(0.0, deadline - time.monotonic()))
                except subprocess.TimeoutExpired:
                    # Ignored the polite request; force it.
                    try:
                        proc.kill()
                    except OSError:
                        pass
                    try:
                        proc.wait(timeout=1)
                    except subprocess.TimeoutExpired:
                        pass  # Could not be reaped promptly; give up on it.
        finally:
            self.procs.clear()
            self.started = False