| |
| import argparse |
| import json |
| import math |
| from statistics import median, quantiles |
|
|
|
|
# Canonical literacy labels, ordered from lowest to highest proficiency.
LABEL_ORDER = ["low", "intermediate", "proficient"]
# Metrics whose per-label medians are expected to be non-decreasing across
# LABEL_ORDER; see enforce_ordered_metrics.
ORDERED_METRICS = {"source_coverage", "completeness"}
|
|
|
|
def normalize_label(key: str) -> str:
    """Map a raw level key onto one of the canonical labels in LABEL_ORDER.

    Matching is case-insensitive substring containment, checked in
    LABEL_ORDER order; when no canonical label is contained in the key,
    the lowercased key is returned unchanged.
    """
    lowered = key.lower()
    canonical = (label for label in LABEL_ORDER if label in lowered)
    return next(canonical, lowered)
|
|
|
|
def five_number_summary(values):
    """Return the five-number summary of *values* as a dict, or None if empty.

    Keys: "min", "q1", "median", "q3", "max". Quartiles are computed with
    statistics.quantiles(..., n=4, method="inclusive").

    Fix: statistics.quantiles requires at least two data points and raises
    StatisticsError on a singleton, but callers invoke this for any
    non-empty list. A single value now collapses to that value for all
    five statistics instead of raising.
    """
    if not values:
        return None
    if len(values) == 1:
        # quantiles() would raise StatisticsError here; the five-number
        # summary of a singleton is the value itself in every position.
        only = values[0]
        return {"min": only, "q1": only, "median": only, "q3": only, "max": only}
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    return {
        "min": min(values),
        "q1": q1,
        "median": median(values),
        "q3": q3,
        "max": max(values),
    }
|
|
|
|
def remove_outliers_iqr(values):
    """Drop values outside [q1 - 1.5*IQR, q3 + 1.5*IQR].

    Returns a (kept_values, removed_count) pair. Inputs with fewer than
    four points, or whose interquartile range is (close to) zero, are
    returned untouched with a removed count of 0.
    """
    total = len(values)
    if total < 4:
        return values, 0
    quartiles = quantiles(values, n=4, method="inclusive")
    q1, q3 = quartiles[0], quartiles[2]
    spread = q3 - q1
    if math.isclose(spread, 0.0):
        # Degenerate spread: every value would be "inside" anyway.
        return values, 0
    low_cut = q1 - 1.5 * spread
    high_cut = q3 + 1.5 * spread
    kept = [v for v in values if low_cut <= v <= high_cut]
    return kept, total - len(kept)
|
|
|
|
def parse_scores(data, metrics):
    """Group score values by canonical literacy label and metric name.

    Walks each item's "literacy_levels" mapping, canonicalizes the level
    key via normalize_label, and collects every non-None score for the
    requested metrics. Keys that do not normalize to a label in
    LABEL_ORDER are skipped. Returns {label: {metric: [scores...]}}.
    """
    grouped = {label: {metric: [] for metric in metrics} for label in LABEL_ORDER}
    for item in data:
        for raw_key, payload in (item.get("literacy_levels") or {}).items():
            label = normalize_label(raw_key)
            if label not in grouped:
                continue
            scores = (payload or {}).get("scores") or {}
            for metric in metrics:
                value = scores.get(metric)
                if value is not None:
                    grouped[label][metric].append(value)
    return grouped
|
|
|
|
def suggest_thresholds(per_label_summaries, label_order):
    """Propose a decision boundary between each adjacent pair of labels.

    For every metric, the boundary between consecutive labels is the
    midpoint of the lower label's q3 and the upper label's q1 when those
    quartiles do not overlap; otherwise the midpoint of the two medians.
    A missing summary on either side yields None for that pair.
    """
    thresholds = {}
    for metric, by_label in per_label_summaries.items():
        metric_thresholds = {}
        for lo_label, hi_label in zip(label_order, label_order[1:]):
            key = f"{lo_label}_to_{hi_label}"
            lo = by_label.get(lo_label)
            hi = by_label.get(hi_label)
            if not lo or not hi:
                metric_thresholds[key] = None
            elif lo["q3"] < hi["q1"]:
                metric_thresholds[key] = (lo["q3"] + hi["q1"]) / 2
            else:
                metric_thresholds[key] = (lo["median"] + hi["median"]) / 2
        thresholds[metric] = metric_thresholds
    return thresholds
|
|
|
|
def print_summary(metrics, cleaned_by_label, outlier_counts, summaries):
    """Print counts, outlier removals, and five-number summaries per label/metric."""
    for label in LABEL_ORDER:
        print(f"\nLabel: {label}")
        for metric in metrics:
            values = cleaned_by_label[label][metric]
            stats = summaries[metric].get(label)
            print(f" Metric: {metric}")
            print(f" Count (after outliers): {len(values)}")
            print(f" Outliers removed: {outlier_counts[label][metric]}")
            if not stats:
                print(" Five-number summary: n/a")
                continue
            rendered = ", ".join(
                f"{name}={stats[name]:.4f}"
                for name in ("min", "q1", "median", "q3", "max")
            )
            print(" Five-number summary: " + rendered)
|
|
|
|
def medians_in_order(summaries, metric, label_order):
    """Check that *metric*'s medians are non-decreasing across *label_order*.

    Returns False when any label in *label_order* lacks a summary for
    *metric*; otherwise True iff each label's median is <= the next's.

    Fix: the original hard-coded exactly three labels
    (medians[0] <= medians[1] <= medians[2]), raising IndexError for
    shorter orders and ignoring labels past the third. Adjacent pairs are
    compared instead, which works for any length and is identical for the
    three-label case used by this script.
    """
    medians = []
    for label in label_order:
        summary = summaries.get(metric, {}).get(label)
        if not summary:
            return False
        medians.append(summary["median"])
    return all(a <= b for a, b in zip(medians, medians[1:]))
|
|
|
|
def enforce_ordered_metrics(metrics, grouped, cleaned, outlier_counts, summaries):
    """Revert outlier removal for ordered metrics whose medians regressed.

    For each requested metric that belongs to ORDERED_METRICS, if outlier
    filtering broke the expected non-decreasing median ordering across
    LABEL_ORDER, restore the raw (unfiltered) values, zero the outlier
    counts, and recompute the summaries from the raw data. Mutates
    cleaned, outlier_counts, and summaries in place.
    """
    candidates = (m for m in metrics if m in ORDERED_METRICS)
    for metric in candidates:
        if medians_in_order(summaries, metric, LABEL_ORDER):
            continue
        for label in LABEL_ORDER:
            raw = grouped[label][metric]
            cleaned[label][metric] = raw
            outlier_counts[label][metric] = 0
            if raw:
                summaries[metric][label] = five_number_summary(raw)
|
|
|
|
def main():
    """CLI entry point: load evaluations, summarize scores, suggest cutoffs."""
    parser = argparse.ArgumentParser(
        description="Compute five-number summaries by literacy label with outlier removal."
    )
    parser.add_argument(
        "--input",
        default="/home/mshahidul/readctrl/data/factual_testing/full_details_evaluation_0_80_qwen3-30B_v2.json",
        help="Path to JSON evaluation file.",
    )
    parser.add_argument(
        "--metrics",
        default="factual_attribution,completeness,source_coverage",
        help="Comma-separated metrics to analyze.",
    )
    args = parser.parse_args()

    metrics = [token.strip() for token in args.metrics.split(",") if token.strip()]
    with open(args.input, "r", encoding="utf-8") as handle:
        data = json.load(handle)

    grouped = parse_scores(data, metrics)
    cleaned = {label: {} for label in LABEL_ORDER}
    outlier_counts = {label: {} for label in LABEL_ORDER}
    summaries = {metric: {} for metric in metrics}

    # Filter outliers per (label, metric) and summarize what survives.
    for label in LABEL_ORDER:
        for metric in metrics:
            kept, dropped = remove_outliers_iqr(grouped[label][metric])
            cleaned[label][metric] = kept
            outlier_counts[label][metric] = dropped
            if kept:
                summaries[metric][label] = five_number_summary(kept)

    # Undo filtering where it broke the expected median ordering.
    enforce_ordered_metrics(metrics, grouped, cleaned, outlier_counts, summaries)

    print_summary(metrics, cleaned, outlier_counts, summaries)
    thresholds = suggest_thresholds(summaries, LABEL_ORDER)

    print("\nSuggested thresholds (based on cleaned quartiles/medians):")
    for metric in metrics:
        print(f" Metric: {metric}")
        for name, value in thresholds[metric].items():
            if value is None:
                print(f" {name}: n/a")
            else:
                print(f" {name}: {value:.4f}")


if __name__ == "__main__":
    main()
|
|