# Databricks notebook source
import collections
import random
import os
import time
import json

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import streamlit as st
from PIL import Image
# COMMAND ----------
def load_image_initial(image_file):
    # Load the uploaded file as a PIL image for display only (no preprocessing).
    img = Image.open(image_file)
    return img
# Streamlit layout
header = st.container()
image = st.container()
caption = st.container()

with header:
    st.title('Image Captioning')
    st.text('Generate captions for your images!')

with image:
    image_file = st.file_uploader("Upload your image here:", type=["png", "jpg", "jpeg"])
    if image_file is not None:
        st.image(load_image_initial(image_file), width=299)
# Model / inference configuration (model 14)
num_predictions = 3
feature_extraction_model = 'ResNet152'
tokenizer_path = 'tokenizer.pkl'
# checkpoint_path = "/dbfs/FileStore/shared_uploads/mhajiza@gap.com/computer_vision/models/image_captioning_tf_14/ckpt-10"
# checkpoint_path = "/dbfs/FileStore/shared_uploads/mhajiza@gap.com/computer_vision/models/image_captioning_tf_14/manually_saved_model-11"
checkpoint_path = "/Users/mhajiza/Documents/Computer_Vison/Image_captioning/image_captioning_tf_model/ckpt-10"
# checkpoint_path = "/Users/mhajiza/Documents/Computer_Vison/Image_captioning/image_captioning_tf_model/manually_saved_model-11"
# COMMAND ----------
def load_image(image_file):
    img = Image.open(image_file).convert('RGB')
    img = tf.keras.preprocessing.image.img_to_array(img)
    img = tf.keras.layers.Resizing(299, 299)(img)
    if feature_extraction_model == 'InceptionV3':
        img = tf.keras.applications.inception_v3.preprocess_input(img)
    if feature_extraction_model in ('ResNet50', 'ResNet101', 'ResNet152'):
        img = tf.keras.applications.resnet.preprocess_input(img)
    return img, image_file
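# Note: resnet.preprocess_input expects raw RGB values in [0, 255] and handles
# the ImageNet channel mean subtraction itself, while inception_v3.preprocess_input
# scales inputs to [-1, 1]; no extra manual scaling is done here.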
# COMMAND ----------
# Initialize the CNN backbone and load the pretrained ImageNet weights.
if feature_extraction_model == 'ResNet152':
    image_model = tf.keras.applications.ResNet152(include_top=False, weights='imagenet')
elif feature_extraction_model == 'ResNet50':
    image_model = tf.keras.applications.ResNet50(include_top=False, weights='imagenet')
elif feature_extraction_model == 'ResNet101':
    image_model = tf.keras.applications.ResNet101(include_top=False, weights='imagenet')
elif feature_extraction_model == 'InceptionV3':
    image_model = tf.keras.applications.InceptionV3(include_top=False, weights='imagenet')

# Expose the last convolutional feature map as the feature-extractor output.
new_input = image_model.input
hidden_layer = image_model.layers[-1].output
image_features_extract_model = tf.keras.Model(new_input, hidden_layer)
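# Sanity check (illustrative, not part of the original notebook): for a 299x299
# input, ResNet152 with include_top=False yields a (1, 10, 10, 2048) feature map,
# i.e. 100 spatial positions, which is where attention_features_shape = 100 below
# comes from.
# _probe = image_features_extract_model(tf.zeros((1, 299, 299, 3)))
# print(_probe.shape)  # expected: (1, 10, 10, 2048)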
# COMMAND ----------
def standardize(inputs):
    # Lowercase and strip punctuation. The pattern is wrapped in a character class
    # so each punctuation character is removed individually.
    inputs = tf.strings.lower(inputs)
    return tf.strings.regex_replace(inputs, r"[!\"#$%&\(\)\*\+.,-/:;=?@\[\\\]^_`{|}~]", "")


import pickle
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization  # tf.keras.layers.TextVectorization in newer TF

# Rebuild the saved TextVectorization layer from its pickled config and weights.
from_disk = pickle.load(open(tokenizer_path, "rb"))
tokenizer = TextVectorization.from_config(from_disk['config'])
# adapt() on a dummy sentence builds the layer's lookup tables so that
# set_weights can load the saved vocabulary.
tokenizer.adapt(["this is a test"])
tokenizer.set_weights(from_disk['weights'])
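# The tokenizer pickle is assumed to have been written by the training notebook
# roughly as follows (hypothetical, shown for reference only):
#   pickle.dump({'config': tokenizer.get_config(),
#                'weights': tokenizer.get_weights()},
#               open(tokenizer_path, 'wb'))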
# COMMAND ----------
vocabulary_size = tokenizer.get_config()['max_tokens']
max_length = tokenizer.get_config()['output_sequence_length']
# COMMAND ----------
# Create mappings for words to indices and indices to words.
word_to_index = tf.keras.layers.StringLookup(mask_token="", vocabulary=tokenizer.get_vocabulary())
index_to_word = tf.keras.layers.StringLookup(mask_token="", vocabulary=tokenizer.get_vocabulary(), invert=True)
# COMMAND ----------
# max_length = 95 ##100
embedding_dim = 256
units = 512
# Shape of the feature map extracted by the backbone: ResNet152 on a 299x299
# input gives a 10x10x2048 map, i.e. 100 spatial positions of depth 2048
# (InceptionV3 would give 64 positions instead).
features_shape = 2048
attention_features_shape = 100  # 64 for InceptionV3
# COMMAND ----------
class BahdanauAttention(tf.keras.Model):  # Attention mechanism
    def __init__(self, units):
        super(BahdanauAttention, self).__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, features, hidden):
        # features (CNN_Encoder output) shape == (batch_size, num_features, embedding_dim)
        # hidden shape == (batch_size, hidden_size)
        # hidden_with_time_axis shape == (batch_size, 1, hidden_size), i.e. hidden expanded along axis 1
        hidden_with_time_axis = tf.expand_dims(hidden, 1)

        # attention_hidden_layer shape == (batch_size, num_features, units)
        attention_hidden_layer = tf.nn.tanh(self.W1(features) +
                                            self.W2(hidden_with_time_axis))

        # score shape == (batch_size, num_features, 1)
        # This gives an unnormalized score for each image feature.
        score = self.V(attention_hidden_layer)

        # attention_weights shape == (batch_size, num_features, 1)
        attention_weights = tf.nn.softmax(score, axis=1)

        # context_vector shape after the sum == (batch_size, embedding_dim)
        context_vector = attention_weights * features
        context_vector = tf.reduce_sum(context_vector, axis=1)

        return context_vector, attention_weights
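# Illustrative shape trace for one decoding step (assuming the 100-position
# ResNet152 grid used in this notebook):
#   features: (1, 100, embedding_dim)  from CNN_Encoder
#   hidden:   (1, units)               from RNN_Decoder.reset_state(1)
#   -> context_vector: (1, embedding_dim), attention_weights: (1, 100, 1)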
# COMMAND ----------
class CNN_Encoder(tf.keras.Model):
    # The image features have already been extracted by the CNN backbone;
    # this encoder simply passes them through a fully connected layer.
    def __init__(self, embedding_dim):
        super(CNN_Encoder, self).__init__()
        # shape after fc == (batch_size, num_features, embedding_dim)
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        x = self.fc(x)
        x = tf.nn.relu(x)
        return x
# COMMAND ----------
class RNN_Decoder(tf.keras.Model):
    def __init__(self, embedding_dim, units, vocab_size):
        super(RNN_Decoder, self).__init__()
        self.units = units
        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        self.fc1 = tf.keras.layers.Dense(self.units)
        self.fc2 = tf.keras.layers.Dense(vocab_size)
        self.attention = BahdanauAttention(self.units)

    def call(self, x, features, hidden):
        # The attention mechanism is defined as a separate model.
        context_vector, attention_weights = self.attention(features, hidden)

        # x shape after passing through embedding == (batch_size, 1, embedding_dim)
        x = self.embedding(x)

        # x shape after concatenation == (batch_size, 1, embedding_dim + hidden_size)
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

        # Pass the concatenated vector to the GRU.
        output, state = self.gru(x)

        # shape == (batch_size, max_length, hidden_size)
        x = self.fc1(output)

        # x shape == (batch_size * max_length, hidden_size)
        x = tf.reshape(x, (-1, x.shape[2]))

        # output shape == (batch_size * max_length, vocab)
        x = self.fc2(x)

        return x, state, attention_weights

    def reset_state(self, batch_size):
        return tf.zeros((batch_size, self.units))
# COMMAND ----------
encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, tokenizer.vocabulary_size())
# COMMAND ----------
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')


def loss_function(real, pred):
    # Mask out padding tokens (index 0) so they do not contribute to the loss.
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask

    return tf.reduce_mean(loss_)
# COMMAND ----------
# Group the encoder, decoder and optimizer into one checkpoint object so the
# checkpoint saved during training can be restored for inference.
ckpt = tf.train.Checkpoint(encoder=encoder,
                           decoder=decoder,
                           optimizer=optimizer)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=2)
# ckpt.restore(ckpt_manager.latest_checkpoint)
# COMMAND ----------
ckpt.restore(checkpoint_path)
# COMMAND ----------
def evaluate(image):
    # attention_plot = np.zeros((max_length, attention_features_shape))
    attention_plot = np.zeros((max_length, 100))

    hidden = decoder.reset_state(batch_size=1)

    temp_input = tf.expand_dims(load_image(image)[0], 0)
    img_tensor_val = image_features_extract_model(temp_input)
    # Flatten the spatial grid: (1, H, W, C) -> (1, H*W, C).
    img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0],
                                                 -1,
                                                 img_tensor_val.shape[3]))

    features = encoder(img_tensor_val)

    dec_input = tf.expand_dims([word_to_index('<start>')], 0)
    result = []

    for i in range(max_length):
        predictions, hidden, attention_weights = decoder(dec_input,
                                                         features,
                                                         hidden)

        attention_plot[i] = tf.reshape(attention_weights, (-1,)).numpy()

        # Sample the next token id from the predicted distribution.
        predicted_id = tf.random.categorical(predictions, 1)[0][0].numpy()
        predicted_word = tf.compat.as_text(index_to_word(predicted_id).numpy())
        result.append(predicted_word)

        if predicted_word == '<end>':
            return result, attention_plot

        dec_input = tf.expand_dims([predicted_id], 0)

    attention_plot = attention_plot[:len(result), :]
    return result, attention_plot
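# Because the next token is drawn with tf.random.categorical rather than argmax,
# repeated calls to evaluate() on the same image can produce different captions;
# the Streamlit block below calls it num_predictions times to show several variants.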
# COMMAND ----------
def plot_attention(image, result, attention_plot):
    temp_image = np.array(Image.open(image))

    fig = plt.figure(figsize=(30, 30))

    len_result = len(result)
    for i in range(len_result):
        # The 100 attention weights correspond to the 10x10 feature grid.
        temp_att = np.resize(attention_plot[i], (10, 10))
        grid_size = max(int(np.ceil(len_result / 2)), 2)
        ax = fig.add_subplot(grid_size, grid_size, i + 1)
        ax.set_title(result[i])
        img = ax.imshow(temp_image)
        ax.imshow(temp_att, cmap='gray', alpha=0.6, extent=img.get_extent())

    plt.tight_layout()
    plt.show()
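# plot_attention is not invoked in the Streamlit flow below; it can be used for
# offline inspection, e.g. (hypothetical local path):
# result, attention_plot = evaluate('/path/to/some_image.jpg')
# plot_attention('/path/to/some_image.jpg', result, attention_plot)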
# COMMAND ----------
if image_file is not None:
    with caption:
        st.header("Generated captions by model:")
        for i in range(1, num_predictions + 1):
            p = st.empty()
            result, _ = evaluate(image_file)
            pred = ' '.join(result)
            p.write(f"**caption {i}**: {pred}")