| from __future__ import annotations |
|
|
| import argparse |
| import math |
| from collections import Counter |
| from pathlib import Path |
|
|
| import numpy as np |
| import pandas as pd |
| import torch |
| import gradio as gr |
| from datasets import Dataset |
| from sklearn.model_selection import train_test_split |
| from torch.utils.data import DataLoader |
| from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, DataCollatorForSeq2Seq |
|
|
|
|
# --- Data source -----------------------------------------------------------
# `hf://` URL of the dataset repository; each split lives in one parquet file
# whose path is relative to DATASET_URL.
DATASET_URL = "hf://datasets/somosnlp/NoticIA-it/"
DATASET_SPLITS = dict(
    train="data/train-00000-of-00001.parquet",
    validation="data/validation-00000-of-00001.parquet",
    test="data/test-00000-of-00001.parquet",
)

# --- Model and artefact locations -------------------------------------------
# Checkpoint loaded with `from_pretrained` and then fine-tuned.
BASE_MODEL_NAME = "josmunpen/mt5-small-spanish-summarization"
# Directory name (resolved next to this file in `main`) for the saved model.
DEFAULT_OUTPUT_DIR = "mt5-resumenes-es-final"
# NOTE(review): not referenced by any code visible in this file.
DEFAULT_BUCKET = "hf://buckets/AntonioCGF/statetensor_TECP"

# --- Data budget and tokenisation limits ------------------------------------
# Maximum number of rows sampled before splitting.
SAMPLE_SIZE = 256
# `max_length` (with truncation) for source texts and target summaries.
MAX_INPUT_LENGTH = 256
MAX_TARGET_LENGTH = 64

# --- Optimisation settings --------------------------------------------------
TRAIN_BATCH_SIZE = 2
EVAL_BATCH_SIZE = 2
# Training stops once this many batches have been processed.
MAX_TRAIN_STEPS = 20
LEARNING_RATE = 2e-5
|
|
|
|
def load_dataframe() -> pd.DataFrame:
    """Read the training parquet file and keep complete text/summary pairs.

    Returns:
        A frame holding only the ``texto`` and ``respuesta`` columns, with
        rows containing missing values dropped and the index renumbered
        from zero.
    """
    train_location = f"{DATASET_URL}{DATASET_SPLITS['train']}"
    raw_frame = pd.read_parquet(train_location)

    wanted_columns = ["texto", "respuesta"]
    complete_pairs = raw_frame[wanted_columns].dropna()
    return complete_pairs.reset_index(drop=True)
|
|
|
|
def prepare_splits(df: pd.DataFrame):
    """Sample *df* and cut the sample into train, validation and test frames.

    At most ``SAMPLE_SIZE`` rows are drawn. 20 % of the sample is held out
    and that hold-out is then halved into validation and test. Every random
    operation uses the fixed seed 42.

    Args:
        df: Frame to sample from.

    Returns:
        A ``(train_df, val_df, test_df)`` tuple; each frame has a fresh
        zero-based index.
    """
    seed = 42
    rows_to_draw = min(SAMPLE_SIZE, len(df))
    sampled = df.sample(n=rows_to_draw, random_state=seed).reset_index(drop=True)

    train_part, holdout = train_test_split(sampled, test_size=0.2, random_state=seed)
    val_part, test_part = train_test_split(holdout, test_size=0.5, random_state=seed)

    return tuple(
        part.reset_index(drop=True)
        for part in (train_part, val_part, test_part)
    )
|
|
|
|
def tokenize_datasets(tokenizer, train_df: pd.DataFrame, val_df: pd.DataFrame, test_df: pd.DataFrame):
    """Convert the three frames into tokenized ``datasets.Dataset`` objects.

    Source texts (``texto``) are truncated to ``MAX_INPUT_LENGTH`` and target
    summaries (``respuesta``) to ``MAX_TARGET_LENGTH``. The target token ids
    are stored under ``labels`` and the original columns are removed.

    Args:
        tokenizer: Callable tokenizer accepting ``text_target``.
        train_df: Training frame.
        val_df: Validation frame.
        test_df: Test frame.

    Returns:
        ``(train_tokenized, val_tokenized, test_tokenized)``.
    """

    def encode_batch(examples):
        # Encode sources and targets separately; only the target ids are kept.
        encoded = tokenizer(examples["texto"], max_length=MAX_INPUT_LENGTH, truncation=True)
        encoded_targets = tokenizer(
            text_target=examples["respuesta"],
            max_length=MAX_TARGET_LENGTH,
            truncation=True,
        )
        encoded["labels"] = encoded_targets["input_ids"]
        return encoded

    def to_tokenized(frame):
        dataset = Dataset.from_pandas(frame)
        return dataset.map(encode_batch, batched=True, remove_columns=dataset.column_names)

    return to_tokenized(train_df), to_tokenized(val_df), to_tokenized(test_df)
|
|
|
|
def train_model(model, tokenizer, train_tokenized, test_tokenized):
    """Fine-tune *model* briefly and measure its loss on *test_tokenized*.

    Training makes a single pass over the shuffled training loader and stops
    after ``MAX_TRAIN_STEPS`` batches. Evaluation averages the per-batch loss
    over the whole evaluation loader with gradients disabled.

    Args:
        model: Seq2seq model whose forward pass returns an object with ``loss``.
        tokenizer: Tokenizer handed to the padding collator.
        train_tokenized: Tokenized training examples.
        test_tokenized: Tokenized examples used for the evaluation loss.

    Returns:
        ``(device, train_loss, test_loss, test_perplexity, data_collator)``.
        A loss is NaN when no batch was processed. The perplexity is infinite
        when the evaluation loss is non-finite or not below 20.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    data_collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, model=model)
    optimizer = torch.optim.AdamW(model.parameters(), lr=LEARNING_RATE)

    train_loader = DataLoader(
        train_tokenized,
        batch_size=TRAIN_BATCH_SIZE,
        shuffle=True,
        collate_fn=data_collator,
    )
    eval_loader = DataLoader(
        test_tokenized,
        batch_size=EVAL_BATCH_SIZE,
        shuffle=False,
        collate_fn=data_collator,
    )

    def on_device(batch):
        return {name: tensor.to(device) for name, tensor in batch.items()}

    def mean_or_nan(values):
        return float(np.mean(values)) if values else float("nan")

    # ---- training ----
    model.train()
    train_losses = []
    for step, batch in enumerate(train_loader, start=1):
        loss = model(**on_device(batch)).loss
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        train_losses.append(loss.item())
        if step >= MAX_TRAIN_STEPS:
            break

    # ---- evaluation ----
    model.eval()
    with torch.no_grad():
        eval_losses = [model(**on_device(batch)).loss.item() for batch in eval_loader]

    train_loss = mean_or_nan(train_losses)
    test_loss = mean_or_nan(eval_losses)

    if np.isfinite(test_loss) and test_loss < 20:
        test_perplexity = math.exp(test_loss)
    else:
        test_perplexity = float("inf")

    return device, train_loss, test_loss, test_perplexity, data_collator
|
|
|
|
def _tokenize_summary(text):
    """Lower-case *text* and split it on runs of whitespace."""
    # str.split() without arguments never yields empty strings.
    return text.lower().split()


def _f1(overlap, prediction_total, reference_total):
    """Return the F1 of precision/recall derived from overlap and totals."""
    precision = overlap / prediction_total if prediction_total else 0.0
    recall = overlap / reference_total if reference_total else 0.0
    denominator = precision + recall
    return 2 * precision * recall / denominator if denominator else 0.0


def _ngram_counts(tokens, n):
    """Count the n-grams of *tokens*; sequences shorter than n give none."""
    return Counter(tuple(tokens[start:start + n]) for start in range(len(tokens) - n + 1))


def _rouge_n_score(prediction_tokens, reference_tokens, n):
    """Return the n-gram overlap F1 between a prediction and a reference."""
    prediction_ngrams = _ngram_counts(prediction_tokens, n)
    reference_ngrams = _ngram_counts(reference_tokens, n)
    # Counter intersection keeps the minimum count per n-gram (clipped overlap).
    overlap = sum((prediction_ngrams & reference_ngrams).values())
    return _f1(overlap, sum(prediction_ngrams.values()), sum(reference_ngrams.values()))


def _lcs_length(left_tokens, right_tokens):
    """Return the longest-common-subsequence length, keeping two DP rows."""
    previous_row = [0] * (len(right_tokens) + 1)
    for left_token in left_tokens:
        current_row = [0]
        for index, right_token in enumerate(right_tokens, start=1):
            if left_token == right_token:
                current_row.append(previous_row[index - 1] + 1)
            else:
                current_row.append(max(previous_row[index], current_row[-1]))
        previous_row = current_row
    return previous_row[-1]


def _rouge_l_score(prediction_tokens, reference_tokens):
    """Return the LCS-based F1 between a prediction and a reference."""
    lcs = _lcs_length(prediction_tokens, reference_tokens)
    return _f1(lcs, len(prediction_tokens), len(reference_tokens))


def compute_metrics(model, tokenizer, test_tokenized, data_collator, device):
    """Generate summaries for the test set and score them with simple ROUGE.

    The scores are approximations: texts are lower-cased and split on
    whitespace, and each metric is the mean per-example F1.

    Args:
        model: Seq2seq model exposing ``eval`` and ``generate``.
        tokenizer: Tokenizer providing ``batch_decode`` and ``pad_token_id``.
        test_tokenized: Tokenized examples that include ``labels``.
        data_collator: Collate function used to build padded batches.
        device: Device the model inputs are moved to.

    Returns:
        A frame with columns ``metric`` and ``valor`` and one row each for
        ROUGE-1, ROUGE-2 and ROUGE-L. Values are NaN when no example was
        scored.
    """
    test_eval_loader = DataLoader(
        test_tokenized,
        batch_size=EVAL_BATCH_SIZE,
        shuffle=False,
        collate_fn=data_collator,
    )
    predictions = []
    references = []

    model.eval()
    with torch.no_grad():
        for batch in test_eval_loader:
            labels = batch["labels"].clone()
            model_inputs = {key: value.to(device) for key, value in batch.items() if key != "labels"}
            generated_ids = model.generate(**model_inputs, max_new_tokens=32, num_beams=4)
            predictions.extend(tokenizer.batch_decode(generated_ids, skip_special_tokens=True))
            # -100 is not a vocabulary id (presumably the collator's label
            # padding); swap it for the pad id so the labels can be decoded.
            labels[labels == -100] = tokenizer.pad_token_id
            references.extend(tokenizer.batch_decode(labels, skip_special_tokens=True))

    rouge_scores = {"rouge1": [], "rouge2": [], "rougeL": []}
    for prediction, reference in zip(predictions, references):
        prediction_tokens = _tokenize_summary(prediction)
        reference_tokens = _tokenize_summary(reference)
        rouge_scores["rouge1"].append(_rouge_n_score(prediction_tokens, reference_tokens, 1))
        rouge_scores["rouge2"].append(_rouge_n_score(prediction_tokens, reference_tokens, 2))
        rouge_scores["rougeL"].append(_rouge_l_score(prediction_tokens, reference_tokens))

    def mean_or_nan(values):
        # np.mean([]) also yields NaN but emits a RuntimeWarning; be explicit.
        return float(np.mean(values)) if values else float("nan")

    return pd.DataFrame(
        [
            {"metric": "ROUGE-1 aprox.", "valor": mean_or_nan(rouge_scores["rouge1"])},
            {"metric": "ROUGE-2 aprox.", "valor": mean_or_nan(rouge_scores["rouge2"])},
            {"metric": "ROUGE-L aprox.", "valor": mean_or_nan(rouge_scores["rougeL"])},
        ]
    )
|
|
|
|
def save_model(model, tokenizer, output_dir: Path):
    """Write the model and then the tokenizer into *output_dir*.

    Args:
        model: Object exposing ``save_pretrained``.
        tokenizer: Object exposing ``save_pretrained``.
        output_dir: Target directory; it and any missing parents are created.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    for artifact in (model, tokenizer):
        artifact.save_pretrained(output_dir)
|
|
|
|
def generate_sample_summary(model, tokenizer, test_df: pd.DataFrame, device):
    """Summarise the first row of *test_df* with beam search.

    Args:
        model: Seq2seq model exposing ``eval`` and ``generate``.
        tokenizer: Tokenizer used to encode the text and decode the output.
        test_df: Frame with a ``texto`` column; only its first row is used.
        device: Device the encoded inputs are moved to.

    Returns:
        ``(sample_text, summary)`` where ``summary`` is the decoded first
        generated sequence without special tokens.

    Raises:
        IndexError: If *test_df* has no rows.
    """
    sample_text = test_df.iloc[0]["texto"]
    inputs = tokenizer(
        sample_text,
        return_tensors="pt",
        truncation=True,
        max_length=MAX_INPUT_LENGTH,
    ).to(device)

    # Match the other generation paths in this file: switch to eval mode so
    # dropout cannot perturb the output, and skip gradient tracking.
    model.eval()
    with torch.no_grad():
        generated_ids = model.generate(**inputs, max_new_tokens=32, num_beams=4)

    return sample_text, tokenizer.decode(generated_ids[0], skip_special_tokens=True)
|
|
|
|
def build_gradio_demo(model, tokenizer, device):
    """Assemble the Gradio interface that summarises user-supplied text.

    Args:
        model: Seq2seq model exposing ``eval`` and ``generate``.
        tokenizer: Tokenizer used to encode the input and decode the summary.
        device: Device the encoded inputs are moved to.

    Returns:
        The ``gr.Blocks`` application; it is not launched here.
    """

    def generate_summary(text):
        # Missing, empty or whitespace-only input gets a hint, not a summary.
        if not (text and text.strip()):
            return "Introduce un texto para generar el resumen."

        model.eval()
        encoded = tokenizer(
            text,
            return_tensors="pt",
            truncation=True,
            max_length=MAX_INPUT_LENGTH,
        ).to(device)
        with torch.no_grad():
            summary_ids = model.generate(**encoded, max_new_tokens=32, num_beams=4)
        best_sequence = summary_ids[0]
        return tokenizer.decode(best_sequence, skip_special_tokens=True)

    demo = gr.Blocks(title="Resumen de texto en espanol")
    with demo:
        gr.Markdown("# Resumen de textos en espanol\nEscribe un texto largo y pulsa el boton para generar un resumen.")
        # Input and output boxes sit side by side; the button goes below them.
        with gr.Row():
            input_text = gr.Textbox(
                label="Texto de entrada",
                lines=12,
                placeholder="Pega aqui el texto que quieras resumir...",
            )
            output_text = gr.Textbox(label="Resumen generado", lines=6)
        generate_button = gr.Button("Generar resumen")
        generate_button.click(fn=generate_summary, inputs=input_text, outputs=output_text)
    return demo
|
|
|
|
def main():
    """Command-line entry point: obtain a model, evaluate it, run the demo.

    A checkpoint previously saved in ``DEFAULT_OUTPUT_DIR`` (next to this
    file) is reused unless ``--retrain`` is given. Otherwise the base model
    is fine-tuned and the result is saved there. Train/test losses are only
    printed when training actually ran, because they are produced by the
    training step. ROUGE scores and a sample summary are always printed, and
    the Gradio demo is launched unless ``--no-demo`` is given.
    """
    parser = argparse.ArgumentParser(description="Fine-tuning y demo de resumen en espanol")
    parser.add_argument("--retrain", action="store_true", help="Reentrenar el modelo aunque ya exista una version guardada")
    parser.add_argument("--no-demo", action="store_true", help="No lanzar la interfaz de Gradio al final")
    parser.add_argument("--share", action="store_true", help="Crear un enlace publico de Gradio")
    parser.add_argument("--server-port", type=int, default=7860, help="Puerto para la demo de Gradio")
    args = parser.parse_args()

    base_dir = Path(__file__).resolve().parent
    output_dir = base_dir / DEFAULT_OUTPUT_DIR

    df = load_dataframe()
    train_df, val_df, test_df = prepare_splits(df)

    # save_pretrained writes config.json into the target directory, so its
    # presence is used as the marker for a reusable checkpoint.
    reuse_saved = (output_dir / "config.json").is_file() and not args.retrain
    model_source = str(output_dir) if reuse_saved else BASE_MODEL_NAME

    tokenizer = AutoTokenizer.from_pretrained(model_source)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_source)
    # The validation split is tokenized but not consumed below.
    train_tokenized, val_tokenized, test_tokenized = tokenize_datasets(tokenizer, train_df, val_df, test_df)

    if reuse_saved:
        print("Modelo cargado desde:", output_dir)
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        model.to(device)
        data_collator = DataCollatorForSeq2Seq(tokenizer=tokenizer, model=model)
    else:
        device, train_loss, test_loss, test_perplexity, data_collator = train_model(
            model, tokenizer, train_tokenized, test_tokenized
        )
        save_model(model, tokenizer, output_dir)
        print("Modelo guardado en:", output_dir)

    metrics_df = compute_metrics(model, tokenizer, test_tokenized, data_collator, device)
    metrics_df["valor"] = metrics_df["valor"].apply(
        lambda value: round(value, 4)
        if isinstance(value, (float, np.floating)) and np.isfinite(value)
        else value
    )

    def rounded(value):
        # Leave NaN/inf untouched so they print as-is.
        return round(value, 4) if np.isfinite(value) else value

    if not reuse_saved:
        print("Train loss:", rounded(train_loss))
        print("Test loss:", rounded(test_loss))
        print("Test perplexity:", rounded(test_perplexity))
    print(metrics_df)

    sample_text, sample_summary = generate_sample_summary(model, tokenizer, test_df, device)
    print("Texto de entrada:", sample_text[:1200])
    print("Resumen generado:", sample_summary)

    if not args.no_demo:
        demo = build_gradio_demo(model, tokenizer, device)
        demo.launch(share=args.share, server_port=args.server_port)
|
|
|
|
# Run the pipeline only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()