"""
InsureOS — Comprehensive Evaluation Suite.
Evaluates all trained models: InsureLLM (generative), FraudNet, PricingGLM, DocClassifier, NER.
"""
|
|
| import os |
| import json |
| import argparse |
| import time |
| from pathlib import Path |
|
|
| import torch |
| import numpy as np |
| from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig |
|
|
| |
# Optional metric dependencies: each feature degrades gracefully when the
# package is not installed (the corresponding evaluation phase is skipped).
try:
    from rouge_score import rouge_scorer
    HAS_ROUGE = True
except ImportError:
    HAS_ROUGE = False

# NOTE(review): HAS_BERTSCORE / bert_score_fn are not referenced anywhere in
# this file — presumably used by another module, or dead; verify before removal.
try:
    from bert_score import score as bert_score_fn
    HAS_BERTSCORE = True
except ImportError:
    HAS_BERTSCORE = False
|
|
|
|
| |
|
|
# Defaults for the CLI flags declared in main(); all are overridable.
INSURELLM_MODEL = "models/insurellm-4b-dpo-merged"  # merged DPO checkpoint dir
EVAL_DATA = "data/output/insurance_sft_10k.jsonl"   # SFT records used for ROUGE eval
OUTPUT_DIR = "evaluation/results"                   # where JSON reports are written
N_SAMPLES = 100                                     # max SFT records scored with ROUGE
|
|
|
|
| |
|
|
# Rubric prompts for the domain-specific evaluation in evaluate_insurellm().
# Scoring is keyword-based and case-insensitive: the score is the fraction of
# "required_elements" substrings found in the response, minus a 0.15 penalty
# per "forbidden_elements" substring found, floored at zero.
DOMAIN_EVAL_PROMPTS = [
    {
        "id": "fca_consumer_duty",
        "prompt": "A customer's home insurance claim for escape of water has been open for 8 weeks with no update. They call asking what's happening. Draft a response.",
        "required_elements": ["apology", "timeline", "next steps", "complaint rights", "financial ombudsman"],
        "forbidden_elements": ["jargon", "indemnification", "proximate cause", "peril"],
    },
    {
        "id": "gdpr_data_protection",
        "prompt": "A third party calls asking about the status of a motor claim made against our policyholder. How should we respond?",
        "required_elements": ["data protection", "cannot share", "their own insurer", "GDPR"],
        "forbidden_elements": ["policy number", "policyholder address", "bank details"],
    },
    {
        "id": "claims_process",
        "prompt": "Explain the insurance claims process for a domestic property fire from FNOL through to settlement.",
        "required_elements": ["FNOL", "loss adjuster", "investigation", "settlement", "excess"],
        "forbidden_elements": [],
    },
    {
        "id": "fraud_indicators",
        "prompt": "What are the key red flags for insurance fraud in motor third-party claims?",
        "required_elements": ["exaggeration", "staged", "late reporting", "multiple claims"],
        "forbidden_elements": [],
    },
    {
        "id": "lloyds_market",
        "prompt": "Explain the role of an MGA in the Lloyd's market and how binding authority agreements work.",
        "required_elements": ["binding authority", "capacity provider", "syndicate", "delegated authority", "bordereaux"],
        "forbidden_elements": [],
    },
    {
        "id": "pricing_fairness",
        "prompt": "An insurer wants to use first names as a rating factor because it improves their model by 3%. Should they?",
        "required_elements": ["proxy discrimination", "protected characteristics", "Equality Act", "FCA"],
        "forbidden_elements": [],
    },
    {
        "id": "subrogation",
        "prompt": "Explain subrogation rights in UK insurance. When does an insurer pursue recovery?",
        "required_elements": ["recovery", "third party", "policyholder indemnified", "non-fault"],
        "forbidden_elements": [],
    },
    {
        "id": "renewal_transparency",
        "prompt": "A customer's premium increased by 25% at renewal. They want to know why. Draft an explanation.",
        "required_elements": ["transparency", "factors", "shop around", "Consumer Duty", "fair value"],
        "forbidden_elements": ["take it or leave it", "market rate"],
    },
]
|
|
|
|
def _load_model(model_path: str):
    """Load the tokenizer and the causal LM (4-bit NF4 quantised, eval mode).

    Returns:
        (model, tokenizer) tuple.
    """
    print("Loading model...")
    tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
    # Some checkpoints ship without a pad token; fall back to EOS.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    bnb_config = BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_quant_type="nf4",
        bnb_4bit_compute_dtype=torch.bfloat16,
        bnb_4bit_use_double_quant=True,
    )
    model = AutoModelForCausalLM.from_pretrained(
        model_path,
        quantization_config=bnb_config,
        device_map="auto",
        trust_remote_code=True,
        attn_implementation="sdpa",
        dtype=torch.bfloat16,
    )
    model.eval()
    return model, tokenizer


def _build_prompt(tokenizer, messages) -> str:
    """Render chat messages to a prompt string via the chat template.

    For ChatML-style templates (e.g. Qwen), append an empty ``<think>`` block
    so the model answers directly instead of producing internal reasoning.
    """
    text = tokenizer.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    if "<|im_start|>assistant" in text:
        text = text + "<think>\n</think>\n"
    return text


def _generate(model, tokenizer, prompt: str, max_length=None):
    """Greedy-decode up to 256 new tokens for *prompt*.

    Args:
        max_length: optional tokenizer truncation limit for the prompt.

    Returns:
        (response_text, generate_latency_seconds) tuple; latency covers only
        the ``model.generate`` call, matching how it is reported downstream.
    """
    tok_kwargs = {"return_tensors": "pt"}
    if max_length is not None:
        tok_kwargs.update(truncation=True, max_length=max_length)
    inputs = tokenizer(prompt, **tok_kwargs).to(model.device)

    start = time.time()
    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=256,
            do_sample=False,
            # Explicit pad_token_id avoids the transformers warning when the
            # tokenizer's pad token was defaulted to EOS.
            pad_token_id=tokenizer.pad_token_id,
        )
    latency = time.time() - start

    # Decode only the newly generated tokens, skipping the echoed prompt.
    response = tokenizer.decode(outputs[0][inputs.input_ids.shape[1]:], skip_special_tokens=True)
    return response, latency


def _score_response(response: str, item: dict):
    """Keyword-rubric score for one DOMAIN_EVAL_PROMPTS entry.

    Score = fraction of required elements present (case-insensitive substring
    match) minus 0.15 per forbidden element found, floored at 0.0.

    Returns:
        (final_score, found_required, found_forbidden) tuple.
    """
    response_lower = response.lower()
    found_required = [e for e in item["required_elements"] if e.lower() in response_lower]
    found_forbidden = [e for e in item["forbidden_elements"] if e.lower() in response_lower]

    score = len(found_required) / max(len(item["required_elements"]), 1)
    penalty = len(found_forbidden) * 0.15
    # 0.0 (not int 0) keeps the JSON-serialised score type consistent.
    final_score = max(0.0, score - penalty)
    return final_score, found_required, found_forbidden


def _rouge_eval(model, tokenizer, n_samples: int) -> dict:
    """Mean ROUGE-1/2/L F-measures over the last *n_samples* SFT records."""
    scorer = rouge_scorer.RougeScorer(["rouge1", "rouge2", "rougeL"], use_stemmer=True)

    with open(EVAL_DATA, encoding="utf-8") as f:
        eval_records = [json.loads(line) for line in f]

    # The tail of the dataset serves as the held-out evaluation slice.
    eval_subset = eval_records[-min(n_samples, len(eval_records)):]

    rouge1_scores, rouge2_scores, rougeL_scores = [], [], []
    for rec in eval_subset:
        messages = rec["messages"]
        # The final assistant turn is the reference; everything before is prompt.
        reference = messages[-1]["content"]
        prompt = _build_prompt(tokenizer, messages[:-1])
        generated, _ = _generate(model, tokenizer, prompt, max_length=1024)

        scores = scorer.score(reference, generated)
        rouge1_scores.append(scores["rouge1"].fmeasure)
        rouge2_scores.append(scores["rouge2"].fmeasure)
        rougeL_scores.append(scores["rougeL"].fmeasure)

    return {
        "rouge1": float(np.mean(rouge1_scores)),
        "rouge2": float(np.mean(rouge2_scores)),
        "rougeL": float(np.mean(rougeL_scores)),
    }


def evaluate_insurellm(model_path: str, n_samples: int, output_dir: str) -> dict:
    """Evaluate the generative InsureLLM model.

    Phases:
      1. Domain rubric evaluation over DOMAIN_EVAL_PROMPTS (keyword scoring).
      2. Optional ROUGE evaluation against the SFT dataset tail (skipped when
         rouge_score is not installed or the data file is missing).
      3. Summary aggregation; results are persisted as JSON.

    Args:
        model_path: Directory of the merged model checkpoint.
        n_samples: Maximum number of SFT records used for ROUGE.
        output_dir: Directory where ``insurellm_eval.json`` is written.

    Returns:
        Results dict with ``domain_eval``, ``generation_metrics`` and
        ``summary`` sections.
    """
    print(f"\n{'='*60}")
    print(f" Evaluating InsureLLM: {model_path}")
    print(f"{'='*60}")

    model, tokenizer = _load_model(model_path)

    results = {
        "model": model_path,
        "domain_eval": [],
        "generation_metrics": {},
    }

    # --- [1/3] keyword-rubric evaluation over the curated domain prompts ---
    print("\n[1/3] Domain-specific evaluation...")
    for i, item in enumerate(DOMAIN_EVAL_PROMPTS):
        print(f"  Prompt {i+1}/{len(DOMAIN_EVAL_PROMPTS)}: {item['id']}...", flush=True)
        messages = [
            {"role": "system", "content": "You are InsureLLM, a specialist UK insurance AI assistant. Answer directly without internal reasoning."},
            {"role": "user", "content": item["prompt"]},
        ]
        prompt = _build_prompt(tokenizer, messages)
        response, latency = _generate(model, tokenizer, prompt)

        final_score, found_required, found_forbidden = _score_response(response, item)

        results["domain_eval"].append({
            "id": item["id"],
            "score": final_score,
            "required_found": len(found_required),
            "required_total": len(item["required_elements"]),
            "forbidden_found": len(found_forbidden),
            "latency_s": latency,
            "response_length": len(response.split()),
        })

        status = "✓" if final_score >= 0.7 else "◐" if final_score >= 0.4 else "✗"
        print(f"    {status} {item['id']}: {final_score:.2f} "
              f"({len(found_required)}/{len(item['required_elements'])} required, "
              f"{len(found_forbidden)} forbidden, {latency:.1f}s)")

    avg_domain = float(np.mean([r["score"] for r in results["domain_eval"]]))
    avg_latency = float(np.mean([r["latency_s"] for r in results["domain_eval"]]))
    print(f"\n  Average domain score: {avg_domain:.3f}")
    print(f"  Average latency: {avg_latency:.1f}s")

    # --- [2/3] reference-based ROUGE metrics (optional) ---
    if HAS_ROUGE and os.path.exists(EVAL_DATA):
        print("\n[2/3] ROUGE evaluation on SFT test set...")
        results["generation_metrics"] = _rouge_eval(model, tokenizer, n_samples)
        print(f"  ROUGE-1: {results['generation_metrics']['rouge1']:.4f}")
        print(f"  ROUGE-2: {results['generation_metrics']['rouge2']:.4f}")
        print(f"  ROUGE-L: {results['generation_metrics']['rougeL']:.4f}")
    else:
        print("\n[2/3] Skipping ROUGE (rouge_score not installed or data not found)")

    # --- [3/3] aggregate summary ---
    print("\n[3/3] Computing summary...")
    results["summary"] = {
        "avg_domain_score": avg_domain,
        "avg_latency_s": avg_latency,
        "domain_pass_rate": float(np.mean([1 if r["score"] >= 0.7 else 0 for r in results["domain_eval"]])),
    }

    os.makedirs(output_dir, exist_ok=True)
    outpath = os.path.join(output_dir, "insurellm_eval.json")
    with open(outpath, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2)
    print(f"\n✓ InsureLLM eval results → {outpath}")

    return results
|
|
|
|
# (result key, artifact path, loaded message, missing message) for the
# classical models whose metrics were persisted at training time.
_TRAINED_ARTIFACTS = [
    ("fraudnet", "models/fraudnet/training_results.json",
     "\n✓ FraudNet results loaded from training",
     "\n✗ FraudNet results not found, skipping"),
    ("pricing", "models/pricing-glm/training_results.json",
     "✓ Pricing model results loaded from training",
     "✗ Pricing results not found, skipping"),
    ("doc_classifier", "models/doc-classifier/training_meta.json",
     "✓ Doc classifier results loaded",
     "✗ Doc classifier results not found, skipping"),
    ("ner", "models/ner-model/training_meta.json",
     "✓ NER results loaded",
     "✗ NER results not found, skipping"),
]


def _print_summary(all_results: dict) -> None:
    """Pretty-print headline metrics for each model present in *all_results*."""
    print(f"\n{'='*60}")
    print(" EVALUATION SUMMARY")
    print("=" * 60)

    if "insurellm" in all_results:
        s = all_results["insurellm"].get("summary", {})
        print("\n  InsureLLM (Generative):")
        print(f"    Domain score: {s.get('avg_domain_score', 'N/A')}")
        print(f"    Pass rate: {s.get('domain_pass_rate', 'N/A')}")
        print(f"    Latency: {s.get('avg_latency_s', 'N/A')}s")

    if "fraudnet" in all_results:
        # FraudNet artifact is a list of per-line-of-business metric dicts.
        for r in all_results["fraudnet"]:
            if isinstance(r, dict):
                print(f"\n  FraudNet ({r.get('lob', '?')}):")
                print(f"    AUC-ROC: {r.get('auc_roc', 'N/A')}")
                print(f"    Avg Precision: {r.get('avg_precision', 'N/A')}")

    if "pricing" in all_results:
        for model_type in ["glm", "ebm"]:
            if model_type in all_results["pricing"]:
                m = all_results["pricing"][model_type]
                print(f"\n  Pricing {model_type.upper()}:")
                print(f"    MAE: £{m.get('mae', 'N/A')}")
                print(f"    RMSE: £{m.get('rmse', 'N/A')}")

    if "doc_classifier" in all_results:
        r = all_results["doc_classifier"].get("results", {})
        print("\n  Document Classifier:")
        print(f"    Accuracy: {r.get('eval_accuracy', 'N/A')}")
        print(f"    F1 (macro): {r.get('eval_f1_macro', 'N/A')}")

    if "ner" in all_results:
        r = all_results["ner"].get("results", {})
        print("\n  NER Model:")
        print(f"    F1: {r.get('eval_f1', 'N/A')}")
        print(f"    Precision: {r.get('eval_precision', 'N/A')}")
        print(f"    Recall: {r.get('eval_recall', 'N/A')}")


def evaluate_all(args):
    """Run evaluation for all available models and write a combined report.

    Args:
        args: parsed CLI namespace with ``insurellm_model``, ``n_samples``
            and ``output_dir`` attributes (see main()).
    """
    print("=" * 60)
    print(" InsureOS — Full Evaluation Suite")
    print("=" * 60)

    os.makedirs(args.output_dir, exist_ok=True)
    all_results = {}

    # Generative model: requires a full inference pass over the checkpoint.
    if os.path.exists(args.insurellm_model):
        all_results["insurellm"] = evaluate_insurellm(
            args.insurellm_model, args.n_samples, args.output_dir
        )
    else:
        print(f"\n✗ InsureLLM not found at {args.insurellm_model}, skipping")

    # Classical models: just pick up metrics already saved during training.
    for key, path, ok_msg, missing_msg in _TRAINED_ARTIFACTS:
        artifact = Path(path)
        if artifact.exists():
            with open(artifact, encoding="utf-8") as f:
                all_results[key] = json.load(f)
            print(ok_msg)
        else:
            print(missing_msg)

    report_path = os.path.join(args.output_dir, "full_eval_report.json")
    with open(report_path, "w", encoding="utf-8") as f:
        # default=str tolerates non-JSON-native values (e.g. numpy scalars).
        json.dump(all_results, f, indent=2, default=str)

    _print_summary(all_results)
    print(f"\n  Full report → {report_path}")
|
|
|
def main():
    """CLI entry point: parse command-line flags and run the full suite."""
    parser = argparse.ArgumentParser(description="InsureOS evaluation suite")
    # Defaults come from the module-level constants; any flag may override.
    parser.add_argument("--insurellm-model", default=INSURELLM_MODEL)
    parser.add_argument("--n-samples", type=int, default=N_SAMPLES)
    parser.add_argument("--output-dir", default=OUTPUT_DIR)
    evaluate_all(parser.parse_args())
|
|
|
|
# Script entry point: run the full evaluation suite when executed directly.
if __name__ == "__main__":
    main()
|
|