| """ |
Deepfake Reasoning Engine — ML-Primary Adaptive Scoring + LLM Reasoning
| |
| Uses the ML model's raw detection scores as the primary decision signal, |
| applies sigmoid-based confidence mapping with signal quality multipliers, |
| and uses the LLM only for human-readable reasoning (never to override verdict). |
| """ |
|
|
| import math |
| import logging |
| import asyncio |
| from typing import Dict, Any, Optional |
|
|
| logger = logging.getLogger(__name__) |
|
|
| |
# Tuning knobs for the ML-primary scoring pipeline.
SIGMOID_STEEPNESS = 8  # slope of the confidence sigmoid around the 0.5 midpoint
BASE_DEAD_ZONE = 0.12  # half-width of the inconclusive verdict band before quality scaling
MAX_LLM_NUDGE = 0.08  # largest confidence adjustment the LLM nudge may apply
|
|
|
|
| def _signal_quality( |
| std_score: float = 0.0, |
| frames_analyzed: int = 1, |
| total_frames: int = 1, |
| ) -> float: |
| """ |
| Compute a signal quality multiplier in [0.5, 1.0]. |
| |
| High quality β many frames analysed, low score variance. |
| Low quality β few frames, high variance. |
| """ |
| coverage = min(frames_analyzed / max(total_frames, 1), 1.0) |
| consistency = max(0.0, 1.0 - std_score * 3) |
|
|
| quality = 0.5 + 0.25 * min(coverage * 10, 1.0) + 0.25 * consistency |
| return min(quality, 1.0) |
|
|
|
|
def _sigmoid_confidence(avg_score: float, quality: float) -> float:
    """
    Map a raw ML average score to a directional confidence value.

    The score is pushed through a steepened logistic curve centred at
    0.5, folded so that distance from the midpoint in either direction
    counts as certainty, then scaled by the signal-quality multiplier.
    Output is capped at 1.0; midpoint scores with poor quality bottom
    out around 0.25.
    """
    logistic = 1.0 / (1.0 + math.exp(-SIGMOID_STEEPNESS * (avg_score - 0.5)))

    # Fold the curve around 0.5 so that both tails map to "high
    # confidence", then attenuate by quality.
    directional = logistic if logistic >= 0.5 else 1.0 - logistic
    return min(directional * quality, 1.0)
|
|
|
|
def _adaptive_verdict(avg_score: float, quality: float) -> str:
    """
    Classify the raw ML score using a quality-adaptive dead zone.

    The inconclusive band around 0.5 widens when signal quality is
    poor (more cautious) and narrows when quality is high (more
    decisive).
    """
    # quality=1.0 -> dead zone x1.0; quality=0.5 -> dead zone x1.25
    half_width = BASE_DEAD_ZONE * (1.5 - 0.5 * quality)
    upper = 0.5 + half_width
    lower = 0.5 - half_width

    if avg_score > upper:
        return "DEEPFAKE"
    if avg_score < lower:
        return "AUTHENTIC"
    return "Likely not Deepfake"
|
|
|
|
| def _apply_llm_nudge( |
| confidence: float, |
| ml_verdict: str, |
| llm_result: Optional[Dict[str, Any]], |
| ) -> tuple[float, bool]: |
| """ |
| Bayesian-style nudge: LLM can adjust confidence by up to Β±MAX_LLM_NUDGE |
| but CANNOT flip the verdict. |
| |
| Returns (adjusted_confidence, llm_agrees). |
| """ |
| if not llm_result: |
| return confidence, True |
|
|
| llm_verdict = llm_result.get("verdict", "INCONCLUSIVE") |
| llm_conf_factor = llm_result.get("confidence", 50) / 100.0 |
|
|
| agrees = ( |
| llm_verdict == ml_verdict |
| or llm_verdict == "Likely not Deepfake" |
| or ml_verdict == "Likely not Deepfake" |
| ) |
|
|
| if agrees: |
| confidence += MAX_LLM_NUDGE * llm_conf_factor |
| else: |
| confidence -= MAX_LLM_NUDGE * 0.5 |
| logger.warning( |
| f"[Reasoning] ML/LLM DISAGREE β ML={ml_verdict}, " |
| f"LLM={llm_verdict} ({llm_conf_factor:.0%}). " |
| "Keeping ML verdict, applying negative nudge." |
| ) |
|
|
| return max(0.0, min(confidence, 1.0)), agrees |
|
|
|
|
| |
|
|
async def analyze_with_reasoning(
    detection_result: Dict[str, Any],
    media_type: str = "unknown",
) -> Dict[str, Any]:
    """
    Enhance a raw ML detection result with:
        1. Sigmoid-based adaptive confidence
        2. Dynamic dead-zone verdict
        3. Bounded LLM nudge
        4. LLM-generated human-readable reasoning

    Mutates and returns *detection_result*. The ML verdict is always
    authoritative; the LLM may only nudge confidence and supply prose.
    """
    # Pass error results through untouched.
    if detection_result.get("error"):
        return detection_result

    # Pull the raw score from whichever key the detector populated,
    # in priority order; 0.5 (fully uncertain) if none present.
    avg_score = detection_result.get(
        "average_score",
        detection_result.get("score",
            detection_result.get("anomaly_score",
                detection_result.get("confidence", 0.5)
            )
        )
    )
    std_score = detection_result.get("std_score", 0.0)
    frames_analyzed = detection_result.get("frames_analyzed", 1)
    total_frames = detection_result.get("total_frames", 1)

    # Core ML-primary scoring pipeline.
    quality = _signal_quality(std_score, frames_analyzed, total_frames)
    confidence = _sigmoid_confidence(avg_score, quality)
    ml_verdict = _adaptive_verdict(avg_score, quality)

    # LLM reasoning is best-effort: any failure (missing module, API
    # error) falls back to ML-only output below.
    llm_result = None
    try:
        from .featherless_llm import analyze_with_llm
        llm_result = await analyze_with_llm(detection_result, media_type)
    except Exception as e:
        logger.warning(f"[Reasoning] LLM analysis failed: {e}")

    # Bounded nudge; cannot flip the verdict.
    confidence, llm_agrees = _apply_llm_nudge(confidence, ml_verdict, llm_result)

    # Only an explicit DEEPFAKE verdict marks the media as fake;
    # AUTHENTIC and the inconclusive dead-zone both map to False.
    is_deepfake = ml_verdict == "DEEPFAKE"

    # NOTE: order matters here — the detector's original confidence is
    # preserved under "ml_confidence" BEFORE "confidence" is overwritten,
    # and "ml_raw_score" is set before the fallback reasoning reads it.
    detection_result["ml_raw_score"] = float(avg_score)
    detection_result["ml_confidence"] = detection_result.get("confidence", 0.0)
    detection_result["signal_quality"] = round(quality, 3)
    detection_result["is_deepfake"] = is_deepfake
    detection_result["confidence"] = float(confidence)
    detection_result["verdict"] = ml_verdict

    if llm_result:
        reasoning = llm_result.get("reasoning", "")
        if not llm_agrees:
            # Surface the disagreement but keep the ML verdict authoritative.
            reasoning = (
                f"[Note: The AI reasoning model suggested '{llm_result.get('verdict', '?')}' "
                f"but the ML detection signals indicate '{ml_verdict}'. "
                f"The ML-based verdict is used.] {reasoning}"
            )
        detection_result["reasoning"] = reasoning
        detection_result["key_factors"] = llm_result.get("key_factors", [])
        detection_result["llm_verdict"] = llm_result.get("verdict")
        detection_result["llm_confidence"] = llm_result.get("confidence", 0) / 100.0
        detection_result["analysis_source"] = "ml_primary_llm_reasoned"
    else:
        # ML-only fallback prose (reads ml_raw_score set above).
        detection_result["reasoning"] = _generate_fallback_reasoning(
            detection_result, media_type, ml_verdict, confidence
        )
        detection_result["key_factors"] = _generate_fallback_factors(detection_result)
        detection_result["analysis_source"] = "ml_only"

    # Flag low-trust upstream results in both the source tag and prose.
    # (Fixed mojibake: prefix previously rendered "β" for an em dash.)
    if detection_result.get("low_trust"):
        detection_result["analysis_source"] += "_low_trust"
        detection_result["reasoning"] = (
            "[Low confidence — heuristic fallback] "
            + detection_result.get("reasoning", "")
        )

    logger.info(
        f"[Reasoning] {ml_verdict} "
        f"(confidence: {confidence:.0%}, ML raw: {avg_score:.4f}, "
        f"quality: {quality:.2f})"
    )

    return detection_result
|
|
|
|
| |
|
|
| def _generate_fallback_reasoning( |
| result: Dict[str, Any], |
| media_type: str, |
| verdict: str, |
| confidence: float, |
| ) -> str: |
| """Generate human-readable reasoning from ML signals alone.""" |
| conf_pct = round(confidence * 100) |
| raw = result.get("ml_raw_score", 0) |
|
|
| if verdict == "Likely not Deepfake": |
| return ( |
| f"Analysis produced borderline results (raw score: {raw:.2f}). " |
| f"The signals are not strong enough to confidently determine manipulation, " |
| f"so it is likely not a deepfake. Manual review is recommended if context is suspicious." |
| ) |
|
|
| if media_type == "audio": |
| anomaly = result.get("anomaly_score", 0) |
| if verdict == "DEEPFAKE": |
| return ( |
| f"Audio analysis detected anomalous patterns with {conf_pct}% confidence. " |
| f"The spectral characteristics show irregularities (anomaly score: {anomaly:.2f}) " |
| f"consistent with AI-generated or manipulated audio." |
| ) |
| return ( |
| f"Audio analysis found natural speech characteristics with {conf_pct}% confidence. " |
| f"The spectral patterns and voice timbre appear consistent with authentic human speech." |
| ) |
|
|
| if media_type in ("video", "image"): |
| frames = result.get("frames_analyzed", 0) |
| total = result.get("total_frames", 0) |
| if verdict == "DEEPFAKE": |
| return ( |
| f"Visual analysis detected manipulation markers with {conf_pct}% confidence. " |
| f"Analyzed {frames}/{total} frames with an average deepfake score of {raw:.2f}. " |
| f"Facial features, blur patterns, or frequency artifacts suggest synthetic generation." |
| ) |
| return ( |
| f"Visual analysis found consistent and natural features with {conf_pct}% confidence. " |
| f"Analyzed {frames}/{total} frames with no significant manipulation artifacts detected." |
| ) |
|
|
| return f"Analysis completed with {conf_pct}% confidence. Verdict: {verdict}." |
|
|
|
|
| def _generate_fallback_factors(result: Dict[str, Any]) -> list: |
| """Generate key factor list from ML signals alone.""" |
| factors = [] |
|
|
| if "ml_raw_score" in result: |
| raw = result["ml_raw_score"] |
| factors.append({ |
| "name": "ML Detection Score", |
| "value": f"{round(raw * 100)}% deepfake probability", |
| "impact": "high" if raw > 0.7 or raw < 0.3 else "medium", |
| }) |
|
|
| if "signal_quality" in result: |
| q = result["signal_quality"] |
| factors.append({ |
| "name": "Signal Quality", |
| "value": f"{round(q * 100)}%", |
| "impact": "high" if q < 0.6 else "low", |
| }) |
|
|
| if "frames_analyzed" in result: |
| fa = result["frames_analyzed"] |
| total = result.get("total_frames", 0) |
| coverage = (fa / total * 100) if total > 0 else 0 |
| factors.append({ |
| "name": "Frame Coverage", |
| "value": f"{fa}/{total} frames ({coverage:.0f}%)", |
| "impact": "medium" if coverage > 3 else "low", |
| }) |
|
|
| if "std_score" in result: |
| std = result["std_score"] |
| factors.append({ |
| "name": "Score Consistency", |
| "value": "Consistent" if std < 0.15 else "Variable", |
| "impact": "medium", |
| }) |
|
|
| if "anomaly_score" in result: |
| anomaly = result["anomaly_score"] |
| factors.append({ |
| "name": "Audio Anomaly Level", |
| "value": f"{round(anomaly * 100)}%", |
| "impact": "high" if anomaly > 0.5 else "medium" if anomaly > 0.3 else "low", |
| }) |
|
|
| if "model" in result: |
| model = result["model"] |
| model_short = model.split("/")[-1] if "/" in model else model |
| factors.append({ |
| "name": "Detection Model", |
| "value": model_short, |
| "impact": "low", |
| }) |
|
|
| return factors |
|
|