# emolips / code / pipeline.py
# Uploaded with huggingface_hub (commit 46dd09f, verified) by primal-sage.
"""
EMOLIPS Pipeline
================
Emotion-Driven Lip-Sync Synthesis Pipeline
Orchestrates:
1. Audio emotion detection (automatic or manual override)
2. Emotion intensity estimation
3. SadTalker talking face generation
4. Emotion-conditioned coefficient modification
5. Output video rendering
Usage:
pipeline = EmolipsPipeline(device="cuda")
pipeline.generate(
audio_path="speech.wav",
image_path="face.jpg",
emotion="happy", # Optional: auto-detected if not specified
intensity=0.7, # Optional: auto-estimated if not specified
output_path="output.mp4"
)
"""
import os
import sys
import subprocess
import shutil
import json
import numpy as np
from pathlib import Path
from typing import Optional, Dict, List
import warnings
warnings.filterwarnings("ignore")
from emotion_module import (
PracticalEmotionModifier,
AudioEmotionDetector,
EmotionIntensityEstimator,
EMOTION_PROFILES
)
class EmolipsPipeline:
    """
    Main EMOLIPS inference pipeline.

    Wraps the SadTalker backbone with emotion conditioning. The flow is:

    1. Detect (or accept) a target emotion from the driving audio.
    2. Estimate (or accept) an emotion intensity in [0, 1].
    3. Render a base talking-face video with SadTalker (subprocess).
    4. Post-process frames with emotion-specific color grading and subtle
       face warping, then mux the original audio back in.
    """

    def __init__(
        self,
        sadtalker_dir: str = "./SadTalker",
        device: str = "cuda",
        checkpoint_dir: Optional[str] = None
    ):
        """
        Args:
            sadtalker_dir: Path to a SadTalker checkout containing inference.py.
            device: Device string forwarded to the audio emotion detector.
            checkpoint_dir: SadTalker checkpoint directory; defaults to
                ``<sadtalker_dir>/checkpoints``.
        """
        self.sadtalker_dir = Path(sadtalker_dir).resolve()
        self.device = device
        self.checkpoint_dir = checkpoint_dir or str(self.sadtalker_dir / "checkpoints")
        # Initialize emotion components (provided by emotion_module).
        self.emotion_detector = AudioEmotionDetector(device=device)
        self.intensity_estimator = EmotionIntensityEstimator()
        self.emotion_modifier = PracticalEmotionModifier()
        # Soft check only: run_sadtalker() reports a clear error later if missing.
        if not self.sadtalker_dir.exists():
            print(f"⚠ SadTalker not found at {self.sadtalker_dir}")
            print(" Run setup.sh first or specify correct path")

    def detect_emotion(self, audio_path: str) -> Dict:
        """Auto-detect emotion from audio.

        Returns the detector's result dict (contains at least
        'detected_emotion' and 'confidence').
        """
        print(" [1/4] Detecting emotion from audio...")
        result = self.emotion_detector.detect(audio_path)
        print(f" Detected: {result['detected_emotion']} "
              f"(confidence: {result['confidence']:.2f})")
        return result

    def estimate_intensity(self, audio_path: str) -> float:
        """Estimate emotion intensity from audio features (expected 0-1)."""
        intensity = self.intensity_estimator.estimate(audio_path)
        print(f" Intensity: {intensity:.2f}")
        return intensity

    def run_sadtalker(
        self,
        audio_path: str,
        image_path: str,
        output_dir: str,
        expression_scale: float = 1.0,
        still_mode: bool = False,
        preprocess: str = "crop",
        size: int = 256,
        pose_style: int = 0
    ) -> Optional[str]:
        """
        Run SadTalker to generate the base talking face video.

        Returns the path to the newest generated .mp4, or None on failure
        (non-zero exit, timeout, or no video produced).
        """
        print(" [2/4] Running SadTalker backbone...")
        inference_script = self.sadtalker_dir / "inference.py"
        # The subprocess runs with cwd=self.sadtalker_dir, so every path we
        # pass must be absolute — otherwise SadTalker resolves relative paths
        # against its own directory and we'd look for results in the wrong place.
        output_dir = str(Path(output_dir).resolve())
        cmd = [
            sys.executable, str(inference_script),
            "--driven_audio", str(Path(audio_path).resolve()),
            "--source_image", str(Path(image_path).resolve()),
            "--result_dir", output_dir,
            "--expression_scale", str(expression_scale),
            "--preprocess", preprocess,
            "--size", str(size),
            "--pose_style", str(pose_style),
        ]
        if still_mode:
            cmd.append("--still")
        # Add checkpoint paths if the directory exists.
        checkpoint_dir = Path(self.checkpoint_dir)
        if checkpoint_dir.exists():
            cmd.extend(["--checkpoint_dir", str(checkpoint_dir)])
        try:
            env = os.environ.copy()
            # os.pathsep, not ":", so the PYTHONPATH join works on Windows too.
            env["PYTHONPATH"] = str(self.sadtalker_dir) + os.pathsep + env.get("PYTHONPATH", "")
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                cwd=str(self.sadtalker_dir),
                env=env,
                timeout=300  # 5 min timeout
            )
            if result.returncode != 0:
                print(f" ⚠ SadTalker error: {result.stderr[-500:]}")
                return None
            # SadTalker nests results in timestamped dirs; take the newest mp4.
            videos = list(Path(output_dir).rglob("*.mp4"))
            if videos:
                return str(max(videos, key=os.path.getmtime))
            return None
        except subprocess.TimeoutExpired:
            print(" ⚠ SadTalker timed out (>5 min)")
            return None
        except Exception as e:
            print(f" ⚠ SadTalker failed: {e}")
            return None

    def apply_emotion_postprocess(
        self,
        video_path: str,
        emotion: str,
        intensity: float,
        output_path: str
    ) -> str:
        """
        Apply emotion-based post-processing to a generated video.

        Per frame: emotion-specific color grading, plus subtle face warping
        when the emotion is strong enough to be visible. The original audio
        track is muxed back afterwards. Falls back to copying the base video
        unchanged if OpenCV/MediaPipe are unavailable or processing fails.
        """
        print(" [3/4] Applying emotion conditioning...")
        try:
            import cv2
            import mediapipe as mp
            mp_face_mesh = mp.solutions.face_mesh
            face_mesh = mp_face_mesh.FaceMesh(
                static_image_mode=False,
                max_num_faces=1,
                min_detection_confidence=0.5
            )
            cap = cv2.VideoCapture(video_path)
            # Keep fps as float (e.g. 29.97 must not truncate to 29);
            # fall back to 25 when the container reports 0/unknown.
            fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
            w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            # Write video-only frames to a temp file; audio is muxed below.
            temp_path = output_path.replace(".mp4", "_temp.mp4")
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(temp_path, fourcc, fps, (w, h))
            frame_count = 0
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                # Apply emotion-specific color grading.
                frame = self._apply_color_grade(frame, emotion, intensity)
                # Warping is skipped for weak/neutral emotions: it would be
                # imperceptible but still costs a landmark pass per frame.
                if intensity > 0.3 and emotion != "neutral":
                    frame = self._apply_face_warp(frame, face_mesh, emotion, intensity)
                out.write(frame)
                frame_count += 1
            cap.release()
            out.release()
            face_mesh.close()
            # Mux original audio back.
            self._mux_audio(temp_path, video_path, output_path)
            # Cleanup temp file.
            if os.path.exists(temp_path):
                os.remove(temp_path)
            print(f" Processed {frame_count} frames")
            return output_path
        except ImportError as e:
            print(f" ⚠ Post-processing skipped (missing {e}). Copying base video.")
            shutil.copy2(video_path, output_path)
            return output_path
        except Exception as e:
            print(f" ⚠ Post-processing error: {e}. Using base video.")
            shutil.copy2(video_path, output_path)
            return output_path

    def _apply_color_grade(
        self, frame: np.ndarray, emotion: str, intensity: float
    ) -> np.ndarray:
        """Apply a subtle emotion-specific color shift to a BGR frame.

        The per-channel offsets are deliberately small and further scaled by
        half the intensity so the grade never dominates the image.
        """
        # Per-emotion (B, G, R) offsets in 8-bit levels.
        color_shifts = {
            "happy": (5, 5, 15),      # Warm (slight yellow)
            "sad": (-5, -3, -10),     # Cool (slight blue)
            "angry": (10, -5, -5),    # Warm red
            "fear": (-5, -5, 5),      # Cool green
            "surprise": (5, 5, 5),    # Bright
            "disgust": (-3, 5, -5),   # Sickly green
            "neutral": (0, 0, 0),
        }
        shift = color_shifts.get(emotion, (0, 0, 0))
        scale = intensity * 0.5  # Keep it very subtle
        adjusted = frame.astype(np.float32)
        # Broadcast the (B, G, R) offset across the whole frame in one op.
        adjusted += np.asarray(shift, dtype=np.float32) * scale
        return np.clip(adjusted, 0, 255).astype(np.uint8)

    def _apply_face_warp(
        self,
        frame: np.ndarray,
        face_mesh,
        emotion: str,
        intensity: float
    ) -> np.ndarray:
        """
        Apply subtle facial warping based on emotion.

        Uses MediaPipe landmarks to displace the brow band vertically and the
        mouth region horizontally via cv2.remap with Gaussian falloff.
        Returns the frame unchanged when no face is found or the shift is
        too small to be visible.
        """
        import cv2
        h, w = frame.shape[:2]
        rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = face_mesh.process(rgb)
        if not results.multi_face_landmarks:
            return frame
        landmarks = results.multi_face_landmarks[0]
        # Displacement magnitudes come from the emotion profile (pixels).
        profile = EMOTION_PROFILES.get(emotion, {})
        brow_shift = profile.get("brow_scale", 0) * intensity * 3
        mouth_shift = profile.get("mouth_scale", 0) * intensity * 2
        if abs(brow_shift) < 0.5 and abs(mouth_shift) < 0.5:
            return frame  # Not enough to notice
        # Identity displacement maps for cv2.remap; perturbed locally below.
        map_x = np.tile(np.arange(w, dtype=np.float32), (h, 1))
        map_y = np.tile(np.arange(h, dtype=np.float32).reshape(-1, 1), (1, w))
        # Landmark pixel coordinates (MediaPipe yields normalized x/y).
        face_pts = [(int(l.x * w), int(l.y * h)) for l in landmarks.landmark]
        # Brow band: from top of face (landmark 10) down to nose tip (landmark 1).
        brow_y = face_pts[10][1]
        nose_y = face_pts[1][1]
        # Hoist loop invariants: band center and half-span.
        region_center = (brow_y + nose_y) // 2
        half_span = max(1, (nose_y - brow_y) / 2)
        for y_idx in range(max(0, brow_y), min(h, nose_y)):
            # Gaussian falloff from the center of the band.
            dist = abs(y_idx - region_center) / half_span
            falloff = np.exp(-dist ** 2 * 2)
            map_y[y_idx, :] -= brow_shift * falloff
        # Mouth region: push pixels away from the mouth center horizontally.
        mouth_y = face_pts[13][1]    # Upper lip
        chin_y = face_pts[152][1]    # Chin
        mouth_center_x = (face_pts[61][0] + face_pts[291][0]) // 2
        for y_idx in range(max(0, mouth_y - 10), min(h, chin_y + 10)):
            for x_idx in range(max(0, mouth_center_x - 40), min(w, mouth_center_x + 40)):
                dist_y = abs(y_idx - mouth_y) / max(1, (chin_y - mouth_y))
                dist_x = abs(x_idx - mouth_center_x) / 40.0
                falloff = np.exp(-(dist_y ** 2 + dist_x ** 2) * 2)
                map_x[y_idx, x_idx] += mouth_shift * falloff * (1 if x_idx > mouth_center_x else -1)
        return cv2.remap(frame, map_x, map_y, cv2.INTER_LINEAR)

    def _mux_audio(self, video_path: str, audio_source: str, output_path: str):
        """Combine the processed video stream with the original audio track.

        Best-effort: if ffmpeg is missing or fails (previously its non-zero
        exit was swallowed silently, leaving no output file at all), fall
        back to copying the silent video to output_path.
        """
        try:
            result = subprocess.run([
                "ffmpeg", "-y",
                "-i", video_path,
                "-i", audio_source,
                "-c:v", "copy",
                "-c:a", "aac",
                "-map", "0:v:0",
                "-map", "1:a:0",
                "-shortest",
                output_path
            ], capture_output=True, timeout=60)
            # Verify ffmpeg actually produced the file.
            if result.returncode != 0 or not os.path.exists(output_path):
                shutil.copy2(video_path, output_path)
        except Exception:
            # If ffmpeg fails, just use the video without audio
            shutil.copy2(video_path, output_path)

    def generate(
        self,
        audio_path: str,
        image_path: str,
        emotion: Optional[str] = None,
        intensity: Optional[float] = None,
        output_path: str = "output.mp4",
        expression_scale: float = 1.0,
        still_mode: bool = False,
        preprocess: str = "crop",
        size: int = 256
    ) -> Dict:
        """
        Full EMOLIPS generation pipeline.

        Args:
            audio_path: Path to speech audio file
            image_path: Path to source face image
            emotion: Target emotion (auto-detected if None)
            intensity: Emotion intensity 0-1 (auto-estimated if None)
            output_path: Where to save result
            expression_scale: SadTalker expression scale
            still_mode: Reduce head motion
            preprocess: SadTalker preprocess mode
            size: Output resolution

        Returns:
            Dict with generation metadata; "success" is False when the
            SadTalker step fails.

        Raises:
            FileNotFoundError: if the audio or image file does not exist.
        """
        print("=" * 50)
        print(" EMOLIPS: Emotion-Driven Lip-Sync Generation")
        print("=" * 50)
        # Validate inputs with real exceptions — asserts vanish under `python -O`.
        if not os.path.exists(audio_path):
            raise FileNotFoundError(f"Audio not found: {audio_path}")
        if not os.path.exists(image_path):
            raise FileNotFoundError(f"Image not found: {image_path}")
        result_meta = {
            "audio": audio_path,
            "image": image_path,
            "output": output_path,
        }
        # Step 1: Emotion detection (or manual override).
        if emotion is None:
            detection = self.detect_emotion(audio_path)
            emotion = detection["detected_emotion"]
            result_meta["emotion_detection"] = detection
        else:
            print(f" [1/4] Using specified emotion: {emotion}")
            result_meta["emotion_detection"] = {"manual": emotion}
        # Step 2: Intensity estimation (or manual override).
        if intensity is None:
            intensity = self.estimate_intensity(audio_path)
        else:
            print(f" Using specified intensity: {intensity}")
        result_meta["emotion"] = emotion
        result_meta["intensity"] = intensity
        # Per-emotion boost of SadTalker's expression scale, further modulated
        # by intensity (0.5x at intensity 0 up to 1.0x at intensity 1).
        emotion_expression_map = {
            "neutral": 1.0,
            "happy": 1.3,
            "sad": 0.9,
            "angry": 1.4,
            "fear": 1.2,
            "surprise": 1.5,
            "disgust": 1.1
        }
        adjusted_scale = expression_scale * emotion_expression_map.get(emotion, 1.0) * (0.5 + 0.5 * intensity)
        # Step 3: Run SadTalker into a temp dir next to the output file.
        temp_dir = os.path.join(os.path.dirname(output_path) or ".", "temp_sadtalker")
        os.makedirs(temp_dir, exist_ok=True)
        base_video = self.run_sadtalker(
            audio_path=audio_path,
            image_path=image_path,
            output_dir=temp_dir,
            expression_scale=adjusted_scale,
            still_mode=still_mode,
            preprocess=preprocess,
            size=size
        )
        if base_video is None:
            print(" ✗ SadTalker generation failed!")
            result_meta["success"] = False
            return result_meta
        print(f" Base video: {base_video}")
        result_meta["base_video"] = base_video
        # Step 4: Apply emotion post-processing.
        final_video = self.apply_emotion_postprocess(
            video_path=base_video,
            emotion=emotion,
            intensity=intensity,
            output_path=output_path
        )
        result_meta["output"] = final_video
        result_meta["success"] = True
        print(f"\n [4/4] Generation complete!")
        print(f" Output: {final_video}")
        print(f" Emotion: {emotion} (intensity: {intensity:.2f})")
        print("=" * 50)
        # Persist run metadata next to the output video.
        meta_path = output_path.replace(".mp4", "_meta.json")
        with open(meta_path, "w") as f:
            json.dump(result_meta, f, indent=2, default=str)
        return result_meta

    def generate_all_emotions(
        self,
        audio_path: str,
        image_path: str,
        output_dir: str = "outputs",
        intensity: float = 0.7,
        **kwargs
    ) -> List[Dict]:
        """
        Generate the same audio+image across all 7 emotions.

        This is the key demo for showing emotion conditioning works; a
        side-by-side comparison grid is written at the end.
        """
        os.makedirs(output_dir, exist_ok=True)
        results = []
        emotions = ["neutral", "happy", "sad", "angry", "fear", "surprise", "disgust"]
        for emotion in emotions:
            print(f"\n{'='*50}")
            print(f" Generating: {emotion.upper()}")
            print(f"{'='*50}")
            out_path = os.path.join(output_dir, f"emolips_{emotion}.mp4")
            result = self.generate(
                audio_path=audio_path,
                image_path=image_path,
                emotion=emotion,
                intensity=intensity,
                output_path=out_path,
                **kwargs
            )
            results.append(result)
        # Create comparison grid (best-effort).
        self._create_comparison_grid(output_dir, emotions)
        return results

    def _create_comparison_grid(self, output_dir: str, emotions: List[str]):
        """Create a side-by-side comparison video via ffmpeg's xstack filter.

        Best-effort: any failure is reported, never raised.
        """
        try:
            candidates = [
                os.path.join(output_dir, f"emolips_{emotion}.mp4")
                for emotion in emotions
            ]
            videos = [p for p in candidates if os.path.exists(p)]
            if len(videos) < 2:
                return
            # Up to 8 tiles: 4 per row, at most 2 rows of 256x256 cells.
            videos = videos[:8]
            n = len(videos)
            inputs = []
            filter_parts = []
            for i, v in enumerate(videos):
                inputs.extend(["-i", v])
                filter_parts.append(f"[{i}:v]scale=256:256[v{i}]")
            cols = min(4, n)
            # xstack layout uses absolute pixel offsets per tile.
            layout_parts = [f"{(i % cols) * 256}_{(i // cols) * 256}" for i in range(n)]
            inputs_str = "".join(f"[v{i}]" for i in range(n))
            filter_str = ";".join(filter_parts) + f";{inputs_str}xstack=inputs={n}:layout={'|'.join(layout_parts)}"
            grid_path = os.path.join(output_dir, "comparison_grid.mp4")
            subprocess.run(
                ["ffmpeg", "-y"] + inputs + [
                    "-filter_complex", filter_str,
                    "-c:v", "libx264",
                    "-crf", "23",
                    grid_path
                ],
                capture_output=True,
                timeout=120
            )
            if os.path.exists(grid_path):
                print(f"\n ✓ Comparison grid: {grid_path}")
        except Exception as e:
            print(f" ⚠ Could not create comparison grid: {e}")
# ============================================================
# STANDALONE MODE (without SadTalker, for testing pipeline)
# ============================================================
class EmolipsStandalone:
    """
    Standalone mode that works WITHOUT SadTalker.

    Uses MediaPipe face mesh + direct warping for a quick demo.

    Good for:
    - Testing the emotion module independently
    - Quick demos without full SadTalker setup
    - Verifying the pipeline logic
    """

    def __init__(self):
        # CPU-only: standalone mode is meant to run anywhere.
        self.emotion_detector = AudioEmotionDetector(device="cpu")
        self.intensity_estimator = EmotionIntensityEstimator()
        self.emotion_modifier = PracticalEmotionModifier()

    def generate_emotion_frames(
        self,
        image_path: str,
        emotion: str,
        intensity: float = 0.7,
        num_frames: int = 30
    ) -> List[np.ndarray]:
        """
        Generate emotion-modified face frames from a single image.

        No audio needed — the emotion ramps in over the first 30% of the
        frames, then holds at `intensity`. Each frame gets brow warping
        (when a face is found and the shift is visible) plus color grading.

        Raises:
            ValueError: if the image cannot be read.
        """
        import cv2
        import mediapipe as mp
        img = cv2.imread(image_path)
        if img is None:
            raise ValueError(f"Could not read image: {image_path}")
        mp_face_mesh = mp.solutions.face_mesh
        face_mesh = mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1)
        # Per-emotion (B, G, R) color offsets — hoisted out of the frame loop.
        color_shifts = {
            "happy": (5, 5, 15), "sad": (-5, -3, -10),
            "angry": (10, -5, -5), "fear": (-5, -5, 5),
            "surprise": (5, 5, 5), "disgust": (-3, 5, -5),
            "neutral": (0, 0, 0)
        }
        shift = color_shifts.get(emotion, (0, 0, 0))
        frames = []
        try:
            for i in range(num_frames):
                # Gradual emotion onset: ramp up during the first 30%.
                t = min(1.0, i / (num_frames * 0.3))
                current_intensity = intensity * t
                frame = img.copy()
                # Apply brow warping once the emotion is perceptible.
                if current_intensity > 0.1:
                    h, w = frame.shape[:2]
                    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    results = face_mesh.process(rgb)
                    if results.multi_face_landmarks:
                        profile = EMOTION_PROFILES.get(emotion, {})
                        brow_shift = profile.get("brow_scale", 0) * current_intensity * 5
                        mouth_shift = profile.get("mouth_scale", 0) * current_intensity * 4
                        if abs(brow_shift) > 0.3 or abs(mouth_shift) > 0.3:
                            # Identity remap grids, perturbed in the brow band.
                            map_x = np.tile(np.arange(w, dtype=np.float32), (h, 1))
                            map_y = np.tile(np.arange(h, dtype=np.float32).reshape(-1, 1), (1, w))
                            face_pts = [(int(l.x * w), int(l.y * h))
                                        for l in results.multi_face_landmarks[0].landmark]
                            brow_y = face_pts[10][1]  # Top of face
                            nose_y = face_pts[1][1]   # Nose tip
                            center = (brow_y + nose_y) // 2
                            half_span = max(1, (nose_y - brow_y) / 2)
                            for y_idx in range(max(0, brow_y), min(h, nose_y)):
                                # Gaussian falloff from the band center.
                                dist = abs(y_idx - center) / half_span
                                falloff = np.exp(-dist ** 2 * 2)
                                map_y[y_idx, :] -= brow_shift * falloff
                            frame = cv2.remap(frame, map_x, map_y, cv2.INTER_LINEAR)
                # Apply color grading (vectorized over all three channels).
                adjusted = frame.astype(np.float32)
                adjusted += np.asarray(shift, dtype=np.float32) * current_intensity * 0.5
                frame = np.clip(adjusted, 0, 255).astype(np.uint8)
                frames.append(frame)
        finally:
            # Release MediaPipe resources even if a frame raises (the original
            # leaked the face mesh on any mid-loop exception).
            face_mesh.close()
        return frames

    def save_demo_video(
        self,
        image_path: str,
        emotions: Optional[List[str]] = None,
        output_dir: str = "outputs",
        fps: int = 30,
        duration: float = 2.0
    ):
        """Save one emotion demo video per emotion from a single face image.

        Args:
            image_path: Source face image.
            emotions: Emotions to render; defaults to all seven.
            output_dir: Directory for demo_<emotion>.mp4 files (created).
            fps: Output frame rate.
            duration: Clip length in seconds.
        """
        import cv2
        os.makedirs(output_dir, exist_ok=True)
        if emotions is None:
            emotions = ["neutral", "happy", "sad", "angry", "fear", "surprise", "disgust"]
        num_frames = int(fps * duration)
        for emotion in emotions:
            print(f" Generating {emotion}...")
            frames = self.generate_emotion_frames(image_path, emotion, 0.7, num_frames)
            if not frames:
                # Guard: fps*duration < 1 yields no frames; skip instead of
                # crashing on frames[0].
                continue
            out_path = os.path.join(output_dir, f"demo_{emotion}.mp4")
            h, w = frames[0].shape[:2]
            out = cv2.VideoWriter(out_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))
            for f in frames:
                out.write(f)
            out.release()
            print(f" ✓ {out_path}")
if __name__ == "__main__":
    # Smoke message when the module is executed directly (no CLI here).
    banner = (
        "EMOLIPS Pipeline module loaded.",
        "Use EmolipsPipeline for full generation or EmolipsStandalone for quick demo.",
    )
    for line in banner:
        print(line)