Spaces:
Sleeping
Sleeping
| import logging | |
| logger = logging.getLogger(__name__) | |
class FusionEngine:
    """Combines multiple signals into a single verdict.

    Fuses lexical, DNS, content and homograph signals into one phishing
    score plus a categorical verdict. The weighted blend is 60% lexical /
    40% DNS; two special cases override it entirely:

    * a very low DNS score (<= WHITELIST_THRESHOLD) means the domain is
      whitelisted and the URL is reported SAFE regardless of other signals;
    * a high homograph score (> 0.8) means confirmed impersonation and the
      URL is reported DANGER/CRITICAL regardless of the blend.
    """

    # Thresholds aligned with documentation
    PHISHING_THRESHOLD = 0.65    # score >= this -> DANGER / HIGH
    SUSPICIOUS_THRESHOLD = 0.45  # score >= this -> WARNING / MEDIUM
    WHITELIST_THRESHOLD = 0.15   # DNS score below this means whitelisted

    # Keyword pairs that, appearing together in page text, indicate a
    # credential/PII-harvesting page. Hoisted to a class constant so the
    # list is built once, not on every fuse() call.
    _RISK_KEYWORD_PAIRS = (
        ('password', 'verify'), ('credit card', 'update'),
        ('social security', 'confirm'), ('bank account', 'locked'),
        ('urgent', 'action required'), ('signin', 'microsoft'),
        ('login', 'google'), ('verify', 'identity'),
    )

    def __init__(self):
        logger.info("FusionEngine: Initialized")

    @staticmethod
    def _result(verdict, threat, is_phishing, score, confidence,
                lexical_score, dns_score, findings):
        """Assemble the uniform response payload (findings capped at 8)."""
        return {
            'verdict': verdict, 'threat': threat, 'is_phishing': is_phishing,
            'score': round(score, 3), 'confidence': confidence,
            'lexical_score': round(lexical_score, 3),
            'dns_score': round(dns_score, 3),
            'findings': findings[:8],
        }

    @classmethod
    def _content_findings(cls, content_data):
        """Scan fetched page text for dangerous keyword combinations.

        Returns a list of human-readable findings; empty when there is no
        content or no pair of risk keywords co-occurs in the text.
        """
        findings = []
        if content_data and content_data.get('text'):
            text = content_data['text'].lower()
            for k1, k2 in cls._RISK_KEYWORD_PAIRS:
                if k1 in text and k2 in text:
                    findings.append(f"Content asks for dangerous info: '{k1}' + '{k2}'")
        return findings

    def fuse(self, lexical_score, dns_score, lexical_findings=None,
             dns_findings=None, content_data=None, homo_score=0.0,
             homo_findings=None):
        """Combine scores and return a verdict payload.

        Lexical gets 60% weight, DNS gets 40%. Content keyword matches
        force the score to at least 0.75; a homograph score > 0.8
        overrides everything with a CRITICAL verdict; a DNS score at or
        below WHITELIST_THRESHOLD overrides everything with SAFE.

        Args:
            lexical_score: 0..1 risk score from the lexical analyzer.
            dns_score: 0..1 risk score from the DNS analyzer.
            lexical_findings: optional list of lexical finding strings.
            dns_findings: optional list of DNS finding strings.
            content_data: optional dict; only the 'text' key is read.
            homo_score: 0..1 homograph/impersonation score.
            homo_findings: optional list of homograph finding strings.

        Returns:
            dict with keys: verdict, threat, is_phishing, score,
            confidence, lexical_score, dns_score, findings (max 8).
        """
        lexical_findings = lexical_findings or []
        dns_findings = dns_findings or []
        homo_findings = homo_findings or []

        # If DNS agent marked as whitelisted (score <= 0.15), trust it.
        if dns_score <= self.WHITELIST_THRESHOLD:
            return self._result(
                "SAFE", "LOW", False, dns_score, 95,
                lexical_score, dns_score, lexical_findings + dns_findings)

        # --- CONTENT ANALYSIS ---
        content_findings = self._content_findings(content_data)

        # Normal weighted combination.
        score = (0.6 * lexical_score) + (0.4 * dns_score)

        # Boost score if content is suspicious.
        if content_findings:
            score = max(score, 0.75)  # Content matches force High Risk

        # --- HOMOGRAPH OVERRIDE ---
        if homo_score > 0.8:
            # Granular calculation: Base 90% + 2% per extra finding,
            # capped at 98.9%.
            score = min(0.989, 0.90 + (len(homo_findings) * 0.02))
            return self._result(
                "DANGER", "CRITICAL", True, score, 99,
                lexical_score, dns_score, homo_findings + lexical_findings)

        # --- VERDICT DETERMINATION ---
        # Add 1.5% risk per finding to make the score variable.
        total_findings = (len(lexical_findings) + len(dns_findings)
                          + len(content_findings))
        score += total_findings * 0.015

        if score >= self.PHISHING_THRESHOLD:
            verdict, threat, is_phishing = "DANGER", "HIGH", True
        elif score >= self.SUSPICIOUS_THRESHOLD:
            verdict, threat, is_phishing = "WARNING", "MEDIUM", False
        else:
            verdict, threat, is_phishing = "SAFE", "LOW", False

        # If safe, ensure we don't return a flat 0; keep some "noise"
        # from the finding count. (score < 0.1 implies the SAFE branch.)
        if score < 0.1:
            score = 0.05 + (total_findings * 0.01)

        # Clamp final score to 0.01 - 0.989.
        score = max(0.01, min(0.989, score))

        findings = (lexical_findings + dns_findings
                    + content_findings + homo_findings)

        # --- FALSE POSITIVE GUARDRAIL ---
        # A raised score with zero supporting findings is treated as noise.
        if not findings and score >= self.SUSPICIOUS_THRESHOLD:
            logger.info(f"FusionEngine: Guardrail triggered. Downgrading score {score:.2f} -> SAFE.")
            score = 0.12  # Natural safe score
            verdict, threat, is_phishing = "SAFE", "LOW", False

        # Confidence based on distance from the middle threshold.
        # BUG FIX: compute this from the FINAL score — previously it was
        # computed before the guardrail downgrade, so a downgraded result
        # reported a confidence derived from the stale, pre-downgrade score.
        confidence = round(abs(score - 0.5) * 200, 1)
        confidence = min(95, max(30, confidence))

        return self._result(verdict, threat, is_phishing, score, confidence,
                            lexical_score, dns_score, findings)