KES-Hack / src /services /fusion_engine.py
Meshyboi's picture
Upload 86 files
214209a verified
import logging
from typing import List, Optional
logger = logging.getLogger(__name__)
def fuse_scores(rule_based_score: float, model_score: float, shap_values: Optional[List] = None, user_history_boost: float = 0.0) -> dict:
"""
Fuse multiple risk scores into a single final verdict.
Args:
rule_based_score: Score from rule-based analysis (0-100)
model_score: Score from ML model (0-100)
shap_values: SHAP attribution values for explainability
user_history_boost: Boost factor from user history anomalies (0-50)
Returns:
dict with fused_score, verdict, confidence, and key_factors
"""
# Weighted average: 40% rule-based, 60% model
base_score = (rule_based_score * 0.4) + (model_score * 0.6)
# Apply user history boost
final_score = min(100, base_score + user_history_boost)
# Determine verdict
if final_score >= 80:
verdict = "Critical Risk"
confidence = 0.95
elif final_score >= 60:
verdict = "High Risk"
confidence = 0.85
elif final_score >= 40:
verdict = "Moderate Risk"
confidence = 0.70
else:
verdict = "Low Risk"
confidence = 0.80
key_factors = []
if rule_based_score > 50:
key_factors.append("Rule-based detection triggered")
if model_score > 50:
key_factors.append("ML model flagged suspicious patterns")
if user_history_boost > 0:
key_factors.append("User history anomaly detected")
return {
"fused_score": round(final_score, 2),
"verdict": verdict,
"confidence": confidence,
"key_factors": key_factors,
"component_scores": {
"rule_based": rule_based_score,
"model": model_score,
"history_boost": user_history_boost
}
}
async def process_and_fuse_verdict(message_sid: str, worker_results: dict) -> dict:
"""
Mock Fusion Engine
Takes results from various analysis workers (URL, Image, Text, Audio)
and fuses them into a single final verdict and recommendation string.
"""
logger.info(f"Fusing results for {message_sid}: {worker_results}")
# Placeholder Logic
risk_score = 15 # Out of 100
verdict = "Low Risk"
explanation = "No immediate threats detected."
action = "No action needed."
if worker_results.get("phishing_detected") or worker_results.get("deepfake_detected"):
risk_score = 95
verdict = "High Risk - Critical"
explanation = "Suspicious patterns detected matching known phishing or deepfake signatures."
action = "Do not pay. Verify on a live call. Report to 1930."
return {
"score": risk_score,
"verdict": verdict,
"explanation": explanation,
"recommended_action": action
}