from __future__ import annotations from collections.abc import Iterable, Sequence from dataclasses import dataclass import numpy as np import pandas as pd MIN_SLICE_N = 30 MIN_CONFIG_N = 50 # Offline diagnostic boundary: Recall@10 >= 0.80 means most gold evidence is present in the top-10 retrieved chunks. # It is a review lens for this evaluation corpus, not a production deployment policy. RETRIEVAL_OK_THRESHOLD = 0.80 MISSING_LABEL = "Missing / Not provided" # Review-priority weights for offline risk slicing. They intentionally emphasize answer error and # hallucination exposure ahead of retrieval weakness because the dashboard is meant to drive human # QA review queues, not tune a live serving policy. Keep these values deterministic for reproducible # portfolio artifacts; recalibrate them before using the approach on a real production corpus. RISK_SCORE_WEIGHTS = {"error": 0.45, "hallucination": 0.35, "retrieval": 0.20} # Evidence-strength weights summarize retrieval-side evidence signals for offline policy review. # This score is not model confidence or a calibrated probability of answer correctness. EVIDENCE_STRENGTH_WEIGHTS = {"top1_score": 0.35, "mean_retrieved_score": 0.15, "recall_at_10": 0.30, "mrr_at_10": 0.20} # Offline objective presets used by the configuration leaderboard. The weights are hand-tuned review # lenses for this bundled synthetic evaluation set: quality-heavy by default, with alternative views # for hallucination, latency, and cost sensitivity. They are not learned coefficients or production SLAs. CONFIG_OBJECTIVE_WEIGHTS = { "Balanced": {"correct": 0.42, "recall": 0.18, "halluc": 0.22, "latency": 0.10, "cost": 0.08}, "Max quality": {"correct": 0.55, "recall": 0.25, "halluc": 0.15, "latency": 0.03, "cost": 0.02}, "Min hallucination": {"correct": 0.30, "recall": 0.15, "halluc": 0.45, "latency": 0.05, "cost": 0.05}, "Low latency": {"correct": 0.28, "recall": 0.12, "halluc": 0.15, "latency": 0.38, "cost": 0.07}, "Low cost": {"correct": 0.28, "recall": 0.12, "halluc": 0.15, "latency": 0.07, "cost": 0.38}, } @dataclass(frozen=True) class DecisionBrief: posture: str posture_reason: str main_driver: str worst_slice: str best_config: str recommended_action: str def _numeric(series: pd.Series) -> pd.Series: return pd.to_numeric(series, errors="coerce") def _p95(series: pd.Series) -> float: val = _numeric(series).quantile(0.95) return np.nan if pd.isna(val) else float(val) def _safe_col(df: pd.DataFrame, col: str, default: float = np.nan) -> pd.Series: if col in df.columns: return _numeric(df[col]) return pd.Series(default, index=df.index, dtype="float64") def safe_mean(df: pd.DataFrame, col: str, default: float = 0.0) -> float: if col not in df.columns or len(df) == 0: return default val = _numeric(df[col]).mean() return default if pd.isna(val) else float(val) def safe_p95(df: pd.DataFrame, col: str, default: float = 0.0) -> float: if col not in df.columns or len(df) == 0: return default val = _numeric(df[col]).quantile(0.95) return default if pd.isna(val) else float(val) def fmt_pct(x: float) -> str: if pd.isna(x): return "n/a" return f"{x:.1%}" def fmt_money(x: float) -> str: if pd.isna(x): return "n/a" return f"${x:.4f}" def overview_metrics(eval_df: pd.DataFrame, docs: pd.DataFrame, chunks: pd.DataFrame, retrieval: pd.DataFrame) -> dict[str, float]: return { "evaluations": float(len(eval_df)), "retrieval_events": float(len(retrieval)), "documents": float(len(docs)), "chunks": float(len(chunks)), "correct_rate": safe_mean(eval_df, "is_correct", default=np.nan), "hallucination_rate": safe_mean(eval_df, "hallucination_flag", default=np.nan), "recall_at_10": safe_mean(eval_df, "recall_at_10", default=np.nan), "mrr_at_10": safe_mean(eval_df, "mrr_at_10", default=np.nan), "p95_latency_ms": safe_p95(eval_df, "total_latency_ms", default=np.nan), "avg_cost_usd": safe_mean(eval_df, "total_cost_usd", default=np.nan), } def quality_posture(metrics: dict[str, float]) -> tuple[str, str]: correct = metrics.get("correct_rate", np.nan) halluc = metrics.get("hallucination_rate", np.nan) recall = metrics.get("recall_at_10", np.nan) if pd.isna(correct) or pd.isna(halluc) or pd.isna(recall): return "Review", "one or more key quality signals are unavailable under the current filters" if correct >= 0.78 and halluc <= 0.10 and recall >= 0.70: return "Stable", "correctness, hallucination, and retrieval signals are within a usable operating band" if correct < 0.62 or halluc > 0.20 or recall < 0.45: return "High Risk", "one or more quality signals are outside the expected operating band" return "Watch", "quality is usable for analysis, but risk slices require targeted review" def risk_slices( eval_df: pd.DataFrame, group_cols: Sequence[str] = ("domain", "scenario_type", "difficulty"), min_n: int = MIN_SLICE_N, ) -> pd.DataFrame: """Aggregate risk by slice without silent count-as-metric fallbacks.""" available = [c for c in group_cols if c in eval_df.columns] if not available or len(eval_df) == 0: return pd.DataFrame() src = eval_df.copy() for col in available: src[col] = src[col].astype("string").fillna(MISSING_LABEL) src["__row_count"] = 1 agg_map = {"n": ("__row_count", "size")} optional_aggs = { "correct_rate": ("is_correct", "mean"), "hallucination_rate": ("hallucination_flag", "mean"), "recall_at_10": ("recall_at_10", "mean"), "mrr_at_10": ("mrr_at_10", "mean"), "p95_latency_ms": ("total_latency_ms", _p95), "avg_cost_usd": ("total_cost_usd", "mean"), } for out_col, spec in optional_aggs.items(): if spec[0] in src.columns: agg_map[out_col] = spec out = src.groupby(available, dropna=False).agg(**agg_map).reset_index() out = out[out["n"] >= int(min_n)].copy() if out.empty: return out for col in ["correct_rate", "hallucination_rate", "recall_at_10", "mrr_at_10", "p95_latency_ms", "avg_cost_usd"]: if col not in out.columns: out[col] = np.nan out["error_rate"] = 1 - out["correct_rate"] error_component = out["error_rate"].fillna(0.0).clip(0, 1) halluc_component = out["hallucination_rate"].fillna(0.0).clip(0, 1) retrieval_component = (1 - out["recall_at_10"].fillna(1.0)).clip(0, 1) out["risk_score"] = ( error_component * RISK_SCORE_WEIGHTS["error"] + halluc_component * RISK_SCORE_WEIGHTS["hallucination"] + retrieval_component * RISK_SCORE_WEIGHTS["retrieval"] ) return out.sort_values("risk_score", ascending=False).reset_index(drop=True) def retrieval_outcomes(eval_df: pd.DataFrame, threshold: float = RETRIEVAL_OK_THRESHOLD) -> pd.DataFrame: """Classify rows into retrieval/generation/hallucination modes using one canonical implementation.""" required = {"recall_at_10", "is_correct"} if len(eval_df) == 0 or not required.issubset(eval_df.columns): return pd.DataFrame() src = eval_df.copy() src["__row_count"] = 1 src["retrieval_state"] = np.where(_numeric(src["recall_at_10"]) >= threshold, "retrieval_ok", "retrieval_weak") src["answer_state"] = np.where(_numeric(src["is_correct"]).fillna(0.0) >= 0.5, "answer_correct", "answer_incorrect") halluc = _safe_col(src, "hallucination_flag", 0.0).fillna(0.0) >= 0.5 src["failure_mode"] = np.select( [ halluc & (src["answer_state"] == "answer_correct"), halluc & (src["answer_state"] == "answer_incorrect"), (src["retrieval_state"] == "retrieval_weak") & (src["answer_state"] == "answer_incorrect"), (src["retrieval_state"] == "retrieval_ok") & (src["answer_state"] == "answer_incorrect"), (src["retrieval_state"] == "retrieval_weak") & (src["answer_state"] == "answer_correct"), (src["retrieval_state"] == "retrieval_ok") & (src["answer_state"] == "answer_correct"), ], [ "hallucination_risk_correct_answer", "hallucination_failure", "retrieval_failure", "generation_failure", "recovered_by_generation", "healthy", ], default=MISSING_LABEL, ) agg_map = { "n": ("__row_count", "size"), "correct_rate": ("is_correct", "mean"), "recall_at_10": ("recall_at_10", "mean"), } if "hallucination_flag" in src.columns: agg_map["hallucination_rate"] = ("hallucination_flag", "mean") if "total_latency_ms" in src.columns: agg_map["p95_latency_ms"] = ("total_latency_ms", _p95) if "total_cost_usd" in src.columns: agg_map["avg_cost_usd"] = ("total_cost_usd", "mean") out = src.groupby("failure_mode", dropna=False).agg(**agg_map).reset_index() for col in ["hallucination_rate", "p95_latency_ms", "avg_cost_usd"]: if col not in out.columns: out[col] = np.nan out["share"] = out["n"] / max(out["n"].sum(), 1) return out.sort_values("n", ascending=False).reset_index(drop=True) def p95_scaled(series: pd.Series) -> pd.Series: vals = _numeric(series) denom = vals.quantile(0.95) if pd.isna(denom) or denom <= 0: return pd.Series(np.zeros(len(vals)), index=vals.index) return (vals.fillna(0.0) / denom).clip(0, 2) def _rate_ci(p: pd.Series, n: pd.Series, z: float = 1.96) -> tuple[pd.Series, pd.Series]: p = _numeric(p).clip(0, 1) n = _numeric(n).clip(lower=1) se = np.sqrt((p * (1 - p)) / n) return (p - z * se).clip(0, 1), (p + z * se).clip(0, 1) def config_leaderboard(eval_df: pd.DataFrame, objective: str = "Balanced", min_n: int = MIN_CONFIG_N) -> pd.DataFrame: required = {"retrieval_strategy", "generator_model", "is_correct", "hallucination_flag", "total_latency_ms", "total_cost_usd"} if not required.issubset(eval_df.columns) or len(eval_df) == 0: return pd.DataFrame() src = eval_df.copy() src["__row_count"] = 1 group_cols = ["retrieval_strategy", "generator_model"] if "chunking_strategy" in src.columns: group_cols.append("chunking_strategy") for col in group_cols: src[col] = src[col].astype("string").fillna(MISSING_LABEL) agg_map = { "n": ("__row_count", "size"), "correct_rate": ("is_correct", "mean"), "hallucination_rate": ("hallucination_flag", "mean"), "p95_latency_ms": ("total_latency_ms", _p95), "avg_cost_usd": ("total_cost_usd", "mean"), } if "recall_at_10" in src.columns: agg_map["recall_at_10"] = ("recall_at_10", "mean") if "mrr_at_10" in src.columns: agg_map["mrr_at_10"] = ("mrr_at_10", "mean") out = src.groupby(group_cols, dropna=False).agg(**agg_map).reset_index() out = out[out["n"] >= int(min_n)].copy() if out.empty: return out for col in ["recall_at_10", "mrr_at_10"]: if col not in out.columns: out[col] = np.nan out["correct_rate_ci_low"], out["correct_rate_ci_high"] = _rate_ci(out["correct_rate"], out["n"]) out["hallucination_rate_ci_low"], out["hallucination_rate_ci_high"] = _rate_ci(out["hallucination_rate"], out["n"]) out["latency_scaled"] = p95_scaled(out["p95_latency_ms"]) out["cost_scaled"] = p95_scaled(out["avg_cost_usd"]) weights = CONFIG_OBJECTIVE_WEIGHTS.get(objective, CONFIG_OBJECTIVE_WEIGHTS["Balanced"]) out["score"] = ( out["correct_rate"].fillna(0) * weights["correct"] + out["recall_at_10"].fillna(0) * weights["recall"] - out["hallucination_rate"].fillna(0) * weights["halluc"] - out["latency_scaled"].fillna(0) * weights["latency"] - out["cost_scaled"].fillna(0) * weights["cost"] ) out["config"] = out[group_cols].astype(str).agg(" / ".join, axis=1) return out.sort_values("score", ascending=False).reset_index(drop=True) def demand_coverage(eval_df: pd.DataFrame, docs: pd.DataFrame) -> pd.DataFrame: if "domain" not in eval_df.columns or "domain" not in docs.columns: return pd.DataFrame() demand = eval_df["domain"].astype("string").fillna(MISSING_LABEL).value_counts(normalize=True).rename("eval_demand_share") corpus = docs["domain"].astype("string").fillna(MISSING_LABEL).value_counts(normalize=True).rename("corpus_document_share") out = pd.concat([demand, corpus], axis=1).fillna(0) out.index.name = "domain" out = out.reset_index() out["demand_minus_corpus"] = out["eval_demand_share"] - out["corpus_document_share"] return out.sort_values("eval_demand_share", ascending=False).reset_index(drop=True) def evidence_strength_proxy(eval_df: pd.DataFrame, reference_df: pd.DataFrame | None = None) -> pd.Series: """Offline evidence-strength proxy derived from retrieval-side evaluation signals. The score is not LLM confidence, a calibrated correctness probability, or a production approval signal. The optional reference_df fixes normalization anchors so threshold behavior does not drift when the user changes dashboard filters. """ ref = reference_df if reference_df is not None and len(reference_df) else eval_df parts = [] weights = [] for col, weight in EVIDENCE_STRENGTH_WEIGHTS.items(): if col in eval_df.columns: s = _numeric(eval_df[col]).fillna(0.0) ref_s = _numeric(ref[col]).fillna(0.0) if col in ref.columns else s min_v, max_v = ref_s.min(), ref_s.max() if max_v > min_v: s = (s - min_v) / (max_v - min_v) parts.append(s.clip(0, 1) * weight) weights.append(weight) if not parts: return pd.Series(np.zeros(len(eval_df)), index=eval_df.index) score = sum(parts) / max(sum(weights), 1e-9) return score.clip(0, 1) def policy_curve( eval_df: pd.DataFrame, thresholds: Iterable[float] | None = None, reference_df: pd.DataFrame | None = None, ) -> pd.DataFrame: if len(eval_df) == 0: return pd.DataFrame() if thresholds is None: thresholds = np.linspace(0.05, 0.95, 19) src = eval_df.copy() src["evidence_strength_proxy"] = evidence_strength_proxy(src, reference_df=reference_df) rows = [] total_hallucinations = _safe_col(src, "hallucination_flag", 0.0).sum() for thr in thresholds: auto = src[src["evidence_strength_proxy"] >= thr] review = src[src["evidence_strength_proxy"] < thr] review_hallucinations = _safe_col(review, "hallucination_flag", 0.0).sum() rows.append( { "threshold": float(thr), "auto_approve_rate": len(auto) / max(len(src), 1), "review_queue_size": int(len(review)), "auto_correct_rate": safe_mean(auto, "is_correct", default=np.nan) if len(auto) else np.nan, "auto_hallucination_rate": safe_mean(auto, "hallucination_flag", default=np.nan) if len(auto) else np.nan, "risk_captured_in_review": (review_hallucinations / total_hallucinations) if total_hallucinations > 0 else np.nan, } ) return pd.DataFrame(rows) def policy_at_threshold(eval_df: pd.DataFrame, threshold: float, reference_df: pd.DataFrame | None = None) -> dict[str, float]: curve = policy_curve(eval_df, [threshold], reference_df=reference_df) if curve.empty: return {} return curve.iloc[0].to_dict() def make_decision_brief( eval_df: pd.DataFrame, docs: pd.DataFrame, chunks: pd.DataFrame, retrieval: pd.DataFrame, min_slice_n: int = MIN_SLICE_N, min_config_n: int = MIN_CONFIG_N, *, risk_table: pd.DataFrame | None = None, retrieval_table: pd.DataFrame | None = None, config_table: pd.DataFrame | None = None, ) -> DecisionBrief: metrics = overview_metrics(eval_df, docs, chunks, retrieval) posture, reason = quality_posture(metrics) retrieval_table = retrieval_table if retrieval_table is not None else retrieval_outcomes(eval_df) risk = risk_table if risk_table is not None else risk_slices(eval_df, min_n=min_slice_n) configs = config_table if config_table is not None else config_leaderboard(eval_df, min_n=min_config_n) main_driver = "Mixed" if not retrieval_table.empty: # Pick the dominant non-healthy mode for actionability. A large healthy segment should not hide # the strongest remaining failure class in the decision strip. driver_rows = retrieval_table[retrieval_table["failure_mode"].astype(str) != "healthy"] if driver_rows.empty: driver_rows = retrieval_table top_mode = str(driver_rows.iloc[0]["failure_mode"]) if "hallucination" in top_mode: main_driver = "Hallucination" elif "retrieval" in top_mode: main_driver = "Retrieval" elif "generation" in top_mode: main_driver = "Generation" elif top_mode == "healthy": main_driver = "Healthy majority" worst_slice = "No high-risk slice above minimum sample size" if not risk.empty: row = risk.iloc[0] parts = [str(row[c]) for c in ["domain", "scenario_type", "difficulty"] if c in risk.columns] worst_slice = " / ".join(parts) + f" · risk={row['risk_score']:.2f}" best_config = "No eligible configuration" if not configs.empty: row = configs.iloc[0] best_config = f"{row['config']} · score={row['score']:.2f} · n={int(row['n'])}" if posture == "High Risk": action = "Prioritize the top risk slice and inspect retrieval evidence before widening auto-approval." elif main_driver == "Retrieval": action = "Start with retrieval diagnostics: recall coverage, chunk ranking, and corpus-demand alignment." elif main_driver == "Generation": action = "Inspect answer generation behavior on retrieval-ok but incorrect examples." elif main_driver == "Hallucination": action = "Review hallucination-heavy examples even when answer correctness appears acceptable." else: action = "Use the policy simulator to choose a review threshold that balances coverage and risk." return DecisionBrief( posture=posture, posture_reason=reason, main_driver=main_driver, worst_slice=worst_slice, best_config=best_config, recommended_action=action, ) def top_examples(eval_df: pd.DataFrame, mode: str = "High risk", n: int = 100, reference_df: pd.DataFrame | None = None) -> pd.DataFrame: if len(eval_df) == 0: return pd.DataFrame() src = eval_df.copy() src["evidence_strength_proxy"] = evidence_strength_proxy(src, reference_df=reference_df) src["risk_rank_score"] = ( (1 - _safe_col(src, "is_correct", 0.0).fillna(0.0)).clip(0, 1) * RISK_SCORE_WEIGHTS["error"] + _safe_col(src, "hallucination_flag", 0.0).fillna(0.0).clip(0, 1) * RISK_SCORE_WEIGHTS["hallucination"] + (1 - _safe_col(src, "recall_at_10", 1.0).fillna(1.0)).clip(0, 1) * RISK_SCORE_WEIGHTS["retrieval"] ) if mode == "Incorrect" and "is_correct" in src.columns: src = src[_numeric(src["is_correct"]).fillna(0) == 0] elif mode == "Hallucination" and "hallucination_flag" in src.columns: src = src[_numeric(src["hallucination_flag"]).fillna(0) == 1] elif mode == "Low retrieval" and "recall_at_10" in src.columns: src = src[_numeric(src["recall_at_10"]).fillna(1) < RETRIEVAL_OK_THRESHOLD] return src.sort_values("risk_rank_score", ascending=False).head(n).reset_index(drop=True)