# data / run_generator.py
# arcacolab's picture
# Upload run_generator.py
# a715537 verified
# This file is executed as a subprocess from cell 3.
import sys
import os
import time
import glob
import gc
import torch
import subprocess
import random
import argparse
from typing import Sequence, Mapping, Any, Union
# --- 0. Basic settings and argument parsing ---
def parse_args():
    """Parse the command-line options that configure one generation run.

    Returns:
        argparse.Namespace: the two prompt texts, the frame geometry
        (``width``, ``height``, ``length``), ``upscale_ratio`` and two
        optional custom-LoRA slots (a name plus model/clip strengths each).
        A LoRA name defaults to the literal string ``"None"``.
    """
    cli = argparse.ArgumentParser(description="ComfyUI Video Generation Script")

    # Mandatory options, registered in the order they appear in --help.
    mandatory = (
        ("--positive_prompt", str),
        ("--negative_prompt", str),
        ("--width", int),
        ("--height", int),
        ("--length", int),
        ("--upscale_ratio", float),
    )
    for flag, value_type in mandatory:
        cli.add_argument(flag, type=value_type, required=True)

    # Two identical optional LoRA slots: name, model strength, clip strength.
    for slot in (1, 2):
        stem = f"--custom_lora_{slot}"
        cli.add_argument(f"{stem}_name", type=str, default="None")
        cli.add_argument(f"{stem}_strength_model", type=float, default=1.0)
        cli.add_argument(f"{stem}_strength_clip", type=float, default=1.0)

    return cli.parse_args()
def clear_memory():
    """Free unreferenced Python objects, then release cached CUDA memory.

    The garbage collector runs first so that objects kept alive only by
    reference cycles are destroyed before the CUDA caching allocator is
    asked to hand back its unused blocks; running it afterwards would leave
    the memory it frees sitting in the allocator cache.
    """
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
        torch.cuda.ipc_collect()
# --- 1. ComfyUI-related function definitions ---
# Root directory of the ComfyUI installation. It is appended to sys.path so
# ComfyUI modules can be imported, and it is the base of the output folders
# this script reads from and writes to.
COMFYUI_BASE_PATH = '/content/ComfyUI'
def get_value_at_index(obj: Union[Sequence, Mapping], index: int) -> Any:
    """Return ``obj[index]``, falling back to ``obj["result"][index]``.

    Args:
        obj: A sequence, or a mapping that may wrap its payload under the
            ``"result"`` key.
        index: Position (or key) to look up.

    Returns:
        The element found directly in ``obj`` or, failing that, inside
        ``obj["result"]``.

    Raises:
        KeyError, TypeError: re-raised from the direct lookup when ``obj`` is
            not a dict containing a ``"result"`` entry.
        IndexError: propagated unchanged from either lookup.
    """
    try:
        value = obj[index]
    except (KeyError, TypeError):
        wraps_result = isinstance(obj, dict) and "result" in obj
        if not wraps_result:
            raise
        return obj["result"][index]
    return value
def add_comfyui_directory_to_sys_path() -> None:
    """Append the ComfyUI directory to ``sys.path`` if it is not there yet.

    Nothing happens when the directory does not exist or is already listed;
    otherwise the path is appended and a confirmation line is printed.
    """
    already_listed = COMFYUI_BASE_PATH in sys.path
    if already_listed or not os.path.isdir(COMFYUI_BASE_PATH):
        return
    sys.path.append(COMFYUI_BASE_PATH)
    print(f"'{COMFYUI_BASE_PATH}' added to sys.path")
def import_custom_nodes() -> None:
    """Create ComfyUI's server/queue objects and load its extra nodes.

    ``nest_asyncio`` is applied first (and pip-installed on the fly when it
    is missing) so that the event loop can be driven re-entrantly. The
    function then builds a ``PromptServer`` and a ``PromptQueue`` on the
    current event loop and blocks until ``init_extra_nodes()`` has finished,
    so callers can rely on the nodes being registered on return.

    Raises:
        ImportError: if ``nest_asyncio`` is still unavailable after the
            installation attempt, or a ComfyUI module cannot be imported.
    """
    try:
        import nest_asyncio
    except ImportError:
        # Best-effort bootstrap; a failed install surfaces as ImportError below.
        subprocess.run([sys.executable, "-m", "pip", "install", "-q", "nest_asyncio"])
        import nest_asyncio
    nest_asyncio.apply()

    import asyncio
    # ComfyUI modules: importable only once its directory is on sys.path.
    import execution
    from nodes import init_extra_nodes
    import server

    # get_event_loop() never returns a falsy value; when no loop is available
    # it raises RuntimeError, so the fallback has to be an except clause.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    server_instance = server.PromptServer(loop)
    execution.PromptQueue(server_instance)

    # Always wait for completion. Scheduling with ensure_future() on an
    # already-running loop would return before the nodes are registered;
    # nest_asyncio (applied above) allows run_until_complete() in that case.
    loop.run_until_complete(init_extra_nodes())
# --- 2. Main execution logic ---
def _remove_directory_contents(directory: str) -> None:
    """Delete every non-hidden entry directly inside *directory*.

    Equivalent to ``rm -rf <directory>/*`` but without spawning a shell; a
    missing or empty directory is a no-op.
    """
    import shutil  # local import: only this helper needs it

    for entry in glob.glob(os.path.join(directory, "*")):
        if os.path.isdir(entry) and not os.path.islink(entry):
            shutil.rmtree(entry)
        else:
            # Regular files and symlinks (including links to directories).
            os.remove(entry)


def main():
    """Run the whole image-to-video pipeline and print the resulting path.

    Stages, in order:
      1. Initial sampling with the high-noise UNet (steps 0-2 of 4).
      2. Refinement sampling with the low-noise UNet (from step 2 onwards).
      3. VAE decode; frames are saved under ``output/temp``.
      4-5. Optional upscaling (only when ``--upscale_ratio`` > 1), done in two
         batches (first 40 frames, then the rest) and saved under
         ``output/up``.
      6. RIFE frame interpolation (x2) and H.264 video assembly.

    Intermediate results are deleted and ``clear_memory()`` is called after
    each stage to keep peak memory low. Finally the newest ``*.mp4`` below
    the ComfyUI output directory is printed as ``LATEST_VIDEO_PATH:<path>``
    (presumably parsed by the parent process — confirm against the caller).

    Raises:
        FileNotFoundError: if no ``.mp4`` file exists after the pipeline ran.
    """
    args = parse_args()

    # A LoRA slot is "in use" only when a non-empty name other than the
    # sentinel string "None" was given. Computed once so that the banner
    # below and the actual LoRA loading always agree.
    use_lora_1 = bool(args.custom_lora_1_name) and args.custom_lora_1_name != "None"
    use_lora_2 = bool(args.custom_lora_2_name) and args.custom_lora_2_name != "None"

    # random.randint() includes both endpoints, so the upper bound must be the
    # largest unsigned 64-bit value rather than 2**64.
    max_noise_seed = 2**64 - 1

    print("🚀 동영상 생성을 시작합니다...")
    print(f"프롬프트: {args.positive_prompt[:50]}...")
    print(f"크기: {args.width}x{args.height}, 길이: {args.length} 프레임")
    print(f"최종 업스케일 비율: {args.upscale_ratio}x")
    if use_lora_1:
        print(f"커스텀 LoRA 1: {args.custom_lora_1_name} (강도: {args.custom_lora_1_strength_model})")
    if use_lora_2:
        print(f"커스텀 LoRA 2: {args.custom_lora_2_name} (강도: {args.custom_lora_2_strength_model})")

    # Start from clean frame folders so frames of an earlier run are not mixed in.
    _remove_directory_contents(os.path.join(COMFYUI_BASE_PATH, "output", "up"))
    _remove_directory_contents(os.path.join(COMFYUI_BASE_PATH, "output", "temp"))

    add_comfyui_directory_to_sys_path()
    # ComfyUI imports must come after the sys.path update above.
    from utils.extra_config import load_extra_path_config
    extra_model_paths_file = os.path.join(COMFYUI_BASE_PATH, "extra_model_paths.yaml")
    if os.path.exists(extra_model_paths_file):
        load_extra_path_config(extra_model_paths_file)

    print("Initializing ComfyUI custom nodes...")
    import_custom_nodes()
    from nodes import NODE_CLASS_MAPPINGS
    print("Custom nodes initialized successfully.")

    with torch.inference_mode():
        # Instantiate every node class used by the pipeline.
        unetloadergguf = NODE_CLASS_MAPPINGS["UnetLoaderGGUF"]()
        cliploader = NODE_CLASS_MAPPINGS["CLIPLoader"]()
        loraloader = NODE_CLASS_MAPPINGS["LoraLoader"]()
        cliptextencode = NODE_CLASS_MAPPINGS["CLIPTextEncode"]()
        vaeloader = NODE_CLASS_MAPPINGS["VAELoader"]()
        loadimage = NODE_CLASS_MAPPINGS["LoadImage"]()
        upscalemodelloader = NODE_CLASS_MAPPINGS["UpscaleModelLoader"]()
        mxslider = NODE_CLASS_MAPPINGS["mxSlider"]()
        vhs_loadimagespath = NODE_CLASS_MAPPINGS["VHS_LoadImagesPath"]()
        modelsamplingsd3 = NODE_CLASS_MAPPINGS["ModelSamplingSD3"]()
        wanimagetovideo = NODE_CLASS_MAPPINGS["WanImageToVideo"]()
        ksampleradvanced = NODE_CLASS_MAPPINGS["KSamplerAdvanced"]()
        vaedecode = NODE_CLASS_MAPPINGS["VAEDecode"]()
        rife_vfi = NODE_CLASS_MAPPINGS["RIFE VFI"]()
        vhs_videocombine = NODE_CLASS_MAPPINGS["VHS_VideoCombine"]()
        easy_mathfloat = NODE_CLASS_MAPPINGS["easy mathFloat"]()
        imageupscalewithmodel = NODE_CLASS_MAPPINGS["ImageUpscaleWithModel"]()
        imagescaleby = NODE_CLASS_MAPPINGS["ImageScaleBy"]()
        saveimage = NODE_CLASS_MAPPINGS["SaveImage"]()

        print("Starting Step 1: Initial Sampling")
        cliploader_38 = cliploader.load_clip(
            clip_name="umt5_xxl_fp8_e4m3fn_scaled.safetensors",
            type="wan",
            device="default",
        )
        unetloadergguf_84 = unetloadergguf.load_unet(
            unet_name="Wan2.2-I2V-A14B-HighNoise-Q4_K_S.gguf"
        )
        # LoRA chain for sampler 1: base LoRA -> optional custom LoRA -> FastWan.
        model, clip = loraloader.load_lora(
            lora_name="lightx2v_T2V_14B_cfg_step_distill_v2_lora_rank32_bf16.safetensors",
            strength_model=2,
            strength_clip=2,
            model=get_value_at_index(unetloadergguf_84, 0),
            clip=get_value_at_index(cliploader_38, 0),
        )
        if use_lora_1:
            print(f"1번 샘플러에 커스텀 LoRA 적용: {args.custom_lora_1_name}")
            model, clip = loraloader.load_lora(
                lora_name=args.custom_lora_1_name,
                strength_model=args.custom_lora_1_strength_model,
                strength_clip=args.custom_lora_1_strength_clip,
                model=model,
                clip=clip,
            )
        loraloader_78 = loraloader.load_lora(
            lora_name="FastWan_T2V_14B_480p_lora_rank_128_bf16.safetensors",
            strength_model=1.5,
            strength_clip=1.5,
            model=model,
            clip=clip,
        )
        cliptextencode_6 = cliptextencode.encode(
            text=args.positive_prompt, clip=get_value_at_index(loraloader_78, 1)
        )
        cliptextencode_7 = cliptextencode.encode(
            text=args.negative_prompt, clip=get_value_at_index(loraloader_78, 1)
        )
        vaeloader_39 = vaeloader.load_vae(vae_name="wan_2.1_vae.safetensors")
        loadimage_62 = loadimage.load_image(image="example.png")
        wanimagetovideo_63 = wanimagetovideo.EXECUTE_NORMALIZED(
            width=args.width,
            height=args.height,
            length=args.length,
            batch_size=1,
            positive=get_value_at_index(cliptextencode_6, 0),
            negative=get_value_at_index(cliptextencode_7, 0),
            vae=get_value_at_index(vaeloader_39, 0),
            start_image=get_value_at_index(loadimage_62, 0),
        )
        modelsamplingsd3_54 = modelsamplingsd3.patch(
            shift=8.0, model=get_value_at_index(loraloader_78, 0)
        )
        # Time-based seed, so every run draws a different noise seed.
        current_seed = int(time.time() * 1000)
        random.seed(current_seed)
        ksampleradvanced_74 = ksampleradvanced.sample(
            add_noise="enable",
            noise_seed=random.randint(1, max_noise_seed),
            steps=4,
            cfg=1,
            sampler_name="lcm",
            scheduler="simple",
            start_at_step=0,
            end_at_step=2,
            return_with_leftover_noise="enable",
            model=get_value_at_index(modelsamplingsd3_54, 0),
            positive=get_value_at_index(wanimagetovideo_63, 0),
            negative=get_value_at_index(wanimagetovideo_63, 1),
            latent_image=get_value_at_index(wanimagetovideo_63, 2),
        )
        # Drop the high-noise model before the low-noise one is loaded.
        del unetloadergguf_84, loraloader_78, modelsamplingsd3_54, loadimage_62, model, clip
        clear_memory()
        print("Step 1 finished and memory cleared.")

        print("Starting Step 2: Refinement Sampling")
        unetloadergguf_85 = unetloadergguf.load_unet(
            unet_name="Wan2.2-I2V-A14B-LowNoise-Q4_K_S.gguf"
        )
        # LoRA chain for sampler 2: base LoRA -> optional custom LoRA -> FastWan.
        model, clip = loraloader.load_lora(
            lora_name="lightx2v_T2V_14B_cfg_step_distill_v2_lora_rank32_bf16.safetensors",
            strength_model=2,
            strength_clip=2,
            model=get_value_at_index(unetloadergguf_85, 0),
            clip=get_value_at_index(cliploader_38, 0),
        )
        if use_lora_2:
            print(f"2번 샘플러에 커스텀 LoRA 적용: {args.custom_lora_2_name}")
            model, clip = loraloader.load_lora(
                lora_name=args.custom_lora_2_name,
                strength_model=args.custom_lora_2_strength_model,
                strength_clip=args.custom_lora_2_strength_clip,
                model=model,
                clip=clip,
            )
        loraloader_86 = loraloader.load_lora(
            lora_name="FastWan_T2V_14B_480p_lora_rank_128_bf16.safetensors",
            strength_model=0.5,
            strength_clip=0.5,
            model=model,
            clip=clip,
        )
        modelsamplingsd3_55 = modelsamplingsd3.patch(
            shift=8, model=get_value_at_index(loraloader_86, 0)
        )
        current_seed_2 = int(time.time() * 1000)
        random.seed(current_seed_2)
        # Continues from the leftover-noise latent of step 1 (no new noise added).
        ksampleradvanced_75 = ksampleradvanced.sample(
            add_noise="disable",
            noise_seed=random.randint(1, max_noise_seed),
            steps=4,
            cfg=1,
            sampler_name="lcm",
            scheduler="simple",
            start_at_step=2,
            end_at_step=10000,
            return_with_leftover_noise="disable",
            model=get_value_at_index(modelsamplingsd3_55, 0),
            positive=get_value_at_index(wanimagetovideo_63, 0),
            negative=get_value_at_index(wanimagetovideo_63, 1),
            latent_image=get_value_at_index(ksampleradvanced_74, 0),
        )
        del unetloadergguf_85, loraloader_86, modelsamplingsd3_55, cliploader_38, cliptextencode_6, cliptextencode_7, ksampleradvanced_74, wanimagetovideo_63, model, clip
        clear_memory()
        print("Step 2 finished and memory cleared.")

        print("Starting Step 3: VAE Decode and Save")
        vaedecode_8 = vaedecode.decode(
            samples=get_value_at_index(ksampleradvanced_75, 0),
            vae=get_value_at_index(vaeloader_39, 0),
        )
        saveimage.save_images(
            filename_prefix="temp/example", images=get_value_at_index(vaedecode_8, 0)
        )
        # [Optimization] Delete the VAE-related variables as soon as they are
        # no longer needed.
        del ksampleradvanced_75, vaeloader_39, vaedecode_8
        clear_memory()
        print("Step 3 finished and memory cleared.")

        if args.upscale_ratio > 1:
            print("Starting Steps 4 & 5: Upscaling")
            upscalemodelloader_88 = upscalemodelloader.load_model(
                model_name="2x-AnimeSharpV4_Fast_RCAN_PU.safetensors"
            )

            # Upscale Part 1: the first 40 frames.
            vhs_loadimagespath_96 = vhs_loadimagespath.load_images(
                directory=f"{COMFYUI_BASE_PATH}/output/temp", image_load_cap=40
            )
            if get_value_at_index(vhs_loadimagespath_96, 0) is not None and get_value_at_index(vhs_loadimagespath_96, 0).shape[0] > 0:
                imageupscalewithmodel_92 = imageupscalewithmodel.upscale(
                    upscale_model=get_value_at_index(upscalemodelloader_88, 0),
                    image=get_value_at_index(vhs_loadimagespath_96, 0),
                )
                # The model output is rescaled by upscale_ratio / 2 (the model
                # file name suggests a 2x model — TODO confirm).
                mxslider_91 = mxslider.main(Xi=args.upscale_ratio, Xf=args.upscale_ratio, isfloatX=1)
                easy_mathfloat_90 = easy_mathfloat.float_math_operation(
                    a=get_value_at_index(mxslider_91, 0), b=2, operation="divide"
                )
                imagescaleby_93 = imagescaleby.upscale(
                    upscale_method="nearest-exact",
                    scale_by=get_value_at_index(easy_mathfloat_90, 0),
                    image=get_value_at_index(imageupscalewithmodel_92, 0),
                )
                saveimage.save_images(
                    filename_prefix="up/example", images=get_value_at_index(imagescaleby_93, 0)
                )
                del vhs_loadimagespath_96, imageupscalewithmodel_92, imagescaleby_93, easy_mathfloat_90, mxslider_91
            clear_memory()

            # Upscale Part 2: everything after the first 40 frames.
            try:
                vhs_loadimagespath_98 = vhs_loadimagespath.load_images(
                    directory=f"{COMFYUI_BASE_PATH}/output/temp", skip_first_images=40
                )
            except FileNotFoundError:
                # NOTE(review): presumably the loader raises instead of
                # returning an empty batch when no frames remain after the
                # skip (clips of 40 frames or fewer) — confirm. Treated as
                # "nothing left to upscale".
                vhs_loadimagespath_98 = None
            if (
                vhs_loadimagespath_98 is not None
                and get_value_at_index(vhs_loadimagespath_98, 0) is not None
                and get_value_at_index(vhs_loadimagespath_98, 0).shape[0] > 0
            ):
                imageupscalewithmodel_100 = imageupscalewithmodel.upscale(
                    upscale_model=get_value_at_index(upscalemodelloader_88, 0),
                    image=get_value_at_index(vhs_loadimagespath_98, 0),
                )
                mxslider_102 = mxslider.main(Xi=args.upscale_ratio, Xf=args.upscale_ratio, isfloatX=1)
                easy_mathfloat_103 = easy_mathfloat.float_math_operation(
                    a=get_value_at_index(mxslider_102, 0), b=2, operation="divide"
                )
                imagescaleby_101 = imagescaleby.upscale(
                    upscale_method="nearest-exact",
                    scale_by=get_value_at_index(easy_mathfloat_103, 0),
                    image=get_value_at_index(imageupscalewithmodel_100, 0),
                )
                saveimage.save_images(
                    filename_prefix="up/example", images=get_value_at_index(imagescaleby_101, 0)
                )
                del vhs_loadimagespath_98, imageupscalewithmodel_100, easy_mathfloat_103, imagescaleby_101, mxslider_102

            del upscalemodelloader_88
            clear_memory()
            print("Upscaling finished and memory cleared.")
            rife_input_dir = f"{COMFYUI_BASE_PATH}/output/up"
        else:
            print("Skipping Upscaling (Upscale ratio <= 1)")
            rife_input_dir = f"{COMFYUI_BASE_PATH}/output/temp"

        print("Starting Step 6: RIFE and Combine")
        vhs_loadimagespath_97 = vhs_loadimagespath.load_images(directory=rife_input_dir)
        rife_vfi_94 = rife_vfi.vfi(
            ckpt_name="rife47.pth",
            multiplier=2,
            frames=get_value_at_index(vhs_loadimagespath_97, 0),
        )
        vhs_videocombine.combine_video(
            frame_rate=32,
            loop_count=0,
            filename_prefix="AnimateDiff",
            format="video/h264-mp4",
            pix_fmt="yuv420p",
            crf=19,
            save_metadata=True,
            trim_to_audio=False,
            pingpong=False,
            save_output=True,
            images=get_value_at_index(rife_vfi_94, 0),
        )
        # [Optimization] Delete the variables used in the final step.
        del vhs_loadimagespath_97, rife_vfi_94
        clear_memory()
        print("✅ All steps finished.")

    # Report the most recently created video below the output directory.
    output_dir = os.path.join(COMFYUI_BASE_PATH, "output")
    video_files = glob.glob(os.path.join(output_dir, '**', '*.mp4'), recursive=True)
    if not video_files:
        raise FileNotFoundError("생성된 동영상 파일을 찾을 수 없습니다!")
    latest_video = max(video_files, key=os.path.getctime)
    print(f"LATEST_VIDEO_PATH:{latest_video}")
# Run the pipeline only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()