import logging

logger = logging.getLogger(__name__)


class FusionEngine:
    """Combines multiple detection signals into a single phishing verdict."""

    # Thresholds aligned with documentation
    PHISHING_THRESHOLD = 0.65
    SUSPICIOUS_THRESHOLD = 0.45
    WHITELIST_THRESHOLD = 0.15  # DNS score at or below this means whitelisted

    def __init__(self):
        logger.info("FusionEngine: Initialized")

    def _result(self, verdict, threat, is_phishing, score, confidence,
                lexical_score, dns_score, findings):
        """Build the verdict payload shared by every return path of fuse()."""
        return {
            'verdict': verdict,
            'threat': threat,
            'is_phishing': is_phishing,
            'score': round(score, 3),
            'confidence': confidence,
            'lexical_score': round(lexical_score, 3),
            'dns_score': round(dns_score, 3),
            # Cap the finding list so payloads stay small.
            'findings': findings[:8],
        }

    def fuse(self, lexical_score, dns_score, lexical_findings=None,
             dns_findings=None, content_data=None, homo_score=0.0,
             homo_findings=None):
        """
        Combine scores and return a verdict payload.

        Lexical gets 60% weight, DNS gets 40%.

        Special cases:
        - DNS score <= WHITELIST_THRESHOLD (whitelisted domain): override to
          SAFE regardless of lexical score.
        - homo_score > 0.8 (likely homograph/impersonation): override the
          weighted blend and return a CRITICAL verdict.

        Returns a dict with keys: verdict, threat, is_phishing, score,
        confidence, lexical_score, dns_score, findings.
        """
        lexical_findings = lexical_findings or []
        dns_findings = dns_findings or []
        homo_findings = homo_findings or []

        # If DNS agent marked the domain as whitelisted (score <= 0.15), trust it.
        # NOTE(review): this runs before the homograph override below, so a
        # whitelisted DNS result also suppresses homograph detection — confirm
        # that ordering is intended.
        if dns_score <= self.WHITELIST_THRESHOLD:
            return self._result(
                "SAFE", "LOW", False, dns_score, 95,
                lexical_score, dns_score,
                lexical_findings + dns_findings,
            )

        # --- CONTENT ANALYSIS ---
        # Flag page text that pairs a credential/PII keyword with a
        # call-to-action keyword.
        content_findings = []
        if content_data and content_data.get('text'):
            text = content_data['text'].lower()
            # Critical Keyword Combinations
            risk_keywords = [
                ('password', 'verify'),
                ('credit card', 'update'),
                ('social security', 'confirm'),
                ('bank account', 'locked'),
                ('urgent', 'action required'),
                ('signin', 'microsoft'),
                ('login', 'google'),
                ('verify', 'identity'),
            ]
            for k1, k2 in risk_keywords:
                if k1 in text and k2 in text:
                    content_findings.append(f"Content asks for dangerous info: '{k1}' + '{k2}'")

        # Normal weighted combination: lexical 60%, DNS 40%.
        score = (0.6 * lexical_score) + (0.4 * dns_score)

        # Any content keyword hit forces at least a high-risk score.
        if content_findings:
            score = max(score, 0.75)

        # --- HOMOGRAPH OVERRIDE ---
        if homo_score > 0.8:
            # Granular calculation: base 90% + 2% per finding, capped at 98.9%.
            score = min(0.989, 0.90 + (len(homo_findings) * 0.02))
            return self._result(
                "DANGER", "CRITICAL", True, score, 99,
                lexical_score, dns_score,
                homo_findings + lexical_findings,
            )

        # --- VERDICT DETERMINATION ---
        # Add 1.5% risk per finding so the score varies with evidence volume.
        total_findings = len(lexical_findings) + len(dns_findings) + len(content_findings)
        score += total_findings * 0.015

        if score >= self.PHISHING_THRESHOLD:
            verdict, threat, is_phishing = "DANGER", "HIGH", True
        elif score >= self.SUSPICIOUS_THRESHOLD:
            verdict, threat, is_phishing = "WARNING", "MEDIUM", False
        else:
            verdict, threat, is_phishing = "SAFE", "LOW", False

        # If safe, don't return a flat 0 — keep some "noise" from findings.
        if score < 0.1:
            score = 0.05 + (total_findings * 0.01)

        # Clamp final score to 0.01 - 0.989.
        score = max(0.01, min(0.989, score))

        # Confidence grows with distance from the 0.5 midpoint, bounded [30, 95].
        confidence = min(95, max(30, round(abs(score - 0.5) * 200, 1)))

        findings = lexical_findings + dns_findings + content_findings + homo_findings

        # --- FALSE POSITIVE GUARDRAIL ---
        # A suspicious score with zero supporting findings is treated as a
        # false positive and downgraded to SAFE.
        if not findings and score >= self.SUSPICIOUS_THRESHOLD:
            logger.info(
                "FusionEngine: Guardrail triggered. Downgrading score %.2f -> SAFE.",
                score,
            )
            score = 0.12  # Natural safe score
            verdict, threat, is_phishing = "SAFE", "LOW", False
            # BUGFIX: recompute confidence from the downgraded score; the
            # original returned the stale pre-downgrade confidence, making the
            # payload internally inconsistent.
            confidence = min(95, max(30, round(abs(score - 0.5) * 200, 1)))

        return self._result(
            verdict, threat, is_phishing, score, confidence,
            lexical_score, dns_score, findings,
        )