Spaces:
Running
Running
File size: 6,873 Bytes
bebe233 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 | # ============================================================
# PhishGuard AI - email_analyzer.py
# Analyzes raw emails for phishing indicators.
# Checks: sender authentication (SPF/DKIM/DMARC),
# brand spoofing, urgency language, and embedded links.
#
# Reuses BERT model from bert_analyzer to avoid duplicate loading.
# ============================================================
import email
import re
from email import policy
from email.parser import BytesParser, Parser
# Reuse the NLP analyzer from bert_analyzer
from bert_analyzer import analyze_text as bert_analyze_text, _ensure_bert_loaded
import bert_analyzer
# Import-time banner: this statement runs when the module is imported,
# announcing that the analyzer is ready.
print("[PhishGuard] Email analyzer initialized (reusing shared NLP)")
# Regular expressions describing pressure / urgency phrasing.
# The literals are all lower-case, so they are meant to be matched against
# lower-cased text.
URGENCY_PATTERNS = [
    # Generic "do it right now" wording.
    r'(act now|immediate action|urgent|verify immediately|account suspended)',
    # Call-to-action links.
    r'(click here to (verify|confirm|update|restore))',
    # Threats about the state of the recipient's account.
    r'(your account (will be|has been) (suspended|closed|deactivated))',
    # Artificial deadlines.
    r'(limited time|expires in \d+ hours?)',
    # Fake security alerts.
    r'(unusual (sign-in|login|activity) detected)',
    # Requests to re-confirm credentials or identity.
    r'(confirm your (identity|password|email|account))',
    # "We noticed ..." style warnings.
    r'(we noticed (suspicious|unusual|unauthorized))',
]
# Lower-case brand keywords that are looked for in email content.
BRAND_SPOOFS = [
    'paypal', 'amazon', 'apple', 'microsoft', 'google',
    'netflix', 'facebook', 'instagram', 'linkedin', 'twitter',
    'chase', 'wellsfargo', 'bankofamerica', 'citibank', 'irs',
    'fedex', 'ups', 'dhl', 'dropbox', 'docusign',
    'zoom', 'office365', 'hdfc', 'icici', 'sbi',
]
def parse_email_msg(raw):
    """Turn the raw source of an email into a parsed message object.

    ``bytes`` input is parsed with ``BytesParser``; any other input is handed
    to ``Parser`` and treated as text. Both parsers use ``policy.default``.
    """
    default_policy = policy.default
    if not isinstance(raw, bytes):
        return Parser(policy=default_policy).parsestr(raw)
    return BytesParser(policy=default_policy).parsebytes(raw)
def extract_urls(text: str) -> list:
    """Extract the unique HTTP/HTTPS URLs found in ``text``.

    Args:
        text: Arbitrary text, e.g. an email body or raw HTML markup.

    Returns:
        The distinct URLs in order of first appearance. The scheme is matched
        case-insensitively, and trailing sentence punctuation (``. , ; : ! ?``)
        is not considered part of the URL.
    """
    ordered = {}
    for candidate in re.findall(r'https?://[^\s<>"\'\\ ]+', text, flags=re.IGNORECASE):
        # "see http://a.com." should yield "http://a.com", not "http://a.com.".
        url = candidate.rstrip('.,;:!?')
        # Skip degenerate matches that consist of the scheme only.
        if url.endswith('://'):
            continue
        # A dict keeps first-seen order, unlike set(), so callers that slice
        # the result get a deterministic selection.
        ordered.setdefault(url, None)
    return list(ordered)
def get_body(msg) -> str:
    """Return the readable text of an email message.

    For multipart messages every ``text/plain`` part is collected; a
    ``text/html`` part is used (with tags replaced by spaces) only while no
    text has been collected yet. A single-part message contributes its own
    content, tag-stripped when it is ``text/html``.

    Extraction is best-effort: a part whose content cannot be decoded, or
    whose content is not text (e.g. a binary payload), is skipped.

    Args:
        msg: A parsed message object exposing ``is_multipart()``, ``walk()``,
            ``get_content_type()`` and ``get_content()``.

    Returns:
        The collected text joined with single spaces, or ``''`` when nothing
        usable was found.
    """
    def _text_of(part, is_html):
        # Best-effort decode of one part; None means "nothing usable".
        try:
            content = part.get_content()
        except Exception:
            return None
        # Non-text parts yield bytes, which would break the final join.
        if not isinstance(content, str):
            return None
        return re.sub(r'<[^>]+>', ' ', content) if is_html else content

    if not msg.is_multipart():
        text = _text_of(msg, msg.get_content_type() == 'text/html')
        return '' if text is None else text

    parts = []
    for part in msg.walk():
        ct = part.get_content_type()
        if ct == 'text/plain':
            text = _text_of(part, False)
        elif ct == 'text/html' and not parts:
            # HTML is only a fallback while no text has been collected.
            text = _text_of(part, True)
        else:
            continue
        if text is not None:
            parts.append(text)
    return ' '.join(parts)
def check_sender_auth(msg) -> dict:
    """Score the sender-authentication signals carried in the headers.

    Checks performed:
      - SPF: ``spf=pass`` in ``Authentication-Results``, or a ``Received-SPF``
        header whose result (its first word) is ``pass``.
      - DKIM / DMARC: ``dkim=pass`` / ``dmarc=pass`` in
        ``Authentication-Results``.
      - From / Return-Path domain mismatch (compared case-insensitively).
      - Whether the From address belongs to a free email provider.

    Args:
        msg: Message-like object whose ``get(name, default)`` returns header
            values.

    Returns:
        dict with the boolean flags ``spf_pass``, ``dkim_pass``,
        ``dmarc_pass``, ``domain_mismatch``, ``using_free_email`` and the
        integer ``auth_risk_score`` (0-100).
    """
    # Local import keeps this block self-contained.
    from email.utils import parseaddr

    def _addr_domain(header_value):
        # Domain of the actual address in an address header, lower-cased.
        raw_value = str(header_value or '')
        # parseaddr separates the display name from the address, so an '@'
        # inside a quoted display name cannot masquerade as the sender domain.
        address = parseaddr(raw_value)[1]
        if '@' in address:
            domain = address.rpartition('@')[2]
        else:
            # Unparseable header: fall back to a plain pattern search.
            found = re.search(r'@([\w.-]+)', raw_value)
            domain = found.group(1) if found else ''
        return domain.strip().rstrip('.').lower()

    auth = str(msg.get('Authentication-Results', '') or '').lower()
    spf_raw = str(msg.get('Received-SPF', '') or '').lower()

    # The Received-SPF result is the leading word; the rest of the header is
    # free-form comment text that may itself contain the word "pass".
    spf_result = re.match(r'\s*([a-z]+)', spf_raw)
    received_spf_pass = spf_result is not None and spf_result.group(1) == 'pass'

    spf_pass = 'spf=pass' in auth or received_spf_pass
    dkim_pass = 'dkim=pass' in auth
    dmarc_pass = 'dmarc=pass' in auth

    from_dom = _addr_domain(msg.get('From', ''))
    ret_dom = _addr_domain(msg.get('Return-Path', ''))
    mismatch = bool(from_dom and ret_dom and from_dom != ret_dom)

    free = {'gmail.com', 'yahoo.com', 'hotmail.com', 'outlook.com', 'protonmail.com'}
    using_free = from_dom in free

    # Each failed or suspicious signal adds a fixed penalty.
    risk = 0
    if not spf_pass:
        risk += 25
    if not dkim_pass:
        risk += 20
    if not dmarc_pass:
        risk += 15
    if mismatch:
        risk += 30
    if using_free:
        risk += 10

    return {
        "spf_pass": spf_pass, "dkim_pass": dkim_pass,
        "dmarc_pass": dmarc_pass, "domain_mismatch": mismatch,
        "using_free_email": using_free,
        "auth_risk_score": min(risk, 100)
    }
def check_brand_spoofing(subject: str, body: str, sender: str, brands=None) -> dict:
    """Detect brand names mentioned in an email that its sender domain lacks.

    A brand counts as mentioned only when it appears as a whole token in the
    subject, body or sender (not embedded in a longer word, so "first" does
    not trigger "irs" and "purchase" does not trigger "chase"). It is reported
    as spoofed when the brand string does not occur in the sender's domain.

    Args:
        subject: Subject line.
        body: Message text.
        sender: Raw ``From`` header value (display name and/or address).
        brands: Optional iterable of lower-case brand keywords; defaults to
            the module-level ``BRAND_SPOOFS``.

    Returns:
        dict with ``brand_spoof_detected`` (bool) and ``spoofed_brands``
        (list of matching brands, in the order of ``brands``).
    """
    # Local import keeps this block self-contained.
    from email.utils import parseaddr

    if brands is None:
        brands = BRAND_SPOOFS

    sender = str(sender)
    combined = (subject + ' ' + body + ' ' + sender).lower()

    # Use the real address, not the first '@' in the header: a display name
    # such as "service@paypal.com" must not pass as the sender's domain.
    address = parseaddr(sender)[1]
    if '@' in address:
        s_dom = address.rpartition('@')[2].lower()
    else:
        found = re.search(r'@([\w.-]+)', sender)
        s_dom = found.group(1).lower() if found else ''

    def _mentioned(brand):
        # Whole-token match: no letter or digit directly before or after.
        token = r'(?<![a-z0-9])' + re.escape(brand.lower()) + r'(?![a-z0-9])'
        return re.search(token, combined) is not None

    # NOTE(review): a brand is still accepted when it is merely a substring of
    # the sender domain (e.g. "paypal-login.example"); confirm whether a
    # stricter registrable-domain comparison is wanted.
    spoofed = [b for b in brands if b not in s_dom and _mentioned(b)]
    return {
        "brand_spoof_detected": bool(spoofed),
        "spoofed_brands": spoofed
    }
def check_urgency(text: str, patterns=None) -> dict:
    """Detect urgency / pressure language typical of phishing emails.

    Args:
        text: Text to scan; it is lower-cased before matching.
        patterns: Optional iterable of regular expressions; defaults to the
            module-level ``URGENCY_PATTERNS``.

    Returns:
        dict with:
          - ``urgency_detected``: True when at least one pattern matched.
          - ``urgency_matches``: up to five matched phrases, as plain strings.
          - ``urgency_score``: 15 points per match, capped at 60.
    """
    if patterns is None:
        patterns = URGENCY_PATTERNS

    lowered = text.lower()  # lower-case once, not once per pattern
    matches = []
    for pat in patterns:
        # group(0) is the whole matched phrase. re.findall would return a
        # tuple of sub-groups for patterns with nested groups, and that tuple
        # would end up stringified in the report.
        matches.extend(found.group(0) for found in re.finditer(pat, lowered))

    return {
        "urgency_detected": bool(matches),
        "urgency_matches": [str(m) for m in matches[:5]],
        "urgency_score": min(len(matches) * 15, 60)
    }
def bert_score(text: str) -> float:
    """Return a phishing probability for ``text`` from the shared NLP analyzer.

    Behaviour:
      - Blank text short-circuits to ``0.1`` without touching the model.
      - When ``bert_analyzer`` reports a loaded classifier, the first 512
        characters are classified. A label that contains ``SPAM`` or equals
        ``LABEL_1`` is treated as the phishing class and its score is
        returned; for any other label ``1 - score`` is returned.
      - Otherwise ``bert_analyze_text`` is called and its
        ``bert_phishing_prob`` entry is returned (``0.3`` if absent).
      - Any error while scoring yields the fallback ``0.3``.

    Args:
        text: Email text to score.

    Returns:
        The probability as described above.
    """
    if not text.strip():
        return 0.1
    try:
        _ensure_bert_loaded()
        if bert_analyzer._use_bert and bert_analyzer._classifier is not None:
            prediction = bert_analyzer._classifier(text[:512])[0]
            label = prediction['label'].upper()
            confidence = prediction['score']
            is_phishing_label = 'SPAM' in label or label == 'LABEL_1'
            return confidence if is_phishing_label else 1 - confidence
        # No classifier available: use the analysis bert_analyzer provides.
        fallback = bert_analyze_text("", "", text)
        return fallback.get("bert_phishing_prob", 0.3)
    except Exception:
        # Deliberately best-effort, but unlike a bare ``except`` this lets
        # KeyboardInterrupt / SystemExit propagate.
        return 0.3
def analyze_email(raw, return_urls: bool = True) -> dict:
    """Run the full phishing analysis on one raw email.

    Combines the NLP score, sender authentication, brand spoofing and urgency
    detection into a single probability.

    Args:
        raw: The complete email as ``bytes`` or ``str``.
        return_urls: When True, include up to 20 extracted URLs in the result
            under ``extracted_urls``.

    Returns:
        dict with ``is_phishing`` (probability above 0.60),
        ``phishing_probability``, ``subject``, ``sender``, the per-check
        sub-results (``auth_analysis``, ``brand_analysis``,
        ``urgency_analysis``), ``bert_score``, ``extracted_url_count`` and,
        optionally, ``extracted_urls``.
    """
    msg = parse_email_msg(raw)
    subject = str(msg.get('Subject', '') or '')
    sender = str(msg.get('From', '') or '')
    body = get_body(msg)

    # URLs in the readable body first, then link targets from the raw HTML
    # parts: get_body() strips tags (dropping href values) and ignores the
    # HTML alternative once plain text exists, so those links would be missed.
    url_index = dict.fromkeys(extract_urls(body))
    for part in msg.walk():
        if part.get_content_type() != 'text/html':
            continue
        try:
            markup = part.get_content()
        except Exception:
            continue  # best-effort: skip a part that cannot be decoded
        if isinstance(markup, str):
            url_index.update(dict.fromkeys(extract_urls(markup)))
    urls = list(url_index)

    auth = check_sender_auth(msg)
    brand = check_brand_spoofing(subject, body, sender)
    urgency = check_urgency(subject + ' ' + body)
    bert_p = bert_score(subject + '. ' + body[:400])

    # Weighted blend of the four signals, then scaled into [0, 1].
    # NOTE(review): with urgency_score and the brand flag on smaller scales
    # than auth_risk_score, the blend looks unable to reach 100; confirm the
    # weights are as intended.
    raw_score = (bert_p * 40 +
                 auth['auth_risk_score'] * 0.30 +
                 urgency['urgency_score'] * 0.20 +
                 (30 if brand['brand_spoof_detected'] else 0) * 0.10)
    final = min(raw_score / 100, 1.0)

    result = {
        "is_phishing": final > 0.60,
        "phishing_probability": round(final, 4),
        "subject": subject,
        "sender": sender,
        "auth_analysis": auth,
        "brand_analysis": brand,
        "urgency_analysis": urgency,
        "bert_score": round(bert_p, 4),
        "extracted_url_count": len(urls),
    }
    if return_urls:
        result["extracted_urls"] = urls[:20]
    return result
|