""" Gradio web application for generating lip‑synchronised videos from an avatar image and an audio clip. On the first run it clones the Wav2Lip repository and downloads the required model checkpoints. Subsequent runs reuse the cached repository and weights. The heavy inference is executed by spawning the original `inference.py` script provided by Wav2Lip. To deploy on Hugging Face Spaces: 1. Create a new Gradio space. 2. Upload this file along with ``requirements.txt`` and ``README.md``. 3. Commit and wait for the space to build. Enable GPU hardware for best performance. Author: ChatGPT (2025) License: MIT (same as Wav2Lip) """ import os import shutil import subprocess import tempfile from pathlib import Path import gradio as gr import numpy as np from pydub import AudioSegment # Constants for repository and checkpoint names REPO_URL = "https://github.com/Rudrabha/Wav2Lip.git" REPO_DIR = Path("Wav2Lip") CHECKPOINTS_DIR = REPO_DIR / "checkpoints" WAV2LIP_MODEL = "wav2lip_gan.pth" FACE_SEG_MODEL = "face_segmentation.pth" # Direct download links for model weights. These links point to publicly # accessible mirrors. If they stop working, you can upload the files into # the checkpoints folder manually and the app will skip downloading. # Public mirrors for the required model weights. # # The original Wav2Lip checkpoint (``wav2lip_gan.pth``) and face segmentation # model are large binary files hosted on Hugging Face. During development we # discovered that the URLs previously used in this script pointed at the wrong # repository (``Wave2lip`` vs ``Wave2Lip``) and returned 404 errors. The # corrected links below refer to the official ``Wave2lip`` repository for the # GAN checkpoint and to a Wav2Lip‑HD repository for the face segmentation # model. Note that these files are large (hundreds of megabytes) and may # require you to upload them manually into the ``checkpoints`` directory of # your Space if the automated download fails due to network restrictions. MODEL_URLS = { # 436 MB GAN checkpoint hosted by Non‑playing‑Character WAV2LIP_MODEL: "https://huggingface.co/Non-playing-Character/Wave2lip/resolve/main/wav2lip_gan.pth", # 53 MB face segmentation model hosted by commanderx (Wav2Lip‑HD project) FACE_SEG_MODEL: "https://huggingface.co/commanderx/Wav2Lip-HD/resolve/main/checkpoints/face_segmentation.pth", } def clone_repository() -> None: """Ensure that the Wav2Lip repository is available locally. The original implementation attempted to clone the repository from GitHub every time the app started. However, Hugging Face Spaces often run in restricted network environments where external git operations are disallowed. To make the app resilient to such conditions we take a multi‑step approach: 1. If the ``Wav2Lip`` directory already exists, do nothing. 2. Otherwise, if a ``Wav2Lip-master.zip`` archive is present in the current working directory, extract it to create the repository. You can provide this archive by downloading the Wav2Lip source code from GitHub on your local machine and uploading it into your Space using the Hugging Face web interface. 3. As a last resort, attempt to perform a shallow git clone of the upstream repository. If this fails due to lack of network access, raise a ``RuntimeError`` instructing the user to upload the ``Wav2Lip-master.zip`` archive instead. """ if REPO_DIR.exists(): # Repository already present return # Attempt to extract from local zip if available archive_name = "Wav2Lip-master.zip" archive_path = Path(archive_name) if archive_path.exists(): try: shutil.unpack_archive(str(archive_path), ".") except Exception as e: raise RuntimeError( f"Failed to extract {archive_name}: {e}. Please ensure the archive is a valid zip file." ) # The archive extracts into a directory named ``Wav2Lip-master``. Rename it to ``Wav2Lip``. extracted_dir = Path("Wav2Lip-master") if extracted_dir.exists(): extracted_dir.rename(REPO_DIR) return # Fallback: try cloning from GitHub try: subprocess.run([ "git", "clone", "--depth", "1", REPO_URL, str(REPO_DIR), ], check=True) except Exception as e: raise RuntimeError( "Unable to clone the Wav2Lip repository. This environment may not allow outbound network connections. " "Please download the Wav2Lip source code as a zip file and upload it to your Space. " "Rename the archive to 'Wav2Lip-master.zip' and place it in the root directory of your repository." ) def download_model_weights() -> None: """Ensure that the required model weights are present. This function first checks whether the two required checkpoint files (``wav2lip_gan.pth`` and ``face_segmentation.pth``) already exist in ``checkpoints/``. If they are missing it attempts to download them from the URLs defined in ``MODEL_URLS``. Because network access on Hugging Face Spaces may be restricted, any failure to fetch the files triggers a ``RuntimeError`` with instructions for manual upload. You can obtain the weight files by visiting the links in ``MODEL_URLS`` on your local machine and then uploading the files into the ``checkpoints`` folder of your Space. Once the files are present locally, the download step will be skipped on subsequent runs. """ CHECKPOINTS_DIR.mkdir(parents=True, exist_ok=True) import requests for filename, url in MODEL_URLS.items(): dest = CHECKPOINTS_DIR / filename if dest.exists(): continue try: with requests.get(url, stream=True, timeout=10) as r: r.raise_for_status() with open(dest, "wb") as f: for chunk in r.iter_content(chunk_size=8192): if chunk: f.write(chunk) except Exception as e: # Clean up partially downloaded file if dest.exists(): try: dest.unlink() except Exception: pass raise RuntimeError( f"Failed to download {filename} from {url}: {e}.\n" "This environment may block external downloads. " "Please manually download the model weights to your local machine and upload them " "into the 'Wav2Lip/checkpoints' directory of your Space." ) def ensure_setup() -> None: """Ensure that the Wav2Lip repository and model weights are available. This helper wraps ``clone_repository`` and ``download_model_weights`` and converts any failures into user‑visible errors. Raising ``gr.Error`` here allows us to display a helpful message inside the Gradio interface rather than causing the entire Space to crash. """ try: clone_repository() download_model_weights() except Exception as e: # Re‑raise as gr.Error to display in the UI raise gr.Error(str(e)) def validate_audio_length(audio_path: str) -> None: """Raise a ValueError if the audio duration is not between 1 and 10 minutes.""" audio = AudioSegment.from_file(audio_path) duration_sec = len(audio) / 1000.0 if duration_sec < 60: raise ValueError("Audio is too short: please provide at least 1 minute of audio.") if duration_sec > 600: raise ValueError("Audio is too long: please limit the audio to 10 minutes.") def run_inference(image_path: Path, audio_path: Path) -> Path: """ Generate a lip‑synced video from an avatar image and audio track. This function attempts to perform high‑quality lip synchronisation via the Wav2Lip model. If the required model repository or weights are not available (for example because outbound network traffic is blocked or the weight files are too large to download), it falls back to a lightweight amplitude‑driven mouth animation. The fallback uses only OpenCV and MoviePy to create a simple talking head effect by stretching the mouth region based on the loudness of the audio. Although not as accurate as Wav2Lip, the fallback produces a plausible talking animation without requiring any deep learning checkpoints. Parameters ---------- image_path : Path Path to the avatar image saved on disk. audio_path : Path Path to the audio file saved on disk. Returns ------- Path Path to the generated MP4 video relative to the working directory. """ # First attempt the full Wav2Lip pipeline. If anything fails (e.g. missing # repository or weights, runtime errors from the inference script), we # swallow the error and fall back to the simple implementation. try: ensure_setup() outputs_dir = Path("outputs") outputs_dir.mkdir(exist_ok=True) output_path = outputs_dir / f"result_{image_path.stem}.mp4" cmd = [ "python", "inference.py", "--checkpoint_path", str(CHECKPOINTS_DIR / WAV2LIP_MODEL), "--segmentation_path", str(CHECKPOINTS_DIR / FACE_SEG_MODEL), "--face", str(image_path), "--audio", str(audio_path), "--outfile", str(output_path), "--pads", "0", "10", "0", "0", ] subprocess.run(cmd, cwd=str(REPO_DIR), check=True) return output_path except Exception: # Fall back to simple lip‑sync implementation return simple_lip_sync(image_path, audio_path) def simple_lip_sync(image_path: Path, audio_path: Path, fps: int = 25) -> Path: """ Create a basic talking head animation without neural networks. This fallback implementation estimates speech activity from the audio's root‑mean‑square (RMS) amplitude and stretches the mouth region of the avatar image accordingly. Frames are saved to a temporary directory and then stitched together with the original audio via the system ``ffmpeg`` binary. This avoids heavy Python dependencies (like OpenCV and MoviePy) and works in network‑restricted environments as long as ``ffmpeg`` is available (it is installed by default on Hugging Face Spaces CPU images). Parameters ---------- image_path : Path Path to the input image. audio_path : Path Path to the input audio file. fps : int, optional Frames per second for the output video, by default 25. Returns ------- Path Path to the generated video file. """ from PIL import Image # Pillow for image manipulation # Load avatar image (RGB) try: img = Image.open(str(image_path)).convert("RGB") except Exception: raise RuntimeError("Failed to load the avatar image. Please ensure the file is a valid image.") width, height = img.size # Approximate mouth bounding box (tune proportions if necessary) mouth_w = int(width * 0.6) mouth_h = int(height * 0.15) mouth_x = int(width * 0.2) mouth_y = int(height * 0.65) # Load audio and compute amplitude per frame audio = AudioSegment.from_file(str(audio_path)) samples = np.array(audio.get_array_of_samples()).astype(np.float32) # Stereo to mono if necessary if audio.channels > 1: samples = samples.reshape((-1, audio.channels)).mean(axis=1) frame_size = int(audio.frame_rate / fps) n_frames = max(int(len(samples) / frame_size), 1) amplitudes: list[float] = [] for i in range(n_frames): segment = samples[i * frame_size : (i + 1) * frame_size] if segment.size == 0: amp = 0.0 else: # Root mean square of the audio segment amp = float(np.sqrt(np.mean(segment ** 2))) amplitudes.append(amp) max_amp = max(amplitudes) if amplitudes else 1.0 if max_amp == 0: max_amp = 1.0 # Normalise amplitudes to [0, 1] amplitudes = [amp / max_amp for amp in amplitudes] # Prepare output paths outputs_dir = Path("outputs") outputs_dir.mkdir(exist_ok=True) output_path = outputs_dir / f"simple_{image_path.stem}.mp4" # Create temporary directory for frames with tempfile.TemporaryDirectory() as tmpdir: frames_dir = Path(tmpdir) # Generate each frame for idx, amp in enumerate(amplitudes): # Scaling factor between 1.0 (mouth closed) and 1.6 (fully open) factor = 1.0 + amp * 0.6 # Start from a copy of the base image frame_img = img.copy() # Crop mouth region from the base image roi = img.crop((mouth_x, mouth_y, mouth_x + mouth_w, mouth_y + mouth_h)) # Scale ROI vertically new_h = max(1, int(mouth_h * factor)) scaled = roi.resize((mouth_w, new_h), Image.BILINEAR) # Compute overlay height (do not exceed image bounds) end_y = mouth_y + new_h if end_y > height: # Trim scaled ROI if it would overflow beyond the image bottom trim_h = height - mouth_y scaled = scaled.crop((0, 0, mouth_w, trim_h)) end_y = height # Paste scaled ROI onto frame frame_img.paste(scaled, (mouth_x, mouth_y)) # Save frame as PNG frame_filename = frames_dir / f"frame_{idx:04d}.png" frame_img.save(frame_filename) # Assemble video using ffmpeg. The -shortest flag ensures that the # output ends when the shorter of the audio or video streams ends. Use # -loglevel error to suppress verbose output. cmd = [ "ffmpeg", "-y", # overwrite existing file "-loglevel", "error", "-framerate", str(fps), "-i", str(frames_dir / "frame_%04d.png"), "-i", str(audio_path), "-c:v", "libx264", "-pix_fmt", "yuv420p", "-c:a", "aac", "-shortest", str(output_path), ] try: subprocess.run(cmd, check=True) except Exception as e: # If ffmpeg fails (e.g. missing binary), raise a user‑visible error raise RuntimeError( f"Failed to assemble video with ffmpeg: {e}. " "Ensure that the ffmpeg binary is available in the environment." ) return output_path def generate_video(avatar_file, audio_file): """ Gradio callback to generate a lip‑synced video. This function receives the uploaded avatar and audio files from Gradio's ``gr.File`` inputs. Depending on the ``type`` parameter of the file component and the version of Gradio, the objects passed into this function can take on different forms. They may be file-like objects supporting ``read()``, simple strings containing a path on disk, or ``NamedString`` instances with a ``name`` attribute pointing to a temporary file location. To robustly handle all of these cases, we normalise the inputs by copying their contents into a temporary directory, ensuring that subsequent processing always operates on filesystem paths. This avoids ``AttributeError`` issues such as ``'NamedString' object has no attribute 'read'`` seen with newer versions of Gradio. Parameters ---------- avatar_file : Any Uploaded image or video containing the face. Can be a file-like object, a path string, or a NamedString/UploadFile depending on Gradio version. audio_file : Any Uploaded audio file. Same possible types as ``avatar_file``. Returns ------- str | None Path to the generated MP4 file (relative to Gradio working directory), or ``None`` if either input is missing. """ if avatar_file is None or audio_file is None: return None def _copy_input_to_path(file_obj, dest_path: Path) -> None: """Copy the uploaded file into a destination path. Parameters ---------- file_obj : Any The object returned by Gradio's file component. dest_path : Path Destination path where the file should be written. """ # Case 1: file-like object (has .read attribute) if hasattr(file_obj, "read"): dest_path.write_bytes(file_obj.read()) return # Case 2: file object implements .getvalue (e.g. io.BytesIO) if hasattr(file_obj, "getvalue"): dest_path.write_bytes(file_obj.getvalue()) return # Case 3: NamedString or similar with a .name attribute (points to a temp file) filename = None if hasattr(file_obj, "name") and isinstance(getattr(file_obj, "name"), (str, bytes)): filename = file_obj.name elif hasattr(file_obj, "path") and isinstance(getattr(file_obj, "path"), (str, bytes)): filename = file_obj.path # Case 4: the input itself is a string/Path representing a path on disk if filename is None and isinstance(file_obj, (str, os.PathLike)): filename = str(file_obj) if filename is not None: # Copy the file from its existing location shutil.copy(filename, dest_path) else: # Last resort: try to convert to bytes directly try: dest_path.write_bytes(bytes(file_obj)) except Exception: raise gr.Error(f"Unsupported input type: {type(file_obj)}") # Save uploaded files to a temporary directory with tempfile.TemporaryDirectory() as tmpdir: avatar_path = Path(tmpdir) / "avatar" audio_path = Path(tmpdir) / "audio" _copy_input_to_path(avatar_file, avatar_path) _copy_input_to_path(audio_file, audio_path) # Validate audio length try: validate_audio_length(str(audio_path)) except Exception as e: raise gr.Error(str(e)) # Run inference try: result_path = run_inference(avatar_path, audio_path) except subprocess.CalledProcessError as e: raise gr.Error(f"Inference failed: {e}") return str(result_path) def build_interface(): """Construct the Gradio interface.""" with gr.Blocks(title="Lip‑Sync Video Generator") as demo: gr.Markdown( """ # Lip‑Sync Video Generator Upload an image (PNG/JPG) or short video of an avatar and an audio file (MP3/WAV/M4A) between 1 – 10 minutes. Click **Generate video** to create a new video where the avatar lip‑syncs to the audio. The first generation may take several minutes because the model needs to be downloaded. """ ) with gr.Row(): avatar_input = gr.File(label="Avatar image/video", file_count="single") audio_input = gr.File(label="Audio (1–10 min)", file_count="single") generate_btn = gr.Button("Generate video") result_video = gr.Video(label="Output video", interactive=False) generate_btn.click( fn=generate_video, inputs=[avatar_input, audio_input], outputs=result_video, show_progress=True, ) return demo if __name__ == "__main__": demo = build_interface() # Queue requests so multiple users can use the model concurrently demo.queue().launch()