# ============================================================
# PhishGuard AI - email_analyzer.py
# Analyzes raw emails for phishing indicators.
# Checks: sender authentication (SPF/DKIM/DMARC),
# brand spoofing, urgency language, and embedded links.
#
# Reuses BERT model from bert_analyzer to avoid duplicate loading.
# ============================================================

import email
import re
from email import policy
from email.parser import BytesParser, Parser
from email.utils import parseaddr

# Reuse the NLP analyzer from bert_analyzer
from bert_analyzer import analyze_text as bert_analyze_text, _ensure_bert_loaded
import bert_analyzer

print("[PhishGuard] Email analyzer initialized (reusing shared NLP)")

# Regexes matched against lower-cased subject + body text by check_urgency().
URGENCY_PATTERNS = [
    r'(act now|immediate action|urgent|verify immediately|account suspended)',
    r'(click here to (verify|confirm|update|restore))',
    r'(your account (will be|has been) (suspended|closed|deactivated))',
    r'(limited time|expires in \d+ hours?)',
    r'(unusual (sign-in|login|activity) detected)',
    r'(confirm your (identity|password|email|account))',
    r'(we noticed (suspicious|unusual|unauthorized))',
]

# Lower-case brand tokens looked for by check_brand_spoofing().
BRAND_SPOOFS = [
    'paypal', 'amazon', 'apple', 'microsoft', 'google', 'netflix',
    'facebook', 'instagram', 'linkedin', 'twitter', 'chase', 'wellsfargo',
    'bankofamerica', 'citibank', 'irs', 'fedex', 'ups', 'dhl',
    'dropbox', 'docusign', 'zoom', 'office365', 'hdfc', 'icici', 'sbi',
]

_URL_RE = re.compile(r'https?://[^\s<>"\'\\]+')
_URL_TRAILING_PUNCT = '.,;:!?'
_TAG_RE = re.compile(r'<[^>]+>')
_DOMAIN_RE = re.compile(r'@([\w.-]+)')
_SPF_PASS_RE = re.compile(r'\s*pass\b')
_FREE_EMAIL_DOMAINS = frozenset(
    {'gmail.com', 'yahoo.com', 'hotmail.com', 'outlook.com', 'protonmail.com'}
)


def parse_email_msg(raw):
    """Parse a raw email into an email.message object.

    Args:
        raw: The complete message as bytes/bytearray or as a str.

    Returns:
        The message parsed with ``policy.default``.
    """
    if isinstance(raw, (bytes, bytearray)):
        return BytesParser(policy=policy.default).parsebytes(raw)
    return Parser(policy=policy.default).parsestr(raw)


def extract_urls(text: str) -> list:
    """Extract the unique HTTP/HTTPS URLs found in *text*.

    URLs are returned in order of first appearance so that callers slicing
    the result get a deterministic subset. Trailing sentence punctuation
    (``.,;:!?``) is not treated as part of the URL.
    """
    unique = {}
    for candidate in _URL_RE.findall(text):
        url = candidate.rstrip(_URL_TRAILING_PUNCT)
        # Skip matches that were nothing but a scheme plus punctuation.
        if url.partition('://')[2]:
            unique.setdefault(url, None)
    return list(unique)


def _part_text(part):
    """Return the text content of *part*, or None if it has none.

    HTML parts have their tags replaced by spaces. Content that cannot be
    decoded, or that is not text (e.g. bytes of an attachment), yields None.
    """
    try:
        content = part.get_content()
    except Exception:
        # Best effort: an undecodable part must not abort the whole analysis.
        return None
    if not isinstance(content, str):
        return None
    if part.get_content_type() == 'text/html':
        content = _TAG_RE.sub(' ', content)
    return content


def get_body(msg) -> str:
    """Extract the text body of an email message.

    For multipart messages every ``text/plain`` part is collected; a
    ``text/html`` part (with tags stripped) is used only while no text has
    been collected yet. For single-part messages the content is returned,
    with tags stripped when it is HTML. Returns ``''`` when there is no
    usable text.
    """
    if not msg.is_multipart():
        text = _part_text(msg)
        return '' if text is None else text

    parts = []
    for part in msg.walk():
        content_type = part.get_content_type()
        wanted = content_type == 'text/plain' or (
            content_type == 'text/html' and not parts
        )
        if not wanted:
            continue
        text = _part_text(part)
        if text is not None:
            parts.append(text)
    return ' '.join(parts)


def _raw_text_parts(msg) -> str:
    """Join the unstripped content of every text/* part of *msg*.

    Unlike get_body(), HTML markup is kept so that link targets inside
    ``href`` attributes remain visible to extract_urls().
    """
    chunks = []
    for part in msg.walk():
        if part.get_content_maintype() != 'text':
            continue
        try:
            content = part.get_content()
        except Exception:
            continue
        if isinstance(content, str):
            chunks.append(content)
    return ' '.join(chunks)


def _address_domain(header_value) -> str:
    """Return the lower-cased domain of the address in an address header.

    The real address is taken with ``parseaddr`` so that an '@' inside the
    display name (``"support@paypal.com" <x@evil.example>``) cannot pose as
    the sender domain. Falls back to the first ``@domain`` in the raw text
    when no address can be parsed. Returns ``''`` when nothing is found.
    """
    value = str(header_value or '')
    _, address = parseaddr(value)
    if '@' in address:
        return address.rsplit('@', 1)[1].strip().lower()
    found = _DOMAIN_RE.search(value)
    return found.group(1).lower() if found else ''


def check_sender_auth(msg) -> dict:
    """Check the email authentication headers of *msg*.

    Looks at:
      - SPF (Sender Policy Framework)
      - DKIM (DomainKeys Identified Mail)
      - DMARC (Domain-based Message Authentication)
      - From/Return-Path domain mismatch
      - Free email provider usage

    Returns:
        A dict with the boolean findings and ``auth_risk_score`` (0-100).
    """
    auth = str(msg.get('Authentication-Results', '')).lower()
    spf_raw = str(msg.get('Received-SPF', '')).lower()

    # Received-SPF starts with the result keyword; only that token counts,
    # otherwise "fail (... passport.example ...)" would read as a pass.
    spf_pass = 'spf=pass' in auth or bool(_SPF_PASS_RE.match(spf_raw))
    dkim_pass = 'dkim=pass' in auth
    dmarc_pass = 'dmarc=pass' in auth

    from_dom = _address_domain(msg.get('From', ''))
    ret_dom = _address_domain(msg.get('Return-Path', ''))
    # Domains are case-insensitive; both values are already lower-cased.
    mismatch = bool(from_dom and ret_dom and from_dom != ret_dom)
    using_free = from_dom in _FREE_EMAIL_DOMAINS

    risk = 0
    if not spf_pass:
        risk += 25
    if not dkim_pass:
        risk += 20
    if not dmarc_pass:
        risk += 15
    if mismatch:
        risk += 30
    if using_free:
        risk += 10

    return {
        "spf_pass": spf_pass,
        "dkim_pass": dkim_pass,
        "dmarc_pass": dmarc_pass,
        "domain_mismatch": mismatch,
        "using_free_email": using_free,
        "auth_risk_score": min(risk, 100)
    }


def _mentions_brand(lowered_text: str, brand: str) -> bool:
    """Tell whether *brand* occurs in *lowered_text* as a standalone token.

    A letter or digit directly before or after the brand disqualifies the
    match, so 'irs' is not found in 'first' nor 'ups' in 'groups'.
    """
    pattern = r'(?<![a-z0-9])' + re.escape(brand) + r'(?![a-z0-9])'
    return re.search(pattern, lowered_text) is not None


def check_brand_spoofing(subject: str, body: str, sender: str) -> dict:
    """Detect brand names used in the email but absent from the sender domain.

    Args:
        subject: Subject line of the email.
        body: Text body of the email.
        sender: Raw ``From`` header value.

    Returns:
        ``brand_spoof_detected`` (bool) and ``spoofed_brands`` (list of the
        matching BRAND_SPOOFS entries, in BRAND_SPOOFS order).
    """
    combined = ' '.join((subject, body, sender)).lower()
    s_dom = _address_domain(sender)
    # NOTE(review): a brand counts as legitimate whenever it appears anywhere
    # in the sender domain, so look-alikes such as "paypal.evil.example" are
    # not flagged here; tightening that needs a brand -> domain mapping.
    spoofed = [
        brand for brand in BRAND_SPOOFS
        if brand not in s_dom and _mentions_brand(combined, brand)
    ]
    return {
        "brand_spoof_detected": bool(spoofed),
        "spoofed_brands": spoofed
    }


def check_urgency(text: str) -> dict:
    """Detect urgency/pressure language patterns typical of phishing emails.

    Returns:
        ``urgency_detected`` (bool), ``urgency_matches`` (up to five matched
        phrases, lower-cased) and ``urgency_score`` (15 per match, max 60).
    """
    lowered = text.lower()
    matches = []
    for pattern in URGENCY_PATTERNS:
        # group(0) is the whole phrase; findall() would return tuples for
        # the patterns that contain nested groups.
        matches.extend(hit.group(0) for hit in re.finditer(pattern, lowered))
    return {
        "urgency_detected": bool(matches),
        "urgency_matches": matches[:5],
        "urgency_score": min(len(matches) * 15, 60)
    }


def bert_score(text: str) -> float:
    """Run the shared NLP classifier on *text* and return a phishing score.

    Uses ``bert_analyzer._classifier`` when ``bert_analyzer._use_bert`` is
    set, otherwise the ``bert_phishing_prob`` value of ``analyze_text``.

    Returns:
        0.1 for blank text, 0.3 when the classifier fails or gives no
        probability, otherwise the classifier's score.
    """
    if not text.strip():
        return 0.1
    try:
        _ensure_bert_loaded()
        if bert_analyzer._use_bert and bert_analyzer._classifier is not None:
            # Only the first 512 characters are passed to the classifier.
            result = bert_analyzer._classifier(text[:512])[0]
            label = result['label'].upper()
            score = result['score']
            if 'SPAM' in label or label == 'LABEL_1':
                return score
            return 1 - score
        # Use keyword analysis from bert_analyzer
        result = bert_analyze_text("", "", text)
        return result.get("bert_phishing_prob", 0.3)
    except Exception:
        # Best effort: a classifier failure falls back to a neutral-low score.
        return 0.3


def analyze_email(raw, return_urls: bool = True) -> dict:
    """Run the full phishing analysis on a raw email.

    Combines: BERT NLP score + sender auth + brand spoofing + urgency
    detection.

    Args:
        raw: The full email as raw bytes or as a string.
        return_urls: When true, include up to 20 extracted URLs in the
            result under ``extracted_urls``.

    Returns:
        A dict with the verdict (``is_phishing``, ``phishing_probability``),
        the subject and sender, the individual analyses and the URL count.
    """
    msg = parse_email_msg(raw)
    subject = str(msg.get('Subject', ''))
    sender = str(msg.get('From', ''))
    body = get_body(msg)
    # The body has HTML tags stripped, which removes href targets; scan the
    # raw text parts as well so links in HTML mail are not lost.
    urls = extract_urls(body + ' ' + _raw_text_parts(msg))

    auth = check_sender_auth(msg)
    brand = check_brand_spoofing(subject, body, sender)
    urgency = check_urgency(subject + ' ' + body)
    bert_p = bert_score(subject + '. ' + body[:400])

    # Weighted blend: NLP score x 40, auth risk (0-100) x 0.30, urgency
    # (0-60) x 0.20 and a flat 30 x 0.10 when a brand spoof was detected.
    brand_points = 30 if brand['brand_spoof_detected'] else 0
    raw_score = (
        bert_p * 40
        + auth['auth_risk_score'] * 0.30
        + urgency['urgency_score'] * 0.20
        + brand_points * 0.10
    )
    final = min(raw_score / 100, 1.0)

    result = {
        "is_phishing": final > 0.60,
        "phishing_probability": round(final, 4),
        "subject": subject,
        "sender": sender,
        "auth_analysis": auth,
        "brand_analysis": brand,
        "urgency_analysis": urgency,
        "bert_score": round(bert_p, 4),
        "extracted_url_count": len(urls),
    }
    if return_urls:
        result["extracted_urls"] = urls[:20]
    return result