deepshield / services /video_temporal.py
ar07xd's picture
Sync from GitHub via hub-sync
fba30db verified
"""Phase 17.1 — Temporal Consistency Module.
Analyses optical flow variance, luminance flicker, and blink timing across
sampled video frames to produce a temporal_score (0–100, higher = more natural).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Tuple
import cv2
import numpy as np
from loguru import logger
@dataclass
class TemporalAnalysis:
temporal_score: float # 0–100, higher = more natural / authentic
optical_flow_variance: float # mean inter-frame flow-magnitude variance
flicker_score: float # 0–100 (high = suspicious micro-flicker)
blink_rate_anomaly: bool # True when blink timing is unnatural
blink_intervals: List[float] = field(default_factory=list)
diagnostics: dict = field(default_factory=dict)
# ---------------------------------------------------------------------------
# Optical-flow variance
# ---------------------------------------------------------------------------
def _compute_optical_flow_variance(frames_bgr: List[np.ndarray]) -> float:
"""Mean variance of inter-frame optical-flow magnitudes.
Real videos show consistent, smooth motion; deepfake temporal inconsistencies
appear as irregular per-frame flow jumps.
"""
if len(frames_bgr) < 2:
return 0.0
flow_mags: List[float] = []
for i in range(len(frames_bgr) - 1):
prev_gray = cv2.cvtColor(frames_bgr[i], cv2.COLOR_BGR2GRAY)
curr_gray = cv2.cvtColor(frames_bgr[i + 1], cv2.COLOR_BGR2GRAY)
h, w = prev_gray.shape
scale = min(1.0, 320.0 / max(h, w, 1))
if scale < 1.0:
dsize = (max(1, int(w * scale)), max(1, int(h * scale)))
prev_gray = cv2.resize(prev_gray, dsize)
curr_gray = cv2.resize(curr_gray, dsize)
flow = cv2.calcOpticalFlowFarneback(
prev_gray, curr_gray, None,
pyr_scale=0.5, levels=3, winsize=15,
iterations=3, poly_n=5, poly_sigma=1.2, flags=0,
)
mag, _ = cv2.cartToPolar(flow[..., 0], flow[..., 1])
flow_mags.append(float(np.mean(mag)))
return float(np.var(flow_mags)) if flow_mags else 0.0
# ---------------------------------------------------------------------------
# Luminance flicker
# ---------------------------------------------------------------------------
def _compute_flicker_score(frames_bgr: List[np.ndarray]) -> float:
"""Flicker score 0–100 derived from inter-frame luminance variance.
Deepfake GAN generators introduce subtle luminance micro-flicker that
manifests as high variance in the difference sequence.
"""
if len(frames_bgr) < 2:
return 0.0
mean_lums = [
float(np.mean(cv2.cvtColor(f, cv2.COLOR_BGR2GRAY)))
for f in frames_bgr
]
diffs = [abs(mean_lums[i + 1] - mean_lums[i]) for i in range(len(mean_lums) - 1)]
if not diffs:
return 0.0
mean_diff = float(np.mean(diffs))
std_diff = float(np.std(diffs))
flicker_ratio = std_diff / (mean_diff + 1e-6)
return float(min(100.0, flicker_ratio * 50.0))
# ---------------------------------------------------------------------------
# Blink-rate anomaly (FaceMesh EAR)
# ---------------------------------------------------------------------------
def _compute_blink_anomaly(
frames_bgr: List[np.ndarray],
timestamps: List[float],
) -> Tuple[bool, List[float]]:
"""Detect unnatural blink timing using MediaPipe FaceMesh eye-aspect-ratio.
Returns (anomaly_detected, blink_interval_list_seconds).
Natural blink rate: ~15–20/min → intervals ~3–4 s.
Anomalies: perfectly regular cadence (std < 0.05 s) or rate > 2/s.
"""
try:
import mediapipe as mp
mp_face_mesh = mp.solutions.face_mesh
except ImportError:
return False, []
# Landmark indices for left eye (vertical & horizontal pairs)
EYE_V = (159, 145)
EYE_H = (33, 133)
BLINK_THRESH = 0.25
ear_seq: List[Tuple[float, float]] = []
with mp_face_mesh.FaceMesh(
static_image_mode=True,
max_num_faces=1,
refine_landmarks=True,
min_detection_confidence=0.5,
) as mesh:
for frame_bgr, ts in zip(frames_bgr, timestamps):
rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
res = mesh.process(rgb)
if not (res and res.multi_face_landmarks):
continue
lm = res.multi_face_landmarks[0].landmark
h, w = frame_bgr.shape[:2]
def pt(idx: int) -> np.ndarray:
return np.array([lm[idx].x * w, lm[idx].y * h])
v = float(np.linalg.norm(pt(EYE_V[0]) - pt(EYE_V[1])))
h_dist = float(np.linalg.norm(pt(EYE_H[0]) - pt(EYE_H[1])))
ear = v / (h_dist + 1e-6)
ear_seq.append((ts, ear))
if len(ear_seq) < 3:
return False, []
blink_times: List[float] = []
in_blink = False
for ts, ear in ear_seq:
if ear < BLINK_THRESH and not in_blink:
blink_times.append(ts)
in_blink = True
elif ear >= BLINK_THRESH:
in_blink = False
if len(blink_times) < 2:
return False, []
intervals = [
round(blink_times[i + 1] - blink_times[i], 3)
for i in range(len(blink_times) - 1)
]
mean_iv = float(np.mean(intervals))
std_iv = float(np.std(intervals))
anomaly = (std_iv < 0.05 and len(intervals) > 2) or mean_iv < 0.5
return bool(anomaly), intervals
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
def compute_temporal_score(
frames_bgr: List[np.ndarray],
timestamps: List[float],
) -> TemporalAnalysis:
"""Compute temporal consistency for a list of BGR video frames.
Args:
frames_bgr: BGR numpy arrays in temporal order.
timestamps: Corresponding timestamps in seconds.
Returns:
TemporalAnalysis with temporal_score 0–100 (higher = more authentic).
"""
if len(frames_bgr) < 2:
return TemporalAnalysis(
temporal_score=50.0,
optical_flow_variance=0.0,
flicker_score=0.0,
blink_rate_anomaly=False,
diagnostics={"frames_analyzed": len(frames_bgr)},
)
flow_var = 0.0
try:
flow_var = _compute_optical_flow_variance(frames_bgr)
except Exception as exc: # noqa: BLE001
logger.warning(f"Optical flow failed: {exc}")
flicker = 0.0
try:
flicker = _compute_flicker_score(frames_bgr)
except Exception as exc: # noqa: BLE001
logger.warning(f"Flicker score failed: {exc}")
blink_anomaly, blink_intervals = False, []
try:
blink_anomaly, blink_intervals = _compute_blink_anomaly(frames_bgr, timestamps)
except Exception as exc: # noqa: BLE001
logger.warning(f"Blink rate analysis failed: {exc}")
# Score composition
# flow_var: real ~0–2; deepfake inconsistencies push higher → penalise
flow_auth = max(0.0, 100.0 - min(100.0, flow_var * 15.0))
flicker_auth = 100.0 - flicker
blink_penalty = 20.0 if blink_anomaly else 0.0
# Weights: 50% flow, 40% flicker, 10% blink
raw = 0.50 * flow_auth + 0.40 * flicker_auth + 0.10 * (100.0 - blink_penalty)
temporal_score = float(max(0.0, min(100.0, raw)))
logger.info(
f"Temporal: flow_var={flow_var:.4f} flicker={flicker:.1f} "
f"blink_anomaly={blink_anomaly} → temporal_score={temporal_score:.1f}"
)
return TemporalAnalysis(
temporal_score=round(temporal_score, 2),
optical_flow_variance=round(flow_var, 4),
flicker_score=round(flicker, 2),
blink_rate_anomaly=blink_anomaly,
blink_intervals=blink_intervals,
diagnostics={
"flow_component": round(flow_auth, 1),
"flicker_component": round(flicker_auth, 1),
"frames_analyzed": len(frames_bgr),
},
)