# readCtrl_lambda/code/readctrl_rl_inference/eval_gpt5_results.py
# mshahidul
# Initial commit of readCtrl code without large models
# 030876e
"""
eval_gpt5_results.py
---------------------
Evaluate pre-generated GPT-5 inference results (from run_gpt5_inference.py)
with the same metrics used by test_classifier_with_subclaim_thresholds.py:
1. Classifier accuracy (DSPy health-literacy classifier)
2. Completeness score (recall: summary_subclaims covered by gen_text)
3. Hallucination score (gen_text sentences NOT supported by input_text)
Expected JSONL format (from run_gpt5_inference.py): each line has model,
row_index, doc_id, gold_label, source_lang, prompt, prediction, generated_text,
error. Reference (--reference-file) supplies summary_subclaims and input_text
by (doc_id, gold_label).
Usage
-----
# Offline: count scores only (no classifier/support API required)
python eval_gpt5_results.py --input-file gpt5mini-nano_inference/gpt5_inference_gpt-5_20260302_201653.jsonl --offline
# Full evaluation (requires classifier API + support API + dspy)
python eval_gpt5_results.py --input-file gpt5mini-nano_inference/gpt5_inference_gpt-5_20260302_201653.jsonl
# Multiple files
python eval_gpt5_results.py --input-file file1.jsonl file2.jsonl
"""
import argparse
import json
import os
import re
import traceback
import urllib.error
import urllib.request
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
try:
import dspy
except ImportError:
dspy = None # type: ignore[assignment]
import requests
from tqdm import tqdm
# ---------------------------------------------------------------------------
# Defaults
# ---------------------------------------------------------------------------
# Common prefix of the default filesystem locations defined below.
_READCTRL_CODE_ROOT = "/home/mshahidul/readctrl/code"

# Default for --classifier-api-base.
DEFAULT_CLASSIFIER_API_BASE = "http://172.16.34.19:8040/v1"

# Default for --support-api-base (the /check_support service; no /v1 suffix).
DEFAULT_SUPPORT_API_BASE = "http://172.16.34.19:8090"

# Default for --model-path.
DEFAULT_MODEL_PATH = f"{_READCTRL_CODE_ROOT}/readctrl_rl_inference/model.json"

# Default for --reference-file.
DEFAULT_REFERENCE_FILE = (
    f"{_READCTRL_CODE_ROOT}/text_classifier/data/"
    "verified_combined_0-80_clean200_with_subclaims.json"
)

# Default for --output-dir.
DEFAULT_OUTPUT_DIR = f"{_READCTRL_CODE_ROOT}/readctrl_rl_inference/test_result_v4"

# The only gold labels that are evaluated; rows carrying any other label are skipped.
VALID_LABELS = {
    "proficient_health_literacy",
    "intermediate_health_literacy",
    "low_health_literacy",
}
# Segments shorter than this many characters are dropped by the splitter.
MIN_SENTENCE_CHARS = 15


# ---------------------------------------------------------------------------
# Sentence splitter (mirrors reward_new_v5.py)
# ---------------------------------------------------------------------------
def _split_into_sentences(text: str, min_chars: int = MIN_SENTENCE_CHARS) -> List[str]:
    """Split *text* into sentences, discarding very short fragments.

    The text is cut at whitespace that follows ``.``, ``!`` or ``?``. Each
    piece is stripped, and only pieces of at least ``min_chars`` characters
    are returned. Empty or whitespace-only input yields an empty list.
    """
    stripped = text.strip() if text else ""
    if not stripped:
        return []

    sentences: List[str] = []
    for piece in re.split(r"(?<=[.!?])\s+", stripped):
        candidate = piece.strip()
        if len(candidate) >= min_chars:
            sentences.append(candidate)
    return sentences
# ---------------------------------------------------------------------------
# DSPy health-literacy classifier (only when dspy is available)
# ---------------------------------------------------------------------------
# Both classes subclass dspy types, so they are defined only when the optional
# dspy import succeeded. Without dspy, HealthLiteracyClassifier is bound to
# None and HealthLiteracySignature is not defined at all.
#
# NOTE(review): these classes are documented with comments rather than
# docstrings on purpose -- dspy is understood to read a Signature's docstring
# as prompt instructions, so adding one could change model behavior. Confirm
# before converting any of these comments into docstrings.
if dspy is not None:

    # Signature with one input field (the rewritten text) and one output field
    # (the predicted literacy label).
    class HealthLiteracySignature(dspy.Signature):
        generated_text = dspy.InputField(
            desc="A version of the source text rewritten for a specific audience."
        )
        literacy_label = dspy.OutputField(
            desc=(
                "Classification: low_health_literacy (simple words, no jargon), "
                "intermediate_health_literacy (moderate technicality), or "
                "proficient_health_literacy (highly technical/original level)."
            )
        )

    # Module that wraps a ChainOfThought predictor built from the signature above.
    class HealthLiteracyClassifier(dspy.Module):
        def __init__(self):
            super().__init__()
            self.classifier = dspy.ChainOfThought(HealthLiteracySignature)

        def forward(self, generated_text):
            # Delegate to the wrapped predictor; its result is returned unchanged.
            return self.classifier(generated_text=generated_text)

else:
    HealthLiteracyClassifier = None  # type: ignore[misc, assignment]
# ---------------------------------------------------------------------------
# Support-API verifier (mirrors reward_new_v5.py + test_classifier script)
# ---------------------------------------------------------------------------
class MedicalClaimVerifier:
    """
    Client for the support-checking service (FastAPI ``POST /check_support``).

    base_url: 'http://host:8090' — NO /v1 suffix.
    """

    def __init__(self, base_url: str):
        # Trailing slashes are removed so the endpoint path can be appended as-is.
        self.base_url = base_url.rstrip("/")

    @staticmethod
    def _normalize_labels(raw_labels: Any, expected: int) -> List[str]:
        """Return exactly ``expected`` labels from a decoded API payload.

        A missing, null or non-list ``raw_labels`` yields all "invalid".
        A short list is padded with "invalid"; a long one is truncated.
        The input list is never mutated.
        """
        if not isinstance(raw_labels, (list, tuple)):
            return ["invalid"] * expected
        labels = list(raw_labels[:expected])
        labels.extend(["invalid"] * (expected - len(labels)))
        return labels

    def _call_support_api(
        self,
        context: str,
        subclaims: List[str],
        threshold: float = 0.5,
        batch_size: int = 128,
    ) -> Optional[List[str]]:
        """POST the claims to ``/check_support`` and return one label per claim.

        Returns:
            A list with ``len(subclaims)`` labels. It is all "invalid" when
            ``context`` or ``subclaims`` is empty (no request is sent) or
            when the response carries no usable ``labels`` list.
            ``None`` when the request fails or the body is not valid JSON.
        """
        if not context or not subclaims:
            return ["invalid"] * len(subclaims or [])

        payload = {
            "context": context,
            "subclaims": subclaims,
            "threshold": threshold,
            "batch_size": batch_size,
        }
        try:
            resp = requests.post(
                f"{self.base_url}/check_support",
                json=payload,
                timeout=300,
            )
            resp.raise_for_status()
            body = resp.json()
        except requests.exceptions.RequestException as exc:
            print(f"Warning: Support API call failed (returning None): {exc}")
            return None
        except ValueError as exc:
            # Older requests releases raise a plain ValueError (not a
            # RequestException) for a non-JSON body; treat it like a failed
            # call instead of aborting the whole evaluation run.
            print(f"Warning: Support API returned invalid JSON (returning None): {exc}")
            return None

        # A non-dict body or a null / non-list "labels" value used to raise
        # AttributeError / TypeError; it is now reported as all-"invalid".
        raw_labels = body.get("labels") if isinstance(body, dict) else None
        return self._normalize_labels(raw_labels, len(subclaims))

    def compute_completeness(
        self, summary_subclaims: List[str], gen_text: str
    ) -> Optional[float]:
        """Fraction of summary_subclaims covered by gen_text (recall direction).

        Returns 0.0 when there are no subclaims or ``gen_text`` is blank, and
        ``None`` when the API call failed or every label came back "invalid".
        Otherwise returns supported / valid, with "invalid" labels excluded
        from both counts.
        """
        if not summary_subclaims or not gen_text or not gen_text.strip():
            return 0.0
        labels = self._call_support_api(context=gen_text, subclaims=summary_subclaims)
        if labels is None:
            return None
        normalized = [str(label).strip().lower() for label in labels]
        valid = [label for label in normalized if label != "invalid"]
        if not valid:
            return None
        covered = sum(1 for label in valid if label == "supported")
        return covered / len(valid)

    def compute_hallucination(
        self, input_text: str, gen_text: str
    ) -> Optional[float]:
        """Share of generated sentences the API does not label "supported".

        ``gen_text`` is split into sentences and each one is checked against
        ``input_text``. The numerator counts valid labels other than
        "supported". The denominator is max(#generated sentences, #input
        sentences), not the number of valid labels.

        Returns 0.0 when ``gen_text`` yields no sentences or ``input_text`` is
        blank, and ``None`` when the API call failed or every label came back
        "invalid".
        """
        gen_segs = _split_into_sentences(gen_text)
        if not gen_segs or not input_text or not input_text.strip():
            return 0.0
        input_sents = _split_into_sentences(input_text)
        stable_denom = max(len(gen_segs), len(input_sents))
        if stable_denom == 0:
            return 0.0
        labels = self._call_support_api(context=input_text, subclaims=gen_segs)
        if labels is None:
            return None
        normalized = [str(label).strip().lower() for label in labels]
        valid = [label for label in normalized if label != "invalid"]
        if not valid:
            return None
        hallucinated = sum(1 for label in valid if label != "supported")
        return hallucinated / stable_denom

    def evaluate_sample(
        self, gen_text: str, summary_subclaims: List[str], input_text: str
    ) -> Tuple[Optional[float], Optional[float]]:
        """Return ``(completeness, hallucination)`` for one generated text."""
        completeness = self.compute_completeness(summary_subclaims, gen_text)
        hallucination = self.compute_hallucination(input_text, gen_text)
        return completeness, hallucination
# ---------------------------------------------------------------------------
# Health checks
# ---------------------------------------------------------------------------
def check_api_base(api_base: str) -> None:
    """Verify that the classifier endpoint answers ``GET <api_base>/models``.

    Raises:
        RuntimeError: the server responded, but with an HTTP error status.
        ConnectionError: the server could not be reached (connection failure,
            DNS error, timeout, ...).
    """
    models_url = api_base.rstrip("/") + "/models"
    req = urllib.request.Request(models_url, method="GET")
    try:
        with urllib.request.urlopen(req, timeout=5) as resp:
            status = getattr(resp, "status", None)
    except urllib.error.HTTPError as exc:
        # urlopen raises HTTPError for 4xx/5xx instead of returning the
        # response. The server is reachable here, so report it as unhealthy
        # rather than telling the user to start it.
        raise RuntimeError(
            f"Unhealthy endpoint: {models_url} (HTTP {exc.code})"
        ) from exc
    except OSError as exc:
        # URLError is an OSError; read timeouts and connection resets are
        # raised as plain OSError subclasses and are covered here as well.
        raise ConnectionError(
            f"Cannot reach classifier API: {api_base}. Start vLLM server."
        ) from exc
    if status is not None and status >= 400:
        raise RuntimeError(f"Unhealthy endpoint: {models_url}")
def check_support_api_base(api_base: str) -> None:
    """Probe the support service with a minimal ``POST /check_support``.

    Raises:
        RuntimeError: the service answered with a 5xx status.
        ConnectionError: the service could not be reached or timed out.
    """
    endpoint = api_base.rstrip("/") + "/check_support"
    probe_payload = {
        "context": "test",
        "subclaims": ["test"],
        "threshold": 0.5,
        "batch_size": 1,
    }
    try:
        response = requests.post(endpoint, json=probe_payload, timeout=5)
    except requests.exceptions.ConnectionError as exc:
        raise ConnectionError(f"Cannot reach Support API: {endpoint}.") from exc
    except requests.exceptions.Timeout as exc:
        raise ConnectionError(f"Support API timed out: {endpoint}") from exc

    # Only server-side failures count as unhealthy; 4xx responses pass.
    if response.status_code >= 500:
        raise RuntimeError(f"Support API server error: {endpoint}")
# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------
def load_compiled_classifier(path: str):
    """Load the compiled classifier stored at *path*.

    ``dspy.load`` is tried first when the installed dspy provides it. Any
    failure there is ignored, and a fresh ``HealthLiteracyClassifier`` is
    loaded from *path* via its own ``load`` method instead.

    Raises:
        RuntimeError: the fallback ``load`` call failed.
    """
    if hasattr(dspy, "load"):
        try:
            program = dspy.load(path)
        except Exception:
            # Best effort only: fall through to the module-level loader below.
            pass
        else:
            return program

    fallback = HealthLiteracyClassifier()
    try:
        fallback.load(path)
    except Exception as exc:
        raise RuntimeError(f"Failed to load model: {path}") from exc
    return fallback
def normalize_pred_label(pred_obj: Any) -> str:
    """Return the prediction's ``literacy_label`` stripped and lower-cased.

    Yields an empty string when *pred_obj* is falsy or has no
    ``literacy_label`` attribute.
    """
    if pred_obj and hasattr(pred_obj, "literacy_label"):
        raw_label = pred_obj.literacy_label
        return str(raw_label).strip().lower()
    return ""
def load_inference_jsonl(path: str) -> List[Dict[str, Any]]:
    """
    Load GPT-5 inference JSONL produced by run_gpt5_inference.py (or
    run_gpt5mini_nano_inference.py). Expected fields per row: model,
    row_index, doc_id, gold_label, generated_text, error; optional:
    source_lang, prompt, prediction, input_text.

    Blank lines are skipped. Rows with non-empty 'error' or empty
    'generated_text' are kept so callers can skip them cleanly.

    Text fields (model, gold_label, generated_text, input_text, error) are
    returned as stripped strings. A missing key or a JSON ``null`` becomes
    ``""``. ``row_index`` and ``doc_id`` are passed through unchanged, and
    ``line_no`` is the 1-based line number in the file.
    """

    def _as_text(value: Any) -> str:
        # str(None) is "None", which is truthy: an `"error": null` row would
        # be treated as failed and a null generated_text as real text.
        return "" if value is None else str(value).strip()

    items: List[Dict[str, Any]] = []
    with open(path, "r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            if not line.strip():
                continue
            row = json.loads(line)
            items.append({
                "line_no": line_no,
                "model": _as_text(row.get("model")),
                "row_index": row.get("row_index"),
                "doc_id": row.get("doc_id"),
                "gold_label": _as_text(row.get("gold_label")),
                "generated_text": _as_text(row.get("generated_text")),
                "input_text": _as_text(row.get("input_text")),
                "error": _as_text(row.get("error")),
            })
    return items
def load_reference_lookup(
    reference_path: str,
) -> Dict[Tuple[Any, str], Dict[str, Any]]:
    """
    Returns (doc_id, label) → {summary_subclaims, input_text}.

    Each usable reference row is registered under both its raw ``doc_id`` and
    ``str(doc_id)``; the first row seen for a key wins. Rows are skipped when
    their label is not in VALID_LABELS or when they have no non-empty list of
    subclaims ('summary_subclaims', falling back to 'gold_subclaims').

    Falls back to the 'fulltext' field for input_text if 'input_text' is
    absent or null. A null value becomes ``""`` rather than the string "None".

    Raises:
        ValueError: the file is not a JSON list, or no row was usable.
    """
    with open(reference_path, "r", encoding="utf-8") as f:
        rows = json.load(f)
    if not isinstance(rows, list):
        raise ValueError("Reference file must be a JSON list.")

    lookup: Dict[Tuple[Any, str], Dict[str, Any]] = {}
    for row in rows:
        doc_id = row.get("doc_id")
        label = str(row.get("label", "")).strip()
        if label not in VALID_LABELS:
            continue

        summary_subclaims = row.get("summary_subclaims", row.get("gold_subclaims", []))
        if not isinstance(summary_subclaims, list) or not summary_subclaims:
            continue

        # JSON null decodes to None and str(None) is "None"; that literal
        # would otherwise be used downstream as the source text.
        raw_input = row.get("input_text")
        if raw_input is None:
            raw_input = row.get("fulltext")
        input_text = "" if raw_input is None else str(raw_input).strip()

        entry = {"summary_subclaims": summary_subclaims, "input_text": input_text}
        for key in ((doc_id, label), (str(doc_id), label)):
            lookup.setdefault(key, entry)

    if not lookup:
        raise ValueError(f"Reference lookup is empty: {reference_path}")
    return lookup
# ---------------------------------------------------------------------------
# Offline evaluation (no classifier/support API)
# ---------------------------------------------------------------------------
def evaluate_file_offline(
    *,
    input_path: str,
    reference_lookup: Dict,
    output_dir: str,
    max_samples: int,
) -> Dict[str, Any]:
    """
    Count how many rows of an inference JSONL could be evaluated, without
    calling the classifier or the support API (the --offline mode).

    A row is evaluable when it has no error, has generated text, carries a
    label from VALID_LABELS, and matches a reference entry. The summary is
    written as JSON into ``output_dir``, printed, and returned.
    """
    records = load_inference_jsonl(input_path)

    # Model name: first non-empty "model" in the whole file, else the file name.
    model_name = os.path.basename(input_path)
    for record in records:
        if record["model"]:
            model_name = record["model"]
            break

    if max_samples > 0:
        records = records[:max_samples]
    total_in_file = len(records)

    tally = {"error": 0, "no_text": 0, "unmatched": 0, "evaluable": 0}
    for record in records:
        if record["error"]:
            tally["error"] += 1
        elif not record["generated_text"]:
            tally["no_text"] += 1
        elif record["gold_label"] in VALID_LABELS:
            doc_id = record["doc_id"]
            label = record["gold_label"]
            reference = reference_lookup.get((doc_id, label)) or reference_lookup.get(
                (str(doc_id), label)
            )
            if reference:
                tally["evaluable"] += 1
            else:
                tally["unmatched"] += 1
        # Rows with an unknown gold label fall through without being counted.

    if total_in_file:
        success_rate = tally["evaluable"] / total_in_file
    else:
        success_rate = 0.0

    score_summary = {
        "model": model_name,
        "input_file": input_path,
        "total_rows_in_file": total_in_file,
        "error_rows_skipped": tally["error"],
        "rows_without_generated_text": tally["no_text"],
        "unmatched_rows": tally["unmatched"],
        "evaluable_rows": tally["evaluable"],
        "success_rate": success_rate,
    }

    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    slug = model_name.replace("/", "_").replace(" ", "_")
    os.makedirs(output_dir, exist_ok=True)
    summary_path = os.path.join(output_dir, f"gpt5_eval_offline_{slug}_{stamp}.json")
    with open(summary_path, "w", encoding="utf-8") as out:
        json.dump(score_summary, out, indent=2)

    print(json.dumps(score_summary, indent=2))
    print(f"[DONE] {model_name} (offline): summary β†’ {summary_path}")
    return score_summary
# ---------------------------------------------------------------------------
# Per-file evaluation
# ---------------------------------------------------------------------------
def evaluate_file(
    *,
    input_path: str,
    reference_lookup: Dict,
    classifier,
    verifier: MedicalClaimVerifier,
    comp_threshold: float,
    halluc_threshold: float,
    output_dir: str,
    max_samples: int,
    provide_traceback: bool,
) -> Dict[str, Any]:
    """Run evaluation on one JSONL file; save summary + details; return summary dict.

    For every usable row the classifier is called on the generated text and
    the verifier computes completeness and hallucination scores. A summary
    JSON and a per-sample details JSONL are written into ``output_dir``,
    after every CHECKPOINT_EVERY evaluated samples and once more at the end.

    Args:
        input_path: inference JSONL to evaluate.
        reference_lookup: mapping (doc_id, label) -> reference entry with
            "summary_subclaims" and "input_text".
        classifier: callable invoked as ``classifier(generated_text=...)``.
        verifier: provides ``evaluate_sample``.
        comp_threshold: completeness passes when score >= this value.
        halluc_threshold: hallucination fails when score > this value.
        output_dir: directory for the summary/details files (created if needed).
        max_samples: if > 0, only the first ``max_samples`` rows are considered.
        provide_traceback: NOTE(review): not referenced anywhere in this body.

    Raises:
        RuntimeError: no row in the file could be evaluated.
    """
    rows = load_inference_jsonl(input_path)
    # Model name = first non-empty "model" field in the file (looked up before
    # truncation); falls back to the input file's base name.
    model_name = next((r["model"] for r in rows if r["model"]), os.path.basename(input_path))
    if max_samples > 0:
        rows = rows[:max_samples]
    # ── counters ────────────────────────────────────────────────────────────
    unmatched_rows = 0
    error_rows = 0
    total = 0  # rows that were actually evaluated
    classifier_correct = 0
    comp_pass_count = 0
    halluc_fail_count = 0
    cls_and_comp_count = 0
    cls_comp_nh_count = 0
    # Means are taken only over samples whose score is not None.
    comp_sum, comp_n = 0.0, 0
    halluc_sum, halluc_n = 0.0, 0
    details: List[Dict[str, Any]] = []
    CHECKPOINT_EVERY = 10
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    model_slug = model_name.replace("/", "_").replace(" ", "_")
    os.makedirs(output_dir, exist_ok=True)
    summary_path = os.path.join(output_dir, f"gpt5_eval_{model_slug}_{ts}.json")
    details_path = os.path.join(output_dir, f"gpt5_eval_{model_slug}_{ts}.jsonl")

    def build_summary() -> Dict[str, Any]:
        # Snapshot of the running counters. Rates are relative to `total`
        # (evaluated samples) and are 0.0 while nothing has been evaluated.
        safe = lambda n: n / total if total else 0.0
        return {
            "model": model_name,
            "input_file": input_path,
            # Row count after the max_samples truncation.
            "total_rows_in_file": len(rows),
            "total_samples_evaluated": total,
            "unmatched_rows": unmatched_rows,
            "error_rows_skipped": error_rows,
            # classifier
            "classifier_only_accuracy": safe(classifier_correct),
            # completeness
            "completeness_pass_rate": safe(comp_pass_count),
            "completeness_mean": comp_sum / comp_n if comp_n else None,
            "completeness_threshold": comp_threshold,
            # hallucination
            "hallucination_fail_rate": safe(halluc_fail_count),
            "hallucination_mean": halluc_sum / halluc_n if halluc_n else None,
            "hallucination_threshold": halluc_threshold,
            # combined
            "accuracy_cls_and_completeness": safe(cls_and_comp_count),
            "accuracy_cls_comp_no_hallucination": safe(cls_comp_nh_count),
            "details_path": details_path,
        }

    def save_checkpoint() -> None:
        # Both files are rewritten from scratch on every call.
        with open(summary_path, "w", encoding="utf-8") as f:
            json.dump(build_summary(), f, indent=2)
        with open(details_path, "w", encoding="utf-8") as f:
            for item in details:
                f.write(json.dumps(item, ensure_ascii=False) + "\n")

    for idx, row in enumerate(tqdm(rows, desc=model_name), start=1):
        gold_label = row["gold_label"]
        generated_text = row["generated_text"]
        doc_id = row["doc_id"]
        # Skip order: unknown label (not counted), error row (counted),
        # empty generated text (not counted), no reference match (counted).
        if gold_label not in VALID_LABELS:
            continue
        if row["error"]:
            error_rows += 1
            continue
        if not generated_text:
            continue
        # Try the raw doc_id first, then its string form.
        ref = reference_lookup.get((doc_id, gold_label)) or \
            reference_lookup.get((str(doc_id), gold_label))
        if not ref:
            unmatched_rows += 1
            continue
        summary_subclaims = ref["summary_subclaims"]
        # Prefer the reference's source text; fall back to the row's own.
        input_text = ref.get("input_text") or row.get("input_text", "")
        total += 1
        # 1. Classifier
        pred = classifier(generated_text=generated_text)
        pred_label = normalize_pred_label(pred)
        # Substring test: correct when the gold label occurs anywhere in the
        # normalized prediction.
        is_cls_correct = gold_label in pred_label
        classifier_correct += int(is_cls_correct)
        # 2. Completeness + Hallucination
        comp_score, halluc_score = verifier.evaluate_sample(
            gen_text=generated_text,
            summary_subclaims=summary_subclaims,
            input_text=input_text,
        )
        # A None score counts as neither a completeness pass nor a
        # hallucination fail.
        comp_pass = (comp_score is not None) and (comp_score >= comp_threshold)
        halluc_fail = (halluc_score is not None) and (halluc_score > halluc_threshold)
        comp_pass_count += int(comp_pass)
        halluc_fail_count += int(halluc_fail)
        if comp_score is not None:
            comp_sum += comp_score; comp_n += 1
        if halluc_score is not None:
            halluc_sum += halluc_score; halluc_n += 1
        cls_and_comp = is_cls_correct and comp_pass
        cls_comp_no_h = cls_and_comp and not halluc_fail
        cls_and_comp_count += int(cls_and_comp)
        cls_comp_nh_count += int(cls_comp_no_h)
        details.append({
            # 1-based position within the (possibly truncated) row list.
            "idx": idx,
            "model": model_name,
            "line_no": row.get("line_no"),
            "row_index": row.get("row_index"),
            "doc_id": doc_id,
            "gold_label": gold_label,
            "generated_text": generated_text,
            "pred_label": pred_label,
            "classifier_correct": is_cls_correct,
            "completeness_score": comp_score,
            "completeness_pass": comp_pass,
            "completeness_threshold": comp_threshold,
            "hallucination_score": halluc_score,
            "hallucination_fail": halluc_fail,
            "hallucination_threshold": halluc_threshold,
            "pass_cls_and_completeness": cls_and_comp,
            "pass_cls_comp_no_hallucination": cls_comp_no_h,
        })
        # Persist and report progress every CHECKPOINT_EVERY evaluated samples.
        if total % CHECKPOINT_EVERY == 0:
            save_checkpoint()
            comp_avg = f"{comp_sum/comp_n:.4f}" if comp_n else "N/A"
            halluc_avg = f"{halluc_sum/halluc_n:.4f}" if halluc_n else "N/A"
            # NOTE(review): the non-ASCII sequences in this message and in the
            # [DONE] messages below look like mis-decoded dash/arrow
            # characters -- confirm against the original file encoding.
            print(
                f"\n[CHECKPOINT {model_name}] {total} samples β€” "
                f"cls_acc={classifier_correct/total:.4f}, "
                f"comp_pass={comp_pass_count/total:.4f} (mean={comp_avg}), "
                f"halluc_fail={halluc_fail_count/total:.4f} (mean={halluc_avg})"
            )
    # Nothing evaluated: fail before writing any output file.
    if total == 0:
        raise RuntimeError(f"No valid rows found in {input_path}")
    save_checkpoint()
    summary = build_summary()
    print(json.dumps(summary, indent=2))
    print(f"[DONE] {model_name}: summary β†’ {summary_path}")
    print(f"[DONE] {model_name}: details β†’ {details_path}")
    return summary
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def parse_args() -> argparse.Namespace:
    """Build the command-line parser for this script and parse ``sys.argv``."""
    description = (
        "Evaluate GPT-5 mini/nano inference results: classifier accuracy, "
        "completeness (recall), and hallucination score."
    )
    cli = argparse.ArgumentParser(description=description)

    # Inputs
    cli.add_argument(
        "--input-file",
        nargs="+",
        required=True,
        help=(
            "One or more JSONL files produced by run_gpt5mini_nano_inference.py. "
            "Each file is evaluated separately."
        ),
    )
    cli.add_argument(
        "--model-path",
        default=DEFAULT_MODEL_PATH,
        help="DSPy health-literacy classifier model.json path.",
    )
    cli.add_argument(
        "--reference-file",
        default=DEFAULT_REFERENCE_FILE,
        help="Reference JSON with summary_subclaims + input_text.",
    )

    # Service endpoints
    cli.add_argument("--classifier-api-base", default=DEFAULT_CLASSIFIER_API_BASE)
    cli.add_argument(
        "--support-api-base",
        default=DEFAULT_SUPPORT_API_BASE,
        help="FastAPI /check_support base URL (NO /v1 suffix).",
    )

    # Output and thresholds
    cli.add_argument("--output-dir", default=DEFAULT_OUTPUT_DIR)
    cli.add_argument(
        "--comp-threshold",
        type=float,
        default=0.5,
        help="Completeness pass threshold (score >= value).",
    )
    cli.add_argument(
        "--hallucination-threshold",
        type=float,
        default=0.1,
        help="Hallucination fail threshold (score > value).",
    )
    cli.add_argument(
        "--max-samples",
        type=int,
        default=-1,
        help="Max rows per file. -1 = all.",
    )

    # Switches
    cli.add_argument("--provide-traceback", action="store_true")
    cli.add_argument(
        "--offline",
        action="store_true",
        help="Only compute counts/success rate; no classifier or support API.",
    )
    return cli.parse_args()
def main() -> None:
    """Script entry point: validate inputs, then evaluate each input file.

    With ``--offline`` only row counts are produced. Otherwise the classifier
    and support endpoints are health-checked, dspy is configured, every file
    is evaluated, and a comparison table is printed when more than one file
    was given. Errors in the full evaluation are printed and re-raised.
    """
    args = parse_args()

    def _banner(title: str) -> None:
        # A title framed by two 60-character rules, preceded by a blank line.
        rule = "=" * 60
        print(f"\n{rule}")
        print(title)
        print(rule)

    # Fail fast on missing inputs before loading anything.
    if not os.path.exists(args.reference_file):
        raise FileNotFoundError(f"Reference file not found: {args.reference_file}")
    for candidate in args.input_file:
        if not os.path.exists(candidate):
            raise FileNotFoundError(f"Input file not found: {candidate}")

    ref_lookup = load_reference_lookup(args.reference_file)

    if args.offline:
        offline_results = []
        for input_path in args.input_file:
            _banner(f" Evaluating (offline): {os.path.basename(input_path)}")
            offline_results.append(
                evaluate_file_offline(
                    input_path=input_path,
                    reference_lookup=ref_lookup,
                    output_dir=args.output_dir,
                    max_samples=args.max_samples,
                )
            )
        if len(offline_results) > 1:
            _banner(" OFFLINE SUMMARY")
            for s in offline_results:
                print(f" {s['model']}: {s['evaluable_rows']}/{s['total_rows_in_file']} evaluable, success_rate={s['success_rate']:.4f}")
        return

    # Full evaluation needs the compiled classifier file and dspy itself.
    if not os.path.exists(args.model_path):
        raise FileNotFoundError(f"Model file not found: {args.model_path}")
    if dspy is None:
        raise RuntimeError(
            "Full evaluation requires dspy. Install with: pip install dspy-ai"
        )

    try:
        check_api_base(args.classifier_api_base)
        check_support_api_base(args.support_api_base)

        dspy.configure(
            lm=dspy.LM(
                model="openai/dspy",
                api_base=args.classifier_api_base,
                api_key="EMPTY",
                temperature=0.0,
            )
        )
        classifier = load_compiled_classifier(args.model_path)
        verifier = MedicalClaimVerifier(base_url=args.support_api_base)

        results: List[Dict[str, Any]] = []
        for input_path in args.input_file:
            _banner(f" Evaluating: {os.path.basename(input_path)}")
            results.append(
                evaluate_file(
                    input_path=input_path,
                    reference_lookup=ref_lookup,
                    classifier=classifier,
                    verifier=verifier,
                    comp_threshold=args.comp_threshold,
                    halluc_threshold=args.hallucination_threshold,
                    output_dir=args.output_dir,
                    max_samples=args.max_samples,
                    provide_traceback=args.provide_traceback,
                )
            )

        # Cross-model comparison table (only when several files were evaluated).
        if len(results) > 1:
            _banner(" CROSS-MODEL COMPARISON")
            row_fmt = "{:<20} {:>10} {:>12} {:>12} {:>12} {:>14}"
            print(row_fmt.format(
                "Model", "CLS Acc", "Comp Pass%",
                "Comp Mean", "Halluc Fail%", "Cls+Comp+NoH%"
            ))
            print("-" * 82)
            for s in results:
                mean_value = s.get("completeness_mean")
                print(row_fmt.format(
                    s["model"][-20:],
                    f"{s['classifier_only_accuracy']*100:.1f}%",
                    f"{s['completeness_pass_rate']*100:.1f}%",
                    "N/A" if mean_value is None else f"{mean_value:.4f}",
                    f"{s['hallucination_fail_rate']*100:.1f}%",
                    f"{s['accuracy_cls_comp_no_hallucination']*100:.1f}%",
                ))
    except Exception as exc:
        print(f"[error] {type(exc).__name__}: {exc}")
        if args.provide_traceback:
            traceback.print_exc()
        raise
if __name__ == "__main__":
main()