Spaces:
Running
Running
| """ | |
| High-Fidelity MMSA Dissonance Engine. | |
| Orchestration layer for tri-modal (Audio + Text + Video) deception detection. | |
| Now wired to DissonanceEngine for real wav2vec2 / DistilBERT features. | |
| Env vars consumed by this layer: | |
| USE_DIMENSIONAL_AUDIO_MODEL=true → audeering/wav2vec2-large-robust-12-ft-emotion-msp-dim | |
| USE_EMOTION2VEC=true → emotion2vec/emotion2vec_plus_large (requires funasr) | |
| Env vars forwarded to DissonanceEngine (set them in the same .env): | |
| AUDIO_TEMPERATURE=1.5 → softmax temperature for audio emotion calibration | |
| DISSONANCE_WEIGHTS=0.5,0.3,0.2 → w1·audio_text + w2·prosody + w3·text_conf | |
| """ | |
| import os | |
| import logging | |
| import time | |
| import json | |
| from typing import Dict, List, Any, Optional | |
| import numpy as np | |
| from pathlib import Path | |
| # Guardian Integrated services | |
| from .guardian_sensory import guardian_sensory | |
| from .scam_graph import scam_graph | |
| # Core dissonance engine — real models, VAD space, prosody, persistence | |
| from .dissonance_engine import dissonance_engine, _probs_to_vad | |
| # Heavyweight optional imports — deferred | |
| torch = None | |
| mp = None | |
| tasks = None | |
| vision = None | |
| cv2 = None | |
| yt_dlp = None | |
| static_ffmpeg = None | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # Optional Upgrade 1: audeering dimensional model | |
| # --------------------------------------------------------------------------- | |
| _USE_DIMENSIONAL = os.getenv("USE_DIMENSIONAL_AUDIO_MODEL", "false").lower() == "true" | |
| _USE_EMOTION2VEC = os.getenv("USE_EMOTION2VEC", "false").lower() == "true" | |
| DIMENSIONAL_MODEL_NAME = "audeering/wav2vec2-large-robust-12-ft-emotion-msp-dim" | |
| class _DimensionalAudioModel: | |
| """ | |
| Wraps the audeering regression model that directly outputs | |
| [arousal, dominance, valence] floats — no VAD lookup needed. | |
| """ | |
| def __init__(self): | |
| self._loaded = False | |
| self.processor = None | |
| self.model = None | |
| def load(self) -> bool: | |
| global torch | |
| try: | |
| import torch as _torch | |
| torch = _torch | |
| import torch.nn as nn | |
| from transformers import Wav2Vec2Processor | |
| from transformers.models.wav2vec2.modeling_wav2vec2 import ( | |
| Wav2Vec2Model, | |
| Wav2Vec2PreTrainedModel, | |
| ) | |
| class RegressionHead(nn.Module): | |
| def __init__(self, config): | |
| super().__init__() | |
| self.dense = nn.Linear(config.hidden_size, config.hidden_size) | |
| self.dropout = nn.Dropout(config.final_dropout) | |
| self.out_proj = nn.Linear(config.hidden_size, config.num_labels) | |
| def forward(self, features, **kwargs): | |
| x = self.dropout(features) | |
| x = self.dense(x) | |
| x = _torch.tanh(x) | |
| x = self.dropout(x) | |
| return self.out_proj(x) | |
| class EmotionModel(Wav2Vec2PreTrainedModel): | |
| def __init__(self, config): | |
| super().__init__(config) | |
| self.wav2vec2 = Wav2Vec2Model(config) | |
| self.classifier = RegressionHead(config) | |
| self.init_weights() | |
| def forward(self, input_values): | |
| hidden = self.wav2vec2(input_values)[0] | |
| hidden = _torch.mean(hidden, dim=1) | |
| return self.classifier(hidden) | |
| logger.info("[MMSA] Loading audeering dimensional audio model …") | |
| self.processor = Wav2Vec2Processor.from_pretrained(DIMENSIONAL_MODEL_NAME) | |
| self.model = EmotionModel.from_pretrained(DIMENSIONAL_MODEL_NAME) | |
| self.model.eval() | |
| self._loaded = True | |
| logger.info("[MMSA] Dimensional model ready (outputs arousal/dominance/valence).") | |
| return True | |
| except Exception as e: | |
| logger.warning("[MMSA] Dimensional model unavailable: %s", e) | |
| return False | |
| def predict_vad(self, audio_path: str) -> np.ndarray: | |
| """ | |
| Returns np.ndarray [valence, arousal, dominance] ∈ [0, 1]. | |
| audeering model outputs [arousal, dominance, valence] — reordered here | |
| to match the [val, aro, dom] convention used everywhere else. | |
| """ | |
| import librosa | |
| y, _ = librosa.load(audio_path, sr=16000) | |
| inputs = self.processor(y, sampling_rate=16000, return_tensors="pt", padding=True) | |
| with torch.no_grad(): | |
| out = self.model(inputs.input_values) | |
| vad = torch.sigmoid(out).squeeze().numpy() # [arousal, dominance, valence] | |
| # Reorder to [valence, arousal, dominance] for consistency with EMOTION_TO_VAD | |
| return np.array([vad[2], vad[0], vad[1]]) | |
| _dimensional_model = _DimensionalAudioModel() if _USE_DIMENSIONAL else None | |
| # --------------------------------------------------------------------------- | |
| # Main engine | |
| # --------------------------------------------------------------------------- | |
| class MMSADissonanceEngine: | |
| """ | |
| Tri-modal deception detection: Text × Audio × Video. | |
| Real features via DissonanceEngine (wav2vec2 + DistilBERT + prosody). | |
| """ | |
| def __init__(self): | |
| self.model_name = "self_mm" | |
| self._vision_loaded = False | |
| self.landmarker = None | |
| self.dissonance_threshold = 0.4 | |
| self.memory_path = os.path.join( | |
| os.path.dirname(__file__), "..", "cache", "emotional_baselines.json" | |
| ) | |
| os.makedirs(os.path.dirname(self.memory_path), exist_ok=True) | |
| self.guardian = guardian_sensory | |
| self.scam_memory = scam_graph | |
| self.action_guidance = { | |
| "BLOCK": "Immediate Risk: Block sender and do not respond. Save screenshot as evidence.", | |
| "WARN": "Suspicious Journey: Pattern matches known scam scripts. Verify via official app.", | |
| "RESTRICT": "Potential Fraud: Dissonance and metadata conflicts. Do not share OTP.", | |
| } | |
| # Load optional dimensional model at startup | |
| if _USE_DIMENSIONAL and _dimensional_model: | |
| _dimensional_model.load() | |
| # ------------------------------------------------------------------ | |
| # MediaPipe face landmarker (lazy) | |
| # ------------------------------------------------------------------ | |
| def _lazy_load_vision(self) -> bool: | |
| global mp, tasks, vision, cv2, yt_dlp, static_ffmpeg | |
| if self._vision_loaded: | |
| return True | |
| try: | |
| import mediapipe as _mp | |
| from mediapipe.tasks import python as _tasks | |
| from mediapipe.tasks.python import vision as _vision | |
| import cv2 as _cv2 | |
| import yt_dlp as _yt_dlp | |
| import static_ffmpeg as _sffmpeg | |
| from static_ffmpeg import run as _ffmpeg_run | |
| mp = _mp; tasks = _tasks; vision = _vision; cv2 = _cv2 | |
| yt_dlp = _yt_dlp; static_ffmpeg = _ffmpeg_run | |
| model_path = os.path.join(os.path.dirname(__file__), "face_landmarker.task") | |
| if os.path.exists(model_path): | |
| base_opts = tasks.BaseOptions(model_asset_path=model_path) | |
| opts = vision.FaceLandmarkerOptions( | |
| base_options=base_opts, | |
| output_face_blendshapes=True, | |
| output_facial_transformation_matrixes=True, | |
| num_faces=1, | |
| ) | |
| self.landmarker = vision.FaceLandmarker.create_from_options(opts) | |
| self._vision_loaded = True | |
| logger.info("[MMSA] Vision / MediaPipe sensors active.") | |
| return True | |
| except ImportError as e: | |
| logger.warning("[MMSA] Vision deps missing (%s); video analysis disabled.", e) | |
| return False | |
| except Exception as e: | |
| logger.error("[MMSA] Vision init failed: %s", e) | |
| return False | |
| # ------------------------------------------------------------------ | |
| # Public: analyze file triplet | |
| # ------------------------------------------------------------------ | |
| def analyze( | |
| self, | |
| audio_path: str, | |
| transcript: str, | |
| video_path: Optional[str] = None, | |
| video_id: Optional[str] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Full tri-modal deception analysis. | |
| Real audio/text features from DissonanceEngine; MediaPipe for video. | |
| """ | |
| start = time.time() | |
| # ── Guardian context ────────────────────────────────────────── | |
| intent_signals = self.guardian._detect_signals(transcript) | |
| entities = self.guardian._extract_entities(transcript) | |
| journey_report = self.scam_memory.get_journey_score(entities) | |
| self.scam_memory.add_event("call_transcript", entities, intent_signals) | |
| # ── 1. Core cross-modal dissonance (REAL features) ──────────── | |
| try: | |
| dissonance_result = dissonance_engine.analyze( | |
| audio_path, transcript, video_id=video_id | |
| ) | |
| except Exception as e: | |
| # Dissonance engine models not loaded yet (cold start / no GPU) | |
| logger.warning("[MMSA] DissonanceEngine unavailable: %s — falling back.", e) | |
| dissonance_result = { | |
| "dissonance_score": 0.0, | |
| "audio_text_divergence": 0.0, | |
| "conflict_detected": False, | |
| "likely_sarcasm": False, | |
| "audio_dominant_emotion": "unknown", | |
| "text_dominant_emotion": "unknown", | |
| "audio_vad": [0.5, 0.5, 0.5], | |
| "text_vad": [0.5, 0.5, 0.5], | |
| "prosody": {}, | |
| "fusion_contributions": {}, | |
| "dominant_modality": "unavailable", | |
| "error": str(e), | |
| } | |
| final_score = dissonance_result.get("dissonance_score", 0.0) | |
| audio_text_div = dissonance_result.get("audio_text_divergence", 0.0) | |
| conflicts: Dict[str, float] = {"text_audio": round(audio_text_div, 4)} | |
| # ── 2. Optional dimensional audio override ──────────────────── | |
| audio_vad = np.array(dissonance_result.get("audio_vad", [0.5, 0.5, 0.5])) | |
| text_vad = np.array(dissonance_result.get("text_vad", [0.5, 0.5, 0.5])) | |
| if _USE_DIMENSIONAL and _dimensional_model and _dimensional_model._loaded: | |
| try: | |
| dim_vad = _dimensional_model.predict_vad(audio_path) | |
| import scipy.spatial.distance as _dist | |
| dim_div = float(_dist.cosine(dim_vad, text_vad)) | |
| if dim_div > audio_text_div: | |
| final_score = max(final_score, dim_div) | |
| conflicts["dimensional_audio_text"] = round(dim_div, 4) | |
| logger.info("[MMSA] Dimensional model raised score to %.4f", final_score) | |
| except Exception as e: | |
| logger.warning("[MMSA] Dimensional model inference failed: %s", e) | |
| # ── 3. Video sentiment (MediaPipe face blendshapes) ─────────── | |
| visual_sentiment = None | |
| if video_path and self._lazy_load_vision(): | |
| try: | |
| visual_sentiment = self._extract_visual_sentiment(video_path) | |
| import scipy.spatial.distance as _dist | |
| vis_vad = np.array([visual_sentiment, 0.5, 0.5]) # simplified | |
| div_tv = float(_dist.cosine(text_vad, vis_vad)) | |
| div_av = float(_dist.cosine(audio_vad, vis_vad)) | |
| conflicts["text_visual"] = round(div_tv, 4) | |
| conflicts["audio_visual"] = round(div_av, 4) | |
| final_score = max(final_score, div_tv, div_av) | |
| except Exception as e: | |
| logger.warning("[MMSA] Visual sentiment failed: %s", e) | |
| # ── 4. Deception meta-analysis ──────────────────────────────── | |
| audio_score = dissonance_result.get("audio_vad", [0.5, 0.5, 0.5])[0] # valence | |
| text_score = dissonance_result.get("text_vad", [0.5, 0.5, 0.5])[0] | |
| deception = self._detect_deception(text_score, audio_score, visual_sentiment) | |
| # ── 5. Confidence assessment ────────────────────────────────── | |
| confidence = self._calculate_confidence(transcript, audio_path, video_path) | |
| reliability = "HIGH" if confidence > 0.8 else "MEDIUM" if confidence > 0.5 else "LOW" | |
| is_dissonant = final_score > self.dissonance_threshold | |
| # ── 6. Guardian fusion ──────────────────────────────────────── | |
| guardian_risk = journey_report["score"] | |
| if deception["leakage"] or final_score > 0.6: | |
| guardian_risk = min(100.0, guardian_risk + 30.0) | |
| action_key = ( | |
| "BLOCK" if guardian_risk >= 70 | |
| else "WARN" if guardian_risk >= 40 | |
| else "RESTRICT" if is_dissonant | |
| else "ALLOW" | |
| ) | |
| result: Dict[str, Any] = { | |
| # Core scores | |
| "dissonance_score": round(final_score, 4), | |
| "audio_text_divergence": round(audio_text_div, 4), | |
| "is_dissonant": is_dissonant, | |
| "conflict_detected": dissonance_result.get("conflict_detected", False), | |
| "likely_sarcasm": dissonance_result.get("likely_sarcasm", False), | |
| # Confidence | |
| "confidence_score": round(confidence, 2), | |
| "reliability_tier": reliability, | |
| # Guardian | |
| "guardian_score": round(guardian_risk, 2), | |
| "safe_action": self.action_guidance.get( | |
| action_key, "No immediate threat detected. Stay vigilant." | |
| ), | |
| "scam_journey": journey_report, | |
| # Deception | |
| "deception_probability": deception["probability"], | |
| "emotional_leakage_detected": deception["leakage"], | |
| "analysis_tags": deception["tags"], | |
| # VAD vectors | |
| "audio_vad": audio_vad.tolist(), | |
| "text_vad": text_vad.tolist(), | |
| # Modality breakdown | |
| "modality_scores": { | |
| "text": round(text_score, 4), | |
| "audio": round(audio_score, 4), | |
| "visual": round(visual_sentiment, 4) if visual_sentiment is not None else None, | |
| }, | |
| "pairwise_conflicts": conflicts, | |
| "dominant_modality": dissonance_result.get("dominant_modality", "unknown"), | |
| "fusion_contributions": dissonance_result.get("fusion_contributions", {}), | |
| # Prosody | |
| "prosody": dissonance_result.get("prosody", {}), | |
| # Raw emotion probs | |
| "audio_emotion_probs": dissonance_result.get("audio_emotion_probs", {}), | |
| "text_emotion_probs": dissonance_result.get("text_emotion_probs", {}), | |
| # Meta | |
| "framework": "DissonanceEngine(wav2vec2+DistilBERT+VAD) + ZeroTrust Guardian", | |
| "dimensional_model_used": _USE_DIMENSIONAL and _dimensional_model and _dimensional_model._loaded, | |
| "duration_ms": int((time.time() - start) * 1000), | |
| "source_url": None, | |
| } | |
| self._update_cognitive_memory(result) | |
| logger.info( | |
| "[MMSA] Analysis done. score=%.4f sarcasm=%s deception=%.2f", | |
| final_score, result["likely_sarcasm"], deception["probability"], | |
| ) | |
| return result | |
| # ------------------------------------------------------------------ | |
| # Public: analyze URL (yt-dlp download) | |
| # ------------------------------------------------------------------ | |
| def analyze_url(self, url: str, transcript: str) -> Dict[str, Any]: | |
| if not self._lazy_load_vision(): | |
| # Vision optional; still run audio+text analysis if possible | |
| logger.warning("[MMSA] Vision unavailable; attempting audio-only URL analysis.") | |
| import tempfile, shutil | |
| from static_ffmpeg import run as ffmpeg_run | |
| tmp_dir = tempfile.mkdtemp(prefix="janus_yt_") | |
| output_template = os.path.join(tmp_dir, "media.%(ext)s") | |
| ffmpeg_bin, _ = ffmpeg_run.get_or_fetch_platform_executables_else_raise() | |
| ydl_opts = { | |
| "format": "bestvideo[height<=720]+bestaudio/best[height<=720]", | |
| "outtmpl": output_template, | |
| "merge_output_format": "mp4", | |
| "quiet": True, | |
| "no_warnings": True, | |
| "ffmpeg_location": ffmpeg_bin, | |
| } | |
| try: | |
| logger.info("[MMSA] Ingesting URL: %s", url) | |
| import yt_dlp as _ytdlp | |
| with _ytdlp.YoutubeDL(ydl_opts) as ydl: | |
| ydl.download([url]) | |
| files = os.listdir(tmp_dir) | |
| if not files: | |
| raise RuntimeError("Failed to download media from URL.") | |
| video_path = os.path.join(tmp_dir, files[0]) | |
| audio_path = os.path.join(tmp_dir, "extracted_audio.wav") | |
| os.system( | |
| f"{ffmpeg_bin} -i {video_path} -ss 00:00:00 -t 00:01:00 " | |
| f"-vn -acodec pcm_s16le -ar 16000 -ac 1 {audio_path} -y" | |
| ) | |
| # Derive stable video_id from URL | |
| import hashlib | |
| video_id = hashlib.md5(url.encode()).hexdigest()[:12] | |
| results = self.analyze(audio_path, transcript, video_path=video_path, video_id=video_id) | |
| results["source_url"] = url | |
| return results | |
| except Exception as e: | |
| logger.error("[MMSA] URL analysis failed: %s", e) | |
| return {"error": str(e)} | |
| finally: | |
| if os.path.exists(tmp_dir): | |
| shutil.rmtree(tmp_dir) | |
| # ------------------------------------------------------------------ | |
| # Internal helpers | |
| # ------------------------------------------------------------------ | |
| def _detect_deception( | |
| self, t: float, a: float, v: Optional[float] | |
| ) -> Dict[str, Any]: | |
| """Meta-analysis for deception using VAD-space emotional paradoxes.""" | |
| prob = 0.0 | |
| tags = [] | |
| leakage = False | |
| # Rule 1: Sarcasm / Irony — high valence text, low valence audio | |
| if t > 0.6 and a < 0.35: | |
| prob += 0.4 | |
| tags.append("Sarcasm/Irony Detected") | |
| # Rule 2: Emotional masking — positive face, negative voice | |
| if v is not None and v > 0.3 and a < 0.3: | |
| prob += 0.5 | |
| leakage = True | |
| tags.append("Emotional Masking (Fake Expression)") | |
| # Rule 3: High-intensity paradox across all modalities | |
| div_ta = abs(t - a) | |
| div_tv = abs(t - (v or 0.5)) | |
| if div_ta > 0.5 and div_tv > 0.5: | |
| prob += 0.3 | |
| tags.append("High-Fidelity Cognitive Dissonance") | |
| return { | |
| "probability": min(1.0, round(prob, 2)), | |
| "leakage": leakage, | |
| "tags": tags, | |
| } | |
| def _extract_visual_sentiment(self, video_path: str) -> float: | |
| """Sample up to 15 frames with MediaPipe face blendshapes → mean valence proxy.""" | |
| from mediapipe.tasks.python.vision.core.image import Image as MpImage | |
| cap = cv2.VideoCapture(video_path) | |
| sentiments = [] | |
| frame_count = 0 | |
| while cap.isOpened() and frame_count < 15: | |
| ret, frame = cap.read() | |
| if not ret: | |
| break | |
| mp_image = MpImage( | |
| image_format=mp.ImageFormat.SRGB, | |
| data=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), | |
| ) | |
| result = self.landmarker.detect(mp_image) | |
| if result.face_blendshapes: | |
| sentiments.append(-0.4) # placeholder until blendshape → valence mapping added | |
| frame_count += 1 | |
| cap.release() | |
| return float(np.mean(sentiments)) if sentiments else 0.0 | |
| def _calculate_confidence( | |
| self, transcript: str, a_path: str, v_path: Optional[str] | |
| ) -> float: | |
| score = 1.0 | |
| words = transcript.split() | |
| if len(words) < 10: | |
| score -= 0.3 | |
| elif len(words) < 20: | |
| score -= 0.1 | |
| score -= 0.05 # baseline noise assumption | |
| if not v_path: | |
| score -= 0.1 | |
| return max(0.0, min(1.0, score)) | |
| def _update_cognitive_memory(self, result: Dict[str, Any]) -> None: | |
| try: | |
| memory: Dict = {} | |
| if os.path.exists(self.memory_path): | |
| with open(self.memory_path) as f: | |
| memory = json.load(f) | |
| source = result.get("source_url") or "local_file" | |
| if source not in memory: | |
| memory[source] = [] | |
| memory[source].append({ | |
| "timestamp": time.time(), | |
| "dissonance": result["dissonance_score"], | |
| "deception": result["deception_probability"], | |
| "sarcasm": result.get("likely_sarcasm", False), | |
| "tags": result["analysis_tags"], | |
| }) | |
| memory[source] = memory[source][-100:] | |
| with open(self.memory_path, "w") as f: | |
| json.dump(memory, f, indent=2) | |
| except Exception as e: | |
| logger.warning("[MMSA] Memory update failed: %s", e) | |
| def calibrate(self) -> Dict[str, Any]: | |
| """Proxy calibration through DissonanceEngine.""" | |
| dissonance_engine.refine_with_dataset() | |
| return { | |
| "status": "calibrated", | |
| "threshold": dissonance_engine.dissonance_threshold, | |
| "timestamp": time.time(), | |
| } | |
| # Singleton | |
| mmsa_engine = MMSADissonanceEngine() | |