primal-sage committed on
Commit
ea40621
·
verified ·
1 Parent(s): 46dd09f

Upload code/eval_metrics.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. code/eval_metrics.py +566 -0
code/eval_metrics.py ADDED
@@ -0,0 +1,566 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ EMOLIPS Evaluation Suite
3
+ ========================
4
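+ Metric plan across 4 categories (this file currently implements LMD, lip aperture/motion statistics, ECA, SSIM and PSNR; the remaining entries are listed as targets):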
5
+
6
+ Category A: Lip-Sync Quality
7
+ - LSE-D (Lip Sync Error - Distance)
8
+ - LSE-C (Lip Sync Error - Confidence)
9
+ - LMD (Landmark Distance)
10
+
11
+ Category B: Emotion Quality
12
+ - ECA (Emotion Classification Accuracy)
13
+ - EIS (Emotion Intensity Score)
14
+ - AU-MAE (Action Unit Mean Absolute Error)
15
+
16
+ Category C: Visual Realism
17
+ - FID (Fréchet Inception Distance)
18
+ - SSIM (Structural Similarity Index)
19
+ - PSNR (Peak Signal-to-Noise Ratio)
20
+
21
+ Category D: Human Evaluation (templates only)
22
+ - MOS-Sync, MOS-Emotion, MOS-Real
23
+
24
+ Usage:
25
+ python eval_metrics.py --generated outputs/ --ground-truth gt/ --report results/
26
+ python eval_metrics.py --quick-eval outputs/emolips_happy.mp4 --emotion happy
27
+ """
28
+
29
+ import os
30
+ import sys
31
+ import json
32
+ import argparse
33
+ import numpy as np
34
+ from pathlib import Path
35
+ from typing import Dict, List, Optional, Tuple
36
+ import warnings
37
+ warnings.filterwarnings("ignore")
38
+
39
+
40
+ # ============================================================
41
+ # CATEGORY A: LIP-SYNC QUALITY
42
+ # ============================================================
43
+
44
class LipSyncMetrics:
    """Lip-sync quality metrics derived from MediaPipe lip landmarks.

    Only landmark-based statistics are implemented here. ``self.syncnet`` is a
    placeholder that is never loaded in this class, so no SyncNet-based score
    (LSE-D / LSE-C) is produced.
    """

    # MediaPipe Face Mesh indices of the OUTER lip contour, 20 points total.
    # Per the Face Mesh lip annotations the first row walks the lower outer
    # lip (its centre is landmark 17) and the second row walks the upper outer
    # lip (its centre is landmark 0). No inner-lip points are included.
    LIP_INDICES = (
        61, 146, 91, 181, 84, 17, 314, 405, 321, 375,   # outer lower lip
        291, 409, 270, 269, 267, 0, 37, 39, 40, 185,    # outer upper lip
    )
    # Positions inside LIP_INDICES of the two lip-centre landmarks.
    _LOWER_LIP_CENTER = 5    # LIP_INDICES[5] == 17
    _UPPER_LIP_CENTER = 15   # LIP_INDICES[15] == 0

    def __init__(self):
        self.syncnet = None

    def compute_lmd(
        self,
        pred_landmarks: np.ndarray,
        gt_landmarks: np.ndarray
    ) -> float:
        """
        Landmark Distance (LMD).
        Mean L2 distance between predicted and ground truth lip landmarks.

        Args:
            pred_landmarks: [T, 20, 2] predicted lip landmarks
            gt_landmarks: [T, 20, 2] ground truth lip landmarks

        Returns:
            Mean landmark distance (lower is better)

        Raises:
            AssertionError: if the two arrays do not have the same shape.
        """
        assert pred_landmarks.shape == gt_landmarks.shape, (
            f"landmark shape mismatch: {pred_landmarks.shape} vs {gt_landmarks.shape}"
        )
        # Euclidean distance over the last (coordinate) axis, then a global mean.
        distances = np.sqrt(np.sum((pred_landmarks - gt_landmarks) ** 2, axis=-1))
        return float(np.mean(distances))

    def extract_lip_landmarks(self, video_path: str) -> Optional[np.ndarray]:
        """Extract lip landmarks from video using MediaPipe.

        Frames without a detected face reuse the previous frame's landmarks;
        leading frames without a face are filled with zeros.

        Args:
            video_path: path of the video to read with OpenCV.

        Returns:
            Float array of shape [T, 20, 2] in pixel coordinates (T may be 0
            when no frame could be read), or None if extraction failed.
        """
        try:
            import cv2
            import mediapipe as mp

            num_points = len(self.LIP_INDICES)
            face_mesh = mp.solutions.face_mesh.FaceMesh(
                static_image_mode=False,
                max_num_faces=1,
                min_detection_confidence=0.5
            )
            cap = None
            landmarks = []

            try:
                cap = cv2.VideoCapture(video_path)
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break

                    h, w = frame.shape[:2]
                    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    results = face_mesh.process(rgb)

                    if results.multi_face_landmarks:
                        face_lms = results.multi_face_landmarks[0].landmark
                        # MediaPipe coordinates are normalised; scale to pixels.
                        landmarks.append(
                            [[face_lms[idx].x * w, face_lms[idx].y * h]
                             for idx in self.LIP_INDICES]
                        )
                    elif landmarks:
                        landmarks.append(landmarks[-1])  # Carry forward
                    else:
                        landmarks.append([[0.0, 0.0]] * num_points)
            finally:
                # Release native resources even when a frame fails mid-loop.
                if cap is not None:
                    cap.release()
                face_mesh.close()

            # Reshape so that an empty result is [0, 20, 2] rather than a 1-D
            # array that breaks downstream indexing.
            return np.asarray(landmarks, dtype=float).reshape(-1, num_points, 2)

        except Exception as e:
            print(f" ⚠ Landmark extraction failed: {e}")
            return None

    @staticmethod
    def _summarize_landmarks(landmarks: Optional[np.ndarray]) -> Dict:
        """Turn a [T, 20, 2] landmark track into aperture and motion statistics.

        Returns an empty dict when there are no landmarks (None or zero frames).
        """
        results: Dict = {}
        if landmarks is None or len(landmarks) == 0:
            return results

        # Lip aperture: distance between the outer upper-lip centre and the
        # outer lower-lip centre, per frame.
        upper = landmarks[:, LipSyncMetrics._UPPER_LIP_CENTER, :]
        lower = landmarks[:, LipSyncMetrics._LOWER_LIP_CENTER, :]
        aperture = np.sqrt(np.sum((upper - lower) ** 2, axis=-1))

        results["lip_aperture_mean"] = float(np.mean(aperture))
        results["lip_aperture_std"] = float(np.std(aperture))
        results["lip_aperture_range"] = float(np.max(aperture) - np.min(aperture))
        results["num_frames"] = len(landmarks)

        # Lip movement energy (higher = more articulation): mean absolute
        # frame-to-frame displacement; needs at least two frames.
        if len(landmarks) > 1:
            lip_velocity = np.diff(landmarks, axis=0)
            results["lip_movement_energy"] = float(np.mean(np.abs(lip_velocity)))

        return results

    def compute_lip_sync_score(
        self,
        video_path: str,
        audio_path: Optional[str] = None
    ) -> Dict:
        """
        Compute lip-sync quality metrics for a video.

        Args:
            video_path: video to analyse.
            audio_path: accepted for interface compatibility; not used here.

        Returns dict with available metrics (empty when no landmarks could be
        extracted or the video has no readable frames).
        """
        landmarks = self.extract_lip_landmarks(video_path)
        return self._summarize_landmarks(landmarks)
154
+
155
+
156
+ # ============================================================
157
+ # CATEGORY B: EMOTION QUALITY
158
+ # ============================================================
159
+
160
class EmotionMetrics:
    """Emotion quality metrics."""

    # Classifier labels accepted as a match for each target emotion.
    EMOTION_ALIASES = {
        "happy": ["happy", "happiness", "joy"],
        "sad": ["sad", "sadness"],
        "angry": ["angry", "anger"],
        "fear": ["fear", "fearful", "scared"],
        "surprise": ["surprise", "surprised"],
        "disgust": ["disgust", "disgusted"],
        "neutral": ["neutral", "calm"]
    }

    def __init__(self, device: str = "cpu"):
        # "cuda" selects GPU 0 for the classifier; any other value runs on CPU.
        self.device = device

    @classmethod
    def _score_emotion_counts(cls, emotion_counts: Dict, target_emotion: str) -> Dict:
        """Convert per-label frame counts into an ECA result dict.

        The target label and its aliases are collected into a set so that each
        label is counted exactly once; the score is therefore always in [0, 1].
        """
        total = sum(emotion_counts.values())
        if total == 0:
            return {"eca": 0.0, "counts": {}}

        target_lower = target_emotion.lower()
        accepted = {target_lower, *cls.EMOTION_ALIASES.get(target_lower, ())}
        target_count = sum(emotion_counts.get(label, 0) for label in accepted)

        return {
            "eca": target_count / total,
            "total_frames_evaluated": total,
            "emotion_distribution": emotion_counts
        }

    def compute_eca(
        self,
        video_path: str,
        target_emotion: str
    ) -> Dict:
        """
        Emotion Classification Accuracy (ECA).
        Run emotion classifier on generated video frames and check
        if detected emotion matches target.

        Every 5th frame is classified and the top label is tallied.

        Returns:
            Dict with "eca" (fraction of sampled frames whose top label matches
            the target or one of its aliases). On success it also contains
            "total_frames_evaluated" and "emotion_distribution"; when no frame
            was classified it contains "counts"; on failure it contains "error".
        """
        try:
            import cv2
            from PIL import Image
            from transformers import pipeline

            # Use a face emotion classifier
            classifier = pipeline(
                "image-classification",
                model="dima806/facial_emotions_image_detection",
                device=0 if self.device == "cuda" else -1
            )

            emotion_counts = {}
            sample_every = 5  # Sample every 5th frame
            cap = cv2.VideoCapture(video_path)
            try:
                frame_count = 0
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break

                    frame_count += 1
                    if frame_count % sample_every != 0:
                        continue

                    # OpenCV decodes to BGR; the classifier expects an RGB image.
                    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    predictions = classifier(Image.fromarray(rgb))
                    if predictions:
                        top_emotion = predictions[0]["label"].lower()
                        emotion_counts[top_emotion] = emotion_counts.get(top_emotion, 0) + 1
            finally:
                # Release the capture even if classification raises mid-video.
                cap.release()

            return self._score_emotion_counts(emotion_counts, target_emotion)

        except Exception as e:
            return {"eca": 0.0, "error": str(e)}

    def compute_emotion_consistency(
        self,
        landmarks_neutral: np.ndarray,
        landmarks_emotion: np.ndarray
    ) -> Dict:
        """
        Compute cross-emotion consistency metrics.
        Measures how much lip-sync is preserved while expression changes.

        The two tracks are truncated to their common number of frames and
        compared element-wise; every landmark passed in takes part, so callers
        should pass lip landmarks only.

        Returns:
            {"lip_region_diff", "consistency_score"} when both tracks have at
            least one frame. Otherwise a zero-valued fallback carrying both the
            legacy "consistency" key and "consistency_score".
        """
        fallback = {"consistency": 0.0, "consistency_score": 0.0}
        if landmarks_neutral is None or landmarks_emotion is None:
            return fallback

        T = min(len(landmarks_neutral), len(landmarks_emotion))
        if T == 0:
            # A mean over zero frames would be NaN.
            return fallback

        lip_diff = np.mean(np.abs(
            landmarks_neutral[:T] - landmarks_emotion[:T]
        ))

        return {
            "lip_region_diff": float(lip_diff),
            "consistency_score": float(1.0 / (1.0 + lip_diff))  # Higher is better
        }
265
+
266
+
267
+ # ============================================================
268
+ # CATEGORY C: VISUAL REALISM
269
+ # ============================================================
270
+
271
class RealismMetrics:
    """Visual realism metrics comparing a generated video with a reference video."""

    @staticmethod
    def _frame_psnr(frame1: np.ndarray, frame2: np.ndarray) -> float:
        """PSNR in dB between two equally sized 8-bit frames.

        Identical frames (zero MSE) are reported as 100.0 instead of infinity
        so that the per-video mean stays finite.
        """
        # Cast to float first: uint8 subtraction would wrap around.
        mse = np.mean((frame1.astype(float) - frame2.astype(float)) ** 2)
        if mse == 0:
            return 100.0
        return float(20 * np.log10(255.0 / np.sqrt(mse)))

    @staticmethod
    def _mean_over_frame_pairs(video_path: str, gt_video_path: str, metric) -> Optional[float]:
        """Average ``metric(frame_gen, frame_gt)`` over frames read in lockstep.

        Reading stops as soon as either video runs out of frames. Each pair is
        resized to the smaller height and width of the two before scoring.
        Both captures are released even if ``metric`` raises.

        Returns:
            Mean score, or None when no frame pair could be read.
        """
        import cv2

        scores = []
        cap_gen = cv2.VideoCapture(video_path)
        try:
            cap_gt = cv2.VideoCapture(gt_video_path)
            try:
                while True:
                    ret1, frame1 = cap_gen.read()
                    ret2, frame2 = cap_gt.read()
                    if not ret1 or not ret2:
                        break

                    # Resize to same dimensions
                    h = min(frame1.shape[0], frame2.shape[0])
                    w = min(frame1.shape[1], frame2.shape[1])
                    frame1 = cv2.resize(frame1, (w, h))
                    frame2 = cv2.resize(frame2, (w, h))

                    scores.append(metric(frame1, frame2))
            finally:
                cap_gt.release()
        finally:
            cap_gen.release()

        return float(np.mean(scores)) if scores else None

    def compute_ssim_frames(
        self,
        video_path: str,
        gt_video_path: str
    ) -> Optional[float]:
        """Compute mean SSIM between generated and ground truth video frames.

        Frames are converted to grayscale before scoring.

        Returns:
            Mean SSIM, or None if no frame pair was read or an error occurred.
        """
        try:
            import cv2
            from skimage.metrics import structural_similarity as ssim

            def gray_ssim(frame1, frame2):
                # Convert to grayscale for SSIM
                gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY)
                gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY)
                return ssim(gray1, gray2)

            return self._mean_over_frame_pairs(video_path, gt_video_path, gray_ssim)

        except Exception as e:
            print(f" ⚠ SSIM computation failed: {e}")
            return None

    def compute_psnr_frames(
        self,
        video_path: str,
        gt_video_path: str
    ) -> Optional[float]:
        """Compute mean PSNR between generated and ground truth frames.

        Returns:
            Mean PSNR in dB, or None if no frame pair was read or an error
            occurred.
        """
        try:
            return self._mean_over_frame_pairs(
                video_path, gt_video_path, self._frame_psnr
            )

        except Exception as e:
            print(f" ⚠ PSNR computation failed: {e}")
            return None
354
+
355
+
356
+ # ============================================================
357
+ # FULL EVALUATION RUNNER
358
+ # ============================================================
359
+
360
def evaluate_single_video(
    video_path: str,
    target_emotion: str = "neutral",
    gt_video_path: Optional[str] = None,
    device: str = "cpu"
) -> Dict:
    """
    Run full evaluation on a single generated video.

    Args:
        video_path: generated video to score.
        target_emotion: emotion the video is supposed to show (used for ECA).
        gt_video_path: optional reference video; realism metrics are computed
            only when this path is given and exists on disk.
        device: forwarded to EmotionMetrics.

    Returns:
        Dict with "video", "target_emotion" and "metrics". "metrics" holds
        "lip_sync" and "emotion", plus "realism" when a reference video exists.
    """
    def _fmt(value, spec: str) -> str:
        # Metrics can be missing (e.g. landmark extraction failed) or None.
        # Applying a float format spec to a placeholder string raises
        # ValueError, so only real numbers are formatted.
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        return format(value, spec) if is_number else "N/A"

    print(f"\n Evaluating: {video_path}")
    print(f" Target emotion: {target_emotion}")

    results = {
        "video": video_path,
        "target_emotion": target_emotion,
        "metrics": {}
    }

    # Category A: Lip-sync
    print(" [A] Lip-sync metrics...")
    lip_metrics = LipSyncMetrics()
    sync_results = lip_metrics.compute_lip_sync_score(video_path)
    results["metrics"]["lip_sync"] = sync_results
    aperture_mean = _fmt(sync_results.get("lip_aperture_mean"), ".2f")
    aperture_std = _fmt(sync_results.get("lip_aperture_std"), ".2f")
    print(f" Lip aperture: {aperture_mean} ± {aperture_std}")

    # Category B: Emotion
    print(" [B] Emotion metrics...")
    emotion_metrics = EmotionMetrics(device=device)
    eca_results = emotion_metrics.compute_eca(video_path, target_emotion)
    results["metrics"]["emotion"] = eca_results
    print(f" ECA: {_fmt(eca_results.get('eca'), '.2f')}")

    # Category C: Realism (if ground truth available)
    if gt_video_path and os.path.exists(gt_video_path):
        print(" [C] Realism metrics...")
        realism = RealismMetrics()

        ssim_val = realism.compute_ssim_frames(video_path, gt_video_path)
        psnr_val = realism.compute_psnr_frames(video_path, gt_video_path)

        results["metrics"]["realism"] = {
            "ssim": ssim_val,
            "psnr": psnr_val
        }
        # A score of exactly 0.0 is a valid result and is printed as a number.
        print(f" SSIM: {_fmt(ssim_val, '.4f')}")
        print(f" PSNR: {_fmt(psnr_val, '.2f')}")

    return results
409
+
410
+
411
def evaluate_emotion_set(
    output_dir: str,
    gt_dir: str = None,
    device: str = "cpu"
) -> Dict:
    """
    Score every emotion variant found in ``output_dir``.

    For each emotion the file ``emolips_{emotion}.mp4`` is preferred, with
    ``demo_{emotion}.mp4`` as the fallback name; emotions with neither file
    are skipped. When ``gt_dir`` is given, ``gt_{emotion}.mp4`` inside it is
    passed along as the reference video.

    Returns:
        Mapping of emotion name to its per-video result, plus an "aggregate"
        entry summarising all evaluated emotions.
    """
    all_results = {}

    for emotion in ("neutral", "happy", "sad", "angry", "fear", "surprise", "disgust"):
        candidates = (
            os.path.join(output_dir, f"emolips_{emotion}.mp4"),
            os.path.join(output_dir, f"demo_{emotion}.mp4"),
        )
        # First candidate that exists on disk, in order of preference.
        video_path = next((path for path in candidates if os.path.exists(path)), None)
        if video_path is None:
            continue

        gt_path = os.path.join(gt_dir, f"gt_{emotion}.mp4") if gt_dir else None
        all_results[emotion] = evaluate_single_video(video_path, emotion, gt_path, device)

    # The summary is computed before it is stored, so it covers the emotions only.
    all_results["aggregate"] = compute_aggregate_metrics(all_results)
    return all_results
442
+
443
+
444
def compute_aggregate_metrics(results: Dict) -> Dict:
    """Average the headline metrics over every evaluated emotion.

    The "aggregate" entry, if present, is ignored. A mean is None when no
    emotion reported that metric.
    """
    per_emotion = [
        entry.get("metrics", {})
        for name, entry in results.items()
        if name != "aggregate"
    ]

    def gather(section: str, key: str) -> list:
        # Values of `key` from every emotion whose `section` reports it.
        return [
            metrics.get(section, {})[key]
            for metrics in per_emotion
            if key in metrics.get(section, {})
        ]

    def mean_or_none(values: list):
        if not values:
            return None
        return float(np.mean(values))

    return {
        "mean_lip_aperture": mean_or_none(gather("lip_sync", "lip_aperture_mean")),
        "mean_eca": mean_or_none(gather("emotion", "eca")),
        "mean_lip_energy": mean_or_none(gather("lip_sync", "lip_movement_energy")),
        "num_evaluated": len(per_emotion),
    }
476
+
477
+
478
+ # ============================================================
479
+ # GENERATE EVAL REPORT
480
+ # ============================================================
481
+
482
def generate_report(results: Dict, output_path: str):
    """Generate evaluation report as JSON and text summary.

    Args:
        results: mapping of emotion name to per-video result, optionally with
            an "aggregate" entry.
        output_path: path of the text summary. The JSON file is written next
            to it with the extension replaced by ".json".
    """
    # Derive the JSON path from the final extension only. A plain
    # str.replace(".txt", ".json") leaves paths without ".txt" unchanged, so
    # the text report would overwrite the JSON, and it also rewrites ".txt"
    # occurring inside directory names.
    root, _ext = os.path.splitext(output_path)
    json_path = root + ".json"
    if json_path == output_path:
        # output_path already ends in ".json"; keep the two files distinct.
        json_path = output_path + ".json"

    # Save JSON (default=str stringifies any value json cannot encode)
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, default=str)

    # Save text summary
    with open(output_path, "w", encoding="utf-8") as f:
        f.write("=" * 60 + "\n")
        f.write(" EMOLIPS Evaluation Report\n")
        f.write("=" * 60 + "\n\n")

        for emotion, result in results.items():
            if emotion == "aggregate":
                continue
            f.write(f"\nEmotion: {emotion.upper()}\n")
            f.write("-" * 40 + "\n")

            metrics = result.get("metrics", {})

            f.write(" Lip-Sync:\n")
            lip = metrics.get("lip_sync", {})
            for k, v in lip.items():
                f.write(f" {k}: {v}\n")

            f.write(" Emotion:\n")
            emo = metrics.get("emotion", {})
            f.write(f" ECA: {emo.get('eca', 'N/A')}\n")
            if "emotion_distribution" in emo:
                f.write(f" Distribution: {emo['emotion_distribution']}\n")

            if "realism" in metrics:
                f.write(" Realism:\n")
                real = metrics["realism"]
                f.write(f" SSIM: {real.get('ssim', 'N/A')}\n")
                f.write(f" PSNR: {real.get('psnr', 'N/A')}\n")

        # Aggregate
        if "aggregate" in results:
            f.write(f"\n{'='*60}\n")
            f.write(" AGGREGATE METRICS\n")
            f.write(f"{'='*60}\n")
            for k, v in results["aggregate"].items():
                f.write(f" {k}: {v}\n")

    print(f"\n ✓ Report saved: {output_path}")
    print(f" ✓ JSON saved: {json_path}")
530
+
531
+
532
def main():
    """Command-line entry point.

    With --quick-eval a single video is scored and the result is printed as
    JSON. Otherwise every emotion variant in the --generated directory is
    scored and a report is written into the --report directory.
    """
    parser = argparse.ArgumentParser(description="EMOLIPS Evaluation")
    # (flags, keyword arguments) for each option, in help-output order.
    option_table = [
        (("--generated", "-g"), {"type": str, "help": "Generated videos directory"}),
        (("--ground-truth", "-gt"), {"type": str, "default": None}),
        (("--report", "-r"), {"type": str, "default": "results"}),
        (("--quick-eval",), {"type": str, "help": "Quick eval single video"}),
        (("--emotion",), {"type": str, "default": "neutral"}),
        (("--device",), {"type": str, "default": "cpu"}),
    ]
    for flags, options in option_table:
        parser.add_argument(*flags, **options)

    args = parser.parse_args()

    # Single-video mode takes precedence and writes no report files.
    if args.quick_eval:
        single = evaluate_single_video(
            args.quick_eval, args.emotion, device=args.device
        )
        print(json.dumps(single, indent=2, default=str))
        return

    if not args.generated:
        print("Error: --generated directory required")
        sys.exit(1)

    os.makedirs(args.report, exist_ok=True)
    report_file = os.path.join(args.report, "eval_report.txt")

    set_results = evaluate_emotion_set(
        args.generated,
        args.ground_truth,
        args.device
    )
    generate_report(set_results, report_file)


if __name__ == "__main__":
    main()