File size: 2,834 Bytes
214209a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import logging
from typing import List, Optional

logger = logging.getLogger(__name__)

def fuse_scores(rule_based_score: float, model_score: float, shap_values: Optional[List] = None, user_history_boost: float = 0.0) -> dict:
    """
    Combine rule-based and model risk scores into a single final verdict.

    The fused score is a weighted blend (40% rules, 60% model) plus an
    additive user-history boost, capped at 100, then mapped onto a
    verdict band with an associated confidence.

    Args:
        rule_based_score: Score from rule-based analysis (0-100)
        model_score: Score from ML model (0-100)
        shap_values: SHAP attribution values for explainability
            NOTE(review): accepted but currently unused in this function.
        user_history_boost: Boost factor from user history anomalies (0-50)

    Returns:
        dict with fused_score, verdict, confidence, key_factors and
        component_scores
    """
    rule_weight, model_weight = 0.4, 0.6
    blended = rule_based_score * rule_weight + model_score * model_weight

    # History anomalies push the score upward, but never past 100.
    fused = min(100, blended + user_history_boost)

    # Band the fused score into a verdict with a fixed confidence per band.
    if fused >= 80:
        verdict, confidence = "Critical Risk", 0.95
    elif fused >= 60:
        verdict, confidence = "High Risk", 0.85
    elif fused >= 40:
        verdict, confidence = "Moderate Risk", 0.70
    else:
        verdict, confidence = "Low Risk", 0.80

    # Human-readable drivers: keep each label whose trigger condition fired.
    factor_checks = [
        (rule_based_score > 50, "Rule-based detection triggered"),
        (model_score > 50, "ML model flagged suspicious patterns"),
        (user_history_boost > 0, "User history anomaly detected"),
    ]
    key_factors = [label for fired, label in factor_checks if fired]

    return {
        "fused_score": round(fused, 2),
        "verdict": verdict,
        "confidence": confidence,
        "key_factors": key_factors,
        "component_scores": {
            "rule_based": rule_based_score,
            "model": model_score,
            "history_boost": user_history_boost,
        },
    }

async def process_and_fuse_verdict(message_sid: str, worker_results: dict) -> dict:
    """
    Mock Fusion Engine.

    Takes results from various analysis workers (URL, Image, Text, Audio)
    and fuses them into a single final verdict and recommendation string.

    Args:
        message_sid: Identifier of the message being analyzed (used for logging).
        worker_results: Aggregated worker output; only the boolean-ish keys
            "phishing_detected" and "deepfake_detected" are consulted here.

    Returns:
        dict with score, verdict, explanation and recommended_action
    """
    logging.getLogger(__name__).info(f"Fusing results for {message_sid}: {worker_results}")

    # Placeholder logic: escalate only when any worker reported phishing or
    # a deepfake; everything else falls through to the benign default.
    threat_found = bool(
        worker_results.get("phishing_detected")
        or worker_results.get("deepfake_detected")
    )

    if threat_found:
        return {
            "score": 95,
            "verdict": "High Risk - Critical",
            "explanation": "Suspicious patterns detected matching known phishing or deepfake signatures.",
            "recommended_action": "Do not pay. Verify on a live call. Report to 1930.",
        }

    return {
        "score": 15,  # out of 100
        "verdict": "Low Risk",
        "explanation": "No immediate threats detected.",
        "recommended_action": "No action needed.",
    }