from __future__ import annotations

from collections.abc import Iterable, Sequence
from dataclasses import dataclass

import numpy as np
import pandas as pd

# Minimum sample sizes before a slice or configuration is eligible for ranking.
MIN_SLICE_N = 30
MIN_CONFIG_N = 50

# Recall@10 at or above this threshold counts as "retrieval_ok" in retrieval_outcomes.
RETRIEVAL_OK_THRESHOLD = 0.80
# Placeholder label for grouping keys that are null in the source data.
MISSING_LABEL = "Missing / Not provided"

# Weights sum to 1.0; the risk score is a convex combination of three failure signals.
RISK_SCORE_WEIGHTS = {"error": 0.45, "hallucination": 0.35, "retrieval": 0.20}

# Weights sum to 1.0; see evidence_strength_proxy for how they are applied.
EVIDENCE_STRENGTH_WEIGHTS = {
    "top1_score": 0.35,
    "mean_retrieved_score": 0.15,
    "recall_at_10": 0.30,
    "mrr_at_10": 0.20,
}

CONFIG_OBJECTIVE_WEIGHTS = {
    "Balanced": {"correct": 0.42, "recall": 0.18, "halluc": 0.22, "latency": 0.10, "cost": 0.08},
    "Max quality": {"correct": 0.55, "recall": 0.25, "halluc": 0.15, "latency": 0.03, "cost": 0.02},
    "Min hallucination": {"correct": 0.30, "recall": 0.15, "halluc": 0.45, "latency": 0.05, "cost": 0.05},
    "Low latency": {"correct": 0.28, "recall": 0.12, "halluc": 0.15, "latency": 0.38, "cost": 0.07},
    "Low cost": {"correct": 0.28, "recall": 0.12, "halluc": 0.15, "latency": 0.07, "cost": 0.38},
}
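
# Worked example (illustrative numbers only): under the "Balanced" objective, a
# hypothetical config with correct_rate=0.80, recall_at_10=0.70,
# hallucination_rate=0.05, latency_scaled=0.90, and cost_scaled=0.50 scores
#     0.80 * 0.42 + 0.70 * 0.18 - 0.05 * 0.22 - 0.90 * 0.10 - 0.50 * 0.08 = 0.321
# (see config_leaderboard below for the actual formula).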


@dataclass(frozen=True)
class DecisionBrief:
    """Compact, human-readable summary of the current evaluation run."""

    posture: str
    posture_reason: str
    main_driver: str
    worst_slice: str
    best_config: str
    recommended_action: str


def _numeric(series: pd.Series) -> pd.Series:
    """Coerce a series to numeric, mapping unparseable values to NaN."""
    return pd.to_numeric(series, errors="coerce")


def _p95(series: pd.Series) -> float:
    """95th percentile of a series as a float, or NaN if it cannot be computed."""
    val = _numeric(series).quantile(0.95)
    return np.nan if pd.isna(val) else float(val)


def _safe_col(df: pd.DataFrame, col: str, default: float = np.nan) -> pd.Series:
    """Return a numeric column, or a constant fallback series when the column is absent."""
    if col in df.columns:
        return _numeric(df[col])
    return pd.Series(default, index=df.index, dtype="float64")


def safe_mean(df: pd.DataFrame, col: str, default: float = 0.0) -> float:
    """Mean of a column, falling back to `default` when the column or data is missing."""
    if col not in df.columns or len(df) == 0:
        return default
    val = _numeric(df[col]).mean()
    return default if pd.isna(val) else float(val)


def safe_p95(df: pd.DataFrame, col: str, default: float = 0.0) -> float:
    """95th percentile of a column, falling back to `default` when missing."""
    if col not in df.columns or len(df) == 0:
        return default
    val = _numeric(df[col]).quantile(0.95)
    return default if pd.isna(val) else float(val)


def fmt_pct(x: float) -> str:
    if pd.isna(x):
        return "n/a"
    return f"{x:.1%}"


def fmt_money(x: float) -> str:
    if pd.isna(x):
        return "n/a"
    return f"${x:.4f}"


def overview_metrics(
    eval_df: pd.DataFrame, docs: pd.DataFrame, chunks: pd.DataFrame, retrieval: pd.DataFrame
) -> dict[str, float]:
    """Headline counts and quality metrics for the currently filtered evaluation set."""
    return {
        "evaluations": float(len(eval_df)),
        "retrieval_events": float(len(retrieval)),
        "documents": float(len(docs)),
        "chunks": float(len(chunks)),
        "correct_rate": safe_mean(eval_df, "is_correct", default=np.nan),
        "hallucination_rate": safe_mean(eval_df, "hallucination_flag", default=np.nan),
        "recall_at_10": safe_mean(eval_df, "recall_at_10", default=np.nan),
        "mrr_at_10": safe_mean(eval_df, "mrr_at_10", default=np.nan),
        "p95_latency_ms": safe_p95(eval_df, "total_latency_ms", default=np.nan),
        "avg_cost_usd": safe_mean(eval_df, "total_cost_usd", default=np.nan),
    }


def quality_posture(metrics: dict[str, float]) -> tuple[str, str]:
    """Map headline metrics to a (posture, reason) pair: Stable, Watch, High Risk, or Review."""
    correct = metrics.get("correct_rate", np.nan)
    halluc = metrics.get("hallucination_rate", np.nan)
    recall = metrics.get("recall_at_10", np.nan)
    if pd.isna(correct) or pd.isna(halluc) or pd.isna(recall):
        return "Review", "one or more key quality signals are unavailable under the current filters"
    if correct >= 0.78 and halluc <= 0.10 and recall >= 0.70:
        return "Stable", "correctness, hallucination, and retrieval signals are within a usable operating band"
    if correct < 0.62 or halluc > 0.20 or recall < 0.45:
        return "High Risk", "one or more quality signals are outside the expected operating band"
    return "Watch", "quality is usable for analysis, but risk slices require targeted review"


def risk_slices(
    eval_df: pd.DataFrame,
    group_cols: Sequence[str] = ("domain", "scenario_type", "difficulty"),
    min_n: int = MIN_SLICE_N,
) -> pd.DataFrame:
    """Aggregate risk by slice; metrics whose source columns are missing stay NaN
    instead of silently falling back to row counts."""
    available = [c for c in group_cols if c in eval_df.columns]
    if not available or len(eval_df) == 0:
        return pd.DataFrame()

    src = eval_df.copy()
    for col in available:
        src[col] = src[col].astype("string").fillna(MISSING_LABEL)
    src["__row_count"] = 1

    agg_map = {"n": ("__row_count", "size")}
    optional_aggs = {
        "correct_rate": ("is_correct", "mean"),
        "hallucination_rate": ("hallucination_flag", "mean"),
        "recall_at_10": ("recall_at_10", "mean"),
        "mrr_at_10": ("mrr_at_10", "mean"),
        "p95_latency_ms": ("total_latency_ms", _p95),
        "avg_cost_usd": ("total_cost_usd", "mean"),
    }
    for out_col, spec in optional_aggs.items():
        if spec[0] in src.columns:
            agg_map[out_col] = spec

    out = src.groupby(available, dropna=False).agg(**agg_map).reset_index()
    out = out[out["n"] >= int(min_n)].copy()
    if out.empty:
        return out

    for col in ["correct_rate", "hallucination_rate", "recall_at_10", "mrr_at_10", "p95_latency_ms", "avg_cost_usd"]:
        if col not in out.columns:
            out[col] = np.nan

    # Missing components default to "no risk" (0) so absent columns never inflate the score.
    out["error_rate"] = 1 - out["correct_rate"]
    error_component = out["error_rate"].fillna(0.0).clip(0, 1)
    halluc_component = out["hallucination_rate"].fillna(0.0).clip(0, 1)
    retrieval_component = (1 - out["recall_at_10"].fillna(1.0)).clip(0, 1)
    out["risk_score"] = (
        error_component * RISK_SCORE_WEIGHTS["error"]
        + halluc_component * RISK_SCORE_WEIGHTS["hallucination"]
        + retrieval_component * RISK_SCORE_WEIGHTS["retrieval"]
    )
    return out.sort_values("risk_score", ascending=False).reset_index(drop=True)
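
# Usage sketch (assumes eval_df carries the optional metric columns named above):
#     slices = risk_slices(eval_df, min_n=MIN_SLICE_N)
#     slices.head(5)[["domain", "scenario_type", "difficulty", "n", "risk_score"]]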


def retrieval_outcomes(eval_df: pd.DataFrame, threshold: float = RETRIEVAL_OK_THRESHOLD) -> pd.DataFrame:
    """Classify rows into retrieval/generation/hallucination failure modes with one canonical rule set."""
    required = {"recall_at_10", "is_correct"}
    if len(eval_df) == 0 or not required.issubset(eval_df.columns):
        return pd.DataFrame()

    src = eval_df.copy()
    src["__row_count"] = 1
    src["retrieval_state"] = np.where(_numeric(src["recall_at_10"]) >= threshold, "retrieval_ok", "retrieval_weak")
    src["answer_state"] = np.where(_numeric(src["is_correct"]).fillna(0.0) >= 0.5, "answer_correct", "answer_incorrect")
    halluc = _safe_col(src, "hallucination_flag", 0.0).fillna(0.0) >= 0.5

    # np.select returns the outcome of the first matching condition, so hallucination
    # outranks the retrieval/answer states below.
    src["failure_mode"] = np.select(
        [
            halluc & (src["answer_state"] == "answer_correct"),
            halluc & (src["answer_state"] == "answer_incorrect"),
            (src["retrieval_state"] == "retrieval_weak") & (src["answer_state"] == "answer_incorrect"),
            (src["retrieval_state"] == "retrieval_ok") & (src["answer_state"] == "answer_incorrect"),
            (src["retrieval_state"] == "retrieval_weak") & (src["answer_state"] == "answer_correct"),
            (src["retrieval_state"] == "retrieval_ok") & (src["answer_state"] == "answer_correct"),
        ],
        [
            "hallucination_risk_correct_answer",
            "hallucination_failure",
            "retrieval_failure",
            "generation_failure",
            "recovered_by_generation",
            "healthy",
        ],
        default=MISSING_LABEL,
    )

    agg_map = {
        "n": ("__row_count", "size"),
        "correct_rate": ("is_correct", "mean"),
        "recall_at_10": ("recall_at_10", "mean"),
    }
    if "hallucination_flag" in src.columns:
        agg_map["hallucination_rate"] = ("hallucination_flag", "mean")
    if "total_latency_ms" in src.columns:
        agg_map["p95_latency_ms"] = ("total_latency_ms", _p95)
    if "total_cost_usd" in src.columns:
        agg_map["avg_cost_usd"] = ("total_cost_usd", "mean")

    out = src.groupby("failure_mode", dropna=False).agg(**agg_map).reset_index()
    for col in ["hallucination_rate", "p95_latency_ms", "avg_cost_usd"]:
        if col not in out.columns:
            out[col] = np.nan
    out["share"] = out["n"] / max(out["n"].sum(), 1)
    return out.sort_values("n", ascending=False).reset_index(drop=True)
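
# Usage sketch: shares of each failure mode in the filtered set.
#     modes = retrieval_outcomes(eval_df)
#     modes[["failure_mode", "n", "share"]]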


def p95_scaled(series: pd.Series) -> pd.Series:
    """Scale values by the series' own 95th percentile, clipped to [0, 2]."""
    vals = _numeric(series)
    denom = vals.quantile(0.95)
    if pd.isna(denom) or denom <= 0:
        return pd.Series(np.zeros(len(vals)), index=vals.index)
    return (vals.fillna(0.0) / denom).clip(0, 2)


def _rate_ci(p: pd.Series, n: pd.Series, z: float = 1.96) -> tuple[pd.Series, pd.Series]:
    """Normal-approximation (Wald) confidence interval for a rate, clipped to [0, 1]."""
    p = _numeric(p).clip(0, 1)
    n = _numeric(n).clip(lower=1)
    se = np.sqrt((p * (1 - p)) / n)
    return (p - z * se).clip(0, 1), (p + z * se).clip(0, 1)
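
# Worked example: p=0.80, n=100 gives se = sqrt(0.8 * 0.2 / 100) = 0.04, so the
# 95% interval is 0.80 +/- 1.96 * 0.04 = [0.7216, 0.8784].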


def config_leaderboard(eval_df: pd.DataFrame, objective: str = "Balanced", min_n: int = MIN_CONFIG_N) -> pd.DataFrame:
    """Rank retrieval/generator configurations under the weights for the chosen objective."""
    required = {
        "retrieval_strategy", "generator_model", "is_correct",
        "hallucination_flag", "total_latency_ms", "total_cost_usd",
    }
    if not required.issubset(eval_df.columns) or len(eval_df) == 0:
        return pd.DataFrame()

    src = eval_df.copy()
    src["__row_count"] = 1
    group_cols = ["retrieval_strategy", "generator_model"]
    if "chunking_strategy" in src.columns:
        group_cols.append("chunking_strategy")
    for col in group_cols:
        src[col] = src[col].astype("string").fillna(MISSING_LABEL)

    agg_map = {
        "n": ("__row_count", "size"),
        "correct_rate": ("is_correct", "mean"),
        "hallucination_rate": ("hallucination_flag", "mean"),
        "p95_latency_ms": ("total_latency_ms", _p95),
        "avg_cost_usd": ("total_cost_usd", "mean"),
    }
    if "recall_at_10" in src.columns:
        agg_map["recall_at_10"] = ("recall_at_10", "mean")
    if "mrr_at_10" in src.columns:
        agg_map["mrr_at_10"] = ("mrr_at_10", "mean")

    out = src.groupby(group_cols, dropna=False).agg(**agg_map).reset_index()
    out = out[out["n"] >= int(min_n)].copy()
    if out.empty:
        return out
    for col in ["recall_at_10", "mrr_at_10"]:
        if col not in out.columns:
            out[col] = np.nan

    out["correct_rate_ci_low"], out["correct_rate_ci_high"] = _rate_ci(out["correct_rate"], out["n"])
    out["hallucination_rate_ci_low"], out["hallucination_rate_ci_high"] = _rate_ci(out["hallucination_rate"], out["n"])
    out["latency_scaled"] = p95_scaled(out["p95_latency_ms"])
    out["cost_scaled"] = p95_scaled(out["avg_cost_usd"])

    weights = CONFIG_OBJECTIVE_WEIGHTS.get(objective, CONFIG_OBJECTIVE_WEIGHTS["Balanced"])

    # Quality terms add to the score; hallucination, latency, and cost subtract from it.
    out["score"] = (
        out["correct_rate"].fillna(0) * weights["correct"]
        + out["recall_at_10"].fillna(0) * weights["recall"]
        - out["hallucination_rate"].fillna(0) * weights["halluc"]
        - out["latency_scaled"].fillna(0) * weights["latency"]
        - out["cost_scaled"].fillna(0) * weights["cost"]
    )
    out["config"] = out[group_cols].astype(str).agg(" / ".join, axis=1)
    return out.sort_values("score", ascending=False).reset_index(drop=True)
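
# Usage sketch: rank configurations for a latency-sensitive rollout.
#     board = config_leaderboard(eval_df, objective="Low latency", min_n=MIN_CONFIG_N)
#     board.head(3)[["config", "n", "score", "correct_rate_ci_low", "correct_rate_ci_high"]]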


def demand_coverage(eval_df: pd.DataFrame, docs: pd.DataFrame) -> pd.DataFrame:
    """Compare per-domain evaluation demand against the corpus's document share."""
    if "domain" not in eval_df.columns or "domain" not in docs.columns:
        return pd.DataFrame()
    demand = eval_df["domain"].astype("string").fillna(MISSING_LABEL).value_counts(normalize=True).rename("eval_demand_share")
    corpus = docs["domain"].astype("string").fillna(MISSING_LABEL).value_counts(normalize=True).rename("corpus_document_share")
    out = pd.concat([demand, corpus], axis=1).fillna(0)
    out.index.name = "domain"
    out = out.reset_index()
    out["demand_minus_corpus"] = out["eval_demand_share"] - out["corpus_document_share"]
    return out.sort_values("eval_demand_share", ascending=False).reset_index(drop=True)


def evidence_strength_proxy(eval_df: pd.DataFrame, reference_df: pd.DataFrame | None = None) -> pd.Series:
    """Offline evidence-strength proxy derived from retrieval-side evaluation signals.

    The score is not LLM confidence, not a calibrated correctness probability, and not a
    production approval signal. The optional reference_df fixes the min-max normalization
    anchors so that threshold behavior does not drift when the user changes dashboard filters.
    """
    ref = reference_df if reference_df is not None and len(reference_df) else eval_df
    parts = []
    weights = []
    for col, weight in EVIDENCE_STRENGTH_WEIGHTS.items():
        if col in eval_df.columns:
            s = _numeric(eval_df[col]).fillna(0.0)
            ref_s = _numeric(ref[col]).fillna(0.0) if col in ref.columns else s
            min_v, max_v = ref_s.min(), ref_s.max()
            if max_v > min_v:
                s = (s - min_v) / (max_v - min_v)
            parts.append(s.clip(0, 1) * weight)
            weights.append(weight)
    if not parts:
        return pd.Series(np.zeros(len(eval_df)), index=eval_df.index)
    # Renormalize by the weights of the columns actually present.
    score = sum(parts) / max(sum(weights), 1e-9)
    return score.clip(0, 1)
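
# Example: with reference anchors top1_score in [0.2, 0.9], a filtered row with
# top1_score=0.55 normalizes to (0.55 - 0.2) / (0.9 - 0.2) = 0.5 regardless of the
# currently applied filters, so a fixed review threshold keeps its meaning.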


def policy_curve(
    eval_df: pd.DataFrame,
    thresholds: Iterable[float] | None = None,
    reference_df: pd.DataFrame | None = None,
) -> pd.DataFrame:
    """Sweep auto-approve thresholds over the evidence-strength proxy and report the trade-offs."""
    if len(eval_df) == 0:
        return pd.DataFrame()
    if thresholds is None:
        thresholds = np.linspace(0.05, 0.95, 19)
    src = eval_df.copy()
    src["evidence_strength_proxy"] = evidence_strength_proxy(src, reference_df=reference_df)
    rows = []
    total_hallucinations = _safe_col(src, "hallucination_flag", 0.0).sum()
    for thr in thresholds:
        auto = src[src["evidence_strength_proxy"] >= thr]
        review = src[src["evidence_strength_proxy"] < thr]
        review_hallucinations = _safe_col(review, "hallucination_flag", 0.0).sum()
        rows.append(
            {
                "threshold": float(thr),
                "auto_approve_rate": len(auto) / max(len(src), 1),
                "review_queue_size": int(len(review)),
                "auto_correct_rate": safe_mean(auto, "is_correct", default=np.nan) if len(auto) else np.nan,
                "auto_hallucination_rate": safe_mean(auto, "hallucination_flag", default=np.nan) if len(auto) else np.nan,
                "risk_captured_in_review": (review_hallucinations / total_hallucinations) if total_hallucinations > 0 else np.nan,
            }
        )
    return pd.DataFrame(rows)
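
# Usage sketch: pick the lowest threshold whose auto-approved pool stays under a
# hallucination budget (2% here is an illustrative number, not a recommendation):
#     curve = policy_curve(eval_df)
#     ok = curve[curve["auto_hallucination_rate"] <= 0.02]
#     chosen = ok.iloc[0]["threshold"] if len(ok) else 0.95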


def policy_at_threshold(eval_df: pd.DataFrame, threshold: float, reference_df: pd.DataFrame | None = None) -> dict[str, float]:
    """Evaluate a single auto-approve threshold; returns one policy_curve row as a dict."""
    curve = policy_curve(eval_df, [threshold], reference_df=reference_df)
    if curve.empty:
        return {}
    return curve.iloc[0].to_dict()


def make_decision_brief(
    eval_df: pd.DataFrame,
    docs: pd.DataFrame,
    chunks: pd.DataFrame,
    retrieval: pd.DataFrame,
    min_slice_n: int = MIN_SLICE_N,
    min_config_n: int = MIN_CONFIG_N,
    *,
    risk_table: pd.DataFrame | None = None,
    retrieval_table: pd.DataFrame | None = None,
    config_table: pd.DataFrame | None = None,
) -> DecisionBrief:
    """Assemble the headline DecisionBrief; precomputed tables can be passed in to avoid rework."""
    metrics = overview_metrics(eval_df, docs, chunks, retrieval)
    posture, reason = quality_posture(metrics)
    retrieval_table = retrieval_table if retrieval_table is not None else retrieval_outcomes(eval_df)
    risk = risk_table if risk_table is not None else risk_slices(eval_df, min_n=min_slice_n)
    configs = config_table if config_table is not None else config_leaderboard(eval_df, min_n=min_config_n)

    main_driver = "Mixed"
    if not retrieval_table.empty:
        # Rows are sorted by count, so the first non-healthy mode is the dominant driver.
        driver_rows = retrieval_table[retrieval_table["failure_mode"].astype(str) != "healthy"]
        if driver_rows.empty:
            driver_rows = retrieval_table
        top_mode = str(driver_rows.iloc[0]["failure_mode"])
        if "hallucination" in top_mode:
            main_driver = "Hallucination"
        elif "retrieval" in top_mode:
            main_driver = "Retrieval"
        elif "generation" in top_mode:
            main_driver = "Generation"
        elif top_mode == "healthy":
            main_driver = "Healthy majority"

    worst_slice = "No high-risk slice above minimum sample size"
    if not risk.empty:
        row = risk.iloc[0]
        parts = [str(row[c]) for c in ["domain", "scenario_type", "difficulty"] if c in risk.columns]
        worst_slice = " / ".join(parts) + f" · risk={row['risk_score']:.2f}"

    best_config = "No eligible configuration"
    if not configs.empty:
        row = configs.iloc[0]
        best_config = f"{row['config']} · score={row['score']:.2f} · n={int(row['n'])}"

    if posture == "High Risk":
        action = "Prioritize the top risk slice and inspect retrieval evidence before widening auto-approval."
    elif main_driver == "Retrieval":
        action = "Start with retrieval diagnostics: recall coverage, chunk ranking, and corpus-demand alignment."
    elif main_driver == "Generation":
        action = "Inspect answer generation behavior on retrieval-ok but incorrect examples."
    elif main_driver == "Hallucination":
        action = "Review hallucination-heavy examples even when answer correctness appears acceptable."
    else:
        action = "Use the policy simulator to choose a review threshold that balances coverage and risk."

    return DecisionBrief(
        posture=posture,
        posture_reason=reason,
        main_driver=main_driver,
        worst_slice=worst_slice,
        best_config=best_config,
        recommended_action=action,
    )
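
# Usage sketch: render the brief at the top of a dashboard page.
#     brief = make_decision_brief(eval_df, docs, chunks, retrieval)
#     print(brief.posture, "-", brief.recommended_action)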


def top_examples(
    eval_df: pd.DataFrame,
    mode: str = "High risk",
    n: int = 100,
    reference_df: pd.DataFrame | None = None,
) -> pd.DataFrame:
    """Rank rows by the same weighted risk signals used for slices, optionally filtered by mode."""
    if len(eval_df) == 0:
        return pd.DataFrame()
    src = eval_df.copy()
    src["evidence_strength_proxy"] = evidence_strength_proxy(src, reference_df=reference_df)
    src["risk_rank_score"] = (
        (1 - _safe_col(src, "is_correct", 0.0).fillna(0.0)).clip(0, 1) * RISK_SCORE_WEIGHTS["error"]
        + _safe_col(src, "hallucination_flag", 0.0).fillna(0.0).clip(0, 1) * RISK_SCORE_WEIGHTS["hallucination"]
        + (1 - _safe_col(src, "recall_at_10", 1.0).fillna(1.0)).clip(0, 1) * RISK_SCORE_WEIGHTS["retrieval"]
    )
    if mode == "Incorrect" and "is_correct" in src.columns:
        src = src[_numeric(src["is_correct"]).fillna(0) == 0]
    elif mode == "Hallucination" and "hallucination_flag" in src.columns:
        src = src[_numeric(src["hallucination_flag"]).fillna(0) == 1]
    elif mode == "Low retrieval" and "recall_at_10" in src.columns:
        src = src[_numeric(src["recall_at_10"]).fillna(1) < RETRIEVAL_OK_THRESHOLD]
    return src.sort_values("risk_rank_score", ascending=False).head(n).reset_index(drop=True)
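

# Minimal smoke sketch: synthetic data with illustrative values only, exercising the
# main entry points end to end. Column names match what this module already expects;
# the distributions below are arbitrary assumptions, not representative workloads.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    n_rows = 200
    demo_eval = pd.DataFrame(
        {
            "domain": rng.choice(["billing", "legal"], size=n_rows),
            "scenario_type": rng.choice(["lookup", "synthesis"], size=n_rows),
            "difficulty": rng.choice(["easy", "hard"], size=n_rows),
            "retrieval_strategy": rng.choice(["bm25", "hybrid"], size=n_rows),
            "generator_model": rng.choice(["model_a", "model_b"], size=n_rows),
            "is_correct": rng.integers(0, 2, size=n_rows),
            "hallucination_flag": (rng.random(n_rows) < 0.1).astype(int),
            "recall_at_10": rng.random(n_rows),
            "mrr_at_10": rng.random(n_rows),
            "total_latency_ms": rng.gamma(2.0, 400.0, size=n_rows),
            "total_cost_usd": rng.gamma(2.0, 0.002, size=n_rows),
        }
    )
    demo_docs = pd.DataFrame({"domain": rng.choice(["billing", "legal"], size=50)})
    demo_chunks = pd.DataFrame(index=range(400))
    demo_retrieval = pd.DataFrame(index=range(n_rows))

    print(make_decision_brief(demo_eval, demo_docs, demo_chunks, demo_retrieval))
    print(policy_curve(demo_eval).head())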