| import inspect |
|
|
| from pathlib import Path |
| from tempfile import TemporaryDirectory |
from typing import Any, Callable, Dict, List, Optional, Union
|
|
| import numpy as np |
| import openvino |
| import torch |
|
|
| from diffusers.pipelines.stable_diffusion import StableDiffusionPipelineOutput |
from optimum.intel.openvino.modeling_diffusion import (
    OVModelTextEncoder,
    OVModelUnet,
    OVModelVaeDecoder,
    OVModelVaeEncoder,
    OVStableDiffusionPipeline,
    VaeImageProcessor,
)
| from optimum.utils import ( |
| DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER, |
| DIFFUSION_MODEL_TEXT_ENCODER_SUBFOLDER, |
| DIFFUSION_MODEL_UNET_SUBFOLDER, |
| DIFFUSION_MODEL_VAE_DECODER_SUBFOLDER, |
| DIFFUSION_MODEL_VAE_ENCODER_SUBFOLDER, |
| ) |
|
|
|
|
| from diffusers import logging |
| logger = logging.get_logger(__name__) |
|
|
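# UNet wrapper that, on top of optimum-intel's OVModelUnet, forwards the optional
# `timestep_cond` input. LCM uses this input to inject a guidance-scale embedding
# into the UNet instead of running classifier-free guidance.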
| class LCMOVModelUnet(OVModelUnet): |
| def __call__( |
| self, |
| sample: np.ndarray, |
| timestep: np.ndarray, |
| encoder_hidden_states: np.ndarray, |
| timestep_cond: Optional[np.ndarray] = None, |
| text_embeds: Optional[np.ndarray] = None, |
| time_ids: Optional[np.ndarray] = None, |
| ): |
| self._compile() |
|
|
| inputs = { |
| "sample": sample, |
| "timestep": timestep, |
| "encoder_hidden_states": encoder_hidden_states, |
| } |
|
|
| if timestep_cond is not None: |
| inputs["timestep_cond"] = timestep_cond |
| if text_embeds is not None: |
| inputs["text_embeds"] = text_embeds |
| if time_ids is not None: |
| inputs["time_ids"] = time_ids |
|
|
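        # Run inference on the compiled model; `shared_memory=True` lets OpenVINO
        # read the numpy inputs in place rather than copying them.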
| outputs = self.request(inputs, shared_memory=True) |
| return list(outputs.values()) |
|
|
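# OVStableDiffusionPipeline variant for Latent Consistency Models: it swaps in
# LCMOVModelUnet and performs guidance through the `timestep_cond` embedding, so
# no negative prompt or classifier-free guidance pass is needed.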
| class OVLatentConsistencyModelPipeline(OVStableDiffusionPipeline): |
|
|
| def __init__( |
| self, |
| vae_decoder: openvino.runtime.Model, |
| text_encoder: openvino.runtime.Model, |
| unet: openvino.runtime.Model, |
| config: Dict[str, Any], |
| tokenizer: "CLIPTokenizer", |
| scheduler: Union["DDIMScheduler", "PNDMScheduler", "LMSDiscreteScheduler"], |
| feature_extractor: Optional["CLIPFeatureExtractor"] = None, |
| vae_encoder: Optional[openvino.runtime.Model] = None, |
| text_encoder_2: Optional[openvino.runtime.Model] = None, |
| tokenizer_2: Optional["CLIPTokenizer"] = None, |
| device: str = "CPU", |
| dynamic_shapes: bool = True, |
| compile: bool = True, |
| ov_config: Optional[Dict[str, str]] = None, |
| model_save_dir: Optional[Union[str, Path, TemporaryDirectory]] = None, |
| **kwargs, |
| ): |
| self._internal_dict = config |
| self._device = device.upper() |
| self.is_dynamic = dynamic_shapes |
| self.ov_config = ov_config if ov_config is not None else {} |
| self._model_save_dir = ( |
| Path(model_save_dir.name) if isinstance(model_save_dir, TemporaryDirectory) else model_save_dir |
| ) |
| self.vae_decoder = OVModelVaeDecoder(vae_decoder, self) |
| self.unet = LCMOVModelUnet(unet, self) |
| self.text_encoder = OVModelTextEncoder(text_encoder, self) if text_encoder is not None else None |
| self.text_encoder_2 = ( |
| OVModelTextEncoder(text_encoder_2, self, model_name=DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER) |
| if text_encoder_2 is not None |
| else None |
| ) |
| self.vae_encoder = OVModelVaeEncoder(vae_encoder, self) if vae_encoder is not None else None |
|
|
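        # Derive the pixel-to-latent downsampling factor from the VAE decoder
        # config, falling back to the standard factor of 8.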
| if "block_out_channels" in self.vae_decoder.config: |
| self.vae_scale_factor = 2 ** (len(self.vae_decoder.config["block_out_channels"]) - 1) |
| else: |
| self.vae_scale_factor = 8 |
|
|
| self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor) |
|
|
| self.tokenizer = tokenizer |
| self.tokenizer_2 = tokenizer_2 |
| self.scheduler = scheduler |
| self.feature_extractor = feature_extractor |
| self.safety_checker = None |
| self.preprocessors = [] |
|
|
| if self.is_dynamic: |
| self.reshape(batch_size=-1, height=-1, width=-1, num_images_per_prompt=-1) |
|
|
| if compile: |
| self.compile() |
|
|
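        # Record which class backs each sub-model in the pipeline config, mirroring
        # what OVStableDiffusionPipeline does for save/load support.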
| sub_models = { |
| DIFFUSION_MODEL_TEXT_ENCODER_SUBFOLDER: self.text_encoder, |
| DIFFUSION_MODEL_UNET_SUBFOLDER: self.unet, |
| DIFFUSION_MODEL_VAE_DECODER_SUBFOLDER: self.vae_decoder, |
| DIFFUSION_MODEL_VAE_ENCODER_SUBFOLDER: self.vae_encoder, |
| DIFFUSION_MODEL_TEXT_ENCODER_2_SUBFOLDER: self.text_encoder_2, |
| } |
        for name, model in sub_models.items():
            self._internal_dict[name] = (
                ("optimum", model.__class__.__name__) if model is not None else (None, None)
            )
|
|
| self._internal_dict.pop("vae", None) |
|
|
| def _reshape_unet( |
| self, |
| model: openvino.runtime.Model, |
| batch_size: int = -1, |
| height: int = -1, |
| width: int = -1, |
| num_images_per_prompt: int = -1, |
| tokenizer_max_length: int = -1, |
| ): |
| if batch_size == -1 or num_images_per_prompt == -1: |
| batch_size = -1 |
| else: |
| batch_size = batch_size * num_images_per_prompt |
|
|
| height = height // self.vae_scale_factor if height > 0 else height |
| width = width // self.vae_scale_factor if width > 0 else width |
| shapes = {} |
| for inputs in model.inputs: |
| shapes[inputs] = inputs.get_partial_shape() |
| if inputs.get_any_name() == "timestep": |
| shapes[inputs][0] = 1 |
| elif inputs.get_any_name() == "sample": |
| in_channels = self.unet.config.get("in_channels", None) |
| if in_channels is None: |
| in_channels = shapes[inputs][1] |
| if in_channels.is_dynamic: |
                        logger.warning(
                            "Could not identify `in_channels` from the UNet configuration. To statically reshape the UNet, please provide a configuration."
                        )
| self.is_dynamic = True |
| |
| shapes[inputs] = [batch_size, in_channels, height, width] |
| elif inputs.get_any_name() == "timestep_cond": |
| shapes[inputs] = [batch_size, inputs.get_partial_shape()[1]] |
| elif inputs.get_any_name() == "text_embeds": |
| shapes[inputs] = [batch_size, self.text_encoder_2.config["projection_dim"]] |
| elif inputs.get_any_name() == "time_ids": |
| shapes[inputs] = [batch_size, inputs.get_partial_shape()[1]] |
| else: |
| shapes[inputs][0] = batch_size |
| shapes[inputs][1] = tokenizer_max_length |
| model.reshape(shapes) |
| return model |
|
|
    def get_guidance_scale_embedding(self, w, embedding_dim=512, dtype=np.float32):
        """
        See https://github.com/google-research/vdm/blob/dc27b98a554f65cdc654b800da5aa1846545d41b/model_vdm.py#L298

        Args:
            w: np.ndarray: guidance scale values (one per image) to embed
            embedding_dim: int: dimension of the embeddings to generate
            dtype: data type of the generated embeddings

        Returns:
            embedding vectors with shape `(len(w), embedding_dim)`
        """
        assert len(w.shape) == 1
        w = w * 1000.0

        half_dim = embedding_dim // 2
        emb = np.log(10000.0) / (half_dim - 1)
        emb = np.exp(np.arange(half_dim, dtype=dtype) * -emb)
        emb = w.astype(dtype)[:, None] * emb[None, :]
        emb = np.concatenate([np.sin(emb), np.cos(emb)], axis=1)
        if embedding_dim % 2 == 1:
            # Pad only the feature dimension, not the batch dimension.
            emb = np.pad(emb, ((0, 0), (0, 1)))
        assert emb.shape == (w.shape[0], embedding_dim)
        # numpy promotes to float64 above; cast back to the promised dtype.
        return emb.astype(dtype)
|
|
| |
| def __call__( |
| self, |
| prompt: Optional[Union[str, List[str]]] = None, |
| height: Optional[int] = None, |
| width: Optional[int] = None, |
| num_inference_steps: int = 4, |
        original_inference_steps: Optional[int] = None,
| guidance_scale: float = 7.5, |
| num_images_per_prompt: int = 1, |
| eta: float = 0.0, |
| generator: Optional[np.random.RandomState] = None, |
| latents: Optional[np.ndarray] = None, |
| prompt_embeds: Optional[np.ndarray] = None, |
| output_type: str = "pil", |
| return_dict: bool = True, |
| callback: Optional[Callable[[int, int, np.ndarray], None]] = None, |
| callback_steps: int = 1, |
| guidance_rescale: float = 0.0, |
| ): |
| r""" |
| Function invoked when calling the pipeline for generation. |
| |
| Args: |
            prompt (`Optional[Union[str, List[str]]]`, defaults to None):
                The prompt or prompts to guide the image generation. If not defined, one has to pass `prompt_embeds`
                instead.
| height (`Optional[int]`, defaults to None): |
| The height in pixels of the generated image. |
| width (`Optional[int]`, defaults to None): |
| The width in pixels of the generated image. |
| num_inference_steps (`int`, defaults to 4): |
| The number of denoising steps. More denoising steps usually lead to a higher quality image at the |
| expense of slower inference. |
            original_inference_steps (`int`, *optional*):
                The original number of inference steps used to generate a linearly-spaced timestep schedule, from
                which we will draw `num_inference_steps` evenly spaced timesteps as our final timestep schedule,
                following the Skipping-Step method in the paper (see Section 4.3). If not set, this will default to
                the scheduler's `original_inference_steps` attribute.
            guidance_scale (`float`, defaults to 7.5):
                Guidance scale as defined in [Classifier-Free Diffusion Guidance](https://arxiv.org/abs/2207.12598).
                `guidance_scale` is defined as `w` of equation 2 of the [Imagen
                paper](https://arxiv.org/pdf/2205.11487.pdf). Guidance scale is enabled by setting `guidance_scale >
                1`. A higher guidance scale encourages the model to generate images that are closely linked to the
                text `prompt`, usually at the expense of lower image quality.
| num_images_per_prompt (`int`, defaults to 1): |
| The number of images to generate per prompt. |
| eta (`float`, defaults to 0.0): |
| Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502. Only applies to |
| [`schedulers.DDIMScheduler`], will be ignored for others. |
            generator (`Optional[np.random.RandomState]`, defaults to `None`):
                A np.random.RandomState to make generation deterministic.
| latents (`Optional[np.ndarray]`, defaults to `None`): |
| Pre-generated noisy latents, sampled from a Gaussian distribution, to be used as inputs for image |
| generation. Can be used to tweak the same generation with different prompts. If not provided, a latents |
                tensor will be generated by sampling using the supplied random `generator`.
| prompt_embeds (`Optional[np.ndarray]`, defaults to `None`): |
| Pre-generated text embeddings. Can be used to easily tweak text inputs, *e.g.* prompt weighting. If not |
| provided, text embeddings will be generated from `prompt` input argument. |
| output_type (`str`, defaults to `"pil"`): |
                The output format of the generated image. Choose between
| [PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`. |
| return_dict (`bool`, defaults to `True`): |
| Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a |
| plain tuple. |
            callback (`Optional[Callable]`, defaults to `None`):
                A function that will be called every `callback_steps` steps during inference. The function will be
                called with the following arguments: `callback(step: int, timestep: int, latents: np.ndarray)`.
| callback_steps (`int`, defaults to 1): |
| The frequency at which the `callback` function will be called. If not specified, the callback will be |
| called at every step. |
| guidance_rescale (`float`, defaults to 0.0): |
| Guidance rescale factor proposed by [Common Diffusion Noise Schedules and Sample Steps are |
| Flawed](https://arxiv.org/pdf/2305.08891.pdf) `guidance_scale` is defined as `φ` in equation 16. of |
| [Common Diffusion Noise Schedules and Sample Steps are Flawed](https://arxiv.org/pdf/2305.08891.pdf). |
| Guidance rescale factor should fix overexposure when using zero terminal SNR. |
| |
| Returns: |
| [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] or `tuple`: |
                [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] if `return_dict` is True, otherwise a `tuple`.
| When returning a tuple, the first element is a list with the generated images, and the second element is a |
| list of `bool`s denoting whether the corresponding generated image likely represents "not-safe-for-work" |
| (nsfw) content, according to the `safety_checker`. |
| """ |
| height = height or self.unet.config.get("sample_size", 64) * self.vae_scale_factor |
| width = width or self.unet.config.get("sample_size", 64) * self.vae_scale_factor |
|
|
| |
| self.check_inputs( |
| prompt, height, width, callback_steps, None, prompt_embeds, None |
| ) |
|
|
| |
| if isinstance(prompt, str): |
| batch_size = 1 |
| elif isinstance(prompt, list): |
| batch_size = len(prompt) |
| else: |
| batch_size = prompt_embeds.shape[0] |
|
|
| if generator is None: |
| generator = np.random |
|
|
| |
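        # diffusers schedulers expect a torch.Generator, so derive one from the
        # numpy RandomState's current state.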
| torch_generator = torch.Generator().manual_seed(int(generator.get_state()[1][0])) |
|
| prompt_embeds = self._encode_prompt( |
| prompt, |
| num_images_per_prompt, |
| False, |
| negative_prompt=None, |
| prompt_embeds=prompt_embeds, |
| negative_prompt_embeds=None, |
| ) |
|
|
| |
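        # LCM's skipping-step schedule: draw `num_inference_steps` evenly spaced
        # timesteps out of the `original_inference_steps`-long training schedule.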
| self.scheduler.set_timesteps(num_inference_steps, "cpu", original_inference_steps=original_inference_steps) |
| timesteps = self.scheduler.timesteps |
|
|
| latents = self.prepare_latents( |
| batch_size * num_images_per_prompt, |
| self.unet.config.get("in_channels", 4), |
| height, |
| width, |
| prompt_embeds.dtype, |
| generator, |
| latents, |
| ) |
|
|
| |
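        # LCM conditions the UNet on a guidance-scale embedding via `timestep_cond`
        # instead of classifier-free guidance; diffusers embeds `guidance_scale - 1`.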
| w = np.tile(guidance_scale - 1, batch_size * num_images_per_prompt) |
        w_embedding = self.get_guidance_scale_embedding(
            w, embedding_dim=self.unet.config.get("time_cond_proj_dim", 256)
        )
|
|
| |
| timestep_dtype = self.unet.input_dtype.get("timestep", np.float32) |
|
|
| |
| |
| |
| |
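        # Only forward `eta` and `generator` if this scheduler's `step` accepts them.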
| accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys()) |
| extra_step_kwargs = {} |
| if accepts_eta: |
| extra_step_kwargs["eta"] = eta |
|
|
| accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys()) |
| if accepts_generator: |
| extra_step_kwargs["generator"] = torch_generator |
|
|
| num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order |
| for i, t in enumerate(self.progress_bar(timesteps)): |
|
|
| |
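            # One UNet pass per step: the prompt embeddings are not duplicated,
            # since guidance comes from `timestep_cond` rather than a second
            # unconditional pass.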
| timestep = np.array([t], dtype=timestep_dtype) |
| |
            noise_pred = self.unet(
                sample=latents,
                timestep=timestep,
                timestep_cond=w_embedding,
                encoder_hidden_states=prompt_embeds,
            )[0]
|
|
| |
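            # LCMScheduler.step returns (prev_sample, denoised); `denoised` after
            # the final step is the image prediction that gets decoded below.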
            latents, denoised = self.scheduler.step(
                torch.from_numpy(noise_pred), t, torch.from_numpy(latents), **extra_step_kwargs, return_dict=False
            )
|
|
| latents, denoised = latents.numpy(), denoised.numpy() |
|
|
| |
| if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0): |
| if callback is not None and i % callback_steps == 0: |
| callback(i, t, latents) |
|
|
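        # Either return the denoised latents directly, or decode them with the VAE
        # decoder (undoing the scaling factor) one sample at a time.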
        if output_type == "latent":
            # LCM's image prediction is `denoised`, not the noisy `latents`.
            image = denoised
            has_nsfw_concept = None
| else: |
| denoised /= self.vae_decoder.config.get("scaling_factor", 0.18215) |
| |
            image = np.concatenate(
                [self.vae_decoder(latent_sample=denoised[j : j + 1])[0] for j in range(denoised.shape[0])]
            )
| image, has_nsfw_concept = self.run_safety_checker(image) |
|
|
| if has_nsfw_concept is None: |
| do_denormalize = [True] * image.shape[0] |
| else: |
| do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept] |
|
|
| image = self.image_processor.postprocess(image, output_type=output_type, do_denormalize=do_denormalize) |
|
|
| if not return_dict: |
| return (image, has_nsfw_concept) |
|
|
| return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept) |
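

# A minimal usage sketch (assumptions: an LCM checkpoint already converted to
# OpenVINO IR, e.g. with `optimum-cli export openvino`; the model path below is
# hypothetical):
#
#     from diffusers import LCMScheduler
#
#     pipeline = OVLatentConsistencyModelPipeline.from_pretrained("./lcm-openvino", compile=True)
#     pipeline.scheduler = LCMScheduler.from_config(pipeline.scheduler.config)
#     image = pipeline("a close-up photo of a red fox", num_inference_steps=4).images[0]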
|
|