"""EPCC demo: classify the 15 synthetic excerpts and produce a risk register. This is the bridge from the CUAD-trained classifier to the product output the spec is asking for. The classifier predicts the CUAD-style clause label and its confidence; we map both the CUAD label and the EPCC risk area to a severity tier, compare against the playbook, and emit a structured register. """ from __future__ import annotations import json from dataclasses import dataclass from pathlib import Path from typing import Iterable import joblib import numpy as np from src.config import ( DATA_DIR, DEFAULT_ABSTAIN_THRESHOLD, MODELS_DIR, OUTPUTS_DIR, SEVERITY_TIERS, label_to_severity, ) EPCC_DATA_PATH = DATA_DIR / "epcc_synthetic.json" @dataclass class ClausePrediction: excerpt_id: str document: str text: str predicted_label: str confidence: float top3: list[tuple[str, float]] severity_from_label: str severity_from_risk_area: str epcc_risk_area: str playbook_note: str abstain: bool recommended_owner: str escalation: str # Risk-area -> recommended reviewer team. Pure routing logic; nothing learned. 
OWNER_BY_RISK_AREA = {
    "Liability cap": "Legal",
    "Liquidated damages": "Commercial + Legal",
    "Consequential damages": "Legal",
    "Force majeure": "Commercial",
    "Change in law": "Commercial + Tax",
    "Site conditions": "Commercial + Project controls",
    "Variation notice": "Contracts + Project controls",
    "Termination for convenience": "Legal + Commercial",
    "Commissioning / takeover": "Project execution + Commissioning",
    "Bonds and guarantees": "Treasury",
    "Insurance": "Insurance / Risk",
    "Flow-down / subcontracting": "Procurement + Legal",
    "Claims notice": "Contracts + Project controls",
}


def load_excerpts() -> tuple[list[dict], list[dict]]:
    """Read the synthetic EPCC packet and return ``(excerpts, playbook)``.

    The file at ``EPCC_DATA_PATH`` is decoded explicitly as UTF-8 so the result
    does not depend on the platform's default locale encoding.

    Raises:
        OSError: if the data file cannot be read.
        json.JSONDecodeError: if the file is not valid JSON.
        KeyError: if the ``"excerpts"`` or ``"playbook"`` key is missing.
    """
    payload = json.loads(EPCC_DATA_PATH.read_text(encoding="utf-8"))
    return payload["excerpts"], payload["playbook"]


def _escalation(
    confidence: float,
    severity_from_risk_area: str,
    abstain: bool,
    deviation: bool,
) -> str:
    """Pick the escalation message for one clause.

    Rules are checked in priority order: abstention first, then a critical
    risk area (with or without a playbook deviation), then a deviation on its
    own, and finally the default.

    Args:
        confidence: Model confidence. Currently unused by the rules (the
            ``abstain`` flag already encodes the threshold decision); kept so
            the call signature stays stable.
        severity_from_risk_area: Severity tier of the excerpt's risk area;
            only the value ``"critical"`` changes the outcome.
        abstain: True when the model's confidence was below the threshold.
        deviation: True when the playbook note signals a deviation.

    Returns:
        A human-readable escalation instruction.
    """
    if abstain:
        return "Route to human reviewer (low model confidence)."

    is_critical = severity_from_risk_area == "critical"
    if is_critical and deviation:
        return "Escalate immediately: critical risk area with playbook deviation."
    if is_critical:
        return "Mandatory human review (critical risk area)."
    if deviation:
        return "Flag for SME review (deviation from preferred position)."
    return "Standard review."
# Substrings that, when found in a lower-cased playbook note, mark the clause
# as departing from the preferred position.
_DEVIATION_KEYWORDS = (
    "deviation",
    "escalate",
    "high risk",
    "tight",
    "may unlock",
    "may still create",
    "narrow",
    "not standard",
    "transferred to contractor",
    "no margin",
    "no cost relief",
)


def _playbook_deviation(excerpt: dict) -> bool:
    """Return True when the excerpt's playbook note contains a deviation keyword.

    Matching is a case-insensitive substring test. A missing or ``None`` note
    is treated as empty and therefore never counts as a deviation.
    """
    note = (excerpt.get("playbook_note") or "").lower()
    return any(kw in note for kw in _DEVIATION_KEYWORDS)


def predict_excerpts(
    excerpts: Iterable[dict],
    abstain_threshold: float = DEFAULT_ABSTAIN_THRESHOLD,
) -> list[ClausePrediction]:
    """Classify each excerpt and attach severity, owner and escalation.

    Args:
        excerpts: Excerpt records. Each must provide ``"id"``, ``"document"``,
            ``"text"`` and ``"epcc_risk_area"``; ``"expected_severity"``
            (default ``"moderate"``) and ``"playbook_note"`` are optional.
            Any iterable is accepted, including one-shot generators.
        abstain_threshold: Predictions whose top probability is strictly below
            this value are flagged ``abstain=True``.

    Returns:
        One ``ClausePrediction`` per excerpt, in input order. Empty input
        yields an empty list without loading the model.
    """
    # Materialise once: the records are walked twice (texts, then zip), which
    # would silently produce nothing for a generator.
    excerpts = list(excerpts)
    if not excerpts:
        return []

    # Imported lazily, as in the original, so importing this module does not
    # pull in the encoder.
    from src.features import EmbeddingEncoder

    # NOTE(review): joblib.load unpickles arbitrary objects; only point
    # MODELS_DIR at model files you trust.
    bundle = joblib.load(MODELS_DIR / "embed_lr.joblib")
    clf = bundle["model"]
    classes = np.asarray(bundle["classes_"])
    encoder = EmbeddingEncoder(model_name=bundle["encoder_name"])

    texts = [e["text"] for e in excerpts]
    X = encoder.encode(texts, batch_size=16, show_progress=False)
    probs = clf.predict_proba(X)
    sev_map = label_to_severity()

    out: list[ClausePrediction] = []
    for excerpt, prob in zip(excerpts, probs):
        order = np.argsort(prob)[::-1]  # class indices, most probable first
        best = order[0]
        # Plain ``str``/``float`` so downstream JSON and formatting never see
        # NumPy scalar types.
        pred_label = str(classes[best])
        conf = float(prob[best])
        top3 = [(str(classes[i]), round(float(prob[i]), 4)) for i in order[:3]]

        abstain = conf < abstain_threshold  # compared before rounding
        sev_from_risk = excerpt.get("expected_severity", "moderate")
        deviation = _playbook_deviation(excerpt)
        risk_area = excerpt["epcc_risk_area"]

        out.append(
            ClausePrediction(
                excerpt_id=excerpt["id"],
                document=excerpt["document"],
                text=excerpt["text"],
                predicted_label=pred_label,
                confidence=round(conf, 4),
                top3=top3,
                severity_from_label=sev_map.get(pred_label, "unmapped"),
                severity_from_risk_area=sev_from_risk,
                epcc_risk_area=risk_area,
                playbook_note=excerpt.get("playbook_note", ""),
                abstain=abstain,
                # Risk areas without a routing entry go to Legal.
                recommended_owner=OWNER_BY_RISK_AREA.get(risk_area, "Legal"),
                escalation=_escalation(conf, sev_from_risk, abstain, deviation),
            )
        )
    return out


def _md_cell(value: object) -> str:
    """Return *value* as text that is safe inside a Markdown table cell."""
    text = str(value)
    # A raw pipe starts a new column and a raw newline ends the row.
    return text.replace("|", "\\|").replace("\r\n", " ").replace("\n", " ")


def render_risk_register_md(preds: list[ClausePrediction]) -> str:
    """Render predictions as a Markdown report.

    The report has a title, an explanatory paragraph, a summary table with one
    row per prediction, and a "Per-clause detail" section.

    The table's header row, delimiter row and data rows are emitted on
    consecutive lines; a blank line between them would end the table before
    the first data row.
    """
    lines: list[str] = []
    lines.append("# EPCC Risk Register (synthetic demo packet)\n")
    lines.append(
        "Generated from the CUAD-trained clause classifier applied to the 15 "
        "synthetic excerpts in `data/epcc_synthetic.json`. The model predicts "
        "the CUAD clause type; the EPCC risk area, severity, and playbook note "
        "come from the curated mini-case packet. Escalation combines model "
        "confidence and risk-area severity.\n"
    )

    # Summary table: header row, delimiter row, then data rows with no gaps.
    lines.append(
        "| ID | Doc | EPCC risk area | Predicted CUAD label | Conf | Sev (risk area) "
        "| Sev (predicted label) | Owner | Escalation |"
    )
    lines.append("|---|---|---|---|---|---|---|---|---|")
    for p in preds:
        lines.append(
            f"| {_md_cell(p.excerpt_id)} | {_md_cell(p.document)} | "
            f"{_md_cell(p.epcc_risk_area)} | "
            f"{_md_cell(p.predicted_label)} | {p.confidence:.2f} | "
            f"{_md_cell(p.severity_from_risk_area)} | "
            f"{_md_cell(p.severity_from_label)} | "
            f"{_md_cell(p.recommended_owner)} | {_md_cell(p.escalation)} |"
        )

    lines.append("\n## Per-clause detail\n")
    for p in preds:
        lines.append(f"### {p.excerpt_id} — {p.document} \n")
        lines.append(f"**Clause:** {p.text}\n")
        lines.append(
            f"- Predicted CUAD label: **{p.predicted_label}** "
            f"(confidence {p.confidence:.2f}{', ABSTAIN' if p.abstain else ''})"
        )
        top3_str = ", ".join(f"{label} ({prob:.2f})" for label, prob in p.top3)
        lines.append(f"- Top-3: {top3_str}")
        lines.append(
            f"- EPCC risk area: **{p.epcc_risk_area}** "
            f"(severity: {p.severity_from_risk_area})"
        )
        lines.append(f"- Playbook note: {p.playbook_note}")
        lines.append(f"- Recommended owner: {p.recommended_owner}")
        lines.append(f"- Escalation: {p.escalation}\n")
    return "\n".join(lines)


def main(out_path: Path | None = None) -> Path:
    """Run the demo end to end and write the register to disk.

    Writes the Markdown register to ``out_path`` (default
    ``OUTPUTS_DIR / "epcc_risk_register.md"``) and a JSON dump of the
    predictions to ``OUTPUTS_DIR / "epcc_predictions.json"``. Both files are
    written as UTF-8, and missing parent directories are created.

    Returns:
        The path of the Markdown file that was written.
    """
    from dataclasses import asdict

    excerpts, _ = load_excerpts()
    preds = predict_excerpts(excerpts)
    md = render_risk_register_md(preds)

    out_path = (
        OUTPUTS_DIR / "epcc_risk_register.md" if out_path is None else Path(out_path)
    )
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # The report contains non-ASCII characters (em dash), so pin the encoding.
    out_path.write_text(md, encoding="utf-8")

    # also persist a JSON dump for the tests / Streamlit app
    json_path = OUTPUTS_DIR / "epcc_predictions.json"
    json_path.parent.mkdir(parents=True, exist_ok=True)
    # asdict emits the dataclass fields in declaration order, which is the
    # same set and order of keys as the previous hand-written mapping.
    json_path.write_text(
        json.dumps([asdict(p) for p in preds], indent=2),
        encoding="utf-8",
    )

    print(f"Wrote {out_path}")
    print(f"Wrote {json_path}")
    return out_path


if __name__ == "__main__":
    main()