primal-sage committed on
Commit
46dd09f
·
verified ·
1 Parent(s): c2d8a02

Upload code/pipeline.py with huggingface_hub

Browse files
Files changed (1) hide show
  1. code/pipeline.py +691 -0
code/pipeline.py ADDED
@@ -0,0 +1,691 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ EMOLIPS Pipeline
3
+ ================
4
+ Emotion-Driven Lip-Sync Synthesis Pipeline
5
+
6
+ Orchestrates:
7
+ 1. Audio emotion detection (automatic or manual override)
8
+ 2. Emotion intensity estimation
9
+ 3. SadTalker talking face generation
10
+ 4. Emotion-conditioned coefficient modification
11
+ 5. Output video rendering
12
+
13
+ Usage:
14
+ pipeline = EmolipsPipeline(device="cuda")
15
+ pipeline.generate(
16
+ audio_path="speech.wav",
17
+ image_path="face.jpg",
18
+ emotion="happy", # Optional: auto-detected if not specified
19
+ intensity=0.7, # Optional: auto-estimated if not specified
20
+ output_path="output.mp4"
21
+ )
22
+ """
23
+
24
+ import os
25
+ import sys
26
+ import subprocess
27
+ import shutil
28
+ import json
29
+ import numpy as np
30
+ from pathlib import Path
31
+ from typing import Optional, Dict, List
32
+ import warnings
33
+ warnings.filterwarnings("ignore")
34
+
35
+ from emotion_module import (
36
+ PracticalEmotionModifier,
37
+ AudioEmotionDetector,
38
+ EmotionIntensityEstimator,
39
+ EMOTION_PROFILES
40
+ )
41
+
42
+
43
+ class EmolipsPipeline:
44
+ """
45
+ Main EMOLIPS inference pipeline.
46
+
47
+ Wraps SadTalker backbone with emotion conditioning.
48
+ """
49
+
50
+ def __init__(
51
+ self,
52
+ sadtalker_dir: str = "./SadTalker",
53
+ device: str = "cuda",
54
+ checkpoint_dir: str = None
55
+ ):
56
+ self.sadtalker_dir = Path(sadtalker_dir).resolve()
57
+ self.device = device
58
+ self.checkpoint_dir = checkpoint_dir or str(self.sadtalker_dir / "checkpoints")
59
+
60
+ # Initialize emotion components
61
+ self.emotion_detector = AudioEmotionDetector(device=device)
62
+ self.intensity_estimator = EmotionIntensityEstimator()
63
+ self.emotion_modifier = PracticalEmotionModifier()
64
+
65
+ # Verify SadTalker installation
66
+ if not self.sadtalker_dir.exists():
67
+ print(f"⚠ SadTalker not found at {self.sadtalker_dir}")
68
+ print(" Run setup.sh first or specify correct path")
69
+
70
+ def detect_emotion(self, audio_path: str) -> Dict:
71
+ """Auto-detect emotion from audio."""
72
+ print(" [1/4] Detecting emotion from audio...")
73
+ result = self.emotion_detector.detect(audio_path)
74
+ print(f" Detected: {result['detected_emotion']} "
75
+ f"(confidence: {result['confidence']:.2f})")
76
+ return result
77
+
78
+ def estimate_intensity(self, audio_path: str) -> float:
79
+ """Estimate emotion intensity from audio features."""
80
+ intensity = self.intensity_estimator.estimate(audio_path)
81
+ print(f" Intensity: {intensity:.2f}")
82
+ return intensity
83
+
84
+ def run_sadtalker(
85
+ self,
86
+ audio_path: str,
87
+ image_path: str,
88
+ output_dir: str,
89
+ expression_scale: float = 1.0,
90
+ still_mode: bool = False,
91
+ preprocess: str = "crop",
92
+ size: int = 256,
93
+ pose_style: int = 0
94
+ ) -> Optional[str]:
95
+ """
96
+ Run SadTalker to generate base talking face video.
97
+
98
+ Returns path to generated video.
99
+ """
100
+ print(" [2/4] Running SadTalker backbone...")
101
+
102
+ # Build SadTalker command
103
+ inference_script = self.sadtalker_dir / "inference.py"
104
+
105
+ cmd = [
106
+ sys.executable, str(inference_script),
107
+ "--driven_audio", str(audio_path),
108
+ "--source_image", str(image_path),
109
+ "--result_dir", str(output_dir),
110
+ "--expression_scale", str(expression_scale),
111
+ "--preprocess", preprocess,
112
+ "--size", str(size),
113
+ "--pose_style", str(pose_style),
114
+ ]
115
+
116
+ if still_mode:
117
+ cmd.append("--still")
118
+
119
+ # Add checkpoint paths
120
+ checkpoint_dir = Path(self.checkpoint_dir)
121
+ if checkpoint_dir.exists():
122
+ cmd.extend(["--checkpoint_dir", str(checkpoint_dir)])
123
+
124
+ try:
125
+ env = os.environ.copy()
126
+ env["PYTHONPATH"] = str(self.sadtalker_dir) + ":" + env.get("PYTHONPATH", "")
127
+
128
+ result = subprocess.run(
129
+ cmd,
130
+ capture_output=True,
131
+ text=True,
132
+ cwd=str(self.sadtalker_dir),
133
+ env=env,
134
+ timeout=300 # 5 min timeout
135
+ )
136
+
137
+ if result.returncode != 0:
138
+ print(f" ⚠ SadTalker error: {result.stderr[-500:]}")
139
+ return None
140
+
141
+ # Find generated video
142
+ output_path = Path(output_dir)
143
+ videos = list(output_path.rglob("*.mp4"))
144
+ if videos:
145
+ return str(sorted(videos, key=os.path.getmtime)[-1])
146
+
147
+ return None
148
+
149
+ except subprocess.TimeoutExpired:
150
+ print(" ⚠ SadTalker timed out (>5 min)")
151
+ return None
152
+ except Exception as e:
153
+ print(f" ⚠ SadTalker failed: {e}")
154
+ return None
155
+
156
+ def apply_emotion_postprocess(
157
+ self,
158
+ video_path: str,
159
+ emotion: str,
160
+ intensity: float,
161
+ output_path: str
162
+ ) -> str:
163
+ """
164
+ Apply emotion-based post-processing to generated video.
165
+
166
+ This applies subtle facial modifications via:
167
+ 1. Face landmark detection on each frame
168
+ 2. Emotion-specific spatial warping
169
+ 3. Color grading for emotional tone
170
+ """
171
+ print(" [3/4] Applying emotion conditioning...")
172
+
173
+ try:
174
+ import cv2
175
+ import mediapipe as mp
176
+
177
+ mp_face_mesh = mp.solutions.face_mesh
178
+ face_mesh = mp_face_mesh.FaceMesh(
179
+ static_image_mode=False,
180
+ max_num_faces=1,
181
+ min_detection_confidence=0.5
182
+ )
183
+
184
+ cap = cv2.VideoCapture(video_path)
185
+ fps = int(cap.get(cv2.CAP_PROP_FPS))
186
+ w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
187
+ h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
188
+
189
+ # Temp output (will mux audio later)
190
+ temp_path = output_path.replace(".mp4", "_temp.mp4")
191
+ fourcc = cv2.VideoWriter_fourcc(*'mp4v')
192
+ out = cv2.VideoWriter(temp_path, fourcc, fps, (w, h))
193
+
194
+ profile = EMOTION_PROFILES.get(emotion, EMOTION_PROFILES["neutral"])
195
+
196
+ frame_count = 0
197
+ total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
198
+
199
+ while cap.isOpened():
200
+ ret, frame = cap.read()
201
+ if not ret:
202
+ break
203
+
204
+ # Apply emotion-specific color grading
205
+ frame = self._apply_color_grade(frame, emotion, intensity)
206
+
207
+ # Apply subtle face warping if emotion is strong
208
+ if intensity > 0.3 and emotion != "neutral":
209
+ frame = self._apply_face_warp(frame, face_mesh, emotion, intensity)
210
+
211
+ out.write(frame)
212
+ frame_count += 1
213
+
214
+ cap.release()
215
+ out.release()
216
+ face_mesh.close()
217
+
218
+ # Mux original audio back
219
+ self._mux_audio(temp_path, video_path, output_path)
220
+
221
+ # Cleanup temp
222
+ if os.path.exists(temp_path):
223
+ os.remove(temp_path)
224
+
225
+ print(f" Processed {frame_count} frames")
226
+ return output_path
227
+
228
+ except ImportError as e:
229
+ print(f" ⚠ Post-processing skipped (missing {e}). Copying base video.")
230
+ shutil.copy2(video_path, output_path)
231
+ return output_path
232
+ except Exception as e:
233
+ print(f" ⚠ Post-processing error: {e}. Using base video.")
234
+ shutil.copy2(video_path, output_path)
235
+ return output_path
236
+
237
+ def _apply_color_grade(
238
+ self, frame: np.ndarray, emotion: str, intensity: float
239
+ ) -> np.ndarray:
240
+ """Apply subtle emotion-specific color grading."""
241
+ import cv2
242
+
243
+ # Very subtle color shifts based on emotion
244
+ color_shifts = {
245
+ "happy": (5, 5, 15), # Warm (slight yellow)
246
+ "sad": (-5, -3, -10), # Cool (slight blue)
247
+ "angry": (10, -5, -5), # Warm red
248
+ "fear": (-5, -5, 5), # Cool green
249
+ "surprise": (5, 5, 5), # Bright
250
+ "disgust": (-3, 5, -5), # Sickly green
251
+ "neutral": (0, 0, 0),
252
+ }
253
+
254
+ shift = color_shifts.get(emotion, (0, 0, 0))
255
+ scale = intensity * 0.5 # Keep it very subtle
256
+
257
+ adjusted = frame.astype(np.float32)
258
+ adjusted[:, :, 0] += shift[0] * scale # B
259
+ adjusted[:, :, 1] += shift[1] * scale # G
260
+ adjusted[:, :, 2] += shift[2] * scale # R
261
+
262
+ return np.clip(adjusted, 0, 255).astype(np.uint8)
263
+
264
+ def _apply_face_warp(
265
+ self,
266
+ frame: np.ndarray,
267
+ face_mesh,
268
+ emotion: str,
269
+ intensity: float
270
+ ) -> np.ndarray:
271
+ """
272
+ Apply subtle facial warping based on emotion.
273
+ Uses MediaPipe landmarks to create emotion-specific deformations.
274
+ """
275
+ import cv2
276
+
277
+ h, w = frame.shape[:2]
278
+ rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
279
+ results = face_mesh.process(rgb)
280
+
281
+ if not results.multi_face_landmarks:
282
+ return frame
283
+
284
+ landmarks = results.multi_face_landmarks[0]
285
+
286
+ # Key landmark indices for warping
287
+ # Brow: 70, 63, 105, 66, 107 (left), 336, 296, 334, 293, 300 (right)
288
+ # Mouth corners: 61, 291
289
+ # Jaw: 152
290
+
291
+ profile = EMOTION_PROFILES.get(emotion, {})
292
+ brow_shift = profile.get("brow_scale", 0) * intensity * 3 # pixels
293
+ mouth_shift = profile.get("mouth_scale", 0) * intensity * 2
294
+
295
+ if abs(brow_shift) < 0.5 and abs(mouth_shift) < 0.5:
296
+ return frame # Not enough to notice
297
+
298
+ # Simple approach: use cv2.remap with subtle displacement
299
+ # This is fast and produces decent results
300
+ map_x = np.tile(np.arange(w, dtype=np.float32), (h, 1))
301
+ map_y = np.tile(np.arange(h, dtype=np.float32).reshape(-1, 1), (1, w))
302
+
303
+ # Get face center and brow/mouth regions
304
+ face_pts = [(int(l.x * w), int(l.y * h)) for l in landmarks.landmark]
305
+
306
+ # Brow region (top 1/3 of face)
307
+ brow_y = face_pts[10][1] # Top of face
308
+ nose_y = face_pts[1][1] # Nose tip
309
+ brow_region = (brow_y, nose_y)
310
+
311
+ # Apply brow displacement in brow region
312
+ for y_idx in range(max(0, brow_region[0]), min(h, brow_region[1])):
313
+ # Gaussian falloff from center of region
314
+ region_center = (brow_region[0] + brow_region[1]) // 2
315
+ dist = abs(y_idx - region_center) / max(1, (brow_region[1] - brow_region[0]) / 2)
316
+ falloff = np.exp(-dist ** 2 * 2)
317
+ map_y[y_idx, :] -= brow_shift * falloff
318
+
319
+ # Apply mouth displacement in lower face
320
+ mouth_y = face_pts[13][1] # Upper lip
321
+ chin_y = face_pts[152][1] # Chin
322
+ mouth_center_x = (face_pts[61][0] + face_pts[291][0]) // 2
323
+
324
+ for y_idx in range(max(0, mouth_y - 10), min(h, chin_y + 10)):
325
+ for x_idx in range(max(0, mouth_center_x - 40), min(w, mouth_center_x + 40)):
326
+ dist_y = abs(y_idx - mouth_y) / max(1, (chin_y - mouth_y))
327
+ dist_x = abs(x_idx - mouth_center_x) / 40.0
328
+ falloff = np.exp(-(dist_y ** 2 + dist_x ** 2) * 2)
329
+ map_x[y_idx, x_idx] += mouth_shift * falloff * (1 if x_idx > mouth_center_x else -1)
330
+
331
+ warped = cv2.remap(frame, map_x, map_y, cv2.INTER_LINEAR)
332
+ return warped
333
+
334
+ def _mux_audio(self, video_path: str, audio_source: str, output_path: str):
335
+ """Combine processed video with original audio."""
336
+ try:
337
+ subprocess.run([
338
+ "ffmpeg", "-y",
339
+ "-i", video_path,
340
+ "-i", audio_source,
341
+ "-c:v", "copy",
342
+ "-c:a", "aac",
343
+ "-map", "0:v:0",
344
+ "-map", "1:a:0",
345
+ "-shortest",
346
+ output_path
347
+ ], capture_output=True, timeout=60)
348
+ except Exception:
349
+ # If ffmpeg fails, just use the video without audio
350
+ shutil.copy2(video_path, output_path)
351
+
352
+ def generate(
353
+ self,
354
+ audio_path: str,
355
+ image_path: str,
356
+ emotion: Optional[str] = None,
357
+ intensity: Optional[float] = None,
358
+ output_path: str = "output.mp4",
359
+ expression_scale: float = 1.0,
360
+ still_mode: bool = False,
361
+ preprocess: str = "crop",
362
+ size: int = 256
363
+ ) -> Dict:
364
+ """
365
+ Full EMOLIPS generation pipeline.
366
+
367
+ Args:
368
+ audio_path: Path to speech audio file
369
+ image_path: Path to source face image
370
+ emotion: Target emotion (auto-detected if None)
371
+ intensity: Emotion intensity 0-1 (auto-estimated if None)
372
+ output_path: Where to save result
373
+ expression_scale: SadTalker expression scale
374
+ still_mode: Reduce head motion
375
+ preprocess: SadTalker preprocess mode
376
+ size: Output resolution
377
+
378
+ Returns:
379
+ Dict with generation metadata
380
+ """
381
+ print("=" * 50)
382
+ print(" EMOLIPS: Emotion-Driven Lip-Sync Generation")
383
+ print("=" * 50)
384
+
385
+ # Validate inputs
386
+ assert os.path.exists(audio_path), f"Audio not found: {audio_path}"
387
+ assert os.path.exists(image_path), f"Image not found: {image_path}"
388
+
389
+ result_meta = {
390
+ "audio": audio_path,
391
+ "image": image_path,
392
+ "output": output_path,
393
+ }
394
+
395
+ # Step 1: Emotion detection
396
+ if emotion is None:
397
+ detection = self.detect_emotion(audio_path)
398
+ emotion = detection["detected_emotion"]
399
+ result_meta["emotion_detection"] = detection
400
+ else:
401
+ print(f" [1/4] Using specified emotion: {emotion}")
402
+ result_meta["emotion_detection"] = {"manual": emotion}
403
+
404
+ # Step 2: Intensity estimation
405
+ if intensity is None:
406
+ intensity = self.estimate_intensity(audio_path)
407
+ else:
408
+ print(f" Using specified intensity: {intensity}")
409
+ result_meta["emotion"] = emotion
410
+ result_meta["intensity"] = intensity
411
+
412
+ # Adjust SadTalker expression scale based on emotion
413
+ emotion_expression_map = {
414
+ "neutral": 1.0,
415
+ "happy": 1.3,
416
+ "sad": 0.9,
417
+ "angry": 1.4,
418
+ "fear": 1.2,
419
+ "surprise": 1.5,
420
+ "disgust": 1.1
421
+ }
422
+ adjusted_scale = expression_scale * emotion_expression_map.get(emotion, 1.0) * (0.5 + 0.5 * intensity)
423
+
424
+ # Step 3: Run SadTalker
425
+ temp_dir = os.path.join(os.path.dirname(output_path) or ".", "temp_sadtalker")
426
+ os.makedirs(temp_dir, exist_ok=True)
427
+
428
+ base_video = self.run_sadtalker(
429
+ audio_path=audio_path,
430
+ image_path=image_path,
431
+ output_dir=temp_dir,
432
+ expression_scale=adjusted_scale,
433
+ still_mode=still_mode,
434
+ preprocess=preprocess,
435
+ size=size
436
+ )
437
+
438
+ if base_video is None:
439
+ print(" ✗ SadTalker generation failed!")
440
+ result_meta["success"] = False
441
+ return result_meta
442
+
443
+ print(f" Base video: {base_video}")
444
+ result_meta["base_video"] = base_video
445
+
446
+ # Step 4: Apply emotion post-processing
447
+ final_video = self.apply_emotion_postprocess(
448
+ video_path=base_video,
449
+ emotion=emotion,
450
+ intensity=intensity,
451
+ output_path=output_path
452
+ )
453
+
454
+ result_meta["output"] = final_video
455
+ result_meta["success"] = True
456
+
457
+ print(f"\n [4/4] Generation complete!")
458
+ print(f" Output: {final_video}")
459
+ print(f" Emotion: {emotion} (intensity: {intensity:.2f})")
460
+ print("=" * 50)
461
+
462
+ # Save metadata
463
+ meta_path = output_path.replace(".mp4", "_meta.json")
464
+ with open(meta_path, "w") as f:
465
+ json.dump(result_meta, f, indent=2, default=str)
466
+
467
+ return result_meta
468
+
469
+ def generate_all_emotions(
470
+ self,
471
+ audio_path: str,
472
+ image_path: str,
473
+ output_dir: str = "outputs",
474
+ intensity: float = 0.7,
475
+ **kwargs
476
+ ) -> List[Dict]:
477
+ """
478
+ Generate same audio+image across all 7 emotions.
479
+ This is the key demo for showing emotion conditioning works.
480
+ """
481
+ os.makedirs(output_dir, exist_ok=True)
482
+ results = []
483
+
484
+ emotions = ["neutral", "happy", "sad", "angry", "fear", "surprise", "disgust"]
485
+
486
+ for emotion in emotions:
487
+ print(f"\n{'='*50}")
488
+ print(f" Generating: {emotion.upper()}")
489
+ print(f"{'='*50}")
490
+
491
+ out_path = os.path.join(output_dir, f"emolips_{emotion}.mp4")
492
+
493
+ result = self.generate(
494
+ audio_path=audio_path,
495
+ image_path=image_path,
496
+ emotion=emotion,
497
+ intensity=intensity,
498
+ output_path=out_path,
499
+ **kwargs
500
+ )
501
+ results.append(result)
502
+
503
+ # Create comparison grid
504
+ self._create_comparison_grid(output_dir, emotions)
505
+
506
+ return results
507
+
508
+ def _create_comparison_grid(self, output_dir: str, emotions: List[str]):
509
+ """Create side-by-side comparison video."""
510
+ try:
511
+ videos = []
512
+ for emotion in emotions:
513
+ path = os.path.join(output_dir, f"emolips_{emotion}.mp4")
514
+ if os.path.exists(path):
515
+ videos.append(path)
516
+
517
+ if len(videos) < 2:
518
+ return
519
+
520
+ # Use ffmpeg to create grid
521
+ # 4 videos in a row, 2 rows
522
+ filter_parts = []
523
+ inputs = []
524
+ for i, v in enumerate(videos[:8]): # Max 8
525
+ inputs.extend(["-i", v])
526
+ filter_parts.append(f"[{i}:v]scale=256:256[v{i}]")
527
+
528
+ n = len(videos[:8])
529
+ cols = min(4, n)
530
+ rows = (n + cols - 1) // cols
531
+
532
+ # Build xstack filter
533
+ layout_parts = []
534
+ for i in range(min(n, 8)):
535
+ x = (i % cols) * 256
536
+ y = (i // cols) * 256
537
+ layout_parts.append(f"{x}_{y}")
538
+
539
+ inputs_str = "".join(f"[v{i}]" for i in range(min(n, 8)))
540
+ filter_str = ";".join(filter_parts) + f";{inputs_str}xstack=inputs={min(n,8)}:layout={'|'.join(layout_parts)}"
541
+
542
+ grid_path = os.path.join(output_dir, "comparison_grid.mp4")
543
+
544
+ subprocess.run(
545
+ ["ffmpeg", "-y"] + inputs + [
546
+ "-filter_complex", filter_str,
547
+ "-c:v", "libx264",
548
+ "-crf", "23",
549
+ grid_path
550
+ ],
551
+ capture_output=True,
552
+ timeout=120
553
+ )
554
+
555
+ if os.path.exists(grid_path):
556
+ print(f"\n ✓ Comparison grid: {grid_path}")
557
+
558
+ except Exception as e:
559
+ print(f" ⚠ Could not create comparison grid: {e}")
560
+
561
+
562
+ # ============================================================
563
+ # STANDALONE MODE (without SadTalker, for testing pipeline)
564
+ # ============================================================
565
+
566
class EmolipsStandalone:
    """
    Standalone mode that works WITHOUT SadTalker.
    Uses MediaPipe face mesh + direct warping for quick demo.

    Good for:
    - Testing the emotion module independently
    - Quick demos without full SadTalker setup
    - Verifying the pipeline logic
    """

    # Per-emotion (B, G, R) offsets used for colour grading.
    _COLOR_SHIFTS = {
        "happy": (5, 5, 15), "sad": (-5, -3, -10),
        "angry": (10, -5, -5), "fear": (-5, -5, 5),
        "surprise": (5, 5, 5), "disgust": (-3, 5, -5),
        "neutral": (0, 0, 0)
    }

    def __init__(self):
        self.emotion_detector = AudioEmotionDetector(device="cpu")
        self.intensity_estimator = EmotionIntensityEstimator()
        self.emotion_modifier = PracticalEmotionModifier()

    @staticmethod
    def _brow_warp_maps(h: int, w: int, brow_y: int, nose_y: int, brow_shift: float):
        """
        Build ``cv2.remap`` maps that shift the rows between ``brow_y`` and
        ``nose_y`` vertically by up to ``brow_shift`` pixels (Gaussian falloff
        from the centre of that band). Returns float32 ``(map_x, map_y)`` of
        shape ``(h, w)``.
        """
        map_x = np.tile(np.arange(w, dtype=np.float32), (h, 1))
        map_y = np.tile(np.arange(h, dtype=np.float32).reshape(-1, 1), (1, w))

        row_lo, row_hi = max(0, brow_y), min(h, nose_y)
        if row_hi > row_lo:
            rows = np.arange(row_lo, row_hi)
            center = (brow_y + nose_y) // 2
            dist = np.abs(rows - center) / max(1, (nose_y - brow_y) / 2)
            falloff = np.exp(-dist ** 2 * 2)
            map_y[row_lo:row_hi, :] -= (brow_shift * falloff)[:, None]

        return map_x, map_y

    @classmethod
    def _color_grade(cls, frame: np.ndarray, emotion: str, current_intensity: float) -> np.ndarray:
        """Add ``shift * current_intensity * 0.5`` per B/G/R channel; return clipped uint8."""
        shift = cls._COLOR_SHIFTS.get(emotion, (0, 0, 0))
        adjusted = frame.astype(np.float32)
        for c in range(3):
            adjusted[:, :, c] += shift[c] * current_intensity * 0.5
        return np.clip(adjusted, 0, 255).astype(np.uint8)

    def generate_emotion_frames(
        self,
        image_path: str,
        emotion: str,
        intensity: float = 0.7,
        num_frames: int = 30
    ) -> List[np.ndarray]:
        """
        Generate emotion-modified face frames from a single image.
        No audio needed - just shows the emotion transformation.

        The effective intensity ramps linearly from 0 to ``intensity`` over the
        first 30% of the frames and then stays constant.

        Returns:
            ``num_frames`` BGR ``uint8`` frames.

        Raises:
            ValueError: If ``image_path`` cannot be read by OpenCV.
        """
        import cv2
        import mediapipe as mp

        img = cv2.imread(image_path)
        if img is None:
            raise ValueError(f"Could not read image: {image_path}")

        h, w = img.shape[:2]

        # The source image never changes, so landmarks are detected once
        # instead of once per frame. Warping is only ever applied above an
        # effective intensity of 0.1, so skip detection when that is unreachable.
        brow_y = nose_y = None
        if num_frames > 0 and intensity > 0.1:
            face_mesh = mp.solutions.face_mesh.FaceMesh(
                static_image_mode=True, max_num_faces=1
            )
            try:
                results = face_mesh.process(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
            finally:
                face_mesh.close()

            if results.multi_face_landmarks:
                landmark = results.multi_face_landmarks[0].landmark
                brow_y = int(landmark[10].y * h)
                nose_y = int(landmark[1].y * h)

        profile = EMOTION_PROFILES.get(emotion, {})
        brow_scale = profile.get("brow_scale", 0)
        mouth_scale = profile.get("mouth_scale", 0)

        frames = []
        for i in range(num_frames):
            # Gradual emotion onset
            t = min(1.0, i / (num_frames * 0.3))  # Ramp up in first 30%
            current_intensity = intensity * t

            frame = img.copy()

            # Apply warping
            if current_intensity > 0.1 and brow_y is not None:
                brow_shift = brow_scale * current_intensity * 5
                mouth_shift = mouth_scale * current_intensity * 4

                # NOTE(review): mouth_shift only gates the warp here; unlike the
                # full pipeline, no mouth displacement is applied in this mode.
                if abs(brow_shift) > 0.3 or abs(mouth_shift) > 0.3:
                    map_x, map_y = self._brow_warp_maps(h, w, brow_y, nose_y, brow_shift)
                    frame = cv2.remap(frame, map_x, map_y, cv2.INTER_LINEAR)

            # Apply color grading
            frames.append(self._color_grade(frame, emotion, current_intensity))

        return frames

    def save_demo_video(
        self,
        image_path: str,
        emotions: List[str] = None,
        output_dir: str = "outputs",
        fps: int = 30,
        duration: float = 2.0,
        intensity: float = 0.7
    ):
        """
        Save emotion demo videos from a single face image.

        Writes ``<output_dir>/demo_<emotion>.mp4`` for every requested emotion
        (all seven when ``emotions`` is None).

        Args:
            image_path: Source face image.
            emotions: Emotion names to render.
            output_dir: Destination directory (created if missing).
            fps: Frame rate of the written videos.
            duration: Clip length in seconds.
            intensity: Peak emotion intensity passed to
                :meth:`generate_emotion_frames`.
        """
        import cv2

        os.makedirs(output_dir, exist_ok=True)

        if emotions is None:
            emotions = ["neutral", "happy", "sad", "angry", "fear", "surprise", "disgust"]

        # Always render at least one frame so the frame size can be read.
        num_frames = max(1, int(fps * duration))

        for emotion in emotions:
            print(f" Generating {emotion}...")
            frames = self.generate_emotion_frames(image_path, emotion, intensity, num_frames)

            out_path = os.path.join(output_dir, f"demo_{emotion}.mp4")
            h, w = frames[0].shape[:2]
            out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
            try:
                for f in frames:
                    out.write(f)
            finally:
                out.release()
            print(f" ✓ {out_path}")
687
+
688
+
689
if __name__ == "__main__":
    # Running the module directly only prints a short usage hint.
    _banner = (
        "EMOLIPS Pipeline module loaded.",
        "Use EmolipsPipeline for full generation or EmolipsStandalone for quick demo.",
    )
    print("\n".join(_banner))