""" Wav2Lip HD - CPU-only Lip Sync Converts Wav2Lip-HD (https://github.com/saifhassan/Wav2Lip-HD) to CPU-only: - ONNX Wav2Lip model (145MB) - OpenCV Haar Cascade face detection (no GPU) - Simple feather blending (no BiSeNet segmentation) - No SR upscaling (keeps original quality via mouth-paste approach) Approach: Crop mouth from 96x96 wav2lip output, scale & paste onto original face. Usage: CLI: python app.py --video input.mp4 --audio input.wav --output output.mp4 Gradio: python app.py """ import os import sys import argparse import cv2 import numpy as np import librosa import tempfile import subprocess from huggingface_hub import hf_hub_download from scipy import signal import onnxruntime as ort # Wav2Lip constants (from hparams.py) IMG_SIZE = 96 MEL_STEP_SIZE = 16 SAMPLE_RATE = 16000 N_FFT = 800 HOP_SIZE = 200 WIN_SIZE = 800 NUM_MELS = 80 FMIN = 55 FMAX = 7600 PREEMPHASIS = 0.97 REF_LEVEL_DB = 20 MIN_LEVEL_DB = -100 MAX_ABS_VALUE = 4.0 # Global model cache models = {} def load_models(): """Load wav2lip ONNX model""" global models if 'wav2lip' in models: return print("Loading Wav2Lip ONNX model...") wav2lip_path = hf_hub_download( repo_id="bluefoxcreation/Wav2lip-Onnx", filename="wav2lip_gan.onnx" ) # ONNX Runtime session options for CPU sess_options = ort.SessionOptions() sess_options.intra_op_num_threads = 2 sess_options.inter_op_num_threads = 2 sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL models['wav2lip'] = ort.InferenceSession( wav2lip_path, sess_options, providers=["CPUExecutionProvider"] ) print("Wav2Lip loaded!") def extract_mel(audio_path): """Extract mel spectrogram - exact Wav2Lip preprocessing from audio.py""" wav, _ = librosa.load(audio_path, sr=SAMPLE_RATE) # Preemphasis filter (critical for Wav2Lip!) 
wav = signal.lfilter([1, -PREEMPHASIS], [1], wav) # STFT D = librosa.stft(y=wav, n_fft=N_FFT, hop_length=HOP_SIZE, win_length=WIN_SIZE) # Mel spectrogram mel_basis = librosa.filters.mel(sr=SAMPLE_RATE, n_fft=N_FFT, n_mels=NUM_MELS, fmin=FMIN, fmax=FMAX) S = np.dot(mel_basis, np.abs(D)) # Convert to dB and normalize to [-4, 4] min_level = np.exp(MIN_LEVEL_DB / 20 * np.log(10)) S = 20 * np.log10(np.maximum(min_level, S)) - REF_LEVEL_DB S = np.clip( (2 * MAX_ABS_VALUE) * ((S - MIN_LEVEL_DB) / (-MIN_LEVEL_DB)) - MAX_ABS_VALUE, -MAX_ABS_VALUE, MAX_ABS_VALUE ) return S def detect_face(frame, cascade): """Detect largest face using OpenCV Haar Cascade""" gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) faces = cascade.detectMultiScale(gray, 1.1, 5, minSize=(50, 50)) if len(faces) == 0: return None # Return largest face areas = [w * h for (x, y, w, h) in faces] return faces[np.argmax(areas)] def get_smoothened_boxes(boxes, T=5): """Temporal smoothing for face boxes (from inference.py)""" smoothed = [] for i in range(len(boxes)): if boxes[i] is None: smoothed.append(None) continue # Get window of nearby boxes start = max(0, i - T // 2) end = min(len(boxes), i + T // 2 + 1) nearby = [boxes[j] for j in range(start, end) if boxes[j] is not None] if nearby: smoothed.append(tuple(np.mean(nearby, axis=0).astype(int))) else: smoothed.append(None) return smoothed class CLIProgress: """Fake progress for CLI mode""" def __call__(self, val, desc=''): if val in [0, 0.1, 0.2, 0.9, 1.0] or (val > 0.2 and val < 0.9 and int(val * 100) % 20 == 0): print(f"[{val*100:5.1f}%] {desc}") def process_video(video_path, audio_path, use_smoothing=True, progress=None): """ Wav2Lip HD CPU inference Approach from saifhassan/Wav2Lip-HD: 1. Detect face, crop to bbox 2. Resize face to 96x96 3. Run wav2lip → get 96x96 synced face 4. Extract mouth region (bottom half) 5. Scale mouth and paste onto original face 6. 
Feather blend at seam """ if progress is None: progress = CLIProgress() if video_path is None or audio_path is None: return None, "Please upload both video and audio." progress(0, desc="Loading models...") load_models() wav2lip = models['wav2lip'] progress(0.05, desc="Reading video...") cap = cv2.VideoCapture(video_path) fps = cap.get(cv2.CAP_PROP_FPS) or 25.0 frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) frames = [] max_frames = 500 # Limit for CPU while len(frames) < max_frames: ret, frame = cap.read() if not ret: break frames.append(frame) cap.release() if not frames: return None, "No frames in video." progress(0.1, desc="Processing audio...") mel = extract_mel(audio_path) mel_idx_mult = 80.0 / fps # Limit frames to audio length max_audio_frames = int(mel.shape[1] / mel_idx_mult) frames = frames[:max_audio_frames] num_frames = len(frames) if num_frames == 0: return None, "Audio too short or no overlap with video." progress(0.15, desc="Detecting faces...") cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml') # Detect faces in all frames first raw_boxes = [] for i, frame in enumerate(frames): bbox = detect_face(frame, cascade) raw_boxes.append(bbox) # Check if any faces detected valid_boxes = [b for b in raw_boxes if b is not None] if not valid_boxes: return None, "No face detected in video." 
# Apply temporal smoothing if enabled if use_smoothing: smoothed_boxes = get_smoothened_boxes(raw_boxes, T=5) else: smoothed_boxes = raw_boxes # Fill None boxes with nearest valid box last_valid = None for i in range(len(smoothed_boxes)): if smoothed_boxes[i] is not None: last_valid = smoothed_boxes[i] elif last_valid is not None: smoothed_boxes[i] = last_valid # Backward fill if first frames had no detection if smoothed_boxes[0] is None: for i in range(len(smoothed_boxes)): if smoothed_boxes[i] is not None: for j in range(i): smoothed_boxes[j] = smoothed_boxes[i] break progress(0.2, desc="Generating lip sync...") output_frames = [] for i in range(num_frames): if i % 10 == 0: progress(0.2 + 0.7 * (i / num_frames), desc=f"Frame {i+1}/{num_frames}") frame = frames[i] bbox = smoothed_boxes[i] if bbox is None: output_frames.append(frame) continue x, y, w, h = bbox # Get mel chunk for this frame start_idx = int(i * mel_idx_mult) if start_idx + MEL_STEP_SIZE > mel.shape[1]: mel_chunk = mel[:, -MEL_STEP_SIZE:] else: mel_chunk = mel[:, start_idx:start_idx + MEL_STEP_SIZE] # Face region with padding (like Wav2Lip-HD) pad = int(w * 0.25) x1 = max(0, x - pad) y1 = max(0, y - pad) x2 = min(frame_w, x + w + pad) y2 = min(frame_h, y + h + pad) orig_face = frame[y1:y2, x1:x2].copy() face_h, face_w = orig_face.shape[:2] if face_h < 10 or face_w < 10: output_frames.append(frame) continue # Resize to 96x96 for wav2lip face_96 = cv2.resize(orig_face, (IMG_SIZE, IMG_SIZE)) # Mask bottom half (mouth area) - this is what Wav2Lip expects face_masked = face_96.copy() face_masked[IMG_SIZE // 2:] = 0 # Prepare inputs: concatenate masked + original face img_batch = np.concatenate((face_masked, face_96), axis=2) / 255.0 img_batch = img_batch.transpose((2, 0, 1))[np.newaxis, :, :, :].astype(np.float32) # Mel input shape: (1, 1, 80, 16) mel_input = mel_chunk[np.newaxis, :, :, np.newaxis].astype(np.float32) mel_input = np.transpose(mel_input, (0, 3, 1, 2)) # Run wav2lip inference try: pred = 
wav2lip.run(None, {'mel': mel_input, 'vid': img_batch})[0][0] except Exception as e: print(f"Wav2lip inference error: {e}") output_frames.append(frame) continue # Convert output: (6, 96, 96) -> (96, 96, 3) pred = (pred.transpose(1, 2, 0) * 255).clip(0, 255).astype(np.uint8) # === MOUTH PASTE APPROACH (from Wav2Lip-HD concept) === # Extract mouth region from 96x96 output (bottom half) mouth_96 = pred[IMG_SIZE // 2:, :, :] # Shape: (48, 96, 3) # Calculate exact dimensions top_h = face_h // 2 bottom_h = face_h - top_h # Ensures top_h + bottom_h == face_h # Scale mouth to match bottom half of original face mouth_scaled = cv2.resize(mouth_96, (face_w, bottom_h)) # Create result: original top half + wav2lip bottom half result_face = orig_face.copy() result_face[top_h:, :] = mouth_scaled # Feather blend at seam (10 pixels) blend_zone = 10 if top_h > blend_zone: for offset in range(blend_zone): alpha = offset / blend_zone row = top_h - blend_zone + offset if 0 <= row < face_h: result_face[row] = cv2.addWeighted( orig_face[row], 1 - alpha, result_face[row], alpha, 0 ) # Paste back onto frame result = frame.copy() result[y1:y2, x1:x2] = result_face output_frames.append(result) progress(0.9, desc="Encoding video...") # Save output temp_avi = tempfile.mktemp(suffix='.avi') output_path = tempfile.mktemp(suffix='.mp4') fourcc = cv2.VideoWriter_fourcc(*'XVID') out = cv2.VideoWriter(temp_avi, fourcc, fps, (frame_w, frame_h)) for f in output_frames: out.write(f) out.release() # Mux audio subprocess.run([ 'ffmpeg', '-y', '-i', temp_avi, '-i', audio_path, '-c:v', 'libx264', '-preset', 'fast', '-crf', '23', '-c:a', 'aac', '-shortest', '-movflags', '+faststart', output_path ], capture_output=True) if os.path.exists(temp_avi): os.remove(temp_avi) progress(1.0, desc="Done!") return output_path, f"Processed {num_frames} frames at {fps:.1f} fps." 
def create_demo():
    """Build and return the Gradio demo UI.

    gradio is imported lazily so CLI mode works without it installed.
    """
    import gradio as gr

    with gr.Blocks(title="Wav2Lip HD CPU") as demo:
        gr.Markdown("""
        # Wav2Lip HD - CPU Lip Sync

        Based on [saifhassan/Wav2Lip-HD](https://github.com/saifhassan/Wav2Lip-HD).
        Converted to CPU-only using ONNX Runtime.

        **Approach:**
        - ONNX Wav2Lip model (145MB)
        - OpenCV face detection (CPU)
        - Mouth-paste with feather blending
        - No GPU required

        **Limitations:**
        - Max 500 frames (~20 sec at 25fps)
        - Processing: ~1-2 sec/frame on CPU
        """)
        with gr.Row():
            with gr.Column():
                video_input = gr.Video(label="Input Video (with face)")
                audio_input = gr.Audio(label="Audio to sync", type="filepath")
                smoothing = gr.Checkbox(label="Temporal smoothing", value=True)
                btn = gr.Button("Generate Lip Sync", variant="primary")
            with gr.Column():
                video_output = gr.Video(label="Output")
                status = gr.Textbox(label="Status")

        btn.click(
            process_video,
            inputs=[video_input, audio_input, smoothing],
            outputs=[video_output, status]
        )

        gr.Examples(
            examples=[
                ["examples/woman_512_4s.mp4", "examples/57 Years Man Talk About Life.wav", True],
            ],
            inputs=[video_input, audio_input, smoothing],
            outputs=[video_output, status],
            fn=process_video,
            cache_examples=True,
            cache_mode="lazy",
            label="Examples"
        )
    return demo


def main_cli():
    """CLI entry point: parse args, run process_video, copy result to --output.

    Exits with status 1 on missing/invalid arguments or processing failure.
    """
    parser = argparse.ArgumentParser(description="Wav2Lip HD - CPU Lip Sync")
    parser.add_argument("--video", "-v", type=str, help="Input video path")
    parser.add_argument("--audio", "-a", type=str, help="Input audio path")
    parser.add_argument("--output", "-o", type=str, default="output.mp4",
                        help="Output video path")
    parser.add_argument("--no-smoothing", action="store_true",
                        help="Disable temporal smoothing")
    args = parser.parse_args()

    # Both inputs are required; argparse can't enforce this because Gradio
    # mode must still work with zero arguments.
    if not args.video or not args.audio:
        parser.print_help()
        print("\nError: --video and --audio are required for CLI mode")
        sys.exit(1)
    if not os.path.exists(args.video):
        print(f"Error: Video file not found: {args.video}")
        sys.exit(1)
    if not os.path.exists(args.audio):
        print(f"Error: Audio file not found: {args.audio}")
        sys.exit(1)

    print(f"Processing: {args.video} + {args.audio}")
    result, status = process_video(args.video, args.audio,
                                   use_smoothing=not args.no_smoothing)
    if result:
        import shutil
        shutil.copy(result, args.output)
        print(f"Output saved to: {args.output}")
        print(f"Status: {status}")
    else:
        print(f"Error: {status}")
        sys.exit(1)


if __name__ == "__main__":
    # CLI mode if any args were provided; otherwise launch the Gradio UI.
    # --help/-h also routes through main_cli, where argparse prints usage.
    # BUGFIX: the original dispatch had two redundant branches that both
    # called main_cli() (one for --help/-h, one for everything else).
    if len(sys.argv) > 1:
        main_cli()
    else:
        demo = create_demo()
        demo.launch(mcp_server=True, show_error=True)