# This file is executed as a subprocess from cell 3.
import argparse
import gc
import glob
import os
import random
import subprocess
import sys
import time
from typing import Any, Mapping, Sequence, Union

import torch


# --- 0. Basic setup and argument parsing ---
def parse_args(argv: Union[Sequence[str], None] = None) -> argparse.Namespace:
    """Parse the command-line options of the video generation script.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, which is what the script entry
            point relies on.

    Returns:
        The populated namespace. Prompts, width, height, length and
        upscale_ratio are required; each custom LoRA name defaults to the
        string "None" and each LoRA strength defaults to 1.0.
    """
    parser = argparse.ArgumentParser(description="ComfyUI Video Generation Script")

    # Required generation parameters.
    parser.add_argument("--positive_prompt", type=str, required=True)
    parser.add_argument("--negative_prompt", type=str, required=True)
    parser.add_argument("--width", type=int, required=True)
    parser.add_argument("--height", type=int, required=True)
    parser.add_argument("--length", type=int, required=True)
    parser.add_argument("--upscale_ratio", type=float, required=True)

    # Optional custom LoRAs; the literal string "None" means "not set".
    parser.add_argument("--custom_lora_1_name", type=str, default="None")
    parser.add_argument("--custom_lora_1_strength_model", type=float, default=1.0)
    parser.add_argument("--custom_lora_1_strength_clip", type=float, default=1.0)
    parser.add_argument("--custom_lora_2_name", type=str, default="None")
    parser.add_argument("--custom_lora_2_strength_model", type=float, default=1.0)
    parser.add_argument("--custom_lora_2_strength_clip", type=float, default=1.0)

    return parser.parse_args(argv)


def clear_memory() -> None:
    """Run the garbage collector and release cached CUDA memory.

    The Python collector runs first so that objects kept alive only by
    reference cycles are freed before the CUDA cache is emptied; otherwise
    the memory they hold could not be released by ``empty_cache()``.
    The CUDA calls are skipped when no CUDA device is available.
    """
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
# --- 1. ComfyUI-related function definitions ---
COMFYUI_BASE_PATH = '/content/ComfyUI'


def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any:
    """Return element ``index`` of a node output.

    Args:
        obj: A sequence, or a mapping that either contains ``index`` as a
            key or wraps the real outputs under the key "result".
        index: Position (or key) of the wanted value.

    Returns:
        ``obj[index]``, or ``obj["result"][index]`` when direct indexing
        fails and ``obj`` is a mapping with a "result" entry.

    Raises:
        KeyError, TypeError: Re-raised from the direct lookup when no
            "result" fallback is available.
    """
    try:
        return obj[index]
    except (KeyError, TypeError):
        if isinstance(obj, Mapping) and "result" in obj:
            return obj["result"][index]
        raise


def add_comfyui_directory_to_sys_path() -> None:
    """Append COMFYUI_BASE_PATH to ``sys.path`` if it exists and is not yet listed."""
    if os.path.isdir(COMFYUI_BASE_PATH) and COMFYUI_BASE_PATH not in sys.path:
        sys.path.append(COMFYUI_BASE_PATH)
        print(f"'{COMFYUI_BASE_PATH}' added to sys.path")


def import_custom_nodes() -> None:
    """Create a PromptServer/PromptQueue and run ComfyUI's ``init_extra_nodes``.

    ``nest_asyncio`` is installed with pip on demand when it is missing.

    Raises:
        subprocess.CalledProcessError: If the on-demand pip install fails.
    """
    try:
        import nest_asyncio
    except ImportError:
        import importlib

        # check=True surfaces a failed install here instead of as a second,
        # less informative ImportError on the retry below.
        subprocess.run(
            [sys.executable, "-m", "pip", "install", "-q", "nest_asyncio"],
            check=True,
        )
        # The package appeared after interpreter start-up, so the import
        # system's cached directory listings must be refreshed.
        importlib.invalidate_caches()
        import nest_asyncio
    nest_asyncio.apply()

    import asyncio
    import execution
    from nodes import init_extra_nodes
    import server

    # get_event_loop() never returns a falsy value; when there is no usable
    # loop it raises RuntimeError, so that is the case handled here.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    server_instance = server.PromptServer(loop)
    # Constructed for its side effects only; the instance itself is unused.
    execution.PromptQueue(server_instance)

    if not loop.is_running():
        loop.run_until_complete(init_extra_nodes())
    else:
        # NOTE(review): this only schedules the coroutine; it is not awaited,
        # so initialization may still be pending when this function returns.
        asyncio.ensure_future(init_extra_nodes())
# --- 2. Main execution logic ---
def _has_custom_lora(name) -> bool:
    """Return True when ``name`` is a real LoRA name (non-empty and not "None")."""
    return bool(name) and name != "None"


def _clear_directory(directory: str) -> None:
    """Delete every non-hidden entry inside ``directory``; a missing directory is a no-op."""
    import shutil

    for entry in glob.glob(os.path.join(directory, "*")):
        # Symlinks are unlinked rather than followed.
        if os.path.isdir(entry) and not os.path.islink(entry):
            shutil.rmtree(entry)
        else:
            os.remove(entry)


def _random_noise_seed() -> int:
    """Reseed ``random`` from the millisecond clock and draw a seed in [1, 2**64 - 1]."""
    random.seed(int(time.time() * 1000))
    # randint is inclusive on both ends; 2**64 itself would not fit in an
    # unsigned 64-bit seed, so the upper bound is 2**64 - 1.
    return random.randint(1, 2**64 - 1)


def _apply_lora_chain(loraloader, unet_output, clip_output, custom_name,
                      custom_strength_model, custom_strength_clip,
                      fastwan_strength, sampler_index):
    """Stack the lightx2v LoRA, the optional custom LoRA and the FastWan LoRA.

    Returns the output of the final ``load_lora`` call (model at index 0,
    clip at index 1).
    """
    model, clip = loraloader.load_lora(
        lora_name="lightx2v_T2V_14B_cfg_step_distill_v2_lora_rank32_bf16.safetensors",
        strength_model=2,
        strength_clip=2,
        model=get_value_at_index(unet_output, 0),
        clip=get_value_at_index(clip_output, 0),
    )
    if _has_custom_lora(custom_name):
        print(f"{sampler_index}번 샘플러에 커스텀 LoRA 적용: {custom_name}")
        model, clip = loraloader.load_lora(
            lora_name=custom_name,
            strength_model=custom_strength_model,
            strength_clip=custom_strength_clip,
            model=model,
            clip=clip,
        )
    return loraloader.load_lora(
        lora_name="FastWan_T2V_14B_480p_lora_rank_128_bf16.safetensors",
        strength_model=fastwan_strength,
        strength_clip=fastwan_strength,
        model=model,
        clip=clip,
    )


def _upscale_batch(*, frame_loader, model_upscaler, slider, float_math,
                   scaler, saver, upscale_model, upscale_ratio, **load_kwargs):
    """Upscale one batch of frames from output/temp and save it under output/up.

    ``load_kwargs`` is forwarded to the frame loader to select the batch.
    Nothing is done when the loader yields no frames.
    """
    loaded = frame_loader.load_images(
        directory=f"{COMFYUI_BASE_PATH}/output/temp", **load_kwargs
    )
    frames = get_value_at_index(loaded, 0)
    if frames is None or frames.shape[0] == 0:
        return

    upscaled = model_upscaler.upscale(upscale_model=upscale_model, image=frames)
    # The upscale model file name is prefixed "2x", so the requested ratio is
    # divided by 2 to obtain the residual scale factor.
    slider_out = slider.main(Xi=upscale_ratio, Xf=upscale_ratio, isfloatX=1)
    factor = float_math.float_math_operation(
        a=get_value_at_index(slider_out, 0), b=2, operation="divide"
    )
    rescaled = scaler.upscale(
        upscale_method="nearest-exact",
        scale_by=get_value_at_index(factor, 0),
        image=get_value_at_index(upscaled, 0),
    )
    saver.save_images(
        filename_prefix="up/example", images=get_value_at_index(rescaled, 0)
    )


def main():
    """Run the full image-to-video pipeline and report the newest .mp4.

    Steps: two-stage sampling (high-noise then low-noise UNet), VAE decode
    to frames in output/temp, optional upscaling into output/up, RIFE frame
    interpolation and video encoding. Large intermediates are deleted and
    memory is cleared between steps. The path of the most recently created
    .mp4 under the output directory is printed with the prefix
    "LATEST_VIDEO_PATH:".

    Raises:
        FileNotFoundError: If no .mp4 file exists under the output directory
            after the pipeline finished.
    """
    args = parse_args()

    print("🚀 동영상 생성을 시작합니다...")
    print(f"프롬프트: {args.positive_prompt[:50]}...")
    print(f"크기: {args.width}x{args.height}, 길이: {args.length} 프레임")
    print(f"최종 업스케일 비율: {args.upscale_ratio}x")
    if _has_custom_lora(args.custom_lora_1_name):
        print(f"커스텀 LoRA 1: {args.custom_lora_1_name} (강도: {args.custom_lora_1_strength_model})")
    if _has_custom_lora(args.custom_lora_2_name):
        print(f"커스텀 LoRA 2: {args.custom_lora_2_name} (강도: {args.custom_lora_2_strength_model})")

    # Remove frames left over from a previous run.
    _clear_directory(f"{COMFYUI_BASE_PATH}/output/up")
    _clear_directory(f"{COMFYUI_BASE_PATH}/output/temp")

    add_comfyui_directory_to_sys_path()
    # Importable only after the ComfyUI directory is on sys.path.
    from utils.extra_config import load_extra_path_config

    extra_model_paths_file = os.path.join(COMFYUI_BASE_PATH, "extra_model_paths.yaml")
    if os.path.exists(extra_model_paths_file):
        load_extra_path_config(extra_model_paths_file)

    print("Initializing ComfyUI custom nodes...")
    import_custom_nodes()
    from nodes import NODE_CLASS_MAPPINGS
    print("Custom nodes initialized successfully.")

    with torch.inference_mode():
        unetloadergguf = NODE_CLASS_MAPPINGS["UnetLoaderGGUF"]()
        cliploader = NODE_CLASS_MAPPINGS["CLIPLoader"]()
        loraloader = NODE_CLASS_MAPPINGS["LoraLoader"]()
        cliptextencode = NODE_CLASS_MAPPINGS["CLIPTextEncode"]()
        vaeloader = NODE_CLASS_MAPPINGS["VAELoader"]()
        loadimage = NODE_CLASS_MAPPINGS["LoadImage"]()
        upscalemodelloader = NODE_CLASS_MAPPINGS["UpscaleModelLoader"]()
        mxslider = NODE_CLASS_MAPPINGS["mxSlider"]()
        vhs_loadimagespath = NODE_CLASS_MAPPINGS["VHS_LoadImagesPath"]()
        modelsamplingsd3 = NODE_CLASS_MAPPINGS["ModelSamplingSD3"]()
        wanimagetovideo = NODE_CLASS_MAPPINGS["WanImageToVideo"]()
        ksampleradvanced = NODE_CLASS_MAPPINGS["KSamplerAdvanced"]()
        vaedecode = NODE_CLASS_MAPPINGS["VAEDecode"]()
        rife_vfi = NODE_CLASS_MAPPINGS["RIFE VFI"]()
        vhs_videocombine = NODE_CLASS_MAPPINGS["VHS_VideoCombine"]()
        easy_mathfloat = NODE_CLASS_MAPPINGS["easy mathFloat"]()
        imageupscalewithmodel = NODE_CLASS_MAPPINGS["ImageUpscaleWithModel"]()
        imagescaleby = NODE_CLASS_MAPPINGS["ImageScaleBy"]()
        saveimage = NODE_CLASS_MAPPINGS["SaveImage"]()

        print("Starting Step 1: Initial Sampling")
        cliploader_38 = cliploader.load_clip(
            clip_name="umt5_xxl_fp8_e4m3fn_scaled.safetensors",
            type="wan",
            device="default",
        )
        unetloadergguf_84 = unetloadergguf.load_unet(
            unet_name="Wan2.2-I2V-A14B-HighNoise-Q4_K_S.gguf"
        )

        # LoRA chain for sampler 1.
        loraloader_78 = _apply_lora_chain(
            loraloader,
            unetloadergguf_84,
            cliploader_38,
            args.custom_lora_1_name,
            args.custom_lora_1_strength_model,
            args.custom_lora_1_strength_clip,
            1.5,
            1,
        )

        cliptextencode_6 = cliptextencode.encode(
            text=args.positive_prompt, clip=get_value_at_index(loraloader_78, 1)
        )
        cliptextencode_7 = cliptextencode.encode(
            text=args.negative_prompt, clip=get_value_at_index(loraloader_78, 1)
        )
        vaeloader_39 = vaeloader.load_vae(vae_name="wan_2.1_vae.safetensors")
        loadimage_62 = loadimage.load_image(image="example.png")
        wanimagetovideo_63 = wanimagetovideo.EXECUTE_NORMALIZED(
            width=args.width,
            height=args.height,
            length=args.length,
            batch_size=1,
            positive=get_value_at_index(cliptextencode_6, 0),
            negative=get_value_at_index(cliptextencode_7, 0),
            vae=get_value_at_index(vaeloader_39, 0),
            start_image=get_value_at_index(loadimage_62, 0),
        )
        modelsamplingsd3_54 = modelsamplingsd3.patch(
            shift=8.0, model=get_value_at_index(loraloader_78, 0)
        )

        # First pass: steps 0-2 of 4, handing leftover noise to the second pass.
        ksampleradvanced_74 = ksampleradvanced.sample(
            add_noise="enable",
            noise_seed=_random_noise_seed(),
            steps=4,
            cfg=1,
            sampler_name="lcm",
            scheduler="simple",
            start_at_step=0,
            end_at_step=2,
            return_with_leftover_noise="enable",
            model=get_value_at_index(modelsamplingsd3_54, 0),
            positive=get_value_at_index(wanimagetovideo_63, 0),
            negative=get_value_at_index(wanimagetovideo_63, 1),
            latent_image=get_value_at_index(wanimagetovideo_63, 2),
        )

        del unetloadergguf_84, loraloader_78, modelsamplingsd3_54, loadimage_62
        clear_memory()
        print("Step 1 finished and memory cleared.")

        print("Starting Step 2: Refinement Sampling")
        unetloadergguf_85 = unetloadergguf.load_unet(
            unet_name="Wan2.2-I2V-A14B-LowNoise-Q4_K_S.gguf"
        )

        # LoRA chain for sampler 2.
        loraloader_86 = _apply_lora_chain(
            loraloader,
            unetloadergguf_85,
            cliploader_38,
            args.custom_lora_2_name,
            args.custom_lora_2_strength_model,
            args.custom_lora_2_strength_clip,
            0.5,
            2,
        )
        modelsamplingsd3_55 = modelsamplingsd3.patch(
            shift=8, model=get_value_at_index(loraloader_86, 0)
        )

        # Second pass: continues from step 2 on the latent of the first pass.
        ksampleradvanced_75 = ksampleradvanced.sample(
            add_noise="disable",
            noise_seed=_random_noise_seed(),
            steps=4,
            cfg=1,
            sampler_name="lcm",
            scheduler="simple",
            start_at_step=2,
            end_at_step=10000,
            return_with_leftover_noise="disable",
            model=get_value_at_index(modelsamplingsd3_55, 0),
            positive=get_value_at_index(wanimagetovideo_63, 0),
            negative=get_value_at_index(wanimagetovideo_63, 1),
            latent_image=get_value_at_index(ksampleradvanced_74, 0),
        )

        del unetloadergguf_85, loraloader_86, modelsamplingsd3_55, cliploader_38
        del cliptextencode_6, cliptextencode_7, ksampleradvanced_74, wanimagetovideo_63
        clear_memory()
        print("Step 2 finished and memory cleared.")

        print("Starting Step 3: VAE Decode and Save")
        vaedecode_8 = vaedecode.decode(
            samples=get_value_at_index(ksampleradvanced_75, 0),
            vae=get_value_at_index(vaeloader_39, 0),
        )
        saveimage.save_images(
            filename_prefix="temp/example", images=get_value_at_index(vaedecode_8, 0)
        )

        # [Optimization] Drop the VAE-related variables as soon as they are done.
        del ksampleradvanced_75, vaeloader_39, vaedecode_8
        clear_memory()
        print("Step 3 finished and memory cleared.")

        if args.upscale_ratio > 1:
            print("Starting Steps 4 & 5: Upscaling")
            upscalemodelloader_88 = upscalemodelloader.load_model(
                model_name="2x-AnimeSharpV4_Fast_RCAN_PU.safetensors"
            )
            upscale_nodes = dict(
                frame_loader=vhs_loadimagespath,
                model_upscaler=imageupscalewithmodel,
                slider=mxslider,
                float_math=easy_mathfloat,
                scaler=imagescaleby,
                saver=saveimage,
                upscale_model=get_value_at_index(upscalemodelloader_88, 0),
                upscale_ratio=args.upscale_ratio,
            )

            # Upscale part 1: the first 40 frames.
            _upscale_batch(image_load_cap=40, **upscale_nodes)
            clear_memory()

            # Upscale part 2: everything after the first 40 frames.
            # NOTE(review): confirm how the loader behaves when 40 or fewer
            # frames exist (empty result vs. exception).
            _upscale_batch(skip_first_images=40, **upscale_nodes)

            del upscale_nodes, upscalemodelloader_88
            clear_memory()
            print("Upscaling finished and memory cleared.")
            rife_input_dir = f"{COMFYUI_BASE_PATH}/output/up"
        else:
            print("Skipping Upscaling (Upscale ratio <= 1)")
            rife_input_dir = f"{COMFYUI_BASE_PATH}/output/temp"

        print("Starting Step 6: RIFE and Combine")
        vhs_loadimagespath_97 = vhs_loadimagespath.load_images(directory=rife_input_dir)
        rife_vfi_94 = rife_vfi.vfi(
            ckpt_name="rife47.pth",
            multiplier=2,
            frames=get_value_at_index(vhs_loadimagespath_97, 0),
        )
        vhs_videocombine.combine_video(
            frame_rate=32,
            loop_count=0,
            filename_prefix="AnimateDiff",
            format="video/h264-mp4",
            pix_fmt="yuv420p",
            crf=19,
            save_metadata=True,
            trim_to_audio=False,
            pingpong=False,
            save_output=True,
            images=get_value_at_index(rife_vfi_94, 0),
        )

        # [Optimization] Drop the variables used by the final step.
        del vhs_loadimagespath_97, rife_vfi_94
        clear_memory()
        print("✅ All steps finished.")

    output_dir = os.path.join(COMFYUI_BASE_PATH, "output")
    video_files = glob.glob(os.path.join(output_dir, '**', '*.mp4'), recursive=True)
    if not video_files:
        raise FileNotFoundError("생성된 동영상 파일을 찾을 수 없습니다!")

    latest_video = max(video_files, key=os.path.getctime)
    # This exact prefix is part of the script's output; keep it unchanged.
    print(f"LATEST_VIDEO_PATH:{latest_video}")


if __name__ == "__main__":
    main()