|
|
|
|
|
|
|
|
|
|
|
import os
import re
import tempfile

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import keras
import gradio as gr
import requests
from keras import layers
from keras.applications import MobileNetV2
from keras.layers import TextVectorization
from gtts import gTTS
|
|
|
|
|
|
|
|
# --- Data location -----------------------------------------------------------
# Directory that is joined in front of every image name from the caption file.
IMAGES_PATH = "training data"

# --- Preprocessing -----------------------------------------------------------
# Size every image is resized to; also the CNN input size.
IMAGE_SIZE = (500, 500)
# Upper bound on the vocabulary learned by the caption vectorizer.
VOCAB_SIZE = 700
# Fixed length every vectorized caption is padded/truncated to.
SEQ_LENGTH = 400

# --- Model / training --------------------------------------------------------
# Width of the embeddings and attention outputs.
EMBED_DIM = 512
# Width passed to the encoder/decoder blocks as their dense/feed-forward size.
FF_DIM = 512
BATCH_SIZE = 64
EPOCHS = 1
# Lets tf.data choose parallelism and prefetch depth.
AUTOTUNE = tf.data.AUTOTUNE
|
|
|
|
|
|
|
|
|
|
|
def load_captions_data(filename):
    """Load the caption file and group the captions by image path.

    Every usable line has the form ``<image name>#<suffix><TAB><caption>``.
    The ``#<suffix>`` part is dropped, the image name is joined onto
    ``IMAGES_PATH`` and the caption is wrapped in ``<start>`` / ``<end>``
    markers. Lines without a tab (blank lines, for instance) and images whose
    path does not end in ``jpg`` are skipped.

    Args:
        filename: Path of the tab-separated caption file.

    Returns:
        A tuple ``(caption_mapping, text_data)``: ``caption_mapping`` maps each
        image path to the list of its wrapped captions, and ``text_data`` is
        the flat list of all wrapped captions in file order.
    """
    caption_mapping = {}
    text_data = []

    with open(filename) as caption_file:
        for line in caption_file:
            line = line.rstrip("\n")
            # A line without a tab has no caption part; unpacking it would raise.
            if "\t" not in line:
                continue

            # Split on the first tab only so a caption may itself contain tabs.
            img_name, caption = line.split("\t", 1)
            img_name = img_name.split("#")[0]
            img_name = os.path.join(IMAGES_PATH, img_name.strip())
            if not img_name.endswith("jpg"):
                continue

            caption = "<start> " + caption.strip() + " <end>"
            text_data.append(caption)
            caption_mapping.setdefault(img_name, []).append(caption)

    return caption_mapping, text_data
|
|
def train_val_split(caption_data, train_size=0.8, shuffle=True):
    """Partition the caption mapping into training and validation mappings.

    Args:
        caption_data: Dict mapping an image name to its captions.
        train_size: Fraction of the images assigned to the training split.
        shuffle: When true, the image order is shuffled in place with
            ``np.random.shuffle`` before the cut is made.

    Returns:
        A tuple ``(training_data, validation_data)`` of dicts.
    """
    image_names = list(caption_data.keys())
    if shuffle:
        np.random.shuffle(image_names)

    # Index of the first image that belongs to the validation split.
    cutoff = int(len(caption_data) * train_size)

    def subset(names):
        return {name: caption_data[name] for name in names}

    return subset(image_names[:cutoff]), subset(image_names[cutoff:])
|
|
# Parse the caption file once; ``text_data`` later feeds the vectorizer's
# vocabulary, ``captions_mapping`` is split by image into train/validation.
captions_mapping, text_data = load_captions_data("RIPIOS.token.txt")
train_data, valid_data = train_val_split(captions_mapping)
|
|
|
|
|
|
|
|
|
|
|
def custom_standardization(input_string):
    """Lower-case the text and delete every character listed in ``strip_chars``.

    ``strip_chars`` is a module-level name looked up when the function runs.
    """
    unwanted = "[%s]" % re.escape(strip_chars)
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, unwanted, "")
|
|
# Punctuation deleted from captions by ``custom_standardization``.
# The backslash is spelled ``\\`` because ``\]`` is not a valid string escape
# and triggers an invalid-escape warning; the resulting string is the same.
strip_chars = "!\"$&'*+-/:<=>?@[\\]^_`{|}~"
# "<" and ">" must survive so the "<start>" / "<end>" markers stay intact.
strip_chars = strip_chars.replace("<", "").replace(">", "")
|
|
# Caption tokenizer: integer token ids, vocabulary capped at VOCAB_SIZE, every
# caption padded/truncated to SEQ_LENGTH. The vocabulary is learned from all
# captions collected in ``text_data``.
vectorization = TextVectorization(
    standardize=custom_standardization,
    max_tokens=VOCAB_SIZE,
    output_sequence_length=SEQ_LENGTH,
    output_mode="int",
)
vectorization.adapt(text_data)

# Random image perturbations, handed to the captioning model as ``image_aug``.
image_augmentation = keras.Sequential(
    [
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.2),
        layers.RandomContrast(0.3),
    ]
)
|
|
|
|
|
|
|
|
|
|
|
def decode_and_resize(img_path):
    """Read a JPEG file and return it as a 3-channel float32 tensor of IMAGE_SIZE.

    Args:
        img_path: Path of the JPEG file to load.

    Returns:
        The decoded image after resizing and conversion to ``tf.float32``.
    """
    raw_bytes = tf.io.read_file(img_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, IMAGE_SIZE)
    return tf.image.convert_image_dtype(resized, tf.float32)
|
|
def process_input(img_path, captions):
    """Turn one (image path, captions) pair into (image tensor, token ids)."""
    image = decode_and_resize(img_path)
    token_ids = vectorization(captions)
    return image, token_ids
|
|
def make_dataset(images, captions):
    """Build the tf.data input pipeline for a list of images and their captions.

    Args:
        images: Image paths, one per example.
        captions: Captions for each image, aligned with ``images``.

    Returns:
        A dataset that is shuffled (buffer of ``BATCH_SIZE * 8``), mapped
        through ``process_input``, batched by ``BATCH_SIZE`` and prefetched.
    """
    pairs = tf.data.Dataset.from_tensor_slices((images, captions))
    return (
        pairs.shuffle(BATCH_SIZE * 8)
        .map(process_input, num_parallel_calls=AUTOTUNE)
        .batch(BATCH_SIZE)
        .prefetch(AUTOTUNE)
    )
|
|
# Image paths are the dict keys; the caption lists are the matching values.
train_dataset = make_dataset(list(train_data), list(train_data.values()))
valid_dataset = make_dataset(list(valid_data), list(valid_data.values()))
|
|
|
|
|
|
|
|
|
|
|
def get_cnn_model():
    """Return a frozen ImageNet MobileNetV2 whose output is a feature sequence.

    The classification head is left out and the backbone output is reshaped to
    ``(-1, channels)`` per image, so the encoder receives a sequence of
    feature vectors.
    """
    backbone = MobileNetV2(
        weights="imagenet",
        include_top=False,
        input_shape=(*IMAGE_SIZE, 3),
    )
    # Used purely as a feature extractor; its weights are never updated.
    backbone.trainable = False
    # NOTE(review): images arrive exactly as produced by decode_and_resize;
    # confirm that value range is what MobileNetV2 expects.
    features = backbone.output
    feature_sequence = layers.Reshape((-1, features.shape[-1]))(features)
    return keras.models.Model(backbone.input, feature_sequence)
|
|
class TransformerEncoderBlock(layers.Layer):
    """Encoder applied to the CNN feature sequence.

    Pipeline: layer norm -> ReLU dense projection to ``embed_dim`` ->
    self-attention -> residual add followed by a second layer norm.

    Args:
        embed_dim: Output width of the dense projection and ``key_dim`` of the
            attention layer.
        dense_dim: Stored on the instance; no sub-layer of this block uses it.
        num_heads: Number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads

        # Sub-layers are created in a fixed order; the order of the layer's
        # weight list follows it.
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads,
            key_dim=embed_dim,
            dropout=0.0,
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.dense_1 = layers.Dense(embed_dim, activation="relu")

    def call(self, inputs, training, mask=None):
        """Encode ``inputs``; the ``mask`` argument is accepted but not used."""
        projected = self.dense_1(self.layernorm_1(inputs))
        attended = self.attention_1(
            query=projected,
            value=projected,
            key=projected,
            attention_mask=None,
            training=training,
        )
        return self.layernorm_2(projected + attended)
|
|
class PositionalEmbedding(layers.Layer):
    """Token embedding scaled by ``sqrt(embed_dim)`` plus a learned position embedding.

    Args:
        sequence_length: Number of positions in the position-embedding table.
        vocab_size: Number of rows in the token-embedding table.
        embed_dim: Width of both embeddings.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size,
            output_dim=embed_dim,
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length,
            output_dim=embed_dim,
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        # Multiplier applied to the token embeddings before positions are added.
        self.embed_scale = tf.math.sqrt(tf.cast(embed_dim, tf.float32))

    def call(self, inputs):
        """Embed token ids; positions are 0..length-1 along the last axis."""
        seq_len = tf.shape(inputs)[-1]
        position_ids = tf.range(start=0, limit=seq_len, delta=1)
        scaled_tokens = self.token_embeddings(inputs) * self.embed_scale
        return scaled_tokens + self.position_embeddings(position_ids)

    def compute_mask(self, inputs, mask=None):
        """Mark every token id different from 0 as a real (non-padding) token."""
        return tf.math.not_equal(inputs, 0)
|
|
class TransformerDecoderBlock(layers.Layer):
    """Caption decoder producing a softmax over the vocabulary per position.

    Pipeline: positional embedding -> masked self-attention -> cross-attention
    over the encoder output -> feed-forward network -> dense softmax of size
    ``VOCAB_SIZE``.

    Args:
        embed_dim: ``key_dim`` of both attention layers and output width of
            the feed-forward network.
        ff_dim: Hidden width of the feed-forward network.
        num_heads: Number of heads in both attention layers.
    """

    def __init__(self, embed_dim, ff_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.ff_dim = ff_dim
        self.num_heads = num_heads

        # Sub-layers are created in a fixed order; the order of the layer's
        # weight list follows it.
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.1
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.1
        )
        self.ffn_layer_1 = layers.Dense(ff_dim, activation="relu")
        self.ffn_layer_2 = layers.Dense(embed_dim)

        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()

        # The embedding is sized from the module constants, not from the
        # constructor arguments.
        self.embedding = PositionalEmbedding(
            embed_dim=EMBED_DIM,
            sequence_length=SEQ_LENGTH,
            vocab_size=VOCAB_SIZE,
        )
        self.out = layers.Dense(VOCAB_SIZE, activation="softmax")

        self.dropout_1 = layers.Dropout(0.3)
        self.dropout_2 = layers.Dropout(0.5)
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, training, mask=None):
        """Predict next-token probabilities for every position of ``inputs``.

        Args:
            inputs: Token ids of the caption generated so far.
            encoder_outputs: Encoded image features attended to by the second
                attention layer.
            training: Forwarded to the attention and dropout layers.
            mask: Optional boolean padding mask over ``inputs``. When omitted,
                only the causal mask restricts self-attention and the
                cross-attention is unmasked.

        Returns:
            Softmax probabilities over the vocabulary for each position.
        """
        inputs = self.embedding(inputs)
        causal_mask = self.get_causal_attention_mask(inputs)

        # Both names are bound up front so the attention calls below stay
        # valid when no padding mask is supplied.
        padding_mask = None
        combined_mask = causal_mask
        if mask is not None:
            padding_mask = tf.cast(mask[:, :, tf.newaxis], dtype=tf.int32)
            combined_mask = tf.cast(mask[:, tf.newaxis, :], dtype=tf.int32)
            # A position is visible only if it is both non-padding and not in
            # the future.
            combined_mask = tf.minimum(combined_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=combined_mask,
            training=training,
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
            training=training,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        ffn_out = self.ffn_layer_1(out_2)
        ffn_out = self.dropout_1(ffn_out, training=training)
        ffn_out = self.ffn_layer_2(ffn_out)

        ffn_out = self.layernorm_3(ffn_out + out_2, training=training)
        ffn_out = self.dropout_2(ffn_out, training=training)
        preds = self.out(ffn_out)
        return preds

    def get_causal_attention_mask(self, inputs):
        """Return an int32 lower-triangular mask of shape (batch, seq, seq).

        Entry ``[b, i, j]`` is 1 when ``i >= j`` and 0 otherwise; batch and
        sequence sizes are taken from the first two dimensions of ``inputs``.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Repeat the single (1, seq, seq) mask once per batch element.
        mult = tf.concat(
            [
                tf.expand_dims(batch_size, -1),
                tf.constant([1, 1], dtype=tf.int32),
            ],
            axis=0,
        )
        return tf.tile(mask, mult)
|
|
class ImageCaptioningModel(keras.Model):
    """Image captioner: CNN features -> encoder -> decoder, with custom steps.

    Args:
        cnn_model: Model that turns a batch of images into feature sequences.
        encoder: Layer applied to the CNN features.
        decoder: Layer that predicts caption tokens from the encoder output.
        num_captions_per_image: Number of captions iterated over per image in
            each train/test step.
        image_aug: Optional augmentation applied to the images during training.
    """

    def __init__(
        self,
        cnn_model,
        encoder,
        decoder,
        num_captions_per_image=1,
        image_aug=None,
    ):
        super().__init__()
        self.cnn_model = cnn_model
        self.encoder = encoder
        self.decoder = decoder
        # Running means reported by train_step / test_step.
        self.loss_tracker = keras.metrics.Mean(name="loss")
        self.acc_tracker = keras.metrics.Mean(name="accuracy")
        self.num_captions_per_image = num_captions_per_image
        self.image_aug = image_aug

    def calculate_loss(self, y_true, y_pred, mask):
        """Return the compiled loss averaged over the positions kept by ``mask``."""
        per_token = self.loss(y_true, y_pred)
        weights = tf.cast(mask, dtype=per_token.dtype)
        per_token *= weights
        return tf.reduce_sum(per_token) / tf.reduce_sum(weights)

    def calculate_accuracy(self, y_true, y_pred, mask):
        """Return the fraction of masked-in positions whose argmax equals the target."""
        hits = tf.equal(y_true, tf.argmax(y_pred, axis=2))
        hits = tf.math.logical_and(mask, hits)
        hit_count = tf.reduce_sum(tf.cast(hits, dtype=tf.float32))
        token_count = tf.reduce_sum(tf.cast(mask, dtype=tf.float32))
        return hit_count / token_count

    def _compute_caption_loss_and_acc(self, img_embed, batch_seq, training=True):
        """Run encoder and decoder on one caption per image; return (loss, accuracy)."""
        encoder_out = self.encoder(img_embed, training=training)
        # The decoder sees tokens 0..n-2 and is scored against tokens 1..n-1.
        decoder_input = batch_seq[:, :-1]
        targets = batch_seq[:, 1:]
        # Token id 0 is treated as padding and excluded from loss/accuracy.
        mask = tf.math.not_equal(targets, 0)
        predictions = self.decoder(
            decoder_input, encoder_out, training=training, mask=mask
        )
        loss = self.calculate_loss(targets, predictions, mask)
        acc = self.calculate_accuracy(targets, predictions, mask)
        return loss, acc

    def _track_and_report(self, batch_loss, batch_acc):
        """Average the accuracy over captions, update both trackers, return results."""
        batch_acc /= float(self.num_captions_per_image)
        self.loss_tracker.update_state(batch_loss)
        self.acc_tracker.update_state(batch_acc)
        return {
            "loss": self.loss_tracker.result(),
            "acc": self.acc_tracker.result(),
        }

    def train_step(self, batch_data):
        """Apply one optimizer update per caption index and report loss/accuracy."""
        batch_img, batch_seq = batch_data
        batch_loss = 0
        batch_acc = 0

        if self.image_aug:
            batch_img = self.image_aug(batch_img)
        img_embed = self.cnn_model(batch_img)

        for caption_idx in range(self.num_captions_per_image):
            with tf.GradientTape() as tape:
                loss, acc = self._compute_caption_loss_and_acc(
                    img_embed, batch_seq[:, caption_idx, :], training=True
                )
            batch_loss += loss
            batch_acc += acc

            # Only the encoder and decoder are optimized; the CNN is excluded.
            train_vars = (
                self.encoder.trainable_variables + self.decoder.trainable_variables
            )
            grads = tape.gradient(loss, train_vars)
            self.optimizer.apply_gradients(zip(grads, train_vars))

        return self._track_and_report(batch_loss, batch_acc)

    def test_step(self, batch_data):
        """Evaluate loss/accuracy over every caption index without updating weights."""
        batch_img, batch_seq = batch_data
        batch_loss = 0
        batch_acc = 0

        img_embed = self.cnn_model(batch_img)
        for caption_idx in range(self.num_captions_per_image):
            loss, acc = self._compute_caption_loss_and_acc(
                img_embed, batch_seq[:, caption_idx, :], training=False
            )
            batch_loss += loss
            batch_acc += acc

        return self._track_and_report(batch_loss, batch_acc)

    @property
    def metrics(self):
        """Trackers exposed to Keras (loss first, then accuracy)."""
        return [self.loss_tracker, self.acc_tracker]
|
|
# Assemble the captioning model from its three stages.
cnn_model = get_cnn_model()
encoder = TransformerEncoderBlock(
    embed_dim=EMBED_DIM,
    dense_dim=FF_DIM,
    num_heads=1,
)
decoder = TransformerDecoderBlock(
    embed_dim=EMBED_DIM,
    ff_dim=FF_DIM,
    num_heads=2,
)
caption_model = ImageCaptioningModel(
    cnn_model=cnn_model,
    encoder=encoder,
    decoder=decoder,
    image_aug=image_augmentation,
)

# Unreduced loss: the model masks and averages the per-token values itself.
cross_entropy = keras.losses.SparseCategoricalCrossentropy(
    from_logits=False,
    reduction="none",
)

early_stopping = keras.callbacks.EarlyStopping(
    patience=3,
    restore_best_weights=True,
)
|
|
class LRSchedule(keras.optimizers.schedules.LearningRateSchedule):
    """Learning rate that ramps up linearly, then stays constant.

    Args:
        post_warmup_learning_rate: Rate returned once ``warmup_steps`` is reached.
        warmup_steps: Number of steps over which the rate ramps up.
    """

    def __init__(self, post_warmup_learning_rate, warmup_steps):
        super().__init__()
        self.post_warmup_learning_rate = post_warmup_learning_rate
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        """Return the learning rate for optimizer step ``step``."""
        current_step = tf.cast(step, tf.float32)
        total_warmup = tf.cast(self.warmup_steps, tf.float32)
        # Fraction of the warm-up completed so far, applied to the target rate.
        ramped_rate = self.post_warmup_learning_rate * (current_step / total_warmup)
        return tf.cond(
            current_step < total_warmup,
            lambda: ramped_rate,
            lambda: self.post_warmup_learning_rate,
        )
|
|
# Warm the learning rate up over the first fifteenth of all training steps.
num_train_steps = len(train_dataset) * EPOCHS
num_warmup_steps = num_train_steps // 15
lr_schedule = LRSchedule(
    post_warmup_learning_rate=1e-4,
    warmup_steps=num_warmup_steps,
)

caption_model.compile(
    optimizer=keras.optimizers.Adam(lr_schedule),
    loss=cross_entropy,
)

caption_model.fit(
    train_dataset,
    epochs=EPOCHS,
    validation_data=valid_dataset,
    callbacks=[early_stopping],
)
|
|
|
|
|
|
|
|
# Restore saved weights INTO the existing model. ``caption_model`` must remain
# the Keras model, because ``generate_caption`` calls its ``cnn_model``,
# ``encoder`` and ``decoder`` attributes; binding the name to the loaded array
# would break inference.
# NOTE(review): assumes the file holds the list returned by
# ``caption_model.get_weights()`` — confirm how "pesos10.npy" was written.
# NOTE(security): allow_pickle=True unpickles arbitrary objects; only load
# weight files from a trusted source.
archivo_pesos = "pesos10.npy"
pesos_guardados = np.load(archivo_pesos, allow_pickle=True)
caption_model.set_weights(list(pesos_guardados))
|
|
|
|
|
|
|
|
def generate_caption(sample_img):
    """Caption the submitted image and synthesize the caption as speech.

    Args:
        sample_img: Image array delivered by the Gradio ``Image`` input, or
            ``None`` when nothing was submitted. Assumed to be laid out as
            height x width (x channels) — TODO confirm against the Gradio
            version in use.

    Returns:
        A tuple ``(caption, audio_path)``. ``audio_path`` is the path of an
        MP3 file with the spoken caption, or ``None`` when there is no caption
        to speak.
    """
    if sample_img is None:
        return "", None

    # Same preprocessing as decode_and_resize, minus the file read and JPEG
    # decode, so the network sees what it saw during training.
    pixels = np.asarray(sample_img)
    if pixels.ndim == 2:
        # Single-channel input: replicate it to three channels.
        pixels = np.stack([pixels] * 3, axis=-1)
    pixels = pixels[..., :3]  # drop an alpha channel if present
    img = tf.image.resize(tf.convert_to_tensor(pixels), IMAGE_SIZE)
    img = tf.image.convert_image_dtype(img, tf.float32)
    img = tf.expand_dims(img, 0)

    img = caption_model.cnn_model(img)
    encoded_img = caption_model.encoder(img, training=False)

    # Token id -> word table, and the longest caption the decoder can emit
    # (the vectorized caption is trimmed by one position below).
    vocab = vectorization.get_vocabulary()
    max_decoded_sentence_length = SEQ_LENGTH - 1

    # Greedy decoding: feed the caption so far, take the most likely next token.
    decoded_caption = "<start> "
    for i in range(max_decoded_sentence_length):
        tokenized_caption = vectorization([decoded_caption])[:, :-1]
        mask = tf.math.not_equal(tokenized_caption, 0)
        predictions = caption_model.decoder(
            tokenized_caption, encoded_img, training=False, mask=mask
        )
        sampled_token_index = int(np.argmax(predictions[0, i, :]))
        # The softmax has VOCAB_SIZE outputs, but the learned vocabulary can
        # be smaller; an id beyond it has no word, so stop decoding.
        if sampled_token_index >= len(vocab):
            break
        sampled_token = vocab[sampled_token_index]
        if sampled_token == "<end>":
            break
        decoded_caption += " " + sampled_token

    decoded_caption = decoded_caption.replace("<start> ", "")
    decoded_caption = decoded_caption.replace(" <end>", "").strip()

    # gTTS refuses empty text, so return the (empty) caption without audio.
    if not decoded_caption:
        return decoded_caption, None

    # A unique temporary file works on any machine and keeps concurrent
    # requests from overwriting each other's audio.
    with tempfile.NamedTemporaryFile(suffix=".mp3", delete=False) as audio_file:
        audio = audio_file.name
    gtts_object = gTTS(text=decoded_caption, lang="es-es", slow=False)
    gtts_object.save(audio)

    return decoded_caption, audio
|
|
|
|
|
# Gradio front end: one image in, caption text and spoken caption out.
# The user-facing Spanish strings are written with their proper accented
# characters so they render correctly in the browser.
demo = gr.Interface(
    fn=generate_caption,
    inputs=gr.Image(label="Imagen"),
    outputs=[
        gr.Text(label="Descripción textual"),
        gr.Audio(label="Audio"),
    ],
    theme='darkhuggingface',
    title='DESCRIPCIÓN DE IMÁGENES DE RIPIOS DE PERFORACIÓN',
    description='La siguiente interfaz describirá de forma automática imágenes de ripios de perforación. El usuario deberá ingresar en el recuadro de la izquierda la imagen a ser procesada, y en los recuadros de la derecha se mostrará la descripción textual y oral de la imagen. Se recomienda ingresar imágenes sin ningún tipo de mediciones o símbolos ya que esto podría afectar en la predicción del modelo.',
    article='Nota: En el caso de ingresar imágenes que no tengan relación a muestras de ripios de perforación, los autores de esta aplicación no se hacen responsables por los resultados de estas, el modelo de descripción de ripios de perforación está entrenado para dar un resultado.',
)

demo.launch()
|
|
|