# Required libraries
import os
import re
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import keras
import gradio as gr
import requests
from keras import layers
from keras.applications import MobileNetV2
from keras.layers import TextVectorization
from gtts import gTTS

# Image processing and model hyperparameters
IMAGES_PATH = "training data"
IMAGE_SIZE = (500, 500)
VOCAB_SIZE = 700
SEQ_LENGTH = 400
EMBED_DIM = 512
FF_DIM = 512
BATCH_SIZE = 64
EPOCHS = 1
AUTOTUNE = tf.data.AUTOTUNE


def load_captions_data(filename):
    """Builds the image -> captions mapping from the tab-separated caption file."""
    with open(filename) as caption_file:
        caption_data = caption_file.readlines()
        caption_mapping = {}
        text_data = []
        images_to_skip = set()

        for line in caption_data:
            line = line.rstrip("\n")
            img_name, caption = line.split("\t")
            img_name = img_name.split("#")[0]
            img_name = os.path.join(IMAGES_PATH, img_name.strip())
            tokens = caption.strip().split()

            if img_name.endswith("jpg") and img_name not in images_to_skip:
                # Wrap every caption with the start/end tokens used later during decoding
                caption = "<start> " + caption.strip() + " <end>"
                text_data.append(caption)

                if img_name in caption_mapping:
                    caption_mapping[img_name].append(caption)
                else:
                    caption_mapping[img_name] = [caption]

        for img_name in images_to_skip:
            if img_name in caption_mapping:
                del caption_mapping[img_name]

        return caption_mapping, text_data


def train_val_split(caption_data, train_size=0.8, shuffle=True):
    all_images = list(caption_data.keys())
    if shuffle:
        np.random.shuffle(all_images)
    train_size = int(len(caption_data) * train_size)
    training_data = {
        img_name: caption_data[img_name] for img_name in all_images[:train_size]
    }
    validation_data = {
        img_name: caption_data[img_name] for img_name in all_images[train_size:]
    }
    return training_data, validation_data


captions_mapping, text_data = load_captions_data("RIPIOS.token.txt")
train_data, valid_data = train_val_split(captions_mapping)

# Text data vectorization
def custom_standardization(input_string):
    lowercase = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")

# "<" and ">" are removed from the strip list so the <start> and <end> tokens survive standardization
strip_chars = "!\"$&'*+-/:<=>?@[\]^_`{|}~"
strip_chars = strip_chars.replace("<", "")
strip_chars = strip_chars.replace(">", "")

vectorization = TextVectorization(
    max_tokens=VOCAB_SIZE,
    output_mode="int",
    output_sequence_length=SEQ_LENGTH,
    standardize=custom_standardization,
)
vectorization.adapt(text_data)

image_augmentation = keras.Sequential(
    [
        layers.RandomFlip("horizontal"),
        layers.RandomRotation(0.2),
        layers.RandomContrast(0.3),
    ]
)

# Training data pipeline
def decode_and_resize(img_path):
    img = tf.io.read_file(img_path)
    img = tf.image.decode_jpeg(img, channels=3)
    img = tf.image.resize(img, IMAGE_SIZE)
    img = tf.image.convert_image_dtype(img, tf.float32)
    return img


def process_input(img_path, captions):
    return decode_and_resize(img_path), vectorization(captions)


def make_dataset(images, captions):
    dataset = tf.data.Dataset.from_tensor_slices((images, captions))
    dataset = dataset.shuffle(BATCH_SIZE * 8)
    dataset = dataset.map(process_input, num_parallel_calls=AUTOTUNE)
    dataset = dataset.batch(BATCH_SIZE).prefetch(AUTOTUNE)
    return dataset


train_dataset = make_dataset(list(train_data.keys()), list(train_data.values()))
valid_dataset = make_dataset(list(valid_data.keys()), list(valid_data.values()))

# Model construction
def get_cnn_model():
    base_model = MobileNetV2(  # alternative backbone: resnet.ResNetV2
        input_shape=(*IMAGE_SIZE, 3),
        include_top=False,
        weights="imagenet",
    )
    # Freeze the feature extractor and flatten its spatial output into a sequence of patches
    base_model.trainable = False
    base_model_out = base_model.output
    base_model_out = layers.Reshape((-1, base_model_out.shape[-1]))(base_model_out)
    cnn_model = keras.models.Model(base_model.input, base_model_out)
    return cnn_model


class TransformerEncoderBlock(layers.Layer):
    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.0
        )
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.dense_1 = layers.Dense(embed_dim, activation="relu")

    def call(self, inputs, training, mask=None):
        inputs = self.layernorm_1(inputs)
        inputs = self.dense_1(inputs)
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=None,
            training=training,
        )
        out_1 = self.layernorm_2(inputs + attention_output_1)
        return out_1


class PositionalEmbedding(layers.Layer):
    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.embed_scale = tf.math.sqrt(tf.cast(embed_dim, tf.float32))

    def call(self, inputs):
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)
        embedded_tokens = self.token_embeddings(inputs)
        embedded_tokens = embedded_tokens * self.embed_scale
        embedded_positions = self.position_embeddings(positions)
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        return tf.math.not_equal(inputs, 0)


class TransformerDecoderBlock(layers.Layer):
    def __init__(self, embed_dim, ff_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.ff_dim = ff_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.1
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.1
        )
        self.ffn_layer_1 = layers.Dense(ff_dim, activation="relu")
        self.ffn_layer_2 = layers.Dense(embed_dim)
        self.layernorm_1 = layers.LayerNormalization()
        self.layernorm_2 = layers.LayerNormalization()
        self.layernorm_3 = layers.LayerNormalization()
        self.embedding = PositionalEmbedding(
            embed_dim=EMBED_DIM,
            sequence_length=SEQ_LENGTH,
            vocab_size=VOCAB_SIZE,
        )
        self.out = layers.Dense(VOCAB_SIZE, activation="softmax")
        self.dropout_1 = layers.Dropout(0.3)
        self.dropout_2 = layers.Dropout(0.5)
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, training, mask=None):
        inputs = self.embedding(inputs)
        causal_mask = self.get_causal_attention_mask(inputs)

        if mask is not None:
            padding_mask = tf.cast(mask[:, :, tf.newaxis], dtype=tf.int32)
            combined_mask = tf.cast(mask[:, tf.newaxis, :], dtype=tf.int32)
            combined_mask = tf.minimum(combined_mask, causal_mask)

        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=combined_mask,
            training=training,
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)

        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
            training=training,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)

        ffn_out = self.ffn_layer_1(out_2)
        ffn_out = self.dropout_1(ffn_out, training=training)
        ffn_out = self.ffn_layer_2(ffn_out)
        ffn_out = self.layernorm_3(ffn_out + out_2, training=training)
        ffn_out = self.dropout_2(ffn_out, training=training)
        preds = self.out(ffn_out)
        return preds

    def get_causal_attention_mask(self, inputs):
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        mult = tf.concat(
            [
                tf.expand_dims(batch_size, -1),
                tf.constant([1, 1], dtype=tf.int32),
            ],
            axis=0,
        )
        return tf.tile(mask, mult)


class ImageCaptioningModel(keras.Model):
    def __init__(
        self,
        cnn_model,
        encoder,
        decoder,
        num_captions_per_image=1,
        image_aug=None,
    ):
        super().__init__()
        self.cnn_model = cnn_model
        self.encoder = encoder
        self.decoder = decoder
        self.loss_tracker = keras.metrics.Mean(name="loss")
        self.acc_tracker = keras.metrics.Mean(name="accuracy")
        self.num_captions_per_image = num_captions_per_image
        self.image_aug = image_aug

    def calculate_loss(self, y_true, y_pred, mask):
        loss = self.loss(y_true, y_pred)
        mask = tf.cast(mask, dtype=loss.dtype)
        loss *= mask
        return tf.reduce_sum(loss) / tf.reduce_sum(mask)

    def calculate_accuracy(self, y_true, y_pred, mask):
        accuracy = tf.equal(y_true, tf.argmax(y_pred, axis=2))
        accuracy = tf.math.logical_and(mask, accuracy)
        accuracy = tf.cast(accuracy, dtype=tf.float32)
        mask = tf.cast(mask, dtype=tf.float32)
        return tf.reduce_sum(accuracy) / tf.reduce_sum(mask)

    def _compute_caption_loss_and_acc(self, img_embed, batch_seq, training=True):
        encoder_out = self.encoder(img_embed, training=training)
        batch_seq_inp = batch_seq[:, :-1]
        batch_seq_true = batch_seq[:, 1:]
        mask = tf.math.not_equal(batch_seq_true, 0)
        batch_seq_pred = self.decoder(
            batch_seq_inp, encoder_out, training=training, mask=mask
        )
        loss = self.calculate_loss(batch_seq_true, batch_seq_pred, mask)
        acc = self.calculate_accuracy(batch_seq_true, batch_seq_pred, mask)
        return loss, acc

    def train_step(self, batch_data):
        batch_img, batch_seq = batch_data
        batch_loss = 0
        batch_acc = 0

        if self.image_aug:
            batch_img = self.image_aug(batch_img)

        img_embed = self.cnn_model(batch_img)

        for i in range(self.num_captions_per_image):
            with tf.GradientTape() as tape:
                loss, acc = self._compute_caption_loss_and_acc(
                    img_embed, batch_seq[:, i, :], training=True
                )
                batch_loss += loss
                batch_acc += acc

            train_vars = (
                self.encoder.trainable_variables + self.decoder.trainable_variables
            )
            grads = tape.gradient(loss, train_vars)
            self.optimizer.apply_gradients(zip(grads, train_vars))

        batch_acc /= float(self.num_captions_per_image)
        self.loss_tracker.update_state(batch_loss)
        self.acc_tracker.update_state(batch_acc)
        return {
            "loss": self.loss_tracker.result(),
            "acc": self.acc_tracker.result(),
        }

    def test_step(self, batch_data):
        batch_img, batch_seq = batch_data
        batch_loss = 0
        batch_acc = 0
        img_embed = self.cnn_model(batch_img)

        for i in range(self.num_captions_per_image):
            loss, acc = self._compute_caption_loss_and_acc(
                img_embed, batch_seq[:, i, :], training=False
            )
            batch_loss += loss
            batch_acc += acc

        batch_acc /= float(self.num_captions_per_image)
        self.loss_tracker.update_state(batch_loss)
        self.acc_tracker.update_state(batch_acc)
        return {
            "loss": self.loss_tracker.result(),
            "acc": self.acc_tracker.result(),
        }

    @property
    def metrics(self):
        return [self.loss_tracker, self.acc_tracker]


cnn_model = get_cnn_model()
encoder = TransformerEncoderBlock(embed_dim=EMBED_DIM, dense_dim=FF_DIM, num_heads=1)
decoder = TransformerDecoderBlock(embed_dim=EMBED_DIM, ff_dim=FF_DIM, num_heads=2)
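# Optional sanity check (illustrative sketch with assumed shapes): run a dummy image and a
# dummy token sequence through the three sub-models to verify that their shapes line up
# before training. With IMAGE_SIZE = (500, 500), the MobileNetV2 backbone yields roughly a
# 16x16 grid of 1280-channel features, which the Reshape above flattens into a patch sequence.
dummy_img = tf.zeros((1, *IMAGE_SIZE, 3))
dummy_seq = tf.ones((1, SEQ_LENGTH), dtype=tf.int64)
dummy_feats = cnn_model(dummy_img)                # (1, num_patches, 1280)
dummy_enc = encoder(dummy_feats, training=False)  # (1, num_patches, EMBED_DIM)
dummy_preds = decoder(
    dummy_seq, dummy_enc, training=False, mask=tf.math.not_equal(dummy_seq, 0)
)                                                 # (1, SEQ_LENGTH, VOCAB_SIZE)
print("Decoder output shape:", dummy_preds.shape)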
caption_model = ImageCaptioningModel(
    cnn_model=cnn_model,
    encoder=encoder,
    decoder=decoder,
    image_aug=image_augmentation,
)

# Model training
cross_entropy = keras.losses.SparseCategoricalCrossentropy(
    from_logits=False,
    reduction="none",
)
early_stopping = keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True)


class LRSchedule(keras.optimizers.schedules.LearningRateSchedule):
    def __init__(self, post_warmup_learning_rate, warmup_steps):
        super().__init__()
        self.post_warmup_learning_rate = post_warmup_learning_rate
        self.warmup_steps = warmup_steps

    def __call__(self, step):
        global_step = tf.cast(step, tf.float32)
        warmup_steps = tf.cast(self.warmup_steps, tf.float32)
        warmup_progress = global_step / warmup_steps
        warmup_learning_rate = self.post_warmup_learning_rate * warmup_progress
        return tf.cond(
            global_step < warmup_steps,
            lambda: warmup_learning_rate,
            lambda: self.post_warmup_learning_rate,
        )


num_train_steps = len(train_dataset) * EPOCHS
num_warmup_steps = num_train_steps // 15
lr_schedule = LRSchedule(post_warmup_learning_rate=1e-4, warmup_steps=num_warmup_steps)

caption_model.compile(optimizer=keras.optimizers.Adam(lr_schedule), loss=cross_entropy)
caption_model.fit(
    train_dataset,
    epochs=EPOCHS,
    validation_data=valid_dataset,
    callbacks=[early_stopping],
)

# Load the trained weights
# (assumes "pesos10.npy" stores the weight list returned by get_weights())
archivo_pesos = os.path.join("pesos10.npy")
caption_model.set_weights(np.load(archivo_pesos, allow_pickle=True))

# Gradio interface
vocab = vectorization.get_vocabulary()
index_lookup = dict(zip(range(len(vocab)), vocab))
max_decoded_sentence_length = SEQ_LENGTH - 1


def generate_caption(sample_img):
    # Preprocess the image received from the Gradio interface
    sample_img = tf.convert_to_tensor(sample_img, dtype=tf.float32)
    sample_img = tf.image.resize(sample_img, IMAGE_SIZE)

    # Pass the image through the CNN and the Transformer encoder
    img = tf.expand_dims(sample_img, 0)
    img = caption_model.cnn_model(img)
    encoded_img = caption_model.encoder(img, training=False)

    # Greedy decoding: generate the caption one token at a time
    decoded_caption = "<start> "
    for i in range(max_decoded_sentence_length):
        tokenized_caption = vectorization([decoded_caption])[:, :-1]
        mask = tf.math.not_equal(tokenized_caption, 0)
        predictions = caption_model.decoder(
            tokenized_caption, encoded_img, training=False, mask=mask
        )
        sampled_token_index = np.argmax(predictions[0, i, :])
        sampled_token = index_lookup[sampled_token_index]
        if sampled_token == "<end>":
            break
        decoded_caption += " " + sampled_token

    decoded_caption = decoded_caption.replace("<start> ", "")
    decoded_caption = decoded_caption.replace(" <end>", "").strip()

    # Convert the caption to Spanish speech with gTTS
    text_to_say = decoded_caption
    language = "es"
    gtts_object = gTTS(text=text_to_say, lang=language, slow=False)
    gtts_object.save("/content/gtts.mp3")
    audio = "/content/gtts.mp3"

    return decoded_caption, audio


demo = gr.Interface(
    fn=generate_caption,
    inputs=gr.Image(label="Imagen"),
    outputs=[gr.Text(label="Descripción textual"), gr.Audio(label="Audio")],
    theme="darkhuggingface",
    title="DESCRIPCIÓN DE IMÁGENES DE RIPIOS DE PERFORACIÓN",
    description=(
        "La siguiente interfaz describirá de forma automática imágenes de ripios de "
        "perforación. El usuario deberá ingresar en el recuadro de la izquierda la imagen "
        "a ser procesada, y en los recuadros de la derecha se mostrará la descripción "
        "textual y oral de la imagen. Se recomienda ingresar imágenes sin ningún tipo de "
        "mediciones o símbolos ya que esto podría afectar en la predicción del modelo."
    ),
    article=(
        "Nota: En el caso de ingresar imágenes que no tengan relación a muestras de ripios "
        "de perforación, los autores de esta aplicación no se hacen responsables por los "
        "resultados de estas, el modelo de descripción de ripios de perforación está "
        "entrenado para dar un resultado."
    ),
)
demo.launch()
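# Note on the weight file: "pesos10.npy" is loaded above but never created in this script.
# A minimal sketch for producing it after training, assuming the file is meant to hold the
# list returned by get_weights(), would be:
#
#     pesos = np.array(caption_model.get_weights(), dtype=object)
#     np.save("pesos10.npy", pesos, allow_pickle=True)
#
# The exact format of the original file is an assumption; adjust the save step accordingly.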