# Janus-backend/backend/app/services/mmsa_engine.py
# Commit 5f91e0b (DevodG): "feat: stable janus intelligence with kaggle distillation"
"""
High-Fidelity MMSA Dissonance Engine.
Orchestration layer for tri-modal (Audio + Text + Video) deception detection.
Now wired to DissonanceEngine for real wav2vec2 / DistilBERT features.
Env vars consumed by this layer:
USE_DIMENSIONAL_AUDIO_MODEL=true → audeering/wav2vec2-large-robust-12-ft-emotion-msp-dim
USE_EMOTION2VEC=true → emotion2vec/emotion2vec_plus_large (requires funasr)
Env vars forwarded to DissonanceEngine (set them in the same .env):
AUDIO_TEMPERATURE=1.5 → softmax temperature for audio emotion calibration
DISSONANCE_WEIGHTS=0.5,0.3,0.2 → w1·audio_text + w2·prosody + w3·text_conf
"""
import os
import logging
import time
import json
from typing import Dict, List, Any, Optional
import numpy as np
from pathlib import Path
# Guardian Integrated services
from .guardian_sensory import guardian_sensory
from .scam_graph import scam_graph
# Core dissonance engine — real models, VAD space, prosody, persistence
from .dissonance_engine import dissonance_engine, _probs_to_vad
# Heavyweight optional imports — deferred
torch = None
mp = None
tasks = None
vision = None
cv2 = None
yt_dlp = None
static_ffmpeg = None
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Optional Upgrade 1: audeering dimensional model
# ---------------------------------------------------------------------------
_USE_DIMENSIONAL = os.getenv("USE_DIMENSIONAL_AUDIO_MODEL", "false").lower() == "true"
_USE_EMOTION2VEC = os.getenv("USE_EMOTION2VEC", "false").lower() == "true"
DIMENSIONAL_MODEL_NAME = "audeering/wav2vec2-large-robust-12-ft-emotion-msp-dim"
class _DimensionalAudioModel:
    """
    Lazy wrapper around the audeering wav2vec2 regression model.

    The model regresses [arousal, dominance, valence] directly from raw
    16 kHz audio, so no categorical-emotion → VAD lookup is needed.
    Nothing heavy is imported or downloaded until :meth:`load` is called.
    """

    def __init__(self):
        self._loaded = False
        self.processor = None
        self.model = None

    def load(self) -> bool:
        """
        Build the audeering architecture and load its pretrained weights.

        Rebinds the module-level ``torch`` name on success. Calling this again
        on an already-loaded instance is a no-op.

        Returns:
            True if the model is ready; False if any import or download
            failed (the failure is logged as a warning, never raised).
        """
        global torch
        if self._loaded:
            return True
        try:
            import torch as _torch
            torch = _torch
            import torch.nn as nn
            from transformers import Wav2Vec2Processor
            from transformers.models.wav2vec2.modeling_wav2vec2 import (
                Wav2Vec2Model,
                Wav2Vec2PreTrainedModel,
            )

            class RegressionHead(nn.Module):
                def __init__(self, config):
                    super().__init__()
                    self.dense = nn.Linear(config.hidden_size, config.hidden_size)
                    self.dropout = nn.Dropout(config.final_dropout)
                    self.out_proj = nn.Linear(config.hidden_size, config.num_labels)

                def forward(self, features, **kwargs):
                    x = self.dropout(features)
                    x = self.dense(x)
                    x = _torch.tanh(x)
                    x = self.dropout(x)
                    return self.out_proj(x)

            class EmotionModel(Wav2Vec2PreTrainedModel):
                def __init__(self, config):
                    super().__init__(config)
                    self.wav2vec2 = Wav2Vec2Model(config)
                    self.classifier = RegressionHead(config)
                    self.init_weights()

                def forward(self, input_values):
                    hidden = self.wav2vec2(input_values)[0]
                    # Mean-pool over the time axis before the regression head.
                    hidden = _torch.mean(hidden, dim=1)
                    return self.classifier(hidden)

            logger.info("[MMSA] Loading audeering dimensional audio model …")
            self.processor = Wav2Vec2Processor.from_pretrained(DIMENSIONAL_MODEL_NAME)
            self.model = EmotionModel.from_pretrained(DIMENSIONAL_MODEL_NAME)
            self.model.eval()
            self._loaded = True
            logger.info("[MMSA] Dimensional model ready (outputs arousal/dominance/valence).")
            return True
        except Exception as e:
            logger.warning("[MMSA] Dimensional model unavailable: %s", e)
            return False

    @staticmethod
    def _to_val_aro_dom(raw) -> np.ndarray:
        """
        Convert raw model output to the module's VAD convention.

        Args:
            raw: array-like whose first three values are the model's
                [arousal, dominance, valence] (any leading batch dim of 1 is
                flattened away).

        Returns:
            np.ndarray [valence, arousal, dominance], clipped to [0, 1].
        """
        arousal, dominance, valence = np.asarray(raw, dtype=float).reshape(-1)[:3]
        return np.clip(np.array([valence, arousal, dominance]), 0.0, 1.0)

    def predict_vad(self, audio_path: str) -> np.ndarray:
        """
        Predict a [valence, arousal, dominance] vector for an audio file.

        The audio is loaded with librosa and resampled to 16 kHz. The
        audeering head outputs [arousal, dominance, valence]; it is reordered
        here to the [val, aro, dom] convention used everywhere else
        (e.g. EMOTION_TO_VAD).

        Returns:
            np.ndarray of 3 floats in [0, 1].

        Raises:
            RuntimeError: if :meth:`load` has not succeeded yet.
        """
        if not self._loaded:
            raise RuntimeError("Dimensional audio model is not loaded; call load() first.")
        import librosa

        y, _ = librosa.load(audio_path, sr=16000)
        inputs = self.processor(y, sampling_rate=16000, return_tensors="pt", padding=True)
        with torch.no_grad():
            out = self.model(inputs.input_values)
        # The regression head already emits values in roughly [0, 1]. A sigmoid
        # here would squash them into ~[0.5, 0.73] and erase most of the range,
        # so the output is only clipped.
        return self._to_val_aro_dom(out.squeeze().cpu().numpy())
# Created only when USE_DIMENSIONAL_AUDIO_MODEL is enabled; its weights are
# loaded later by MMSADissonanceEngine.__init__ through .load().
_dimensional_model: Optional[_DimensionalAudioModel] = None
if _USE_DIMENSIONAL:
    _dimensional_model = _DimensionalAudioModel()
# ---------------------------------------------------------------------------
# Main engine
# ---------------------------------------------------------------------------
class MMSADissonanceEngine:
    """
    Tri-modal deception detection: Text × Audio × Video.

    Audio/text features come from the shared ``dissonance_engine``
    (wav2vec2 + DistilBERT + prosody). Video is scored with MediaPipe face
    blendshapes when the optional vision stack is installed. The fused score
    is combined with the Guardian scam-journey risk to choose a safe action.
    """

    def __init__(self):
        self.model_name = "self_mm"
        self._vision_loaded = False
        self.landmarker = None
        # Fused score above which a sample is reported as dissonant.
        self.dissonance_threshold = 0.4
        # Rolling per-source history written by _update_cognitive_memory().
        self.memory_path = os.path.join(
            os.path.dirname(__file__), "..", "cache", "emotional_baselines.json"
        )
        os.makedirs(os.path.dirname(self.memory_path), exist_ok=True)
        self.guardian = guardian_sensory
        self.scam_memory = scam_graph
        self.action_guidance = {
            "BLOCK": "Immediate Risk: Block sender and do not respond. Save screenshot as evidence.",
            "WARN": "Suspicious Journey: Pattern matches known scam scripts. Verify via official app.",
            "RESTRICT": "Potential Fraud: Dissonance and metadata conflicts. Do not share OTP.",
        }
        # The dimensional model is a module-level singleton: load it once,
        # not again for every engine instance.
        if _USE_DIMENSIONAL and _dimensional_model and not _dimensional_model._loaded:
            _dimensional_model.load()

    # ------------------------------------------------------------------
    # MediaPipe face landmarker (lazy)
    # ------------------------------------------------------------------
    def _lazy_load_vision(self) -> bool:
        """
        Import the optional vision/download stack on first use.

        Rebinds the module-level ``mp``, ``tasks``, ``vision``, ``cv2``,
        ``yt_dlp`` and ``static_ffmpeg`` names, and builds a MediaPipe face
        landmarker if ``face_landmarker.task`` sits next to this file.

        Returns:
            True once the imports succeeded (``self.landmarker`` stays None,
            with a warning, if the model file is missing); False if an import
            or the landmarker initialisation failed.
        """
        global mp, tasks, vision, cv2, yt_dlp, static_ffmpeg
        if self._vision_loaded:
            return True
        try:
            import mediapipe as _mp
            from mediapipe.tasks import python as _tasks
            from mediapipe.tasks.python import vision as _vision
            import cv2 as _cv2
            import yt_dlp as _yt_dlp
            from static_ffmpeg import run as _ffmpeg_run

            mp, tasks, vision, cv2 = _mp, _tasks, _vision, _cv2
            yt_dlp = _yt_dlp
            # NOTE: the global named ``static_ffmpeg`` holds the ``run``
            # submodule, not the package itself.
            static_ffmpeg = _ffmpeg_run

            model_path = os.path.join(os.path.dirname(__file__), "face_landmarker.task")
            if os.path.exists(model_path):
                base_opts = tasks.BaseOptions(model_asset_path=model_path)
                opts = vision.FaceLandmarkerOptions(
                    base_options=base_opts,
                    output_face_blendshapes=True,
                    output_facial_transformation_matrixes=True,
                    num_faces=1,
                )
                self.landmarker = vision.FaceLandmarker.create_from_options(opts)
            else:
                logger.warning(
                    "[MMSA] %s not found; face analysis disabled.", model_path
                )
            self._vision_loaded = True
            logger.info("[MMSA] Vision / MediaPipe sensors active.")
            return True
        except ImportError as e:
            logger.warning("[MMSA] Vision deps missing (%s); video analysis disabled.", e)
            return False
        except Exception as e:
            logger.error("[MMSA] Vision init failed: %s", e)
            return False

    # ------------------------------------------------------------------
    # Public: analyze file triplet
    # ------------------------------------------------------------------
    @staticmethod
    def _fallback_dissonance_result(error: Exception) -> Dict[str, Any]:
        """Neutral DissonanceEngine-shaped result used when that engine fails."""
        return {
            "dissonance_score": 0.0,
            "audio_text_divergence": 0.0,
            "conflict_detected": False,
            "likely_sarcasm": False,
            "audio_dominant_emotion": "unknown",
            "text_dominant_emotion": "unknown",
            "audio_vad": [0.5, 0.5, 0.5],
            "text_vad": [0.5, 0.5, 0.5],
            "prosody": {},
            "fusion_contributions": {},
            "dominant_modality": "unavailable",
            "error": str(error),
        }

    @staticmethod
    def _cosine_distance(a, b) -> float:
        """Cosine distance between two vectors (scipy imported lazily)."""
        from scipy.spatial import distance

        return float(distance.cosine(a, b))

    def analyze(
        self,
        audio_path: str,
        transcript: str,
        video_path: Optional[str] = None,
        video_id: Optional[str] = None,
        source_url: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        Run the full tri-modal deception analysis.

        Args:
            audio_path: audio file passed to DissonanceEngine (and to the
                optional dimensional model).
            transcript: text of the audio; also scanned by the Guardian for
                scam signals and entities.
            video_path: optional video for MediaPipe face analysis.
            video_id: optional identifier forwarded to DissonanceEngine.
            source_url: optional origin URL; stored in the result and used as
                the cognitive-memory key (``"local_file"`` when omitted).

        Returns:
            Dict with the fused dissonance score, deception meta-analysis,
            Guardian risk and safe action, VAD vectors and per-modality
            breakdown. If DissonanceEngine raises, neutral fallback values
            are used instead.
        """
        start = time.perf_counter()

        # ── Guardian context ──────────────────────────────────────────
        intent_signals = self.guardian._detect_signals(transcript)
        entities = self.guardian._extract_entities(transcript)
        journey_report = self.scam_memory.get_journey_score(entities)
        self.scam_memory.add_event("call_transcript", entities, intent_signals)

        # ── 1. Core cross-modal dissonance (REAL features) ────────────
        try:
            dissonance_result = dissonance_engine.analyze(
                audio_path, transcript, video_id=video_id
            )
        except Exception as e:
            # Dissonance engine models not loaded yet (cold start / no GPU)
            logger.warning("[MMSA] DissonanceEngine unavailable: %s — falling back.", e)
            dissonance_result = self._fallback_dissonance_result(e)

        final_score = float(dissonance_result.get("dissonance_score", 0.0))
        audio_text_div = float(dissonance_result.get("audio_text_divergence", 0.0))
        conflicts: Dict[str, float] = {"text_audio": round(audio_text_div, 4)}

        # ── 2. Optional dimensional audio override ────────────────────
        audio_vad = np.asarray(dissonance_result.get("audio_vad", [0.5, 0.5, 0.5]), dtype=float)
        text_vad = np.asarray(dissonance_result.get("text_vad", [0.5, 0.5, 0.5]), dtype=float)
        dimensional_used = False
        if _USE_DIMENSIONAL and _dimensional_model and _dimensional_model._loaded:
            try:
                dim_vad = _dimensional_model.predict_vad(audio_path)
                dim_div = self._cosine_distance(dim_vad, text_vad)
                dimensional_used = True
                # The dimensional model can only raise the fused score.
                if dim_div > audio_text_div:
                    final_score = max(final_score, dim_div)
                    conflicts["dimensional_audio_text"] = round(dim_div, 4)
                    logger.info("[MMSA] Dimensional model raised score to %.4f", final_score)
            except Exception as e:
                logger.warning("[MMSA] Dimensional model inference failed: %s", e)

        # ── 3. Video sentiment (MediaPipe face blendshapes) ───────────
        visual_sentiment: Optional[float] = None
        if video_path and self._lazy_load_vision():
            try:
                visual_sentiment = self._extract_visual_sentiment(video_path)
                # Simplified visual VAD: the sentiment proxy fills the valence
                # slot; arousal and dominance are held neutral.
                vis_vad = np.array([visual_sentiment, 0.5, 0.5])
                div_tv = self._cosine_distance(text_vad, vis_vad)
                div_av = self._cosine_distance(audio_vad, vis_vad)
                conflicts["text_visual"] = round(div_tv, 4)
                conflicts["audio_visual"] = round(div_av, 4)
                final_score = max(final_score, div_tv, div_av)
            except Exception as e:
                logger.warning("[MMSA] Visual sentiment failed: %s", e)

        # ── 4. Deception meta-analysis (valence = index 0) ────────────
        audio_score = float(audio_vad[0])
        text_score = float(text_vad[0])
        deception = self._detect_deception(text_score, audio_score, visual_sentiment)

        # ── 5. Confidence assessment ──────────────────────────────────
        confidence = self._calculate_confidence(transcript, audio_path, video_path)
        reliability = "HIGH" if confidence > 0.8 else "MEDIUM" if confidence > 0.5 else "LOW"
        is_dissonant = final_score > self.dissonance_threshold

        # ── 6. Guardian fusion ────────────────────────────────────────
        guardian_risk = journey_report["score"]
        if deception["leakage"] or final_score > 0.6:
            guardian_risk = min(100.0, guardian_risk + 30.0)
        action_key = (
            "BLOCK" if guardian_risk >= 70
            else "WARN" if guardian_risk >= 40
            else "RESTRICT" if is_dissonant
            else "ALLOW"
        )

        result: Dict[str, Any] = {
            # Core scores
            "dissonance_score": round(final_score, 4),
            "audio_text_divergence": round(audio_text_div, 4),
            "is_dissonant": is_dissonant,
            "conflict_detected": dissonance_result.get("conflict_detected", False),
            "likely_sarcasm": dissonance_result.get("likely_sarcasm", False),
            # Confidence
            "confidence_score": round(confidence, 2),
            "reliability_tier": reliability,
            # Guardian
            "guardian_score": round(guardian_risk, 2),
            "safe_action": self.action_guidance.get(
                action_key, "No immediate threat detected. Stay vigilant."
            ),
            "scam_journey": journey_report,
            # Deception
            "deception_probability": deception["probability"],
            "emotional_leakage_detected": deception["leakage"],
            "analysis_tags": deception["tags"],
            # VAD vectors
            "audio_vad": audio_vad.tolist(),
            "text_vad": text_vad.tolist(),
            # Modality breakdown
            "modality_scores": {
                "text": round(text_score, 4),
                "audio": round(audio_score, 4),
                "visual": round(visual_sentiment, 4) if visual_sentiment is not None else None,
            },
            "pairwise_conflicts": conflicts,
            "dominant_modality": dissonance_result.get("dominant_modality", "unknown"),
            "fusion_contributions": dissonance_result.get("fusion_contributions", {}),
            # Prosody
            "prosody": dissonance_result.get("prosody", {}),
            # Raw emotion probs
            "audio_emotion_probs": dissonance_result.get("audio_emotion_probs", {}),
            "text_emotion_probs": dissonance_result.get("text_emotion_probs", {}),
            # Meta
            "framework": "DissonanceEngine(wav2vec2+DistilBERT+VAD) + ZeroTrust Guardian",
            # True only if the dimensional model actually produced a prediction.
            "dimensional_model_used": dimensional_used,
            "duration_ms": int((time.perf_counter() - start) * 1000),
            "source_url": source_url,
        }
        self._update_cognitive_memory(result)
        logger.info(
            "[MMSA] Analysis done. score=%.4f sarcasm=%s deception=%.2f",
            final_score, result["likely_sarcasm"], deception["probability"],
        )
        return result

    # ------------------------------------------------------------------
    # Public: analyze URL (yt-dlp download)
    # ------------------------------------------------------------------
    def analyze_url(self, url: str, transcript: str) -> Dict[str, Any]:
        """
        Download media from *url* with yt-dlp and run :meth:`analyze` on it.

        Only the first minute of audio is extracted (16 kHz mono PCM WAV).
        Downloads live in a private temp directory that is always removed.

        Returns:
            The :meth:`analyze` result (with ``source_url`` set), or
            ``{"error": <message>}`` if download, extraction or analysis
            fails. Import/fetch failures of static_ffmpeg propagate.
        """
        if not self._lazy_load_vision():
            # Vision optional; still run audio+text analysis if possible
            logger.warning("[MMSA] Vision unavailable; attempting audio-only URL analysis.")
        import hashlib
        import shutil
        import subprocess
        import tempfile

        from static_ffmpeg import run as ffmpeg_run

        # Resolve ffmpeg before creating the temp dir so a failure here
        # cannot leak the directory.
        ffmpeg_bin, _ = ffmpeg_run.get_or_fetch_platform_executables_else_raise()
        tmp_dir = tempfile.mkdtemp(prefix="janus_yt_")
        ydl_opts = {
            "format": "bestvideo[height<=720]+bestaudio/best[height<=720]",
            "outtmpl": os.path.join(tmp_dir, "media.%(ext)s"),
            "merge_output_format": "mp4",
            "quiet": True,
            "no_warnings": True,
            "ffmpeg_location": ffmpeg_bin,
        }
        try:
            logger.info("[MMSA] Ingesting URL: %s", url)
            import yt_dlp as _ytdlp

            with _ytdlp.YoutubeDL(ydl_opts) as ydl:
                ydl.download([url])

            # Skip yt-dlp leftovers (partial downloads / resume state) and
            # pick deterministically rather than by os.listdir() order.
            files = sorted(
                name for name in os.listdir(tmp_dir)
                if not name.endswith((".part", ".ytdl"))
            )
            if not files:
                raise RuntimeError("Failed to download media from URL.")
            video_path = os.path.join(tmp_dir, files[0])
            audio_path = os.path.join(tmp_dir, "extracted_audio.wav")

            # Argument list (no shell): paths containing spaces or shell
            # metacharacters cannot break or hijack the command.
            proc = subprocess.run(
                [
                    ffmpeg_bin, "-y", "-i", video_path,
                    "-ss", "00:00:00", "-t", "00:01:00",
                    "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
                    audio_path,
                ],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )
            if proc.returncode != 0 or not os.path.exists(audio_path):
                detail = proc.stderr.decode("utf-8", errors="replace")[-500:]
                raise RuntimeError(f"ffmpeg audio extraction failed: {detail}")

            # Stable video_id derived from the URL (identifier, not security).
            video_id = hashlib.md5(url.encode()).hexdigest()[:12]
            return self.analyze(
                audio_path,
                transcript,
                video_path=video_path,
                video_id=video_id,
                source_url=url,
            )
        except Exception as e:
            logger.error("[MMSA] URL analysis failed: %s", e)
            return {"error": str(e)}
        finally:
            shutil.rmtree(tmp_dir, ignore_errors=True)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _detect_deception(
        self, t: float, a: float, v: Optional[float]
    ) -> Dict[str, Any]:
        """
        Rule-based deception meta-analysis over per-modality valence scores.

        Args:
            t: text valence.
            a: audio valence.
            v: visual sentiment, or None when no video was analysed (then a
                neutral 0.5 is used for the cross-modal paradox rule).

        Returns:
            ``{"probability": float in [0, 1], "leakage": bool, "tags": [str]}``.
        """
        prob = 0.0
        tags = []
        leakage = False
        # Rule 1: Sarcasm / Irony — high valence text, low valence audio
        if t > 0.6 and a < 0.35:
            prob += 0.4
            tags.append("Sarcasm/Irony Detected")
        # Rule 2: Emotional masking — positive face, negative voice
        if v is not None and v > 0.3 and a < 0.3:
            prob += 0.5
            leakage = True
            tags.append("Emotional Masking (Fake Expression)")
        # Rule 3: High-intensity paradox across all modalities. Test for None
        # explicitly: a genuine visual score of 0.0 must not become 0.5.
        visual = 0.5 if v is None else v
        div_ta = abs(t - a)
        div_tv = abs(t - visual)
        if div_ta > 0.5 and div_tv > 0.5:
            prob += 0.3
            tags.append("High-Fidelity Cognitive Dissonance")
        return {
            "probability": min(1.0, round(prob, 2)),
            "leakage": leakage,
            "tags": tags,
        }

    def _extract_visual_sentiment(self, video_path: str, max_frames: int = 15) -> float:
        """
        Score the opening frames of a video with the MediaPipe face landmarker.

        Reads up to ``max_frames`` consecutive frames from the start of the
        video. Every frame with a detected face (blendshapes present) adds a
        fixed placeholder score of -0.4; a blendshape → valence mapping is
        not implemented yet.

        Returns:
            Mean of the per-face scores, or 0.0 if no face was detected.

        Raises:
            RuntimeError: if no face landmarker is available.
        """
        if self.landmarker is None:
            raise RuntimeError("Face landmarker unavailable (face_landmarker.task missing?)")
        cap = cv2.VideoCapture(video_path)
        sentiments: List[float] = []
        try:
            frames_read = 0
            while cap.isOpened() and frames_read < max_frames:
                ok, frame = cap.read()
                if not ok:
                    break
                # mp.Image is MediaPipe's documented container for task inputs.
                mp_image = mp.Image(
                    image_format=mp.ImageFormat.SRGB,
                    data=cv2.cvtColor(frame, cv2.COLOR_BGR2RGB),
                )
                detection = self.landmarker.detect(mp_image)
                if detection.face_blendshapes:
                    sentiments.append(-0.4)  # placeholder until blendshape → valence mapping added
                frames_read += 1
        finally:
            # Release the decoder even if detection raises mid-loop.
            cap.release()
        return float(np.mean(sentiments)) if sentiments else 0.0

    def _calculate_confidence(
        self, transcript: str, a_path: str, v_path: Optional[str]
    ) -> float:
        """
        Heuristic reliability score in [0, 1].

        Short transcripts (<10 words: -0.3, <20 words: -0.1) and a missing
        video (-0.1) reduce confidence; a flat 0.05 is always deducted as a
        baseline noise allowance. ``a_path`` is currently unused.
        """
        word_count = len((transcript or "").split())
        score = 1.0
        if word_count < 10:
            score -= 0.3
        elif word_count < 20:
            score -= 0.1
        score -= 0.05  # baseline noise assumption
        if not v_path:
            score -= 0.1
        return max(0.0, min(1.0, score))

    def _update_cognitive_memory(self, result: Dict[str, Any]) -> None:
        """
        Append a compact summary of *result* to the on-disk history.

        The history is a JSON object keyed by ``result["source_url"]`` (or
        ``"local_file"``), each list capped to its 100 most recent entries.
        Best effort: any failure is logged and swallowed.
        """
        import tempfile

        try:
            memory: Dict[str, Any] = {}
            if os.path.exists(self.memory_path):
                with open(self.memory_path, encoding="utf-8") as f:
                    memory = json.load(f)
            source = result.get("source_url") or "local_file"
            history = memory.get(source, [])
            history.append({
                "timestamp": time.time(),
                "dissonance": result["dissonance_score"],
                "deception": result["deception_probability"],
                "sarcasm": result.get("likely_sarcasm", False),
                "tags": result["analysis_tags"],
            })
            memory[source] = history[-100:]

            # Atomic write: dump to a temp file in the same directory, then
            # os.replace() it over the target, so a crash mid-write can no
            # longer leave truncated JSON that breaks every later load.
            directory = os.path.dirname(self.memory_path) or "."
            fd, tmp_path = tempfile.mkstemp(
                dir=directory,
                prefix=os.path.basename(self.memory_path) + ".",
                suffix=".tmp",
            )
            try:
                with os.fdopen(fd, "w", encoding="utf-8") as f:
                    json.dump(memory, f, indent=2)
                os.replace(tmp_path, self.memory_path)
            except Exception:
                if os.path.exists(tmp_path):
                    os.unlink(tmp_path)
                raise
        except Exception as e:
            logger.warning("[MMSA] Memory update failed: %s", e)

    def calibrate(self) -> Dict[str, Any]:
        """
        Re-calibrate the shared DissonanceEngine and report its threshold.

        NOTE(review): this engine's own ``self.dissonance_threshold`` is not
        updated here; confirm whether it should track the calibrated value.
        """
        dissonance_engine.refine_with_dataset()
        return {
            "status": "calibrated",
            "threshold": dissonance_engine.dissonance_threshold,
            "timestamp": time.time(),
        }
# Shared, module-level engine instance; importers use this object rather
# than constructing their own.
mmsa_engine: MMSADissonanceEngine = MMSADissonanceEngine()