Spaces:
Sleeping
Sleeping
| """EPCC demo: classify the 15 synthetic excerpts and produce a risk register. | |
| This is the bridge from the CUAD-trained classifier to the product output the | |
| spec is asking for. The classifier predicts the CUAD-style clause label and | |
| its confidence; we map both the CUAD label and the EPCC risk area to a | |
| severity tier, compare against the playbook, and emit a structured register. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Iterable | |
| import joblib | |
| import numpy as np | |
| from src.config import ( | |
| DATA_DIR, | |
| DEFAULT_ABSTAIN_THRESHOLD, | |
| MODELS_DIR, | |
| OUTPUTS_DIR, | |
| SEVERITY_TIERS, | |
| label_to_severity, | |
| ) | |
# Location of the synthetic EPCC packet. load_excerpts() parses this file as
# JSON and reads its "excerpts" and "playbook" entries.
EPCC_DATA_PATH = DATA_DIR / "epcc_synthetic.json"
@dataclass
class ClausePrediction:
    """One row of the risk register: a classified excerpt plus its routing.

    Instances are built by ``predict_excerpts`` with keyword arguments, so the
    class must be a dataclass; without the decorator the annotations below
    create no ``__init__`` and construction raises ``TypeError``.
    """

    # Copied verbatim from the excerpt record ("id", "document", "text").
    excerpt_id: str
    document: str
    text: str
    # Highest-probability class and its probability (rounded to 4 places).
    predicted_label: str
    confidence: float
    # The three most probable (label, probability) pairs, most probable first.
    top3: list[tuple[str, float]]
    # Severity looked up from the predicted label ("unmapped" when absent).
    severity_from_label: str
    # Severity taken from the excerpt's "expected_severity" ("moderate" default).
    severity_from_risk_area: str
    # Copied from the excerpt ("epcc_risk_area", "playbook_note").
    epcc_risk_area: str
    playbook_note: str
    # True when confidence fell below the abstain threshold.
    abstain: bool
    # Reviewer team from OWNER_BY_RISK_AREA and the escalation message.
    recommended_owner: str
    escalation: str
# Risk-area -> recommended reviewer team. Pure routing logic; nothing learned.
# predict_excerpts() looks this table up by each excerpt's "epcc_risk_area";
# risk areas missing from the table fall back to "Legal" at that call site.
OWNER_BY_RISK_AREA: dict[str, str] = {
    "Liability cap": "Legal",
    "Liquidated damages": "Commercial + Legal",
    "Consequential damages": "Legal",
    "Force majeure": "Commercial",
    "Change in law": "Commercial + Tax",
    "Site conditions": "Commercial + Project controls",
    "Variation notice": "Contracts + Project controls",
    "Termination for convenience": "Legal + Commercial",
    "Commissioning / takeover": "Project execution + Commissioning",
    "Bonds and guarantees": "Treasury",
    "Insurance": "Insurance / Risk",
    "Flow-down / subcontracting": "Procurement + Legal",
    "Claims notice": "Contracts + Project controls",
}
def load_excerpts() -> tuple[list[dict], list[dict]]:
    """Read the synthetic EPCC packet and return ``(excerpts, playbook)``.

    The file at ``EPCC_DATA_PATH`` is decoded as UTF-8 explicitly rather than
    with the platform's locale encoding, so any non-ASCII characters in the
    packet load the same way on every machine.

    Returns:
        The values stored under the ``"excerpts"`` and ``"playbook"`` keys of
        the JSON document, in that order.

    Raises:
        OSError: if the file cannot be read.
        json.JSONDecodeError: if the file is not valid JSON.
        KeyError: if either top-level key is missing.
    """
    raw = EPCC_DATA_PATH.read_text(encoding="utf-8")
    payload = json.loads(raw)
    return payload["excerpts"], payload["playbook"]
| def _escalation( | |
| confidence: float, | |
| severity_from_risk_area: str, | |
| abstain: bool, | |
| deviation: bool, | |
| ) -> str: | |
| if abstain: | |
| return "Route to human reviewer (low model confidence)." | |
| if severity_from_risk_area == "critical" and deviation: | |
| return "Escalate immediately: critical risk area with playbook deviation." | |
| if severity_from_risk_area == "critical": | |
| return "Mandatory human review (critical risk area)." | |
| if deviation: | |
| return "Flag for SME review (deviation from preferred position)." | |
| return "Standard review." | |
| def _playbook_deviation(excerpt: dict) -> bool: | |
| note = excerpt.get("playbook_note", "").lower() | |
| deviation_keywords = [ | |
| "deviation", | |
| "escalate", | |
| "high risk", | |
| "tight", | |
| "may unlock", | |
| "may still create", | |
| "narrow", | |
| "not standard", | |
| "transferred to contractor", | |
| "no margin", | |
| "no cost relief", | |
| ] | |
| return any(kw in note for kw in deviation_keywords) | |
def predict_excerpts(
    excerpts: Iterable[dict],
    abstain_threshold: float = DEFAULT_ABSTAIN_THRESHOLD,
) -> list[ClausePrediction]:
    """Classify each excerpt and attach severity, owner and escalation routing.

    Args:
        excerpts: Excerpt records. Each must provide ``"id"``, ``"document"``,
            ``"text"`` and ``"epcc_risk_area"``; ``"expected_severity"``
            (default ``"moderate"``) and ``"playbook_note"`` (default ``""``)
            are optional. Any iterable is accepted, including a one-shot
            generator.
        abstain_threshold: Predictions whose top-class probability is below
            this value are marked ``abstain=True``.

    Returns:
        One ``ClausePrediction`` per excerpt, in input order. An empty input
        returns an empty list without loading the model.

    Raises:
        KeyError: if an excerpt or the model bundle lacks a required key.
    """
    # Materialise once: the records are read twice below (to collect the texts
    # and again to pair each record with its probability row). A generator
    # would be exhausted by the first pass and the result silently empty.
    excerpt_list = list(excerpts)
    if not excerpt_list:
        return []

    # Kept as a function-scope import, as in the original.
    from src.features import EmbeddingEncoder

    # NOTE: joblib.load unpickles the file; only load trusted model artefacts.
    bundle = joblib.load(MODELS_DIR / "embed_lr.joblib")
    clf = bundle["model"]
    classes = np.asarray(bundle["classes_"])
    encoder = EmbeddingEncoder(model_name=bundle["encoder_name"])

    texts = [e["text"] for e in excerpt_list]
    X = encoder.encode(texts, batch_size=16, show_progress=False)
    probs = clf.predict_proba(X)
    sev_map = label_to_severity()

    out: list[ClausePrediction] = []
    for excerpt, prob in zip(excerpt_list, probs):
        # Class indices sorted by descending probability.
        order = np.argsort(prob)[::-1]
        best = order[0]
        pred_label = classes[best]
        conf = float(prob[best])
        abstain = conf < abstain_threshold
        risk_area = excerpt["epcc_risk_area"]
        sev_from_risk = excerpt.get("expected_severity", "moderate")
        deviation = _playbook_deviation(excerpt)
        out.append(
            ClausePrediction(
                excerpt_id=excerpt["id"],
                document=excerpt["document"],
                text=excerpt["text"],
                predicted_label=pred_label,
                confidence=round(conf, 4),
                top3=[
                    (classes[i], round(float(prob[i]), 4)) for i in order[:3]
                ],
                severity_from_label=sev_map.get(pred_label, "unmapped"),
                severity_from_risk_area=sev_from_risk,
                epcc_risk_area=risk_area,
                playbook_note=excerpt.get("playbook_note", ""),
                abstain=abstain,
                # Unknown risk areas are routed to Legal by default.
                recommended_owner=OWNER_BY_RISK_AREA.get(risk_area, "Legal"),
                # Escalation uses the unrounded confidence, as before.
                escalation=_escalation(conf, sev_from_risk, abstain, deviation),
            )
        )
    return out
def render_risk_register_md(preds: list[ClausePrediction]) -> str:
    """Render the predictions as a Markdown risk register.

    The document has a title, an explanatory paragraph, a summary table with
    one row per prediction, and a "Per-clause detail" section with one
    sub-heading per prediction.

    Args:
        preds: Predictions to render, in display order.

    Returns:
        The Markdown text. With no predictions the table has only its header.
    """
    lines: list[str] = []
    lines.append("# EPCC Risk Register (synthetic demo packet)\n")
    lines.append(
        "Generated from the CUAD-trained clause classifier applied to the 15 "
        "synthetic excerpts in `data/epcc_synthetic.json`. The model predicts "
        "the CUAD clause type; the EPCC risk area, severity, and playbook note "
        "come from the curated mini-case packet. Escalation combines model "
        "confidence and risk-area severity.\n"
    )

    # The header row, separator row and data rows must sit on consecutive
    # lines: a blank line ends a Markdown table. They are therefore appended
    # without trailing newlines and rely on the final "\n".join.
    lines.append(
        "| ID | Doc | EPCC risk area | Predicted CUAD label | Conf | Sev (risk area) "
        "| Sev (predicted label) | Owner | Escalation |"
    )
    lines.append("|---|---|---|---|---|---|---|---|---|")
    for p in preds:
        lines.append(
            f"| {p.excerpt_id} | {p.document} | {p.epcc_risk_area} | "
            f"{p.predicted_label} | {p.confidence:.2f} | "
            f"{p.severity_from_risk_area} | {p.severity_from_label} | "
            f"{p.recommended_owner} | {p.escalation} |"
        )

    # The leading newline leaves a blank line between the table and the heading.
    lines.append("\n## Per-clause detail\n")
    for p in preds:
        lines.append(f"### {p.excerpt_id} — {p.document} \n")
        lines.append(f"**Clause:** {p.text}\n")
        lines.append(
            f"- Predicted CUAD label: **{p.predicted_label}** "
            f"(confidence {p.confidence:.2f}{', ABSTAIN' if p.abstain else ''})"
        )
        top3_str = ", ".join(f"{label} ({prob:.2f})" for label, prob in p.top3)
        lines.append(f"- Top-3: {top3_str}")
        lines.append(
            f"- EPCC risk area: **{p.epcc_risk_area}** "
            f"(severity: {p.severity_from_risk_area})"
        )
        lines.append(f"- Playbook note: {p.playbook_note}")
        lines.append(f"- Recommended owner: {p.recommended_owner}")
        lines.append(f"- Escalation: {p.escalation}\n")
    return "\n".join(lines)
def main(out_path: Path | None = None) -> Path:
    """Run the demo end to end and write the Markdown and JSON outputs.

    Args:
        out_path: Destination of the Markdown register. Defaults to
            ``OUTPUTS_DIR / "epcc_risk_register.md"``.

    Returns:
        The path the Markdown register was written to. The JSON dump is always
        written to ``OUTPUTS_DIR / "epcc_predictions.json"``.
    """
    excerpts, _ = load_excerpts()
    preds = predict_excerpts(excerpts)
    md = render_risk_register_md(preds)

    if out_path is None:
        out_path = OUTPUTS_DIR / "epcc_risk_register.md"
    out_path = Path(out_path)
    # A fresh checkout may not have the output directory yet.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # The register contains non-ASCII characters (an em dash in each detail
    # heading), so the encoding is fixed instead of left to the locale.
    out_path.write_text(md, encoding="utf-8")

    # also persist a JSON dump for the tests / Streamlit app
    json_path = OUTPUTS_DIR / "epcc_predictions.json"
    json_path.parent.mkdir(parents=True, exist_ok=True)
    records = [
        {
            "excerpt_id": p.excerpt_id,
            "document": p.document,
            "text": p.text,
            "predicted_label": p.predicted_label,
            "confidence": p.confidence,
            "top3": p.top3,
            "severity_from_label": p.severity_from_label,
            "severity_from_risk_area": p.severity_from_risk_area,
            "epcc_risk_area": p.epcc_risk_area,
            "playbook_note": p.playbook_note,
            "abstain": p.abstain,
            "recommended_owner": p.recommended_owner,
            "escalation": p.escalation,
        }
        for p in preds
    ]
    json_path.write_text(json.dumps(records, indent=2), encoding="utf-8")

    print(f"Wrote {out_path}")
    print(f"Wrote {json_path}")
    return out_path
# Script entry point: build the register with the default output paths.
if __name__ == "__main__":
    main()