guide / scripts /evaluate_models.py
Saravanakumar R
open spec for model evaluation draft-pill-fill inline-document
14a5b1e
Raw
History Blame Contribute Delete
19 kB
"""
Evaluate all three G.U.I.D.E. models and print train/validation metrics.
Usage:
# NER + NextActionPredictor only (no CFPB CSV needed)
python scripts/evaluate_models.py --skip_classifier
# All three models (run on Kaggle where CFPB CSV is available)
python scripts/evaluate_models.py --cfpb_csv /kaggle/input/datasets/sharav95/complaint/complaints.csv
Models are downloaded automatically from sarav95/guide-models on HuggingFace
if not already present locally. Set HF_TOKEN env var if needed.
"""
from __future__ import annotations
import argparse
import json
import logging
import os
import sys
from pathlib import Path
import torch
# Put the parent of this script's directory on sys.path so the project-local
# `src.*` imports performed inside the evaluate_* functions can be resolved.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
# Configure the root logger at WARNING level.
logging.basicConfig(level=logging.WARNING)
# HuggingFace Hub repository that _ensure_models() downloads checkpoints from.
_HF_REPO = "sarav95/guide-models"
# Parent of the directory containing this file; _ensure_models() looks for
# (and downloads) checkpoints under _ROOT / "models".
_ROOT = Path(__file__).resolve().parents[1]
def _ensure_models() -> None:
    """Fetch model checkpoints from HuggingFace unless all are already present.

    One marker file per model is looked up under ``_ROOT / "models"``.  When
    every marker exists nothing happens.  Otherwise the ``_HF_REPO`` snapshot
    is downloaded into that directory, authenticating with the ``HF_TOKEN``
    environment variable when it is set.  The process exits with status 1 if
    ``huggingface_hub`` cannot be imported.
    """
    target = _ROOT / "models"
    markers = (
        target / "evidence_ner" / "config.json",
        target / "domain_classifier" / "config.json",
        target / "next_action" / "model.pt",
    )
    present = [marker.exists() for marker in markers]
    if all(present):
        return

    print(f" Model checkpoints missing — downloading from {_HF_REPO!r} …")
    try:
        from huggingface_hub import snapshot_download
    except ImportError:
        print(" [error] huggingface_hub not installed: pip install huggingface_hub")
        sys.exit(1)

    target.mkdir(parents=True, exist_ok=True)
    download_options = {
        "repo_id": _HF_REPO,
        "local_dir": str(target),
        "local_dir_use_symlinks": False,
        "token": os.environ.get("HF_TOKEN"),
    }
    snapshot_download(**download_options)
    print(" Model download complete.")
# ---------------------------------------------------------------------------
# print_summary_table
# ---------------------------------------------------------------------------
def print_summary_table(results: list[dict]) -> None:
    """Print a consolidated train/validation summary for all evaluated models.

    Prints nothing when ``results`` is empty.  Accuracy and macro-F1 are
    rendered with four decimal places; every column is left-aligned and sized
    to its widest cell.

    Args:
        results: list of dicts with keys: model, split, accuracy, macro_f1
    """
    if not results:
        return

    headers = ["Model", "Split", "Accuracy", "Macro-F1"]
    rows = [
        [r["model"], r["split"], f"{r['accuracy']:.4f}", f"{r['macro_f1']:.4f}"]
        for r in results
    ]
    col_w = [
        max(len(str(cell)) for cell in (header, *(row[i] for row in rows)))
        for i, header in enumerate(headers)
    ]
    fmt = " ".join(f"{{:<{w}}}" for w in col_w)
    sep = " ".join("-" * w for w in col_w)
    # Derive the banner width from the rendered separator line rather than
    # re-computing it with a hard-coded gap size, so the "=" rules always
    # span exactly the table width.
    width = len(sep)

    print(f"\n{'=' * width}")
    print(" Summary — All Models")
    print("=" * width)
    print(fmt.format(*headers))
    print(sep)
    for row in rows:
        print(fmt.format(*row))
    print()
# ---------------------------------------------------------------------------
# DomainClassifier
# ---------------------------------------------------------------------------
def evaluate_domain_classifier(cfpb_csv: str | None, results: list[dict]) -> None:
    """Evaluate DomainClassifier on a train sample and the validation set.

    Always prints the hardcoded training curve taken from the Kaggle log.
    When ``cfpb_csv`` is given, the CFPB data plus the supplement set are
    concatenated, shuffled and split 90/10 (both with seed 42, intended to
    mirror the split used during training), and the saved checkpoint is scored
    on at most 5,000 train rows and on the whole held-out 10%.

    The checkpoint is read from ``_ROOT / "models" / "domain_classifier"``,
    the same location ``_ensure_models`` checks and downloads to, so the
    evaluation does not depend on the current working directory.

    Skips gracefully when cfpb_csv is None.

    Args:
        cfpb_csv: path to CFPB complaints CSV, or None to skip
        results: shared list to append summary rows to
    """
    print("\n" + "=" * 72)
    print(" DomainClassifier (DistilBERT, 6-class)")
    print("=" * 72)

    # --- Training curve from Kaggle log (hardcoded) ---
    print("\n Training curve (from Kaggle log):")
    curve_headers = ["Epoch", "Train loss range", "Val loss", "Notes"]
    curve_rows = [
        ["1", "0.8401 → 0.2807", "0.2768", ""],
        ["2", "0.2460 → 0.1955", "0.2720", "best checkpoint (load_best_model_at_end)"],
        ["3", "0.2129 → 0.1310", "0.3334", "overfitting — epoch 2 weights saved"],
    ]
    col_w = [max(len(str(x)) for x in [h] + [r[i] for r in curve_rows])
             for i, h in enumerate(curve_headers)]
    fmt = " ".join(f"{{:<{w}}}" for w in col_w)
    sep = " ".join("-" * w for w in col_w)
    print(" " + fmt.format(*curve_headers))
    print(" " + sep)
    for row in curve_rows:
        print(" " + fmt.format(*row))
    print(" Final train loss: 0.2402 | train samples/sec: 37.12")

    if cfpb_csv is None:
        print("\n [skipped] Pass --cfpb_csv <path> to evaluate on data splits.")
        return

    # Heavy / project-local imports are deferred until they are really needed
    # so the skip path above works without them installed.
    from datasets import concatenate_datasets
    from sklearn.metrics import accuracy_score, classification_report, f1_score
    from transformers import AutoModelForSequenceClassification, AutoTokenizer
    from src.classifier.train import _build_supplement, load_and_remap_cfpb
    from src.classifier.model import DOMAIN_LABELS

    print("\n Loading data …")
    cfpb_ds = load_and_remap_cfpb(cfpb_csv, max_per_class=50_000)
    suppl_ds = _build_supplement(n_per_class=5_000)
    full_ds = concatenate_datasets([cfpb_ds, suppl_ds]).shuffle(seed=42)
    split = full_ds.train_test_split(test_size=0.1, seed=42)

    # Resolve against the repository root instead of the working directory:
    # _ensure_models() verifies/downloads checkpoints under _ROOT / "models".
    model_dir = str(_ROOT / "models" / "domain_classifier")
    print(f" Loading checkpoint from {model_dir} …")
    tokenizer = AutoTokenizer.from_pretrained(model_dir)
    model = AutoModelForSequenceClassification.from_pretrained(model_dir)
    model.eval()
    device = torch.device(
        "cuda" if torch.cuda.is_available()
        else "mps" if torch.backends.mps.is_available()
        else "cpu"
    )
    model.to(device)

    def _predict_batch(texts: list[str]) -> list[int]:
        """Return the argmax class id for each text in one forward pass."""
        enc = tokenizer(texts, truncation=True, max_length=512,
                        padding=True, return_tensors="pt")
        enc = {k: v.to(device) for k, v in enc.items()}
        with torch.no_grad():
            logits = model(**enc).logits
        return logits.argmax(dim=-1).cpu().tolist()

    def _eval_split(ds, name: str, max_samples: int) -> None:
        """Score the first ``max_samples`` rows of ``ds`` and record a summary row."""
        if len(ds) > max_samples:
            ds = ds.select(range(max_samples))
        texts, labels = ds["text"], ds["labels"]
        preds: list[int] = []
        # Fixed batch size of 64 texts per forward pass.
        for i in range(0, len(texts), 64):
            preds.extend(_predict_batch(texts[i:i + 64]))
        acc = accuracy_score(labels, preds)
        mac_f1 = f1_score(labels, preds, average="macro", zero_division=0)
        print(f"\n [{name}] n={len(ds)} accuracy={acc:.4f} macro-F1={mac_f1:.4f}")
        report = classification_report(labels, preds,
                                       target_names=DOMAIN_LABELS, zero_division=0)
        for line in report.splitlines():
            print(f" {line}")
        results.append({"model": "DomainClassifier", "split": name,
                        "accuracy": acc, "macro_f1": mac_f1})

    _eval_split(split["train"], "train", max_samples=5_000)
    _eval_split(split["test"], "validation", max_samples=len(split["test"]))
# ---------------------------------------------------------------------------
# EvidenceNER
# ---------------------------------------------------------------------------
def _words_to_bio(sentence: str, entities: list[dict]) -> list[str]:
"""Convert a sentence + entity list to a BIO tag sequence over whitespace tokens.
Args:
sentence: raw complaint sentence string
entities: list of {"text": str, "label": str} dicts
Returns:
list of BIO label strings aligned to sentence.split()
"""
words = sentence.split()
tags = ["O"] * len(words)
for ent in entities:
ent_words = ent["text"].split()
label = ent["label"]
# slide a window to find where entity words appear in sentence words
for i in range(len(words) - len(ent_words) + 1):
if words[i:i + len(ent_words)] == ent_words:
tags[i] = f"B-{label}"
for j in range(1, len(ent_words)):
tags[i + j] = f"I-{label}"
break
return tags
def _predict_bio_tags(sentence: str, model, tokenizer, id2label: dict,
device: torch.device) -> list[str]:
"""Run NER model on a single sentence and return word-level BIO tags.
Args:
sentence: raw string to tag
model: loaded token classification model
tokenizer: matching tokenizer
id2label: id→BIO label mapping
device: torch device
Returns:
list of BIO label strings, one per whitespace token
"""
words = sentence.split()
enc = tokenizer(words, truncation=True, max_length=512,
is_split_into_words=True, return_tensors="pt")
word_ids = tokenizer(words, truncation=True, max_length=512,
is_split_into_words=True).word_ids()
enc = {k: v.to(device) for k, v in enc.items()}
with torch.no_grad():
logits = model(**enc).logits[0]
pred_ids = logits.argmax(dim=-1).cpu().tolist()
# First subword per word gets the predicted tag
pred_tags: list[str] = []
prev_word_id = None
for tok_idx, word_id in enumerate(word_ids):
if word_id is None or word_id == prev_word_id:
prev_word_id = word_id
continue
prev_word_id = word_id
pred_tags.append(id2label[pred_ids[tok_idx]])
return pred_tags[:len(words)]
def evaluate_ner_synthetic(model, tokenizer, id2label: dict,
                           device: torch.device, results: list[dict]) -> None:
    """Evaluate EvidenceNER on synthetic train and validation splits.

    Builds 4,000 synthetic samples, adds the CoNLL data when
    ``_try_load_conll`` returns a dataset (shuffling the combination with
    seed 42), and carves a 90/10 split with seed 42.  Up to 2,000 train
    examples and the whole validation split are scored with seqeval.

    The detailed line prints seqeval's default (micro-averaged) entity-level
    precision/recall/F1; the row appended to ``results`` stores the
    macro-averaged F1 so that it matches the "Macro-F1" summary column.

    Returns early with an error message when seqeval is not installed.

    Args:
        model: loaded token classification model
        tokenizer: matching tokenizer
        id2label: id→BIO label mapping
        device: torch device
        results: shared list to append summary rows to
    """
    try:
        from seqeval.metrics import (
            accuracy_score, classification_report,
            f1_score, precision_score, recall_score,
        )
    except ImportError:
        print(" [error] seqeval not installed: pip install seqeval")
        return
    from src.ner.train import build_synthetic_dataset, _try_load_conll
    from datasets import concatenate_datasets

    print("\n Building synthetic dataset …")
    synthetic_ds = build_synthetic_dataset(n_samples=4000)
    conll_ds = _try_load_conll()
    if conll_ds is not None:
        full_ds = concatenate_datasets([synthetic_ds, conll_ds]).shuffle(seed=42)
    else:
        full_ds = synthetic_ds
    split = full_ds.train_test_split(test_size=0.1, seed=42)

    def _eval_split(ds, name: str, max_samples: int) -> None:
        """Score the first ``max_samples`` examples of ``ds`` and record a summary row."""
        if len(ds) > max_samples:
            ds = ds.select(range(max_samples))
        true_seqs, pred_seqs = [], []
        for ex in ds:
            true_tags = [id2label[t] for t in ex["ner_tags"]]
            sentence = " ".join(ex["words"])
            pred_tags = _predict_bio_tags(sentence, model, tokenizer,
                                          id2label, device)
            # Predictions can be shorter than the gold tags (truncation), so
            # compare only the common prefix.
            n = min(len(true_tags), len(pred_tags))
            true_seqs.append(true_tags[:n])
            pred_seqs.append(pred_tags[:n])

        acc = accuracy_score(true_seqs, pred_seqs)
        prec = precision_score(true_seqs, pred_seqs, zero_division=0)
        rec = recall_score(true_seqs, pred_seqs, zero_division=0)
        f1 = f1_score(true_seqs, pred_seqs, zero_division=0)
        # seqeval averages "micro" by default; the summary table is labelled
        # Macro-F1, so compute that figure explicitly.
        macro_f1 = f1_score(true_seqs, pred_seqs, average="macro",
                            zero_division=0)
        print(f"\n [synthetic {name}] n={len(ds)}")
        print(f" accuracy={acc:.4f} precision={prec:.4f} "
              f"recall={rec:.4f} F1={f1:.4f} macro-F1={macro_f1:.4f}")
        report = classification_report(true_seqs, pred_seqs, zero_division=0)
        for line in report.splitlines():
            print(f" {line}")
        results.append({"model": "EvidenceNER (synthetic)",
                        "split": name, "accuracy": acc, "macro_f1": macro_f1})

    _eval_split(split["train"], "train", max_samples=2_000)
    _eval_split(split["test"], "validation", max_samples=len(split["test"]))
def evaluate_ner_real(model, tokenizer, id2label: dict,
                      device: torch.device, results: list[dict]) -> None:
    """Evaluate EvidenceNER on the real hand-verified complaint sentences.

    Loads ``data/eval/ner_real_complaints.json`` relative to the repository
    root (``_ROOT``), so the lookup does not depend on the current working
    directory.  Each item must provide a ``sentence`` string and an
    ``entities`` list.  Skips gracefully if the file is missing, and returns
    early with an error message when seqeval is not installed.

    The detailed line prints seqeval's default (micro-averaged) entity-level
    precision/recall/F1; the row appended to ``results`` stores the
    macro-averaged F1 so that it matches the "Macro-F1" summary column.

    Args:
        model: loaded token classification model
        tokenizer: matching tokenizer
        id2label: id→BIO label mapping
        device: torch device
        results: shared list to append summary rows to
    """
    dataset_path = _ROOT / "data" / "eval" / "ner_real_complaints.json"
    if not dataset_path.exists():
        print(f"\n [skipped] {dataset_path} not found — real complaint eval skipped.")
        return
    try:
        from seqeval.metrics import (
            accuracy_score, classification_report,
            f1_score, precision_score, recall_score,
        )
    except ImportError:
        print(" [error] seqeval not installed: pip install seqeval")
        return

    # Explicit encoding: the platform default is not guaranteed to be UTF-8.
    with open(dataset_path, encoding="utf-8") as f:
        dataset = json.load(f)

    true_seqs, pred_seqs = [], []
    for item in dataset:
        sentence = item["sentence"]
        true_tags = _words_to_bio(sentence, item["entities"])
        pred_tags = _predict_bio_tags(sentence, model, tokenizer, id2label, device)
        # Predictions can be shorter than the gold tags (truncation), so
        # compare only the common prefix.
        n = min(len(true_tags), len(pred_tags))
        true_seqs.append(true_tags[:n])
        pred_seqs.append(pred_tags[:n])

    acc = accuracy_score(true_seqs, pred_seqs)
    prec = precision_score(true_seqs, pred_seqs, zero_division=0)
    rec = recall_score(true_seqs, pred_seqs, zero_division=0)
    f1 = f1_score(true_seqs, pred_seqs, zero_division=0)
    # seqeval averages "micro" by default; the summary table is labelled
    # Macro-F1, so compute that figure explicitly.
    macro_f1 = f1_score(true_seqs, pred_seqs, average="macro", zero_division=0)
    print(f"\n [real complaints] n={len(dataset)}")
    print(f" accuracy={acc:.4f} precision={prec:.4f} "
          f"recall={rec:.4f} F1={f1:.4f} macro-F1={macro_f1:.4f}")
    report = classification_report(true_seqs, pred_seqs, zero_division=0)
    for line in report.splitlines():
        print(f" {line}")
    results.append({"model": "EvidenceNER (real)", "split": "validation",
                    "accuracy": acc, "macro_f1": macro_f1})
def evaluate_ner(results: list[dict]) -> None:
    """Load EvidenceNER checkpoint and run synthetic + real complaint evaluation.

    The checkpoint is read from ``_ROOT / "models" / "evidence_ner"``, the same
    location ``_ensure_models`` checks and downloads to, so the evaluation does
    not depend on the current working directory.  The model is moved to CUDA
    when available, otherwise MPS, otherwise CPU.

    Args:
        results: shared list to append summary rows to
    """
    print("\n" + "=" * 72)
    print(" EvidenceNER (DistilBERT token classifier, BIO 13-label)")
    print("=" * 72)

    from transformers import AutoModelForTokenClassification, AutoTokenizer
    from src.ner.model import ID2LABEL

    # Resolve against the repository root instead of the working directory:
    # _ensure_models() verifies/downloads checkpoints under _ROOT / "models".
    model_dir = str(_ROOT / "models" / "evidence_ner")
    print(f" Loading checkpoint from {model_dir} …")
    tokenizer = AutoTokenizer.from_pretrained(model_dir)
    model = AutoModelForTokenClassification.from_pretrained(model_dir)
    model.eval()
    device = torch.device(
        "cuda" if torch.cuda.is_available()
        else "mps" if torch.backends.mps.is_available()
        else "cpu"
    )
    model.to(device)

    evaluate_ner_synthetic(model, tokenizer, ID2LABEL, device, results)
    evaluate_ner_real(model, tokenizer, ID2LABEL, device, results)
# ---------------------------------------------------------------------------
# NextActionPredictor
# ---------------------------------------------------------------------------
def evaluate_next_action(results: list[dict]) -> None:
    """Evaluate NextActionPredictor on train (90%) and validation (10%) splits.

    Recreates the 6000-sample dataset (seed=42) and carves the split
    positionally: the first 90% of rows are train, the rest validation.
    The checkpoint is read from ``_ROOT / "models" / "next_action" / "model.pt"``,
    the same location ``_ensure_models`` checks and downloads to, so the
    evaluation does not depend on the current working directory.

    Documents legal F1 = 0.00 as a known class-imbalance limitation.

    Args:
        results: shared list to append summary rows to
    """
    print("\n" + "=" * 72)
    print(" NextActionPredictor (MLP 12→64→64→6)")
    print("=" * 72)

    from sklearn.metrics import accuracy_score, classification_report, f1_score
    from src.next_action.train import build_synthetic_dataset
    from src.next_action.model import ACTION_LABELS, GUIDE_MLP

    print(" Building synthetic dataset (n=6000, seed=42) …")
    X_list, y_list = build_synthetic_dataset(n_samples=6000, seed=42)
    X_all = torch.tensor(X_list, dtype=torch.float32)
    y_all = torch.tensor(y_list, dtype=torch.long)
    split_idx = int(len(X_all) * 0.9)
    X_train, X_val = X_all[:split_idx], X_all[split_idx:]
    y_train, y_val = y_all[:split_idx], y_all[split_idx:]

    # Resolve against the repository root instead of the working directory:
    # _ensure_models() verifies/downloads checkpoints under _ROOT / "models".
    model_path = str(_ROOT / "models" / "next_action" / "model.pt")
    print(f" Loading checkpoint from {model_path} …")
    # weights_only=True restricts unpickling to tensors and plain containers.
    ckpt = torch.load(model_path, map_location="cpu", weights_only=True)
    mlp = GUIDE_MLP()
    mlp.load_state_dict(ckpt["state_dict"])
    mlp.eval()

    def _eval_split(X: torch.Tensor, y: torch.Tensor, name: str) -> None:
        """Score one split in a single forward pass and record a summary row."""
        with torch.no_grad():
            preds = mlp(X).argmax(dim=-1).numpy()
        truths = y.numpy()
        acc = accuracy_score(truths, preds)
        mac_f1 = f1_score(truths, preds, average="macro", zero_division=0)
        print(f"\n [{name}] n={len(y)} accuracy={acc:.4f} macro-F1={mac_f1:.4f}")
        report = classification_report(truths, preds,
                                       target_names=ACTION_LABELS, zero_division=0)
        for line in report.splitlines():
            print(f" {line}")
        results.append({"model": "NextActionPredictor", "split": name,
                        "accuracy": acc, "macro_f1": mac_f1})

    _eval_split(X_train, y_train, "train")
    _eval_split(X_val, y_val, "validation")

    print("\n NOTE: 'legal' class F1 = 0.00 is a known limitation.")
    print(" Cause: ~2.5% class frequency due to 20% coin-flip in label")
    print(" assignment. Model learns to never predict 'legal' to maximise")
    print(" overall accuracy. Fix: remove the coin-flip condition in train.py.")
# ---------------------------------------------------------------------------
# main
# ---------------------------------------------------------------------------
def main() -> None:
    """Command-line entry point.

    Parses the CLI flags, makes sure the checkpoints are available, runs every
    evaluation that was not skipped (classifier, then NER, then next-action)
    and finally prints the combined summary table.
    """
    parser = argparse.ArgumentParser(description="Evaluate G.U.I.D.E. models")
    parser.add_argument(
        "--cfpb_csv",
        default=None,
        help="Path to CFPB complaints CSV (required for DomainClassifier)",
    )
    parser.add_argument(
        "--skip_classifier",
        action="store_true",
        help="Skip DomainClassifier evaluation",
    )
    parser.add_argument(
        "--skip_ner",
        action="store_true",
        help="Skip EvidenceNER evaluation",
    )
    parser.add_argument(
        "--skip_next_action",
        action="store_true",
        help="Skip NextActionPredictor evaluation",
    )
    options = parser.parse_args()

    _ensure_models()

    results: list[dict] = []
    # (skip flag, evaluation to run) pairs, in execution order.
    stages = (
        (options.skip_classifier,
         lambda: evaluate_domain_classifier(options.cfpb_csv, results)),
        (options.skip_ner, lambda: evaluate_ner(results)),
        (options.skip_next_action, lambda: evaluate_next_action(results)),
    )
    for skipped, run_stage in stages:
        if skipped:
            continue
        run_stage()

    print_summary_table(results)


if __name__ == "__main__":
    main()