LT360
Update CORS
30cda43
import joblib
import pandas as pd
import re
from lime.lime_text import LimeTextExplainer
import numpy as np
import os
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
# Configure and setup model and preprocessor files
ASSETS_DIR = os.path.join(os.path.dirname(__file__), 'assets')
PREPROCESSOR_FILENAME = "email_preprocessor_20250506_203148.joblib"
MODEL_FILENAME = "phishing_nb_model_20250506_203148.joblib"
PREPROCESSOR_PATH = os.path.join(ASSETS_DIR, PREPROCESSOR_FILENAME)
MODEL_PATH = os.path.join(ASSETS_DIR, MODEL_FILENAME)
# Load model and preprocessor
try:
preprocessor = joblib.load(PREPROCESSOR_PATH)
model = joblib.load(MODEL_PATH)
print("ML Model and Preprocessor loaded successfully from ml_logic.")
except FileNotFoundError:
print(f"FATAL ERROR: Could not find model ('{MODEL_PATH}') or preprocessor ('{PREPROCESSOR_PATH}').")
print("Ensure files are in 'app/assets/' and filenames are correct in ml_logic.py.")
preprocessor = None
model = None
except Exception as e:
print(f"Error loading ML model/preprocessor: {e}")
preprocessor = None
model = None
# --- Load BERT-mini model and tokenizer from Hugging Face Hub ---
BERT_MODEL_ID = "lleratodev/720-bert-mini-phishing" # e.g., "LeratoLetsepe/phishing-bert-mini"
try:
bert_tokenizer = AutoTokenizer.from_pretrained(BERT_MODEL_ID)
bert_model = AutoModelForSequenceClassification.from_pretrained(BERT_MODEL_ID)
bert_model.eval() # Set model to evaluation mode
print(f"BERT-mini model ('{BERT_MODEL_ID}') and tokenizer loaded successfully from Hugging Face Hub.")
# Determine device for BERT model (CPU by default, can be adapted for GPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
bert_model.to(device)
print(f"BERT model moved to device: {device}")
except Exception as e:
print(f"FATAL ERROR (BERT): Could not load model/tokenizer '{BERT_MODEL_ID}' from Hugging Face Hub: {e}")
print("Ensure the model ID is correct, you have an internet connection, and the model files are correctly set up on the Hub.")
bert_tokenizer = None
bert_model = None
# --- End BERT Loading ---
# Text cleaning function, makes everything lowercase, removed non alpha-numeric characters and normalize white spaces
def simple_text_clean(text):
if isinstance(text, str):
text = text.lower()
text = re.sub(r'[^a-z0-9\s]', '', text)
text = re.sub(r'\s+', ' ', text).strip()
else:
text = ''
return text
# For explanability, LIME setup - # LIME probability function for MultinomialNB model
class_names = ['Legitimate', 'Phishing'] # 0: Legitimate, 1: Phishing
explainer = LimeTextExplainer(class_names=class_names)
def model_predict_probability_for_lime(combined_texts):
if preprocessor is None or model is None:
return np.array([[0.5, 0.5]] * len(combined_texts))
subjects = []
senders = []
bodies = []
for combined_text in combined_texts:
s_marker = "subject: "
d_marker = " sender: "
b_marker = " body: "
s_text, d_text, b_text = "", "", ""
if d_marker in combined_text:
s_text_part, rest = combined_text.split(d_marker, 1)
if s_marker in s_text_part:
s_text = s_text_part.replace(s_marker, "").strip()
if b_marker in rest:
d_text_part, b_text_part = rest.split(b_marker, 1)
d_text = d_text_part.strip()
b_text = b_text_part.strip()
else:
d_text = rest.strip()
else:
if s_marker in combined_text and b_marker in combined_text :
s_text_part, b_text_part = combined_text.split(b_marker, 1)
s_text = s_text_part.replace(s_marker, "").strip()
b_text = b_text_part.strip()
elif s_marker in combined_text:
s_text = combined_text.replace(s_marker,"").strip()
else:
b_text = combined_text.strip()
subjects.append(simple_text_clean(s_text))
senders.append(simple_text_clean(d_text))
bodies.append(simple_text_clean(b_text))
data_for_lime = pd.DataFrame({
'subject': subjects,
'sender': senders,
'body': bodies
})
try:
vectorized_input = preprocessor.transform(data_for_lime)
probabilities = model.predict_proba(vectorized_input)
return probabilities
except Exception as e:
print(f"Error in model_predict_probability_for_lime function during transform/predict: {e}")
return np.array([[0.5, 0.5]] * len(combined_texts))
def get_prediction_and_explanation(subject: str, sender: str, body: str):
if preprocessor is None or model is None:
return {"error": "Model/Preprocessor not loaded. Check server logs.", "prediction": "Error", "label": -1, "confidence": 0.0, "explanation": []}
cleaned_subject = simple_text_clean(subject)
cleaned_sender = simple_text_clean(sender)
cleaned_body = simple_text_clean(body)
input_df_for_model = pd.DataFrame({
'subject': [cleaned_subject],
'sender': [cleaned_sender],
'body': [cleaned_body]
})
try:
vectorized_input = preprocessor.transform(input_df_for_model)
prediction_label_int = model.predict(vectorized_input)[0]
probabilities = model.predict_proba(vectorized_input)[0]
predicted_class_name = class_names[prediction_label_int]
confidence_score = probabilities[prediction_label_int]
except Exception as e:
return {"error": f"Prediction error: {e}", "prediction": "Error",
"label": -1, "confidence": 0.0, "explanation": []}
text_for_lime = f"{cleaned_subject} : {cleaned_sender} : {cleaned_body}"
explanation_data = []
try:
exp = explainer.explain_instance(
text_instance=text_for_lime,
classifier_fn=model_predict_probability_for_lime,
num_features=15,
top_labels=1,
labels=(prediction_label_int,)
)
explanation_data = exp.as_list(label=prediction_label_int)
print(f"LIME Explanation (Top 3): {explanation_data[:3]}")
except Exception as e:
print(f"LIME explanation error: {e}")
explanation_data = [("LIME explanation error or N/A", 0.0)]
return {
"prediction": predicted_class_name,
"label": int(prediction_label_int),
"confidence": float(confidence_score),
"explanation": explanation_data
}