import gc, time
import numpy as np
import PIL.Image
from diffusers import (
    ControlNetModel,
    DiffusionPipeline,
    StableDiffusionControlNetPipeline,
    StableDiffusionControlNetInpaintPipeline,
    StableDiffusionPipeline,
    AutoencoderKL,
    StableDiffusionXLInpaintPipeline,
    StableDiffusionXLAdapterPipeline,
    T2IAdapter,
    StableDiffusionXLPipeline,
    AutoPipelineForImage2Image,
)
from huggingface_hub import hf_hub_download
import torch, random, json
from controlnet_aux import (
    CannyDetector,
    ContentShuffleDetector,
    HEDdetector,
    LineartAnimeDetector,
    LineartDetector,
    MidasDetector,
    MLSDdetector,
    NormalBaeDetector,
    OpenposeDetector,
    PidiNetDetector,
)
from transformers import pipeline
from controlnet_aux.util import HWC3, ade_palette
from transformers import AutoImageProcessor, UperNetForSemanticSegmentation
import cv2
from diffusers import (
    DPMSolverMultistepScheduler,
    DPMSolverSinglestepScheduler,
    KDPM2DiscreteScheduler,
    EulerDiscreteScheduler,
    EulerAncestralDiscreteScheduler,
    HeunDiscreteScheduler,
    LMSDiscreteScheduler,
    DDIMScheduler,
    DEISMultistepScheduler,
    UniPCMultistepScheduler,
    LCMScheduler,
    PNDMScheduler,
    KDPM2AncestralDiscreteScheduler,
    EDMDPMSolverMultistepScheduler,
    EDMEulerScheduler,
)
from .prompt_weights import get_embed_new, add_comma_after_pattern_ti
from .utils import save_pil_image_with_metadata
from .lora_loader import lora_mix_load
from .inpainting_canvas import draw, make_inpaint_condition
from .adetailer import ad_model_process
from ..upscalers.esrgan import UpscalerESRGAN, UpscalerLanczos, UpscalerNearest
from ..logging.logging_setup import logger
from .extra_model_loaders import custom_task_model_loader
from .high_resolution import process_images_high_resolution
from .style_prompt_config import styles_data, STYLE_NAMES, get_json_content, apply_style
import os
from compel import Compel, ReturnedEmbeddingsType
import ipywidgets as widgets, mediapy
from IPython.display import display
from PIL import Image
from typing import Union, Optional, List, Tuple, Dict, Any, Callable
import logging, diffusers, copy, warnings

logging.getLogger("diffusers").setLevel(logging.ERROR)
diffusers.utils.logging.set_verbosity(40)
warnings.filterwarnings(action="ignore", category=FutureWarning, module="diffusers")
warnings.filterwarnings(action="ignore", category=FutureWarning, module="transformers")


def resize_image(input_image, resolution, interpolation=None):
    """Resize so the longer side matches `resolution`, snapping H and W to multiples of 64."""
    H, W, C = input_image.shape
    H = float(H)
    W = float(W)
    k = float(resolution) / max(H, W)
    H *= k
    W *= k
    H = int(np.round(H / 64.0)) * 64
    W = int(np.round(W / 64.0)) * 64
    if interpolation is None:
        interpolation = cv2.INTER_LANCZOS4 if k > 1 else cv2.INTER_AREA
    img = cv2.resize(input_image, (W, H), interpolation=interpolation)
    return img
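
# Behavior sketch (illustrative values): the aspect ratio is kept, the longer
# side is scaled toward `resolution`, and both sides snap to multiples of 64.
#
#   arr = np.zeros((768, 1024, 3), dtype=np.uint8)
#   resize_image(arr, resolution=512).shape  # -> (384, 512, 3)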


class DepthEstimator:
    def __init__(self):
        self.model = pipeline("depth-estimation")

    def __call__(self, image: np.ndarray, **kwargs) -> PIL.Image.Image:
        detect_resolution = kwargs.pop("detect_resolution", 512)
        image_resolution = kwargs.pop("image_resolution", 512)
        image = np.array(image)
        image = HWC3(image)
        image = resize_image(image, resolution=detect_resolution)
        image = PIL.Image.fromarray(image)
        image = self.model(image)
        image = image["depth"]
        image = np.array(image)
        image = HWC3(image)
        image = resize_image(image, resolution=image_resolution)
        return PIL.Image.fromarray(image)
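
# Minimal usage sketch (`img` is a hypothetical RGB array or PIL image):
#
#   depth = DepthEstimator()
#   depth_map = depth(img, detect_resolution=512, image_resolution=768)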


class ImageSegmentor:
    def __init__(self):
        self.image_processor = AutoImageProcessor.from_pretrained(
            "openmmlab/upernet-convnext-small"
        )
        self.image_segmentor = UperNetForSemanticSegmentation.from_pretrained(
            "openmmlab/upernet-convnext-small"
        )

    @torch.inference_mode()
    def __call__(self, image: np.ndarray, **kwargs) -> PIL.Image.Image:
        detect_resolution = kwargs.pop("detect_resolution", 512)
        image_resolution = kwargs.pop("image_resolution", 512)
        image = HWC3(image)
        image = resize_image(image, resolution=detect_resolution)
        image = PIL.Image.fromarray(image)

        pixel_values = self.image_processor(image, return_tensors="pt").pixel_values
        outputs = self.image_segmentor(pixel_values)
        seg = self.image_processor.post_process_semantic_segmentation(
            outputs, target_sizes=[image.size[::-1]]
        )[0]
        color_seg = np.zeros((seg.shape[0], seg.shape[1], 3), dtype=np.uint8)
        for label, color in enumerate(ade_palette()):
            color_seg[seg == label, :] = color
        color_seg = color_seg.astype(np.uint8)

        color_seg = resize_image(
            color_seg, resolution=image_resolution, interpolation=cv2.INTER_NEAREST
        )
        return PIL.Image.fromarray(color_seg)
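
# Usage sketch: yields an ADE20K color-coded segmentation map, resized with
# nearest-neighbor so label colors stay exact (`img` is a hypothetical array):
#
#   seg = ImageSegmentor()
#   seg_map = seg(img, image_resolution=512)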


class Preprocessor:
    MODEL_ID = "lllyasviel/Annotators"

    def __init__(self):
        self.model = None
        self.name = ""

    def load(self, name: str) -> None:
        if name == self.name:
            return
        if name == "HED":
            self.model = HEDdetector.from_pretrained(self.MODEL_ID)
        elif name == "Midas":
            self.model = MidasDetector.from_pretrained(self.MODEL_ID)
        elif name == "MLSD":
            self.model = MLSDdetector.from_pretrained(self.MODEL_ID)
        elif name == "Openpose":
            self.model = OpenposeDetector.from_pretrained(self.MODEL_ID)
        elif name == "PidiNet":
            self.model = PidiNetDetector.from_pretrained(self.MODEL_ID)
        elif name == "NormalBae":
            self.model = NormalBaeDetector.from_pretrained(self.MODEL_ID)
        elif name == "Lineart":
            self.model = LineartDetector.from_pretrained(self.MODEL_ID)
        elif name == "LineartAnime":
            self.model = LineartAnimeDetector.from_pretrained(self.MODEL_ID)
        elif name == "Canny":
            self.model = CannyDetector()
        elif name == "ContentShuffle":
            self.model = ContentShuffleDetector()
        elif name == "DPT":
            self.model = DepthEstimator()
        elif name == "UPerNet":
            self.model = ImageSegmentor()
        else:
            raise ValueError(f"Unknown preprocessor: {name}")
        torch.cuda.empty_cache()
        gc.collect()
        self.name = name

    def __call__(self, image: PIL.Image.Image, **kwargs) -> PIL.Image.Image:
        if self.name == "Canny":
            if "detect_resolution" in kwargs:
                detect_resolution = kwargs.pop("detect_resolution")
                image = np.array(image)
                image = HWC3(image)
                image = resize_image(image, resolution=detect_resolution)
            image = self.model(image, **kwargs)
            return PIL.Image.fromarray(image)
        elif self.name == "Midas":
            detect_resolution = kwargs.pop("detect_resolution", 512)
            image_resolution = kwargs.pop("image_resolution", 512)
            image = np.array(image)
            image = HWC3(image)
            image = resize_image(image, resolution=detect_resolution)
            image = self.model(image, **kwargs)
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            return PIL.Image.fromarray(image)
        else:
            return self.model(image, **kwargs)
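
# Usage sketch: `load` swaps annotator weights in place, so one Preprocessor
# instance can serve several ControlNet tasks (`img` is hypothetical):
#
#   pre = Preprocessor()
#   pre.load("Canny")
#   edges = pre(img, low_threshold=100, high_threshold=200, detect_resolution=512)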


CONTROLNET_MODEL_IDS = {
    "openpose": "lllyasviel/control_v11p_sd15_openpose",
    "canny": "lllyasviel/control_v11p_sd15_canny",
    "mlsd": "lllyasviel/control_v11p_sd15_mlsd",
    "scribble": "lllyasviel/control_v11p_sd15_scribble",
    "softedge": "lllyasviel/control_v11p_sd15_softedge",
    "segmentation": "lllyasviel/control_v11p_sd15_seg",
    "depth": "lllyasviel/control_v11f1p_sd15_depth",
    "normalbae": "lllyasviel/control_v11p_sd15_normalbae",
    "lineart": "lllyasviel/control_v11p_sd15_lineart",
    "lineart_anime": "lllyasviel/control_v11p_sd15s2_lineart_anime",
    "shuffle": "lllyasviel/control_v11e_sd15_shuffle",
    "ip2p": "lllyasviel/control_v11e_sd15_ip2p",
    "inpaint": "lllyasviel/control_v11p_sd15_inpaint",
    "txt2img": "Nothinghere",  # placeholder: txt2img uses no ControlNet
    "sdxl_canny": "TencentARC/t2i-adapter-canny-sdxl-1.0",
    "sdxl_sketch": "TencentARC/t2i-adapter-sketch-sdxl-1.0",
    "sdxl_lineart": "TencentARC/t2i-adapter-lineart-sdxl-1.0",
    "sdxl_depth-midas": "TencentARC/t2i-adapter-depth-midas-sdxl-1.0",
    "sdxl_openpose": "TencentARC/t2i-adapter-openpose-sdxl-1.0",
    "img2img": "Nothinghere",  # placeholder: img2img uses no ControlNet
}


SCHEDULER_CONFIG_MAP = {
    "DPM++ 2M": (DPMSolverMultistepScheduler, {}),
    "DPM++ 2M Karras": (DPMSolverMultistepScheduler, {"use_karras_sigmas": True}),
    "DPM++ 2M SDE": (DPMSolverMultistepScheduler, {"algorithm_type": "sde-dpmsolver++"}),
    "DPM++ 2M SDE Karras": (DPMSolverMultistepScheduler, {"use_karras_sigmas": True, "algorithm_type": "sde-dpmsolver++"}),
    "DPM++ SDE": (DPMSolverSinglestepScheduler, {}),
    "DPM++ SDE Karras": (DPMSolverSinglestepScheduler, {"use_karras_sigmas": True}),
    "DPM2": (KDPM2DiscreteScheduler, {}),
    "DPM2 Karras": (KDPM2DiscreteScheduler, {"use_karras_sigmas": True}),
    "DPM2 a": (KDPM2AncestralDiscreteScheduler, {}),
    "DPM2 a Karras": (KDPM2AncestralDiscreteScheduler, {"use_karras_sigmas": True}),
    "Euler": (EulerDiscreteScheduler, {}),
    "Euler a": (EulerAncestralDiscreteScheduler, {}),
    "Heun": (HeunDiscreteScheduler, {}),
    "LMS": (LMSDiscreteScheduler, {}),
    "LMS Karras": (LMSDiscreteScheduler, {"use_karras_sigmas": True}),
    "DDIM": (DDIMScheduler, {}),
    "DEIS": (DEISMultistepScheduler, {}),
    "UniPC": (UniPCMultistepScheduler, {}),
    "PNDM": (PNDMScheduler, {}),

    "DPM++ 2M Lu": (DPMSolverMultistepScheduler, {"use_lu_lambdas": True}),
    "DPM++ 2M Ef": (DPMSolverMultistepScheduler, {"euler_at_final": True}),
    "DPM++ 2M SDE Lu": (DPMSolverMultistepScheduler, {"use_lu_lambdas": True, "algorithm_type": "sde-dpmsolver++"}),
    "DPM++ 2M SDE Ef": (DPMSolverMultistepScheduler, {"algorithm_type": "sde-dpmsolver++", "euler_at_final": True}),

    "EDMDPM": (EDMDPMSolverMultistepScheduler, {}),
    "EDMEuler": (EDMEulerScheduler, {}),

    "LCM": (LCMScheduler, {}),
}

scheduler_names = list(SCHEDULER_CONFIG_MAP.keys())
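
# Each entry maps a user-facing sampler name to (scheduler_class, extra_kwargs)
# layered onto the base model's scheduler config, e.g. (assuming a loaded `pipe`):
#
#   cls, extra = SCHEDULER_CONFIG_MAP["DPM++ 2M Karras"]
#   pipe.scheduler = cls.from_config(pipe.scheduler.config, **extra)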


def process_prompts_valid(specific_prompt, specific_negative_prompt, prompt, negative_prompt):
    specific_prompt_empty = (specific_prompt in [None, ""])
    specific_negative_prompt_empty = (specific_negative_prompt in [None, ""])

    prompt_valid = prompt if specific_prompt_empty else specific_prompt
    negative_prompt_valid = negative_prompt if specific_negative_prompt_empty else specific_negative_prompt

    return specific_prompt_empty, specific_negative_prompt_empty, prompt_valid, negative_prompt_valid
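
# Example: a stage-specific prompt overrides the global one only when non-empty.
#
#   process_prompts_valid("", None, "a cat", "blurry")
#   # -> (True, True, "a cat", "blurry")
#   process_prompts_valid("a dog", None, "a cat", "blurry")
#   # -> (False, True, "a dog", "blurry")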


class Model_Diffusers:
    def __init__(
        self,
        base_model_id: str = "runwayml/stable-diffusion-v1-5",
        task_name: str = "txt2img",
        vae_model=None,
        type_model_precision=torch.float16,
        sdxl_safetensors=False,
    ):
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.base_model_id = ""
        self.task_name = ""
        self.vae_model = None
        self.type_model_precision = (
            type_model_precision if torch.cuda.is_available() else torch.float32
        )

        self.load_pipe(
            base_model_id, task_name, vae_model, type_model_precision, sdxl_safetensors=sdxl_safetensors
        )
        self.preprocessor = Preprocessor()

        self.styles_data = styles_data
        self.STYLE_NAMES = STYLE_NAMES
        self.style_json_file = ""

    def load_pipe(
        self,
        base_model_id: str,
        task_name="txt2img",
        vae_model=None,
        type_model_precision=torch.float16,
        reload=False,
        sdxl_safetensors=False,
        retain_model_in_memory=True,
    ) -> DiffusionPipeline:
        if (
            base_model_id == self.base_model_id
            and task_name == self.task_name
            and hasattr(self, "pipe")
            and self.vae_model == vae_model
            and self.pipe is not None
            and not reload
        ):
            if self.type_model_precision == type_model_precision or self.device.type == "cpu":
                return

        # Decide whether the current pipe must be discarded and rebuilt.
        if hasattr(self, "pipe") and os.path.isfile(base_model_id):
            unload_model = False
            if self.pipe is None:
                unload_model = True
            elif type_model_precision != self.type_model_precision and self.device.type != "cpu":
                unload_model = True
        else:
            if hasattr(self, "pipe"):
                unload_model = False
                if self.pipe is None:
                    unload_model = True
            else:
                unload_model = True
        self.type_model_precision = (
            type_model_precision if torch.cuda.is_available() else torch.float32
        )

        if self.type_model_precision == torch.float32 and os.path.isfile(base_model_id):
            logger.info(f"Working with full precision {str(self.type_model_precision)}")

        # Load model
        if self.base_model_id == base_model_id and self.pipe is not None and not reload and self.vae_model == vae_model and not unload_model:
            class_name = self.class_name
        else:
            # Unload the previous model and reset cached state.
            self.pipe = None
            self.model_memory = {}
            self.lora_memory = [None, None, None, None, None]
            self.lora_scale_memory = [1.0, 1.0, 1.0, 1.0, 1.0]
            self.LCMconfig = None
            self.embed_loaded = []
            self.FreeU = False
            torch.cuda.empty_cache()
            gc.collect()

            # Load the new base model, from a local single-file checkpoint or from the Hub.
            if os.path.isfile(base_model_id):
                if sdxl_safetensors:
                    logger.info("Default VAE: madebyollin/sdxl-vae-fp16-fix")
                    self.pipe = StableDiffusionXLPipeline.from_single_file(
                        base_model_id,
                        vae=AutoencoderKL.from_pretrained(
                            "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16
                        ),
                        torch_dtype=self.type_model_precision,
                    )
                    class_name = "StableDiffusionXLPipeline"
                else:
                    self.pipe = StableDiffusionPipeline.from_single_file(
                        base_model_id,
                        torch_dtype=self.type_model_precision,
                    )
                    class_name = "StableDiffusionPipeline"
            else:
                file_config = hf_hub_download(repo_id=base_model_id, filename="model_index.json")

                # Reading data from the JSON file
                with open(file_config, 'r') as json_config:
                    data_config = json.load(json_config)

                # Searching for the value of the "_class_name" key
                if '_class_name' in data_config:
                    class_name = data_config['_class_name']

                match class_name:
                    case "StableDiffusionPipeline":
                        self.pipe = StableDiffusionPipeline.from_pretrained(
                            base_model_id,
                            torch_dtype=self.type_model_precision,
                        )

                    case "StableDiffusionXLPipeline":
                        logger.info("Default VAE: madebyollin/sdxl-vae-fp16-fix")
                        try:
                            self.pipe = DiffusionPipeline.from_pretrained(
                                base_model_id,
                                vae=AutoencoderKL.from_pretrained(
                                    "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16
                                ),
                                torch_dtype=torch.float16,
                                use_safetensors=True,
                                variant="fp16",
                                add_watermarker=False,
                            )
                        except Exception as e:
                            logger.debug(e)
                            logger.debug("Loading model without parameter variant=fp16")
                            self.pipe = DiffusionPipeline.from_pretrained(
                                base_model_id,
                                vae=AutoencoderKL.from_pretrained(
                                    "madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16
                                ),
                                torch_dtype=torch.float16,
                                use_safetensors=True,
                                add_watermarker=False,
                            )
            self.base_model_id = base_model_id
            self.class_name = class_name

            # Load a custom VAE after the base model, if one was requested.
            if vae_model is None:
                logger.debug("Default VAE")
            else:
                if os.path.isfile(vae_model):
                    self.pipe.vae = AutoencoderKL.from_single_file(
                        vae_model
                    )
                else:
                    self.pipe.vae = AutoencoderKL.from_pretrained(
                        vae_model,
                        subfolder="vae",
                    )
                try:
                    self.pipe.vae.to(self.type_model_precision)
                except Exception:
                    logger.warning(f"VAE: not in {self.type_model_precision}")
            self.vae_model = vae_model

            # Define the base scheduler so samplers can be derived from it later.
            self.default_scheduler = copy.deepcopy(self.pipe.scheduler)
            logger.debug(f"Base sampler: {self.default_scheduler}")

        # Reuse a previously built task pipe if it is cached in memory.
        if task_name in self.model_memory:
            self.pipe = self.model_memory[task_name]
            self.base_model_id = base_model_id
            self.task_name = task_name
            self.vae_model = vae_model
            self.class_name = class_name
            self.pipe.watermark = None
            return

        # Load the ControlNet / T2I-Adapter weights for the requested task.
        model_id = CONTROLNET_MODEL_IDS[task_name]

        if task_name == "inpaint":
            match class_name:
                case "StableDiffusionPipeline":
                    controlnet = ControlNetModel.from_pretrained(
                        model_id, torch_dtype=self.type_model_precision
                    )

                    self.pipe = StableDiffusionControlNetInpaintPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        tokenizer=self.pipe.tokenizer,
                        unet=self.pipe.unet,
                        controlnet=controlnet,
                        scheduler=self.pipe.scheduler,
                        safety_checker=self.pipe.safety_checker,
                        feature_extractor=self.pipe.feature_extractor,
                        requires_safety_checker=self.pipe.config.requires_safety_checker,
                    )

                case "StableDiffusionXLPipeline":
                    self.pipe = StableDiffusionXLInpaintPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        text_encoder_2=self.pipe.text_encoder_2,
                        tokenizer=self.pipe.tokenizer,
                        tokenizer_2=self.pipe.tokenizer_2,
                        unet=self.pipe.unet,
                        scheduler=self.pipe.scheduler,
                    )

        # ControlNet (SD 1.5) or T2I-Adapter (SDXL) tasks.
        if task_name not in ["txt2img", "inpaint", "img2img"]:
            match class_name:
                case "StableDiffusionPipeline":
                    controlnet = ControlNetModel.from_pretrained(
                        model_id, torch_dtype=self.type_model_precision
                    )

                    self.pipe = StableDiffusionControlNetPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        tokenizer=self.pipe.tokenizer,
                        unet=self.pipe.unet,
                        controlnet=controlnet,
                        scheduler=self.pipe.scheduler,
                        safety_checker=self.pipe.safety_checker,
                        feature_extractor=self.pipe.feature_extractor,
                        requires_safety_checker=self.pipe.config.requires_safety_checker,
                    )
                    self.pipe.scheduler = UniPCMultistepScheduler.from_config(self.pipe.scheduler.config)

                case "StableDiffusionXLPipeline":
                    adapter = T2IAdapter.from_pretrained(
                        model_id,
                        torch_dtype=torch.float16,
                        variant="fp16",
                    ).to(self.device)

                    self.pipe = StableDiffusionXLAdapterPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        text_encoder_2=self.pipe.text_encoder_2,
                        tokenizer=self.pipe.tokenizer,
                        tokenizer_2=self.pipe.tokenizer_2,
                        unet=self.pipe.unet,
                        adapter=adapter,
                        scheduler=self.pipe.scheduler,
                    ).to(self.device)

        if task_name in ["txt2img", "img2img"]:
            match class_name:
                case "StableDiffusionPipeline":
                    self.pipe = StableDiffusionPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        tokenizer=self.pipe.tokenizer,
                        unet=self.pipe.unet,
                        scheduler=self.pipe.scheduler,
                        safety_checker=self.pipe.safety_checker,
                        feature_extractor=self.pipe.feature_extractor,
                        requires_safety_checker=self.pipe.config.requires_safety_checker,
                    )

                case "StableDiffusionXLPipeline":
                    self.pipe = StableDiffusionXLPipeline(
                        vae=self.pipe.vae,
                        text_encoder=self.pipe.text_encoder,
                        text_encoder_2=self.pipe.text_encoder_2,
                        tokenizer=self.pipe.tokenizer,
                        tokenizer_2=self.pipe.tokenizer_2,
                        unet=self.pipe.unet,
                        scheduler=self.pipe.scheduler,
                    )

        if task_name == "img2img":
            self.pipe = AutoPipelineForImage2Image.from_pipe(self.pipe)

        self.pipe.to(self.device)
        torch.cuda.empty_cache()
        gc.collect()

        self.base_model_id = base_model_id
        self.task_name = task_name
        self.vae_model = vae_model
        self.class_name = class_name

        if self.class_name == "StableDiffusionXLPipeline":
            self.pipe.enable_vae_slicing()
            self.pipe.enable_vae_tiling()
            self.pipe.watermark = None

        if retain_model_in_memory and task_name not in self.model_memory:
            self.model_memory[task_name] = self.pipe

        return
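
    # Usage sketch (model ids are illustrative): load_pipe runs once in __init__
    # and again whenever the base model, task, VAE, or precision changes.
    #
    #   model = Model_Diffusers("runwayml/stable-diffusion-v1-5", task_name="canny")
    #   model.load_pipe("runwayml/stable-diffusion-v1-5", task_name="openpose")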

    def load_controlnet_weight(self, task_name: str) -> None:
        torch.cuda.empty_cache()
        gc.collect()
        model_id = CONTROLNET_MODEL_IDS[task_name]
        controlnet = ControlNetModel.from_pretrained(
            model_id, torch_dtype=self.type_model_precision
        )
        controlnet.to(self.device)
        torch.cuda.empty_cache()
        gc.collect()
        self.pipe.controlnet = controlnet

    @torch.autocast("cuda")
    def run_pipe(
        self,
        prompt: str,
        negative_prompt: str,
        prompt_embeds,
        negative_prompt_embeds,
        control_image: PIL.Image.Image,
        num_images: int,
        num_steps: int,
        guidance_scale: float,
        clip_skip: int,
        generator,
        controlnet_conditioning_scale,
        control_guidance_start,
        control_guidance_end,
    ) -> list[PIL.Image.Image]:
        return self.pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            guidance_scale=guidance_scale,
            clip_skip=clip_skip,
            num_images_per_prompt=num_images,
            num_inference_steps=num_steps,
            generator=generator,
            controlnet_conditioning_scale=controlnet_conditioning_scale,
            control_guidance_start=control_guidance_start,
            control_guidance_end=control_guidance_end,
            image=control_image,
        ).images

    @torch.autocast("cuda")
    def run_pipe_SD(
        self,
        prompt: str,
        negative_prompt: str,
        prompt_embeds,
        negative_prompt_embeds,
        num_images: int,
        num_steps: int,
        guidance_scale: float,
        clip_skip: int,
        height: int,
        width: int,
        generator,
    ) -> list[PIL.Image.Image]:
        self.preview_handle = None
        return self.pipe(
            prompt=prompt,
            negative_prompt=negative_prompt,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            guidance_scale=guidance_scale,
            clip_skip=clip_skip,
            num_images_per_prompt=num_images,
            num_inference_steps=num_steps,
            generator=generator,
            height=height,
            width=width,
            callback=self.callback_pipe if self.image_previews else None,
            callback_steps=10 if self.image_previews else 100,
        ).images
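
    # Note: the legacy `callback`/`callback_steps` pair streams decoded previews
    # through callback_pipe every 10 steps when `image_previews` is enabled;
    # otherwise the callback is disabled and the interval set high so it never fires.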

    @torch.autocast("cuda")
    def run_pipe_inpaint(
        self,
        prompt: str,
        negative_prompt: str,
        prompt_embeds,
        negative_prompt_embeds,
        control_image: PIL.Image.Image,
        num_images: int,
        num_steps: int,
        guidance_scale: float,
        clip_skip: int,
        strength: float,
        init_image,
        control_mask,
        controlnet_conditioning_scale,
        control_guidance_start,
        control_guidance_end,
        generator,
    ) -> list[PIL.Image.Image]:
        return self.pipe(
            prompt=None,
            negative_prompt=None,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            eta=1.0,
            strength=strength,
            image=init_image,
            mask_image=control_mask,
            control_image=control_image,
            num_images_per_prompt=num_images,
            num_inference_steps=num_steps,
            guidance_scale=guidance_scale,
            clip_skip=clip_skip,
            generator=generator,
            controlnet_conditioning_scale=controlnet_conditioning_scale,
            control_guidance_start=control_guidance_start,
            control_guidance_end=control_guidance_end,
        ).images

    @torch.autocast("cuda")
    def run_pipe_img2img(
        self,
        prompt: str,
        negative_prompt: str,
        prompt_embeds,
        negative_prompt_embeds,
        num_images: int,
        num_steps: int,
        guidance_scale: float,
        clip_skip: int,
        strength: float,
        init_image,
        generator,
    ) -> list[PIL.Image.Image]:
        return self.pipe(
            prompt=None,
            negative_prompt=None,
            prompt_embeds=prompt_embeds,
            negative_prompt_embeds=negative_prompt_embeds,
            eta=1.0,
            strength=strength,
            image=init_image,
            num_images_per_prompt=num_images,
            num_inference_steps=num_steps,
            guidance_scale=guidance_scale,
            clip_skip=clip_skip,
            generator=generator,
        ).images

    # ControlNet task preprocessors.
    @torch.inference_mode()
    def process_canny(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        low_threshold: int,
        high_threshold: int,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError("An input image is required for this task.")

        self.preprocessor.load("Canny")
        control_image = self.preprocessor(
            image=image,
            low_threshold=low_threshold,
            high_threshold=high_threshold,
            image_resolution=image_resolution,
            detect_resolution=preprocess_resolution,
        )

        return control_image

    @torch.inference_mode()
    def process_mlsd(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        value_threshold: float,
        distance_threshold: float,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError("An input image is required for this task.")

        self.preprocessor.load("MLSD")
        control_image = self.preprocessor(
            image=image,
            image_resolution=image_resolution,
            detect_resolution=preprocess_resolution,
            thr_v=value_threshold,
            thr_d=distance_threshold,
        )

        return control_image

    @torch.inference_mode()
    def process_scribble(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError("An input image is required for this task.")

        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        elif preprocessor_name == "HED":
            self.preprocessor.load(preprocessor_name)
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                scribble=False,
            )
        elif preprocessor_name == "PidiNet":
            self.preprocessor.load(preprocessor_name)
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                safe=False,
            )
        else:
            raise ValueError(f"Unknown preprocessor: {preprocessor_name}")

        return control_image

    @torch.inference_mode()
    def process_scribble_interactive(
        self,
        image_and_mask: dict[str, np.ndarray],
        image_resolution: int,
    ) -> list[PIL.Image.Image]:
        if image_and_mask is None:
            raise ValueError("An input image is required for this task.")

        image = image_and_mask["mask"]
        image = HWC3(image)
        image = resize_image(image, resolution=image_resolution)
        control_image = PIL.Image.fromarray(image)

        return control_image

    @torch.inference_mode()
    def process_softedge(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError("An input image is required for this task.")

        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        elif preprocessor_name in ["HED", "HED safe"]:
            safe = "safe" in preprocessor_name
            self.preprocessor.load("HED")
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                scribble=safe,
            )
        elif preprocessor_name in ["PidiNet", "PidiNet safe"]:
            safe = "safe" in preprocessor_name
            self.preprocessor.load("PidiNet")
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                safe=safe,
            )
        else:
            raise ValueError(f"Unknown preprocessor: {preprocessor_name}")

        return control_image

    @torch.inference_mode()
    def process_openpose(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError("An input image is required for this task.")

        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        else:
            self.preprocessor.load("Openpose")
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                hand_and_face=True,
            )

        return control_image

    @torch.inference_mode()
    def process_segmentation(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError("An input image is required for this task.")

        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        else:
            self.preprocessor.load(preprocessor_name)
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
            )

        return control_image

    @torch.inference_mode()
    def process_depth(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError("An input image is required for this task.")

        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        else:
            self.preprocessor.load(preprocessor_name)
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
            )

        return control_image

    @torch.inference_mode()
    def process_normal(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError("An input image is required for this task.")

        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        else:
            self.preprocessor.load("NormalBae")
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
            )

        return control_image

    @torch.inference_mode()
    def process_lineart(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError("An input image is required for this task.")

        if preprocessor_name in ["None", "None (anime)"]:
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        elif preprocessor_name in ["Lineart", "Lineart coarse"]:
            coarse = "coarse" in preprocessor_name
            self.preprocessor.load("Lineart")
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
                coarse=coarse,
            )
        elif preprocessor_name == "Lineart (anime)":
            self.preprocessor.load("LineartAnime")
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
                detect_resolution=preprocess_resolution,
            )
        else:
            raise ValueError(f"Unknown preprocessor: {preprocessor_name}")

        # SD 1.5 uses separate ControlNet weights for anime lineart.
        if self.class_name == "StableDiffusionPipeline":
            if "anime" in preprocessor_name:
                self.load_controlnet_weight("lineart_anime")
                logger.info("Lineart anime")
            else:
                self.load_controlnet_weight("lineart")

        return control_image

    @torch.inference_mode()
    def process_shuffle(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocessor_name: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError("An input image is required for this task.")

        if preprocessor_name == "None":
            image = HWC3(image)
            image = resize_image(image, resolution=image_resolution)
            control_image = PIL.Image.fromarray(image)
        else:
            self.preprocessor.load(preprocessor_name)
            control_image = self.preprocessor(
                image=image,
                image_resolution=image_resolution,
            )

        return control_image

    @torch.inference_mode()
    def process_ip2p(
        self,
        image: np.ndarray,
        image_resolution: int,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError("An input image is required for this task.")

        image = HWC3(image)
        image = resize_image(image, resolution=image_resolution)
        control_image = PIL.Image.fromarray(image)

        return control_image

    @torch.inference_mode()
    def process_inpaint(
        self,
        image: np.ndarray,
        image_resolution: int,
        preprocess_resolution: int,
        image_mask: str,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError("An input image is required for this task.")

        image = HWC3(image)
        image = resize_image(image, resolution=image_resolution)
        init_image = PIL.Image.fromarray(image)

        image_mask = HWC3(image_mask)
        image_mask = resize_image(image_mask, resolution=image_resolution)
        control_mask = PIL.Image.fromarray(image_mask)

        control_image = make_inpaint_condition(init_image, control_mask)

        return init_image, control_mask, control_image

    @torch.inference_mode()
    def process_img2img(
        self,
        image: np.ndarray,
        image_resolution: int,
    ) -> list[PIL.Image.Image]:
        if image is None:
            raise ValueError("An input image is required for this task.")

        image = HWC3(image)
        image = resize_image(image, resolution=image_resolution)
        init_image = PIL.Image.fromarray(image)

        return init_image

    def get_scheduler(self, name):
        if name in SCHEDULER_CONFIG_MAP:
            scheduler_class, config = SCHEDULER_CONFIG_MAP[name]
            # Build the sampler on top of the model's original scheduler config.
            return scheduler_class.from_config(self.default_scheduler.config, **config)
        else:
            raise ValueError(f"Scheduler with name {name} not found. Valid schedulers: {', '.join(scheduler_names)}")

    def create_prompt_embeds(
        self,
        prompt,
        negative_prompt,
        textual_inversion,
        clip_skip,
        syntax_weights,
    ):
        if self.class_name == "StableDiffusionPipeline":
            if self.embed_loaded != textual_inversion and textual_inversion != []:
                # Textual inversion
                for name, directory_name in textual_inversion:
                    try:
                        if directory_name.endswith(".pt"):
                            model = torch.load(directory_name, map_location=self.device)
                            model_tensors = model.get("string_to_param").get("*")
                            s_model = {"emb_params": model_tensors}
                            self.pipe.load_textual_inversion(s_model, token=name)
                        else:
                            self.pipe.load_textual_inversion(directory_name, token=name)
                        if not self.gui_active:
                            logger.info(f"Applied : {name}")
                    except Exception as e:
                        exception = str(e)
                        if name in exception:
                            logger.debug(f"Previous loaded embed {name}")
                        else:
                            logger.error(exception)
                            logger.error(f"Can't apply embed {name}")
                self.embed_loaded = textual_inversion

            if not hasattr(self, "compel"):
                self.compel = Compel(
                    tokenizer=self.pipe.tokenizer,
                    text_encoder=self.pipe.text_encoder,
                    truncate_long_prompts=False,
                    returned_embeddings_type=ReturnedEmbeddingsType.PENULTIMATE_HIDDEN_STATES_NORMALIZED if clip_skip else ReturnedEmbeddingsType.LAST_HIDDEN_STATES_NORMALIZED,
                )

            # Convert textual inversion tokens the tokenizer knows about.
            prompt_ti = self.pipe.maybe_convert_prompt(prompt, self.pipe.tokenizer)
            negative_prompt_ti = self.pipe.maybe_convert_prompt(
                negative_prompt, self.pipe.tokenizer
            )

            if self.embed_loaded != []:
                prompt_ti = add_comma_after_pattern_ti(prompt_ti)
                negative_prompt_ti = add_comma_after_pattern_ti(negative_prompt_ti)

            self.pipe.to(self.device)
            if syntax_weights == "Classic":
                prompt_emb = get_embed_new(prompt_ti, self.pipe, self.compel)
                negative_prompt_emb = get_embed_new(negative_prompt_ti, self.pipe, self.compel)
            else:
                prompt_emb = get_embed_new(prompt_ti, self.pipe, self.compel, compel_process_sd=True)
                negative_prompt_emb = get_embed_new(negative_prompt_ti, self.pipe, self.compel, compel_process_sd=True)

            # Pad so both embedding tensors share the same sequence length.
            if prompt_emb.shape != negative_prompt_emb.shape:
                (
                    prompt_emb,
                    negative_prompt_emb,
                ) = self.compel.pad_conditioning_tensors_to_same_length(
                    [prompt_emb, negative_prompt_emb]
                )

            return prompt_emb, negative_prompt_emb

        else:
            if self.embed_loaded != textual_inversion and textual_inversion != []:
                # Textual inversion for both SDXL text encoders.
                for name, directory_name in textual_inversion:
                    try:
                        from safetensors.torch import load_file
                        state_dict = load_file(directory_name)
                        self.pipe.load_textual_inversion(state_dict["clip_g"], token=name, text_encoder=self.pipe.text_encoder_2, tokenizer=self.pipe.tokenizer_2)
                        self.pipe.load_textual_inversion(state_dict["clip_l"], token=name, text_encoder=self.pipe.text_encoder, tokenizer=self.pipe.tokenizer)
                        if not self.gui_active:
                            logger.info(f"Applied : {name}")
                    except Exception as e:
                        exception = str(e)
                        if name in exception:
                            logger.debug(f"Previous loaded embed {name}")
                        else:
                            logger.error(exception)
                            logger.error(f"Can't apply embed {name}")
                self.embed_loaded = textual_inversion

            if not hasattr(self, "compel"):
                if clip_skip:
                    self.compel = Compel(
                        tokenizer=[self.pipe.tokenizer, self.pipe.tokenizer_2],
                        text_encoder=[self.pipe.text_encoder, self.pipe.text_encoder_2],
                        returned_embeddings_type=ReturnedEmbeddingsType.PENULTIMATE_HIDDEN_STATES_NON_NORMALIZED,
                        requires_pooled=[False, True],
                        truncate_long_prompts=False,
                    )
                else:
                    self.compel = Compel(
                        tokenizer=[self.pipe.tokenizer, self.pipe.tokenizer_2],
                        text_encoder=[self.pipe.text_encoder, self.pipe.text_encoder_2],
                        requires_pooled=[False, True],
                        truncate_long_prompts=False,
                    )

            # Convert textual inversion tokens if possible.
            try:
                prompt_ti = self.pipe.maybe_convert_prompt(prompt, self.pipe.tokenizer)
                negative_prompt_ti = self.pipe.maybe_convert_prompt(negative_prompt, self.pipe.tokenizer)
            except Exception:
                prompt_ti = prompt
                negative_prompt_ti = negative_prompt
                logger.error("FAILED: Convert prompt for textual inversion")

            # Apply the weight syntax.
            if syntax_weights == "Classic":
                self.pipe.to("cuda")
                prompt_ti = get_embed_new(prompt_ti, self.pipe, self.compel, only_convert_string=True)
                negative_prompt_ti = get_embed_new(negative_prompt_ti, self.pipe, self.compel, only_convert_string=True)
            else:
                prompt_ti = prompt
                negative_prompt_ti = negative_prompt

            conditioning, pooled = self.compel([prompt_ti, negative_prompt_ti])

            return conditioning, pooled
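
    # Usage sketch: SD 1.5 returns (prompt_embeds, negative_prompt_embeds);
    # SDXL returns the Compel (conditioning, pooled) pair for the stacked
    # [prompt, negative_prompt] batch.
    #
    #   emb = model.create_prompt_embeds(
    #       prompt="a cat", negative_prompt="blurry",
    #       textual_inversion=[], clip_skip=True, syntax_weights="Classic",
    #   )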

    def process_lora(self, select_lora, lora_weights_scale, unload=False):
        device = "cuda" if torch.cuda.is_available() else "cpu"
        if not unload:
            if select_lora is not None:
                try:
                    self.pipe = lora_mix_load(
                        self.pipe,
                        select_lora,
                        lora_weights_scale,
                        device=device,
                        dtype=self.type_model_precision,
                    )
                    logger.info(select_lora)
                except Exception as e:
                    logger.error(f"ERROR: LoRA not compatible: {select_lora}")
                    logger.debug(f"{str(e)}")
            return self.pipe
        else:
            # Unload by applying the same LoRA with a negated scale.
            if select_lora is not None:
                try:
                    self.pipe = lora_mix_load(
                        self.pipe,
                        select_lora,
                        -lora_weights_scale,
                        device=device,
                        dtype=self.type_model_precision,
                    )
                    logger.debug(f"Unload LoRA: {select_lora}")
                except Exception:
                    pass
            return self.pipe
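
    # Usage sketch (path is hypothetical): weights are fused at +scale and
    # reversed by fusing again at -scale, so unload must use the same scale.
    #
    #   model.pipe = model.process_lora("loras/detail.safetensors", 0.8)
    #   model.pipe = model.process_lora("loras/detail.safetensors", 0.8, unload=True)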

    def load_style_file(self, style_json_file):
        if os.path.exists(style_json_file):
            try:
                file_json_read = get_json_content(style_json_file)
                self.styles_data = {k["name"]: (k["prompt"], k["negative_prompt"]) for k in file_json_read}
                self.STYLE_NAMES = list(self.styles_data.keys())
                self.style_json_file = style_json_file
                logger.info(f"Styles json file loaded with {len(self.STYLE_NAMES)} styles")
                logger.debug(str(self.STYLE_NAMES))
            except Exception as e:
                logger.error(str(e))
        else:
            logger.error("Styles JSON file not found in directory")

    def callback_pipe(self, step, timestep, latents):
        # Convert latents to an image preview.
        with torch.no_grad():
            # 0.18215 is the SD 1.x VAE scaling factor.
            latents = 1 / 0.18215 * latents
            image = self.pipe.vae.decode(latents).sample

            image = (image / 2 + 0.5).clamp(0, 1)

            # Move to CPU and convert to HWC float arrays.
            image = image.cpu().permute(0, 2, 3, 1).float().numpy()

            # Convert to PIL images.
            image = self.pipe.numpy_to_pil(image)

            # Show the first image, updating the same display handle in place.
            if self.preview_handle is None:
                self.preview_handle = display(image[0], display_id=True)
            else:
                self.preview_handle.update(image[0])

    def __call__(
        self,
        prompt: str = "",
        negative_prompt: str = "",
        img_height: int = 512,
        img_width: int = 512,
        num_images: int = 1,
        num_steps: int = 30,
        guidance_scale: float = 7.5,
        clip_skip: Optional[bool] = True,
        seed: int = -1,
        sampler: str = "DPM++ 2M",
        syntax_weights: str = "Classic",

        # LoRA, embeddings, detailfix, and styles
        lora_A: Optional[str] = None,
        lora_scale_A: float = 1.0,
        lora_B: Optional[str] = None,
        lora_scale_B: float = 1.0,
        lora_C: Optional[str] = None,
        lora_scale_C: float = 1.0,
        lora_D: Optional[str] = None,
        lora_scale_D: float = 1.0,
        lora_E: Optional[str] = None,
        lora_scale_E: float = 1.0,
        textual_inversion: List[Tuple[str, str]] = [],
        FreeU: bool = False,
        adetailer_A: bool = False,
        adetailer_A_params: Dict[str, Any] = {},
        adetailer_B: bool = False,
        adetailer_B_params: Dict[str, Any] = {},
        style_prompt: Optional[Any] = [""],
        style_json_file: Optional[Any] = "",

        # Img2img, inpaint, ControlNet, and T2I-Adapter
        image: Optional[Any] = None,
        preprocessor_name: Optional[str] = "None",
        preprocess_resolution: int = 512,
        image_resolution: int = 512,
        image_mask: Optional[Any] = None,
        strength: float = 0.35,
        low_threshold: int = 100,
        high_threshold: int = 200,
        value_threshold: float = 0.1,
        distance_threshold: float = 0.1,
        controlnet_conditioning_scale: float = 1.0,
        control_guidance_start: float = 0.0,
        control_guidance_end: float = 1.0,
        t2i_adapter_preprocessor: bool = True,
        t2i_adapter_conditioning_scale: float = 1.0,
        t2i_adapter_conditioning_factor: float = 1.0,

        # Upscale and hires fix
        upscaler_model_path: Optional[str] = None,
        upscaler_increases_size: float = 1.5,
        esrgan_tile: int = 100,
        esrgan_tile_overlap: int = 10,
        hires_steps: int = 25,
        hires_denoising_strength: float = 0.35,
        hires_prompt: str = "",
        hires_negative_prompt: str = "",
        hires_sampler: str = "Use same sampler",

        # Output and runtime behavior
        loop_generation: int = 1,
        display_images: bool = False,
        save_generated_images: bool = True,
        image_storage_location: str = "./images",
        generator_in_cpu: bool = False,
        leave_progress_bar: bool = False,
        disable_progress_bar: bool = False,
        hires_before_adetailer: bool = False,
        hires_after_adetailer: bool = True,
        retain_compel_previous_load: bool = False,
        retain_detailfix_model_previous_load: bool = False,
        retain_hires_model_previous_load: bool = False,
        image_previews: bool = False,
        xformers_memory_efficient_attention: bool = False,
        gui_active: bool = False,
    ):
| |
|
| | """ |
| | The call function for the generation. |
| | |
| | Args: |
| | prompt (str , optional): |
| | The prompt or prompts to guide image generation. |
| | negative_prompt (str , optional): |
| | The prompt or prompts to guide what to not include in image generation. Ignored when not using guidance (`guidance_scale < 1`). |
| | img_height (int, optional, defaults to 512): |
| | The height in pixels of the generated image. |
| | img_width (int, optional, defaults to 512): |
| | The width in pixels of the generated image. |
| | num_images (int, optional, defaults to 1): |
| | The number of images to generate per prompt. |
| | num_steps (int, optional, defaults to 30): |
| | The number of denoising steps. More denoising steps usually lead to a higher quality image at the |
| | expense of slower inference. |
| | guidance_scale (float, optional, defaults to 7.5): |
| | A higher guidance scale value encourages the model to generate images closely linked to the text |
| | `prompt` at the expense of lower image quality. Guidance scale is enabled when `guidance_scale > 1`. |
| | clip_skip (bool, optional): |
| | Number of layers to be skipped from CLIP while computing the prompt embeddings. It can be placed on |
| | the penultimate (True) or last layer (False). |
| | seed (int, optional, defaults to -1): |
| | A seed for controlling the randomness of the image generation process. -1 design a random seed. |
| | sampler (str, optional, defaults to "DPM++ 2M"): |
| | The sampler used for the generation process. Available samplers: DPM++ 2M, DPM++ 2M Karras, DPM++ 2M SDE, |
| | DPM++ 2M SDE Karras, DPM++ SDE, DPM++ SDE Karras, DPM2, DPM2 Karras, Euler, Euler a, Heun, LMS, LMS Karras, |
| | DDIM, DEIS, UniPC, DPM2 a, DPM2 a Karras, PNDM, LCM, DPM++ 2M Lu, DPM++ 2M Ef, DPM++ 2M SDE Lu and DPM++ 2M SDE Ef. |
| | syntax_weights (str, optional, defaults to "Classic"): |
| | Specifies the type of syntax weights used during generation. "Classic" is (word:weight), "Compel" is (word)weight |
| | lora_A (str, optional): |
| | Placeholder for lora A parameter. |
| | lora_scale_A (float, optional, defaults to 1.0): |
| | Placeholder for lora scale A parameter. |
| | lora_B (str, optional): |
| | Placeholder for lora B parameter. |
| | lora_scale_B (float, optional, defaults to 1.0): |
| | Placeholder for lora scale B parameter. |
| | lora_C (str, optional): |
| | Placeholder for lora C parameter. |
| | lora_scale_C (float, optional, defaults to 1.0): |
| | Placeholder for lora scale C parameter. |
| | lora_D (str, optional): |
| | Placeholder for lora D parameter. |
| | lora_scale_D (float, optional, defaults to 1.0): |
| | Placeholder for lora scale D parameter. |
| | lora_E (str, optional): |
| | Placeholder for lora E parameter. |
| | lora_scale_E (float, optional, defaults to 1.0): |
| | Placeholder for lora scale E parameter. |
| | textual_inversion (List[Tuple[str, str]], optional, defaults to []): |
| | Placeholder for textual inversion list of tuples. Help the model to adapt to a particular |
| | style. [("<token_activation>","<path_embeding>"),...] |
| | FreeU (bool, optional, defaults to False): |
| | Is a method that substantially improves diffusion model sample quality at no costs. |
| | adetailer_A (bool, optional, defaults to False): |
| | Guided Inpainting to Correct Image, it is preferable to use low values for strength. |
| | adetailer_A_params (Dict[str, Any], optional, defaults to {}): |
| | Placeholder for adetailer_A parameters in a dict example {"prompt": "my prompt", "inpaint_only": True ...}. |
| | If not specified, default values will be used: |
| | - face_detector_ad (bool): Indicates whether face detection is enabled. Defaults to True. |
| | - person_detector_ad (bool): Indicates whether person detection is enabled. Defaults to True. |
| | - hand_detector_ad (bool): Indicates whether hand detection is enabled. Defaults to False. |
| | - prompt (str): A prompt for the adetailer_A. Defaults to an empty string. |
| | - negative_prompt (str): A negative prompt for the adetailer_A. Defaults to an empty string. |
| | - strength (float): The strength parameter value. Defaults to 0.35. |
| | - mask_dilation (int): The mask dilation value. Defaults to 4. |
| | - mask_blur (int): The mask blur value. Defaults to 4. |
| | - mask_padding (int): The mask padding value. Defaults to 32. |
| | - inpaint_only (bool): Indicates if only inpainting is to be performed. Defaults to True. False is img2img mode |
| | - sampler (str): The sampler type to be used. Defaults to "Use same sampler". |
| | adetailer_B (bool, optional, defaults to False): |
| | Guided Inpainting to Correct Image, it is preferable to use low values for strength. |
| | adetailer_B_params (Dict[str, Any], optional, defaults to {}): |
| | Placeholder for adetailer_B parameters in a dict example {"prompt": "my prompt", "inpaint_only": True ...}. |
| | If not specified, default values will be used. |
| | style_prompt (str, optional): |
| | If a style that is in STYLE_NAMES is specified, it will be added to the original prompt and negative prompt. |
| | style_json_file (str, optional): |
| | JSON with styles to be applied and used in style_prompt. |
| | upscaler_model_path (str, optional): |
| | Placeholder for upscaler model path. |
| | upscaler_increases_size (float, optional, defaults to 1.5): |
| | Placeholder for upscaler increases size parameter. |
| | esrgan_tile (int, optional, defaults to 100): |
| | Tile if use a ESRGAN model. |
| | esrgan_tile_overlap (int, optional, defaults to 100): |
| | Tile overlap if use a ESRGAN model. |
| | hires_steps (int, optional, defaults to 25): |
| | The number of denoising steps for hires. More denoising steps usually lead to a higher quality image at the |
| | expense of slower inference. |
| | hires_denoising_strength (float, optional, defaults to 0.35): |
| | Strength parameter for the hires. |
| | hires_prompt (str , optional): |
| | The prompt for hires. If not specified, the main prompt will be used. |
| | hires_negative_prompt (str , optional): |
| | The negative prompt for hires. If not specified, the main negative prompt will be used. |
| | hires_sampler (str, optional, defaults to "Use same sampler"): |
| | The sampler used for the hires generation process. If not specified, the main sampler will be used. |
| | image (Any, optional): |
| | The image to be used for the Inpaint, ControlNet, or T2I adapter. |
| | preprocessor_name (str, optional, defaults to "None"): |
| | Preprocessor name for ControlNet. |
| | preprocess_resolution (int, optional, defaults to 512): |
| | Preprocess resolution for the Inpaint, ControlNet, or T2I adapter. |
| | image_resolution (int, optional, defaults to 512): |
| | Image resolution for the Img2Img, Inpaint, ControlNet, or T2I adapter. |
| | image_mask (Any, optional): |
| | Path image mask for the Inpaint. |
| | strength (float, optional, defaults to 0.35): |
| | Strength parameter for the Inpaint and Img2Img. |
| | low_threshold (int, optional, defaults to 100): |
| | Low threshold parameter for ControlNet and T2I Adapter Canny. |
| | high_threshold (int, optional, defaults to 200): |
| | High threshold parameter for ControlNet and T2I Adapter Canny. |
| | value_threshold (float, optional, defaults to 0.1): |
| | Value threshold parameter for ControlNet MLSD. |
| | distance_threshold (float, optional, defaults to 0.1): |
| | Distance threshold parameter for ControlNet MLSD. |
| | controlnet_conditioning_scale (float, optional, defaults to 1.0): |
| | The outputs of the ControlNet are multiplied by `controlnet_conditioning_scale` before they are added |
| | to the residual in the original `unet`. Used in ControlNet and Inpaint |
| | control_guidance_start (float, optional, defaults to 0.0): |
| | The percentage of total steps at which the ControlNet starts applying. Used in ControlNet and Inpaint |
| | control_guidance_end (float, optional, defaults to 1.0): |
| | The percentage of total steps at which the ControlNet stops applying. Used in ControlNet and Inpaint |
| | t2i_adapter_preprocessor (bool, optional, defaults to True): |
| | Preprocessor for the image in sdxl_canny by default is True. |
| | t2i_adapter_conditioning_scale (float, optional, defaults to 1.0): |
| | The outputs of the adapter are multiplied by `t2i_adapter_conditioning_scale` before they are added to the |
| | residual in the original unet. |
| | t2i_adapter_conditioning_factor (float, optional, defaults to 1.0): |
| | The fraction of timesteps for which adapter should be applied. If `t2i_adapter_conditioning_factor` is |
| | `0.0`, adapter is not applied at all. If `t2i_adapter_conditioning_factor` is `1.0`, adapter is applied for |
| | all timesteps. If `t2i_adapter_conditioning_factor` is `0.5`, adapter is applied for half of the timesteps. |
| | loop_generation (int, optional, defaults to 1): |
| | The number of times the specified `num_images` will be generated. |
| | display_images (bool, optional, defaults to False): |
| | If you use a notebook, you will be able to display the images generated with this parameter. |
| | save_generated_images (bool, optional, defaults to True): |
| | By default, the generated images are saved in the current location within the 'images' folder. You can disable this with this parameter. |
| | image_storage_location (str , optional, defaults to "./images"): |
| | The directory where the generated images are saved. |
| | generator_in_cpu (bool, optional, defaults to False): |
| | The generator by default is specified on the GPU. To obtain more consistent results across various environments, |
| | it is preferable to use the generator on the CPU. |
| | leave_progress_bar (bool, optional, defaults to False): |
| | Leave the progress bar after generating the image. |
| | disable_progress_bar (bool, optional, defaults to False): |
| | Do not display the progress bar during image generation. |
| | hires_before_adetailer (bool, optional, defaults to False): |
| | Apply an upscale and high-resolution fix before adetailer. |
| | hires_after_adetailer (bool, optional, defaults to True): |
| | Apply an upscale and high-resolution fix after adetailer. |
| | retain_compel_previous_load (bool, optional, defaults to False): |
| | The previous compel remains preloaded in memory. |
| | retain_detailfix_model_previous_load (bool, optional, defaults to False): |
| | The previous adetailer model remains preloaded in memory. |
| | retain_hires_model_previous_load (bool, optional, defaults to False): |
| | The previous hires model remains preloaded in memory. |
| | image_previews (bool, optional, defaults to False): |
| | Displaying the image denoising process. |
| | xformers_memory_efficient_attention (bool, optional, defaults to False): |
| | Improves generation time, currently disabled. |
| | gui_active (bool, optional, defaults to False): |
| | utility when used with a GUI, it changes the behavior especially by displaying confirmation messages or options. |
| | |
| | Specific parameter usage details: |
| | |
| | Additional parameters that will be used in Inpaint: |
| | - image |
| | - image_mask |
| | - image_resolution |
| | - strength |
| | for SD 1.5: |
| | - controlnet_conditioning_scale |
| | - control_guidance_start |
| | - control_guidance_end |
| | |
| | Additional parameters that will be used in img2img: |
| | - image |
| | - image_resolution |
| | - strength |
| | |
| | Additional parameters that will be used in ControlNet for SD 1.5 depending on the task: |
| | - image |
| | - preprocessor_name |
| | - preprocess_resolution |
| | - image_resolution |
| | - controlnet_conditioning_scale |
| | - control_guidance_start |
| | - control_guidance_end |
| | for Canny: |
| | - low_threshold |
| | - high_threshold |
| | for MLSD: |
| | - value_threshold |
| | - distance_threshold |
| | |
| | Additional parameters that will be used in T2I adapter for SDXL depending on the task: |
| | - image |
| | - preprocess_resolution |
| | - image_resolution |
| | - t2i_adapter_preprocessor |
| | - t2i_adapter_conditioning_scale |
| | - t2i_adapter_conditioning_factor |
| | |
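| | Example (illustrative sketch only, not an exhaustive reference; it assumes
| | an already configured instance `model` of this class, invoked directly as a
| | call, and uses only parameters documented above):
| |
| | images, image_list = model(
| | prompt="a photo of an astronaut riding a horse",
| | negative_prompt="blurry, low quality",
| | num_steps=30,
| | guidance_scale=7.5,
| | img_width=512,
| | img_height=512,
| | num_images=2,
| | seed=-1, # -1 selects a random seed for each image
| | display_images=True,
| | )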
| | """ |
| |
|
| | if self.task_name != "txt2img" and image == None: |
| | raise ValueError( |
| | "You need to specify the <image> for this task." |
| | ) |
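| | # The UNet/VAE operate on latents downscaled by a factor of 8, so width,
| | # height, and image_resolution are rounded up to the nearest multiple of 8.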
| | if img_height % 8 != 0:
| | img_height = img_height + (8 - img_height % 8)
| | logger.warning(f"Height must be divisible by 8; changed to {img_height}")
| | if img_width % 8 != 0:
| | img_width = img_width + (8 - img_width % 8)
| | logger.warning(f"Width must be divisible by 8; changed to {img_width}")
| | if image_resolution % 8 != 0:
| | image_resolution = image_resolution + (8 - image_resolution % 8)
| | logger.warning(f"Image resolution must be divisible by 8; changed to {image_resolution}")
| | if control_guidance_start >= control_guidance_end: |
| | logger.error( |
| | "Control guidance start (ControlNet Start Threshold) cannot be larger or equal to control guidance end (ControlNet Stop Threshold). The default values 0.0 and 1.0 will be used." |
| | ) |
| | control_guidance_start, control_guidance_end = 0.0, 1.0 |
| |
|
| | self.gui_active = gui_active |
| | self.image_previews = image_previews |
| |
|
| | if self.pipe is None:
| | self.load_pipe( |
| | self.base_model_id, |
| | task_name=self.task_name, |
| | vae_model=self.vae_model, |
| | reload=True, |
| | ) |
| |
|
| | self.pipe.set_progress_bar_config(leave=leave_progress_bar) |
| | self.pipe.set_progress_bar_config(disable=disable_progress_bar) |
| |
|
| | xformers_memory_efficient_attention = False  # force-disabled for now (see docstring)
| | if xformers_memory_efficient_attention and torch.cuda.is_available(): |
| | self.pipe.disable_xformers_memory_efficient_attention() |
| | self.pipe.to(self.device) |
| |
|
| | |
| | if style_json_file != "" and style_json_file != self.style_json_file: |
| | self.load_style_file(style_json_file) |
| | |
| | if isinstance(style_prompt, str): |
| | style_prompt = [style_prompt] |
| | if style_prompt != [""]: |
| | prompt, negative_prompt = apply_style(style_prompt, prompt, negative_prompt, self.styles_data, self.STYLE_NAMES) |
| |
|
| | |
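| | # LoRA cache: if the requested LoRAs and scales match what is already
| | # loaded, keep them; otherwise unload the previous five slots and load
| | # the new set.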
| | if self.lora_memory == [ |
| | lora_A, |
| | lora_B, |
| | lora_C, |
| | lora_D, |
| | lora_E, |
| | ] and self.lora_scale_memory == [ |
| | lora_scale_A, |
| | lora_scale_B, |
| | lora_scale_C, |
| | lora_scale_D, |
| | lora_scale_E, |
| | ]: |
| | for single_lora in self.lora_memory:
| | if single_lora is not None:
| | logger.info(f"LoRA in memory: {single_lora}")
| |
|
| | else: |
| | logger.debug("_un, re and load_ lora") |
| | self.pipe = self.process_lora( |
| | self.lora_memory[0], self.lora_scale_memory[0], unload=True |
| | ) |
| | self.pipe = self.process_lora( |
| | self.lora_memory[1], self.lora_scale_memory[1], unload=True |
| | ) |
| | self.pipe = self.process_lora( |
| | self.lora_memory[2], self.lora_scale_memory[2], unload=True |
| | ) |
| | self.pipe = self.process_lora( |
| | self.lora_memory[3], self.lora_scale_memory[3], unload=True |
| | ) |
| | self.pipe = self.process_lora( |
| | self.lora_memory[4], self.lora_scale_memory[4], unload=True |
| | ) |
| |
|
| | self.pipe = self.process_lora(lora_A, lora_scale_A) |
| | self.pipe = self.process_lora(lora_B, lora_scale_B) |
| | self.pipe = self.process_lora(lora_C, lora_scale_C) |
| | self.pipe = self.process_lora(lora_D, lora_scale_D) |
| | self.pipe = self.process_lora(lora_E, lora_scale_E) |
| |
|
| | self.lora_memory = [lora_A, lora_B, lora_C, lora_D, lora_E] |
| | self.lora_scale_memory = [ |
| | lora_scale_A, |
| | lora_scale_B, |
| | lora_scale_C, |
| | lora_scale_D, |
| | lora_scale_E, |
| | ] |
| |
|
| | |
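| | # LCM is handled as a LoRA adapter: load the matching latent-consistency
| | # LoRA when the LCM sampler is selected, and unload it when switching back.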
| | if sampler == "LCM" and self.LCMconfig == None: |
| | if self.class_name == "StableDiffusionPipeline": |
| | adapter_id = "latent-consistency/lcm-lora-sdv1-5" |
| | elif self.class_name == "StableDiffusionXLPipeline": |
| | adapter_id = "latent-consistency/lcm-lora-sdxl" |
| |
|
| | self.process_lora(adapter_id, 1.0) |
| | self.LCMconfig = adapter_id |
| | logger.info("LCM") |
| | elif sampler != "LCM" and self.LCMconfig != None: |
| | self.process_lora(self.LCMconfig, 1.0, unload=True) |
| | self.LCMconfig = None |
| | elif self.LCMconfig != None: |
| | logger.info("LCM") |
| |
|
| | |
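| | # FreeU rescales the UNet backbone (b1, b2) and skip-connection (s1, s2)
| | # features at inference time; SD 1.5 and SDXL use different factors.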
| | if FreeU: |
| | logger.info("FreeU active") |
| | if self.class_name == "StableDiffusionPipeline": |
| | |
| | self.pipe.enable_freeu(s1=0.9, s2=0.2, b1=1.2, b2=1.4) |
| | else: |
| | |
| | self.pipe.enable_freeu(s1=0.6, s2=0.4, b1=1.1, b2=1.2) |
| | self.FreeU = True |
| | elif self.FreeU: |
| | self.pipe.disable_freeu() |
| | self.FreeU = False |
| |
|
| | |
| | if hasattr(self, "compel") and not retain_compel_previous_load: |
| | del self.compel |
| |
|
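| | # Build the weighted prompt embeddings. For SDXL, create_prompt_embeds
| | # returns a (conditioning, pooled) pair, which is unpacked below.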
| | prompt_emb, negative_prompt_emb = self.create_prompt_embeds( |
| | prompt=prompt, |
| | negative_prompt=negative_prompt, |
| | textual_inversion=textual_inversion, |
| | clip_skip=clip_skip, |
| | syntax_weights=syntax_weights, |
| | ) |
| |
|
| | if self.class_name != "StableDiffusionPipeline": |
| | |
| | conditioning, pooled = prompt_emb.clone(), negative_prompt_emb.clone() |
| | prompt_emb = negative_prompt_emb = None |
| |
|
| |
|
| | if torch.cuda.is_available() and xformers_memory_efficient_attention:
| | self.pipe.enable_xformers_memory_efficient_attention()
| |
|
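| | # Swap in the requested scheduler; on failure, free GPU memory and abort
| | # this call.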
| | try: |
| | |
| | self.pipe.scheduler = self.get_scheduler(sampler) |
| | except Exception as e: |
| | logger.debug(f"{e}") |
| | logger.warning(f"Error in sampler, please try again") |
| | |
| | torch.cuda.empty_cache() |
| | gc.collect() |
| | return |
| |
|
| | self.pipe.safety_checker = None |
| |
|
| | |
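| | # Normalize the input image to a uint8 RGB numpy array, whether it was
| | # given as a file path, a PIL image, or an ndarray.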
| | if self.task_name != "txt2img": |
| | if isinstance(image, str): |
| | |
| | image_pil = Image.open(image) |
| | numpy_array = np.array(image_pil, dtype=np.uint8) |
| | elif isinstance(image, Image.Image): |
| | |
| | numpy_array = np.array(image, dtype=np.uint8) |
| | elif isinstance(image, np.ndarray): |
| | |
| | numpy_array = image.astype(np.uint8) |
| | else: |
| | if gui_active: |
| | logger.info("Image not found.")
| | return |
| | else: |
| | raise ValueError(
| | "Unsupported image type or no control image found; please report the bug at https://github.com/R3gm/stablepy or https://github.com/R3gm/SD_diffusers_interactive"
| | )
| |
|
| | |
| | try: |
| | array_rgb = numpy_array[:, :, :3] |
| | except Exception:
| | logger.error("Unsupported image type")
| | raise ValueError(
| | "Unsupported image type; please report the bug at https://github.com/R3gm/stablepy or https://github.com/R3gm/SD_diffusers_interactive"
| | )
| |
|
| | |
| | preprocess_params_config = {} |
| | if self.task_name not in ["txt2img", "inpaint", "img2img"]: |
| | preprocess_params_config["image"] = array_rgb |
| | preprocess_params_config["image_resolution"] = image_resolution |
| |
|
| | if self.task_name != "ip2p": |
| | if self.task_name != "shuffle": |
| | preprocess_params_config["preprocess_resolution"] = preprocess_resolution
| | if self.task_name != "mlsd" and self.task_name != "canny": |
| | preprocess_params_config["preprocessor_name"] = preprocessor_name |
| |
|
| | |
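| | # Inpaint mask: use the given mask file if it exists; otherwise, in
| | # non-GUI mode, open an interactive canvas so the user can draw the mask,
| | # which is then saved as a grayscale image and reloaded.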
| | if self.task_name == "inpaint": |
| | |
| | if gui_active or os.path.exists(str(image_mask)): |
| | |
| | mask_control_img = Image.open(image_mask) |
| | numpy_array_mask = np.array(mask_control_img, dtype=np.uint8) |
| | array_rgb_mask = numpy_array_mask[:, :, :3] |
| | elif not gui_active: |
| | |
| | import base64
| | import matplotlib.pyplot as plt
| | name_without_extension = os.path.splitext(os.path.basename(image))[0]
| | with open(image, "rb") as image_file:
| | image64 = base64.b64encode(image_file.read()).decode("utf-8")
| | img = np.array(plt.imread(image)[:, :, :3])
| |
|
| | |
| | logger.info(f"Draw the mask on this canvas using the mouse. When you finish, press 'Finish' in the bottom side of the canvas.") |
| | draw( |
| | image64, |
| | filename=f"./{name_without_extension}_draw.png", |
| | w=img.shape[1], |
| | h=img.shape[0], |
| | line_width=0.04 * img.shape[1], |
| | ) |
| |
|
| | |
| | with_mask = np.array( |
| | plt.imread(f"./{name_without_extension}_draw.png")[:, :, :3] |
| | ) |
| | mask = ( |
| | (with_mask[:, :, 0] == 1) |
| | * (with_mask[:, :, 1] == 0) |
| | * (with_mask[:, :, 2] == 0) |
| | ) |
| | plt.imsave(f"./{name_without_extension}_mask.png", mask, cmap="gray") |
| | mask_control = f"./{name_without_extension}_mask.png" |
| | logger.info(f"Mask saved: {mask_control}") |
| |
|
| | |
| | mask_control_img = Image.open(mask_control) |
| | numpy_array_mask = np.array(mask_control_img, dtype=np.uint8) |
| | array_rgb_mask = numpy_array_mask[:, :, :3] |
| | else: |
| | raise ValueError("No images found") |
| |
|
| | init_image, control_mask, control_image = self.process_inpaint( |
| | image=array_rgb, |
| | image_resolution=image_resolution, |
| | preprocess_resolution=preprocess_resolution, |
| | image_mask=array_rgb_mask, |
| | ) |
| |
|
| | elif self.task_name == "openpose": |
| | logger.info("Openpose") |
| | control_image = self.process_openpose(**preprocess_params_config) |
| |
|
| | elif self.task_name == "canny": |
| | logger.info("Canny") |
| | control_image = self.process_canny( |
| | **preprocess_params_config, |
| | low_threshold=low_threshold, |
| | high_threshold=high_threshold, |
| | ) |
| |
|
| | elif self.task_name == "mlsd": |
| | logger.info("MLSD") |
| | control_image = self.process_mlsd( |
| | **preprocess_params_config, |
| | value_threshold=value_threshold, |
| | distance_threshold=distance_threshold, |
| | ) |
| |
|
| | elif self.task_name == "scribble": |
| | logger.info("Scribble") |
| | control_image = self.process_scribble(**preprocess_params_config) |
| |
|
| | elif self.task_name == "softedge": |
| | logger.info("Softedge") |
| | control_image = self.process_softedge(**preprocess_params_config) |
| |
|
| | elif self.task_name == "segmentation": |
| | logger.info("Segmentation") |
| | control_image = self.process_segmentation(**preprocess_params_config) |
| |
|
| | elif self.task_name == "depth": |
| | logger.info("Depth") |
| | control_image = self.process_depth(**preprocess_params_config) |
| |
|
| | elif self.task_name == "normalbae": |
| | logger.info("NormalBae") |
| | control_image = self.process_normal(**preprocess_params_config) |
| |
|
| | elif self.task_name == "lineart": |
| | logger.info("Lineart") |
| | control_image = self.process_lineart(**preprocess_params_config) |
| |
|
| | elif self.task_name == "shuffle": |
| | logger.info("Shuffle") |
| | control_image = self.process_shuffle(**preprocess_params_config) |
| |
|
| | elif self.task_name == "ip2p": |
| | logger.info("Ip2p") |
| | control_image = self.process_ip2p(**preprocess_params_config) |
| |
|
| | elif self.task_name == "img2img": |
| | preprocess_params_config["image"] = array_rgb |
| | preprocess_params_config["image_resolution"] = image_resolution |
| | init_image = self.process_img2img(**preprocess_params_config) |
| |
|
| | |
| | if self.class_name == "StableDiffusionXLPipeline": |
| | |
| | preprocess_params_config_xl = {} |
| | if self.task_name not in ["txt2img", "inpaint", "img2img"]: |
| | preprocess_params_config_xl["image"] = array_rgb |
| | preprocess_params_config_xl["preprocess_resolution"] = preprocess_resolution |
| | preprocess_params_config_xl["image_resolution"] = image_resolution |
| | |
| |
|
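| | # SDXL T2I-adapter tasks: optionally run the matching detector (Canny,
| | # Openpose, PidiNet, Midas, Lineart) to build the conditioning image.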
| | if self.task_name == "sdxl_canny": |
| | logger.info("SDXL Canny: Preprocessor active by default") |
| | control_image = self.process_canny( |
| | **preprocess_params_config_xl, |
| | low_threshold=low_threshold, |
| | high_threshold=high_threshold, |
| | ) |
| | elif self.task_name == "sdxl_openpose": |
| | logger.info("SDXL Openpose") |
| | control_image = self.process_openpose( |
| | preprocessor_name = "Openpose" if t2i_adapter_preprocessor else "None", |
| | **preprocess_params_config_xl, |
| | ) |
| | elif self.task_name == "sdxl_sketch": |
| | logger.info("SDXL Scribble") |
| | control_image = self.process_scribble( |
| | preprocessor_name = "PidiNet" if t2i_adapter_preprocessor else "None", |
| | **preprocess_params_config_xl, |
| | ) |
| | elif self.task_name == "sdxl_depth-midas": |
| | logger.info("SDXL Depth") |
| | control_image = self.process_depth( |
| | preprocessor_name = "Midas" if t2i_adapter_preprocessor else "None", |
| | **preprocess_params_config_xl, |
| | ) |
| | elif self.task_name == "sdxl_lineart": |
| | logger.info("SDXL Lineart") |
| | control_image = self.process_lineart( |
| | preprocessor_name = "Lineart" if t2i_adapter_preprocessor else "None", |
| | **preprocess_params_config_xl, |
| | ) |
| |
|
| | |
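| | # Base pipeline arguments. SD 1.5 and SDXL use different parameter names
| | # (e.g. num_images vs. num_images_per_prompt), so each class gets its own
| | # config; prompts are passed as precomputed embeddings.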
| | if self.class_name == "StableDiffusionPipeline": |
| | |
| | pipe_params_config = { |
| | "prompt": None, |
| | "negative_prompt": None, |
| | "prompt_embeds": prompt_emb, |
| | "negative_prompt_embeds": negative_prompt_emb, |
| | "num_images": num_images, |
| | "num_steps": num_steps, |
| | "guidance_scale": guidance_scale, |
| | "clip_skip": None, |
| | } |
| | else: |
| | |
| | pipe_params_config = {
| | "prompt": None,
| | "negative_prompt": None,
| | "num_inference_steps": num_steps,
| | "guidance_scale": guidance_scale,
| | "clip_skip": None,
| | "num_images_per_prompt": num_images,
| | }
| |
|
| | |
| | if self.class_name == "StableDiffusionXLPipeline": |
| | |
| | if self.task_name == "txt2img": |
| | pipe_params_config["height"] = img_height |
| | pipe_params_config["width"] = img_width |
| | elif self.task_name == "inpaint": |
| | pipe_params_config["strength"] = strength |
| | pipe_params_config["image"] = init_image |
| | pipe_params_config["mask_image"] = control_mask |
| | logger.info(f"Image resolution: {str(init_image.size)}") |
| | elif self.task_name not in ["txt2img", "inpaint", "img2img"]: |
| | pipe_params_config["image"] = control_image |
| | pipe_params_config["adapter_conditioning_scale"] = t2i_adapter_conditioning_scale |
| | pipe_params_config["adapter_conditioning_factor"] = t2i_adapter_conditioning_factor |
| | logger.info(f"Image resolution: {str(control_image.size)}") |
| | elif self.task_name == "img2img": |
| | pipe_params_config["strength"] = strength |
| | pipe_params_config["image"] = init_image |
| | logger.info(f"Image resolution: {str(init_image.size)}") |
| | elif self.task_name == "txt2img": |
| | pipe_params_config["height"] = img_height |
| | pipe_params_config["width"] = img_width |
| | elif self.task_name == "inpaint": |
| | pipe_params_config["strength"] = strength |
| | pipe_params_config["init_image"] = init_image |
| | pipe_params_config["control_mask"] = control_mask |
| | pipe_params_config["control_image"] = control_image |
| | pipe_params_config["controlnet_conditioning_scale"] = controlnet_conditioning_scale
| | pipe_params_config["control_guidance_start"] = control_guidance_start |
| | pipe_params_config["control_guidance_end"] = control_guidance_end |
| | logger.info(f"Image resolution: {str(init_image.size)}") |
| | elif self.task_name not in ["txt2img", "inpaint", "img2img"]: |
| | pipe_params_config["control_image"] = control_image |
| | pipe_params_config["controlnet_conditioning_scale"] = controlnet_conditioning_scale
| | pipe_params_config["control_guidance_start"] = control_guidance_start |
| | pipe_params_config["control_guidance_end"] = control_guidance_end |
| | logger.info(f"Image resolution: {str(control_image.size)}") |
| | elif self.task_name == "img2img": |
| | pipe_params_config["strength"] = strength |
| | pipe_params_config["init_image"] = init_image |
| | logger.info(f"Image resolution: {str(init_image.size)}") |
| |
|
| | |
| | if adetailer_A or adetailer_B: |
| |
|
| | |
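| | # Default adetailer (detailfix) settings; any key missing from the
| | # user-supplied params dicts falls back to these values, and mistyped
| | # values are reset to the defaults below.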
| | default_params_detailfix = {
| | "face_detector_ad": True,
| | "person_detector_ad": True,
| | "hand_detector_ad": False,
| | "prompt": "",
| | "negative_prompt": "",
| | "strength": 0.35,
| | "mask_dilation": 4,
| | "mask_blur": 4,
| | "mask_padding": 32,
| | }
| |
|
| | |
| | if not hasattr(self, "detailfix_pipe") or not retain_detailfix_model_previous_load: |
| | if adetailer_A_params.get("inpaint_only", False) == True or adetailer_B_params.get("inpaint_only", False) == True: |
| | detailfix_pipe = custom_task_model_loader( |
| | pipe=self.pipe, |
| | model_category="detailfix", |
| | task_name=self.task_name, |
| | torch_dtype=self.type_model_precision |
| | ) |
| | else: |
| | detailfix_pipe = custom_task_model_loader( |
| | pipe=self.pipe, |
| | model_category="detailfix_img2img", |
| | task_name=self.task_name, |
| | torch_dtype=self.type_model_precision |
| | ) |
| | if hasattr(self, "detailfix_pipe"): |
| | del self.detailfix_pipe |
| | if retain_detailfix_model_previous_load: |
| | if hasattr(self, "detailfix_pipe"): |
| | detailfix_pipe = self.detailfix_pipe |
| | else: |
| | self.detailfix_pipe = detailfix_pipe |
| | adetailer_A_params.pop("inpaint_only", None) |
| | adetailer_B_params.pop("inpaint_only", None) |
| |
|
| | |
| | detailfix_pipe.default_scheduler = copy.deepcopy(self.default_scheduler) |
| | if adetailer_A_params.get("sampler", "Use same sampler") != "Use same sampler": |
| | logger.debug("detailfix_pipe will use the sampler from adetailer_A") |
| | detailfix_pipe.scheduler = self.get_scheduler(adetailer_A_params["sampler"]) |
| | adetailer_A_params.pop("sampler", None) |
| | if adetailer_B_params.get("sampler", "Use same sampler") != "Use same sampler": |
| | logger.debug("detailfix_pipe will use the sampler from adetailer_B") |
| | detailfix_pipe.scheduler = self.get_scheduler(adetailer_A_params["sampler"]) |
| | adetailer_B_params.pop("sampler", None) |
| |
|
| | detailfix_pipe.set_progress_bar_config(leave=leave_progress_bar) |
| | detailfix_pipe.set_progress_bar_config(disable=disable_progress_bar) |
| | detailfix_pipe.to(self.device) |
| | torch.cuda.empty_cache() |
| | gc.collect() |
| |
|
| | if adetailer_A: |
| | for key_param, default_value in default_params_detailfix.items(): |
| | if key_param not in adetailer_A_params: |
| | adetailer_A_params[key_param] = default_value |
| | elif type(default_value) != type(adetailer_A_params[key_param]): |
| | logger.warning(f"DetailFix A: Error type param, set default {str(key_param)}") |
| | adetailer_A_params[key_param] = default_value |
| |
|
| | detailfix_params_A = {
| | "prompt": adetailer_A_params["prompt"],
| | "negative_prompt": adetailer_A_params["negative_prompt"],
| | "strength": adetailer_A_params["strength"],
| | "num_inference_steps": num_steps,
| | "guidance_scale": guidance_scale,
| | }
| |
|
| | |
| | adetailer_A_params.pop('strength', None) |
| | adetailer_A_params.pop('prompt', None) |
| | adetailer_A_params.pop('negative_prompt', None) |
| |
|
| | |
| | prompt_empty_detailfix_A, negative_prompt_empty_detailfix_A, prompt_df_A, negative_prompt_df_A = process_prompts_valid( |
| | detailfix_params_A["prompt"], detailfix_params_A["negative_prompt"], prompt, negative_prompt |
| | ) |
| |
|
| | |
| | if self.class_name == "StableDiffusionPipeline": |
| | |
| | |
| | |
| | |
| |
|
| | if prompt_empty_detailfix_A and negative_prompt_empty_detailfix_A: |
| | detailfix_params_A["prompt_embeds"] = prompt_emb |
| | detailfix_params_A["negative_prompt_embeds"] = negative_prompt_emb |
| | else: |
| | prompt_emb_ad, negative_prompt_emb_ad = self.create_prompt_embeds( |
| | prompt=prompt_df_A, |
| | negative_prompt=negative_prompt_df_A, |
| | textual_inversion=textual_inversion, |
| | clip_skip=clip_skip, |
| | syntax_weights=syntax_weights, |
| | ) |
| | detailfix_params_A["prompt_embeds"] = prompt_emb_ad |
| | detailfix_params_A["negative_prompt_embeds"] = negative_prompt_emb_ad |
| |
|
| | detailfix_params_A["prompt"] = None |
| | detailfix_params_A["negative_prompt"] = None |
| |
|
| | else: |
| | |
| | if prompt_empty_detailfix_A and negative_prompt_empty_detailfix_A: |
| | conditioning_detailfix_A, pooled_detailfix_A = conditioning, pooled |
| | else: |
| | conditioning_detailfix_A, pooled_detailfix_A = self.create_prompt_embeds( |
| | prompt=prompt_df_A, |
| | negative_prompt=negative_prompt_df_A, |
| | textual_inversion=textual_inversion, |
| | clip_skip=clip_skip, |
| | syntax_weights=syntax_weights, |
| | ) |
| |
|
| | detailfix_params_A.pop('prompt', None) |
| | detailfix_params_A.pop('negative_prompt', None) |
| | detailfix_params_A["prompt_embeds"] = conditioning_detailfix_A[0:1] |
| | detailfix_params_A["pooled_prompt_embeds"] = pooled_detailfix_A[0:1] |
| | detailfix_params_A["negative_prompt_embeds"] = conditioning_detailfix_A[1:2] |
| | detailfix_params_A["negative_pooled_prompt_embeds"] = pooled_detailfix_A[1:2] |
| |
|
| | logger.debug(f"detailfix A prompt empty {prompt_empty_detailfix_A, negative_prompt_empty_detailfix_A}") |
| | if not prompt_empty_detailfix_A or not negative_prompt_empty_detailfix_A: |
| | logger.debug(f"Prompts detailfix A {prompt_df_A, negative_prompt_df_A}") |
| | logger.debug(f"Pipe params detailfix A \n{detailfix_params_A}") |
| | logger.debug(f"Params detailfix A \n{adetailer_A_params}") |
| |
|
| | if adetailer_B: |
| | for key_param, default_value in default_params_detailfix.items(): |
| | if key_param not in adetailer_B_params: |
| | adetailer_B_params[key_param] = default_value |
| | elif type(default_value) != type(adetailer_B_params[key_param]): |
| | logger.warning(f"DetailfFix B: Error type param, set default {str(key_param)}") |
| | adetailer_B_params[key_param] = default_value |
| |
|
| | detailfix_params_B = {
| | "prompt": adetailer_B_params["prompt"],
| | "negative_prompt": adetailer_B_params["negative_prompt"],
| | "strength": adetailer_B_params["strength"],
| | "num_inference_steps": num_steps,
| | "guidance_scale": guidance_scale,
| | }
| |
|
| | |
| | adetailer_B_params.pop('strength', None) |
| | adetailer_B_params.pop('prompt', None) |
| | adetailer_B_params.pop('negative_prompt', None) |
| |
|
| | |
| | prompt_empty_detailfix_B, negative_prompt_empty_detailfix_B, prompt_df_B, negative_prompt_df_B = process_prompts_valid( |
| | detailfix_params_B["prompt"], detailfix_params_B["negative_prompt"], prompt, negative_prompt |
| | ) |
| |
|
| | |
| | if self.class_name == "StableDiffusionPipeline": |
| | |
| | |
| | |
| | |
| |
|
| | if prompt_empty_detailfix_B and negative_prompt_empty_detailfix_B: |
| | detailfix_params_B["prompt_embeds"] = prompt_emb |
| | detailfix_params_B["negative_prompt_embeds"] = negative_prompt_emb |
| | else: |
| | prompt_emb_ad_b, negative_prompt_emb_ad_b = self.create_prompt_embeds( |
| | prompt=prompt_df_B, |
| | negative_prompt=negative_prompt_df_B, |
| | textual_inversion=textual_inversion, |
| | clip_skip=clip_skip, |
| | syntax_weights=syntax_weights, |
| | ) |
| | detailfix_params_B["prompt_embeds"] = prompt_emb_ad_b |
| | detailfix_params_B["negative_prompt_embeds"] = negative_prompt_emb_ad_b |
| | detailfix_params_B["prompt"] = None |
| | detailfix_params_B["negative_prompt"] = None |
| | else: |
| | |
| | if prompt_empty_detailfix_B and negative_prompt_empty_detailfix_B: |
| | conditioning_detailfix_B, pooled_detailfix_B = conditioning, pooled |
| | else: |
| | conditioning_detailfix_B, pooled_detailfix_B = self.create_prompt_embeds( |
| | prompt=prompt_df_B, |
| | negative_prompt=negative_prompt_df_B, |
| | textual_inversion=textual_inversion, |
| | clip_skip=clip_skip, |
| | syntax_weights=syntax_weights, |
| | ) |
| | detailfix_params_B.pop('prompt', None) |
| | detailfix_params_B.pop('negative_prompt', None) |
| | detailfix_params_B["prompt_embeds"] = conditioning_detailfix_B[0:1] |
| | detailfix_params_B["pooled_prompt_embeds"] = pooled_detailfix_B[0:1] |
| | detailfix_params_B["negative_prompt_embeds"] = conditioning_detailfix_B[1:2] |
| | detailfix_params_B["negative_pooled_prompt_embeds"] = pooled_detailfix_B[1:2] |
| |
|
| | logger.debug(f"detailfix B prompt empty {prompt_empty_detailfix_B, negative_prompt_empty_detailfix_B}") |
| | if not prompt_empty_detailfix_B or not negative_prompt_empty_detailfix_B: |
| | logger.debug(f"Prompts detailfix B {prompt_df_B, negative_prompt_df_B}") |
| | logger.debug(f"Pipe params detailfix B \n{detailfix_params_B}") |
| | logger.debug(f"Params detailfix B \n{adetailer_B_params}") |
| |
|
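| | # High-resolution pass: active only when hires_steps > 1 and an upscaler
| | # model is set. Dedicated hires prompts are used when provided; otherwise
| | # the main prompt embeddings are reused.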
| | if hires_steps > 1 and upscaler_model_path is not None:
| | |
| | hires_params_config = {
| | "prompt": None,
| | "negative_prompt": None,
| | "num_inference_steps": hires_steps,
| | "guidance_scale": guidance_scale,
| | "clip_skip": None,
| | "strength": hires_denoising_strength,
| | }
| | if self.class_name == "StableDiffusionPipeline": |
| | hires_params_config["eta"] = 1.0 |
| |
|
| | |
| | hires_prompt_empty, hires_negative_prompt_empty, prompt_hires_valid, negative_prompt_hires_valid = process_prompts_valid( |
| | hires_prompt, hires_negative_prompt, prompt, negative_prompt |
| | ) |
| |
|
| | |
| | if self.class_name == "StableDiffusionPipeline": |
| | if hires_prompt_empty and hires_negative_prompt_empty: |
| | hires_params_config["prompt_embeds"] = prompt_emb |
| | hires_params_config["negative_prompt_embeds"] = negative_prompt_emb |
| | else: |
| | prompt_emb_hires, negative_prompt_emb_hires = self.create_prompt_embeds( |
| | prompt=prompt_hires_valid, |
| | negative_prompt=negative_prompt_hires_valid, |
| | textual_inversion=textual_inversion, |
| | clip_skip=clip_skip, |
| | syntax_weights=syntax_weights, |
| | ) |
| |
|
| | hires_params_config["prompt_embeds"] = prompt_emb_hires |
| | hires_params_config["negative_prompt_embeds"] = negative_prompt_emb_hires |
| | else: |
| | if hires_prompt_empty and hires_negative_prompt_empty: |
| | hires_conditioning, hires_pooled = conditioning, pooled |
| | else: |
| | hires_conditioning, hires_pooled = self.create_prompt_embeds( |
| | prompt=prompt_hires_valid, |
| | negative_prompt=negative_prompt_hires_valid, |
| | textual_inversion=textual_inversion, |
| | clip_skip=clip_skip, |
| | syntax_weights=syntax_weights, |
| | ) |
| |
|
| | hires_params_config.pop('prompt', None) |
| | hires_params_config.pop('negative_prompt', None) |
| | hires_params_config["prompt_embeds"] = hires_conditioning[0:1] |
| | hires_params_config["pooled_prompt_embeds"] = hires_pooled[0:1] |
| | hires_params_config["negative_prompt_embeds"] = hires_conditioning[1:2] |
| | hires_params_config["negative_pooled_prompt_embeds"] = hires_pooled[1:2] |
| |
|
| | |
| | if not hasattr(self, "hires_pipe") or not retain_hires_model_previous_load: |
| | hires_pipe = custom_task_model_loader( |
| | pipe=self.pipe, |
| | model_category="hires", |
| | task_name=self.task_name, |
| | torch_dtype=self.type_model_precision |
| | ) |
| | if hasattr(self, "hires_pipe"): |
| | del self.hires_pipe |
| | if retain_hires_model_previous_load: |
| | if hasattr(self, "hires_pipe"): |
| | hires_pipe = self.hires_pipe |
| | else: |
| | self.hires_pipe = hires_pipe |
| |
|
| | |
| | if hires_sampler != "Use same sampler": |
| | logger.debug("New hires sampler") |
| | hires_pipe.scheduler = self.get_scheduler(hires_sampler) |
| |
|
| | hires_pipe.set_progress_bar_config(leave=leave_progress_bar) |
| | hires_pipe.set_progress_bar_config(disable=disable_progress_bar) |
| | hires_pipe.to(self.device) |
| | torch.cuda.empty_cache() |
| | gc.collect() |
| | else: |
| | hires_params_config = {} |
| | hires_pipe = None |
| |
|
| | |
| | try: |
| | logger.debug(f"INFO PIPE: {self.pipe.__class__.__name__}") |
| | logger.debug(f"text_encoder_type: {self.pipe.text_encoder.dtype}") |
| | logger.debug(f"unet_type: {self.pipe.unet.dtype}") |
| | logger.debug(f"vae_type: {self.pipe.vae.dtype}") |
| | logger.debug(f"pipe_type: {self.pipe.dtype}") |
| | logger.debug(f"scheduler_main_pipe: {self.pipe.scheduler}") |
| | if adetailer_A or adetailer_B: |
| | logger.debug(f"scheduler_detailfix: {detailfix_pipe.scheduler}") |
| | if hires_steps > 1 and upscaler_model_path is not None:
| | logger.debug(f"scheduler_hires: {hires_pipe.scheduler}") |
| | except Exception as e: |
| | logger.debug(f"{str(e)}") |
| |
|
| | |
| | for i in range(loop_generation): |
| |
|
| | |
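| | # Seeds: -1 draws a fresh random seed for every image; a fixed seed is
| | # applied to the first image and the rest of the batch gets random seeds.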
| | if seed == -1: |
| | seeds = [random.randint(0, 2147483647) for _ in range(num_images)] |
| | else: |
| | if num_images == 1: |
| | seeds = [seed] |
| | else: |
| | seeds = [seed] + [random.randint(0, 2147483647) for _ in range(num_images-1)] |
| | |
| | |
| | generators = [] |
| | for calculate_seed in seeds: |
| | if generator_in_cpu or self.device.type == "cpu": |
| | generator = torch.Generator().manual_seed(calculate_seed) |
| | else: |
| | try: |
| | generator = torch.Generator("cuda").manual_seed(calculate_seed) |
| | except Exception:
| | logger.warning("CUDA generator unavailable; falling back to a CPU generator")
| | generator = torch.Generator().manual_seed(calculate_seed) |
| |
|
| | generators.append(generator) |
| |
|
| | |
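| | # img2img accepts a single generator, so the whole batch shares the
| | # first seed.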
| | pipe_params_config["generator"] = generators if self.task_name != "img2img" else generators[0] |
| | seeds = seeds if self.task_name != "img2img" else [seeds[0]] * num_images |
| |
|
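| | # Run the pipeline. The known scheduler error "Tensor with 2 elements
| | # cannot be converted to Scalar" is retried once with the DDIM sampler.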
| | try: |
| | if self.class_name == "StableDiffusionXLPipeline": |
| | |
| | images = self.pipe( |
| | prompt_embeds=conditioning[0:1], |
| | pooled_prompt_embeds=pooled[0:1], |
| | negative_prompt_embeds=conditioning[1:2], |
| | negative_pooled_prompt_embeds=pooled[1:2], |
| | |
| | **pipe_params_config, |
| | ).images |
| | if self.task_name not in ["txt2img", "inpaint", "img2img"]: |
| | images = [control_image] + images |
| | elif self.task_name == "txt2img": |
| | images = self.run_pipe_SD(**pipe_params_config) |
| | elif self.task_name == "inpaint": |
| | images = self.run_pipe_inpaint(**pipe_params_config) |
| | elif self.task_name not in ["txt2img", "inpaint", "img2img"]: |
| | results = self.run_pipe( |
| | **pipe_params_config |
| | ) |
| | images = [control_image] + results |
| | del results |
| | elif self.task_name == "img2img": |
| | images = self.run_pipe_img2img(**pipe_params_config) |
| | except Exception as e: |
| | e = str(e) |
| | if "Tensor with 2 elements cannot be converted to Scalar" in e: |
| | logger.debug(e) |
| | logger.error("Error in sampler; trying with DDIM sampler") |
| | self.pipe.scheduler = self.default_scheduler |
| | self.pipe.scheduler = DDIMScheduler.from_config(self.pipe.scheduler.config) |
| | if self.class_name == "StableDiffusionXLPipeline": |
| | |
| | images = self.pipe( |
| | prompt_embeds=conditioning[0:1], |
| | pooled_prompt_embeds=pooled[0:1], |
| | negative_prompt_embeds=conditioning[1:2], |
| | negative_pooled_prompt_embeds=pooled[1:2], |
| | |
| | **pipe_params_config, |
| | ).images |
| | if self.task_name not in ["txt2img", "inpaint", "img2img"]: |
| | images = [control_image] + images |
| | elif self.task_name == "txt2img": |
| | images = self.run_pipe_SD(**pipe_params_config) |
| | elif self.task_name == "inpaint": |
| | images = self.run_pipe_inpaint(**pipe_params_config) |
| | elif self.task_name not in ["txt2img", "inpaint", "img2img"]: |
| | results = self.run_pipe( |
| | **pipe_params_config |
| | ) |
| | images = [control_image] + results |
| | del results |
| | elif self.task_name == "img2img": |
| | images = self.run_pipe_img2img(**pipe_params_config) |
| | elif "The size of tensor a (0) must match the size of tensor b (3) at non-singleton" in e: |
| | raise ValueError(f"steps / strength too low for the model to produce a satisfactory response") |
| | else: |
| | raise ValueError(e) |
| | |
| | torch.cuda.empty_cache() |
| | gc.collect() |
| |
|
| | if hires_before_adetailer and upscaler_model_path is not None:
| | logger.debug("Hires before adetailer; same seed for each image (no batch)")
| | images = process_images_high_resolution( |
| | images, |
| | upscaler_model_path, |
| | upscaler_increases_size, |
| | esrgan_tile, esrgan_tile_overlap, |
| | hires_steps, hires_params_config, |
| | self.task_name, |
| | generators[0], |
| | hires_pipe, |
| | ) |
| |
|
| | |
| | if adetailer_A or adetailer_B: |
| | |
| | |
| | |
| | |
| | if self.task_name not in ["txt2img", "inpaint", "img2img"]: |
| | images = images[1:] |
| |
|
| | if adetailer_A: |
| | images = ad_model_process( |
| | pipe_params_df=detailfix_params_A, |
| | detailfix_pipe=detailfix_pipe, |
| | image_list_task=images, |
| | **adetailer_A_params, |
| | ) |
| | if adetailer_B: |
| | images = ad_model_process( |
| | pipe_params_df=detailfix_params_B, |
| | detailfix_pipe=detailfix_pipe, |
| | image_list_task=images, |
| | **adetailer_B_params, |
| | ) |
| |
|
| | if self.task_name not in ["txt2img", "inpaint", "img2img"]: |
| | images = [control_image] + images |
| | |
| | torch.cuda.empty_cache() |
| | gc.collect() |
| |
|
| | if hires_after_adetailer and upscaler_model_path is not None:
| | logger.debug("Hires after adetailer; same seed for each image (no batch)")
| | images = process_images_high_resolution( |
| | images, |
| | upscaler_model_path, |
| | upscaler_increases_size, |
| | esrgan_tile, esrgan_tile_overlap, |
| | hires_steps, hires_params_config, |
| | self.task_name, |
| | generators[0], |
| | hires_pipe, |
| | ) |
| |
|
| | logger.info(f"Seeds: {seeds}") |
| |
|
| | |
| | if display_images: |
| | mediapy.show_images(images) |
| | |
| | |
| | if loop_generation > 1: |
| | time.sleep(0.5) |
| |
|
| | |
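| | # Metadata embedded in each saved image; the placeholder at index 7 is
| | # replaced with the per-image seed before saving.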
| | image_list = [] |
| | metadata = [ |
| | prompt, |
| | negative_prompt, |
| | self.base_model_id, |
| | self.vae_model, |
| | num_steps, |
| | guidance_scale, |
| | sampler, |
| | 0000000000, |
| | img_width, |
| | img_height, |
| | clip_skip, |
| | ] |
| |
|
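| | # For ControlNet/adapter tasks, images[0] is the control image, so a
| | # placeholder seed of 0 is prepended to align the seed list.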
| | valid_seeds = [0] + seeds if self.task_name not in ["txt2img", "inpaint", "img2img"] else seeds |
| | for image_, seed_ in zip(images, valid_seeds): |
| | image_path = "not saved in storage" |
| | if save_generated_images: |
| | metadata[7] = seed_ |
| | image_path = save_pil_image_with_metadata(image_, image_storage_location, metadata) |
| | image_list.append(image_path) |
| |
|
| | torch.cuda.empty_cache() |
| | gc.collect() |
| |
|
| | if image_list[0] != "not saved in storage": |
| | logger.info(image_list) |
| |
|
| | if hasattr(self, "compel") and not retain_compel_previous_load: |
| | del self.compel |
| | torch.cuda.empty_cache() |
| | gc.collect() |
| |
|
| | return images, image_list |
| |
|