import json import subprocess import sys from typing import Any, Dict, List, Optional class MCPClientError(RuntimeError): pass class MCPCalculationClient: def __init__( self, run_dir: str, python_executable: Optional[str] = None, server_script: Optional[str] = None, ): self.run_dir = str(run_dir) self.python_executable = python_executable or sys.executable if server_script: self.server_script = str(server_script) else: self.server_script = None self._proc: Optional[subprocess.Popen] = None self._id = 0 def start(self) -> None: if self._proc is not None: return if self.server_script: cmd = [ self.python_executable, self.server_script, "--stdio", "--run-dir", self.run_dir, ] else: cmd = [ self.python_executable, "-m", "mcp_tox_calc.server", "--stdio", "--run-dir", self.run_dir, ] self._proc = subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, ) self._request("initialize", {"protocolVersion": "2024-11-05", "clientInfo": {"name": "toxra-app", "version": "0.1.0"}}) def stop(self) -> None: if self._proc is None: return try: if self._proc.stdin: self._proc.stdin.close() self._proc.terminate() self._proc.wait(timeout=3) except Exception: try: self._proc.kill() except Exception: pass finally: self._proc = None def __enter__(self): self.start() return self def __exit__(self, exc_type, exc, tb): self.stop() def _request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]: if self._proc is None: raise MCPClientError("MCP server not started.") if self._proc.stdin is None or self._proc.stdout is None: raise MCPClientError("MCP server pipes unavailable.") self._id += 1 req_id = self._id request = { "jsonrpc": "2.0", "id": req_id, "method": method, "params": params, } self._proc.stdin.write(json.dumps(request) + "\n") self._proc.stdin.flush() while True: line = self._proc.stdout.readline() if line == "": err = "" if self._proc.stderr is not None: try: err = self._proc.stderr.read()[-1500:] except Exception: err = "" raise MCPClientError(f"No response from MCP server. stderr={err}") line = line.strip() if not line: continue try: resp = json.loads(line) except Exception: continue if resp.get("id") != req_id: continue if "error" in resp: raise MCPClientError(str(resp["error"])) result = resp.get("result", {}) if not isinstance(result, dict): return {"result": result} return result def list_tools(self) -> List[Dict[str, Any]]: result = self._request("tools/list", {}) tools = result.get("tools", []) return tools if isinstance(tools, list) else [] def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]: result = self._request("tools/call", {"name": name, "arguments": arguments}) content = result.get("content", []) if isinstance(result, dict) else [] if isinstance(content, list) and content: first = content[0] if isinstance(first, dict) and first.get("type") == "json": data = first.get("json", {}) return data if isinstance(data, dict) else {"value": data} return result if isinstance(result, dict) else {"value": result} def run_batch_cancer_risk(rows: List[Dict[str, Any]], run_dir: str) -> Dict[str, Any]: with MCPCalculationClient(run_dir=run_dir) as client: return client.call_tool("run_batch_cancer_risk", {"rows": rows})