# rag-qa-command-center / src/analytics.py
from __future__ import annotations
from collections.abc import Iterable, Sequence
from dataclasses import dataclass
import numpy as np
import pandas as pd
MIN_SLICE_N = 30
MIN_CONFIG_N = 50
# Offline diagnostic boundary: Recall@10 >= 0.80 means most gold evidence is present in the top-10 retrieved chunks.
# It is a review lens for this evaluation corpus, not a production deployment policy.
RETRIEVAL_OK_THRESHOLD = 0.80
MISSING_LABEL = "Missing / Not provided"
# Review-priority weights for offline risk slicing. They intentionally emphasize answer error and
# hallucination exposure ahead of retrieval weakness because the dashboard is meant to drive human
# QA review queues, not tune a live serving policy. Keep these values deterministic for reproducible
# portfolio artifacts; recalibrate them before using the approach on a real production corpus.
RISK_SCORE_WEIGHTS = {"error": 0.45, "hallucination": 0.35, "retrieval": 0.20}
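# Worked example (illustrative numbers, not drawn from the bundled data): a slice with a
# 30% error rate, a 10% hallucination rate, and Recall@10 = 0.70 scores
#   0.30 * 0.45 + 0.10 * 0.35 + (1 - 0.70) * 0.20 = 0.135 + 0.035 + 0.060 = 0.23
# under risk_slices() below; higher scores surface earlier in the review queue.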
# Evidence-strength weights summarize retrieval-side evidence signals for offline policy review.
# This score is not model confidence or a calibrated probability of answer correctness.
EVIDENCE_STRENGTH_WEIGHTS = {"top1_score": 0.35, "mean_retrieved_score": 0.15, "recall_at_10": 0.30, "mrr_at_10": 0.20}
# Offline objective presets used by the configuration leaderboard. The weights are hand-tuned review
# lenses for this bundled synthetic evaluation set: quality-heavy by default, with alternative views
# for hallucination, latency, and cost sensitivity. They are not learned coefficients or production SLAs.
CONFIG_OBJECTIVE_WEIGHTS = {
"Balanced": {"correct": 0.42, "recall": 0.18, "halluc": 0.22, "latency": 0.10, "cost": 0.08},
"Max quality": {"correct": 0.55, "recall": 0.25, "halluc": 0.15, "latency": 0.03, "cost": 0.02},
"Min hallucination": {"correct": 0.30, "recall": 0.15, "halluc": 0.45, "latency": 0.05, "cost": 0.05},
"Low latency": {"correct": 0.28, "recall": 0.12, "halluc": 0.15, "latency": 0.38, "cost": 0.07},
"Low cost": {"correct": 0.28, "recall": 0.12, "halluc": 0.15, "latency": 0.07, "cost": 0.38},
}
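# Worked example (illustrative numbers): under the "Balanced" preset, a configuration with
# correct_rate=0.80, recall_at_10=0.75, hallucination_rate=0.08, latency_scaled=0.50, and
# cost_scaled=0.40 scores
#   0.80*0.42 + 0.75*0.18 - 0.08*0.22 - 0.50*0.10 - 0.40*0.08 ≈ 0.371
# via the formula in config_leaderboard() below.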
@dataclass(frozen=True)
class DecisionBrief:
posture: str
posture_reason: str
main_driver: str
worst_slice: str
best_config: str
recommended_action: str
def _numeric(series: pd.Series) -> pd.Series:
return pd.to_numeric(series, errors="coerce")
def _p95(series: pd.Series) -> float:
val = _numeric(series).quantile(0.95)
return np.nan if pd.isna(val) else float(val)
def _safe_col(df: pd.DataFrame, col: str, default: float = np.nan) -> pd.Series:
if col in df.columns:
return _numeric(df[col])
return pd.Series(default, index=df.index, dtype="float64")
def safe_mean(df: pd.DataFrame, col: str, default: float = 0.0) -> float:
if col not in df.columns or len(df) == 0:
return default
val = _numeric(df[col]).mean()
return default if pd.isna(val) else float(val)
def safe_p95(df: pd.DataFrame, col: str, default: float = 0.0) -> float:
if col not in df.columns or len(df) == 0:
return default
val = _numeric(df[col]).quantile(0.95)
return default if pd.isna(val) else float(val)
def fmt_pct(x: float) -> str:
if pd.isna(x):
return "n/a"
return f"{x:.1%}"
def fmt_money(x: float) -> str:
if pd.isna(x):
return "n/a"
return f"${x:.4f}"
def overview_metrics(eval_df: pd.DataFrame, docs: pd.DataFrame, chunks: pd.DataFrame, retrieval: pd.DataFrame) -> dict[str, float]:
return {
"evaluations": float(len(eval_df)),
"retrieval_events": float(len(retrieval)),
"documents": float(len(docs)),
"chunks": float(len(chunks)),
"correct_rate": safe_mean(eval_df, "is_correct", default=np.nan),
"hallucination_rate": safe_mean(eval_df, "hallucination_flag", default=np.nan),
"recall_at_10": safe_mean(eval_df, "recall_at_10", default=np.nan),
"mrr_at_10": safe_mean(eval_df, "mrr_at_10", default=np.nan),
"p95_latency_ms": safe_p95(eval_df, "total_latency_ms", default=np.nan),
"avg_cost_usd": safe_mean(eval_df, "total_cost_usd", default=np.nan),
}
def quality_posture(metrics: dict[str, float]) -> tuple[str, str]:
correct = metrics.get("correct_rate", np.nan)
halluc = metrics.get("hallucination_rate", np.nan)
recall = metrics.get("recall_at_10", np.nan)
if pd.isna(correct) or pd.isna(halluc) or pd.isna(recall):
return "Review", "one or more key quality signals are unavailable under the current filters"
if correct >= 0.78 and halluc <= 0.10 and recall >= 0.70:
return "Stable", "correctness, hallucination, and retrieval signals are within a usable operating band"
if correct < 0.62 or halluc > 0.20 or recall < 0.45:
return "High Risk", "one or more quality signals are outside the expected operating band"
return "Watch", "quality is usable for analysis, but risk slices require targeted review"
def risk_slices(
eval_df: pd.DataFrame,
group_cols: Sequence[str] = ("domain", "scenario_type", "difficulty"),
min_n: int = MIN_SLICE_N,
) -> pd.DataFrame:
"""Aggregate risk by slice without silent count-as-metric fallbacks."""
available = [c for c in group_cols if c in eval_df.columns]
if not available or len(eval_df) == 0:
return pd.DataFrame()
src = eval_df.copy()
for col in available:
src[col] = src[col].astype("string").fillna(MISSING_LABEL)
src["__row_count"] = 1
agg_map = {"n": ("__row_count", "size")}
optional_aggs = {
"correct_rate": ("is_correct", "mean"),
"hallucination_rate": ("hallucination_flag", "mean"),
"recall_at_10": ("recall_at_10", "mean"),
"mrr_at_10": ("mrr_at_10", "mean"),
"p95_latency_ms": ("total_latency_ms", _p95),
"avg_cost_usd": ("total_cost_usd", "mean"),
}
for out_col, spec in optional_aggs.items():
if spec[0] in src.columns:
agg_map[out_col] = spec
out = src.groupby(available, dropna=False).agg(**agg_map).reset_index()
out = out[out["n"] >= int(min_n)].copy()
if out.empty:
return out
for col in ["correct_rate", "hallucination_rate", "recall_at_10", "mrr_at_10", "p95_latency_ms", "avg_cost_usd"]:
if col not in out.columns:
out[col] = np.nan
out["error_rate"] = 1 - out["correct_rate"]
error_component = out["error_rate"].fillna(0.0).clip(0, 1)
halluc_component = out["hallucination_rate"].fillna(0.0).clip(0, 1)
retrieval_component = (1 - out["recall_at_10"].fillna(1.0)).clip(0, 1)
out["risk_score"] = (
error_component * RISK_SCORE_WEIGHTS["error"]
+ halluc_component * RISK_SCORE_WEIGHTS["hallucination"]
+ retrieval_component * RISK_SCORE_WEIGHTS["retrieval"]
)
return out.sort_values("risk_score", ascending=False).reset_index(drop=True)
def retrieval_outcomes(eval_df: pd.DataFrame, threshold: float = RETRIEVAL_OK_THRESHOLD) -> pd.DataFrame:
"""Classify rows into retrieval/generation/hallucination modes using one canonical implementation."""
required = {"recall_at_10", "is_correct"}
if len(eval_df) == 0 or not required.issubset(eval_df.columns):
return pd.DataFrame()
src = eval_df.copy()
src["__row_count"] = 1
src["retrieval_state"] = np.where(_numeric(src["recall_at_10"]) >= threshold, "retrieval_ok", "retrieval_weak")
src["answer_state"] = np.where(_numeric(src["is_correct"]).fillna(0.0) >= 0.5, "answer_correct", "answer_incorrect")
halluc = _safe_col(src, "hallucination_flag", 0.0).fillna(0.0) >= 0.5
src["failure_mode"] = np.select(
[
halluc & (src["answer_state"] == "answer_correct"),
halluc & (src["answer_state"] == "answer_incorrect"),
(src["retrieval_state"] == "retrieval_weak") & (src["answer_state"] == "answer_incorrect"),
(src["retrieval_state"] == "retrieval_ok") & (src["answer_state"] == "answer_incorrect"),
(src["retrieval_state"] == "retrieval_weak") & (src["answer_state"] == "answer_correct"),
(src["retrieval_state"] == "retrieval_ok") & (src["answer_state"] == "answer_correct"),
],
[
"hallucination_risk_correct_answer",
"hallucination_failure",
"retrieval_failure",
"generation_failure",
"recovered_by_generation",
"healthy",
],
default=MISSING_LABEL,
)
agg_map = {
"n": ("__row_count", "size"),
"correct_rate": ("is_correct", "mean"),
"recall_at_10": ("recall_at_10", "mean"),
}
if "hallucination_flag" in src.columns:
agg_map["hallucination_rate"] = ("hallucination_flag", "mean")
if "total_latency_ms" in src.columns:
agg_map["p95_latency_ms"] = ("total_latency_ms", _p95)
if "total_cost_usd" in src.columns:
agg_map["avg_cost_usd"] = ("total_cost_usd", "mean")
out = src.groupby("failure_mode", dropna=False).agg(**agg_map).reset_index()
for col in ["hallucination_rate", "p95_latency_ms", "avg_cost_usd"]:
if col not in out.columns:
out[col] = np.nan
out["share"] = out["n"] / max(out["n"].sum(), 1)
return out.sort_values("n", ascending=False).reset_index(drop=True)
def p95_scaled(series: pd.Series) -> pd.Series:
vals = _numeric(series)
denom = vals.quantile(0.95)
if pd.isna(denom) or denom <= 0:
return pd.Series(np.zeros(len(vals)), index=vals.index)
return (vals.fillna(0.0) / denom).clip(0, 2)
def _rate_ci(p: pd.Series, n: pd.Series, z: float = 1.96) -> tuple[pd.Series, pd.Series]:
p = _numeric(p).clip(0, 1)
n = _numeric(n).clip(lower=1)
se = np.sqrt((p * (1 - p)) / n)
return (p - z * se).clip(0, 1), (p + z * se).clip(0, 1)
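# Worked example (illustrative): p=0.80, n=100 gives se = sqrt(0.8 * 0.2 / 100) = 0.04, so the
# 95% normal-approximation interval is 0.80 ± 1.96 * 0.04 = [0.7216, 0.8784] before clipping.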
def config_leaderboard(eval_df: pd.DataFrame, objective: str = "Balanced", min_n: int = MIN_CONFIG_N) -> pd.DataFrame:
required = {"retrieval_strategy", "generator_model", "is_correct", "hallucination_flag", "total_latency_ms", "total_cost_usd"}
if not required.issubset(eval_df.columns) or len(eval_df) == 0:
return pd.DataFrame()
src = eval_df.copy()
src["__row_count"] = 1
group_cols = ["retrieval_strategy", "generator_model"]
if "chunking_strategy" in src.columns:
group_cols.append("chunking_strategy")
for col in group_cols:
src[col] = src[col].astype("string").fillna(MISSING_LABEL)
agg_map = {
"n": ("__row_count", "size"),
"correct_rate": ("is_correct", "mean"),
"hallucination_rate": ("hallucination_flag", "mean"),
"p95_latency_ms": ("total_latency_ms", _p95),
"avg_cost_usd": ("total_cost_usd", "mean"),
}
if "recall_at_10" in src.columns:
agg_map["recall_at_10"] = ("recall_at_10", "mean")
if "mrr_at_10" in src.columns:
agg_map["mrr_at_10"] = ("mrr_at_10", "mean")
out = src.groupby(group_cols, dropna=False).agg(**agg_map).reset_index()
out = out[out["n"] >= int(min_n)].copy()
if out.empty:
return out
for col in ["recall_at_10", "mrr_at_10"]:
if col not in out.columns:
out[col] = np.nan
out["correct_rate_ci_low"], out["correct_rate_ci_high"] = _rate_ci(out["correct_rate"], out["n"])
out["hallucination_rate_ci_low"], out["hallucination_rate_ci_high"] = _rate_ci(out["hallucination_rate"], out["n"])
out["latency_scaled"] = p95_scaled(out["p95_latency_ms"])
out["cost_scaled"] = p95_scaled(out["avg_cost_usd"])
weights = CONFIG_OBJECTIVE_WEIGHTS.get(objective, CONFIG_OBJECTIVE_WEIGHTS["Balanced"])
out["score"] = (
out["correct_rate"].fillna(0) * weights["correct"]
+ out["recall_at_10"].fillna(0) * weights["recall"]
- out["hallucination_rate"].fillna(0) * weights["halluc"]
- out["latency_scaled"].fillna(0) * weights["latency"]
- out["cost_scaled"].fillna(0) * weights["cost"]
)
out["config"] = out[group_cols].astype(str).agg(" / ".join, axis=1)
return out.sort_values("score", ascending=False).reset_index(drop=True)
def demand_coverage(eval_df: pd.DataFrame, docs: pd.DataFrame) -> pd.DataFrame:
if "domain" not in eval_df.columns or "domain" not in docs.columns:
return pd.DataFrame()
demand = eval_df["domain"].astype("string").fillna(MISSING_LABEL).value_counts(normalize=True).rename("eval_demand_share")
corpus = docs["domain"].astype("string").fillna(MISSING_LABEL).value_counts(normalize=True).rename("corpus_document_share")
out = pd.concat([demand, corpus], axis=1).fillna(0)
out.index.name = "domain"
out = out.reset_index()
out["demand_minus_corpus"] = out["eval_demand_share"] - out["corpus_document_share"]
return out.sort_values("eval_demand_share", ascending=False).reset_index(drop=True)
def evidence_strength_proxy(eval_df: pd.DataFrame, reference_df: pd.DataFrame | None = None) -> pd.Series:
"""Offline evidence-strength proxy derived from retrieval-side evaluation signals.
The score is not LLM confidence, a calibrated correctness probability, or a production
approval signal. The optional reference_df fixes normalization anchors so threshold
behavior does not drift when the user changes dashboard filters.
"""
ref = reference_df if reference_df is not None and len(reference_df) else eval_df
parts = []
weights = []
for col, weight in EVIDENCE_STRENGTH_WEIGHTS.items():
if col in eval_df.columns:
s = _numeric(eval_df[col]).fillna(0.0)
ref_s = _numeric(ref[col]).fillna(0.0) if col in ref.columns else s
min_v, max_v = ref_s.min(), ref_s.max()
if max_v > min_v:
s = (s - min_v) / (max_v - min_v)
parts.append(s.clip(0, 1) * weight)
weights.append(weight)
if not parts:
return pd.Series(np.zeros(len(eval_df)), index=eval_df.index)
score = sum(parts) / max(sum(weights), 1e-9)
return score.clip(0, 1)
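# Usage sketch: pass the unfiltered table as reference_df so the min/max anchors stay fixed
# while dashboard filters change (load_eval_table is a hypothetical loader, not part of this module):
#   full = load_eval_table()
#   view = full[full["domain"] == "legal"].copy()
#   view["evidence_strength_proxy"] = evidence_strength_proxy(view, reference_df=full)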
def policy_curve(
eval_df: pd.DataFrame,
thresholds: Iterable[float] | None = None,
reference_df: pd.DataFrame | None = None,
) -> pd.DataFrame:
if len(eval_df) == 0:
return pd.DataFrame()
if thresholds is None:
thresholds = np.linspace(0.05, 0.95, 19)
src = eval_df.copy()
src["evidence_strength_proxy"] = evidence_strength_proxy(src, reference_df=reference_df)
rows = []
total_hallucinations = _safe_col(src, "hallucination_flag", 0.0).sum()
for thr in thresholds:
auto = src[src["evidence_strength_proxy"] >= thr]
review = src[src["evidence_strength_proxy"] < thr]
review_hallucinations = _safe_col(review, "hallucination_flag", 0.0).sum()
rows.append(
{
"threshold": float(thr),
"auto_approve_rate": len(auto) / max(len(src), 1),
"review_queue_size": int(len(review)),
"auto_correct_rate": safe_mean(auto, "is_correct", default=np.nan) if len(auto) else np.nan,
"auto_hallucination_rate": safe_mean(auto, "hallucination_flag", default=np.nan) if len(auto) else np.nan,
"risk_captured_in_review": (review_hallucinations / total_hallucinations) if total_hallucinations > 0 else np.nan,
}
)
return pd.DataFrame(rows)
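# Usage sketch (hypothetical policy choice): sweep thresholds, then keep the smallest review
# queue that still routes at least 90% of hallucinations to human review:
#   curve = policy_curve(eval_df)
#   ok = curve[curve["risk_captured_in_review"] >= 0.90]
#   chosen = ok.sort_values("review_queue_size").iloc[0] if not ok.empty else None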
def policy_at_threshold(eval_df: pd.DataFrame, threshold: float, reference_df: pd.DataFrame | None = None) -> dict[str, float]:
curve = policy_curve(eval_df, [threshold], reference_df=reference_df)
if curve.empty:
return {}
return curve.iloc[0].to_dict()
def make_decision_brief(
eval_df: pd.DataFrame,
docs: pd.DataFrame,
chunks: pd.DataFrame,
retrieval: pd.DataFrame,
min_slice_n: int = MIN_SLICE_N,
min_config_n: int = MIN_CONFIG_N,
*,
risk_table: pd.DataFrame | None = None,
retrieval_table: pd.DataFrame | None = None,
config_table: pd.DataFrame | None = None,
) -> DecisionBrief:
metrics = overview_metrics(eval_df, docs, chunks, retrieval)
posture, reason = quality_posture(metrics)
retrieval_table = retrieval_table if retrieval_table is not None else retrieval_outcomes(eval_df)
risk = risk_table if risk_table is not None else risk_slices(eval_df, min_n=min_slice_n)
configs = config_table if config_table is not None else config_leaderboard(eval_df, min_n=min_config_n)
main_driver = "Mixed"
if not retrieval_table.empty:
# Pick the dominant non-healthy mode for actionability. A large healthy segment should not hide
# the strongest remaining failure class in the decision strip.
driver_rows = retrieval_table[retrieval_table["failure_mode"].astype(str) != "healthy"]
if driver_rows.empty:
driver_rows = retrieval_table
top_mode = str(driver_rows.iloc[0]["failure_mode"])
if "hallucination" in top_mode:
main_driver = "Hallucination"
elif "retrieval" in top_mode:
main_driver = "Retrieval"
elif "generation" in top_mode:
main_driver = "Generation"
elif top_mode == "healthy":
main_driver = "Healthy majority"
worst_slice = "No high-risk slice above minimum sample size"
if not risk.empty:
row = risk.iloc[0]
parts = [str(row[c]) for c in ["domain", "scenario_type", "difficulty"] if c in risk.columns]
worst_slice = " / ".join(parts) + f" 路 risk={row['risk_score']:.2f}"
best_config = "No eligible configuration"
if not configs.empty:
row = configs.iloc[0]
best_config = f"{row['config']} 路 score={row['score']:.2f} 路 n={int(row['n'])}"
if posture == "High Risk":
action = "Prioritize the top risk slice and inspect retrieval evidence before widening auto-approval."
elif main_driver == "Retrieval":
action = "Start with retrieval diagnostics: recall coverage, chunk ranking, and corpus-demand alignment."
elif main_driver == "Generation":
action = "Inspect answer generation behavior on retrieval-ok but incorrect examples."
elif main_driver == "Hallucination":
action = "Review hallucination-heavy examples even when answer correctness appears acceptable."
else:
action = "Use the policy simulator to choose a review threshold that balances coverage and risk."
return DecisionBrief(
posture=posture,
posture_reason=reason,
main_driver=main_driver,
worst_slice=worst_slice,
best_config=best_config,
recommended_action=action,
)
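# Usage sketch: the precomputed tables are optional and are recomputed when omitted:
#   brief = make_decision_brief(eval_df, docs, chunks, retrieval)
#   print(brief.posture, "->", brief.recommended_action)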
def top_examples(eval_df: pd.DataFrame, mode: str = "High risk", n: int = 100, reference_df: pd.DataFrame | None = None) -> pd.DataFrame:
if len(eval_df) == 0:
return pd.DataFrame()
src = eval_df.copy()
src["evidence_strength_proxy"] = evidence_strength_proxy(src, reference_df=reference_df)
src["risk_rank_score"] = (
(1 - _safe_col(src, "is_correct", 0.0).fillna(0.0)).clip(0, 1) * RISK_SCORE_WEIGHTS["error"]
+ _safe_col(src, "hallucination_flag", 0.0).fillna(0.0).clip(0, 1) * RISK_SCORE_WEIGHTS["hallucination"]
+ (1 - _safe_col(src, "recall_at_10", 1.0).fillna(1.0)).clip(0, 1) * RISK_SCORE_WEIGHTS["retrieval"]
)
if mode == "Incorrect" and "is_correct" in src.columns:
src = src[_numeric(src["is_correct"]).fillna(0) == 0]
elif mode == "Hallucination" and "hallucination_flag" in src.columns:
src = src[_numeric(src["hallucination_flag"]).fillna(0) == 1]
elif mode == "Low retrieval" and "recall_at_10" in src.columns:
src = src[_numeric(src["recall_at_10"]).fillna(1) < RETRIEVAL_OK_THRESHOLD]
return src.sort_values("risk_rank_score", ascending=False).head(n).reset_index(drop=True)