| """ |
| EMOLIPS Evaluation Suite |
| ======================== |
| Computes metrics across 4 categories: |
| |
| Category A: Lip-Sync Quality |
| - LSE-D (Lip Sync Error - Distance) |
| - LSE-C (Lip Sync Error - Confidence) |
| - LMD (Landmark Distance) |
| |
| Category B: Emotion Quality |
| - ECA (Emotion Classification Accuracy) |
| - EIS (Emotion Intensity Score) |
| - AU-MAE (Action Unit Mean Absolute Error) |
| |
| Category C: Visual Realism |
| - FID (Fréchet Inception Distance) |
| - SSIM (Structural Similarity Index) |
| - PSNR (Peak Signal-to-Noise Ratio) |
| |
| Category D: Human Evaluation (templates only) |
| - MOS-Sync, MOS-Emotion, MOS-Real |
| |
| Usage: |
| python eval_metrics.py --generated outputs/ --ground-truth gt/ --report results/ |
| python eval_metrics.py --quick-eval outputs/emolips_happy.mp4 |
| """ |
|
|
| import os |
| import sys |
| import json |
| import argparse |
| import numpy as np |
| from pathlib import Path |
| from typing import Dict, List, Optional, Tuple |
| import warnings |
| warnings.filterwarnings("ignore") |
|
|
|
|
| |
| |
| |
|
|
class LipSyncMetrics:
    """Lip-sync quality metrics computed from per-frame lip landmarks.

    Only landmark-based statistics are implemented here; ``self.syncnet`` is
    a placeholder that is initialised to ``None`` and never used in this class.
    """

    # MediaPipe Face Mesh indices of the 20 lip points extracted per frame.
    LIP_INDICES = (
        61, 146, 91, 181, 84, 17, 314, 405, 321, 375,
        291, 409, 270, 269, 267, 0, 37, 39, 40, 185,
    )

    # Positions *inside* LIP_INDICES (not mesh indices) of the two points
    # whose distance is used as the mouth aperture.  Position 5 is mesh
    # landmark 17 and position 15 is mesh landmark 0.
    # NOTE(review): in the MediaPipe mesh, 17 is presumably the lower-lip
    # centre and 0 the upper-lip centre - confirm against the mesh map.
    _LOWER_LIP_POS = 5
    _UPPER_LIP_POS = 15

    def __init__(self):
        self.syncnet = None

    def compute_lmd(
        self,
        pred_landmarks: np.ndarray,
        gt_landmarks: np.ndarray
    ) -> float:
        """
        Landmark Distance (LMD).
        Mean L2 distance between predicted and ground truth lip landmarks.

        Args:
            pred_landmarks: [T, 20, 2] predicted lip landmarks
            gt_landmarks: [T, 20, 2] ground truth lip landmarks

        Returns:
            Mean landmark distance (lower is better)

        Raises:
            AssertionError: if the two arrays do not have the same shape.
        """
        # Kept as an assert so callers relying on AssertionError still work.
        assert pred_landmarks.shape == gt_landmarks.shape, (
            f"shape mismatch: {pred_landmarks.shape} vs {gt_landmarks.shape}"
        )
        # Euclidean norm over the trailing coordinate axis, then a global mean.
        distances = np.sqrt(np.sum((pred_landmarks - gt_landmarks) ** 2, axis=-1))
        return float(np.mean(distances))

    def extract_lip_landmarks(self, video_path: str) -> Optional[np.ndarray]:
        """Extract lip landmarks from video using MediaPipe.

        Frames without a detected face reuse the previous frame's landmarks;
        if no previous frame exists, an all-zero frame is stored instead.

        Args:
            video_path: path of the video to analyse.

        Returns:
            Float array of shape [T, 20, 2] in pixel coordinates, or ``None``
            when the dependencies are missing, the video cannot be opened,
            no frame could be read, or any other error occurs.
        """
        cap = None
        face_mesh = None
        try:
            import cv2

            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                # Bail out before paying for the (expensive) FaceMesh model.
                print(f" ⚠ Landmark extraction failed: cannot open {video_path}")
                return None

            import mediapipe as mp

            face_mesh = mp.solutions.face_mesh.FaceMesh(
                static_image_mode=False,
                max_num_faces=1,
                min_detection_confidence=0.5
            )

            landmarks = []
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                h, w = frame.shape[:2]
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = face_mesh.process(rgb)

                if results.multi_face_landmarks:
                    face_lms = results.multi_face_landmarks[0].landmark
                    # Landmarks are normalised to [0, 1]; scale to pixels.
                    landmarks.append(
                        [[face_lms[idx].x * w, face_lms[idx].y * h]
                         for idx in self.LIP_INDICES]
                    )
                elif landmarks:
                    landmarks.append(landmarks[-1])
                else:
                    landmarks.append([[0, 0]] * len(self.LIP_INDICES))

            if not landmarks:
                # An empty array would break the [:, i, :] indexing downstream.
                print(f" ⚠ Landmark extraction failed: no frames in {video_path}")
                return None

            return np.array(landmarks, dtype=float)

        except Exception as e:
            # Best-effort: any failure is reported and mapped to None.
            print(f" ⚠ Landmark extraction failed: {e}")
            return None

        finally:
            # Always release native resources, including on error paths.
            if cap is not None:
                cap.release()
            if face_mesh is not None:
                face_mesh.close()

    def compute_lip_sync_score(
        self,
        video_path: str,
        audio_path: str = None
    ) -> Dict:
        """
        Compute lip-sync quality metrics for a video.

        Args:
            video_path: path of the video to analyse.
            audio_path: accepted for interface compatibility; not used here.

        Returns:
            Dict with ``lip_aperture_mean``, ``lip_aperture_std``,
            ``lip_aperture_range`` and ``num_frames``, plus
            ``lip_movement_energy`` when more than one frame is available.
            Empty dict when no landmarks could be extracted.
        """
        results = {}

        landmarks = self.extract_lip_landmarks(video_path)
        if landmarks is None or len(landmarks) == 0:
            return results

        upper = landmarks[:, self._UPPER_LIP_POS, :]
        lower = landmarks[:, self._LOWER_LIP_POS, :]
        aperture = np.sqrt(np.sum((upper - lower) ** 2, axis=-1))

        results["lip_aperture_mean"] = float(np.mean(aperture))
        results["lip_aperture_std"] = float(np.std(aperture))
        results["lip_aperture_range"] = float(np.max(aperture) - np.min(aperture))
        results["num_frames"] = len(landmarks)

        # Mean absolute frame-to-frame displacement of all lip points.
        if len(landmarks) > 1:
            lip_velocity = np.diff(landmarks, axis=0)
            results["lip_movement_energy"] = float(np.mean(np.abs(lip_velocity)))

        return results
|
|
|
|
| |
| |
| |
|
|
class EmotionMetrics:
    """Emotion quality metrics."""

    # Classifier labels treated as equivalent to each canonical emotion name.
    EMOTION_ALIASES = {
        "happy": ["happy", "happiness", "joy"],
        "sad": ["sad", "sadness"],
        "angry": ["angry", "anger"],
        "fear": ["fear", "fearful", "scared"],
        "surprise": ["surprise", "surprised"],
        "disgust": ["disgust", "disgusted"],
        "neutral": ["neutral", "calm"]
    }

    # Only every N-th frame is classified, to keep evaluation affordable.
    SAMPLE_EVERY = 5

    def __init__(self, device: str = "cpu"):
        self.device = device
        self._classifier = None  # built lazily on first use

    def _get_classifier(self):
        """Return the image-classification pipeline, creating it on first use."""
        classifier = getattr(self, "_classifier", None)
        if classifier is None:
            from transformers import pipeline

            classifier = pipeline(
                "image-classification",
                model="dima806/facial_emotions_image_detection",
                device=0 if self.device == "cuda" else -1
            )
            self._classifier = classifier
        return classifier

    @classmethod
    def _matching_labels(cls, target_emotion: str) -> set:
        """Return the set of lower-case labels that count as ``target_emotion``."""
        target = target_emotion.lower()
        labels = {target}
        for canonical, group in cls.EMOTION_ALIASES.items():
            if target == canonical or target in group:
                labels.add(canonical)
                labels.update(group)
        return labels

    def compute_eca(
        self,
        video_path: str,
        target_emotion: str
    ) -> Dict:
        """
        Emotion Classification Accuracy (ECA).
        Run emotion classifier on generated video frames and check
        if detected emotion matches target.

        Args:
            video_path: path of the generated video.
            target_emotion: emotion the video is supposed to show; matched
                case-insensitively, aliases included.

        Returns:
            ``{"eca", "total_frames_evaluated", "emotion_distribution"}`` on
            success, where ``eca`` is the fraction of sampled frames whose
            top label matches the target (always within [0, 1]).
            ``{"eca": 0.0, "counts": {}}`` when no frame was classified.
            ``{"eca": 0.0, "error": <message>}`` on any failure.
        """
        cap = None
        try:
            import cv2

            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                # Nothing to classify; skip loading the model altogether.
                return {"eca": 0.0, "counts": {}}

            from PIL import Image

            classifier = self._get_classifier()

            emotion_counts = {}
            frame_count = 0

            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                frame_count += 1
                if frame_count % self.SAMPLE_EVERY != 0:
                    continue

                # OpenCV decodes to BGR; convert to RGB before building the PIL image.
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                predictions = classifier(Image.fromarray(rgb))
                if predictions:
                    top_emotion = predictions[0]["label"].lower()
                    emotion_counts[top_emotion] = emotion_counts.get(top_emotion, 0) + 1

            total = sum(emotion_counts.values())
            if total == 0:
                return {"eca": 0.0, "counts": {}}

            # Count every label once; using a set prevents the target label
            # from being added twice (directly and again via its alias list).
            matching = self._matching_labels(target_emotion)
            target_count = sum(
                count for label, count in emotion_counts.items()
                if label in matching
            )

            return {
                "eca": target_count / total,
                "total_frames_evaluated": total,
                "emotion_distribution": emotion_counts
            }

        except Exception as e:
            return {"eca": 0.0, "error": str(e)}

        finally:
            if cap is not None:
                cap.release()

    def compute_emotion_consistency(
        self,
        landmarks_neutral: np.ndarray,
        landmarks_emotion: np.ndarray
    ) -> Dict:
        """
        Compute cross-emotion consistency metrics.
        Measures how much lip-sync is preserved while expression changes.

        The two sequences are truncated to their common length before
        comparison.

        Args:
            landmarks_neutral: lip landmarks of the neutral rendering.
            landmarks_emotion: lip landmarks of the emotional rendering.

        Returns:
            ``{"lip_region_diff", "consistency_score"}`` where the score is
            ``1 / (1 + lip_region_diff)``.  When either input is ``None`` or
            there are no overlapping frames, returns
            ``{"consistency": 0.0, "consistency_score": 0.0}``.
        """
        # "consistency" is the historical failure key; "consistency_score" is
        # included too so callers can read one key on every path.
        unavailable = {"consistency": 0.0, "consistency_score": 0.0}

        if landmarks_neutral is None or landmarks_emotion is None:
            return unavailable

        T = min(len(landmarks_neutral), len(landmarks_emotion))
        if T == 0:
            # Averaging over zero frames would yield NaN.
            return unavailable

        lip_diff = np.mean(np.abs(
            np.asarray(landmarks_neutral[:T], dtype=float)
            - np.asarray(landmarks_emotion[:T], dtype=float)
        ))

        return {
            "lip_region_diff": float(lip_diff),
            "consistency_score": float(1.0 / (1.0 + lip_diff))
        }
|
|
|
|
| |
| |
| |
|
|
class RealismMetrics:
    """Visual realism metrics."""

    def _mean_paired_frame_score(self, video_path, gt_video_path, score_fn):
        """Average ``score_fn(generated, ground_truth)`` over paired frames.

        Frames are read in lockstep from both videos until either one ends.
        Each pair is resized to the smaller of the two heights and widths
        before scoring.  Both captures are released even when reading or
        scoring raises.

        Returns:
            Mean score as a float, or ``None`` if no frame pair was scored.
        """
        import cv2

        cap_gen = None
        cap_gt = None
        scores = []
        try:
            cap_gen = cv2.VideoCapture(video_path)
            cap_gt = cv2.VideoCapture(gt_video_path)

            while True:
                ret1, frame1 = cap_gen.read()
                ret2, frame2 = cap_gt.read()
                if not ret1 or not ret2:
                    break

                h = min(frame1.shape[0], frame2.shape[0])
                w = min(frame1.shape[1], frame2.shape[1])
                # cv2.resize takes (width, height).
                frame1 = cv2.resize(frame1, (w, h))
                frame2 = cv2.resize(frame2, (w, h))

                scores.append(score_fn(frame1, frame2))
        finally:
            if cap_gen is not None:
                cap_gen.release()
            if cap_gt is not None:
                cap_gt.release()

        return float(np.mean(scores)) if scores else None

    def compute_ssim_frames(
        self,
        video_path: str,
        gt_video_path: str
    ) -> Optional[float]:
        """Compute mean SSIM between generated and ground truth video frames.

        SSIM is evaluated on grayscale versions of each frame pair.

        Returns:
            Mean SSIM, or ``None`` when no frame pair could be compared or
            an error occurred (the error is printed).
        """
        try:
            import cv2
            from skimage.metrics import structural_similarity as ssim

            def _ssim_pair(frame1, frame2):
                gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
                gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
                return ssim(gray1, gray2)

            return self._mean_paired_frame_score(
                video_path, gt_video_path, _ssim_pair
            )

        except Exception as e:
            print(f" ⚠ SSIM computation failed: {e}")
            return None

    def compute_psnr_frames(
        self,
        video_path: str,
        gt_video_path: str
    ) -> Optional[float]:
        """Compute mean PSNR between generated and ground truth frames.

        PSNR uses a peak value of 255; identical frame pairs (zero MSE)
        contribute a fixed score of 100.0 instead of infinity.

        Returns:
            Mean PSNR in dB, or ``None`` when no frame pair could be compared
            or an error occurred (the error is printed).
        """
        try:
            def _psnr_pair(frame1, frame2):
                mse = np.mean((frame1.astype(float) - frame2.astype(float)) ** 2)
                if mse == 0:
                    return 100.0
                return 20 * np.log10(255.0 / np.sqrt(mse))

            return self._mean_paired_frame_score(
                video_path, gt_video_path, _psnr_pair
            )

        except Exception as e:
            print(f" ⚠ PSNR computation failed: {e}")
            return None
|
|
|
|
| |
| |
| |
|
|
def _format_metric(value, spec: str = ".2f") -> str:
    """Format a numeric metric with ``spec``; return "N/A" if it is not numeric."""
    try:
        return format(float(value), spec)
    except (TypeError, ValueError):
        return "N/A"


def evaluate_single_video(
    video_path: str,
    target_emotion: str = "neutral",
    gt_video_path: str = None,
    device: str = "cpu"
) -> Dict:
    """
    Run full evaluation on a single generated video.

    Args:
        video_path: path of the generated video.
        target_emotion: emotion the video is expected to display.
        gt_video_path: optional ground-truth video; realism metrics are only
            computed when this path is given and exists on disk.
        device: device string forwarded to ``EmotionMetrics``.

    Returns:
        Dict with ``video``, ``target_emotion`` and ``metrics``.  ``metrics``
        holds ``lip_sync`` and ``emotion`` entries, plus ``realism`` when a
        ground-truth video was available.
    """
    print(f"\n Evaluating: {video_path}")
    print(f" Target emotion: {target_emotion}")

    results = {
        "video": video_path,
        "target_emotion": target_emotion,
        "metrics": {}
    }

    # [A] Lip-sync.  The metrics dict is empty when landmark extraction
    # failed, so values are formatted defensively instead of with ":.2f".
    print(" [A] Lip-sync metrics...")
    lip_metrics = LipSyncMetrics()
    sync_results = lip_metrics.compute_lip_sync_score(video_path)
    results["metrics"]["lip_sync"] = sync_results
    print(f" Lip aperture: {_format_metric(sync_results.get('lip_aperture_mean'))} "
          f"± {_format_metric(sync_results.get('lip_aperture_std'))}")

    # [B] Emotion.
    print(" [B] Emotion metrics...")
    emotion_metrics = EmotionMetrics(device=device)
    eca_results = emotion_metrics.compute_eca(video_path, target_emotion)
    results["metrics"]["emotion"] = eca_results
    print(f" ECA: {_format_metric(eca_results.get('eca'))}")

    # [C] Realism, only with an existing ground-truth video.
    if gt_video_path and os.path.exists(gt_video_path):
        print(" [C] Realism metrics...")
        realism = RealismMetrics()

        ssim_val = realism.compute_ssim_frames(video_path, gt_video_path)
        psnr_val = realism.compute_psnr_frames(video_path, gt_video_path)

        results["metrics"]["realism"] = {
            "ssim": ssim_val,
            "psnr": psnr_val
        }
        # None (not 0.0) means "unavailable"; a score of 0.0 is still printed.
        print(f" SSIM: {_format_metric(ssim_val, '.4f')}")
        print(f" PSNR: {_format_metric(psnr_val, '.2f')}")

    return results
|
|
|
|
def evaluate_emotion_set(
    output_dir: str,
    gt_dir: str = None,
    device: str = "cpu"
) -> Dict:
    """
    Evaluate every emotion variant found in ``output_dir``.

    For each emotion, ``emolips_{emotion}.mp4`` is preferred and
    ``demo_{emotion}.mp4`` is the fallback; emotions with neither file are
    skipped.  When ``gt_dir`` is given, ``gt_{emotion}.mp4`` inside it is
    passed along as the ground-truth path.

    Returns:
        Dict mapping each evaluated emotion to its result, plus an
        ``"aggregate"`` entry with the cross-emotion summary.
    """
    collected = {}

    for emotion in ("neutral", "happy", "sad", "angry", "fear", "surprise", "disgust"):
        candidates = (
            os.path.join(output_dir, f"emolips_{emotion}.mp4"),
            os.path.join(output_dir, f"demo_{emotion}.mp4"),
        )
        # First candidate that exists on disk, if any.
        video_path = next((p for p in candidates if os.path.exists(p)), None)
        if video_path is None:
            continue

        gt_path = os.path.join(gt_dir, f"gt_{emotion}.mp4") if gt_dir else None
        collected[emotion] = evaluate_single_video(video_path, emotion, gt_path, device)

    # The summary is computed over the per-emotion entries gathered above.
    collected["aggregate"] = compute_aggregate_metrics(collected)
    return collected
|
|
|
|
def compute_aggregate_metrics(results: Dict) -> Dict:
    """Summarise per-emotion results into cross-emotion means.

    The ``"aggregate"`` key, if present, is ignored.  Each mean is ``None``
    when no entry supplied the corresponding metric.
    """
    per_emotion = [entry for name, entry in results.items() if name != "aggregate"]

    apertures = []
    energies = []
    ecas = []
    for entry in per_emotion:
        metric_groups = entry.get("metrics", {})

        lip_sync = metric_groups.get("lip_sync", {})
        if "lip_aperture_mean" in lip_sync:
            apertures.append(lip_sync["lip_aperture_mean"])
        if "lip_movement_energy" in lip_sync:
            energies.append(lip_sync["lip_movement_energy"])

        emotion_scores = metric_groups.get("emotion", {})
        if "eca" in emotion_scores:
            ecas.append(emotion_scores["eca"])

    def _mean_or_none(values):
        # Empty collections have no meaningful mean.
        return float(np.mean(values)) if values else None

    return {
        "mean_lip_aperture": _mean_or_none(apertures),
        "mean_eca": _mean_or_none(ecas),
        "mean_lip_energy": _mean_or_none(energies),
        "num_evaluated": len(per_emotion)
    }
|
|
|
|
| |
| |
| |
|
|
def generate_report(results: Dict, output_path: str):
    """Generate evaluation report as JSON and text summary.

    Args:
        results: mapping of emotion name to its evaluation result, with an
            optional ``"aggregate"`` entry.
        output_path: path of the text report.  The JSON file is written next
            to it with the extension replaced by ``.json``; if that would be
            the same file as ``output_path``, ``.json`` is appended instead
            so the two outputs never overwrite each other.
    """
    # Swap only the final extension; a plain str.replace would also rewrite
    # ".txt" inside directory names and would leave extension-less paths
    # unchanged, making the text report clobber the JSON file.
    root, _ext = os.path.splitext(output_path)
    json_path = root + ".json"
    if json_path == output_path:
        json_path = output_path + ".json"

    with open(json_path, "w", encoding="utf-8") as f:
        # default=str stringifies values json cannot serialise natively.
        json.dump(results, f, indent=2, default=str)

    with open(output_path, "w", encoding="utf-8") as f:
        f.write("=" * 60 + "\n")
        f.write(" EMOLIPS Evaluation Report\n")
        f.write("=" * 60 + "\n\n")

        for emotion, result in results.items():
            if emotion == "aggregate":
                continue
            f.write(f"\nEmotion: {emotion.upper()}\n")
            f.write("-" * 40 + "\n")

            metrics = result.get("metrics", {})

            f.write(" Lip-Sync:\n")
            lip = metrics.get("lip_sync", {})
            for k, v in lip.items():
                f.write(f" {k}: {v}\n")

            f.write(" Emotion:\n")
            emo = metrics.get("emotion", {})
            f.write(f" ECA: {emo.get('eca', 'N/A')}\n")
            if "emotion_distribution" in emo:
                f.write(f" Distribution: {emo['emotion_distribution']}\n")

            if "realism" in metrics:
                f.write(" Realism:\n")
                real = metrics["realism"]
                f.write(f" SSIM: {real.get('ssim', 'N/A')}\n")
                f.write(f" PSNR: {real.get('psnr', 'N/A')}\n")

        # The aggregate section goes last, after all per-emotion sections.
        if "aggregate" in results:
            f.write(f"\n{'='*60}\n")
            f.write(" AGGREGATE METRICS\n")
            f.write(f"{'='*60}\n")
            for k, v in results["aggregate"].items():
                f.write(f" {k}: {v}\n")

    print(f"\n ✓ Report saved: {output_path}")
    print(f" ✓ JSON saved: {json_path}")
|
|
|
|
def main():
    """Command-line entry point for the EMOLIPS evaluation suite.

    With ``--quick-eval`` a single video is evaluated and its result is
    printed as JSON.  Otherwise ``--generated`` is required: every emotion
    variant in that directory is evaluated and a report is written to
    ``eval_report.txt`` inside the ``--report`` directory.
    """
    cli = argparse.ArgumentParser(description="EMOLIPS Evaluation")

    # (flags, keyword arguments) for each supported option, in CLI order.
    option_table = [
        (("--generated", "-g"), {"type": str, "help": "Generated videos directory"}),
        (("--ground-truth", "-gt"), {"type": str, "default": None}),
        (("--report", "-r"), {"type": str, "default": "results"}),
        (("--quick-eval",), {"type": str, "help": "Quick eval single video"}),
        (("--emotion",), {"type": str, "default": "neutral"}),
        (("--device",), {"type": str, "default": "cpu"}),
    ]
    for flags, settings in option_table:
        cli.add_argument(*flags, **settings)

    opts = cli.parse_args()

    # Single-video mode short-circuits the directory workflow.
    if opts.quick_eval:
        single = evaluate_single_video(
            opts.quick_eval, opts.emotion, device=opts.device
        )
        print(json.dumps(single, indent=2, default=str))
        return

    if not opts.generated:
        print("Error: --generated directory required")
        sys.exit(1)

    os.makedirs(opts.report, exist_ok=True)

    report_file = os.path.join(opts.report, "eval_report.txt")
    set_results = evaluate_emotion_set(
        opts.generated,
        opts.ground_truth,
        opts.device
    )
    generate_report(set_results, report_file)


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|