Spaces:
Sleeping
Sleeping
| import os | |
| import shutil | |
| import json | |
| import torch | |
| import random | |
| from pathlib import Path | |
| from torch.utils.data import Dataset | |
| from torchvision import transforms | |
| from diffusers import StableDiffusionPipeline, DDIMScheduler, UNet2DConditionModel, AutoencoderKL, DDPMScheduler | |
| from transformers import CLIPTextModel, CLIPTokenizer | |
| from accelerate import Accelerator | |
| from tqdm.auto import tqdm | |
| from PIL import Image | |
| class CustomDataset(Dataset): | |
| def __init__(self, data_dir, prompt, tokenizer, size=512, center_crop=False): | |
| self.data_dir = Path(data_dir) | |
| self.prompt = prompt | |
| self.tokenizer = tokenizer | |
| self.size = size | |
| self.center_crop = center_crop | |
| self.image_transforms = transforms.Compose([ | |
| transforms.Resize(size, interpolation=transforms.InterpolationMode.BILINEAR), | |
| transforms.CenterCrop(size) if center_crop else transforms.RandomCrop(size), | |
| transforms.ToTensor(), | |
| transforms.Normalize([0.5], [0.5]) | |
| ]) | |
| self.images = [f for f in self.data_dir.iterdir() if f.is_file() and not str(f).endswith(".txt")] | |
| def __len__(self): | |
| return len(self.images) | |
| def __getitem__(self, idx): | |
| image_path = self.images[idx] | |
| image = Image.open(image_path) | |
| if not image.mode == "RGB": | |
| image = image.convert("RGB") | |
| image = self.image_transforms(image) | |
| prompt_ids = self.tokenizer( | |
| self.prompt, padding="max_length", truncation=True, max_length=self.tokenizer.model_max_length | |
| ).input_ids | |
| return {"image": image, "prompt_ids": prompt_ids} | |
| def fine_tune_model(instance_data_dir, instance_prompt, model_name, output_dir, seed=1337, resolution=512, train_batch_size=1, max_train_steps=800): | |
| # Setup | |
| accelerator = Accelerator() | |
| set_seed(seed) | |
| tokenizer = CLIPTokenizer.from_pretrained(model_name) | |
| text_encoder = CLIPTextModel.from_pretrained(model_name) | |
| vae = AutoencoderKL.from_pretrained(model_name) | |
| unet = UNet2DConditionModel.from_pretrained(model_name) | |
| noise_scheduler = DDPMScheduler.from_pretrained(model_name, subfolder="scheduler") | |
| dataset = CustomDataset(instance_data_dir, instance_prompt, tokenizer, resolution) | |
| dataloader = torch.utils.data.DataLoader(dataset, batch_size=train_batch_size, shuffle=True) | |
| optimizer = torch.optim.AdamW(unet.parameters(), lr=1e-6) | |
| unet, optimizer, dataloader = accelerator.prepare(unet, optimizer, dataloader) | |
| vae.to(accelerator.device) | |
| text_encoder.to(accelerator.device) | |
| global_step = 0 | |
| for step, batch in tqdm(enumerate(dataloader), total=max_train_steps): | |
| latents = vae.encode(batch["image"].to(accelerator.device)).latent_dist.sample() * 0.18215 | |
| noise = torch.randn_like(latents) | |
| timesteps = torch.randint(0, noise_scheduler.config.num_train_timesteps, (latents.shape[0],), device=latents.device).long() | |
| noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps) | |
| encoder_hidden_states = text_encoder(batch["prompt_ids"].to(accelerator.device))[0] | |
| model_pred = unet(noisy_latents, timesteps, encoder_hidden_states).sample | |
| loss = torch.nn.functional.mse_loss(model_pred.float(), noise.float(), reduction="mean") | |
| accelerator.backward(loss) | |
| optimizer.step() | |
| optimizer.zero_grad() | |
| global_step += 1 | |
| if global_step >= max_train_steps: | |
| break | |
| # Save model | |
| unet = accelerator.unwrap_model(unet) | |
| unet.save_pretrained(output_dir) | |
| vae.save_pretrained(output_dir) | |
| text_encoder.save_pretrained(output_dir) | |
| tokenizer.save_pretrained(output_dir) | |
| def set_seed(seed): | |
| random.seed(seed) | |
| torch.manual_seed(seed) | |
| if torch.cuda.is_available(): | |
| torch.cuda.manual_seed_all(seed) |