{ "cells": [ { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [], "source": [ "import torch\n", "import torch.nn as nn\n", "import numpy as np\n", "from torchvision.utils import save_image, make_grid\n", "import matplotlib.pyplot as plt\n", "from matplotlib.animation import FuncAnimation, PillowWriter\n", "import os\n", "import torchvision.transforms as transforms\n", "from torch.utils.data import Dataset\n", "from PIL import Image\n", "from torch.utils.data import DataLoader\n", "from tqdm.auto import tqdm\n", "import torch.nn.functional as F\n", "from IPython.display import HTML" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "class ResidualBlock(nn.Module):\n", " def __init__(self, in_channels: int, out_channels: int,is_res: bool = False) -> None:\n", " super(ResidualBlock,self).__init__()\n", "\n", " self.same_channesls = in_channels == out_channels\n", "\n", " self.is_res = is_res\n", "\n", " self.conv1 = nn.Sequential(\n", " nn.Conv2d(in_channels,out_channels,3,1,1),\n", " nn.BatchNorm2d(out_channels),\n", " nn.GELU(),\n", " )\n", "\n", " self.conv2 = nn.Sequential(\n", " nn.Conv2d(out_channels,out_channels,3,1,1),\n", " nn.BatchNorm2d(out_channels),\n", " nn.GELU(),\n", " )\n", "\n", " def forward(self,x): \n", " if self.is_res:\n", " x1 = self.conv1(x)\n", "\n", " x2 = self.conv2(x1)\n", "\n", " if self.same_channesls:\n", " out = x1 + x2\n", " else:\n", " shortcut = nn.Conv2d(x.shape[1],x2.shape[1],1,1,0).to(x.device)\n", " out = shortcut(x) + x2\n", "\n", " return out / 1.414\n", " \n", " else:\n", " x1 = self.conv1(x)\n", " x2 = self.conv2(x1)\n", " return x2\n" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "class UnetUp(nn.Module):\n", " def __init__(self, in_channels, out_channels) -> None:\n", " super(UnetUp,self).__init__()\n", "\n", " self.model = nn.Sequential(\n", " nn.ConvTranspose2d(in_channels,out_channels,2,2),\n", " ResidualBlock(out_channels,out_channels),\n", " ResidualBlock(out_channels,out_channels),\n", " )\n", "\n", " def forward(self, x, skip):\n", " x = torch.cat([x,skip],1)\n", "\n", " x = self.model(x)\n", " return x\n", " \n", "class UnetDown(nn.Module):\n", " def __init__(self, input_channels, out_channels) -> None:\n", " super(UnetDown,self).__init__()\n", "\n", " self.model = nn.Sequential(\n", " ResidualBlock(input_channels,out_channels),\n", " ResidualBlock(out_channels,out_channels),\n", " nn.MaxPool2d(2)\n", " )\n", "\n", " def forward(self,x):\n", " return self.model(x)\n", " \n", "\n", "class EmbedFC(nn.Module):\n", " def __init__(self, input_dim,embed_dm) -> None:\n", " super(EmbedFC,self).__init__()\n", "\n", " self.input_dim = input_dim\n", " \n", " self.model = nn.Sequential(\n", " nn.Linear(input_dim,embed_dm),\n", " nn.GELU(),\n", " nn.Linear(embed_dm,embed_dm),\n", " )\n", "\n", " def forward(self,x):\n", " x = x.view(-1,self.input_dim)\n", " return self.model(x)\n" ] }, { "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "def unorm(x):\n", " # unity norm. 
{ "cell_type": "code", "execution_count": 13, "metadata": {}, "outputs": [], "source": [ "def unorm(x):\n", "    # unity norm: rescales x to the range [0,1]\n", "    # assumes x has shape (h,w,3)\n", "    xmax = x.max((0,1))\n", "    xmin = x.min((0,1))\n", "    return (x - xmin) / (xmax - xmin)\n", "\n", "def norm_all(store, n_t, n_s):\n", "    # runs unity norm on all timesteps of all samples\n", "    nstore = np.zeros_like(store)\n", "    for t in range(n_t):\n", "        for s in range(n_s):\n", "            nstore[t, s] = unorm(store[t, s])\n", "    return nstore\n", "\n", "def norm_torch(x_all):\n", "    # runs unity norm on all timesteps of all samples\n", "    # input is (n_samples, 3, h, w), the torch image format\n", "    x = x_all.cpu().numpy()\n", "    xmax = x.max((2, 3))\n", "    xmin = x.min((2, 3))\n", "    xmax = np.expand_dims(xmax, (2, 3))\n", "    xmin = np.expand_dims(xmin, (2, 3))\n", "    nstore = (x - xmin) / (xmax - xmin)\n", "    return torch.from_numpy(nstore)\n" ] },
{ "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [], "source": [ "def plot_grid(x, n_sample, n_rows, save_dir, w):\n", "    # x: (n_sample, 3, h, w)\n", "    ncols = n_sample // n_rows\n", "    grid = make_grid(norm_torch(x), nrow=ncols)  # make_grid's nrow is the number of images per row, i.e. the number of columns\n", "    save_image(grid, save_dir + f\"run_image_w{w}.png\")\n", "    print('saved image at ' + save_dir + f\"run_image_w{w}.png\")\n", "    return grid\n", "\n", "def plot_sample(x_gen_store, n_sample, nrows, save_dir, fn, w, save=False):\n", "    ncols = n_sample // nrows\n", "    sx_gen_store = np.moveaxis(x_gen_store, 2, 4)  # to (t, sample, h, w, 3) for plotting\n", "    nsx_gen_store = norm_all(sx_gen_store, sx_gen_store.shape[0], n_sample)  # rescale each frame to [0,1]\n", "    fig, axs = plt.subplots(nrows=nrows, ncols=ncols, sharex=True, sharey=True, figsize=(ncols, nrows))\n", "    def animate_diff(i, store):\n", "        print(f'gif animating frame {i} of {store.shape[0]}', end='\\r')\n", "        plots = []\n", "        for row in range(nrows):\n", "            for col in range(ncols):\n", "                axs[row, col].clear()\n", "                axs[row, col].set_xticks([])\n", "                axs[row, col].set_yticks([])\n", "                plots.append(axs[row, col].imshow(store[i, (row * ncols) + col]))\n", "        return plots\n", "    ani = FuncAnimation(fig, animate_diff, fargs=[nsx_gen_store], interval=200, blit=False, repeat=True, frames=nsx_gen_store.shape[0])\n", "    plt.close()\n", "    if save:\n", "        ani.save(save_dir + f\"{fn}_w{w}.gif\", dpi=100, writer=PillowWriter(fps=5))\n", "        print('saved gif at ' + save_dir + f\"{fn}_w{w}.gif\")\n", "    return ani\n" ] },
{ "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "transform = transforms.Compose([\n", "    transforms.ToTensor(),                # from [0,255] to range [0.0,1.0]\n", "    transforms.Normalize((0.5,), (0.5,))  # range [-1,1]\n", "])\n", "\n", "class CustomDataset(Dataset):\n", "    def __init__(self, sfilename, lfilename, transform, null_context=False):\n", "        self.sprites = np.load(sfilename)\n", "        self.slabels = np.load(lfilename)\n", "        print(f\"sprite shape: {self.sprites.shape}\")\n", "        print(f\"labels shape: {self.slabels.shape}\")\n", "        self.transform = transform\n", "        self.null_context = null_context\n", "        self.sprites_shape = self.sprites.shape\n", "        self.slabel_shape = self.slabels.shape\n", "\n", "    def __len__(self):\n", "        return len(self.sprites)\n", "\n", "    def __getitem__(self, idx):\n", "        # fall back to the raw sprite so image is defined even without a transform\n", "        image = self.sprites[idx]\n", "        if self.transform:\n", "            image = self.transform(image)\n", "        if self.null_context:\n", "            label = torch.tensor(0).to(torch.int64)\n", "        else:\n", "            label = torch.tensor(self.slabels[idx]).to(torch.int64)\n", "        return (image, label)\n" ] },
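{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Quick check of the transform (added; not in the original notebook): it should\n", "# map a uint8 HWC sprite in [0,255] to a float CHW tensor in [-1,1]. The dummy\n", "# array here is a stand-in for one sprite.\n", "_dummy = (np.random.rand(16, 16, 3) * 255).astype(np.uint8)\n", "_timg = transform(_dummy)\n", "print(_timg.shape, _timg.min().item(), _timg.max().item())  # torch.Size([3, 16, 16]), close to -1.0 and 1.0" ] },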
{ "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "class ContextUnet(nn.Module):\n", "    def __init__(self, in_channels, n_feat=256, n_cfeat=10, height=28) -> None:\n", "        super().__init__()\n", "\n", "        self.in_channels = in_channels\n", "        self.n_feat = n_feat\n", "        self.n_cfeat = n_cfeat\n", "        self.h = height  # assumed divisible by 4, since the net downsamples twice\n", "\n", "        self.init_conv = ResidualBlock(in_channels, n_feat, is_res=True)\n", "\n", "        self.down1 = UnetDown(n_feat, n_feat)      # -> (B, n_feat, h/2, h/2)\n", "        self.down2 = UnetDown(n_feat, n_feat * 2)  # -> (B, 2*n_feat, h/4, h/4)\n", "\n", "        self.to_vec = nn.Sequential(nn.AvgPool2d(4), nn.GELU())\n", "\n", "        # embed the (normalized) timestep and the context label at two scales\n", "        self.timeembed1 = EmbedFC(1, 2 * n_feat)\n", "        self.timeembed2 = EmbedFC(1, n_feat)\n", "        self.contextembed1 = EmbedFC(n_cfeat, 2 * n_feat)\n", "        self.contextembed2 = EmbedFC(n_cfeat, n_feat)\n", "\n", "        self.up0 = nn.Sequential(\n", "            nn.ConvTranspose2d(2 * n_feat, 2 * n_feat, self.h // 4, self.h // 4),\n", "            nn.GroupNorm(8, 2 * n_feat),\n", "            nn.ReLU(),\n", "        )\n", "\n", "        self.up1 = UnetUp(4 * n_feat, n_feat)\n", "        self.up2 = UnetUp(2 * n_feat, n_feat)\n", "\n", "        self.out = nn.Sequential(\n", "            nn.Conv2d(2 * n_feat, n_feat, 3, 1, 1),\n", "            nn.GroupNorm(8, n_feat),\n", "            nn.ReLU(),\n", "            nn.Conv2d(n_feat, self.in_channels, 3, 1, 1),\n", "        )\n", "\n", "    def forward(self, x, t, c=None):\n", "        x = self.init_conv(x)\n", "\n", "        down1 = self.down1(x)\n", "        down2 = self.down2(down1)\n", "\n", "        hidden_vec = self.to_vec(down2)\n", "\n", "        # with no context, fall back to an all-zero (null) context vector\n", "        if c is None:\n", "            c = torch.zeros(x.shape[0], self.n_cfeat).to(x)\n", "\n", "        cemb1 = self.contextembed1(c).view(-1, self.n_feat * 2, 1, 1)\n", "        temb1 = self.timeembed1(t).view(-1, self.n_feat * 2, 1, 1)\n", "        cemb2 = self.contextembed2(c).view(-1, self.n_feat, 1, 1)\n", "        temb2 = self.timeembed2(t).view(-1, self.n_feat, 1, 1)\n", "\n", "        up0 = self.up0(hidden_vec)\n", "        # context scales the features, time shifts them\n", "        up1 = self.up1(up0 * cemb1 + temb1, down2)\n", "        up2 = self.up2(up1 * cemb2 + temb2, down1)\n", "\n", "        out = self.out(torch.cat((up2, x), 1))\n", "\n", "        return out" ] },
{ "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "# Hyperparameters\n", "\n", "timesteps = 500\n", "beta1 = 1e-4\n", "beta2 = 0.02\n", "\n", "device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n", "n_feat = 64\n", "n_cfeat = 5\n", "height = 16\n", "save_dir = \"./checkpoints/\"  # trailing slash matters: paths below are built by string concatenation\n", "\n", "batch_size = 100\n", "n_epoch = 40\n", "lrate = 1e-3" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# linear beta schedule, indexed 0..timesteps so b_t[t] matches timestep t\n", "b_t = (beta2 - beta1) * torch.linspace(0, 1, timesteps + 1, device=device) + beta1\n", "a_t = 1 - b_t\n", "a_bt = torch.cumsum(a_t.log(), 0).exp()  # cumulative product of a_t, computed in log space\n", "a_bt[0] = 1" ] },
{ "cell_type": "code", "execution_count": 181, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "sprite shape: (89400, 16, 16, 3)\n", "labels shape: (89400, 5)\n" ] } ], "source": [ "dataset = CustomDataset(\"./sprites_1788_16x16.npy\", \"./sprite_labels_nc_1788_16x16.npy\", transform, null_context=False)\n", "dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True, num_workers=1)" ] },
{ "cell_type": "code", "execution_count": 182, "metadata": {}, "outputs": [], "source": [ "nn_model = ContextUnet(3, n_feat, n_cfeat, height).to(device)\n", "optim = torch.optim.Adam(nn_model.parameters(), lr=lrate)" ] },
{ "cell_type": "code", "execution_count": 183, "metadata": {}, "outputs": [], "source": [ "def perturb_input(x, t, noise):\n", "    # forward diffusion: x_t = sqrt(a_bt[t]) * x_0 + sqrt(1 - a_bt[t]) * noise\n", "    # (the noise term needs the square root too, matching the DDPM forward process)\n", "    return a_bt.sqrt()[t, None, None, None] * x + (1 - a_bt[t, None, None, None]).sqrt() * noise" ] },
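{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Illustrative check of the forward process (added; not in the original\n", "# notebook): a_bt decays from ~1 toward ~0, so perturb_input moves from a\n", "# nearly clean image at small t to nearly pure unit-variance noise at\n", "# t = timesteps.\n", "print(f\"a_bt[1] = {a_bt[1].item():.4f}, a_bt[{timesteps}] = {a_bt[timesteps].item():.6f}\")\n", "x0, _ = dataset[0]\n", "x0 = x0.unsqueeze(0).to(device)\n", "for t in (1, timesteps // 2, timesteps):\n", "    x_t = perturb_input(x0, torch.tensor([t], device=device), torch.randn_like(x0))\n", "    print(f\"t = {t:3d}: std(x_t) = {x_t.std().item():.3f}\")" ] },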
{ "cell_type": "code", "execution_count": 184, "metadata": {}, "outputs": [], "source": [ "# training loop, left commented out because a trained checkpoint is loaded below\n", "\n", "# nn_model.train()\n", "\n", "# for epoch in range(n_epoch):\n", "\n", "#     optim.param_groups[0]['lr'] = lrate * (1 - epoch / n_epoch)  # linear lr decay\n", "#     for x, _ in tqdm(dataloader):\n", "#         optim.zero_grad()\n", "\n", "#         x = x.to(device)\n", "\n", "#         t = torch.randint(1, timesteps + 1, (x.shape[0],)).to(device)  # randint takes a size tuple\n", "#         noise = torch.randn_like(x)\n", "#         x_pert = perturb_input(x, t, noise)\n", "\n", "#         pred = nn_model(x_pert, t / timesteps)\n", "\n", "#         loss = F.mse_loss(pred, noise)\n", "#         loss.backward()\n", "#         optim.step()\n", "\n", "#     if epoch > 0:  # save a checkpoint after every epoch\n", "#         if not os.path.exists(save_dir):\n", "#             os.mkdir(save_dir)\n", "#         torch.save(nn_model, save_dir + f\"model_Epoch{epoch}.pth\")\n", "#         print(\"Saved model\")\n" ] },
{ "cell_type": "code", "execution_count": 185, "metadata": {}, "outputs": [], "source": [ "def denoise_add_noise(x, t, pred_noise, z=None):\n", "    # one reverse DDPM step: posterior mean computed from the predicted noise,\n", "    # plus freshly sampled noise scaled by sqrt(b_t)\n", "    if z is None:\n", "        z = torch.randn_like(x)\n", "    noise = b_t.sqrt()[t] * z\n", "    mean = (x - pred_noise * ((1 - a_t[t]) / (1 - a_bt[t]).sqrt())) / a_t[t].sqrt()\n", "    return mean + noise" ] },
{ "cell_type": "code", "execution_count": 186, "metadata": {}, "outputs": [], "source": [ "@torch.no_grad()\n", "def sample_ddpm(n_sample, save_rate=20):\n", "    # x_T ~ N(0, 1), sample initial noise\n", "    samples = torch.randn(n_sample, 3, height, height).to(device)\n", "\n", "    # array to keep track of generated steps for plotting\n", "    intermediate = []\n", "    for i in range(timesteps, 0, -1):\n", "        print(f'sampling timestep {i:3d}', end='\\r')\n", "\n", "        # reshape time tensor\n", "        t = torch.tensor([i / timesteps])[:, None, None, None].to(device)\n", "\n", "        # sample some random noise to inject back in. For i = 1, don't add back in noise\n", "        z = torch.randn_like(samples) if i > 1 else 0\n", "\n", "        eps = nn_model(samples, t)  # predict noise e_(x_t, t)\n", "        samples = denoise_add_noise(samples, i, eps, z)\n", "        if i % save_rate == 0 or i == timesteps or i < 8:\n", "            intermediate.append(samples.detach().cpu().numpy())\n", "\n", "    intermediate = np.stack(intermediate)\n", "    return samples, intermediate" ] },
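{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Smoke test before loading trained weights (added; not in the original\n", "# notebook): the U-Net predicts the noise added to each pixel, so its output\n", "# must have the same shape as its noisy input.\n", "with torch.no_grad():\n", "    _xb = torch.randn(2, 3, height, height, device=device)\n", "    _tb = torch.rand(2, device=device)  # normalized timesteps in [0,1)\n", "    print(nn_model(_xb, _tb).shape)  # torch.Size([2, 3, 16, 16])\n", "print(sum(p.numel() for p in nn_model.parameters()), \"parameters\")" ] },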
{ "cell_type": "code", "execution_count": 193, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Loaded model\n" ] } ], "source": [ "# the checkpoint pickles the full module, so weights_only=False is required on newer PyTorch\n", "nn_model = torch.load(\"checkpoints/model_Epoch31.pth\", map_location=device, weights_only=False)\n", "nn_model.eval()\n", "print(\"Loaded model\")" ] },
{ "cell_type": "code", "execution_count": 194, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "sampling timestep 451\r" ] }, { "name": "stdout", "output_type": "stream", "text": [ "gif animating frame 31 of 32\r" ] }, { "data": { "text/plain": [ "<IPython.core.display.HTML object>" ] }, "execution_count": 194, "metadata": {}, "output_type": "execute_result" },
{ "data": { "text/plain": [ "<Figure size 640x480 with 0 Axes>" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "plt.clf()\n", "samples, intermediate_ddpm = sample_ddpm(32)\n", "animation_ddpm = plot_sample(intermediate_ddpm, 32, 4, save_dir, \"ani_run\", None, save=False)\n", "HTML(animation_ddpm.to_jshtml())" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.12.3" } }, "nbformat": 4, "nbformat_minor": 2 }