# Provenance (from web capture): commit c4fe0a4 by bobaoxu2001 —
# "Deploy forward-deployed AI simulation dashboard"
"""Evaluation metrics for the extraction pipeline.
All metrics operate on lists of (case_dict, extraction_dict) pairs
or on validation results, so they work whether you run batch eval
from files or from the database.
"""
from collections import Counter
from pipeline.validate import validate_extraction, check_evidence_present
from pipeline.gate import compute_gate_decision
# --- Core metrics ---
def schema_pass_rate(extractions: list[dict]) -> float:
    """Fraction of extractions that pass EXTRACTION_SCHEMA validation.
    Target: >= 0.98
    """
    if not extractions:
        return 0.0
    # validate_extraction returns (ok, ...); we only need the boolean flag.
    outcomes = [validate_extraction(item)[0] for item in extractions]
    return sum(outcomes) / len(outcomes)
def evidence_coverage_rate(extractions: list[dict]) -> float:
    """Fraction of extractions with non-empty, non-blank evidence quotes.
    Target: >= 0.90
    """
    if not extractions:
        return 0.0
    with_evidence = [item for item in extractions if check_evidence_present(item)[0]]
    return len(with_evidence) / len(extractions)
def review_required_rate(extractions: list[dict]) -> float:
    """Fraction of extractions routed to human review by the gate."""
    if not extractions:
        return 0.0
    routes = (compute_gate_decision(item)["route"] for item in extractions)
    n_review = sum(route == "review" for route in routes)
    return n_review / len(extractions)
def unsupported_recommendation_rate(extractions: list[dict]) -> float:
    """Fraction of extractions where next_best_actions exist but evidence is empty.
    An unsupported recommendation = has actions but no evidence quotes.
    Target: <= 0.02
    """
    if not extractions:
        return 0.0
    n_unsupported = sum(
        1
        for item in extractions
        # actions present but the evidence check fails -> unsupported
        if item.get("next_best_actions") and not check_evidence_present(item)[0]
    )
    return n_unsupported / len(extractions)
def root_cause_consistency(extractions: list[dict], cases: list[dict]) -> float:
    """Measure consistency: do similar tickets get the same root_cause_l1?
    Groups cases by source_dataset, then checks if cases from the same source
    cluster on the same root_cause_l1. Returns the average within-group
    agreement rate (fraction of cases matching the group's majority label).
    This is a proxy for consistency — perfect consistency = 1.0.
    Target: >= 0.70
    """
    if not extractions or not cases:
        return 0.0

    # case_id -> predicted root_cause_l1; extractions without an id are skipped.
    rc_by_case: dict[str, str] = {}
    for item in extractions:
        cid = item.get("case_id", "")
        if cid:
            rc_by_case[cid] = item.get("root_cause_l1", "unknown")

    # Collect root-cause labels per source_dataset, keeping only cases we
    # actually have an extraction for.
    labels_by_source: dict[str, list[str]] = {}
    for case in cases:
        cid = case.get("case_id", "")
        if cid not in rc_by_case:
            continue
        src = case.get("source_dataset", "unknown")
        labels_by_source.setdefault(src, []).append(rc_by_case[cid])

    if not labels_by_source:
        return 0.0

    # Within-group agreement = share of labels matching the group's majority.
    # Groups with a single case carry no consistency signal and are skipped.
    agreements = [
        Counter(labels).most_common(1)[0][1] / len(labels)
        for labels in labels_by_source.values()
        if len(labels) >= 2
    ]
    if not agreements:
        return 1.0  # Only singleton groups — trivially consistent
    return sum(agreements) / len(agreements)
def review_routing_precision_recall(
    predicted_review: list[bool],
    gold_review: list[bool],
) -> dict:
    """Precision and recall for review routing against gold labels.

    Args:
        predicted_review: Gate decisions (True = routed to review), aligned
            index-by-index with ``gold_review``.
        gold_review: Gold labels (True = should have been reviewed).

    Returns:
        Dict with "precision", "recall", and "f1" floats (0.0 when undefined).

    Raises:
        ValueError: If the two lists have different lengths. Previously a
            silent ``zip`` truncation would drop the extra entries and
            report metrics on misaligned data.

    Target: precision >= 0.80, recall >= 0.90
    """
    if not predicted_review or not gold_review:
        return {"precision": 0.0, "recall": 0.0, "f1": 0.0}
    # Fail loudly on misaligned inputs instead of letting zip() truncate.
    if len(predicted_review) != len(gold_review):
        raise ValueError(
            f"length mismatch: {len(predicted_review)} predictions vs "
            f"{len(gold_review)} gold labels"
        )
    tp = fp = fn = 0
    for pred, gold in zip(predicted_review, gold_review):
        if pred and gold:
            tp += 1
        elif pred:
            fp += 1
        elif gold:
            fn += 1
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
    denom = precision + recall
    f1 = 2 * precision * recall / denom if denom > 0 else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}
# --- Aggregate all metrics ---
def compute_all_metrics(
    extractions: list[dict],
    cases: list[dict] | None = None,
) -> dict:
    """Compute all evaluation metrics.
    Returns a dict with all metric names and values, plus pass/fail
    against target thresholds.
    """

    def _entry(value, target, *, upper_bound=False):
        # Informational metrics carry no target and no pass/fail verdict.
        if target is None:
            return {"value": value, "target": None, "pass": None}
        verdict = value <= target if upper_bound else value >= target
        return {"value": value, "target": target, "pass": verdict}

    rcc_cases = cases if cases is not None else []
    return {
        "schema_pass_rate": _entry(schema_pass_rate(extractions), 0.98),
        "evidence_coverage_rate": _entry(evidence_coverage_rate(extractions), 0.90),
        # No fixed target — informational only.
        "review_required_rate": _entry(review_required_rate(extractions), None),
        # Upper bound: lower is better.
        "unsupported_recommendation_rate": _entry(
            unsupported_recommendation_rate(extractions), 0.02, upper_bound=True
        ),
        "root_cause_consistency": _entry(
            root_cause_consistency(extractions, rcc_cases), 0.70
        ),
        "total_cases": len(extractions),
    }