app4 / app.py
ojoel98's picture
Update app.py
16036bc verified
#Librer铆as necesarias
import os
import re
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import keras
import gradio as gr
import requests
from keras import layers
from keras.applications import MobileNetV2
from keras.layers import TextVectorization
from gtts import gTTS
IMAGES_PATH = "Data"
# Dimensiones de imagen
IMAGE_SIZE = (359,359)
# Tama帽o del vocabulario
VOCAB_SIZE = 700
# Longitud fija para cualquier secuencia
SEQ_LENGTH = 400
# Dimensiones para los embeddings de im谩genes y de tokens
EMBED_DIM = 512
# Unidades por capa en la red feed-forward
FF_DIM = 512
# Otros par谩metros de entrenamiento
BATCH_SIZE = 64
EPOCHS = 1
AUTOTUNE = tf.data.AUTOTUNE
def load_captions_data(filename):
"""Carga las descripciones (texto) y los asigna a sus im谩genes correspondientes.
Argumentos:
filename: Ruta al archivo de texto que contiene las descripciones.
Returna:
caption_mapping: Diccionario que mapea los nombres de las im谩genes y sus descipciones correspondientes.
text_data: Lista que contiene todos los subt铆tulos disponibles.
"""
with open(filename) as caption_file:
caption_data = caption_file.readlines()
caption_mapping = {}
text_data = []
images_to_skip = set()
for line in caption_data:
line = line.rstrip("\n")
# El nombre de la imagen se separa de su descripci贸n por una tabulaci贸n
img_name, caption = line.split("\t")
print(img_name)
print(caption)
# Cada nombre de imagen tiene un sufijo `#img_name.jpg#0`
img_name = img_name.split("#")[0]
img_name = os.path.join(IMAGES_PATH, img_name.strip())
# Se eliminan las descripciones demasiado largas o demasiado cortas
tokens = caption.strip().split()
if img_name.endswith("jpg") and img_name not in images_to_skip:
# Se agrega un token de inicio <start> y fin <end> a cada descripci贸n
caption = "<start> " + caption.strip() + " <end>"
text_data.append(caption)
if img_name in caption_mapping:
caption_mapping[img_name].append(caption)
else:
caption_mapping[img_name] = [caption]
for img_name in images_to_skip:
if img_name in caption_mapping:
del caption_mapping[img_name]
return caption_mapping, text_data
def train_val_split(caption_data, train_size=0.8, shuffle=True):
"""Divide el conjunto de datos en subconjuntos de entrenamiento y validaci贸n.
Args:
caption_data (dict): Diccionario que contiene las descripciones asignadas.
train_size (float): Fracci贸n del conjunto de datos que se usa como subconjunto de entrenamiento.
shuffle (bool): Se especifica si se quiere mezclar el conjunto de datos antes de dividirlo.
Returns:
Conjuntos de datos de entrenamiento y validaci贸n como dos dictados separados
"""
# 1. Lista de todas las im谩genes
all_images = list(caption_data.keys())
# 2. Se mezcla para que sean aleatorias y no exista sesgo
if shuffle:
np.random.shuffle(all_images)
# 3. Se divide en conjuntos de entrenamiento y validaci贸n
train_size = int(len(caption_data) * train_size)
training_data = {
img_name: caption_data[img_name] for img_name in all_images[:train_size]
}
validation_data = {
img_name: caption_data[img_name] for img_name in all_images[train_size:]
}
# 4. Retorna las divisiones
return training_data, validation_data
# Carga del archivo .txt de descripciones
captions_mapping, text_data = load_captions_data("ROCAS.token.txt")
# Se divide en conjuntos de entrenamiento y validaci贸n
train_data, valid_data = train_val_split(captions_mapping)
print("N煤mero de muestras de entrenamiento: ", len(train_data))
print("N煤mero de muestras de validaci贸n: ", len(valid_data))
"""##**Vectorizaci贸n de los datos de texto**
Esta secci贸n transforma las descripciones del archivo de texto en vectores,
estandariza las cadenas de caracteres y aumenta el n煤mero de im谩genes con caracter铆sticas establecidas.
"""
def custom_standardization(input_string):
lowercase = tf.strings.lower(input_string)
return tf.strings.regex_replace(lowercase, "[%s]" % re.escape(strip_chars), "")
strip_chars = "!\"$&'*+-/:<=>?@[\]^_`{|}~"
strip_chars = strip_chars.replace("<", "")
strip_chars = strip_chars.replace(">", "")
# Vectorizaci贸n de los archivos de texto
vectorization = TextVectorization(
max_tokens=VOCAB_SIZE,
output_mode="int",
output_sequence_length=SEQ_LENGTH,
standardize=custom_standardization,
)
vectorization.adapt(text_data)
# Aumento del n煤mero de im谩genes
image_augmentation = keras.Sequential(
[
layers.RandomFlip("horizontal"),
layers.RandomRotation(0.2),
layers.RandomContrast(0.3),
]
)
"""##**Canalizaci贸n de datos para el entrenamiento**
Se genera pares de im谩genes con sus respectivas descripciones usando `tf.data.Dataset`.
El proceso consiste de dos etapas:
- Leer la imagen del disco
- Tokenizar las descripciones de cada una de ellas
"""
def decode_and_resize(img_path):
img = tf.io.read_file(img_path)
img = tf.image.decode_jpeg(img, channels=3)
img = tf.image.resize(img, IMAGE_SIZE)
img = tf.image.convert_image_dtype(img, tf.float32)
return img
def process_input(img_path, captions):
return decode_and_resize(img_path), vectorization(captions)
def make_dataset(images, captions):
dataset = tf.data.Dataset.from_tensor_slices((images, captions))
dataset = dataset.shuffle(BATCH_SIZE * 8)
dataset = dataset.map(process_input, num_parallel_calls=AUTOTUNE)
dataset = dataset.batch(BATCH_SIZE).prefetch(AUTOTUNE)
return dataset
# Lista de im谩genes y de descripciones
train_dataset = make_dataset(list(train_data.keys()), list(train_data.values()))
valid_dataset = make_dataset(list(valid_data.keys()), list(valid_data.values()))
"""## **Construcci贸n del modelo**
La descripci贸n de im谩genes consta de tres modelos:
- Una CNN: extrae las caracter铆sticas de las im谩genes.
- Un TransformerEncoder: por medio de un modelo pre-entrenado para trabajar con im谩genes de rocas, se encarga de identificar y extraer las caracter铆sticas (features) de las fotos de la base de datos.
- Un TransformerDecoder: toma como entradas las features del codificador y las descripciones (secuencias) e identifica el proceso para generar descripciones de im谩genes.
"""
def get_cnn_model():
base_model = MobileNetV2( #resnet.ResNetV2
input_shape=(*IMAGE_SIZE, 3),
include_top=False,
weights="imagenet",
)
# base_model= tf.keras.models.load_model('/content/gdrive/MyDrive/best_model.h5')
# base_model.summary()
# Se congela el extractor de caracter铆sticas
base_model.trainable = False
base_model_out = base_model.output
base_model_out = layers.Reshape((-1, base_model_out.shape[-1]))(base_model_out)
cnn_model = keras.models.Model(base_model.input, base_model_out)
return cnn_model
class TransformerEncoderBlock(layers.Layer):
def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
super().__init__(**kwargs)
self.embed_dim = embed_dim
self.dense_dim = dense_dim
self.num_heads = num_heads
self.attention_1 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim, dropout=0.0
)
self.layernorm_1 = layers.LayerNormalization()
self.layernorm_2 = layers.LayerNormalization()
self.dense_1 = layers.Dense(embed_dim, activation="relu")
def call(self, inputs, training, mask=None):
inputs = self.layernorm_1(inputs)
inputs = self.dense_1(inputs)
attention_output_1 = self.attention_1(
query=inputs,
value=inputs,
key=inputs,
attention_mask=None,
training=training,
)
out_1 = self.layernorm_2(inputs + attention_output_1)
return out_1
class PositionalEmbedding(layers.Layer):
def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
super().__init__(**kwargs)
self.token_embeddings = layers.Embedding(
input_dim=vocab_size, output_dim=embed_dim
)
self.position_embeddings = layers.Embedding(
input_dim=sequence_length, output_dim=embed_dim
)
self.sequence_length = sequence_length
self.vocab_size = vocab_size
self.embed_dim = embed_dim
self.embed_scale = tf.math.sqrt(tf.cast(embed_dim, tf.float32))
def call(self, inputs):
length = tf.shape(inputs)[-1]
positions = tf.range(start=0, limit=length, delta=1)
embedded_tokens = self.token_embeddings(inputs)
embedded_tokens = embedded_tokens * self.embed_scale
embedded_positions = self.position_embeddings(positions)
return embedded_tokens + embedded_positions
def compute_mask(self, inputs, mask=None):
return tf.math.not_equal(inputs, 0)
class TransformerDecoderBlock(layers.Layer):
def __init__(self, embed_dim, ff_dim, num_heads, **kwargs):
super().__init__(**kwargs)
self.embed_dim = embed_dim
self.ff_dim = ff_dim
self.num_heads = num_heads
self.attention_1 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim, dropout=0.1
)
self.attention_2 = layers.MultiHeadAttention(
num_heads=num_heads, key_dim=embed_dim, dropout=0.1
)
self.ffn_layer_1 = layers.Dense(ff_dim, activation="relu")
self.ffn_layer_2 = layers.Dense(embed_dim)
self.layernorm_1 = layers.LayerNormalization()
self.layernorm_2 = layers.LayerNormalization()
self.layernorm_3 = layers.LayerNormalization()
self.embedding = PositionalEmbedding(
embed_dim=EMBED_DIM,
sequence_length=SEQ_LENGTH,
vocab_size=VOCAB_SIZE,
)
self.out = layers.Dense(VOCAB_SIZE, activation="softmax")
self.dropout_1 = layers.Dropout(0.3)
self.dropout_2 = layers.Dropout(0.5)
self.supports_masking = True
def call(self, inputs, encoder_outputs, training, mask=None):
inputs = self.embedding(inputs)
causal_mask = self.get_causal_attention_mask(inputs)
if mask is not None:
padding_mask = tf.cast(mask[:, :, tf.newaxis], dtype=tf.int32)
combined_mask = tf.cast(mask[:, tf.newaxis, :], dtype=tf.int32)
combined_mask = tf.minimum(combined_mask, causal_mask)
attention_output_1 = self.attention_1(
query=inputs,
value=inputs,
key=inputs,
attention_mask=combined_mask,
training=training,
)
out_1 = self.layernorm_1(inputs + attention_output_1)
attention_output_2 = self.attention_2(
query=out_1,
value=encoder_outputs,
key=encoder_outputs,
attention_mask=padding_mask,
training=training,
)
out_2 = self.layernorm_2(out_1 + attention_output_2)
ffn_out = self.ffn_layer_1(out_2)
ffn_out = self.dropout_1(ffn_out, training=training)
ffn_out = self.ffn_layer_2(ffn_out)
ffn_out = self.layernorm_3(ffn_out + out_2, training=training)
ffn_out = self.dropout_2(ffn_out, training=training)
preds = self.out(ffn_out)
return preds
def get_causal_attention_mask(self, inputs):
input_shape = tf.shape(inputs)
batch_size, sequence_length = input_shape[0], input_shape[1]
i = tf.range(sequence_length)[:, tf.newaxis]
j = tf.range(sequence_length)
mask = tf.cast(i >= j, dtype="int32")
mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
mult = tf.concat(
[
tf.expand_dims(batch_size, -1),
tf.constant([1, 1], dtype=tf.int32),
],
axis=0,
)
return tf.tile(mask, mult)
class ImageCaptioningModel(keras.Model):
def __init__(
self,
cnn_model,
encoder,
decoder,
num_captions_per_image=1,
image_aug=None,
):
super().__init__()
self.cnn_model = cnn_model
self.encoder = encoder
self.decoder = decoder
self.loss_tracker = keras.metrics.Mean(name="loss")
self.acc_tracker = keras.metrics.Mean(name="accuracy")
self.num_captions_per_image = num_captions_per_image
self.image_aug = image_aug
def calculate_loss(self, y_true, y_pred, mask):
loss = self.loss(y_true, y_pred)
mask = tf.cast(mask, dtype=loss.dtype)
loss *= mask
return tf.reduce_sum(loss) / tf.reduce_sum(mask)
def calculate_accuracy(self, y_true, y_pred, mask):
accuracy = tf.equal(y_true, tf.argmax(y_pred, axis=2))
accuracy = tf.math.logical_and(mask, accuracy)
accuracy = tf.cast(accuracy, dtype=tf.float32)
mask = tf.cast(mask, dtype=tf.float32)
return tf.reduce_sum(accuracy) / tf.reduce_sum(mask)
def _compute_caption_loss_and_acc(self, img_embed, batch_seq, training=True):
encoder_out = self.encoder(img_embed, training=training)
batch_seq_inp = batch_seq[:, :-1]
batch_seq_true = batch_seq[:, 1:]
mask = tf.math.not_equal(batch_seq_true, 0)
batch_seq_pred = self.decoder(
batch_seq_inp, encoder_out, training=training, mask=mask
)
loss = self.calculate_loss(batch_seq_true, batch_seq_pred, mask)
acc = self.calculate_accuracy(batch_seq_true, batch_seq_pred, mask)
return loss, acc
def train_step(self, batch_data):
batch_img, batch_seq = batch_data
batch_loss = 0
batch_acc = 0
if self.image_aug:
batch_img = self.image_aug(batch_img)
# 1. Se obtiene los embeddings de im谩genes
img_embed = self.cnn_model(batch_img)
# 2. Las descripciones pasan por el decodificador
# junto con las salidas del codificador y calcula
# la p茅rdida y la precisi贸n para cada descripci贸n
for i in range(self.num_captions_per_image):
with tf.GradientTape() as tape:
loss, acc = self._compute_caption_loss_and_acc(
img_embed, batch_seq[:, i, :], training=True
)
# 3. Actualizaci贸n de p茅rdida y precisi贸n
batch_loss += loss
batch_acc += acc
# 4. Se obtiene la lista de los pesos entrenables
train_vars = (
self.encoder.trainable_variables + self.decoder.trainable_variables
)
# 5. Se obtiene los gradientes
grads = tape.gradient(loss, train_vars)
# 6. Actualiza los pesos entrenables
self.optimizer.apply_gradients(zip(grads, train_vars))
# 7. Actualiza de los rastreadores
batch_acc /= float(self.num_captions_per_image)
self.loss_tracker.update_state(batch_loss)
self.acc_tracker.update_state(batch_acc)
# 8. Retorna los valores de p茅rdida y precisi贸n
return {
"loss": self.loss_tracker.result(),
"acc": self.acc_tracker.result(),
}
def test_step(self, batch_data):
batch_img, batch_seq = batch_data
batch_loss = 0
batch_acc = 0
# 1. Obtiene los embeddings de im谩genes
img_embed = self.cnn_model(batch_img)
# 2. Las descripciones pasan por el decodificador
# junto con las salidas del codificador y calcula
# la p茅rdida y la precisi贸n para cada descripci贸n
for i in range(self.num_captions_per_image):
loss, acc = self._compute_caption_loss_and_acc(
img_embed, batch_seq[:, i, :], training=False
)
# 3. Actualizaci贸n de p茅rdida y precisi贸n
batch_loss += loss
batch_acc += acc
batch_acc /= float(self.num_captions_per_image)
# 4. Actualiza de los rastreadores
self.loss_tracker.update_state(batch_loss)
self.acc_tracker.update_state(batch_acc)
# 5. Retorna los valores de p茅rdida y precisi贸n
return {
"loss": self.loss_tracker.result(),
"acc": self.acc_tracker.result(),
}
@property
def metrics(self):
# Se necesita enumerar las m茅tricas para que `reset_states()`
# pueda ser llamado automaticamente.
return [self.loss_tracker, self.acc_tracker]
cnn_model = get_cnn_model()
encoder = TransformerEncoderBlock(embed_dim=EMBED_DIM, dense_dim=FF_DIM, num_heads=1)
decoder = TransformerDecoderBlock(embed_dim=EMBED_DIM, ff_dim=FF_DIM, num_heads=2)
caption_model = ImageCaptioningModel(
cnn_model=cnn_model,
encoder=encoder,
decoder=decoder,
image_aug=image_augmentation,
)
"""## **Entrenamiento del modelo**"""
# Define la funci贸n de p茅rdida
cross_entropy = keras.losses.SparseCategoricalCrossentropy(
from_logits=False,
reduction='none',
)
# Criterios de parada anticipada
early_stopping = keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True)
# Programador de tasa de aprendizaje para el optimizador
from tensorflow.keras.optimizers.schedules import LearningRateSchedule
class LRSchedule(LearningRateSchedule):
def __init__(self, post_warmup_learning_rate, warmup_steps):
super().__init__()
self.post_warmup_learning_rate = post_warmup_learning_rate
self.warmup_steps = warmup_steps
def __call__(self, step):
global_step = tf.cast(step, tf.float32)
warmup_steps = tf.cast(self.warmup_steps, tf.float32)
warmup_progress = global_step / warmup_steps
warmup_learning_rate = self.post_warmup_learning_rate * warmup_progress
return tf.cond(
global_step < warmup_steps,
lambda: warmup_learning_rate,
lambda: self.post_warmup_learning_rate,
)
# Se crea un cronograma de tasa de aprendizaje
num_train_steps = len(train_dataset) * EPOCHS
num_warmup_steps = num_train_steps // 15
lr_schedule = LRSchedule(post_warmup_learning_rate=1e-4, warmup_steps=num_warmup_steps)
# Se compila el modelo
caption_model.compile(optimizer=keras.optimizers.Adam(lr_schedule), loss=cross_entropy)
# Entrenamiento del modelo
caption_model.fit(
train_dataset,
epochs=EPOCHS,
validation_data=valid_dataset,
callbacks=[early_stopping],
)
"""### **Opci贸n para guardar el modelo entrenado**"""
#con est谩 opci贸n vemos los pesos del modelo en una lista
#pesos = caption_model.get_weights()
#guardamos esos pesos en formato npy - en este caso lo guardamos entrenado con una 茅poca, ya que si quitamos el fit o el entrenamiento nos da error, por lo que siempre tenemos que
#entrenarle al modelo con una 茅poca para despu茅s configurarle con otro con 10 茅pocas
#np.save('pesos1.npy', np.array(pesos, dtype=object), allow_pickle=True)
#aqu铆 configuramos los pesos que estaban entrenados con una 茅poca con diez - nosotros corrimos anteriormente con 10 y nos descargamos
import os
import numpy as np
#Carga del modelo
archivo_pesos = os.path.join("pesos10.npy")
caption_model = np.load(archivo_pesos, allow_pickle=True)
#Interfaz para gradio
def generate_caption(sample_img):
# Decodifica y redimensiona la imagen de entrada
sample_img = decode_and_resize(sample_img)
img = sample_img.numpy().clip(0, 255).astype(np.uint8)
plt.imshow(img)
plt.show()
# Prepara la imagen para el modelo
img = tf.expand_dims(sample_img, 0)
img_embed = caption_model.cnn_model(img)
encoded_img = caption_model.encoder(img_embed, training=False)
# Inicializa la descripci贸n con el token de inicio
decoded_caption = "<start>"
# Itera para generar la descripci贸n
for i in range(max_decoded_sentence_length):
tokenized_caption = vectorization([decoded_caption])[:, :-1]
mask = tf.math.not_equal(tokenized_caption, 0)
predictions = caption_model.decoder(
tokenized_caption, encoded_img, training=False, mask=mask
)
sampled_token_index = np.argmax(predictions[0, i, :])
sampled_token = index_lookup[sampled_token_index]
if sampled_token == "<end>":
break
decoded_caption += " " + sampled_token
# Elimina los tokens de inicio y fin de la descripci贸n
decoded_caption = decoded_caption.replace("<start> ", "")
decoded_caption = decoded_caption.replace(" <end>", "").strip()
# Convierte la descripci贸n a audio
text_to_say = decoded_caption
lenguage = "es-es"
gtts_object = gTTS(text=text_to_say, lang=lenguage, slow=False)
gtts_object.save("gtts.mp3")
audio = "gtts.mp3"
return decoded_caption, audio
demo = gr.Interface(fn = generate_caption,inputs = gr.Image(label="Imagen"), outputs = [gr.Text(label="Descripci贸n textual"), gr.Audio(label="Audio")], theme ='darkhuggingface', title = 'DESCRIPCI脫N DE IM脕GENES DE RIPIOS DE PERFORACI脫N',
description = 'La siguiente interfaz describir谩 de forma autom谩tica im谩genes de ripios de perforaci贸n. El usuario deber谩 ingresar en el recuadro de la izquierda la imagen a ser procesada, y en los recuadros de la derecha se mostrar谩 la descripci贸n textual y oral de la imagen. Se recomienda ingresar im谩genes sin ning煤n tipo de mediciones o s铆mbolos ya que esto podr铆a afectar en la predicci贸n del modelo.',
article = 'Nota: En el caso de ingresar im谩genes que no tengan relaci贸n a muestras de ripios de perforaci贸n, los autores de esta aplicaci贸n no se hacen responsables por los resultados de estas, el modelo de descripci贸n de ripios de perforaci贸n est谩 entrenado para dar un resultado.')
demo.launch()