emolips / code /eval_metrics.py
primal-sage's picture
Upload code/eval_metrics.py with huggingface_hub
ea40621 verified
"""
EMOLIPS Evaluation Suite
========================
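Defines metrics across 4 categories. Implemented in this file: LMD and
lip-aperture / lip-movement statistics (A), ECA (B), SSIM and PSNR (C).
The remaining entries (LSE-D, LSE-C, EIS, AU-MAE, FID, MOS) are listed as
the intended evaluation protocol and are not computed by this file: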
Category A: Lip-Sync Quality
- LSE-D (Lip Sync Error - Distance)
- LSE-C (Lip Sync Error - Confidence)
- LMD (Landmark Distance)
Category B: Emotion Quality
- ECA (Emotion Classification Accuracy)
- EIS (Emotion Intensity Score)
- AU-MAE (Action Unit Mean Absolute Error)
Category C: Visual Realism
- FID (Fréchet Inception Distance)
- SSIM (Structural Similarity Index)
- PSNR (Peak Signal-to-Noise Ratio)
Category D: Human Evaluation (templates only)
- MOS-Sync, MOS-Emotion, MOS-Real
Usage:
python eval_metrics.py --generated outputs/ --ground-truth gt/ --report results/
python eval_metrics.py --quick-eval outputs/emolips_happy.mp4 --emotion happy
"""
import os
import sys
import json
import argparse
import numpy as np
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import warnings
warnings.filterwarnings("ignore")
# ============================================================
# CATEGORY A: LIP-SYNC QUALITY
# ============================================================
class LipSyncMetrics:
    """Lip-sync quality metrics computed from MediaPipe lip landmarks.

    Only landmark-based statistics are implemented; ``self.syncnet`` is a
    placeholder that no method of this class reads.
    """

    # MediaPipe Face Mesh indices of the OUTER lip contour (20 points).
    # Per MediaPipe's FACEMESH_LIPS topology the first row runs along the
    # lower lip and the second row along the upper lip.
    LIP_INDICES = [
        61, 146, 91, 181, 84, 17, 314, 405, 321, 375,  # Outer lower lip
        291, 409, 270, 269, 267, 0, 37, 39, 40, 185,   # Outer upper lip
    ]
    # Positions inside LIP_INDICES of the two mid-line contour points.
    LOWER_LIP_CENTER = 5   # Face Mesh landmark 17
    UPPER_LIP_CENTER = 15  # Face Mesh landmark 0

    def __init__(self):
        self.syncnet = None

    def compute_lmd(
        self,
        pred_landmarks: np.ndarray,
        gt_landmarks: np.ndarray
    ) -> float:
        """
        Landmark Distance (LMD).

        Mean L2 distance between predicted and ground truth lip landmarks.

        Args:
            pred_landmarks: [T, 20, 2] predicted lip landmarks
            gt_landmarks: [T, 20, 2] ground truth lip landmarks

        Returns:
            Mean landmark distance (lower is better)

        Raises:
            AssertionError: if the two arrays do not have the same shape.
        """
        assert pred_landmarks.shape == gt_landmarks.shape, (
            f"shape mismatch: {pred_landmarks.shape} vs {gt_landmarks.shape}"
        )
        # Euclidean distance per landmark: reduce over the coordinate axis.
        distances = np.sqrt(np.sum((pred_landmarks - gt_landmarks) ** 2, axis=-1))
        return float(np.mean(distances))

    def extract_lip_landmarks(self, video_path: str) -> Optional[np.ndarray]:
        """Extract lip landmarks from video using MediaPipe.

        Args:
            video_path: path of the video to decode with OpenCV.

        Returns:
            Array of shape [T, 20, 2] with pixel coordinates (normalised
            landmark * frame width/height), one row per decoded frame, or
            ``None`` when extraction fails or no frame could be decoded.
            Frames without a detected face repeat the previous frame's
            points; leading frames without a face are filled with zeros.
        """
        cap = None
        face_mesh = None
        try:
            import cv2
            import mediapipe as mp

            face_mesh = mp.solutions.face_mesh.FaceMesh(
                static_image_mode=False,
                max_num_faces=1,
                min_detection_confidence=0.5
            )
            cap = cv2.VideoCapture(video_path)
            landmarks = []
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                h, w = frame.shape[:2]
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = face_mesh.process(rgb)
                if results.multi_face_landmarks:
                    face_lms = results.multi_face_landmarks[0]
                    landmarks.append([
                        [face_lms.landmark[idx].x * w, face_lms.landmark[idx].y * h]
                        for idx in self.LIP_INDICES
                    ])
                elif landmarks:
                    landmarks.append(landmarks[-1])  # Carry forward
                else:
                    landmarks.append([[0, 0]] * len(self.LIP_INDICES))

            if not landmarks:
                # An unopenable or empty video used to yield a shapeless
                # empty array that crashed downstream indexing.
                print(f" ⚠ Landmark extraction failed: no frames decoded from {video_path}")
                return None
            return np.array(landmarks)
        except Exception as e:
            print(f" ⚠ Landmark extraction failed: {e}")
            return None
        finally:
            # Release native resources even when decoding/detection raised.
            if cap is not None:
                cap.release()
            if face_mesh is not None:
                face_mesh.close()

    def compute_lip_sync_score(
        self,
        video_path: str,
        audio_path: str = None
    ) -> Dict:
        """
        Compute lip-sync quality metrics for a video.

        Args:
            video_path: video to analyse.
            audio_path: accepted for interface compatibility; not used.

        Returns:
            Dict with ``lip_aperture_mean``, ``lip_aperture_std``,
            ``lip_aperture_range``, ``num_frames`` and, when more than one
            frame is available, ``lip_movement_energy``. Empty dict when no
            usable landmarks could be extracted.
        """
        results = {}
        landmarks = self.extract_lip_landmarks(video_path)
        if landmarks is None:
            return results
        landmarks = np.asarray(landmarks)
        if landmarks.ndim != 3 or landmarks.shape[0] == 0:
            return results

        # Lip aperture over time: distance between the mid-line points of
        # the outer lower and outer upper lip contours. Because these are
        # outer-contour points the value includes lip thickness.
        lower_center = landmarks[:, self.LOWER_LIP_CENTER, :]
        upper_center = landmarks[:, self.UPPER_LIP_CENTER, :]
        aperture = np.sqrt(np.sum((lower_center - upper_center) ** 2, axis=-1))
        results["lip_aperture_mean"] = float(np.mean(aperture))
        results["lip_aperture_std"] = float(np.std(aperture))
        results["lip_aperture_range"] = float(np.max(aperture) - np.min(aperture))
        results["num_frames"] = len(landmarks)

        # Lip movement energy (higher = more articulation)
        if len(landmarks) > 1:
            lip_velocity = np.diff(landmarks, axis=0)
            results["lip_movement_energy"] = float(np.mean(np.abs(lip_velocity)))
        return results
# ============================================================
# CATEGORY B: EMOTION QUALITY
# ============================================================
class EmotionMetrics:
    """Emotion quality metrics."""

    # Classifier labels accepted as a match for each target emotion.
    EMOTION_ALIASES = {
        "happy": ["happy", "happiness", "joy"],
        "sad": ["sad", "sadness"],
        "angry": ["angry", "anger"],
        "fear": ["fear", "fearful", "scared"],
        "surprise": ["surprise", "surprised"],
        "disgust": ["disgust", "disgusted"],
        "neutral": ["neutral", "calm"]
    }

    def __init__(self, device: str = "cpu"):
        # "cuda" selects GPU 0 for the classifier; anything else runs on CPU.
        self.device = device

    @classmethod
    def summarize_emotion_counts(cls, emotion_counts: Dict, target_emotion: str) -> Dict:
        """Turn per-label frame counts into an ECA result dict.

        Args:
            emotion_counts: mapping of lower-cased classifier label to the
                number of sampled frames whose top prediction was that label.
            target_emotion: emotion the video was supposed to show
                (case-insensitive).

        Returns:
            ``{"eca", "total_frames_evaluated", "emotion_distribution"}``.
            When no frame was counted ``eca`` is 0.0 and the legacy
            ``"counts"`` key is also present.
        """
        total = sum(emotion_counts.values())
        if total == 0:
            return {
                "eca": 0.0,
                "counts": {},
                "total_frames_evaluated": 0,
                "emotion_distribution": {}
            }
        target_lower = target_emotion.lower()
        # A set ensures each label is counted once: the alias lists contain
        # the target itself, which previously doubled its count.
        accepted = {target_lower, *cls.EMOTION_ALIASES.get(target_lower, ())}
        target_count = sum(emotion_counts.get(label, 0) for label in accepted)
        return {
            "eca": target_count / total,
            "total_frames_evaluated": total,
            "emotion_distribution": emotion_counts
        }

    def compute_eca(
        self,
        video_path: str,
        target_emotion: str
    ) -> Dict:
        """
        Emotion Classification Accuracy (ECA).

        Runs an image emotion classifier on every 5th frame of the video and
        reports the fraction of sampled frames whose top label matches
        ``target_emotion`` (or one of its aliases).

        Returns:
            The dict produced by ``summarize_emotion_counts``, or
            ``{"eca": 0.0, "error": <message>}`` if anything fails
            (missing dependency, model load, decoding, ...).
        """
        cap = None
        try:
            import cv2
            from PIL import Image
            from transformers import pipeline

            # Use a face emotion classifier
            classifier = pipeline(
                "image-classification",
                model="dima806/facial_emotions_image_detection",
                device=0 if self.device == "cuda" else -1
            )
            cap = cv2.VideoCapture(video_path)
            emotion_counts = {}
            frame_count = 0
            sample_every = 5  # Sample every 5th frame
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                frame_count += 1
                if frame_count % sample_every != 0:
                    continue
                # Convert BGR to RGB
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                predictions = classifier(Image.fromarray(rgb))
                if predictions:
                    top_emotion = predictions[0]["label"].lower()
                    emotion_counts[top_emotion] = emotion_counts.get(top_emotion, 0) + 1
            return self.summarize_emotion_counts(emotion_counts, target_emotion)
        except Exception as e:
            return {"eca": 0.0, "error": str(e)}
        finally:
            # Release the capture even when classification raised mid-video.
            if cap is not None:
                cap.release()

    def compute_emotion_consistency(
        self,
        landmarks_neutral: np.ndarray,
        landmarks_emotion: np.ndarray
    ) -> Dict:
        """
        Compute cross-emotion consistency metrics.

        Compares two landmark sequences over their common leading frames.

        Returns:
            ``lip_region_diff`` (mean absolute coordinate difference) and
            ``consistency_score`` = 1 / (1 + diff), higher is better.
            When either input is ``None`` or has no frames, returns a zero
            score under both ``consistency`` (legacy key) and
            ``consistency_score``.
        """
        unavailable = {"consistency": 0.0, "consistency_score": 0.0}
        if landmarks_neutral is None or landmarks_emotion is None:
            return unavailable
        T = min(len(landmarks_neutral), len(landmarks_emotion))
        if T == 0:
            # Averaging an empty slice would yield NaN.
            return unavailable
        lip_diff = np.mean(np.abs(
            landmarks_neutral[:T] - landmarks_emotion[:T]
        ))
        return {
            "lip_region_diff": float(lip_diff),
            "consistency_score": float(1.0 / (1.0 + lip_diff))  # Higher is better
        }
# ============================================================
# CATEGORY C: VISUAL REALISM
# ============================================================
class RealismMetrics:
    """Visual realism metrics comparing a generated video to a reference."""

    @staticmethod
    def frame_psnr(frame_a: np.ndarray, frame_b: np.ndarray) -> float:
        """Return the PSNR in dB between two same-shaped 8-bit frames.

        Identical frames (zero MSE) are reported as 100.0 instead of infinity.
        """
        mse = np.mean((frame_a.astype(float) - frame_b.astype(float)) ** 2)
        if mse == 0:
            return 100.0
        return float(20 * np.log10(255.0 / np.sqrt(mse)))

    def _mean_over_frame_pairs(self, video_path, gt_video_path, metric) -> Optional[float]:
        """Average ``metric(frame_gen, frame_gt)`` over frame pairs read in lockstep.

        Reading stops as soon as either video runs out of frames. Each pair
        is resized to the smaller height and width of the two frames before
        scoring. Returns ``None`` when no pair could be read. Exceptions
        propagate to the caller; both captures are always released.
        """
        import cv2

        cap_gen = None
        cap_gt = None
        scores = []
        try:
            cap_gen = cv2.VideoCapture(video_path)
            cap_gt = cv2.VideoCapture(gt_video_path)
            while True:
                ret1, frame1 = cap_gen.read()
                ret2, frame2 = cap_gt.read()
                if not ret1 or not ret2:
                    break
                # Resize to same dimensions
                h = min(frame1.shape[0], frame2.shape[0])
                w = min(frame1.shape[1], frame2.shape[1])
                frame1 = cv2.resize(frame1, (w, h))
                frame2 = cv2.resize(frame2, (w, h))
                scores.append(metric(frame1, frame2))
        finally:
            for cap in (cap_gen, cap_gt):
                if cap is not None:
                    cap.release()
        return float(np.mean(scores)) if scores else None

    def compute_ssim_frames(
        self,
        video_path: str,
        gt_video_path: str
    ) -> Optional[float]:
        """Compute mean SSIM between generated and ground truth video frames.

        Frames are converted to grayscale before scoring. Returns ``None``
        when no frame pair is available or when computation fails.
        """
        try:
            import cv2
            from skimage.metrics import structural_similarity as ssim

            def gray_ssim(frame1, frame2):
                # Convert to grayscale for SSIM
                gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
                gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
                return ssim(gray1, gray2)

            return self._mean_over_frame_pairs(video_path, gt_video_path, gray_ssim)
        except Exception as e:
            print(f" ⚠ SSIM computation failed: {e}")
            return None

    def compute_psnr_frames(
        self,
        video_path: str,
        gt_video_path: str
    ) -> Optional[float]:
        """Compute mean PSNR between generated and ground truth frames.

        Returns ``None`` when no frame pair is available or when
        computation fails.
        """
        try:
            return self._mean_over_frame_pairs(
                video_path, gt_video_path, self.frame_psnr
            )
        except Exception as e:
            print(f" ⚠ PSNR computation failed: {e}")
            return None
# ============================================================
# FULL EVALUATION RUNNER
# ============================================================
def _format_metric(value, spec: str) -> str:
    """Format a numeric metric with ``spec``; return "N/A" if missing or non-numeric."""
    if value is None:
        return "N/A"
    try:
        return format(value, spec)
    except (TypeError, ValueError):
        return "N/A"


def evaluate_single_video(
    video_path: str,
    target_emotion: str = "neutral",
    gt_video_path: str = None,
    device: str = "cpu"
) -> Dict:
    """
    Run full evaluation on a single generated video.

    Args:
        video_path: generated video to evaluate.
        target_emotion: emotion the video is expected to express.
        gt_video_path: optional reference video; realism metrics are only
            computed when this path is given and exists.
        device: forwarded to ``EmotionMetrics``.

    Returns:
        ``{"video", "target_emotion", "metrics"}`` where ``metrics`` holds
        ``lip_sync``, ``emotion`` and, if a reference exists, ``realism``.
    """
    print(f"\n Evaluating: {video_path}")
    print(f" Target emotion: {target_emotion}")
    results = {
        "video": video_path,
        "target_emotion": target_emotion,
        "metrics": {}
    }

    # Category A: Lip-sync
    print(" [A] Lip-sync metrics...")
    lip_metrics = LipSyncMetrics()
    sync_results = lip_metrics.compute_lip_sync_score(video_path)
    results["metrics"]["lip_sync"] = sync_results
    # sync_results is empty when landmark extraction failed, so the values
    # must not be pushed through a float format spec unconditionally.
    aperture_mean = _format_metric(sync_results.get("lip_aperture_mean"), ".2f")
    aperture_std = _format_metric(sync_results.get("lip_aperture_std"), ".2f")
    print(f" Lip aperture: {aperture_mean} "
          f"± {aperture_std}")

    # Category B: Emotion
    print(" [B] Emotion metrics...")
    emotion_metrics = EmotionMetrics(device=device)
    eca_results = emotion_metrics.compute_eca(video_path, target_emotion)
    results["metrics"]["emotion"] = eca_results
    print(f" ECA: {_format_metric(eca_results.get('eca'), '.2f')}")

    # Category C: Realism (if ground truth available)
    if gt_video_path and os.path.exists(gt_video_path):
        print(" [C] Realism metrics...")
        realism = RealismMetrics()
        ssim_val = realism.compute_ssim_frames(video_path, gt_video_path)
        psnr_val = realism.compute_psnr_frames(video_path, gt_video_path)
        results["metrics"]["realism"] = {
            "ssim": ssim_val,
            "psnr": psnr_val
        }
        # A value of 0.0 is a valid score; only None means "not available".
        print(f" SSIM: {_format_metric(ssim_val, '.4f')}")
        print(f" PSNR: {_format_metric(psnr_val, '.2f')}")
    return results
def evaluate_emotion_set(
    output_dir: str,
    gt_dir: str = None,
    device: str = "cpu"
) -> Dict:
    """
    Evaluate every emotion variant found in ``output_dir``.

    For each of the seven emotions the file ``emolips_{emotion}.mp4`` is
    used, falling back to ``demo_{emotion}.mp4``; emotions with neither
    file are skipped. When ``gt_dir`` is given, ``gt_{emotion}.mp4`` inside
    it is passed along as the reference video.

    Returns:
        Mapping of emotion name to its per-video result, plus an
        ``"aggregate"`` entry with the cross-emotion summary.
    """
    per_emotion = {}
    for label in ("neutral", "happy", "sad", "angry", "fear", "surprise", "disgust"):
        candidates = (
            os.path.join(output_dir, f"emolips_{label}.mp4"),
            os.path.join(output_dir, f"demo_{label}.mp4"),  # Try demo_ prefix
        )
        clip = next((path for path in candidates if os.path.exists(path)), None)
        if clip is None:
            continue
        reference = os.path.join(gt_dir, f"gt_{label}.mp4") if gt_dir else None
        per_emotion[label] = evaluate_single_video(clip, label, reference, device)

    # Summary across all evaluated emotions
    per_emotion["aggregate"] = compute_aggregate_metrics(per_emotion)
    return per_emotion
def compute_aggregate_metrics(results: Dict) -> Dict:
    """Average lip-aperture, ECA and lip-energy values over all emotions.

    Entries under the key ``"aggregate"`` are ignored. Each mean is ``None``
    when no evaluated emotion provides that metric; ``num_evaluated`` is the
    number of non-aggregate entries.
    """
    def _mean_or_none(values):
        return float(np.mean(values)) if values else None

    evaluated = [key for key in results if key != "aggregate"]
    apertures, accuracies, energies = [], [], []
    for key in evaluated:
        per_video = results[key].get("metrics", {})
        lip_stats = per_video.get("lip_sync", {})
        emotion_stats = per_video.get("emotion", {})
        if "lip_aperture_mean" in lip_stats:
            apertures.append(lip_stats["lip_aperture_mean"])
        if "lip_movement_energy" in lip_stats:
            energies.append(lip_stats["lip_movement_energy"])
        if "eca" in emotion_stats:
            accuracies.append(emotion_stats["eca"])

    return {
        "mean_lip_aperture": _mean_or_none(apertures),
        "mean_eca": _mean_or_none(accuracies),
        "mean_lip_energy": _mean_or_none(energies),
        "num_evaluated": len(evaluated)
    }
# ============================================================
# GENERATE EVAL REPORT
# ============================================================
def generate_report(results: Dict, output_path: str):
    """Generate evaluation report as JSON and text summary.

    Args:
        results: mapping of emotion name to per-video result, optionally
            with an ``"aggregate"`` entry.
        output_path: path of the text summary. The JSON dump is written next
            to it with the extension replaced by ``.json``; if that would be
            the same file as ``output_path``, ``.json`` is appended instead
            so the two outputs never overwrite each other.
    """
    # Derive the JSON path from the extension only; a plain str.replace
    # left paths without ".txt" unchanged, so the text summary clobbered
    # the JSON, and it also rewrote ".txt" inside directory names.
    root, _ = os.path.splitext(output_path)
    json_path = root + ".json"
    if json_path == output_path:
        json_path = output_path + ".json"

    parent = os.path.dirname(output_path)
    if parent:
        os.makedirs(parent, exist_ok=True)

    # Save JSON
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, default=str)

    # Save text summary
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("=" * 60 + "\n")
        f.write(" EMOLIPS Evaluation Report\n")
        f.write("=" * 60 + "\n\n")
        for emotion, result in results.items():
            if emotion == "aggregate":
                continue
            f.write(f"\nEmotion: {emotion.upper()}\n")
            f.write("-" * 40 + "\n")
            metrics = result.get("metrics", {})

            f.write(" Lip-Sync:\n")
            lip = metrics.get("lip_sync", {})
            for k, v in lip.items():
                f.write(f" {k}: {v}\n")

            f.write(" Emotion:\n")
            emo = metrics.get("emotion", {})
            f.write(f" ECA: {emo.get('eca', 'N/A')}\n")
            if "emotion_distribution" in emo:
                f.write(f" Distribution: {emo['emotion_distribution']}\n")

            if "realism" in metrics:
                f.write(" Realism:\n")
                real = metrics["realism"]
                f.write(f" SSIM: {real.get('ssim', 'N/A')}\n")
                f.write(f" PSNR: {real.get('psnr', 'N/A')}\n")

        # Aggregate
        if "aggregate" in results:
            f.write(f"\n{'='*60}\n")
            f.write(" AGGREGATE METRICS\n")
            f.write(f"{'='*60}\n")
            for k, v in results["aggregate"].items():
                f.write(f" {k}: {v}\n")

    print(f"\n ✓ Report saved: {output_path}")
    print(f" ✓ JSON saved: {json_path}")
def main():
    """Command-line entry point.

    With ``--quick-eval`` a single video is evaluated and its result printed
    as JSON. Otherwise ``--generated`` is required: every emotion variant in
    that directory is evaluated and a report is written into ``--report``.
    """
    option_table = [
        (("--generated", "-g"), dict(type=str, help="Generated videos directory")),
        (("--ground-truth", "-gt"), dict(type=str, default=None)),
        (("--report", "-r"), dict(type=str, default="results")),
        (("--quick-eval",), dict(type=str, help="Quick eval single video")),
        (("--emotion",), dict(type=str, default="neutral")),
        (("--device",), dict(type=str, default="cpu")),
    ]
    cli = argparse.ArgumentParser(description="EMOLIPS Evaluation")
    for flags, options in option_table:
        cli.add_argument(*flags, **options)
    opts = cli.parse_args()

    # Single-video mode: print the result and stop.
    if opts.quick_eval:
        single = evaluate_single_video(
            opts.quick_eval, opts.emotion, device=opts.device
        )
        print(json.dumps(single, indent=2, default=str))
        return

    # Directory mode needs the generated-videos directory.
    if not opts.generated:
        print("Error: --generated directory required")
        sys.exit(1)

    os.makedirs(opts.report, exist_ok=True)
    summary = evaluate_emotion_set(
        opts.generated,
        opts.ground_truth,
        opts.device
    )
    report_file = os.path.join(opts.report, "eval_report.txt")
    generate_report(summary, report_file)


if __name__ == "__main__":
    main()