# V2A (Video-to-Audio) 백엔드 스크립트 (VRAM 최적화) import sys import os import time import glob import gc import torch import subprocess import random import argparse import shutil from typing import Sequence, Mapping, Any, Union # --- 0. 기본 헬퍼 함수 (I2V 스크립트와 동일) --- def to_bool(s: str) -> bool: return s.lower() in ['true', '1', 't', 'y', 'yes', 'on'] def clear_memory(): """VRAM 및 RAM 캐시를 정리합니다.""" if torch.cuda.is_available(): torch.cuda.empty_cache() torch.cuda.ipc_collect() gc.collect() COMFYUI_BASE_PATH = '/content/ComfyUI' def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any: """ ComfyUI 노드 출력에서 값을 안전하게 가져옵니다. """ try: return obj[index] except (KeyError, TypeError): if isinstance(obj, dict) and "result" in obj: return obj["result"][index] raise def add_comfyui_directory_to_sys_path() -> None: """ ComfyUI 경로를 sys.path에 추가합니다. """ if os.path.isdir(COMFYUI_BASE_PATH) and COMFYUI_BASE_PATH not in sys.path: sys.path.append(COMFYUI_BASE_PATH) print(f"'{COMFYUI_BASE_PATH}' added to sys.path") def import_custom_nodes() -> None: """ ComfyUI 커스텀 노드를 로드하기 위해 비동기 환경을 초기화합니다. (I2V 스크립트의 import_custom_nodes와 동일) """ try: import nest_asyncio nest_asyncio.apply() except ImportError: print("nest_asyncio not found, installing...") try: subprocess.run([sys.executable, "-m", "pip", "install", "-q", "nest_asyncio"], check=True) import nest_asyncio nest_asyncio.apply() print("nest_asyncio installed and applied.") except Exception as e: print(f"Failed to install or apply nest_asyncio: {e}") import asyncio, execution, server from nodes import init_extra_nodes try: loop = asyncio.get_event_loop() if loop.is_closed(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) server_instance = server.PromptServer(loop) execution.PromptQueue(server_instance) if not loop.is_running(): try: loop.run_until_complete(init_extra_nodes()) except RuntimeError as e: print(f"Note: Could not run init_extra_nodes synchronously: {e}") try: asyncio.ensure_future(init_extra_nodes()) except Exception as fut_e: print(f"Error trying async init_extra_nodes: {fut_e}") else: try: asyncio.ensure_future(init_extra_nodes()) except Exception as fut_e: print(f"Error trying async init_extra_nodes on running loop: {fut_e}") # --- 1. Gradio UI에서 모든 인수를 받기 위한 ArgParser --- def parse_args(): parser = argparse.ArgumentParser(description="ComfyUI V2A (Video-to-Audio) Generation Script") # 1. 일반 설정 (Input/Prompt) parser.add_argument("--input_video_path", type=str, required=True, help="오디오를 생성할 입력 비디오 파일 경로") parser.add_argument("--prompt", type=str, default="") parser.add_argument("--negative_prompt", type=str, default="") # 2. 고급 설정 - 샘플링 (MMAudio Sampler) parser.add_argument("--steps", type=int, default=25) parser.add_argument("--cfg", type=float, default=4.5) parser.add_argument("--seed", type=int, default=-1) parser.add_argument("--mask_away_clip", type=str, default="off") # bool parser.add_argument("--force_offload", type=str, default="off") # bool # 3. 고급 설정 - 모델 (Loaders) parser.add_argument("--mmaudio_model", type=str, default="mmaudio_large_44k_v2_fp16.safetensors") parser.add_argument("--base_precision", type=str, default="fp16") parser.add_argument("--vae_model", type=str, default="mmaudio_vae_44k_fp16.safetensors") parser.add_argument("--synchformer_model", type=str, default="mmaudio_synchformer_fp16.safetensors") parser.add_argument("--clip_model", type=str, default="apple_DFN5B-CLIP-ViT-H-14-384_fp16.safetensors") parser.add_argument("--mode", type=str, default="44k") parser.add_argument("--precision", type=str, default="fp16", help="Feature Utils Precision") # 4. 고급 설정 - 비디오 로딩 (VHS LoadVideo) parser.add_argument("--force_rate", type=int, default=0) parser.add_argument("--custom_width", type=int, default=0) parser.add_argument("--custom_height", type=int, default=0) parser.add_argument("--frame_load_cap", type=int, default=0) parser.add_argument("--skip_first_frames", type=int, default=0) parser.add_argument("--select_every_nth", type=int, default=1) parser.add_argument("--load_format", type=str, default="AnimateDiff") # 5. 고급 설정 - 비디오 결합 (VHS VideoCombine) parser.add_argument("--loop_count", type=int, default=0) parser.add_argument("--filename_prefix", type=str, default="MMaudio") parser.add_argument("--combine_format", type=str, default="video/h264-mp4") parser.add_argument("--pix_fmt", type=str, default="yuv420p") parser.add_argument("--crf", type=int, default=19) parser.add_argument("--save_metadata", type=str, default="on") # bool parser.add_argument("--trim_to_audio", type=str, default="off") # bool parser.add_argument("--pingpong", type=str, default="off") # bool return parser.parse_args() # --- 2. VRAM 최적화된 메인 실행 함수 --- # --- 2. VRAM 최적화된 메인 실행 함수 --- def main(): args = parse_args() print("🚀 V2A 오디오 생성을 시작합니다 (VRAM Optimized)...") # --- 환경 설정 --- add_comfyui_directory_to_sys_path() try: from utils.extra_config import load_extra_path_config except ImportError: print("⚠️ ComfyUI의 extra_model_paths.yaml 로딩 실패 (무시하고 진행)") load_extra_path_config = lambda x: None extra_model_paths_file = os.path.join(COMFYUI_BASE_PATH, "extra_model_paths.yaml") if os.path.exists(extra_model_paths_file): load_extra_path_config(extra_model_paths_file) print("ComfyUI 커스텀 노드 초기화 중...") import_custom_nodes() from nodes import NODE_CLASS_MAPPINGS print("커스텀 노드 초기화 완료.") # --- 노드 클래스 인스턴스화 --- mmaudiomodelloader = NODE_CLASS_MAPPINGS["MMAudioModelLoader"]() vhs_loadvideo = NODE_CLASS_MAPPINGS["VHS_LoadVideo"]() mmaudiofeatureutilsloader = NODE_CLASS_MAPPINGS["MMAudioFeatureUtilsLoader"]() vhs_videoinfo = NODE_CLASS_MAPPINGS["VHS_VideoInfo"]() mmaudiosampler = NODE_CLASS_MAPPINGS["MMAudioSampler"]() vhs_videocombine = NODE_CLASS_MAPPINGS["VHS_VideoCombine"]() # --- 시드 설정 --- if args.seed == -1: final_seed = random.randint(1, 2**64) print(f" - 랜덤 시드 생성: {final_seed}") else: final_seed = args.seed print(f" - 고정 시드 사용: {final_seed}") # --- VRAM 최적화 파이프라인 --- with torch.inference_mode(): # ✨ [수정됨] 1단계: 오디오 생성을 위한 비디오 로드 (25 FPS 강제) print(f"\n1단계: 오디오 생성을 위한 비디오 로드 (25 FPS 강제)... ({args.input_video_path})") vhs_loadvideo_91_audio = vhs_loadvideo.load_video( video=args.input_video_path, force_rate=25, # ✨ 오디오용 25 FPS 강제 custom_width=args.custom_width, custom_height=args.custom_height, frame_load_cap=args.frame_load_cap, skip_first_frames=args.skip_first_frames, select_every_nth=args.select_every_nth, format=args.load_format, unique_id=random.randint(1, 2**64) ) images_for_audio = get_value_at_index(vhs_loadvideo_91_audio, 0) # ✨ 오디오용 이미지 # 원본 비디오 정보 추출 (재생 시간, 원본 FPS 등) vhs_videoinfo_105 = vhs_videoinfo.get_video_info( video_info=get_value_at_index(vhs_loadvideo_91_audio, 3) ) del vhs_loadvideo_91_audio # 텐서 로더는 즉시 삭제 duration = get_value_at_index(vhs_videoinfo_105, 7) original_frame_rate = get_value_at_index(vhs_videoinfo_105, 0) # ✨ 최종 비디오에 사용할 원본 FPS print(f" - 비디오 정보: {duration}초, 원본 {original_frame_rate} FPS") clear_memory() # 2단계: 오디오 모델 로드 print(f"\n2단계: 오디오 모델 로딩 중...") print(f" - MMAudio 모델: {args.mmaudio_model} ({args.base_precision})") mmaudiomodelloader_85 = mmaudiomodelloader.loadmodel( mmaudio_model=args.mmaudio_model, base_precision=args.base_precision ) mmaudio_model = get_value_at_index(mmaudiomodelloader_85, 0) print(f" - 유틸리티 모델: (Mode: {args.mode}, Precision: {args.precision})") mmaudiofeatureutilsloader_102 = mmaudiofeatureutilsloader.loadmodel( vae_model=args.vae_model, synchformer_model=args.synchformer_model, clip_model=args.clip_model, mode=args.mode, precision=args.precision ) feature_utils = get_value_at_index(mmaudiofeatureutilsloader_102, 0) # 3단계: 오디오 생성 (샘플링) print(f"\n3단계: 오디오 생성 중... (Steps: {args.steps}, CFG: {args.cfg})") mmaudiosampler_92 = mmaudiosampler.sample( duration=duration, steps=args.steps, cfg=args.cfg, seed=final_seed, prompt=args.prompt, negative_prompt=args.negative_prompt, mask_away_clip=to_bool(args.mask_away_clip), force_offload=to_bool(args.force_offload), mmaudio_model=mmaudio_model, feature_utils=feature_utils, images=images_for_audio # ✨ 오디오용 이미지 텐서 사용 ) generated_audio = get_value_at_index(mmaudiosampler_92, 0) # ✨ [수정됨] 4단계: 모델 및 오디오용 이미지 메모리 해제 print(f"\n4단계: 모델 및 오디오용 이미지 메모리 해제 중...") del mmaudiomodelloader_85, mmaudio_model, mmaudiofeatureutilsloader_102, feature_utils del images_for_audio # ✨ 오디오용 이미지 텐서 삭제 (VRAM 확보) clear_memory() # ✨ [수정됨] 5단계: 비디오 결합을 위한 원본 비디오 로드 print(f"\n5단계: 비디오 결합을 위한 원본 비디오 로드 (사용자 설정 FPS: {args.force_rate})...") vhs_loadvideo_91_combine = vhs_loadvideo.load_video( video=args.input_video_path, force_rate=args.force_rate, # ✨ 사용자의 원본 FPS 설정 사용 (보통 0) custom_width=args.custom_width, custom_height=args.custom_height, frame_load_cap=args.frame_load_cap, skip_first_frames=args.skip_first_frames, select_every_nth=args.select_every_nth, format=args.load_format, unique_id=random.randint(1, 2**64) # 다른 ID 사용 ) images_for_combine = get_value_at_index(vhs_loadvideo_91_combine, 0) # ✨ 결합용 이미지 del vhs_loadvideo_91_combine clear_memory() # ✨ [수정됨] 6단계: 비디오와 오디오 결합 및 저장 print(f"\n6단계: 비디오 + 오디오 결합 및 저장 중...") timestamp = time.strftime("%Y%m%d-%H%M%S") final_filename_prefix = f"{args.filename_prefix}_{timestamp}" vhs_videocombine_97 = vhs_videocombine.combine_video( frame_rate=original_frame_rate, # ✨ 1단계에서 추출한 "원본" FPS 사용 loop_count=args.loop_count, filename_prefix=final_filename_prefix, format=args.combine_format, pix_fmt=args.pix_fmt, crf=args.crf, save_metadata=to_bool(args.save_metadata), trim_to_audio=to_bool(args.trim_to_audio), pingpong=to_bool(args.pingpong), save_output=True, # <<< 파일 저장을 위해 True로 설정 images=images_for_combine, # ✨ 결합용 이미지 텐서 사용 audio=generated_audio, unique_id=random.randint(1, 2**64) ) # ✨ [추가] 결합 후 즉시 텐서 삭제 del images_for_combine, generated_audio clear_memory() # 7단계: Gradio UI로 반환할 최종 파일 경로 출력 try: # 디버그 결과 기반: 반환값 {'result': ((True, [workflow.png, video.mp4, video-audio.mp4]),)} # 우리가 필요한 경로는 리스트의 3번째 항목(인덱스 2)입니다. file_path_list = vhs_videocombine_97['result'][0][1] final_video_path = file_path_list[2] # 오디오가 포함된 최종 비디오 경로 except Exception as e: print(f"❌ [오류] 최종 파일 경로를 추출하는 데 실패했습니다: {e}") print(f" - 전체 반환값: {vhs_videocombine_97}") final_video_path = None if final_video_path and os.path.exists(final_video_path): print(f"✅ 오디오 생성 및 결합 완료!") print(f"LATEST_VIDEO_PATH:{final_video_path}") # I2V 스크립트와 동일한 출력을 위해 원본 복사본 생성 base, ext = os.path.splitext(final_video_path) original_copy_path = f"{base}_original{ext}" try: shutil.copy2(final_video_path, original_copy_path) print(f"✅ 원본 복사본 생성 완료: {original_copy_path}") print(f"ORIGINAL_COPY_PATH:{original_copy_path}") except Exception as e: print(f"❌ 원본 복사본 생성 실패: {e}") else: print(f"❌ 최종 비디오 파일 경로를 찾을 수 없습니다.") if __name__ == "__main__": main()