#Librerías necesarias import os import re import numpy as np import matplotlib.pyplot as plt import tensorflow as tf import keras import gradio as gr import requests from keras import layers from keras.applications import MobileNetV2 from keras.layers import TextVectorization from gtts import gTTS IMAGES_PATH = "Data" # Dimensiones de imagen IMAGE_SIZE = (359,359) # Tamaño del vocabulario VOCAB_SIZE = 700 # Longitud fija para cualquier secuencia SEQ_LENGTH = 400 # Dimensiones para los embeddings de imágenes y de tokens EMBED_DIM = 512 # Unidades por capa en la red feed-forward FF_DIM = 512 # Otros parámetros de entrenamiento BATCH_SIZE = 64 EPOCHS = 1 AUTOTUNE = tf.data.AUTOTUNE def load_captions_data(filename): """Carga las descripciones (texto) y los asigna a sus imágenes correspondientes. Argumentos: filename: Ruta al archivo de texto que contiene las descripciones. Returna: caption_mapping: Diccionario que mapea los nombres de las imágenes y sus descipciones correspondientes. text_data: Lista que contiene todos los subtítulos disponibles. """ with open(filename) as caption_file: caption_data = caption_file.readlines() caption_mapping = {} text_data = [] images_to_skip = set() for line in caption_data: line = line.rstrip("\n") # El nombre de la imagen se separa de su descripción por una tabulación img_name, caption = line.split("\t") print(img_name) print(caption) # Cada nombre de imagen tiene un sufijo `#img_name.jpg#0` img_name = img_name.split("#")[0] img_name = os.path.join(IMAGES_PATH, img_name.strip()) # Se eliminan las descripciones demasiado largas o demasiado cortas tokens = caption.strip().split() if img_name.endswith("jpg") and img_name not in images_to_skip: # Se agrega un token de inicio y fin a cada descripción caption = " " + caption.strip() + " " text_data.append(caption) if img_name in caption_mapping: caption_mapping[img_name].append(caption) else: caption_mapping[img_name] = [caption] for img_name in images_to_skip: if img_name in caption_mapping: del caption_mapping[img_name] return caption_mapping, text_data def train_val_split(caption_data, train_size=0.8, shuffle=True): """Divide el conjunto de datos en subconjuntos de entrenamiento y validación. Args: caption_data (dict): Diccionario que contiene las descripciones asignadas. train_size (float): Fracción del conjunto de datos que se usa como subconjunto de entrenamiento. shuffle (bool): Se especifica si se quiere mezclar el conjunto de datos antes de dividirlo. Returns: Conjuntos de datos de entrenamiento y validación como dos dictados separados """ # 1. Lista de todas las imágenes all_images = list(caption_data.keys()) # 2. Se mezcla para que sean aleatorias y no exista sesgo if shuffle: np.random.shuffle(all_images) # 3. Se divide en conjuntos de entrenamiento y validación train_size = int(len(caption_data) * train_size) training_data = { img_name: caption_data[img_name] for img_name in all_images[:train_size] } validation_data = { img_name: caption_data[img_name] for img_name in all_images[train_size:] } # 4. Retorna las divisiones return training_data, validation_data # Carga del archivo .txt de descripciones captions_mapping, text_data = load_captions_data("ROCAS.token.txt") # Se divide en conjuntos de entrenamiento y validación train_data, valid_data = train_val_split(captions_mapping) print("Número de muestras de entrenamiento: ", len(train_data)) print("Número de muestras de validación: ", len(valid_data)) """##**Vectorización de los datos de texto** Esta sección transforma las descripciones del archivo de texto en vectores, estandariza las cadenas de caracteres y aumenta el número de imágenes con características establecidas. """ def custom_standardization(input_string): lowercase = tf.strings.lower(input_string) return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "") strip_chars = "!\"$&'*+-/:<=>?@[\]^_`{|}~" strip_chars = strip_chars.replace("<", "") strip_chars = strip_chars.replace(">", "") # Vectorización de los archivos de texto vectorization = TextVectorization( max_tokens=VOCAB_SIZE, output_mode="int", output_sequence_length=SEQ_LENGTH, standardize=custom_standardization, ) vectorization.adapt(text_data) # Aumento del número de imágenes image_augmentation = keras.Sequential( [ layers.RandomFlip("horizontal"), layers.RandomRotation(0.2), layers.RandomContrast(0.3), ] ) """##**Canalización de datos para el entrenamiento** Se genera pares de imágenes con sus respectivas descripciones usando `tf.data.Dataset`. El proceso consiste de dos etapas: - Leer la imagen del disco - Tokenizar las descripciones de cada una de ellas """ def decode_and_resize(img_path): img = tf.io.read_file(img_path) img = tf.image.decode_jpeg(img, channels=3) img = tf.image.resize(img, IMAGE_SIZE) img = tf.image.convert_image_dtype(img, tf.float32) return img def process_input(img_path, captions): return decode_and_resize(img_path), vectorization(captions) def make_dataset(images, captions): dataset = tf.data.Dataset.from_tensor_slices((images, captions)) dataset = dataset.shuffle(BATCH_SIZE * 8) dataset = dataset.map(process_input, num_parallel_calls=AUTOTUNE) dataset = dataset.batch(BATCH_SIZE).prefetch(AUTOTUNE) return dataset # Lista de imágenes y de descripciones train_dataset = make_dataset(list(train_data.keys()), list(train_data.values())) valid_dataset = make_dataset(list(valid_data.keys()), list(valid_data.values())) """## **Construcción del modelo** La descripción de imágenes consta de tres modelos: - Una CNN: extrae las características de las imágenes. - Un TransformerEncoder: por medio de un modelo pre-entrenado para trabajar con imágenes de rocas, se encarga de identificar y extraer las características (features) de las fotos de la base de datos. - Un TransformerDecoder: toma como entradas las features del codificador y las descripciones (secuencias) e identifica el proceso para generar descripciones de imágenes. """ def get_cnn_model(): base_model = MobileNetV2( #resnet.ResNetV2 input_shape=(*IMAGE_SIZE, 3), include_top=False, weights="imagenet", ) # base_model= tf.keras.models.load_model('/content/gdrive/MyDrive/best_model.h5') # base_model.summary() # Se congela el extractor de características base_model.trainable = False base_model_out = base_model.output base_model_out = layers.Reshape((-1, base_model_out.shape[-1]))(base_model_out) cnn_model = keras.models.Model(base_model.input, base_model_out) return cnn_model class TransformerEncoderBlock(layers.Layer): def __init__(self, embed_dim, dense_dim, num_heads, **kwargs): super().__init__(**kwargs) self.embed_dim = embed_dim self.dense_dim = dense_dim self.num_heads = num_heads self.attention_1 = layers.MultiHeadAttention( num_heads=num_heads, key_dim=embed_dim, dropout=0.0 ) self.layernorm_1 = layers.LayerNormalization() self.layernorm_2 = layers.LayerNormalization() self.dense_1 = layers.Dense(embed_dim, activation="relu") def call(self, inputs, training, mask=None): inputs = self.layernorm_1(inputs) inputs = self.dense_1(inputs) attention_output_1 = self.attention_1( query=inputs, value=inputs, key=inputs, attention_mask=None, training=training, ) out_1 = self.layernorm_2(inputs + attention_output_1) return out_1 class PositionalEmbedding(layers.Layer): def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs): super().__init__(**kwargs) self.token_embeddings = layers.Embedding( input_dim=vocab_size, output_dim=embed_dim ) self.position_embeddings = layers.Embedding( input_dim=sequence_length, output_dim=embed_dim ) self.sequence_length = sequence_length self.vocab_size = vocab_size self.embed_dim = embed_dim self.embed_scale = tf.math.sqrt(tf.cast(embed_dim, tf.float32)) def call(self, inputs): length = tf.shape(inputs)[-1] positions = tf.range(start=0, limit=length, delta=1) embedded_tokens = self.token_embeddings(inputs) embedded_tokens = embedded_tokens * self.embed_scale embedded_positions = self.position_embeddings(positions) return embedded_tokens + embedded_positions def compute_mask(self, inputs, mask=None): return tf.math.not_equal(inputs, 0) class TransformerDecoderBlock(layers.Layer): def __init__(self, embed_dim, ff_dim, num_heads, **kwargs): super().__init__(**kwargs) self.embed_dim = embed_dim self.ff_dim = ff_dim self.num_heads = num_heads self.attention_1 = layers.MultiHeadAttention( num_heads=num_heads, key_dim=embed_dim, dropout=0.1 ) self.attention_2 = layers.MultiHeadAttention( num_heads=num_heads, key_dim=embed_dim, dropout=0.1 ) self.ffn_layer_1 = layers.Dense(ff_dim, activation="relu") self.ffn_layer_2 = layers.Dense(embed_dim) self.layernorm_1 = layers.LayerNormalization() self.layernorm_2 = layers.LayerNormalization() self.layernorm_3 = layers.LayerNormalization() self.embedding = PositionalEmbedding( embed_dim=EMBED_DIM, sequence_length=SEQ_LENGTH, vocab_size=VOCAB_SIZE, ) self.out = layers.Dense(VOCAB_SIZE, activation="softmax") self.dropout_1 = layers.Dropout(0.3) self.dropout_2 = layers.Dropout(0.5) self.supports_masking = True def call(self, inputs, encoder_outputs, training, mask=None): inputs = self.embedding(inputs) causal_mask = self.get_causal_attention_mask(inputs) if mask is not None: padding_mask = tf.cast(mask[:, :, tf.newaxis], dtype=tf.int32) combined_mask = tf.cast(mask[:, tf.newaxis, :], dtype=tf.int32) combined_mask = tf.minimum(combined_mask, causal_mask) attention_output_1 = self.attention_1( query=inputs, value=inputs, key=inputs, attention_mask=combined_mask, training=training, ) out_1 = self.layernorm_1(inputs + attention_output_1) attention_output_2 = self.attention_2( query=out_1, value=encoder_outputs, key=encoder_outputs, attention_mask=padding_mask, training=training, ) out_2 = self.layernorm_2(out_1 + attention_output_2) ffn_out = self.ffn_layer_1(out_2) ffn_out = self.dropout_1(ffn_out, training=training) ffn_out = self.ffn_layer_2(ffn_out) ffn_out = self.layernorm_3(ffn_out + out_2, training=training) ffn_out = self.dropout_2(ffn_out, training=training) preds = self.out(ffn_out) return preds def get_causal_attention_mask(self, inputs): input_shape = tf.shape(inputs) batch_size, sequence_length = input_shape[0], input_shape[1] i = tf.range(sequence_length)[:, tf.newaxis] j = tf.range(sequence_length) mask = tf.cast(i >= j, dtype="int32") mask = tf.reshape(mask, (1, input_shape[1], input_shape[1])) mult = tf.concat( [ tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32), ], axis=0, ) return tf.tile(mask, mult) class ImageCaptioningModel(keras.Model): def __init__( self, cnn_model, encoder, decoder, num_captions_per_image=1, image_aug=None, ): super().__init__() self.cnn_model = cnn_model self.encoder = encoder self.decoder = decoder self.loss_tracker = keras.metrics.Mean(name="loss") self.acc_tracker = keras.metrics.Mean(name="accuracy") self.num_captions_per_image = num_captions_per_image self.image_aug = image_aug def calculate_loss(self, y_true, y_pred, mask): loss = self.loss(y_true, y_pred) mask = tf.cast(mask, dtype=loss.dtype) loss *= mask return tf.reduce_sum(loss) / tf.reduce_sum(mask) def calculate_accuracy(self, y_true, y_pred, mask): accuracy = tf.equal(y_true, tf.argmax(y_pred, axis=2)) accuracy = tf.math.logical_and(mask, accuracy) accuracy = tf.cast(accuracy, dtype=tf.float32) mask = tf.cast(mask, dtype=tf.float32) return tf.reduce_sum(accuracy) / tf.reduce_sum(mask) def _compute_caption_loss_and_acc(self, img_embed, batch_seq, training=True): encoder_out = self.encoder(img_embed, training=training) batch_seq_inp = batch_seq[:, :-1] batch_seq_true = batch_seq[:, 1:] mask = tf.math.not_equal(batch_seq_true, 0) batch_seq_pred = self.decoder( batch_seq_inp, encoder_out, training=training, mask=mask ) loss = self.calculate_loss(batch_seq_true, batch_seq_pred, mask) acc = self.calculate_accuracy(batch_seq_true, batch_seq_pred, mask) return loss, acc def train_step(self, batch_data): batch_img, batch_seq = batch_data batch_loss = 0 batch_acc = 0 if self.image_aug: batch_img = self.image_aug(batch_img) # 1. Se obtiene los embeddings de imágenes img_embed = self.cnn_model(batch_img) # 2. Las descripciones pasan por el decodificador # junto con las salidas del codificador y calcula # la pérdida y la precisión para cada descripción for i in range(self.num_captions_per_image): with tf.GradientTape() as tape: loss, acc = self._compute_caption_loss_and_acc( img_embed, batch_seq[:, i, :], training=True ) # 3. Actualización de pérdida y precisión batch_loss += loss batch_acc += acc # 4. Se obtiene la lista de los pesos entrenables train_vars = ( self.encoder.trainable_variables + self.decoder.trainable_variables ) # 5. Se obtiene los gradientes grads = tape.gradient(loss, train_vars) # 6. Actualiza los pesos entrenables self.optimizer.apply_gradients(zip(grads, train_vars)) # 7. Actualiza de los rastreadores batch_acc /= float(self.num_captions_per_image) self.loss_tracker.update_state(batch_loss) self.acc_tracker.update_state(batch_acc) # 8. Retorna los valores de pérdida y precisión return { "loss": self.loss_tracker.result(), "acc": self.acc_tracker.result(), } def test_step(self, batch_data): batch_img, batch_seq = batch_data batch_loss = 0 batch_acc = 0 # 1. Obtiene los embeddings de imágenes img_embed = self.cnn_model(batch_img) # 2. Las descripciones pasan por el decodificador # junto con las salidas del codificador y calcula # la pérdida y la precisión para cada descripción for i in range(self.num_captions_per_image): loss, acc = self._compute_caption_loss_and_acc( img_embed, batch_seq[:, i, :], training=False ) # 3. Actualización de pérdida y precisión batch_loss += loss batch_acc += acc batch_acc /= float(self.num_captions_per_image) # 4. Actualiza de los rastreadores self.loss_tracker.update_state(batch_loss) self.acc_tracker.update_state(batch_acc) # 5. Retorna los valores de pérdida y precisión return { "loss": self.loss_tracker.result(), "acc": self.acc_tracker.result(), } @property def metrics(self): # Se necesita enumerar las métricas para que `reset_states()` # pueda ser llamado automaticamente. return [self.loss_tracker, self.acc_tracker] cnn_model = get_cnn_model() encoder = TransformerEncoderBlock(embed_dim=EMBED_DIM, dense_dim=FF_DIM, num_heads=1) decoder = TransformerDecoderBlock(embed_dim=EMBED_DIM, ff_dim=FF_DIM, num_heads=2) caption_model = ImageCaptioningModel( cnn_model=cnn_model, encoder=encoder, decoder=decoder, image_aug=image_augmentation, ) """## **Entrenamiento del modelo**""" # Define la función de pérdida cross_entropy = keras.losses.SparseCategoricalCrossentropy( from_logits=False, reduction='none', ) # Criterios de parada anticipada early_stopping = keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True) # Programador de tasa de aprendizaje para el optimizador from tensorflow.keras.optimizers.schedules import LearningRateSchedule class LRSchedule(LearningRateSchedule): def __init__(self, post_warmup_learning_rate, warmup_steps): super().__init__() self.post_warmup_learning_rate = post_warmup_learning_rate self.warmup_steps = warmup_steps def __call__(self, step): global_step = tf.cast(step, tf.float32) warmup_steps = tf.cast(self.warmup_steps, tf.float32) warmup_progress = global_step / warmup_steps warmup_learning_rate = self.post_warmup_learning_rate * warmup_progress return tf.cond( global_step < warmup_steps, lambda: warmup_learning_rate, lambda: self.post_warmup_learning_rate, ) # Se crea un cronograma de tasa de aprendizaje num_train_steps = len(train_dataset) * EPOCHS num_warmup_steps = num_train_steps // 15 lr_schedule = LRSchedule(post_warmup_learning_rate=1e-4, warmup_steps=num_warmup_steps) # Se compila el modelo caption_model.compile(optimizer=keras.optimizers.Adam(lr_schedule), loss=cross_entropy) # Entrenamiento del modelo caption_model.fit( train_dataset, epochs=EPOCHS, validation_data=valid_dataset, callbacks=[early_stopping], ) """### **Opción para guardar el modelo entrenado**""" #con está opción vemos los pesos del modelo en una lista #pesos = caption_model.get_weights() #guardamos esos pesos en formato npy - en este caso lo guardamos entrenado con una época, ya que si quitamos el fit o el entrenamiento nos da error, por lo que siempre tenemos que #entrenarle al modelo con una época para después configurarle con otro con 10 épocas #np.save('pesos1.npy', np.array(pesos, dtype=object), allow_pickle=True) #aquí configuramos los pesos que estaban entrenados con una época con diez - nosotros corrimos anteriormente con 10 y nos descargamos import os import numpy as np #Carga del modelo archivo_pesos = os.path.join("pesos10.npy") caption_model = np.load(archivo_pesos, allow_pickle=True) #Interfaz para gradio def generate_caption(sample_img): # Decodifica y redimensiona la imagen de entrada sample_img = decode_and_resize(sample_img) img = sample_img.numpy().clip(0, 255).astype(np.uint8) plt.imshow(img) plt.show() # Prepara la imagen para el modelo img = tf.expand_dims(sample_img, 0) img_embed = caption_model.cnn_model(img) encoded_img = caption_model.encoder(img_embed, training=False) # Inicializa la descripción con el token de inicio decoded_caption = "" # Itera para generar la descripción for i in range(max_decoded_sentence_length): tokenized_caption = vectorization([decoded_caption])[:, :-1] mask = tf.math.not_equal(tokenized_caption, 0) predictions = caption_model.decoder( tokenized_caption, encoded_img, training=False, mask=mask ) sampled_token_index = np.argmax(predictions[0, i, :]) sampled_token = index_lookup[sampled_token_index] if sampled_token == "": break decoded_caption += " " + sampled_token # Elimina los tokens de inicio y fin de la descripción decoded_caption = decoded_caption.replace(" ", "") decoded_caption = decoded_caption.replace(" ", "").strip() # Convierte la descripción a audio text_to_say = decoded_caption lenguage = "es-es" gtts_object = gTTS(text=text_to_say, lang=lenguage, slow=False) gtts_object.save("gtts.mp3") audio = "gtts.mp3" return decoded_caption, audio demo = gr.Interface(fn = generate_caption,inputs = gr.Image(label="Imagen"), outputs = [gr.Text(label="Descripción textual"), gr.Audio(label="Audio")], theme ='darkhuggingface', title = 'DESCRIPCIÓN DE IMÁGENES DE RIPIOS DE PERFORACIÓN', description = 'La siguiente interfaz describirá de forma automática imágenes de ripios de perforación. El usuario deberá ingresar en el recuadro de la izquierda la imagen a ser procesada, y en los recuadros de la derecha se mostrará la descripción textual y oral de la imagen. Se recomienda ingresar imágenes sin ningún tipo de mediciones o símbolos ya que esto podría afectar en la predicción del modelo.', article = 'Nota: En el caso de ingresar imágenes que no tengan relación a muestras de ripios de perforación, los autores de esta aplicación no se hacen responsables por los resultados de estas, el modelo de descripción de ripios de perforación está entrenado para dar un resultado.') demo.launch()