# LLM Benchmark Space — Gradio app for comparing Hugging Face causal LMs
# on multiple QA / paraphrase / sentiment datasets.
| import time | |
| import torch | |
| import gradio as gr | |
| from datasets import load_dataset | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| import pandas as pd | |
# =========================
# General configuration
# =========================
MAX_MODELS = 5  # maximum number of model textboxes shown in the UI
MAX_DATASETS = 5  # maximum number of dataset dropdowns shown in the UI
DEFAULT_NUM_SAMPLES = 50  # number of examples to use for each dataset
def get_device():
    """Return "cuda" when a CUDA GPU is available, otherwise "cpu"."""
    return "cuda" if torch.cuda.is_available() else "cpu"
# =========================
# Dataset definitions
# =========================
# Registry of benchmark datasets: internal key -> UI label, language code and
# a short description (descriptions are user-facing text, kept in Italian).
DATASETS = {
    "boolq_en": {
        "label": "BoolQ (en)",
        "language": "en",
        "description": "Yes/No QA su contesti in inglese",
    },
    "squad_it": {
        "label": "SQuAD-it (it)",
        "language": "it",
        "description": "QA estrattivo in italiano",
    },
    "pawsx_it": {
        "label": "PAWS-X (it)",
        "language": "it",
        "description": "Parafrasi in italiano (stesso significato sì/no)",
    },
    "sentiment_it": {
        "label": "Sentiment-it (it)",
        "language": "it",
        "description": "Sentiment positivo/negativo in italiano",
    },
}
# Dropdown choices shown in the UI, in registry insertion order.
DATASET_LABELS = [cfg["label"] for cfg in DATASETS.values()]
# Reverse lookup: UI label -> internal dataset key.
LABEL_TO_KEY = {cfg["label"]: key for key, cfg in DATASETS.items()}
# =========================
# Dataset loaders
# =========================
def load_boolq(num_samples=DEFAULT_NUM_SAMPLES):
    """Load the BoolQ validation split, truncated to at most *num_samples* rows."""
    data = load_dataset("boolq", split="validation")
    if num_samples is not None and len(data) > num_samples:
        data = data.select(range(num_samples))
    return data
def load_squad_it(num_samples=DEFAULT_NUM_SAMPLES):
    """Load the SQuAD-it test split, truncated to at most *num_samples* rows.

    NOTE: if the "squad_it" dataset is unavailable or uses different split
    names, adapt this loader.
    """
    data = load_dataset("squad_it", split="test")
    if num_samples is not None and len(data) > num_samples:
        data = data.select(range(num_samples))
    return data
def load_pawsx_it(num_samples=DEFAULT_NUM_SAMPLES):
    """Load the Italian PAWS-X validation split, truncated to *num_samples* rows."""
    data = load_dataset("paws-x", "it", split="validation")
    if num_samples is not None and len(data) > num_samples:
        data = data.select(range(num_samples))
    return data
def load_sentiment_it(num_samples=DEFAULT_NUM_SAMPLES):
    """Load the Italian sentiment train split, truncated to *num_samples* rows."""
    data = load_dataset("sentiment-it", split="train")
    if num_samples is not None and len(data) > num_samples:
        data = data.select(range(num_samples))
    return data
# =========================
# Prompt building & output parsing
# =========================
def build_boolq_prompt_en(passage, question):
    """Compose the English yes/no QA prompt for one BoolQ example."""
    header = "You are a question answering system. Answer strictly with 'yes' or 'no'."
    body = f"Passage: {passage}\nQuestion: {question}\nAnswer:"
    return header + "\n\n" + body
def build_boolq_prompt_it(passage, question):
    """Compose the Italian yes/no QA prompt for one BoolQ example."""
    header = "Sei un sistema di question answering. Rispondi strettamente solo con 'sì' o 'no'."
    body = f"Testo: {passage}\nDomanda: {question}\nRisposta:"
    return header + "\n\n" + body
def build_squad_it_prompt(context, question):
    """Compose the Italian extractive-QA prompt for one SQuAD-it example."""
    header = (
        "Sei un sistema di question answering in italiano. "
        "Rispondi con una breve frase che risponde alla domanda."
    )
    return header + "\n\n" + f"Contesto: {context}\nDomanda: {question}\nRisposta:"
def build_pawsx_it_prompt(sentence1, sentence2):
    """Compose the Italian paraphrase-detection prompt for one PAWS-X pair."""
    parts = [
        "Sei un sistema di riconoscimento di parafrasi in italiano.",
        "Ti verranno date due frasi. Devi dire se esprimono lo stesso significato.",
        "Rispondi strettamente solo con 'sì' o 'no'.",
        "",
        f"Frase 1: {sentence1}",
        f"Frase 2: {sentence2}",
        "Le due frasi hanno lo stesso significato?",
        "Risposta:",
    ]
    return "\n".join(parts)
def build_sentiment_it_prompt(text):
    """Compose the Italian sentiment-classification prompt for one example."""
    parts = [
        "Sei un sistema di analisi del sentiment in italiano.",
        "Ti verrà dato un testo. Devi dire se il sentiment è positivo o negativo.",
        "Rispondi strettamente solo con 'positivo' o 'negativo'.",
        "",
        f"Testo: {text}",
        "Sentiment:",
    ]
    return "\n".join(parts)
def parse_yes_no(output_text):
    """Extract a yes/no verdict from raw model output.

    Only the first whitespace-separated token is inspected, with a prefix
    match so trailing punctuation ('Sì,' / 'no.') is accepted. Recognizes
    Italian 'sì'/'si' and English 'yes' as affirmative, and 'no' (same
    spelling in both languages) as negative.

    Args:
        output_text: raw generated text from the model.

    Returns:
        True for yes, False for no, None when the token is not recognized
        (including empty output).
    """
    text = output_text.strip().lower()
    if not text:
        return None
    first = text.split()[0]
    # Affirmative: Italian (with or without the accent) or English.
    if first.startswith(("sì", "si", "yes")):
        return True
    # Negative: one check covers both languages — the original code had an
    # unreachable duplicate English 'no' branch after this one.
    if first.startswith("no"):
        return False
    return None
def parse_sentiment_it(output_text):
    """Return True for positive, False for negative, None if unrecognized.

    Only the first whitespace-separated token is inspected, matched on the
    'pos'/'neg' prefixes (so 'positivo', 'positive', 'neg.', ... all match).
    """
    tokens = output_text.strip().lower().split()
    if not tokens:
        return None
    head = tokens[0]
    if head.startswith("pos"):
        return True
    return False if head.startswith("neg") else None
def normalize_text(s):
    """Lowercase *s* and collapse every whitespace run to a single space."""
    # str.split() with no argument already drops leading/trailing whitespace.
    return " ".join(s.lower().split())
# =========================
# Model: load & generate
# =========================
def load_model(model_name):
    """Fetch tokenizer and causal LM from the Hub and prepare them for eval.

    Returns a (tokenizer, model, device) triple; the model is moved to the
    selected device and switched to eval mode.
    """
    device = get_device()
    tok = AutoTokenizer.from_pretrained(model_name)
    lm = AutoModelForCausalLM.from_pretrained(model_name)
    lm = lm.to(device)
    lm.eval()
    return tok, lm, device
def generate_text(tokenizer, model, device, prompt, max_new_tokens=32):
    """Greedy-decode a continuation of *prompt* and return only the new text.

    Args:
        tokenizer, model: a Hugging Face tokenizer / causal-LM pair
            (as returned by load_model).
        device: device string the model lives on ("cuda" or "cpu").
        prompt: input text to continue.
        max_new_tokens: generation budget.

    Returns:
        The generated continuation (prompt tokens stripped), decoded without
        special tokens.
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    with torch.no_grad():
        output_ids = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            # Greedy decoding. The original also passed temperature=0.0,
            # which is ignored when do_sample=False and makes transformers
            # emit a generation-config warning (and can be rejected by its
            # validation in some versions), so it is dropped here.
            do_sample=False,
        )
    # Slice off the prompt tokens so only the continuation is decoded.
    return tokenizer.decode(
        output_ids[0][inputs["input_ids"].shape[-1]:],
        skip_special_tokens=True,
    )
# =========================
# Per-dataset evaluation
# =========================
def evaluate_on_boolq(model_name, tokenizer, model, device, num_samples=DEFAULT_NUM_SAMPLES, lang="en"):
    """Run the model on BoolQ yes/no questions and report accuracy/timings."""
    dataset = load_boolq(num_samples=num_samples)
    build_prompt = build_boolq_prompt_en if lang == "en" else build_boolq_prompt_it
    correct, total, durations = 0, 0, []
    for row in dataset:
        prompt = build_prompt(row["passage"], row["question"])
        start = time.time()
        answer = generate_text(tokenizer, model, device, prompt, max_new_tokens=5)
        durations.append(time.time() - start)
        total += 1
        prediction = parse_yes_no(answer)
        # row["answer"] is a bool (True = yes); unparseable outputs count as wrong.
        if prediction is not None and prediction == row["answer"]:
            correct += 1
    return {
        "model_name": model_name,
        "dataset": "BoolQ (en)" if lang == "en" else "BoolQ (it)",
        "num_samples": total,
        "accuracy": correct / total if total > 0 else 0.0,
        "avg_time_per_sample_sec": sum(durations) / len(durations) if durations else None,
    }
def evaluate_on_squad_it(model_name, tokenizer, model, device, num_samples=DEFAULT_NUM_SAMPLES):
    """Evaluate extractive QA on SQuAD-it with a lenient substring match."""
    dataset = load_squad_it(num_samples=num_samples)
    correct, total, durations = 0, 0, []
    for row in dataset:
        raw_answers = row.get("answers", {})
        gold = raw_answers.get("text", []) if isinstance(raw_answers, dict) else []
        prompt = build_squad_it_prompt(row["context"], row["question"])
        start = time.time()
        generated = generate_text(tokenizer, model, device, prompt, max_new_tokens=32)
        durations.append(time.time() - start)
        total += 1
        prediction = normalize_text(generated)
        if gold:
            normalized_gold = [normalize_text(answer) for answer in gold]
            # Lenient scoring: count as correct when either string contains
            # the other after normalization.
            if any(g in prediction or prediction in g for g in normalized_gold):
                correct += 1
    return {
        "model_name": model_name,
        "dataset": "SQuAD-it (it)",
        "num_samples": total,
        "accuracy": correct / total if total > 0 else 0.0,
        "avg_time_per_sample_sec": sum(durations) / len(durations) if durations else None,
    }
def evaluate_on_pawsx_it(model_name, tokenizer, model, device, num_samples=DEFAULT_NUM_SAMPLES):
    """Evaluate Italian paraphrase detection (PAWS-X) as a yes/no task."""
    dataset = load_pawsx_it(num_samples=num_samples)
    correct, total, durations = 0, 0, []
    for row in dataset:
        prompt = build_pawsx_it_prompt(row["sentence1"], row["sentence2"])
        start = time.time()
        generated = generate_text(tokenizer, model, device, prompt, max_new_tokens=5)
        durations.append(time.time() - start)
        total += 1
        prediction = parse_yes_no(generated)
        # label: 1 = paraphrase, 0 = not a paraphrase.
        if prediction is not None and prediction == (row["label"] == 1):
            correct += 1
    return {
        "model_name": model_name,
        "dataset": "PAWS-X (it)",
        "num_samples": total,
        "accuracy": correct / total if total > 0 else 0.0,
        "avg_time_per_sample_sec": sum(durations) / len(durations) if durations else None,
    }
def evaluate_on_sentiment_it(model_name, tokenizer, model, device, num_samples=DEFAULT_NUM_SAMPLES):
    """Evaluate Italian binary sentiment classification."""
    dataset = load_sentiment_it(num_samples=num_samples)
    correct, total, durations = 0, 0, []
    for row in dataset:
        prompt = build_sentiment_it_prompt(row["text"])
        start = time.time()
        generated = generate_text(tokenizer, model, device, prompt, max_new_tokens=5)
        durations.append(time.time() - start)
        total += 1
        prediction = parse_sentiment_it(generated)
        # label: assumed 1 = positive, 0 = negative (typical scheme — noted
        # as such in the original; verify against the actual dataset card).
        if prediction is not None and prediction == (row["label"] == 1):
            correct += 1
    return {
        "model_name": model_name,
        "dataset": "Sentiment-it (it)",
        "num_samples": total,
        "accuracy": correct / total if total > 0 else 0.0,
        "avg_time_per_sample_sec": sum(durations) / len(durations) if durations else None,
    }
def evaluate_model_on_dataset(model_name, tokenizer, model, device, dataset_key, num_samples):
    """Dispatch to the evaluator for *dataset_key* and add total wall time.

    Returns the evaluator's result dict with an extra "total_time_sec" key.
    Raises ValueError for an unknown dataset key.
    """
    evaluators = {
        "boolq_en": lambda: evaluate_on_boolq(
            model_name, tokenizer, model, device, num_samples=num_samples, lang="en"
        ),
        "squad_it": lambda: evaluate_on_squad_it(
            model_name, tokenizer, model, device, num_samples=num_samples
        ),
        "pawsx_it": lambda: evaluate_on_pawsx_it(
            model_name, tokenizer, model, device, num_samples=num_samples
        ),
        "sentiment_it": lambda: evaluate_on_sentiment_it(
            model_name, tokenizer, model, device, num_samples=num_samples
        ),
    }
    if dataset_key not in evaluators:
        raise ValueError(f"Dataset non supportato: {dataset_key}")
    start = time.time()
    result = evaluators[dataset_key]()
    result["total_time_sec"] = time.time() - start
    return result
# =========================
# UI helper functions
# =========================
def add_model_field(current_count):
    """Return the next visible-model count, never exceeding MAX_MODELS."""
    if current_count >= MAX_MODELS:
        return current_count
    return current_count + 1
def get_visible_textboxes(model_count):
    """Gradio visibility updates for the model textboxes: the first
    *model_count* become visible, the remaining ones hidden."""
    return [
        gr.update(visible=(idx <= model_count))
        for idx in range(1, MAX_MODELS + 1)
    ]
def add_dataset_field(current_count):
    """Return the next visible-dataset count, never exceeding MAX_DATASETS."""
    if current_count >= MAX_DATASETS:
        return current_count
    return current_count + 1
def get_visible_datasets(dataset_count):
    """Gradio visibility updates for the dataset dropdowns: the first
    *dataset_count* become visible, the remaining ones hidden."""
    return [
        gr.update(visible=(idx <= dataset_count))
        for idx in range(1, MAX_DATASETS + 1)
    ]
def run_benchmark_ui(
    model_1,
    model_2,
    model_3,
    model_4,
    model_5,
    model_count,
    dataset_1,
    dataset_2,
    dataset_3,
    dataset_4,
    dataset_5,
    dataset_count,
    num_samples,
):
    """Gradio callback: benchmark every selected model on every selected dataset.

    Returns a (pandas.DataFrame, log_text) pair. The DataFrame holds one row
    per (model, dataset) result, sorted by dataset then descending accuracy;
    log_text is the human-readable execution log (user-facing, in Italian).
    Requires at least two valid model names and one selected dataset.
    """
    # Collect model names from the first `model_count` textboxes.
    model_names = []
    all_models = [model_1, model_2, model_3, model_4, model_5]
    for i in range(model_count):
        name = (all_models[i] or "").strip()
        if name:
            model_names.append(name)
    # Collect dataset labels from the first `dataset_count` dropdowns,
    # keeping only labels known to the registry.
    dataset_labels = []
    all_datasets = [dataset_1, dataset_2, dataset_3, dataset_4, dataset_5]
    for i in range(dataset_count):
        label = all_datasets[i]
        if label in LABEL_TO_KEY:
            dataset_labels.append(label)
    if len(model_names) < 2:
        return pd.DataFrame(), "Devi specificare almeno due modelli validi."
    if len(dataset_labels) < 1:
        return pd.DataFrame(), "Devi selezionare almeno un dataset."
    logs = []
    results = []
    logs.append(f"Avvio benchmark con {num_samples} esempi per dataset...")
    logs.append(f"Modelli: {', '.join(model_names)}")
    logs.append(f"Dataset: {', '.join(dataset_labels)}")
    logs.append("Device: " + get_device())
    logs.append("====================================")
    for model_name in model_names:
        logs.append(f"\n[MODELLO] {model_name}")
        try:
            tokenizer, model, device = load_model(model_name)
        except Exception as e:
            # A model that fails to load is logged and skipped, not fatal.
            logs.append(f" ERRORE nel caricamento del modello: {e}")
            continue
        for dlabel in dataset_labels:
            dkey = LABEL_TO_KEY[dlabel]
            logs.append(f" [DATASET] {dlabel}")
            try:
                res = evaluate_model_on_dataset(
                    model_name, tokenizer, model, device, dkey, num_samples
                )
                results.append(res)
                avg_time_str = (
                    f"{res['avg_time_per_sample_sec']:.3f}"
                    if res["avg_time_per_sample_sec"] is not None
                    else "N/A"
                )
                logs.append(
                    f" - Esempi valutati: {res['num_samples']}\n"
                    f" - Accuracy: {res['accuracy']:.3f}\n"
                    f" - Tempo medio per esempio (s): {avg_time_str}\n"
                    f" - Tempo totale (s): {res['total_time_sec']:.3f}"
                )
            except Exception as e:
                # Keep going: a failure on one (model, dataset) pair must not
                # abort the rest of the benchmark.
                logs.append(f" ERRORE durante il benchmark: {e}")
    if results:
        df = pd.DataFrame(results)
        df = df.sort_values(by=["dataset", "accuracy"], ascending=[True, False])
    else:
        df = pd.DataFrame()
    log_text = "\n".join(str(l) for l in logs)
    return df, log_text
# =========================
# Gradio interface
# =========================
# Top-level UI definition: the left column collects models, datasets and the
# sample count; the right column shows the results table and execution log.
with gr.Blocks(title="LLM Benchmark Space - Multi-dataset") as demo:
    # User-facing intro (Italian, rendered as Markdown).
    gr.Markdown(
        """
        # 🔍 LLM Benchmark Space (multi-dataset)
        Inserisci i nomi dei modelli Hugging Face (es. `Mattimax/DAC4.3`)
        e confrontali su uno o più dataset selezionabili da menu a tendina.
        - Minimo **2 modelli**
        - Puoi aggiungere fino a **5 modelli** con il pulsante **"+ Aggiungi modello"**
        - Puoi selezionare **1 o più dataset** (fino a 5) con il pulsante **"+ Aggiungi dataset"**
        - Output: tabella con **modello**, **dataset**, **accuracy**, numero di esempi e tempi
        Dataset disponibili:
        - BoolQ (en)
        - SQuAD-it (it)
        - PAWS-X (it)
        - Sentiment-it (it)
        """
    )
    with gr.Row():
        with gr.Column():
            # State: number of currently visible model textboxes (starts at 2,
            # the required minimum).
            model_count_state = gr.State(value=2)
            model_1 = gr.Textbox(
                label="Modello 1",
                placeholder="es. Mattimax/DACMini-IT",
                value="",
                visible=True,
            )
            model_2 = gr.Textbox(
                label="Modello 2",
                placeholder="es. Mattimax/DAC60M",
                value="",
                visible=True,
            )
            # Textboxes 3-5 start hidden; "+ Aggiungi modello" reveals them.
            model_3 = gr.Textbox(
                label="Modello 3",
                placeholder="Modello opzionale",
                value="",
                visible=False,
            )
            model_4 = gr.Textbox(
                label="Modello 4",
                placeholder="Modello opzionale",
                value="",
                visible=False,
            )
            model_5 = gr.Textbox(
                label="Modello 5",
                placeholder="Modello opzionale",
                value="",
                visible=False,
            )
            add_model_button = gr.Button("+ Aggiungi modello")
            # State: number of currently visible dataset dropdowns (starts at 1).
            dataset_count_state = gr.State(value=1)
            dataset_1 = gr.Dropdown(
                label="Dataset 1",
                choices=DATASET_LABELS,
                value="BoolQ (en)",
                visible=True,
            )
            # Dropdowns 2-5 start hidden; "+ Aggiungi dataset" reveals them.
            dataset_2 = gr.Dropdown(
                label="Dataset 2",
                choices=DATASET_LABELS,
                value="SQuAD-it (it)",
                visible=False,
            )
            dataset_3 = gr.Dropdown(
                label="Dataset 3",
                choices=DATASET_LABELS,
                value="PAWS-X (it)",
                visible=False,
            )
            dataset_4 = gr.Dropdown(
                label="Dataset 4",
                choices=DATASET_LABELS,
                value="Sentiment-it (it)",
                visible=False,
            )
            dataset_5 = gr.Dropdown(
                label="Dataset 5",
                choices=DATASET_LABELS,
                value="BoolQ (en)",
                visible=False,
            )
            add_dataset_button = gr.Button("+ Aggiungi dataset")
            num_samples = gr.Slider(
                minimum=10,
                maximum=200,
                step=10,
                value=DEFAULT_NUM_SAMPLES,
                label="Numero di esempi per dataset",
            )
            run_button = gr.Button("🚀 Esegui benchmark", variant="primary")
        with gr.Column():
            # Results table: one row per (model, dataset) evaluation.
            results_df = gr.Dataframe(
                headers=[
                    "model_name",
                    "dataset",
                    "num_samples",
                    "accuracy",
                    "avg_time_per_sample_sec",
                    "total_time_sec",
                ],
                label="Risultati benchmark",
                interactive=False,
            )
            logs_box = gr.Textbox(
                label="Log esecuzione",
                lines=25,
                interactive=False,
            )
    # "+ Add model" logic: bump the counter and reveal the next textbox.
    def on_add_model(model_count):
        new_count = add_model_field(model_count)
        visibility_updates = get_visible_textboxes(new_count)
        return [new_count] + visibility_updates
    add_model_button.click(
        fn=on_add_model,
        inputs=[model_count_state],
        outputs=[model_count_state, model_1, model_2, model_3, model_4, model_5],
    )
    # "+ Add dataset" logic: bump the counter and reveal the next dropdown.
    def on_add_dataset(dataset_count):
        new_count = add_dataset_field(dataset_count)
        visibility_updates = get_visible_datasets(new_count)
        return [new_count] + visibility_updates
    add_dataset_button.click(
        fn=on_add_dataset,
        inputs=[dataset_count_state],
        outputs=[dataset_count_state, dataset_1, dataset_2, dataset_3, dataset_4, dataset_5],
    )
    # "Run benchmark" logic: feed all fields plus both counters to the callback.
    run_button.click(
        fn=run_benchmark_ui,
        inputs=[
            model_1,
            model_2,
            model_3,
            model_4,
            model_5,
            model_count_state,
            dataset_1,
            dataset_2,
            dataset_3,
            dataset_4,
            dataset_5,
            dataset_count_state,
            num_samples,
        ],
        outputs=[results_df, logs_box],
    )

if __name__ == "__main__":
    demo.launch()