| from __future__ import annotations | |
| import argparse | |
| import math | |
| from collections import Counter | |
| from pathlib import Path | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| import gradio as gr | |
| from datasets import Dataset | |
| from sklearn.model_selection import train_test_split | |
| from torch.utils.data import DataLoader | |
| from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, DataCollatorForSeq2Seq | |
# --- Data source -----------------------------------------------------------
# Parquet shard for each split; the path is appended to DATASET_URL.
DATASET_SPLITS = dict(
    train="data/train-00000-of-00001.parquet",
    validation="data/validation-00000-of-00001.parquet",
    test="data/test-00000-of-00001.parquet",
)
DATASET_URL = "hf://datasets/somosnlp/NoticIA-it/"

# --- Model -----------------------------------------------------------------
# Checkpoint that fine-tuning starts from, and the folder (next to this
# script) where the fine-tuned model and tokenizer are stored.
BASE_MODEL_NAME = "josmunpen/mt5-small-spanish-summarization"
DEFAULT_OUTPUT_DIR = "mt5-resumenes-es-final"

# --- Data preparation ------------------------------------------------------
SAMPLE_SIZE = 256  # upper bound on rows sampled before splitting
MAX_INPUT_LENGTH = 256  # token truncation limit for source texts
MAX_TARGET_LENGTH = 64  # token truncation limit for target summaries

# --- Optimisation ----------------------------------------------------------
TRAIN_BATCH_SIZE = 2
EVAL_BATCH_SIZE = 2
MAX_TRAIN_STEPS = 20  # the training loop stops after this many batches
LEARNING_RATE = 2e-5  # AdamW learning rate
def load_dataframe() -> pd.DataFrame:
    """Read the training shard and keep only complete text/target pairs.

    Returns:
        A frame holding the ``texto`` and ``respuesta`` columns, with rows
        that contain missing values dropped and the index renumbered from 0.
    """
    train_path = DATASET_URL + DATASET_SPLITS["train"]
    pairs = pd.read_parquet(train_path)[["texto", "respuesta"]]
    complete_pairs = pairs.dropna()
    return complete_pairs.reset_index(drop=True)
def prepare_splits(df: pd.DataFrame):
    """Sample the frame and cut it into train / validation / test parts.

    At most ``SAMPLE_SIZE`` rows are drawn (seeded, so the draw is
    repeatable). 20% of the sample is held out and then halved into the
    validation and test parts.

    Args:
        df: Frame to sample from.

    Returns:
        ``(train_df, val_df, test_df)``, each with a fresh 0-based index.
    """
    seed = 42
    row_count = min(SAMPLE_SIZE, len(df))
    subset = df.sample(n=row_count, random_state=seed).reset_index(drop=True)

    train_part, holdout = train_test_split(subset, test_size=0.2, random_state=seed)
    val_part, test_part = train_test_split(holdout, test_size=0.5, random_state=seed)

    return tuple(part.reset_index(drop=True) for part in (train_part, val_part, test_part))
def tokenize_datasets(tokenizer, train_df: pd.DataFrame, val_df: pd.DataFrame, test_df: pd.DataFrame):
    """Convert the three frames into tokenized ``Dataset`` objects.

    Args:
        tokenizer: Tokenizer applied to both the source texts and the targets.
        train_df: Training rows with ``texto`` and ``respuesta`` columns.
        val_df: Validation rows with the same columns.
        test_df: Test rows with the same columns.

    Returns:
        ``(train_tokenized, val_tokenized, test_tokenized)``. The original
        columns are removed, so only the tokenizer outputs and ``labels``
        remain.
    """

    def encode_batch(examples):
        # Sources and targets are truncated to different maximum lengths.
        encoded = tokenizer(examples["texto"], max_length=MAX_INPUT_LENGTH, truncation=True)
        target_encoding = tokenizer(
            text_target=examples["respuesta"],
            max_length=MAX_TARGET_LENGTH,
            truncation=True,
        )
        encoded["labels"] = target_encoding["input_ids"]
        return encoded

    def encode_frame(frame):
        raw_dataset = Dataset.from_pandas(frame)
        return raw_dataset.map(
            encode_batch,
            batched=True,
            remove_columns=raw_dataset.column_names,
        )

    return tuple(encode_frame(frame) for frame in (train_df, val_df, test_df))
def _perplexity_from_loss(loss):
    """Turn a mean cross-entropy loss into a perplexity value.

    A NaN loss yields NaN (the perplexity is unknown, not infinite). Finite
    losses below 20 are exponentiated; anything else is reported as ``inf``.
    """
    if np.isnan(loss):
        return float("nan")
    if np.isfinite(loss) and loss < 20:
        return math.exp(loss)
    return float("inf")


def train_model(model, tokenizer, train_tokenized, test_tokenized):
    """Fine-tune ``model`` for a bounded number of steps and evaluate its loss.

    The model is moved to CUDA when available (CPU otherwise) and trained
    with AdamW for a single pass over ``train_tokenized``, stopping after
    ``MAX_TRAIN_STEPS`` batches. The loss is then averaged over every batch of
    ``test_tokenized`` without gradient tracking. The model is left in eval
    mode.

    Args:
        model: Seq2seq model to fine-tune in place.
        tokenizer: Tokenizer handed to the padding collator.
        train_tokenized: Tokenized training examples (including ``labels``).
        test_tokenized: Tokenized examples used for the loss evaluation.

    Returns:
        ``(device, train_loss, test_loss, test_perplexity, data_collator)``.
        Each loss is the mean of the per-batch losses, or NaN when no batch
        was processed. The perplexity is NaN whenever the test loss is NaN.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)
    data_collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, model=model)
    train_loader = DataLoader(
        train_tokenized,
        batch_size=TRAIN_BATCH_SIZE,
        shuffle=True,
        collate_fn=data_collator,
    )
    eval_loader = DataLoader(
        test_tokenized,
        batch_size=EVAL_BATCH_SIZE,
        shuffle=False,
        collate_fn=data_collator,
    )

    model.train()
    train_losses = []
    for step, batch in enumerate(train_loader, start=1):
        batch = {key: value.to(device) for key, value in batch.items()}
        loss = model(**batch).loss
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        train_losses.append(loss.item())
        if step >= MAX_TRAIN_STEPS:
            break
    train_loss = float(np.mean(train_losses)) if train_losses else float("nan")

    model.eval()
    eval_losses = []
    with torch.no_grad():
        for batch in eval_loader:
            batch = {key: value.to(device) for key, value in batch.items()}
            eval_losses.append(model(**batch).loss.item())
    test_loss = float(np.mean(eval_losses)) if eval_losses else float("nan")

    # An unknown loss must not be presented as an infinitely bad model.
    test_perplexity = _perplexity_from_loss(test_loss)
    return device, train_loss, test_loss, test_perplexity, data_collator
def _tokenize_summary(text):
    """Lower-case ``text`` and split it on whitespace."""
    return text.lower().split()


def _f_measure(matches, prediction_total, reference_total):
    """Return the F1 score for ``matches`` shared units, or 0.0 when none match."""
    if not matches:
        # No match also covers the cases where either side is empty.
        return 0.0
    precision = matches / prediction_total
    recall = matches / reference_total
    return 2 * precision * recall / (precision + recall)


def _ngram_counts(tokens, n):
    """Count the n-grams of ``tokens``; empty when fewer than ``n`` tokens."""
    return Counter(tuple(tokens[index : index + n]) for index in range(len(tokens) - n + 1))


def _rouge_n_score(prediction_tokens, reference_tokens, n):
    """Return the n-gram overlap F1 between a prediction and its reference."""
    prediction_ngrams = _ngram_counts(prediction_tokens, n)
    reference_ngrams = _ngram_counts(reference_tokens, n)
    # Counter intersection keeps the minimum count of every shared n-gram.
    overlap = sum((prediction_ngrams & reference_ngrams).values())
    return _f_measure(
        overlap,
        sum(prediction_ngrams.values()),
        sum(reference_ngrams.values()),
    )


def _lcs_length(left_tokens, right_tokens):
    """Return the longest-common-subsequence length using two rolling rows."""
    previous_row = [0] * (len(right_tokens) + 1)
    for left_token in left_tokens:
        current_row = [0]
        for index, right_token in enumerate(right_tokens, start=1):
            if left_token == right_token:
                current_row.append(previous_row[index - 1] + 1)
            else:
                current_row.append(max(previous_row[index], current_row[-1]))
        previous_row = current_row
    return previous_row[-1]


def _rouge_l_score(prediction_tokens, reference_tokens):
    """Return the LCS-based F1 between a prediction and its reference."""
    lcs = _lcs_length(prediction_tokens, reference_tokens)
    return _f_measure(lcs, len(prediction_tokens), len(reference_tokens))


def compute_metrics(model, tokenizer, test_tokenized, data_collator, device):
    """Generate summaries for the test set and score them with approximate ROUGE.

    Summaries are produced with beam search (4 beams, at most 32 new tokens)
    and compared with the decoded labels using whitespace tokenization.

    Args:
        model: Seq2seq model used for generation; switched to eval mode.
        tokenizer: Tokenizer used to decode predictions and references.
        test_tokenized: Tokenized test examples (including ``labels``).
        data_collator: Collator that pads each batch.
        device: Device the model lives on; encoder inputs are moved to it.

    Returns:
        A DataFrame with ``metric`` and ``valor`` columns holding the mean
        ROUGE-1, ROUGE-2 and ROUGE-L F1 scores. The values are NaN when the
        test set is empty.
    """
    # The collator may derive ``decoder_input_ids`` from the labels. Passing
    # them to ``generate`` would seed decoding with the reference summary and
    # inflate every score, so only the encoder-side tensors are forwarded.
    label_derived_keys = ("labels", "decoder_input_ids", "decoder_attention_mask")

    test_eval_loader = DataLoader(
        test_tokenized,
        batch_size=EVAL_BATCH_SIZE,
        shuffle=False,
        collate_fn=data_collator,
    )
    predictions = []
    references = []
    model.eval()
    with torch.no_grad():
        for batch in test_eval_loader:
            labels = batch["labels"].clone()
            model_inputs = {
                key: value.to(device)
                for key, value in batch.items()
                if key not in label_derived_keys
            }
            generated_ids = model.generate(**model_inputs, max_new_tokens=32, num_beams=4)
            predictions.extend(tokenizer.batch_decode(generated_ids, skip_special_tokens=True))
            # -100 marks padded label positions; map it back to a real token
            # id so the labels can be decoded.
            labels[labels == -100] = tokenizer.pad_token_id
            references.extend(tokenizer.batch_decode(labels, skip_special_tokens=True))

    rouge_scores = {"rouge1": [], "rouge2": [], "rougeL": []}
    for prediction, reference in zip(predictions, references):
        prediction_tokens = _tokenize_summary(prediction)
        reference_tokens = _tokenize_summary(reference)
        rouge_scores["rouge1"].append(_rouge_n_score(prediction_tokens, reference_tokens, 1))
        rouge_scores["rouge2"].append(_rouge_n_score(prediction_tokens, reference_tokens, 2))
        rouge_scores["rougeL"].append(_rouge_l_score(prediction_tokens, reference_tokens))

    def mean_or_nan(values):
        # Same convention as the loss averages: NaN (without a NumPy
        # warning) when there is nothing to average.
        return float(np.mean(values)) if values else float("nan")

    metrics_df = pd.DataFrame(
        [
            {"metric": "ROUGE-1 aprox.", "valor": mean_or_nan(rouge_scores["rouge1"])},
            {"metric": "ROUGE-2 aprox.", "valor": mean_or_nan(rouge_scores["rouge2"])},
            {"metric": "ROUGE-L aprox.", "valor": mean_or_nan(rouge_scores["rougeL"])},
        ]
    )
    return metrics_df
def save_model(model, tokenizer, output_dir: Path):
    """Write the model and its tokenizer into ``output_dir``.

    The directory (and any missing parents) is created first; an existing
    directory is reused.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    for artifact in (model, tokenizer):
        artifact.save_pretrained(output_dir)
def generate_sample_summary(model, tokenizer, test_df: pd.DataFrame, device):
    """Summarize the first row of ``test_df`` as a quick sanity check.

    Args:
        model: Seq2seq model used for generation.
        tokenizer: Tokenizer used to encode the text and decode the output.
        test_df: Frame whose first row's ``texto`` value is summarized.
        device: Device the encoded inputs are moved to.

    Returns:
        ``(source_text, generated_summary)``.
    """
    source_text = test_df.iloc[0]["texto"]
    encoded = tokenizer(
        source_text,
        return_tensors="pt",
        truncation=True,
        max_length=MAX_INPUT_LENGTH,
    )
    encoded = encoded.to(device)
    output_ids = model.generate(**encoded, max_new_tokens=32, num_beams=4)
    summary = tokenizer.decode(output_ids[0], skip_special_tokens=True)
    return source_text, summary
def build_gradio_demo(model, tokenizer, device):
    """Assemble the Gradio interface that summarizes user-supplied text.

    Args:
        model: Seq2seq model used for generation.
        tokenizer: Tokenizer used to encode the text and decode the output.
        device: Device the encoded inputs are moved to.

    Returns:
        The ``gr.Blocks`` app, not yet launched.
    """

    def generate_summary(text):
        """Return a summary of ``text``, or a prompt when it is blank."""
        if not (text and text.strip()):
            return "Introduce un texto para generar el resumen."

        model.eval()
        encoded = tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=MAX_INPUT_LENGTH,
        ).to(device)
        with torch.no_grad():
            output_ids = model.generate(**encoded, max_new_tokens=32, num_beams=4)
        return tokenizer.decode(output_ids[0], skip_special_tokens=True)

    with gr.Blocks(title="Resumen de texto en espanol") as demo:
        gr.Markdown("# Resumen de textos en espanol\nEscribe un texto largo y pulsa el boton para generar un resumen.")
        # NOTE(review): the source lost its indentation; both text boxes are
        # assumed to share the row, with the button below it — confirm.
        with gr.Row():
            source_box = gr.Textbox(
                label="Texto de entrada",
                lines=12,
                placeholder="Pega aqui el texto que quieras resumir...",
            )
            summary_box = gr.Textbox(label="Resumen generado", lines=6)
        run_button = gr.Button("Generar resumen")
        run_button.click(fn=generate_summary, inputs=source_box, outputs=summary_box)

    return demo
def _round_if_finite(value):
    """Round finite floats to four decimals; return anything else unchanged."""
    if isinstance(value, (float, np.floating)) and np.isfinite(value):
        return round(value, 4)
    return value


def main():
    """Train (or reload) the summarizer, report metrics and launch the demo.

    Command-line flags:
        --retrain      Train again even if a saved model is available.
        --no-demo      Skip launching the Gradio interface.
        --share        Ask Gradio for a public link.
        --server-port  Port for the Gradio server (default 7860).

    A saved model is reused only when the output directory contains a
    ``config.json``. An empty or partially written directory triggers a
    fresh training run instead of a loading error. Losses and perplexity are
    reported as NaN when the model is reloaded, since no training happens.
    """
    parser = argparse.ArgumentParser(description="Fine-tuning y demo de resumen en espanol")
    parser.add_argument("--retrain", action="store_true", help="Reentrenar el modelo aunque ya exista una version guardada")
    parser.add_argument("--no-demo", action="store_true", help="No lanzar la interfaz de Gradio al final")
    parser.add_argument("--share", action="store_true", help="Crear un enlace publico de Gradio")
    parser.add_argument("--server-port", type=int, default=7860, help="Puerto para la demo de Gradio")
    args = parser.parse_args()

    base_dir = Path(__file__).resolve().parent
    output_dir = base_dir / DEFAULT_OUTPUT_DIR

    df = load_dataframe()
    train_df, val_df, test_df = prepare_splits(df)

    # The directory existing is not enough: loading needs the model config.
    has_saved_model = (output_dir / "config.json").is_file()
    reuse_saved_model = has_saved_model and not args.retrain
    model_source = output_dir if reuse_saved_model else BASE_MODEL_NAME

    tokenizer = AutoTokenizer.from_pretrained(model_source)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_source)
    train_tokenized, _val_tokenized, test_tokenized = tokenize_datasets(tokenizer, train_df, val_df, test_df)

    if reuse_saved_model:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.to(device)
        data_collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, model=model)
        # Nothing was trained in this run, so the losses are unknown.
        train_loss = test_loss = test_perplexity = float("nan")
    else:
        device, train_loss, test_loss, test_perplexity, data_collator = train_model(
            model, tokenizer, train_tokenized, test_tokenized
        )
        save_model(model, tokenizer, output_dir)

    metrics_df = compute_metrics(model, tokenizer, test_tokenized, data_collator, device)
    metrics_df["valor"] = metrics_df["valor"].apply(_round_if_finite)

    print("Train loss:", _round_if_finite(train_loss))
    print("Test loss:", _round_if_finite(test_loss))
    print("Test perplexity:", _round_if_finite(test_perplexity))
    print(metrics_df)

    sample_text, sample_summary = generate_sample_summary(model, tokenizer, test_df, device)
    print("Texto de entrada:", sample_text[:1200])
    print("Resumen generado:", sample_summary)

    if not args.no_demo:
        demo = build_gradio_demo(model, tokenizer, device)
        demo.launch(share=args.share, server_port=args.server_port)
| if __name__ == "__main__": | |
| main() |