| """ |
| Evaluate the Deep Scan DistilBERT model accuracy. |
| Loads the fine-tuned model from deep_s3_model_hf/ and evaluates it |
| against a balanced test set built from the same data pipeline. |
| Outputs: Accuracy, Precision, Recall, F1, Confusion Matrix. |
| """ |
|
|
| import os |
| import sys |
| import random |
| import time |
| import numpy as np |
| from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report |
|
|
| try: |
| from transformers import AutoTokenizer, AutoModelForSequenceClassification |
| import torch |
| except ImportError: |
| print("ERROR: transformers and torch are required. Run: pip install transformers torch") |
| sys.exit(1) |
|
|
| import deep_ml_engine |
|
|
# Directory holding the fine-tuned DistilBERT checkpoint (tokenizer files +
# model weights), resolved relative to this script so it loads from any CWD.
MODEL_DIR = os.path.join(os.path.dirname(__file__), "deep_s3_model_hf")
|
|
|
|
def predict_batch(texts, tokenizer, model):
    """Classify a batch of texts with a single direct forward pass.

    Blank or whitespace-only entries are replaced with the placeholder
    string "empty" before tokenization, since an empty string would
    tokenize to nothing useful.

    Returns a pair ``(preds, probs)`` where ``preds`` is a list of argmax
    class indices and ``probs`` is a list of per-class softmax rows.
    """
    cleaned = []
    for text in texts:
        cleaned.append(text if text.strip() else "empty")
    encoded = tokenizer(cleaned, padding=True, truncation=True, max_length=128, return_tensors="pt")
    # Inference only: disable autograd bookkeeping for speed and memory.
    with torch.no_grad():
        logits = model(**encoded).logits
    labels = torch.argmax(logits, dim=-1).tolist()
    scores = torch.softmax(logits, dim=-1).tolist()
    return labels, scores
|
|
|
|
def _predict_all(texts, tokenizer, model, batch_size=32):
    """Run batched inference over *texts*, printing a live progress counter.

    Returns the list of predicted class indices, one per input text.
    """
    preds = []
    for i in range(0, len(texts), batch_size):
        batch_preds, _ = predict_batch(texts[i:i + batch_size], tokenizer, model)
        preds.extend(batch_preds)
        done = min(i + batch_size, len(texts))
        print(f" {done}/{len(texts)}", end="\r")
    print(f" {len(texts)}/{len(texts)} done ")
    return preds


def _score(y_true, y_pred):
    """Return (accuracy, precision, recall, f1, confusion_matrix) for binary labels."""
    return (
        accuracy_score(y_true, y_pred),
        precision_score(y_true, y_pred, zero_division=0),
        recall_score(y_true, y_pred, zero_division=0),
        f1_score(y_true, y_pred, zero_division=0),
        confusion_matrix(y_true, y_pred),
    )


def _print_scores(acc, prec, rec, f1, cm):
    """Print the standard metric block plus the 2x2 confusion matrix."""
    print(f" Accuracy: {acc:.4f} ({acc*100:.2f}%)")
    print(f" Precision: {prec:.4f} ({prec*100:.2f}%)")
    print(f" Recall: {rec:.4f} ({rec*100:.2f}%)")
    print(f" F1 Score: {f1:.4f} ({f1*100:.2f}%)")
    print(f"\n Confusion Matrix:")
    print(f" Predicted Safe Predicted Sensitive")
    # confusion_matrix collapses to a smaller shape when only one class is
    # present in y_true/y_pred, so guard before indexing as 2x2.
    if cm.shape == (2, 2):
        print(f" Actual Safe {cm[0][0]:>10} {cm[0][1]:>10}")
        print(f" Actual Sensitive {cm[1][0]:>10} {cm[1][1]:>10}")
    else:
        print(f" {cm}")


def evaluate():
    """Evaluate the fine-tuned DistilBERT deep-scan classifier end to end.

    Builds the synthetic dataset, reconstructs the training slice and a
    balanced held-out evaluation set, runs batched inference, and prints
    accuracy/precision/recall/F1, confusion matrices, a per-keyword probe,
    and a final summary. Exits with status 1 if the model directory is
    missing.
    """
    if not os.path.isdir(MODEL_DIR):
        print(f"ERROR: Model directory not found at {MODEL_DIR}")
        print("Train the model first with: python train_deep_model.py")
        sys.exit(1)

    print("=" * 64)
    print(" S3Shastra Deep Scanner β DistilBERT Accuracy Evaluation")
    print("=" * 64)

    # --- [1/5] Source data -------------------------------------------------
    print("\n[1/5] Building evaluation dataset...")
    X_all, y_all = deep_ml_engine.build_dataset_synthetic()
    total_pos = sum(y_all)
    total_neg = len(y_all) - total_pos
    print(f" Total raw samples: {len(X_all)}")
    print(f" Positive (sensitive): {total_pos}")
    print(f" Negative (safe): {total_neg}")

    # --- [2/5] Train slice + balanced eval set -----------------------------
    print("\n[2/5] Creating balanced evaluation sets...")
    pos_samples = [(x, y) for x, y in zip(X_all, y_all) if y == 1]
    neg_samples = [(x, y) for x, y in zip(X_all, y_all) if y == 0]
    # NOTE(review): pos_samples is never consumed below, but its shuffle is
    # kept so the seeded RNG stream (and therefore which samples land in the
    # eval set) matches earlier runs of this script.
    random.seed(42)
    random.shuffle(pos_samples)
    random.shuffle(neg_samples)

    # The first 1000 raw samples mirror the slice the model was trained on
    # (see the [FINAL SUMMARY] text: fine-tuned on 1000 samples).
    train_X = X_all[:1000]
    train_y = y_all[:1000]
    train_pos = sum(train_y)
    train_neg = len(train_y) - train_pos
    print(f" Training set: {len(train_X)} samples ({train_pos} pos, {train_neg} neg)")

    # Positives the model never saw during fine-tuning.
    unseen_pos = [(x, y) for x, y in zip(X_all[1000:], y_all[1000:]) if y == 1]
    # First 61 samples — presumably the hand-curated keyword examples; kept
    # in the eval mix so keyword coverage is always exercised.
    keyword_samples = [(x, y) for x, y in zip(X_all[:61], y_all[:61])]

    all_neg = neg_samples.copy()
    random.shuffle(all_neg)

    balanced_eval = []
    balanced_eval.extend(keyword_samples)
    balanced_eval.extend(unseen_pos[:500])
    balanced_eval.extend(all_neg[:500])
    random.shuffle(balanced_eval)
    eval_X = [s[0] for s in balanced_eval]
    eval_y = [s[1] for s in balanced_eval]

    eval_pos = sum(eval_y)
    eval_neg = len(eval_y) - eval_pos
    print(f" Eval set: {len(eval_X)} samples ({eval_pos} pos, {eval_neg} neg)")

    # --- [3/5] Model -------------------------------------------------------
    print("\n[3/5] Loading fine-tuned DistilBERT model...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_DIR)
    model = AutoModelForSequenceClassification.from_pretrained(MODEL_DIR)
    model.eval()
    print(" Model loaded successfully.")

    # --- [4/5] Inference ---------------------------------------------------
    print(f"\n[4/5] Running predictions...")
    start_time = time.time()

    print(f" Evaluating training set ({len(train_X)} samples)...")
    train_preds = _predict_all(train_X, tokenizer, model)

    print(f" Evaluating balanced test set ({len(eval_X)} samples)...")
    eval_preds = _predict_all(eval_X, tokenizer, model)

    # Guard against elapsed == 0 on coarse clocks to avoid ZeroDivisionError.
    elapsed = max(time.time() - start_time, 1e-9)
    total_inferred = len(train_X) + len(eval_X)
    print(f" Inference complete: {total_inferred} samples in {elapsed:.1f}s ({total_inferred/elapsed:.0f} samples/sec)")

    # --- [5/5] Metrics -----------------------------------------------------
    print(f"\n[5/5] Computing metrics...\n")

    y_train_true = np.array(train_y)
    y_train_pred = np.array(train_preds)
    tr_acc, tr_prec, tr_rec, tr_f1, tr_cm = _score(y_train_true, y_train_pred)

    print("=" * 64)
    print(" TRAINING SET (first 1000 samples β model saw these)")
    print("=" * 64)
    print(f" Samples: {len(train_X)} ({train_pos} sensitive, {train_neg} safe)")
    _print_scores(tr_acc, tr_prec, tr_rec, tr_f1, tr_cm)

    y_eval_true = np.array(eval_y)
    y_eval_pred = np.array(eval_preds)
    ev_acc, ev_prec, ev_rec, ev_f1, ev_cm = _score(y_eval_true, y_eval_pred)

    print(f"\n{'=' * 64}")
    print(" BALANCED EVALUATION SET (mixed seen + unseen data)")
    print("=" * 64)
    print(f" Samples: {len(eval_X)} ({eval_pos} sensitive, {eval_neg} safe)")
    _print_scores(ev_acc, ev_prec, ev_rec, ev_f1, ev_cm)

    print(f"\n Classification Report:")
    print(classification_report(y_eval_true, y_eval_pred, target_names=["Safe (0)", "Sensitive (1)"], zero_division=0))

    # Per-keyword probe: each sensitive keyword should classify as 1.
    print("=" * 64)
    print(" KEYWORD-LEVEL ANALYSIS")
    print("=" * 64)
    print(" Testing each sensitive keyword individually:\n")
    kw_correct = 0
    kw_total = len(deep_ml_engine.SENSITIVE_KEYWORDS)
    for kw in deep_ml_engine.SENSITIVE_KEYWORDS:
        preds, probs = predict_batch([kw.lower()], tokenizer, model)
        pred = preds[0]
        conf = probs[0][pred] * 100
        status = "CORRECT" if pred == 1 else "MISSED"
        icon = "+" if pred == 1 else "X"
        if pred == 1:
            kw_correct += 1
        print(f" [{icon}] {kw:<30s} -> {'Sensitive' if pred==1 else 'Safe':>10s} ({conf:.1f}% conf) [{status}]")

    print(f"\n Keywords detected: {kw_correct}/{kw_total} ({kw_correct/kw_total*100:.1f}%)")

    # Benign probe: common safe filenames should classify as 0.
    benign_words = ["app", "main", "index", "style", "script", "logo", "banner", "test", "data", "public"]
    print(f"\n Testing benign/safe words:\n")
    bn_correct = 0
    for bw in benign_words:
        preds, probs = predict_batch([bw], tokenizer, model)
        pred = preds[0]
        conf = probs[0][pred] * 100
        status = "CORRECT" if pred == 0 else "FALSE POS"
        icon = "+" if pred == 0 else "!"
        if pred == 0:
            bn_correct += 1
        print(f" [{icon}] {bw:<30s} -> {'Safe' if pred==0 else 'Sensitive':>10s} ({conf:.1f}% conf) [{status}]")

    print(f"\n Benign correct: {bn_correct}/{len(benign_words)} ({bn_correct/len(benign_words)*100:.1f}%)")

    # --- Summary -----------------------------------------------------------
    print(f"\n{'=' * 64}")
    print(" FINAL SUMMARY")
    print("=" * 64)
    print(f" Model: DistilBERT (distilbert-base-uncased)")
    print(f" Fine-tuned on: 1000 samples (1 epoch, lr=2e-5)")
    print(f" Dataset source: Custom keywords + nvidia/Nemotron-PII")
    print(f" Inference time: {elapsed:.1f}s ({total_inferred/elapsed:.0f} samples/sec)")
    print(f" ββββββββββββββββββββββββββββββββββββββββββββββββββββββ")
    print(f" Training Accuracy: {tr_acc*100:.2f}%")
    print(f" Balanced Eval Acc: {ev_acc*100:.2f}%")
    print(f" Balanced Eval F1: {ev_f1*100:.2f}%")
    print(f" Keyword Detection: {kw_correct}/{kw_total} ({kw_correct/kw_total*100:.1f}%)")
    print(f" Benign Rejection: {bn_correct}/{len(benign_words)} ({bn_correct/len(benign_words)*100:.1f}%)")
    print("=" * 64)
|
|
|
|
| if __name__ == "__main__": |
| evaluate() |
|
|