|
|
|
|
|
"""Solo_descripcion_ripios |
|
|
|
|
|
Automatically generated by Colaboratory. |
|
|
|
|
|
Original file is located at |
|
|
https://colab.research.google.com/drive/1RYsNm31Nta3rhqrgDbBsBCFcT3l-RZpC |
|
|
""" |
|
|
|
|
|
|
|
|
|
|
|
"""# **Descripción y medición de ripios de perforación mediante IA** |
|
|
|
|
|
Este trabajo es una adaptación de los códigos de [A_K_Nain, 2021](https://keras.io/examples/vision/image_captioning/) y de [Sitar, M. & Leary, R., 2023](https://gchron.copernicus.org/articles/5/109/2023/)<br> |
|
|
**Autores:** Jhoel Ortiz, Christian Mejía & Paola Vargas<br> |
|
|
**Fecha de creación:** 2024/01/06<br> |
|
|
**Última modificación:** 2024/02/15<br> |
|
|
**Descripción:** Este trabajo implementa modelos de CNN y TNN para la descripción y medición de imágenes de ripios de perforación. |
|
|
|
|
|
El siguiente Notebook de Google Colab se esquematiza de la siguiente manera: |
|
|
|
|
|
**Descripción textual y oral de imágenes de ripios de perforación** |
|
|
- Carga e instalación de librerías |
|
|
- Procesamiento de los archivos de imagen y descripciones |
|
|
- Vectorización de los datos de texto |
|
|
- Canalización de datos para el entrenamiento |
|
|
- Construcción del modelo |
|
|
- Entrenamiento del modelo |
|
|
- Verificación de las predicciones |
|
|
- Evaluación con BLEU |
|
|
- Predicción de imágenes externas |
|
|
|
|
|
**Medición de imágenes de ripios de perforación** |
|
|
- Carga e instalación de librerías |
|
|
- Inspección de la imagen |
|
|
- Descarga e inicialización del modelo |
|
|
- Evaluación de prueba |
|
|
- Procesamiento automatizado |
|
|
- Ilustración de resultados automáticos |
|
|
- Procesamiento semi-automático |
|
|
- Ilustración de resultados semi-automáticos |
|
|
|
|
|
# **Descripción textual y oral de imágenes de ripios de perforación** |
|
|
Esta sección contiene todos los pasos a seguir para el desarrollo de un modelo de IA que describa automáticamente de forma escrita y oral imágenes de ripios de perforación aplicando una RNN y un Transformer.
|
|
|
|
|
##**Carga e instalación de librerías** |
|
|
Esta subsección carga e instala las librerías que se requieren para la descripción textual y oral de imágenes de ripios de perforación. |
|
|
""" |
|
|
|
|
|
|
|
|
import os |
|
|
|
|
|
os.environ["KERAS_BACKEND"] = "tensorflow" |
|
|
|
|
|
import re |
|
|
import numpy as np |
|
|
import matplotlib.pyplot as plt |
|
|
|
|
|
import tensorflow as tf |
|
|
import keras |
|
|
from keras import layers |
|
|
from keras.applications import MobileNetV2 |
|
|
from keras.layers import TextVectorization |
|
|
|
|
|
keras.utils.set_random_seed(111) |
|
|
|
|
|
from gtts import gTTS |
|
|
"""##**Procesamiento de las imágenes y descripciones de ripios de perforación** |
|
|
La siguiente subsección realiza lo siguiente: |
|
|
* Carga los archivos de imagen y de texto de ripios de perforación |
|
|
* Define las características y parámetros base de los archivos ingresados |
|
|
* Divide al conjunto de datos en subconjuntos de entrenamiento y validación |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
# Candidate directories holding the cutting (ripio) images; the first entry
# is used as the active dataset root for the rest of the notebook.
IMAGES_PATHS = ["/app3/Data", "/app3/Data1", "/app3/Data2"]
IMAGES_PATH = IMAGES_PATHS[0]

# Target (height, width) every image is resized to before entering the CNN.
IMAGE_SIZE = (359,359)

# Maximum vocabulary size kept by the TextVectorization layer.
VOCAB_SIZE = 700

# Fixed token length every caption is padded/truncated to.
SEQ_LENGTH = 400

# Dimensionality of the token embeddings fed to the Transformer blocks.
EMBED_DIM = 512

# Width of the feed-forward layers inside the Transformer blocks.
FF_DIM = 512

# Training batch size and number of passes over the dataset.
BATCH_SIZE = 64
EPOCHS = 1

# Let tf.data choose parallelism / prefetch buffer sizes automatically.
AUTOTUNE = tf.data.AUTOTUNE
|
|
|
|
|
def load_captions_data(filename, images_path=None):
    """Load captions from a tab-separated token file and map them to images.

    Each line is expected to look like ``name.jpg#0<TAB>caption``. Captions
    are wrapped with ``<start>``/``<end>`` markers and grouped per image;
    only entries whose image name ends in ``jpg`` are kept.

    Args:
        filename: Path to the text file containing the captions.
        images_path: Directory holding the image files. Defaults to the
            module-level ``IMAGES_PATH`` for backward compatibility.

    Returns:
        caption_mapping: Dict mapping each image path to its caption list.
        text_data: Flat list with every kept caption (markers included).
    """
    if images_path is None:
        images_path = IMAGES_PATH

    with open(filename) as caption_file:
        caption_data = caption_file.readlines()
        caption_mapping = {}
        text_data = []
        images_to_skip = set()

        for line in caption_data:
            line = line.rstrip("\n")

            # Lines are "<image>#<index>\t<caption>".
            img_name, caption = line.split("\t")

            # Drop the "#<index>" suffix and build the full image path.
            img_name = img_name.split("#")[0]
            img_name = os.path.join(images_path, img_name.strip())

            if img_name.endswith("jpg") and img_name not in images_to_skip:
                # Add the sequence markers used by the decoder in training.
                caption = "<start> " + caption.strip() + " <end>"
                text_data.append(caption)

                if img_name in caption_mapping:
                    caption_mapping[img_name].append(caption)
                else:
                    caption_mapping[img_name] = [caption]

        # images_to_skip is never populated above; kept for interface
        # stability with the original filtering logic.
        for img_name in images_to_skip:
            if img_name in caption_mapping:
                del caption_mapping[img_name]

        return caption_mapping, text_data
|
|
|
|
|
|
|
|
def train_val_split(caption_data, train_size=0.8, shuffle=True):
    """Split the caption mapping into training and validation dictionaries.

    Args:
        caption_data (dict): Mapping of image path -> list of captions.
        train_size (float): Fraction of images assigned to the training set.
        shuffle (bool): Whether to shuffle the image order before splitting.

    Returns:
        Two dicts: (training_data, validation_data).
    """
    image_names = list(caption_data.keys())

    if shuffle:
        np.random.shuffle(image_names)

    # Number of images that go into the training subset.
    cutoff = int(len(caption_data) * train_size)

    train_split = {name: caption_data[name] for name in image_names[:cutoff]}
    val_split = {name: caption_data[name] for name in image_names[cutoff:]}

    return train_split, val_split
|
|
|
|
|
|
|
|
# Load the caption file and build the image -> captions mapping plus the
# flat caption list used later to fit the text vectorizer.
captions_mapping, text_data = load_captions_data("/app3/ROCAS.token.txt")

# Shuffled 80/20 split into training and validation dictionaries.
train_data, valid_data = train_val_split(captions_mapping)
print("Número de muestras de entrenamiento: ", len(train_data))
print("Número de muestras de validación: ", len(valid_data))
|
|
|
|
|
"""##**Vectorización de los datos de texto** |
|
|
Esta sección transforma las descripciones del archivo de texto en vectores, |
|
|
estandariza las cadenas de caracteres y aumenta el número de imágenes con características establecidas. |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
""" |
|
|
|
|
|
def custom_standardization(input_string):
    """Lowercase the text and delete the punctuation listed in ``strip_chars``."""
    pattern = "[%s]" % re.escape(strip_chars)
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, pattern, "")
|
|
|
|
|
|
|
|
# Punctuation removed during standardization; "<" and ">" are taken out of
# the set so the "<start>"/"<end>" markers survive vectorization.
strip_chars = "!\"$&'*+-/:<=>?@[\]^_`{|}~"
strip_chars = strip_chars.replace("<", "")
strip_chars = strip_chars.replace(">", "")

# Maps captions to fixed-length integer token sequences.
vectorization = TextVectorization(
    max_tokens=VOCAB_SIZE,
    output_mode="int",
    output_sequence_length=SEQ_LENGTH,
    standardize=custom_standardization,
)
# Build the vocabulary from every available caption.
vectorization.adapt(text_data)

# Light augmentation pipeline; applied to training batches only (see
# ImageCaptioningModel.train_step, which calls image_aug when set).
image_augmentation = keras.Sequential(
    [
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.2),
        layers.RandomContrast(0.3),
    ]
)
|
|
|
|
|
"""##**Canalización de datos para el entrenamiento** |
|
|
|
|
|
Se genera pares de imágenes con sus respectivas descripciones usando `tf.data.Dataset`. |
|
|
|
|
|
El proceso consiste de dos etapas: |
|
|
|
|
|
- Leer la imagen del disco |
|
|
- Tokenizar las descripciones de cada una de ellas |
|
|
""" |
|
|
|
|
|
def decode_and_resize(img_path):
    """Read a JPEG from disk and return it resized to IMAGE_SIZE as float32.

    NOTE(review): after resize the tensor is already float32, so
    convert_image_dtype performs no 0-1 rescaling here — downstream code
    (generate_caption) clips values to the 0-255 range accordingly.
    """
    raw = tf.io.read_file(img_path)
    image = tf.image.decode_jpeg(raw, channels=3)
    image = tf.image.resize(image, IMAGE_SIZE)
    return tf.image.convert_image_dtype(image, tf.float32)
|
|
|
|
|
|
|
|
def process_input(img_path, captions):
    """Map one (path, captions) pair to (image tensor, token-id tensor)."""
    image = decode_and_resize(img_path)
    tokens = vectorization(captions)
    return image, tokens
|
|
|
|
|
|
|
|
def make_dataset(images, captions):
    """Build a shuffled, batched and prefetched tf.data pipeline of
    (image, tokenized captions) pairs."""
    return (
        tf.data.Dataset.from_tensor_slices((images, captions))
        .shuffle(BATCH_SIZE * 8)
        .map(process_input, num_parallel_calls=AUTOTUNE)
        .batch(BATCH_SIZE)
        .prefetch(AUTOTUNE)
    )
|
|
|
|
|
|
|
|
|
|
|
# tf.data pipelines for training and validation; the values() are the
# per-image caption lists produced by load_captions_data.
train_dataset = make_dataset(list(train_data.keys()), list(train_data.values()))
valid_dataset = make_dataset(list(valid_data.keys()), list(valid_data.values()))
|
|
|
|
|
"""## **Construcción del modelo** |
|
|
|
|
|
La descripción de imágenes consta de tres modelos: |
|
|
|
|
|
- Una CNN: extrae las características de las imágenes. |
|
|
- Un TransformerEncoder: por medio de un modelo pre-entrenado para trabajar con imágenes de rocas, se encarga de identificar y extraer las características (features) de las fotos de la base de datos. |
|
|
- Un TransformerDecoder: toma como entradas las features del codificador y las descripciones (secuencias) e identifica el proceso para generar descripciones de imágenes. |
|
|
""" |
|
|
|
|
|
def get_cnn_model():
    """Return a frozen MobileNetV2 feature extractor.

    The classifier head is dropped and the spatial feature map is flattened
    to a (patches, channels) sequence for the Transformer encoder.
    """
    backbone = MobileNetV2(
        input_shape=(*IMAGE_SIZE, 3),
        include_top=False,
        weights="imagenet",
    )
    # Keep the ImageNet weights fixed; only encoder/decoder are trained.
    backbone.trainable = False

    features = backbone.output
    features = layers.Reshape((-1, features.shape[-1]))(features)
    return keras.models.Model(backbone.input, features)
|
|
|
|
|
|
|
|
class TransformerEncoderBlock(layers.Layer):
    """Single Transformer encoder block applied to the CNN image features.

    Normalizes and projects the features to ``embed_dim``, then runs one
    self-attention pass with a residual connection.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim  # NOTE(review): stored but unused below
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.0
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.dense_1 = layers.Dense(embed_dim, activation="relu")

    def call(self, inputs, training, mask=None):
        # Normalize the raw CNN features, then project to embed_dim.
        inputs = self.layernorm_1(inputs)
        inputs = self.dense_1(inputs)

        # Self-attention over the image patches; the incoming `mask` is
        # deliberately not forwarded (attention_mask=None).
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=None,
            training=training,
        )
        # Residual connection + normalization.
        out_1 = self.layernorm_2(inputs + attention_output_1)
        return out_1
|
|
|
|
|
|
|
|
class PositionalEmbedding(layers.Layer):
    """Token embedding plus learned absolute position embedding.

    Token embeddings are scaled by sqrt(embed_dim) before the position
    embedding is added.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        # Scale factor applied to token embeddings (sqrt(d_model)).
        self.embed_scale = tf.math.sqrt(tf.cast(embed_dim, tf.float32))

    def call(self, inputs):
        # inputs: integer token ids, last axis is the sequence dimension.
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_tokens = embedded_tokens * self.embed_scale
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        # Token id 0 (padding) is masked for downstream layers.
        return tf.math.not_equal(inputs, 0)
|
|
|
|
|
|
|
|
class TransformerDecoderBlock(layers.Layer):
    """Transformer decoder block that predicts caption tokens.

    Combines causal self-attention over the embedded caption tokens with
    cross-attention over the encoder's image features, followed by a
    position-wise feed-forward network and a softmax over the vocabulary.
    """

    def __init__(self, embed_dim, ff_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.ff_dim = ff_dim
        self.num_heads = num_heads
        # Masked self-attention over previously generated tokens.
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.1
        )
        # Cross-attention from caption tokens to image features.
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.1
        )
        self.ffn_layer_1 = layers.Dense(ff_dim, activation="relu")
        self.ffn_layer_2 = layers.Dense(embed_dim)

        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()

        # Token + position embedding, built from the module-level
        # hyperparameters (EMBED_DIM / SEQ_LENGTH / VOCAB_SIZE).
        self.embedding = PositionalEmbedding(
            embed_dim=EMBED_DIM,
            sequence_length=SEQ_LENGTH,
            vocab_size=VOCAB_SIZE,
        )
        # Per-position probability distribution over the vocabulary.
        self.out = layers.Dense(VOCAB_SIZE, activation="softmax")

        self.dropout_1 = layers.Dropout(0.3)
        self.dropout_2 = layers.Dropout(0.5)
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, training, mask=None):
        inputs = self.embedding(inputs)
        causal_mask = self.get_causal_attention_mask(inputs)

        # Bug fix: the original left padding_mask/combined_mask undefined
        # (NameError) whenever mask was None; fall back to the causal mask
        # alone and no cross-attention mask in that case.
        padding_mask = None
        combined_mask = causal_mask
        if mask is not None:
            padding_mask = tf.cast(mask[:, :, tf.newaxis], dtype=tf.int32)
            combined_mask = tf.cast(mask[:, tf.newaxis, :], dtype=tf.int32)
            # Intersect the padding mask with the lower-triangular mask.
            combined_mask = tf.minimum(combined_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=combined_mask,
            training=training,
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
            training=training,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        ffn_out = self.ffn_layer_1(out_2)
        ffn_out = self.dropout_1(ffn_out, training=training)
        ffn_out = self.ffn_layer_2(ffn_out)

        ffn_out = self.layernorm_3(ffn_out + out_2, training=training)
        ffn_out = self.dropout_2(ffn_out, training=training)
        preds = self.out(ffn_out)
        return preds

    def get_causal_attention_mask(self, inputs):
        """Return a lower-triangular (batch, seq, seq) int32 mask."""
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        # Position i may attend to positions j <= i only.
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [
                tf.expand_dims(batch_size, -1),
                tf.constant([1, 1], dtype=tf.int32),
            ],
            axis=0,
        )
        return tf.tile(mask, mult)
|
|
|
|
|
|
|
|
class ImageCaptioningModel(keras.Model):
    """End-to-end captioning model: CNN features -> encoder -> decoder.

    Implements custom train/test steps that mask padding tokens (id 0)
    when computing loss and accuracy, and averages both metrics with
    running-mean trackers.
    """

    def __init__(
        self,
        cnn_model,
        encoder,
        decoder,
        num_captions_per_image=1,
        image_aug=None,
    ):
        super().__init__()
        self.cnn_model = cnn_model
        self.encoder = encoder
        self.decoder = decoder
        # Running means reported as "loss"/"acc" during fit/evaluate.
        self.loss_tracker = keras.metrics.Mean(name="loss")
        self.acc_tracker = keras.metrics.Mean(name="accuracy")
        self.num_captions_per_image = num_captions_per_image
        # Optional augmentation pipeline applied only in train_step.
        self.image_aug = image_aug

    def calculate_loss(self, y_true, y_pred, mask):
        # Mean per-token loss over non-padding positions only.
        loss = self.loss(y_true, y_pred)
        mask = tf.cast(mask, dtype=loss.dtype)
        loss *= mask
        return tf.reduce_sum(loss) / tf.reduce_sum(mask)

    def calculate_accuracy(self, y_true, y_pred, mask):
        # Token-level accuracy over non-padding positions only.
        accuracy = tf.equal(y_true, tf.argmax(y_pred, axis=2))
        accuracy = tf.math.logical_and(mask, accuracy)
        accuracy = tf.cast(accuracy, dtype=tf.float32)
        mask = tf.cast(mask, dtype=tf.float32)
        return tf.reduce_sum(accuracy) / tf.reduce_sum(mask)

    def _compute_caption_loss_and_acc(self, img_embed, batch_seq, training=True):
        # Teacher forcing: tokens [:-1] are the decoder input and tokens
        # [1:] are the prediction targets.
        encoder_out = self.encoder(img_embed, training=training)
        batch_seq_inp = batch_seq[:, :-1]
        batch_seq_true = batch_seq[:, 1:]
        # Padding positions are excluded from both metrics.
        mask = tf.math.not_equal(batch_seq_true, 0)
        batch_seq_pred = self.decoder(
            batch_seq_inp, encoder_out, training=training, mask=mask
        )
        loss = self.calculate_loss(batch_seq_true, batch_seq_pred, mask)
        acc = self.calculate_accuracy(batch_seq_true, batch_seq_pred, mask)
        return loss, acc

    def train_step(self, batch_data):
        batch_img, batch_seq = batch_data
        batch_loss = 0
        batch_acc = 0

        # Augment the images during training only.
        if self.image_aug:
            batch_img = self.image_aug(batch_img)

        # Extract image features once and reuse them for every caption.
        img_embed = self.cnn_model(batch_img)

        for i in range(self.num_captions_per_image):
            with tf.GradientTape() as tape:
                loss, acc = self._compute_caption_loss_and_acc(
                    img_embed, batch_seq[:, i, :], training=True
                )

            batch_loss += loss
            batch_acc += acc

        # Only the encoder/decoder are trained; the CNN stays frozen.
        train_vars = (
            self.encoder.trainable_variables + self.decoder.trainable_variables
        )

        # NOTE(review): gradients are taken outside the loop from the LAST
        # caption's tape and loss. With num_captions_per_image=1 (the value
        # used in this notebook) this is equivalent to a per-caption update;
        # confirm before raising num_captions_per_image.
        grads = tape.gradient(loss, train_vars)

        self.optimizer.apply_gradients(zip(grads, train_vars))

        batch_acc /= float(self.num_captions_per_image)
        self.loss_tracker.update_state(batch_loss)
        self.acc_tracker.update_state(batch_acc)

        return {
            "loss": self.loss_tracker.result(),
            "acc": self.acc_tracker.result(),
        }

    def test_step(self, batch_data):
        # Same as train_step but without augmentation or weight updates.
        batch_img, batch_seq = batch_data
        batch_loss = 0
        batch_acc = 0

        img_embed = self.cnn_model(batch_img)

        for i in range(self.num_captions_per_image):
            loss, acc = self._compute_caption_loss_and_acc(
                img_embed, batch_seq[:, i, :], training=False
            )

            batch_loss += loss
            batch_acc += acc

        batch_acc /= float(self.num_captions_per_image)

        self.loss_tracker.update_state(batch_loss)
        self.acc_tracker.update_state(batch_acc)

        return {
            "loss": self.loss_tracker.result(),
            "acc": self.acc_tracker.result(),
        }

    @property
    def metrics(self):
        # Listing the trackers here lets Keras reset them between epochs.
        return [self.loss_tracker, self.acc_tracker]
|
|
|
|
|
|
|
|
# Assemble the full captioning model: frozen CNN feature extractor plus
# one encoder block and one decoder block.
cnn_model = get_cnn_model()
encoder = TransformerEncoderBlock(embed_dim=EMBED_DIM, dense_dim=FF_DIM, num_heads=1)
decoder = TransformerDecoderBlock(embed_dim=EMBED_DIM, ff_dim=FF_DIM, num_heads=2)
caption_model = ImageCaptioningModel(
    cnn_model=cnn_model,
    encoder=encoder,
    decoder=decoder,
    image_aug=image_augmentation,
)
|
|
|
|
|
"""## **Entrenamiento del modelo**""" |
|
|
|
|
|
|
|
|
# Per-token cross-entropy; reduction is disabled so padding tokens can be
# masked out manually in ImageCaptioningModel.calculate_loss.
cross_entropy = keras.losses.SparseCategoricalCrossentropy(
    from_logits=False,
    reduction='none',
)

# Stop training after 3 epochs without improvement, keeping the best weights.
early_stopping = keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True)

from tensorflow.keras.optimizers.schedules import LearningRateSchedule
|
|
|
|
|
class LRSchedule(LearningRateSchedule):
    """Linear warmup to a fixed learning rate, then hold it constant."""

    def __init__(self, post_warmup_learning_rate, warmup_steps):
        super().__init__()
        self.post_warmup_learning_rate = post_warmup_learning_rate
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        # Ramp linearly from 0 to the target rate over warmup_steps, then
        # return the constant post-warmup rate.
        step_f = tf.cast(step, tf.float32)
        warmup_f = tf.cast(self.warmup_steps, tf.float32)
        ramp_rate = self.post_warmup_learning_rate * (step_f / warmup_f)
        return tf.cond(
            step_f < warmup_f,
            lambda: ramp_rate,
            lambda: self.post_warmup_learning_rate,
        )
|
|
|
|
|
|
|
|
|
|
|
# Warm up over roughly the first 1/15 of training steps, then hold the
# learning rate constant at 1e-4.
num_train_steps = len(train_dataset) * EPOCHS
num_warmup_steps = num_train_steps // 15
lr_schedule = LRSchedule(post_warmup_learning_rate=1e-4, warmup_steps=num_warmup_steps)

caption_model.compile(optimizer=keras.optimizers.Adam(lr_schedule), loss=cross_entropy)

caption_model.fit(
    train_dataset,
    epochs=EPOCHS,
    validation_data=valid_dataset,
    callbacks=[early_stopping],
)
|
|
|
|
|
"""### **Opción para guardar el modelo entrenado**""" |
|
|
|
|
|
|
|
|
# Snapshot the trained weights to disk as a pickled object array.
pesos = caption_model.get_weights()

np.save('/app3/pesos1.npy', np.array(pesos, dtype=object), allow_pickle=True)

# Re-imports are redundant (already imported at the top) but harmless.
import os

import numpy as np

# NOTE(review): weights are saved to "pesos1.npy" above but restored from
# "pesos10.npy" here — presumably a previously trained checkpoint; confirm.
archivo_pesos = os.path.join("/app3", "pesos10.npy")
pesos_nuevos = np.load(archivo_pesos, allow_pickle=True)

caption_model.set_weights(pesos_nuevos)
|
|
|
|
|
"""##**Verificación de las predicciones**""" |
|
|
|
|
|
# Inverse vocabulary: token id -> token string, used to decode predictions.
vocab = vectorization.get_vocabulary()
index_lookup = dict(zip(range(len(vocab)), vocab))
# Leave room for the <start> token already present in the prompt.
max_decoded_sentence_length = SEQ_LENGTH - 1
valid_images = list(valid_data.keys())
|
|
|
|
|
|
|
|
def generate_caption():
    """Caption a random validation image and display it.

    Picks an image from ``valid_images``, shows it with matplotlib, then
    decodes a caption greedily token by token until "<end>" or the maximum
    sequence length is reached.

    Returns:
        The predicted caption as a plain string (markers removed).
    """
    # Pick and display a random validation image.
    sample_img = np.random.choice(valid_images)
    print(sample_img)

    sample_img = decode_and_resize(sample_img)
    img = sample_img.numpy().clip(0, 255).astype(np.uint8)
    plt.imshow(img)
    plt.show()

    # Encode the image once; decoding reuses the encoded features.
    img = tf.expand_dims(sample_img, 0)
    img = caption_model.cnn_model(img)

    encoded_img = caption_model.encoder(img, training=False)

    # Greedy autoregressive decoding.
    decoded_caption = "<start> "
    for i in range(max_decoded_sentence_length):
        tokenized_caption = vectorization([decoded_caption])[:, :-1]
        mask = tf.math.not_equal(tokenized_caption, 0)
        predictions = caption_model.decoder(
            tokenized_caption, encoded_img, training=False, mask=mask
        )
        # Take the most likely token at the current position.
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = index_lookup[sampled_token_index]
        if sampled_token == "<end>":
            break
        decoded_caption += " " + sampled_token

    decoded_caption = decoded_caption.replace("<start> ", "")
    decoded_caption = decoded_caption.replace(" <end>", "").strip()
    print("Predicted Caption: ", decoded_caption)
    # Fix: return the caption so callers (e.g. Ex_1 = generate_caption())
    # receive the text instead of None.
    return decoded_caption
|
|
|
|
|
|
|
|
# Run one sample prediction on a random validation image.
Ex_1= generate_caption()
|
|
|
|
|
|