# -*- coding: utf-8 -*- """MainModel.ipynb Automatically generated by Colaboratory. Original file is located at https://colab.research.google.com/drive/1-4IJzCALsOTWJG0nqR1_thQSGLh-Zfm9 ## Adversarial AutoEncoder #### Generative model to learn to reproduce the input and check the output is generated from a input in reconstruction process or generated creatively. The idea is combination of GAN (Generative Adversarial Networks) and AE (AutoEncoder) to produce a super generative model. ## Initialization We mounted google drive to access dataset from there. """ from google.colab import drive drive.mount('/content/drive') """Import useful libraries.""" import numpy as np import tensorflow.keras as ke import tensorflow as tf from pathlib import Path import time """Check GPU availability""" print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU'))) """## Preparing Data For Train Read dataset from google drive and load it to `dataset` variable and preprocessing data to prepare them to learn the model with. """ batchsize = 64 directory = '/content/drive/MyDrive/MainModel/train' dataset = tf.keras.utils.image_dataset_from_directory( directory, labels=None, batch_size=batchsize, image_size=(224, 224) ) dataset = dataset.unbatch() dataset = dataset.batch(batchsize, drop_remainder=True) """## Make Model Set latent vector size to use in the model. """ latent_vector_size = 256 """Make the builder function of encoder part of model.""" def build_model_enc(): model = ke.models.Sequential() model.add(ke.layers.Conv2D(64, (5, 5), strides=(2, 2), activation="relu", padding='same', input_shape=(224, 224, 3))) model.add(ke.layers.Conv2D(128, (5, 5), strides=(2, 2), activation="relu", padding='same')) model.add(ke.layers.Conv2D(256, (5, 5), strides=(2, 2), activation="relu", padding='same')) model.add(ke.layers.Conv2D(512, (5, 5), strides=(2, 2), activation="relu", padding='same')) model.add(ke.layers.Conv2D(512, (5, 5), strides=(2, 2), activation="relu", padding='same')) model.add(ke.layers.Flatten()) model.add(ke.layers.Dense(512, activation="relu")) model.add(ke.layers.Dense(latent_vector_size, activation="relu")) return model """Make the builder function of decoder part of model.""" def build_model_dec(): model = ke.models.Sequential() model.add(ke.layers.Dense(512, input_shape=(latent_vector_size,))) model.add(ke.layers.Dense(25088)) model.add(ke.layers.Reshape((7, 7, 512))) model.add(ke.layers.Conv2D(512, (5, 5), activation="relu", padding="same")) model.add(ke.layers.UpSampling2D()) model.add(ke.layers.Conv2D(512, (5, 5), activation="relu", padding="same")) model.add(ke.layers.UpSampling2D()) model.add(ke.layers.Conv2D(256, (5, 5), activation="relu", padding="same")) model.add(ke.layers.UpSampling2D()) model.add(ke.layers.Conv2D(128, (5, 5), activation="relu", padding="same")) model.add(ke.layers.UpSampling2D()) model.add(ke.layers.Conv2D(64, (5, 5), activation="relu", padding="same")) model.add(ke.layers.UpSampling2D()) model.add(ke.layers.Conv2D(3, (5, 5), activation="relu", padding="same")) return model """Make the builder function of discriminator part of model.""" def build_model_disc(): model = ke.models.Sequential() model.add(ke.layers.Dense(512, activation="relu", input_shape=(latent_vector_size,))) model.add(ke.layers.Dense(256, activation="relu")) model.add(ke.layers.Dense(1, activation="sigmoid")) return model """Make the builder function of the all parts of model.""" def build_model_aae(): model_encoder = build_model_enc() model_decoder = build_model_dec() model_discriminator = build_model_disc() model_autoencoder = ke.models.Sequential() model_autoencoder.add(model_encoder) model_autoencoder.add(model_decoder) model_encoder__generator__discriminator = ke.models.Sequential() model_encoder__generator__discriminator.add(model_encoder) model_encoder__generator__discriminator.add(model_discriminator) return model_encoder, model_decoder, model_discriminator, model_autoencoder, model_encoder__generator__discriminator """Build the model, compile it and then check parts of model. Then show the model structure. """ model_enc, model_dec, model_disc, model_ae, model_enc_disc = build_model_aae() model_enc.summary() model_dec.summary() model_disc.summary() model_ae.summary() model_enc_disc.summary() model_disc.compile(optimizer=ke.optimizers.Adam(lr=1e-4), loss="binary_crossentropy", metrics=[tf.keras.metrics.BinaryAccuracy()]) model_enc_disc.compile(optimizer=ke.optimizers.Adam(lr=1e-4), loss="binary_crossentropy", metrics=[tf.keras.metrics.BinaryAccuracy()]) model_ae.compile(optimizer=ke.optimizers.Adam(lr=1e-3), loss=ke.losses.MeanSquaredError()) """## Trainable function Because our model has some complexity to train and has 2 seperate parts (decoder and discriminator) connected to same part (encoder) which makes 2 paths to train the model and optimize the model (autoencoder and adversarial), we should lock some parts of model for traing other parts and change this option a lot. So we defined a function to set a layer and all it's inside layers trainable or untrainable. """ def settrainable(model, toset): for layer in model.layers: layer.trainable = toset model.trainable = toset """## Inception Score""" import math from keras.applications.inception_v3 import InceptionV3 from keras.applications.inception_v3 import preprocess_input inception_model = InceptionV3() def inception_score(images, n_split=10, eps=1E-16): images = tf.image.resize(images, (299, 299)) from tensorflow.python.ops.numpy_ops import np_config np_config.enable_numpy_behavior() processed = images.astype('float32') processed = preprocess_input(images) yhat = inception_model.predict(processed) scores = list() n_part = math.floor(images.shape[0] / n_split) for i in range(n_split): ix_start, ix_end = i * n_part, i * n_part + n_part p_yx = yhat[ix_start:ix_end] p_y = np.expand_dims(p_yx.mean(axis=0), 0) kl_d = p_yx * (np.log(p_yx + eps) - np.log(p_y + eps)) sum_kl_d = kl_d.sum(axis=1) avg_kl_d = np.mean(sum_kl_d) is_score = np.exp(avg_kl_d) scores.append(is_score) is_avg, is_std = np.mean(scores), np.std(scores) return is_avg, is_std """## Train the model In this part model is trained based on the model defined and inputs loaded. The default batch size is seted to 64 and epoch numbers to 20. In training each batch of each epoch at the first we train the autoencoder, then discriminator and then encoder-discriminator (adversarial part). """ epochs = 20 datasetnumpy = dataset.map(lambda x : (x, x)) datasetnumpy2 = dataset.map(lambda x : (x, np.ones((batchsize, 1)))) enc_path = '/content/drive/MyDrive/MainModel/model_enc' dec_path = '/content/drive/MyDrive/MainModel/model_dec' disc_path = '/content/drive/MyDrive/MainModel/model_disc' ae_path = '/content/drive/MyDrive/MainModel/model_ae' enc_disc_path = '/content/drive/MyDrive/MainModel/model_enc_disc' log_path = '/content/drive/MyDrive/MainModel/log.txt' initial_epoch = -1 my_file = Path(log_path) if my_file.is_file(): with open(log_path, 'r') as file: initial_epoch = int(file.readline()) print(f'log found. epoch: {initial_epoch}') model_enc = ke.models.load_model(enc_path) model_dec = ke.models.load_model(dec_path) model_disc = ke.models.load_model(disc_path) model_ae = ke.models.load_model(ae_path) model_enc_disc = ke.models.load_model(enc_disc_path) else: print('log not found.') for epochnumber in range(epochs): if epochnumber < initial_epoch: continue batchcounter = 0 batch_size = 0 for batchtensor in dataset: batchcounter = batchcounter + 1; settrainable(model_ae, True) settrainable(model_enc, True) settrainable(model_dec, True) batch = batchtensor.numpy() batch_size = batch.shape[0] model_ae.train_on_batch(batch, batch) settrainable(model_disc, True) batchpred = model_enc.predict(batch) fakepred = np.random.standard_normal((batch_size, latent_vector_size)) discbatch_x = np.concatenate([batchpred, fakepred]) discbatch_y = np.concatenate([np.zeros(batch_size), np.ones(batch_size)]) model_disc.train_on_batch(discbatch_x, discbatch_y) settrainable(model_enc_disc, True) settrainable(model_enc, True) settrainable(model_disc, False) model_enc_disc.train_on_batch(batch, np.ones(batch_size)) if batchcounter % 100 == 0: model_enc.save(enc_path) model_dec.save(dec_path) model_disc.save(disc_path) model_ae.save(ae_path) model_enc_disc.save(enc_disc_path) with open(log_path, 'w') as file: file.write(f'{epochnumber}\n{batchcounter}') elif batchcounter % 20 == 0: print(batchcounter) with open(log_path, 'w') as file: file.write(f'{epochnumber}') ae_evaluation = model_ae.evaluate(datasetnumpy, batch_size=batchsize) adversarial_evaluation = model_enc_disc.evaluate(datasetnumpy2, batch_size=batchsize) print('Reconstruction Loss:', ae_evaluation) print('Adverserial Loss:', adversarial_evaluation[0]) print('Adverserial Accuracy:', adversarial_evaluation[1]) if batchcounter >= 1000: num_batch = 100 else: num_batch = 10 fakepred = np.random.standard_normal((batch_size * num_batch, latent_vector_size)) generated_data = model_dec.predict(fakepred) inc_score = inception_score(generated_data) print(f'Inception score for decoder is: {inc_score}') import os try: my_file = Path(log_path) if my_file.is_file(): os.remove(log_path) except: print('Exception occured.') ae_evaluation = model_ae.evaluate(datasetnumpy, batch_size=batchsize) adversarial_evaluation = model_enc_disc.evaluate(datasetnumpy2, batch_size=batchsize) print('Reconstruction Loss:', ae_evaluation) print('Adverserial Loss:', adversarial_evaluation[0]) print('Adverserial Accuracy:', adversarial_evaluation[1]) if batchcounter >= 1000: num_batch = 100 else: num_batch = 10 fakepred = np.random.standard_normal((batch_size * num_batch, latent_vector_size)) generated_data = model_dec.predict(fakepred) inc_score = inception_score(generated_data) print(f'Inception score for decoder is: {inc_score}') model_enc.save(enc_path) model_dec.save(dec_path) model_disc.save(disc_path) model_ae.save(ae_path) model_enc_disc.save(enc_disc_path) """## Preparing Data For Evaluation | Test Read dataset from google drive and load it to `dataset` variable and preprocessing data to prepare them to learn the model with. """ batchsize = 64 directory = '/content/drive/MyDrive/MainModel/test' dataset = tf.keras.utils.image_dataset_from_directory( directory, labels=None, batch_size=batchsize, image_size=(224, 224) ) dataset = dataset.unbatch() dataset = dataset.batch(batchsize, drop_remainder=True) """## Evaluation | Test""" enc_path = '/content/drive/MyDrive/MainModel/model_enc' dec_path = '/content/drive/MyDrive/MainModel/model_dec' disc_path = '/content/drive/MyDrive/MainModel/model_disc' ae_path = '/content/drive/MyDrive/MainModel/model_ae' enc_disc_path = '/content/drive/MyDrive/MainModel/model_enc_disc' log_path = '/content/drive/MyDrive/MainModel/log.txt' model_enc = ke.models.load_model(enc_path) model_dec = ke.models.load_model(dec_path) model_disc = ke.models.load_model(disc_path) model_ae = ke.models.load_model(ae_path) model_enc_disc = ke.models.load_model(enc_disc_path) with open(log_path, 'r') as file: initial_epoch = int(file.readline()) print(f'log found. Epochs model trained: {initial_epoch}') datasetnumpy = dataset.map(lambda x : (x, x)) datasetnumpy2 = dataset.map(lambda x : (x, np.ones((batchsize, 1)))) ae_evaluation = model_ae.evaluate(datasetnumpy, batch_size=batchsize) adversarial_evaluation = model_enc_disc.evaluate(datasetnumpy2, batch_size=batchsize) print('Reconstruction Loss:', ae_evaluation) print('Adverserial Accuracy(positive):', adversarial_evaluation[1]) fakepred = np.random.standard_normal((batchsize, latent_vector_size)) fake_pred = model_disc.predict(fakepred) fake_disc_score = fake_pred.round() == 0 fake_disc_score = fake_disc_score.sum() print('Adverserial Accuracy(negative):', fake_disc_score / batchsize) generated_data = model_dec.predict(fakepred) inc_score = inception_score(generated_data) print(f'Inception score for decoder is: {inc_score}')