""" EMOLIPS Evaluation Suite ======================== Computes metrics across 4 categories: Category A: Lip-Sync Quality - LSE-D (Lip Sync Error - Distance) - LSE-C (Lip Sync Error - Confidence) - LMD (Landmark Distance) Category B: Emotion Quality - ECA (Emotion Classification Accuracy) - EIS (Emotion Intensity Score) - AU-MAE (Action Unit Mean Absolute Error) Category C: Visual Realism - FID (Fréchet Inception Distance) - SSIM (Structural Similarity Index) - PSNR (Peak Signal-to-Noise Ratio) Category D: Human Evaluation (templates only) - MOS-Sync, MOS-Emotion, MOS-Real Usage: python eval_metrics.py --generated outputs/ --ground-truth gt/ --report results/ python eval_metrics.py --quick-eval outputs/emolips_happy.mp4 """ import os import sys import json import argparse import numpy as np from pathlib import Path from typing import Dict, List, Optional, Tuple import warnings warnings.filterwarnings("ignore") # ============================================================ # CATEGORY A: LIP-SYNC QUALITY # ============================================================ class LipSyncMetrics: """Lip-sync quality metrics using SyncNet and landmarks.""" def __init__(self): self.syncnet = None def compute_lmd( self, pred_landmarks: np.ndarray, gt_landmarks: np.ndarray ) -> float: """ Landmark Distance (LMD). Mean L2 distance between predicted and ground truth lip landmarks. Args: pred_landmarks: [T, 20, 2] predicted lip landmarks gt_landmarks: [T, 20, 2] ground truth lip landmarks Returns: Mean landmark distance (lower is better) """ assert pred_landmarks.shape == gt_landmarks.shape distances = np.sqrt(np.sum((pred_landmarks - gt_landmarks) ** 2, axis=-1)) return float(np.mean(distances)) def extract_lip_landmarks(self, video_path: str) -> Optional[np.ndarray]: """Extract lip landmarks from video using MediaPipe.""" try: import cv2 import mediapipe as mp mp_face_mesh = mp.solutions.face_mesh face_mesh = mp_face_mesh.FaceMesh( static_image_mode=False, max_num_faces=1, min_detection_confidence=0.5 ) # MediaPipe lip landmark indices (inner + outer) LIP_INDICES = [ 61, 146, 91, 181, 84, 17, 314, 405, 321, 375, # Outer upper 291, 409, 270, 269, 267, 0, 37, 39, 40, 185, # Outer lower ] cap = cv2.VideoCapture(video_path) landmarks = [] while cap.isOpened(): ret, frame = cap.read() if not ret: break h, w = frame.shape[:2] rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) results = face_mesh.process(rgb) if results.multi_face_landmarks: face_lms = results.multi_face_landmarks[0] lip_pts = [] for idx in LIP_INDICES: lm = face_lms.landmark[idx] lip_pts.append([lm.x * w, lm.y * h]) landmarks.append(lip_pts) else: if landmarks: landmarks.append(landmarks[-1]) # Carry forward else: landmarks.append([[0, 0]] * len(LIP_INDICES)) cap.release() face_mesh.close() return np.array(landmarks) except Exception as e: print(f" ⚠ Landmark extraction failed: {e}") return None def compute_lip_sync_score( self, video_path: str, audio_path: str = None ) -> Dict: """ Compute lip-sync quality metrics for a video. Returns dict with available metrics. """ results = {} landmarks = self.extract_lip_landmarks(video_path) if landmarks is not None: # Lip aperture (mouth openness over time) # Upper lip center vs lower lip center upper = landmarks[:, 5, :] # Center of upper lip lower = landmarks[:, 15, :] # Center of lower lip aperture = np.sqrt(np.sum((upper - lower) ** 2, axis=-1)) results["lip_aperture_mean"] = float(np.mean(aperture)) results["lip_aperture_std"] = float(np.std(aperture)) results["lip_aperture_range"] = float(np.max(aperture) - np.min(aperture)) results["num_frames"] = len(landmarks) # Lip movement energy (higher = more articulation) if len(landmarks) > 1: lip_velocity = np.diff(landmarks, axis=0) results["lip_movement_energy"] = float(np.mean(np.abs(lip_velocity))) return results # ============================================================ # CATEGORY B: EMOTION QUALITY # ============================================================ class EmotionMetrics: """Emotion quality metrics.""" def __init__(self, device: str = "cpu"): self.device = device def compute_eca( self, video_path: str, target_emotion: str ) -> Dict: """ Emotion Classification Accuracy (ECA). Run emotion classifier on generated video frames and check if detected emotion matches target. """ try: import cv2 from transformers import pipeline # Use a face emotion classifier classifier = pipeline( "image-classification", model="dima806/facial_emotions_image_detection", device=0 if self.device == "cuda" else -1 ) cap = cv2.VideoCapture(video_path) emotion_counts = {} frame_count = 0 sample_every = 5 # Sample every 5th frame while cap.isOpened(): ret, frame = cap.read() if not ret: break frame_count += 1 if frame_count % sample_every != 0: continue # Convert BGR to RGB rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) from PIL import Image pil_img = Image.fromarray(rgb) results = classifier(pil_img) if results: top_emotion = results[0]["label"].lower() emotion_counts[top_emotion] = emotion_counts.get(top_emotion, 0) + 1 cap.release() total = sum(emotion_counts.values()) if total == 0: return {"eca": 0.0, "counts": {}} # Map detected emotions to our categories target_lower = target_emotion.lower() target_count = emotion_counts.get(target_lower, 0) # Check aliases aliases = { "happy": ["happy", "happiness", "joy"], "sad": ["sad", "sadness"], "angry": ["angry", "anger"], "fear": ["fear", "fearful", "scared"], "surprise": ["surprise", "surprised"], "disgust": ["disgust", "disgusted"], "neutral": ["neutral", "calm"] } for alias in aliases.get(target_lower, []): target_count += emotion_counts.get(alias, 0) return { "eca": target_count / total, "total_frames_evaluated": total, "emotion_distribution": emotion_counts } except Exception as e: return {"eca": 0.0, "error": str(e)} def compute_emotion_consistency( self, landmarks_neutral: np.ndarray, landmarks_emotion: np.ndarray ) -> Dict: """ Compute cross-emotion consistency metrics. Measures how much lip-sync is preserved while expression changes. """ if landmarks_neutral is None or landmarks_emotion is None: return {"consistency": 0.0} T = min(len(landmarks_neutral), len(landmarks_emotion)) # Lip region only (indices 0-19 are lip landmarks) lip_diff = np.mean(np.abs( landmarks_neutral[:T] - landmarks_emotion[:T] )) return { "lip_region_diff": float(lip_diff), "consistency_score": float(1.0 / (1.0 + lip_diff)) # Higher is better } # ============================================================ # CATEGORY C: VISUAL REALISM # ============================================================ class RealismMetrics: """Visual realism metrics.""" def compute_ssim_frames( self, video_path: str, gt_video_path: str ) -> Optional[float]: """Compute mean SSIM between generated and ground truth video frames.""" try: import cv2 from skimage.metrics import structural_similarity as ssim cap_gen = cv2.VideoCapture(video_path) cap_gt = cv2.VideoCapture(gt_video_path) ssim_scores = [] while True: ret1, frame1 = cap_gen.read() ret2, frame2 = cap_gt.read() if not ret1 or not ret2: break # Resize to same dimensions h, w = min(frame1.shape[0], frame2.shape[0]), min(frame1.shape[1], frame2.shape[1]) frame1 = cv2.resize(frame1, (w, h)) frame2 = cv2.resize(frame2, (w, h)) # Convert to grayscale for SSIM gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) score = ssim(gray1, gray2) ssim_scores.append(score) cap_gen.release() cap_gt.release() return float(np.mean(ssim_scores)) if ssim_scores else None except Exception as e: print(f" ⚠ SSIM computation failed: {e}") return None def compute_psnr_frames( self, video_path: str, gt_video_path: str ) -> Optional[float]: """Compute mean PSNR between generated and ground truth frames.""" try: import cv2 cap_gen = cv2.VideoCapture(video_path) cap_gt = cv2.VideoCapture(gt_video_path) psnr_scores = [] while True: ret1, frame1 = cap_gen.read() ret2, frame2 = cap_gt.read() if not ret1 or not ret2: break h, w = min(frame1.shape[0], frame2.shape[0]), min(frame1.shape[1], frame2.shape[1]) frame1 = cv2.resize(frame1, (w, h)) frame2 = cv2.resize(frame2, (w, h)) mse = np.mean((frame1.astype(float) - frame2.astype(float)) ** 2) if mse == 0: psnr_scores.append(100.0) else: psnr_scores.append(20 * np.log10(255.0 / np.sqrt(mse))) cap_gen.release() cap_gt.release() return float(np.mean(psnr_scores)) if psnr_scores else None except Exception as e: print(f" ⚠ PSNR computation failed: {e}") return None # ============================================================ # FULL EVALUATION RUNNER # ============================================================ def evaluate_single_video( video_path: str, target_emotion: str = "neutral", gt_video_path: str = None, device: str = "cpu" ) -> Dict: """ Run full evaluation on a single generated video. """ print(f"\n Evaluating: {video_path}") print(f" Target emotion: {target_emotion}") results = { "video": video_path, "target_emotion": target_emotion, "metrics": {} } # Category A: Lip-sync print(" [A] Lip-sync metrics...") lip_metrics = LipSyncMetrics() sync_results = lip_metrics.compute_lip_sync_score(video_path) results["metrics"]["lip_sync"] = sync_results print(f" Lip aperture: {sync_results.get('lip_aperture_mean', 'N/A'):.2f} " f"± {sync_results.get('lip_aperture_std', 'N/A'):.2f}") # Category B: Emotion print(" [B] Emotion metrics...") emotion_metrics = EmotionMetrics(device=device) eca_results = emotion_metrics.compute_eca(video_path, target_emotion) results["metrics"]["emotion"] = eca_results print(f" ECA: {eca_results.get('eca', 'N/A'):.2f}") # Category C: Realism (if ground truth available) if gt_video_path and os.path.exists(gt_video_path): print(" [C] Realism metrics...") realism = RealismMetrics() ssim_val = realism.compute_ssim_frames(video_path, gt_video_path) psnr_val = realism.compute_psnr_frames(video_path, gt_video_path) results["metrics"]["realism"] = { "ssim": ssim_val, "psnr": psnr_val } print(f" SSIM: {ssim_val:.4f}" if ssim_val else " SSIM: N/A") print(f" PSNR: {psnr_val:.2f}" if psnr_val else " PSNR: N/A") return results def evaluate_emotion_set( output_dir: str, gt_dir: str = None, device: str = "cpu" ) -> Dict: """ Evaluate all emotion variants in an output directory. Expects files named: emolips_{emotion}.mp4 """ emotions = ["neutral", "happy", "sad", "angry", "fear", "surprise", "disgust"] all_results = {} for emotion in emotions: video_path = os.path.join(output_dir, f"emolips_{emotion}.mp4") if not os.path.exists(video_path): # Try demo_ prefix video_path = os.path.join(output_dir, f"demo_{emotion}.mp4") if os.path.exists(video_path): gt_path = None if gt_dir: gt_path = os.path.join(gt_dir, f"gt_{emotion}.mp4") result = evaluate_single_video(video_path, emotion, gt_path, device) all_results[emotion] = result # Compute aggregate metrics aggregate = compute_aggregate_metrics(all_results) all_results["aggregate"] = aggregate return all_results def compute_aggregate_metrics(results: Dict) -> Dict: """Compute aggregate metrics across emotions.""" aggregate = { "mean_lip_aperture": [], "mean_eca": [], "mean_lip_energy": [], } for emotion, result in results.items(): if emotion == "aggregate": continue metrics = result.get("metrics", {}) lip = metrics.get("lip_sync", {}) if "lip_aperture_mean" in lip: aggregate["mean_lip_aperture"].append(lip["lip_aperture_mean"]) if "lip_movement_energy" in lip: aggregate["mean_lip_energy"].append(lip["lip_movement_energy"]) emo = metrics.get("emotion", {}) if "eca" in emo: aggregate["mean_eca"].append(emo["eca"]) return { "mean_lip_aperture": float(np.mean(aggregate["mean_lip_aperture"])) if aggregate["mean_lip_aperture"] else None, "mean_eca": float(np.mean(aggregate["mean_eca"])) if aggregate["mean_eca"] else None, "mean_lip_energy": float(np.mean(aggregate["mean_lip_energy"])) if aggregate["mean_lip_energy"] else None, "num_evaluated": len([k for k in results if k != "aggregate"]) } # ============================================================ # GENERATE EVAL REPORT # ============================================================ def generate_report(results: Dict, output_path: str): """Generate evaluation report as JSON and text summary.""" # Save JSON json_path = output_path.replace(".txt", ".json") with open(json_path, "w") as f: json.dump(results, f, indent=2, default=str) # Save text summary with open(output_path, "w") as f: f.write("=" * 60 + "\n") f.write(" EMOLIPS Evaluation Report\n") f.write("=" * 60 + "\n\n") for emotion, result in results.items(): if emotion == "aggregate": continue f.write(f"\nEmotion: {emotion.upper()}\n") f.write("-" * 40 + "\n") metrics = result.get("metrics", {}) f.write(" Lip-Sync:\n") lip = metrics.get("lip_sync", {}) for k, v in lip.items(): f.write(f" {k}: {v}\n") f.write(" Emotion:\n") emo = metrics.get("emotion", {}) f.write(f" ECA: {emo.get('eca', 'N/A')}\n") if "emotion_distribution" in emo: f.write(f" Distribution: {emo['emotion_distribution']}\n") if "realism" in metrics: f.write(" Realism:\n") real = metrics["realism"] f.write(f" SSIM: {real.get('ssim', 'N/A')}\n") f.write(f" PSNR: {real.get('psnr', 'N/A')}\n") # Aggregate if "aggregate" in results: f.write(f"\n{'='*60}\n") f.write(" AGGREGATE METRICS\n") f.write(f"{'='*60}\n") for k, v in results["aggregate"].items(): f.write(f" {k}: {v}\n") print(f"\n ✓ Report saved: {output_path}") print(f" ✓ JSON saved: {json_path}") def main(): parser = argparse.ArgumentParser(description="EMOLIPS Evaluation") parser.add_argument("--generated", "-g", type=str, help="Generated videos directory") parser.add_argument("--ground-truth", "-gt", type=str, default=None) parser.add_argument("--report", "-r", type=str, default="results") parser.add_argument("--quick-eval", type=str, help="Quick eval single video") parser.add_argument("--emotion", type=str, default="neutral") parser.add_argument("--device", type=str, default="cpu") args = parser.parse_args() if args.quick_eval: result = evaluate_single_video( args.quick_eval, args.emotion, device=args.device ) print(json.dumps(result, indent=2, default=str)) return if not args.generated: print("Error: --generated directory required") sys.exit(1) os.makedirs(args.report, exist_ok=True) results = evaluate_emotion_set( args.generated, args.ground_truth, args.device ) generate_report(results, os.path.join(args.report, "eval_report.txt")) if __name__ == "__main__": main()