import logging
from typing import List, Optional


logger = logging.getLogger(__name__)


def fuse_scores(
    rule_based_score: float,
    model_score: float,
    shap_values: Optional[List] = None,
    user_history_boost: float = 0.0,
) -> dict:
    """
    Fuse multiple risk scores into a single final verdict.

    Args:
        rule_based_score: Score from rule-based analysis (0-100)
        model_score: Score from ML model (0-100)
        shap_values: SHAP attribution values for explainability
            (currently not factored into the fused score)
        user_history_boost: Boost factor from user history anomalies (0-50)

    Returns:
        dict with fused_score, verdict, confidence, key_factors,
        and component_scores
    """
    # Weighted blend: the ML model output carries slightly more weight than
    # the rule-based score.
    base_score = (rule_based_score * 0.4) + (model_score * 0.6)

    # Apply the user-history boost and cap the final score at 100.
    final_score = min(100, base_score + user_history_boost)

    # Map the fused score onto a verdict band with a heuristic confidence.
    if final_score >= 80:
        verdict = "Critical Risk"
        confidence = 0.95
    elif final_score >= 60:
        verdict = "High Risk"
        confidence = 0.85
    elif final_score >= 40:
        verdict = "Moderate Risk"
        confidence = 0.70
    else:
        verdict = "Low Risk"
        confidence = 0.80

    # Collect human-readable factors for the explanation payload.
    # (shap_values is accepted for explainability but is not folded in here yet.)
    key_factors = []
    if rule_based_score > 50:
        key_factors.append("Rule-based detection triggered")
    if model_score > 50:
        key_factors.append("ML model flagged suspicious patterns")
    if user_history_boost > 0:
        key_factors.append("User history anomaly detected")
    return {
        "fused_score": round(final_score, 2),
        "verdict": verdict,
        "confidence": confidence,
        "key_factors": key_factors,
        "component_scores": {
            "rule_based": rule_based_score,
            "model": model_score,
            "history_boost": user_history_boost
        }
    }
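
# Worked example (hypothetical inputs, used only to illustrate the fusion
# arithmetic; not values produced by any real worker):
#   fuse_scores(rule_based_score=70, model_score=85, user_history_boost=10)
#   base  = 70 * 0.4 + 85 * 0.6 = 79.0
#   final = min(100, 79.0 + 10) = 89.0  -> "Critical Risk" (confidence 0.95)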


async def process_and_fuse_verdict(message_sid: str, worker_results: dict) -> dict:
    """
    Mock fusion engine.

    Takes results from the various analysis workers (URL, Image, Text, Audio)
    and fuses them into a single final verdict and recommendation string.
    """
    logger.info(f"Fusing results for {message_sid}: {worker_results}")

    # Default to a benign verdict; escalate only when a worker raises a flag.
    risk_score = 15
    verdict = "Low Risk"
    explanation = "No immediate threats detected."
    action = "No action needed."

    # Any phishing or deepfake hit from the workers overrides the default.
    if worker_results.get("phishing_detected") or worker_results.get("deepfake_detected"):
        risk_score = 95
        verdict = "High Risk - Critical"
        explanation = "Suspicious patterns detected matching known phishing or deepfake signatures."
        action = "Do not pay. Verify on a live call. Report to 1930."

    return {
        "score": risk_score,
        "verdict": verdict,
        "explanation": explanation,
        "recommended_action": action
    }
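

# Illustrative manual check; an assumption about how this module might be run
# directly rather than an established entry point. The message SID is a made-up
# placeholder and the worker flags mirror the keys the mock already inspects.
if __name__ == "__main__":
    import asyncio

    demo_results = {"phishing_detected": True, "deepfake_detected": False}
    print(asyncio.run(process_and_fuse_verdict("SM_demo_placeholder", demo_results)))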