NLP_Project / toxra_core /calculation_client.py
hchevva's picture
Upload 43 files
630d650 verified
import json
import subprocess
import sys
from typing import Any, Dict, List, Optional
class MCPClientError(RuntimeError):
pass
class MCPCalculationClient:
def __init__(
self,
run_dir: str,
python_executable: Optional[str] = None,
server_script: Optional[str] = None,
):
self.run_dir = str(run_dir)
self.python_executable = python_executable or sys.executable
if server_script:
self.server_script = str(server_script)
else:
self.server_script = None
self._proc: Optional[subprocess.Popen] = None
self._id = 0
def start(self) -> None:
if self._proc is not None:
return
if self.server_script:
cmd = [
self.python_executable,
self.server_script,
"--stdio",
"--run-dir",
self.run_dir,
]
else:
cmd = [
self.python_executable,
"-m",
"mcp_tox_calc.server",
"--stdio",
"--run-dir",
self.run_dir,
]
self._proc = subprocess.Popen(
cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
)
self._request("initialize", {"protocolVersion": "2024-11-05", "clientInfo": {"name": "toxra-app", "version": "0.1.0"}})
def stop(self) -> None:
if self._proc is None:
return
try:
if self._proc.stdin:
self._proc.stdin.close()
self._proc.terminate()
self._proc.wait(timeout=3)
except Exception:
try:
self._proc.kill()
except Exception:
pass
finally:
self._proc = None
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc, tb):
self.stop()
def _request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
if self._proc is None:
raise MCPClientError("MCP server not started.")
if self._proc.stdin is None or self._proc.stdout is None:
raise MCPClientError("MCP server pipes unavailable.")
self._id += 1
req_id = self._id
request = {
"jsonrpc": "2.0",
"id": req_id,
"method": method,
"params": params,
}
self._proc.stdin.write(json.dumps(request) + "\n")
self._proc.stdin.flush()
while True:
line = self._proc.stdout.readline()
if line == "":
err = ""
if self._proc.stderr is not None:
try:
err = self._proc.stderr.read()[-1500:]
except Exception:
err = ""
raise MCPClientError(f"No response from MCP server. stderr={err}")
line = line.strip()
if not line:
continue
try:
resp = json.loads(line)
except Exception:
continue
if resp.get("id") != req_id:
continue
if "error" in resp:
raise MCPClientError(str(resp["error"]))
result = resp.get("result", {})
if not isinstance(result, dict):
return {"result": result}
return result
def list_tools(self) -> List[Dict[str, Any]]:
result = self._request("tools/list", {})
tools = result.get("tools", [])
return tools if isinstance(tools, list) else []
def call_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
result = self._request("tools/call", {"name": name, "arguments": arguments})
content = result.get("content", []) if isinstance(result, dict) else []
if isinstance(content, list) and content:
first = content[0]
if isinstance(first, dict) and first.get("type") == "json":
data = first.get("json", {})
return data if isinstance(data, dict) else {"value": data}
return result if isinstance(result, dict) else {"value": result}
def run_batch_cancer_risk(rows: List[Dict[str, Any]], run_dir: str) -> Dict[str, Any]:
with MCPCalculationClient(run_dir=run_dir) as client:
return client.call_tool("run_batch_cancer_risk", {"rows": rows})