LT360 committed on
Commit
c551752
·
1 Parent(s): b5c79a0

Organized folders, and added a BERT-Mini model and explainer to use for email classifications

Browse files
.gitattributes CHANGED
File without changes
.gitignore CHANGED
File without changes
.vscode/launch.json CHANGED
File without changes
Dockerfile CHANGED
File without changes
README.md CHANGED
File without changes
app/__init__.py CHANGED
File without changes
app/assets/email_preprocessor_20250506_203148.joblib CHANGED
File without changes
app/assets/phishing_nb_model_20250506_203148.joblib CHANGED
File without changes
app/main.py CHANGED
@@ -1,7 +1,7 @@
1
  from fastapi import FastAPI
2
  from pydantic import BaseModel
3
  from typing import List, Tuple, Optional
4
- from .ml_logic import get_prediction_and_explanation # helper function from ml_logic
5
 
6
  app = FastAPI(title="AI-Powered Phishing Email Detection System")
7
 
@@ -10,6 +10,7 @@ class EmailInput(BaseModel):
10
  subject: Optional[str] = ""
11
  sender: Optional[str] = ""
12
  body: str
 
13
 
14
  # Define output data model
15
  class PredictionResponse(BaseModel):
@@ -24,16 +25,27 @@ class PredictionResponse(BaseModel):
24
  async def root():
25
  return {"message": "AI-Powered Phishing Email Detection API. POST to /predict with 'subject', 'sender', 'body'."}
26
 
 
 
 
 
 
27
  @app.post("/predict", response_model=PredictionResponse)
28
  async def predict_email(email_input: EmailInput):
 
 
 
 
29
  try:
30
- result = get_prediction_and_explanation(
31
- email_input.subject or "",
32
- email_input.sender or "",
33
- email_input.body
 
34
  )
35
- if "error" in result and result["error"]:
36
- return PredictionResponse(prediction="Error", label=-1, confidence=0.0, explanation=[], error=result["error"])
37
  return PredictionResponse(**result)
 
38
  except Exception as e:
39
- return PredictionResponse(prediction="Error", label=-1, confidence=0.0, explanation=[], error=f"API error: {str(e)}")
 
 
 
1
  from fastapi import FastAPI
2
  from pydantic import BaseModel
3
  from typing import List, Tuple, Optional
4
+ from .ml import get_model_prediction, check_model_status
5
 
6
  app = FastAPI(title="AI-Powered Phishing Email Detection System")
7
 
 
10
  subject: Optional[str] = ""
11
  sender: Optional[str] = ""
12
  body: str
13
+ model_choice: Optional[str] = "nb" # Default to Naive Bayes
14
 
15
  # Define output data model
16
  class PredictionResponse(BaseModel):
 
25
  async def root():
26
  return {"message": "AI-Powered Phishing Email Detection API. POST to /predict with 'subject', 'sender', 'body'."}
27
 
28
@app.get("/status")
async def model_status():
    """Health endpoint: report whether each classifier's artifacts loaded.

    Delegates to app.ml.check_model_status, which returns per-model flags
    (Naive Bayes model/preprocessor, BERT-Mini model/tokenizer).
    """
    return check_model_status()
31
+
32
+
33
@app.post("/predict", response_model=PredictionResponse)
async def predict_email(email_input: EmailInput):
    """Classify an email as phishing or legitimate with the requested model.

    `model_choice` selects "nb" (Naive Bayes) or "bert-mini"; anything else
    is rejected up front with an error payload.
    """
    # Guard clause: reject unknown model choices before dispatching.
    if email_input.model_choice not in ("nb", "bert-mini"):
        return PredictionResponse(
            prediction="Error",
            label=-1,
            confidence=0.0,
            explanation=[],
            error="Invalid model_choice. Please use 'nb' or 'bert-mini'.",
        )

    try:
        payload = get_model_prediction(
            subject=email_input.subject or "",
            sender=email_input.sender or "",
            body=email_input.body,
            model_choice=email_input.model_choice,
        )
        return PredictionResponse(**payload)
    except Exception as exc:
        # Fallback for truly unexpected errors in the endpoint itself
        return PredictionResponse(
            prediction="Error",
            label=-1,
            confidence=0.0,
            explanation=[],
            error=f"Critical API endpoint error: {str(exc)}",
        )
app/ml/__init__.py ADDED
@@ -0,0 +1,36 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # app/ml/__init__.py
2
+ from .nb_model import get_prediction_and_explanation_nb, nb_model, nb_preprocessor
3
+ from .bert_mini_model import get_prediction_and_explanation_bert_mini, bert_mini_model, bert_mini_tokenizer
4
+ from typing import Dict
5
+
6
def get_model_prediction(subject: str, sender: str, body: str, model_choice: str = "nb") -> Dict:
    """Dispatch an email to the selected classifier and return its payload.

    `model_choice` selects the Naive Bayes pipeline ("nb", the default) or the
    fine-tuned BERT-Mini model ("bert-mini"); any other value produces an
    error payload with the same schema as a prediction.
    """
    def _error_payload(message: str) -> Dict:
        # Uniform error shape mirroring the success payload keys.
        return {"error": message,
                "prediction": "Error", "label": -1, "confidence": 0.0, "explanation": []}

    if model_choice == "bert-mini":
        if bert_mini_model is None or bert_mini_tokenizer is None:
            return _error_payload("BERT-Mini Model/Tokenizer is not available. Check server logs.")
        return get_prediction_and_explanation_bert_mini(subject, sender, body)

    if model_choice == "nb":
        # Both the fitted model and its preprocessor must have loaded.
        if nb_model is None or nb_preprocessor is None:
            return _error_payload("Multinomial Naive Bayes Model/Preprocessor is not available. Check server logs.")
        return get_prediction_and_explanation_nb(subject, sender, body)

    return _error_payload(f"Invalid model_choice: '{model_choice}'. Choose 'nb' or 'bert-mini'.")
23
+
24
# Lightweight health check over both model stacks.
def check_model_status():
    """Return load-status flags for the Naive Bayes and BERT-Mini artifacts."""
    return {
        "naive_bayes": {
            "model_loaded": nb_model is not None,
            "preprocessor_loaded": nb_preprocessor is not None,
        },
        "bert-mini": {
            "model_loaded": bert_mini_model is not None,
            "tokenizer_loaded": bert_mini_tokenizer is not None,
        },
    }
app/ml/bert_mini_model.py ADDED
@@ -0,0 +1,121 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
# app/ml/bert_mini_model.py
# BERT-Mini phishing classifier: artifact loading plus prediction/explanation helpers.
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import numpy as np
from .common import simple_text_clean, CLASS_NAMES
import traceback
from transformers_interpret import SequenceClassificationExplainer

# Load BERT-mini model and tokenizer from the Hugging Face Hub at import time.
BERT_MODEL_ID = "lleratodev/720-bert-mini-phishing"
# Globals default to None so callers can detect a failed load instead of
# crashing the whole app at import.
bert_mini_tokenizer = None
bert_mini_model = None
device = None
cls_explainer_bert_mini = None

try:
    bert_mini_tokenizer = AutoTokenizer.from_pretrained(BERT_MODEL_ID)
    bert_mini_model = AutoModelForSequenceClassification.from_pretrained(BERT_MODEL_ID)

    # Prefer GPU when available; inference-only, so switch to eval mode.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    bert_mini_model.to(device)
    bert_mini_model.eval()
    # Attribution-based explainer bound to this model/tokenizer pair.
    cls_explainer_bert_mini = SequenceClassificationExplainer(bert_mini_model, bert_mini_tokenizer)

    print(f"BERT-mini model ('{BERT_MODEL_ID}'), tokenizer, and Transformers-Interpret Explainer loaded successfully.")
    print(f"BERT-mini model running on device: {device}")

except Exception as e:
    # Leave the globals as None; downstream functions report the failure to API callers.
    print(f"FATAL ERROR (BERT): Could not load model/tokenizer '{BERT_MODEL_ID}' or initialize Transformers-Interpret Explainer: {e}")
    traceback.print_exc()
32
# LIME-compatible probability function for the BERT-Mini classifier.
def bert_mini_predict_probability_for_lime(text_instances: list) -> np.ndarray:
    """Return class probabilities with shape (len(text_instances), len(CLASS_NAMES)).

    Used as a LIME classifier_fn, which calls it with many perturbed texts at
    once; instances are therefore tokenized and scored in bounded batches
    instead of one forward pass per text (the previous per-instance loop made
    N separate model calls).
    """
    if bert_mini_tokenizer is None or bert_mini_model is None:
        # Model unavailable: fall back to a uniform (uninformative) distribution.
        return np.array([[1.0 / len(CLASS_NAMES)] * len(CLASS_NAMES)] * len(text_instances))

    batch_size = 32  # bounds peak memory when LIME sends thousands of perturbations
    all_probabilities = []
    try:
        for start in range(0, len(text_instances), batch_size):
            batch = list(text_instances[start:start + batch_size])
            # padding="max_length" pads every instance to the same width, so
            # batched scoring produces the same logits as scoring one at a time.
            inputs = bert_mini_tokenizer(batch, return_tensors="pt", truncation=True,
                                         padding="max_length", max_length=512)
            inputs = {k: v.to(device) for k, v in inputs.items()}

            with torch.no_grad():
                logits = bert_mini_model(**inputs).logits

            probabilities = torch.softmax(logits, dim=-1).cpu().numpy()
            all_probabilities.extend(probabilities)

        return np.array(all_probabilities)

    except Exception as e:
        print(f"Error in bert_mini_predict_probability_for_lime: {e}")
        traceback.print_exc()
        return np.array([[1.0 / len(CLASS_NAMES)] * len(CLASS_NAMES)] * len(text_instances))
59
+
60
+
61
def get_prediction_and_explanation_bert_mini(subject: str, sender: str, body: str) -> dict:
    """Classify one email with BERT-Mini and attach word-attribution explanations.

    Returns a dict with keys prediction/label/confidence/explanation/error;
    on failure the same keys are populated with error placeholders.
    """
    # Bail out early when any artifact failed to load at import time.
    if bert_mini_tokenizer is None or bert_mini_model is None or cls_explainer_bert_mini is None:
        return {"error": "BERT-Mini Model/Tokenizer/Explainer not loaded correctly. Check server logs.",
                "prediction": "Error", "label": -1, "confidence": 0.0, "explanation": []}

    # Normalize each field, then score a single "sender subject body" string.
    model_input_text = " ".join([
        simple_text_clean(sender),
        simple_text_clean(subject),
        simple_text_clean(body),
    ])

    try:
        encoded = bert_mini_tokenizer(model_input_text, return_tensors="pt", truncation=True,
                                      padding="max_length", max_length=512)
        encoded = {name: tensor.to(device) for name, tensor in encoded.items()}

        with torch.no_grad():
            logits = bert_mini_model(**encoded).logits

        class_probs = torch.softmax(logits, dim=-1).cpu().numpy()[0]
        label_index = np.argmax(class_probs).item()
        confidence = class_probs[label_index].item()
        label_name = CLASS_NAMES[label_index]

        # Word-level attributions for the predicted class; an explanation
        # failure must never take down the prediction itself.
        try:
            attributions = cls_explainer_bert_mini(model_input_text, index=label_index)
            explanation = [(token, float(weight)) for token, weight in attributions]
            # Keep the 15 tokens with the largest absolute attribution.
            explanation.sort(key=lambda pair: abs(pair[1]), reverse=True)
            explanation = explanation[:15]
        except Exception as e:
            print(f"Transformers-Interpret explanation error: {e}")
            traceback.print_exc()
            explanation = [("Explanation error with Transformers-Interpret", 0.0)]

        return {
            "prediction": label_name,
            "label": int(label_index),
            "confidence": float(confidence),
            "explanation": explanation,
            "error": None
        }
    except Exception as e:
        print(f"--- ORIGINAL ERROR in predict_with_bert_mini ---")
        print(f"Error type: {type(e)}")
        print(f"Error message: {str(e)}")
        print("Traceback:")
        traceback.print_exc()
        return {"error": f"BERT-Mini Prediction error: {str(e)}", "prediction": "Error",
                "label": -1, "confidence": 0.0, "explanation": []}
app/ml/common.py ADDED
@@ -0,0 +1,14 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
import re


def simple_text_clean(text: str) -> str:
    """Lowercase *text*, drop non-alphanumerics, and collapse runs of whitespace.

    Any non-string input (None, numbers, ...) is treated as empty text.
    """
    if not isinstance(text, str):
        return ''
    lowered = text.lower()
    alnum_only = re.sub(r'[^a-z0-9\s]', '', lowered)  # keep letters, digits, whitespace
    return re.sub(r'\s+', ' ', alnum_only).strip()


# Label names for model outputs (index 0: Legitimate, index 1: Phishing).
CLASS_NAMES = ['Legitimate', 'Phishing']
app/ml/nb_model.py ADDED
@@ -0,0 +1,131 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
# Multinomial Naive Bayes phishing classifier: artifact loading + LIME helpers.
import joblib
import pandas as pd
import numpy as np
import os
from lime.lime_text import LimeTextExplainer
from .common import simple_text_clean, CLASS_NAMES

# Serialized artifacts live in app/assets/ (one directory above this package).
ASSETS_DIR = os.path.join(os.path.dirname(__file__), '..', 'assets')
PREPROCESSOR_FILENAME = "email_preprocessor_20250506_203148.joblib"
MODEL_FILENAME = "phishing_nb_model_20250506_203148.joblib"
PREPROCESSOR_PATH = os.path.join(ASSETS_DIR, PREPROCESSOR_FILENAME)
MODEL_PATH = os.path.join(ASSETS_DIR, MODEL_FILENAME)

# None means "failed to load"; callers check these before predicting.
nb_preprocessor = None
nb_model = None
lime_explainer_nb = None

try:
    nb_preprocessor = joblib.load(PREPROCESSOR_PATH)
    nb_model = joblib.load(MODEL_PATH)
    lime_explainer_nb = LimeTextExplainer(class_names=CLASS_NAMES)
    print("Multinomial NB model, Preprocessor, and LIME Explainer loaded successfully.")
except FileNotFoundError:
    print(f"FATAL ERROR (Naive Bayes): Could not find model ('{MODEL_PATH}') or nb_preprocessor ('{PREPROCESSOR_PATH}').")
    print("Ensure files are in 'app/assets/' and filenames are correct.")
except Exception as e:
    print(f"Error loading Multinomial NB model/preprocessor or initializing LIME: {e}")
+
29
def model_predict_probability_for_lime(combined_texts) -> np.ndarray:
    """LIME classifier_fn for the NB pipeline.

    Each element of *combined_texts* is expected in the form
    "subject: <s> sender: <d> body: <b>"; the three fields are split back out,
    cleaned, and scored with the fitted preprocessor + model. Returns an
    array of shape (len(combined_texts), 2) of class probabilities.
    """
    if nb_preprocessor is None or nb_model is None:
        # Artifacts missing: return a neutral 50/50 distribution per instance.
        return np.array([[0.5, 0.5]] * len(combined_texts))

    subjects = []
    senders = []
    bodies = []

    for combined_text in combined_texts:
        # Field markers the caller is expected to embed in the combined text.
        s_marker = "subject: "
        d_marker = " sender: "
        b_marker = " body: "

        s_text, d_text, b_text = "", "", ""

        if d_marker in combined_text:
            s_text_part, rest = combined_text.split(d_marker, 1)
            if s_marker in s_text_part:
                s_text = s_text_part.replace(s_marker, "").strip()

            if b_marker in rest:
                d_text_part, b_text_part = rest.split(b_marker, 1)
                d_text = d_text_part.strip()
                b_text = b_text_part.strip()
            else:
                d_text = rest.strip()
        else:
            # Sender marker absent (LIME perturbations may have removed it):
            # fall back to subject+body, subject-only, or body-only parses.
            if s_marker in combined_text and b_marker in combined_text :
                s_text_part, b_text_part = combined_text.split(b_marker, 1)
                s_text = s_text_part.replace(s_marker, "").strip()
                b_text = b_text_part.strip()
            elif s_marker in combined_text:
                s_text = combined_text.replace(s_marker,"").strip()
            else:
                # NOTE(review): texts containing no markers at all land here
                # and are treated as body-only.
                b_text = combined_text.strip()


        subjects.append(simple_text_clean(s_text))
        senders.append(simple_text_clean(d_text))
        bodies.append(simple_text_clean(b_text))

    # Rebuild the column layout the preprocessor was fitted on.
    data_for_lime = pd.DataFrame({
        'subject': subjects,
        'sender': senders,
        'body': bodies
    })

    try:
        vectorized_input = nb_preprocessor.transform(data_for_lime)
        probabilities = nb_model.predict_proba(vectorized_input)
        return probabilities
    except Exception as e:
        # Keep LIME alive even if transform/predict fails on a perturbed batch.
        print(f"Error in model_predict_probability_for_lime function during transform/predict: {e}")
        return np.array([[0.5, 0.5]] * len(combined_texts))
83
+
84
def get_prediction_and_explanation_nb(subject: str, sender: str, body: str) -> dict:
    """Classify one email with the Multinomial NB pipeline and attach a LIME explanation.

    Returns a dict with keys prediction/label/confidence/explanation/error;
    prediction failures return the same keys with error placeholders, while
    explanation failures degrade to a placeholder explanation only.
    """
    if nb_preprocessor is None or nb_model is None:
        return {"error": "Model/Preprocessor not loaded. Check server logs.", "prediction": "Error", "label": -1, "confidence": 0.0, "explanation": []}

    cleaned_subject = simple_text_clean(subject)
    cleaned_sender = simple_text_clean(sender)
    cleaned_body = simple_text_clean(body)

    # Rebuild the column layout the preprocessor was fitted on.
    input_df_for_model = pd.DataFrame({
        'subject': [cleaned_subject],
        'sender': [cleaned_sender],
        'body': [cleaned_body]
    })

    try:
        vectorized_input = nb_preprocessor.transform(input_df_for_model)
        prediction_label_int = nb_model.predict(vectorized_input)[0]
        probabilities = nb_model.predict_proba(vectorized_input)[0]

        predicted_class_name = CLASS_NAMES[prediction_label_int]
        confidence_score = probabilities[prediction_label_int]
    except Exception as e:
        return {"error": f"Prediction error: {e}", "prediction": "Error",
                "label": -1, "confidence": 0.0, "explanation": []}

    # BUGFIX: model_predict_probability_for_lime parses the LIME text by the
    # markers "subject: ", " sender: " and " body: ". The previous
    # "{subj} : {sender} : {body}" format contained none of them, so every
    # perturbed sample fell through to the body-only branch. Emit the marker
    # format the parser expects so subject/sender words get attributed.
    text_for_lime = f"subject: {cleaned_subject} sender: {cleaned_sender} body: {cleaned_body}"

    explanation_data = []
    try:
        exp = lime_explainer_nb.explain_instance(
            text_instance=text_for_lime,
            classifier_fn=model_predict_probability_for_lime,
            num_features=15,
            top_labels=1,
            labels=(prediction_label_int,)
        )
        explanation_data = exp.as_list(label=prediction_label_int)
        print(f"LIME Explanation (Top 3): {explanation_data[:3]}")
    except Exception as e:
        # Degrade gracefully: still return the prediction without an explanation.
        print(f"LIME explanation error: {e}")
        explanation_data = [("LIME explanation error or N/A", 0.0)]

    return {
        "prediction": predicted_class_name,
        "label": int(prediction_label_int),
        "confidence": float(confidence_score),
        "explanation": explanation_data,
        # Explicit None keeps the payload schema consistent with the BERT path.
        "error": None
    }
app/ml_logic.py CHANGED
@@ -5,6 +5,9 @@ from lime.lime_text import LimeTextExplainer
5
  import numpy as np
6
  import os
7
 
 
 
 
8
  # Configure and setup model and preprocessor files
9
  ASSETS_DIR = os.path.join(os.path.dirname(__file__), 'assets')
10
  PREPROCESSOR_FILENAME = "email_preprocessor_20250506_203148.joblib"
@@ -27,6 +30,26 @@ except Exception as e:
27
  preprocessor = None
28
  model = None
29
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
30
  # Text cleaning function, makes everything lowercase, removed non alpha-numeric characters and normalize white spaces
31
  def simple_text_clean(text):
32
  if isinstance(text, str):
@@ -37,7 +60,7 @@ def simple_text_clean(text):
37
  text = ''
38
  return text
39
 
40
- # For explanability, LIME setup
41
  class_names = ['Legitimate', 'Phishing'] # 0: Legitimate, 1: Phishing
42
  explainer = LimeTextExplainer(class_names=class_names)
43
 
 
5
  import numpy as np
6
  import os
7
 
8
+ from transformers import AutoTokenizer, AutoModelForSequenceClassification
9
+ import torch
10
+
11
  # Configure and setup model and preprocessor files
12
  ASSETS_DIR = os.path.join(os.path.dirname(__file__), 'assets')
13
  PREPROCESSOR_FILENAME = "email_preprocessor_20250506_203148.joblib"
 
30
  preprocessor = None
31
  model = None
32
 
33
+ # --- Load BERT-mini model and tokenizer from Hugging Face Hub ---
34
+ # Replace with your actual Hugging Face model ID
35
+ BERT_MODEL_ID = "lleratodev/720-bert-mini-phishing" # e.g., "LeratoLetsepe/phishing-bert-mini"
36
+ try:
37
+ bert_tokenizer = AutoTokenizer.from_pretrained(BERT_MODEL_ID)
38
+ bert_model = AutoModelForSequenceClassification.from_pretrained(BERT_MODEL_ID)
39
+ bert_model.eval() # Set model to evaluation mode
40
+ print(f"BERT-mini model ('{BERT_MODEL_ID}') and tokenizer loaded successfully from Hugging Face Hub.")
41
+ # Determine device for BERT model (CPU by default, can be adapted for GPU)
42
+ device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
43
+ bert_model.to(device)
44
+ print(f"BERT model moved to device: {device}")
45
+
46
+ except Exception as e:
47
+ print(f"FATAL ERROR (BERT): Could not load model/tokenizer '{BERT_MODEL_ID}' from Hugging Face Hub: {e}")
48
+ print("Ensure the model ID is correct, you have an internet connection, and the model files are correctly set up on the Hub.")
49
+ bert_tokenizer = None
50
+ bert_model = None
51
+ # --- End BERT Loading ---
52
+
53
  # Text cleaning function, makes everything lowercase, removed non alpha-numeric characters and normalize white spaces
54
  def simple_text_clean(text):
55
  if isinstance(text, str):
 
60
  text = ''
61
  return text
62
 
63
+ # For explanability, LIME setup - # LIME probability function for MultinomialNB model
64
  class_names = ['Legitimate', 'Phishing'] # 0: Legitimate, 1: Phishing
65
  explainer = LimeTextExplainer(class_names=class_names)
66
 
requirements.txt CHANGED
@@ -1,10 +1,14 @@
1
  fastapi
2
  uvicorn[standard]
3
- scikit-learn
 
4
  pandas
5
  joblib
6
  scipy
7
- numpy
8
  lime
9
  python-multipart
10
- dill
 
 
 
 
 
1
  fastapi
2
  uvicorn[standard]
3
+ scikit-learn==1.5.1
4
+ numpy==1.26.4
5
  pandas
6
  joblib
7
  scipy
 
8
  lime
9
  python-multipart
10
+ dill
11
+
12
+ transformers
13
+ transformers-interpret
14
+ torch