File size: 6,873 Bytes
bebe233
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
# ============================================================
# PhishGuard AI - email_analyzer.py
# Analyzes raw emails for phishing indicators.
# Checks: sender authentication (SPF/DKIM/DMARC),
#         brand spoofing, urgency language, and embedded links.
#
# Reuses BERT model from bert_analyzer to avoid duplicate loading.
# ============================================================

import email
import re
from email import policy
from email.parser import BytesParser, Parser

# Reuse the NLP analyzer from bert_analyzer
from bert_analyzer import analyze_text as bert_analyze_text, _ensure_bert_loaded
import bert_analyzer

# Import-time status line; this module relies on the NLP model owned by bert_analyzer.
print("[PhishGuard] Email analyzer initialized (reusing shared NLP)")

# Regular expressions for pressure / call-to-action wording.
# check_urgency() applies them to lower-cased text, so they are written in lower case.
URGENCY_PATTERNS = [
    r"(act now|immediate action|urgent|verify immediately|account suspended)",
    r"(click here to (verify|confirm|update|restore))",
    r"(your account (will be|has been) (suspended|closed|deactivated))",
    r"(limited time|expires in \d+ hours?)",
    r"(unusual (sign-in|login|activity) detected)",
    r"(confirm your (identity|password|email|account))",
    r"(we noticed (suspicious|unusual|unauthorized))",
]

# Lower-case brand keywords looked for by check_brand_spoofing().
# Order matters: it is the order in which detected brands are reported.
BRAND_SPOOFS = [
    "paypal", "amazon", "apple", "microsoft", "google",
    "netflix", "facebook", "instagram", "linkedin", "twitter",
    "chase", "wellsfargo", "bankofamerica", "citibank", "irs",
    "fedex", "ups", "dhl", "dropbox", "docusign",
    "zoom", "office365", "hdfc", "icici", "sbi",
]


def parse_email_msg(raw):
    """Build a message object from a raw email.

    Bytes input goes through ``BytesParser``; anything else is handed to the
    text ``Parser``. Both use ``policy.default``.
    """
    if not isinstance(raw, bytes):
        text_parser = Parser(policy=policy.default)
        return text_parser.parsestr(raw)
    bytes_parser = BytesParser(policy=policy.default)
    return bytes_parser.parsebytes(raw)


def extract_urls(text: str) -> list:
    """Extract the unique HTTP/HTTPS URLs from text, in order of first appearance.

    A URL runs from ``http://`` or ``https://`` up to the next whitespace,
    angle bracket, quote or backslash.

    Duplicates are dropped with an order-preserving dict rather than a set:
    set iteration order for strings varies between interpreter runs, which
    made any later truncation of the list (e.g. ``urls[:20]``) arbitrary.
    """
    found = re.findall(r'https?://[^\s<>"\'\\  ]+', text)
    return list(dict.fromkeys(found))


def get_body(msg) -> str:
    """Extract the text body of an email message.

    Multipart messages: every ``text/plain`` part is collected; a
    ``text/html`` part is used (with tags replaced by spaces) only while no
    text has been collected yet.
    Single-part messages: the content is returned as text, with tags removed
    when the message itself is ``text/html``.

    Extraction is best-effort: a part that cannot be decoded, or whose content
    is not text (e.g. a binary single-part message), is skipped instead of
    raising. Returns the collected pieces joined by single spaces, or ``''``.
    """

    def _text_of(part):
        # Decoded text of a part, or None if it cannot be read as text.
        try:
            content = part.get_content()
        except Exception:
            # Deliberately broad: unknown charsets, missing content handlers
            # and legacy message objects all mean "no usable text here".
            # Unlike a bare except, this lets KeyboardInterrupt/SystemExit through.
            return None
        # Non-text content types yield bytes; those cannot be joined as str.
        return content if isinstance(content, str) else None

    def _strip_tags(markup):
        return re.sub(r'<[^>]+>', ' ', markup)

    parts = []
    if msg.is_multipart():
        for part in msg.walk():
            ct = part.get_content_type()
            if ct == 'text/plain':
                text = _text_of(part)
                if text is not None:
                    parts.append(text)
            elif ct == 'text/html' and not parts:
                markup = _text_of(part)
                if markup is not None:
                    parts.append(_strip_tags(markup))
    else:
        text = _text_of(msg)
        if text is not None:
            # An HTML-only email gets the same tag stripping as an HTML part.
            if msg.get_content_type() == 'text/html':
                text = _strip_tags(text)
            parts.append(text)
    return ' '.join(parts)


def _address_domain(header_value) -> str:
    """Return the lower-cased domain of the address in a From/Return-Path value, or ''."""
    # Function-scope import: the module's import block does not bring this name in.
    from email.utils import parseaddr

    raw = str(header_value or '')
    # parseaddr separates the display name from the actual address, so an '@'
    # inside the display name ('"help@gmail.com" <x@evil.com>') is not mistaken
    # for the sender's domain.
    _, addr = parseaddr(raw)
    domain = addr.rpartition('@')[2] if '@' in addr else ''
    if not domain:
        # Fall back to a plain scan for values parseaddr cannot interpret.
        found = re.search(r'@([\w.-]+)', raw)
        domain = found.group(1) if found else ''
    return domain.lower().rstrip('.')


def check_sender_auth(msg) -> dict:
    """
    Check email authentication headers:
    - SPF (Sender Policy Framework)
    - DKIM (DomainKeys Identified Mail)
    - DMARC (Domain-based Message Authentication)
    - From/Return-Path domain mismatch
    - Free email provider usage

    ``msg`` only needs a ``get(name, default)`` method (an email message
    object or a plain dict of headers both work).

    Returns a dict with the booleans ``spf_pass``, ``dkim_pass``,
    ``dmarc_pass``, ``domain_mismatch``, ``using_free_email`` and an integer
    ``auth_risk_score`` (0-100). Risk points: SPF not passed 25, DKIM not
    passed 20, DMARC not passed 15, domain mismatch 30, free provider 10.
    """
    auth    = str(msg.get('Authentication-Results', '') or '').lower()
    spf_raw = str(msg.get('Received-SPF', '') or '').lower()

    # Received-SPF starts with the result keyword; only that leading token
    # counts. A substring test would accept "fail (... does not pass ...)".
    spf_pass   = 'spf=pass' in auth or bool(re.match(r'\s*pass\b', spf_raw))
    dkim_pass  = 'dkim=pass' in auth
    dmarc_pass = 'dmarc=pass' in auth

    from_dom = _address_domain(msg.get('From', ''))
    ret_dom  = _address_domain(msg.get('Return-Path', ''))
    # Domains are case-insensitive, so both sides are compared lower-cased.
    # A mismatch is only reported when both headers actually carry a domain.
    mismatch = bool(from_dom and ret_dom and from_dom != ret_dom)

    free = {'gmail.com', 'yahoo.com', 'hotmail.com', 'outlook.com', 'protonmail.com'}
    using_free = from_dom in free

    risk = 0
    if not spf_pass:
        risk += 25
    if not dkim_pass:
        risk += 20
    if not dmarc_pass:
        risk += 15
    if mismatch:
        risk += 30
    if using_free:
        risk += 10

    return {
        "spf_pass": spf_pass, "dkim_pass": dkim_pass,
        "dmarc_pass": dmarc_pass, "domain_mismatch": mismatch,
        "using_free_email": using_free,
        "auth_risk_score": min(risk, 100)
    }


def check_brand_spoofing(subject: str, body: str, sender: str, brands=None) -> dict:
    """Detect brand names mentioned in an email whose sender domain is not that brand's.

    Args:
        subject: Subject line.
        body: Text body.
        sender: Raw ``From`` value (display name and/or address).
        brands: Optional iterable of brand keywords; defaults to the
            module-level ``BRAND_SPOOFS`` list.

    Returns:
        ``{"brand_spoof_detected": bool, "spoofed_brands": [str, ...]}`` with
        brands listed in the order they appear in ``brands``.

    A brand counts as mentioned only when it appears as a whole token (not
    surrounded by letters or digits), so "purchase" does not trigger "chase"
    and "groups" does not trigger "ups". The sender is treated as legitimate
    for a brand only when the brand is a complete dot-separated label of the
    sender's domain: ``mail.paypal.com`` is accepted, ``paypal-secure.com`` is
    not. Limitation: a brand used as a sub-domain label of an unrelated domain
    (``paypal.evil.com``) is still accepted.
    """
    # Function-scope import: the module's import block does not bring this name in.
    from email.utils import parseaddr

    if brands is None:
        brands = BRAND_SPOOFS

    combined = f"{subject} {body} {sender}".lower()

    # Take the domain from the real address, not from an '@' that may sit
    # inside the display name.
    raw_sender = str(sender)
    _, addr = parseaddr(raw_sender)
    s_dom = addr.rpartition('@')[2] if '@' in addr else ''
    if not s_dom:
        found = re.search(r'@([\w.-]+)', raw_sender)
        s_dom = found.group(1) if found else ''
    s_dom = s_dom.lower().rstrip('.')
    domain_labels = set(s_dom.split('.')) if s_dom else set()

    spoofed = []
    for brand in brands:
        token = brand.lower()
        mentioned = re.search(
            r'(?<![a-z0-9])' + re.escape(token) + r'(?![a-z0-9])', combined
        )
        if mentioned and token not in domain_labels:
            spoofed.append(brand)

    return {
        "brand_spoof_detected": bool(spoofed),
        "spoofed_brands": spoofed
    }


def check_urgency(text: str, patterns=None) -> dict:
    """Detect urgency/pressure language patterns typical of phishing emails.

    Args:
        text: Text to scan; it is lower-cased before matching.
        patterns: Optional iterable of regular expressions; defaults to the
            module-level ``URGENCY_PATTERNS``.

    Returns:
        A dict with:
        - ``urgency_detected``: True if any pattern matched.
        - ``urgency_matches``: up to five matched phrases (the full matched
          text of each hit, in pattern order).
        - ``urgency_score``: 15 points per match, capped at 60.
    """
    if patterns is None:
        patterns = URGENCY_PATTERNS

    lowered = text.lower()  # lower-case once, not once per pattern
    matches = []
    for pat in patterns:
        # finditer + group(0) yields the whole matched phrase. re.findall
        # returns tuples for patterns with nested groups, which showed up in
        # the output as stringified tuples.
        matches.extend(hit.group(0) for hit in re.finditer(pat, lowered))

    return {
        "urgency_detected": bool(matches),
        "urgency_matches":  matches[:5],
        "urgency_score":    min(len(matches) * 15, 60)
    }


def bert_score(text: str) -> float:
    """Run the NLP classifier on email text and return a phishing probability.

    Returns:
        - ``0.1`` for empty or whitespace-only text (the classifier is not run).
        - When ``bert_analyzer`` reports a loaded model: the classifier's score
          for the first 512 characters, taken as-is if the label contains
          ``SPAM`` or equals ``LABEL_1``, otherwise ``1 - score``.
        - Otherwise the ``bert_phishing_prob`` value from
          ``bert_analyzer.analyze_text`` (``0.3`` if that key is absent).
        - ``0.3`` if anything in the scoring path raises.
    """
    if not text.strip():
        return 0.1
    try:
        _ensure_bert_loaded()
        if bert_analyzer._use_bert and bert_analyzer._classifier is not None:
            prediction = bert_analyzer._classifier(text[:512])[0]
            label = prediction['label'].upper()
            score = prediction['score']
            is_phishing_label = 'SPAM' in label or label == 'LABEL_1'
            return score if is_phishing_label else 1 - score
        # No model available: use the keyword analysis from bert_analyzer.
        fallback = bert_analyze_text("", "", text)
        return fallback.get("bert_phishing_prob", 0.3)
    except Exception:
        # Best-effort scoring: any model or analysis failure yields a neutral
        # default. Catching Exception (not a bare except) keeps
        # KeyboardInterrupt and SystemExit propagating.
        return 0.3


def analyze_email(raw, return_urls: bool = True) -> dict:
    """
    Full phishing analysis of a raw email.
    Pass raw bytes or a string of the full email.

    Combines: BERT NLP score + sender auth + brand spoofing + urgency detection.

    Scoring (points out of 100, then divided by 100 and capped at 1.0):
        bert probability * 40
        + auth_risk_score * 0.30
        + urgency_score   * 0.20
        + (30 if a brand spoof was detected else 0) * 0.10
    ``is_phishing`` is True when the resulting probability exceeds 0.60.

    Args:
        raw: The complete email as bytes or str.
        return_urls: When True, include up to 20 extracted URLs under
            ``extracted_urls``.

    Returns:
        A dict with the verdict, the probability, subject, sender, the
        per-check sub-results, the BERT score and the number of unique URLs.
    """
    msg     = parse_email_msg(raw)
    subject = msg.get('Subject', '')
    sender  = msg.get('From', '')
    body    = get_body(msg)

    # Links in the text body first, in a stable order with duplicates removed.
    urls = list(dict.fromkeys(extract_urls(body)))
    seen = set(urls)
    # get_body() removes HTML tags, which also removes every link that lives
    # in a tag attribute (href/src). Scan the raw HTML parts as well so those
    # embedded links are counted and reported.
    for part in msg.walk():
        if part.get_content_type() != 'text/html':
            continue
        try:
            markup = part.get_content()
        except Exception:
            # Best-effort: an undecodable HTML part simply contributes no URLs.
            continue
        if not isinstance(markup, str):
            continue
        for url in extract_urls(markup):
            if url not in seen:
                seen.add(url)
                urls.append(url)

    auth    = check_sender_auth(msg)
    brand   = check_brand_spoofing(subject, body, sender)
    urgency = check_urgency(subject + ' ' + body)
    # Only the subject and the first 400 characters of the body go to the classifier.
    bert_p  = bert_score(subject + '. ' + body[:400])

    raw_score = (bert_p * 40 +
                 auth['auth_risk_score'] * 0.30 +
                 urgency['urgency_score'] * 0.20 +
                 (30 if brand['brand_spoof_detected'] else 0) * 0.10)
    final = min(raw_score / 100, 1.0)

    result = {
        "is_phishing":          final > 0.60,
        "phishing_probability": round(final, 4),
        "subject":              subject,
        "sender":               sender,
        "auth_analysis":        auth,
        "brand_analysis":       brand,
        "urgency_analysis":     urgency,
        "bert_score":           round(bert_p, 4),
        "extracted_url_count":  len(urls),
    }
    if return_urls:
        result["extracted_urls"] = urls[:20]
    return result