hchevva's picture
Upload 43 files
630d650 verified
import argparse
import json
import sys
from pathlib import Path
from typing import Any, Callable, Dict
from mcp_tox_calc.equations import (
CalculationError,
calculate_epa_elcr_csf,
calculate_epa_elcr_iur,
calculate_fda_ctp_elcr,
get_formula_catalog,
run_batch_cancer_risk,
validate_risk_input,
)
from mcp_tox_calc.logging import RunLogger
ToolFn = Callable[[Dict[str, Any]], Dict[str, Any]]
class ToxCalcMCPServer:
def __init__(self, run_dir: str):
self.run_dir = str(Path(run_dir))
self.logger = RunLogger(self.run_dir)
self.tools: Dict[str, Dict[str, Any]] = {
"validate_risk_input": {
"description": "Validate a row payload for deterministic cancer risk calculations.",
"inputSchema": {"type": "object", "properties": {}, "additionalProperties": True},
"fn": validate_risk_input,
},
"calculate_epa_elcr_csf": {
"description": "Compute ELCR using EPA CSF pathway.",
"inputSchema": {"type": "object", "properties": {}, "additionalProperties": True},
"fn": calculate_epa_elcr_csf,
},
"calculate_epa_elcr_iur": {
"description": "Compute ELCR using EPA IUR pathway.",
"inputSchema": {"type": "object", "properties": {}, "additionalProperties": True},
"fn": calculate_epa_elcr_iur,
},
"calculate_fda_ctp_elcr": {
"description": "Compute ELCR profile using FDA CTP-style constituent aggregation.",
"inputSchema": {"type": "object", "properties": {}, "additionalProperties": True},
"fn": calculate_fda_ctp_elcr,
},
"run_batch_cancer_risk": {
"description": "Run deterministic cancer risk calculations across a batch of rows.",
"inputSchema": {
"type": "object",
"properties": {"rows": {"type": "array", "items": {"type": "object"}}},
"required": ["rows"],
"additionalProperties": True,
},
"fn": run_batch_cancer_risk,
},
"get_formula_catalog": {
"description": "Return available formula catalog and version.",
"inputSchema": {"type": "object", "properties": {}, "additionalProperties": True},
"fn": lambda _args: get_formula_catalog(),
},
}
def handle_request(self, req: Dict[str, Any]) -> Dict[str, Any]:
method = req.get("method")
req_id = req.get("id")
if method == "initialize":
return {
"jsonrpc": "2.0",
"id": req_id,
"result": {
"protocolVersion": "2024-11-05",
"serverInfo": {"name": "toxra-calc-mcp", "version": "0.1.0"},
"capabilities": {"tools": {}},
},
}
if method == "tools/list":
tools = []
for name, meta in self.tools.items():
tools.append(
{
"name": name,
"description": meta["description"],
"inputSchema": meta["inputSchema"],
}
)
return {"jsonrpc": "2.0", "id": req_id, "result": {"tools": tools}}
if method == "tools/call":
params = req.get("params", {}) or {}
name = params.get("name")
args = params.get("arguments", {}) or {}
if name not in self.tools:
return {
"jsonrpc": "2.0",
"id": req_id,
"error": {"code": -32602, "message": f"Unknown tool: {name}"},
}
fn: ToolFn = self.tools[name]["fn"]
try:
result = fn(args)
if not isinstance(result, dict):
result = {"value": result}
# Attach structured log reference per tool call.
log_ref = self.logger.log_event(name, args, result)
result.setdefault("log_ref", log_ref)
if name == "run_batch_cancer_risk":
rows = result.get("rows", []) if isinstance(result.get("rows", []), list) else []
for row in rows:
if isinstance(row, dict):
row.setdefault("formula_id", "calculate_fda_ctp_elcr")
row.setdefault("formula_version", result.get("formula_version", "1.0.0"))
row.setdefault("inputs_normalized", {})
row.setdefault("unit_conversions", [])
row.setdefault("result_value", row.get("fda_ctp_elcr", ""))
row.setdefault("risk_tier", row.get("risk_tier", "unknown"))
row.setdefault("warnings", row.get("warnings", []))
row.setdefault("log_ref", log_ref)
report_path = self.logger.write_report(result.get("summary", {}), rows)
result["artifacts"] = {
"run_dir": self.run_dir,
"log_jsonl": str(self.logger.log_path),
"report_md": str(report_path),
}
content = [{"type": "json", "json": result}]
return {"jsonrpc": "2.0", "id": req_id, "result": {"content": content}}
except CalculationError as exc:
return {
"jsonrpc": "2.0",
"id": req_id,
"error": {"code": -32001, "message": str(exc)},
}
except Exception as exc:
return {
"jsonrpc": "2.0",
"id": req_id,
"error": {"code": -32099, "message": f"Unexpected tool error: {exc}"},
}
return {
"jsonrpc": "2.0",
"id": req_id,
"error": {"code": -32601, "message": f"Method not found: {method}"},
}
def _serve_stdio(server: ToxCalcMCPServer) -> None:
for line in sys.stdin:
line = line.strip()
if not line:
continue
try:
req = json.loads(line)
resp = server.handle_request(req)
except Exception as exc:
resp = {
"jsonrpc": "2.0",
"id": None,
"error": {"code": -32700, "message": f"Parse/dispatch error: {exc}"},
}
sys.stdout.write(json.dumps(resp) + "\n")
sys.stdout.flush()
def main() -> None:
parser = argparse.ArgumentParser(description="Local MCP server for deterministic toxicology calculations")
parser.add_argument("--stdio", action="store_true", default=False, help="Run stdio JSON-RPC loop")
parser.add_argument("--run-dir", default="runs/mcp_server", help="Run artifact directory")
args = parser.parse_args()
server = ToxCalcMCPServer(run_dir=args.run_dir)
_serve_stdio(server)
if __name__ == "__main__":
main()