| """ |
| EMOLIPS Pipeline |
| ================ |
| Emotion-Driven Lip-Sync Synthesis Pipeline |
| |
| Orchestrates: |
| 1. Audio emotion detection (automatic or manual override) |
| 2. Emotion intensity estimation |
| 3. SadTalker talking face generation |
| 4. Emotion-conditioned coefficient modification |
| 5. Output video rendering |
| |
| Usage: |
| pipeline = EmolipsPipeline(device="cuda") |
| pipeline.generate( |
| audio_path="speech.wav", |
| image_path="face.jpg", |
| emotion="happy", # Optional: auto-detected if not specified |
| intensity=0.7, # Optional: auto-estimated if not specified |
| output_path="output.mp4" |
| ) |
| """ |
|
|
| import os |
| import sys |
| import subprocess |
| import shutil |
| import json |
| import numpy as np |
| from pathlib import Path |
| from typing import Optional, Dict, List |
| import warnings |
| warnings.filterwarnings("ignore") |
|
|
| from emotion_module import ( |
| PracticalEmotionModifier, |
| AudioEmotionDetector, |
| EmotionIntensityEstimator, |
| EMOTION_PROFILES |
| ) |
|
|
|
|
class EmolipsPipeline:
    """
    Main EMOLIPS inference pipeline.

    Wraps the SadTalker backbone with emotion conditioning:

    1. Detect (or accept) a target emotion for the speech audio.
    2. Estimate (or accept) an emotion intensity in [0, 1].
    3. Run SadTalker to produce a base talking-face video, scaling its
       expression strength by emotion and intensity.
    4. Post-process frames with emotion-specific color grading and subtle
       landmark-guided warping, then re-mux the original audio.
    """

    def __init__(
        self,
        sadtalker_dir: str = "./SadTalker",
        device: str = "cuda",
        checkpoint_dir: Optional[str] = None
    ):
        """
        Args:
            sadtalker_dir: Path to a SadTalker checkout containing inference.py.
            device: Device string for the audio emotion detector (e.g. "cuda").
            checkpoint_dir: SadTalker checkpoint directory; defaults to
                <sadtalker_dir>/checkpoints.
        """
        self.sadtalker_dir = Path(sadtalker_dir).resolve()
        self.device = device
        self.checkpoint_dir = checkpoint_dir or str(self.sadtalker_dir / "checkpoints")

        self.emotion_detector = AudioEmotionDetector(device=device)
        self.intensity_estimator = EmotionIntensityEstimator()
        self.emotion_modifier = PracticalEmotionModifier()

        # Warn early (but keep constructing) so later run_sadtalker failures
        # are easy to explain.
        if not self.sadtalker_dir.exists():
            print(f"⚠ SadTalker not found at {self.sadtalker_dir}")
            print(" Run setup.sh first or specify correct path")

    def detect_emotion(self, audio_path: str) -> Dict:
        """Auto-detect emotion from audio.

        Returns:
            The detector's result dict; contains at least
            'detected_emotion' and 'confidence'.
        """
        print(" [1/4] Detecting emotion from audio...")
        result = self.emotion_detector.detect(audio_path)
        print(f" Detected: {result['detected_emotion']} "
              f"(confidence: {result['confidence']:.2f})")
        return result

    def estimate_intensity(self, audio_path: str) -> float:
        """Estimate emotion intensity from audio features.

        Returns:
            Scalar intensity (the estimator's output; expected in [0, 1] —
            TODO confirm against EmotionIntensityEstimator).
        """
        intensity = self.intensity_estimator.estimate(audio_path)
        print(f" Intensity: {intensity:.2f}")
        return intensity

    def run_sadtalker(
        self,
        audio_path: str,
        image_path: str,
        output_dir: str,
        expression_scale: float = 1.0,
        still_mode: bool = False,
        preprocess: str = "crop",
        size: int = 256,
        pose_style: int = 0
    ) -> Optional[str]:
        """
        Run SadTalker in a subprocess to generate the base talking-face video.

        Arguments mirror SadTalker's inference.py CLI flags.

        Returns:
            Path to the newest .mp4 written under output_dir, or None on any
            failure (non-zero exit, timeout, missing output).
        """
        print(" [2/4] Running SadTalker backbone...")

        inference_script = self.sadtalker_dir / "inference.py"

        cmd = [
            sys.executable, str(inference_script),
            "--driven_audio", str(audio_path),
            "--source_image", str(image_path),
            "--result_dir", str(output_dir),
            "--expression_scale", str(expression_scale),
            "--preprocess", preprocess,
            "--size", str(size),
            "--pose_style", str(pose_style),
        ]

        if still_mode:
            cmd.append("--still")

        # Only pass the checkpoint dir if it actually exists; SadTalker has
        # its own default otherwise.
        checkpoint_dir = Path(self.checkpoint_dir)
        if checkpoint_dir.exists():
            cmd.extend(["--checkpoint_dir", str(checkpoint_dir)])

        try:
            env = os.environ.copy()
            # os.pathsep (not a hard-coded ":") keeps PYTHONPATH portable.
            env["PYTHONPATH"] = str(self.sadtalker_dir) + os.pathsep + env.get("PYTHONPATH", "")

            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                cwd=str(self.sadtalker_dir),
                env=env,
                timeout=300
            )

            if result.returncode != 0:
                print(f" ⚠ SadTalker error: {result.stderr[-500:]}")
                return None

            # SadTalker nests results in timestamped subdirectories; pick the
            # most recently written video.
            videos = list(Path(output_dir).rglob("*.mp4"))
            if videos:
                return str(max(videos, key=os.path.getmtime))

            return None

        except subprocess.TimeoutExpired:
            print(" ⚠ SadTalker timed out (>5 min)")
            return None
        except Exception as e:
            print(f" ⚠ SadTalker failed: {e}")
            return None

    def apply_emotion_postprocess(
        self,
        video_path: str,
        emotion: str,
        intensity: float,
        output_path: str
    ) -> str:
        """
        Apply emotion-based post-processing to the generated video.

        Per frame: emotion-specific color grading, then (for non-neutral
        emotions above 0.3 intensity) landmark-guided facial warping.
        The original audio track is re-muxed afterwards. Falls back to
        copying the base video when cv2/mediapipe are missing or any step
        fails.

        Returns:
            output_path (always; the file holds either the processed video
            or a copy of the base video).
        """
        print(" [3/4] Applying emotion conditioning...")

        try:
            import cv2
            import mediapipe as mp

            mp_face_mesh = mp.solutions.face_mesh
            face_mesh = mp_face_mesh.FaceMesh(
                static_image_mode=False,
                max_num_faces=1,
                min_detection_confidence=0.5
            )

            cap = cv2.VideoCapture(video_path)
            # Some containers report 0 fps; default to 25 so the writer is valid.
            fps = int(cap.get(cv2.CAP_PROP_FPS)) or 25
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            # splitext (not str.replace(".mp4", ...)) so a non-".mp4"
            # output_path cannot make temp_path collide with the final file.
            root, ext = os.path.splitext(output_path)
            temp_path = f"{root}_temp{ext or '.mp4'}"
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(temp_path, fourcc, fps, (w, h))

            frame_count = 0

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                frame = self._apply_color_grade(frame, emotion, intensity)

                # Warping is comparatively expensive; skip it when the effect
                # would be imperceptible.
                if intensity > 0.3 and emotion != "neutral":
                    frame = self._apply_face_warp(frame, face_mesh, emotion, intensity)

                out.write(frame)
                frame_count += 1

            cap.release()
            out.release()
            face_mesh.close()

            # Re-attach the audio track from the base video (VideoWriter
            # output is silent).
            self._mux_audio(temp_path, video_path, output_path)

            if os.path.exists(temp_path):
                os.remove(temp_path)

            print(f" Processed {frame_count} frames")
            return output_path

        except ImportError as e:
            print(f" ⚠ Post-processing skipped (missing {e}). Copying base video.")
            shutil.copy2(video_path, output_path)
            return output_path
        except Exception as e:
            print(f" ⚠ Post-processing error: {e}. Using base video.")
            shutil.copy2(video_path, output_path)
            return output_path

    def _apply_color_grade(
        self, frame: np.ndarray, emotion: str, intensity: float
    ) -> np.ndarray:
        """Apply a subtle per-emotion color shift, scaled by intensity.

        Args:
            frame: HxWx3 uint8 frame (BGR channel order, per cv2 capture).
            emotion: Emotion key; unknown emotions get a zero shift.
            intensity: Strength in [0, 1].

        Returns:
            uint8 frame of the same shape.
        """
        # Per-channel (B, G, R) offsets in 0-255 units at full strength.
        color_shifts = {
            "happy": (5, 5, 15),
            "sad": (-5, -3, -10),
            "angry": (10, -5, -5),
            "fear": (-5, -5, 5),
            "surprise": (5, 5, 5),
            "disgust": (-3, 5, -5),
            "neutral": (0, 0, 0),
        }

        shift = color_shifts.get(emotion, (0, 0, 0))
        scale = intensity * 0.5  # keep the grade subtle even at intensity 1.0

        # Vectorized over channels; equivalent to per-channel += shift*scale.
        adjusted = frame.astype(np.float32) + np.asarray(shift, dtype=np.float32) * scale
        return np.clip(adjusted, 0, 255).astype(np.uint8)

    def _apply_face_warp(
        self,
        frame: np.ndarray,
        face_mesh,
        emotion: str,
        intensity: float
    ) -> np.ndarray:
        """
        Apply subtle facial warping based on emotion via cv2.remap.

        Builds identity sampling maps, then perturbs:
        - the brow band: vertical shift with Gaussian falloff from the band
          center;
        - the mouth region: horizontal spread/purse symmetric about the
          mouth center.

        Args:
            frame: HxWx3 uint8 BGR frame.
            face_mesh: An open mediapipe FaceMesh instance.
            emotion: Emotion key for EMOTION_PROFILES lookup.
            intensity: Strength in [0, 1].

        Returns:
            Warped frame, or the input unchanged when no face is found or
            the profile yields negligible shifts.
        """
        import cv2

        h, w = frame.shape[:2]
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # mediapipe expects RGB
        results = face_mesh.process(rgb)

        if not results.multi_face_landmarks:
            return frame

        landmarks = results.multi_face_landmarks[0]

        profile = EMOTION_PROFILES.get(emotion, {})
        brow_shift = profile.get("brow_scale", 0) * intensity * 3
        mouth_shift = profile.get("mouth_scale", 0) * intensity * 2

        # Sub-half-pixel shifts are invisible after interpolation; skip work.
        if abs(brow_shift) < 0.5 and abs(mouth_shift) < 0.5:
            return frame

        # Identity remap grids: dst[y, x] = src[map_y[y, x], map_x[y, x]].
        map_x = np.tile(np.arange(w, dtype=np.float32), (h, 1))
        map_y = np.tile(np.arange(h, dtype=np.float32).reshape(-1, 1), (1, w))

        # Landmark coordinates in pixel space.
        face_pts = [(int(l.x * w), int(l.y * h)) for l in landmarks.landmark]

        # MediaPipe FaceMesh indices: 10 ≈ upper forehead, 1 ≈ nose tip,
        # 13 ≈ upper lip, 152 ≈ chin, 61/291 ≈ mouth corners (assumed from
        # the canonical landmark chart — confirm).
        brow_y = face_pts[10][1]
        nose_y = face_pts[1][1]

        # Brow band: shift rows vertically with Gaussian falloff
        # (vectorized replacement for the original per-row loop).
        y0, y1 = max(0, brow_y), min(h, nose_y)
        if y1 > y0:
            region_center = (brow_y + nose_y) // 2
            half_span = max(1, (nose_y - brow_y) / 2)
            ys = np.arange(y0, y1, dtype=np.float32)
            falloff = np.exp(-((np.abs(ys - region_center) / half_span) ** 2) * 2)
            map_y[y0:y1, :] -= (brow_shift * falloff)[:, None]

        mouth_y = face_pts[13][1]
        chin_y = face_pts[152][1]
        mouth_center_x = (face_pts[61][0] + face_pts[291][0]) // 2

        # Mouth region: push pixels outward (stretch) or inward (purse)
        # symmetrically about the mouth center (vectorized replacement for
        # the original O(area) per-pixel Python loop).
        my0, my1 = max(0, mouth_y - 10), min(h, chin_y + 10)
        mx0, mx1 = max(0, mouth_center_x - 40), min(w, mouth_center_x + 40)
        if my1 > my0 and mx1 > mx0:
            ys = np.arange(my0, my1, dtype=np.float32)
            xs = np.arange(mx0, mx1, dtype=np.float32)
            dist_y = np.abs(ys - mouth_y) / max(1, (chin_y - mouth_y))
            dist_x = np.abs(xs - mouth_center_x) / 40.0
            falloff = np.exp(-(dist_y[:, None] ** 2 + dist_x[None, :] ** 2) * 2)
            sign = np.where(xs > mouth_center_x, 1.0, -1.0).astype(np.float32)
            map_x[my0:my1, mx0:mx1] += mouth_shift * falloff * sign[None, :]

        return cv2.remap(frame, map_x, map_y, cv2.INTER_LINEAR)

    def _mux_audio(self, video_path: str, audio_source: str, output_path: str):
        """Combine processed video with the audio track of audio_source.

        Falls back to a plain copy of the (silent) video when ffmpeg is
        missing, times out, or exits non-zero — previously a failed mux
        was silently ignored and output_path was never written.
        """
        try:
            result = subprocess.run([
                "ffmpeg", "-y",
                "-i", video_path,
                "-i", audio_source,
                "-c:v", "copy",
                "-c:a", "aac",
                "-map", "0:v:0",
                "-map", "1:a:0",
                "-shortest",
                output_path
            ], capture_output=True, timeout=60)
            if result.returncode == 0 and os.path.exists(output_path):
                return
        except Exception:
            pass
        # Best effort: ship the silent video rather than nothing.
        shutil.copy2(video_path, output_path)

    def generate(
        self,
        audio_path: str,
        image_path: str,
        emotion: Optional[str] = None,
        intensity: Optional[float] = None,
        output_path: str = "output.mp4",
        expression_scale: float = 1.0,
        still_mode: bool = False,
        preprocess: str = "crop",
        size: int = 256
    ) -> Dict:
        """
        Full EMOLIPS generation pipeline.

        Args:
            audio_path: Path to speech audio file
            image_path: Path to source face image
            emotion: Target emotion (auto-detected if None)
            intensity: Emotion intensity 0-1 (auto-estimated if None)
            output_path: Where to save result
            expression_scale: SadTalker expression scale
            still_mode: Reduce head motion
            preprocess: SadTalker preprocess mode
            size: Output resolution

        Returns:
            Dict with generation metadata; 'success' reports the outcome.

        Raises:
            FileNotFoundError: If audio_path or image_path does not exist.
        """
        print("=" * 50)
        print(" EMOLIPS: Emotion-Driven Lip-Sync Generation")
        print("=" * 50)

        # Explicit raises instead of assert: asserts vanish under `python -O`.
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio not found: {audio_path}")
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image not found: {image_path}")

        result_meta = {
            "audio": audio_path,
            "image": image_path,
            "output": output_path,
        }

        # Step 1: emotion — detected from audio or supplied by the caller.
        if emotion is None:
            detection = self.detect_emotion(audio_path)
            emotion = detection["detected_emotion"]
            result_meta["emotion_detection"] = detection
        else:
            print(f" [1/4] Using specified emotion: {emotion}")
            result_meta["emotion_detection"] = {"manual": emotion}

        # Intensity — estimated from audio or supplied by the caller.
        if intensity is None:
            intensity = self.estimate_intensity(audio_path)
        else:
            print(f" Using specified intensity: {intensity}")
        result_meta["emotion"] = emotion
        result_meta["intensity"] = intensity

        # Per-emotion expression boost; intensity maps onto a 0.5x-1.0x factor.
        emotion_expression_map = {
            "neutral": 1.0,
            "happy": 1.3,
            "sad": 0.9,
            "angry": 1.4,
            "fear": 1.2,
            "surprise": 1.5,
            "disgust": 1.1
        }
        adjusted_scale = expression_scale * emotion_expression_map.get(emotion, 1.0) * (0.5 + 0.5 * intensity)

        # Step 2: base talking-face video from SadTalker.
        temp_dir = os.path.join(os.path.dirname(output_path) or ".", "temp_sadtalker")
        os.makedirs(temp_dir, exist_ok=True)

        base_video = self.run_sadtalker(
            audio_path=audio_path,
            image_path=image_path,
            output_dir=temp_dir,
            expression_scale=adjusted_scale,
            still_mode=still_mode,
            preprocess=preprocess,
            size=size
        )

        if base_video is None:
            print(" ✗ SadTalker generation failed!")
            result_meta["success"] = False
            return result_meta

        print(f" Base video: {base_video}")
        result_meta["base_video"] = base_video

        # Step 3: emotion conditioning post-process (+ audio re-mux).
        final_video = self.apply_emotion_postprocess(
            video_path=base_video,
            emotion=emotion,
            intensity=intensity,
            output_path=output_path
        )

        result_meta["output"] = final_video
        result_meta["success"] = True

        print(f"\n [4/4] Generation complete!")
        print(f" Output: {final_video}")
        print(f" Emotion: {emotion} (intensity: {intensity:.2f})")
        print("=" * 50)

        # Sidecar metadata; splitext (not str.replace(".mp4", ...)) so a
        # non-".mp4" output_path can never make meta_path overwrite the
        # video file itself.
        meta_path = os.path.splitext(output_path)[0] + "_meta.json"
        with open(meta_path, "w") as f:
            json.dump(result_meta, f, indent=2, default=str)

        return result_meta

    def generate_all_emotions(
        self,
        audio_path: str,
        image_path: str,
        output_dir: str = "outputs",
        intensity: float = 0.7,
        **kwargs
    ) -> List[Dict]:
        """
        Generate same audio+image across all 7 emotions.
        This is the key demo for showing emotion conditioning works.

        Args:
            audio_path: Path to speech audio file.
            image_path: Path to source face image.
            output_dir: Directory for the emolips_<emotion>.mp4 outputs.
            intensity: Fixed intensity used for every emotion.
            **kwargs: Forwarded to generate().

        Returns:
            List of per-emotion metadata dicts from generate().
        """
        os.makedirs(output_dir, exist_ok=True)
        results = []

        emotions = ["neutral", "happy", "sad", "angry", "fear", "surprise", "disgust"]

        for emotion in emotions:
            print(f"\n{'='*50}")
            print(f" Generating: {emotion.upper()}")
            print(f"{'='*50}")

            out_path = os.path.join(output_dir, f"emolips_{emotion}.mp4")

            result = self.generate(
                audio_path=audio_path,
                image_path=image_path,
                emotion=emotion,
                intensity=intensity,
                output_path=out_path,
                **kwargs
            )
            results.append(result)

        # Best-effort side-by-side comparison video of all outputs.
        self._create_comparison_grid(output_dir, emotions)

        return results

    def _create_comparison_grid(self, output_dir: str, emotions: List[str]):
        """Create side-by-side comparison video via ffmpeg xstack.

        Best effort: returns silently with fewer than two videos, and only
        warns when ffmpeg is unavailable or fails.
        """
        try:
            videos = [
                p for p in (os.path.join(output_dir, f"emolips_{e}.mp4") for e in emotions)
                if os.path.exists(p)
            ]

            if len(videos) < 2:
                return

            # Cap the grid at 8 tiles, 4 per row.
            videos = videos[:8]
            n = len(videos)
            cols = min(4, n)

            inputs = []
            filter_parts = []
            for i, v in enumerate(videos):
                inputs.extend(["-i", v])
                filter_parts.append(f"[{i}:v]scale=256:256[v{i}]")

            # Pixel offsets of each 256x256 tile, row-major.
            layout_parts = [f"{(i % cols) * 256}_{(i // cols) * 256}" for i in range(n)]

            inputs_str = "".join(f"[v{i}]" for i in range(n))
            filter_str = ";".join(filter_parts) + f";{inputs_str}xstack=inputs={n}:layout={'|'.join(layout_parts)}"

            grid_path = os.path.join(output_dir, "comparison_grid.mp4")

            subprocess.run(
                ["ffmpeg", "-y"] + inputs + [
                    "-filter_complex", filter_str,
                    "-c:v", "libx264",
                    "-crf", "23",
                    grid_path
                ],
                capture_output=True,
                timeout=120
            )

            if os.path.exists(grid_path):
                print(f"\n ✓ Comparison grid: {grid_path}")

        except Exception as e:
            print(f" ⚠ Could not create comparison grid: {e}")
|
|
|
|
| |
| |
| |
|
|
class EmolipsStandalone:
    """
    Standalone mode that works WITHOUT SadTalker.
    Uses MediaPipe face mesh + direct warping for quick demo.

    Good for:
    - Testing the emotion module independently
    - Quick demos without full SadTalker setup
    - Verifying the pipeline logic
    """

    def __init__(self):
        # CPU-only: standalone mode is meant to run without a GPU setup.
        self.emotion_detector = AudioEmotionDetector(device="cpu")
        self.intensity_estimator = EmotionIntensityEstimator()
        self.emotion_modifier = PracticalEmotionModifier()

    def generate_emotion_frames(
        self,
        image_path: str,
        emotion: str,
        intensity: float = 0.7,
        num_frames: int = 30
    ) -> List[np.ndarray]:
        """
        Generate emotion-modified face frames from a single image.
        No audio needed - just shows the emotion transformation.

        Args:
            image_path: Face image readable by cv2.imread.
            emotion: Emotion name used to look up EMOTION_PROFILES.
            intensity: Peak emotion intensity (ramped up from 0 over the clip).
            num_frames: Number of frames to synthesize.

        Returns:
            List of uint8 frames (BGR, same size as the input image).

        Raises:
            ValueError: If the image cannot be read.
        """
        import cv2
        import mediapipe as mp

        img = cv2.imread(image_path)
        if img is None:
            raise ValueError(f"Could not read image: {image_path}")

        mp_face_mesh = mp.solutions.face_mesh
        face_mesh = mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1)

        frames = []
        for i in range(num_frames):
            # Ramp intensity from 0 to full over the first ~30% of frames,
            # then hold, so the emotion eases in rather than popping.
            t = min(1.0, i / (num_frames * 0.3))
            current_intensity = intensity * t

            frame = img.copy()

            # Only warp once the ramped intensity is noticeable.
            if current_intensity > 0.1:
                h, w = frame.shape[:2]
                rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)  # mediapipe expects RGB
                results = face_mesh.process(rgb)

                if results.multi_face_landmarks:
                    profile = EMOTION_PROFILES.get(emotion, {})
                    brow_shift = profile.get("brow_scale", 0) * current_intensity * 5
                    mouth_shift = profile.get("mouth_scale", 0) * current_intensity * 4

                    # NOTE(review): mouth_shift only gates the warp here — no
                    # mouth deformation is applied in standalone mode, only
                    # the brow band below.
                    if abs(brow_shift) > 0.3 or abs(mouth_shift) > 0.3:
                        # Identity sampling grids for cv2.remap.
                        map_x = np.tile(np.arange(w, dtype=np.float32), (h, 1))
                        map_y = np.tile(np.arange(h, dtype=np.float32).reshape(-1, 1), (1, w))

                        face_pts = [(int(l.x * w), int(l.y * h))
                                    for l in results.multi_face_landmarks[0].landmark]

                        # MediaPipe FaceMesh indices: 10 ≈ upper forehead,
                        # 1 ≈ nose tip (assumed from the canonical landmark
                        # chart — confirm).
                        brow_y = face_pts[10][1]
                        nose_y = face_pts[1][1]

                        # Vertical brow-band shift with Gaussian falloff from
                        # the band center.
                        for y_idx in range(max(0, brow_y), min(h, nose_y)):
                            center = (brow_y + nose_y) // 2
                            dist = abs(y_idx - center) / max(1, (nose_y - brow_y) / 2)
                            falloff = np.exp(-dist ** 2 * 2)
                            map_y[y_idx, :] -= brow_shift * falloff

                        frame = cv2.remap(frame, map_x, map_y, cv2.INTER_LINEAR)

            # Per-emotion (B, G, R) color offsets, scaled by ramped intensity;
            # same table as EmolipsPipeline's grading.
            color_shifts = {
                "happy": (5, 5, 15), "sad": (-5, -3, -10),
                "angry": (10, -5, -5), "fear": (-5, -5, 5),
                "surprise": (5, 5, 5), "disgust": (-3, 5, -5),
                "neutral": (0, 0, 0)
            }
            shift = color_shifts.get(emotion, (0, 0, 0))
            adjusted = frame.astype(np.float32)
            for c in range(3):
                adjusted[:, :, c] += shift[c] * current_intensity * 0.5
            frame = np.clip(adjusted, 0, 255).astype(np.uint8)

            frames.append(frame)

        face_mesh.close()
        return frames

    def save_demo_video(
        self,
        image_path: str,
        emotions: Optional[List[str]] = None,
        output_dir: str = "outputs",
        fps: int = 30,
        duration: float = 2.0
    ):
        """Save emotion demo videos from a single face image.

        Writes one demo_<emotion>.mp4 per emotion into output_dir.

        Args:
            image_path: Source face image.
            emotions: Emotions to render; defaults to all seven.
            output_dir: Destination directory (created if missing).
            fps: Output frame rate.
            duration: Clip length in seconds.
        """
        import cv2

        os.makedirs(output_dir, exist_ok=True)

        if emotions is None:
            emotions = ["neutral", "happy", "sad", "angry", "fear", "surprise", "disgust"]

        num_frames = int(fps * duration)

        for emotion in emotions:
            print(f" Generating {emotion}...")
            frames = self.generate_emotion_frames(image_path, emotion, 0.7, num_frames)

            out_path = os.path.join(output_dir, f"demo_{emotion}.mp4")
            h, w = frames[0].shape[:2]
            out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
            for f in frames:
                out.write(f)
            out.release()
            print(f" ✓ {out_path}")
|
|
|
|
if __name__ == "__main__":
    # Informational banner when executed directly; the module's real entry
    # points are the EmolipsPipeline and EmolipsStandalone classes.
    banner = (
        "EMOLIPS Pipeline module loaded.",
        "Use EmolipsPipeline for full generation or EmolipsStandalone for quick demo.",
    )
    for line in banner:
        print(line)
|
|