| """ | |
| Gradio web application for generating lip‑synchronised videos from an avatar image | |
| and an audio clip. On the first run it clones the Wav2Lip repository and | |
| downloads the required model checkpoints. Subsequent runs reuse the cached | |
| repository and weights. The heavy inference is executed by spawning the | |
| original `inference.py` script provided by Wav2Lip. | |
| To deploy on Hugging Face Spaces: | |
| 1. Create a new Gradio space. | |
| 2. Upload this file along with ``requirements.txt`` and ``README.md``. | |
| 3. Commit and wait for the space to build. Enable GPU hardware for best performance. | |
| Author: ChatGPT (2025) | |
| License: MIT (same as Wav2Lip) | |
| """ | |
import os
import shutil
import subprocess
import tempfile
from pathlib import Path

import gradio as gr
import numpy as np
from pydub import AudioSegment
# Constants for repository and checkpoint names
REPO_URL = "https://github.com/Rudrabha/Wav2Lip.git"
REPO_DIR = Path("Wav2Lip")
CHECKPOINTS_DIR = REPO_DIR / "checkpoints"
WAV2LIP_MODEL = "wav2lip_gan.pth"
FACE_SEG_MODEL = "face_segmentation.pth"

# Public mirrors for the required model weights.
#
# The original Wav2Lip checkpoint (``wav2lip_gan.pth``) and the face
# segmentation model are large binary files hosted on Hugging Face. During
# development we discovered that the URLs previously used in this script
# pointed at a misspelled repository name and returned 404 errors. The
# corrected links below refer to the ``Non-playing-Character/Wave2lip`` mirror
# for the GAN checkpoint and to a Wav2Lip-HD repository for the face
# segmentation model. Note that these files are large (hundreds of megabytes):
# if the automated download fails due to network restrictions, upload them
# manually into the ``checkpoints`` directory of your Space and the app will
# skip the download step.
MODEL_URLS = {
    # 436 MB GAN checkpoint hosted by Non-playing-Character
    WAV2LIP_MODEL: "https://huggingface.co/Non-playing-Character/Wave2lip/resolve/main/wav2lip_gan.pth",
    # 53 MB face segmentation model hosted by commanderx (Wav2Lip-HD project)
    FACE_SEG_MODEL: "https://huggingface.co/commanderx/Wav2Lip-HD/resolve/main/checkpoints/face_segmentation.pth",
}
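
# Optional integrity check: a truncated or partially uploaded checkpoint is a
# common failure mode when the weights are copied by hand. The sketch below is
# not wired into the app; it compares file sizes against loose lower bounds
# derived from the sizes noted above (the thresholds are assumptions, not
# authoritative values).
EXPECTED_MIN_BYTES = {
    WAV2LIP_MODEL: 400 * 1024 * 1024,  # GAN checkpoint is roughly 436 MB
    FACE_SEG_MODEL: 40 * 1024 * 1024,  # segmentation model is roughly 53 MB
}


def checkpoint_looks_complete(filename: str) -> bool:
    """Return True if the named checkpoint exists and is plausibly complete."""
    dest = CHECKPOINTS_DIR / filename
    return dest.exists() and dest.stat().st_size >= EXPECTED_MIN_BYTES.get(filename, 0)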
def clone_repository() -> None:
    """Ensure that the Wav2Lip repository is available locally.

    The original implementation attempted to clone the repository from GitHub
    every time the app started. However, Hugging Face Spaces often run in
    restricted network environments where external git operations are
    disallowed. To make the app resilient to such conditions we take a
    multi‑step approach:

    1. If the ``Wav2Lip`` directory already exists, do nothing.
    2. Otherwise, if a ``Wav2Lip-master.zip`` archive is present in the
       current working directory, extract it to create the repository. You
       can provide this archive by downloading the Wav2Lip source code from
       GitHub on your local machine and uploading it into your Space using
       the Hugging Face web interface.
    3. As a last resort, attempt to perform a shallow git clone of the
       upstream repository. If this fails due to lack of network access,
       raise a ``RuntimeError`` instructing the user to upload the
       ``Wav2Lip-master.zip`` archive instead.
    """
    if REPO_DIR.exists():
        # Repository already present
        return
    # Attempt to extract from a local zip if available
    archive_name = "Wav2Lip-master.zip"
    archive_path = Path(archive_name)
    if archive_path.exists():
        try:
            shutil.unpack_archive(str(archive_path), ".")
        except Exception as e:
            raise RuntimeError(
                f"Failed to extract {archive_name}: {e}. Please ensure the archive is a valid zip file."
            ) from e
        # The archive extracts into a directory named ``Wav2Lip-master``. Rename it to ``Wav2Lip``.
        extracted_dir = Path("Wav2Lip-master")
        if extracted_dir.exists():
            extracted_dir.rename(REPO_DIR)
        if REPO_DIR.exists():
            return
        # Otherwise the archive had an unexpected layout; fall through to git clone.
    # Fallback: try cloning from GitHub
    try:
        subprocess.run(
            ["git", "clone", "--depth", "1", REPO_URL, str(REPO_DIR)],
            check=True,
        )
    except Exception as e:
        raise RuntimeError(
            "Unable to clone the Wav2Lip repository. This environment may not allow outbound network connections. "
            "Please download the Wav2Lip source code as a zip file and upload it to your Space. "
            "Rename the archive to 'Wav2Lip-master.zip' and place it in the root directory of your repository."
        ) from e
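
# To prepare the archive mentioned above on a machine with network access
# (a sketch; GitHub serves branch snapshots at this URL pattern):
#
#     curl -L -o Wav2Lip-master.zip \
#         https://github.com/Rudrabha/Wav2Lip/archive/refs/heads/master.zip
#
# then upload ``Wav2Lip-master.zip`` to the root of the Space repository.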
def download_model_weights() -> None:
    """Ensure that the required model weights are present.

    This function first checks whether the two required checkpoint files
    (``wav2lip_gan.pth`` and ``face_segmentation.pth``) already exist in
    ``checkpoints/``. If they are missing it attempts to download them
    from the URLs defined in ``MODEL_URLS``. Because network access on
    Hugging Face Spaces may be restricted, any failure to fetch the files
    triggers a ``RuntimeError`` with instructions for manual upload. You
    can obtain the weight files by visiting the links in ``MODEL_URLS`` on
    your local machine and then uploading the files into the
    ``checkpoints`` folder of your Space. Once the files are present
    locally, the download step will be skipped on subsequent runs.
    """
    CHECKPOINTS_DIR.mkdir(parents=True, exist_ok=True)
    import requests  # deferred import so the app can start without it

    for filename, url in MODEL_URLS.items():
        dest = CHECKPOINTS_DIR / filename
        if dest.exists():
            continue
        try:
            with requests.get(url, stream=True, timeout=10) as r:
                r.raise_for_status()
                with open(dest, "wb") as f:
                    for chunk in r.iter_content(chunk_size=8192):
                        if chunk:
                            f.write(chunk)
        except Exception as e:
            # Clean up any partially downloaded file
            if dest.exists():
                try:
                    dest.unlink()
                except Exception:
                    pass
            raise RuntimeError(
                f"Failed to download {filename} from {url}: {e}.\n"
                "This environment may block external downloads. "
                "Please manually download the model weights to your local machine and upload them "
                "into the 'Wav2Lip/checkpoints' directory of your Space."
            ) from e
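
# Alternative download path, not wired into the app: both checkpoints live in
# Hugging Face model repos, so ``huggingface_hub.hf_hub_download`` could fetch
# them with caching and resume support. This is a sketch; the function name is
# ours and it assumes the ``huggingface_hub`` package is installed (it is not
# imported elsewhere in this file).
def download_model_weights_via_hub() -> None:
    from huggingface_hub import hf_hub_download

    CHECKPOINTS_DIR.mkdir(parents=True, exist_ok=True)
    specs = [
        ("Non-playing-Character/Wave2lip", "wav2lip_gan.pth", WAV2LIP_MODEL),
        ("commanderx/Wav2Lip-HD", "checkpoints/face_segmentation.pth", FACE_SEG_MODEL),
    ]
    for repo_id, remote_name, local_name in specs:
        dest = CHECKPOINTS_DIR / local_name
        if dest.exists():
            continue
        # hf_hub_download returns the path of the cached file
        cached = hf_hub_download(repo_id=repo_id, filename=remote_name)
        shutil.copy(cached, dest)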
def ensure_setup() -> None:
    """Ensure that the Wav2Lip repository and model weights are available.

    This helper wraps ``clone_repository`` and ``download_model_weights`` and
    converts any failures into user‑visible errors. Raising ``gr.Error`` here
    allows us to display a helpful message inside the Gradio interface rather
    than causing the entire Space to crash.
    """
    try:
        clone_repository()
        download_model_weights()
    except Exception as e:
        # Re‑raise as gr.Error to display the message in the UI
        raise gr.Error(str(e)) from e
def validate_audio_length(audio_path: str) -> None:
    """Raise a ValueError if the audio duration is not between 1 and 10 minutes."""
    audio = AudioSegment.from_file(audio_path)
    duration_sec = len(audio) / 1000.0
    if duration_sec < 60:
        raise ValueError("Audio is too short: please provide at least 1 minute of audio.")
    if duration_sec > 600:
        raise ValueError("Audio is too long: please limit the audio to 10 minutes.")
def run_inference(image_path: Path, audio_path: Path) -> Path:
    """
    Generate a lip‑synced video from an avatar image and audio track.

    This function attempts to perform high‑quality lip synchronisation via the
    Wav2Lip model. If the required model repository or weights are not
    available (for example because outbound network traffic is blocked or the
    weight files are too large to download), it falls back to a lightweight
    amplitude‑driven mouth animation. The fallback uses only Pillow and the
    system ``ffmpeg`` binary to create a simple talking‑head effect by
    stretching the mouth region based on the loudness of the audio. Although
    not as accurate as Wav2Lip, the fallback produces a plausible talking
    animation without requiring any deep learning checkpoints.

    Parameters
    ----------
    image_path : Path
        Path to the avatar image saved on disk.
    audio_path : Path
        Path to the audio file saved on disk.

    Returns
    -------
    Path
        Path to the generated MP4 video.
    """
    # First attempt the full Wav2Lip pipeline. If anything fails (e.g. missing
    # repository or weights, runtime errors from the inference script), we
    # swallow the error and fall back to the simple implementation.
    try:
        ensure_setup()
        outputs_dir = Path("outputs")
        outputs_dir.mkdir(exist_ok=True)
        # Use absolute paths throughout: the inference script runs with
        # ``cwd=REPO_DIR``, so relative paths would resolve inside Wav2Lip/.
        output_path = (outputs_dir / f"result_{image_path.stem}.mp4").resolve()
        cmd = [
            "python", "inference.py",
            "--checkpoint_path", str(CHECKPOINTS_DIR.resolve() / WAV2LIP_MODEL),
            # Note: the stock Rudrabha/Wav2Lip ``inference.py`` does not accept
            # ``--segmentation_path``; the flag targets Wav2Lip-HD style forks.
            "--segmentation_path", str(CHECKPOINTS_DIR.resolve() / FACE_SEG_MODEL),
            "--face", str(image_path.resolve()),
            "--audio", str(audio_path.resolve()),
            "--outfile", str(output_path),
            "--pads", "0", "10", "0", "0",
        ]
        subprocess.run(cmd, cwd=str(REPO_DIR), check=True)
        return output_path
    except Exception:
        # Fall back to the simple lip‑sync implementation
        return simple_lip_sync(image_path, audio_path)
def simple_lip_sync(image_path: Path, audio_path: Path, fps: int = 25) -> Path:
    """
    Create a basic talking‑head animation without neural networks.

    This fallback implementation estimates speech activity from the audio's
    root‑mean‑square (RMS) amplitude and stretches the mouth region of the
    avatar image accordingly. Frames are saved to a temporary directory and
    then stitched together with the original audio via the system ``ffmpeg``
    binary. This avoids heavy Python dependencies (like OpenCV and
    MoviePy) and works in network‑restricted environments as long as
    ``ffmpeg`` is available (it is installed by default on Hugging Face
    Spaces CPU images).

    Parameters
    ----------
    image_path : Path
        Path to the input image.
    audio_path : Path
        Path to the input audio file.
    fps : int, optional
        Frames per second for the output video, by default 25.

    Returns
    -------
    Path
        Path to the generated video file.
    """
    from PIL import Image  # Pillow for image manipulation

    # Load the avatar image (RGB)
    try:
        img = Image.open(str(image_path)).convert("RGB")
    except Exception as e:
        raise RuntimeError(
            "Failed to load the avatar image. Please ensure the file is a valid image."
        ) from e
    width, height = img.size
    # Approximate mouth bounding box (tune the proportions if necessary)
    mouth_w = int(width * 0.6)
    mouth_h = int(height * 0.15)
    mouth_x = int(width * 0.2)
    mouth_y = int(height * 0.65)
    # Load the audio and compute one amplitude value per video frame
    audio = AudioSegment.from_file(str(audio_path))
    samples = np.array(audio.get_array_of_samples()).astype(np.float32)
    # Mix stereo down to mono if necessary
    if audio.channels > 1:
        samples = samples.reshape((-1, audio.channels)).mean(axis=1)
    frame_size = int(audio.frame_rate / fps)
    n_frames = max(int(len(samples) / frame_size), 1)
    amplitudes: list[float] = []
    for i in range(n_frames):
        segment = samples[i * frame_size : (i + 1) * frame_size]
        if segment.size == 0:
            amp = 0.0
        else:
            # Root mean square of the audio segment: sqrt(mean(x_i ** 2))
            amp = float(np.sqrt(np.mean(segment ** 2)))
        amplitudes.append(amp)
    max_amp = max(amplitudes) if amplitudes else 1.0
    if max_amp == 0:
        max_amp = 1.0
    # Normalise amplitudes to [0, 1]
    amplitudes = [amp / max_amp for amp in amplitudes]
    # Prepare output paths
    outputs_dir = Path("outputs")
    outputs_dir.mkdir(exist_ok=True)
    output_path = outputs_dir / f"simple_{image_path.stem}.mp4"
    # Create a temporary directory for the rendered frames
    with tempfile.TemporaryDirectory() as tmpdir:
        frames_dir = Path(tmpdir)
        # Generate each frame
        for idx, amp in enumerate(amplitudes):
            # Scaling factor between 1.0 (mouth closed) and 1.6 (fully open)
            factor = 1.0 + amp * 0.6
            # Start from a copy of the base image
            frame_img = img.copy()
            # Crop the mouth region from the base image
            roi = img.crop((mouth_x, mouth_y, mouth_x + mouth_w, mouth_y + mouth_h))
            # Scale the ROI vertically
            new_h = max(1, int(mouth_h * factor))
            scaled = roi.resize((mouth_w, new_h), Image.BILINEAR)
            # Trim the scaled ROI if it would overflow beyond the image bottom
            if mouth_y + new_h > height:
                scaled = scaled.crop((0, 0, mouth_w, height - mouth_y))
            # Paste the scaled ROI onto the frame
            frame_img.paste(scaled, (mouth_x, mouth_y))
            # Save the frame as PNG
            frame_filename = frames_dir / f"frame_{idx:04d}.png"
            frame_img.save(frame_filename)
        # Assemble the video with ffmpeg. The -shortest flag ends the output
        # when the shorter of the audio or video streams ends; -loglevel error
        # suppresses verbose output.
        cmd = [
            "ffmpeg",
            "-y",  # overwrite any existing file
            "-loglevel", "error",
            "-framerate", str(fps),
            "-i", str(frames_dir / "frame_%04d.png"),
            "-i", str(audio_path),
            "-c:v", "libx264",
            "-pix_fmt", "yuv420p",
            "-c:a", "aac",
            "-shortest",
            str(output_path),
        ]
        try:
            subprocess.run(cmd, check=True)
        except Exception as e:
            # If ffmpeg fails (e.g. missing binary), raise a user‑visible error
            raise RuntimeError(
                f"Failed to assemble video with ffmpeg: {e}. "
                "Ensure that the ffmpeg binary is available in the environment."
            ) from e
    return output_path
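
# The raw per‑frame RMS values can make the mouth flicker on noisy audio. A
# moving average over the amplitude envelope smooths the animation; the sketch
# below is a possible refinement, not used by ``simple_lip_sync`` as written
# (the default window of 3 frames is an arbitrary assumption).
def smooth_amplitudes(amplitudes: list[float], window: int = 3) -> list[float]:
    """Return a moving‑average‑smoothed copy of the amplitude envelope."""
    if window <= 1 or not amplitudes:
        return list(amplitudes)
    kernel = np.ones(window) / window
    # mode="same" keeps the envelope aligned with the frame indices
    return np.convolve(np.asarray(amplitudes), kernel, mode="same").tolist()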
def generate_video(avatar_file, audio_file):
    """
    Gradio callback to generate a lip‑synced video.

    This function receives the uploaded avatar and audio files from Gradio's
    ``gr.File`` inputs. Depending on the ``type`` parameter of the file
    component and the version of Gradio, the objects passed into this
    function can take different forms: file-like objects supporting
    ``read()``, plain strings containing a path on disk, or ``NamedString``
    instances with a ``name`` attribute pointing to a temporary file
    location. To handle all of these cases robustly, we normalise the inputs
    by copying their contents into a temporary directory, ensuring that
    subsequent processing always operates on filesystem paths. This avoids
    ``AttributeError`` issues such as ``'NamedString' object has no attribute
    'read'`` seen with newer versions of Gradio.

    Parameters
    ----------
    avatar_file : Any
        Uploaded image or video containing the face. Can be a file-like
        object, a path string, or a NamedString/UploadFile depending on the
        Gradio version.
    audio_file : Any
        Uploaded audio file. Same possible types as ``avatar_file``.

    Returns
    -------
    str | None
        Path to the generated MP4 file (relative to the Gradio working
        directory), or ``None`` if either input is missing.
    """
    if avatar_file is None or audio_file is None:
        return None

    def _copy_input_to_path(file_obj, dest_path: Path) -> None:
        """Copy the uploaded file into a destination path.

        Parameters
        ----------
        file_obj : Any
            The object returned by Gradio's file component.
        dest_path : Path
            Destination path where the file should be written.
        """
        # Case 1: file-like object (has a .read method)
        if hasattr(file_obj, "read"):
            dest_path.write_bytes(file_obj.read())
            return
        # Case 2: object implementing .getvalue (e.g. io.BytesIO)
        if hasattr(file_obj, "getvalue"):
            dest_path.write_bytes(file_obj.getvalue())
            return
        # Case 3: NamedString or similar with a .name attribute (points to a temp file)
        filename = None
        if hasattr(file_obj, "name") and isinstance(getattr(file_obj, "name"), (str, bytes)):
            filename = file_obj.name
        elif hasattr(file_obj, "path") and isinstance(getattr(file_obj, "path"), (str, bytes)):
            filename = file_obj.path
        # Case 4: the input itself is a string/Path representing a path on disk
        if filename is None and isinstance(file_obj, (str, os.PathLike)):
            filename = str(file_obj)
        if filename is not None:
            # Copy the file from its existing location
            shutil.copy(filename, dest_path)
        else:
            # Last resort: try to convert the object to bytes directly
            try:
                dest_path.write_bytes(bytes(file_obj))
            except Exception as e:
                raise gr.Error(f"Unsupported input type: {type(file_obj)}") from e

    def _original_suffix(file_obj, default: str) -> str:
        """Best-effort recovery of the uploaded file's extension.

        Downstream tools sniff formats from the extension (for example,
        Wav2Lip's inference script decides whether the face input is a static
        image by looking at it), so we preserve the extension whenever the
        input exposes a name or path.
        """
        for attr in ("name", "path"):
            value = getattr(file_obj, attr, None)
            if isinstance(value, str) and Path(value).suffix:
                return Path(value).suffix
        if isinstance(file_obj, (str, os.PathLike)):
            suffix = Path(str(file_obj)).suffix
            if suffix:
                return suffix
        return default

    # Save the uploaded files to a temporary directory, keeping extensions
    with tempfile.TemporaryDirectory() as tmpdir:
        avatar_path = Path(tmpdir) / f"avatar{_original_suffix(avatar_file, '.png')}"
        audio_path = Path(tmpdir) / f"audio{_original_suffix(audio_file, '.wav')}"
        _copy_input_to_path(avatar_file, avatar_path)
        _copy_input_to_path(audio_file, audio_path)
        # Validate the audio length
        try:
            validate_audio_length(str(audio_path))
        except Exception as e:
            raise gr.Error(str(e)) from e
        # Run inference
        try:
            result_path = run_inference(avatar_path, audio_path)
        except subprocess.CalledProcessError as e:
            raise gr.Error(f"Inference failed: {e}") from e
        return str(result_path)
def build_interface():
    """Construct the Gradio interface."""
    with gr.Blocks(title="Lip‑Sync Video Generator") as demo:
        gr.Markdown(
            """
            # Lip‑Sync Video Generator

            Upload an image (PNG/JPG) or a short video of an avatar, plus an
            audio file (MP3/WAV/M4A) between 1 and 10 minutes long. Click
            **Generate video** to create a new video in which the avatar
            lip‑syncs to the audio. The first generation may take several
            minutes because the model needs to be downloaded.
            """
        )
        with gr.Row():
            avatar_input = gr.File(label="Avatar image/video", file_count="single")
            audio_input = gr.File(label="Audio (1–10 min)", file_count="single")
        generate_btn = gr.Button("Generate video")
        result_video = gr.Video(label="Output video", interactive=False)
        generate_btn.click(
            fn=generate_video,
            inputs=[avatar_input, audio_input],
            outputs=result_video,
            show_progress=True,
        )
    return demo
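
# Note: with Gradio 4.x one could sidestep most of the input normalisation in
# ``generate_video`` by constructing the inputs as, for example,
# ``gr.File(label="Avatar image/video", type="filepath")``, which hands the
# callback a plain path string (an assumption about the installed Gradio
# version; older releases used different ``type`` values).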
if __name__ == "__main__":
    demo = build_interface()
    # Queue requests so multiple users can use the model concurrently
    demo.queue().launch()