| """MCP Server Manager — install, uninstall, and lifecycle management. |
| |
| Manages optional MCP servers as child processes. Core servers are always |
| started by agentic-start.sh; optional servers can be installed/uninstalled |
| via the API. |
| |
| Install: |
| 1. Start uvicorn subprocess on the designated port |
| 2. Wait for /health |
| 3. Discover tools via JSON-RPC tools/list |
| 4. Register tools in Context Forge |
| 5. Trigger a full sync to update virtual server tool associations |
| 6. Persist to installed.json |
| |
| Uninstall: |
| 1. Deactivate tools in Forge (disable, don't delete) |
| 2. Trigger sync to update virtual server associations |
| 3. Stop the subprocess (SIGTERM) |
| 4. Remove from installed.json |
| """ |
|
|
| from __future__ import annotations |
|
|
| import asyncio |
| import json |
| import logging |
| import os |
| import signal |
| import subprocess |
| import sys |
| import time |
| from pathlib import Path |
| from typing import Any, Dict, List, Optional |
|
|
| import httpx |
| import yaml |
|
|
# Module-level logger; the name is a fixed string rather than __name__.
logger = logging.getLogger("homepilot.agentic.server_manager")

# Host used when building MCP server URLs (health, JSON-RPC, Forge tool URL).
# Overridable through the MCP_TOOL_HOST environment variable.
MCP_TOOL_HOST = os.environ.get("MCP_TOOL_HOST", "127.0.0.1")
|
|
|
|
| |
|
|
|
|
def _catalog_path() -> Path:
    """Locate ``server_catalog.yaml``.

    Three base directories are probed in order: the directory three levels
    above this file, the current working directory, and ``$HOMEPILOT_ROOT``
    (defaulting to ``.``). The first candidate that exists wins; when none
    exists the first candidate is returned anyway so callers have a path to
    report.
    """
    relative = Path("agentic") / "forge" / "templates" / "server_catalog.yaml"
    bases = (
        Path(__file__).resolve().parents[3],
        Path(),
        Path(os.environ.get("HOMEPILOT_ROOT", ".")),
    )
    candidates = [base / relative for base in bases]
    return next((c for c in candidates if c.exists()), candidates[0])
|
|
|
|
def _load_catalog() -> Dict[str, Any]:
    """Load the server catalog YAML.

    Returns:
        A mapping whose ``"core"`` and ``"optional"`` values are always
        iterable. An empty catalog is returned when the file is missing,
        empty, or does not contain a mapping at its root.
    """
    path = _catalog_path()
    if not path.exists():
        logger.warning("server_catalog.yaml not found at %s", path)
        return {"core": [], "optional": []}
    with path.open("r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if not data:
        return {"core": [], "optional": []}
    if not isinstance(data, dict):
        # A list/scalar root would break every ``catalog.get(...)`` caller.
        logger.warning("server_catalog.yaml at %s is not a mapping; ignoring it", path)
        return {"core": [], "optional": []}
    # ``core:`` with no value parses as None, which callers cannot iterate.
    for section in ("core", "optional"):
        if data.get(section) is None:
            data[section] = []
    return data
|
|
|
|
class ServerDef:
    """One server entry from the catalog.

    ``id`` and ``port`` are required keys of the source mapping; every other
    field falls back to a default when absent.
    """

    # Keys emitted by ``to_dict`` (in this order), before the optional "source".
    _EXPORTED = (
        "id", "port", "module", "label", "description",
        "category", "icon", "requires_config", "is_core",
    )

    def __init__(self, data: Dict[str, Any], is_core: bool = False):
        self.id: str = data["id"]
        self.port: int = data["port"]
        lookup = data.get
        self.module: str = lookup("module", "")
        # The label defaults to the id when the catalog gives none.
        self.label: str = lookup("label", self.id)
        self.description: str = lookup("description", "")
        self.category: str = lookup("category", "other")
        self.icon: str = lookup("icon", "server")
        self.requires_config: Optional[str] = lookup("requires_config")
        self.source: Optional[Dict[str, Any]] = lookup("source")
        self.is_core: bool = is_core

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view; ``source`` is included only when truthy."""
        exported: Dict[str, Any] = {key: getattr(self, key) for key in self._EXPORTED}
        if self.source:
            exported["source"] = self.source
        return exported
|
|
|
|
| |
|
|
|
|
def _state_path() -> Path:
    """Pick the location of ``installed.json``.

    The same three base directories as the catalog lookup are tried (three
    levels above this file, the working directory, ``$HOMEPILOT_ROOT``). A
    candidate is accepted as soon as its parent directory exists, since the
    file itself may not have been written yet. The first candidate is the
    fallback.
    """
    relative = Path("agentic") / "forge" / "installed.json"
    bases = (
        Path(__file__).resolve().parents[3],
        Path(),
        Path(os.environ.get("HOMEPILOT_ROOT", ".")),
    )
    candidates = [base / relative for base in bases]
    return next((c for c in candidates if c.parent.is_dir()), candidates[0])
|
|
|
|
| def _external_label(name: str) -> str: |
| """Convert external server name like 'mcp-teams' to a display label.""" |
| return name.replace("mcp-", "").replace("hp-", "").replace("-", " ").replace("_", " ").title() |
|
|
|
|
def _read_installed() -> Dict[str, Any]:
    """Read the persisted install state.

    Returns:
        The parsed state mapping, or ``{"installed": []}`` when the file is
        missing, unreadable, not valid JSON, or not shaped as a mapping with
        a list under ``"installed"``. Failures are logged rather than
        silently hidden.
    """
    path = _state_path()
    if not path.exists():
        return {"installed": []}
    try:
        with path.open("r", encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers both JSONDecodeError and UnicodeDecodeError.
        logger.warning("Could not read %s (%s); treating as nothing installed", path, exc)
        return {"installed": []}
    # Callers use ``state.get("installed", [])`` and iterate the result.
    if not isinstance(state, dict) or not isinstance(state.get("installed", []), list):
        logger.warning("Unexpected content in %s; treating as nothing installed", path)
        return {"installed": []}
    return state
|
|
|
|
def _write_installed(state: Dict[str, Any]) -> None:
    """Persist *state* as indented JSON, creating the parent directory if needed."""
    target = _state_path()
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w", encoding="utf-8") as handle:
        json.dump(state, handle, indent=2)
|
|
|
|
| |
|
|
|
|
| class ServerManager: |
| """Manages MCP server processes and Forge registration.""" |
|
|
| def __init__(self) -> None: |
| catalog = _load_catalog() |
| self._core = [ServerDef(d, is_core=True) for d in catalog.get("core", [])] |
| self._optional = [ServerDef(d, is_core=False) for d in catalog.get("optional", [])] |
| self._all: Dict[str, ServerDef] = {} |
| for s in self._core + self._optional: |
| self._all[s.id] = s |
| |
| self._processes: Dict[str, subprocess.Popen] = {} |
|
|
| @property |
| def core_servers(self) -> List[ServerDef]: |
| return list(self._core) |
|
|
| @property |
| def optional_servers(self) -> List[ServerDef]: |
| return list(self._optional) |
|
|
| def get_server(self, server_id: str) -> Optional[ServerDef]: |
| return self._all.get(server_id) |
|
|
| def installed_ids(self) -> List[str]: |
| state = _read_installed() |
| return [e["id"] for e in state.get("installed", []) if e.get("id")] |
|
|
| def is_installed(self, server_id: str) -> bool: |
| return server_id in self.installed_ids() |
|
|
| def _python_path(self) -> str: |
| """Resolve the Python interpreter to use for subprocesses.""" |
| root = Path(__file__).resolve().parents[3] |
| venv_python = root / "backend" / ".venv" / "bin" / "python" |
| if venv_python.is_file(): |
| return str(venv_python) |
| return sys.executable |
|
|
| def _project_root(self) -> str: |
| return str(Path(__file__).resolve().parents[3]) |
|
|
| async def _check_health(self, port: int, timeout: int = 10) -> bool: |
| for _ in range(timeout): |
| try: |
| async with httpx.AsyncClient(timeout=2.0) as c: |
| r = await c.get(f"http://{MCP_TOOL_HOST}:{port}/health") |
| if r.status_code == 200: |
| return True |
| except Exception: |
| pass |
| await asyncio.sleep(1) |
| return False |
|
|
| async def _discover_tools(self, port: int) -> List[Dict[str, Any]]: |
| url = f"http://{MCP_TOOL_HOST}:{port}/rpc" |
| body = {"jsonrpc": "2.0", "id": "mgr-discover", "method": "tools/list"} |
| try: |
| async with httpx.AsyncClient(timeout=10.0) as c: |
| r = await c.post(url, json=body) |
| if r.status_code == 200: |
| return r.json().get("result", {}).get("tools", []) |
| except Exception as exc: |
| logger.warning("Tool discovery on port %d failed: %s", port, exc) |
| return [] |
|
|
| async def _register_tools_in_forge( |
| self, |
| tools: List[Dict[str, Any]], |
| port: int, |
| forge_url: str, |
| auth_user: str = "admin", |
| auth_pass: str = "changeme", |
| bearer_token: Optional[str] = None, |
| ) -> List[str]: |
| """Register discovered tools in Forge. Returns list of registered tool IDs.""" |
| from .sync_service import _acquire_jwt, _post, _safe_list, _get |
|
|
| registered_ids: List[str] = [] |
| async with httpx.AsyncClient(headers={"Content-Type": "application/json"}, timeout=30.0) as client: |
| if bearer_token: |
| client.headers["Authorization"] = f"Bearer {bearer_token}" |
| else: |
| token = await _acquire_jwt(client, forge_url, auth_user, auth_pass) |
| if token: |
| client.headers["Authorization"] = f"Bearer {token}" |
| else: |
| client.auth = httpx.BasicAuth(auth_user, auth_pass) |
|
|
| |
| try: |
| existing_list = _safe_list(await _get(client, forge_url, "/tools", limit=0)) |
| except Exception: |
| existing_list = [] |
| existing = {t["name"]: t["id"] for t in existing_list if t.get("name") and t.get("id")} |
|
|
| for tool_def in tools: |
| tname = tool_def.get("name", "") |
| if tname in existing: |
| registered_ids.append(existing[tname]) |
| continue |
|
|
| payload = { |
| "tool": { |
| "name": tname, |
| "description": tool_def.get("description", ""), |
| "inputSchema": tool_def.get("inputSchema", {"type": "object", "properties": {}}), |
| "integration_type": "REST", |
| "request_type": "POST", |
| "url": f"http://{MCP_TOOL_HOST}:{port}/rpc", |
| "tags": ["homepilot"], |
| }, |
| "team_id": None, |
| } |
| try: |
| result = await _post(client, forge_url, "/tools", json=payload) |
| tid = result.get("id") or result.get("tool_id") if isinstance(result, dict) else None |
| if tid: |
| registered_ids.append(tid) |
| except Exception as exc: |
| logger.warning("Failed to register tool '%s': %s", tname, exc) |
|
|
| return registered_ids |
|
|
| async def _deactivate_tools_in_forge( |
| self, |
| port: int, |
| forge_url: str, |
| auth_user: str = "admin", |
| auth_pass: str = "changeme", |
| bearer_token: Optional[str] = None, |
| ) -> int: |
| """Deactivate tools from this server in Forge. Returns count deactivated.""" |
| from .sync_service import _acquire_jwt, _safe_list, _get |
|
|
| deactivated = 0 |
| expected_url = f"http://{MCP_TOOL_HOST}:{port}/rpc" |
|
|
| async with httpx.AsyncClient(headers={"Content-Type": "application/json"}, timeout=30.0) as client: |
| if bearer_token: |
| client.headers["Authorization"] = f"Bearer {bearer_token}" |
| else: |
| token = await _acquire_jwt(client, forge_url, auth_user, auth_pass) |
| if token: |
| client.headers["Authorization"] = f"Bearer {token}" |
| else: |
| client.auth = httpx.BasicAuth(auth_user, auth_pass) |
|
|
| try: |
| tools_list = _safe_list(await _get(client, forge_url, "/tools", limit=0)) |
| except Exception: |
| tools_list = [] |
|
|
| for t in tools_list: |
| tool_url = str(t.get("url") or "") |
| if tool_url == expected_url and t.get("enabled") is not False: |
| tid = t.get("id") |
| if not tid: |
| continue |
| |
| try: |
| r = await client.post( |
| f"{forge_url}/tools/{tid}/state?activate=false", |
| ) |
| if r.status_code < 400: |
| deactivated += 1 |
| except Exception as exc: |
| logger.debug("Failed to deactivate tool %s: %s", tid, exc) |
|
|
| return deactivated |
|
|
| def _start_process(self, server: ServerDef) -> Optional[subprocess.Popen]: |
| """Start a uvicorn subprocess for the server.""" |
| if not server.module: |
| logger.info("Skipping process start for %s (external server)", server.id) |
| return None |
| python = self._python_path() |
| root = self._project_root() |
| env = {**os.environ, "PYTHONPATH": root} |
|
|
| try: |
| proc = subprocess.Popen( |
| [python, "-m", "uvicorn", server.module, |
| "--host", "127.0.0.1", "--port", str(server.port), |
| "--log-level", "warning"], |
| env=env, |
| stdout=subprocess.DEVNULL, |
| stderr=subprocess.DEVNULL, |
| ) |
| self._processes[server.id] = proc |
| logger.info("Started %s (pid=%d, port=%d)", server.id, proc.pid, server.port) |
| return proc |
| except Exception as exc: |
| logger.error("Failed to start %s: %s", server.id, exc) |
| return None |
|
|
| def _stop_process(self, server_id: str) -> bool: |
| """Stop a managed subprocess.""" |
| proc = self._processes.pop(server_id, None) |
| if proc is None: |
| return False |
| try: |
| proc.send_signal(signal.SIGTERM) |
| proc.wait(timeout=5) |
| except subprocess.TimeoutExpired: |
| proc.kill() |
| proc.wait(timeout=2) |
| except Exception as exc: |
| logger.warning("Error stopping %s: %s", server_id, exc) |
| logger.info("Stopped %s", server_id) |
| return True |
|
|
| async def server_healthy(self, server_id: str) -> bool: |
| """Check if a server is responding on its port.""" |
| server = self.get_server(server_id) |
| if not server: |
| return False |
| try: |
| async with httpx.AsyncClient(timeout=2.0) as c: |
| r = await c.get(f"http://{MCP_TOOL_HOST}:{server.port}/health") |
| return r.status_code == 200 |
| except Exception: |
| return False |
|
|
| async def install( |
| self, |
| server_id: str, |
| forge_url: str, |
| auth_user: str = "admin", |
| auth_pass: str = "changeme", |
| bearer_token: Optional[str] = None, |
| ) -> Dict[str, Any]: |
| """Install an optional MCP server: start process, register tools, update state.""" |
| server = self.get_server(server_id) |
| if not server: |
| return {"ok": False, "error": f"Unknown server: {server_id}"} |
| if server.is_core: |
| return {"ok": False, "error": f"'{server_id}' is a core server (always running)"} |
| if self.is_installed(server_id): |
| |
| healthy = await self.server_healthy(server_id) |
| if healthy: |
| return {"ok": True, "status": "already_installed", "healthy": True} |
| |
| self._stop_process(server_id) |
|
|
| |
| proc = self._start_process(server) |
| if not proc: |
| return {"ok": False, "error": f"Failed to start process for {server_id}"} |
|
|
| |
| healthy = await self._check_health(server.port, timeout=12) |
| if not healthy: |
| self._stop_process(server_id) |
| return {"ok": False, "error": f"Server {server_id} did not become healthy within 12s"} |
|
|
| |
| tools = await self._discover_tools(server.port) |
|
|
| |
| tool_ids = await self._register_tools_in_forge( |
| tools, server.port, forge_url, auth_user, auth_pass, bearer_token, |
| ) |
|
|
| |
| state = _read_installed() |
| ids = {e["id"] for e in state.get("installed", [])} |
| if server_id not in ids: |
| state.setdefault("installed", []).append({ |
| "id": server_id, |
| "installed_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), |
| }) |
| _write_installed(state) |
|
|
| return { |
| "ok": True, |
| "status": "installed", |
| "server_id": server_id, |
| "port": server.port, |
| "tools_discovered": len(tools), |
| "tools_registered": len(tool_ids), |
| "healthy": True, |
| } |
|
|
| async def uninstall( |
| self, |
| server_id: str, |
| forge_url: str, |
| auth_user: str = "admin", |
| auth_pass: str = "changeme", |
| bearer_token: Optional[str] = None, |
| ) -> Dict[str, Any]: |
| """Uninstall an optional MCP server: deactivate tools, stop process, update state.""" |
| server = self.get_server(server_id) |
| if not server: |
| return {"ok": False, "error": f"Unknown server: {server_id}"} |
| if server.is_core: |
| return {"ok": False, "error": f"'{server_id}' is a core server and cannot be uninstalled"} |
|
|
| |
| deactivated = await self._deactivate_tools_in_forge( |
| server.port, forge_url, auth_user, auth_pass, bearer_token, |
| ) |
|
|
| |
| self._stop_process(server_id) |
|
|
| |
| state = _read_installed() |
| state["installed"] = [e for e in state.get("installed", []) if e.get("id") != server_id] |
| _write_installed(state) |
|
|
| return { |
| "ok": True, |
| "status": "uninstalled", |
| "server_id": server_id, |
| "tools_deactivated": deactivated, |
| } |
|
|
| async def get_available(self) -> List[Dict[str, Any]]: |
| """Return all servers (core + optional + external) with their current status.""" |
| installed_set = set(self.installed_ids()) |
| result: List[Dict[str, Any]] = [] |
|
|
| for server in self._core + self._optional: |
| healthy = await self.server_healthy(server.id) |
| entry = server.to_dict() |
| entry["installed"] = server.is_core or server.id in installed_set |
| entry["healthy"] = healthy |
| entry["status"] = ( |
| "running" if healthy |
| else "installed" if entry["installed"] |
| else "available" |
| ) |
| result.append(entry) |
|
|
| |
| result.extend(await self._get_external_entries()) |
|
|
| return result |
|
|
| async def _get_external_entries(self) -> List[Dict[str, Any]]: |
| """Build entries for external servers from the external registry.""" |
| from .mcp_installer import _read_external_registry |
|
|
| reg = _read_external_registry() |
| entries: List[Dict[str, Any]] = [] |
| seen_names = set() |
|
|
| for srv in reg.get("servers", []): |
| name = srv.get("name", "") |
| if not name or name in seen_names: |
| continue |
| seen_names.add(name) |
|
|
| port = srv.get("port", 0) |
| is_installed = srv.get("status") == "installed" |
|
|
| |
| healthy = False |
| if is_installed and port: |
| healthy = await self._check_health(port, timeout=3) |
|
|
| entries.append({ |
| "id": name, |
| "port": port, |
| "module": "", |
| "label": _external_label(name), |
| "description": f"External MCP server ({srv.get('git_url', 'community')})", |
| "category": "external", |
| "icon": "package", |
| "requires_config": None, |
| "is_core": False, |
| "source_type": "external", |
| "git_url": srv.get("git_url", ""), |
| "install_path": srv.get("install_path", ""), |
| "tools_discovered": srv.get("tools_discovered", 0), |
| "installed_at": srv.get("installed_at", ""), |
| "installed": is_installed, |
| "healthy": healthy, |
| "status": ( |
| "running" if healthy |
| else "installed" if is_installed |
| else "available" |
| ), |
| }) |
|
|
| return entries |
|
|
| async def auto_start_core(self) -> List[str]: |
| """Start core MCP servers if they aren't already running. |
| |
| Core servers are normally started by agentic-start.sh, but if the |
| backend is launched independently (e.g. ``uvicorn app.main:app``) |
| or if the shell script failed, this ensures they come up. |
| """ |
| started: List[str] = [] |
| for server in self._core: |
| healthy = await self.server_healthy(server.id) |
| if healthy: |
| continue |
| proc = self._start_process(server) |
| if proc: |
| started.append(server.id) |
| logger.info("Auto-started core server %s on port %d", server.id, server.port) |
| if started: |
| |
| |
| |
| for sid in list(started): |
| healthy = await self._check_health( |
| self._all[sid].port, timeout=10, |
| ) |
| if not healthy: |
| logger.warning("Core server %s did not become healthy after auto-start", sid) |
| return started |
|
|
| async def auto_start_installed(self) -> List[str]: |
| """Start all previously installed optional servers. Called on backend startup.""" |
| started: List[str] = [] |
| for server_id in self.installed_ids(): |
| server = self.get_server(server_id) |
| if not server or server.is_core: |
| continue |
| |
| healthy = await self.server_healthy(server_id) |
| if healthy: |
| continue |
| proc = self._start_process(server) |
| if proc: |
| started.append(server_id) |
| |
| if started: |
| await asyncio.sleep(2) |
| return started |
|
|
| async def auto_start_external(self) -> List[str]: |
| """Start all previously installed external/community MCP servers. |
| |
| Reads community/external/registry.json and restarts any server |
| with status="installed". These are servers installed via persona |
| import (external git repos or community bundles) that are not in |
| the server_catalog.yaml but still need to run for their personas |
| to work. |
| |
| Uses the same robust startup logic as the installer |
| (_start_external_server) including: |
| - server-local .venv detection |
| - pyproject.toml entry-point detection |
| - environment variable injection (.env files, TEAMS_MCP_* vars) |
| - log-file capture for debugging |
| """ |
| started: List[str] = [] |
| root = Path(self._project_root()) |
|
|
| |
| ext_path = root / "community" / "external" / "registry.json" |
| if not ext_path.exists(): |
| return started |
|
|
| try: |
| reg = json.loads(ext_path.read_text()) |
| except Exception: |
| return started |
|
|
| for entry in reg.get("servers", []): |
| name = entry.get("name", "") |
| port = entry.get("port") |
| install_path = entry.get("install_path", "") |
| status = entry.get("status", "") |
|
|
| if not name or not port or status != "installed": |
| continue |
|
|
| |
| try: |
| async with httpx.AsyncClient(timeout=2.0) as c: |
| r = await c.get(f"http://{MCP_TOOL_HOST}:{port}/health") |
| if r.status_code == 200: |
| continue |
| except Exception: |
| pass |
|
|
| |
| |
| server_dir = Path(install_path) if install_path else None |
| if not server_dir or not server_dir.is_dir(): |
| logger.warning("External server %s install_path not found: %s", name, install_path) |
| continue |
|
|
| |
| server_python = server_dir / ".venv" / "bin" / "python" |
| backend_python = root / "backend" / ".venv" / "bin" / "python" |
| if server_python.is_file(): |
| python = str(server_python) |
| elif backend_python.is_file(): |
| python = str(backend_python) |
| else: |
| python = sys.executable |
|
|
| |
| pp_parts = [str(server_dir)] |
| src_dir = server_dir / "src" |
| if src_dir.is_dir(): |
| pp_parts.append(str(src_dir)) |
| existing_pp = os.environ.get("PYTHONPATH", "") |
| if existing_pp: |
| pp_parts.append(existing_pp) |
| env = {**os.environ, "PYTHONPATH": os.pathsep.join(pp_parts)} |
|
|
| |
| |
| |
| |
| |
| app_main = server_dir / "app" / "main.py" |
| app_py = server_dir / "app.py" |
| pyproject = server_dir / "pyproject.toml" |
| cmd: list[str] = [] |
|
|
| if app_main.exists(): |
| cmd = [python, "-m", "app.main", "--http", |
| "--host", "127.0.0.1", "--port", str(port)] |
| elif pyproject.exists(): |
| try: |
| from .mcp_installer import _detect_pyproject_app |
| module_app = _detect_pyproject_app(server_dir, name) |
| except ImportError: |
| module_app = None |
| if module_app: |
| cmd = [python, "-m", "uvicorn", module_app, |
| "--host", "127.0.0.1", "--port", str(port), |
| "--log-level", "info"] |
| |
| env[f"{name.upper().replace('-', '_')}_PORT"] = str(port) |
| env["TEAMS_MCP_PORT"] = str(port) |
| env["TEAMS_MCP_HOST"] = "127.0.0.1" |
| if "TEAMS_MCP_TOKEN_KEY" not in env: |
| import base64 |
| env["TEAMS_MCP_TOKEN_KEY"] = base64.urlsafe_b64encode( |
| os.urandom(32) |
| ).decode() |
| elif app_py.exists(): |
| cmd = [python, "-m", "uvicorn", "app:app", |
| "--host", "127.0.0.1", "--port", str(port), |
| "--log-level", "warning"] |
| else: |
| |
| manifest_path = server_dir / "bundle_manifest.json" |
| module = "" |
| if manifest_path.exists(): |
| try: |
| bm = json.loads(manifest_path.read_text()) |
| module = bm.get("mcp_server", {}).get("module", "") |
| except Exception: |
| pass |
| if module: |
| cmd = [python, "-m", "uvicorn", module, |
| "--host", "127.0.0.1", "--port", str(port), |
| "--log-level", "warning"] |
| else: |
| logger.warning("Cannot detect entry point for external server %s at %s", name, server_dir) |
| continue |
|
|
| |
| try: |
| from .mcp_installer import _auto_populate_env |
| _auto_populate_env(server_dir, name, env) |
| except ImportError: |
| pass |
|
|
| if not cmd: |
| continue |
|
|
| |
| log_dir = root / "community" / "external" / "install_logs" |
| log_dir.mkdir(parents=True, exist_ok=True) |
| stdout_log = log_dir / f"{name}.stdout.log" |
| stderr_log = log_dir / f"{name}.stderr.log" |
|
|
| try: |
| stdout_f = open(stdout_log, "a", encoding="utf-8") |
| stderr_f = open(stderr_log, "a", encoding="utf-8") |
| proc = subprocess.Popen( |
| cmd, env=env, cwd=str(server_dir), |
| stdout=stdout_f, stderr=stderr_f, |
| ) |
| self._processes[name] = proc |
| started.append(name) |
| logger.info("Auto-started external server %s (pid=%d, port=%d)", name, proc.pid, port) |
| except Exception as exc: |
| logger.warning("Failed to auto-start external server %s: %s", name, exc) |
|
|
| if started: |
| await asyncio.sleep(3) |
| |
| for name in list(started): |
| entry = next((e for e in reg.get("servers", []) if e.get("name") == name), None) |
| if entry: |
| try: |
| async with httpx.AsyncClient(timeout=2.0) as c: |
| r = await c.get(f"http://{MCP_TOOL_HOST}:{entry['port']}/health") |
| if r.status_code != 200: |
| logger.warning("External server %s did not become healthy after auto-start", name) |
| except Exception: |
| logger.warning("External server %s did not become healthy after auto-start", name) |
|
|
| return started |
|
|
| async def ensure_all_running(self) -> Dict[str, List[str]]: |
| """Start all servers that should be running (core + installed optional + external). |
| |
| Called during backend startup to ensure the full agentic stack is up, |
| regardless of whether agentic-start.sh ran. |
| """ |
| core_started = await self.auto_start_core() |
| optional_started = await self.auto_start_installed() |
| external_started = await self.auto_start_external() |
| if core_started: |
| logger.info("Auto-started %d core servers: %s", len(core_started), core_started) |
| if optional_started: |
| logger.info("Auto-started %d optional servers: %s", len(optional_started), optional_started) |
| if external_started: |
| logger.info("Auto-started %d external servers: %s", len(external_started), external_started) |
| return {"core": core_started, "optional": optional_started, "external": external_started} |
|
|
| async def uninstall_external( |
| self, |
| server_name: str, |
| forge_url: str, |
| auth_user: str = "admin", |
| auth_pass: str = "changeme", |
| bearer_token: Optional[str] = None, |
| ) -> Dict[str, Any]: |
| """Uninstall an external/community MCP server: stop process, deactivate tools, update registry.""" |
| root = Path(self._project_root()) |
| ext_path = root / "community" / "external" / "registry.json" |
|
|
| if not ext_path.exists(): |
| return {"ok": False, "error": f"External server '{server_name}' not found"} |
|
|
| try: |
| reg = json.loads(ext_path.read_text()) |
| except Exception: |
| return {"ok": False, "error": "Failed to read external registry"} |
|
|
| entry = next((s for s in reg.get("servers", []) if s.get("name") == server_name), None) |
| if not entry: |
| return {"ok": False, "error": f"External server '{server_name}' not in registry"} |
|
|
| port = entry.get("port") |
|
|
| |
| deactivated = 0 |
| if port: |
| deactivated = await self._deactivate_tools_in_forge( |
| port, forge_url, auth_user, auth_pass, bearer_token, |
| ) |
|
|
| |
| self._stop_process(server_name) |
|
|
| |
| for s in reg.get("servers", []): |
| if s.get("name") == server_name: |
| s["status"] = "uninstalled" |
| break |
| ext_path.write_text(json.dumps(reg, indent=2)) |
|
|
| logger.info("Uninstalled external server %s (port %s, %d tools deactivated)", server_name, port, deactivated) |
|
|
| return { |
| "ok": True, |
| "status": "uninstalled", |
| "server_name": server_name, |
| "tools_deactivated": deactivated, |
| } |
|
|
| def shutdown_all(self) -> None: |
| """Stop all managed subprocesses (called on backend shutdown). |
| |
| Sends SIGTERM to every child process, waits up to 5 s for each, |
| then SIGKILL any survivors. Logs a summary of what was stopped. |
| """ |
| if not self._processes: |
| return |
|
|
| names = list(self._processes.keys()) |
| logger.info("Stopping %d managed server(s): %s", len(names), names) |
|
|
| |
| for server_id, proc in list(self._processes.items()): |
| try: |
| proc.send_signal(signal.SIGTERM) |
| except Exception: |
| pass |
|
|
| |
| import time |
| deadline = time.monotonic() + 5 |
| for server_id in list(names): |
| proc = self._processes.get(server_id) |
| if proc is None: |
| continue |
| remaining = max(0.1, deadline - time.monotonic()) |
| try: |
| proc.wait(timeout=remaining) |
| except subprocess.TimeoutExpired: |
| pass |
|
|
| |
| for server_id in list(names): |
| proc = self._processes.pop(server_id, None) |
| if proc is None: |
| continue |
| if proc.poll() is None: |
| try: |
| proc.kill() |
| proc.wait(timeout=2) |
| logger.info("Force-killed %s (pid=%d)", server_id, proc.pid) |
| except Exception: |
| pass |
| else: |
| logger.info("Stopped %s (pid=%d)", server_id, proc.pid) |
|
|
| logger.info("All managed servers stopped") |
|
|
|
|
| |
# Lazily created module-wide ServerManager instance.
_manager: Optional[ServerManager] = None


def get_server_manager() -> ServerManager:
    """Return the shared ServerManager, constructing it on first call."""
    global _manager
    instance = _manager
    if instance is None:
        instance = _manager = ServerManager()
    return instance
|
|