NLP_Project / mcp_tox_calc /equations.py
hchevva's picture
Upload 43 files
630d650 verified
from typing import Any, Dict, List, Optional
from toxra_core.contracts import classify_risk_tier
from .units import (
UnitError,
normalize_air_concentration,
normalize_csf,
normalize_iur,
normalize_oral_exposure,
normalize_route,
)
# Version string written into the "formula_version" field of result dicts
# produced by this module.
FORMULA_VERSION = "1.0.0"
class CalculationError(ValueError):
    """Raised when a risk calculation cannot be completed.

    Subclasses ``ValueError`` so callers that already catch ``ValueError``
    keep working.
    """
def _base_result(formula_id: str) -> Dict[str, Any]:
    """Create a fresh result envelope with default values.

    Args:
        formula_id: Identifier stored under the ``"formula_id"`` key.

    Returns:
        A new dict (with new, unshared list/dict values) holding the common
        result fields; ``result_value`` starts as ``None`` and ``risk_tier``
        as ``"unknown"``.
    """
    return dict(
        formula_id=formula_id,
        formula_version=FORMULA_VERSION,
        inputs_normalized={},
        unit_conversions=[],
        result_value=None,
        risk_tier="unknown",
        warnings=[],
        log_ref="",
    )
def validate_risk_input(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Check that a payload carries the inputs its exposure route needs.

    A field counts as "present" when its value is neither ``None`` nor ``""``.
    The CSF pathway needs ``csf_value`` and ``exposure_value``; the IUR pathway
    needs ``iur_value`` and ``air_conc_value``.

    Args:
        payload: Mapping of input fields for one record.

    Returns:
        A result envelope extended with ``"valid"`` (bool) and ``"errors"``
        (list of messages). ``"result_value"`` is ``1.0`` when valid and
        ``0.0`` otherwise; ``"risk_tier"`` is always ``"unknown"``.
    """

    def _present(key: str) -> bool:
        return payload.get(key) not in (None, "")

    report = _base_result("validate_risk_input")
    problems: List[str] = []
    notes: List[str] = []

    # Any failure while normalizing the route is recorded as a validation
    # error instead of being raised; the route then matches no branch below.
    try:
        route = normalize_route(payload.get("route"))
    except Exception as exc:
        route = ""
        problems.append(str(exc))

    csf_ready = _present("csf_value") and _present("exposure_value")
    iur_ready = _present("iur_value") and _present("air_conc_value")

    if route == "oral":
        if not csf_ready:
            problems.append("Oral route requires csf_value and exposure_value for CSF pathway.")
    elif route == "inhalation":
        # Inhalation is satisfied by either pathway.
        if not iur_ready and not csf_ready:
            problems.append("Inhalation route requires iur_value+air_conc_value or csf_value+exposure_value.")

    if csf_ready and not _present("csf_unit"):
        notes.append("csf_unit missing; assuming standard per (mg/kg-day).")
    if iur_ready and not _present("air_conc_unit"):
        notes.append("air_conc_unit missing; assuming ug/m3.")

    is_valid = not problems
    report.update(
        {
            "warnings": notes,
            "valid": is_valid,
            "errors": problems,
            "result_value": 1.0 if is_valid else 0.0,
            "risk_tier": "unknown",
        }
    )
    return report
def calculate_epa_elcr_csf(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Compute ELCR as normalized exposure multiplied by normalized CSF.

    Args:
        payload: Mapping read for ``route``, ``exposure_value``,
            ``exposure_unit``, ``body_weight_kg``, ``csf_value`` and
            ``csf_unit``.

    Returns:
        A result envelope whose ``"result_value"`` is the product as a float,
        with the normalized inputs, the combined unit conversions and the
        tier returned by ``classify_risk_tier``.

    Raises:
        CalculationError: If a ``UnitError`` or ``ValueError`` is raised while
            normalizing inputs or computing the result.
    """
    outcome = _base_result("calculate_epa_elcr_csf")
    try:
        # Return value is not used; presumably called so that an unsupported
        # route raises before any arithmetic is done.
        normalize_route(payload.get("route"))

        exposure = normalize_oral_exposure(
            payload.get("exposure_value"),
            payload.get("exposure_unit"),
            payload.get("body_weight_kg"),
        )
        slope = normalize_csf(payload.get("csf_value"), payload.get("csf_unit"))

        cdi = exposure["value_mg_per_kg_day"]
        csf = slope["value_per_mg_per_kg_day"]
        product = cdi * csf

        normalized = {
            "cdi_mg_per_kg_day": cdi,
            "csf_per_mg_per_kg_day": csf,
        }
        conversions = exposure["conversions"] + slope["conversions"]
        risk = float(product)
        tier = classify_risk_tier(risk)
    except (UnitError, ValueError) as exc:
        raise CalculationError(str(exc)) from exc

    outcome["inputs_normalized"] = normalized
    outcome["unit_conversions"] = conversions
    outcome["result_value"] = risk
    outcome["risk_tier"] = tier
    return outcome
def calculate_epa_elcr_iur(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Compute ELCR as normalized air concentration multiplied by normalized IUR.

    Args:
        payload: Mapping read for ``route``, ``air_conc_value``,
            ``air_conc_unit``, ``iur_value`` and ``iur_unit``.

    Returns:
        A result envelope whose ``"result_value"`` is the product as a float,
        with the normalized inputs, the combined unit conversions and the
        tier returned by ``classify_risk_tier``. A warning is added (the
        calculation still runs) when the normalized route is not
        ``"inhalation"``.

    Raises:
        CalculationError: If a ``UnitError`` or ``ValueError`` is raised while
            normalizing inputs or computing the result.
    """
    outcome = _base_result("calculate_epa_elcr_iur")
    try:
        if normalize_route(payload.get("route")) != "inhalation":
            # Advisory only: a non-inhalation route does not block the result.
            outcome["warnings"].append("IUR calculation is generally applicable to inhalation route.")

        concentration = normalize_air_concentration(
            payload.get("air_conc_value"),
            payload.get("air_conc_unit"),
        )
        unit_risk = normalize_iur(payload.get("iur_value"), payload.get("iur_unit"))

        conc = concentration["value_ug_per_m3"]
        iur = unit_risk["value_per_ug_per_m3"]
        product = conc * iur

        normalized = {
            "air_conc_ug_per_m3": conc,
            "iur_per_ug_per_m3": iur,
        }
        conversions = concentration["conversions"] + unit_risk["conversions"]
        risk = float(product)
        tier = classify_risk_tier(risk)
    except (UnitError, ValueError) as exc:
        raise CalculationError(str(exc)) from exc

    outcome["inputs_normalized"] = normalized
    outcome["unit_conversions"] = conversions
    outcome["result_value"] = risk
    outcome["risk_tier"] = tier
    return outcome
def calculate_fda_ctp_elcr(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Sum CSF- and IUR-based ELCR over one row or a list of constituents.

    When ``payload["constituents"]`` is a non-empty list, each dict in it is
    treated as one constituent row; otherwise ``payload`` itself is the single
    row. For every row the CSF pathway runs when ``csf_value`` and
    ``exposure_value`` are both present, and the IUR pathway runs when
    ``iur_value`` and ``air_conc_value`` are both present (present means
    neither ``None`` nor ``""``).

    Args:
        payload: A single-row mapping, or a mapping with a ``"constituents"``
            list of row mappings.

    Returns:
        A result envelope where ``"result_value"`` is the sum over all
        components, ``"component_results"`` holds the per-row details,
        ``"inputs_normalized"`` holds ``component_count``, and ``"warnings"``
        / ``"unit_conversions"`` collect those reported by every pathway
        calculation that ran.

    Raises:
        CalculationError: Propagated from the per-pathway calculators.
    """
    result = _base_result("calculate_fda_ctp_elcr")

    constituents = payload.get("constituents")
    rows: List[Dict[str, Any]]
    if isinstance(constituents, list) and constituents:
        rows = [item for item in constituents if isinstance(item, dict)]
        skipped = len(constituents) - len(rows)
        if skipped:
            # Dropped entries contribute nothing to the total, so report them
            # instead of discarding them silently.
            result["warnings"].append(
                f"Skipped non-object constituent entries: {skipped}."
            )
    else:
        rows = [payload]

    # (component key, value field, exposure field, calculator) per pathway.
    pathways = (
        ("csf_result", "csf_value", "exposure_value", calculate_epa_elcr_csf),
        ("iur_result", "iur_value", "air_conc_value", calculate_epa_elcr_iur),
    )

    components: List[Dict[str, Any]] = []
    total = 0.0
    for row in rows:
        comp: Dict[str, Any] = {
            "chemical_name": row.get("chemical_name", ""),
            "route": row.get("route", ""),
            "csf_result": None,
            "iur_result": None,
            "component_total": 0.0,
        }
        for slot, value_key, dose_key, calculate in pathways:
            if row.get(value_key) in (None, "") or row.get(dose_key) in (None, ""):
                continue
            sub = calculate(row)
            comp[slot] = sub
            comp["component_total"] += float(sub["result_value"] or 0.0)
            # Surface sub-calculation warnings and conversions at the top
            # level so callers that read only the aggregate fields do not
            # lose them.
            result["warnings"].extend(sub.get("warnings") or [])
            result["unit_conversions"].extend(sub.get("unit_conversions") or [])
        total += comp["component_total"]
        components.append(comp)

    result["inputs_normalized"] = {"component_count": len(components)}
    result["result_value"] = float(total)
    result["risk_tier"] = classify_risk_tier(result["result_value"])
    result["component_results"] = components
    return result
def get_formula_catalog() -> Dict[str, Any]:
    """Return the list of supported formulas inside a result envelope.

    Returns:
        A dict with the common result fields at their defaults
        (``"formula_id"`` is ``"get_formula_catalog"``) plus ``"formulas"``,
        a list of ``{"id", "equation", "notes"}`` entries.
    """
    # (id, equation, notes) for each formula, in display order.
    entries = (
        (
            "calculate_epa_elcr_csf",
            "ELCR = CDI (mg/kg-day) * CSF ((mg/kg-day)^-1)",
            "Oral pathway using cancer slope factor.",
        ),
        (
            "calculate_epa_elcr_iur",
            "ELCR = Air Concentration (ug/m3) * IUR ((ug/m3)^-1)",
            "Inhalation pathway using inhalation unit risk.",
        ),
        (
            "calculate_fda_ctp_elcr",
            "ELCR_total = sum(component ELCR)",
            "Constituent-level aggregation wrapper for CTP-style profile assessment.",
        ),
    )

    catalog: Dict[str, Any] = {
        "formula_id": "get_formula_catalog",
        "formula_version": FORMULA_VERSION,
        "inputs_normalized": {},
        "unit_conversions": [],
        "result_value": None,
        "risk_tier": "unknown",
        "warnings": [],
        "log_ref": "",
    }
    catalog["formulas"] = [
        {"id": formula_id, "equation": equation, "notes": notes}
        for formula_id, equation, notes in entries
    ]
    return catalog
def _batch_error_row(
    index: int,
    row: Any,
    errors: List[str],
    warnings: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Build the entry reported for one failed batch row."""
    entry: Dict[str, Any] = {"row_index": index}
    if isinstance(row, dict):
        entry["record_id"] = row.get("record_id", "")
        entry["chemical_name"] = row.get("chemical_name", "")
    entry["status"] = "error"
    entry["errors"] = list(errors)
    entry["warnings"] = list(warnings or [])
    return entry


def run_batch_cancer_risk(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Validate and calculate cancer risk for every row of a batch.

    Each row is processed independently: a row that is not a dict, fails
    validation, or raises during calculation is recorded with
    ``"status": "error"`` and the batch continues with the next row.

    Args:
        payload: Mapping with a ``"rows"`` list of row mappings. A payload
            that is not a dict is treated as having no rows.

    Returns:
        A result envelope with ``"rows"`` (one entry per input row, in
        order), ``"summary"`` (``total_rows``, ``ok_rows``, ``error_rows``)
        and ``"result_value"`` set to the number of successful rows as a
        float. Every error entry carries ``"errors"`` (list) and
        ``"warnings"`` (list).

    Raises:
        CalculationError: If ``payload["rows"]`` is not a list.
    """
    out = _base_result("run_batch_cancer_risk")
    rows = payload.get("rows", []) if isinstance(payload, dict) else []
    if not isinstance(rows, list):
        raise CalculationError("rows must be a list of objects")

    row_results: List[Dict[str, Any]] = []
    n_ok = 0
    n_err = 0

    for i, row in enumerate(rows):
        if not isinstance(row, dict):
            message = "row must be an object"
            entry = _batch_error_row(i, row, [message])
            # Legacy singular key kept for consumers that already read it;
            # "errors" now matches the other failure entries.
            entry["error"] = message
            row_results.append(entry)
            n_err += 1
            continue

        v = validate_risk_input(row)
        validation_warnings = v.get("warnings", [])
        if not v.get("valid", False):
            row_results.append(
                _batch_error_row(i, row, v.get("errors", []), validation_warnings)
            )
            n_err += 1
            continue

        csf_res: Optional[Dict[str, Any]] = None
        iur_res: Optional[Dict[str, Any]] = None
        try:
            if row.get("csf_value") not in (None, "") and row.get("exposure_value") not in (None, ""):
                csf_res = calculate_epa_elcr_csf(row)
            if row.get("iur_value") not in (None, "") and row.get("air_conc_value") not in (None, ""):
                iur_res = calculate_epa_elcr_iur(row)
            # NOTE: if the row carries a non-empty "constituents" list, this
            # total is computed over those constituents, while the two
            # per-pathway values above come from the row's own fields.
            fda_res = calculate_fda_ctp_elcr(row)
        except Exception as exc:
            # Deliberate per-row best effort: one bad row must not abort the
            # batch. Validation warnings are kept so they are not lost when
            # the calculation itself fails.
            row_results.append(
                _batch_error_row(i, row, [str(exc)], validation_warnings)
            )
            n_err += 1
            continue

        row_results.append(
            {
                "row_index": i,
                "record_id": row.get("record_id", ""),
                "chemical_name": row.get("chemical_name", ""),
                "casrn": row.get("casrn", ""),
                "route": row.get("route", ""),
                "status": "ok",
                "epa_elcr_csf": (csf_res or {}).get("result_value", ""),
                "epa_elcr_iur": (iur_res or {}).get("result_value", ""),
                "fda_ctp_elcr": fda_res.get("result_value", ""),
                "risk_tier": fda_res.get("risk_tier", "unknown"),
                "formula_id": fda_res.get("formula_id", "calculate_fda_ctp_elcr"),
                "formula_version": fda_res.get("formula_version", FORMULA_VERSION),
                "inputs_normalized": fda_res.get("inputs_normalized", {}),
                "unit_conversions": fda_res.get("unit_conversions", []),
                "warnings": validation_warnings + fda_res.get("warnings", []),
                "log_ref": "",
            }
        )
        n_ok += 1

    out["rows"] = row_results
    out["summary"] = {
        "total_rows": len(rows),
        "ok_rows": n_ok,
        "error_rows": n_err,
    }
    out["result_value"] = float(n_ok)
    out["risk_tier"] = "unknown"
    return out