| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
|
|
import logging
from typing import List, Optional, Tuple, Union

import PIL.Image
import torch
from torchvision import transforms

from diffusers.pipelines.pipeline_utils import DiffusionPipeline, ImagePipelineOutput
from diffusers.schedulers import DDIMScheduler
from diffusers.utils.torch_utils import randn_tensor
|
|
|
|
# Preprocessing applied to every PIL input image: resize to 256x256, convert to a tensor,
# then normalize with mean 0.5 and std 0.5.
trans = transforms.Compose(
    [
        transforms.Resize(size=(256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ]
)
|
|
|
|
def preprocess(image):
    """Turn the pipeline's image input into a batched tensor.

    Args:
        image: A `torch.Tensor`, a single `PIL.Image.Image`, or an iterable of PIL images.

    Returns:
        The input itself when it is already a tensor; otherwise the images converted to RGB,
        passed through the module-level `trans` transform and stacked along a new first dimension.
    """
    # Tensors are returned untouched: no resizing or normalization is applied to them.
    if isinstance(image, torch.Tensor):
        return image

    # A lone PIL image is handled as a batch of one.
    pil_images = [image] if isinstance(image, PIL.Image.Image) else image

    rgb_images = (pil_image.convert("RGB") for pil_image in pil_images)
    return torch.stack([trans(rgb_image) for rgb_image in rgb_images])
|
|
|
|
logger = logging.getLogger(__name__)


class DDIMNoiseComparativeAnalysisPipeline(DiffusionPipeline):
    r"""
    Pipeline that adds noise to an input image at a `strength`-dependent timestep and then denoises it with DDIM.

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the
    library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)

    Parameters:
        unet ([`UNet2DModel`]): U-Net architecture to denoise the encoded image.
        scheduler ([`SchedulerMixin`]):
            A scheduler to be used in combination with `unet` to denoise the encoded image. Can be one of
            [`DDPMScheduler`], or [`DDIMScheduler`]. Only its config is used: the pipeline always registers a
            [`DDIMScheduler`] built from that config.
    """

    def __init__(self, unet, scheduler):
        super().__init__()

        # Whatever scheduler is passed in, rebuild it as a DDIM scheduler from its config.
        scheduler = DDIMScheduler.from_config(scheduler.config)

        self.register_modules(unet=unet, scheduler=scheduler)

    def check_inputs(self, strength):
        """Validate `strength`.

        Raises:
            ValueError: If `strength` lies outside the closed interval [0, 1].
        """
        if strength < 0 or strength > 1:
            raise ValueError(f"The value of strength should in [0.0, 1.0] but is {strength}")

    def get_timesteps(self, num_inference_steps, strength, device):
        """Select the tail of the scheduler's timesteps that corresponds to `strength`.

        Args:
            num_inference_steps: Number of steps the scheduler timesteps were set up with.
            strength: Fraction of `num_inference_steps` to run.
            device: Unused; kept so the signature stays compatible with existing callers.

        Returns:
            A tuple `(timesteps, steps)`: the slice of `self.scheduler.timesteps` to iterate over and the
            number of steps left after skipping the first `t_start` entries.
        """
        # Number of steps to actually run, capped at the total number of inference steps.
        init_timestep = min(int(num_inference_steps * strength), num_inference_steps)

        # Skip the first `t_start` scheduler timesteps and keep the remainder.
        t_start = max(num_inference_steps - init_timestep, 0)
        timesteps = self.scheduler.timesteps[t_start:]

        return timesteps, num_inference_steps - t_start

    def prepare_latents(self, image, timestep, batch_size, dtype, device, generator=None):
        """Noise `image` with the scheduler at `timestep`.

        Args:
            image: A `torch.Tensor`, a `PIL.Image.Image` or a list of PIL images. Non-tensor inputs are
                converted with `preprocess` first.
            timestep: Timestep tensor handed to `self.scheduler.add_noise`.
            batch_size: Requested batch size; only used to validate a list of generators.
            dtype: dtype for the latents and the sampled noise.
            device: Device for the latents and the sampled noise.
            generator: Optional generator, or list of generators, used to sample the noise.

        Returns:
            The noised image tensor returned by the scheduler.

        Raises:
            ValueError: If `image` has an unsupported type, or if `generator` is a list whose length differs
                from `batch_size`.
        """
        if not isinstance(image, (torch.Tensor, PIL.Image.Image, list)):
            raise ValueError(
                f"`image` has to be of type `torch.Tensor`, `PIL.Image.Image` or list but is {type(image)}"
            )

        # PIL images and lists pass the type check above, so convert them to a tensor here instead of
        # failing on `.to(...)` below.
        if not isinstance(image, torch.Tensor):
            image = preprocess(image)

        init_latents = image.to(device=device, dtype=dtype)

        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        shape = init_latents.shape
        noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)

        # Reported through the module logger rather than printed to stdout.
        logger.info("add noise to latents at timestep %s", timestep)
        latents = self.scheduler.add_noise(init_latents, noise, timestep)

        return latents

    @torch.no_grad()
    def __call__(
        self,
        image: Optional[Union[torch.Tensor, PIL.Image.Image]] = None,
        strength: float = 0.8,
        batch_size: int = 1,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        eta: float = 0.0,
        num_inference_steps: int = 50,
        use_clipped_model_output: Optional[bool] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
    ) -> Union[ImagePipelineOutput, Tuple]:
        r"""
        Args:
            image (`torch.Tensor` or `PIL.Image.Image`):
                `Image`, or tensor representing an image batch, that will be used as the starting point for the
                process. Required; the `None` default only exists to keep the signature keyword-friendly.
            strength (`float`, *optional*, defaults to 0.8):
                Conceptually, indicates how much to transform the reference `image`. Must be between 0 and 1. `image`
                will be used as a starting point, adding more noise to it the larger the `strength`. The number of
                denoising steps depends on the amount of noise initially added. When `strength` is 1, added noise will
                be maximum and the denoising process will run for the full number of iterations specified in
                `num_inference_steps`. A value of 1, therefore, essentially ignores `image`.
            batch_size (`int`, *optional*, defaults to 1):
                The number of images to generate.
            generator (`torch.Generator`, *optional*):
                One or a list of [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)
                to make generation deterministic.
            eta (`float`, *optional*, defaults to 0.0):
                The eta parameter which controls the scale of the variance (0 is DDIM and 1 is one type of DDPM).
            num_inference_steps (`int`, *optional*, defaults to 50):
                The number of denoising steps. More denoising steps usually lead to a higher quality image at the
                expense of slower inference.
            use_clipped_model_output (`bool`, *optional*, defaults to `None`):
                if `True` or `False`, see documentation for `DDIMScheduler.step`. If `None`, nothing is passed
                downstream to the scheduler. So use `None` for schedulers which don't support this argument.
            output_type (`str`, *optional*, defaults to `"pil"`):
                The output format of the generate image. Choose between
                [PIL](https://pillow.readthedocs.io/en/stable/): `PIL.Image.Image` or `np.array`.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`~pipelines.ImagePipelineOutput`] instead of a plain tuple.

        Returns:
            [`~pipelines.ImagePipelineOutput`] or `tuple`: [`~pipelines.utils.ImagePipelineOutput`] if `return_dict` is
            True, otherwise a `tuple`. When returning a tuple, the first element is a list with the generated images
            and the second element is the timestep at which noise was added to `image`, as a Python number.

        Raises:
            ValueError: If `image` is `None` or `strength` is outside [0, 1].
        """
        # 1. Check inputs
        self.check_inputs(strength)
        if image is None:
            raise ValueError("`image` has to be provided as a `torch.Tensor` or `PIL.Image.Image` but is None")

        # 2. Preprocess image
        image = preprocess(image)

        # 3. Set timesteps
        self.scheduler.set_timesteps(num_inference_steps, device=self.device)
        timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength, self.device)
        # The first remaining timestep, repeated once per requested sample.
        latent_timestep = timesteps[:1].repeat(batch_size)

        # 4. Prepare latent variables: the noised image is the starting point of the denoising loop
        latents = self.prepare_latents(image, latent_timestep, batch_size, self.unet.dtype, self.device, generator)
        image = latents

        # 5. Denoising loop
        for t in self.progress_bar(timesteps):
            # Run the model on the current sample, then let the scheduler compute the previous sample.
            model_output = self.unet(image, t).sample
            image = self.scheduler.step(
                model_output,
                t,
                image,
                eta=eta,
                use_clipped_model_output=use_clipped_model_output,
                generator=generator,
            ).prev_sample

        # Rescale to [0, 1] and move the channel dimension last before converting to NumPy.
        image = (image / 2 + 0.5).clamp(0, 1)
        image = image.cpu().permute(0, 2, 3, 1).numpy()
        if output_type == "pil":
            image = self.numpy_to_pil(image)

        if not return_dict:
            # Every entry of `latent_timestep` is the same value; read the first one, because
            # `.item()` on the whole tensor fails as soon as `batch_size` is larger than 1.
            return (image, latent_timestep[0].item())

        return ImagePipelineOutput(images=image)
|
|