"""
eval_gpt5_results.py
---------------------
Evaluate pre-generated GPT-5 inference results (from run_gpt5_inference.py)
with the same metrics used by test_classifier_with_subclaim_thresholds.py:

  1. Classifier accuracy  (DSPy health-literacy classifier)
  2. Completeness score   (recall: summary_subclaims covered by gen_text)
  3. Hallucination score  (gen_text sentences NOT supported by input_text)

Expected JSONL format (from run_gpt5_inference.py): each line has
  model, row_index, doc_id, gold_label, source_lang, prompt, prediction,
  generated_text, error.
Reference (--reference-file) supplies summary_subclaims and input_text by
(doc_id, gold_label).

Usage
-----
  # Offline: count scores only (no classifier/support API required)
  python eval_gpt5_results.py --input-file gpt5mini-nano_inference/gpt5_inference_gpt-5_20260302_201653.jsonl --offline

  # Full evaluation (requires classifier API + support API + dspy)
  python eval_gpt5_results.py --input-file gpt5mini-nano_inference/gpt5_inference_gpt-5_20260302_201653.jsonl

  # Multiple files
  python eval_gpt5_results.py --input-file file1.jsonl file2.jsonl
"""

import argparse
import json
import os
import re
import traceback
import urllib.error
import urllib.request
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

try:
    import dspy
except ImportError:
    dspy = None  # type: ignore[assignment]

# requests and tqdm are only used by the full (online) evaluation. They are
# optional, like dspy above, so that --offline works without them installed.
try:
    import requests
except ImportError:
    requests = None  # type: ignore[assignment]

try:
    from tqdm import tqdm
except ImportError:
    def tqdm(iterable, **_kwargs):  # type: ignore[no-redef]
        """Fallback when tqdm is missing: iterate without a progress bar."""
        return iterable


# ---------------------------------------------------------------------------
# Defaults
# ---------------------------------------------------------------------------

DEFAULT_CLASSIFIER_API_BASE = "http://172.16.34.19:8040/v1"
DEFAULT_SUPPORT_API_BASE = "http://172.16.34.19:8090"
DEFAULT_MODEL_PATH = (
    "/home/mshahidul/readctrl/code/readctrl_rl_inference/model.json"
)
DEFAULT_REFERENCE_FILE = (
    "/home/mshahidul/readctrl/code/text_classifier/data/"
    "verified_combined_0-80_clean200_with_subclaims.json"
)
DEFAULT_OUTPUT_DIR = (
    "/home/mshahidul/readctrl/code/readctrl_rl_inference/test_result_v4"
)

VALID_LABELS = {
    "low_health_literacy",
    "intermediate_health_literacy",
    "proficient_health_literacy",
}

MIN_SENTENCE_CHARS = 15


# ---------------------------------------------------------------------------
# Small shared helpers
# ---------------------------------------------------------------------------

def _clean_str(value: Any) -> str:
    """Return *value* as a stripped string, mapping None (JSON null) to ''.

    A plain ``str(value)`` would turn a JSON ``null`` into the truthy string
    ``"None"``, which makes e.g. ``"error": null`` look like a real error.
    """
    return "" if value is None else str(value).strip()


def _require_requests() -> None:
    """Raise RuntimeError when the optional ``requests`` package is missing."""
    if requests is None:
        raise RuntimeError(
            "The support API requires the 'requests' package. "
            "Install with: pip install requests"
        )


def _find_reference(
    reference_lookup: Dict, doc_id: Any, gold_label: str
) -> Optional[Dict[str, Any]]:
    """Look up a reference entry by (doc_id, label), trying the raw and str doc_id."""
    return (
        reference_lookup.get((doc_id, gold_label))
        or reference_lookup.get((str(doc_id), gold_label))
    )


def _reserve_output_stem(output_dir: str, stem: str, extensions: List[str]) -> str:
    """Return *stem*, or *stem*_N, such that no ``stem + ext`` exists in *output_dir*.

    Output names only carry a second-resolution timestamp, so two input files
    for the same model evaluated within one second would otherwise overwrite
    each other's results.
    """
    candidate = stem
    counter = 1
    while any(
        os.path.exists(os.path.join(output_dir, candidate + ext))
        for ext in extensions
    ):
        candidate = f"{stem}_{counter}"
        counter += 1
    return candidate


# ---------------------------------------------------------------------------
# Sentence splitter  (mirrors reward_new_v5.py)
# ---------------------------------------------------------------------------

def _split_into_sentences(text: str, min_chars: int = MIN_SENTENCE_CHARS) -> List[str]:
    """Split *text* after '.', '!' or '?' and drop pieces shorter than *min_chars*."""
    if not text or not text.strip():
        return []
    parts = re.split(r"(?<=[.!?])\s+", text.strip())
    return [s.strip() for s in parts if len(s.strip()) >= min_chars]


# ---------------------------------------------------------------------------
# DSPy health-literacy classifier (only when dspy is available)
# ---------------------------------------------------------------------------

if dspy is not None:

    class HealthLiteracySignature(dspy.Signature):
        generated_text = dspy.InputField(
            desc="A version of the source text rewritten for a specific audience."
        )
        literacy_label = dspy.OutputField(
            desc=(
                "Classification: low_health_literacy (simple words, no jargon), "
                "intermediate_health_literacy (moderate technicality), or "
                "proficient_health_literacy (highly technical/original level)."
            )
        )

    class HealthLiteracyClassifier(dspy.Module):
        def __init__(self):
            super().__init__()
            self.classifier = dspy.ChainOfThought(HealthLiteracySignature)

        def forward(self, generated_text):
            return self.classifier(generated_text=generated_text)

else:
    HealthLiteracyClassifier = None  # type: ignore[misc, assignment]


# ---------------------------------------------------------------------------
# Support-API verifier  (mirrors reward_new_v5.py + test_classifier script)
# ---------------------------------------------------------------------------

class MedicalClaimVerifier:
    """
    Calls FastAPI POST /check_support.
    base_url: 'http://host:8090'  — NO /v1 suffix.
    """

    def __init__(self, base_url: str):
        self.base_url = base_url.rstrip("/")

    def _call_support_api(
        self,
        context: str,
        subclaims: List[str],
        threshold: float = 0.5,
        batch_size: int = 128,
    ) -> Optional[List[str]]:
        """Return one label per subclaim, or None when the API call fails.

        Empty *context* or *subclaims* short-circuits to all-"invalid" without
        touching the network. The returned list is always exactly
        ``len(subclaims)`` long: short responses are padded with "invalid" and
        long ones truncated.
        """
        if not context or not subclaims:
            return ["invalid"] * len(subclaims)
        _require_requests()
        try:
            resp = requests.post(
                f"{self.base_url}/check_support",
                json={
                    "context": context,
                    "subclaims": subclaims,
                    "threshold": threshold,
                    "batch_size": batch_size,
                },
                timeout=300,
            )
            resp.raise_for_status()
            payload = resp.json()
        except (requests.exceptions.RequestException, ValueError) as exc:
            # ValueError covers a non-JSON body on requests versions whose
            # JSON decode error is not a RequestException.
            print(f"Warning: Support API call failed (returning None): {exc}")
            return None

        labels = payload.get("labels") if isinstance(payload, dict) else None
        if not isinstance(labels, list):
            # Missing or malformed "labels" is treated as "nothing usable".
            labels = []
        labels = list(labels[: len(subclaims)])
        labels.extend(["invalid"] * (len(subclaims) - len(labels)))
        return labels

    def compute_completeness(
        self, summary_subclaims: List[str], gen_text: str
    ) -> Optional[float]:
        """Fraction of summary_subclaims covered by gen_text (recall direction).

        Returns 0.0 for empty inputs and None when the API failed or every
        label came back "invalid".
        """
        if not summary_subclaims or not gen_text or not gen_text.strip():
            return 0.0
        labels = self._call_support_api(context=gen_text, subclaims=summary_subclaims)
        if labels is None:
            return None
        valid = [l for l in labels if str(l).strip().lower() != "invalid"]
        if not valid:
            return None
        covered = sum(1 for l in valid if str(l).strip().lower() == "supported")
        return covered / len(valid)

    def compute_hallucination(
        self, input_text: str, gen_text: str
    ) -> Optional[float]:
        """Fraction of gen_text sentences NOT supported by input_text.

        The denominator is max(#generated sentences, #input sentences), not
        the number of valid labels. Returns 0.0 for empty inputs and None when
        the API failed or every label came back "invalid".
        """
        gen_segs = _split_into_sentences(gen_text)
        if not gen_segs or not input_text or not input_text.strip():
            return 0.0

        input_sents = _split_into_sentences(input_text)
        # gen_segs is non-empty here, so the denominator is always >= 1.
        stable_denom = max(len(gen_segs), len(input_sents))

        labels = self._call_support_api(context=input_text, subclaims=gen_segs)
        if labels is None:
            return None
        valid = [l for l in labels if str(l).strip().lower() != "invalid"]
        if not valid:
            return None
        hallucinated = sum(1 for l in valid if str(l).strip().lower() != "supported")
        return hallucinated / stable_denom

    def evaluate_sample(
        self, gen_text: str, summary_subclaims: List[str], input_text: str
    ) -> Tuple[Optional[float], Optional[float]]:
        """Return (completeness, hallucination) for one generated text."""
        completeness = self.compute_completeness(summary_subclaims, gen_text)
        hallucination = self.compute_hallucination(input_text, gen_text)
        return completeness, hallucination


# ---------------------------------------------------------------------------
# Health checks
# ---------------------------------------------------------------------------

def check_api_base(api_base: str) -> None:
    """Check that GET <api_base>/models answers with a non-error status.

    Raises:
        RuntimeError: the endpoint answered with an HTTP error status.
        ConnectionError: the endpoint could not be reached.
    """
    models_url = api_base.rstrip("/") + "/models"
    req = urllib.request.Request(models_url, method="GET")
    try:
        with urllib.request.urlopen(req, timeout=5) as resp:
            if resp.status >= 400:
                raise RuntimeError(f"Unhealthy endpoint: {models_url}")
    except urllib.error.HTTPError as exc:
        # HTTPError is a URLError subclass, so it must be caught first;
        # otherwise a 4xx/5xx reply is misreported as "cannot reach".
        raise RuntimeError(f"Unhealthy endpoint: {models_url}") from exc
    except urllib.error.URLError as exc:
        raise ConnectionError(
            f"Cannot reach classifier API: {api_base}. Start vLLM server."
        ) from exc


def check_support_api_base(api_base: str) -> None:
    """Send a tiny POST to <api_base>/check_support to confirm it is up.

    Raises:
        RuntimeError: requests is not installed, or the server answered 5xx.
        ConnectionError: the server could not be reached or timed out.
    """
    _require_requests()
    url = api_base.rstrip("/") + "/check_support"
    try:
        resp = requests.post(
            url,
            json={"context": "test", "subclaims": ["test"], "threshold": 0.5, "batch_size": 1},
            timeout=5,
        )
        if resp.status_code >= 500:
            raise RuntimeError(f"Support API server error: {url}")
    except requests.exceptions.ConnectionError as exc:
        raise ConnectionError(f"Cannot reach Support API: {url}.") from exc
    except requests.exceptions.Timeout as exc:
        raise ConnectionError(f"Support API timed out: {url}") from exc


# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------

def load_compiled_classifier(path: str):
    """Load the compiled DSPy classifier stored at *path*.

    Tries ``dspy.load(path)`` first when this dspy version provides it, then
    falls back to ``HealthLiteracyClassifier().load(path)``.

    Raises:
        RuntimeError: dspy is not installed, or the fallback load failed.
    """
    if dspy is None:
        raise RuntimeError(
            "Loading the classifier requires dspy. Install with: pip install dspy-ai"
        )
    if hasattr(dspy, "load"):
        try:
            return dspy.load(path)
        except Exception as exc:
            # Best effort only: report why, then try the module-level loader.
            print(
                f"Warning: dspy.load failed ({type(exc).__name__}: {exc}); "
                "falling back to HealthLiteracyClassifier.load"
            )
    classifier = HealthLiteracyClassifier()
    try:
        classifier.load(path)
    except Exception as exc:
        raise RuntimeError(f"Failed to load model: {path}") from exc
    return classifier


def normalize_pred_label(pred_obj: Any) -> str:
    """Return the prediction's literacy_label lower-cased and stripped, or ''."""
    if not pred_obj or not hasattr(pred_obj, "literacy_label"):
        return ""
    return str(pred_obj.literacy_label).strip().lower()


def load_inference_jsonl(path: str) -> List[Dict[str, Any]]:
    """
    Load GPT-5 inference JSONL produced by run_gpt5_inference.py (or
    run_gpt5mini_nano_inference.py).

    Expected fields per row: model, row_index, doc_id, gold_label,
    generated_text, error; optional: source_lang, prompt, prediction,
    input_text.

    Rows with non-empty 'error' or empty 'generated_text' are kept but flagged
    so they can be skipped cleanly. JSON null values in the string fields are
    normalised to '' so that ``"error": null`` does not count as an error.
    Blank lines are skipped; 'line_no' is the 1-based physical line number.

    Raises:
        ValueError: a line is not valid JSON or is not a JSON object.
    """
    items = []
    with open(path, "r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            if not line.strip():
                continue
            try:
                row = json.loads(line)
            except json.JSONDecodeError as exc:
                raise ValueError(f"{path}:{line_no}: invalid JSON: {exc}") from exc
            if not isinstance(row, dict):
                raise ValueError(f"{path}:{line_no}: expected a JSON object per line")
            items.append({
                "line_no": line_no,
                "model": _clean_str(row.get("model")),
                "row_index": row.get("row_index"),
                "doc_id": row.get("doc_id"),
                "gold_label": _clean_str(row.get("gold_label")),
                "generated_text": _clean_str(row.get("generated_text")),
                "input_text": _clean_str(row.get("input_text")),
                "error": _clean_str(row.get("error")),
            })
    return items


def load_reference_lookup(
    reference_path: str,
) -> Dict[Tuple[Any, str], Dict[str, Any]]:
    """
    Returns (doc_id, label) → {summary_subclaims, input_text}.
    Falls back to 'fulltext' field for input_text if 'input_text' is absent,
    null or empty. Each entry is registered under both the raw doc_id and
    str(doc_id); the first row seen for a key wins.

    Raises:
        ValueError: the file is not a JSON list, or no usable row was found.
    """
    with open(reference_path, "r", encoding="utf-8") as f:
        rows = json.load(f)
    if not isinstance(rows, list):
        raise ValueError("Reference file must be a JSON list.")

    lookup: Dict[Tuple[Any, str], Dict[str, Any]] = {}
    for row in rows:
        doc_id = row.get("doc_id")
        label = _clean_str(row.get("label"))
        if label not in VALID_LABELS:
            continue
        summary_subclaims = row.get("summary_subclaims", row.get("gold_subclaims", []))
        input_text = _clean_str(row.get("input_text")) or _clean_str(row.get("fulltext"))
        if not isinstance(summary_subclaims, list) or not summary_subclaims:
            continue
        entry = {"summary_subclaims": summary_subclaims, "input_text": input_text}
        for key in [(doc_id, label), (str(doc_id), label)]:
            lookup.setdefault(key, entry)

    if not lookup:
        raise ValueError(f"Reference lookup is empty: {reference_path}")
    return lookup


# ---------------------------------------------------------------------------
# Offline evaluation (no classifier/support API)
# ---------------------------------------------------------------------------

def evaluate_file_offline(
    *,
    input_path: str,
    reference_lookup: Dict,
    output_dir: str,
    max_samples: int,
) -> Dict[str, Any]:
    """
    Compute basic counts and scores from inference JSONL without calling
    classifier or support API. Use --offline when those services are
    unavailable.

    Writes the summary as JSON into *output_dir* (created if needed), prints
    it, and returns it. 'total_rows_in_file' is the row count after applying
    *max_samples* (values <= 0 mean "all rows").
    """
    rows = load_inference_jsonl(input_path)
    model_name = next((r["model"] for r in rows if r["model"]), os.path.basename(input_path))
    if max_samples > 0:
        rows = rows[:max_samples]

    total_in_file = len(rows)
    error_rows = 0
    no_text_rows = 0
    unmatched_rows = 0
    evaluated_count = 0

    for row in rows:
        if row["error"]:
            error_rows += 1
            continue
        if not row["generated_text"]:
            no_text_rows += 1
            continue
        gold_label = row["gold_label"]
        if gold_label not in VALID_LABELS:
            continue
        if not _find_reference(reference_lookup, row["doc_id"], gold_label):
            unmatched_rows += 1
            continue
        evaluated_count += 1

    score_summary = {
        "model": model_name,
        "input_file": input_path,
        "total_rows_in_file": total_in_file,
        "error_rows_skipped": error_rows,
        "rows_without_generated_text": no_text_rows,
        "unmatched_rows": unmatched_rows,
        "evaluable_rows": evaluated_count,
        "success_rate": evaluated_count / total_in_file if total_in_file else 0.0,
    }

    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    model_slug = model_name.replace("/", "_").replace(" ", "_")
    os.makedirs(output_dir, exist_ok=True)
    # Avoid clobbering the summary of another file evaluated in the same second.
    stem = _reserve_output_stem(
        output_dir, f"gpt5_eval_offline_{model_slug}_{ts}", [".json"]
    )
    summary_path = os.path.join(output_dir, f"{stem}.json")
    with open(summary_path, "w", encoding="utf-8") as f:
        json.dump(score_summary, f, indent=2)

    print(json.dumps(score_summary, indent=2))
    print(f"[DONE] {model_name} (offline): summary → {summary_path}")
    return score_summary


# ---------------------------------------------------------------------------
# Per-file evaluation
# ---------------------------------------------------------------------------

def evaluate_file(
    *,
    input_path: str,
    reference_lookup: Dict,
    classifier,
    verifier: MedicalClaimVerifier,
    comp_threshold: float,
    halluc_threshold: float,
    output_dir: str,
    max_samples: int,
    provide_traceback: bool,
) -> Dict[str, Any]:
    """Run evaluation on one JSONL file; save summary + details; return summary dict.

    A sample passes completeness when its score is >= *comp_threshold* and
    fails hallucination when its score is > *halluc_threshold*; a None score
    (API failure) counts as neither. Summary and details are rewritten every
    CHECKPOINT_EVERY evaluated samples and once more at the end.

    *provide_traceback* is accepted for interface compatibility and is not
    used here; exceptions propagate to the caller.

    Raises:
        RuntimeError: no row in the file could be evaluated.
    """
    rows = load_inference_jsonl(input_path)

    # Detect model name from first valid row
    model_name = next((r["model"] for r in rows if r["model"]), os.path.basename(input_path))

    if max_samples > 0:
        rows = rows[:max_samples]

    # ── counters ────────────────────────────────────────────────────────────
    unmatched_rows = 0
    error_rows = 0
    no_text_rows = 0
    total = 0
    classifier_correct = 0
    comp_pass_count = 0
    halluc_fail_count = 0
    cls_and_comp_count = 0
    cls_comp_nh_count = 0
    comp_sum, comp_n = 0.0, 0
    halluc_sum, halluc_n = 0.0, 0
    details: List[Dict[str, Any]] = []

    CHECKPOINT_EVERY = 10
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    model_slug = model_name.replace("/", "_").replace(" ", "_")
    os.makedirs(output_dir, exist_ok=True)
    # Summary and details share one stem; keep it unique so a second file for
    # the same model evaluated in the same second does not overwrite this one.
    stem = _reserve_output_stem(
        output_dir, f"gpt5_eval_{model_slug}_{ts}", [".json", ".jsonl"]
    )
    summary_path = os.path.join(output_dir, f"{stem}.json")
    details_path = os.path.join(output_dir, f"{stem}.jsonl")

    def safe(n: float) -> float:
        """Rate over evaluated samples, 0.0 when nothing was evaluated."""
        return n / total if total else 0.0

    def build_summary() -> Dict[str, Any]:
        return {
            "model": model_name,
            "input_file": input_path,
            "total_rows_in_file": len(rows),
            "total_samples_evaluated": total,
            "unmatched_rows": unmatched_rows,
            "error_rows_skipped": error_rows,
            "rows_without_generated_text": no_text_rows,
            # classifier
            "classifier_only_accuracy": safe(classifier_correct),
            # completeness
            "completeness_pass_rate": safe(comp_pass_count),
            "completeness_mean": comp_sum / comp_n if comp_n else None,
            "completeness_threshold": comp_threshold,
            # hallucination
            "hallucination_fail_rate": safe(halluc_fail_count),
            "hallucination_mean": halluc_sum / halluc_n if halluc_n else None,
            "hallucination_threshold": halluc_threshold,
            # combined
            "accuracy_cls_and_completeness": safe(cls_and_comp_count),
            "accuracy_cls_comp_no_hallucination": safe(cls_comp_nh_count),
            "details_path": details_path,
        }

    def save_checkpoint() -> None:
        with open(summary_path, "w", encoding="utf-8") as f:
            json.dump(build_summary(), f, indent=2)
        with open(details_path, "w", encoding="utf-8") as f:
            for item in details:
                f.write(json.dumps(item, ensure_ascii=False) + "\n")

    for idx, row in enumerate(tqdm(rows, desc=model_name), start=1):
        gold_label = row["gold_label"]
        generated_text = row["generated_text"]
        doc_id = row["doc_id"]

        if gold_label not in VALID_LABELS:
            continue
        if row["error"]:
            error_rows += 1
            continue
        if not generated_text:
            no_text_rows += 1
            continue

        ref = _find_reference(reference_lookup, doc_id, gold_label)
        if not ref:
            unmatched_rows += 1
            continue

        summary_subclaims = ref["summary_subclaims"]
        # Prefer the reference text; fall back to the row's own input_text.
        input_text = ref.get("input_text") or row.get("input_text", "")

        total += 1

        # 1. Classifier
        pred = classifier(generated_text=generated_text)
        pred_label = normalize_pred_label(pred)
        # Substring match: the gold label only has to appear in the prediction.
        is_cls_correct = gold_label in pred_label
        classifier_correct += int(is_cls_correct)

        # 2. Completeness + Hallucination
        comp_score, halluc_score = verifier.evaluate_sample(
            gen_text=generated_text,
            summary_subclaims=summary_subclaims,
            input_text=input_text,
        )

        comp_pass = (comp_score is not None) and (comp_score >= comp_threshold)
        halluc_fail = (halluc_score is not None) and (halluc_score > halluc_threshold)

        comp_pass_count += int(comp_pass)
        halluc_fail_count += int(halluc_fail)
        if comp_score is not None:
            comp_sum += comp_score
            comp_n += 1
        if halluc_score is not None:
            halluc_sum += halluc_score
            halluc_n += 1

        cls_and_comp = is_cls_correct and comp_pass
        cls_comp_no_h = cls_and_comp and not halluc_fail
        cls_and_comp_count += int(cls_and_comp)
        cls_comp_nh_count += int(cls_comp_no_h)

        details.append({
            "idx": idx,
            "model": model_name,
            "line_no": row.get("line_no"),
            "row_index": row.get("row_index"),
            "doc_id": doc_id,
            "gold_label": gold_label,
            "generated_text": generated_text,
            "pred_label": pred_label,
            "classifier_correct": is_cls_correct,
            "completeness_score": comp_score,
            "completeness_pass": comp_pass,
            "completeness_threshold": comp_threshold,
            "hallucination_score": halluc_score,
            "hallucination_fail": halluc_fail,
            "hallucination_threshold": halluc_threshold,
            "pass_cls_and_completeness": cls_and_comp,
            "pass_cls_comp_no_hallucination": cls_comp_no_h,
        })

        if total % CHECKPOINT_EVERY == 0:
            save_checkpoint()
            comp_avg = f"{comp_sum/comp_n:.4f}" if comp_n else "N/A"
            halluc_avg = f"{halluc_sum/halluc_n:.4f}" if halluc_n else "N/A"
            print(
                f"\n[CHECKPOINT {model_name}] {total} samples — "
                f"cls_acc={classifier_correct/total:.4f}, "
                f"comp_pass={comp_pass_count/total:.4f} (mean={comp_avg}), "
                f"halluc_fail={halluc_fail_count/total:.4f} (mean={halluc_avg})"
            )

    if total == 0:
        raise RuntimeError(f"No valid rows found in {input_path}")

    save_checkpoint()
    summary = build_summary()
    print(json.dumps(summary, indent=2))
    print(f"[DONE] {model_name}: summary → {summary_path}")
    print(f"[DONE] {model_name}: details → {details_path}")
    return summary


# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------

def parse_args() -> argparse.Namespace:
    """Parse the command-line arguments of this script."""
    parser = argparse.ArgumentParser(
        description=(
            "Evaluate GPT-5 mini/nano inference results: classifier accuracy, "
            "completeness (recall), and hallucination score."
        )
    )
    parser.add_argument(
        "--input-file",
        nargs="+",
        required=True,
        help=(
            "One or more JSONL files produced by run_gpt5mini_nano_inference.py. "
            "Each file is evaluated separately."
        ),
    )
    parser.add_argument("--model-path", default=DEFAULT_MODEL_PATH,
                        help="DSPy health-literacy classifier model.json path.")
    parser.add_argument("--reference-file", default=DEFAULT_REFERENCE_FILE,
                        help="Reference JSON with summary_subclaims + input_text.")
    parser.add_argument("--classifier-api-base", default=DEFAULT_CLASSIFIER_API_BASE)
    parser.add_argument(
        "--support-api-base",
        default=DEFAULT_SUPPORT_API_BASE,
        help="FastAPI /check_support base URL (NO /v1 suffix).",
    )
    parser.add_argument("--output-dir", default=DEFAULT_OUTPUT_DIR)
    parser.add_argument("--comp-threshold", type=float, default=0.5,
                        help="Completeness pass threshold (score >= value).")
    parser.add_argument("--hallucination-threshold", type=float, default=0.1,
                        help="Hallucination fail threshold (score > value).")
    parser.add_argument("--max-samples", type=int, default=-1,
                        help="Max rows per file. -1 = all.")
    parser.add_argument("--provide-traceback", action="store_true")
    parser.add_argument("--offline", action="store_true",
                        help="Only compute counts/success rate; no classifier or support API.")
    return parser.parse_args()


def main() -> None:
    """Entry point: validate paths, then run offline or full evaluation per file."""
    args = parse_args()

    if not os.path.exists(args.reference_file):
        raise FileNotFoundError(f"Reference file not found: {args.reference_file}")
    for f in args.input_file:
        if not os.path.exists(f):
            raise FileNotFoundError(f"Input file not found: {f}")

    ref_lookup = load_reference_lookup(args.reference_file)

    if args.offline:
        all_summaries = []
        for input_path in args.input_file:
            print(f"\n{'='*60}")
            print(f"  Evaluating (offline): {os.path.basename(input_path)}")
            print(f"{'='*60}")
            summary = evaluate_file_offline(
                input_path=input_path,
                reference_lookup=ref_lookup,
                output_dir=args.output_dir,
                max_samples=args.max_samples,
            )
            all_summaries.append(summary)
        if len(all_summaries) > 1:
            print(f"\n{'='*60}")
            print("  OFFLINE SUMMARY")
            print(f"{'='*60}")
            for s in all_summaries:
                print(f"  {s['model']}: {s['evaluable_rows']}/{s['total_rows_in_file']} evaluable, success_rate={s['success_rate']:.4f}")
        return

    if not os.path.exists(args.model_path):
        raise FileNotFoundError(f"Model file not found: {args.model_path}")
    if dspy is None:
        raise RuntimeError(
            "Full evaluation requires dspy. Install with: pip install dspy-ai"
        )
    if requests is None:
        raise RuntimeError(
            "Full evaluation requires requests. Install with: pip install requests"
        )

    try:
        check_api_base(args.classifier_api_base)
        check_support_api_base(args.support_api_base)

        lm = dspy.LM(
            model="openai/dspy",
            api_base=args.classifier_api_base,
            api_key="EMPTY",
            temperature=0.0,
        )
        dspy.configure(lm=lm)
        classifier = load_compiled_classifier(args.model_path)
        verifier = MedicalClaimVerifier(base_url=args.support_api_base)

        all_summaries: List[Dict[str, Any]] = []
        for input_path in args.input_file:
            print(f"\n{'='*60}")
            print(f"  Evaluating: {os.path.basename(input_path)}")
            print(f"{'='*60}")
            summary = evaluate_file(
                input_path=input_path,
                reference_lookup=ref_lookup,
                classifier=classifier,
                verifier=verifier,
                comp_threshold=args.comp_threshold,
                halluc_threshold=args.hallucination_threshold,
                output_dir=args.output_dir,
                max_samples=args.max_samples,
                provide_traceback=args.provide_traceback,
            )
            all_summaries.append(summary)

        # ── Cross-model comparison table ────────────────────────────────────
        if len(all_summaries) > 1:
            print(f"\n{'='*60}")
            print("  CROSS-MODEL COMPARISON")
            print(f"{'='*60}")
            fmt = "{:<20} {:>10} {:>12} {:>12} {:>12} {:>14}"
            print(fmt.format(
                "Model", "CLS Acc", "Comp Pass%", "Comp Mean",
                "Halluc Fail%", "Cls+Comp+NoH%"
            ))
            print("-" * 82)
            for s in all_summaries:
                name = s["model"][-20:]
                cls_acc = f"{s['classifier_only_accuracy']*100:.1f}%"
                comp_pass = f"{s['completeness_pass_rate']*100:.1f}%"
                comp_mean_val = s.get("completeness_mean")
                comp_mean = f"{comp_mean_val:.4f}" if comp_mean_val is not None else "N/A"
                halluc_f = f"{s['hallucination_fail_rate']*100:.1f}%"
                combined = f"{s['accuracy_cls_comp_no_hallucination']*100:.1f}%"
                print(fmt.format(name, cls_acc, comp_pass, comp_mean, halluc_f, combined))

    except Exception as exc:
        print(f"[error] {type(exc).__name__}: {exc}")
        if args.provide_traceback:
            traceback.print_exc()
        raise


if __name__ == "__main__":
    main()