princemaxp committed on
Commit
089b374
·
verified ·
1 Parent(s): c3f2fbf

Update body_analyzer.py

Browse files
Files changed (1) hide show
  1. body_analyzer.py +74 -189
body_analyzer.py CHANGED
@@ -1,60 +1,42 @@
1
- # body_analyzer.py
2
  import os
3
  import re
4
  import requests
5
- import base64
6
- import io
7
  from typing import List
8
 
9
  HF_API_KEY = os.getenv("HF_API_KEY")
10
  HF_HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {}
11
  HF_TIMEOUT = 20 # seconds
12
 
13
- # ML model names
14
- PHISHING_MODEL = "cybersectony/phishing-email-detection-distilbert_v2.4.1"
 
 
 
15
  ZERO_SHOT_MODEL = "facebook/bart-large-mnli" # for intent/behavior
16
 
17
- # Suspicious phrase patterns (lowercased when matching)
18
  SUSPICIOUS_PATTERNS = [
19
- "verify your account",
20
- "urgent action",
21
- "click here",
22
- "reset password",
23
- "confirm your identity",
24
- "bank account",
25
- "invoice",
26
- "payment required",
27
- "unauthorized login",
28
- "compromised",
29
- "final reminder",
30
- "account suspended",
31
- "account deactivated",
32
- "update your information",
33
- "legal action",
34
- "limited time offer",
35
- "claim your prize",
36
- "verify immediately",
37
- "verify now",
38
- "verify your credentials",
39
  ]
40
 
41
  # zero-shot candidate labels for message behavior
42
  BEHAVIOR_LABELS = [
43
- "credential harvesting",
44
- "invoice/payment fraud",
45
- "marketing",
46
- "benign",
47
- "malware",
48
- "account takeover",
49
  ]
50
 
51
  def _call_hf_text_model(model_name: str, text: str):
52
- """Call HF Inference API for text. Return raw JSON or None on failure."""
53
  if not HF_API_KEY:
54
  return None
55
  try:
56
  payload = {"inputs": text}
57
- # For zero-shot, caller will pass parameters in payload if needed
58
  res = requests.post(
59
  f"https://api-inference.huggingface.co/models/{model_name}",
60
  headers=HF_HEADERS,
@@ -66,6 +48,7 @@ def _call_hf_text_model(model_name: str, text: str):
66
  return None
67
 
68
  def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
 
69
  if not HF_API_KEY:
70
  return None
71
  try:
@@ -80,107 +63,35 @@ def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
80
  except Exception:
81
  return None
82
 
83
- def _call_hf_image_ocr(model_name: str, image_bytes: bytes):
84
- """
85
- Call HF image OCR model endpoint. Returns string or None.
86
- Uses raw bytes upload: content-type application/octet-stream body.
87
- """
88
- if not HF_API_KEY:
89
- return None
90
- try:
91
- headers = HF_HEADERS.copy()
92
- headers["Content-Type"] = "application/octet-stream"
93
- res = requests.post(
94
- f"https://api-inference.huggingface.co/models/{model_name}",
95
- headers=headers,
96
- data=image_bytes,
97
- timeout=HF_TIMEOUT,
98
- )
99
- # Many vision models return {"generated_text": "..."} or list; attempt to parse common shapes
100
- data = res.json()
101
- if isinstance(data, dict):
102
- # TrOCR-style may return {"generated_text": "..."}
103
- if "generated_text" in data:
104
- return data["generated_text"]
105
- # Some OCR endpoints may return list of dicts
106
- if isinstance(data, list) and data and isinstance(data[0], dict):
107
- # choose text-like fields if present
108
- candidate = data[0].get("generated_text") or data[0].get("text") or data[0].get("caption")
109
- return candidate
110
- # fallback: try string concatenation if possible
111
- if isinstance(data, str):
112
- return data
113
- except Exception:
114
- pass
115
- return None
116
-
117
- # local pytesseract fallback
118
- def _ocr_local_pytesseract(image_bytes):
119
- try:
120
- from PIL import Image
121
- import pytesseract
122
- import io
123
- image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
124
- text = pytesseract.image_to_string(image)
125
- return text
126
- except Exception:
127
- return None
128
-
129
- def _parse_hf_phishing_model_output(result):
130
- """
131
- Expected: model may return list of logits/probs. Try common shapes.
132
- Returns: label:str, confidence:float (0..1), all_probs:dict
133
- """
134
  if not result:
135
- return None, 0.0, {}
136
- # if list of dicts with label & score
137
  if isinstance(result, list) and len(result) > 0 and isinstance(result[0], dict):
138
- r0 = result[0]
139
- label = r0.get("label")
140
- score = r0.get("score", 0.0)
141
- return label, float(score or 0.0), {label: float(score or 0.0)}
142
- # if dict with labels & scores
143
- if isinstance(result, dict):
144
- # sometimes returns {'labels': [...], 'scores': [...]}
145
- labels = result.get("labels") or result.get("label") or []
146
- scores = result.get("scores") or result.get("score") or []
147
- if isinstance(labels, list) and isinstance(scores, list) and labels and scores:
148
- all_probs = {lab: float(sc) for lab, sc in zip(labels, scores)}
149
- # pick max
150
- max_lab = max(all_probs.items(), key=lambda x: x[1])
151
- return max_lab[0], float(max_lab[1]), all_probs
152
- return None, 0.0, {}
153
 
154
  def analyze_body(subject: str, body: str, urls: list, images: list):
155
- """
156
- Inputs:
157
- subject: email subject (str)
158
- body: plaintext combined body (str)
159
- urls: list of urls
160
- images: list of image bytes
161
- Returns:
162
- findings (list[str]), score (int 0..100), highlighted_body (str), verdict (str)
163
- """
164
  findings = []
165
  score = 0
166
- highlighted_body = (body or "") # will attempt to highlight suspicious text/URLs
 
167
 
168
- # 1) Basic heuristics on subject + body
169
- combined_lower = ((subject or "") + "\n" + (body or "")).lower()
170
  for pattern in SUSPICIOUS_PATTERNS:
171
- if pattern in combined_lower:
172
  findings.append(f"Suspicious phrase detected: \"{pattern}\"")
173
- # weight subject phrases more heavily
174
- if pattern in (subject or "").lower():
175
- score += 30
176
- else:
177
- score += 18
178
  try:
179
  highlighted_body = re.sub(re.escape(pattern), f"<mark>{pattern}</mark>", highlighted_body, flags=re.IGNORECASE)
180
  except Exception:
181
  pass
182
 
183
- # 2) URL heuristics (always include)
184
  for u in urls or []:
185
  findings.append(f"Suspicious URL detected: {u}")
186
  score += 10
@@ -188,7 +99,6 @@ def analyze_body(subject: str, body: str, urls: list, images: list):
188
  highlighted_body = re.sub(re.escape(u), f"<mark>{u}</mark>", highlighted_body, flags=re.IGNORECASE)
189
  except Exception:
190
  pass
191
- # suspicious domain structure bump
192
  domain_match = re.search(r"https?://([^/]+)/?", u)
193
  if domain_match:
194
  domain = domain_match.group(1)
@@ -196,76 +106,40 @@ def analyze_body(subject: str, body: str, urls: list, images: list):
196
  findings.append(f"URL: suspicious-looking domain {domain}")
197
  score += 10
198
 
199
- # 3) OCR images
200
- ocr_texts = []
201
- if images:
202
- for img_bytes in images:
203
- text = None
204
- # Prefer HF TrOCR-like endpoint if HF_API_KEY provided
205
- if HF_API_KEY:
206
- # try a well-known OCR-capable model; TrOCR base is a candidate
207
- ocr_result = _call_hf_image_ocr("microsoft/trocr-base-stage1", img_bytes)
208
- if ocr_result:
209
- text = ocr_result
210
- if not text:
211
- # fallback to local pytesseract
212
- text = _ocr_local_pytesseract(img_bytes)
213
- if text:
214
- ocr_texts.append(text)
215
- findings.append("OCR: extracted text from image.")
216
- # add small heuristic score for OCR results
217
- lower = text.lower()
218
- for pat in SUSPICIOUS_PATTERNS:
219
- if pat in lower:
220
- findings.append(f"OCR: suspicious phrase in image -> \"{pat}\"")
221
- score += 20
222
-
223
- # 4) ML phishing model (Hugging Face)
224
- ml_label = None
225
- ml_conf = 0.0
226
- ml_all = {}
227
- model_input = "\n".join([subject or "", body or "", "\n".join(urls or []), "\n".join(ocr_texts or [])]).strip()
228
- if model_input and HF_API_KEY:
229
- raw = _call_hf_text_model(PHISHING_MODEL, model_input)
230
- label, conf, allp = _parse_hf_phishing_model_output(raw)
231
- if label:
232
- ml_label = label
233
- ml_conf = conf
234
- ml_all = allp
235
- findings.append(f"HuggingFace phishing model → {label} (conf {conf:.2f})")
236
- # confidence scaled to score (but cap)
237
- score += int(conf * 100 * 0.9) # slightly reduce to avoid double-counting
238
-
239
- # 5) Zero-shot behavior intent model (when HF available)
240
- behavior = None
241
  behavior_conf = 0.0
242
  if HF_API_KEY and model_input:
243
  zs = _call_hf_zero_shot(model_input, BEHAVIOR_LABELS)
244
- try:
245
- if isinstance(zs, dict) and "labels" in zs and "scores" in zs:
246
- best_label = zs["labels"][0]
247
- best_score = float(zs["scores"][0])
248
- behavior = best_label
249
- behavior_conf = best_score
250
- findings.append(f"Behavior inference → {behavior} (conf {behavior_conf:.2f})")
251
- # add modest boost for strong behavior confidence
252
- if behavior_conf >= 0.7:
253
- score += int(behavior_conf * 30)
254
- except Exception:
255
- pass
256
-
257
- # 6) Final heuristics fallbacks
258
- # If ML already strongly flagged phishing, ensure high score
259
- if ml_conf >= 0.8 and ("phishing" in (ml_label or "").lower()):
260
- score = max(score, 80)
261
 
262
- # clamp
263
- try:
264
- score = int(max(0, min(score, 100)))
265
- except Exception:
266
- score = 0
267
 
268
- # Final verdict mapping (tunable)
269
  if score >= 70:
270
  verdict = "🚨 Malicious"
271
  elif 50 <= score < 70:
@@ -276,5 +150,16 @@ def analyze_body(subject: str, body: str, urls: list, images: list):
276
  verdict = "✅ Safe"
277
  findings.append("No strong phishing signals detected by models/heuristics.")
278
 
279
- # Return findings, score, highlighted body (with possible <mark> tags), verdict
280
- return findings, score, highlighted_body, verdict
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # body_analyzer_v2.py
2
  import os
3
  import re
4
  import requests
 
 
5
  from typing import List
6
 
7
  HF_API_KEY = os.getenv("HF_API_KEY")
8
  HF_HEADERS = {"Authorization": f"Bearer {HF_API_KEY}"} if HF_API_KEY else {}
9
  HF_TIMEOUT = 20 # seconds
10
 
11
+ # Hugging Face model names
12
+ PHISHING_MODELS = [
13
+ "cybersectony/phishing-email-detection-distilbert_v2.4.1",
14
+ "ealvaradob/bert-finetuned-phishing"
15
+ ]
16
  ZERO_SHOT_MODEL = "facebook/bart-large-mnli" # for intent/behavior
17
 
18
+ # Suspicious phrase patterns
19
  SUSPICIOUS_PATTERNS = [
20
+ "verify your account", "urgent action", "click here", "reset password",
21
+ "confirm your identity", "bank account", "invoice", "payment required",
22
+ "unauthorized login", "compromised", "final reminder", "account suspended",
23
+ "account deactivated", "update your information", "legal action",
24
+ "limited time offer", "claim your prize", "verify immediately",
25
+ "verify now", "verify your credentials",
 
 
 
 
 
 
 
 
 
 
 
 
 
 
26
  ]
27
 
28
  # zero-shot candidate labels for message behavior
29
  BEHAVIOR_LABELS = [
30
+ "credential harvesting", "invoice/payment fraud", "marketing",
31
+ "benign", "malware", "account takeover",
 
 
 
 
32
  ]
33
 
34
  def _call_hf_text_model(model_name: str, text: str):
35
+ """Call HF Inference API for text classification"""
36
  if not HF_API_KEY:
37
  return None
38
  try:
39
  payload = {"inputs": text}
 
40
  res = requests.post(
41
  f"https://api-inference.huggingface.co/models/{model_name}",
42
  headers=HF_HEADERS,
 
48
  return None
49
 
50
  def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
51
+ """Zero-shot classification for email behavior/intent"""
52
  if not HF_API_KEY:
53
  return None
54
  try:
 
63
  except Exception:
64
  return None
65
 
66
+ def _parse_hf_model_output(result):
67
+ """Extract label and confidence from HF output"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
68
  if not result:
69
+ return None, 0.0
 
70
  if isinstance(result, list) and len(result) > 0 and isinstance(result[0], dict):
71
+ label = result[0].get("label")
72
+ score = result[0].get("score", 0.0)
73
+ return label, float(score or 0.0)
74
+ if isinstance(result, dict) and "labels" in result and "scores" in result:
75
+ return result["labels"][0], float(result["scores"][0])
76
+ return None, 0.0
 
 
 
 
 
 
 
 
 
77
 
78
  def analyze_body(subject: str, body: str, urls: list, images: list):
 
 
 
 
 
 
 
 
 
79
  findings = []
80
  score = 0
81
+ highlighted_body = body or ""
82
+ combined_text = f"{subject}\n{body}".lower()
83
 
84
+ # 1) Basic heuristics: suspicious phrases
 
85
  for pattern in SUSPICIOUS_PATTERNS:
86
+ if pattern in combined_text:
87
  findings.append(f"Suspicious phrase detected: \"{pattern}\"")
88
+ score += 30 if pattern in (subject or "").lower() else 18
 
 
 
 
89
  try:
90
  highlighted_body = re.sub(re.escape(pattern), f"<mark>{pattern}</mark>", highlighted_body, flags=re.IGNORECASE)
91
  except Exception:
92
  pass
93
 
94
+ # 2) URL heuristics
95
  for u in urls or []:
96
  findings.append(f"Suspicious URL detected: {u}")
97
  score += 10
 
99
  highlighted_body = re.sub(re.escape(u), f"<mark>{u}</mark>", highlighted_body, flags=re.IGNORECASE)
100
  except Exception:
101
  pass
 
102
  domain_match = re.search(r"https?://([^/]+)/?", u)
103
  if domain_match:
104
  domain = domain_match.group(1)
 
106
  findings.append(f"URL: suspicious-looking domain {domain}")
107
  score += 10
108
 
109
+ # 3) ML Phishing detection using multiple HF models
110
+ ml_labels = []
111
+ ml_confidences = []
112
+ model_input = "\n".join([subject or "", body or ""] + (urls or []))
113
+ for phish_model in PHISHING_MODELS:
114
+ if HF_API_KEY and model_input:
115
+ result = _call_hf_text_model(phish_model, model_input)
116
+ label, conf = _parse_hf_model_output(result)
117
+ if label:
118
+ findings.append(f"HF phishing model ({phish_model}) → {label} (conf {conf:.2f})")
119
+ ml_labels.append(label)
120
+ ml_confidences.append(conf)
121
+ # Take the max confidence phishing prediction
122
+ if ml_confidences:
123
+ max_idx = ml_confidences.index(max(ml_confidences))
124
+ if "phish" in (ml_labels[max_idx] or "").lower():
125
+ score += int(ml_confidences[max_idx] * 100 * 0.9)
126
+
127
+ # 4) Zero-shot intent/behavior classification
128
+ behavior_label = None
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
129
  behavior_conf = 0.0
130
  if HF_API_KEY and model_input:
131
  zs = _call_hf_zero_shot(model_input, BEHAVIOR_LABELS)
132
+ if isinstance(zs, dict) and "labels" in zs and "scores" in zs:
133
+ behavior_label = zs["labels"][0]
134
+ behavior_conf = float(zs["scores"][0])
135
+ findings.append(f"Behavior inference → {behavior_label} (conf {behavior_conf:.2f})")
136
+ if behavior_conf >= 0.7:
137
+ score += int(behavior_conf * 30)
 
 
 
 
 
 
 
 
 
 
 
138
 
139
+ # 5) Final score clamping
140
+ score = max(0, min(score, 100))
 
 
 
141
 
142
+ # 6) Verdict
143
  if score >= 70:
144
  verdict = "🚨 Malicious"
145
  elif 50 <= score < 70:
 
150
  verdict = "✅ Safe"
151
  findings.append("No strong phishing signals detected by models/heuristics.")
152
 
153
+ # 7) Richer textual summary (like your example)
154
+ summary = f"""
155
+ Email analysis summary:
156
+ - Subject: {subject}
157
+ - Body length: {len(body)} chars
158
+ - Detected behavior/intent: {behavior_label} (conf {behavior_conf:.2f})
159
+ - Top phishing alert: {ml_labels[max_idx] if ml_labels else 'None'}
160
+ - Suspicious phrases found: {len([f for f in findings if 'Suspicious phrase' in f])}
161
+ - Total score: {score}/100
162
+ Verdict: {verdict}
163
+ """
164
+
165
+ return findings, score, highlighted_body, verdict, summary