Spaces:
Runtime error
Runtime error
File size: 7,286 Bytes
# Page snapshot 630d650 (viewer line-number gutter removed)
import argparse
import json
import sys
from pathlib import Path
from typing import Any, Callable, Dict
from mcp_tox_calc.equations import (
CalculationError,
calculate_epa_elcr_csf,
calculate_epa_elcr_iur,
calculate_fda_ctp_elcr,
get_formula_catalog,
run_batch_cancer_risk,
validate_risk_input,
)
from mcp_tox_calc.logging import RunLogger
# Signature shared by tool callables: one dict of arguments in, one dict out.
ToolFn = Callable[[Dict[str, Any]], Dict[str, Any]]
class ToxCalcMCPServer:
    """JSON-RPC 2.0 dispatcher that exposes the calculators as named tools.

    Three methods are served: ``initialize``, ``tools/list`` and
    ``tools/call``. Each tool call is recorded through the ``RunLogger`` and
    the reference it returns is attached to the tool result.

    Attributes:
        run_dir: Run artifact directory, normalised through ``Path``.
        logger: ``RunLogger`` created for ``run_dir``.
        tools: Registry mapping a tool name to its ``description``,
            ``inputSchema`` and callable (``fn``).
    """

    def __init__(self, run_dir: str):
        """Create the logger and build the tool registry.

        Args:
            run_dir: Directory handed to ``RunLogger``.
        """
        self.run_dir = str(Path(run_dir))
        self.logger = RunLogger(self.run_dir)
        self.tools: Dict[str, Dict[str, Any]] = {
            "validate_risk_input": {
                "description": "Validate a row payload for deterministic cancer risk calculations.",
                "inputSchema": self._open_object_schema(),
                "fn": validate_risk_input,
            },
            "calculate_epa_elcr_csf": {
                "description": "Compute ELCR using EPA CSF pathway.",
                "inputSchema": self._open_object_schema(),
                "fn": calculate_epa_elcr_csf,
            },
            "calculate_epa_elcr_iur": {
                "description": "Compute ELCR using EPA IUR pathway.",
                "inputSchema": self._open_object_schema(),
                "fn": calculate_epa_elcr_iur,
            },
            "calculate_fda_ctp_elcr": {
                "description": "Compute ELCR profile using FDA CTP-style constituent aggregation.",
                "inputSchema": self._open_object_schema(),
                "fn": calculate_fda_ctp_elcr,
            },
            "run_batch_cancer_risk": {
                "description": "Run deterministic cancer risk calculations across a batch of rows.",
                "inputSchema": {
                    "type": "object",
                    "properties": {"rows": {"type": "array", "items": {"type": "object"}}},
                    "required": ["rows"],
                    "additionalProperties": True,
                },
                "fn": run_batch_cancer_risk,
            },
            "get_formula_catalog": {
                "description": "Return available formula catalog and version.",
                "inputSchema": self._open_object_schema(),
                # The catalog takes no input; the arguments dict is discarded.
                "fn": lambda _args: get_formula_catalog(),
            },
        }

    @staticmethod
    def _open_object_schema() -> Dict[str, Any]:
        """Return a fresh schema that accepts any JSON object."""
        return {"type": "object", "properties": {}, "additionalProperties": True}

    @staticmethod
    def _error(req_id: Any, code: int, message: str) -> Dict[str, Any]:
        """Build a JSON-RPC error response."""
        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "error": {"code": code, "message": message},
        }

    def handle_request(self, req: Dict[str, Any]) -> Dict[str, Any]:
        """Dispatch one decoded JSON-RPC request and return its response.

        Args:
            req: Decoded request object; ``method``, ``id`` and ``params``
                are read from it.

        Returns:
            A JSON-RPC response dict carrying either ``result`` or ``error``.
            Error codes used: ``-32600`` (request is not an object),
            ``-32601`` (unknown method), ``-32602`` (bad params or unknown
            tool), ``-32001`` (``CalculationError`` from a tool) and
            ``-32099`` (any other exception raised while running a tool).
        """
        # Without this guard a JSON array or scalar fails on ``.get`` and the
        # caller can only report it as a parse error.
        if not isinstance(req, dict):
            return self._error(None, -32600, "Invalid Request: expected a JSON object")

        method = req.get("method")
        req_id = req.get("id")

        if method == "initialize":
            return self._handle_initialize(req_id)
        if method == "tools/list":
            return self._handle_tools_list(req_id)
        if method == "tools/call":
            return self._handle_tools_call(req_id, req.get("params"))
        return self._error(req_id, -32601, f"Method not found: {method}")

    def _handle_initialize(self, req_id: Any) -> Dict[str, Any]:
        """Answer ``initialize`` with protocol version and server info."""
        return {
            "jsonrpc": "2.0",
            "id": req_id,
            "result": {
                "protocolVersion": "2024-11-05",
                "serverInfo": {"name": "toxra-calc-mcp", "version": "0.1.0"},
                "capabilities": {"tools": {}},
            },
        }

    def _handle_tools_list(self, req_id: Any) -> Dict[str, Any]:
        """Answer ``tools/list`` with every registered tool, minus its callable."""
        tools = [
            {
                "name": name,
                "description": meta["description"],
                "inputSchema": meta["inputSchema"],
            }
            for name, meta in self.tools.items()
        ]
        return {"jsonrpc": "2.0", "id": req_id, "result": {"tools": tools}}

    def _handle_tools_call(self, req_id: Any, params: Any) -> Dict[str, Any]:
        """Validate ``tools/call`` params, run the tool and wrap its result."""
        params = params or {}
        if not isinstance(params, dict):
            # Keep the request id so the client can correlate the failure.
            return self._error(req_id, -32602, "Invalid params: expected an object")

        name = params.get("name")
        args = params.get("arguments") or {}

        # The isinstance check also keeps unhashable names (lists, dicts) from
        # raising TypeError on the registry lookup.
        if not isinstance(name, str) or name not in self.tools:
            return self._error(req_id, -32602, f"Unknown tool: {name}")
        if not isinstance(args, dict):
            return self._error(req_id, -32602, "Invalid params: 'arguments' must be an object")

        fn: ToolFn = self.tools[name]["fn"]
        try:
            result = fn(args)
            if not isinstance(result, dict):
                result = {"value": result}

            # Attach structured log reference per tool call.
            log_ref = self.logger.log_event(name, args, result)
            result.setdefault("log_ref", log_ref)

            if name == "run_batch_cancer_risk":
                self._finalize_batch(result, log_ref)
        except CalculationError as exc:
            return self._error(req_id, -32001, str(exc))
        except Exception as exc:
            # Tool boundary: report the failure to the client instead of
            # letting one bad call propagate out of the dispatcher.
            return self._error(req_id, -32099, f"Unexpected tool error: {exc}")

        content = [{"type": "json", "json": result}]
        return {"jsonrpc": "2.0", "id": req_id, "result": {"content": content}}

    def _finalize_batch(self, result: Dict[str, Any], log_ref: Any) -> None:
        """Fill per-row defaults, write the report and attach artifact paths.

        ``result`` is modified in place. Keys already present on a row are
        left untouched; non-dict rows are skipped.
        """
        rows = result.get("rows", [])
        if not isinstance(rows, list):
            rows = []

        formula_version = result.get("formula_version", "1.0.0")
        for row in rows:
            if not isinstance(row, dict):
                continue
            row.setdefault("formula_id", "calculate_fda_ctp_elcr")
            row.setdefault("formula_version", formula_version)
            row.setdefault("inputs_normalized", {})
            row.setdefault("unit_conversions", [])
            row.setdefault("result_value", row.get("fda_ctp_elcr", ""))
            row.setdefault("risk_tier", "unknown")
            row.setdefault("warnings", [])
            row.setdefault("log_ref", log_ref)

        report_path = self.logger.write_report(result.get("summary", {}), rows)
        result["artifacts"] = {
            "run_dir": self.run_dir,
            "log_jsonl": str(self.logger.log_path),
            "report_md": str(report_path),
        }
def _serve_stdio(server: "ToxCalcMCPServer") -> None:
    """Serve newline-delimited JSON-RPC messages from stdin until EOF.

    Each non-blank line is decoded as JSON and handed to
    ``server.handle_request``; the response is written to stdout as a single
    line and flushed immediately.

    Failure handling:
        * A line that is not valid JSON yields ``-32700`` with ``id: null``.
        * An exception raised by ``handle_request`` yields ``-32603`` and
          keeps the request's ``id`` when the request was an object.
        * A response that cannot be serialised yields ``-32603`` instead of
          terminating the loop.
        * A request object without an ``id`` member is a notification: it is
          still dispatched, but nothing is written to stdout. A dispatch
          failure for a notification is reported on stderr.

    Args:
        server: Object whose ``handle_request`` turns a decoded request into
            a response dict.
    """
    for raw_line in sys.stdin:
        line = raw_line.strip()
        if not line:
            continue

        req_id = None
        expects_reply = True
        try:
            req = json.loads(line)
        except (ValueError, RecursionError) as exc:
            # json.JSONDecodeError is a ValueError subclass.
            resp = {
                "jsonrpc": "2.0",
                "id": None,
                "error": {"code": -32700, "message": f"Parse error: {exc}"},
            }
        else:
            if isinstance(req, dict):
                req_id = req.get("id")
                # JSON-RPC 2.0: a request without "id" is a notification and
                # must not be answered.
                expects_reply = "id" in req
            try:
                resp = server.handle_request(req)
            except Exception as exc:
                # Loop boundary: one failing request must not stop the server.
                if not expects_reply:
                    sys.stderr.write(f"Notification dispatch error: {exc}\n")
                resp = {
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "error": {"code": -32603, "message": f"Internal error: {exc}"},
                }

        if not expects_reply:
            continue

        try:
            payload = json.dumps(resp)
        except (TypeError, ValueError, RecursionError) as exc:
            payload = json.dumps(
                {
                    "jsonrpc": "2.0",
                    "id": req_id,
                    "error": {
                        "code": -32603,
                        "message": f"Response serialization error: {exc}",
                    },
                }
            )
        sys.stdout.write(payload + "\n")
        sys.stdout.flush()
def main() -> None:
    """Command-line entry point: parse options, then run the stdio loop.

    ``--run-dir`` selects the directory handed to ``ToxCalcMCPServer``.
    ``--stdio`` is accepted, but this function never reads its value; the
    stdio loop is started unconditionally.
    """
    cli = argparse.ArgumentParser(
        description="Local MCP server for deterministic toxicology calculations",
    )
    cli.add_argument(
        "--stdio",
        action="store_true",
        default=False,
        help="Run stdio JSON-RPC loop",
    )
    cli.add_argument(
        "--run-dir",
        default="runs/mcp_server",
        help="Run artifact directory",
    )
    opts = cli.parse_args()

    _serve_stdio(ToxCalcMCPServer(run_dir=opts.run_dir))
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|