"""
EMOLIPS Pipeline
================
Emotion-Driven Lip-Sync Synthesis Pipeline

Orchestrates:
1. Audio emotion detection (automatic or manual override)
2. Emotion intensity estimation
3. SadTalker talking face generation
4. Emotion-conditioned coefficient modification
5. Output video rendering

Usage:
    pipeline = EmolipsPipeline(device="cuda")
    pipeline.generate(
        audio_path="speech.wav",
        image_path="face.jpg",
        emotion="happy",       # Optional: auto-detected if not specified
        intensity=0.7,         # Optional: auto-estimated if not specified
        output_path="output.mp4"
    )
"""

import os
import sys
import subprocess
import shutil
import json
import numpy as np
from pathlib import Path
from typing import Optional, Dict, List
import warnings
warnings.filterwarnings("ignore")

from emotion_module import (
    PracticalEmotionModifier,
    AudioEmotionDetector,
    EmotionIntensityEstimator,
    EMOTION_PROFILES
)


# Emotions rendered by the "all emotions" demos, in output order.
_DEFAULT_EMOTIONS = ["neutral", "happy", "sad", "angry", "fear", "surprise", "disgust"]

# Per-emotion additive colour offsets in OpenCV channel order (B, G, R).
# They are scaled by ``intensity * 0.5`` before use, so the tint stays subtle.
_COLOR_SHIFTS = {
    "happy": (5, 5, 15),       # Warm (slight yellow)
    "sad": (-5, -3, -10),      # Cool (slight blue)
    "angry": (10, -5, -5),     # Warm red
    "fear": (-5, -5, 5),       # Cool green
    "surprise": (5, 5, 5),     # Bright
    "disgust": (-3, 5, -5),    # Sickly green
    "neutral": (0, 0, 0),
}

# Grid geometry used by the ffmpeg comparison video.
_GRID_TILE = 256
_GRID_MAX_VIDEOS = 8
_GRID_MAX_COLS = 4


def _derived_path(path: str, tag: str, suffix: Optional[str] = None) -> str:
    """Return ``path`` with ``tag`` appended to the file stem.

    ``suffix`` replaces the extension when given; otherwise the original
    extension is kept (``.mp4`` if there is none). Unlike
    ``str.replace(".mp4", ...)`` the result can never equal ``path`` and
    directory names containing ".mp4" are left untouched.
    """
    p = Path(path)
    if suffix is None:
        suffix = p.suffix or ".mp4"
    return str(p.with_name(f"{p.stem}{tag}{suffix}"))


def _landmark_px(landmarks, index: int, width: int, height: int):
    """Convert one normalized landmark (``.x``/``.y``) to integer pixel coords."""
    point = landmarks[index]
    return int(point.x * width), int(point.y * height)


def _identity_maps(height: int, width: int):
    """Build float32 ``(map_x, map_y)`` arrays that make cv2.remap a no-op."""
    return np.meshgrid(
        np.arange(width, dtype=np.float32),
        np.arange(height, dtype=np.float32),
    )


def _color_grade(frame: np.ndarray, emotion: str, intensity: float) -> np.ndarray:
    """Return a new uint8 frame with the emotion's colour offsets added."""
    shift = _COLOR_SHIFTS.get(emotion, (0, 0, 0))
    scale = intensity * 0.5  # Keep it very subtle
    adjusted = frame.astype(np.float32)
    for channel in range(3):
        adjusted[:, :, channel] += shift[channel] * scale
    return np.clip(adjusted, 0, 255).astype(np.uint8)


def _add_brow_displacement(map_y: np.ndarray, brow_y: int, nose_y: int,
                           brow_shift: float) -> None:
    """Shift rows between ``brow_y`` and ``nose_y`` in ``map_y`` (in place).

    The displacement has a Gaussian falloff from the vertical centre of the
    region and is applied across the full frame width.
    """
    y0, y1 = max(0, brow_y), min(map_y.shape[0], nose_y)
    if y1 <= y0:
        return
    center = (brow_y + nose_y) // 2
    half_span = max(1, (nose_y - brow_y) / 2)
    dist = np.abs(np.arange(y0, y1) - center) / half_span
    falloff = np.exp(-dist ** 2 * 2)
    map_y[y0:y1, :] -= (brow_shift * falloff).astype(np.float32)[:, None]


def _add_mouth_displacement(map_x: np.ndarray, mouth_y: int, chin_y: int,
                            center_x: int, mouth_shift: float,
                            half_width: int = 40, margin: int = 10) -> None:
    """Apply a horizontal displacement around the mouth to ``map_x`` (in place).

    Columns right of ``center_x`` are offset by ``+mouth_shift`` and the rest
    (including the centre column) by ``-mouth_shift``, both weighted by a 2-D
    Gaussian falloff around ``(center_x, mouth_y)``.
    """
    height, width = map_x.shape
    y0, y1 = max(0, mouth_y - margin), min(height, chin_y + margin)
    x0, x1 = max(0, center_x - half_width), min(width, center_x + half_width)
    if y1 <= y0 or x1 <= x0:
        return
    ys = np.arange(y0, y1)[:, None]
    xs = np.arange(x0, x1)[None, :]
    dist_y = np.abs(ys - mouth_y) / max(1, chin_y - mouth_y)
    dist_x = np.abs(xs - center_x) / float(half_width)
    falloff = np.exp(-(dist_y ** 2 + dist_x ** 2) * 2)
    direction = np.where(xs > center_x, 1.0, -1.0)
    map_x[y0:y1, x0:x1] += (mouth_shift * falloff * direction).astype(np.float32)


class EmolipsPipeline:
    """
    Main EMOLIPS inference pipeline.
    Wraps SadTalker backbone with emotion conditioning.
    """

    def __init__(
        self,
        sadtalker_dir: str = "./SadTalker",
        device: str = "cuda",
        checkpoint_dir: Optional[str] = None
    ):
        """Set up the emotion components and remember the SadTalker location.

        Args:
            sadtalker_dir: Directory containing SadTalker's ``inference.py``.
            device: Device string handed to the audio emotion detector.
            checkpoint_dir: SadTalker checkpoints; defaults to
                ``<sadtalker_dir>/checkpoints``.
        """
        self.sadtalker_dir = Path(sadtalker_dir).resolve()
        self.device = device
        self.checkpoint_dir = checkpoint_dir or str(self.sadtalker_dir / "checkpoints")

        # Initialize emotion components
        self.emotion_detector = AudioEmotionDetector(device=device)
        self.intensity_estimator = EmotionIntensityEstimator()
        self.emotion_modifier = PracticalEmotionModifier()

        # Verify SadTalker installation (warn only; generation fails later)
        if not self.sadtalker_dir.exists():
            print(f"⚠ SadTalker not found at {self.sadtalker_dir}")
            print(" Run setup.sh first or specify correct path")

    def detect_emotion(self, audio_path: str) -> Dict:
        """Auto-detect emotion from audio.

        Returns the detector's result dict; this method reads its
        ``detected_emotion`` and ``confidence`` entries for logging.
        """
        print(" [1/4] Detecting emotion from audio...")
        result = self.emotion_detector.detect(audio_path)
        print(f" Detected: {result['detected_emotion']} "
              f"(confidence: {result['confidence']:.2f})")
        return result

    def estimate_intensity(self, audio_path: str) -> float:
        """Estimate emotion intensity from audio features."""
        intensity = self.intensity_estimator.estimate(audio_path)
        print(f" Intensity: {intensity:.2f}")
        return intensity

    def run_sadtalker(
        self,
        audio_path: str,
        image_path: str,
        output_dir: str,
        expression_scale: float = 1.0,
        still_mode: bool = False,
        preprocess: str = "crop",
        size: int = 256,
        pose_style: int = 0
    ) -> Optional[str]:
        """
        Run SadTalker to generate base talking face video.

        Args:
            audio_path: Driving audio file.
            image_path: Source face image.
            output_dir: Directory SadTalker writes its results into.
            expression_scale: Value for ``--expression_scale``.
            still_mode: Adds ``--still`` when true.
            preprocess: Value for ``--preprocess``.
            size: Value for ``--size``.
            pose_style: Value for ``--pose_style``.

        Returns:
            Path of the most recently modified ``.mp4`` under ``output_dir``,
            or None if SadTalker failed, timed out or produced no video.
        """
        print(" [2/4] Running SadTalker backbone...")

        # The subprocess runs with cwd=sadtalker_dir, so caller-relative paths
        # must be made absolute first or they resolve against the wrong dir.
        audio_abs = Path(audio_path).resolve()
        image_abs = Path(image_path).resolve()
        output_abs = Path(output_dir).resolve()

        # Build SadTalker command
        inference_script = self.sadtalker_dir / "inference.py"
        cmd = [
            sys.executable, str(inference_script),
            "--driven_audio", str(audio_abs),
            "--source_image", str(image_abs),
            "--result_dir", str(output_abs),
            "--expression_scale", str(expression_scale),
            "--preprocess", preprocess,
            "--size", str(size),
            "--pose_style", str(pose_style),
        ]
        if still_mode:
            cmd.append("--still")

        # Add checkpoint paths
        checkpoint_dir = Path(self.checkpoint_dir).resolve()
        if checkpoint_dir.exists():
            cmd.extend(["--checkpoint_dir", str(checkpoint_dir)])

        try:
            env = os.environ.copy()
            # os.pathsep keeps this portable; skipping an empty PYTHONPATH
            # avoids a trailing separator (which would add cwd to sys.path).
            search_path = [str(self.sadtalker_dir)]
            if env.get("PYTHONPATH"):
                search_path.append(env["PYTHONPATH"])
            env["PYTHONPATH"] = os.pathsep.join(search_path)

            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                cwd=str(self.sadtalker_dir),
                env=env,
                timeout=300  # 5 min timeout
            )
            if result.returncode != 0:
                print(f" ⚠ SadTalker error: {result.stderr[-500:]}")
                return None

            # Find generated video (newest .mp4 anywhere below output_dir)
            videos = list(output_abs.rglob("*.mp4"))
            if videos:
                return str(max(videos, key=os.path.getmtime))
            return None
        except subprocess.TimeoutExpired:
            print(" ⚠ SadTalker timed out (>5 min)")
            return None
        except Exception as e:
            print(f" ⚠ SadTalker failed: {e}")
            return None

    def apply_emotion_postprocess(
        self,
        video_path: str,
        emotion: str,
        intensity: float,
        output_path: str
    ) -> str:
        """
        Apply emotion-based post-processing to generated video.

        Every frame gets emotion-specific colour grading; when
        ``intensity > 0.3`` and the emotion is not "neutral", frames are also
        warped around MediaPipe face landmarks. The audio track of
        ``video_path`` is muxed back in at the end.

        This is best-effort: if cv2/mediapipe are missing or anything fails,
        the base video is copied to ``output_path`` unchanged.

        Returns:
            ``output_path``.
        """
        print(" [3/4] Applying emotion conditioning...")
        try:
            import cv2
            import mediapipe as mp
        except ImportError as e:
            print(f" ⚠ Post-processing skipped (missing {e}). Copying base video.")
            shutil.copy2(video_path, output_path)
            return output_path

        # Temp output (audio is muxed in afterwards). Derived via pathlib so
        # it can never collide with output_path itself.
        temp_path = _derived_path(output_path, "_temp")
        warp_faces = intensity > 0.3 and emotion != "neutral"
        cap = out = face_mesh = None
        frame_count = 0
        try:
            if warp_faces:
                face_mesh = mp.solutions.face_mesh.FaceMesh(
                    static_image_mode=False,
                    max_num_faces=1,
                    min_detection_confidence=0.5
                )

            cap = cv2.VideoCapture(video_path)
            # Keep fractional frame rates (e.g. 29.97): truncating to int
            # makes the video drift against the muxed audio.
            fps = cap.get(cv2.CAP_PROP_FPS)
            if not fps > 0:
                fps = 25.0
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(temp_path, fourcc, fps, (w, h))

            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                # Apply emotion-specific color grading
                frame = self._apply_color_grade(frame, emotion, intensity)
                # Apply subtle face warping if emotion is strong
                if warp_faces:
                    frame = self._apply_face_warp(frame, face_mesh, emotion, intensity)
                out.write(frame)
                frame_count += 1

            # Finalize the temp file before ffmpeg reads it.
            cap.release()
            cap = None
            out.release()
            out = None

            if frame_count == 0:
                raise RuntimeError("no frames could be read from the base video")

            # Mux original audio back
            self._mux_audio(temp_path, video_path, output_path)
            print(f" Processed {frame_count} frames")
            return output_path
        except Exception as e:
            print(f" ⚠ Post-processing error: {e}. Using base video.")
            shutil.copy2(video_path, output_path)
            return output_path
        finally:
            if cap is not None:
                cap.release()
            if out is not None:
                out.release()
            if face_mesh is not None:
                face_mesh.close()
            # Cleanup temp; a failed delete must not mask the result.
            try:
                if os.path.exists(temp_path):
                    os.remove(temp_path)
            except OSError:
                pass

    def _apply_color_grade(
        self,
        frame: np.ndarray,
        emotion: str,
        intensity: float
    ) -> np.ndarray:
        """Apply subtle emotion-specific color grading.

        Adds the emotion's (B, G, R) offsets scaled by ``intensity * 0.5``
        and returns a new uint8 frame clipped to [0, 255]. Unknown emotions
        get no colour shift.
        """
        return _color_grade(frame, emotion, intensity)

    def _apply_face_warp(
        self,
        frame: np.ndarray,
        face_mesh,
        emotion: str,
        intensity: float
    ) -> np.ndarray:
        """
        Apply subtle facial warping based on emotion.
        Uses MediaPipe landmarks to create emotion-specific deformations.

        The emotion profile's ``brow_scale`` / ``mouth_scale`` entries
        (default 0) set the displacement size. The frame is returned
        unchanged when both displacements are under half a pixel or when no
        face is detected.
        """
        import cv2

        profile = EMOTION_PROFILES.get(emotion, {})
        brow_shift = profile.get("brow_scale", 0) * intensity * 3  # pixels
        mouth_shift = profile.get("mouth_scale", 0) * intensity * 2

        # Checked before landmark detection so imperceptible warps skip the
        # (comparatively expensive) face-mesh inference entirely.
        if abs(brow_shift) < 0.5 and abs(mouth_shift) < 0.5:
            return frame  # Not enough to notice

        h, w = frame.shape[:2]
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = face_mesh.process(rgb)
        if not results.multi_face_landmarks:
            return frame
        landmarks = results.multi_face_landmarks[0].landmark

        # Landmark indices used below, as labelled in the original code:
        # 10 = top of face, 1 = nose tip, 13 = upper lip, 152 = chin,
        # 61 / 291 = mouth corners.
        brow_y = _landmark_px(landmarks, 10, w, h)[1]
        nose_y = _landmark_px(landmarks, 1, w, h)[1]
        mouth_y = _landmark_px(landmarks, 13, w, h)[1]
        chin_y = _landmark_px(landmarks, 152, w, h)[1]
        mouth_center_x = (
            _landmark_px(landmarks, 61, w, h)[0]
            + _landmark_px(landmarks, 291, w, h)[0]
        ) // 2

        # Simple approach: cv2.remap with a subtle displacement field.
        map_x, map_y = _identity_maps(h, w)
        _add_brow_displacement(map_y, brow_y, nose_y, brow_shift)
        _add_mouth_displacement(map_x, mouth_y, chin_y, mouth_center_x, mouth_shift)

        return cv2.remap(frame, map_x, map_y, cv2.INTER_LINEAR)

    def _mux_audio(self, video_path: str, audio_source: str, output_path: str):
        """Combine processed video with original audio.

        Copies the video stream of ``video_path`` and encodes the first audio
        stream of ``audio_source`` as AAC into ``output_path``. If ffmpeg is
        missing, times out or exits with an error, ``video_path`` is copied to
        ``output_path`` instead (video without audio).
        """
        try:
            proc = subprocess.run([
                "ffmpeg", "-y",
                "-i", video_path,
                "-i", audio_source,
                "-c:v", "copy",
                "-c:a", "aac",
                "-map", "0:v:0",
                "-map", "1:a:0",
                "-shortest",
                output_path
            ], capture_output=True, timeout=60)
            # subprocess.run does not raise on a non-zero exit status, so the
            # return code has to be checked explicitly.
            muxed = proc.returncode == 0 and os.path.exists(output_path)
        except Exception:
            muxed = False
        if not muxed:
            # If ffmpeg fails, just use the video without audio
            shutil.copy2(video_path, output_path)

    def generate(
        self,
        audio_path: str,
        image_path: str,
        emotion: Optional[str] = None,
        intensity: Optional[float] = None,
        output_path: str = "output.mp4",
        expression_scale: float = 1.0,
        still_mode: bool = False,
        preprocess: str = "crop",
        size: int = 256
    ) -> Dict:
        """
        Full EMOLIPS generation pipeline.

        Args:
            audio_path: Path to speech audio file
            image_path: Path to source face image
            emotion: Target emotion (auto-detected if None)
            intensity: Emotion intensity 0-1 (auto-estimated if None)
            output_path: Where to save result
            expression_scale: SadTalker expression scale
            still_mode: Reduce head motion
            preprocess: SadTalker preprocess mode
            size: Output resolution

        Returns:
            Dict with generation metadata. ``"success"`` is False when the
            SadTalker step fails; on success the same dict is also written
            next to the output as ``<stem>_meta.json``.

        Raises:
            AssertionError: If the audio or image file does not exist.
        """
        print("=" * 50)
        print(" EMOLIPS: Emotion-Driven Lip-Sync Generation")
        print("=" * 50)

        # Validate inputs.
        # NOTE(review): asserts are stripped under ``python -O``; kept because
        # callers may rely on AssertionError being raised here.
        assert os.path.exists(audio_path), f"Audio not found: {audio_path}"
        assert os.path.exists(image_path), f"Image not found: {image_path}"

        result_meta = {
            "audio": audio_path,
            "image": image_path,
            "output": output_path,
        }

        # Step 1: Emotion detection
        if emotion is None:
            detection = self.detect_emotion(audio_path)
            emotion = detection["detected_emotion"]
            result_meta["emotion_detection"] = detection
        else:
            print(f" [1/4] Using specified emotion: {emotion}")
            result_meta["emotion_detection"] = {"manual": emotion}

        # Step 2: Intensity estimation
        if intensity is None:
            intensity = self.estimate_intensity(audio_path)
        else:
            print(f" Using specified intensity: {intensity}")

        result_meta["emotion"] = emotion
        result_meta["intensity"] = intensity

        # Adjust SadTalker expression scale based on emotion
        emotion_expression_map = {
            "neutral": 1.0,
            "happy": 1.3,
            "sad": 0.9,
            "angry": 1.4,
            "fear": 1.2,
            "surprise": 1.5,
            "disgust": 1.1
        }
        adjusted_scale = (
            expression_scale
            * emotion_expression_map.get(emotion, 1.0)
            * (0.5 + 0.5 * intensity)
        )

        # Step 3: Run SadTalker
        temp_dir = os.path.join(os.path.dirname(output_path) or ".", "temp_sadtalker")
        os.makedirs(temp_dir, exist_ok=True)

        base_video = self.run_sadtalker(
            audio_path=audio_path,
            image_path=image_path,
            output_dir=temp_dir,
            expression_scale=adjusted_scale,
            still_mode=still_mode,
            preprocess=preprocess,
            size=size
        )

        if base_video is None:
            print(" ✗ SadTalker generation failed!")
            result_meta["success"] = False
            return result_meta

        print(f" Base video: {base_video}")
        result_meta["base_video"] = base_video

        # Step 4: Apply emotion post-processing
        final_video = self.apply_emotion_postprocess(
            video_path=base_video,
            emotion=emotion,
            intensity=intensity,
            output_path=output_path
        )

        result_meta["output"] = final_video
        result_meta["success"] = True

        print(f"\n [4/4] Generation complete!")
        print(f" Output: {final_video}")
        print(f" Emotion: {emotion} (intensity: {intensity:.2f})")
        print("=" * 50)

        # Save metadata. The name is derived from the stem so an output path
        # without ".mp4" can never be overwritten by the JSON file.
        meta_path = _derived_path(output_path, "_meta", ".json")
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(result_meta, f, indent=2, default=str)

        return result_meta

    def generate_all_emotions(
        self,
        audio_path: str,
        image_path: str,
        output_dir: str = "outputs",
        intensity: float = 0.7,
        **kwargs
    ) -> List[Dict]:
        """
        Generate same audio+image across all 7 emotions.
        This is the key demo for showing emotion conditioning works.

        Each result is written to ``<output_dir>/emolips_<emotion>.mp4`` and a
        comparison grid is attempted afterwards. Extra keyword arguments are
        forwarded to :meth:`generate`.

        Returns:
            The metadata dicts from :meth:`generate`, one per emotion.
        """
        os.makedirs(output_dir, exist_ok=True)
        emotions = list(_DEFAULT_EMOTIONS)
        results = []

        for emotion in emotions:
            print(f"\n{'='*50}")
            print(f" Generating: {emotion.upper()}")
            print(f"{'='*50}")

            out_path = os.path.join(output_dir, f"emolips_{emotion}.mp4")
            results.append(self.generate(
                audio_path=audio_path,
                image_path=image_path,
                emotion=emotion,
                intensity=intensity,
                output_path=out_path,
                **kwargs
            ))

        # Create comparison grid
        self._create_comparison_grid(output_dir, emotions)
        return results

    def _create_comparison_grid(self, output_dir: str, emotions: List[str]):
        """Create side-by-side comparison video.

        Tiles the existing ``emolips_<emotion>.mp4`` files (at most 8, four
        per row, each scaled to 256x256) into ``comparison_grid.mp4`` with
        ffmpeg's xstack filter. Best-effort: failures are reported, not raised.
        """
        try:
            candidates = (
                os.path.join(output_dir, f"emolips_{emotion}.mp4")
                for emotion in emotions
            )
            videos = [path for path in candidates if os.path.exists(path)]
            if len(videos) < 2:
                return
            videos = videos[:_GRID_MAX_VIDEOS]
            n = len(videos)
            cols = min(_GRID_MAX_COLS, n)

            # Use ffmpeg to create grid: one scaled stream per input, then an
            # xstack layout entry "<x>_<y>" for each tile.
            inputs = []
            scale_filters = []
            layout = []
            for i, video in enumerate(videos):
                inputs.extend(["-i", video])
                scale_filters.append(
                    f"[{i}:v]scale={_GRID_TILE}:{_GRID_TILE}[v{i}]"
                )
                layout.append(f"{(i % cols) * _GRID_TILE}_{(i // cols) * _GRID_TILE}")

            stream_labels = "".join(f"[v{i}]" for i in range(n))
            filter_str = (
                ";".join(scale_filters)
                + f";{stream_labels}xstack=inputs={n}:layout={'|'.join(layout)}"
            )

            grid_path = os.path.join(output_dir, "comparison_grid.mp4")
            proc = subprocess.run(
                ["ffmpeg", "-y"] + inputs + [
                    "-filter_complex", filter_str,
                    "-c:v", "libx264",
                    "-crf", "23",
                    grid_path
                ],
                capture_output=True,
                timeout=120
            )
            # Require a clean exit so a stale grid from an earlier run is not
            # reported as freshly created.
            if proc.returncode == 0 and os.path.exists(grid_path):
                print(f"\n ✓ Comparison grid: {grid_path}")
        except Exception as e:
            print(f" ⚠ Could not create comparison grid: {e}")


# ============================================================
# STANDALONE MODE (without SadTalker, for testing pipeline)
# ============================================================

class EmolipsStandalone:
    """
    Standalone mode that works WITHOUT SadTalker.
    Uses MediaPipe face mesh + direct warping for quick demo.

    Good for:
    - Testing the emotion module independently
    - Quick demos without full SadTalker setup
    - Verifying the pipeline logic
    """

    def __init__(self):
        """Create the emotion components with the detector on the CPU."""
        self.emotion_detector = AudioEmotionDetector(device="cpu")
        self.intensity_estimator = EmotionIntensityEstimator()
        self.emotion_modifier = PracticalEmotionModifier()

    def generate_emotion_frames(
        self,
        image_path: str,
        emotion: str,
        intensity: float = 0.7,
        num_frames: int = 30
    ) -> List[np.ndarray]:
        """
        Generate emotion-modified face frames from a single image.
        No audio needed - just shows the emotion transformation.

        The effective intensity ramps linearly from 0 to ``intensity`` over
        the first 30% of the frames. Each frame is colour graded; once the
        ramped intensity exceeds 0.1 and the profile's displacement is large
        enough, the brow region is warped as well.

        Returns:
            ``num_frames`` BGR uint8 frames (an empty list if ``num_frames``
            is not positive).

        Raises:
            ValueError: If the image cannot be read.
        """
        import cv2
        import mediapipe as mp

        img = cv2.imread(image_path)
        if img is None:
            raise ValueError(f"Could not read image: {image_path}")
        h, w = img.shape[:2]

        profile = EMOTION_PROFILES.get(emotion, {})
        brow_scale = profile.get("brow_scale", 0)
        mouth_scale = profile.get("mouth_scale", 0)

        face_mesh = mp.solutions.face_mesh.FaceMesh(
            static_image_mode=True, max_num_faces=1
        )
        # The source image never changes, so landmarks are detected at most
        # once (on first use) instead of once per frame.
        detection_done = False
        brow_region = None  # (brow_y, nose_y) in pixels, or None if no face

        frames = []
        try:
            for i in range(num_frames):
                # Gradual emotion onset: ramp up in first 30%
                t = min(1.0, i / (num_frames * 0.3))
                current_intensity = intensity * t
                frame = img

                brow_shift = brow_scale * current_intensity * 5
                mouth_shift = mouth_scale * current_intensity * 4
                wants_warp = current_intensity > 0.1 and (
                    abs(brow_shift) > 0.3 or abs(mouth_shift) > 0.3
                )

                if wants_warp and not detection_done:
                    rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                    results = face_mesh.process(rgb)
                    if results.multi_face_landmarks:
                        landmarks = results.multi_face_landmarks[0].landmark
                        brow_region = (
                            _landmark_px(landmarks, 10, w, h)[1],
                            _landmark_px(landmarks, 1, w, h)[1],
                        )
                    detection_done = True

                if wants_warp and brow_region is not None:
                    # Only the brow displacement is applied in standalone
                    # mode; mouth_shift just contributes to the threshold.
                    map_x, map_y = _identity_maps(h, w)
                    _add_brow_displacement(map_y, brow_region[0], brow_region[1], brow_shift)
                    frame = cv2.remap(img, map_x, map_y, cv2.INTER_LINEAR)

                # Apply color grading (always returns a fresh array, so the
                # source image is never modified).
                frames.append(_color_grade(frame, emotion, current_intensity))
        finally:
            face_mesh.close()
        return frames

    def save_demo_video(
        self,
        image_path: str,
        emotions: Optional[List[str]] = None,
        output_dir: str = "outputs",
        fps: int = 30,
        duration: float = 2.0
    ):
        """Save emotion demo videos from a single face image.

        Writes ``<output_dir>/demo_<emotion>.mp4`` for each emotion (all
        seven defaults when ``emotions`` is None) at intensity 0.7, with
        ``int(fps * duration)`` frames each.
        """
        import cv2

        os.makedirs(output_dir, exist_ok=True)
        if emotions is None:
            emotions = list(_DEFAULT_EMOTIONS)

        num_frames = int(fps * duration)
        for emotion in emotions:
            print(f" Generating {emotion}...")
            frames = self.generate_emotion_frames(image_path, emotion, 0.7, num_frames)
            if not frames:
                # fps * duration < 1: nothing to encode, and frames[0] below
                # would raise IndexError.
                continue

            out_path = os.path.join(output_dir, f"demo_{emotion}.mp4")
            h, w = frames[0].shape[:2]
            out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
            try:
                for frame in frames:
                    out.write(frame)
            finally:
                out.release()
            print(f" ✓ {out_path}")


if __name__ == "__main__":
    print("EMOLIPS Pipeline module loaded.")
    print("Use EmolipsPipeline for full generation or EmolipsStandalone for quick demo.")