"""Gradio demo chaining MMAudio (text/video -> audio) with LatentSync (audio -> lip-synced video)."""

import os
import sys
import uuid
import json
import shutil
import inspect
import tempfile

import torch
import torchaudio
import gradio as gr
from huggingface_hub import snapshot_download
from omegaconf import OmegaConf
from diffusers import AutoencoderKL, DDIMScheduler
|
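# Run from inside the LatentSync checkout so the relative paths below resolve.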
BASE_DIR = os.path.dirname(__file__)
os.chdir(os.path.join(BASE_DIR, "LatentSync"))
|
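# LatentSync loads its face mask from latentsync/utils/; copy it there from assets/ if missing.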
assets_mask = os.path.join("assets", "mask.png")
utils_mask = os.path.join("latentsync", "utils", "mask.png")
if os.path.exists(assets_mask) and not os.path.exists(utils_mask):
    shutil.copy(assets_mask, utils_mask)
|
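# Make the vendored MMAudio (Long_Tieng) and LatentSync packages importable.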
sys.path.insert(0, os.path.join(BASE_DIR, "Long_Tieng")) |
|
|
sys.path.insert(0, os.path.join(BASE_DIR, "LatentSync")) |
|
|
|
|
|
|
|
|
from mmaudio.eval_utils import (
    ModelConfig,
    all_model_cfg,
    generate,
    load_video,
    make_video,
    setup_eval_logging,
)
from mmaudio.model.flow_matching import FlowMatching
from mmaudio.model.sequence_config import SequenceConfig
from mmaudio.model.utils.features_utils import FeaturesUtils
from mmaudio.model.networks import MMAudio, get_my_mmaudio
|
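# Run in bf16 on GPU, fp32 on CPU.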
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dtype = torch.bfloat16 if device.type == "cuda" else torch.float32
|
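# Load the MMAudio network and its feature extractors once at startup.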
mma_cfg: ModelConfig = all_model_cfg["large_44k_v2"]
mma_cfg.download_if_needed()
setup_eval_logging()

net: MMAudio = get_my_mmaudio(mma_cfg.model_name).to(device, dtype).eval()
net.load_weights(torch.load(mma_cfg.model_path, map_location=device, weights_only=True))

feature_utils = FeaturesUtils(
    tod_vae_ckpt=mma_cfg.vae_path,
    synchformer_ckpt=mma_cfg.synchformer_ckpt,
    enable_conditions=True,
    mode=mma_cfg.mode,
    bigvgan_vocoder_ckpt=mma_cfg.bigvgan_16k_path,
    need_vae_encoder=False,
).to(device, dtype).eval()

seq_cfg: SequenceConfig = mma_cfg.seq_cfg
|

@torch.inference_mode()
def text_to_audio_fn(prompt, neg_prompt, seed, num_steps, guidance, duration):
    """Generate `duration` seconds of audio from a text prompt and return a FLAC path."""
    rng = torch.Generator(device=device)
    if seed >= 0:
        rng.manual_seed(seed)
    else:
        rng.seed()  # negative seed means "random": draw a fresh seed instead of reusing the default state
    fm = FlowMatching(min_sigma=0, inference_mode="euler", num_steps=num_steps)

    # Resize the model's sequence lengths to match the requested duration.
    seq_cfg.duration = duration
    net.update_seq_lengths(seq_cfg.latent_seq_len, seq_cfg.clip_seq_len, seq_cfg.sync_seq_len)

    audios = generate(
        None, None, [prompt],
        negative_text=[neg_prompt],
        feature_utils=feature_utils,
        net=net, fm=fm, rng=rng, cfg_strength=guidance,
    )
    audio = audios.float().cpu()[0]
    out = tempfile.NamedTemporaryFile(delete=False, suffix=".flac").name
    torchaudio.save(out, audio, seq_cfg.sampling_rate)
    return out
|

@torch.inference_mode()
def video_to_audio_fn(video, prompt, neg_prompt, seed, num_steps, guidance, duration):
    """Generate audio conditioned on a video plus text prompt, then mux it back into the video."""
    info = load_video(video, duration)
    clip = info.clip_frames.unsqueeze(0)
    sync = info.sync_frames.unsqueeze(0)

    rng = torch.Generator(device=device)
    if seed >= 0:
        rng.manual_seed(seed)
    else:
        rng.seed()
    fm = FlowMatching(min_sigma=0, inference_mode="euler", num_steps=num_steps)

    # Use the clip's actual duration, which may be shorter than the requested one.
    seq_cfg.duration = info.duration_sec
    net.update_seq_lengths(seq_cfg.latent_seq_len, seq_cfg.clip_seq_len, seq_cfg.sync_seq_len)

    audios = generate(
        clip, sync, [prompt],
        negative_text=[neg_prompt],
        feature_utils=feature_utils,
        net=net, fm=fm, rng=rng, cfg_strength=guidance,
    )
    audio = audios.float().cpu()[0]
    out_video = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
    make_video(info, out_video, audio, sampling_rate=seq_cfg.sampling_rate)
    return out_video
|
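
# Download the LatentSync checkpoints from the Hugging Face Hub.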
REPO_ID = "LTTEAM/Nhep_Mieng"
ckpt_dir = os.path.join(BASE_DIR, "checkpoints")
os.makedirs(ckpt_dir, exist_ok=True)
snapshot_download(repo_id=REPO_ID, local_dir=ckpt_dir)
|
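# Load the LatentSync second-stage UNet config.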
cfg_path = os.path.join(BASE_DIR, "LatentSync", "configs", "unet", "second_stage.yaml")
conf = OmegaConf.load(cfg_path)
|
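# Build the DDIM scheduler, passing only the config keys that DDIMScheduler.__init__ accepts.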
sched_path = os.path.join(BASE_DIR, "LatentSync", "configs", "scheduler_config.json")
with open(sched_path, "r") as f:
    sched_cfg = json.load(f)
valid_args = inspect.signature(DDIMScheduler.__init__).parameters.keys()
init_cfg = {k: v for k, v in sched_cfg.items() if k in valid_args}
scheduler = DDIMScheduler(**init_cfg)
|
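# Stable Diffusion VAE; some diffusers configs lack shift_factor, so ensure it exists.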
vae = AutoencoderKL.from_pretrained(
    "stabilityai/sd-vae-ft-mse",
    torch_dtype=torch.float16 if device.type == "cuda" else torch.float32,
)
if not hasattr(vae.config, "shift_factor") or vae.config.shift_factor is None:
    vae.config.shift_factor = 0.0
|
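# Whisper audio encoder: the UNet's cross-attention width selects the checkpoint (768 -> small, else tiny).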
from latentsync.whisper.audio2feature import Audio2Feature

dim = conf.model.cross_attention_dim
wh = "small.pt" if dim == 768 else "tiny.pt"
audio_encoder = Audio2Feature(
    model_path=os.path.join(ckpt_dir, "whisper", wh),
    device=device,
    num_frames=conf.data.num_frames,
)
|
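# LatentSync 3D UNet, loaded from the downloaded checkpoint.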
from latentsync.models.unet import UNet3DConditionModel

unet, _ = UNet3DConditionModel.from_pretrained(
    OmegaConf.to_container(conf.model),
    os.path.join(ckpt_dir, "latentsync_unet.pt"),
    device=device,
)
unet = unet.to(torch.float16 if device.type == "cuda" else torch.float32)
|
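# Assemble the lip-sync pipeline.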
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline

pipe_sync = LipsyncPipeline(
    vae=vae,
    audio_encoder=audio_encoder,
    unet=unet,
    scheduler=scheduler,
).to(device)
|

def lipsync_fn(video_path, audio_path, seed, num_frames, inference_steps):
    """Lip-sync the face in `video_path` to `audio_path`; returns the output video path."""
    from accelerate.utils import set_seed

    if seed >= 0:
        set_seed(seed)

    out_id = uuid.uuid4().hex
    result = f"lipsync_{out_id}.mp4"
    try:
        pipe_sync(
            video_path=video_path,
            audio_path=audio_path,
            video_out_path=result,
            video_mask_path=result.replace(".mp4", "_mask.mp4"),
            num_frames=num_frames,
            num_inference_steps=inference_steps,
            guidance_scale=1.0,
            weight_dtype=torch.float16 if device.type == "cuda" else torch.float32,
            width=conf.data.resolution,
            height=conf.data.resolution,
        )
    except RuntimeError as e:
        # Surface face-detection failures as a user-facing error.
        if "Face not detected" in str(e):
            raise ValueError("No face detected in the video. Please choose a video with a clearly visible face.") from e
        raise
    return result
|
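
# One Gradio interface per task, combined into tabs at the end.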
text2audio = gr.Interface(
    fn=text_to_audio_fn,
    inputs=[
        gr.Textbox(label="Prompt"),
        gr.Textbox(label="Negative Prompt", value="music"),
        gr.Number(label="Seed", value=-1, precision=0),
        gr.Number(label="Num Steps", value=25, precision=0),
        gr.Number(label="Guidance Strength", value=4.5),
        gr.Number(label="Duration (s)", value=8),
    ],
    outputs=gr.Audio(label="Generated Audio"),
    title="Text → Audio",
)
|
video2audio = gr.Interface(
    fn=video_to_audio_fn,
    inputs=[
        gr.Video(label="Input Video"),
        gr.Textbox(label="Prompt"),
        gr.Textbox(label="Negative Prompt", value="music"),
        gr.Number(label="Seed", value=-1, precision=0),
        gr.Number(label="Num Steps", value=25, precision=0),
        gr.Number(label="Guidance Strength", value=4.5),
        gr.Number(label="Duration (s)", value=8),
    ],
    outputs=gr.Video(label="Video with Audio"),
    title="Video → Audio",
)
|
audio2video = gr.Interface(
    fn=lipsync_fn,
    inputs=[
        gr.Video(label="Input Video"),
        gr.Audio(label="Input Audio", type="filepath"),
        gr.Number(label="Seed", value=-1, precision=0),
        gr.Number(label="Num Frames", value=conf.data.num_frames, precision=0),
        gr.Number(label="Inference Steps", value=conf.run.inference_steps, precision=0),
    ],
    outputs=gr.Video(label="Lip-Synced Video"),
    title="Audio → Lip-Sync",
)
|

def text_video2video_fn(prompt, neg_prompt, seed, num_steps, guidance, duration,
                        video, num_frames, inference_steps):
    """Chain both stages: text -> audio, then lip-sync the video to that audio."""
    audio = text_to_audio_fn(prompt, neg_prompt, seed, num_steps, guidance, duration)
    video_out = lipsync_fn(video, audio, seed, num_frames, inference_steps)
    return audio, video_out
|

text_video2video = gr.Interface(
    fn=text_video2video_fn,
    inputs=[
        gr.Textbox(label="Prompt"),
        gr.Textbox(label="Negative Prompt", value="music"),
        gr.Number(label="Seed", value=-1, precision=0),
        gr.Number(label="Num Steps", value=25, precision=0),
        gr.Number(label="Guidance Strength", value=4.5),
        gr.Number(label="Duration (s)", value=8),
        gr.Video(label="Input Video"),
        gr.Number(label="Num Frames", value=conf.data.num_frames, precision=0),
        gr.Number(label="Inference Steps", value=conf.run.inference_steps, precision=0),
    ],
    outputs=[gr.Audio(label="Generated Audio"), gr.Video(label="Lip-Synced Video")],
    title="Text + Video → Lip-Sync",
)
|
demo = gr.TabbedInterface(
    [text2audio, video2audio, audio2video, text_video2video],
    ["Text→Audio", "Video→Audio", "Audio→LipSync", "Text+Video→LipSync"],
).queue()

demo.launch(share=True)