# -*- coding: utf-8 -*-
"""Train an unconditional DDPM (UNet2DModel + DDPMScheduler) on an image dataset.

The script logs in to the Hugging Face Hub, loads and preprocesses the dataset,
builds the model, trains it with Accelerate, periodically samples images and
pushes checkpoints to the Hub.

Original file is located at
    https://colab.research.google.com/drive/1SbxWXhffEnCJ6tVT6ZfTDbY2-cxb063U
"""

import glob
import math
import os
import random
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

import torch
import torch.nn.functional as F
from accelerate import Accelerator
from accelerate import notebook_launcher
from datasets import load_dataset
from diffusers import DDPMPipeline
from diffusers import DDPMScheduler
from diffusers import UNet2DModel
from diffusers.optimization import get_cosine_schedule_with_warmup
from huggingface_hub import HfFolder, Repository, whoami
from huggingface_hub import notebook_login
from PIL import Image
from torchvision import transforms
from tqdm.auto import tqdm

# Login to Hugging Face
notebook_login()


# ## Training configuration

@dataclass
class TrainingConfig:
    """Hyper-parameters and output settings for one training run."""

    image_size: int = 256  # the generated image resolution
    train_batch_size: int = 10  # images per training batch
    eval_batch_size: int = 16  # how many images to sample during evaluation
    num_epochs: int = 2000  # number of epochs to run
    gradient_accumulation_steps: int = 1  # 1 == not utilised
    learning_rate: float = 1e-4
    lr_warmup_steps: int = 250
    save_image_epochs: int = 500  # how often to sample the current model output
    save_model_epochs: int = 500  # how often to generate ALL data and save the model
    sample_after_epoch: int = 195  # sample grids only for epochs strictly greater than this
    mixed_precision: str = "fp16"  # `no` for float32, `fp16` for automatic mixed precision
    output_dir: str = "Ball1730_10Real"  # the model name locally and on the HF Hub
    dataset_name: str = "GaumlessGraham/Ball10Real"  # dataset repository on the HF Hub
    push_to_hub: bool = True  # whether to upload the saved model to the HF Hub
    hub_private_repo: bool = False
    overwrite_output_dir: bool = False  # KEEP THIS AS FALSE
    seed: int = 0  # IS RANDOMISED WHEN GENERATING (see `evaluate`)


config = TrainingConfig()

# ## Load the dataset

dataset = load_dataset(config.dataset_name, split="train")

# Preprocessing of images. This shouldn't be required because all input images
# are the same size, but it is applied just to be sure:
# * `Resize` changes the image size to the one defined in `config.image_size`.
# * `Normalize` rescales the pixel values into a [-1, 1] range, which is what
#   the model expects.
# NOTE(review): the model below is built with in_channels=1, so this presumably
# relies on the dataset images being single-channel -- confirm.
preprocess = transforms.Compose(
    [
        transforms.Resize((config.image_size, config.image_size)),
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5]),
    ]
)


def transform(examples):
    """Apply `preprocess` to every image of a batch and return them under "images"."""
    images = [preprocess(image) for image in examples["image"]]
    return {"images": images}


# Use 🤗 Datasets' `set_transform` to apply `preprocess` on the fly during training.
dataset.set_transform(transform)

# Wrap the dataset in a DataLoader for training.
# (A leftover `fig.show()` from a deleted visualisation cell used to sit here;
# `fig` was never defined, so the call raised NameError and has been removed.)
train_dataloader = torch.utils.data.DataLoader(
    dataset, batch_size=config.train_batch_size, shuffle=True
)

# ## Create a UNet2DModel

model = UNet2DModel(
    sample_size=config.image_size,  # the target image resolution
    in_channels=1,  # the number of input channels, 3 for RGB images, 1 for grayscale
    out_channels=1,  # the number of output channels
    layers_per_block=2,  # how many ResNet layers to use per UNet block
    block_out_channels=(128, 128, 256, 256, 512, 512),  # output channels for each UNet block
    down_block_types=(
        "DownBlock2D",  # a regular ResNet downsampling block
        "DownBlock2D",
        "DownBlock2D",
        "DownBlock2D",
        "AttnDownBlock2D",  # a ResNet downsampling block with spatial self-attention
        "DownBlock2D",
    ),
    up_block_types=(
        "UpBlock2D",  # a regular ResNet upsampling block
        "AttnUpBlock2D",  # a ResNet upsampling block with spatial self-attention
        "UpBlock2D",
        "UpBlock2D",
        "UpBlock2D",
        "UpBlock2D",
    ),
)

# Check sample and output sizes to ensure they match.
# Run under no_grad: this is only a shape check, no autograd graph is needed.
sample_image = dataset[0]["images"].unsqueeze(0)
print("Input shape:", sample_image.shape)
with torch.no_grad():
    print("Output shape:", model(sample_image, timestep=0).sample.shape)

# ## Create a scheduler (to add noise)

noise_scheduler = DDPMScheduler(num_train_timesteps=1000)
noise = torch.randn(sample_image.shape)
timesteps = torch.LongTensor([50])
noisy_image = noise_scheduler.add_noise(sample_image, noise, timesteps)

# The training objective of the model is to predict the noise added to the
# image. The loss at this step can be calculated as below. It is a
# demonstration only, so it runs under no_grad to avoid keeping a large
# autograd graph alive for the rest of the script.
with torch.no_grad():
    noise_pred = model(noisy_image, timesteps).sample
    loss = F.mse_loss(noise_pred, noise)  # Mean Square Error loss function

# ## Train the model: optimizer and learning-rate scheduler

optimizer = torch.optim.AdamW(model.parameters(), lr=config.learning_rate)
lr_scheduler = get_cosine_schedule_with_warmup(
    optimizer=optimizer,
    num_warmup_steps=config.lr_warmup_steps,
    num_training_steps=(len(train_dataloader) * config.num_epochs),
)


# ## Model evaluation

def make_grid(images, rows, cols):
    """Paste `images` row by row into a single RGB image of `rows` x `cols` tiles.

    Every tile is assumed to have the size of `images[0]`.
    """
    w, h = images[0].size
    grid = Image.new("RGB", size=(cols * w, rows * h))
    for i, image in enumerate(images):
        grid.paste(image, box=(i % cols * w, i // cols * h))
    return grid


def evalfirst(config, epoch, pipeline):
    """Sample one batch ONCE and save it as a grid to check the generated images.

    The grid is written to `<output_dir>/samples/<epoch>.png`.
    Takes approx. 5 mins to evaluate on an RTX 4090.
    """
    # Generate images from noise
    images = pipeline(
        batch_size=config.eval_batch_size,
        generator=torch.manual_seed(config.seed),
    ).images

    # Make a near-square grid sized from the batch (4 x 4 for the default 16 images)
    cols = max(1, math.ceil(math.sqrt(len(images))))
    rows = max(1, math.ceil(len(images) / cols))
    image_grid = make_grid(images, rows=rows, cols=cols)

    # Save the images
    test_dir = os.path.join(config.output_dir, "samples")
    os.makedirs(test_dir, exist_ok=True)
    image_grid.save(f"{test_dir}/{epoch:04d}.png")


def evaluate(config, epoch, pipeline, num_batches=19):
    """Generate `num_batches` batches of images and save each image to disk.

    Images are written to `<output_dir>/samples_generated/` with consecutive
    four-digit file names. Please use `evalfirst` to ensure the model has
    trained correctly before running this.

    The first batch uses `config.seed`; after every batch `config.seed` is
    overwritten with a new random value, so the config is mutated.

    The default of 19 batches matches the original `range(1, 20)` loop.
    Original timing note: approx. 1 hour, 45 mins on an RTX 4090.
    NOTE(review): the original comment spoke of "k = 20"; confirm whether
    19 or 20 batches were intended.
    """
    test_dir = os.path.join(config.output_dir, "samples_generated")
    os.makedirs(test_dir, exist_ok=True)

    for batch_index in range(num_batches):
        # Generate images
        images = pipeline(
            batch_size=config.eval_batch_size,
            generator=torch.manual_seed(config.seed),
        ).images

        # Save the images, numbering them consecutively across batches
        offset = batch_index * config.eval_batch_size
        for i, image in enumerate(images):
            image.save(f"{test_dir}/{(offset + i):04d}.png")

        # Change seed so the next batch differs
        config.seed = random.randint(1, 100000)


# ## Training loop

def get_full_repo_name(
    model_id: str, organization: Optional[str] = None, token: Optional[str] = None
) -> str:
    """Return `<namespace>/<model_id>` for the Hub.

    The namespace is `organization` when given, otherwise the user name that
    `whoami` reports for `token` (or for the locally stored token).
    """
    if organization is not None:
        return f"{organization}/{model_id}"
    if token is None:
        token = HfFolder.get_token()
    username = whoami(token)["name"]
    return f"{username}/{model_id}"


def _epoch_is_due(epoch, every, num_epochs):
    """Return True on every `every`-th epoch (1-based) and on the last epoch."""
    return (epoch + 1) % every == 0 or epoch == num_epochs - 1


def train_loop(config, model, noise_scheduler, optimizer, train_dataloader, lr_scheduler):
    """Train `model` to predict the noise added by `noise_scheduler`.

    On the main process it also samples image grids (`evalfirst`), generates
    images en masse (`evaluate`), saves the pipeline and, when
    `config.push_to_hub` is set, pushes to the Hub repository.
    """
    # Initialize accelerator and tensorboard logging
    accelerator = Accelerator(
        mixed_precision=config.mixed_precision,
        gradient_accumulation_steps=config.gradient_accumulation_steps,
        log_with="tensorboard",
        project_dir=os.path.join(config.output_dir, "logs"),
    )
    repo = None  # only set on the main process when pushing to the Hub
    if accelerator.is_main_process:
        if config.push_to_hub:
            repo_name = get_full_repo_name(Path(config.output_dir).name)
            repo = Repository(config.output_dir, clone_from=repo_name)
        elif config.output_dir is not None:
            os.makedirs(config.output_dir, exist_ok=True)
        accelerator.init_trackers("train_example")

    # Prepare everything
    model, optimizer, train_dataloader, lr_scheduler = accelerator.prepare(
        model, optimizer, train_dataloader, lr_scheduler
    )

    global_step = 0

    # Train model
    for epoch in range(config.num_epochs):
        progress_bar = tqdm(
            total=len(train_dataloader), disable=not accelerator.is_local_main_process
        )
        progress_bar.set_description(f"Epoch {epoch}")

        for batch in train_dataloader:
            clean_images = batch["images"]
            # Sample noise to add to the images (created directly on the target device)
            noise = torch.randn(clean_images.shape, device=clean_images.device)
            bs = clean_images.shape[0]

            # Sample a random timestep for each image
            timesteps = torch.randint(
                0,
                noise_scheduler.config.num_train_timesteps,
                (bs,),
                device=clean_images.device,
                dtype=torch.int64,
            )

            # Add noise to the clean images according to the noise magnitude
            # at each timestep (this is the forward diffusion process)
            noisy_images = noise_scheduler.add_noise(clean_images, noise, timesteps)

            with accelerator.accumulate(model):
                # Predict the noise residual
                noise_pred = model(noisy_images, timesteps, return_dict=False)[0]
                loss = F.mse_loss(noise_pred, noise)
                accelerator.backward(loss)

                # Only clip once the gradients have been synchronised, i.e. on
                # the step that actually updates the weights.
                if accelerator.sync_gradients:
                    accelerator.clip_grad_norm_(model.parameters(), 1.0)
                optimizer.step()
                lr_scheduler.step()
                optimizer.zero_grad()

            progress_bar.update(1)
            logs = {
                "loss": loss.detach().item(),
                "lr": lr_scheduler.get_last_lr()[0],
                "step": global_step,
            }
            progress_bar.set_postfix(**logs)
            accelerator.log(logs, step=global_step)
            global_step += 1

        progress_bar.close()

        # Image sampling / model saving
        if accelerator.is_main_process:
            pipeline = DDPMPipeline(
                unet=accelerator.unwrap_model(model), scheduler=noise_scheduler
            )

            # Sample one grid of images to check that the model has trained
            # correctly; skipped for epochs up to `config.sample_after_epoch`.
            sample_due = _epoch_is_due(epoch, config.save_image_epochs, config.num_epochs)
            if sample_due and epoch > config.sample_after_epoch:
                evalfirst(config, epoch, pipeline)
                # `repo` only exists when pushing to the Hub.
                if config.push_to_hub:
                    repo.push_to_hub(
                        commit_message=f"Sample Images Epoch {epoch}", blocking=True
                    )

            # At the right epoch, generate images en masse and save the model
            # once image generation is complete.
            if _epoch_is_due(epoch, config.save_model_epochs, config.num_epochs):
                if config.push_to_hub:
                    evaluate(config, epoch, pipeline)
                    model_dir = os.path.join(config.output_dir, str(epoch))
                    os.makedirs(model_dir, exist_ok=True)
                    pipeline.save_pretrained(model_dir)
                    repo.push_to_hub(commit_message=f"Epoch {epoch}", blocking=True)
                    # NOTE(review): this ends the whole run after the FIRST
                    # model save (epoch 499 with the default config), not after
                    # `num_epochs`. Kept as in the original -- confirm intent.
                    sys.exit(0)
                else:
                    pipeline.save_pretrained(config.output_dir)


args = (config, model, noise_scheduler, optimizer, train_dataloader, lr_scheduler)
notebook_launcher(train_loop, args, num_processes=1)

# Once training is complete, take a look at the final images.
sample_images = sorted(glob.glob(f"{config.output_dir}/samples/*.png"))
# Guard against an empty list: no grid exists if sampling never ran.
final_image = Image.open(sample_images[-1]) if sample_images else None