princemaxp committed on
Commit
b00d456
·
verified ·
1 Parent(s): 1ad7be9

Update header_analyzer.py

Browse files
Files changed (1) hide show
  1. header_analyzer.py +112 -192
header_analyzer.py CHANGED
@@ -1,230 +1,150 @@
1
  import re
2
- import difflib
3
  import whois
4
- from datetime import datetime
5
-
6
- # -----------------------------
7
- # Brand intelligence (extendable)
8
- # -----------------------------
9
# Brand keyword -> sender domains that brand legitimately mails from.
BRAND_OFFICIAL = dict(
    paypal=["paypal.com"],
    amazon=["amazon.com"],
    google=["google.com", "gmail.com"],
    microsoft=["microsoft.com", "outlook.com", "live.com"],
    apple=["apple.com"],
    flowtoscale=["flowtoscale.com"],
)

# Top-level domains that add to the risk score when used by the sender.
SUSPICIOUS_TLDS = {
    "xyz",
    "top",
    "click",
    "work",
    "loan",
    "tk",
    "info",
}

# Free mailbox providers; a From domain in this set is reported as a finding.
FREE_EMAIL_PROVIDERS = {
    "gmail.com",
    "yahoo.com",
    "outlook.com",
    "hotmail.com",
    "aol.com",
}
23
-
24
- # -----------------------------
25
  # Helpers
26
- # -----------------------------
27
def get_domain_age_days(domain: str, lookup=None):
    """Return the age of *domain* in whole days, or None when it is unknown.

    Args:
        domain: Domain name to look up.
        lookup: Optional callable taking the domain and returning an object
            with a ``creation_date`` attribute. Defaults to ``whois.whois``;
            supplying one lets callers plug in a cached or alternative
            WHOIS client.

    Returns:
        Number of days since the reported creation date, or None when the
        lookup fails or no usable date is reported.
    """
    if not domain:
        return None

    query = whois.whois if lookup is None else lookup
    try:
        cd = query(domain).creation_date
    except Exception:
        # WHOIS is a best-effort lookup: any failure simply means "unknown".
        return None

    if isinstance(cd, (list, tuple)):
        # Several dates may be reported; take the first non-empty one.
        cd = next((item for item in cd if item), None)
    if isinstance(cd, str):
        try:
            cd = datetime.fromisoformat(cd)
        except ValueError:
            return None
    if not isinstance(cd, datetime):
        return None

    # Build "now" with the same awareness as the creation date: subtracting
    # a naive and an aware datetime raises TypeError.
    return (datetime.now(cd.tzinfo) - cd).days
43
 
 
 
 
 
 
 
44
 
45
def extract_domain(header_value: str):
    """Return the lower-cased domain of the address in an address header.

    The address is isolated with ``email.utils.parseaddr`` first so that an
    ``@`` inside the display name (e.g. ``"ceo@paypal.com" <x@evil.com>``)
    cannot be mistaken for the sender's domain.

    Args:
        header_value: Raw header value such as ``Name <user@example.com>``.

    Returns:
        The domain in lower case, or "" when none can be found.
    """
    if not header_value:
        return ""

    from email.utils import parseaddr

    domain_after_at = r'@([a-zA-Z0-9.-]+)'
    address = parseaddr(header_value)[1]
    # Fall back to scanning the raw value when parseaddr cannot isolate
    # an address from a malformed header.
    m = re.search(domain_after_at, address) or re.search(domain_after_at, header_value)
    return m.group(1).lower() if m else ""
50
-
51
-
52
def extract_display_name(from_header: str):
    """Return the lower-cased display name that precedes ``<address>``.

    Yields "" when the header is empty or has no ``name <address>`` form.
    """
    if from_header:
        found = re.match(r'\"?([^"<]+)\"?\s*<', from_header)
        if found is not None:
            name = found.group(1)
            return name.strip().lower()
    return ""
57
-
58
-
59
- # -----------------------------
60
- # Authentication parsing (structured)
61
- # -----------------------------
62
def parse_authentication_results(auth_header: str):
    """Extract SPF, DKIM and DMARC verdicts from an Authentication-Results value.

    Args:
        auth_header: Raw header value; may be empty or None.

    Returns:
        Dict with keys ``spf``, ``dkim``, ``dmarc`` and ``dmarc_policy``.
        Each value is the lower-cased token found in the header, or
        "unknown" when the header does not report it.
    """
    results = {
        "spf": "unknown",
        "dkim": "unknown",
        "dmarc": "unknown",
        "dmarc_policy": "unknown",
    }

    if not auth_header:
        return results

    auth = auth_header.lower()

    # The look-behind keeps a key from matching inside a longer token
    # (e.g. "x-spf="); the trailing \b requires a complete verdict word.
    verdict = r"(pass|fail|softfail|neutral|none|permerror|temperror)\b"
    for key in ("spf", "dkim", "dmarc"):
        m = re.search(rf"(?<![\w.-]){key}={verdict}", auth)
        if m:
            results[key] = m.group(1)

    # Anchor "p=" so the subdomain policy tag "sp=" is not read as the
    # domain policy.
    m = re.search(r"(?<![\w.-])p=(none|quarantine|reject)\b", auth)
    if m:
        results["dmarc_policy"] = m.group(1)

    return results
85
-
86
-
87
- # -----------------------------
88
- # Display-name spoofing (BEC core)
89
- # -----------------------------
90
def detect_display_name_spoof(display_name: str, from_domain: str, brands=None):
    """Report a display name that names a known brand sent from a foreign domain.

    Args:
        display_name: Display name taken from the From header.
        from_domain: Domain of the From address.
        brands: Optional mapping of brand keyword to its list of legitimate
            domains. Defaults to the module-level ``BRAND_OFFICIAL`` table.

    Returns:
        A finding message when a brand keyword appears in the display name
        but the sender domain is neither one of that brand's domains nor a
        subdomain of one; otherwise None.
    """
    if not display_name or not from_domain:
        return None

    brand_map = BRAND_OFFICIAL if brands is None else brands
    shown = display_name.lower()
    sender = from_domain.lower().rstrip(".")

    for brand, legit_domains in brand_map.items():
        if brand not in shown:
            continue
        # Require an exact match or a dot-delimited subdomain: a bare
        # suffix test would accept "notpaypal.com" for "paypal.com".
        trusted = any(
            sender == ld or sender.endswith("." + ld) for ld in legit_domains
        )
        if not trusted:
            return (
                f"Display-name spoofing detected: "
                f"'{display_name}' but sender domain is {from_domain}"
            )
    return None
102
 
 
103
 
104
- # -----------------------------
105
- # Main Analyzer
106
- # -----------------------------
107
def analyze_headers(headers, body=""):
    """Inspect email headers and score them for phishing / BEC indicators.

    Args:
        headers: Mapping of header name to value; None is treated as empty.
        body: Accepted for interface compatibility; not used here.

    Returns:
        A ``(findings, score, auth_summary)`` tuple: the list of finding
        messages, the risk score clamped to 0..100, and the dict produced
        by ``parse_authentication_results``.
    """
    hdrs = headers or {}
    hits = []  # (message, points) pairs, kept in detection order

    # --- Pull out the raw header values ---
    sender_raw = hdrs.get("From", "")
    reply_raw = hdrs.get("Reply-To", "")
    bounce_raw = hdrs.get("Return-Path", "")
    auth_text = (
        hdrs.get("Authentication-Results")
        or hdrs.get("Authentication-results")
        or ""
    )

    sender_domain = extract_domain(sender_raw)
    reply_domain = extract_domain(reply_raw)
    bounce_domain = extract_domain(bounce_raw)
    shown_name = extract_display_name(sender_raw)

    # --- Authentication verdicts ---
    verdicts = parse_authentication_results(auth_text)

    if verdicts["dmarc"] == "fail":
        hits.append(("Authentication failure: DMARC failed", 40))
    if verdicts["spf"] in ("fail", "softfail"):
        hits.append(("Authentication failure: SPF failed or soft-failed", 25))
    if verdicts["dkim"] in ("fail", "permerror"):
        hits.append(("Authentication failure: DKIM failed", 25))
    if verdicts["dmarc"] == "pass" and verdicts["dmarc_policy"] == "none":
        hits.append(("Weak DMARC policy: p=none (monitoring only)", 10))

    # --- Identity / BEC signals ---
    spoof_note = detect_display_name_spoof(shown_name, sender_domain)
    if spoof_note:
        # Weighted highest of the identity signals.
        hits.append((spoof_note, 45))

    if sender_domain and reply_domain and sender_domain != reply_domain:
        hits.append((
            f"Reply-To mismatch: From domain {sender_domain}, Reply-To domain {reply_domain}",
            25,
        ))

    if bounce_domain and sender_domain and bounce_domain != sender_domain:
        hits.append((
            f"Return-Path mismatch: From domain {sender_domain}, Return-Path {bounce_domain}",
            20,
        ))

    # --- Domain reputation and age ---
    if sender_domain:
        if sender_domain in FREE_EMAIL_PROVIDERS:
            hits.append((f"Free email provider used ({sender_domain})", 10))

        tld = sender_domain.rsplit(".", 1)[-1]
        if tld in SUSPICIOUS_TLDS:
            hits.append((f"Suspicious or abused TLD detected ({tld})", 15))

        age_days = get_domain_age_days(sender_domain)
        if age_days is None:
            pass
        elif age_days < 14:
            hits.append((f"Domain is extremely new ({age_days} days old)", 40))
        elif age_days < 30:
            hits.append((f"Domain is newly registered ({age_days} days old)", 25))
        elif age_days < 90:
            hits.append((f"Domain is relatively new ({age_days} days old)", 10))

        # Look-alike check: similar to, but not the same as, an official domain.
        official = (d for domains in BRAND_OFFICIAL.values() for d in domains)
        for legit in official:
            if sender_domain == legit:
                continue
            ratio = difflib.SequenceMatcher(None, sender_domain, legit).ratio()
            if 0.75 <= ratio < 0.95:
                hits.append((
                    f"Possible look-alike domain spoofing ({sender_domain} vs {legit})",
                    30,
                ))

    # --- Header anomalies ---
    if hdrs.get("Bcc") or hdrs.get("bcc"):
        hits.append(("BCC header present (mass-mailing / phishing indicator)", 8))
    if not hdrs.get("Message-ID"):
        hits.append(("Missing Message-ID header", 8))

    # --- Finalize ---
    if not hits:
        return ["No suspicious issues found in headers."], 0, verdicts

    findings = [message for message, _ in hits]
    total = sum(points for _, points in hits)
    score = int(max(0, min(total, 100)))
    return findings, score, verdicts
 
1
  import re
2
+ import email
3
  import whois
4
+ from email.utils import parseaddr
5
+ from datetime import datetime, timezone
6
+
7
+ # ---------------------------
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8
  # Helpers
9
+ # ---------------------------
10
+
11
def extract_domain(addr: str):
    """Return the lower-cased domain part of an email address.

    Args:
        addr: A bare address such as ``user@example.com``.

    Returns:
        The text after the last ``@``, lower-cased and stripped of
        surrounding whitespace, a trailing ``>`` and a trailing dot.
        None when *addr* is not a string, has no ``@`` or has nothing
        after it.
    """
    if not isinstance(addr, str):
        return None
    # rpartition reports whether an "@" was present at all, so a value
    # without one is not mistaken for a domain.
    _, at_sign, domain = addr.rpartition("@")
    domain = domain.strip().rstrip(">").rstrip(".").lower()
    if not at_sign or not domain:
        return None
    return domain
15
+
16
def safe_lower(value):
    """Lower-case *value* when it is a string; return "" for anything else."""
    if not isinstance(value, str):
        return ""
    return value.lower()
18
+
19
def days_old(domain, lookup=None):
    """Return how many whole days ago *domain* was registered, or None.

    Args:
        domain: Domain name to look up.
        lookup: Optional callable taking the domain and returning an object
            with a ``creation_date`` attribute. Defaults to ``whois.whois``;
            supplying one lets callers plug in a cached or alternative
            WHOIS client.

    Returns:
        Days elapsed since the reported creation date, or None when the
        lookup fails or reports no usable date.
    """
    if not domain:
        return None

    query = whois.whois if lookup is None else lookup
    try:
        created = query(domain).creation_date
    except Exception:
        # WHOIS is a best-effort lookup: any failure simply means "unknown".
        return None

    if isinstance(created, (list, tuple)):
        # Several dates may be reported; take the first non-empty one.
        created = next((item for item in created if item), None)
    if isinstance(created, str):
        try:
            created = datetime.fromisoformat(created)
        except ValueError:
            return None
    if not isinstance(created, datetime):
        return None

    if created.utcoffset() is None:
        # A naive datetime cannot be subtracted from an aware one
        # (TypeError). NOTE(review): treats naive WHOIS dates as UTC.
        created = created.replace(tzinfo=timezone.utc)
    return (datetime.now(timezone.utc) - created).days
30
 
31
def looks_like_domain_spoof(display_name, from_domain):
    """Tell whether the display name advertises a domain other than the sender's.

    The display name is scanned for host names that follow ``@``, ``www.``
    or ``://``. A host counts as the sender's own when it equals
    *from_domain* or one is a dot-delimited subdomain of the other. A
    display name that merely contains the sender's own organisation name
    (e.g. "PayPal" from paypal.com) is not treated as spoofing.

    Args:
        display_name: Display name from the From header.
        from_domain: Domain of the From address.

    Returns:
        True when the display name shows a host unrelated to *from_domain*,
        otherwise False (including when either argument is empty or not a
        string).
    """
    if not isinstance(display_name, str) or not isinstance(from_domain, str):
        return False

    shown = display_name.lower()
    sender = from_domain.lower().strip().strip(".")
    if not shown or not sender:
        return False

    hosts = re.findall(r"(?:@|\bwww\.|://)([a-z0-9-]+(?:\.[a-z0-9-]+)+)", shown)
    for host in hosts:
        host = host.strip(".-")
        related = (
            host == sender
            or host.endswith("." + sender)
            or sender.endswith("." + host)
        )
        if not related:
            return True
    return False
37
 
38
+ # ---------------------------
39
+ # AUTH RESULTS PARSING
40
+ # ---------------------------
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
41
 
42
def parse_auth_results(headers):
    """Extract SPF, DKIM and DMARC verdicts from the Authentication-Results header.

    Args:
        headers: Mapping-like object with ``get`` and ``items`` (a plain
            dict or a message object); None is treated as empty.

    Returns:
        Dict with keys ``spf``, ``dkim``, ``dmarc`` (the lower-cased verdict,
        or "unknown" when absent) and ``policy`` (the DMARC policy named in
        the dmarc clause as ``p=`` or ``policy=``, or None).
    """
    auth = {"spf": "unknown", "dkim": "unknown", "dmarc": "unknown", "policy": None}
    if headers is None:
        return auth

    raw = headers.get("Authentication-Results", "")
    if not raw:
        # A plain dict is case-sensitive; accept any capitalisation.
        raw = next(
            (
                value
                for name, value in headers.items()
                if isinstance(name, str) and name.lower() == "authentication-results"
            ),
            "",
        )
    text = str(raw).lower() if raw else ""

    # The look-behind keeps a key from matching inside a longer token;
    # the trailing \b requires a complete verdict word.
    verdict = r"(pass|fail|softfail|neutral|none|permerror|temperror)\b"
    for key in ("spf", "dkim", "dmarc"):
        m = re.search(rf"(?<![\w.-]){key}={verdict}", text)
        if m:
            auth[key] = m.group(1)

    # Search only inside the dmarc clause (up to the next ";"), and anchor
    # the tag so "sp=" is not read as the domain policy.
    pol = re.search(
        r"(?<![\w.-])dmarc=[^;]*?(?<![\w.-])(?:p|policy)=(none|quarantine|reject)\b",
        text,
    )
    if pol:
        auth["policy"] = pol.group(1)

    return auth
58
 
59
+ # ---------------------------
60
+ # MAIN ANALYZER
61
+ # ---------------------------
 
 
 
 
 
 
 
62
 
63
def analyze_headers(headers: dict):
    """Score an email's headers for spoofing and phishing indicators.

    Args:
        headers: Header mapping. A plain dict works, as does a message
            object that offers ``get`` and ``get_all``. In a dict, the
            ``Received`` value may be a single string or a list of strings.
            None is treated as empty.

    Returns:
        A ``(findings, score, auth_summary)`` tuple: the list of finding
        messages, the accumulated risk score capped at 100, and a dict
        summarising the authentication verdicts and the extracted domains.
    """
    if headers is None:
        headers = {}

    findings = []
    score = 0

    # --- Extract addresses ---
    # Parse From once; it yields both the display name and the address.
    display_name, from_addr = parseaddr(str(headers.get("From", "") or ""))
    reply_to = parseaddr(str(headers.get("Reply-To", "") or ""))[1]
    return_path = parseaddr(str(headers.get("Return-Path", "") or ""))[1]

    from_domain = extract_domain(from_addr)
    reply_domain = extract_domain(reply_to)
    return_domain = extract_domain(return_path)

    # --- AUTH CHECKS ---
    auth = parse_auth_results(headers)

    if auth["spf"] in ["fail", "softfail"]:
        findings.append("SPF authentication failed")
        score += 15

    if auth["dkim"] == "fail":
        findings.append("DKIM authentication failed")
        score += 15

    if auth["dmarc"] == "fail":
        findings.append("DMARC authentication failed")
        score += 20

    if auth["policy"] in ["quarantine", "reject"] and auth["dmarc"] != "pass":
        findings.append("DMARC policy enforcement triggered")
        score += 10

    # --- DISPLAY NAME SPOOFING ---
    if looks_like_domain_spoof(display_name, from_domain):
        findings.append("Display name spoofing detected")
        score += 20

    # --- BEC / REPLY-TO ---
    if reply_domain and reply_domain != from_domain:
        findings.append("Reply-To domain mismatch (possible BEC)")
        score += 25

    if return_domain and return_domain != from_domain:
        findings.append("Return-Path domain mismatch")
        score += 15

    # --- DOMAIN AGE ---
    if from_domain:
        age = days_old(from_domain)
        if age is not None and age < 90:
            findings.append(f"Sender domain is newly registered ({age} days old)")
            score += 20

    # --- RECEIVED CHAIN ANALYSIS ---
    # Message objects expose get_all(); a plain dict does not, so fall back
    # to get() and accept either one string or a list of strings.
    get_all = getattr(headers, "get_all", None)
    if callable(get_all):
        received_headers = get_all("Received", []) or []
    else:
        value = headers.get("Received") or []
        received_headers = [value] if isinstance(value, str) else list(value)

    if received_headers and len(received_headers) > 8:
        findings.append("Unusually long Received header chain")
        score += 10

    if received_headers:
        # The final list entry is examined as the originating hop and the
        # first entry as the most recent one.
        first = safe_lower(received_headers[-1])
        last = safe_lower(received_headers[0])
        if "localhost" in first or "127.0.0.1" in first:
            findings.append("Suspicious mail origination (localhost)")
            score += 15
        if "unknown" in last:
            findings.append("Unknown sending host detected")
            score += 10

    # --- ALIGNMENT SUMMARY ---
    # Informational entry only; it does not change the score.
    if auth["spf"] == "pass" and auth["dkim"] == "pass" and auth["dmarc"] == "pass":
        findings.append("Email authentication alignment passed")

    # --- CLAMP SCORE ---
    score = min(score, 100)

    auth_summary = {
        "SPF": auth["spf"],
        "DKIM": auth["dkim"],
        "DMARC": auth["dmarc"],
        "DMARC Policy": auth["policy"] or "unknown",
        "From Domain": from_domain,
        "Reply-To Domain": reply_domain,
        "Return-Path Domain": return_domain,
    }

    return findings, score, auth_summary