File size: 3,988 Bytes
be9b3ab |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
import os
import traceback
import numpy as np
import torch
import modules.async_worker as worker
import modules.controlnet
from modules.settings import default_settings
from shared import path_manager
import comfy.utils
from comfy_extras.chainner_models import model_loading
from comfy_extras.nodes_upscale_model import ImageUpscaleWithModel
from PIL import Image
# Remove Pillow's pixel-count ceiling (the decompression-bomb guard) so that
# very large images can be opened without a warning or error.
Image.MAX_IMAGE_PIXELS = None
class pipeline:
    """Pipeline that upscales one input image with an upscaler model.

    Most methods are no-op stubs; the actual work happens in ``process``,
    which reports progress through ``worker.add_result`` and returns the
    upscaled image(s) as uint8 NumPy arrays.
    """

    # Type tag(s) of this pipeline.
    # NOTE(review): presumably read by whatever selects a pipeline — confirm.
    pipeline_type = ["template"]
    # Name given to the most recent load_base_model() call ("" until then).
    model_hash = ""

    def parse_gen_data(self, gen_data):
        """Adjust ``gen_data`` in place so that a single image is produced.

        The requested count is preserved under ``"original_image_number"``,
        ``"image_number"`` is forced to 1 and previews are switched off.

        Returns:
            The same (mutated) ``gen_data`` dict.
        """
        gen_data["original_image_number"] = gen_data["image_number"]
        gen_data["image_number"] = 1
        gen_data["show_preview"] = False
        return gen_data

    def load_upscaler_model(self, model_name):
        """Load an upscaler model and return it in eval mode.

        Args:
            model_name: Name handed to ``path_manager.get_file_path``; when
                that lookup has no result the default is ``model_name``
                joined onto the configured ``"upscaler_path"`` directory.

        Returns:
            The model built by ``model_loading.load_state_dict`` from the
            loaded state dict, after ``.eval()``.
        """
        model_path = path_manager.get_file_path(
            model_name,
            default=os.path.join(
                path_manager.model_paths["upscaler_path"], model_name
            ),
        )
        sd = comfy.utils.load_torch_file(str(model_path), safe_load=True)
        # Checkpoints whose keys carry a "module." prefix get it stripped
        # before the state dict is handed to the model loader.
        if "module.layers.0.residual_group.blocks.0.norm1.weight" in sd:
            sd = comfy.utils.state_dict_prefix_replace(sd, {"module.": ""})
        return model_loading.load_state_dict(sd).eval()

    def load_base_model(self, name):
        """Record ``name`` as the current base model.

        Nothing is actually loaded here; the method only logs the name and
        stores it in ``self.model_hash``. Calling it again with the same
        name is a silent no-op.
        """
        # Check if model is already loaded
        if self.model_hash == name:
            return
        print(f"Loading model: {name}")
        self.model_hash = name

    def load_keywords(self, lora):
        """Return the text of the ``.txt`` file that accompanies ``lora``.

        The file name is derived by replacing ``".safetensors"`` with
        ``".txt"`` in ``lora``.

        Returns:
            The file contents, or a single space if the file does not exist.
        """
        filename = lora.replace(".safetensors", ".txt")
        try:
            with open(filename, "r") as file:
                return file.read()
        except FileNotFoundError:
            return " "

    def load_loras(self, loras):
        """No-op stub; ``loras`` is ignored."""
        return

    def refresh_controlnet(self, name=None):
        """No-op stub; ``name`` is ignored."""
        return

    def clean_prompt_cond_caches(self):
        """No-op stub."""
        return

    @staticmethod
    def _to_uint8_images(decoded):
        """Convert each item of ``decoded`` into a uint8 NumPy array.

        Every item is moved to the CPU, scaled by 255, clipped to
        ``[0, 255]`` and cast to ``uint8`` (the cast truncates fractions).
        """
        return [
            np.clip(255.0 * item.cpu().numpy(), 0, 255).astype(np.uint8)
            for item in decoded
        ]

    def process(
        self,
        gen_data=None,
        callback=None,
    ):
        """Upscale ``gen_data["input_image"]`` and return the result.

        Args:
            gen_data: Dict providing at least ``"input_image"`` (an object
                with ``.convert("RGB")`` — presumably a PIL image) and
                ``"task_id"``; it is also passed to
                ``modules.controlnet.get_settings`` to pick the upscaler.
            callback: Unused by this pipeline.

        Returns:
            A list of uint8 NumPy arrays, or an empty list if loading the
            model, upscaling or converting failed. In the failure case the
            traceback is printed and an "Oops ..." preview is reported.
        """
        # RGB image -> float32 array scaled to [0, 1] -> tensor with a new
        # leading axis of size 1.
        input_image = gen_data["input_image"]
        input_image = input_image.convert("RGB")
        input_image = np.array(input_image).astype(np.float32) / 255.0
        input_image = torch.from_numpy(input_image)[None,]

        task_id = gen_data["task_id"]
        worker.add_result(
            task_id,
            "preview",
            (-1, "Load upscaling model ...", None),
        )

        cn_settings = modules.controlnet.get_settings(gen_data)
        upscaler_name = cn_settings["upscaler"]
        upscale_path = path_manager.get_file_path(upscaler_name)
        if upscale_path is None:
            # Fall back to the default upscaler when the configured one
            # cannot be located.
            upscale_path = path_manager.get_file_path("4x-UltraSharp.pth")

        # Load and upscale exactly once, inside the guarded section, so any
        # failure is reported to the UI instead of propagating.
        try:
            upscaler_model = self.load_upscaler_model(upscale_path)

            worker.add_result(
                task_id,
                "preview",
                (-1, "Upscaling image ...", None),
            )
            decoded_latent = ImageUpscaleWithModel().upscale(
                upscaler_model, input_image
            )[0]

            worker.add_result(
                task_id,
                "preview",
                (-1, "Converting ...", None),
            )
            images = self._to_uint8_images(decoded_latent)

            worker.add_result(
                task_id,
                "preview",
                (-1, "Done ...", None),
            )
        except Exception:
            # Deliberately broad: this is the pipeline's top-level boundary.
            # KeyboardInterrupt / SystemExit are still allowed to propagate.
            traceback.print_exc()
            worker.add_result(
                task_id,
                "preview",
                (-1, "Oops ...", "html/error.png"),
            )
            images = []

        return images
|