import os
import sys
import uuid
import tempfile
import json
import inspect
import shutil

import torch
import torchaudio
import gradio as gr
from huggingface_hub import snapshot_download
from omegaconf import OmegaConf
from diffusers import AutoencoderKL, DDIMScheduler

# ─── 0. Change CWD & set up PYTHONPATH ───────────────────────────────
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Switch the working directory into LatentSync so its internal relative paths resolve correctly
os.chdir(os.path.join(BASE_DIR, "LatentSync"))

# Copy mask.png from assets → latentsync/utils if needed
assets_mask = os.path.join("assets", "mask.png")
utils_mask = os.path.join("latentsync", "utils", "mask.png")
if os.path.exists(assets_mask) and not os.path.exists(utils_mask):
    shutil.copy(assets_mask, utils_mask)

# Add Long_Tieng and LatentSync to sys.path so their modules can be imported
sys.path.insert(0, os.path.join(BASE_DIR, "Long_Tieng"))
sys.path.insert(0, os.path.join(BASE_DIR, "LatentSync"))

# ─── 1. MMAUDIO (Long_Tieng) setup ───────────────────────────────────
from mmaudio.eval_utils import (
    ModelConfig, all_model_cfg, generate, load_video, make_video, setup_eval_logging
)
from mmaudio.model.flow_matching import FlowMatching
from mmaudio.model.sequence_config import SequenceConfig
from mmaudio.model.utils.features_utils import FeaturesUtils
from mmaudio.model.networks import MMAudio, get_my_mmaudio

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
dtype = torch.bfloat16 if device.type == "cuda" else torch.float32

mma_cfg: ModelConfig = all_model_cfg["large_44k_v2"]
mma_cfg.download_if_needed()
setup_eval_logging()

net: MMAudio = get_my_mmaudio(mma_cfg.model_name).to(device, dtype).eval()
net.load_weights(torch.load(
    mma_cfg.model_path, map_location=device, weights_only=True
))

feature_utils = FeaturesUtils(
    tod_vae_ckpt=mma_cfg.vae_path,
    synchformer_ckpt=mma_cfg.synchformer_ckpt,
    enable_conditions=True,
    mode=mma_cfg.mode,
    bigvgan_vocoder_ckpt=mma_cfg.bigvgan_16k_path,
    need_vae_encoder=False
).to(device, dtype).eval()

seq_cfg: SequenceConfig = mma_cfg.seq_cfg


@torch.inference_mode()
def text_to_audio_fn(prompt, neg_prompt, seed, num_steps, guidance, duration):
    rng = torch.Generator(device=device)
    if seed >= 0:
        rng.manual_seed(seed)
    fm = FlowMatching(min_sigma=0, inference_mode="euler", num_steps=num_steps)

    seq_cfg.duration = duration
    net.update_seq_lengths(
        seq_cfg.latent_seq_len, seq_cfg.clip_seq_len, seq_cfg.sync_seq_len
    )

    audios = generate(
        None, None, [prompt],
        negative_text=[neg_prompt],
        feature_utils=feature_utils,
        net=net, fm=fm, rng=rng,
        cfg_strength=guidance
    )
    audio = audios.float().cpu()[0]

    out = tempfile.NamedTemporaryFile(delete=False, suffix=".flac").name
    torchaudio.save(out, audio, seq_cfg.sampling_rate)
    return out
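
# Hedged sketch: an optional, never-called helper for exercising text_to_audio_fn
# directly, outside the Gradio UI. The prompt text and parameter values below are
# illustrative assumptions, not values required by MMAudio.
def _smoke_test_text_to_audio():
    flac_path = text_to_audio_fn(
        prompt="rain falling on a tin roof",  # hypothetical prompt
        neg_prompt="music",
        seed=42, num_steps=25, guidance=4.5, duration=8,
    )
    print("Generated audio written to:", flac_path)
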
@torch.inference_mode()
def video_to_audio_fn(video, prompt, neg_prompt, seed, num_steps, guidance, duration):
    info = load_video(video, duration)
    clip = info.clip_frames.unsqueeze(0)
    sync = info.sync_frames.unsqueeze(0)

    rng = torch.Generator(device=device)
    if seed >= 0:
        rng.manual_seed(seed)
    fm = FlowMatching(min_sigma=0, inference_mode="euler", num_steps=num_steps)

    seq_cfg.duration = info.duration_sec
    net.update_seq_lengths(
        seq_cfg.latent_seq_len, seq_cfg.clip_seq_len, seq_cfg.sync_seq_len
    )

    audios = generate(
        clip, sync, [prompt],
        negative_text=[neg_prompt],
        feature_utils=feature_utils,
        net=net, fm=fm, rng=rng,
        cfg_strength=guidance
    )
    audio = audios.float().cpu()[0]

    out_video = tempfile.NamedTemporaryFile(delete=False, suffix=".mp4").name
    make_video(info, out_video, audio, sampling_rate=seq_cfg.sampling_rate)
    return out_video


# ─── 2. LATENTSYNC setup ─────────────────────────────────────────────
# 2.1 Download checkpoints
REPO_ID = "LTTEAM/Nhep_Mieng"
ckpt_dir = os.path.join(BASE_DIR, "checkpoints")
os.makedirs(ckpt_dir, exist_ok=True)
snapshot_download(repo_id=REPO_ID, local_dir=ckpt_dir)

# 2.2 Load U-Net config
cfg_path = os.path.join(BASE_DIR, "LatentSync", "configs", "unet", "second_stage.yaml")
conf = OmegaConf.load(cfg_path)

# 2.3 Load the scheduler config locally and filter out args DDIMScheduler does not accept
sched_path = os.path.join(BASE_DIR, "LatentSync", "configs", "scheduler_config.json")
with open(sched_path, "r") as f:
    sched_cfg = json.load(f)
valid_args = inspect.signature(DDIMScheduler.__init__).parameters.keys()
init_cfg = {k: v for k, v in sched_cfg.items() if k in valid_args}
scheduler = DDIMScheduler(**init_cfg)

# 2.4 Load the VAE and fill in a missing shift_factor
vae = AutoencoderKL.from_pretrained(
    "stabilityai/sd-vae-ft-mse",
    torch_dtype=torch.float16 if device.type == "cuda" else torch.float32
)
if not hasattr(vae.config, "shift_factor") or vae.config.shift_factor is None:
    vae.config.shift_factor = 0.0

# 2.5 Whisper audio encoder
from latentsync.whisper.audio2feature import Audio2Feature
dim = conf.model.cross_attention_dim
wh = "small.pt" if dim == 768 else "tiny.pt"
audio_encoder = Audio2Feature(
    model_path=os.path.join(ckpt_dir, "whisper", wh),
    device=device,
    num_frames=conf.data.num_frames
)

# 2.6 Load UNet3DConditionModel
from latentsync.models.unet import UNet3DConditionModel
unet, _ = UNet3DConditionModel.from_pretrained(
    OmegaConf.to_container(conf.model),
    os.path.join(ckpt_dir, "latentsync_unet.pt"),
    device=device
)
unet = unet.to(torch.float16) if device.type == "cuda" else unet.to(torch.float32)

# 2.7 Build LipsyncPipeline
from latentsync.pipelines.lipsync_pipeline import LipsyncPipeline
pipe_sync = LipsyncPipeline(
    vae=vae,
    audio_encoder=audio_encoder,
    unet=unet,
    scheduler=scheduler
).to(device)


def lipsync_fn(video_path, audio_path, seed, num_frames, inference_steps):
    from accelerate.utils import set_seed
    if seed >= 0:
        set_seed(seed)
    out_id = uuid.uuid4().hex
    result = f"lipsync_{out_id}.mp4"
    try:
        pipe_sync(
            video_path=video_path,
            audio_path=audio_path,
            video_out_path=result,
            video_mask_path=result.replace(".mp4", "_mask.mp4"),
            num_frames=num_frames,
            num_inference_steps=inference_steps,
            guidance_scale=1.0,
            weight_dtype=torch.float16 if device.type == "cuda" else torch.float32,
            width=conf.data.resolution,
            height=conf.data.resolution
        )
    except RuntimeError as e:
        if "Face not detected" in str(e):
            raise ValueError(
                "No face detected in the video. Please choose a video with a clearly visible face."
            )
        else:
            raise
    return result
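
# Hedged sketch: an optional, never-called helper showing how lipsync_fn could be
# exercised directly with local files. The file paths below are hypothetical
# placeholders; point them at real assets before running.
def _smoke_test_lipsync():
    synced_path = lipsync_fn(
        video_path="sample_face.mp4",   # hypothetical input video containing a face
        audio_path="sample_voice.wav",  # hypothetical driving audio
        seed=1247,
        num_frames=conf.data.num_frames,
        inference_steps=conf.run.inference_steps,
    )
    print("Lip-synced video written to:", synced_path)
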
# ─── 3. Gradio UI ────────────────────────────────────────────────────
text2audio = gr.Interface(
    fn=text_to_audio_fn,
    inputs=[
        gr.Textbox(label="Prompt"),
        gr.Textbox(label="Negative Prompt", value="music"),
        gr.Number(label="Seed", value=-1, precision=0),
        gr.Number(label="Num Steps", value=25, precision=0),
        gr.Number(label="Guidance Strength", value=4.5),
        gr.Number(label="Duration (s)", value=8),
    ],
    outputs=gr.Audio(label="Generated Audio"),
    title="Text → Audio"
)

video2audio = gr.Interface(
    fn=video_to_audio_fn,
    inputs=[
        gr.Video(label="Input Video"),
        gr.Textbox(label="Prompt"),
        gr.Textbox(label="Negative Prompt", value="music"),
        gr.Number(label="Seed", value=-1, precision=0),
        gr.Number(label="Num Steps", value=25, precision=0),
        gr.Number(label="Guidance Strength", value=4.5),
        gr.Number(label="Duration (s)", value=8),
    ],
    outputs=gr.Video(label="Video with Audio"),
    title="Video → Audio"
)

audio2video = gr.Interface(
    fn=lipsync_fn,
    inputs=[
        gr.Video(label="Input Video"),
        gr.Audio(label="Input Audio", type="filepath"),
        gr.Number(label="Seed", value=-1, precision=0),
        gr.Number(label="Num Frames", value=conf.data.num_frames, precision=0),
        gr.Number(label="Inference Steps", value=conf.run.inference_steps, precision=0),
    ],
    outputs=gr.Video(label="Lip-Synced Video"),
    title="Audio → Lip-Sync"
)


def text_video2video_fn(prompt, neg_prompt, seed, num_steps, guidance, duration,
                        video, num_frames, inference_steps):
    audio = text_to_audio_fn(prompt, neg_prompt, seed, num_steps, guidance, duration)
    video_out = lipsync_fn(video, audio, seed, num_frames, inference_steps)
    return audio, video_out


text_video2video = gr.Interface(
    fn=text_video2video_fn,
    inputs=[
        gr.Textbox(label="Prompt"),
        gr.Textbox(label="Negative Prompt", value="music"),
        gr.Number(label="Seed", value=-1, precision=0),
        gr.Number(label="Num Steps", value=25, precision=0),
        gr.Number(label="Guidance Strength", value=4.5),
        gr.Number(label="Duration (s)", value=8),
        gr.Video(label="Input Video"),
        gr.Number(label="Num Frames", value=conf.data.num_frames, precision=0),
        gr.Number(label="Inference Steps", value=conf.run.inference_steps, precision=0),
    ],
    outputs=[gr.Audio(label="Generated Audio"), gr.Video(label="Lip-Synced Video")],
    title="Text + Video → Lip-Sync"
)

# Build the tabbed interface and enable queueing (default settings)
demo = gr.TabbedInterface(
    [text2audio, video2audio, audio2video, text_video2video],
    ["Text→Audio", "Video→Audio", "Audio→LipSync", "Text+Video→LipSync"]
).queue()

# Launch with share=True
demo.launch(share=True)