Spaces:
Build error
Build error
| # Databricks notebook source | |
| import tensorflow as tf | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| from PIL import Image | |
| #from turtle import width | |
| import streamlit as st | |
| # COMMAND ---------- | |
def load_image_initial(image_file):
    """Open an uploaded file as a PIL image for the on-page preview."""
    return Image.open(image_file)
# --- Streamlit page layout --------------------------------------------------
# Three stacked containers: page header, image-upload area, and (filled at the
# bottom of the script) the generated-caption area.
header = st.container()
image = st.container()
caption = st.container()
with header:
    st.title('Image Captioning')
    st.text('Generate captions for your images!')
with image:
    # st.markdown("**upload your image here:**")
    # Single-image uploader; `image_file` stays None until the user picks a
    # file and is reused at the bottom of the script to generate captions.
    image_file = st.file_uploader("upload your image here:", type = ["png", "jpg", 'jpeg'])
    if image_file is not None:
        #st.write(type(image_file))
        # st.write(dir(image_file))
        # file_details = {"filename": image_file.name, "filetype":image_file.type, "filesize":image_file.size}
        # st.write(file_details)
        # Preview at 299px, matching the model's 299x299 input resolution.
        st.image(load_image_initial(image_file), width=299)
################################ model 14 ####################################
# Inference-time configuration.
num_predictions = 3                      # number of candidate captions sampled per image
feature_extraction_model = 'ResNet50'    # which CNN backbone is built below
tokenizer_path = 'tokenizer.pkl'         # pickled TextVectorization config + vocabulary weights
# Historic checkpoint locations, kept for reference:
# checkpoint_path = "/dbfs/FileStore/shared_uploads/mhajiza@gap.com/computer_vision/models/image_captioning_tf_14/ckpt-10"
# checkpoint_path = "/dbfs/FileStore/shared_uploads/mhajiza@gap.com/computer_vision/models/image_captioning_tf_14/manually_saved_model-11"
# checkpoint_path = "/Users/mhajiza/Documents/Computer_Vison/Image_captioning/image_captioning_tf_model/ckpt-10"
checkpoint_path = "ckpt-10"              # local checkpoint prefix restored further below
# NOTE(review): keras applications expect `weights` to be 'imagenet', None, or
# a path to a weights file — presumably "checkpoint" is such a local file;
# confirm, otherwise backbone construction will raise a ValueError.
weights= "checkpoint"
# checkpoint_path = "/Users/mhajiza/Documents/Computer_Vison/Image_captioning/image_captioning_tf_model/manually_saved_model-11"
# COMMAND ----------
def load_image(image_file, model_name=None):
    """Load an image and preprocess it for the configured CNN backbone.

    Args:
        image_file: path or file-like object accepted by ``PIL.Image.open``.
        model_name: backbone name; defaults to the module-level
            ``feature_extraction_model`` setting (backward compatible).

    Returns:
        (img, image_file): the preprocessed 299x299x3 image tensor and the
        original path/handle.
    """
    if model_name is None:
        model_name = feature_extraction_model
    img = Image.open(image_file).convert('RGB')
    img = tf.keras.preprocessing.image.img_to_array(img)
    # All supported backbones here are fed 299x299 inputs.
    img = tf.keras.layers.Resizing(299, 299)(img)
    if model_name == 'InceptionV3':
        img = tf.keras.applications.inception_v3.preprocess_input(img)
    elif model_name in ('ResNet50', 'ResNet101', 'ResNet152'):
        img = tf.keras.applications.resnet.preprocess_input(img)
    # NOTE(review): any other model name falls through with NO preprocessing —
    # behavior preserved from the original; confirm this is intended.
    return img, image_file
# COMMAND ----------
# Build the CNN feature extractor: instantiate the configured backbone
# without its classification head and expose its final layer's output as the
# feature map.  The original four per-model copies of this block were
# identical except for the constructor, so they are collapsed into a
# lookup-table dispatch; all module-level names are preserved.
_BACKBONES = {
    'ResNet152': tf.keras.applications.ResNet152,
    'ResNet50': tf.keras.applications.ResNet50,
    'ResNet101': tf.keras.applications.ResNet101,
    'InceptionV3': tf.keras.applications.InceptionV3,
}
if feature_extraction_model in _BACKBONES:
    image_model = _BACKBONES[feature_extraction_model](include_top=False, weights=weights)
    new_input = image_model.input
    hidden_layer = image_model.layers[-1].output
    # Maps a preprocessed image batch to its spatial convolutional features.
    image_features_extract_model = tf.keras.Model(new_input, hidden_layer)
# COMMAND ----------
def standardize(inputs):
    """Lowercase caption text and strip punctuation (TextVectorization hook).

    Bug fix: the original pattern lacked the surrounding ``[...]`` character
    class, so it only matched the entire literal punctuation sequence (i.e.
    effectively never).  Wrapped in a class it removes each punctuation
    character individually, matching the TF image-captioning tutorial.

    NOTE(review): the tokenizer below is rebuilt via
    ``TextVectorization.from_config``; verify whether its config actually
    references this function or a built-in standardize.
    """
    inputs = tf.strings.lower(inputs)
    return tf.strings.regex_replace(inputs, r"[!\"#$%&\(\)\*\+.,\-/:;=?@\[\\\]^_`{|}~]", "")
# Rebuild the saved TextVectorization tokenizer from its pickled config and
# vocabulary weights.
import pickle
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization

# SECURITY NOTE(review): pickle.load executes arbitrary code from the file —
# only load tokenizer.pkl from a trusted source.
with open(tokenizer_path, "rb") as _tok_file:  # close the handle (original leaked it)
    from_disk = pickle.load(_tok_file)
tokenizer = TextVectorization.from_config(from_disk['config'])
# Adapt on a throwaway sentence so the layer builds its lookup tables before
# the real vocabulary weights are restored.
tokenizer.adapt(["this is a test"])
tokenizer.set_weights(from_disk['weights'])
# COMMAND ----------
# Sizes recovered from the saved tokenizer configuration.
vocabulary_size = tokenizer.get_config()['max_tokens']         # vocabulary capacity
max_length = tokenizer.get_config()['output_sequence_length']  # max caption length in tokens
# COMMAND ----------
# Create mappings for words to indices and indices to words.
word_to_index = tf.keras.layers.StringLookup(mask_token="", vocabulary=tokenizer.get_vocabulary())
index_to_word = tf.keras.layers.StringLookup( mask_token="", vocabulary=tokenizer.get_vocabulary(), invert=True)
# COMMAND ----------
# max_length = 95 ##100
# Model hyper-parameters — must match the values used at training time for
# the checkpoint restore below to succeed.
embedding_dim = 256
units = 512
# Shape of the vector extracted from InceptionV3 is (64, 2048)
# These two variables represent that vector shape
features_shape = 2048
attention_features_shape = 64
# COMMAND ----------
class BahdanauAttention(tf.keras.Model):
    """Additive (Bahdanau-style) attention over CNN image features."""

    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        # Dense projections for the features (W1), the decoder state (W2),
        # and the scalar scoring head (V).  Attribute names are fixed: the
        # checkpoint restores weights by these attribute paths.
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, features, hidden):
        """Return (context_vector, attention_weights) for one decode step.

        features: encoder output, shape (batch, locations, embedding_dim).
        hidden:   decoder state, shape (batch, hidden_size).
        """
        # Give the decoder state a time axis so it broadcasts against every
        # spatial location: (batch, 1, hidden_size).
        query = tf.expand_dims(hidden, 1)
        # Unnormalized per-location scores: (batch, locations, 1).
        scores = self.V(tf.nn.tanh(self.W1(features) + self.W2(query)))
        # Normalize across the locations axis.
        attention_weights = tf.nn.softmax(scores, axis=1)
        # Attention-weighted sum of the features: (batch, embedding_dim).
        context_vector = tf.reduce_sum(attention_weights * features, axis=1)
        return context_vector, attention_weights
| # COMMAND ---------- | |
class CNN_Encoder(tf.keras.Model):
    """Project precomputed CNN features into the caption embedding space.

    Feature extraction happens elsewhere; this "encoder" is just one fully
    connected layer followed by ReLU.
    """

    def __init__(self, embedding_dim):
        super(CNN_Encoder, self).__init__()
        # Output shape after fc: (batch, locations, embedding_dim).  The
        # attribute name `fc` is fixed — checkpoint restore addresses it.
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        """Map (batch, locations, channels) -> (batch, locations, embedding_dim)."""
        return tf.nn.relu(self.fc(x))
| # COMMAND ---------- | |
class RNN_Decoder(tf.keras.Model):
    """GRU caption decoder with Bahdanau attention over image features."""

    def __init__(self, embedding_dim, units, vocab_size):
        super(RNN_Decoder, self).__init__()
        self.units = units
        # Layer attribute names below are fixed: the checkpoint restores
        # trained weights by these attribute paths.
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        self.fc1 = tf.keras.layers.Dense(self.units)
        self.fc2 = tf.keras.layers.Dense(vocab_size)
        self.attention = BahdanauAttention(self.units)

    def call(self, x, features, hidden):
        """One decode step: token ids -> (logits, new_state, attention_weights)."""
        # Attend over the image features with the previous decoder state.
        context, attn_weights = self.attention(features, hidden)
        # (batch, 1) token ids -> (batch, 1, embedding_dim).
        embedded = self.embedding(x)
        # Prepend the context vector: (batch, 1, embedding_dim + hidden_size).
        gru_input = tf.concat([tf.expand_dims(context, 1), embedded], axis=-1)
        output, state = self.gru(gru_input)
        # (batch, seq, hidden) -> (batch * seq, hidden) -> vocabulary logits.
        projected = self.fc1(output)
        flat = tf.reshape(projected, (-1, projected.shape[2]))
        logits = self.fc2(flat)
        return logits, state, attn_weights

    def reset_state(self, batch_size):
        """Zeroed GRU state for starting a fresh caption."""
        return tf.zeros((batch_size, self.units))
# COMMAND ----------
# Instantiate encoder/decoder; trained weights arrive via the checkpoint
# restore further below.
encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, tokenizer.vocabulary_size())
# COMMAND ----------
# Optimizer and per-token loss (reduction='none' so padding can be masked in
# loss_function).  Not used during inference below, but the optimizer is part
# of the checkpoint object and must exist for restore.
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')
def loss_function(real, pred):
    """Sparse cross-entropy with padding tokens (id 0) masked out."""
    per_token_loss = loss_object(real, pred)
    # 1.0 for real tokens, 0.0 for padding positions.
    pad_mask = tf.cast(tf.math.logical_not(tf.math.equal(real, 0)), dtype=per_token_loss.dtype)
    return tf.reduce_mean(per_token_loss * pad_mask)
# COMMAND ----------
# Tie encoder/decoder/optimizer into a single trackable checkpoint object.
ckpt = tf.train.Checkpoint(encoder=encoder,
                           decoder=decoder,
                           optimizer=optimizer)
# NOTE(review): checkpoint_path ("ckpt-10") is used both as the manager's
# directory here and as the restore prefix below — presumably intentional for
# this local layout; verify.
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=2)
# ckpt.restore(ckpt_manager.latest_checkpoint)
# COMMAND ----------
# Restore the trained weights directly from the checkpoint prefix.
ckpt.restore(checkpoint_path)
# COMMAND ----------
def evaluate(image):
    """Generate one caption for `image` by sampled autoregressive decoding.

    Returns (result, attention_plot): the list of predicted words (including
    '<end>' when produced) and a (steps, locations) numpy array of attention
    weights.  Because the next token is *sampled* (tf.random.categorical),
    repeated calls can yield different captions.
    """
    # attention_plot = np.zeros((max_length, attention_features_shape))
    # 100 attention locations — presumably the 10x10 spatial grid a ResNet
    # produces for a 299x299 input, replacing the InceptionV3 value of 64
    # stored in attention_features_shape; TODO confirm.
    attention_plot = np.zeros((max_length, 100))
    hidden = decoder.reset_state(batch_size=1)
    # Batch of one preprocessed image.
    temp_input = tf.expand_dims(load_image(image)[0], 0)
    img_tensor_val = image_features_extract_model(temp_input)
    # print(img_tensor_val.shape)
    # Flatten the spatial grid: (1, H, W, C) -> (1, H*W, C).
    img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0],
                                                 -1,
                                                 img_tensor_val.shape[3]))
    # print(img_tensor_val.shape)
    features = encoder(img_tensor_val)
    # print(features.shape)
    # Seed decoding with the '<start>' token.
    dec_input = tf.expand_dims([word_to_index('<start>')], 0)
    result = []
    for i in range(max_length):
        predictions, hidden, attention_weights = decoder(dec_input,
                                                         features,
                                                         hidden)
        attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy()
        # Sample the next token id from the logits (not argmax).
        predicted_id = tf.random.categorical(predictions, 1)[0][0].numpy()
        predicted_word = tf.compat.as_text(index_to_word(predicted_id).numpy())
        result.append(predicted_word)
        if predicted_word == '<end>':
            # NOTE(review): this early return skips the trim below, so on this
            # path attention_plot keeps all max_length rows.
            return result, attention_plot
        # Feed the sampled token back in as the next decoder input.
        dec_input = tf.expand_dims([predicted_id], 0)
    # Hit max_length without '<end>': drop unused attention rows.
    attention_plot = attention_plot[:len(result), :]
    return result, attention_plot
| # COMMAND ---------- | |
def plot_attention(image, result, attention_plot):
    """Overlay each word's attention map (resized to 8x8) on the image."""
    base_image = np.array(Image.open(image))
    fig = plt.figure(figsize=(30, 30))
    n_words = len(result)
    # Square-ish grid, at least 2x2, with one subplot per predicted word.
    # (Loop-invariant; hoisted out of the loop — same value every iteration.)
    grid_size = max(int(np.ceil(n_words / 2)), 2)
    for idx, word in enumerate(result):
        attn_map = np.resize(attention_plot[idx], (8, 8))
        ax = fig.add_subplot(grid_size, grid_size, idx + 1)
        ax.set_title(word)
        shown = ax.imshow(base_image)
        # Semi-transparent grayscale attention overlay aligned to the image.
        ax.imshow(attn_map, cmap='gray', alpha=0.6, extent=shown.get_extent())
    plt.tight_layout()
    plt.show()
# COMMAND ----------
# Once an image is uploaded, sample `num_predictions` captions and write each
# into its own placeholder inside the caption container.
if image_file is not None:
    with caption:
        st.header("generated captions by model:")
        for i in range(1, num_predictions+1):
            p = st.empty()
            # Each call re-runs sampled decoding, so captions can differ.
            result, _ = evaluate(image_file)
            pred = ' '.join(result)
            p.write(f"**caption {i}**: {pred}")
            # st.header("**caption**")
            # st.text(pred)