# PhishingInsight/src/core/fusion_engine.py
import logging
logger = logging.getLogger(__name__)
class FusionEngine:
"""Combines multiple signals into a single verdict."""
# Thresholds aligned with documentation
PHISHING_THRESHOLD = 0.65
SUSPICIOUS_THRESHOLD = 0.45
WHITELIST_THRESHOLD = 0.15 # DNS score below this means whitelisted
def __init__(self):
logger.info("FusionEngine: Initialized")
def fuse(self, lexical_score, dns_score, lexical_findings=None, dns_findings=None, content_data=None, homo_score=0.0, homo_findings=None):
"""
Combine scores and return verdict.
Lexical gets 60% weight, DNS gets 40%.
Special case: If DNS score is very low (whitelisted domain),
override to SAFE regardless of lexical score.
"""
# If DNS agent marked as whitelisted (score <= 0.15), trust it
if dns_score <= self.WHITELIST_THRESHOLD:
score = dns_score
verdict = "SAFE"
threat = "LOW"
is_phishing = False
findings = (lexical_findings or []) + (dns_findings or [])
return {
'verdict': verdict, 'threat': threat, 'is_phishing': is_phishing,
'score': round(score, 3), 'confidence': 95,
'lexical_score': round(lexical_score, 3), 'dns_score': round(dns_score, 3),
'findings': findings[:8]
}
# --- CONTENT ANALYSIS ---
content_findings = []
if content_data and content_data.get('text'):
text = content_data['text'].lower()
# Critical Keyword Combinations
risk_keywords = [
('password', 'verify'), ('credit card', 'update'),
('social security', 'confirm'), ('bank account', 'locked'),
('urgent', 'action required'), ('signin', 'microsoft'),
('login', 'google'), ('verify', 'identity')
]
for k1, k2 in risk_keywords:
if k1 in text and k2 in text:
content_findings.append(f"Content asks for dangerous info: '{k1}' + '{k2}'")
# Normal weighted combination
score = (0.6 * lexical_score) + (0.4 * dns_score)
# Boost score if content is suspicious
if content_findings:
score = max(score, 0.75) # Content matches force High Risk
# --- HOMOGRAPH OVERRIDE ---
homo_findings = homo_findings or []
if homo_score > 0.8:
# Granular calculation: Base 90% + 2% per extra finding
score = 0.90 + (len(homo_findings) * 0.02)
score = min(0.989, score) # Cap at 98.9%
verdict = "DANGER"
threat = "CRITICAL" # New Critical Level for Impersonation
is_phishing = True
findings = homo_findings + (lexical_findings or [])
return {
'verdict': verdict, 'threat': threat, 'is_phishing': is_phishing,
'score': round(score, 3), 'confidence': 99,
'lexical_score': round(lexical_score, 3), 'dns_score': round(dns_score, 3),
'findings': findings[:8]
}
# --- VERDICT DETERMINATION ---
# Add small granular adjustments based on finding count
total_findings = len((lexical_findings or []) + (dns_findings or []) + content_findings)
# Add 1.5% risk per finding to make score variable
score += (total_findings * 0.015)
if score >= self.PHISHING_THRESHOLD:
verdict = "DANGER"
threat = "HIGH"
is_phishing = True
elif score >= self.SUSPICIOUS_THRESHOLD:
verdict = "WARNING"
threat = "MEDIUM"
is_phishing = False
else:
verdict = "SAFE"
threat = "LOW"
is_phishing = False
# If safe, ensure we don't return flat 0 or 20
# Scale down but keep some "noise" from findings
if score < 0.1: score = 0.05 + (total_findings * 0.01)
# Clamp final score to 0.0 - 0.989
score = max(0.01, min(0.989, score))
# Confidence based on how far from middle threshold
confidence = round(abs(score - 0.5) * 200, 1)
confidence = min(95, max(30, confidence))
findings = (lexical_findings or []) + (dns_findings or []) + content_findings + homo_findings
# --- FALSE POSITIVE GUARDRAIL ---
if not findings and score >= self.SUSPICIOUS_THRESHOLD:
logger.info(f"FusionEngine: Guardrail triggered. Downgrading score {score:.2f} -> SAFE.")
score = 0.12 # Natural safe score
verdict = "SAFE"
threat = "LOW"
is_phishing = False
return {
'verdict': verdict,
'threat': threat,
'is_phishing': is_phishing,
'score': round(score, 3),
'confidence': confidence,
'lexical_score': round(lexical_score, 3),
'dns_score': round(dns_score, 3),
'findings': findings[:8]
}