| |
| import tensorflow as tf |
| from tensorflow import keras |
|
|
| |
| import numpy as np |
| import matplotlib.pyplot as plt |
|
|
| import os |
| import pathlib |
| import time |
| import datetime |
| import glob |
| import random |
| import cv2 |
|
|
| from PIL import Image |
| from IPython import display |
|
|
def load(image_path):
    """Read an (input, real) pair of JPEG files and return float32 tensors.

    Args:
        image_path: 2-tuple/list of (input_image_path, real_image_path).

    Returns:
        (input_image, real_image) as float32 tensors decoded with the
        module-level CHANNEL channel count. No value scaling is applied.
    """
    src_path, tgt_path = image_path

    # Decode each file with the globally configured channel count.
    src = tf.io.read_file(src_path)
    src = tf.io.decode_jpeg(src, channels=CHANNEL)

    tgt = tf.io.read_file(tgt_path)
    tgt = tf.io.decode_jpeg(tgt, channels=CHANNEL)

    # Cast uint8 pixel values to float32 (still in the 0..255 range).
    return tf.cast(src, tf.float32), tf.cast(tgt, tf.float32)
|
|
| |
def read_bd_rm(dataset, batch_size=1, size=256):
    """Yield batches of paired (input, real) images loaded from disk.

    Args:
        dataset: iterable of (input_path, real_path) records.
        batch_size: number of image pairs per yielded batch. A trailing
            partial batch is silently dropped (original behaviour kept).
        size: target square side length for the resized images.

    Yields:
        dict with keys 'input' and 'real', each a float32 tensor of shape
        [batch_size, size, size, CHANNEL] scaled to [0, 1].

    Fix: images are now resized to (size, size) instead of a hard-coded
    256x256, so the `size` parameter actually takes effect (the default of
    256 preserves the original behaviour). Also renamed locals that
    shadowed the builtin `input`.
    """
    batch_input = []
    batch_real = []

    for input_path, real_path in dataset:
        if CHANNEL == 1:
            # "L" mode = single-channel grayscale.
            input_img = Image.open(input_path).convert("L")
            real_img = Image.open(real_path).convert("L")
        else:
            input_img = Image.open(input_path)
            real_img = Image.open(real_path)

        input_img = input_img.resize((size, size))
        real_img = real_img.resize((size, size))

        # Add a leading batch dimension and scale pixels to [0, 1].
        input_img = np.reshape(np.asarray(input_img), [1, size, size, CHANNEL])
        real_img = np.reshape(np.asarray(real_img), [1, size, size, CHANNEL])
        input_img = tf.cast(tf.convert_to_tensor(input_img), dtype=tf.float32) / 255.
        real_img = tf.cast(tf.convert_to_tensor(real_img), dtype=tf.float32) / 255.

        batch_input.append(input_img)
        batch_real.append(real_img)

        if len(batch_input) == batch_size:
            yield {'input': tf.concat(batch_input, axis=0),
                   'real': tf.concat(batch_real, axis=0)}
            batch_input = []
            batch_real = []
|
|
| |
def read_single_image(dataset, batch_size=1, size=256):
    """Yield batches of single input images loaded from disk.

    Args:
        dataset: iterable of records whose first element is an image path.
        batch_size: number of images per yielded batch. A trailing partial
            batch is silently dropped (original behaviour kept).
        size: target square side length for the resized images.

    Yields:
        dict with key 'input': a float32 tensor of shape
        [batch_size, size, size, CHANNEL] scaled to [0, 1].

    Fix: images are now resized to (size, size) instead of a hard-coded
    256x256, so the `size` parameter actually takes effect (the default of
    256 preserves the original behaviour). Also renamed the local that
    shadowed the builtin `input`.
    """
    batch_input = []

    for record in dataset:
        image_path = record[0]

        if CHANNEL == 1:
            # "L" mode = single-channel grayscale.
            img = Image.open(image_path).convert("L")
        else:
            img = Image.open(image_path)
        img = img.resize((size, size))

        # Add a leading batch dimension and scale pixels to [0, 1].
        img = np.reshape(np.asarray(img), [1, size, size, CHANNEL])
        img = tf.cast(tf.convert_to_tensor(img), dtype=tf.float32) / 255.

        batch_input.append(img)

        if len(batch_input) == batch_size:
            yield {'input': tf.concat(batch_input, axis=0)}
            batch_input = []
|
|
def train_data_loader(batch_size=1):
    """Build training records and return a batching generator over them.

    Globs the module-level PATH for .png files and pairs every image with
    the first one in sorted order as its target.
    """
    # The original code globbed the same '*.png' pattern twice for inputs
    # and targets; one sorted glob is equivalent.
    png_paths = sorted(glob.glob(os.path.join(str(PATH), '*.png')))

    # NOTE(review): every input is paired with the FIRST png as its target —
    # presumably a fixed "good" reference image; confirm this is intended.
    records = [[path, png_paths[0]] for path in png_paths]

    return read_bd_rm(records, batch_size=batch_size)
|
|
def test_data_loader(batch_size=1):
    """Return a batching generator over the .jpg test images under PATH/test."""
    jpg_paths = sorted(glob.glob(os.path.join(str(PATH) + "/test/", '*.jpg')))
    records = [[path] for path in jpg_paths]
    return read_single_image(records, batch_size=batch_size)
|
|
| |
def train_step(input_image, target, step, epoch):
    """Run one pix2pix optimisation step on a single batch.

    Updates the module-level generator and discriminator in place, prints
    losses every 10 steps, and records scalar summaries via the
    module-level summary_writer.

    Args:
        input_image: batch of conditioning images.
        target: batch of ground-truth images.
        step: 1-based step counter within the epoch (also used as the
            TensorBoard summary step).
        epoch: current epoch number, for logging only.
    """
    # Two tapes so generator and discriminator gradients stay independent.
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        fake_image = generator(input_image, training=True)

        real_logits = discriminator([input_image, target], training=True)
        fake_logits = discriminator([input_image, fake_image], training=True)

        gen_total_loss, gen_gan_loss, gen_l1_loss = generator_loss(
            fake_logits, fake_image, target)
        disc_loss = discriminator_loss(real_logits, fake_logits)

    gen_grads = gen_tape.gradient(gen_total_loss, generator.trainable_variables)
    disc_grads = disc_tape.gradient(disc_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(
        zip(gen_grads, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(
        zip(disc_grads, discriminator.trainable_variables))

    if step % 10 == 0:
        print("epoch: {} step: {} gen_total_loss: {}".format(epoch, step, str(gen_total_loss.numpy())))
        print("epoch: {} step: {} gen_gan_loss: {}".format(epoch, step, str(gen_gan_loss.numpy())))
        print("epoch: {} step: {} disc_loss: {}".format(epoch, step, str(disc_loss.numpy())))

    # Scalar summaries are written on every step.
    with summary_writer.as_default():
        tf.summary.scalar('gen_total_loss', gen_total_loss, step)
        tf.summary.scalar('gen_gan_loss', gen_gan_loss, step)
        tf.summary.scalar('gen_l1_loss', gen_l1_loss, step)
        tf.summary.scalar('disc_loss', disc_loss, step)
|
|
|
|
def fit():
    """Train the GAN for EPOCH epochs.

    Re-creates the training data generator (batch_size=8) each epoch,
    clears the notebook output every 10 steps, and saves generator
    weights every 10 epochs to checkpoints/cp-<epoch>.h5.

    Fixes: the checkpoint directory is now created before saving (h5py
    does not create missing directories, so the save used to fail on a
    fresh run); removed the `start = time.time()` variable that was
    assigned but never read.
    """
    for epoch in range(1, EPOCH + 1):
        step = 1
        train_generator = train_data_loader(batch_size=8)

        for train_loader_dict in train_generator:
            input_image = train_loader_dict['input']
            target = train_loader_dict['real']

            train_step(input_image, target, step, epoch)
            step = step + 1

            if step % 10 == 0:
                # Keep notebook output readable during long runs.
                display.clear_output(wait=True)

        if epoch % 10 == 0:
            # Ensure the target directory exists before writing the .h5 file.
            os.makedirs("checkpoints", exist_ok=True)
            checkpoint_path = "checkpoints/cp-" + str(epoch) + ".h5"
            generator.save_weights(checkpoint_path)
|
|
| |
def downsample(filters, kernel_size, strides=2, dropout=0.5, max_pool=True, batch_norm=True):
    """Build a Conv2D -> [MaxPool] -> [BatchNorm] -> [Dropout] -> LeakyReLU block.

    Args:
        filters: number of convolution filters.
        kernel_size: convolution kernel size.
        strides: convolution stride (2 halves the spatial dims).
        dropout: dropout rate, or None to skip the Dropout layer.
        max_pool: whether to append the MaxPool2D layer.
        batch_norm: whether to append BatchNormalization.

    Returns:
        A tf.keras.Sequential containing the configured layers.
    """
    initializer = tf.random_normal_initializer(0., 0.02)

    result = tf.keras.Sequential()
    result.add(
        tf.keras.layers.Conv2D(filters, kernel_size, strides=strides, padding='same',
                               kernel_initializer=initializer, use_bias=False))

    if max_pool:
        # NOTE(review): pool_size=(1, 1) with strides=None (strides default to
        # pool_size) is an identity operation — it never reduces spatial dims.
        # Kept as-is for layer-count/checkpoint compatibility; confirm whether
        # a real pool size was intended.
        result.add(tf.keras.layers.MaxPool2D(pool_size=(1, 1), strides=None, padding='same'))

    if batch_norm:
        result.add(tf.keras.layers.BatchNormalization())

    # Idiom fix: compare against None with `is not None`, not `!=`.
    if dropout is not None:
        result.add(tf.keras.layers.Dropout(dropout))

    result.add(tf.keras.layers.LeakyReLU())
    return result
|
|
|
|
| |
def upsample(filters, size, dropout=0.5, max_pool=True, batch_norm=True):
    """Build a Conv2DTranspose -> [MaxPool] -> [BatchNorm] -> [Dropout] -> ReLU block.

    Args:
        filters: number of transposed-convolution filters.
        size: transposed-convolution kernel size.
        dropout: dropout rate, or None to skip the Dropout layer.
        max_pool: whether to append the MaxPool2D layer.
        batch_norm: whether to append BatchNormalization.

    Returns:
        A tf.keras.Sequential containing the configured layers.

    Fixes: `dropout != None` replaced with the idiomatic `is not None`;
    the final ReLU is added unconditionally, mirroring how downsample()
    always appends its LeakyReLU (every call site in this file passes a
    non-None dropout, so this does not change the built models).
    """
    initializer = tf.random_normal_initializer(0., 0.02)

    result = tf.keras.Sequential()
    result.add(
        tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                                        padding='same',
                                        kernel_initializer=initializer,
                                        use_bias=False)
    )

    if max_pool:
        # NOTE(review): pool_size=(1, 1) with default strides is an identity
        # operation — see the matching note in downsample().
        result.add(tf.keras.layers.MaxPool2D(pool_size=(1, 1), strides=None, padding='same'))

    if batch_norm:
        result.add(tf.keras.layers.BatchNormalization())

    if dropout is not None:
        result.add(tf.keras.layers.Dropout(dropout))

    result.add(tf.keras.layers.ReLU())

    return result
|
|
| |
def Generator(image_shape):
    """Build the U-Net style pix2pix generator.

    The architecture is driven by module-level globals (n_E*, kernel_size_E*,
    stride_E*, DropOut_E*, MaxPooling_E*, BatchNorm_E*, Bipass_*).

    Args:
        image_shape: (H, W, C) shape of the input image.

    Returns:
        A tf.keras.Model mapping an input image to a tanh-activated output
        image with CHANNEL channels.

    Fix: removed the unused `initializer` that was created at the top of
    the function and immediately shadowed later.
    """
    input_image = keras.layers.Input(shape=image_shape, name='input_image')
    x = input_image

    # Encoder: eight strided downsampling blocks.
    enc1 = downsample(n_E1, kernel_size_E1, stride_E1, DropOut_E1, MaxPooling_E1, BatchNorm_E1)(x)
    enc2 = downsample(n_E2, kernel_size_E2, stride_E2, DropOut_E2, MaxPooling_E2, BatchNorm_E2)(enc1)
    enc3 = downsample(n_E3, kernel_size_E3, stride_E3, DropOut_E3, MaxPooling_E3, BatchNorm_E3)(enc2)
    enc4 = downsample(n_E4, kernel_size_E4, stride_E4, DropOut_E4, MaxPooling_E4, BatchNorm_E4)(enc3)
    enc5 = downsample(n_E5, kernel_size_E5, stride_E5, DropOut_E5, MaxPooling_E5, BatchNorm_E5)(enc4)
    enc6 = downsample(n_E6, kernel_size_E6, stride_E6, DropOut_E6, MaxPooling_E6, BatchNorm_E6)(enc5)
    enc7 = downsample(n_E7, kernel_size_E7, stride_E7, DropOut_E7, MaxPooling_E7, BatchNorm_E7)(enc6)
    enc8 = downsample(n_E8, kernel_size_E8, stride_E8, DropOut_E8, MaxPooling_E8, BatchNorm_E8)(enc7)

    # Decoder: seven upsampling blocks mirroring the encoder configuration
    # (upsample's second positional argument is the kernel size).
    dec1 = upsample(n_E7, kernel_size_E7, DropOut_E7, MaxPooling_E7, BatchNorm_E7)
    dec2 = upsample(n_E6, kernel_size_E6, DropOut_E6, MaxPooling_E6, BatchNorm_E6)
    dec3 = upsample(n_E5, kernel_size_E5, DropOut_E5, MaxPooling_E5, BatchNorm_E5)
    dec4 = upsample(n_E4, kernel_size_E4, DropOut_E4, MaxPooling_E4, BatchNorm_E4)
    dec5 = upsample(n_E3, kernel_size_E3, DropOut_E3, MaxPooling_E3, BatchNorm_E3)
    dec6 = upsample(n_E2, kernel_size_E2, DropOut_E2, MaxPooling_E2, BatchNorm_E2)
    dec7 = upsample(n_E1, kernel_size_E1, DropOut_E1, MaxPooling_E1, BatchNorm_E1)

    # Pair each decoder stage with the encoder activation at the same depth.
    enc_value_list = [enc7, enc6, enc5, enc4, enc3, enc2, enc1]
    dec_value_list = [dec1, dec2, dec3, dec4, dec5, dec6, dec7]
    bipass_list = [Bipass_7, Bipass_6, Bipass_5, Bipass_4, Bipass_3, Bipass_2, Bipass_1]

    x = enc8
    for dec, enc, bipass in zip(dec_value_list, enc_value_list, bipass_list):
        x = dec(x)
        if bipass:
            # U-Net skip connection: concatenate along the channel axis.
            x = tf.keras.layers.Concatenate()([x, enc])

    OUTPUT_CHANNELS = CHANNEL

    initializer = tf.random_normal_initializer(0., 0.02)

    # Final transposed conv maps back to image space; tanh bounds outputs
    # to [-1, 1].
    last = tf.keras.layers.Conv2DTranspose(OUTPUT_CHANNELS, 4,
                                           strides=2,
                                           padding='same',
                                           kernel_initializer=initializer,
                                           activation='tanh')
    x = last(x)

    return tf.keras.Model(inputs=input_image, outputs=x)
|
|
|
|
| |
def generator_loss(disc_generated_output, gen_output, target):
    """Compute the pix2pix generator loss.

    Args:
        disc_generated_output: discriminator logits on the generated image.
        gen_output: generated image batch.
        target: ground-truth image batch.

    Returns:
        (total_loss, gan_loss, l1_loss) where
        total_loss = gan_loss + LAMBDA * l1_loss.
    """
    # Adversarial term: the generator wants the discriminator to emit "real".
    adversarial = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)

    # Reconstruction term: mean absolute pixel error against the target.
    reconstruction = tf.reduce_mean(tf.abs(target - gen_output))

    combined = adversarial + (LAMBDA * reconstruction)

    return combined, adversarial, reconstruction
|
|
| |
def Discriminator(image_shape):
    """Build a PatchGAN-style discriminator over (input, target) image pairs.

    Args:
        image_shape: (H, W, C) shape of each of the two input images.

    Returns:
        A tf.keras.Model taking [input_image, target_image] and producing a
        single-channel logit map (no sigmoid — pair with
        BinaryCrossentropy(from_logits=True)).

    Fix: `down1` was computed but never consumed — `down2` read from `x`,
    leaving the first downsampling block dead. `down2` now takes `down1`
    as its input so every block participates in the forward pass.
    """
    initializer = tf.random_normal_initializer(0., 0.02)

    input_image = tf.keras.layers.Input(image_shape, name='input_image')
    target_image = tf.keras.layers.Input(image_shape, name='target_image')

    # Stack the conditioning image and the candidate image along channels.
    x = tf.keras.layers.concatenate([input_image, target_image])

    if image_shape == (512, 512, 1):
        # Extra stride-2 block to bring 512x512 inputs down to the 256 path.
        x = downsample(CHANNEL, 4)(x)

    # First two blocks use stride 1; the rest use the configured strides.
    down1 = downsample(n_E1, kernel_size_E1, 1, DropOut_E1, MaxPooling_E1, BatchNorm_E1)(x)
    down2 = downsample(n_E2, kernel_size_E2, 1, DropOut_E2, MaxPooling_E2, BatchNorm_E2)(down1)
    down3 = downsample(n_E3, kernel_size_E3, stride_E3, DropOut_E3, MaxPooling_E3, BatchNorm_E3)(down2)
    down4 = downsample(n_E4, kernel_size_E4, stride_E4, DropOut_E4, MaxPooling_E4, BatchNorm_E4)(down3)
    down5 = downsample(n_E5, kernel_size_E5, stride_E5, DropOut_E5, MaxPooling_E5, BatchNorm_E5)(down4)
    down6 = downsample(n_E6, kernel_size_E6, stride_E6, DropOut_E6, MaxPooling_E6, BatchNorm_E6)(down5)

    # Final 1-filter conv emits the patch logits.
    last = tf.keras.layers.Conv2D(1, 4, strides=1, padding='same',
                                  kernel_initializer=initializer)
    x = last(down6)

    return tf.keras.Model(inputs=[input_image, target_image], outputs=x)
|
|
|
|
def discriminator_loss(disc_real_output, disc_generated_output):
    """Standard GAN discriminator loss.

    BCE pushing real-pair logits towards 1 plus BCE pushing generated-pair
    logits towards 0; returns their sum.
    """
    loss_on_real = loss_object(tf.ones_like(disc_real_output), disc_real_output)
    loss_on_fake = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)
    return loss_on_real + loss_on_fake
|
|
| |
# Fix: `lr`, `epsilon=None` and `decay` are deprecated/removed in the TF2
# Keras optimizer API. `learning_rate`/`rho` are the supported spellings,
# and the dropped arguments match their former effective defaults
# (epsilon -> backend epsilon 1e-07, decay -> 0.0), so optimisation
# behaviour is unchanged.
generator_optimizer = keras.optimizers.RMSprop(learning_rate=0.001, rho=0.9)
discriminator_optimizer = keras.optimizers.RMSprop(learning_rate=0.001, rho=0.9)
|
|
def generate_images(model, test_input, tar):
    """Plot input, ground truth and prediction side by side.

    Args:
        model: generator model to run (called with training=True, matching
            the pix2pix convention so BatchNorm uses batch statistics).
        test_input: batch of input images; only the first is shown.
        tar: batch of target images; only the first is shown.
    """
    prediction = model(test_input, training=True)
    plt.figure(figsize=(15, 15))

    panels = [test_input[0], tar[0], prediction[0]]
    captions = ['Input Image', 'Ground Truth', 'Predicted Image']

    for idx, (img, caption) in enumerate(zip(panels, captions)):
        plt.subplot(1, 3, idx + 1)
        plt.title(caption)
        if CHANNEL == 1:
            # Map values from [-1, 1] back to [0, 1] for grayscale display.
            plt.imshow(img.numpy().flatten().reshape(256, 256) * 0.5 + 0.5, cmap='gray')
        else:
            plt.imshow(img)
        plt.axis('off')
    plt.show()
|
|
|
|
|
|
# Preload every "good" training image as a 256x256 RGB float array in [0, 1].
# NOTE(review): this `train` array is never referenced again in this file —
# the training loop streams images via train_data_loader instead.
train_images = glob.glob('bottle/train/good/*')
train = [
    cv2.resize(cv2.cvtColor(cv2.imread(im), cv2.COLOR_BGR2RGB), (256, 256))
    for im in train_images
]
train = np.array(train).astype('float32') / 255.
|
|
|
|
# ---------------------------------------------------------------------------
# Configuration and script entry point.
# ---------------------------------------------------------------------------

# Directory of defect-free training images, globbed by train_data_loader.
PATH = 'bottle/train/good'

# Number of image channels: 1 = grayscale, 3 = RGB.
CHANNEL = 1

# Generator input shape and total number of training epochs.
G_input_dim = (256, 256, CHANNEL)
EPOCH = 20

# Per-depth hyper-parameters consumed by downsample()/upsample() and the
# Generator/Discriminator builders:
#   n_E*            filter count
#   stride_E*       convolution stride
#   kernel_size_E*  convolution kernel size
#   MaxPooling_E*   whether to add the (identity 1x1) MaxPool2D layer
#   BatchNorm_E*    whether to add BatchNormalization
#   DropOut_E*      dropout rate (None would skip Dropout)
#   Bipass_*        whether the U-Net skip connection is used at that depth
# NOTE(review): m_E*, ActivationFunc_E* and `alfa` are defined but never
# read anywhere in this file; `alfa` is also reassigned the same value in
# every section below.

# --- Stage 1 ---
n_E1 = 32
m_E1 = 128
stride_E1 = 2
kernel_size_E1 = 4
MaxPooling_E1 = True
ActivationFunc_E1 = "Leaky_ReLu"
BatchNorm_E1 = True
DropOut_E1 = 0.5
Bipass_1 = True

# --- Stage 2 ---
n_E2 = 64
m_E2 = 64
stride_E2 = 2
kernel_size_E2 = 4
MaxPooling_E2 = True
ActivationFunc_E2 = "Leaky_ReLu"
alfa = 0.2
BatchNorm_E2 = True
DropOut_E2 = 0.5
Bipass_2 = True

# --- Stage 3 ---
n_E3 = 128
m_E3 = 128
stride_E3 = 2
kernel_size_E3 = 4
MaxPooling_E3 = True
ActivationFunc_E3 = "Leaky_ReLu"
alfa = 0.2
BatchNorm_E3 = True
DropOut_E3 = 0.5
Bipass_3 = True

# --- Stage 4 ---
n_E4 = 256
m_E4 = 256
stride_E4 = 2
kernel_size_E4 = 4
MaxPooling_E4 = True
ActivationFunc_E4 = "Leaky_ReLu"
alfa = 0.2
BatchNorm_E4 = True
DropOut_E4 = 0.5
Bipass_4 = True

# --- Stage 5 ---
n_E5 = 512
m_E5 = 512
stride_E5 = 2
kernel_size_E5 = 4
MaxPooling_E5 = True
ActivationFunc_E5 = "Leaky_ReLu"
alfa = 0.2
BatchNorm_E5 = True
DropOut_E5 = 0.5
Bipass_5 = True

# --- Stage 6 ---
n_E6 = 512
m_E6 = 512
stride_E6 = 2
kernel_size_E6 = 4
MaxPooling_E6 = True
ActivationFunc_E6 = "Leaky_ReLu"
alfa = 0.2
BatchNorm_E6 = True
DropOut_E6 = 0.5
Bipass_6 = True

# --- Stage 7 ---
n_E7 = 512
m_E7 = 512
stride_E7 = 2
kernel_size_E7 = 4
MaxPooling_E7 = True
ActivationFunc_E7 = "Leaky_ReLu"
alfa = 0.2
BatchNorm_E7 = True
DropOut_E7 = 0.5
Bipass_7 = True

# --- Stage 8 (bottleneck; no skip connection) ---
n_E8 = 512
m_E8 = 512
stride_E8 = 2
kernel_size_E8 = 4
MaxPooling_E8 = True
ActivationFunc_E8 = "Leaky_ReLu"
alfa = 0.2
BatchNorm_E8 = True
DropOut_E8 = 0.5

# NOTE(review): never read in this file.
expantion = True

# TensorBoard logging, one timestamped run directory per execution.
log_dir = "logs/"

summary_writer = tf.summary.create_file_writer(
    log_dir + "fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

# Build the generator from the stage configuration above.
generator = Generator(G_input_dim)

# Weight of the L1 reconstruction term in the generator loss.
LAMBDA = 100

# from_logits=True because the discriminator's final conv has no sigmoid.
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)
discriminator = Discriminator(G_input_dim)

# Kick off training.
fit()