# -*- coding: utf-8 -*- """Solo_descripcion_ripios Automatically generated by Colaboratory. Original file is located at https://colab.research.google.com/drive/1RYsNm31Nta3rhqrgDbBsBCFcT3l-RZpC """ """# **Descripción y medición de ripios de perforación mediante IA** Este trabajo es una adaptación de los códigos de [A_K_Nain, 2021](https://keras.io/examples/vision/image_captioning/) y de [Sitar, M. & Leary, R., 2023](https://gchron.copernicus.org/articles/5/109/2023/)
**Autores:** Jhoel Ortiz, Christian Mejía & Paola Vargas
**Fecha de creación:** 2024/01/06
**Última modificación:** 2024/02/15
**Descripción:** Este trabajo implementa modelos de CNN y TNN para la descripción y medición de imágenes de ripios de perforación. El siguiente Notebook de Google Colab se esquematiza de la siguiente manera: **Descripción textual y oral de imágenes de ripios de perforación** - Carga e instalación de librerías - Procesamiento de los archivos de imagen y descripciones - Vectorización de los datos de texto - Canalización de datos para el entrenamiento - Construcción del modelo - Entrenamiento del modelo - Verificación de las predicciones - Evaluación con BLEU - Predicción de imágenes externas **Medición de imágenes de ripios de perforación** - Carga e instalación de librerías - Inspección de la imagen - Descarga e inicialización del modelo - Evaluación de prueba - Procesamiento automatizado - Ilustración de resultados automáticos - Procesamiento semi-automático - Ilustración de resultados semi-automáticos # **Descripción textual y oral de imágenes de ripios de perforación** Esta sección contiene todos los pasos a seguir para el desarrollo de un modelo de IA que describa automaticamente de forma escrita y oral imágenes de ripios de perforación aplicandao una RNN y un Transformer. ##**Carga e instalación de librerías** Esta subsección carga e instala las librerías que se requieren para la descripción textual y oral de imágenes de ripios de perforación. """ # Carga de librerías import os os.environ["KERAS_BACKEND"] = "tensorflow" import re import numpy as np import matplotlib.pyplot as plt import tensorflow as tf import keras from keras import layers from keras.applications import MobileNetV2 from keras.layers import TextVectorization keras.utils.set_random_seed(111) from gtts import gTTS """##**Procesamiento de las imágenes y descripciones de ripios de perforación** La siguiente subsección realiza lo siguiente: * Carga los archivos de imagen y de texto de ripios de perforación * Define las características y parámetros base de los archivos ingresados * Divide al conjunto de datos en subconjuntos de entrenamiento y validación """ IMAGES_PATHS = ["/app3/Data", "/app3/Data1", "/app3/Data2"] IMAGES_PATH = IMAGES_PATHS[0] # Accede al primer elemento de la lista # Dimensiones de imagen IMAGE_SIZE = (359,359) # Tamaño del vocabulario VOCAB_SIZE = 700 # Longitud fija para cualquier secuencia SEQ_LENGTH = 400 # Dimensiones para los embeddings de imágenes y de tokens EMBED_DIM = 512 # Unidades por capa en la red feed-forward FF_DIM = 512 # Otros parámetros de entrenamiento BATCH_SIZE = 64 EPOCHS = 1 AUTOTUNE = tf.data.AUTOTUNE def load_captions_data(filename): """Carga las descripciones (texto) y los asigna a sus imágenes correspondientes. Argumentos: filename: Ruta al archivo de texto que contiene las descripciones. Returna: caption_mapping: Diccionario que mapea los nombres de las imágenes y sus descipciones correspondientes. text_data: Lista que contiene todos los subtítulos disponibles. """ with open(filename) as caption_file: caption_data = caption_file.readlines() caption_mapping = {} text_data = [] images_to_skip = set() for line in caption_data: line = line.rstrip("\n") # El nombre de la imagen se separa de su descripción por una tabulación img_name, caption = line.split("\t") print(img_name) print(caption) # Cada nombre de imagen tiene un sufijo `#img_name.jpg#0` img_name = img_name.split("#")[0] img_name = os.path.join(IMAGES_PATH, img_name.strip()) # Se eliminan las descripciones demasiado largas o demasiado cortas tokens = caption.strip().split() if img_name.endswith("jpg") and img_name not in images_to_skip: # Se agrega un token de inicio y fin a cada descripción caption = " " + caption.strip() + " " text_data.append(caption) if img_name in caption_mapping: caption_mapping[img_name].append(caption) else: caption_mapping[img_name] = [caption] for img_name in images_to_skip: if img_name in caption_mapping: del caption_mapping[img_name] return caption_mapping, text_data def train_val_split(caption_data, train_size=0.8, shuffle=True): """Divide el conjunto de datos en subconjuntos de entrenamiento y validación. Args: caption_data (dict): Diccionario que contiene las descripciones asignadas. train_size (float): Fracción del conjunto de datos que se usa como subconjunto de entrenamiento. shuffle (bool): Se especifica si se quiere mezclar el conjunto de datos antes de dividirlo. Returns: Conjuntos de datos de entrenamiento y validación como dos dictados separados """ # 1. Lista de todas las imágenes all_images = list(caption_data.keys()) # 2. Se mezcla para que sean aleatorias y no exista sesgo if shuffle: np.random.shuffle(all_images) # 3. Se divide en conjuntos de entrenamiento y validación train_size = int(len(caption_data) * train_size) training_data = { img_name: caption_data[img_name] for img_name in all_images[:train_size] } validation_data = { img_name: caption_data[img_name] for img_name in all_images[train_size:] } # 4. Retorna las divisiones return training_data, validation_data # Carga del archivo .txt de descripciones captions_mapping, text_data = load_captions_data("/app3/ROCAS.token.txt") # Se divide en conjuntos de entrenamiento y validación train_data, valid_data = train_val_split(captions_mapping) print("Número de muestras de entrenamiento: ", len(train_data)) print("Número de muestras de validación: ", len(valid_data)) """##**Vectorización de los datos de texto** Esta sección transforma las descripciones del archivo de texto en vectores, estandariza las cadenas de caracteres y aumenta el número de imágenes con características establecidas. """ def custom_standardization(input_string): lowercase = tf.strings.lower(input_string) return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "") strip_chars = "!\"$&'*+-/:<=>?@[\]^_`{|}~" strip_chars = strip_chars.replace("<", "") strip_chars = strip_chars.replace(">", "") # Vectorización de los archivos de texto vectorization = TextVectorization( max_tokens=VOCAB_SIZE, output_mode="int", output_sequence_length=SEQ_LENGTH, standardize=custom_standardization, ) vectorization.adapt(text_data) # Aumento del número de imágenes image_augmentation = keras.Sequential( [ layers.RandomFlip("horizontal"), layers.RandomRotation(0.2), layers.RandomContrast(0.3), ] ) """##**Canalización de datos para el entrenamiento** Se genera pares de imágenes con sus respectivas descripciones usando `tf.data.Dataset`. El proceso consiste de dos etapas: - Leer la imagen del disco - Tokenizar las descripciones de cada una de ellas """ def decode_and_resize(img_path): img = tf.io.read_file(img_path) img = tf.image.decode_jpeg(img, channels=3) img = tf.image.resize(img, IMAGE_SIZE) img = tf.image.convert_image_dtype(img, tf.float32) return img def process_input(img_path, captions): return decode_and_resize(img_path), vectorization(captions) def make_dataset(images, captions): dataset = tf.data.Dataset.from_tensor_slices((images, captions)) dataset = dataset.shuffle(BATCH_SIZE * 8) dataset = dataset.map(process_input, num_parallel_calls=AUTOTUNE) dataset = dataset.batch(BATCH_SIZE).prefetch(AUTOTUNE) return dataset # Lista de imágenes y de descripciones train_dataset = make_dataset(list(train_data.keys()), list(train_data.values())) valid_dataset = make_dataset(list(valid_data.keys()), list(valid_data.values())) """## **Construcción del modelo** La descripción de imágenes consta de tres modelos: - Una CNN: extrae las características de las imágenes. - Un TransformerEncoder: por medio de un modelo pre-entrenado para trabajar con imágenes de rocas, se encarga de identificar y extraer las características (features) de las fotos de la base de datos. - Un TransformerDecoder: toma como entradas las features del codificador y las descripciones (secuencias) e identifica el proceso para generar descripciones de imágenes. """ def get_cnn_model(): base_model = MobileNetV2( #resnet.ResNetV2 input_shape=(*IMAGE_SIZE, 3), include_top=False, weights="imagenet", ) # base_model= tf.keras.models.load_model('/content/gdrive/MyDrive/best_model.h5') # base_model.summary() # Se congela el extractor de características base_model.trainable = False base_model_out = base_model.output base_model_out = layers.Reshape((-1, base_model_out.shape[-1]))(base_model_out) cnn_model = keras.models.Model(base_model.input, base_model_out) return cnn_model class TransformerEncoderBlock(layers.Layer): def __init__(self, embed_dim, dense_dim, num_heads, **kwargs): super().__init__(**kwargs) self.embed_dim = embed_dim self.dense_dim = dense_dim self.num_heads = num_heads self.attention_1 = layers.MultiHeadAttention( num_heads=num_heads, key_dim=embed_dim, dropout=0.0 ) self.layernorm_1 = layers.LayerNormalization() self.layernorm_2 = layers.LayerNormalization() self.dense_1 = layers.Dense(embed_dim, activation="relu") def call(self, inputs, training, mask=None): inputs = self.layernorm_1(inputs) inputs = self.dense_1(inputs) attention_output_1 = self.attention_1( query=inputs, value=inputs, key=inputs, attention_mask=None, training=training, ) out_1 = self.layernorm_2(inputs + attention_output_1) return out_1 class PositionalEmbedding(layers.Layer): def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs): super().__init__(**kwargs) self.token_embeddings = layers.Embedding( input_dim=vocab_size, output_dim=embed_dim ) self.position_embeddings = layers.Embedding( input_dim=sequence_length, output_dim=embed_dim ) self.sequence_length = sequence_length self.vocab_size = vocab_size self.embed_dim = embed_dim self.embed_scale = tf.math.sqrt(tf.cast(embed_dim, tf.float32)) def call(self, inputs): length = tf.shape(inputs)[-1] positions = tf.range(start=0, limit=length, delta=1) embedded_tokens = self.token_embeddings(inputs) embedded_tokens = embedded_tokens * self.embed_scale embedded_positions = self.position_embeddings(positions) return embedded_tokens + embedded_positions def compute_mask(self, inputs, mask=None): return tf.math.not_equal(inputs, 0) class TransformerDecoderBlock(layers.Layer): def __init__(self, embed_dim, ff_dim, num_heads, **kwargs): super().__init__(**kwargs) self.embed_dim = embed_dim self.ff_dim = ff_dim self.num_heads = num_heads self.attention_1 = layers.MultiHeadAttention( num_heads=num_heads, key_dim=embed_dim, dropout=0.1 ) self.attention_2 = layers.MultiHeadAttention( num_heads=num_heads, key_dim=embed_dim, dropout=0.1 ) self.ffn_layer_1 = layers.Dense(ff_dim, activation="relu") self.ffn_layer_2 = layers.Dense(embed_dim) self.layernorm_1 = layers.LayerNormalization() self.layernorm_2 = layers.LayerNormalization() self.layernorm_3 = layers.LayerNormalization() self.embedding = PositionalEmbedding( embed_dim=EMBED_DIM, sequence_length=SEQ_LENGTH, vocab_size=VOCAB_SIZE, ) self.out = layers.Dense(VOCAB_SIZE, activation="softmax") self.dropout_1 = layers.Dropout(0.3) self.dropout_2 = layers.Dropout(0.5) self.supports_masking = True def call(self, inputs, encoder_outputs, training, mask=None): inputs = self.embedding(inputs) causal_mask = self.get_causal_attention_mask(inputs) if mask is not None: padding_mask = tf.cast(mask[:, :, tf.newaxis], dtype=tf.int32) combined_mask = tf.cast(mask[:, tf.newaxis, :], dtype=tf.int32) combined_mask = tf.minimum(combined_mask, causal_mask) attention_output_1 = self.attention_1( query=inputs, value=inputs, key=inputs, attention_mask=combined_mask, training=training, ) out_1 = self.layernorm_1(inputs + attention_output_1) attention_output_2 = self.attention_2( query=out_1, value=encoder_outputs, key=encoder_outputs, attention_mask=padding_mask, training=training, ) out_2 = self.layernorm_2(out_1 + attention_output_2) ffn_out = self.ffn_layer_1(out_2) ffn_out = self.dropout_1(ffn_out, training=training) ffn_out = self.ffn_layer_2(ffn_out) ffn_out = self.layernorm_3(ffn_out + out_2, training=training) ffn_out = self.dropout_2(ffn_out, training=training) preds = self.out(ffn_out) return preds def get_causal_attention_mask(self, inputs): input_shape = tf.shape(inputs) batch_size, sequence_length = input_shape[0], input_shape[1] i = tf.range(sequence_length)[:, tf.newaxis] j = tf.range(sequence_length) mask = tf.cast(i >= j, dtype="int32") mask = tf.reshape(mask, (1, input_shape[1], input_shape[1])) mult = tf.concat( [ tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32), ], axis=0, ) return tf.tile(mask, mult) class ImageCaptioningModel(keras.Model): def __init__( self, cnn_model, encoder, decoder, num_captions_per_image=1, image_aug=None, ): super().__init__() self.cnn_model = cnn_model self.encoder = encoder self.decoder = decoder self.loss_tracker = keras.metrics.Mean(name="loss") self.acc_tracker = keras.metrics.Mean(name="accuracy") self.num_captions_per_image = num_captions_per_image self.image_aug = image_aug def calculate_loss(self, y_true, y_pred, mask): loss = self.loss(y_true, y_pred) mask = tf.cast(mask, dtype=loss.dtype) loss *= mask return tf.reduce_sum(loss) / tf.reduce_sum(mask) def calculate_accuracy(self, y_true, y_pred, mask): accuracy = tf.equal(y_true, tf.argmax(y_pred, axis=2)) accuracy = tf.math.logical_and(mask, accuracy) accuracy = tf.cast(accuracy, dtype=tf.float32) mask = tf.cast(mask, dtype=tf.float32) return tf.reduce_sum(accuracy) / tf.reduce_sum(mask) def _compute_caption_loss_and_acc(self, img_embed, batch_seq, training=True): encoder_out = self.encoder(img_embed, training=training) batch_seq_inp = batch_seq[:, :-1] batch_seq_true = batch_seq[:, 1:] mask = tf.math.not_equal(batch_seq_true, 0) batch_seq_pred = self.decoder( batch_seq_inp, encoder_out, training=training, mask=mask ) loss = self.calculate_loss(batch_seq_true, batch_seq_pred, mask) acc = self.calculate_accuracy(batch_seq_true, batch_seq_pred, mask) return loss, acc def train_step(self, batch_data): batch_img, batch_seq = batch_data batch_loss = 0 batch_acc = 0 if self.image_aug: batch_img = self.image_aug(batch_img) # 1. Se obtiene los embeddings de imágenes img_embed = self.cnn_model(batch_img) # 2. Las descripciones pasan por el decodificador # junto con las salidas del codificador y calcula # la pérdida y la precisión para cada descripción for i in range(self.num_captions_per_image): with tf.GradientTape() as tape: loss, acc = self._compute_caption_loss_and_acc( img_embed, batch_seq[:, i, :], training=True ) # 3. Actualización de pérdida y precisión batch_loss += loss batch_acc += acc # 4. Se obtiene la lista de los pesos entrenables train_vars = ( self.encoder.trainable_variables + self.decoder.trainable_variables ) # 5. Se obtiene los gradientes grads = tape.gradient(loss, train_vars) # 6. Actualiza los pesos entrenables self.optimizer.apply_gradients(zip(grads, train_vars)) # 7. Actualiza de los rastreadores batch_acc /= float(self.num_captions_per_image) self.loss_tracker.update_state(batch_loss) self.acc_tracker.update_state(batch_acc) # 8. Retorna los valores de pérdida y precisión return { "loss": self.loss_tracker.result(), "acc": self.acc_tracker.result(), } def test_step(self, batch_data): batch_img, batch_seq = batch_data batch_loss = 0 batch_acc = 0 # 1. Obtiene los embeddings de imágenes img_embed = self.cnn_model(batch_img) # 2. Las descripciones pasan por el decodificador # junto con las salidas del codificador y calcula # la pérdida y la precisión para cada descripción for i in range(self.num_captions_per_image): loss, acc = self._compute_caption_loss_and_acc( img_embed, batch_seq[:, i, :], training=False ) # 3. Actualización de pérdida y precisión batch_loss += loss batch_acc += acc batch_acc /= float(self.num_captions_per_image) # 4. Actualiza de los rastreadores self.loss_tracker.update_state(batch_loss) self.acc_tracker.update_state(batch_acc) # 5. Retorna los valores de pérdida y precisión return { "loss": self.loss_tracker.result(), "acc": self.acc_tracker.result(), } @property def metrics(self): # Se necesita enumerar las métricas para que `reset_states()` # pueda ser llamado automaticamente. return [self.loss_tracker, self.acc_tracker] cnn_model = get_cnn_model() encoder = TransformerEncoderBlock(embed_dim=EMBED_DIM, dense_dim=FF_DIM, num_heads=1) decoder = TransformerDecoderBlock(embed_dim=EMBED_DIM, ff_dim=FF_DIM, num_heads=2) caption_model = ImageCaptioningModel( cnn_model=cnn_model, encoder=encoder, decoder=decoder, image_aug=image_augmentation, ) """## **Entrenamiento del modelo**""" # Define la función de pérdida cross_entropy = keras.losses.SparseCategoricalCrossentropy( from_logits=False, reduction='none', ) # Criterios de parada anticipada early_stopping = keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True) # Programador de tasa de aprendizaje para el optimizador from tensorflow.keras.optimizers.schedules import LearningRateSchedule class LRSchedule(LearningRateSchedule): def __init__(self, post_warmup_learning_rate, warmup_steps): super().__init__() self.post_warmup_learning_rate = post_warmup_learning_rate self.warmup_steps = warmup_steps def __call__(self, step): global_step = tf.cast(step, tf.float32) warmup_steps = tf.cast(self.warmup_steps, tf.float32) warmup_progress = global_step / warmup_steps warmup_learning_rate = self.post_warmup_learning_rate * warmup_progress return tf.cond( global_step < warmup_steps, lambda: warmup_learning_rate, lambda: self.post_warmup_learning_rate, ) # Se crea un cronograma de tasa de aprendizaje num_train_steps = len(train_dataset) * EPOCHS num_warmup_steps = num_train_steps // 15 lr_schedule = LRSchedule(post_warmup_learning_rate=1e-4, warmup_steps=num_warmup_steps) # Se compila el modelo caption_model.compile(optimizer=keras.optimizers.Adam(lr_schedule), loss=cross_entropy) # Entrenamiento del modelo caption_model.fit( train_dataset, epochs=EPOCHS, validation_data=valid_dataset, callbacks=[early_stopping], ) """### **Opción para guardar el modelo entrenado**""" #con está opción vemos los pesos del modelo en una lista pesos = caption_model.get_weights() #guardamos esos pesos en formato npy - en este caso lo guardamos entrenado con una época, ya que si quitamos el fit o el entrenamiento nos da error, por lo que siempre tenemos que #entrenarle al modelo con una época para después configurarle con otro con 10 épocas np.save('/app3/pesos1.npy', np.array(pesos, dtype=object), allow_pickle=True) #aquí configuramos los pesos que estaban entrenados con una época con diez - nosotros corrimos anteriormente con 10 y nos descargamos import os import numpy as np archivo_pesos = os.path.join("/app3", "pesos10.npy") pesos_nuevos = np.load(archivo_pesos, allow_pickle=True) caption_model.set_weights(pesos_nuevos) """##**Verificación de las predicciones**""" vocab = vectorization.get_vocabulary() index_lookup = dict(zip(range(len(vocab)), vocab)) max_decoded_sentence_length = SEQ_LENGTH - 1 valid_images = list(valid_data.keys()) def generate_caption(): # Selecciona una imagen aleatoria del conjunto de datos de validación sample_img = np.random.choice(valid_images) print(sample_img) # Lee la imagen del disco sample_img = decode_and_resize(sample_img) img = sample_img.numpy().clip(0, 255).astype(np.uint8) plt.imshow(img) plt.show() # Pasa la imagen a la CNN img = tf.expand_dims(sample_img, 0) img = caption_model.cnn_model(img) # Pasa las características de la imagen al codificador Transformer encoded_img = caption_model.encoder(img, training=False) # Genera la descripción usando el decodificador Transformer decoded_caption = " " for i in range(max_decoded_sentence_length): tokenized_caption = vectorization([decoded_caption])[:, :-1] mask = tf.math.not_equal(tokenized_caption, 0) predictions = caption_model.decoder( tokenized_caption, encoded_img, training=False, mask=mask ) sampled_token_index = np.argmax(predictions[0, i, :]) sampled_token = index_lookup[sampled_token_index] if sampled_token == "": break decoded_caption += " " + sampled_token decoded_caption = decoded_caption.replace(" ", "") decoded_caption = decoded_caption.replace(" ", "").strip() print("Predicted Caption: ", decoded_caption) # Verifica las predicciones para una imagen del dataset Ex_1= generate_caption()