princemaxp committed on
Commit
e99affa
·
verified ·
1 Parent(s): e9f9e54

Update header_analyzer.py

Browse files
Files changed (1) hide show
  1. header_analyzer.py +156 -158
header_analyzer.py CHANGED
@@ -1,167 +1,165 @@
 
 
1
  import re
2
- import difflib
3
- import whois
4
- from datetime import datetime
5
-
6
# Brand keyword -> domains that brand officially uses. A sender domain that
# contains the keyword but is not one of these (or a subdomain of one) is
# treated as brand-squatting. Extend as needed.
BRAND_OFFICIAL = dict(
    paypal=["paypal.com"],
    amazon=["amazon.com"],
    google=["google.com", "gmail.com"],
    microsoft=["microsoft.com", "outlook.com", "live.com"],
    apple=["apple.com"],
    flowtoscale=["flowtoscale.com"],  # example entry for a custom organisation
)

# Suspicious / cheap top-level domains that are often abused.
SUSPICIOUS_TLDS = {
    "click",
    "info",
    "loan",
    "tk",
    "top",
    "work",
    "xyz",
}
18
-
19
def get_domain_age_days(domain: str):
    """Return the age of *domain* in whole days, or None if it cannot be determined.

    The creation date is taken from a WHOIS lookup. When the lookup returns a
    list of dates, the first entry is used.

    Args:
        domain: Domain name to look up.

    Returns:
        Number of days between the creation date and now, or None when the
        lookup fails or yields no usable creation date.
    """
    try:
        record = whois.whois(domain)
        creation_date = record.creation_date
    except Exception:
        # Deliberately best-effort: any lookup or parsing failure simply means
        # the age is unknown; it must never break header analysis.
        return None

    if isinstance(creation_date, list):
        # An empty list means the registry reported no creation date.
        creation_date = creation_date[0] if creation_date else None

    if not isinstance(creation_date, datetime):
        # Missing, or an unparsed value (e.g. a raw string) we cannot subtract.
        return None

    # Subtracting an aware datetime from a naive one raises TypeError, so take
    # "now" in the same timezone (or naive, when tzinfo is None) as the record.
    now = datetime.now(tz=creation_date.tzinfo)
    return (now - creation_date).days
31
 
32
def parse_auth_results(auth_header: str):
    """
    Summarise an Authentication-Results header as a short readable string.

    The header is lower-cased and searched for SPF, DKIM and DMARC result
    tokens. For each mechanism a "pass" token wins over a "fail" token;
    mechanisms with neither token are left out of the summary.
    """
    normalized = (auth_header or "").lower()
    if not normalized:
        return "No Authentication-Results header found"

    # One row per mechanism:
    # (tokens meaning pass, pass message, tokens meaning fail, fail message)
    mechanisms = (
        (("spf=pass",), "SPF passed", ("spf=fail",), "SPF failed"),
        (("dkim=pass",), "DKIM passed", ("dkim=fail", "dkim=permerror"), "DKIM failed"),
        (("dmarc=pass",), "DMARC passed", ("dmarc=fail",), "DMARC failed"),
    )

    summary = []
    for pass_tokens, pass_msg, fail_tokens, fail_msg in mechanisms:
        if any(token in normalized for token in pass_tokens):
            summary.append(pass_msg)
        elif any(token in normalized for token in fail_tokens):
            summary.append(fail_msg)

    if summary:
        return ", ".join(summary)
    return "Authentication results unclear or missing"
64
-
65
def analyze_headers(headers, body=""):
    """
    Score an email's headers for phishing indicators.

    Args:
        headers: Mapping of header name to value; ``None`` is treated as empty.
        body: Optional message text, used only for the content/sender
            mismatch check.

    Returns:
        (findings, score, auth_summary) where ``findings`` is a list of
        human-readable strings, ``score`` is the cumulative (uncapped) risk
        score, and ``auth_summary`` is ``parse_auth_results`` applied to the
        Authentication-Results header.
    """
    findings = []
    score = 0
    headers = headers or {}

    auth_results = (
        headers.get("Authentication-Results")
        or headers.get("Authentication-results")
        or ""
    ).lower()

    # Strict auth failures
    if "dkim=fail" in auth_results or "dkim=permerror" in auth_results:
        findings.append("Header: DKIM check failed")
        score += 30
    if "spf=fail" in auth_results:
        findings.append("Header: SPF check failed")
        score += 30
    if "dmarc=fail" in auth_results:
        findings.append("Header: DMARC check failed")
        score += 30

    # Softer auth problems
    if any(x in auth_results for x in ["spf=softfail", "spf=neutral", "spf=none"]):
        findings.append("Header: SPF not properly aligned")
        score += 10
    if any(x in auth_results for x in ["dmarc=temperror", "dkim=temperror"]):
        findings.append("Header: Temporary auth errors (DKIM/DMARC)")
        score += 5

    # From and Reply-To domain compare
    from_addr = headers.get("From", "") or ""
    reply_to = headers.get("Reply-To", "") or ""
    from_domain_m = re.search(r'@([a-zA-Z0-9.-]+)', from_addr)
    reply_domain_m = re.search(r'@([a-zA-Z0-9.-]+)', reply_to)
    from_domain = from_domain_m.group(1).lower() if from_domain_m else ""
    if from_domain_m and reply_domain_m:
        reply_domain = reply_domain_m.group(1).lower()
        if from_domain != reply_domain:
            findings.append(f"Header: Reply-To domain mismatch (From: {from_domain}, Reply-To: {reply_domain})")
            score += 20

    # Sender domain analysis
    if from_domain:
        parts = from_domain.split('.')
        tld = parts[-1]

        # free provider detection
        if from_domain in ["gmail.com", "yahoo.com", "outlook.com", "hotmail.com"]:
            findings.append(f"Header: Free email provider used ({from_domain})")
            score += 8

        # suspicious domain structure: very deep nesting or digits in the first label
        if len(parts) > 4 or any(ch.isdigit() for ch in parts[0]):
            findings.append(f"Header: Suspicious-looking domain structure ({from_domain})")
            score += 15

        # suspicious TLD
        if tld in SUSPICIOUS_TLDS:
            findings.append(f"Header: Suspicious/abused TLD used ({tld})")
            score += 20

        # Domain age check
        age_days = get_domain_age_days(from_domain)
        if age_days is not None and age_days < 90:
            findings.append(f"Header: Domain {from_domain} is very new ({age_days} days old)")
            score += 35

        # brand-squatting / look-alike check
        # NOTE(review): the look-alike comparison is run for every brand, not
        # only those whose keyword appears in the domain, so that typo domains
        # such as "paypa1.com" are still compared - confirm this is the intent.
        for brand, official_list in BRAND_OFFICIAL.items():
            is_official = any(
                from_domain == off or from_domain.endswith("." + off)
                for off in official_list
            )
            if is_official:
                # A genuine brand domain or one of its subdomains is neither
                # squatting nor a look-alike. Without this guard e.g.
                # "mail.paypal.com" scores 0.8 similarity against "paypal.com"
                # and would be reported as spoofing.
                continue

            if brand in from_domain:
                findings.append(f"Header: Domain contains brand '{brand}' but is not official ({from_domain})")
                score += 30

            # fuzzy look-alike
            for legit in official_list:
                ratio = difflib.SequenceMatcher(None, from_domain, legit).ratio()
                if ratio > 0.7:
                    findings.append(f"Header: Possible look-alike spoofing ({from_domain} vs {legit})")
                    score += 40

    # Content-to-domain mismatch (organization spoofing)
    if body and "ravenmail" in body.lower() and "ravenmail" not in from_domain:
        findings.append("Header/Content: Possible spoofing — mentions RavenMail but sender domain is unrelated")
        score += 40

    # Bcc usage
    if headers.get("Bcc") or headers.get("bcc"):
        findings.append("Header: Email sent with BCC (common in mass phishing)")
        score += 12

    # Always summarise the header that was actually received, so a clean
    # message with passing SPF/DKIM/DMARC is not reported as having no
    # Authentication-Results header.
    auth_summary = parse_auth_results(auth_results)

    if not findings:
        return ["No suspicious issues found in headers."], 0, auth_summary

    # Return findings, cumulative score, and parsed authentication summary
    return findings, score, auth_summary
 
1
+ # body_analyzer.py
2
+ import os
3
  import re
4
+ import requests
5
+ from typing import List
6
+
7
# Hugging Face Inference API settings. The key is read from the environment
# once, at import time; without it no Authorization header is sent.
HF_API_KEY = os.environ.get("HF_API_KEY")
HF_HEADERS = {}
if HF_API_KEY:
    HF_HEADERS["Authorization"] = f"Bearer {HF_API_KEY}"
HF_TIMEOUT = 20  # seconds

# ML model names
PHISHING_MODEL = "cybersectony/phishing-email-detection-distilbert_v2.4.1"
ZERO_SHOT_MODEL = "facebook/bart-large-mnli"  # for intent/behavior

# Lower-case phrases searched for in the subject and body. Order matters only
# for the order in which findings are reported.
SUSPICIOUS_PATTERNS = [
    "verify your account", "urgent action", "click here", "reset password",
    "confirm your identity", "bank account", "invoice", "payment required",
    "unauthorized login", "compromised", "final reminder", "account suspended",
    "account deactivated", "update your information", "legal action",
    "limited time offer", "claim your prize", "verify immediately",
    "verify now", "verify your credentials",
]

# Zero-shot candidate labels for intent/behavior
BEHAVIOR_LABELS = [
    "credential harvesting", "invoice/payment fraud", "marketing",
    "benign", "malware", "account takeover",
]
48
+
49
def _call_hf_text_model(model_name: str, text: str):
    """POST *text* to the hosted inference endpoint of *model_name*.

    Args:
        model_name: Model identifier appended to the inference API URL.
        text: Input text sent as the ``inputs`` field of the JSON payload.

    Returns:
        The decoded JSON response on success. None when no API key is
        configured, the request fails or times out, the server answers with
        an HTTP error status, or the response body is not valid JSON.
    """
    if not HF_API_KEY:
        return None
    try:
        response = requests.post(
            f"https://api-inference.huggingface.co/models/{model_name}",
            headers=HF_HEADERS,
            json={"inputs": text},
            timeout=HF_TIMEOUT,
        )
        # Error statuses come with a JSON error body; reject them here so an
        # error payload is never handed to callers as if it were a prediction.
        response.raise_for_status()
        return response.json()
    except (requests.RequestException, ValueError):
        # RequestException: connection/timeout/HTTP errors.
        # ValueError: body could not be decoded as JSON.
        return None
 
63
 
64
def _call_hf_zero_shot(text: str, candidate_labels: List[str]):
    """Run zero-shot classification of *text* against *candidate_labels*.

    The request goes to the inference endpoint of ``ZERO_SHOT_MODEL``.

    Args:
        text: Input text sent as the ``inputs`` field of the JSON payload.
        candidate_labels: Labels sent as ``parameters.candidate_labels``.

    Returns:
        The decoded JSON response on success. None when no API key is
        configured, the request fails or times out, the server answers with
        an HTTP error status, or the response body is not valid JSON.
    """
    if not HF_API_KEY:
        return None
    try:
        response = requests.post(
            f"https://api-inference.huggingface.co/models/{ZERO_SHOT_MODEL}",
            headers=HF_HEADERS,
            json={
                "inputs": text,
                "parameters": {"candidate_labels": candidate_labels},
            },
            timeout=HF_TIMEOUT,
        )
        # Error statuses come with a JSON error body; reject them here so an
        # error payload is never handed to callers as if it were a prediction.
        response.raise_for_status()
        return response.json()
    except (requests.RequestException, ValueError):
        # RequestException: connection/timeout/HTTP errors.
        # ValueError: body could not be decoded as JSON.
        return None
78
+
79
def _parse_hf_phishing_model_output(result):
    """Normalise a classifier response into ``(top_label, top_score, all_probs)``.

    Recognised shapes:
      * ``[[{"label": ..., "score": ...}, ...]]`` - one inner list per input;
        only the first inner list is read
      * ``[{"label": ..., "score": ...}, ...]``
      * ``{"labels": [...], "scores": [...]}``
      * ``{"label": "...", "score": 0.9}``

    Args:
        result: Decoded JSON response, or None.

    Returns:
        The label with the highest score, that score as a float, and a dict
        of every parsed label -> score. ``(None, 0.0, {})`` when nothing
        usable is found (including error payloads such as ``{"error": ...}``).
    """
    no_prediction = (None, 0.0, {})
    if not result:
        return no_prediction

    pairs = []
    if isinstance(result, list):
        # Unwrap the per-input nesting when present.
        entries = result[0] if isinstance(result[0], list) else result
        pairs = [
            (entry.get("label"), entry.get("score", 0.0))
            for entry in entries
            if isinstance(entry, dict)
        ]
    elif isinstance(result, dict):
        labels = result.get("labels") or result.get("label") or []
        scores = result.get("scores") or result.get("score") or []
        if isinstance(labels, list) and isinstance(scores, list):
            pairs = list(zip(labels, scores))
        elif isinstance(labels, str) and isinstance(scores, (int, float)):
            pairs = [(labels, scores)]

    all_probs = {}
    for label, score in pairs:
        if label is None:
            continue
        try:
            all_probs[label] = float(score)
        except (TypeError, ValueError):
            # Skip entries whose score is not numeric rather than failing.
            continue

    if not all_probs:
        return no_prediction

    # Do not rely on the response being sorted: pick the highest score.
    top_label = max(all_probs, key=all_probs.get)
    return top_label, all_probs[top_label], all_probs
95
+
96
def analyze_body(subject: str, body: str, urls: list, images: list):
    """Score an email's subject/body for phishing signals and pick a verdict.

    Combines three sources: phrase heuristics (``SUSPICIOUS_PATTERNS``), a
    flat penalty per URL, and - when ``HF_API_KEY`` is set - a phishing
    classifier plus a zero-shot behavior classifier.

    Args:
        subject: Email subject (``None`` is treated as empty).
        body: Email body text (``None`` is treated as empty).
        urls: URLs extracted from the message; every one adds to the score.
        images: Accepted for interface compatibility; not used here.

    Returns:
        (findings, score, highlighted_body, verdict): a list of readable
        findings, the risk score clamped to 0-100, the body with matched
        phrases/URLs wrapped in ``<mark>`` tags, and a verdict string.
    """
    findings = []
    score = 0
    highlighted_body = body or ""
    needles = []  # literal strings to wrap in <mark> inside the body

    combined_lower = ((subject or "") + "\n" + (body or "")).lower()
    for pattern in SUSPICIOUS_PATTERNS:
        if pattern in combined_lower:
            findings.append(f"Suspicious phrase detected: \"{pattern}\"")
            score += 18
            needles.append(pattern)

    # URL checks
    for u in urls or []:
        findings.append(f"Suspicious URL detected: {u}")
        score += 10
        needles.append(u)

    # Highlight everything in a single pass, longest needle first, so one
    # match cannot split another (e.g. "invoice" inside a URL). The callable
    # replacement keeps the matched text exactly as written and is immune to
    # backslashes in a URL being read as regex replacement escapes.
    # NOTE(review): the body is not HTML-escaped before <mark> is inserted;
    # escape it where it is rendered if the body is untrusted plain text.
    needles = [n for n in dict.fromkeys(needles) if isinstance(n, str) and n]
    if needles and highlighted_body:
        needles.sort(key=len, reverse=True)
        marker = re.compile("|".join(re.escape(n) for n in needles), re.IGNORECASE)
        highlighted_body = marker.sub(
            lambda m: f"<mark>{m.group(0)}</mark>", highlighted_body
        )

    # ML phishing model
    ml_label = None
    ml_conf = 0.0
    model_input = "\n".join([subject or "", body or "", "\n".join(urls or [])]).strip()
    if model_input and HF_API_KEY:
        raw = _call_hf_text_model(PHISHING_MODEL, model_input)
        label, conf, _ = _parse_hf_phishing_model_output(raw)
        if label:
            ml_label = label
            ml_conf = conf
            findings.append(f"HuggingFace phishing model → {label} (conf {conf:.2f})")
            # Only a phishing-type label is evidence of risk; a confident
            # "legitimate" prediction must not raise the score.
            if "phishing" in str(label).lower():
                score += int(conf * 100 * 0.9)

    # Zero-shot behavior
    behavior = None
    behavior_conf = 0.0
    if HF_API_KEY and model_input:
        zs = _call_hf_zero_shot(model_input, BEHAVIOR_LABELS)
        if isinstance(zs, dict) and zs.get("labels") and zs.get("scores"):
            try:
                top_behavior = zs["labels"][0]
                top_conf = float(zs["scores"][0])
            except (TypeError, ValueError, IndexError, KeyError):
                # Malformed response: skip behavior inference entirely.
                pass
            else:
                behavior = top_behavior
                behavior_conf = top_conf
                findings.append(f"Behavior inference → {behavior} (conf {behavior_conf:.2f})")
                # A confident "benign" classification is not a risk signal.
                if behavior_conf >= 0.7 and behavior != "benign":
                    score += int(behavior_conf * 30)

    # A confident phishing classification forces at least a Malicious score.
    if ml_conf >= 0.8 and "phishing" in str(ml_label or "").lower():
        score = max(score, 80)

    score = int(max(0, min(score, 100)))

    # Verdict
    if score >= 70:
        verdict = "🚨 Malicious"
    elif score >= 50:
        verdict = "⚠️ Suspicious"
    elif score >= 30:
        verdict = "📩 Spam"
    else:
        verdict = "✅ Safe"
        findings.append("No strong phishing signals detected by models/heuristics.")

    # Return exactly 4 values
    return findings, score, highlighted_body, verdict