#!/usr/bin/env python3
"""Compute five-number summaries of evaluation scores per literacy label.

Scores are grouped by literacy label, cleaned with IQR-based outlier removal,
summarized, and used to suggest per-metric thresholds between adjacent labels.
"""
import argparse
import json
import math
from statistics import median, quantiles

LABEL_ORDER = ["low", "intermediate", "proficient"]
# Metrics whose medians are expected to be non-decreasing across LABEL_ORDER.
ORDERED_METRICS = {"source_coverage", "completeness"}


def normalize_label(key: str) -> str:
    """Map a raw literacy-level key to one of the canonical labels."""
    key_l = key.lower()
    for label in LABEL_ORDER:
        if label in key_l:
            return label
    return key_l


def five_number_summary(values):
    """Return min/q1/median/q3/max for a non-empty list, else None."""
    if not values:
        return None
    if len(values) < 2:
        # quantiles() requires at least two data points; with a single value,
        # every statistic collapses to that value.
        v = values[0]
        return {"min": v, "q1": v, "median": v, "q3": v, "max": v}
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    return {
        "min": min(values),
        "q1": q1,
        "median": median(values),
        "q3": q3,
        "max": max(values),
    }


def remove_outliers_iqr(values):
    """Drop values outside [q1 - 1.5*IQR, q3 + 1.5*IQR]; return (kept, removed_count)."""
    if len(values) < 4:
        return values, 0
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    if math.isclose(iqr, 0.0):
        return values, 0
    lower = q1 - 1.5 * iqr
    upper = q3 + 1.5 * iqr
    filtered = [v for v in values if lower <= v <= upper]
    return filtered, len(values) - len(filtered)


def parse_scores(data, metrics):
    """Collect per-metric score lists for each literacy label."""
    grouped = {label: {m: [] for m in metrics} for label in LABEL_ORDER}
    for item in data:
        levels = item.get("literacy_levels") or {}
        for key, payload in levels.items():
            label = normalize_label(key)
            if label not in grouped:
                continue
            scores = (payload or {}).get("scores") or {}
            for m in metrics:
                if m in scores and scores[m] is not None:
                    grouped[label][m].append(scores[m])
    return grouped


def suggest_thresholds(per_label_summaries, label_order):
    """Suggest a boundary score between each pair of adjacent labels.

    If the adjacent distributions do not overlap (lower q3 < upper q1), the
    boundary is the midpoint of that gap; otherwise it is the midpoint of the
    two medians.
    """
    thresholds = {}
    for metric in per_label_summaries:
        thresholds[metric] = {}
        for i in range(len(label_order) - 1):
            lower_label = label_order[i]
            upper_label = label_order[i + 1]
            lower = per_label_summaries[metric].get(lower_label)
            upper = per_label_summaries[metric].get(upper_label)
            if not lower or not upper:
                thresholds[metric][f"{lower_label}_to_{upper_label}"] = None
                continue
            if lower["q3"] < upper["q1"]:
                boundary = (lower["q3"] + upper["q1"]) / 2
            else:
                boundary = (lower["median"] + upper["median"]) / 2
            thresholds[metric][f"{lower_label}_to_{upper_label}"] = boundary
    return thresholds


def print_summary(metrics, cleaned_by_label, outlier_counts, summaries):
    """Print counts, outlier removals, and five-number summaries per label/metric."""
    for label in LABEL_ORDER:
        print(f"\nLabel: {label}")
        for m in metrics:
            vals = cleaned_by_label[label][m]
            summary = summaries[m].get(label)
            removed = outlier_counts[label][m]
            print(f"  Metric: {m}")
            print(f"    Count (after outliers): {len(vals)}")
            print(f"    Outliers removed: {removed}")
            if summary:
                print(
                    "    Five-number summary: "
                    f"min={summary['min']:.4f}, "
                    f"q1={summary['q1']:.4f}, "
                    f"median={summary['median']:.4f}, "
                    f"q3={summary['q3']:.4f}, "
                    f"max={summary['max']:.4f}"
                )
            else:
                print("    Five-number summary: n/a")


def medians_in_order(summaries, metric, label_order):
    """Return True if the metric's medians are non-decreasing across labels."""
    medians = []
    for label in label_order:
        summary = summaries.get(metric, {}).get(label)
        if not summary:
            return False
        medians.append(summary["median"])
    return all(a <= b for a, b in zip(medians, medians[1:]))


def enforce_ordered_metrics(metrics, grouped, cleaned, outlier_counts, summaries):
    """Revert ordered metrics to their raw (unfiltered) values if cleaning broke the expected median ordering."""
    for metric in metrics:
        if metric not in ORDERED_METRICS:
            continue
        if medians_in_order(summaries, metric, LABEL_ORDER):
            continue
        for label in LABEL_ORDER:
            raw_values = grouped[label][metric]
            cleaned[label][metric] = raw_values
            outlier_counts[label][metric] = 0
            if raw_values:
                summaries[metric][label] = five_number_summary(raw_values)


def main():
    parser = argparse.ArgumentParser(
        description="Compute five-number summaries by literacy label with outlier removal."
    )
    parser.add_argument(
        "--input",
        default="/home/mshahidul/readctrl/data/factual_testing/full_details_evaluation_0_80_qwen3-30B_v2.json",
        help="Path to JSON evaluation file.",
    )
    parser.add_argument(
        "--metrics",
        default="factual_attribution,completeness,source_coverage",
        help="Comma-separated metrics to analyze.",
    )
    args = parser.parse_args()

    metrics = [m.strip() for m in args.metrics.split(",") if m.strip()]
    with open(args.input, "r", encoding="utf-8") as f:
        data = json.load(f)

    # Group raw scores, then clean and summarize each label/metric combination.
    grouped = parse_scores(data, metrics)
    cleaned = {label: {} for label in LABEL_ORDER}
    outlier_counts = {label: {} for label in LABEL_ORDER}
    summaries = {m: {} for m in metrics}
    for label in LABEL_ORDER:
        for m in metrics:
            values = grouped[label][m]
            filtered, removed = remove_outliers_iqr(values)
            cleaned[label][m] = filtered
            outlier_counts[label][m] = removed
            if filtered:
                summaries[m][label] = five_number_summary(filtered)

    enforce_ordered_metrics(metrics, grouped, cleaned, outlier_counts, summaries)
    print_summary(metrics, cleaned, outlier_counts, summaries)

    thresholds = suggest_thresholds(summaries, LABEL_ORDER)
    print("\nSuggested thresholds (based on cleaned quartiles/medians):")
    for m in metrics:
        print(f"  Metric: {m}")
        for k, v in thresholds[m].items():
            if v is None:
                print(f"    {k}: n/a")
            else:
                print(f"    {k}: {v:.4f}")


if __name__ == "__main__":
    main()
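

# Example invocation (a sketch: the script filename and input path below are
# illustrative placeholders, not names taken from this repo). Based on
# parse_scores(), the input JSON is expected to be a list of records shaped
# roughly like:
#   {"literacy_levels": {"low": {"scores": {"completeness": 0.82, ...}}, ...}}
# where each level key contains one of "low", "intermediate", or "proficient".
#
#   python summarize_literacy_scores.py \
#       --input data/factual_testing/evaluation.json \
#       --metrics factual_attribution,completeness,source_coverage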