| | |
| | import tensorflow as tf |
| | from tensorflow import keras |
| |
|
| | |
| | import numpy as np |
| | import matplotlib.pyplot as plt |
| |
|
| | import os |
| | import pathlib |
| | import time |
| | import datetime |
| | import glob |
| | import random |
| | import cv2 |
| |
|
| | from PIL import Image |
| | from IPython import display |
| |
|
def load(image_path):
    """Read an (input, target) JPEG pair from disk as float32 tensors.

    `image_path` is a 2-item sequence: [input image path, real image path].
    Both files are decoded with the module-level CHANNEL count.
    """
    in_path, tgt_path = image_path

    in_img = tf.io.decode_jpeg(tf.io.read_file(in_path), channels=CHANNEL)
    tgt_img = tf.io.decode_jpeg(tf.io.read_file(tgt_path), channels=CHANNEL)

    # Raw uint8 pixels -> float32 (no rescaling here).
    return tf.cast(in_img, tf.float32), tf.cast(tgt_img, tf.float32)
| |
|
| | |
def read_bd_rm(dataset, batch_size=1, size=256):
    """Yield batches of normalized (input, real) image tensor pairs.

    Parameters
    ----------
    dataset : iterable of [input_path, real_path] pairs.
    batch_size : images per yielded batch.
    size : square side length images are resized and reshaped to.

    Yields
    ------
    dict with keys 'input' and 'real', each a float32 tensor of shape
    (batch_size, size, size, CHANNEL) scaled to [0, 1].

    NOTE: a trailing partial batch (fewer than batch_size leftover images)
    is silently dropped, matching the original behavior.
    """
    batch_input = []
    batch_real = []

    for input_path, real_path in dataset:
        # Fix: resize to (size, size) instead of a hard-coded (256, 256) so
        # the np.reshape below cannot fail when size != 256.
        if CHANNEL == 1:
            input_img = Image.open(input_path).convert("L").resize((size, size))
            real_img = Image.open(real_path).convert("L").resize((size, size))
        else:
            input_img = Image.open(input_path).resize((size, size))
            real_img = Image.open(real_path).resize((size, size))

        input_arr = np.reshape(input_img, [1, size, size, CHANNEL])
        real_arr = np.reshape(real_img, [1, size, size, CHANNEL])

        # Scale pixel values from [0, 255] to [0, 1].
        batch_input.append(tf.cast(tf.convert_to_tensor(input_arr), dtype=tf.float32) / 255.)
        batch_real.append(tf.cast(tf.convert_to_tensor(real_arr), dtype=tf.float32) / 255.)

        if len(batch_input) == batch_size:
            yield {'input': tf.concat(batch_input, axis=0),
                   'real': tf.concat(batch_real, axis=0)}
            batch_input = []
            batch_real = []
| |
|
| | |
def read_single_image(dataset, batch_size=1, size=256):
    """Yield batches of normalized input images (no targets).

    Parameters
    ----------
    dataset : iterable of 1-item sequences [input_path].
    batch_size : images per yielded batch.
    size : square side length images are resized and reshaped to.

    Yields
    ------
    dict with key 'input': a float32 tensor of shape
    (batch_size, size, size, CHANNEL) scaled to [0, 1].

    NOTE: a trailing partial batch is silently dropped, matching the
    original behavior.
    """
    batch_input = []

    for record in dataset:
        input_path = record[0]

        # Fix: resize to (size, size) instead of a hard-coded (256, 256) so
        # the np.reshape below cannot fail when size != 256.
        if CHANNEL == 1:
            input_img = Image.open(input_path).convert("L").resize((size, size))
        else:
            input_img = Image.open(input_path).resize((size, size))

        input_arr = np.reshape(input_img, [1, size, size, CHANNEL])

        # Scale pixel values from [0, 255] to [0, 1].
        batch_input.append(tf.cast(tf.convert_to_tensor(input_arr), dtype=tf.float32) / 255.)

        if len(batch_input) == batch_size:
            yield {'input': tf.concat(batch_input, axis=0)}
            batch_input = []
| |
|
def train_data_loader(batch_size=1):
    """Build [input, target] path records from PATH/*.png and return a
    read_bd_rm batch generator over them.

    NOTE(review): every input image is paired with real_paths[0] — the first
    PNG in the directory — rather than its own counterpart. Confirm this
    fixed-target pairing is intentional.
    """
    input_paths = sorted(glob.glob(os.path.join(str(PATH), '*.png')))
    real_paths = sorted(glob.glob(os.path.join(str(PATH), '*.png')))

    records = [[path, real_paths[0]] for path in input_paths]

    return read_bd_rm(records, batch_size=batch_size)
| |
|
def test_data_loader(batch_size=1):
    """Collect PATH/test/*.jpg as single-image records and return a
    read_single_image batch generator over them."""
    input_paths = sorted(glob.glob(os.path.join(str(PATH) + "/test/", '*.jpg')))
    records = [[path] for path in input_paths]
    return read_single_image(records, batch_size=batch_size)
| |
|
| | |
def train_step(input_image, target, step, epoch):
    """Run one pix2pix optimization step: forward pass, losses, gradients,
    and parameter updates for both generator and discriminator.

    `step` and `epoch` are used only for periodic console logging and as the
    TensorBoard scalar index. Relies on module-level `generator`,
    `discriminator`, their optimizers, and `summary_writer`.
    """
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        # Generator forward pass.
        gen_output = generator(input_image, training=True)

        # Discriminator scores for the real pair and the generated pair
        # (both conditioned on the input image).
        disc_real_output = discriminator([input_image, target], training=True)
        disc_generated_output = discriminator([input_image, gen_output], training=True)

        gen_total_loss, gen_gan_loss, gen_l1_loss = generator_loss(disc_generated_output, gen_output, target)
        disc_loss = discriminator_loss(disc_real_output, disc_generated_output)

    # Gradients are taken after the tape contexts close (standard
    # tf.GradientTape usage); each tape saw only its own model's forward pass.
    generator_gradients = gen_tape.gradient(gen_total_loss,
                                            generator.trainable_variables)
    discriminator_gradients = disc_tape.gradient(disc_loss,
                                                 discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(generator_gradients,
                                            generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(discriminator_gradients,
                                                discriminator.trainable_variables))

    # Console progress every 10 steps.
    if step % 10 == 0:
        print("epoch: {} step: {} gen_total_loss: {}".format(epoch, step, str(gen_total_loss.numpy())))
        print("epoch: {} step: {} gen_gan_loss: {}".format(epoch, step, str(gen_gan_loss.numpy())))
        print("epoch: {} step: {} disc_loss: {}".format(epoch, step, str(disc_loss.numpy())))

    # TensorBoard scalars, indexed by the per-epoch step counter.
    with summary_writer.as_default():
        tf.summary.scalar('gen_total_loss', gen_total_loss, step)
        tf.summary.scalar('gen_gan_loss', gen_gan_loss, step)
        tf.summary.scalar('gen_l1_loss', gen_l1_loss, step)
        tf.summary.scalar('disc_loss', disc_loss, step)
| |
|
| |
|
def fit():
    """Train for EPOCH epochs over freshly-built batch generators, clearing
    the notebook display every 10 steps and checkpointing the generator's
    weights every 10 epochs."""
    for epoch in range(1, EPOCH + 1):
        step = 1
        batches = train_data_loader(batch_size=8)

        start = time.time()
        for batch in batches:
            train_step(batch['input'], batch['real'], step, epoch)
            step += 1

            if step % 10 == 0:
                display.clear_output(wait=True)
                start = time.time()

        if epoch % 10 == 0:
            # NOTE(review): assumes a "checkpoints/" directory already exists.
            generator.save_weights("checkpoints/cp-" + str(epoch) + ".h5")
| |
|
| | |
def downsample(filters, kernel_size, strides=2, dropout=0.5, max_pool=True, batch_norm=True):
    """Encoder block: Conv2D -> [MaxPool] -> [BatchNorm] -> [Dropout] -> LeakyReLU.

    Parameters
    ----------
    filters, kernel_size, strides : Conv2D configuration ('same' padding, no bias).
    dropout : dropout rate, or None to skip the Dropout layer.
    max_pool : add a MaxPool2D layer when True.
        NOTE(review): pool_size=(1, 1) with strides=None (strides default to
        pool_size) makes this layer an identity op — confirm intent.
    batch_norm : add BatchNormalization when True.

    Returns
    -------
    tf.keras.Sequential
    """
    initializer = tf.random_normal_initializer(0., 0.02)

    result = tf.keras.Sequential()
    result.add(
        tf.keras.layers.Conv2D(filters, kernel_size, strides=strides, padding='same',
                               kernel_initializer=initializer, use_bias=False))

    if max_pool:
        result.add(tf.keras.layers.MaxPool2D(pool_size=(1, 1), strides=None, padding='same'))

    if batch_norm:
        result.add(tf.keras.layers.BatchNormalization())

    # Idiom fix: compare against None with `is not`, not `!=`.
    if dropout is not None:
        result.add(tf.keras.layers.Dropout(dropout))

    result.add(tf.keras.layers.LeakyReLU())
    return result
| |
|
| |
|
| | |
def upsample(filters, size, dropout=0.5, max_pool=True, batch_norm=True):
    """Decoder block: Conv2DTranspose(stride 2) -> [MaxPool] -> [BatchNorm]
    -> [Dropout] -> ReLU.

    Parameters
    ----------
    filters, size : Conv2DTranspose filters and kernel size ('same' padding,
        no bias, stride fixed at 2).
    dropout : dropout rate, or None to skip the Dropout layer.
    max_pool : add a MaxPool2D layer when True.
        NOTE(review): pool_size=(1, 1) with strides=None makes it an
        identity op — confirm intent.
    batch_norm : add BatchNormalization when True.

    Returns
    -------
    tf.keras.Sequential
    """
    initializer = tf.random_normal_initializer(0., 0.02)

    result = tf.keras.Sequential()
    result.add(
        tf.keras.layers.Conv2DTranspose(filters, size, strides=2,
                                        padding='same',
                                        kernel_initializer=initializer,
                                        use_bias=False)
    )

    if max_pool:
        result.add(tf.keras.layers.MaxPool2D(pool_size=(1, 1), strides=None, padding='same'))

    if batch_norm:
        result.add(tf.keras.layers.BatchNormalization())

    # Idiom fix: `is not None` instead of `!= None`.
    if dropout is not None:
        result.add(tf.keras.layers.Dropout(dropout))
    # Consistency fix: the ReLU activation was nested under the dropout
    # branch, so a block built with dropout=None got no activation at all.
    # downsample() always adds its LeakyReLU; mirror that here.
    result.add(tf.keras.layers.ReLU())

    return result
| |
|
| | |
def Generator(image_shape):
    """Build a U-Net generator: eight configured downsample stages, seven
    mirrored upsample stages with optional skip (bypass) connections, and a
    tanh Conv2DTranspose output head with CHANNEL output channels.

    All stage hyperparameters come from the module-level n_E*/kernel_size_E*/
    stride_E*/DropOut_E*/MaxPooling_E*/BatchNorm_E*/Bipass_* constants.
    """
    input_image = keras.layers.Input(shape=image_shape, name='input_image')

    # Encoder: apply stages E1..E8 in sequence, keeping every intermediate
    # activation so the decoder can reuse them as skip connections.
    encoder_cfgs = [
        (n_E1, kernel_size_E1, stride_E1, DropOut_E1, MaxPooling_E1, BatchNorm_E1),
        (n_E2, kernel_size_E2, stride_E2, DropOut_E2, MaxPooling_E2, BatchNorm_E2),
        (n_E3, kernel_size_E3, stride_E3, DropOut_E3, MaxPooling_E3, BatchNorm_E3),
        (n_E4, kernel_size_E4, stride_E4, DropOut_E4, MaxPooling_E4, BatchNorm_E4),
        (n_E5, kernel_size_E5, stride_E5, DropOut_E5, MaxPooling_E5, BatchNorm_E5),
        (n_E6, kernel_size_E6, stride_E6, DropOut_E6, MaxPooling_E6, BatchNorm_E6),
        (n_E7, kernel_size_E7, stride_E7, DropOut_E7, MaxPooling_E7, BatchNorm_E7),
        (n_E8, kernel_size_E8, stride_E8, DropOut_E8, MaxPooling_E8, BatchNorm_E8),
    ]
    activations = []
    x = input_image
    for n, k, s, d, mp, bn in encoder_cfgs:
        x = downsample(n, k, s, d, mp, bn)(x)
        activations.append(x)

    # Decoder: mirror stages E7..E1 (upsample fixes stride at 2 internally).
    decoder_cfgs = [
        (n_E7, kernel_size_E7, DropOut_E7, MaxPooling_E7, BatchNorm_E7),
        (n_E6, kernel_size_E6, DropOut_E6, MaxPooling_E6, BatchNorm_E6),
        (n_E5, kernel_size_E5, DropOut_E5, MaxPooling_E5, BatchNorm_E5),
        (n_E4, kernel_size_E4, DropOut_E4, MaxPooling_E4, BatchNorm_E4),
        (n_E3, kernel_size_E3, DropOut_E3, MaxPooling_E3, BatchNorm_E3),
        (n_E2, kernel_size_E2, DropOut_E2, MaxPooling_E2, BatchNorm_E2),
        (n_E1, kernel_size_E1, DropOut_E1, MaxPooling_E1, BatchNorm_E1),
    ]
    # Skip sources pair decoder stage i with encoder activation enc(8-i):
    # dec1<->enc7, dec2<->enc6, ..., dec7<->enc1.
    skip_sources = list(reversed(activations[:-1]))
    use_skip = [Bipass_7, Bipass_6, Bipass_5, Bipass_4, Bipass_3, Bipass_2, Bipass_1]

    x = activations[-1]  # bottleneck (enc8 output)
    for cfg, skip, take in zip(decoder_cfgs, skip_sources, use_skip):
        n, k, d, mp, bn = cfg
        x = upsample(n, k, d, mp, bn)(x)
        if take:
            x = tf.keras.layers.Concatenate()([x, skip])

    # Output head: tanh-activated transposed conv back to CHANNEL channels.
    OUTPUT_CHANNELS = CHANNEL
    initializer = tf.random_normal_initializer(0., 0.02)
    last = tf.keras.layers.Conv2DTranspose(OUTPUT_CHANNELS, 4,
                                           strides=2,
                                           padding='same',
                                           kernel_initializer=initializer,
                                           activation='tanh')
    x = last(x)

    return tf.keras.Model(inputs=input_image, outputs=x)
| |
|
| |
|
| | |
def generator_loss(disc_generated_output, gen_output, target):
    """Return (total, adversarial, L1) generator losses.

    total = adversarial + LAMBDA * L1. The adversarial term pushes the
    discriminator's score on generated output toward "real" (all ones);
    the L1 term pulls the generated image toward the target.
    """
    gan_loss = loss_object(tf.ones_like(disc_generated_output), disc_generated_output)
    l1_loss = tf.reduce_mean(tf.abs(target - gen_output))
    total_gen_loss = gan_loss + (LAMBDA * l1_loss)
    return total_gen_loss, gan_loss, l1_loss
| |
|
| | |
def Discriminator(image_shape):
    """Build a PatchGAN-style discriminator over the channel-concatenation
    of an input image and a candidate target image.

    Returns a tf.keras.Model mapping [input_image, target_image] to a
    1-channel map of patch logits (no final activation — paired with
    BinaryCrossentropy(from_logits=True)).
    """
    initializer = tf.random_normal_initializer(0., 0.02)

    input_image = tf.keras.layers.Input(image_shape, name='input_image')
    target_image = tf.keras.layers.Input(image_shape, name='target_image')

    # Stack the conditioning image and the candidate image along channels.
    x = tf.keras.layers.concatenate([input_image, target_image])

    # Extra stride-2 reduction only for 512x512 single-channel inputs.
    if image_shape == (512, 512, 1):
        x = downsample(CHANNEL, 4)(x)
    # NOTE(review): down1 is built from x but never consumed — down2 also
    # takes x, so the down1 stage is dead code in the resulting graph.
    # Possibly down2 was meant to take down1; confirm before rewiring, as
    # changing it alters the trained architecture.
    down1 = downsample(n_E1, kernel_size_E1, 1, DropOut_E1, MaxPooling_E1, BatchNorm_E1)(x)
    down2 = downsample(n_E2, kernel_size_E2, 1, DropOut_E2, MaxPooling_E2, BatchNorm_E2)(x)
    down3 = downsample(n_E3, kernel_size_E3, stride_E3, DropOut_E3, MaxPooling_E3, BatchNorm_E3)(down2)
    down4 = downsample(n_E4, kernel_size_E4, stride_E4, DropOut_E4, MaxPooling_E4, BatchNorm_E4)(down3)
    down5 = downsample(n_E5, kernel_size_E5, stride_E5, DropOut_E5, MaxPooling_E5, BatchNorm_E5)(down4)
    down6 = downsample(n_E6, kernel_size_E6, stride_E6, DropOut_E6, MaxPooling_E6, BatchNorm_E6)(down5)
    # Final 1-channel patch-logit map (linear output).
    last = tf.keras.layers.Conv2D(1, 4, strides=1, padding='same',
                                  kernel_initializer=initializer)

    x = last(down6)
    return tf.keras.Model(inputs=[input_image, target_image], outputs=x)
| |
|
| |
|
def discriminator_loss(disc_real_output, disc_generated_output):
    """Standard GAN discriminator loss: real pairs are pushed toward ones,
    generated pairs toward zeros; returns the sum of both terms."""
    real_loss = loss_object(tf.ones_like(disc_real_output), disc_real_output)
    fake_loss = loss_object(tf.zeros_like(disc_generated_output), disc_generated_output)
    return real_loss + fake_loss
| |
|
| | |
# Fix: `lr`, `decay`, and `epsilon=None` are legacy Keras-1.x arguments.
# TF2's keras.optimizers.RMSprop takes `learning_rate`, requires a numeric
# epsilon (passing None raises), and `decay=0.0` was a no-op — so both are
# dropped. Optimization behavior is unchanged.
generator_optimizer = keras.optimizers.RMSprop(learning_rate=0.001, rho=0.9)
discriminator_optimizer = keras.optimizers.RMSprop(learning_rate=0.001, rho=0.9)
| |
|
def generate_images(model, test_input, tar):
    """Plot the first input, ground-truth, and predicted image side by side.

    training=True is passed so BatchNorm/Dropout behave as during training
    (the pix2pix convention for inference-time visualization).
    """
    prediction = model(test_input, training=True)
    plt.figure(figsize=(15, 15))

    panels = [test_input[0], tar[0], prediction[0]]
    captions = ['Input Image', 'Ground Truth', 'Predicted Image']

    for idx, (img, caption) in enumerate(zip(panels, captions)):
        plt.subplot(1, 3, idx + 1)
        plt.title(caption)
        if CHANNEL == 1:
            # Remap from [-1, 1] to [0, 1] for display.
            # NOTE(review): inputs are scaled to [0, 1] by the loaders, so
            # this remap strictly matches only the tanh output — confirm.
            plt.imshow(img.numpy().flatten().reshape(256, 256) * 0.5 + 0.5, cmap='gray')
        else:
            plt.imshow(img)
        plt.axis('off')
    plt.show()
| |
|
| |
|
| |
|
# Load every "good" training image as a 256x256 RGB array and scale the
# whole stack to float32 in [0, 1].
train_images = glob.glob('bottle/train/good/*')
train = []
for path in train_images:
    bgr = cv2.imread(path)
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    train.append(cv2.resize(rgb, (256, 256)))

train = np.array(train)
train = train.astype('float32') / 255.
| |
|
| |
|
# Directory containing the defect-free training images.
PATH = 'bottle/train/good'
# Number of image channels (1 = grayscale).
CHANNEL = 1

# Generator input shape and total number of training epochs.
G_input_dim = (256, 256, CHANNEL)
EPOCH = 20
| |
|
| | |
# --- Per-stage encoder/decoder hyperparameters -------------------------------
# n_E*      : number of Conv2D filters for stage *.
# stride_E* : convolution stride for stage *.
# kernel_size_E* : convolution kernel size.
# MaxPooling_E* / BatchNorm_E* / DropOut_E* : flags/rate passed to
#     downsample()/upsample().
# Bipass_*  : whether the Generator concatenates the matching encoder
#     activation as a skip connection.
# NOTE(review): m_E* and ActivationFunc_E* are not referenced by any code
#     visible in this file; `alfa` is reassigned to 0.2 in several stage
#     sections below, so only the last assignment matters.

# Stage 1
n_E1 = 32
m_E1 = 128
stride_E1 = 2
kernel_size_E1 = 4
MaxPooling_E1 = True
ActivationFunc_E1 = "Leaky_ReLu"
BatchNorm_E1 = True
DropOut_E1 = 0.5
Bipass_1 = True

# Stage 2
n_E2 = 64
m_E2 = 64
stride_E2 = 2
kernel_size_E2 = 4
MaxPooling_E2 = True
ActivationFunc_E2 = "Leaky_ReLu"
alfa = 0.2
BatchNorm_E2 = True
DropOut_E2 = 0.5
Bipass_2 = True

# Stage 3
n_E3 = 128
m_E3 = 128
stride_E3 = 2
kernel_size_E3 = 4
MaxPooling_E3 = True
ActivationFunc_E3 = "Leaky_ReLu"
alfa = 0.2
BatchNorm_E3 = True
DropOut_E3 = 0.5
Bipass_3 = True

# Stage 4
n_E4 = 256
m_E4 = 256
stride_E4 = 2
kernel_size_E4 = 4
MaxPooling_E4 = True
ActivationFunc_E4 = "Leaky_ReLu"
alfa = 0.2
BatchNorm_E4 = True
DropOut_E4 = 0.5
Bipass_4 = True

# Stage 5
n_E5 = 512
m_E5 = 512
stride_E5 = 2
kernel_size_E5 = 4
MaxPooling_E5 = True
ActivationFunc_E5 = "Leaky_ReLu"
alfa = 0.2
BatchNorm_E5 = True
DropOut_E5 = 0.5
Bipass_5 = True

# Stage 6
n_E6 = 512
m_E6 = 512
stride_E6 = 2
kernel_size_E6 = 4
MaxPooling_E6 = True
ActivationFunc_E6 = "Leaky_ReLu"
alfa = 0.2
BatchNorm_E6 = True
DropOut_E6 = 0.5
Bipass_6 = True

# Stage 7
n_E7 = 512
m_E7 = 512
stride_E7 = 2
kernel_size_E7 = 4
MaxPooling_E7 = True
ActivationFunc_E7 = "Leaky_ReLu"
alfa = 0.2
BatchNorm_E7 = True
DropOut_E7 = 0.5
Bipass_7 = True

# Stage 8 (bottleneck — no Bipass flag: it has no skip connection).
n_E8 = 512
m_E8 = 512
stride_E8 = 2
kernel_size_E8 = 4
MaxPooling_E8 = True
ActivationFunc_E8 = "Leaky_ReLu"
alfa = 0.2
BatchNorm_E8 = True
DropOut_E8 = 0.5
| |
|
| | |
# Flag not referenced by any code visible in this file — possibly read
# elsewhere. (Note: "expantion" is a typo for "expansion"; kept as-is to
# avoid breaking external references.)
expantion = True

log_dir="logs/"

# TensorBoard writer: one run directory per launch, timestamped.
summary_writer = tf.summary.create_file_writer(
    log_dir + "fit/" + datetime.datetime.now().strftime("%Y%m%d-%H%M%S"))

# Build the generator from the configured input shape.
generator = Generator(G_input_dim)

# Weight of the L1 reconstruction term in generator_loss().
LAMBDA = 100

# BCE over raw logits (the discriminator's last layer has no activation).
loss_object = tf.keras.losses.BinaryCrossentropy(from_logits=True)
discriminator = Discriminator(G_input_dim)

# Kick off training.
fit()