# wav2lip / app.py — Hugging Face Space file (uploader: Nekochu, commit "Init", ba84c3f verified)
"""
Wav2Lip HD - CPU-only Lip Sync
Converts Wav2Lip-HD (https://github.com/saifhassan/Wav2Lip-HD) to CPU-only:
- ONNX Wav2Lip model (145MB)
- OpenCV Haar Cascade face detection (no GPU)
- Simple feather blending (no BiSeNet segmentation)
- No SR upscaling (keeps original quality via mouth-paste approach)
Approach: Crop mouth from 96x96 wav2lip output, scale & paste onto original face.
Usage:
CLI: python app.py --video input.mp4 --audio input.wav --output output.mp4
Gradio: python app.py
"""
import os
import sys
import argparse
import cv2
import numpy as np
import librosa
import tempfile
import subprocess
from huggingface_hub import hf_hub_download
from scipy import signal
import onnxruntime as ort
# Wav2Lip constants (from hparams.py)
IMG_SIZE = 96          # Face crop fed to / produced by the model is 96x96 px
MEL_STEP_SIZE = 16     # Mel frames per inference window
SAMPLE_RATE = 16000    # Audio is resampled to 16 kHz before mel extraction
N_FFT = 800            # STFT FFT size (samples)
HOP_SIZE = 200         # STFT hop; 16000/200 = 80 mel frames per second
WIN_SIZE = 800         # STFT window length (samples)
NUM_MELS = 80          # Mel filterbank bands
FMIN = 55              # Mel filterbank low cutoff (Hz)
FMAX = 7600            # Mel filterbank high cutoff (Hz)
PREEMPHASIS = 0.97     # Pre-emphasis filter coefficient applied to the waveform
REF_LEVEL_DB = 20      # dB reference subtracted after the log conversion
MIN_LEVEL_DB = -100    # dB floor; also the normalization range lower bound
MAX_ABS_VALUE = 4.0    # Normalized mel values are clipped to [-4, 4]
# Global model cache (populated lazily by load_models)
models = {}
def load_models():
    """Download (if needed) the Wav2Lip ONNX model and cache an inference session.

    Idempotent: a second call returns immediately once the session exists in
    the module-level `models` dict under the key 'wav2lip'.
    """
    global models
    if 'wav2lip' in models:
        return
    print("Loading Wav2Lip ONNX model...")
    model_file = hf_hub_download(
        repo_id="bluefoxcreation/Wav2lip-Onnx",
        filename="wav2lip_gan.onnx"
    )
    # Keep the CPU footprint small: two threads each for intra/inter-op work.
    opts = ort.SessionOptions()
    opts.intra_op_num_threads = 2
    opts.inter_op_num_threads = 2
    opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    models['wav2lip'] = ort.InferenceSession(
        model_file, opts, providers=["CPUExecutionProvider"]
    )
    print("Wav2Lip loaded!")
def extract_mel(audio_path):
    """Compute the Wav2Lip-style mel spectrogram of an audio file.

    Mirrors the preprocessing in the original audio.py: pre-emphasis, STFT,
    mel projection, then dB conversion normalized into [-4, 4].

    Returns a float array of shape (NUM_MELS, num_mel_frames).
    """
    wav, _ = librosa.load(audio_path, sr=SAMPLE_RATE)
    # Pre-emphasis filter (critical: the model was trained with it).
    emphasized = signal.lfilter([1, -PREEMPHASIS], [1], wav)
    # Magnitude STFT.
    spec = np.abs(librosa.stft(y=emphasized, n_fft=N_FFT,
                               hop_length=HOP_SIZE, win_length=WIN_SIZE))
    # Project onto the mel filterbank.
    mel_basis = librosa.filters.mel(sr=SAMPLE_RATE, n_fft=N_FFT,
                                    n_mels=NUM_MELS, fmin=FMIN, fmax=FMAX)
    mel = mel_basis @ spec
    # Amplitude -> dB with a floor at MIN_LEVEL_DB, minus the reference level.
    floor = np.exp(MIN_LEVEL_DB / 20 * np.log(10))
    db = 20 * np.log10(np.maximum(floor, mel)) - REF_LEVEL_DB
    # Linear rescale of [MIN_LEVEL_DB, 0] dB into [-MAX_ABS_VALUE, MAX_ABS_VALUE].
    scaled = (2 * MAX_ABS_VALUE) * ((db - MIN_LEVEL_DB) / (-MIN_LEVEL_DB)) - MAX_ABS_VALUE
    return np.clip(scaled, -MAX_ABS_VALUE, MAX_ABS_VALUE)
def detect_face(frame, cascade):
    """Return the (x, y, w, h) box of the largest face in a BGR frame, or None.

    Uses the supplied OpenCV Haar cascade on a grayscale copy of the frame;
    detections smaller than 50x50 px are ignored.
    """
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    detections = cascade.detectMultiScale(gray, 1.1, 5, minSize=(50, 50))
    if len(detections) == 0:
        return None
    # Keep the detection covering the most pixels (w * h).
    return max(detections, key=lambda box: box[2] * box[3])
def get_smoothened_boxes(boxes, T=5):
    """Temporally smooth face boxes by averaging over a window of width T.

    Each non-None box is replaced by the integer mean of the non-None boxes in
    the window centered on it (clipped at the list edges). None entries stay
    None. Returns a new list; the input is not modified.
    """
    half = T // 2
    result = []
    for idx, box in enumerate(boxes):
        if box is None:
            result.append(None)
            continue
        # Collect valid boxes in the clipped window around this index.
        lo = max(0, idx - half)
        hi = min(len(boxes), idx + half + 1)
        window = [b for b in boxes[lo:hi] if b is not None]
        if window:
            result.append(tuple(np.mean(window, axis=0).astype(int)))
        else:
            result.append(None)
    return result
class CLIProgress:
    """Stand-in for Gradio's progress callback when running from the CLI.

    Prints a percentage line at key milestones (0/10/20/90/100%) plus every
    multiple of 20% in between, to avoid flooding the terminal.
    """
    def __call__(self, val, desc=''):
        milestone = val in [0, 0.1, 0.2, 0.9, 1.0]
        periodic = 0.2 < val < 0.9 and int(val * 100) % 20 == 0
        if milestone or periodic:
            print(f"[{val*100:5.1f}%] {desc}")
def process_video(video_path, audio_path, use_smoothing=True, progress=None):
    """
    Wav2Lip HD CPU inference.

    Approach from saifhassan/Wav2Lip-HD:
    1. Detect face, crop to bbox
    2. Resize face to 96x96
    3. Run wav2lip → get 96x96 synced face
    4. Extract mouth region (bottom half)
    5. Scale mouth and paste onto original face
    6. Feather blend at seam

    Args:
        video_path: path to the input video, or None.
        audio_path: path to the driving audio, or None.
        use_smoothing: average face boxes over a small temporal window.
        progress: callable(value, desc=...); defaults to CLIProgress.

    Returns:
        Tuple (output_path, status_message); output_path is None on failure.
    """
    if progress is None:
        progress = CLIProgress()
    if video_path is None or audio_path is None:
        return None, "Please upload both video and audio."
    progress(0, desc="Loading models...")
    load_models()
    wav2lip = models['wav2lip']
    progress(0.05, desc="Reading video...")
    cap = cv2.VideoCapture(video_path)
    # Some containers report 0 fps; fall back to 25.
    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
    frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frames = []
    max_frames = 500  # Limit for CPU
    while len(frames) < max_frames:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)
    cap.release()
    if not frames:
        return None, "No frames in video."
    progress(0.1, desc="Processing audio...")
    mel = extract_mel(audio_path)
    # Mel frames per video frame: SAMPLE_RATE / HOP_SIZE = 80 mel frames/sec.
    mel_idx_mult = 80.0 / fps
    # Limit frames to audio length
    max_audio_frames = int(mel.shape[1] / mel_idx_mult)
    frames = frames[:max_audio_frames]
    num_frames = len(frames)
    if num_frames == 0:
        return None, "Audio too short or no overlap with video."
    progress(0.15, desc="Detecting faces...")
    cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    # Detect faces in all frames first
    raw_boxes = []
    for i, frame in enumerate(frames):
        bbox = detect_face(frame, cascade)
        raw_boxes.append(bbox)
    # Check if any faces detected
    valid_boxes = [b for b in raw_boxes if b is not None]
    if not valid_boxes:
        return None, "No face detected in video."
    # Apply temporal smoothing if enabled
    if use_smoothing:
        smoothed_boxes = get_smoothened_boxes(raw_boxes, T=5)
    else:
        smoothed_boxes = raw_boxes
    # Fill None boxes with nearest valid box (forward fill: reuse the most
    # recent frame where a face was found)
    last_valid = None
    for i in range(len(smoothed_boxes)):
        if smoothed_boxes[i] is not None:
            last_valid = smoothed_boxes[i]
        elif last_valid is not None:
            smoothed_boxes[i] = last_valid
    # Backward fill if first frames had no detection
    if smoothed_boxes[0] is None:
        for i in range(len(smoothed_boxes)):
            if smoothed_boxes[i] is not None:
                for j in range(i):
                    smoothed_boxes[j] = smoothed_boxes[i]
                break
    progress(0.2, desc="Generating lip sync...")
    output_frames = []
    for i in range(num_frames):
        if i % 10 == 0:
            progress(0.2 + 0.7 * (i / num_frames), desc=f"Frame {i+1}/{num_frames}")
        frame = frames[i]
        bbox = smoothed_boxes[i]
        if bbox is None:
            # No face found anywhere (only possible if fills failed): pass through.
            output_frames.append(frame)
            continue
        x, y, w, h = bbox
        # Get mel chunk for this frame; clamp to the last full window near the end.
        start_idx = int(i * mel_idx_mult)
        if start_idx + MEL_STEP_SIZE > mel.shape[1]:
            mel_chunk = mel[:, -MEL_STEP_SIZE:]
        else:
            mel_chunk = mel[:, start_idx:start_idx + MEL_STEP_SIZE]
        # Face region with padding (like Wav2Lip-HD), clipped to frame bounds
        pad = int(w * 0.25)
        x1 = max(0, x - pad)
        y1 = max(0, y - pad)
        x2 = min(frame_w, x + w + pad)
        y2 = min(frame_h, y + h + pad)
        orig_face = frame[y1:y2, x1:x2].copy()
        face_h, face_w = orig_face.shape[:2]
        if face_h < 10 or face_w < 10:
            # Degenerate crop; skip syncing this frame.
            output_frames.append(frame)
            continue
        # Resize to 96x96 for wav2lip
        face_96 = cv2.resize(orig_face, (IMG_SIZE, IMG_SIZE))
        # Mask bottom half (mouth area) - this is what Wav2Lip expects
        face_masked = face_96.copy()
        face_masked[IMG_SIZE // 2:] = 0
        # Prepare inputs: concatenate masked + original face -> 6 channels,
        # then NCHW float32 in [0, 1]
        img_batch = np.concatenate((face_masked, face_96), axis=2) / 255.0
        img_batch = img_batch.transpose((2, 0, 1))[np.newaxis, :, :, :].astype(np.float32)
        # Mel input shape: (1, 1, 80, 16)
        mel_input = mel_chunk[np.newaxis, :, :, np.newaxis].astype(np.float32)
        mel_input = np.transpose(mel_input, (0, 3, 1, 2))
        # Run wav2lip inference ('mel'/'vid' are the ONNX graph's input names)
        try:
            pred = wav2lip.run(None, {'mel': mel_input, 'vid': img_batch})[0][0]
        except Exception as e:
            # Best-effort: keep the unsynced frame rather than failing the whole run.
            print(f"Wav2lip inference error: {e}")
            output_frames.append(frame)
            continue
        # Convert output CHW float -> HWC uint8 in [0, 255]
        # (presumably (3, 96, 96) -> (96, 96, 3) — TODO confirm against the model;
        # the 6-channel shape applies to the input, not the output)
        pred = (pred.transpose(1, 2, 0) * 255).clip(0, 255).astype(np.uint8)
        # === MOUTH PASTE APPROACH (from Wav2Lip-HD concept) ===
        # Extract mouth region from 96x96 output (bottom half)
        mouth_96 = pred[IMG_SIZE // 2:, :, :]  # Shape: (48, 96, 3)
        # Calculate exact dimensions
        top_h = face_h // 2
        bottom_h = face_h - top_h  # Ensures top_h + bottom_h == face_h
        # Scale mouth to match bottom half of original face
        mouth_scaled = cv2.resize(mouth_96, (face_w, bottom_h))
        # Create result: original top half + wav2lip bottom half
        result_face = orig_face.copy()
        result_face[top_h:, :] = mouth_scaled
        # Feather blend at seam (10 pixels): fade linearly from the original
        # rows into the generated rows just above the paste boundary.
        blend_zone = 10
        if top_h > blend_zone:
            for offset in range(blend_zone):
                alpha = offset / blend_zone
                row = top_h - blend_zone + offset
                if 0 <= row < face_h:
                    result_face[row] = cv2.addWeighted(
                        orig_face[row], 1 - alpha,
                        result_face[row], alpha, 0
                    )
        # Paste back onto frame (crop was clipped to bounds, so shapes match)
        result = frame.copy()
        result[y1:y2, x1:x2] = result_face
        output_frames.append(result)
    progress(0.9, desc="Encoding video...")
    # Save output
    # NOTE(review): tempfile.mktemp is deprecated and race-prone; mkstemp or
    # NamedTemporaryFile(delete=False) would be safer — left unchanged here.
    temp_avi = tempfile.mktemp(suffix='.avi')
    output_path = tempfile.mktemp(suffix='.mp4')
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(temp_avi, fourcc, fps, (frame_w, frame_h))
    for f in output_frames:
        out.write(f)
    out.release()
    # Mux audio: re-encode the intermediate AVI to H.264 MP4 and attach the
    # audio track, truncating to the shorter stream.
    subprocess.run([
        'ffmpeg', '-y', '-i', temp_avi, '-i', audio_path,
        '-c:v', 'libx264', '-preset', 'fast', '-crf', '23',
        '-c:a', 'aac', '-shortest', '-movflags', '+faststart',
        output_path
    ], capture_output=True)
    if os.path.exists(temp_avi):
        os.remove(temp_avi)
    progress(1.0, desc="Done!")
    return output_path, f"Processed {num_frames} frames at {fps:.1f} fps."
def create_demo():
    """Assemble and return the Gradio Blocks interface."""
    import gradio as gr
    with gr.Blocks(title="Wav2Lip HD CPU") as ui:
        gr.Markdown("""
        # Wav2Lip HD - CPU Lip Sync
        Based on [saifhassan/Wav2Lip-HD](https://github.com/saifhassan/Wav2Lip-HD).
        Converted to CPU-only using ONNX Runtime.
        **Approach:**
        - ONNX Wav2Lip model (145MB)
        - OpenCV face detection (CPU)
        - Mouth-paste with feather blending
        - No GPU required
        **Limitations:**
        - Max 500 frames (~20 sec at 25fps)
        - Processing: ~1-2 sec/frame on CPU
        """)
        with gr.Row():
            with gr.Column():
                in_video = gr.Video(label="Input Video (with face)")
                in_audio = gr.Audio(label="Audio to sync", type="filepath")
                smooth_toggle = gr.Checkbox(label="Temporal smoothing", value=True)
                run_btn = gr.Button("Generate Lip Sync", variant="primary")
            with gr.Column():
                out_video = gr.Video(label="Output")
                out_status = gr.Textbox(label="Status")
        # Wire the button to the processing pipeline.
        run_btn.click(
            process_video,
            inputs=[in_video, in_audio, smooth_toggle],
            outputs=[out_video, out_status]
        )
        # Bundled example, rendered lazily on first request.
        gr.Examples(
            examples=[
                ["examples/woman_512_4s.mp4", "examples/57 Years Man Talk About Life.wav", True],
            ],
            inputs=[in_video, in_audio, smooth_toggle],
            outputs=[out_video, out_status],
            fn=process_video,
            cache_examples=True,
            cache_mode="lazy",
            label="Examples"
        )
    return ui
def main_cli():
    """Command-line entry point: parse args, run the pipeline, copy the result.

    Exits with status 1 on missing arguments, missing files, or a failed run.
    """
    parser = argparse.ArgumentParser(description="Wav2Lip HD - CPU Lip Sync")
    parser.add_argument("--video", "-v", type=str, help="Input video path")
    parser.add_argument("--audio", "-a", type=str, help="Input audio path")
    parser.add_argument("--output", "-o", type=str, default="output.mp4", help="Output video path")
    parser.add_argument("--no-smoothing", action="store_true", help="Disable temporal smoothing")
    opts = parser.parse_args()
    # Both inputs are mandatory in CLI mode.
    if not (opts.video and opts.audio):
        parser.print_help()
        print("\nError: --video and --audio are required for CLI mode")
        sys.exit(1)
    # Validate inputs before doing any work (video first, then audio).
    for label, path in (("Video", opts.video), ("Audio", opts.audio)):
        if not os.path.exists(path):
            print(f"Error: {label} file not found: {path}")
            sys.exit(1)
    print(f"Processing: {opts.video} + {opts.audio}")
    result, status = process_video(opts.video, opts.audio, use_smoothing=not opts.no_smoothing)
    if not result:
        print(f"Error: {status}")
        sys.exit(1)
    import shutil
    shutil.copy(result, opts.output)
    print(f"Output saved to: {opts.output}")
    print(f"Status: {status}")
if __name__ == "__main__":
    # CLI mode when any arguments are given; otherwise launch the Gradio UI.
    # (The original had two redundant branches — one for --help/-h and one for
    # everything else — that both called main_cli(); argparse inside main_cli
    # already prints help and exits for -h/--help, so a single check suffices.)
    if len(sys.argv) > 1:
        main_cli()
    else:
        demo = create_demo()
        demo.launch(mcp_server=True, show_error=True)