"""Evaluate the Deep Scan DistilBERT model accuracy.

Loads the fine-tuned model from deep_s3_model_hf/ and evaluates it against a
balanced test set built from the same data pipeline.

Outputs: Accuracy, Precision, Recall, F1, Confusion Matrix.
"""

import os
import sys
import random
import time
import numpy as np
from sklearn.metrics import (
    accuracy_score,
    precision_score,
    recall_score,
    f1_score,
    confusion_matrix,
    classification_report,
)

try:
    from transformers import AutoTokenizer, AutoModelForSequenceClassification
    import torch
except ImportError:
    print("ERROR: transformers and torch are required. Run: pip install transformers torch")
    sys.exit(1)

import deep_ml_engine

MODEL_DIR = os.path.join(os.path.dirname(__file__), "deep_s3_model_hf")

# Number of leading dataset samples treated as "seen during training".
# NOTE(review): taken from this script's own comments/report text; it must
# match whatever train_deep_model.py actually used -- confirm.
_TRAIN_SIZE = 1000
# Leading keyword/benign samples that are always part of the eval set
# (described in the original script as "~31 keywords + 30 benign").
_CORE_SAMPLE_COUNT = 61
# Upper bound on additional samples drawn per class for the eval set.
_MAX_PER_CLASS = 500
_BATCH_SIZE = 32
_MAX_TOKEN_LENGTH = 128
_RANDOM_SEED = 42
# Passed explicitly to scikit-learn so reports always cover both classes,
# even if one of them happens to be absent from y_true / y_pred.
_CLASS_LABELS = [0, 1]
_RULE = "=" * 64
_BENIGN_WORDS = (
    "app", "main", "index", "style", "script",
    "logo", "banner", "test", "data", "public",
)


def predict_batch(texts, tokenizer, model):
    """Run inference on a batch of texts using a direct model forward pass.

    Args:
        texts: Iterable of strings to classify. Blank / whitespace-only
            entries are replaced by the placeholder ``"empty"``.
        tokenizer: Tokenizer callable returning keyword tensors for ``model``.
        model: Model whose forward output exposes ``.logits``.

    Returns:
        ``(preds, probs)`` where ``preds`` is a list with the arg-max class
        index per text and ``probs`` is a list of per-class softmax
        probabilities per text. Both are empty lists for empty input.
    """
    cleaned = [t if t.strip() else "empty" for t in texts]
    if not cleaned:
        # Nothing to tokenize; avoid handing an empty batch to the tokenizer.
        return [], []
    inputs = tokenizer(
        cleaned,
        padding=True,
        truncation=True,
        max_length=_MAX_TOKEN_LENGTH,
        return_tensors="pt",
    )
    with torch.no_grad():
        logits = model(**inputs).logits
    preds = torch.argmax(logits, dim=-1).tolist()
    probs = torch.softmax(logits, dim=-1).tolist()
    return preds, probs


def _percent(part, whole):
    """Return ``part / whole`` as a percentage, or 0.0 when ``whole`` is 0."""
    return part / whole * 100 if whole else 0.0


def _build_eval_set(X_all, y_all, rng):
    """Assemble the evaluation texts and labels.

    The set consists of the leading core keyword/benign samples, up to
    ``_MAX_PER_CLASS`` positives from beyond the training range, and up to
    ``_MAX_PER_CLASS`` negatives. Negatives from beyond the training range
    are preferred; samples inside the training range are used only to cover
    a shortfall. Core samples are never drawn a second time, so no sample is
    counted twice in the metrics.

    Args:
        X_all: Sequence of all texts.
        y_all: Sequence of labels (0 or 1) aligned with ``X_all``.
        rng: ``random.Random`` instance used for all shuffling.

    Returns:
        ``(eval_X, eval_y)`` as two aligned lists.
    """
    samples = list(zip(X_all, y_all))
    core = samples[:_CORE_SAMPLE_COUNT]
    unseen = samples[_TRAIN_SIZE:]
    unseen_pos = [s for s in unseen if s[1] == 1]
    negatives = [s for s in unseen if s[1] == 0][:_MAX_PER_CLASS]

    shortfall = _MAX_PER_CLASS - len(negatives)
    if shortfall > 0:
        # Not enough unseen negatives: top up from the training range,
        # skipping the core samples that are already included.
        seen_neg = [s for s in samples[_CORE_SAMPLE_COUNT:_TRAIN_SIZE] if s[1] == 0]
        rng.shuffle(seen_neg)
        negatives.extend(seen_neg[:shortfall])

    selected = core + unseen_pos[:_MAX_PER_CLASS] + negatives
    rng.shuffle(selected)
    return [s[0] for s in selected], [s[1] for s in selected]


def _predict_all(texts, tokenizer, model, batch_size=_BATCH_SIZE):
    """Predict labels for ``texts`` in batches, printing a progress counter."""
    total = len(texts)
    collected = []
    for start in range(0, total, batch_size):
        batch_preds, _ = predict_batch(texts[start:start + batch_size], tokenizer, model)
        collected.extend(batch_preds)
        done = min(start + batch_size, total)
        print(f" {done}/{total}", end="\r")
    print(f" {total}/{total} done ")
    return collected


def _compute_metrics(y_true, y_pred):
    """Return accuracy, precision, recall, F1 and a 2x2 confusion matrix."""
    truth = np.array(y_true)
    guess = np.array(y_pred)
    return {
        "accuracy": accuracy_score(truth, guess),
        "precision": precision_score(truth, guess, zero_division=0),
        "recall": recall_score(truth, guess, zero_division=0),
        "f1": f1_score(truth, guess, zero_division=0),
        # Explicit labels keep the matrix 2x2 even if a class is missing.
        "confusion": confusion_matrix(truth, guess, labels=_CLASS_LABELS),
    }


def _print_metrics(title, n_samples, n_pos, n_neg, metrics):
    """Print one titled metrics section including the confusion matrix."""
    acc = metrics["accuracy"]
    prec = metrics["precision"]
    rec = metrics["recall"]
    f1 = metrics["f1"]
    cm = metrics["confusion"]
    print(title)
    print(_RULE)
    print(f" Samples: {n_samples} ({n_pos} sensitive, {n_neg} safe)")
    print(f" Accuracy: {acc:.4f} ({acc*100:.2f}%)")
    print(f" Precision: {prec:.4f} ({prec*100:.2f}%)")
    print(f" Recall: {rec:.4f} ({rec*100:.2f}%)")
    print(f" F1 Score: {f1:.4f} ({f1*100:.2f}%)")
    print(f"\n Confusion Matrix:")
    print(f" Predicted Safe Predicted Sensitive")
    print(f" Actual Safe {cm[0][0]:>10} {cm[0][1]:>10}")
    print(f" Actual Sensitive {cm[1][0]:>10} {cm[1][1]:>10}")


def _probe_words(words, expected, wrong_status, wrong_icon, tokenizer, model, lowercase=False):
    """Classify each word on its own, print a result line, return the hit count.

    Args:
        words: Words to classify individually.
        expected: Label (0 or 1) that counts as a correct prediction.
        wrong_status: Status text printed for an incorrect prediction.
        wrong_icon: Icon printed for an incorrect prediction.
        tokenizer: Tokenizer forwarded to ``predict_batch``.
        model: Model forwarded to ``predict_batch``.
        lowercase: Lower-case the word before inference (the printed word
            keeps its original casing).
    """
    hits = 0
    for word in words:
        text = word.lower() if lowercase else word
        preds, probs = predict_batch([text], tokenizer, model)
        pred = preds[0]
        conf = probs[0][pred] * 100
        is_hit = pred == expected
        if is_hit:
            hits += 1
        status = "CORRECT" if is_hit else wrong_status
        icon = "+" if is_hit else wrong_icon
        verdict = "Sensitive" if pred == 1 else "Safe"
        print(f" [{icon}] {word:<30s} -> {verdict:>10s} ({conf:.1f}% conf) [{status}]")
    return hits


def evaluate():
    """Evaluate the fine-tuned model and print a full accuracy report.

    Steps: build the dataset via ``deep_ml_engine.build_dataset_synthetic``,
    derive the training slice and the evaluation set, load the model from
    ``MODEL_DIR``, run batched predictions, then print metrics, a per-keyword
    analysis, a benign-word analysis and a final summary.

    Exits the process with status 1 when the model directory is missing or
    the dataset is empty.
    """
    if not os.path.isdir(MODEL_DIR):
        print(f"ERROR: Model directory not found at {MODEL_DIR}")
        print("Train the model first with: python train_deep_model.py")
        sys.exit(1)

    print(_RULE)
    print(" S3Shastra Deep Scanner — DistilBERT Accuracy Evaluation")
    print(_RULE)

    # ── 1. Build the full dataset ──
    print("\n[1/5] Building evaluation dataset...")
    X_all, y_all = deep_ml_engine.build_dataset_synthetic()
    if len(X_all) == 0:
        print("ERROR: Dataset is empty; nothing to evaluate.")
        sys.exit(1)
    total_pos = sum(y_all)
    total_neg = len(y_all) - total_pos
    print(f" Total raw samples: {len(X_all)}")
    print(f" Positive (sensitive): {total_pos}")
    print(f" Negative (safe): {total_neg}")

    # ── 2. Create balanced evaluation subsets ──
    print("\n[2/5] Creating balanced evaluation sets...")
    # A private generator keeps the selection reproducible without reseeding
    # the process-wide ``random`` module state.
    rng = random.Random(_RANDOM_SEED)

    # Training set: the leading samples the model was trained on.
    train_X = X_all[:_TRAIN_SIZE]
    train_y = y_all[:_TRAIN_SIZE]
    train_pos = sum(train_y)
    train_neg = len(train_y) - train_pos
    print(f" Training set: {len(train_X)} samples ({train_pos} pos, {train_neg} neg)")

    eval_X, eval_y = _build_eval_set(X_all, y_all, rng)
    eval_pos = sum(eval_y)
    eval_neg = len(eval_y) - eval_pos
    print(f" Eval set: {len(eval_X)} samples ({eval_pos} pos, {eval_neg} neg)")

    # ── 3. Load model ──
    print("\n[3/5] Loading fine-tuned DistilBERT model...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_DIR)
    model = AutoModelForSequenceClassification.from_pretrained(MODEL_DIR)
    model.eval()
    print(" Model loaded successfully.")

    # ── 4. Run predictions ──
    print(f"\n[4/5] Running predictions...")
    started = time.perf_counter()

    # A) Training set predictions
    print(f" Evaluating training set ({len(train_X)} samples)...")
    train_preds = _predict_all(train_X, tokenizer, model)

    # B) Balanced eval set predictions
    print(f" Evaluating balanced test set ({len(eval_X)} samples)...")
    eval_preds = _predict_all(eval_X, tokenizer, model)

    elapsed = time.perf_counter() - started
    total_inferred = len(train_X) + len(eval_X)
    # Guard against a zero-length interval on very small inputs.
    throughput = total_inferred / elapsed if elapsed > 0 else float("inf")
    print(f" Inference complete: {total_inferred} samples in {elapsed:.1f}s ({throughput:.0f} samples/sec)")

    # ── 5. Calculate & display metrics ──
    print(f"\n[5/5] Computing metrics...\n")

    # ── Training Set Results ──
    train_metrics = _compute_metrics(train_y, train_preds)
    print(_RULE)
    _print_metrics(
        f" TRAINING SET (first {_TRAIN_SIZE} samples — model saw these)",
        len(train_X), train_pos, train_neg, train_metrics,
    )

    # ── Balanced Eval Set Results ──
    eval_metrics = _compute_metrics(eval_y, eval_preds)
    print(f"\n{'=' * 64}")
    _print_metrics(
        " BALANCED EVALUATION SET (mixed seen + unseen data)",
        len(eval_X), eval_pos, eval_neg, eval_metrics,
    )
    print(f"\n Classification Report:")
    print(classification_report(
        np.array(eval_y),
        np.array(eval_preds),
        labels=_CLASS_LABELS,
        target_names=["Safe (0)", "Sensitive (1)"],
        zero_division=0,
    ))

    # ── Keyword-level analysis ──
    print(_RULE)
    print(" KEYWORD-LEVEL ANALYSIS")
    print(_RULE)
    print(" Testing each sensitive keyword individually:\n")
    keywords = deep_ml_engine.SENSITIVE_KEYWORDS
    kw_total = len(keywords)
    kw_correct = _probe_words(keywords, 1, "MISSED", "X", tokenizer, model, lowercase=True)
    kw_pct = _percent(kw_correct, kw_total)
    print(f"\n Keywords detected: {kw_correct}/{kw_total} ({kw_pct:.1f}%)")

    # ── Benign word analysis ──
    print(f"\n Testing benign/safe words:\n")
    bn_total = len(_BENIGN_WORDS)
    bn_correct = _probe_words(_BENIGN_WORDS, 0, "FALSE POS", "!", tokenizer, model)
    bn_pct = _percent(bn_correct, bn_total)
    print(f"\n Benign correct: {bn_correct}/{bn_total} ({bn_pct:.1f}%)")

    # ── Final Summary ──
    tr_acc = train_metrics["accuracy"]
    ev_acc = eval_metrics["accuracy"]
    ev_f1 = eval_metrics["f1"]
    print(f"\n{'=' * 64}")
    print(" FINAL SUMMARY")
    print(_RULE)
    print(f" Model: DistilBERT (distilbert-base-uncased)")
    print(f" Fine-tuned on: {_TRAIN_SIZE} samples (1 epoch, lr=2e-5)")
    print(f" Dataset source: Custom keywords + nvidia/Nemotron-PII")
    print(f" Inference time: {elapsed:.1f}s ({throughput:.0f} samples/sec)")
    print(f" ──────────────────────────────────────────────────────")
    print(f" Training Accuracy: {tr_acc*100:.2f}%")
    print(f" Balanced Eval Acc: {ev_acc*100:.2f}%")
    print(f" Balanced Eval F1: {ev_f1*100:.2f}%")
    print(f" Keyword Detection: {kw_correct}/{kw_total} ({kw_pct:.1f}%)")
    print(f" Benign Rejection: {bn_correct}/{bn_total} ({bn_pct:.1f}%)")
    print(_RULE)


if __name__ == "__main__":
    evaluate()