Spaces:
Sleeping
Sleeping
| import joblib | |
| import os | |
| import tensorflow as tf | |
| import tensorflow as tf | |
| from tensorflow.keras.layers import Dense,GlobalAveragePooling2D, Input, Embedding, LSTM,Dot,Reshape,Concatenate,BatchNormalization, GlobalMaxPooling2D, Dropout, Add, MaxPooling2D, GRU, AveragePooling2D | |
| from tensorflow.keras.preprocessing.text import Tokenizer | |
| from tensorflow.keras.preprocessing.sequence import pad_sequences | |
| import pandas as pd | |
| import numpy as np | |
| import cv2 | |
| from nltk.translate.bleu_score import sentence_bleu | |
| chexnet_weights = "chexnet_weights/brucechou1983_CheXNet_Keras_0.3.0_weights.h5" | |
| def create_chexnet(chexnet_weights = chexnet_weights,input_size=(224,224)): | |
| """ | |
| chexnet_weights: weights value in .h5 format of chexnet | |
| creates a chexnet model with preloaded weights present in chexnet_weights file | |
| """ | |
| model = tf.keras.applications.DenseNet121(include_top=False,input_shape = input_size+(3,)) #importing densenet the last layer will be a relu activation layer | |
| #we need to load the weights so setting the architecture of the model as same as the one of the chexnet | |
| x = model.output #output from chexnet | |
| x = GlobalAveragePooling2D()(x) | |
| x = Dense(14, activation="sigmoid", name="chexnet_output")(x) #here activation is sigmoid as seen in research paper | |
| chexnet = tf.keras.Model(inputs = model.input,outputs = x) | |
| chexnet.load_weights(chexnet_weights) | |
| chexnet = tf.keras.Model(inputs = model.input,outputs = chexnet.layers[-3].output) #we will be taking the 3rd last layer (here it is layer before global avgpooling) | |
| #since we are using attention here | |
| return chexnet | |
| class Image_encoder(tf.keras.layers.Layer): | |
| """ | |
| This layer will output image backbone features after passing it through chexnet | |
| """ | |
| def __init__(self, | |
| name = "image_encoder_block" | |
| ): | |
| super().__init__() | |
| self.chexnet = create_chexnet(input_size = (224,224)) | |
| self.chexnet.trainable = False | |
| self.avgpool = AveragePooling2D() | |
| # for i in range(10): #the last 10 layers of chexnet will be trained | |
| # self.chexnet.layers[-i].trainable = True | |
| def call(self,data): | |
| op = self.chexnet(data) #op shape: (None,7,7,1024) | |
| op = self.avgpool(op) #op shape (None,3,3,1024) | |
| op = tf.reshape(op,shape = (-1,op.shape[1]*op.shape[2],op.shape[3])) #op shape: (None,9,1024) | |
| return op | |
| def encoder(image1,image2,dense_dim,dropout_rate): | |
| """ | |
| Takes image1,image2 | |
| gets the final encoded vector of these | |
| """ | |
| #image1 | |
| im_encoder = Image_encoder() | |
| bkfeat1 = im_encoder(image1) #shape: (None,9,1024) | |
| bk_dense = Dense(dense_dim,name = 'bkdense',activation = 'relu') #shape: (None,9,512) | |
| bkfeat1 = bk_dense(bkfeat1) | |
| #image2 | |
| bkfeat2 = im_encoder(image2) #shape: (None,9,1024) | |
| bkfeat2 = bk_dense(bkfeat2) #shape: (None,9,512) | |
| #combining image1 and image2 | |
| concat = Concatenate(axis=1)([bkfeat1,bkfeat2]) #concatenating through the second axis shape: (None,18,1024) | |
| bn = BatchNormalization(name = "encoder_batch_norm")(concat) | |
| dropout = Dropout(dropout_rate,name = "encoder_dropout")(bn) | |
| return dropout | |
| class global_attention(tf.keras.layers.Layer): | |
| """ | |
| calculate global attention | |
| """ | |
| def __init__(self,dense_dim): | |
| super().__init__() | |
| # Intialize variables needed for Concat score function here | |
| self.W1 = Dense(units = dense_dim) #weight matrix of shape enc_units*dense_dim | |
| self.W2 = Dense(units = dense_dim) #weight matrix of shape dec_units*dense_dim | |
| self.V = Dense(units = 1) #weight matrix of shape dense_dim*1 | |
| #op (None,98,1) | |
| def call(self,encoder_output,decoder_h): #here the encoded output will be the concatted image bk features shape: (None,98,dense_dim) | |
| decoder_h = tf.expand_dims(decoder_h,axis=1) #shape: (None,1,dense_dim) | |
| tanh_input = self.W1(encoder_output) + self.W2(decoder_h) #ouput_shape: batch_size*98*dense_dim | |
| tanh_output = tf.nn.tanh(tanh_input) | |
| attention_weights = tf.nn.softmax(self.V(tanh_output),axis=1) #shape= batch_size*98*1 getting attention alphas | |
| op = attention_weights*encoder_output#op_shape: batch_size*98*dense_dim multiply all aplhas with corresponding context vector | |
| context_vector = tf.reduce_sum(op,axis=1) #summing all context vector over the time period ie input length, output_shape: batch_size*dense_dim | |
| return context_vector,attention_weights | |
| class One_Step_Decoder(tf.keras.layers.Layer): | |
| """ | |
| decodes a single token | |
| """ | |
| def __init__(self,vocab_size, embedding_dim, max_pad, dense_dim ,name = "onestepdecoder"): | |
| # Initialize decoder embedding layer, LSTM and any other objects needed | |
| super().__init__() | |
| self.dense_dim = dense_dim | |
| self.embedding = Embedding(input_dim = vocab_size+1, | |
| output_dim = embedding_dim, | |
| input_length=max_pad, | |
| mask_zero=True, | |
| name = 'onestepdecoder_embedding' | |
| ) | |
| self.LSTM = GRU(units=self.dense_dim, | |
| # return_sequences=True, | |
| return_state=True, | |
| name = 'onestepdecoder_LSTM' | |
| ) | |
| self.attention = global_attention(dense_dim = dense_dim) | |
| self.concat = Concatenate(axis=-1) | |
| self.dense = Dense(dense_dim,name = 'onestepdecoder_embedding_dense',activation = 'relu') | |
| self.final = Dense(vocab_size+1,activation='softmax') | |
| self.concat = Concatenate(axis=-1) | |
| self.add =Add() | |
| def call(self,input_to_decoder, encoder_output, decoder_h):#,decoder_c): | |
| ''' | |
| One step decoder mechanisim step by step: | |
| A. Pass the input_to_decoder to the embedding layer and then get the output(batch_size,1,embedding_dim) | |
| B. Using the encoder_output and decoder hidden state, compute the context vector. | |
| C. Concat the context vector with the step A output | |
| D. Pass the Step-C output to LSTM/GRU and get the decoder output and states(hidden and cell state) | |
| E. Pass the decoder output to dense layer(vocab size) and store the result into output. | |
| F. Return the states from step D, output from Step E, attention weights from Step -B | |
| here state_h,state_c are decoder states | |
| ''' | |
| embedding_op = self.embedding(input_to_decoder) #output shape = batch_size*1*embedding_shape (only 1 token) | |
| context_vector,attention_weights = self.attention(encoder_output,decoder_h) #passing hidden state h of decoder and encoder output | |
| #context_vector shape: batch_size*dense_dim we need to add time dimension | |
| context_vector_time_axis = tf.expand_dims(context_vector,axis=1) | |
| #now we will combine attention output context vector with next word input to the lstm here we will be teacher forcing | |
| concat_input = self.concat([context_vector_time_axis,embedding_op])#output dimension = batch_size*input_length(here it is 1)*(dense_dim+embedding_dim) | |
| output,decoder_h = self.LSTM(concat_input,initial_state = decoder_h) | |
| #output shape = batch*1*dense_dim and decoder_h,decoder_c has shape = batch*dense_dim | |
| #we need to remove the time axis from this decoder_output | |
| output = self.final(output)#shape = batch_size*decoder vocab size | |
| return output,decoder_h,attention_weights | |
| class decoder(tf.keras.Model): | |
| """ | |
| Decodes the encoder output and caption | |
| """ | |
| def __init__(self,max_pad, embedding_dim,dense_dim,batch_size ,vocab_size): | |
| super().__init__() | |
| self.onestepdecoder = One_Step_Decoder(vocab_size = vocab_size, embedding_dim = embedding_dim, max_pad = max_pad, dense_dim = dense_dim) | |
| self.output_array = tf.TensorArray(tf.float32,size=max_pad) | |
| self.max_pad = max_pad | |
| self.batch_size = batch_size | |
| self.dense_dim =dense_dim | |
| def call(self,encoder_output,caption):#,decoder_h,decoder_c): #caption : (None,max_pad), encoder_output: (None,dense_dim) | |
| decoder_h, decoder_c = tf.zeros_like(encoder_output[:,0]), tf.zeros_like(encoder_output[:,0]) #decoder_h, decoder_c | |
| output_array = tf.TensorArray(tf.float32,size=self.max_pad) | |
| for timestep in range(self.max_pad): #iterating through all timesteps ie through max_pad | |
| output,decoder_h,attention_weights = self.onestepdecoder(caption[:,timestep:timestep+1], encoder_output, decoder_h) | |
| output_array = output_array.write(timestep,output) #timestep*batch_size*vocab_size | |
| self.output_array = tf.transpose(output_array.stack(),[1,0,2]) #.stack :Return the values in the TensorArray as a stacked Tensor.) | |
| #shape output_array: (batch_size,max_pad,vocab_size) | |
| return self.output_array | |
| def create_model(): | |
| """ | |
| creates the best model ie the attention model | |
| and returns the model after loading the weights | |
| and also the tokenizer | |
| """ | |
| #hyperparameters | |
| input_size = (224,224) | |
| tokenizer = joblib.load('tokenizer.pkl') | |
| max_pad = 29 | |
| batch_size = 100 | |
| vocab_size = len(tokenizer.word_index) | |
| embedding_dim = 300 | |
| dense_dim = 512 | |
| lstm_units = dense_dim | |
| dropout_rate = 0.2 | |
| tf.keras.backend.clear_session() | |
| image1 = Input(shape = (input_size + (3,))) #shape = 224,224,3 | |
| image2 = Input(shape = (input_size + (3,))) #https://www.w3resource.com/python-exercises/tuple/python-tuple-exercise-5.php | |
| caption = Input(shape = (max_pad,)) | |
| encoder_output = encoder(image1,image2,dense_dim,dropout_rate) #shape: (None,28,512) | |
| output = decoder(max_pad, embedding_dim,dense_dim,batch_size ,vocab_size)(encoder_output,caption) | |
| model = tf.keras.Model(inputs = [image1,image2,caption], outputs = output) | |
| model_filename = 'Encoder_Decoder_global_attention.h5' | |
| model_save = model_filename | |
| model.load_weights(model_save) | |
| return model,tokenizer | |
| def greedy_search_predict(image1,image2,model,tokenizer,input_size = (224,224)): | |
| """ | |
| Given paths to two x-ray images predicts the impression part of the x-ray in a greedy search algorithm | |
| """ | |
| image1 = tf.expand_dims(cv2.resize(image1,input_size,interpolation = cv2.INTER_NEAREST),axis=0) #introduce batch and resize | |
| image2 = tf.expand_dims(cv2.resize(image2,input_size,interpolation = cv2.INTER_NEAREST),axis=0) | |
| image1 = model.get_layer('image_encoder')(image1) | |
| image2 = model.get_layer('image_encoder')(image2) | |
| image1 = model.get_layer('bkdense')(image1) | |
| image2 = model.get_layer('bkdense')(image2) | |
| concat = model.get_layer('concatenate')([image1,image2]) | |
| enc_op = model.get_layer('encoder_batch_norm')(concat) | |
| enc_op = model.get_layer('encoder_dropout')(enc_op) #this is the output from encoder | |
| decoder_h,decoder_c = tf.zeros_like(enc_op[:,0]),tf.zeros_like(enc_op[:,0]) | |
| a = [] | |
| pred = [] | |
| max_pad = 29 | |
| for i in range(max_pad): | |
| if i==0: #if first word | |
| caption = np.array(tokenizer.texts_to_sequences(['<cls>'])) #shape: (1,1) | |
| output,decoder_h,attention_weights = model.get_layer('decoder').onestepdecoder(caption,enc_op,decoder_h)#,decoder_c) decoder_c, | |
| #prediction | |
| max_prob = tf.argmax(output,axis=-1) #tf.Tensor of shape = (1,1) | |
| caption = np.array([max_prob]) #will be sent to onstepdecoder for next iteration | |
| if max_prob==np.squeeze(tokenizer.texts_to_sequences(['<end>'])): | |
| break; | |
| else: | |
| a.append(tf.squeeze(max_prob).numpy()) | |
| return tokenizer.sequences_to_texts([a])[0] #here output would be 1,1 so subscripting to open the array | |
| def get_bleu(reference,prediction): | |
| """ | |
| Given a reference and prediction string, outputs the 1-gram,2-gram,3-gram and 4-gram bleu scores | |
| """ | |
| reference = [reference.split()] #should be in an array (cos of multiple references can be there here only 1) | |
| prediction = prediction.split() | |
| bleu1 = sentence_bleu(reference,prediction,weights = (1,0,0,0)) | |
| bleu2 = sentence_bleu(reference,prediction,weights = (0.5,0.5,0,0)) | |
| bleu3 = sentence_bleu(reference,prediction,weights = (0.33,0.33,0.33,0)) | |
| bleu4 = sentence_bleu(reference,prediction,weights = (0.25,0.25,0.25,0.25)) | |
| return bleu1,bleu2,bleu3,bleu4 | |
| def predict1(image1,image2=None,model_tokenizer = None): | |
| """given image1 and image 2 filepaths returns the predicted caption, | |
| the model_tokenizer will contain stored model_weights and tokenizer | |
| """ | |
| if image2 is None: #if only 1 image file is given | |
| image2 = image1 | |
| # try: | |
| # image1 = cv2.imread(image1,cv2.IMREAD_UNCHANGED)/255 | |
| # image2 = cv2.imread(image2,cv2.IMREAD_UNCHANGED)/255 | |
| # except: | |
| # return print("Must be an image") | |
| if model_tokenizer == None: | |
| model,tokenizer = create_model() | |
| else: | |
| model,tokenizer = model_tokenizer[0],model_tokenizer[1] | |
| predicted_caption = greedy_search_predict(image1,image2,model,tokenizer) | |
| return predicted_caption | |
| def predict2(true_caption, image1,image2=None,model_tokenizer = None): | |
| """given image1 and image 2 filepaths and the true_caption | |
| returns the mean of cumulative ngram bleu scores where n=1,2,3,4, | |
| the model_tokenizer will contain stored model_weights and tokenizer | |
| """ | |
| if image2 == None: #if only 1 image file is given | |
| image2 = image1 | |
| try: | |
| image1 = cv2.imread(image1,cv2.IMREAD_UNCHANGED)/255 | |
| image2 = cv2.imread(image2,cv2.IMREAD_UNCHANGED)/255 | |
| except: | |
| return print("Must be an image") | |
| if model_tokenizer == None: | |
| model,tokenizer = create_model() | |
| else: | |
| model,tokenizer = model_tokenizer[0],model_tokenizer[1] | |
| predicted_caption = greedy_search_predict(image1,image2,model,tokenizer) | |
| _ = get_bleu(true_caption,predicted_caption) | |
| _ = list(_) | |
| return pd.DataFrame([_],columns = ['bleu1','bleu2','bleu3','bleu4']) | |
| def function1(image1,image2,model_tokenizer = None): | |
| """ | |
| here image1 and image2 will be a list of image | |
| filepaths and outputs the resulting captions as a list | |
| """ | |
| if model_tokenizer is None: | |
| model_tokenizer = list(create_model()) | |
| predicted_caption = [] | |
| for i1,i2 in zip(image1,image2): | |
| caption = predict1(i1,i2,model_tokenizer) | |
| predicted_caption.append(caption) | |
| return predicted_caption | |
| def function2(true_caption,image1,image2): | |
| """ | |
| here true_caption,image1 and image2 will be a list of true_captions and image | |
| filepaths and outputs the resulting bleu_scores | |
| as a dataframe | |
| """ | |
| model_tokenizer = list(create_model()) | |
| predicted = pd.DataFrame(columns = ['bleu1','bleu2','bleu3','bleu4']) | |
| for c,i1,i2 in zip(true_caption,image1,image2): | |
| caption = predict2(c,i1,i2,model_tokenizer) | |
| predicted = predicted.append(caption,ignore_index = True) | |
| return predicted |