# s3shastra / evaluate_deep_model.py
# Author: Atharv834
# Commit: Deploy S3Shastra backend - FastAPI + scanners + ML models
# Revision: 6a4dcb6
"""
Evaluate the Deep Scan DistilBERT model accuracy.
Loads the fine-tuned model from deep_s3_model_hf/ and evaluates it
against a balanced test set built from the same data pipeline.
Outputs: Accuracy, Precision, Recall, F1, Confusion Matrix.
"""
import os
import sys
import random
import time
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, classification_report
try:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
except ImportError:
print("ERROR: transformers and torch are required. Run: pip install transformers torch")
sys.exit(1)
import deep_ml_engine
# Directory holding the fine-tuned model, located next to this script.
# abspath() anchors the location even when __file__ is a relative path, so the
# value stays valid if the working directory changes after import.
MODEL_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), "deep_s3_model_hf")
def predict_batch(texts, tokenizer, model):
    """Classify a batch of texts with a direct model forward pass.

    The model is called directly rather than through a ``transformers``
    pipeline, which the original author notes is faster.

    Args:
        texts: Iterable of strings to classify. Blank or whitespace-only
            entries are replaced by the placeholder ``"empty"`` before
            tokenization.
        tokenizer: Callable tokenizer. It is invoked with ``padding=True``,
            ``truncation=True``, ``max_length=128`` and
            ``return_tensors="pt"`` and must return a mapping that can be
            unpacked into the model call.
        model: Callable model whose output exposes a ``logits`` attribute.

    Returns:
        A ``(preds, probs)`` tuple of lists. ``preds[i]`` is the argmax class
        index for ``texts[i]``; ``probs[i]`` is the softmax distribution over
        classes for the same text. Both are empty when ``texts`` is empty.
    """
    texts = list(texts)
    # Nothing to classify: skip the tokenizer and model entirely rather than
    # feeding them an empty batch.
    if not texts:
        return [], []

    # Sanitize: blank strings are swapped for a fixed placeholder token.
    cleaned = [t if t.strip() else "empty" for t in texts]
    inputs = tokenizer(cleaned, padding=True, truncation=True, max_length=128, return_tensors="pt")

    # Inference only, so gradient tracking is disabled.
    with torch.no_grad():
        logits = model(**inputs).logits

    preds = torch.argmax(logits, dim=-1).tolist()
    probs = torch.softmax(logits, dim=-1).tolist()
    return preds, probs
def _percent(part, whole):
    """Return ``part / whole`` as a percentage, or 0.0 when ``whole`` is zero."""
    return part / whole * 100 if whole else 0.0


def _predict_all(texts, tokenizer, model, batch_size=32):
    """Predict a label for every text, ``batch_size`` at a time, printing progress."""
    total = len(texts)
    predictions = []
    for start in range(0, total, batch_size):
        chunk_preds, _ = predict_batch(texts[start:start + batch_size], tokenizer, model)
        predictions.extend(chunk_preds)
        # Carriage return keeps the progress counter on a single console line.
        print(f" {min(start + batch_size, total)}/{total}", end="\r")
    print(f" {total}/{total} done ")
    return predictions


def _score(y_true, y_pred):
    """Compute accuracy, precision, recall, F1 and the 2x2 confusion matrix."""
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    return {
        "y_true": y_true,
        "y_pred": y_pred,
        "acc": accuracy_score(y_true, y_pred),
        "prec": precision_score(y_true, y_pred, zero_division=0),
        "rec": recall_score(y_true, y_pred, zero_division=0),
        "f1": f1_score(y_true, y_pred, zero_division=0),
        # Explicit labels keep the matrix 2x2 even if one class never occurs.
        "cm": confusion_matrix(y_true, y_pred, labels=[0, 1]),
    }


def _print_scores(title, n_samples, n_pos, n_neg, scores):
    """Print the banner, headline metrics and confusion matrix for one data set."""
    acc, prec, rec, f1 = scores["acc"], scores["prec"], scores["rec"], scores["f1"]
    cm = scores["cm"]
    print("=" * 64)
    print(title)
    print("=" * 64)
    print(f" Samples: {n_samples} ({n_pos} sensitive, {n_neg} safe)")
    print(f" Accuracy: {acc:.4f} ({acc*100:.2f}%)")
    print(f" Precision: {prec:.4f} ({prec*100:.2f}%)")
    print(f" Recall: {rec:.4f} ({rec*100:.2f}%)")
    print(f" F1 Score: {f1:.4f} ({f1*100:.2f}%)")
    print(f"\n Confusion Matrix:")
    print(f" Predicted Safe Predicted Sensitive")
    print(f" Actual Safe {cm[0][0]:>10} {cm[0][1]:>10}")
    print(f" Actual Sensitive {cm[1][0]:>10} {cm[1][1]:>10}")


def _probe_words(words, expected, miss_status, miss_icon, tokenizer, model):
    """Classify each word on its own, print a verdict line, and count matches.

    A word counts as correct when its predicted class equals ``expected``.
    The printed label assumes a binary model (class 1 = sensitive).
    """
    hits = 0
    for word in words:
        preds, probs = predict_batch([word.lower()], tokenizer, model)
        pred = preds[0]
        conf = probs[0][pred] * 100  # probability of the predicted class
        correct = pred == expected
        if correct:
            hits += 1
        icon = "+" if correct else miss_icon
        status = "CORRECT" if correct else miss_status
        label = "Sensitive" if pred == 1 else "Safe"
        print(f" [{icon}] {word:<30s} -> {label:>10s} ({conf:.1f}% conf) [{status}]")
    return hits


def evaluate():
    """Evaluate the fine-tuned model and print a full metrics report.

    Steps:
      1. Build the dataset via ``deep_ml_engine.build_dataset_synthetic()``.
      2. Split off the first 1000 samples as the training set and assemble a
         mixed evaluation set (core keyword/benign samples, positives from
         index 1000 onward, and shuffled negatives).
      3. Load the tokenizer and model from ``MODEL_DIR``.
      4. Run batched predictions on both sets.
      5. Print accuracy, precision, recall, F1 and confusion matrices, then
         probe each sensitive keyword and a list of benign words.

    Exits the process with status 1 if ``MODEL_DIR`` does not exist or the
    dataset is empty. Returns ``None``.
    """
    if not os.path.isdir(MODEL_DIR):
        print(f"ERROR: Model directory not found at {MODEL_DIR}")
        print("Train the model first with: python train_deep_model.py")
        sys.exit(1)

    print("=" * 64)
    print(" S3Shastra Deep Scanner — DistilBERT Accuracy Evaluation")
    print("=" * 64)

    # ── 1. Build the full dataset ──
    print("\n[1/5] Building evaluation dataset...")
    X_all, y_all = deep_ml_engine.build_dataset_synthetic()
    if len(X_all) == 0:
        print("ERROR: Dataset is empty; nothing to evaluate.")
        sys.exit(1)
    total_pos = sum(y_all)
    total_neg = len(y_all) - total_pos
    print(f" Total raw samples: {len(X_all)}")
    print(f" Positive (sensitive): {total_pos}")
    print(f" Negative (safe): {total_neg}")

    # ── 2. Create evaluation subsets ──
    print("\n[2/5] Creating balanced evaluation sets...")
    train_cutoff = 1000  # the first 1000 samples are the ones the model trained on
    core_count = 61      # leading keyword + benign samples (~31 keywords + 30 benign)
    per_class_cap = 500

    train_X = X_all[:train_cutoff]
    train_y = y_all[:train_cutoff]
    train_pos = sum(train_y)
    train_neg = len(train_y) - train_pos
    print(f" Training set: {len(train_X)} samples ({train_pos} pos, {train_neg} neg)")

    # Positives the model did not train on (index 1000+).
    unseen_pos = [
        (x, y) for x, y in zip(X_all[train_cutoff:], y_all[train_cutoff:]) if y == 1
    ]
    # Core keyword/benign samples are always included: they are critical to get right.
    keyword_samples = list(zip(X_all[:core_count], y_all[:core_count]))
    # Negatives are rare, so they are drawn from the whole dataset, training
    # range included. Fixed seed keeps the selection reproducible.
    neg_pool = [(x, y) for x, y in zip(X_all, y_all) if y == 0]
    random.seed(42)
    random.shuffle(neg_pool)

    balanced_eval = keyword_samples + unseen_pos[:per_class_cap] + neg_pool[:per_class_cap]
    random.shuffle(balanced_eval)
    eval_X = [sample[0] for sample in balanced_eval]
    eval_y = [sample[1] for sample in balanced_eval]
    eval_pos = sum(eval_y)
    eval_neg = len(eval_y) - eval_pos
    print(f" Eval set: {len(eval_X)} samples ({eval_pos} pos, {eval_neg} neg)")

    # ── 3. Load model ──
    print("\n[3/5] Loading fine-tuned DistilBERT model...")
    tokenizer = AutoTokenizer.from_pretrained(MODEL_DIR)
    model = AutoModelForSequenceClassification.from_pretrained(MODEL_DIR)
    model.eval()
    print(" Model loaded successfully.")

    # ── 4. Run predictions ──
    print(f"\n[4/5] Running predictions...")
    start_time = time.perf_counter()  # monotonic clock for interval timing

    print(f" Evaluating training set ({len(train_X)} samples)...")
    train_preds = _predict_all(train_X, tokenizer, model)

    print(f" Evaluating balanced test set ({len(eval_X)} samples)...")
    eval_preds = _predict_all(eval_X, tokenizer, model)

    elapsed = time.perf_counter() - start_time
    total_inferred = len(train_X) + len(eval_X)
    # Guard against a zero interval on very coarse clocks.
    throughput = total_inferred / elapsed if elapsed > 0 else float("inf")
    print(f" Inference complete: {total_inferred} samples in {elapsed:.1f}s ({throughput:.0f} samples/sec)")

    # ── 5. Calculate & display metrics ──
    print(f"\n[5/5] Computing metrics...\n")

    train_scores = _score(train_y, train_preds)
    _print_scores(
        " TRAINING SET (first 1000 samples — model saw these)",
        len(train_X), train_pos, train_neg, train_scores,
    )

    eval_scores = _score(eval_y, eval_preds)
    print()
    _print_scores(
        " BALANCED EVALUATION SET (mixed seen + unseen data)",
        len(eval_X), eval_pos, eval_neg, eval_scores,
    )
    print(f"\n Classification Report:")
    # labels=[0, 1] keeps target_names valid even if a class is absent.
    print(classification_report(
        eval_scores["y_true"],
        eval_scores["y_pred"],
        labels=[0, 1],
        target_names=["Safe (0)", "Sensitive (1)"],
        zero_division=0,
    ))

    # ── Keyword-level analysis ──
    print("=" * 64)
    print(" KEYWORD-LEVEL ANALYSIS")
    print("=" * 64)
    print(" Testing each sensitive keyword individually:\n")
    keywords = deep_ml_engine.SENSITIVE_KEYWORDS
    kw_total = len(keywords)
    kw_correct = _probe_words(keywords, 1, "MISSED", "X", tokenizer, model)
    kw_pct = _percent(kw_correct, kw_total)
    print(f"\n Keywords detected: {kw_correct}/{kw_total} ({kw_pct:.1f}%)")

    # ── Benign word analysis ──
    benign_words = ["app", "main", "index", "style", "script", "logo", "banner", "test", "data", "public"]
    print(f"\n Testing benign/safe words:\n")
    bn_correct = _probe_words(benign_words, 0, "FALSE POS", "!", tokenizer, model)
    bn_pct = _percent(bn_correct, len(benign_words))
    print(f"\n Benign correct: {bn_correct}/{len(benign_words)} ({bn_pct:.1f}%)")

    # ── Final Summary ──
    tr_acc = train_scores["acc"]
    ev_acc = eval_scores["acc"]
    ev_f1 = eval_scores["f1"]
    print(f"\n{'=' * 64}")
    print(" FINAL SUMMARY")
    print("=" * 64)
    print(f" Model: DistilBERT (distilbert-base-uncased)")
    print(f" Fine-tuned on: 1000 samples (1 epoch, lr=2e-5)")
    print(f" Dataset source: Custom keywords + nvidia/Nemotron-PII")
    print(f" Inference time: {elapsed:.1f}s ({throughput:.0f} samples/sec)")
    print(f" ──────────────────────────────────────────────────────")
    print(f" Training Accuracy: {tr_acc*100:.2f}%")
    print(f" Balanced Eval Acc: {ev_acc*100:.2f}%")
    print(f" Balanced Eval F1: {ev_f1*100:.2f}%")
    print(f" Keyword Detection: {kw_correct}/{kw_total} ({kw_pct:.1f}%)")
    print(f" Benign Rejection: {bn_correct}/{len(benign_words)} ({bn_pct:.1f}%)")
    print("=" * 64)
# Script entry point: run the evaluation only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    evaluate()