| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | from typing import List, Optional, Tuple, Union |
| |
|
| | import PIL.Image |
| | import torch |
| | from torchvision import transforms |
| |
|
| | from diffusers.pipelines.pipeline_utils import DiffusionPipeline, ImagePipelineOutput |
| | from diffusers.schedulers import DDIMScheduler |
| | from diffusers.utils.torch_utils import randn_tensor |
| |
|
| |
|
# Preprocessing applied to every PIL input: resize to 256x256, convert to a
# tensor, then normalize each channel with mean 0.5 and std 0.5 (the pipeline
# undoes this with `image / 2 + 0.5` before returning results).
trans = transforms.Compose(
    [
        transforms.Resize(size=(256, 256)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.5], std=[0.5]),
    ]
)
| |
|
| |
|
def preprocess(image):
    """Convert the pipeline input into a batched image tensor.

    Args:
        image: A `torch.Tensor`, a single `PIL.Image.Image`, or an iterable of
            PIL images.

    Returns:
        The input itself when it already is a tensor (no resizing or
        normalization is applied in that case); otherwise the PIL image(s),
        converted to RGB, passed through `trans` and stacked along a new
        leading batch dimension.
    """
    # Tensors are taken as-is; the caller is responsible for their layout/range.
    if isinstance(image, torch.Tensor):
        return image

    # Promote a lone PIL image to a one-element batch so both cases share a path.
    pictures = [image] if isinstance(image, PIL.Image.Image) else image

    return torch.stack([trans(picture.convert("RGB")) for picture in pictures])
| |
|
| |
|
class DDIMNoiseComparativeAnalysisPipeline(DiffusionPipeline):
    r"""
    Pipeline that adds noise to an input image up to an intermediate DDIM timestep (chosen through `strength`) and
    then denoises it again with the U-Net.

    This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods the
    library implements for all the pipelines (such as downloading or saving, running on a particular device, etc.)

    Parameters:
        unet ([`UNet2DModel`]): U-Net architecture to denoise the encoded image.
        scheduler ([`SchedulerMixin`]):
            A scheduler to be used in combination with `unet` to denoise the encoded image. Can be one of
            [`DDPMScheduler`], or [`DDIMScheduler`]. Whatever is passed, its config is used to build a
            [`DDIMScheduler`], which is the scheduler the pipeline actually registers.
    """

    def __init__(self, unet, scheduler):
        super().__init__()

        # Always rebuild the scheduler as DDIM from the given scheduler's config.
        scheduler = DDIMScheduler.from_config(scheduler.config)

        self.register_modules(unet=unet, scheduler=scheduler)

    def check_inputs(self, strength):
        """Validate `strength`.

        Raises:
            ValueError: If `strength` lies outside the closed interval [0, 1].
        """
        if strength < 0 or strength > 1:
            raise ValueError(f"The value of strength should in [0.0, 1.0] but is {strength}")

    def get_timesteps(self, num_inference_steps, strength, device):
        """Select the tail of the scheduler's timesteps that will actually be run.

        Args:
            num_inference_steps: Number of steps the scheduler was configured with.
            strength: Fraction of the schedule to run.
            device: Unused; kept for interface compatibility.

        Returns:
            A tuple `(timesteps, steps)` where `timesteps` is `self.scheduler.timesteps[t_start:]` and `steps` is
            `num_inference_steps - t_start`.
        """
        # Number of steps to run, capped at the configured total.
        init_timestep = min(int(num_inference_steps * strength), num_inference_steps)

        # NOTE(review): when `int(num_inference_steps * strength)` is 0 the slice below starts at
        # `num_inference_steps`; presumably that leaves no timesteps to run -- confirm callers avoid this case.
        t_start = max(num_inference_steps - init_timestep, 0)
        timesteps = self.scheduler.timesteps[t_start:]

        return timesteps, num_inference_steps - t_start

    def prepare_latents(self, image, timestep, batch_size, dtype, device, generator=None):
        """Noise the input image to the level of `timestep`.

        Args:
            image: `torch.Tensor`, `PIL.Image.Image` or list of images. Non-tensor inputs are converted with
                `preprocess` first.
            timestep: Timestep tensor handed to `scheduler.add_noise`.
            batch_size: Expected batch size; only used to validate a list of generators.
            dtype: Target dtype for the image and the sampled noise.
            device: Target device for the image and the sampled noise.
            generator: Optional generator, or list of generators, used to sample the noise.

        Returns:
            The image with scheduler noise added at `timestep`.

        Raises:
            ValueError: If `image` has an unsupported type, or if a list of generators does not have
                `batch_size` entries.
        """
        if not isinstance(image, (torch.Tensor, PIL.Image.Image, list)):
            raise ValueError(
                f"`image` has to be of type `torch.Tensor`, `PIL.Image.Image` or list but is {type(image)}"
            )

        # The check above admits PIL images and lists, which have no `.to()`; convert them to a tensor first.
        # `preprocess` returns tensors unchanged, so inputs that are already tensors are unaffected.
        init_latents = preprocess(image).to(device=device, dtype=dtype)

        if isinstance(generator, list) and len(generator) != batch_size:
            raise ValueError(
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
                f" size of {batch_size}. Make sure the batch size matches the length of the generators."
            )

        shape = init_latents.shape
        noise = randn_tensor(shape, generator=generator, device=device, dtype=dtype)

        # Report the timestep the noise is added at, then noise the image.
        print("add noise to latents at timestep", timestep)
        init_latents = self.scheduler.add_noise(init_latents, noise, timestep)
        latents = init_latents

        return latents

    @torch.no_grad()
    def __call__(
        self,
        image: Union[torch.Tensor, PIL.Image.Image] = None,
        strength: float = 0.8,
        batch_size: int = 1,
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
        eta: float = 0.0,
        num_inference_steps: int = 50,
        use_clipped_model_output: Optional[bool] = None,
        output_type: Optional[str] = "pil",
        return_dict: bool = True,
    ) -> Union[ImagePipelineOutput, Tuple]:
        r"""
        Args:
            image (`torch.Tensor` or `PIL.Image.Image`):
                `Image`, or tensor representing an image batch, that will be used as the starting point for the
                process.
            strength (`float`, *optional*, defaults to 0.8):
                Conceptually, indicates how much to transform the reference `image`. Must be between 0 and 1. `image`
                will be used as a starting point, adding more noise to it the larger the `strength`. The number of
                denoising steps depends on the amount of noise initially added. When `strength` is 1, added noise will
                be maximum and the denoising process will run for the full number of iterations specified in
                `num_inference_steps`. A value of 1, therefore, essentially ignores `image`.
            batch_size (`int`, *optional*, defaults to 1):
                The number of images to generate.
            generator (`torch.Generator`, *optional*):
                One or a list of [torch generator(s)](https://pytorch.org/docs/stable/generated/torch.Generator.html)
                to make generation deterministic.
            eta (`float`, *optional*, defaults to 0.0):
                The eta parameter which controls the scale of the variance (0 is DDIM and 1 is one type of DDPM).
            num_inference_steps (`int`, *optional*, defaults to 50):
                The number of denoising steps. More denoising steps usually lead to a higher quality image at the
                expense of slower inference.
            use_clipped_model_output (`bool`, *optional*, defaults to `None`):
                Forwarded unchanged (including `None`) to the scheduler's `step` call; see the documentation of
                `DDIMScheduler.step`.
            output_type (`str`, *optional*, defaults to `"pil"`):
                The output format of the generated image. `"pil"` returns `PIL.Image.Image` objects; any other value
                returns an `np.array`.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`~pipelines.ImagePipelineOutput`] instead of a plain tuple.

        Returns:
            [`~pipelines.ImagePipelineOutput`] or `tuple`: [`~pipelines.utils.ImagePipelineOutput`] if `return_dict` is
            True, otherwise a `tuple`. When returning a tuple, the first element holds the generated images and the
            second element is the timestep (as a Python number) at which noise was added to the input.
        """
        # 1. Check inputs
        self.check_inputs(strength)

        # 2. Preprocess image
        image = preprocess(image)

        # 3. Set timesteps; the first remaining timestep is the noise level for the input
        self.scheduler.set_timesteps(num_inference_steps, device=self.device)
        timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength, self.device)
        latent_timestep = timesteps[:1].repeat(batch_size)

        # 4. Noise the input image up to `latent_timestep`
        latents = self.prepare_latents(image, latent_timestep, batch_size, self.unet.dtype, self.device, generator)
        image = latents

        # 5. Denoising loop
        for t in self.progress_bar(timesteps):
            # Predict the model output for the current sample
            model_output = self.unet(image, t).sample

            # Step the scheduler from the current sample to the previous one
            image = self.scheduler.step(
                model_output,
                t,
                image,
                eta=eta,
                use_clipped_model_output=use_clipped_model_output,
                generator=generator,
            ).prev_sample

        # Undo the mean-0.5 / std-0.5 normalization, clamp to [0, 1], and move channels last.
        image = (image / 2 + 0.5).clamp(0, 1)
        image = image.cpu().permute(0, 2, 3, 1).numpy()
        if output_type == "pil":
            image = self.numpy_to_pil(image)

        if not return_dict:
            # `latent_timestep` repeats one value `batch_size` times; calling `.item()` on the whole tensor
            # raises for batch_size > 1, so read the value from its first element.
            return (image, latent_timestep[0].item())

        return ImagePipelineOutput(images=image)
| |
|