maralzar
Initial commit: EPCC clause classifier Streamlit demo for HF Spaces
5212b8e
Raw
History Blame Contribute Delete
8.14 kB
"""EPCC demo: classify the 15 synthetic excerpts and produce a risk register.
This is the bridge from the CUAD-trained classifier to the product output the
spec is asking for. The classifier predicts the CUAD-style clause label and
its confidence; we map both the CUAD label and the EPCC risk area to a
severity tier, compare against the playbook, and emit a structured register.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
import joblib
import numpy as np
from src.config import (
DATA_DIR,
DEFAULT_ABSTAIN_THRESHOLD,
MODELS_DIR,
OUTPUTS_DIR,
SEVERITY_TIERS,
label_to_severity,
)
# Path of the synthetic EPCC packet (JSON). load_excerpts() reads it and
# returns the values stored under its "excerpts" and "playbook" keys.
EPCC_DATA_PATH = DATA_DIR / "epcc_synthetic.json"
@dataclass
class ClausePrediction:
    """One row of the risk register: a classified excerpt plus its routing.

    Instances are built by ``predict_excerpts``; the field order below is also
    the positional-constructor order, so it must not be rearranged.
    """

    # --- copied from the input excerpt ---
    excerpt_id: str  # excerpt["id"]
    document: str  # excerpt["document"]
    text: str  # excerpt["text"], the clause text that was classified

    # --- classifier output ---
    predicted_label: str  # highest-probability class
    confidence: float  # probability of predicted_label, rounded to 4 decimals
    top3: list[tuple[str, float]]  # up to three (label, probability) pairs, most probable first

    # --- severity and playbook context ---
    severity_from_label: str  # label_to_severity() lookup; "unmapped" when the label is absent
    severity_from_risk_area: str  # excerpt["expected_severity"], defaulting to "moderate"
    epcc_risk_area: str  # excerpt["epcc_risk_area"]
    playbook_note: str  # excerpt["playbook_note"], "" when the key is missing

    # --- routing decision ---
    abstain: bool  # True when the unrounded confidence is below the abstain threshold
    recommended_owner: str  # OWNER_BY_RISK_AREA lookup, falling back to "Legal"
    escalation: str  # human-readable instruction produced by _escalation()
# Risk-area -> recommended reviewer team. Pure routing logic; nothing learned.
# Keys are matched against each excerpt's "epcc_risk_area" value in
# predict_excerpts(); a risk area missing from this table is routed to "Legal".
OWNER_BY_RISK_AREA = {
    "Liability cap": "Legal",
    "Liquidated damages": "Commercial + Legal",
    "Consequential damages": "Legal",
    "Force majeure": "Commercial",
    "Change in law": "Commercial + Tax",
    "Site conditions": "Commercial + Project controls",
    "Variation notice": "Contracts + Project controls",
    "Termination for convenience": "Legal + Commercial",
    "Commissioning / takeover": "Project execution + Commissioning",
    "Bonds and guarantees": "Treasury",
    "Insurance": "Insurance / Risk",
    "Flow-down / subcontracting": "Procurement + Legal",
    "Claims notice": "Contracts + Project controls",
}
def load_excerpts() -> tuple[list[dict], list[dict]]:
    """Load the synthetic EPCC packet from ``EPCC_DATA_PATH``.

    Returns:
        A ``(excerpts, playbook)`` pair: the values stored under the
        ``"excerpts"`` and ``"playbook"`` keys of the JSON document.

    Raises:
        FileNotFoundError: if the packet file does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
        KeyError: if either top-level key is missing.
    """
    # Decode explicitly as UTF-8: without it read_text() falls back to the
    # locale encoding, which can fail or garble non-ASCII clause text on
    # platforms whose default is not UTF-8.
    payload = json.loads(EPCC_DATA_PATH.read_text(encoding="utf-8"))
    return payload["excerpts"], payload["playbook"]
def _escalation(
confidence: float,
severity_from_risk_area: str,
abstain: bool,
deviation: bool,
) -> str:
if abstain:
return "Route to human reviewer (low model confidence)."
if severity_from_risk_area == "critical" and deviation:
return "Escalate immediately: critical risk area with playbook deviation."
if severity_from_risk_area == "critical":
return "Mandatory human review (critical risk area)."
if deviation:
return "Flag for SME review (deviation from preferred position)."
return "Standard review."
def _playbook_deviation(excerpt: dict) -> bool:
note = excerpt.get("playbook_note", "").lower()
deviation_keywords = [
"deviation",
"escalate",
"high risk",
"tight",
"may unlock",
"may still create",
"narrow",
"not standard",
"transferred to contractor",
"no margin",
"no cost relief",
]
return any(kw in note for kw in deviation_keywords)
def predict_excerpts(
    excerpts: Iterable[dict],
    abstain_threshold: float = DEFAULT_ABSTAIN_THRESHOLD,
) -> list[ClausePrediction]:
    """Classify each excerpt and attach severity, owner and escalation.

    Loads the ``embed_lr.joblib`` bundle from ``MODELS_DIR`` (keys ``"model"``,
    ``"classes_"`` and ``"encoder_name"``), embeds every excerpt's ``"text"``,
    and turns the class probabilities into one ``ClausePrediction`` each.

    Args:
        excerpts: Excerpt records. Each needs ``"id"``, ``"document"``,
            ``"text"`` and ``"epcc_risk_area"``; ``"expected_severity"`` and
            ``"playbook_note"`` are optional. Any iterable is accepted,
            including one-shot generators.
        abstain_threshold: Predictions whose top probability is strictly
            below this value are flagged ``abstain=True``.

    Returns:
        One prediction per excerpt, in input order; ``[]`` for empty input.

    Raises:
        KeyError: if an excerpt lacks a required key.
    """
    # Materialise once: the records are walked twice (to collect the texts,
    # then zipped against the probabilities), so a generator would otherwise
    # be exhausted by the first pass and produce no predictions at all.
    excerpts = list(excerpts)
    if not excerpts:
        # Nothing to classify; skip model loading and empty-batch inference.
        return []

    # Imported lazily, as in the original; presumably to keep importing this
    # module cheap when no prediction is made (TODO confirm).
    from src.features import EmbeddingEncoder

    bundle = joblib.load(MODELS_DIR / "embed_lr.joblib")
    clf = bundle["model"]
    classes = np.asarray(bundle["classes_"])
    encoder = EmbeddingEncoder(model_name=bundle["encoder_name"])

    texts = [e["text"] for e in excerpts]
    X = encoder.encode(texts, batch_size=16, show_progress=False)
    probs = clf.predict_proba(X)
    sev_map = label_to_severity()

    out: list[ClausePrediction] = []
    for excerpt, prob in zip(excerpts, probs):
        # Class indices sorted by descending probability.
        order = np.argsort(prob)[::-1]
        top3 = [(classes[i], round(float(prob[i]), 4)) for i in order[:3]]
        pred_label = classes[order[0]]
        # Keep the unrounded value for the threshold test; only the stored
        # and displayed confidence is rounded.
        conf = float(prob[order[0]])
        abstain = conf < abstain_threshold
        sev_from_risk = excerpt.get("expected_severity", "moderate")
        deviation = _playbook_deviation(excerpt)
        risk_area = excerpt["epcc_risk_area"]
        out.append(
            ClausePrediction(
                excerpt_id=excerpt["id"],
                document=excerpt["document"],
                text=excerpt["text"],
                predicted_label=pred_label,
                confidence=round(conf, 4),
                top3=top3,
                severity_from_label=sev_map.get(pred_label, "unmapped"),
                severity_from_risk_area=sev_from_risk,
                epcc_risk_area=risk_area,
                playbook_note=excerpt.get("playbook_note", ""),
                abstain=abstain,
                recommended_owner=OWNER_BY_RISK_AREA.get(risk_area, "Legal"),
                escalation=_escalation(conf, sev_from_risk, abstain, deviation),
            )
        )
    return out
def render_risk_register_md(preds: list[ClausePrediction]) -> str:
    """Render the predictions as a Markdown risk register.

    The document has a title, an explanatory paragraph, a summary table with
    one row per prediction, and a "Per-clause detail" section with one
    sub-heading per prediction.

    Args:
        preds: Predictions to render, in the order they should appear.

    Returns:
        The Markdown text. With no predictions the table has a header only.
    """
    lines: list[str] = [
        "# EPCC Risk Register (synthetic demo packet)\n",
        (
            "Generated from the CUAD-trained clause classifier applied to the 15 "
            "synthetic excerpts in `data/epcc_synthetic.json`. The model predicts "
            "the CUAD clause type; the EPCC risk area, severity, and playbook note "
            "come from the curated mini-case packet. Escalation combines model "
            "confidence and risk-area severity.\n"
        ),
        # The header and delimiter rows carry no trailing newline: the final
        # join already separates lines, and an extra one would insert a blank
        # line that ends the table before its first data row.
        (
            "| ID | Doc | EPCC risk area | Predicted CUAD label | Conf | Sev (risk area) "
            "| Sev (predicted label) | Owner | Escalation |"
        ),
        "|---|---|---|---|---|---|---|---|---|",
    ]
    lines.extend(
        f"| {p.excerpt_id} | {p.document} | {p.epcc_risk_area} | "
        f"{p.predicted_label} | {p.confidence:.2f} | "
        f"{p.severity_from_risk_area} | {p.severity_from_label} | "
        f"{p.recommended_owner} | {p.escalation} |"
        for p in preds
    )

    lines.append("\n## Per-clause detail\n")
    for p in preds:
        # Separate id and document so the heading does not run them together.
        lines.append(f"### {p.excerpt_id} - {p.document} \n")
        lines.append(f"**Clause:** {p.text}\n")
        lines.append(
            f"- Predicted CUAD label: **{p.predicted_label}** "
            f"(confidence {p.confidence:.2f}{', ABSTAIN' if p.abstain else ''})"
        )
        top3_str = ", ".join(f"{label} ({score:.2f})" for label, score in p.top3)
        lines.append(f"- Top-3: {top3_str}")
        lines.append(
            f"- EPCC risk area: **{p.epcc_risk_area}** "
            f"(severity: {p.severity_from_risk_area})"
        )
        lines.append(f"- Playbook note: {p.playbook_note}")
        lines.append(f"- Recommended owner: {p.recommended_owner}")
        lines.append(f"- Escalation: {p.escalation}\n")
    return "\n".join(lines)
def main(out_path: Path | None = None) -> Path:
    """Run the demo end to end and write the register to disk.

    Loads the excerpts, classifies them, renders the Markdown register and
    writes it to ``out_path``. A JSON dump of the predictions is always
    written to ``OUTPUTS_DIR / "epcc_predictions.json"``, whatever
    ``out_path`` is.

    Args:
        out_path: Destination of the Markdown register. Defaults to
            ``OUTPUTS_DIR / "epcc_risk_register.md"``.

    Returns:
        The path the Markdown register was written to.
    """
    # Local import: the module's top-level imports only bring in `dataclass`.
    from dataclasses import asdict

    excerpts, _ = load_excerpts()
    preds = predict_excerpts(excerpts)
    md = render_risk_register_md(preds)

    if out_path is None:
        out_path = OUTPUTS_DIR / "epcc_risk_register.md"
    # Create the target directory so a fresh checkout does not fail, and
    # write UTF-8 explicitly so the result does not depend on the locale.
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(md, encoding="utf-8")

    # also persist a JSON dump for the tests / Streamlit app
    json_path = OUTPUTS_DIR / "epcc_predictions.json"
    json_path.parent.mkdir(parents=True, exist_ok=True)
    # asdict() emits every dataclass field in declaration order, which is the
    # same key set and order as a hand-written per-field dict.
    json_path.write_text(
        json.dumps([asdict(p) for p in preds], indent=2),
        encoding="utf-8",
    )

    print(f"Wrote {out_path}")
    print(f"Wrote {json_path}")
    return out_path
# Script entry point: run the demo with the default output path when this
# file is executed directly rather than imported.
if __name__ == "__main__":
    main()