Spaces:
Sleeping
Sleeping
| """Phase 17.1 — Temporal Consistency Module. | |
| Analyses optical flow variance, luminance flicker, and blink timing across | |
| sampled video frames to produce a temporal_score (0–100, higher = more natural). | |
| """ | |
| from __future__ import annotations | |
| from dataclasses import dataclass, field | |
| from typing import List, Tuple | |
| import cv2 | |
| import numpy as np | |
| from loguru import logger | |
@dataclass
class TemporalAnalysis:
    """Result record of the temporal-consistency analysis.

    Declared as a dataclass so that instances can be built with keyword
    arguments and so that the ``field(default_factory=...)`` defaults give
    every instance its own fresh list / dict.
    """

    # 0–100, higher = more natural / authentic
    temporal_score: float
    # variance of the per-pair mean optical-flow magnitudes
    optical_flow_variance: float
    # 0–100 (high = suspicious micro-flicker)
    flicker_score: float
    # True when blink timing is unnatural
    blink_rate_anomaly: bool
    # seconds between consecutive detected blink onsets
    blink_intervals: List[float] = field(default_factory=list)
    # free-form extra values recorded by the scorer
    diagnostics: dict = field(default_factory=dict)
| # --------------------------------------------------------------------------- | |
| # Optical-flow variance | |
| # --------------------------------------------------------------------------- | |
def _compute_optical_flow_variance(frames_bgr: List[np.ndarray]) -> float:
    """Return the variance of the per-pair mean optical-flow magnitude.

    For every pair of consecutive frames a dense Farneback flow field is
    computed on grayscale copies, and the mean flow magnitude of that pair is
    recorded.  The result is the variance of those means over the whole
    sequence.

    Real videos show consistent, smooth motion; deepfake temporal
    inconsistencies appear as irregular per-frame flow jumps.

    Args:
        frames_bgr: BGR frames in temporal order.

    Returns:
        The variance as a float, or 0.0 when fewer than two frames are given.
    """
    if len(frames_bgr) < 2:
        return 0.0

    farneback_params = dict(
        pyr_scale=0.5,
        levels=3,
        winsize=15,
        iterations=3,
        poly_n=5,
        poly_sigma=1.2,
        flags=0,
    )

    pair_means: List[float] = []
    for earlier, later in zip(frames_bgr, frames_bgr[1:]):
        gray_a = cv2.cvtColor(earlier, cv2.COLOR_BGR2GRAY)
        gray_b = cv2.cvtColor(later, cv2.COLOR_BGR2GRAY)

        # Shrink so the longer side of the earlier frame is at most 320 px;
        # the later frame is resized to the very same target size.
        rows, cols = gray_a.shape
        shrink = min(1.0, 320.0 / max(rows, cols, 1))
        if shrink < 1.0:
            target = (max(1, int(cols * shrink)), max(1, int(rows * shrink)))
            gray_a = cv2.resize(gray_a, target)
            gray_b = cv2.resize(gray_b, target)

        flow = cv2.calcOpticalFlowFarneback(
            gray_a, gray_b, None, **farneback_params
        )
        magnitude, _angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        pair_means.append(float(np.mean(magnitude)))

    if not pair_means:
        return 0.0
    return float(np.var(pair_means))
| # --------------------------------------------------------------------------- | |
| # Luminance flicker | |
| # --------------------------------------------------------------------------- | |
def _compute_flicker_score(frames_bgr: List[np.ndarray]) -> float:
    """Score luminance flicker on a 0–100 scale (higher = more suspicious).

    Each frame is reduced to its mean grayscale intensity.  The absolute
    frame-to-frame changes of that value form a difference sequence, and the
    score is the ratio ``std / mean`` of that sequence, multiplied by 50 and
    capped at 100.

    Deepfake GAN generators introduce subtle luminance micro-flicker that
    manifests as high variance in the difference sequence.

    Args:
        frames_bgr: BGR frames in temporal order.

    Returns:
        The flicker score, or 0.0 when fewer than two frames are given.
    """
    if len(frames_bgr) < 2:
        return 0.0

    brightness: List[float] = []
    for frame in frames_bgr:
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        brightness.append(float(np.mean(gray)))

    steps = [abs(nxt - cur) for cur, nxt in zip(brightness, brightness[1:])]
    if not steps:
        return 0.0

    spread = float(np.std(steps))
    typical = float(np.mean(steps))
    # The small epsilon keeps the ratio finite when brightness never changes.
    ratio = spread / (typical + 1e-6)
    return float(min(100.0, ratio * 50.0))
| # --------------------------------------------------------------------------- | |
| # Blink-rate anomaly (FaceMesh EAR) | |
| # --------------------------------------------------------------------------- | |
def _compute_blink_anomaly(
    frames_bgr: List[np.ndarray],
    timestamps: List[float],
) -> Tuple[bool, List[float]]:
    """Flag unnatural blink timing from a MediaPipe FaceMesh eye-aspect-ratio.

    An eye-aspect-ratio (EAR) is measured on every frame in which a face is
    found.  A blink onset is recorded each time the EAR drops below the
    threshold after having been at or above it, and the gaps between
    consecutive onsets are analysed.

    Natural blink rate: ~15–20/min → intervals ~3–4 s.
    Anomalies: perfectly regular cadence (std < 0.05 s over more than two
    intervals) or a mean interval below 0.5 s (rate > 2/s).

    Args:
        frames_bgr: BGR frames in temporal order.
        timestamps: Timestamp in seconds for each frame; frames and
            timestamps are paired positionally.

    Returns:
        ``(anomaly_detected, blink_intervals_seconds)``.  The result is
        ``(False, [])`` when MediaPipe cannot be imported, when fewer than
        three EAR samples were obtained, or when fewer than two blinks were
        seen.
    """
    try:
        import mediapipe as mp

        face_mesh_module = mp.solutions.face_mesh
    except ImportError:
        # MediaPipe is optional; without it the blink check is skipped.
        return False, []

    # FaceMesh landmark indices of a single eye: one vertical pair and one
    # horizontal pair.
    vert_a, vert_b = 159, 145
    horiz_a, horiz_b = 33, 133
    closed_below = 0.25

    def eye_aspect_ratio(landmarks, width: int, height: int) -> float:
        """Vertical eye opening divided by horizontal eye width, in pixels."""

        def to_pixels(index: int) -> np.ndarray:
            point = landmarks[index]
            return np.array([point.x * width, point.y * height])

        opening = float(np.linalg.norm(to_pixels(vert_a) - to_pixels(vert_b)))
        span = float(np.linalg.norm(to_pixels(horiz_a) - to_pixels(horiz_b)))
        return opening / (span + 1e-6)

    samples: List[Tuple[float, float]] = []
    mesh_options = dict(
        static_image_mode=True,
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.5,
    )
    with face_mesh_module.FaceMesh(**mesh_options) as mesh:
        for frame, stamp in zip(frames_bgr, timestamps):
            result = mesh.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            if not result or not result.multi_face_landmarks:
                # No face in this frame: it contributes no EAR sample.
                continue
            height, width = frame.shape[:2]
            face = result.multi_face_landmarks[0].landmark
            samples.append((stamp, eye_aspect_ratio(face, width, height)))

    if len(samples) < 3:
        return False, []

    # Record the timestamp of each open → closed transition.
    blink_onsets: List[float] = []
    eye_closed = False
    for stamp, ratio in samples:
        if ratio >= closed_below:
            eye_closed = False
        elif ratio < closed_below and not eye_closed:
            blink_onsets.append(stamp)
            eye_closed = True

    if len(blink_onsets) < 2:
        return False, []

    gaps = [
        round(later - earlier, 3)
        for earlier, later in zip(blink_onsets, blink_onsets[1:])
    ]
    gap_mean = float(np.mean(gaps))
    gap_std = float(np.std(gaps))

    too_regular = len(gaps) > 2 and gap_std < 0.05
    too_fast = gap_mean < 0.5
    return bool(too_regular or too_fast), gaps
| # --------------------------------------------------------------------------- | |
| # Public API | |
| # --------------------------------------------------------------------------- | |
def compute_temporal_score(
    frames_bgr: List[np.ndarray],
    timestamps: List[float],
) -> TemporalAnalysis:
    """Compute temporal consistency for a list of BGR video frames.

    Three independent signals are measured — optical-flow variance, luminance
    flicker and blink timing — and blended into one score with weights of
    50 % flow, 40 % flicker and 10 % blink.  A signal whose computation raises
    is logged as a warning and replaced by its neutral default (0.0, or "no
    anomaly"), so one failing analysis never aborts the whole scoring.

    Args:
        frames_bgr: BGR numpy arrays in temporal order.
        timestamps: Corresponding timestamps in seconds.

    Returns:
        TemporalAnalysis with temporal_score 0–100 (higher = more authentic).
        With fewer than two frames a fixed score of 50.0 is returned.
    """
    n_frames = len(frames_bgr)
    if n_frames < 2:
        # Nothing to compare between frames: report the fixed mid-scale score.
        return TemporalAnalysis(
            temporal_score=50.0,
            optical_flow_variance=0.0,
            flicker_score=0.0,
            blink_rate_anomaly=False,
            diagnostics={"frames_analyzed": n_frames},
        )

    try:
        flow_var = _compute_optical_flow_variance(frames_bgr)
    except Exception as exc:  # noqa: BLE001
        logger.warning(f"Optical flow failed: {exc}")
        flow_var = 0.0

    try:
        flicker = _compute_flicker_score(frames_bgr)
    except Exception as exc:  # noqa: BLE001
        logger.warning(f"Flicker score failed: {exc}")
        flicker = 0.0

    try:
        blink_anomaly, blink_intervals = _compute_blink_anomaly(
            frames_bgr, timestamps
        )
    except Exception as exc:  # noqa: BLE001
        logger.warning(f"Blink rate analysis failed: {exc}")
        blink_anomaly, blink_intervals = False, []

    # Turn each raw signal into a 0–100 "authenticity" component.
    # flow_var: real ~0–2; deepfake inconsistencies push higher → penalise
    flow_penalty = min(100.0, flow_var * 15.0)
    flow_auth = max(0.0, 100.0 - flow_penalty)
    flicker_auth = 100.0 - flicker
    blink_auth = 100.0 - (20.0 if blink_anomaly else 0.0)

    # Weights: 50% flow, 40% flicker, 10% blink
    raw = 0.50 * flow_auth + 0.40 * flicker_auth + 0.10 * blink_auth
    temporal_score = float(max(0.0, min(100.0, raw)))

    logger.info(
        f"Temporal: flow_var={flow_var:.4f} flicker={flicker:.1f} "
        f"blink_anomaly={blink_anomaly} → temporal_score={temporal_score:.1f}"
    )

    diagnostics = {
        "flow_component": round(flow_auth, 1),
        "flicker_component": round(flicker_auth, 1),
        "frames_analyzed": n_frames,
    }
    return TemporalAnalysis(
        temporal_score=round(temporal_score, 2),
        optical_flow_variance=round(flow_var, 4),
        flicker_score=round(flicker, 2),
        blink_rate_anomaly=blink_anomaly,
        blink_intervals=blink_intervals,
        diagnostics=diagnostics,
    )