#!/usr/bin/env python3
import argparse
import json
import math
from statistics import median, quantiles
# Canonical literacy labels, ordered from weakest to strongest.
LABEL_ORDER = ["low", "intermediate", "proficient"]
# Metrics whose medians are expected to rise with literacy level.
ORDERED_METRICS = {"source_coverage", "completeness"}


def normalize_label(key: str) -> str:
    """Map a raw label key onto a canonical label from LABEL_ORDER.

    Matching is case-insensitive and substring-based, in LABEL_ORDER
    priority; an unrecognized key is returned lowercased.
    """
    lowered = key.lower()
    candidates = (canon for canon in LABEL_ORDER if canon in lowered)
    return next(candidates, lowered)
def five_number_summary(values):
if not values:
return None
q1, _, q3 = quantiles(values, n=4, method="inclusive")
return {
"min": min(values),
"q1": q1,
"median": median(values),
"q3": q3,
"max": max(values),
}
def remove_outliers_iqr(values):
    """Drop points outside the [q1 - 1.5*IQR, q3 + 1.5*IQR] fence.

    Returns (kept_values, removed_count). Samples with fewer than four
    points, or with a zero IQR, are returned untouched.
    """
    if len(values) < 4:
        return values, 0
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    spread = q3 - q1
    if math.isclose(spread, 0.0):
        return values, 0
    fence_lo = q1 - 1.5 * spread
    fence_hi = q3 + 1.5 * spread
    kept = [v for v in values if fence_lo <= v <= fence_hi]
    return kept, len(values) - len(kept)
def parse_scores(data, metrics):
    """Collect raw metric scores from *data*, grouped by canonical label.

    Returns {label: {metric: [score, ...]}} with a bucket for every
    label in LABEL_ORDER. Entries whose key does not normalize to a
    known label, and scores that are missing or None, are skipped.
    """
    grouped = {lbl: {metric: [] for metric in metrics} for lbl in LABEL_ORDER}
    for record in data:
        levels = record.get("literacy_levels") or {}
        for raw_key, payload in levels.items():
            bucket = grouped.get(normalize_label(raw_key))
            if bucket is None:
                continue
            scores = (payload or {}).get("scores") or {}
            for metric in metrics:
                value = scores.get(metric)
                if value is not None:
                    bucket[metric].append(value)
    return grouped
def suggest_thresholds(per_label_summaries, label_order):
    """Propose a decision boundary between each adjacent pair of labels.

    For non-overlapping neighbors (lower q3 < upper q1) the midpoint of
    the gap is used; otherwise the midpoint of the two medians. Pairs
    with a missing summary map to None.
    """
    thresholds = {}
    for metric, by_label in per_label_summaries.items():
        boundaries = {}
        for lo_label, hi_label in zip(label_order, label_order[1:]):
            pair_key = f"{lo_label}_to_{hi_label}"
            lo_stats = by_label.get(lo_label)
            hi_stats = by_label.get(hi_label)
            if not lo_stats or not hi_stats:
                boundaries[pair_key] = None
            elif lo_stats["q3"] < hi_stats["q1"]:
                boundaries[pair_key] = (lo_stats["q3"] + hi_stats["q1"]) / 2
            else:
                boundaries[pair_key] = (lo_stats["median"] + hi_stats["median"]) / 2
        thresholds[metric] = boundaries
    return thresholds
def print_summary(metrics, cleaned_by_label, outlier_counts, summaries):
    """Print counts, outliers removed, and five-number summaries per label."""
    for label in LABEL_ORDER:
        print(f"\nLabel: {label}")
        for metric in metrics:
            kept = cleaned_by_label[label][metric]
            stats = summaries[metric].get(label)
            print(f" Metric: {metric}")
            print(f" Count (after outliers): {len(kept)}")
            print(f" Outliers removed: {outlier_counts[label][metric]}")
            if not stats:
                print(" Five-number summary: n/a")
                continue
            rendered = ", ".join(
                f"{name}={stats[name]:.4f}"
                for name in ("min", "q1", "median", "q3", "max")
            )
            print(f" Five-number summary: {rendered}")
def medians_in_order(summaries, metric, label_order):
    """Check that *metric*'s medians are non-decreasing across *label_order*.

    Returns False when any label in *label_order* lacks a summary for
    *metric*; otherwise True iff the medians are monotonically
    non-decreasing in label order.
    """
    medians = []
    for label in label_order:
        summary = summaries.get(metric, {}).get(label)
        if not summary:
            return False
        medians.append(summary["median"])
    # Pairwise comparison replaces the previous hard-coded
    # medians[0] <= medians[1] <= medians[2], which assumed exactly three
    # labels (IndexError for fewer, silently ignored any beyond the third).
    return all(a <= b for a, b in zip(medians, medians[1:]))
def enforce_ordered_metrics(metrics, grouped, cleaned, outlier_counts, summaries):
    """Undo outlier removal for ordered metrics whose medians regressed.

    For each metric in ORDERED_METRICS whose cleaned medians are not
    non-decreasing across LABEL_ORDER, restore the raw (unfiltered)
    values and recompute that metric's summaries from them. Mutates
    *cleaned*, *outlier_counts*, and *summaries* in place.
    """
    ordered = (m for m in metrics if m in ORDERED_METRICS)
    for metric in ordered:
        if medians_in_order(summaries, metric, LABEL_ORDER):
            continue
        for label in LABEL_ORDER:
            raw = grouped[label][metric]
            cleaned[label][metric] = raw
            outlier_counts[label][metric] = 0
            if raw:
                summaries[metric][label] = five_number_summary(raw)
def main():
    """CLI entry point: load the evaluation JSON, summarize, and print."""
    parser = argparse.ArgumentParser(
        description="Compute five-number summaries by literacy label with outlier removal."
    )
    parser.add_argument(
        "--input",
        default="/home/mshahidul/readctrl/data/factual_testing/full_details_evaluation_0_80_qwen3-30B_v2.json",
        help="Path to JSON evaluation file.",
    )
    parser.add_argument(
        "--metrics",
        default="factual_attribution,completeness,source_coverage",
        help="Comma-separated metrics to analyze.",
    )
    args = parser.parse_args()

    metrics = [token.strip() for token in args.metrics.split(",") if token.strip()]
    with open(args.input, "r", encoding="utf-8") as handle:
        data = json.load(handle)

    grouped = parse_scores(data, metrics)
    cleaned = {label: {} for label in LABEL_ORDER}
    outlier_counts = {label: {} for label in LABEL_ORDER}
    summaries = {metric: {} for metric in metrics}

    # IQR-clean each (label, metric) sample and summarize the survivors.
    for label in LABEL_ORDER:
        for metric in metrics:
            kept, dropped = remove_outliers_iqr(grouped[label][metric])
            cleaned[label][metric] = kept
            outlier_counts[label][metric] = dropped
            if kept:
                summaries[metric][label] = five_number_summary(kept)

    enforce_ordered_metrics(metrics, grouped, cleaned, outlier_counts, summaries)
    print_summary(metrics, cleaned, outlier_counts, summaries)

    thresholds = suggest_thresholds(summaries, LABEL_ORDER)
    print("\nSuggested thresholds (based on cleaned quartiles/medians):")
    for metric in metrics:
        print(f" Metric: {metric}")
        for pair_key, boundary in thresholds[metric].items():
            shown = "n/a" if boundary is None else f"{boundary:.4f}"
            print(f" {pair_key}: {shown}")


if __name__ == "__main__":
    main()