# Veo3Audio / app.py
import os
import sys
import uuid
import tempfile
import json
import inspect
import shutil
import torch
import gradio as gr
from huggingface_hub import snapshot_download
from omegaconf import OmegaConf
from diffusers import AutoencoderKL, DDIMScheduler
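# This Space chains two systems: MMAudio (text/video → audio, from the bundled
# Long_Tieng directory) and LatentSync (audio-driven lip sync). Each gets its
# own Gradio tab below, plus a combined text + video → lip-sync tab.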
# ─── 0. Change CWD & set up PYTHONPATH ─────────────────────────────
BASE_DIR = os.path.dirname(__file__)
# Change the working directory into LatentSync so its internal relative paths resolve
os.chdir(os.path.join(BASE_DIR, "LatentSync"))
# Copy mask.png from assets → latentsync/utils if it is not there yet
assets_mask = os.path.join("assets", "mask.png")
utils_mask = os.path.join("latentsync", "utils", "mask.png")
if os.path.exists(assets_mask) and not os.path.exists(utils_mask):
    shutil.copy(assets_mask, utils_mask)
# Add Long_Tieng and LatentSync to sys.path so their modules can be imported
sys.path.insert(0, os.path.join(BASE_DIR, "Long_Tieng"))
sys.path.insert(0, os.path.join(BASE_DIR, "LatentSync"))
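# Note: inserting at position 0 puts these directories ahead of site-packages,
# so the bundled copies of mmaudio and latentsync are the ones imported below.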
# ─── 1. MMAUDIO (Long_Tieng) setup ─────────────────────────────────
from mmaudio.eval_utils import (
    ModelConfig, all_model_cfg,
    generate, load_video, make_video,
    setup_eval_logging,
)
from mmaudio.model.flow_matching import FlowMatching
from mmaudio.model.sequence_config import SequenceConfig
from mmaudio.model.utils.features_utils import FeaturesUtils
from mmaudio.model.networks import MMAudio, get_my_mmaudio
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dtype = torch.bfloat16 if device.type == "cuda" else torch.float32
mma_cfg: ModelConfig = all_model_cfg["large_44k_v2"]
mma_cfg.download_if_needed()
setup_eval_logging()
net: MMAudio = get_my_mmaudio(mma_cfg.model_name).to(device, dtype).eval()
net.load_weights(torch.load(
    mma_cfg.model_path, map_location=device, weights_only=True
))
feature_utils = FeaturesUtils(
    tod_vae_ckpt=mma_cfg.vae_path,
    synchformer_ckpt=mma_cfg.synchformer_ckpt,
    enable_conditions=True,
    mode=mma_cfg.mode,
    bigvgan_vocoder_ckpt=mma_cfg.bigvgan_16k_path,
    need_vae_encoder=False,
).to(device, dtype).eval()
seq_cfg: SequenceConfig = mma_cfg.seq_cfg
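# seq_cfg holds the latent/CLIP/sync sequence lengths for a given audio
# duration; whenever seq_cfg.duration changes, net.update_seq_lengths must be
# called again (both generation functions below do this).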
@torch.inference_mode()
def text_to_audio_fn(prompt, neg_prompt, seed, num_steps, guidance, duration):
    rng = torch.Generator(device=device)
    if seed >= 0:
        rng.manual_seed(seed)
    fm = FlowMatching(min_sigma=0, inference_mode="euler", num_steps=num_steps)
    seq_cfg.duration = duration
    net.update_seq_lengths(
        seq_cfg.latent_seq_len,
        seq_cfg.clip_seq_len,
        seq_cfg.sync_seq_len,
    )
    audios = generate(
        None, None, [prompt],
        negative_text=[neg_prompt],
        feature_utils=feature_utils,
        net=net, fm=fm, rng=rng, cfg_strength=guidance,
    )
    audio = audios.float().cpu()[0]
    out = tempfile.NamedTemporaryFile(delete=False, suffix=".flac").name
    import torchaudio
    torchaudio.save(out, audio, seq_cfg.sampling_rate)
    return out
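# Example call (hypothetical values, not executed at import time):
#   flac_path = text_to_audio_fn("rain on a tin roof", "music",
#                                seed=42, num_steps=25, guidance=4.5, duration=8)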
@torch.inference_mode()
def video_to_audio_fn(video, prompt, neg_prompt, seed, num_steps, guidance, duration):
    info = load_video(video, duration)
    clip = info.clip_frames.unsqueeze(0)
    sync = info.sync_frames.unsqueeze(0)
    rng = torch.Generator(device=device)
    if seed >= 0:
        rng.manual_seed(seed)
    fm = FlowMatching(min_sigma=0, inference_mode="euler", num_steps=num_steps)
    seq_cfg.duration = info.duration_sec
    net.update_seq_lengths(
        seq_cfg.latent_seq_len,
        seq_cfg.clip_seq_len,
        seq_cfg.sync_seq_len,
    )
    audios = generate(
        clip, sync, [prompt],
        negative_text=[neg_prompt],
        feature_utils=feature_utils,
        net=net, fm=fm, rng=rng, cfg_strength=guidance,
    )
    audio = audios.float().cpu()[0]
    out_video = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
    make_video(info, out_video, audio, sampling_rate=seq_cfg.sampling_rate)
    return out_video
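# Unlike text_to_audio_fn, this conditions generation on the clip's CLIP and
# sync frames and takes the duration from the loaded video itself
# (info.duration_sec), then muxes the generated audio back into an .mp4.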
# ─── 2. LATENTSYNC setup ─────────────────────────────────────────────
# 2.1 Download checkpoints
REPO_ID = "LTTEAM/Nhep_Mieng"
ckpt_dir = os.path.join(BASE_DIR, "checkpoints")
os.makedirs(ckpt_dir, exist_ok=True)
snapshot_download(repo_id=REPO_ID, local_dir=ckpt_dir)
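# snapshot_download is effectively idempotent: files already present in
# local_dir are reused rather than re-downloaded (behavior of recent
# huggingface_hub versions), so Space restarts do not refetch the checkpoints.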
# 2.2 Load U-Net config
cfg_path = os.path.join(BASE_DIR, "LatentSync", "configs", "unet", "second_stage.yaml")
conf = OmegaConf.load(cfg_path)
# 2.3 Load scheduler config locally + filter invalid args
sched_path = os.path.join(BASE_DIR, "LatentSync", "configs", "scheduler_config.json")
with open(sched_path, "r") as f:
    sched_cfg = json.load(f)
valid_args = inspect.signature(DDIMScheduler.__init__).parameters.keys()
init_cfg = {k: v for k, v in sched_cfg.items() if k in valid_args}
scheduler = DDIMScheduler(**init_cfg)
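# Filtering against the constructor signature keeps the app compatible across
# diffusers versions: scheduler_config.json may carry keys that an older
# DDIMScheduler.__init__ would reject as unexpected arguments.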
# 2.4 Load VAE and fix missing shift_factor
vae = AutoencoderKL.from_pretrained(
    "stabilityai/sd-vae-ft-mse",
    torch_dtype=torch.float16 if device.type == "cuda" else torch.float32,
)
if not hasattr(vae.config, "shift_factor") or vae.config.shift_factor is None:
    vae.config.shift_factor = 0.0
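# sd-vae-ft-mse predates shift_factor (introduced for later VAEs), presumably
# read by LipsyncPipeline from vae.config; 0.0 makes the shift a no-op.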
# 2.5 Whisper audio encoder
from latentsync.whisper.audio2feature import Audio2Feature
dim = conf.model.cross_attention_dim
wh = "small.pt" if dim == 768 else "tiny.pt"
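# The Whisper checkpoint is matched to the UNet's cross-attention width:
# small has 768-dim encoder features, tiny has 384-dim.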
audio_encoder = Audio2Feature(
    model_path=os.path.join(ckpt_dir, "whisper", wh),
    device=device,
    num_frames=conf.data.num_frames,
)
# 2.6 Load UNet3DConditionModel
from latentsync.models.unet import UNet3DConditionModel
unet, _ = UNet3DConditionModel.from_pretrained(
    OmegaConf.to_container(conf.model),
    os.path.join(ckpt_dir, "latentsync_unet.pt"),
    device=device
)
unet = unet.to(torch.float16) if device.type == "cuda" else unet.to(torch.float32)
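# fp16 halves UNet memory on GPU; fp32 is kept on CPU, where half precision is
# slow and only partly supported.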
# 2.7 Build LipsyncPipeline
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline
pipe_sync = LipsyncPipeline(
    vae=vae,
    audio_encoder=audio_encoder,
    unet=unet,
    scheduler=scheduler,
).to(device)
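# The pipeline follows the diffusers composition pattern: VAE for latents,
# Whisper features as conditioning, a 3D UNet for denoising, DDIM for sampling.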
def lipsync_fn(video_path, audio_path, seed, num_frames, inference_steps):
    from accelerate.utils import set_seed
    if seed >= 0:
        set_seed(seed)
    out_id = uuid.uuid4().hex
    result = f"lipsync_{out_id}.mp4"
    try:
        pipe_sync(
            video_path=video_path,
            audio_path=audio_path,
            video_out_path=result,
            video_mask_path=result.replace(".mp4", "_mask.mp4"),
            num_frames=num_frames,
            num_inference_steps=inference_steps,
            guidance_scale=1.0,
            weight_dtype=torch.float16 if device.type == "cuda" else torch.float32,
            width=conf.data.resolution,
            height=conf.data.resolution,
        )
    except RuntimeError as e:
        if "Face not detected" in str(e):
            raise ValueError("No face detected in the video. Please choose a video with a clearly visible face.")
        raise
    return result
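# Example call (hypothetical file paths, not executed at import time):
#   out_mp4 = lipsync_fn("face.mp4", "speech.wav", seed=42,
#                        num_frames=conf.data.num_frames,
#                        inference_steps=conf.run.inference_steps)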
# ─── 3. Gradio UI ────────────────────────────────────────────────────
text2audio = gr.Interface(
    fn=text_to_audio_fn,
    inputs=[
        gr.Textbox(label="Prompt"),
        gr.Textbox(label="Negative Prompt", value="music"),
        gr.Number(label="Seed", value=-1, precision=0),
        gr.Number(label="Num Steps", value=25, precision=0),
        gr.Number(label="Guidance Strength", value=4.5),
        gr.Number(label="Duration (s)", value=8),
    ],
    outputs=gr.Audio(label="Generated Audio"),
    title="Text → Audio",
)
video2audio = gr.Interface(
    fn=video_to_audio_fn,
    inputs=[
        gr.Video(label="Input Video"),
        gr.Textbox(label="Prompt"),
        gr.Textbox(label="Negative Prompt", value="music"),
        gr.Number(label="Seed", value=-1, precision=0),
        gr.Number(label="Num Steps", value=25, precision=0),
        gr.Number(label="Guidance Strength", value=4.5),
        gr.Number(label="Duration (s)", value=8),
    ],
    outputs=gr.Video(label="Video with Audio"),
    title="Video → Audio",
)
audio2video = gr.Interface(
    fn=lipsync_fn,
    inputs=[
        gr.Video(label="Input Video"),
        gr.Audio(label="Input Audio", type="filepath"),
        gr.Number(label="Seed", value=-1, precision=0),
        gr.Number(label="Num Frames", value=conf.data.num_frames, precision=0),
        gr.Number(label="Inference Steps", value=conf.run.inference_steps, precision=0),
    ],
    outputs=gr.Video(label="Lip-Synced Video"),
    title="Audio → Lip-Sync",
)
def text_video2video_fn(prompt, neg_prompt, seed, num_steps, guidance, duration,
                        video, num_frames, inference_steps):
    audio = text_to_audio_fn(prompt, neg_prompt, seed, num_steps, guidance, duration)
    video_out = lipsync_fn(video, audio, seed, num_frames, inference_steps)
    return audio, video_out
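# The same seed drives both stages, so a fixed, non-negative seed makes the
# whole text → audio → lip-sync chain reproducible.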
text_video2video = gr.Interface(
    fn=text_video2video_fn,
    inputs=[
        gr.Textbox(label="Prompt"),
        gr.Textbox(label="Negative Prompt", value="music"),
        gr.Number(label="Seed", value=-1, precision=0),
        gr.Number(label="Num Steps", value=25, precision=0),
        gr.Number(label="Guidance Strength", value=4.5),
        gr.Number(label="Duration (s)", value=8),
        gr.Video(label="Input Video"),
        gr.Number(label="Num Frames", value=conf.data.num_frames, precision=0),
        gr.Number(label="Inference Steps", value=conf.run.inference_steps, precision=0),
    ],
    outputs=[gr.Audio(label="Generated Audio"), gr.Video(label="Lip-Synced Video")],
    title="Text + Video → Lip-Sync",
)
# Build the tabbed interface and enable the request queue (default settings)
demo = gr.TabbedInterface(
    [text2audio, video2audio, audio2video, text_video2video],
    ["Text→Audio", "Video→Audio", "Audio→LipSync", "Text+Video→LipSync"],
).queue()
# Launch with share=True so a public link is created
demo.launch(share=True)