"""MCP Server Manager — install, uninstall, and lifecycle management. Manages optional MCP servers as child processes. Core servers are always started by agentic-start.sh; optional servers can be installed/uninstalled via the API. Install: 1. Start uvicorn subprocess on the designated port 2. Wait for /health 3. Discover tools via JSON-RPC tools/list 4. Register tools in Context Forge 5. Trigger a full sync to update virtual server tool associations 6. Persist to installed.json Uninstall: 1. Deactivate tools in Forge (disable, don't delete) 2. Trigger sync to update virtual server associations 3. Stop the subprocess (SIGTERM) 4. Remove from installed.json """ from __future__ import annotations import asyncio import json import logging import os import signal import subprocess import sys import time from pathlib import Path from typing import Any, Dict, List, Optional import httpx import yaml logger = logging.getLogger("homepilot.agentic.server_manager") MCP_TOOL_HOST = os.getenv("MCP_TOOL_HOST", "127.0.0.1") # ── Server Catalog ──────────────────────────────────────────────────────── def _catalog_path() -> Path: candidates = [ Path(__file__).resolve().parents[3] / "agentic" / "forge" / "templates" / "server_catalog.yaml", Path("agentic/forge/templates/server_catalog.yaml"), Path(os.environ.get("HOMEPILOT_ROOT", ".")) / "agentic" / "forge" / "templates" / "server_catalog.yaml", ] for p in candidates: if p.exists(): return p return candidates[0] def _load_catalog() -> Dict[str, Any]: path = _catalog_path() if not path.exists(): logger.warning("server_catalog.yaml not found at %s", path) return {"core": [], "optional": []} with path.open("r", encoding="utf-8") as f: return yaml.safe_load(f) or {"core": [], "optional": []} class ServerDef: """A server definition from the catalog.""" def __init__(self, data: Dict[str, Any], is_core: bool = False): self.id: str = data["id"] self.port: int = data["port"] self.module: str = data.get("module", "") self.label: str = data.get("label", self.id) self.description: str = data.get("description", "") self.category: str = data.get("category", "other") self.icon: str = data.get("icon", "server") self.requires_config: Optional[str] = data.get("requires_config") self.source: Optional[Dict[str, Any]] = data.get("source") self.is_core: bool = is_core def to_dict(self) -> Dict[str, Any]: d: Dict[str, Any] = { "id": self.id, "port": self.port, "module": self.module, "label": self.label, "description": self.description, "category": self.category, "icon": self.icon, "requires_config": self.requires_config, "is_core": self.is_core, } if self.source: d["source"] = self.source return d # ── Installed State Persistence ────────────────────────────────────────── def _state_path() -> Path: candidates = [ Path(__file__).resolve().parents[3] / "agentic" / "forge" / "installed.json", Path("agentic/forge/installed.json"), Path(os.environ.get("HOMEPILOT_ROOT", ".")) / "agentic" / "forge" / "installed.json", ] for p in candidates: if p.parent.is_dir(): return p return candidates[0] def _external_label(name: str) -> str: """Convert external server name like 'mcp-teams' to a display label.""" return name.replace("mcp-", "").replace("hp-", "").replace("-", " ").replace("_", " ").title() def _read_installed() -> Dict[str, Any]: path = _state_path() if not path.exists(): return {"installed": []} try: with path.open("r", encoding="utf-8") as f: return json.load(f) except Exception: return {"installed": []} def _write_installed(state: Dict[str, Any]) -> None: path = _state_path() path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as f: json.dump(state, f, indent=2) # ── Server Manager ─────────────────────────────────────────────────────── class ServerManager: """Manages MCP server processes and Forge registration.""" def __init__(self) -> None: catalog = _load_catalog() self._core = [ServerDef(d, is_core=True) for d in catalog.get("core", [])] self._optional = [ServerDef(d, is_core=False) for d in catalog.get("optional", [])] self._all: Dict[str, ServerDef] = {} for s in self._core + self._optional: self._all[s.id] = s # PID tracking for managed (optional) processes self._processes: Dict[str, subprocess.Popen] = {} @property def core_servers(self) -> List[ServerDef]: return list(self._core) @property def optional_servers(self) -> List[ServerDef]: return list(self._optional) def get_server(self, server_id: str) -> Optional[ServerDef]: return self._all.get(server_id) def installed_ids(self) -> List[str]: state = _read_installed() return [e["id"] for e in state.get("installed", []) if e.get("id")] def is_installed(self, server_id: str) -> bool: return server_id in self.installed_ids() def _python_path(self) -> str: """Resolve the Python interpreter to use for subprocesses.""" root = Path(__file__).resolve().parents[3] venv_python = root / "backend" / ".venv" / "bin" / "python" if venv_python.is_file(): return str(venv_python) return sys.executable def _project_root(self) -> str: return str(Path(__file__).resolve().parents[3]) async def _check_health(self, port: int, timeout: int = 10) -> bool: for _ in range(timeout): try: async with httpx.AsyncClient(timeout=2.0) as c: r = await c.get(f"http://{MCP_TOOL_HOST}:{port}/health") if r.status_code == 200: return True except Exception: pass await asyncio.sleep(1) return False async def _discover_tools(self, port: int) -> List[Dict[str, Any]]: url = f"http://{MCP_TOOL_HOST}:{port}/rpc" body = {"jsonrpc": "2.0", "id": "mgr-discover", "method": "tools/list"} try: async with httpx.AsyncClient(timeout=10.0) as c: r = await c.post(url, json=body) if r.status_code == 200: return r.json().get("result", {}).get("tools", []) except Exception as exc: logger.warning("Tool discovery on port %d failed: %s", port, exc) return [] async def _register_tools_in_forge( self, tools: List[Dict[str, Any]], port: int, forge_url: str, auth_user: str = "admin", auth_pass: str = "changeme", bearer_token: Optional[str] = None, ) -> List[str]: """Register discovered tools in Forge. Returns list of registered tool IDs.""" from .sync_service import _acquire_jwt, _post, _safe_list, _get registered_ids: List[str] = [] async with httpx.AsyncClient(headers={"Content-Type": "application/json"}, timeout=30.0) as client: if bearer_token: client.headers["Authorization"] = f"Bearer {bearer_token}" else: token = await _acquire_jwt(client, forge_url, auth_user, auth_pass) if token: client.headers["Authorization"] = f"Bearer {token}" else: client.auth = httpx.BasicAuth(auth_user, auth_pass) # Get existing tools to avoid duplicates (limit=0 bypasses pagination) try: existing_list = _safe_list(await _get(client, forge_url, "/tools", limit=0)) except Exception: existing_list = [] existing = {t["name"]: t["id"] for t in existing_list if t.get("name") and t.get("id")} for tool_def in tools: tname = tool_def.get("name", "") if tname in existing: registered_ids.append(existing[tname]) continue payload = { "tool": { "name": tname, "description": tool_def.get("description", ""), "inputSchema": tool_def.get("inputSchema", {"type": "object", "properties": {}}), "integration_type": "REST", "request_type": "POST", "url": f"http://{MCP_TOOL_HOST}:{port}/rpc", "tags": ["homepilot"], }, "team_id": None, } try: result = await _post(client, forge_url, "/tools", json=payload) tid = result.get("id") or result.get("tool_id") if isinstance(result, dict) else None if tid: registered_ids.append(tid) except Exception as exc: logger.warning("Failed to register tool '%s': %s", tname, exc) return registered_ids async def _deactivate_tools_in_forge( self, port: int, forge_url: str, auth_user: str = "admin", auth_pass: str = "changeme", bearer_token: Optional[str] = None, ) -> int: """Deactivate tools from this server in Forge. Returns count deactivated.""" from .sync_service import _acquire_jwt, _safe_list, _get deactivated = 0 expected_url = f"http://{MCP_TOOL_HOST}:{port}/rpc" async with httpx.AsyncClient(headers={"Content-Type": "application/json"}, timeout=30.0) as client: if bearer_token: client.headers["Authorization"] = f"Bearer {bearer_token}" else: token = await _acquire_jwt(client, forge_url, auth_user, auth_pass) if token: client.headers["Authorization"] = f"Bearer {token}" else: client.auth = httpx.BasicAuth(auth_user, auth_pass) try: tools_list = _safe_list(await _get(client, forge_url, "/tools", limit=0)) except Exception: tools_list = [] for t in tools_list: tool_url = str(t.get("url") or "") if tool_url == expected_url and t.get("enabled") is not False: tid = t.get("id") if not tid: continue # Deactivate via POST /tools/{id}/state?activate=false try: r = await client.post( f"{forge_url}/tools/{tid}/state?activate=false", ) if r.status_code < 400: deactivated += 1 except Exception as exc: logger.debug("Failed to deactivate tool %s: %s", tid, exc) return deactivated def _start_process(self, server: ServerDef) -> Optional[subprocess.Popen]: """Start a uvicorn subprocess for the server.""" if not server.module: logger.info("Skipping process start for %s (external server)", server.id) return None python = self._python_path() root = self._project_root() env = {**os.environ, "PYTHONPATH": root} try: proc = subprocess.Popen( [python, "-m", "uvicorn", server.module, "--host", "127.0.0.1", "--port", str(server.port), "--log-level", "warning"], env=env, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) self._processes[server.id] = proc logger.info("Started %s (pid=%d, port=%d)", server.id, proc.pid, server.port) return proc except Exception as exc: logger.error("Failed to start %s: %s", server.id, exc) return None def _stop_process(self, server_id: str) -> bool: """Stop a managed subprocess.""" proc = self._processes.pop(server_id, None) if proc is None: return False try: proc.send_signal(signal.SIGTERM) proc.wait(timeout=5) except subprocess.TimeoutExpired: proc.kill() proc.wait(timeout=2) except Exception as exc: logger.warning("Error stopping %s: %s", server_id, exc) logger.info("Stopped %s", server_id) return True async def server_healthy(self, server_id: str) -> bool: """Check if a server is responding on its port.""" server = self.get_server(server_id) if not server: return False try: async with httpx.AsyncClient(timeout=2.0) as c: r = await c.get(f"http://{MCP_TOOL_HOST}:{server.port}/health") return r.status_code == 200 except Exception: return False async def install( self, server_id: str, forge_url: str, auth_user: str = "admin", auth_pass: str = "changeme", bearer_token: Optional[str] = None, ) -> Dict[str, Any]: """Install an optional MCP server: start process, register tools, update state.""" server = self.get_server(server_id) if not server: return {"ok": False, "error": f"Unknown server: {server_id}"} if server.is_core: return {"ok": False, "error": f"'{server_id}' is a core server (always running)"} if self.is_installed(server_id): # Already installed — check if healthy healthy = await self.server_healthy(server_id) if healthy: return {"ok": True, "status": "already_installed", "healthy": True} # Process died — restart it self._stop_process(server_id) # 1. Start the process proc = self._start_process(server) if not proc: return {"ok": False, "error": f"Failed to start process for {server_id}"} # 2. Wait for health healthy = await self._check_health(server.port, timeout=12) if not healthy: self._stop_process(server_id) return {"ok": False, "error": f"Server {server_id} did not become healthy within 12s"} # 3. Discover tools tools = await self._discover_tools(server.port) # 4. Register tools in Forge tool_ids = await self._register_tools_in_forge( tools, server.port, forge_url, auth_user, auth_pass, bearer_token, ) # 5. Persist installed state state = _read_installed() ids = {e["id"] for e in state.get("installed", [])} if server_id not in ids: state.setdefault("installed", []).append({ "id": server_id, "installed_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), }) _write_installed(state) return { "ok": True, "status": "installed", "server_id": server_id, "port": server.port, "tools_discovered": len(tools), "tools_registered": len(tool_ids), "healthy": True, } async def uninstall( self, server_id: str, forge_url: str, auth_user: str = "admin", auth_pass: str = "changeme", bearer_token: Optional[str] = None, ) -> Dict[str, Any]: """Uninstall an optional MCP server: deactivate tools, stop process, update state.""" server = self.get_server(server_id) if not server: return {"ok": False, "error": f"Unknown server: {server_id}"} if server.is_core: return {"ok": False, "error": f"'{server_id}' is a core server and cannot be uninstalled"} # 1. Deactivate tools in Forge deactivated = await self._deactivate_tools_in_forge( server.port, forge_url, auth_user, auth_pass, bearer_token, ) # 2. Stop the process self._stop_process(server_id) # 3. Remove from installed state state = _read_installed() state["installed"] = [e for e in state.get("installed", []) if e.get("id") != server_id] _write_installed(state) return { "ok": True, "status": "uninstalled", "server_id": server_id, "tools_deactivated": deactivated, } async def get_available(self) -> List[Dict[str, Any]]: """Return all servers (core + optional + external) with their current status.""" installed_set = set(self.installed_ids()) result: List[Dict[str, Any]] = [] for server in self._core + self._optional: healthy = await self.server_healthy(server.id) entry = server.to_dict() entry["installed"] = server.is_core or server.id in installed_set entry["healthy"] = healthy entry["status"] = ( "running" if healthy else "installed" if entry["installed"] else "available" ) result.append(entry) # Include external servers from community/external/registry.json result.extend(await self._get_external_entries()) return result async def _get_external_entries(self) -> List[Dict[str, Any]]: """Build entries for external servers from the external registry.""" from .mcp_installer import _read_external_registry reg = _read_external_registry() entries: List[Dict[str, Any]] = [] seen_names = set() for srv in reg.get("servers", []): name = srv.get("name", "") if not name or name in seen_names: continue seen_names.add(name) port = srv.get("port", 0) is_installed = srv.get("status") == "installed" # Check health if installed healthy = False if is_installed and port: healthy = await self._check_health(port, timeout=3) entries.append({ "id": name, "port": port, "module": "", "label": _external_label(name), "description": f"External MCP server ({srv.get('git_url', 'community')})", "category": "external", "icon": "package", "requires_config": None, "is_core": False, "source_type": "external", "git_url": srv.get("git_url", ""), "install_path": srv.get("install_path", ""), "tools_discovered": srv.get("tools_discovered", 0), "installed_at": srv.get("installed_at", ""), "installed": is_installed, "healthy": healthy, "status": ( "running" if healthy else "installed" if is_installed else "available" ), }) return entries async def auto_start_core(self) -> List[str]: """Start core MCP servers if they aren't already running. Core servers are normally started by agentic-start.sh, but if the backend is launched independently (e.g. ``uvicorn app.main:app``) or if the shell script failed, this ensures they come up. """ started: List[str] = [] for server in self._core: healthy = await self.server_healthy(server.id) if healthy: continue proc = self._start_process(server) if proc: started.append(server.id) logger.info("Auto-started core server %s on port %d", server.id, server.port) if started: # Poll health for each started server (up to 10s) # instead of a single 3s sleep which is too short for # servers that take longer to initialize. for sid in list(started): healthy = await self._check_health( self._all[sid].port, timeout=10, ) if not healthy: logger.warning("Core server %s did not become healthy after auto-start", sid) return started async def auto_start_installed(self) -> List[str]: """Start all previously installed optional servers. Called on backend startup.""" started: List[str] = [] for server_id in self.installed_ids(): server = self.get_server(server_id) if not server or server.is_core: continue # Check if already running (started by agentic-start.sh or another process) healthy = await self.server_healthy(server_id) if healthy: continue proc = self._start_process(server) if proc: started.append(server_id) # Brief wait for processes to boot if started: await asyncio.sleep(2) return started async def auto_start_external(self) -> List[str]: """Start all previously installed external/community MCP servers. Reads community/external/registry.json and restarts any server with status="installed". These are servers installed via persona import (external git repos or community bundles) that are not in the server_catalog.yaml but still need to run for their personas to work. Uses the same robust startup logic as the installer (_start_external_server) including: - server-local .venv detection - pyproject.toml entry-point detection - environment variable injection (.env files, TEAMS_MCP_* vars) - log-file capture for debugging """ started: List[str] = [] root = Path(self._project_root()) # Read external registry ext_path = root / "community" / "external" / "registry.json" if not ext_path.exists(): return started try: reg = json.loads(ext_path.read_text()) except Exception: return started for entry in reg.get("servers", []): name = entry.get("name", "") port = entry.get("port") install_path = entry.get("install_path", "") status = entry.get("status", "") if not name or not port or status != "installed": continue # Skip if already healthy try: async with httpx.AsyncClient(timeout=2.0) as c: r = await c.get(f"http://{MCP_TOOL_HOST}:{port}/health") if r.status_code == 200: continue except Exception: pass # Determine how to start the server — full detection matching # the installer (_start_external_server in mcp_installer.py) server_dir = Path(install_path) if install_path else None if not server_dir or not server_dir.is_dir(): logger.warning("External server %s install_path not found: %s", name, install_path) continue # Python interpreter: prefer server .venv, then backend .venv, then sys server_python = server_dir / ".venv" / "bin" / "python" backend_python = root / "backend" / ".venv" / "bin" / "python" if server_python.is_file(): python = str(server_python) elif backend_python.is_file(): python = str(backend_python) else: python = sys.executable # PYTHONPATH: include server root + src/ for src-layout packages pp_parts = [str(server_dir)] src_dir = server_dir / "src" if src_dir.is_dir(): pp_parts.append(str(src_dir)) existing_pp = os.environ.get("PYTHONPATH", "") if existing_pp: pp_parts.append(existing_pp) env = {**os.environ, "PYTHONPATH": os.pathsep.join(pp_parts)} # Entry-point detection (same priority as installer): # 1. app/main.py (HomePilot module style) # 2. pyproject.toml [project.scripts] (pip-installed CLI) # 3. app.py (simple uvicorn) # 4. bundle_manifest.json module app_main = server_dir / "app" / "main.py" app_py = server_dir / "app.py" pyproject = server_dir / "pyproject.toml" cmd: list[str] = [] if app_main.exists(): cmd = [python, "-m", "app.main", "--http", "--host", "127.0.0.1", "--port", str(port)] elif pyproject.exists(): try: from .mcp_installer import _detect_pyproject_app module_app = _detect_pyproject_app(server_dir, name) except ImportError: module_app = None if module_app: cmd = [python, "-m", "uvicorn", module_app, "--host", "127.0.0.1", "--port", str(port), "--log-level", "info"] # Env vars for pydantic-settings-based servers env[f"{name.upper().replace('-', '_')}_PORT"] = str(port) env["TEAMS_MCP_PORT"] = str(port) env["TEAMS_MCP_HOST"] = "127.0.0.1" if "TEAMS_MCP_TOKEN_KEY" not in env: import base64 env["TEAMS_MCP_TOKEN_KEY"] = base64.urlsafe_b64encode( os.urandom(32) ).decode() elif app_py.exists(): cmd = [python, "-m", "uvicorn", "app:app", "--host", "127.0.0.1", "--port", str(port), "--log-level", "warning"] else: # Community bundle — try bundle_manifest.json manifest_path = server_dir / "bundle_manifest.json" module = "" if manifest_path.exists(): try: bm = json.loads(manifest_path.read_text()) module = bm.get("mcp_server", {}).get("module", "") except Exception: pass if module: cmd = [python, "-m", "uvicorn", module, "--host", "127.0.0.1", "--port", str(port), "--log-level", "warning"] else: logger.warning("Cannot detect entry point for external server %s at %s", name, server_dir) continue # Inject .env file values into env dict try: from .mcp_installer import _auto_populate_env _auto_populate_env(server_dir, name, env) except ImportError: pass if not cmd: continue # Log to files instead of DEVNULL for debugging log_dir = root / "community" / "external" / "install_logs" log_dir.mkdir(parents=True, exist_ok=True) stdout_log = log_dir / f"{name}.stdout.log" stderr_log = log_dir / f"{name}.stderr.log" try: stdout_f = open(stdout_log, "a", encoding="utf-8") stderr_f = open(stderr_log, "a", encoding="utf-8") proc = subprocess.Popen( cmd, env=env, cwd=str(server_dir), stdout=stdout_f, stderr=stderr_f, ) self._processes[name] = proc started.append(name) logger.info("Auto-started external server %s (pid=%d, port=%d)", name, proc.pid, port) except Exception as exc: logger.warning("Failed to auto-start external server %s: %s", name, exc) if started: await asyncio.sleep(3) # Verify health for name in list(started): entry = next((e for e in reg.get("servers", []) if e.get("name") == name), None) if entry: try: async with httpx.AsyncClient(timeout=2.0) as c: r = await c.get(f"http://{MCP_TOOL_HOST}:{entry['port']}/health") if r.status_code != 200: logger.warning("External server %s did not become healthy after auto-start", name) except Exception: logger.warning("External server %s did not become healthy after auto-start", name) return started async def ensure_all_running(self) -> Dict[str, List[str]]: """Start all servers that should be running (core + installed optional + external). Called during backend startup to ensure the full agentic stack is up, regardless of whether agentic-start.sh ran. """ core_started = await self.auto_start_core() optional_started = await self.auto_start_installed() external_started = await self.auto_start_external() if core_started: logger.info("Auto-started %d core servers: %s", len(core_started), core_started) if optional_started: logger.info("Auto-started %d optional servers: %s", len(optional_started), optional_started) if external_started: logger.info("Auto-started %d external servers: %s", len(external_started), external_started) return {"core": core_started, "optional": optional_started, "external": external_started} async def uninstall_external( self, server_name: str, forge_url: str, auth_user: str = "admin", auth_pass: str = "changeme", bearer_token: Optional[str] = None, ) -> Dict[str, Any]: """Uninstall an external/community MCP server: stop process, deactivate tools, update registry.""" root = Path(self._project_root()) ext_path = root / "community" / "external" / "registry.json" if not ext_path.exists(): return {"ok": False, "error": f"External server '{server_name}' not found"} try: reg = json.loads(ext_path.read_text()) except Exception: return {"ok": False, "error": "Failed to read external registry"} entry = next((s for s in reg.get("servers", []) if s.get("name") == server_name), None) if not entry: return {"ok": False, "error": f"External server '{server_name}' not in registry"} port = entry.get("port") # 1. Deactivate tools in Forge deactivated = 0 if port: deactivated = await self._deactivate_tools_in_forge( port, forge_url, auth_user, auth_pass, bearer_token, ) # 2. Stop the process self._stop_process(server_name) # 3. Mark as uninstalled in registry (don't delete — preserve history) for s in reg.get("servers", []): if s.get("name") == server_name: s["status"] = "uninstalled" break ext_path.write_text(json.dumps(reg, indent=2)) logger.info("Uninstalled external server %s (port %s, %d tools deactivated)", server_name, port, deactivated) return { "ok": True, "status": "uninstalled", "server_name": server_name, "tools_deactivated": deactivated, } def shutdown_all(self) -> None: """Stop all managed subprocesses (called on backend shutdown). Sends SIGTERM to every child process, waits up to 5 s for each, then SIGKILL any survivors. Logs a summary of what was stopped. """ if not self._processes: return names = list(self._processes.keys()) logger.info("Stopping %d managed server(s): %s", len(names), names) # Phase 1: send SIGTERM to all at once (parallel) for server_id, proc in list(self._processes.items()): try: proc.send_signal(signal.SIGTERM) except Exception: pass # Phase 2: wait for graceful exit (up to 5s total) import time deadline = time.monotonic() + 5 for server_id in list(names): proc = self._processes.get(server_id) if proc is None: continue remaining = max(0.1, deadline - time.monotonic()) try: proc.wait(timeout=remaining) except subprocess.TimeoutExpired: pass # Phase 3: force-kill any still alive for server_id in list(names): proc = self._processes.pop(server_id, None) if proc is None: continue if proc.poll() is None: try: proc.kill() proc.wait(timeout=2) logger.info("Force-killed %s (pid=%d)", server_id, proc.pid) except Exception: pass else: logger.info("Stopped %s (pid=%d)", server_id, proc.pid) logger.info("All managed servers stopped") # Module-level singleton _manager: Optional[ServerManager] = None def get_server_manager() -> ServerManager: global _manager if _manager is None: _manager = ServerManager() return _manager