"""Gradio benchmark app: compare Hugging Face causal LMs on several QA /
classification datasets (BoolQ, SQuAD-it, PAWS-X it, Sentiment-it) and
report per-dataset accuracy and timing."""

import time

import torch
import gradio as gr
import pandas as pd
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM

# =========================
# General configuration
# =========================

MAX_MODELS = 5
MAX_DATASETS = 5
DEFAULT_NUM_SAMPLES = 50  # number of examples to evaluate per dataset


def get_device():
    """Return "cuda" when a GPU is available, otherwise "cpu"."""
    return "cuda" if torch.cuda.is_available() else "cpu"


# =========================
# Dataset definitions
# =========================

# NOTE: "label" values double as the user-visible dropdown choices, so they
# must stay in sync with the Gradio UI below.
DATASETS = {
    "boolq_en": {
        "label": "BoolQ (en)",
        "language": "en",
        "description": "Yes/No QA su contesti in inglese",
    },
    "squad_it": {
        "label": "SQuAD-it (it)",
        "language": "it",
        "description": "QA estrattivo in italiano",
    },
    "pawsx_it": {
        "label": "PAWS-X (it)",
        "language": "it",
        "description": "Parafrasi in italiano (stesso significato sì/no)",
    },
    "sentiment_it": {
        "label": "Sentiment-it (it)",
        "language": "it",
        "description": "Sentiment positivo/negativo in italiano",
    },
}

DATASET_LABELS = [cfg["label"] for cfg in DATASETS.values()]
LABEL_TO_KEY = {cfg["label"]: key for key, cfg in DATASETS.items()}

# =========================
# Dataset loaders
# =========================


def _truncate(ds, num_samples):
    """Keep at most the first ``num_samples`` examples of ``ds``.

    ``num_samples=None`` means "keep everything".
    """
    if num_samples is not None and num_samples < len(ds):
        ds = ds.select(range(num_samples))
    return ds


def load_boolq(num_samples=DEFAULT_NUM_SAMPLES):
    """Load the BoolQ validation split, truncated to ``num_samples``."""
    return _truncate(load_dataset("boolq", split="validation"), num_samples)


def load_squad_it(num_samples=DEFAULT_NUM_SAMPLES):
    """Load the SQuAD-it test split, truncated to ``num_samples``.

    Note: if "squad_it" is unavailable or uses different splits, adapt here.
    """
    return _truncate(load_dataset("squad_it", split="test"), num_samples)


def load_pawsx_it(num_samples=DEFAULT_NUM_SAMPLES):
    """Load the Italian PAWS-X validation split, truncated to ``num_samples``."""
    return _truncate(load_dataset("paws-x", "it", split="validation"), num_samples)


def load_sentiment_it(num_samples=DEFAULT_NUM_SAMPLES):
    """Load the sentiment-it train split, truncated to ``num_samples``.

    NOTE(review): "sentiment-it" must exist on the Hub with a ``text`` /
    ``label`` schema — verify the dataset id before relying on this loader.
    """
    return _truncate(load_dataset("sentiment-it", split="train"), num_samples)


# =========================
# Prompts & parsing
# =========================


def build_boolq_prompt_en(passage, question):
    """Build an English yes/no QA prompt for BoolQ."""
    return (
        "You are a question answering system. "
        "Answer strictly with 'yes' or 'no'.\n\n"
        f"Passage: {passage}\n"
        f"Question: {question}\n"
        "Answer:"
    )


def build_boolq_prompt_it(passage, question):
    """Build an Italian yes/no QA prompt for BoolQ."""
    return (
        "Sei un sistema di question answering. "
        "Rispondi strettamente solo con 'sì' o 'no'.\n\n"
        f"Testo: {passage}\n"
        f"Domanda: {question}\n"
        "Risposta:"
    )


def build_squad_it_prompt(context, question):
    """Build an Italian extractive-QA prompt for SQuAD-it."""
    return (
        "Sei un sistema di question answering in italiano. "
        "Rispondi con una breve frase che risponde alla domanda.\n\n"
        f"Contesto: {context}\n"
        f"Domanda: {question}\n"
        "Risposta:"
    )


def build_pawsx_it_prompt(sentence1, sentence2):
    """Build an Italian paraphrase-detection prompt for PAWS-X."""
    return (
        "Sei un sistema di riconoscimento di parafrasi in italiano.\n"
        "Ti verranno date due frasi. Devi dire se esprimono lo stesso significato.\n"
        "Rispondi strettamente solo con 'sì' o 'no'.\n\n"
        f"Frase 1: {sentence1}\n"
        f"Frase 2: {sentence2}\n"
        "Le due frasi hanno lo stesso significato?\n"
        "Risposta:"
    )


def build_sentiment_it_prompt(text):
    """Build an Italian binary-sentiment prompt."""
    return (
        "Sei un sistema di analisi del sentiment in italiano.\n"
        "Ti verrà dato un testo. Devi dire se il sentiment è positivo o negativo.\n"
        "Rispondi strettamente solo con 'positivo' o 'negativo'.\n\n"
        f"Testo: {text}\n"
        "Sentiment:"
    )


def parse_yes_no(output_text):
    """Extract a yes/no answer from model output.

    Supports Italian ('sì'/'si'/'no') and English ('yes'/'no').
    Returns True for yes, False for no, None when unrecognised.
    """
    text = output_text.strip().lower()
    if not text:
        return None
    first = text.split()[0]
    # Italian (accented and unaccented 'sì').
    if first.startswith("sì") or first.startswith("si"):
        return True
    # 'no' is shared by Italian and English, so one check covers both.
    # (The original code repeated this check after the English branch;
    # that second check was unreachable dead code and has been removed.)
    if first.startswith("no"):
        return False
    # English.
    if first.startswith("yes"):
        return True
    return None


def parse_sentiment_it(output_text):
    """Return True for positive sentiment, False for negative, None otherwise."""
    text = output_text.strip().lower()
    if not text:
        return None
    first = text.split()[0]
    if first.startswith("pos"):
        return True
    if first.startswith("neg"):
        return False
    return None


def normalize_text(s):
    """Lowercase, strip and collapse internal whitespace for lenient matching."""
    return " ".join(s.strip().lower().split())


# =========================
# Model: load & generate
# =========================


def load_model(model_name):
    """Load tokenizer + causal LM and move the model to the best device.

    Returns (tokenizer, model, device); the model is set to eval mode.
    """
    device = get_device()
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)
    model.to(device)
    model.eval()
    return tokenizer, model, device


def generate_text(tokenizer, model, device, prompt, max_new_tokens=32):
    """Greedy-decode a continuation for ``prompt`` and return only the new text.

    Uses ``do_sample=False`` (greedy decoding); ``temperature`` is deliberately
    not passed — it is meaningless without sampling and recent transformers
    versions warn/reject ``temperature=0.0``.
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    with torch.no_grad():
        output_ids = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=False,
        )
    # Slice off the prompt tokens so only the generated answer is decoded.
    return tokenizer.decode(
        output_ids[0][inputs["input_ids"].shape[-1]:],
        skip_special_tokens=True,
    )


# =========================
# Per-dataset evaluation
# =========================


def evaluate_on_boolq(model_name, tokenizer, model, device,
                      num_samples=DEFAULT_NUM_SAMPLES, lang="en"):
    """Evaluate yes/no accuracy on BoolQ; ``lang`` picks the prompt language."""
    ds = load_boolq(num_samples=num_samples)
    correct = 0
    total = 0
    times = []
    for example in ds:
        passage = example["passage"]
        question = example["question"]
        label = example["answer"]  # bool: True/False
        if lang == "en":
            prompt = build_boolq_prompt_en(passage, question)
        else:
            prompt = build_boolq_prompt_it(passage, question)
        t0 = time.time()
        gen_text = generate_text(tokenizer, model, device, prompt, max_new_tokens=5)
        t1 = time.time()
        pred = parse_yes_no(gen_text)
        total += 1
        times.append(t1 - t0)
        if pred is not None and pred == label:
            correct += 1
    accuracy = correct / total if total > 0 else 0.0
    avg_time = sum(times) / len(times) if times else None
    return {
        "model_name": model_name,
        "dataset": "BoolQ (en)" if lang == "en" else "BoolQ (it)",
        "num_samples": total,
        "accuracy": accuracy,
        "avg_time_per_sample_sec": avg_time,
    }


def evaluate_on_squad_it(model_name, tokenizer, model, device,
                         num_samples=DEFAULT_NUM_SAMPLES):
    """Evaluate lenient exact-ish match on SQuAD-it.

    A prediction counts as correct when it contains, or is contained in,
    any normalized gold answer (substring match in either direction).
    """
    ds = load_squad_it(num_samples=num_samples)
    correct = 0
    total = 0
    times = []
    for example in ds:
        context = example["context"]
        question = example["question"]
        answers = example.get("answers", {})
        gold_answers = answers.get("text", []) if isinstance(answers, dict) else []
        prompt = build_squad_it_prompt(context, question)
        t0 = time.time()
        gen_text = generate_text(tokenizer, model, device, prompt, max_new_tokens=32)
        t1 = time.time()
        pred = normalize_text(gen_text)
        total += 1
        times.append(t1 - t0)
        if gold_answers:
            gold_norm = [normalize_text(a) for a in gold_answers]
            if any(g in pred or pred in g for g in gold_norm):
                correct += 1
    accuracy = correct / total if total > 0 else 0.0
    avg_time = sum(times) / len(times) if times else None
    return {
        "model_name": model_name,
        "dataset": "SQuAD-it (it)",
        "num_samples": total,
        "accuracy": accuracy,
        "avg_time_per_sample_sec": avg_time,
    }


def evaluate_on_pawsx_it(model_name, tokenizer, model, device,
                         num_samples=DEFAULT_NUM_SAMPLES):
    """Evaluate paraphrase yes/no accuracy on Italian PAWS-X."""
    ds = load_pawsx_it(num_samples=num_samples)
    correct = 0
    total = 0
    times = []
    for example in ds:
        s1 = example["sentence1"]
        s2 = example["sentence2"]
        label = example["label"]  # 0: not a paraphrase, 1: paraphrase
        prompt = build_pawsx_it_prompt(s1, s2)
        t0 = time.time()
        gen_text = generate_text(tokenizer, model, device, prompt, max_new_tokens=5)
        t1 = time.time()
        pred = parse_yes_no(gen_text)
        total += 1
        times.append(t1 - t0)
        if pred is not None:
            is_paraphrase = (label == 1)
            if pred == is_paraphrase:
                correct += 1
    accuracy = correct / total if total > 0 else 0.0
    avg_time = sum(times) / len(times) if times else None
    return {
        "model_name": model_name,
        "dataset": "PAWS-X (it)",
        "num_samples": total,
        "accuracy": accuracy,
        "avg_time_per_sample_sec": avg_time,
    }


def evaluate_on_sentiment_it(model_name, tokenizer, model, device,
                             num_samples=DEFAULT_NUM_SAMPLES):
    """Evaluate binary sentiment accuracy on sentiment-it."""
    ds = load_sentiment_it(num_samples=num_samples)
    correct = 0
    total = 0
    times = []
    for example in ds:
        text = example["text"]
        label = example["label"]  # 0: negative, 1: positive (typical scheme)
        prompt = build_sentiment_it_prompt(text)
        t0 = time.time()
        gen_text = generate_text(tokenizer, model, device, prompt, max_new_tokens=5)
        t1 = time.time()
        pred = parse_sentiment_it(gen_text)
        total += 1
        times.append(t1 - t0)
        if pred is not None:
            is_positive = (label == 1)
            if pred == is_positive:
                correct += 1
    accuracy = correct / total if total > 0 else 0.0
    avg_time = sum(times) / len(times) if times else None
    return {
        "model_name": model_name,
        "dataset": "Sentiment-it (it)",
        "num_samples": total,
        "accuracy": accuracy,
        "avg_time_per_sample_sec": avg_time,
    }


def evaluate_model_on_dataset(model_name, tokenizer, model, device,
                              dataset_key, num_samples):
    """Dispatch evaluation to the right dataset routine and add total wall time.

    Raises ValueError for an unknown ``dataset_key``.
    """
    start_total = time.time()
    if dataset_key == "boolq_en":
        res = evaluate_on_boolq(model_name, tokenizer, model, device,
                                num_samples=num_samples, lang="en")
    elif dataset_key == "squad_it":
        res = evaluate_on_squad_it(model_name, tokenizer, model, device,
                                   num_samples=num_samples)
    elif dataset_key == "pawsx_it":
        res = evaluate_on_pawsx_it(model_name, tokenizer, model, device,
                                   num_samples=num_samples)
    elif dataset_key == "sentiment_it":
        res = evaluate_on_sentiment_it(model_name, tokenizer, model, device,
                                       num_samples=num_samples)
    else:
        raise ValueError(f"Dataset non supportato: {dataset_key}")
    res["total_time_sec"] = time.time() - start_total
    return res


# =========================
# UI helper functions
# =========================


def add_model_field(current_count):
    """Increment the visible-model counter, capped at MAX_MODELS."""
    if current_count < MAX_MODELS:
        current_count += 1
    return current_count


def get_visible_textboxes(model_count):
    """Return visibility updates for the MAX_MODELS model textboxes."""
    return [gr.update(visible=(i <= model_count))
            for i in range(1, MAX_MODELS + 1)]


def add_dataset_field(current_count):
    """Increment the visible-dataset counter, capped at MAX_DATASETS."""
    if current_count < MAX_DATASETS:
        current_count += 1
    return current_count


def get_visible_datasets(dataset_count):
    """Return visibility updates for the MAX_DATASETS dataset dropdowns."""
    return [gr.update(visible=(i <= dataset_count))
            for i in range(1, MAX_DATASETS + 1)]


def run_benchmark_ui(
    model_1, model_2, model_3, model_4, model_5,
    model_count,
    dataset_1, dataset_2, dataset_3, dataset_4, dataset_5,
    dataset_count,
    num_samples,
):
    """Run the full benchmark from the UI inputs.

    Returns (results DataFrame, log text). Requires at least two non-empty
    model names and at least one valid dataset selection.
    """
    # Collect model names from the visible textboxes.
    model_names = []
    all_models = [model_1, model_2, model_3, model_4, model_5]
    for i in range(model_count):
        name = (all_models[i] or "").strip()
        if name:
            model_names.append(name)

    # Collect dataset labels from the visible dropdowns, dropping duplicates
    # (evaluating the same dataset twice per model wastes time).
    dataset_labels = []
    all_datasets = [dataset_1, dataset_2, dataset_3, dataset_4, dataset_5]
    for i in range(dataset_count):
        label = all_datasets[i]
        if label in LABEL_TO_KEY and label not in dataset_labels:
            dataset_labels.append(label)

    if len(model_names) < 2:
        return pd.DataFrame(), "Devi specificare almeno due modelli validi."
    if len(dataset_labels) < 1:
        return pd.DataFrame(), "Devi selezionare almeno un dataset."

    logs = []
    results = []
    logs.append(f"Avvio benchmark con {num_samples} esempi per dataset...")
    logs.append(f"Modelli: {', '.join(model_names)}")
    logs.append(f"Dataset: {', '.join(dataset_labels)}")
    logs.append("Device: " + get_device())
    logs.append("====================================")

    for model_name in model_names:
        logs.append(f"\n[MODELLO] {model_name}")
        try:
            tokenizer, model, device = load_model(model_name)
        except Exception as e:
            logs.append(f" ERRORE nel caricamento del modello: {e}")
            continue
        for dlabel in dataset_labels:
            dkey = LABEL_TO_KEY[dlabel]
            logs.append(f" [DATASET] {dlabel}")
            try:
                res = evaluate_model_on_dataset(
                    model_name, tokenizer, model, device, dkey, num_samples
                )
                results.append(res)
                avg_time_str = (
                    f"{res['avg_time_per_sample_sec']:.3f}"
                    if res["avg_time_per_sample_sec"] is not None
                    else "N/A"
                )
                logs.append(
                    f" - Esempi valutati: {res['num_samples']}\n"
                    f" - Accuracy: {res['accuracy']:.3f}\n"
                    f" - Tempo medio per esempio (s): {avg_time_str}\n"
                    f" - Tempo totale (s): {res['total_time_sec']:.3f}"
                )
            except Exception as e:
                logs.append(f" ERRORE durante il benchmark: {e}")
        # Release the model before loading the next one; otherwise every
        # benchmarked model stays resident in GPU/CPU memory simultaneously.
        del model, tokenizer
        if device == "cuda":
            torch.cuda.empty_cache()

    if results:
        df = pd.DataFrame(results)
        df = df.sort_values(by=["dataset", "accuracy"], ascending=[True, False])
    else:
        df = pd.DataFrame()

    log_text = "\n".join(str(l) for l in logs)
    return df, log_text


# =========================
# Gradio interface
# =========================

with gr.Blocks(title="LLM Benchmark Space - Multi-dataset") as demo:
    gr.Markdown(
        """
# 🔍 LLM Benchmark Space (multi-dataset)

Inserisci i nomi dei modelli Hugging Face (es. `Mattimax/DAC4.3`) e confrontali su uno o più dataset selezionabili da menu a tendina.

- Minimo **2 modelli**
- Puoi aggiungere fino a **5 modelli** con il pulsante **"+ Aggiungi modello"**
- Puoi selezionare **1 o più dataset** (fino a 5) con il pulsante **"+ Aggiungi dataset"**
- Output: tabella con **modello**, **dataset**, **accuracy**, numero di esempi e tempi

Dataset disponibili:
- BoolQ (en)
- SQuAD-it (it)
- PAWS-X (it)
- Sentiment-it (it)
"""
    )

    with gr.Row():
        with gr.Column():
            # State: number of visible model textboxes.
            model_count_state = gr.State(value=2)
            model_1 = gr.Textbox(
                label="Modello 1",
                placeholder="es. Mattimax/DACMini-IT",
                value="",
                visible=True,
            )
            model_2 = gr.Textbox(
                label="Modello 2",
                placeholder="es. Mattimax/DAC60M",
                value="",
                visible=True,
            )
            model_3 = gr.Textbox(
                label="Modello 3",
                placeholder="Modello opzionale",
                value="",
                visible=False,
            )
            model_4 = gr.Textbox(
                label="Modello 4",
                placeholder="Modello opzionale",
                value="",
                visible=False,
            )
            model_5 = gr.Textbox(
                label="Modello 5",
                placeholder="Modello opzionale",
                value="",
                visible=False,
            )
            add_model_button = gr.Button("+ Aggiungi modello")

            # State: number of visible dataset dropdowns.
            dataset_count_state = gr.State(value=1)
            dataset_1 = gr.Dropdown(
                label="Dataset 1",
                choices=DATASET_LABELS,
                value="BoolQ (en)",
                visible=True,
            )
            dataset_2 = gr.Dropdown(
                label="Dataset 2",
                choices=DATASET_LABELS,
                value="SQuAD-it (it)",
                visible=False,
            )
            dataset_3 = gr.Dropdown(
                label="Dataset 3",
                choices=DATASET_LABELS,
                value="PAWS-X (it)",
                visible=False,
            )
            dataset_4 = gr.Dropdown(
                label="Dataset 4",
                choices=DATASET_LABELS,
                value="Sentiment-it (it)",
                visible=False,
            )
            dataset_5 = gr.Dropdown(
                label="Dataset 5",
                choices=DATASET_LABELS,
                value="BoolQ (en)",
                visible=False,
            )
            add_dataset_button = gr.Button("+ Aggiungi dataset")

            num_samples = gr.Slider(
                minimum=10,
                maximum=200,
                step=10,
                value=DEFAULT_NUM_SAMPLES,
                label="Numero di esempi per dataset",
            )
            run_button = gr.Button("🚀 Esegui benchmark", variant="primary")

        with gr.Column():
            results_df = gr.Dataframe(
                headers=[
                    "model_name",
                    "dataset",
                    "num_samples",
                    "accuracy",
                    "avg_time_per_sample_sec",
                    "total_time_sec",
                ],
                label="Risultati benchmark",
                interactive=False,
            )
            logs_box = gr.Textbox(
                label="Log esecuzione",
                lines=25,
                interactive=False,
            )

    # "+ Aggiungi modello" logic.
    def on_add_model(model_count):
        new_count = add_model_field(model_count)
        visibility_updates = get_visible_textboxes(new_count)
        return [new_count] + visibility_updates

    add_model_button.click(
        fn=on_add_model,
        inputs=[model_count_state],
        outputs=[model_count_state, model_1, model_2, model_3, model_4, model_5],
    )

    # "+ Aggiungi dataset" logic.
    def on_add_dataset(dataset_count):
        new_count = add_dataset_field(dataset_count)
        visibility_updates = get_visible_datasets(new_count)
        return [new_count] + visibility_updates

    add_dataset_button.click(
        fn=on_add_dataset,
        inputs=[dataset_count_state],
        outputs=[dataset_count_state, dataset_1, dataset_2, dataset_3,
                 dataset_4, dataset_5],
    )

    # "Esegui benchmark" logic.
    run_button.click(
        fn=run_benchmark_ui,
        inputs=[
            model_1, model_2, model_3, model_4, model_5,
            model_count_state,
            dataset_1, dataset_2, dataset_3, dataset_4, dataset_5,
            dataset_count_state,
            num_samples,
        ],
        outputs=[results_df, logs_box],
    )

if __name__ == "__main__":
    demo.launch()