"""
Gradio web application for generating lip‑synchronised videos from an avatar image
and an audio clip. On the first run it clones the Wav2Lip repository and
downloads the required model checkpoints. Subsequent runs reuse the cached
repository and weights. The heavy inference is executed by spawning the
original `inference.py` script provided by Wav2Lip.
To deploy on Hugging Face Spaces:
1. Create a new Gradio Space.
2. Upload this file along with ``requirements.txt`` and ``README.md``.
3. Commit and wait for the Space to build. Enable GPU hardware for best performance.
Author: ChatGPT (2025)
License: MIT (same as Wav2Lip)
"""
import os
import shutil
import subprocess
import tempfile
from pathlib import Path
import gradio as gr
import numpy as np
from pydub import AudioSegment
# Constants for repository and checkpoint names
REPO_URL = "https://github.com/Rudrabha/Wav2Lip.git"
REPO_DIR = Path("Wav2Lip")
CHECKPOINTS_DIR = REPO_DIR / "checkpoints"
WAV2LIP_MODEL = "wav2lip_gan.pth"
FACE_SEG_MODEL = "face_segmentation.pth"
# Public mirrors for the required model weights. If these links stop working,
# download the files manually and place them in the ``checkpoints`` folder;
# the app skips the download for any file that is already present.
#
# The Wav2Lip GAN checkpoint (``wav2lip_gan.pth``) and the face segmentation
# model are large binaries hosted on Hugging Face. During development we
# discovered that the URLs previously used in this script pointed at a
# misspelt repository name (``Wave2Lip`` instead of ``Wave2lip``) and returned
# 404 errors. The corrected links below refer to the ``Wave2lip`` mirror for
# the GAN checkpoint and to a Wav2Lip-HD repository for the face segmentation
# model. Because these files are large (hundreds of megabytes), you may need
# to upload them manually into the ``checkpoints`` directory of your Space if
# the automated download fails due to network restrictions.
MODEL_URLS = {
    # 436 MB GAN checkpoint hosted by Non-playing-Character
    WAV2LIP_MODEL: "https://huggingface.co/Non-playing-Character/Wave2lip/resolve/main/wav2lip_gan.pth",
    # 53 MB face segmentation model hosted by commanderx (Wav2Lip-HD project)
    FACE_SEG_MODEL: "https://huggingface.co/commanderx/Wav2Lip-HD/resolve/main/checkpoints/face_segmentation.pth",
}
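# The sizes quoted above make a cheap integrity check possible before launch.
# The sketch below is illustrative and not wired into the app; the thresholds
# are deliberately loose lower bounds.
EXPECTED_MIN_BYTES = {
    WAV2LIP_MODEL: 400 * 1024 * 1024,  # GAN checkpoint is ~436 MB
    FACE_SEG_MODEL: 40 * 1024 * 1024,  # segmentation model is ~53 MB
}

def checkpoints_look_complete() -> bool:
    """Return True when both checkpoints exist and are plausibly whole."""
    return all(
        (CHECKPOINTS_DIR / name).exists()
        and (CHECKPOINTS_DIR / name).stat().st_size >= min_bytes
        for name, min_bytes in EXPECTED_MIN_BYTES.items()
    )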
def clone_repository() -> None:
"""Ensure that the Wav2Lip repository is available locally.
The original implementation attempted to clone the repository from GitHub
every time the app started. However, Hugging Face Spaces often run in
restricted network environments where external git operations are
disallowed. To make the app resilient to such conditions we take a
multi‑step approach:
1. If the ``Wav2Lip`` directory already exists, do nothing.
2. Otherwise, if a ``Wav2Lip-master.zip`` archive is present in the
current working directory, extract it to create the repository. You
can provide this archive by downloading the Wav2Lip source code from
GitHub on your local machine and uploading it into your Space using
the Hugging Face web interface.
3. As a last resort, attempt to perform a shallow git clone of the
upstream repository. If this fails due to lack of network access,
raise a ``RuntimeError`` instructing the user to upload the
``Wav2Lip-master.zip`` archive instead.
"""
if REPO_DIR.exists():
# Repository already present
return
# Attempt to extract from local zip if available
archive_name = "Wav2Lip-master.zip"
archive_path = Path(archive_name)
if archive_path.exists():
try:
shutil.unpack_archive(str(archive_path), ".")
except Exception as e:
raise RuntimeError(
f"Failed to extract {archive_name}: {e}. Please ensure the archive is a valid zip file."
)
# The archive extracts into a directory named ``Wav2Lip-master``. Rename it to ``Wav2Lip``.
extracted_dir = Path("Wav2Lip-master")
if extracted_dir.exists():
extracted_dir.rename(REPO_DIR)
return
# Fallback: try cloning from GitHub
try:
subprocess.run([
"git",
"clone",
"--depth",
"1",
REPO_URL,
str(REPO_DIR),
], check=True)
    except Exception as e:
        raise RuntimeError(
            f"Unable to clone the Wav2Lip repository ({e}). "
            "This environment may not allow outbound network connections. "
            "Please download the Wav2Lip source code as a zip file and upload it to your Space. "
            "Rename the archive to 'Wav2Lip-master.zip' and place it in the root directory of your repository."
        ) from e
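# To produce the archive mentioned above on a machine with network access,
# GitHub's standard zip endpoint for the repository's default branch works:
#
#     curl -L -o Wav2Lip-master.zip \
#         https://github.com/Rudrabha/Wav2Lip/archive/refs/heads/master.zip
#
# Then upload ``Wav2Lip-master.zip`` into the root of your Space.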
def download_model_weights() -> None:
"""Ensure that the required model weights are present.
This function first checks whether the two required checkpoint files
(``wav2lip_gan.pth`` and ``face_segmentation.pth``) already exist in
``checkpoints/``. If they are missing it attempts to download them
from the URLs defined in ``MODEL_URLS``. Because network access on
Hugging Face Spaces may be restricted, any failure to fetch the files
triggers a ``RuntimeError`` with instructions for manual upload. You
can obtain the weight files by visiting the links in ``MODEL_URLS`` on
your local machine and then uploading the files into the
``checkpoints`` folder of your Space. Once the files are present
locally, the download step will be skipped on subsequent runs.
"""
CHECKPOINTS_DIR.mkdir(parents=True, exist_ok=True)
    # Imported lazily so the module can be loaded even if ``requests`` is
    # missing; this download step is the only code that needs it.
    import requests
for filename, url in MODEL_URLS.items():
dest = CHECKPOINTS_DIR / filename
if dest.exists():
continue
try:
with requests.get(url, stream=True, timeout=10) as r:
r.raise_for_status()
with open(dest, "wb") as f:
for chunk in r.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
except Exception as e:
# Clean up partially downloaded file
if dest.exists():
try:
dest.unlink()
except Exception:
pass
raise RuntimeError(
f"Failed to download {filename} from {url}: {e}.\n"
"This environment may block external downloads. "
"Please manually download the model weights to your local machine and upload them "
"into the 'Wav2Lip/checkpoints' directory of your Space."
)
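# On Spaces, traffic to the Hugging Face Hub itself is usually allowed even
# when general egress is blocked, so ``huggingface_hub`` is a reasonable
# alternative to raw ``requests``. A minimal sketch, assuming the
# ``huggingface_hub`` package is installed (not wired into the app):
def _download_gan_checkpoint_via_hub() -> None:
    """Fetch ``wav2lip_gan.pth`` from the mirror listed in ``MODEL_URLS``."""
    from huggingface_hub import hf_hub_download

    # ``local_dir`` places the file directly under ``checkpoints/``; the face
    # segmentation model could be fetched from its repository the same way.
    hf_hub_download(
        repo_id="Non-playing-Character/Wave2lip",
        filename="wav2lip_gan.pth",
        local_dir=str(CHECKPOINTS_DIR),
    )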
def ensure_setup() -> None:
"""Ensure that the Wav2Lip repository and model weights are available.
This helper wraps ``clone_repository`` and ``download_model_weights`` and
converts any failures into user‑visible errors. Raising
``gr.Error`` here allows us to display a helpful message inside the
Gradio interface rather than causing the entire Space to crash.
"""
try:
clone_repository()
download_model_weights()
except Exception as e:
# Re‑raise as gr.Error to display in the UI
raise gr.Error(str(e))
def validate_audio_length(audio_path: str) -> None:
"""Raise a ValueError if the audio duration is not between 1 and 10 minutes."""
audio = AudioSegment.from_file(audio_path)
duration_sec = len(audio) / 1000.0
if duration_sec < 60:
raise ValueError("Audio is too short: please provide at least 1 minute of audio.")
if duration_sec > 600:
raise ValueError("Audio is too long: please limit the audio to 10 minutes.")
def run_inference(image_path: Path, audio_path: Path) -> Path:
"""
Generate a lip‑synced video from an avatar image and audio track.
This function attempts to perform high‑quality lip synchronisation via the
Wav2Lip model. If the required model repository or weights are not
available (for example because outbound network traffic is blocked or the
weight files are too large to download), it falls back to a lightweight
    amplitude‑driven mouth animation. The fallback uses only Pillow and the
    system ``ffmpeg`` binary to create a simple talking‑head effect by
    stretching the mouth region in proportion to the loudness of the audio.
    Although far less accurate than Wav2Lip, the fallback produces a plausible
    talking animation without requiring any deep‑learning checkpoints.
Parameters
----------
image_path : Path
Path to the avatar image saved on disk.
audio_path : Path
Path to the audio file saved on disk.
Returns
-------
Path
Path to the generated MP4 video relative to the working directory.
"""
# First attempt the full Wav2Lip pipeline. If anything fails (e.g. missing
# repository or weights, runtime errors from the inference script), we
# swallow the error and fall back to the simple implementation.
try:
ensure_setup()
outputs_dir = Path("outputs")
outputs_dir.mkdir(exist_ok=True)
output_path = outputs_dir / f"result_{image_path.stem}.mp4"
        # Resolve all paths to absolute form because the script runs with
        # ``cwd=REPO_DIR``; relative paths would otherwise be looked up inside
        # the repository directory. Note that ``--segmentation_path`` comes
        # from the Wav2Lip-HD variant of ``inference.py``; the vanilla Wav2Lip
        # script does not define this flag, in which case the call fails and
        # the simple fallback below takes over.
        cmd = [
            "python", "inference.py",
            "--checkpoint_path", str((CHECKPOINTS_DIR / WAV2LIP_MODEL).resolve()),
            "--segmentation_path", str((CHECKPOINTS_DIR / FACE_SEG_MODEL).resolve()),
            "--face", str(image_path.resolve()),
            "--audio", str(audio_path.resolve()),
            "--outfile", str(output_path.resolve()),
            "--pads", "0", "10", "0", "0",
        ]
        subprocess.run(cmd, cwd=str(REPO_DIR), check=True)
return output_path
except Exception:
# Fall back to simple lip‑sync implementation
return simple_lip_sync(image_path, audio_path)
def simple_lip_sync(image_path: Path, audio_path: Path, fps: int = 25) -> Path:
"""
Create a basic talking head animation without neural networks.
This fallback implementation estimates speech activity from the audio's
root‑mean‑square (RMS) amplitude and stretches the mouth region of the
avatar image accordingly. Frames are saved to a temporary directory and
then stitched together with the original audio via the system ``ffmpeg``
binary. This avoids heavy Python dependencies (like OpenCV and
MoviePy) and works in network‑restricted environments as long as
``ffmpeg`` is available (it is installed by default on Hugging Face
Spaces CPU images).
Parameters
----------
image_path : Path
Path to the input image.
audio_path : Path
Path to the input audio file.
fps : int, optional
Frames per second for the output video, by default 25.
Returns
-------
Path
Path to the generated video file.
"""
from PIL import Image # Pillow for image manipulation
# Load avatar image (RGB)
try:
img = Image.open(str(image_path)).convert("RGB")
except Exception:
raise RuntimeError("Failed to load the avatar image. Please ensure the file is a valid image.")
width, height = img.size
# Approximate mouth bounding box (tune proportions if necessary)
mouth_w = int(width * 0.6)
mouth_h = int(height * 0.15)
mouth_x = int(width * 0.2)
mouth_y = int(height * 0.65)
# Load audio and compute amplitude per frame
audio = AudioSegment.from_file(str(audio_path))
samples = np.array(audio.get_array_of_samples()).astype(np.float32)
# Stereo to mono if necessary
if audio.channels > 1:
samples = samples.reshape((-1, audio.channels)).mean(axis=1)
frame_size = int(audio.frame_rate / fps)
n_frames = max(int(len(samples) / frame_size), 1)
amplitudes: list[float] = []
for i in range(n_frames):
segment = samples[i * frame_size : (i + 1) * frame_size]
if segment.size == 0:
amp = 0.0
else:
# Root mean square of the audio segment
amp = float(np.sqrt(np.mean(segment ** 2)))
amplitudes.append(amp)
max_amp = max(amplitudes) if amplitudes else 1.0
if max_amp == 0:
max_amp = 1.0
# Normalise amplitudes to [0, 1]
amplitudes = [amp / max_amp for amp in amplitudes]
# Prepare output paths
outputs_dir = Path("outputs")
outputs_dir.mkdir(exist_ok=True)
output_path = outputs_dir / f"simple_{image_path.stem}.mp4"
# Create temporary directory for frames
with tempfile.TemporaryDirectory() as tmpdir:
frames_dir = Path(tmpdir)
# Generate each frame
for idx, amp in enumerate(amplitudes):
# Scaling factor between 1.0 (mouth closed) and 1.6 (fully open)
factor = 1.0 + amp * 0.6
# Start from a copy of the base image
frame_img = img.copy()
# Crop mouth region from the base image
roi = img.crop((mouth_x, mouth_y, mouth_x + mouth_w, mouth_y + mouth_h))
# Scale ROI vertically
new_h = max(1, int(mouth_h * factor))
scaled = roi.resize((mouth_w, new_h), Image.BILINEAR)
            # Trim the scaled ROI if it would overflow beyond the image bottom
            if mouth_y + new_h > height:
                scaled = scaled.crop((0, 0, mouth_w, height - mouth_y))
            # Paste the scaled ROI onto the frame
            frame_img.paste(scaled, (mouth_x, mouth_y))
# Save frame as PNG
frame_filename = frames_dir / f"frame_{idx:04d}.png"
frame_img.save(frame_filename)
# Assemble video using ffmpeg. The -shortest flag ensures that the
# output ends when the shorter of the audio or video streams ends. Use
# -loglevel error to suppress verbose output.
cmd = [
"ffmpeg",
"-y", # overwrite existing file
"-loglevel", "error",
"-framerate", str(fps),
"-i", str(frames_dir / "frame_%04d.png"),
"-i", str(audio_path),
"-c:v", "libx264",
"-pix_fmt", "yuv420p",
"-c:a", "aac",
"-shortest",
str(output_path),
]
try:
subprocess.run(cmd, check=True)
except Exception as e:
# If ffmpeg fails (e.g. missing binary), raise a user‑visible error
raise RuntimeError(
f"Failed to assemble video with ffmpeg: {e}. "
"Ensure that the ffmpeg binary is available in the environment."
)
return output_path
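# A small verification helper (illustrative; assumes ``ffprobe`` ships with the
# same ffmpeg installation that the fallback already relies on):
def _probe_duration_seconds(video_path: Path) -> float:
    """Return the container duration of ``video_path`` in seconds."""
    result = subprocess.run(
        [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            str(video_path),
        ],
        capture_output=True, text=True, check=True,
    )
    return float(result.stdout.strip())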
def generate_video(avatar_file, audio_file):
"""
Gradio callback to generate a lip‑synced video.
This function receives the uploaded avatar and audio files from Gradio's
``gr.File`` inputs. Depending on the ``type`` parameter of the file
component and the version of Gradio, the objects passed into this
function can take on different forms. They may be file-like objects
supporting ``read()``, simple strings containing a path on disk, or
``NamedString`` instances with a ``name`` attribute pointing to a
temporary file location. To robustly handle all of these cases, we
normalise the inputs by copying their contents into a temporary
directory, ensuring that subsequent processing always operates on
filesystem paths. This avoids ``AttributeError`` issues such as
``'NamedString' object has no attribute 'read'`` seen with newer
versions of Gradio.
Parameters
----------
avatar_file : Any
Uploaded image or video containing the face. Can be a file-like
object, a path string, or a NamedString/UploadFile depending on
Gradio version.
audio_file : Any
Uploaded audio file. Same possible types as ``avatar_file``.
Returns
-------
str | None
Path to the generated MP4 file (relative to Gradio working directory),
or ``None`` if either input is missing.
"""
if avatar_file is None or audio_file is None:
return None
def _copy_input_to_path(file_obj, dest_path: Path) -> None:
"""Copy the uploaded file into a destination path.
Parameters
----------
file_obj : Any
The object returned by Gradio's file component.
dest_path : Path
Destination path where the file should be written.
"""
# Case 1: file-like object (has .read attribute)
if hasattr(file_obj, "read"):
dest_path.write_bytes(file_obj.read())
return
# Case 2: file object implements .getvalue (e.g. io.BytesIO)
if hasattr(file_obj, "getvalue"):
dest_path.write_bytes(file_obj.getvalue())
return
# Case 3: NamedString or similar with a .name attribute (points to a temp file)
filename = None
if hasattr(file_obj, "name") and isinstance(getattr(file_obj, "name"), (str, bytes)):
filename = file_obj.name
elif hasattr(file_obj, "path") and isinstance(getattr(file_obj, "path"), (str, bytes)):
filename = file_obj.path
# Case 4: the input itself is a string/Path representing a path on disk
if filename is None and isinstance(file_obj, (str, os.PathLike)):
filename = str(file_obj)
if filename is not None:
# Copy the file from its existing location
shutil.copy(filename, dest_path)
else:
# Last resort: try to convert to bytes directly
try:
dest_path.write_bytes(bytes(file_obj))
except Exception:
raise gr.Error(f"Unsupported input type: {type(file_obj)}")
# Save uploaded files to a temporary directory
with tempfile.TemporaryDirectory() as tmpdir:
avatar_path = Path(tmpdir) / "avatar"
audio_path = Path(tmpdir) / "audio"
_copy_input_to_path(avatar_file, avatar_path)
_copy_input_to_path(audio_file, audio_path)
# Validate audio length
try:
validate_audio_length(str(audio_path))
except Exception as e:
raise gr.Error(str(e))
# Run inference
try:
result_path = run_inference(avatar_path, audio_path)
except subprocess.CalledProcessError as e:
raise gr.Error(f"Inference failed: {e}")
return str(result_path)
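# Wav2Lip's ``inference.py`` picks the image or video code path by inspecting
# the ``--face`` file extension, so the extension-less temporary names used
# above can push it onto the fallback path. A hedged sketch of a
# suffix-recovering helper (``_guess_suffix`` is hypothetical and not called
# by the app):
def _guess_suffix(file_obj) -> str:
    """Best-effort recovery of an upload's original filename suffix."""
    name = getattr(file_obj, "name", None) or getattr(file_obj, "path", None)
    if name is None and isinstance(file_obj, (str, os.PathLike)):
        name = str(file_obj)
    return Path(str(name)).suffix if name else ""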
def build_interface():
"""Construct the Gradio interface."""
with gr.Blocks(title="Lip‑Sync Video Generator") as demo:
gr.Markdown(
"""
# Lip‑Sync Video Generator
            Upload an image (PNG/JPG) or short video of an avatar and an audio file (MP3/WAV/M4A) between 1 and 10 minutes long. Click **Generate video** to create a new video in which the avatar lip‑syncs to the audio. The first generation may take several minutes because the model weights need to be downloaded.
"""
)
with gr.Row():
avatar_input = gr.File(label="Avatar image/video", file_count="single")
audio_input = gr.File(label="Audio (1–10 min)", file_count="single")
generate_btn = gr.Button("Generate video")
result_video = gr.Video(label="Output video", interactive=False)
generate_btn.click(
fn=generate_video,
inputs=[avatar_input, audio_input],
outputs=result_video,
show_progress=True,
)
return demo
if __name__ == "__main__":
demo = build_interface()
# Queue requests so multiple users can use the model concurrently
demo.queue().launch()