Spaces:
Running
Running
| """ | |
| Wav2Lip HD - CPU-only Lip Sync | |
| Converts Wav2Lip-HD (https://github.com/saifhassan/Wav2Lip-HD) to CPU-only: | |
| - ONNX Wav2Lip model (145MB) | |
| - OpenCV Haar Cascade face detection (no GPU) | |
| - Simple feather blending (no BiSeNet segmentation) | |
| - No SR upscaling (keeps original quality via mouth-paste approach) | |
| Approach: Crop mouth from 96x96 wav2lip output, scale & paste onto original face. | |
| Usage: | |
| CLI: python app.py --video input.mp4 --audio input.wav --output output.mp4 | |
| Gradio: python app.py | |
| """ | |
| import os | |
| import sys | |
| import argparse | |
| import cv2 | |
| import numpy as np | |
| import librosa | |
| import tempfile | |
| import subprocess | |
| from huggingface_hub import hf_hub_download | |
| from scipy import signal | |
| import onnxruntime as ort | |
# Wav2Lip constants (from hparams.py)
IMG_SIZE = 96          # wav2lip operates on 96x96 face crops
MEL_STEP_SIZE = 16     # mel frames fed to the model per video frame
SAMPLE_RATE = 16000    # audio is resampled to 16 kHz before mel extraction
N_FFT = 800            # STFT window length in samples
HOP_SIZE = 200         # STFT hop (200 samples @ 16 kHz => 80 mel frames/sec)
WIN_SIZE = 800
NUM_MELS = 80          # mel filterbank size (model input height)
FMIN = 55              # mel filterbank lower frequency bound (Hz)
FMAX = 7600            # mel filterbank upper frequency bound (Hz)
PREEMPHASIS = 0.97     # pre-emphasis filter coefficient applied to the waveform
REF_LEVEL_DB = 20      # reference level subtracted after dB conversion
MIN_LEVEL_DB = -100    # dB floor used for normalization
MAX_ABS_VALUE = 4.0    # mel values are normalized into [-4, 4]
# Global model cache
models: dict = {}  # name -> loaded ONNX InferenceSession (populated lazily by load_models)
def load_models():
    """Download and cache the Wav2Lip ONNX model; safe to call repeatedly."""
    global models
    if 'wav2lip' in models:
        # Already loaded — nothing to do
        return
    print("Loading Wav2Lip ONNX model...")
    wav2lip_path = hf_hub_download(
        repo_id="bluefoxcreation/Wav2lip-Onnx",
        filename="wav2lip_gan.onnx"
    )
    # Configure ONNX Runtime for modest CPU usage
    opts = ort.SessionOptions()
    opts.intra_op_num_threads = 2
    opts.inter_op_num_threads = 2
    opts.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
    session = ort.InferenceSession(
        wav2lip_path,
        opts,
        providers=["CPUExecutionProvider"]
    )
    models['wav2lip'] = session
    print("Wav2Lip loaded!")
def extract_mel(audio_path):
    """Extract mel spectrogram - exact Wav2Lip preprocessing from audio.py"""
    samples, _ = librosa.load(audio_path, sr=SAMPLE_RATE)
    # Preemphasis filter (critical for Wav2Lip!)
    samples = signal.lfilter([1, -PREEMPHASIS], [1], samples)
    # STFT magnitude
    magnitude = np.abs(
        librosa.stft(y=samples, n_fft=N_FFT, hop_length=HOP_SIZE, win_length=WIN_SIZE)
    )
    # Project linear spectrogram onto the mel filterbank
    filterbank = librosa.filters.mel(
        sr=SAMPLE_RATE, n_fft=N_FFT, n_mels=NUM_MELS, fmin=FMIN, fmax=FMAX
    )
    mel = filterbank @ magnitude
    # Convert to dB (with a floor to avoid log(0)) and normalize to [-4, 4]
    floor = np.exp(MIN_LEVEL_DB / 20 * np.log(10))
    mel_db = 20 * np.log10(np.maximum(floor, mel)) - REF_LEVEL_DB
    normalized = (2 * MAX_ABS_VALUE) * ((mel_db - MIN_LEVEL_DB) / (-MIN_LEVEL_DB)) - MAX_ABS_VALUE
    return np.clip(normalized, -MAX_ABS_VALUE, MAX_ABS_VALUE)
def detect_face(frame, cascade):
    """Detect largest face using OpenCV Haar Cascade"""
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    detections = cascade.detectMultiScale(gray, 1.1, 5, minSize=(50, 50))
    if len(detections) == 0:
        return None
    # Keep whichever detection covers the most pixels (w * h)
    return max(detections, key=lambda box: box[2] * box[3])
def get_smoothened_boxes(boxes, T=5):
    """Temporal smoothing for face boxes (from inference.py)"""
    half = T // 2
    result = []
    for idx, box in enumerate(boxes):
        if box is None:
            # No detection on this frame; the caller fills gaps afterwards
            result.append(None)
            continue
        # Average over a centered window of valid neighbors
        lo = max(0, idx - half)
        hi = min(len(boxes), idx + half + 1)
        window = [b for b in boxes[lo:hi] if b is not None]
        if window:
            result.append(tuple(np.mean(window, axis=0).astype(int)))
        else:
            result.append(None)
    return result
class CLIProgress:
    """Fake progress for CLI mode"""

    # Fractions that always get printed
    _MILESTONES = (0, 0.1, 0.2, 0.9, 1.0)

    def __call__(self, val, desc=''):
        # Print at fixed milestones, plus every ~20% in the middle stretch
        at_milestone = val in self._MILESTONES
        mid_tick = 0.2 < val < 0.9 and int(val * 100) % 20 == 0
        if at_milestone or mid_tick:
            print(f"[{val*100:5.1f}%] {desc}")
def process_video(video_path, audio_path, use_smoothing=True, progress=None):
    """
    Wav2Lip HD CPU inference

    Approach from saifhassan/Wav2Lip-HD:
    1. Detect face, crop to bbox
    2. Resize face to 96x96
    3. Run wav2lip → get 96x96 synced face
    4. Extract mouth region (bottom half)
    5. Scale mouth and paste onto original face
    6. Feather blend at seam

    Args:
        video_path: path to the input video (read with OpenCV); None -> error return.
        audio_path: path to the driving audio; None -> error return.
        use_smoothing: apply temporal smoothing to detected face boxes.
        progress: callable(fraction, desc=...) for UI updates; defaults to CLIProgress.

    Returns:
        (output_mp4_path, status_message) on success, or (None, error_message).
    """
    if progress is None:
        progress = CLIProgress()
    if video_path is None or audio_path is None:
        return None, "Please upload both video and audio."
    progress(0, desc="Loading models...")
    load_models()
    wav2lip = models['wav2lip']
    progress(0.05, desc="Reading video...")
    cap = cv2.VideoCapture(video_path)
    # CAP_PROP_FPS can report 0.0 when unknown; fall back to 25 fps
    fps = cap.get(cv2.CAP_PROP_FPS) or 25.0
    frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    frames = []
    max_frames = 500  # Limit for CPU
    while len(frames) < max_frames:
        ret, frame = cap.read()
        if not ret:
            break
        frames.append(frame)
    cap.release()
    if not frames:
        return None, "No frames in video."
    progress(0.1, desc="Processing audio...")
    mel = extract_mel(audio_path)
    # Mel frames per video frame (mel rate is 80 frames/sec at these hparams)
    mel_idx_mult = 80.0 / fps
    # Limit frames to audio length
    max_audio_frames = int(mel.shape[1] / mel_idx_mult)
    frames = frames[:max_audio_frames]
    num_frames = len(frames)
    if num_frames == 0:
        return None, "Audio too short or no overlap with video."
    progress(0.15, desc="Detecting faces...")
    cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    # Detect faces in all frames first
    raw_boxes = []
    for i, frame in enumerate(frames):
        bbox = detect_face(frame, cascade)
        raw_boxes.append(bbox)
    # Check if any faces detected
    valid_boxes = [b for b in raw_boxes if b is not None]
    if not valid_boxes:
        return None, "No face detected in video."
    # Apply temporal smoothing if enabled
    if use_smoothing:
        smoothed_boxes = get_smoothened_boxes(raw_boxes, T=5)
    else:
        smoothed_boxes = raw_boxes
    # Forward-fill None boxes with the most recent valid box
    last_valid = None
    for i in range(len(smoothed_boxes)):
        if smoothed_boxes[i] is not None:
            last_valid = smoothed_boxes[i]
        elif last_valid is not None:
            smoothed_boxes[i] = last_valid
    # Backward fill if first frames had no detection
    if smoothed_boxes[0] is None:
        for i in range(len(smoothed_boxes)):
            if smoothed_boxes[i] is not None:
                for j in range(i):
                    smoothed_boxes[j] = smoothed_boxes[i]
                break
    progress(0.2, desc="Generating lip sync...")
    output_frames = []
    for i in range(num_frames):
        if i % 10 == 0:
            progress(0.2 + 0.7 * (i / num_frames), desc=f"Frame {i+1}/{num_frames}")
        frame = frames[i]
        bbox = smoothed_boxes[i]
        if bbox is None:
            # No face anywhere in the clip reached this frame: pass through unchanged
            output_frames.append(frame)
            continue
        x, y, w, h = bbox
        # Get mel chunk for this frame (clamp to the last full window at clip end)
        start_idx = int(i * mel_idx_mult)
        if start_idx + MEL_STEP_SIZE > mel.shape[1]:
            mel_chunk = mel[:, -MEL_STEP_SIZE:]
        else:
            mel_chunk = mel[:, start_idx:start_idx + MEL_STEP_SIZE]
        # Face region with padding (like Wav2Lip-HD)
        pad = int(w * 0.25)
        x1 = max(0, x - pad)
        y1 = max(0, y - pad)
        x2 = min(frame_w, x + w + pad)
        y2 = min(frame_h, y + h + pad)
        orig_face = frame[y1:y2, x1:x2].copy()
        face_h, face_w = orig_face.shape[:2]
        if face_h < 10 or face_w < 10:
            # Degenerate crop (e.g. face at frame edge): skip syncing this frame
            output_frames.append(frame)
            continue
        # Resize to 96x96 for wav2lip
        face_96 = cv2.resize(orig_face, (IMG_SIZE, IMG_SIZE))
        # Mask bottom half (mouth area) - this is what Wav2Lip expects
        face_masked = face_96.copy()
        face_masked[IMG_SIZE // 2:] = 0
        # Prepare inputs: concatenate masked + original face -> 6 channels, NCHW, scaled to [0,1]
        img_batch = np.concatenate((face_masked, face_96), axis=2) / 255.0
        img_batch = img_batch.transpose((2, 0, 1))[np.newaxis, :, :, :].astype(np.float32)
        # Mel input shape: (1, 1, 80, 16)
        mel_input = mel_chunk[np.newaxis, :, :, np.newaxis].astype(np.float32)
        mel_input = np.transpose(mel_input, (0, 3, 1, 2))
        # Run wav2lip inference
        try:
            pred = wav2lip.run(None, {'mel': mel_input, 'vid': img_batch})[0][0]
        except Exception as e:
            # Best-effort: keep the original frame rather than aborting the whole clip
            print(f"Wav2lip inference error: {e}")
            output_frames.append(frame)
            continue
        # Convert output: (3, 96, 96) float in [0,1] -> (96, 96, 3) uint8
        pred = (pred.transpose(1, 2, 0) * 255).clip(0, 255).astype(np.uint8)
        # === MOUTH PASTE APPROACH (from Wav2Lip-HD concept) ===
        # Extract mouth region from 96x96 output (bottom half)
        mouth_96 = pred[IMG_SIZE // 2:, :, :]  # Shape: (48, 96, 3)
        # Calculate exact dimensions
        top_h = face_h // 2
        bottom_h = face_h - top_h  # Ensures top_h + bottom_h == face_h
        # Scale mouth to match bottom half of original face
        mouth_scaled = cv2.resize(mouth_96, (face_w, bottom_h))
        # Create result: original top half + wav2lip bottom half
        result_face = orig_face.copy()
        result_face[top_h:, :] = mouth_scaled
        # Feather blend at seam (10 pixels): linear alpha ramp from original to synced
        blend_zone = 10
        if top_h > blend_zone:
            for offset in range(blend_zone):
                alpha = offset / blend_zone
                row = top_h - blend_zone + offset
                if 0 <= row < face_h:
                    result_face[row] = cv2.addWeighted(
                        orig_face[row], 1 - alpha,
                        result_face[row], alpha, 0
                    )
        # Paste back onto frame
        result = frame.copy()
        result[y1:y2, x1:x2] = result_face
        output_frames.append(result)
    progress(0.9, desc="Encoding video...")
    # Save output
    # NOTE(review): tempfile.mktemp is deprecated and race-prone — consider mkstemp
    temp_avi = tempfile.mktemp(suffix='.avi')
    output_path = tempfile.mktemp(suffix='.mp4')
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    out = cv2.VideoWriter(temp_avi, fourcc, fps, (frame_w, frame_h))
    for f in output_frames:
        out.write(f)
    out.release()
    # Mux audio: re-encode to H.264/AAC; -shortest trims to the shorter stream
    subprocess.run([
        'ffmpeg', '-y', '-i', temp_avi, '-i', audio_path,
        '-c:v', 'libx264', '-preset', 'fast', '-crf', '23',
        '-c:a', 'aac', '-shortest', '-movflags', '+faststart',
        output_path
    ], capture_output=True)
    if os.path.exists(temp_avi):
        os.remove(temp_avi)
    progress(1.0, desc="Done!")
    return output_path, f"Processed {num_frames} frames at {fps:.1f} fps."
def create_demo():
    """Build and return the Gradio Blocks UI."""
    import gradio as gr

    with gr.Blocks(title="Wav2Lip HD CPU") as demo:
        gr.Markdown("""
        # Wav2Lip HD - CPU Lip Sync
        Based on [saifhassan/Wav2Lip-HD](https://github.com/saifhassan/Wav2Lip-HD).
        Converted to CPU-only using ONNX Runtime.
        **Approach:**
        - ONNX Wav2Lip model (145MB)
        - OpenCV face detection (CPU)
        - Mouth-paste with feather blending
        - No GPU required
        **Limitations:**
        - Max 500 frames (~20 sec at 25fps)
        - Processing: ~1-2 sec/frame on CPU
        """)
        with gr.Row():
            with gr.Column():
                vid_in = gr.Video(label="Input Video (with face)")
                aud_in = gr.Audio(label="Audio to sync", type="filepath")
                smooth_cb = gr.Checkbox(label="Temporal smoothing", value=True)
                run_btn = gr.Button("Generate Lip Sync", variant="primary")
            with gr.Column():
                vid_out = gr.Video(label="Output")
                status_box = gr.Textbox(label="Status")
        # Wire the button to the processing pipeline
        run_btn.click(
            process_video,
            inputs=[vid_in, aud_in, smooth_cb],
            outputs=[vid_out, status_box]
        )
        gr.Examples(
            examples=[
                ["examples/woman_512_4s.mp4", "examples/57 Years Man Talk About Life.wav", True],
            ],
            inputs=[vid_in, aud_in, smooth_cb],
            outputs=[vid_out, status_box],
            fn=process_video,
            cache_examples=True,
            cache_mode="lazy",
            label="Examples"
        )
    return demo
def main_cli():
    """CLI mode"""
    cli = argparse.ArgumentParser(description="Wav2Lip HD - CPU Lip Sync")
    cli.add_argument("--video", "-v", type=str, help="Input video path")
    cli.add_argument("--audio", "-a", type=str, help="Input audio path")
    cli.add_argument("--output", "-o", type=str, default="output.mp4", help="Output video path")
    cli.add_argument("--no-smoothing", action="store_true", help="Disable temporal smoothing")
    opts = cli.parse_args()

    # Both inputs are mandatory in CLI mode
    if not opts.video or not opts.audio:
        cli.print_help()
        print("\nError: --video and --audio are required for CLI mode")
        sys.exit(1)
    # Validate input paths before any heavy work
    for label, path in (("Video", opts.video), ("Audio", opts.audio)):
        if not os.path.exists(path):
            print(f"Error: {label} file not found: {path}")
            sys.exit(1)

    print(f"Processing: {opts.video} + {opts.audio}")
    result, status = process_video(opts.video, opts.audio, use_smoothing=not opts.no_smoothing)
    if not result:
        print(f"Error: {status}")
        sys.exit(1)
    import shutil
    shutil.copy(result, opts.output)
    print(f"Output saved to: {opts.output}")
    print(f"Status: {status}")
if __name__ == "__main__":
    # Any CLI argument routes to CLI mode (argparse itself handles --help/-h);
    # the original code special-cased --help but both branches called main_cli().
    if len(sys.argv) > 1:
        main_cli()
    else:
        demo = create_demo()
        demo.launch(mcp_server=True, show_error=True)