monkeypox_detection_deployment / deep_cnn_project.py
saifarko's picture
Upload 2 files
a90c34d verified
# -*- coding: utf-8 -*-
"""Deep CNN Project
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1VvAfokQ6mPAsBqBajbRZU13412Wg0A_m
"""
from google.colab import drive
drive.mount('/content/drive')
"""<a id="import"></a>
# <center>We have imported important Modules here </center>
"""
import numpy as np
import pandas as pd
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
import time
import matplotlib.pyplot as plt
import cv2
import seaborn as sns
sns.set_style('darkgrid')
import shutil
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers import Dense, Activation,Dropout,Conv2D, MaxPooling2D,BatchNormalization
from tensorflow.keras.optimizers import Adam, Adamax
from tensorflow.keras.metrics import categorical_crossentropy
from tensorflow.keras import regularizers
from tensorflow.keras.models import Model
import time
"""<a id="makedf"></a>
# <center>Read in images and create a dataframe of image paths and class labels</center>
"""
sdir=r'/content/drive/MyDrive/archive (4)/Original Images/Original Images'
filepaths=[]
labels=[]
classlist=os.listdir(sdir)
for klass in classlist:
classpath=os.path.join(sdir, klass)
flist=os.listdir(classpath)
for f in flist:
fpath=os.path.join(classpath,f)
filepaths.append(fpath)
labels.append(klass)
Fseries=pd.Series(filepaths, name='filepaths')
Lseries=pd.Series(labels, name='labels')
df=pd.concat([Fseries, Lseries], axis=1)
train_df, dummy_df=train_test_split(df, train_size=.75, shuffle=True, random_state=123, stratify=df['labels'])
valid_df, test_df=train_test_split(dummy_df, train_size=.5, shuffle=True, random_state=123, stratify=dummy_df['labels'])
print('train_df lenght: ', len(train_df), ' test_df length: ', len(test_df), ' valid_df length: ', len(valid_df))
# get the number of classes and the images count for each class in train_df
classes=sorted(list(train_df['labels'].unique()))
class_count = len(classes)
print('The number of classes in the dataset is: ', class_count)
groups=train_df.groupby('labels')
print('{0:^30s} {1:^13s}'.format('CLASS', 'IMAGE COUNT'))
countlist=[]
classlist=[]
for label in sorted(list(train_df['labels'].unique())):
group=groups.get_group(label)
countlist.append(len(group))
classlist.append(label)
print('{0:^30s} {1:^13s}'.format(label, str(len(group))))
# get the classes with the minimum and maximum number of train images
max_value=np.max(countlist)
max_index=countlist.index(max_value)
max_class=classlist[max_index]
min_value=np.min(countlist)
min_index=countlist.index(min_value)
min_class=classlist[min_index]
print(max_class, ' has the most images= ',max_value, ' ', min_class, ' has the least images= ', min_value)
# lets get the average height and width of a sample of the train images
ht=0
wt=0
# select 100 random samples of train_df
train_df_sample=train_df.sample(n=100, random_state=123,axis=0)
for i in range (len(train_df_sample)):
fpath=train_df_sample['filepaths'].iloc[i]
img=plt.imread(fpath)
shape=img.shape
ht += shape[0]
wt += shape[1]
print('average height= ', ht//100, ' average width= ', wt//100, 'aspect ratio= ', ht/wt)
"""<a id="balance"></a>
# <center>Balance train_df by creating augmented images</center>
"""
def balance(df, n, working_dir, img_size):
df=df.copy()
print('Initial length of dataframe is ', len(df))
aug_dir=os.path.join(working_dir, 'aug')# directory to store augmented images
if os.path.isdir(aug_dir):# start with an empty directory
shutil.rmtree(aug_dir)
os.mkdir(aug_dir)
for label in df['labels'].unique():
dir_path=os.path.join(aug_dir,label)
os.mkdir(dir_path) # make class directories within aug directory
# create and store the augmented images
total=0
gen=ImageDataGenerator(horizontal_flip=True, rotation_range=20, width_shift_range=.2,
height_shift_range=.2, zoom_range=.2)
groups=df.groupby('labels') # group by class
for label in df['labels'].unique(): # for every class
group=groups.get_group(label) # a dataframe holding only rows with the specified label
sample_count=len(group) # determine how many samples there are in this class
if sample_count< n: # if the class has less than target number of images
aug_img_count=0
delta=n - sample_count # number of augmented images to create
target_dir=os.path.join(aug_dir, label) # define where to write the images
msg='{0:40s} for class {1:^30s} creating {2:^5s} augmented images'.format(' ', label, str(delta))
print(msg, '\r', end='') # prints over on the same line
aug_gen=gen.flow_from_dataframe( group, x_col='filepaths', y_col=None, target_size=img_size,
class_mode=None, batch_size=1, shuffle=False,
save_to_dir=target_dir, save_prefix='aug-', color_mode='rgb',
save_format='jpg')
while aug_img_count<delta:
images=next(aug_gen)
aug_img_count += len(images)
total +=aug_img_count
print('Total Augmented images created= ', total)
# create aug_df and merge with train_df to create composite training set ndf
aug_fpaths=[]
aug_labels=[]
classlist=os.listdir(aug_dir)
for klass in classlist:
classpath=os.path.join(aug_dir, klass)
flist=os.listdir(classpath)
for f in flist:
fpath=os.path.join(classpath,f)
aug_fpaths.append(fpath)
aug_labels.append(klass)
Fseries=pd.Series(aug_fpaths, name='filepaths')
Lseries=pd.Series(aug_labels, name='labels')
aug_df=pd.concat([Fseries, Lseries], axis=1)
df=pd.concat([df,aug_df], axis=0).reset_index(drop=True)
print('Length of augmented dataframe is now ', len(df))
return df
n=200 # number of samples in each class
working_dir=r'./' # directory to store augmented images
img_size=(224,224) # size of augmented images
train_df=balance(train_df, n, working_dir, img_size)
"""<a id="generators"></a>
# <center>Create the train_gen, test_gen final_test_gen and valid_gen</center>
"""
batch_size=20 # We will use and EfficientetB3 model, with image size of (200, 250) this size should not cause resource error
trgen=ImageDataGenerator(horizontal_flip=True,rotation_range=20, width_shift_range=.2,
height_shift_range=.2, zoom_range=.2 )
t_and_v_gen=ImageDataGenerator()
msg='{0:70s} for train generator'.format(' ')
print(msg, '\r', end='') # prints over on the same line
train_gen=trgen.flow_from_dataframe(train_df, x_col='filepaths', y_col='labels', target_size=img_size,
class_mode='categorical', color_mode='rgb', shuffle=True, batch_size=batch_size)
msg='{0:70s} for valid generator'.format(' ')
print(msg, '\r', end='') # prints over on the same line
valid_gen=t_and_v_gen.flow_from_dataframe(valid_df, x_col='filepaths', y_col='labels', target_size=img_size,
class_mode='categorical', color_mode='rgb', shuffle=False, batch_size=batch_size)
# for the test_gen we want to calculate the batch size and test steps such that batch_size X test_steps= number of samples in test set
# this insures that we go through all the sample in the test set exactly once.
length=len(test_df)
test_batch_size=sorted([int(length/n) for n in range(1,length+1) if length % n ==0 and length/n<=80],reverse=True)[0]
test_steps=int(length/test_batch_size)
msg='{0:70s} for test generator'.format(' ')
print(msg, '\r', end='') # prints over on the same line
test_gen=t_and_v_gen.flow_from_dataframe(test_df, x_col='filepaths', y_col='labels', target_size=img_size,
class_mode='categorical', color_mode='rgb', shuffle=False, batch_size=test_batch_size)
# from the generator we can get information we will need later
classes=list(train_gen.class_indices.keys())
class_indices=list(train_gen.class_indices.values())
class_count=len(classes)
labels=test_gen.labels
print ( 'test batch size: ' ,test_batch_size, ' test steps: ', test_steps, ' number of classes : ', class_count)
"""<a id="show"></a>
# <center>Create a function to show example training images</center>
"""
def show_image_samples(gen ):
t_dict=gen.class_indices
classes=list(t_dict.keys())
images,labels=next(gen) # get a sample batch from the generator
plt.figure(figsize=(20, 20))
length=len(labels)
if length<25: #show maximum of 25 images
r=length
else:
r=25
for i in range(r):
plt.subplot(5, 5, i + 1)
image=images[i] /255
plt.imshow(image)
index=np.argmax(labels[i])
class_name=classes[index]
plt.title(class_name, color='blue', fontsize=14)
plt.axis('off')
plt.show()
show_image_samples(train_gen )
"""<a id="model"></a>
# <center>Create a model using transfer learning with EfficientNetB3</center>
### NOTE experts advise you make the base model initially not trainable. Then train for some number of epochs
### then fine tune model by making base model trainable and run more epochs
### I have found this to be WRONG!!!!
### Making the base model trainable from the outset leads to faster convegence and a lower validation loss
### for the same number of total epochs!
"""
img_shape=(img_size[0], img_size[1], 3)
model_name='EfficientNetB3'
base_model=tf.keras.applications.efficientnet.EfficientNetB3(include_top=False, weights="imagenet",input_shape=img_shape, pooling='max')
# Note you are always told NOT to make the base model trainable initially- that is WRONG you get better results leaving it trainable
base_model.trainable=True
x=base_model.output
x=BatchNormalization(axis=-1, momentum=0.99, epsilon=0.001 )(x)
x = Dense(256, kernel_regularizer = regularizers.l2(l = 0.016),activity_regularizer=regularizers.l1(0.006),
bias_regularizer=regularizers.l1(0.006) ,activation='relu')(x)
x=Dropout(rate=.4, seed=123)(x)
output=Dense(class_count, activation='softmax')(x)
model=Model(inputs=base_model.input, outputs=output)
lr=.001 # start with this learning rate
model.compile(Adamax(learning_rate=lr), loss='categorical_crossentropy', metrics=['accuracy'])
"""<a id="callback"></a>
# <center>Create a custom Keras callback to continue and optionally set LR or halt training</center>
The LR_ASK callback is a convenient callback that allows you to continue training for ask_epoch more epochs or to halt training.
If you elect to continue training for more epochs you are given the option to retain the current learning rate (LR) or to
enter a new value for the learning rate. The form of use is:
ask=LR_ASK(model,epochs, ask_epoch) where:
* model is a string which is the name of your compiled model
* epochs is an integer which is the number of epochs to run specified in model.fit
* ask_epoch is an integer. If ask_epoch is set to a value say 5 then the model will train for 5 epochs.
then the user is ask to enter H to halt training, or enter an inter value. For example if you enter 4
training will continue for 4 more epochs to epoch 9 then you will be queried again. Once you enter an
integer value you are prompted to press ENTER to continue training using the current learning rate
or to enter a new value for the learning rate.
At the end of training the model weights are set to the weights for the epoch that achieved the lowest validation loss
"""
class LR_ASK(keras.callbacks.Callback):
def __init__ (self, model, epochs, ask_epoch): # initialization of the callback
super(LR_ASK, self).__init__()
self.model=model
self.ask_epoch=ask_epoch
self.epochs=epochs
self.ask=True # if True query the user on a specified epoch
self.lowest_vloss=np.inf
self.best_weights=self.model.get_weights() # set best weights to model's initial weights
self.best_epoch=1
def on_train_begin(self, logs=None): # this runs on the beginning of training
if self.ask_epoch == 0:
print('you set ask_epoch = 0, ask_epoch will be set to 1', flush=True)
self.ask_epoch=1
if self.ask_epoch >= self.epochs: # you are running for epochs but ask_epoch>epochs
print('ask_epoch >= epochs, will train for ', epochs, ' epochs', flush=True)
self.ask=False # do not query the user
if self.epochs == 1:
self.ask=False # running only for 1 epoch so do not query user
else:
print('Training will proceed until epoch', ask_epoch,' then you will be asked to')
print(' enter H to halt training or enter an integer for how many more epochs to run then be asked again')
self.start_time= time.time() # set the time at which training started
def on_train_end(self, logs=None): # runs at the end of training
print('loading model with weights from epoch ', self.best_epoch)
self.model.set_weights(self.best_weights) # set the weights of the model to the best weights
tr_duration=time.time() - self.start_time # determine how long the training cycle lasted
hours = tr_duration // 3600
minutes = (tr_duration - (hours * 3600)) // 60
seconds = tr_duration - ((hours * 3600) + (minutes * 60))
msg = f'training elapsed time was {str(hours)} hours, {minutes:4.1f} minutes, {seconds:4.2f} seconds)'
print (msg, flush=True) # print out training duration time
def on_epoch_end(self, epoch, logs=None): # method runs on the end of each epoch
v_loss=logs.get('val_loss') # get the validation loss for this epoch
if v_loss< self.lowest_vloss:
self.lowest_vloss=v_loss
self.best_weights=self.model.get_weights() # set best weights to model's initial weights
self.best_epoch=epoch + 1
print (f'\n validation loss of {v_loss:7.4f} is below lowest loss, saving weights from epoch {str(epoch + 1):3s} as best weights')
else:
print (f'\n validation loss of {v_loss:7.4f} is above lowest loss of {self.lowest_vloss:7.4f} keeping weights from epoch {str(self.best_epoch)} as best weights')
if self.ask: # are the conditions right to query the user?
if epoch + 1 ==self.ask_epoch: # is this epoch the one for quering the user?
print('\n Enter H to end training or an integer for the number of additional epochs to run then ask again')
ans=input()
if ans == 'H' or ans =='h' or ans == '0': # quit training for these conditions
print ('you entered ', ans, ' Training halted on epoch ', epoch+1, ' due to user input\n', flush=True)
self.model.stop_training = True # halt training
else: # user wants to continue training
self.ask_epoch += int(ans)
if self.ask_epoch > self.epochs:
print('\nYou specified maximum epochs of as ', self.epochs, ' cannot train for ', self.ask_epoch, flush =True)
else:
print ('you entered ', ans, ' Training will continue to epoch ', self.ask_epoch, flush=True)
lr=float(tf.keras.backend.get_value(self.model.optimizer.lr)) # get the current learning rate
print(f'current LR is {lr:7.5f} hit enter to keep this LR or enter a new LR')
ans=input(' ')
if ans =='':
print (f'keeping current LR of {lr:7.5f}')
else:
new_lr=float(ans)
tf.keras.backend.set_value(self.model.optimizer.lr, new_lr) # set the learning rate in the optimizer
print(' changing LR to ', ans)
"""<a id="callbacks"></a>
# <center>Instantiate custom callback
"""
epochs=40
ask_epoch=5
ask=LR_ASK(model, epochs, ask_epoch)
#rlronp=tf.keras.callbacks.ReduceLROnPlateau(monitor="val_loss", factor=0.5, patience=2,verbose=1)
#callbacks=[rlronp, ask]
callbacks=[ask]
"""<a id="train"></a>
# <center>Train the model
### Note unlike how you are told it is BETTER to make the base model trainable from the outset
### It will converge faster and have a lower validation losss
"""
history=model.fit(x=train_gen, epochs=epochs, verbose=1, callbacks=callbacks, validation_data=valid_gen,
validation_steps=None, shuffle=False, initial_epoch=0)
"""<a id="plot"></a>
# <center>Define a function to plot the training data
"""
def tr_plot(tr_data, start_epoch):
#Plot the training and validation data
tacc=tr_data.history['accuracy']
tloss=tr_data.history['loss']
vacc=tr_data.history['val_accuracy']
vloss=tr_data.history['val_loss']
Epoch_count=len(tacc)+ start_epoch
Epochs=[]
for i in range (start_epoch ,Epoch_count):
Epochs.append(i+1)
index_loss=np.argmin(vloss)# this is the epoch with the lowest validation loss
val_lowest=vloss[index_loss]
index_acc=np.argmax(vacc)
acc_highest=vacc[index_acc]
plt.style.use('fivethirtyeight')
sc_label='best epoch= '+ str(index_loss+1 +start_epoch)
vc_label='best epoch= '+ str(index_acc + 1+ start_epoch)
fig,axes=plt.subplots(nrows=1, ncols=2, figsize=(20,8))
axes[0].plot(Epochs,tloss, 'r', label='Training loss')
axes[0].plot(Epochs,vloss,'g',label='Validation loss' )
axes[0].scatter(index_loss+1 +start_epoch,val_lowest, s=150, c= 'blue', label=sc_label)
axes[0].set_title('Training and Validation Loss')
axes[0].set_xlabel('Epochs')
axes[0].set_ylabel('Loss')
axes[0].legend()
axes[1].plot (Epochs,tacc,'r',label= 'Training Accuracy')
axes[1].plot (Epochs,vacc,'g',label= 'Validation Accuracy')
axes[1].scatter(index_acc+1 +start_epoch,acc_highest, s=150, c= 'blue', label=vc_label)
axes[1].set_title('Training and Validation Accuracy')
axes[1].set_xlabel('Epochs')
axes[1].set_ylabel('Accuracy')
axes[1].legend()
plt.tight_layout
plt.show()
tr_plot(history,0)
"""<a id="result"></a>
# <center>Make Predictions on the test set</a>
### Define a function which takes in a test generator and an integer test_steps
### and generates predictions on the test set including a confusion matric
### and a classification report
"""
def predictor(test_gen, test_steps):
y_pred= []
y_true=test_gen.labels
classes=list(test_gen.class_indices.keys())
class_count=len(classes)
errors=0
preds=model.predict(test_gen, verbose=1)
tests=len(preds)
for i, p in enumerate(preds):
pred_index=np.argmax(p)
true_index=test_gen.labels[i] # labels are integer values
if pred_index != true_index: # a misclassification has occurred
errors=errors + 1
y_pred.append(pred_index)
acc=( 1-errors/tests) * 100
print(f'there were {errors} errors in {tests} tests for an accuracy of {acc:6.2f}')
ypred=np.array(y_pred)
ytrue=np.array(y_true)
if class_count <=30:
cm = confusion_matrix(ytrue, ypred )
# plot the confusion matrix
plt.figure(figsize=(12, 8))
sns.heatmap(cm, annot=True, vmin=0, fmt='g', cmap='Reds', cbar=False)
plt.xticks(np.arange(class_count)+.5, classes, rotation=90)
plt.yticks(np.arange(class_count)+.5, classes, rotation=0)
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Confusion Matrix")
plt.show()
clr = classification_report(y_true, y_pred, target_names=classes, digits= 4) # create classification report
print("Classification Report:\n----------------------\n", clr)
return errors, tests
errors, tests=predictor(test_gen, test_steps)
"""<a id="save"></a>
# <center>Save the model
"""
subject='monkey pox'
acc=str(( 1-errors/tests) * 100)
index=acc.rfind('.')
acc=acc[:index + 3]
save_id= subject + '_' + str(acc) + '.h5'
model_save_loc=os.path.join(working_dir, save_id)
model.save(model_save_loc)
print ('model was saved as ' , model_save_loc )