Spaces:
Sleeping
Sleeping
| """ | |
| Evaluate all three G.U.I.D.E. models and print train/validation metrics. | |
| Usage: | |
| # NER + NextActionPredictor only (no CFPB CSV needed) | |
| python scripts/evaluate_models.py --skip_classifier | |
| # All three models (run on Kaggle where CFPB CSV is available) | |
| python scripts/evaluate_models.py --cfpb_csv /kaggle/input/datasets/sharav95/complaint/complaints.csv | |
| Models are downloaded automatically from sarav95/guide-models on HuggingFace | |
| if not already present locally. Set HF_TOKEN env var if needed. | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import logging | |
| import os | |
| import sys | |
| from pathlib import Path | |
| import torch | |
| sys.path.insert(0, str(Path(__file__).resolve().parents[1])) | |
| logging.basicConfig(level=logging.WARNING) | |
| _HF_REPO = "sarav95/guide-models" | |
| _ROOT = Path(__file__).resolve().parents[1] | |
| def _ensure_models() -> None: | |
| """Download model checkpoints from HuggingFace if any are missing.""" | |
| models_dir = _ROOT / "models" | |
| evidence_ner_ok = (models_dir / "evidence_ner" / "config.json").exists() | |
| classifier_ok = (models_dir / "domain_classifier" / "config.json").exists() | |
| next_action_ok = (models_dir / "next_action" / "model.pt").exists() | |
| if evidence_ner_ok and classifier_ok and next_action_ok: | |
| return | |
| print(f" Model checkpoints missing — downloading from {_HF_REPO!r} …") | |
| try: | |
| from huggingface_hub import snapshot_download | |
| except ImportError: | |
| print(" [error] huggingface_hub not installed: pip install huggingface_hub") | |
| sys.exit(1) | |
| models_dir.mkdir(parents=True, exist_ok=True) | |
| token = os.environ.get("HF_TOKEN") | |
| snapshot_download( | |
| repo_id=_HF_REPO, | |
| local_dir=str(models_dir), | |
| local_dir_use_symlinks=False, | |
| token=token, | |
| ) | |
| print(" Model download complete.") | |
| # --------------------------------------------------------------------------- | |
| # print_summary_table | |
| # --------------------------------------------------------------------------- | |
| def print_summary_table(results: list[dict]) -> None: | |
| """Print a consolidated train/validation summary for all evaluated models. | |
| Args: | |
| results: list of dicts with keys: model, split, accuracy, macro_f1 | |
| """ | |
| if not results: | |
| return | |
| headers = ["Model", "Split", "Accuracy", "Macro-F1"] | |
| rows = [ | |
| [r["model"], r["split"], f"{r['accuracy']:.4f}", f"{r['macro_f1']:.4f}"] | |
| for r in results | |
| ] | |
| col_w = [max(len(str(x)) for x in [h] + [row[i] for row in rows]) | |
| for i, h in enumerate(headers)] | |
| fmt = " ".join(f"{{:<{w}}}" for w in col_w) | |
| sep = " ".join("-" * w for w in col_w) | |
| width = sum(col_w) + 2 * (len(col_w) - 1) | |
| print(f"\n{'='*width}") | |
| print(" Summary — All Models") | |
| print(f"{'='*width}") | |
| print(fmt.format(*headers)) | |
| print(sep) | |
| for row in rows: | |
| print(fmt.format(*row)) | |
| print() | |
| # --------------------------------------------------------------------------- | |
| # DomainClassifier | |
| # --------------------------------------------------------------------------- | |
| def evaluate_domain_classifier(cfpb_csv: str | None, results: list[dict]) -> None: | |
| """Evaluate DomainClassifier on train sample and validation set. | |
| Recreates the exact 90/10 split used during training (seed=42). | |
| Skips gracefully when cfpb_csv is None. | |
| Args: | |
| cfpb_csv: path to CFPB complaints CSV, or None to skip | |
| results: shared list to append summary rows to | |
| """ | |
| print("\n" + "=" * 72) | |
| print(" DomainClassifier (DistilBERT, 6-class)") | |
| print("=" * 72) | |
| # --- Training curve from Kaggle log (hardcoded) --- | |
| print("\n Training curve (from Kaggle log):") | |
| curve_headers = ["Epoch", "Train loss range", "Val loss", "Notes"] | |
| curve_rows = [ | |
| ["1", "0.8401 → 0.2807", "0.2768", ""], | |
| ["2", "0.2460 → 0.1955", "0.2720", "best checkpoint (load_best_model_at_end)"], | |
| ["3", "0.2129 → 0.1310", "0.3334", "overfitting — epoch 2 weights saved"], | |
| ] | |
| col_w = [max(len(str(x)) for x in [h] + [r[i] for r in curve_rows]) | |
| for i, h in enumerate(curve_headers)] | |
| fmt = " ".join(f"{{:<{w}}}" for w in col_w) | |
| sep = " ".join("-" * w for w in col_w) | |
| print(" " + fmt.format(*curve_headers)) | |
| print(" " + sep) | |
| for row in curve_rows: | |
| print(" " + fmt.format(*row)) | |
| print(" Final train loss: 0.2402 | train samples/sec: 37.12") | |
| if cfpb_csv is None: | |
| print("\n [skipped] Pass --cfpb_csv <path> to evaluate on data splits.") | |
| return | |
| from datasets import concatenate_datasets | |
| from sklearn.metrics import accuracy_score, classification_report, f1_score | |
| from transformers import AutoModelForSequenceClassification, AutoTokenizer | |
| from src.classifier.train import _build_supplement, load_and_remap_cfpb | |
| from src.classifier.model import DOMAIN_LABELS | |
| print("\n Loading data …") | |
| cfpb_ds = load_and_remap_cfpb(cfpb_csv, max_per_class=50_000) | |
| suppl_ds = _build_supplement(n_per_class=5_000) | |
| full_ds = concatenate_datasets([cfpb_ds, suppl_ds]).shuffle(seed=42) | |
| split = full_ds.train_test_split(test_size=0.1, seed=42) | |
| model_dir = "models/domain_classifier" | |
| print(f" Loading checkpoint from {model_dir} …") | |
| tokenizer = AutoTokenizer.from_pretrained(model_dir) | |
| model = AutoModelForSequenceClassification.from_pretrained(model_dir) | |
| model.eval() | |
| device = torch.device( | |
| "cuda" if torch.cuda.is_available() | |
| else "mps" if torch.backends.mps.is_available() | |
| else "cpu" | |
| ) | |
| model.to(device) | |
| def _predict_batch(texts: list[str]) -> list[int]: | |
| enc = tokenizer(texts, truncation=True, max_length=512, | |
| padding=True, return_tensors="pt") | |
| enc = {k: v.to(device) for k, v in enc.items()} | |
| with torch.no_grad(): | |
| logits = model(**enc).logits | |
| return logits.argmax(dim=-1).cpu().tolist() | |
| def _eval_split(ds, name: str, max_samples: int) -> None: | |
| if len(ds) > max_samples: | |
| ds = ds.select(range(max_samples)) | |
| texts, labels = ds["text"], ds["labels"] | |
| preds: list[int] = [] | |
| for i in range(0, len(texts), 64): | |
| preds.extend(_predict_batch(texts[i:i + 64])) | |
| acc = accuracy_score(labels, preds) | |
| mac_f1 = f1_score(labels, preds, average="macro", zero_division=0) | |
| print(f"\n [{name}] n={len(ds)} accuracy={acc:.4f} macro-F1={mac_f1:.4f}") | |
| report = classification_report(labels, preds, | |
| target_names=DOMAIN_LABELS, zero_division=0) | |
| for line in report.splitlines(): | |
| print(f" {line}") | |
| results.append({"model": "DomainClassifier", "split": name, | |
| "accuracy": acc, "macro_f1": mac_f1}) | |
| _eval_split(split["train"], "train", max_samples=5_000) | |
| _eval_split(split["test"], "validation", max_samples=len(split["test"])) | |
| # --------------------------------------------------------------------------- | |
| # EvidenceNER | |
| # --------------------------------------------------------------------------- | |
| def _words_to_bio(sentence: str, entities: list[dict]) -> list[str]: | |
| """Convert a sentence + entity list to a BIO tag sequence over whitespace tokens. | |
| Args: | |
| sentence: raw complaint sentence string | |
| entities: list of {"text": str, "label": str} dicts | |
| Returns: | |
| list of BIO label strings aligned to sentence.split() | |
| """ | |
| words = sentence.split() | |
| tags = ["O"] * len(words) | |
| for ent in entities: | |
| ent_words = ent["text"].split() | |
| label = ent["label"] | |
| # slide a window to find where entity words appear in sentence words | |
| for i in range(len(words) - len(ent_words) + 1): | |
| if words[i:i + len(ent_words)] == ent_words: | |
| tags[i] = f"B-{label}" | |
| for j in range(1, len(ent_words)): | |
| tags[i + j] = f"I-{label}" | |
| break | |
| return tags | |
| def _predict_bio_tags(sentence: str, model, tokenizer, id2label: dict, | |
| device: torch.device) -> list[str]: | |
| """Run NER model on a single sentence and return word-level BIO tags. | |
| Args: | |
| sentence: raw string to tag | |
| model: loaded token classification model | |
| tokenizer: matching tokenizer | |
| id2label: id→BIO label mapping | |
| device: torch device | |
| Returns: | |
| list of BIO label strings, one per whitespace token | |
| """ | |
| words = sentence.split() | |
| enc = tokenizer(words, truncation=True, max_length=512, | |
| is_split_into_words=True, return_tensors="pt") | |
| word_ids = tokenizer(words, truncation=True, max_length=512, | |
| is_split_into_words=True).word_ids() | |
| enc = {k: v.to(device) for k, v in enc.items()} | |
| with torch.no_grad(): | |
| logits = model(**enc).logits[0] | |
| pred_ids = logits.argmax(dim=-1).cpu().tolist() | |
| # First subword per word gets the predicted tag | |
| pred_tags: list[str] = [] | |
| prev_word_id = None | |
| for tok_idx, word_id in enumerate(word_ids): | |
| if word_id is None or word_id == prev_word_id: | |
| prev_word_id = word_id | |
| continue | |
| prev_word_id = word_id | |
| pred_tags.append(id2label[pred_ids[tok_idx]]) | |
| return pred_tags[:len(words)] | |
| def evaluate_ner_synthetic(model, tokenizer, id2label: dict, | |
| device: torch.device, results: list[dict]) -> None: | |
| """Evaluate EvidenceNER on synthetic train and validation splits. | |
| Recreates the 90/10 split from build_synthetic_dataset (seed=42). | |
| Args: | |
| model: loaded token classification model | |
| tokenizer: matching tokenizer | |
| id2label: id→BIO label mapping | |
| device: torch device | |
| results: shared list to append summary rows to | |
| """ | |
| try: | |
| from seqeval.metrics import ( | |
| accuracy_score, classification_report, | |
| f1_score, precision_score, recall_score, | |
| ) | |
| except ImportError: | |
| print(" [error] seqeval not installed: pip install seqeval") | |
| return | |
| from src.ner.train import build_synthetic_dataset, _try_load_conll | |
| from datasets import concatenate_datasets | |
| print("\n Building synthetic dataset …") | |
| synthetic_ds = build_synthetic_dataset(n_samples=4000) | |
| conll_ds = _try_load_conll() | |
| if conll_ds is not None: | |
| full_ds = concatenate_datasets([synthetic_ds, conll_ds]).shuffle(seed=42) | |
| else: | |
| full_ds = synthetic_ds | |
| split = full_ds.train_test_split(test_size=0.1, seed=42) | |
| def _eval_split(ds, name: str, max_samples: int) -> None: | |
| if len(ds) > max_samples: | |
| ds = ds.select(range(max_samples)) | |
| true_seqs, pred_seqs = [], [] | |
| for ex in ds: | |
| true_tags = [id2label[t] for t in ex["ner_tags"]] | |
| words = ex["words"] | |
| sentence = " ".join(words) | |
| pred_tags = _predict_bio_tags(sentence, model, tokenizer, | |
| id2label, device) | |
| n = min(len(true_tags), len(pred_tags)) | |
| true_seqs.append(true_tags[:n]) | |
| pred_seqs.append(pred_tags[:n]) | |
| acc = accuracy_score(true_seqs, pred_seqs) | |
| prec = precision_score(true_seqs, pred_seqs, zero_division=0) | |
| rec = recall_score(true_seqs, pred_seqs, zero_division=0) | |
| f1 = f1_score(true_seqs, pred_seqs, zero_division=0) | |
| print(f"\n [synthetic {name}] n={len(ds)}") | |
| print(f" accuracy={acc:.4f} precision={prec:.4f} " | |
| f"recall={rec:.4f} F1={f1:.4f}") | |
| report = classification_report(true_seqs, pred_seqs, zero_division=0) | |
| for line in report.splitlines(): | |
| print(f" {line}") | |
| results.append({"model": "EvidenceNER (synthetic)", | |
| "split": name, "accuracy": acc, "macro_f1": f1}) | |
| _eval_split(split["train"], "train", max_samples=2_000) | |
| _eval_split(split["test"], "validation", max_samples=len(split["test"])) | |
| def evaluate_ner_real(model, tokenizer, id2label: dict, | |
| device: torch.device, results: list[dict]) -> None: | |
| """Evaluate EvidenceNER on 40 real hand-verified complaint sentences. | |
| Loads data/eval/ner_real_complaints.json. Skips gracefully if missing. | |
| Args: | |
| model: loaded token classification model | |
| tokenizer: matching tokenizer | |
| id2label: id→BIO label mapping | |
| device: torch device | |
| results: shared list to append summary rows to | |
| """ | |
| dataset_path = Path("data/eval/ner_real_complaints.json") | |
| if not dataset_path.exists(): | |
| print(f"\n [skipped] {dataset_path} not found — real complaint eval skipped.") | |
| return | |
| try: | |
| from seqeval.metrics import ( | |
| accuracy_score, classification_report, | |
| f1_score, precision_score, recall_score, | |
| ) | |
| except ImportError: | |
| print(" [error] seqeval not installed: pip install seqeval") | |
| return | |
| with open(dataset_path) as f: | |
| dataset = json.load(f) | |
| true_seqs, pred_seqs = [], [] | |
| for item in dataset: | |
| sentence = item["sentence"] | |
| entities = item["entities"] | |
| true_tags = _words_to_bio(sentence, entities) | |
| pred_tags = _predict_bio_tags(sentence, model, tokenizer, id2label, device) | |
| n = min(len(true_tags), len(pred_tags)) | |
| true_seqs.append(true_tags[:n]) | |
| pred_seqs.append(pred_tags[:n]) | |
| acc = accuracy_score(true_seqs, pred_seqs) | |
| prec = precision_score(true_seqs, pred_seqs, zero_division=0) | |
| rec = recall_score(true_seqs, pred_seqs, zero_division=0) | |
| f1 = f1_score(true_seqs, pred_seqs, zero_division=0) | |
| print(f"\n [real complaints] n={len(dataset)}") | |
| print(f" accuracy={acc:.4f} precision={prec:.4f} " | |
| f"recall={rec:.4f} F1={f1:.4f}") | |
| report = classification_report(true_seqs, pred_seqs, zero_division=0) | |
| for line in report.splitlines(): | |
| print(f" {line}") | |
| results.append({"model": "EvidenceNER (real)", "split": "validation", | |
| "accuracy": acc, "macro_f1": f1}) | |
| def evaluate_ner(results: list[dict]) -> None: | |
| """Load EvidenceNER checkpoint and run synthetic + real complaint evaluation. | |
| Args: | |
| results: shared list to append summary rows to | |
| """ | |
| print("\n" + "=" * 72) | |
| print(" EvidenceNER (DistilBERT token classifier, BIO 13-label)") | |
| print("=" * 72) | |
| from transformers import AutoModelForTokenClassification, AutoTokenizer | |
| from src.ner.model import ID2LABEL | |
| model_dir = "models/evidence_ner" | |
| print(f" Loading checkpoint from {model_dir} …") | |
| tokenizer = AutoTokenizer.from_pretrained(model_dir) | |
| model = AutoModelForTokenClassification.from_pretrained(model_dir) | |
| model.eval() | |
| device = torch.device( | |
| "cuda" if torch.cuda.is_available() | |
| else "mps" if torch.backends.mps.is_available() | |
| else "cpu" | |
| ) | |
| model.to(device) | |
| evaluate_ner_synthetic(model, tokenizer, ID2LABEL, device, results) | |
| evaluate_ner_real(model, tokenizer, ID2LABEL, device, results) | |
| # --------------------------------------------------------------------------- | |
| # NextActionPredictor | |
| # --------------------------------------------------------------------------- | |
| def evaluate_next_action(results: list[dict]) -> None: | |
| """Evaluate NextActionPredictor on train (90%) and validation (10%) splits. | |
| Recreates 6000-sample dataset (seed=42), carves 90/10 split. | |
| Documents legal F1 = 0.00 as a known class-imbalance limitation. | |
| Args: | |
| results: shared list to append summary rows to | |
| """ | |
| print("\n" + "=" * 72) | |
| print(" NextActionPredictor (MLP 12→64→64→6)") | |
| print("=" * 72) | |
| from sklearn.metrics import accuracy_score, classification_report, f1_score | |
| from src.next_action.train import build_synthetic_dataset | |
| from src.next_action.model import ACTION_LABELS, GUIDE_MLP | |
| print(" Building synthetic dataset (n=6000, seed=42) …") | |
| X_list, y_list = build_synthetic_dataset(n_samples=6000, seed=42) | |
| X_all = torch.tensor(X_list, dtype=torch.float32) | |
| y_all = torch.tensor(y_list, dtype=torch.long) | |
| split_idx = int(len(X_all) * 0.9) | |
| X_train, X_val = X_all[:split_idx], X_all[split_idx:] | |
| y_train, y_val = y_all[:split_idx], y_all[split_idx:] | |
| model_path = "models/next_action/model.pt" | |
| print(f" Loading checkpoint from {model_path} …") | |
| ckpt = torch.load(model_path, map_location="cpu", weights_only=True) | |
| mlp = GUIDE_MLP() | |
| mlp.load_state_dict(ckpt["state_dict"]) | |
| mlp.eval() | |
| def _eval_split(X: torch.Tensor, y: torch.Tensor, name: str) -> None: | |
| with torch.no_grad(): | |
| preds = mlp(X).argmax(dim=-1).numpy() | |
| truths = y.numpy() | |
| acc = accuracy_score(truths, preds) | |
| mac_f1 = f1_score(truths, preds, average="macro", zero_division=0) | |
| print(f"\n [{name}] n={len(y)} accuracy={acc:.4f} macro-F1={mac_f1:.4f}") | |
| report = classification_report(truths, preds, | |
| target_names=ACTION_LABELS, zero_division=0) | |
| for line in report.splitlines(): | |
| print(f" {line}") | |
| results.append({"model": "NextActionPredictor", "split": name, | |
| "accuracy": acc, "macro_f1": mac_f1}) | |
| _eval_split(X_train, y_train, "train") | |
| _eval_split(X_val, y_val, "validation") | |
| print("\n NOTE: 'legal' class F1 = 0.00 is a known limitation.") | |
| print(" Cause: ~2.5% class frequency due to 20% coin-flip in label") | |
| print(" assignment. Model learns to never predict 'legal' to maximise") | |
| print(" overall accuracy. Fix: remove the coin-flip condition in train.py.") | |
| # --------------------------------------------------------------------------- | |
| # main | |
| # --------------------------------------------------------------------------- | |
| def main() -> None: | |
| """Parse CLI args, run selected model evaluations, print summary table.""" | |
| p = argparse.ArgumentParser(description="Evaluate G.U.I.D.E. models") | |
| p.add_argument("--cfpb_csv", default=None, | |
| help="Path to CFPB complaints CSV (required for DomainClassifier)") | |
| p.add_argument("--skip_classifier", action="store_true", | |
| help="Skip DomainClassifier evaluation") | |
| p.add_argument("--skip_ner", action="store_true", | |
| help="Skip EvidenceNER evaluation") | |
| p.add_argument("--skip_next_action", action="store_true", | |
| help="Skip NextActionPredictor evaluation") | |
| args = p.parse_args() | |
| _ensure_models() | |
| results: list[dict] = [] | |
| if not args.skip_classifier: | |
| evaluate_domain_classifier(args.cfpb_csv, results) | |
| if not args.skip_ner: | |
| evaluate_ner(results) | |
| if not args.skip_next_action: | |
| evaluate_next_action(results) | |
| print_summary_table(results) | |
| if __name__ == "__main__": | |
| main() | |