# insureos-models/evaluation/run_eval.py
"""
InsureOS β€” Comprehensive Evaluation Suite
Evaluates all trained models: InsureLLM (generative), FraudNet, PricingGLM, DocClassifier, NER.
"""
import os
import json
import argparse
import time
from pathlib import Path

import torch
import numpy as np
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

# Optional imports (gracefully degrade if not installed)
try:
    from rouge_score import rouge_scorer
    HAS_ROUGE = True
except ImportError:
    HAS_ROUGE = False
try:
    # NOTE(review): bert_score is imported but not referenced anywhere in this
    # file's visible code — presumably reserved for a future metric.
    from bert_score import score as bert_score_fn
    HAS_BERTSCORE = True
except ImportError:
    HAS_BERTSCORE = False

# ── Defaults (each overridable via a CLI flag in main()) ──
INSURELLM_MODEL = "models/insurellm-4b-dpo-merged"  # merged DPO checkpoint directory
EVAL_DATA = "data/output/insurance_sft_10k.jsonl"  # JSONL of {"messages": [...]} records
OUTPUT_DIR = "evaluation/results"
N_SAMPLES = 100  # number of samples for generative eval
# ── UK Insurance Domain-Specific Evaluation Prompts ──
# Schema per entry:
#   "id"                 — slug used in reports and console output
#   "prompt"             — the user turn sent to the model
#   "required_elements"  — phrases a good answer should contain
#                          (matched as case-insensitive substrings)
#   "forbidden_elements" — phrases that incur a scoring penalty if present
DOMAIN_EVAL_PROMPTS = [
    {
        "id": "fca_consumer_duty",
        "prompt": "A customer's home insurance claim for escape of water has been open for 8 weeks with no update. They call asking what's happening. Draft a response.",
        "required_elements": ["apology", "timeline", "next steps", "complaint rights", "financial ombudsman"],
        "forbidden_elements": ["jargon", "indemnification", "proximate cause", "peril"],
    },
    {
        "id": "gdpr_data_protection",
        "prompt": "A third party calls asking about the status of a motor claim made against our policyholder. How should we respond?",
        "required_elements": ["data protection", "cannot share", "their own insurer", "GDPR"],
        "forbidden_elements": ["policy number", "policyholder address", "bank details"],
    },
    {
        "id": "claims_process",
        "prompt": "Explain the insurance claims process for a domestic property fire from FNOL through to settlement.",
        "required_elements": ["FNOL", "loss adjuster", "investigation", "settlement", "excess"],
        "forbidden_elements": [],
    },
    {
        "id": "fraud_indicators",
        "prompt": "What are the key red flags for insurance fraud in motor third-party claims?",
        "required_elements": ["exaggeration", "staged", "late reporting", "multiple claims"],
        "forbidden_elements": [],
    },
    {
        "id": "lloyds_market",
        "prompt": "Explain the role of an MGA in the Lloyd's market and how binding authority agreements work.",
        "required_elements": ["binding authority", "capacity provider", "syndicate", "delegated authority", "bordereaux"],
        "forbidden_elements": [],
    },
    {
        "id": "pricing_fairness",
        "prompt": "An insurer wants to use first names as a rating factor because it improves their model by 3%. Should they?",
        "required_elements": ["proxy discrimination", "protected characteristics", "Equality Act", "FCA"],
        "forbidden_elements": [],
    },
    {
        "id": "subrogation",
        "prompt": "Explain subrogation rights in UK insurance. When does an insurer pursue recovery?",
        "required_elements": ["recovery", "third party", "policyholder indemnified", "non-fault"],
        "forbidden_elements": [],
    },
    {
        "id": "renewal_transparency",
        "prompt": "A customer's premium increased by 25% at renewal. They want to know why. Draft an explanation.",
        "required_elements": ["transparency", "factors", "shop around", "Consumer Duty", "fair value"],
        "forbidden_elements": ["take it or leave it", "market rate"],
    },
]
def _chat_generate(model, tokenizer, messages, max_input_len=None):
    """Greedy-decode one chat completion; return (response_text, latency_seconds).

    Applies the tokenizer's chat template, disables Qwen3 "thinking" mode by
    appending an empty <think> block, generates up to 256 new tokens with
    sampling disabled, and strips the prompt tokens from the decoded output.
    When ``max_input_len`` is given, the prompt is truncated to that length.
    """
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    # Disable Qwen3 thinking mode by appending <think>\n</think>\n
    if "<|im_start|>assistant" in text:
        text = text + "<think>\n</think>\n"
    tokenize_kwargs = {"return_tensors": "pt"}
    if max_input_len is not None:
        tokenize_kwargs["truncation"] = True
        tokenize_kwargs["max_length"] = max_input_len
    inputs = tokenizer(text, **tokenize_kwargs).to(model.device)
    start = time.time()
    with torch.no_grad():
        outputs = model.generate(**inputs, max_new_tokens=256, do_sample=False)
    latency = time.time() - start
    # Decode only the newly generated tokens (skip the prompt prefix).
    response = tokenizer.decode(outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True)
    return response, latency


def evaluate_insurellm(model_path: str, n_samples: int, output_dir: str) -> dict:
    """Evaluate the generative InsureLLM model.

    Runs three stages:
      1. Domain-specific prompt evaluation — each prompt in DOMAIN_EVAL_PROMPTS
         is scored by the fraction of required phrases present, minus a flat
         0.15 penalty per forbidden phrase found.
      2. ROUGE-1/2/L on the last ``n_samples`` records of EVAL_DATA (skipped
         when rouge_score is not installed or the data file is missing).
      3. Summary metrics (average domain score, average latency, pass rate).

    Args:
        model_path: Path to the merged model directory (loaded 4-bit NF4).
        n_samples: Max number of held-out SFT records used for ROUGE scoring.
        output_dir: Directory where ``insurellm_eval.json`` is written.

    Returns:
        dict with keys ``model``, ``domain_eval``, ``generation_metrics``
        and ``summary``.
    """
    print(f"\n{'='*60}")
    print(f" Evaluating InsureLLM: {model_path}")
    print(f"{'='*60}")
    # Load model
    print("Loading model...")
    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    # 4-bit NF4 double quantization keeps memory usage low for single-GPU eval.
    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_use_double_quant=True,
    )
    model = AutoModelForCausalLM.from_pretrained(
        model_path,
        quantization_config=bnb_config,
        device_map="auto",
        trust_remote_code=True,
        attn_implementation="sdpa",
        dtype=torch.bfloat16,
    )
    model.eval()
    results = {
        "model": model_path,
        "domain_eval": [],
        "generation_metrics": {},
    }
    # ── 1. Domain-Specific Evaluation ──
    print("\n[1/3] Domain-specific evaluation...")
    for i, item in enumerate(DOMAIN_EVAL_PROMPTS):
        print(f" Prompt {i+1}/{len(DOMAIN_EVAL_PROMPTS)}: {item['id']}...", flush=True)
        messages = [
            {"role": "system", "content": "You are InsureLLM, a specialist UK insurance AI assistant. Answer directly without internal reasoning."},
            {"role": "user", "content": item["prompt"]},
        ]
        response, latency = _chat_generate(model, tokenizer, messages)
        # Keyword scoring: case-insensitive substring match on the response.
        response_lower = response.lower()
        found_required = [e for e in item["required_elements"] if e.lower() in response_lower]
        found_forbidden = [e for e in item["forbidden_elements"] if e.lower() in response_lower]
        score = len(found_required) / max(len(item["required_elements"]), 1)
        penalty = len(found_forbidden) * 0.15
        # max(0.0, ...) — float literal so the score is always a float, never int 0.
        final_score = max(0.0, score - penalty)
        eval_result = {
            "id": item["id"],
            "score": final_score,
            "required_found": len(found_required),
            "required_total": len(item["required_elements"]),
            "forbidden_found": len(found_forbidden),
            "latency_s": latency,
            "response_length": len(response.split()),
        }
        results["domain_eval"].append(eval_result)
        status = "✓" if final_score >= 0.7 else "△" if final_score >= 0.4 else "✗"
        print(f" {status} {item['id']}: {final_score:.2f} "
              f"({len(found_required)}/{len(item['required_elements'])} required, "
              f"{len(found_forbidden)} forbidden, {latency:.1f}s)")
    avg_domain = np.mean([r["score"] for r in results["domain_eval"]])
    avg_latency = np.mean([r["latency_s"] for r in results["domain_eval"]])
    print(f"\n Average domain score: {avg_domain:.3f}")
    print(f" Average latency: {avg_latency:.1f}s")
    # ── 2. ROUGE scores on held-out SFT data ──
    if HAS_ROUGE and os.path.exists(EVAL_DATA):
        print("\n[2/3] ROUGE evaluation on SFT test set...")
        scorer = rouge_scorer.RougeScorer(["rouge1", "rouge2", "rougeL"], use_stemmer=True)
        with open(EVAL_DATA, encoding="utf-8") as f:
            # Skip blank lines so a trailing newline doesn't crash json.loads.
            eval_records = [json.loads(line) for line in f if line.strip()]
        # Use last N records as the held-out eval split.
        eval_subset = eval_records[-min(n_samples, len(eval_records)):]
        rouge1_scores = []
        rouge2_scores = []
        rougeL_scores = []
        for rec in eval_subset:
            messages = rec["messages"]
            # Reference is the final assistant turn; everything before is the prompt.
            reference = messages[-1]["content"]
            generated, _ = _chat_generate(model, tokenizer, messages[:-1], max_input_len=1024)
            scores = scorer.score(reference, generated)
            rouge1_scores.append(scores["rouge1"].fmeasure)
            rouge2_scores.append(scores["rouge2"].fmeasure)
            rougeL_scores.append(scores["rougeL"].fmeasure)
        if rouge1_scores:  # guard: np.mean([]) would yield NaN on an empty file
            results["generation_metrics"]["rouge1"] = float(np.mean(rouge1_scores))
            results["generation_metrics"]["rouge2"] = float(np.mean(rouge2_scores))
            results["generation_metrics"]["rougeL"] = float(np.mean(rougeL_scores))
            print(f" ROUGE-1: {results['generation_metrics']['rouge1']:.4f}")
            print(f" ROUGE-2: {results['generation_metrics']['rouge2']:.4f}")
            print(f" ROUGE-L: {results['generation_metrics']['rougeL']:.4f}")
    else:
        print("\n[2/3] Skipping ROUGE (rouge_score not installed or data not found)")
    # ── 3. Summary metrics ──
    print("\n[3/3] Computing summary...")
    results["summary"] = {
        "avg_domain_score": float(avg_domain),
        "avg_latency_s": float(avg_latency),
        "domain_pass_rate": float(np.mean([1 if r["score"] >= 0.7 else 0 for r in results["domain_eval"]])),
    }
    # Save
    os.makedirs(output_dir, exist_ok=True)
    outpath = os.path.join(output_dir, "insurellm_eval.json")
    with open(outpath, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\n✓ InsureLLM eval results → {outpath}")
    return results
def evaluate_all(args):
    """Run evaluation for all available models.

    ``args`` is an argparse.Namespace with ``insurellm_model``, ``n_samples``
    and ``output_dir``. The generative InsureLLM model is evaluated live;
    the other models (FraudNet, pricing, doc classifier, NER) are reported
    from metrics JSON files saved at training time. Every result found is
    merged into ``full_eval_report.json`` under ``args.output_dir`` and a
    console summary is printed. Missing models are skipped with a warning.
    """
    print(f"{'='*60}")
    print(f" InsureOS — Full Evaluation Suite")
    print(f"{'='*60}")
    os.makedirs(args.output_dir, exist_ok=True)
    all_results = {}
    # 1. InsureLLM — the only live evaluation; requires the merged model dir.
    if os.path.exists(args.insurellm_model):
        all_results["insurellm"] = evaluate_insurellm(
            args.insurellm_model, args.n_samples, args.output_dir
        )
    else:
        print(f"\n⚠ InsureLLM not found at {args.insurellm_model}, skipping")
    # 2. FraudNet — just check if results exist from training
    fraud_results = Path("models/fraudnet/training_results.json")
    if fraud_results.exists():
        with open(fraud_results) as f:
            all_results["fraudnet"] = json.load(f)
        print(f"\n✓ FraudNet results loaded from training")
    else:
        print(f"\n⚠ FraudNet results not found, skipping")
    # 3. Pricing GLM
    pricing_results = Path("models/pricing-glm/training_results.json")
    if pricing_results.exists():
        with open(pricing_results) as f:
            all_results["pricing"] = json.load(f)
        print(f"✓ Pricing model results loaded from training")
    else:
        print(f"⚠ Pricing results not found, skipping")
    # 4. Doc Classifier
    doc_meta = Path("models/doc-classifier/training_meta.json")
    if doc_meta.exists():
        with open(doc_meta) as f:
            all_results["doc_classifier"] = json.load(f)
        print(f"✓ Doc classifier results loaded")
    else:
        print(f"⚠ Doc classifier results not found, skipping")
    # 5. NER
    ner_meta = Path("models/ner-model/training_meta.json")
    if ner_meta.exists():
        with open(ner_meta) as f:
            all_results["ner"] = json.load(f)
        print(f"✓ NER results loaded")
    else:
        print(f"⚠ NER results not found, skipping")
    # ── Summary report ──
    report_path = os.path.join(args.output_dir, "full_eval_report.json")
    with open(report_path, "w") as f:
        # default=str so any non-JSON-native values are stringified rather
        # than raising during serialization.
        json.dump(all_results, f, indent=2, default=str)
    print(f"\n{'='*60}")
    print(f" EVALUATION SUMMARY")
    print(f"{'='*60}")
    if "insurellm" in all_results:
        s = all_results["insurellm"].get("summary", {})
        print(f"\n InsureLLM (Generative):")
        print(f" Domain score: {s.get('avg_domain_score', 'N/A')}")
        print(f" Pass rate: {s.get('domain_pass_rate', 'N/A')}")
        print(f" Latency: {s.get('avg_latency_s', 'N/A')}s")
    if "fraudnet" in all_results:
        # FraudNet results: presumably a list with one dict per line of
        # business — TODO confirm against the training script's output.
        for r in all_results["fraudnet"]:
            if isinstance(r, dict):
                print(f"\n FraudNet ({r.get('lob', '?')}):")
                print(f" AUC-ROC: {r.get('auc_roc', 'N/A')}")
                print(f" Avg Precision: {r.get('avg_precision', 'N/A')}")
    if "pricing" in all_results:
        # Pricing file may contain either or both model types.
        for model_type in ["glm", "ebm"]:
            if model_type in all_results["pricing"]:
                m = all_results["pricing"][model_type]
                print(f"\n Pricing {model_type.upper()}:")
                print(f" MAE: £{m.get('mae', 'N/A')}")
                print(f" RMSE: £{m.get('rmse', 'N/A')}")
    if "doc_classifier" in all_results:
        r = all_results["doc_classifier"].get("results", {})
        print(f"\n Document Classifier:")
        print(f" Accuracy: {r.get('eval_accuracy', 'N/A')}")
        print(f" F1 (macro): {r.get('eval_f1_macro', 'N/A')}")
    if "ner" in all_results:
        r = all_results["ner"].get("results", {})
        print(f"\n NER Model:")
        print(f" F1: {r.get('eval_f1', 'N/A')}")
        print(f" Precision: {r.get('eval_precision', 'N/A')}")
        print(f" Recall: {r.get('eval_recall', 'N/A')}")
    print(f"\n Full report → {report_path}")
def main():
    """CLI entry point: parse flags and run the full evaluation suite."""
    cli = argparse.ArgumentParser(description="InsureOS evaluation suite")
    cli.add_argument("--insurellm-model", default=INSURELLM_MODEL)
    cli.add_argument("--n-samples", type=int, default=N_SAMPLES)
    cli.add_argument("--output-dir", default=OUTPUT_DIR)
    evaluate_all(cli.parse_args())


if __name__ == "__main__":
    main()