# -*- coding: utf-8 -*-
"""Solo_descripcion_ripios
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1RYsNm31Nta3rhqrgDbBsBCFcT3l-RZpC
"""
"""# **Descripción y medición de ripios de perforación mediante IA**
Este trabajo es una adaptación de los códigos de [A_K_Nain, 2021](https://keras.io/examples/vision/image_captioning/) y de [Sitar, M. & Leary, R., 2023](https://gchron.copernicus.org/articles/5/109/2023/)
**Autores:** Jhoel Ortiz, Christian Mejía & Paola Vargas
**Fecha de creación:** 2024/01/06
**Última modificación:** 2024/02/15
**Descripción:** Este trabajo implementa modelos de CNN y TNN para la descripción y medición de imágenes de ripios de perforación.
El siguiente Notebook de Google Colab se esquematiza de la siguiente manera:
**Descripción textual y oral de imágenes de ripios de perforación**
- Carga e instalación de librerías
- Procesamiento de los archivos de imagen y descripciones
- Vectorización de los datos de texto
- Canalización de datos para el entrenamiento
- Construcción del modelo
- Entrenamiento del modelo
- Verificación de las predicciones
- Evaluación con BLEU
- Predicción de imágenes externas
**Medición de imágenes de ripios de perforación**
- Carga e instalación de librerías
- Inspección de la imagen
- Descarga e inicialización del modelo
- Evaluación de prueba
- Procesamiento automatizado
- Ilustración de resultados automáticos
- Procesamiento semi-automático
- Ilustración de resultados semi-automáticos
# **Descripción textual y oral de imágenes de ripios de perforación**
Esta sección contiene todos los pasos a seguir para el desarrollo de un modelo de IA que describa automaticamente de forma escrita y oral imágenes de ripios de perforación aplicandao una RNN y un Transformer.
##**Carga e instalación de librerías**
Esta subsección carga e instala las librerías que se requieren para la descripción textual y oral de imágenes de ripios de perforación.
"""
# Carga de librerías
import os
os.environ["KERAS_BACKEND"] = "tensorflow"
import re
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import keras
from keras import layers
from keras.applications import MobileNetV2
from keras.layers import TextVectorization
keras.utils.set_random_seed(111)
from gtts import gTTS
"""##**Procesamiento de las imágenes y descripciones de ripios de perforación**
La siguiente subsección realiza lo siguiente:
* Carga los archivos de imagen y de texto de ripios de perforación
* Define las características y parámetros base de los archivos ingresados
* Divide al conjunto de datos en subconjuntos de entrenamiento y validación
"""
IMAGES_PATHS = ["/app3/Data", "/app3/Data1", "/app3/Data2"]
IMAGES_PATH = IMAGES_PATHS[0] # Accede al primer elemento de la lista
# Dimensiones de imagen
IMAGE_SIZE = (359,359)
# Tamaño del vocabulario
VOCAB_SIZE = 700
# Longitud fija para cualquier secuencia
SEQ_LENGTH = 400
# Dimensiones para los embeddings de imágenes y de tokens
EMBED_DIM = 512
# Unidades por capa en la red feed-forward
FF_DIM = 512
# Otros parámetros de entrenamiento
BATCH_SIZE = 64
EPOCHS = 1
AUTOTUNE = tf.data.AUTOTUNE
def load_captions_data(filename):
"""Carga las descripciones (texto) y los asigna a sus imágenes correspondientes.
Argumentos:
filename: Ruta al archivo de texto que contiene las descripciones.
Returna:
caption_mapping: Diccionario que mapea los nombres de las imágenes y sus descipciones correspondientes.
text_data: Lista que contiene todos los subtítulos disponibles.
"""
with open(filename) as caption_file:
caption_data = caption_file.readlines()
caption_mapping = {}
text_data = []
images_to_skip = set()
for line in caption_data:
line = line.rstrip("\n")
# El nombre de la imagen se separa de su descripción por una tabulación
img_name, caption = line.split("\t")
print(img_name)
print(caption)
# Cada nombre de imagen tiene un sufijo `#img_name.jpg#0`
img_name = img_name.split("#")[0]
img_name = os.path.join(IMAGES_PATH, img_name.strip())
# Se eliminan las descripciones demasiado largas o demasiado cortas
tokens = caption.strip().split()
if img_name.endswith("jpg") and img_name not in images_to_skip:
# Se agrega un token de inicio y fin a cada descripción
caption = " " + caption.strip() + " "
text_data.append(caption)
if img_name in caption_mapping:
caption_mapping[img_name].append(caption)
else:
caption_mapping[img_name] = [caption]
for img_name in images_to_skip:
if img_name in caption_mapping:
del caption_mapping[img_name]
return caption_mapping, text_data
def train_val_split(caption_data, train_size=0.8, shuffle=True):
"""Divide el conjunto de datos en subconjuntos de entrenamiento y validación.
Args:
caption_data (dict): Diccionario que contiene las descripciones asignadas.
train_size (float): Fracción del conjunto de datos que se usa como subconjunto de entrenamiento.
shuffle (bool): Se especifica si se quiere mezclar el conjunto de datos antes de dividirlo.
Returns:
Conjuntos de datos de entrenamiento y validación como dos dictados separados
"""
# 1. Lista de todas las imágenes
all_images = list(caption_data.keys())
# 2. Se mezcla para que sean aleatorias y no exista sesgo
if shuffle:
np.random.shuffle(all_images)
# 3. Se divide en conjuntos de entrenamiento y validación
train_size = int(len(caption_data) * train_size)
training_data = {
img_name: caption_data[img_name] for img_name in all_images[:train_size]
}
validation_data = {
img_name: caption_data[img_name] for img_name in all_images[train_size:]
}
# 4. Retorna las divisiones
return training_data, validation_data
# Carga del archivo .txt de descripciones
captions_mapping, text_data = load_captions_data("/app3/ROCAS.token.txt")
# Se divide en conjuntos de entrenamiento y validación
train_data, valid_data = train_val_split(captions_mapping)
print("Número de muestras de entrenamiento: ", len(train_data))
print("Número de muestras de validación: ", len(valid_data))
"""##**Vectorización de los datos de texto**
Esta sección transforma las descripciones del archivo de texto en vectores,
estandariza las cadenas de caracteres y aumenta el número de imágenes con características establecidas.
"""
def custom_standardization(input_string):
lowercase = tf.strings.lower(input_string)
return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")
strip_chars = "!\"$&'*+-/:<=>?@[\]^_`{|}~"
strip_chars = strip_chars.replace("<", "")
strip_chars = strip_chars.replace(">", "")
# Vectorización de los archivos de texto
vectorization = TextVectorization(
max_tokens=VOCAB_SIZE,
output_mode="int",
output_sequence_length=SEQ_LENGTH,
standardize=custom_standardization,
)
vectorization.adapt(text_data)
# Aumento del número de imágenes
image_augmentation = keras.Sequential(
[
layers.RandomFlip("horizontal"),
layers.RandomRotation(0.2),
layers.RandomContrast(0.3),
]
)
"""##**Canalización de datos para el entrenamiento**
Se genera pares de imágenes con sus respectivas descripciones usando `tf.data.Dataset`.
El proceso consiste de dos etapas:
- Leer la imagen del disco
- Tokenizar las descripciones de cada una de ellas
"""
def decode_and_resize(img_path):
img = tf.io.read_file(img_path)
img = tf.image.decode_jpeg(img, channels=3)
img = tf.image.resize(img, IMAGE_SIZE)
img = tf.image.convert_image_dtype(img, tf.float32)
return img
def process_input(img_path, captions):
return decode_and_resize(img_path), vectorization(captions)
def make_dataset(images, captions):
dataset = tf.data.Dataset.from_tensor_slices((images, captions))
dataset = dataset.shuffle(BATCH_SIZE * 8)
dataset = dataset.map(process_input, num_parallel_calls=AUTOTUNE)
dataset = dataset.batch(BATCH_SIZE).prefetch(AUTOTUNE)
return dataset
# Lista de imágenes y de descripciones
train_dataset = make_dataset(list(train_data.keys()), list(train_data.values()))
valid_dataset = make_dataset(list(valid_data.keys()), list(valid_data.values()))
"""## **Construcción del modelo**
La descripción de imágenes consta de tres modelos:
- Una CNN: extrae las características de las imágenes.
- Un TransformerEncoder: por medio de un modelo pre-entrenado para trabajar con imágenes de rocas, se encarga de identificar y extraer las características (features) de las fotos de la base de datos.
- Un TransformerDecoder: toma como entradas las features del codificador y las descripciones (secuencias) e identifica el proceso para generar descripciones de imágenes.
"""
def get_cnn_model():
base_model = MobileNetV2( #resnet.ResNetV2
input_shape=(*IMAGE_SIZE, 3),
include_top=False,
weights="imagenet",
)
# base_model= tf.keras.models.load_model('/content/gdrive/MyDrive/best_model.h5')
# base_model.summary()
# Se congela el extractor de características
base_model.trainable = False
base_model_out = base_model.output
base_model_out = layers.Reshape((-1, base_model_out.shape[-1]))(base_model_out)
cnn_model = keras.models.Model(base_model.input, base_model_out)
return cnn_model
class TransformerEncoderBlock(layers.Layer):
def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
super().__init__(**kwargs)
self.embed_dim = embed_dim
self.dense_dim = dense_dim
self.num_heads = num_heads
self.attention_1 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim, dropout=0.0
)
self.layernorm_1 = layers.LayerNormalization()
self.layernorm_2 = layers.LayerNormalization()
self.dense_1 = layers.Dense(embed_dim, activation="relu")
def call(self, inputs, training, mask=None):
inputs = self.layernorm_1(inputs)
inputs = self.dense_1(inputs)
attention_output_1 = self.attention_1(
query=inputs,
value=inputs,
key=inputs,
attention_mask=None,
training=training,
)
out_1 = self.layernorm_2(inputs + attention_output_1)
return out_1
class PositionalEmbedding(layers.Layer):
def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
super().__init__(**kwargs)
self.token_embeddings = layers.Embedding(
input_dim=vocab_size, output_dim=embed_dim
)
self.position_embeddings = layers.Embedding(
input_dim=sequence_length, output_dim=embed_dim
)
self.sequence_length = sequence_length
self.vocab_size = vocab_size
self.embed_dim = embed_dim
self.embed_scale = tf.math.sqrt(tf.cast(embed_dim, tf.float32))
def call(self, inputs):
length = tf.shape(inputs)[-1]
positions = tf.range(start=0, limit=length, delta=1)
embedded_tokens = self.token_embeddings(inputs)
embedded_tokens = embedded_tokens * self.embed_scale
embedded_positions = self.position_embeddings(positions)
return embedded_tokens + embedded_positions
def compute_mask(self, inputs, mask=None):
return tf.math.not_equal(inputs, 0)
class TransformerDecoderBlock(layers.Layer):
def __init__(self, embed_dim, ff_dim, num_heads, **kwargs):
super().__init__(**kwargs)
self.embed_dim = embed_dim
self.ff_dim = ff_dim
self.num_heads = num_heads
self.attention_1 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim, dropout=0.1
)
self.attention_2 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim, dropout=0.1
)
self.ffn_layer_1 = layers.Dense(ff_dim, activation="relu")
self.ffn_layer_2 = layers.Dense(embed_dim)
self.layernorm_1 = layers.LayerNormalization()
self.layernorm_2 = layers.LayerNormalization()
self.layernorm_3 = layers.LayerNormalization()
self.embedding = PositionalEmbedding(
embed_dim=EMBED_DIM,
sequence_length=SEQ_LENGTH,
vocab_size=VOCAB_SIZE,
)
self.out = layers.Dense(VOCAB_SIZE, activation="softmax")
self.dropout_1 = layers.Dropout(0.3)
self.dropout_2 = layers.Dropout(0.5)
self.supports_masking = True
def call(self, inputs, encoder_outputs, training, mask=None):
inputs = self.embedding(inputs)
causal_mask = self.get_causal_attention_mask(inputs)
if mask is not None:
padding_mask = tf.cast(mask[:, :, tf.newaxis], dtype=tf.int32)
combined_mask = tf.cast(mask[:, tf.newaxis, :], dtype=tf.int32)
combined_mask = tf.minimum(combined_mask, causal_mask)
attention_output_1 = self.attention_1(
query=inputs,
value=inputs,
key=inputs,
attention_mask=combined_mask,
training=training,
)
out_1 = self.layernorm_1(inputs + attention_output_1)
attention_output_2 = self.attention_2(
query=out_1,
value=encoder_outputs,
key=encoder_outputs,
attention_mask=padding_mask,
training=training,
)
out_2 = self.layernorm_2(out_1 + attention_output_2)
ffn_out = self.ffn_layer_1(out_2)
ffn_out = self.dropout_1(ffn_out, training=training)
ffn_out = self.ffn_layer_2(ffn_out)
ffn_out = self.layernorm_3(ffn_out + out_2, training=training)
ffn_out = self.dropout_2(ffn_out, training=training)
preds = self.out(ffn_out)
return preds
def get_causal_attention_mask(self, inputs):
input_shape = tf.shape(inputs)
batch_size, sequence_length = input_shape[0], input_shape[1]
i = tf.range(sequence_length)[:, tf.newaxis]
j = tf.range(sequence_length)
mask = tf.cast(i >= j, dtype="int32")
mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
mult = tf.concat(
[
tf.expand_dims(batch_size, -1),
tf.constant([1, 1], dtype=tf.int32),
],
axis=0,
)
return tf.tile(mask, mult)
class ImageCaptioningModel(keras.Model):
def __init__(
self,
cnn_model,
encoder,
decoder,
num_captions_per_image=1,
image_aug=None,
):
super().__init__()
self.cnn_model = cnn_model
self.encoder = encoder
self.decoder = decoder
self.loss_tracker = keras.metrics.Mean(name="loss")
self.acc_tracker = keras.metrics.Mean(name="accuracy")
self.num_captions_per_image = num_captions_per_image
self.image_aug = image_aug
def calculate_loss(self, y_true, y_pred, mask):
loss = self.loss(y_true, y_pred)
mask = tf.cast(mask, dtype=loss.dtype)
loss *= mask
return tf.reduce_sum(loss) / tf.reduce_sum(mask)
def calculate_accuracy(self, y_true, y_pred, mask):
accuracy = tf.equal(y_true, tf.argmax(y_pred, axis=2))
accuracy = tf.math.logical_and(mask, accuracy)
accuracy = tf.cast(accuracy, dtype=tf.float32)
mask = tf.cast(mask, dtype=tf.float32)
return tf.reduce_sum(accuracy) / tf.reduce_sum(mask)
def _compute_caption_loss_and_acc(self, img_embed, batch_seq, training=True):
encoder_out = self.encoder(img_embed, training=training)
batch_seq_inp = batch_seq[:, :-1]
batch_seq_true = batch_seq[:, 1:]
mask = tf.math.not_equal(batch_seq_true, 0)
batch_seq_pred = self.decoder(
batch_seq_inp, encoder_out, training=training, mask=mask
)
loss = self.calculate_loss(batch_seq_true, batch_seq_pred, mask)
acc = self.calculate_accuracy(batch_seq_true, batch_seq_pred, mask)
return loss, acc
def train_step(self, batch_data):
batch_img, batch_seq = batch_data
batch_loss = 0
batch_acc = 0
if self.image_aug:
batch_img = self.image_aug(batch_img)
# 1. Se obtiene los embeddings de imágenes
img_embed = self.cnn_model(batch_img)
# 2. Las descripciones pasan por el decodificador
# junto con las salidas del codificador y calcula
# la pérdida y la precisión para cada descripción
for i in range(self.num_captions_per_image):
with tf.GradientTape() as tape:
loss, acc = self._compute_caption_loss_and_acc(
img_embed, batch_seq[:, i, :], training=True
)
# 3. Actualización de pérdida y precisión
batch_loss += loss
batch_acc += acc
# 4. Se obtiene la lista de los pesos entrenables
train_vars = (
self.encoder.trainable_variables + self.decoder.trainable_variables
)
# 5. Se obtiene los gradientes
grads = tape.gradient(loss, train_vars)
# 6. Actualiza los pesos entrenables
self.optimizer.apply_gradients(zip(grads, train_vars))
# 7. Actualiza de los rastreadores
batch_acc /= float(self.num_captions_per_image)
self.loss_tracker.update_state(batch_loss)
self.acc_tracker.update_state(batch_acc)
# 8. Retorna los valores de pérdida y precisión
return {
"loss": self.loss_tracker.result(),
"acc": self.acc_tracker.result(),
}
def test_step(self, batch_data):
batch_img, batch_seq = batch_data
batch_loss = 0
batch_acc = 0
# 1. Obtiene los embeddings de imágenes
img_embed = self.cnn_model(batch_img)
# 2. Las descripciones pasan por el decodificador
# junto con las salidas del codificador y calcula
# la pérdida y la precisión para cada descripción
for i in range(self.num_captions_per_image):
loss, acc = self._compute_caption_loss_and_acc(
img_embed, batch_seq[:, i, :], training=False
)
# 3. Actualización de pérdida y precisión
batch_loss += loss
batch_acc += acc
batch_acc /= float(self.num_captions_per_image)
# 4. Actualiza de los rastreadores
self.loss_tracker.update_state(batch_loss)
self.acc_tracker.update_state(batch_acc)
# 5. Retorna los valores de pérdida y precisión
return {
"loss": self.loss_tracker.result(),
"acc": self.acc_tracker.result(),
}
@property
def metrics(self):
# Se necesita enumerar las métricas para que `reset_states()`
# pueda ser llamado automaticamente.
return [self.loss_tracker, self.acc_tracker]
cnn_model = get_cnn_model()
encoder = TransformerEncoderBlock(embed_dim=EMBED_DIM, dense_dim=FF_DIM, num_heads=1)
decoder = TransformerDecoderBlock(embed_dim=EMBED_DIM, ff_dim=FF_DIM, num_heads=2)
caption_model = ImageCaptioningModel(
cnn_model=cnn_model,
encoder=encoder,
decoder=decoder,
image_aug=image_augmentation,
)
"""## **Entrenamiento del modelo**"""
# Define la función de pérdida
cross_entropy = keras.losses.SparseCategoricalCrossentropy(
from_logits=False,
reduction='none',
)
# Criterios de parada anticipada
early_stopping = keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True)
# Programador de tasa de aprendizaje para el optimizador
from tensorflow.keras.optimizers.schedules import LearningRateSchedule
class LRSchedule(LearningRateSchedule):
def __init__(self, post_warmup_learning_rate, warmup_steps):
super().__init__()
self.post_warmup_learning_rate = post_warmup_learning_rate
self.warmup_steps = warmup_steps
def __call__(self, step):
global_step = tf.cast(step, tf.float32)
warmup_steps = tf.cast(self.warmup_steps, tf.float32)
warmup_progress = global_step / warmup_steps
warmup_learning_rate = self.post_warmup_learning_rate * warmup_progress
return tf.cond(
global_step < warmup_steps,
lambda: warmup_learning_rate,
lambda: self.post_warmup_learning_rate,
)
# Se crea un cronograma de tasa de aprendizaje
num_train_steps = len(train_dataset) * EPOCHS
num_warmup_steps = num_train_steps // 15
lr_schedule = LRSchedule(post_warmup_learning_rate=1e-4, warmup_steps=num_warmup_steps)
# Se compila el modelo
caption_model.compile(optimizer=keras.optimizers.Adam(lr_schedule), loss=cross_entropy)
# Entrenamiento del modelo
caption_model.fit(
train_dataset,
epochs=EPOCHS,
validation_data=valid_dataset,
callbacks=[early_stopping],
)
"""### **Opción para guardar el modelo entrenado**"""
#con está opción vemos los pesos del modelo en una lista
pesos = caption_model.get_weights()
#guardamos esos pesos en formato npy - en este caso lo guardamos entrenado con una época, ya que si quitamos el fit o el entrenamiento nos da error, por lo que siempre tenemos que
#entrenarle al modelo con una época para después configurarle con otro con 10 épocas
np.save('/app3/pesos1.npy', np.array(pesos, dtype=object), allow_pickle=True)
#aquí configuramos los pesos que estaban entrenados con una época con diez - nosotros corrimos anteriormente con 10 y nos descargamos
import os
import numpy as np
archivo_pesos = os.path.join("/app3", "pesos10.npy")
pesos_nuevos = np.load(archivo_pesos, allow_pickle=True)
caption_model.set_weights(pesos_nuevos)
"""##**Verificación de las predicciones**"""
vocab = vectorization.get_vocabulary()
index_lookup = dict(zip(range(len(vocab)), vocab))
max_decoded_sentence_length = SEQ_LENGTH - 1
valid_images = list(valid_data.keys())
def generate_caption():
# Selecciona una imagen aleatoria del conjunto de datos de validación
sample_img = np.random.choice(valid_images)
print(sample_img)
# Lee la imagen del disco
sample_img = decode_and_resize(sample_img)
img = sample_img.numpy().clip(0, 255).astype(np.uint8)
plt.imshow(img)
plt.show()
# Pasa la imagen a la CNN
img = tf.expand_dims(sample_img, 0)
img = caption_model.cnn_model(img)
# Pasa las características de la imagen al codificador Transformer
encoded_img = caption_model.encoder(img, training=False)
# Genera la descripción usando el decodificador Transformer
decoded_caption = " "
for i in range(max_decoded_sentence_length):
tokenized_caption = vectorization([decoded_caption])[:, :-1]
mask = tf.math.not_equal(tokenized_caption, 0)
predictions = caption_model.decoder(
tokenized_caption, encoded_img, training=False, mask=mask
)
sampled_token_index = np.argmax(predictions[0, i, :])
sampled_token = index_lookup[sampled_token_index]
if sampled_token == "":
break
decoded_caption += " " + sampled_token
decoded_caption = decoded_caption.replace(" ", "")
decoded_caption = decoded_caption.replace(" ", "").strip()
print("Predicted Caption: ", decoded_caption)
# Verifica las predicciones para una imagen del dataset
Ex_1= generate_caption()