| | |
| | import argparse |
| | import json |
| | import math |
| | from statistics import median, quantiles |
| |
|
| |
|
# Canonical literacy labels, ascending by proficiency; this ordering drives
# grouping, printing, and the adjacent-pair threshold suggestions.
LABEL_ORDER = ["low", "intermediate", "proficient"]
# The single score field pulled out of each item's per-level "scores" dict.
TARGET_METRIC = "source_coverage"
# Metrics whose per-label medians are expected to be non-decreasing across
# LABEL_ORDER; outlier removal is rolled back when it breaks that ordering.
ORDERED_METRICS = {TARGET_METRIC}
| |
|
| |
|
def normalize_label(key: str) -> str:
    """Map a raw literacy-level key onto a canonical label.

    Matching is case-insensitive and substring-based: the first entry of
    LABEL_ORDER found inside the key wins. Keys that match nothing are
    returned lower-cased as-is.
    """
    lowered = key.lower()
    candidates = (canonical for canonical in LABEL_ORDER if canonical in lowered)
    return next(candidates, lowered)
| |
|
| |
|
def five_number_summary(values):
    """Return the five-number summary of *values*, or None if empty.

    Quartiles come from statistics.quantiles with the "inclusive" method
    (data treated as the whole population, not a sample).
    """
    if not values:
        return None
    cut_points = quantiles(values, n=4, method="inclusive")
    keys = ("min", "q1", "median", "q3", "max")
    stats = (min(values), cut_points[0], median(values), cut_points[2], max(values))
    return dict(zip(keys, stats))
| |
|
| |
|
def remove_outliers_iqr(values):
    """Filter *values* to Tukey's fences: [q1 - 1.5*IQR, q3 + 1.5*IQR].

    Returns (kept_values, removed_count). Series with fewer than four
    points, or with a (numerically) zero IQR, are passed through untouched.
    """
    if len(values) < 4:
        return values, 0
    first_quartile, _, third_quartile = quantiles(values, n=4, method="inclusive")
    spread = third_quartile - first_quartile
    # A degenerate spread would flag everything outside a zero-width band.
    if math.isclose(spread, 0.0):
        return values, 0
    low_fence = first_quartile - 1.5 * spread
    high_fence = third_quartile + 1.5 * spread
    kept = [v for v in values if low_fence <= v <= high_fence]
    return kept, len(values) - len(kept)
| |
|
| |
|
def parse_scores(data, metrics):
    """Collect raw metric values grouped by canonical literacy label.

    Returns {label: {metric: [values]}} covering every label in
    LABEL_ORDER. Level keys that do not normalize to a canonical label are
    skipped, as are absent or None scores.
    """
    grouped = {}
    for label in LABEL_ORDER:
        grouped[label] = {metric: [] for metric in metrics}
    for item in data:
        for raw_key, payload in (item.get("literacy_levels") or {}).items():
            bucket = grouped.get(normalize_label(raw_key))
            if bucket is None:
                continue
            scores = (payload or {}).get("scores") or {}
            for metric in metrics:
                value = scores.get(metric)
                if value is not None:
                    bucket[metric].append(value)
    return grouped
| |
|
| |
|
def suggest_thresholds(per_label_summaries, label_order):
    """Propose a decision boundary between each adjacent pair of labels.

    When the adjacent "boxes" are separated (lower q3 < upper q1) the
    boundary is the midpoint of that gap; when they overlap it falls back
    to the midpoint of the two medians. A missing summary on either side
    yields None for that pair.
    """
    thresholds = {}
    for metric, by_label in per_label_summaries.items():
        metric_thresholds = {}
        for lo_label, hi_label in zip(label_order, label_order[1:]):
            key = f"{lo_label}_to_{hi_label}"
            lo = by_label.get(lo_label)
            hi = by_label.get(hi_label)
            if not lo or not hi:
                metric_thresholds[key] = None
            elif lo["q3"] < hi["q1"]:
                metric_thresholds[key] = (lo["q3"] + hi["q1"]) / 2
            else:
                metric_thresholds[key] = (lo["median"] + hi["median"]) / 2
        thresholds[metric] = metric_thresholds
    return thresholds
| |
|
| |
|
def print_summary(metrics, cleaned_by_label, outlier_counts, summaries):
    """Print per-label, per-metric counts, outlier tallies, and summaries."""
    for label in LABEL_ORDER:
        print(f"\nLabel: {label}")
        for metric in metrics:
            kept = cleaned_by_label[label][metric]
            print(f" Metric: {metric}")
            print(f" Count (after outliers): {len(kept)}")
            print(f" Outliers removed: {outlier_counts[label][metric]}")
            stats = summaries[metric].get(label)
            if not stats:
                print(" Five-number summary: n/a")
                continue
            fields = ", ".join(
                f"{name}={stats[name]:.4f}"
                for name in ("min", "q1", "median", "q3", "max")
            )
            print(" Five-number summary: " + fields)
| |
|
| |
|
def medians_in_order(summaries, metric, label_order):
    """Check that per-label medians are non-decreasing across *label_order*.

    Returns False as soon as any label lacks a summary for *metric*.

    Fix: the original compared exactly three hard-coded positions
    (medians[0] <= medians[1] <= medians[2]), which raised IndexError for
    shorter label orders and silently ignored labels beyond the third.
    This version checks every adjacent pair, for any number of labels.
    """
    medians = []
    for label in label_order:
        summary = summaries.get(metric, {}).get(label)
        if not summary:
            return False
        medians.append(summary["median"])
    # Monotone (non-decreasing) check over all adjacent pairs.
    return all(a <= b for a, b in zip(medians, medians[1:]))
| |
|
| |
|
def enforce_ordered_metrics(metrics, grouped, cleaned, outlier_counts, summaries):
    """Undo outlier removal for metrics whose medians must stay ordered.

    For each requested metric that belongs to ORDERED_METRICS, if the
    cleaned per-label medians are not non-decreasing across LABEL_ORDER,
    restore the raw (unfiltered) values, zero the outlier tallies, and
    recompute summaries from the raw data. Mutates *cleaned*,
    *outlier_counts*, and *summaries* in place.
    """
    for metric in (m for m in metrics if m in ORDERED_METRICS):
        if medians_in_order(summaries, metric, LABEL_ORDER):
            continue
        # Ordering broken by filtering: fall back to the unfiltered series.
        for label in LABEL_ORDER:
            raw = grouped[label][metric]
            cleaned[label][metric] = raw
            outlier_counts[label][metric] = 0
            if raw:
                summaries[metric][label] = five_number_summary(raw)
| |
|
| |
|
def main():
    """CLI entry point: load the evaluation JSON, summarize, print thresholds."""
    parser = argparse.ArgumentParser(
        description="Compute five-number summaries for source_coverage by literacy label."
    )
    parser.add_argument(
        "--input",
        default="/home/mshahidul/readctrl/data/factual_testing/full_details_evaluation_0_80_qwen3-30B_v2.json",
        help="Path to JSON evaluation file.",
    )
    args = parser.parse_args()

    metrics = [TARGET_METRIC]
    with open(args.input, "r", encoding="utf-8") as f:
        data = json.load(f)

    grouped = parse_scores(data, metrics)
    cleaned = {}
    outlier_counts = {}
    summaries = {metric: {} for metric in metrics}

    # First pass: IQR outlier removal per label/metric series.
    for label in LABEL_ORDER:
        cleaned[label] = {}
        outlier_counts[label] = {}
        for metric in metrics:
            kept, dropped = remove_outliers_iqr(grouped[label][metric])
            cleaned[label][metric] = kept
            outlier_counts[label][metric] = dropped
            if kept:
                summaries[metric][label] = five_number_summary(kept)

    # Second pass: roll back filtering where it broke expected median ordering.
    enforce_ordered_metrics(metrics, grouped, cleaned, outlier_counts, summaries)

    print_summary(metrics, cleaned, outlier_counts, summaries)
    thresholds = suggest_thresholds(summaries, LABEL_ORDER)

    print("\nSuggested thresholds (based on cleaned quartiles/medians):")
    for metric in metrics:
        print(f" Metric: {metric}")
        for pair_name, boundary in thresholds[metric].items():
            if boundary is None:
                print(f" {pair_name}: n/a")
            else:
                print(f" {pair_name}: {boundary:.4f}")
| |
|
| |
|
| | if __name__ == "__main__": |
| | main() |
| |
|