# Hugging Face page header captured with the file: Mattimax's picture
# Update app.py
# e05153e verified
import time
import torch
import gradio as gr
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
import pandas as pd
# =========================
# General configuration
# =========================
MAX_MODELS = 5  # upper bound used when revealing model input fields
MAX_DATASETS = 5  # upper bound used when revealing dataset selectors
DEFAULT_NUM_SAMPLES = 50  # number of examples to use for each dataset
def get_device():
    """Return the torch device name to use: "cuda" when available, otherwise "cpu"."""
    return "cuda" if torch.cuda.is_available() else "cpu"
# =========================
# Dataset definitions
# =========================
# Registry of the supported benchmarks, keyed by an internal dataset key.
# Each entry holds the label shown to the user, a language code and a short
# description.
DATASETS = {
    "boolq_en": {
        "label": "BoolQ (en)",
        "language": "en",
        "description": "Yes/No QA su contesti in inglese",
    },
    "squad_it": {
        "label": "SQuAD-it (it)",
        "language": "it",
        "description": "QA estrattivo in italiano",
    },
    "pawsx_it": {
        "label": "PAWS-X (it)",
        "language": "it",
        "description": "Parafrasi in italiano (stesso significato sì/no)",
    },
    "sentiment_it": {
        "label": "Sentiment-it (it)",
        "language": "it",
        "description": "Sentiment positivo/negativo in italiano",
    },
}
# Labels in registry order; used as the choices of the dataset dropdowns.
DATASET_LABELS = [cfg["label"] for cfg in DATASETS.values()]
# Reverse lookup: user-facing label -> internal dataset key.
LABEL_TO_KEY = {cfg["label"]: key for key, cfg in DATASETS.items()}
# =========================
# Loader dataset
# =========================
def load_boolq(num_samples=DEFAULT_NUM_SAMPLES):
    """Load the BoolQ validation split, truncated to the first ``num_samples`` rows.

    The full split is returned when ``num_samples`` is ``None`` or is not
    smaller than the split size.
    """
    dataset = load_dataset("boolq", split="validation")
    wants_subset = num_samples is not None and num_samples < len(dataset)
    return dataset.select(range(num_samples)) if wants_subset else dataset
def load_squad_it(num_samples=DEFAULT_NUM_SAMPLES):
    """Load the SQuAD-it test split, truncated to the first ``num_samples`` rows.

    The full split is returned when ``num_samples`` is ``None`` or is not
    smaller than the split size.
    """
    # Note: if "squad_it" does not exist or exposes different splits, adapt here.
    dataset = load_dataset("squad_it", split="test")
    wants_subset = num_samples is not None and num_samples < len(dataset)
    return dataset.select(range(num_samples)) if wants_subset else dataset
def load_pawsx_it(num_samples=DEFAULT_NUM_SAMPLES):
    """Load the PAWS-X "it" validation split, truncated to ``num_samples`` rows.

    The full split is returned when ``num_samples`` is ``None`` or is not
    smaller than the split size.
    """
    # NOTE(review): confirm that the "paws-x" dataset actually ships an "it"
    # configuration; if it does not, this call fails at load time.
    dataset = load_dataset("paws-x", "it", split="validation")
    wants_subset = num_samples is not None and num_samples < len(dataset)
    return dataset.select(range(num_samples)) if wants_subset else dataset
def load_sentiment_it(num_samples=DEFAULT_NUM_SAMPLES):
    """Load the "sentiment-it" train split, truncated to ``num_samples`` rows.

    The full split is returned when ``num_samples`` is ``None`` or is not
    smaller than the split size.
    """
    # NOTE(review): confirm that "sentiment-it" is a resolvable dataset id.
    dataset = load_dataset("sentiment-it", split="train")
    wants_subset = num_samples is not None and num_samples < len(dataset)
    return dataset.select(range(num_samples)) if wants_subset else dataset
# =========================
# Prompt & parsing
# =========================
def build_boolq_prompt_en(passage, question):
    """Build the English yes/no prompt for a BoolQ passage and question."""
    lines = [
        "You are a question answering system. Answer strictly with 'yes' or 'no'.",
        "",
        f"Passage: {passage}",
        f"Question: {question}",
        "Answer:",
    ]
    return "\n".join(lines)
def build_boolq_prompt_it(passage, question):
    """Build the Italian yes/no ("sì"/"no") prompt for a passage and question."""
    lines = [
        "Sei un sistema di question answering. Rispondi strettamente solo con 'sì' o 'no'.",
        "",
        f"Testo: {passage}",
        f"Domanda: {question}",
        "Risposta:",
    ]
    return "\n".join(lines)
def build_squad_it_prompt(context, question):
    """Build the Italian short-answer QA prompt for a context and question."""
    lines = [
        "Sei un sistema di question answering in italiano. "
        "Rispondi con una breve frase che risponde alla domanda.",
        "",
        f"Contesto: {context}",
        f"Domanda: {question}",
        "Risposta:",
    ]
    return "\n".join(lines)
def build_pawsx_it_prompt(sentence1, sentence2):
    """Build the Italian paraphrase-detection prompt for a pair of sentences."""
    lines = [
        "Sei un sistema di riconoscimento di parafrasi in italiano.",
        "Ti verranno date due frasi. Devi dire se esprimono lo stesso significato.",
        "Rispondi strettamente solo con 'sì' o 'no'.",
        "",
        f"Frase 1: {sentence1}",
        f"Frase 2: {sentence2}",
        "Le due frasi hanno lo stesso significato?",
        "Risposta:",
    ]
    return "\n".join(lines)
def build_sentiment_it_prompt(text):
    """Build the Italian positive/negative sentiment prompt for a text."""
    lines = [
        "Sei un sistema di analisi del sentiment in italiano.",
        "Ti verrà dato un testo. Devi dire se il sentiment è positivo o negativo.",
        "Rispondi strettamente solo con 'positivo' o 'negativo'.",
        "",
        f"Testo: {text}",
        "Sentiment:",
    ]
    return "\n".join(lines)
def parse_yes_no(output_text):
    """Extract a yes/no verdict from the model output.

    Only the first alphabetic word of the text is inspected; surrounding
    punctuation, quotes and digits are ignored and matching is
    case-insensitive. The word must match exactly, so words that merely start
    with an accepted answer ("since", "similar", "nothing", "non") are not
    mistaken for a verdict.

    Accepted answers are "sì"/"si"/"yes" (affirmative) and "no" (negative).

    Returns:
        True for an affirmative answer, False for a negative one, None when
        no verdict is recognised (including empty output).
    """
    # Collect the first run of letters, skipping anything that precedes it
    # (whitespace, quotes, bullets, numbering).
    letters = []
    for ch in output_text.strip().lower():
        if ch.isalpha():
            letters.append(ch)
        elif letters:
            break
    word = "".join(letters)

    if word in ("sì", "si", "yes"):
        return True
    if word == "no":
        return False
    return None
def parse_sentiment_it(output_text):
    """Map the first word of the model output to a sentiment verdict.

    Returns True when the first whitespace-separated token starts with "pos",
    False when it starts with "neg", and None otherwise (including empty
    output). Matching is case-insensitive.
    """
    tokens = output_text.strip().lower().split()
    if not tokens:
        return None
    head = tokens[0]
    for prefix, verdict in (("pos", True), ("neg", False)):
        if head.startswith(prefix):
            return verdict
    return None
def normalize_text(s):
    """Lowercase ``s`` and collapse every run of whitespace into one space."""
    tokens = s.lower().split()
    return " ".join(tokens)
# =========================
# Modello: load & generate
# =========================
def load_model(model_name):
    """Load a causal LM and its tokenizer and prepare the model for inference.

    The model is moved to the device reported by ``get_device()`` and switched
    to eval mode.

    Returns:
        A ``(tokenizer, model, device)`` tuple.
    """
    target_device = get_device()
    tok = AutoTokenizer.from_pretrained(model_name)
    lm = AutoModelForCausalLM.from_pretrained(model_name)
    lm.to(target_device)
    lm.eval()
    return tok, lm, target_device
def generate_text(tokenizer, model, device, prompt, max_new_tokens=32):
    """Greedily generate a continuation of ``prompt`` and return only the new text.

    Args:
        tokenizer: Callable tokenizer; its output must support ``.to(device)``
            and expose ``"input_ids"``.
        model: Model exposing ``generate(**inputs, ...)``.
        device: Device the encoded inputs are moved to.
        prompt: Text to continue.
        max_new_tokens: Maximum number of tokens to generate.

    Returns:
        The decoded continuation, without the prompt and without special
        tokens.
    """
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    prompt_len = inputs["input_ids"].shape[-1]
    with torch.no_grad():
        output_ids = model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            # Greedy decoding. `temperature` is deliberately not passed: it is
            # a sampling-only setting, has no effect when do_sample=False, and
            # a value of 0.0 is not a valid temperature anyway.
            do_sample=False,
        )
    # The returned sequence starts with the prompt tokens; keep only what
    # follows them.
    return tokenizer.decode(output_ids[0][prompt_len:], skip_special_tokens=True)
# =========================
# Valutazione per dataset
# =========================
def evaluate_on_boolq(model_name, tokenizer, model, device, num_samples=DEFAULT_NUM_SAMPLES, lang="en"):
    """Score a model on BoolQ yes/no questions.

    The prompt is written in English when ``lang == "en"`` and in Italian for
    any other value. An example counts as correct only when the parsed answer
    is recognised and equals the gold boolean.

    Returns:
        A dict with ``model_name``, ``dataset``, ``num_samples``, ``accuracy``
        and ``avg_time_per_sample_sec`` (``None`` when nothing was evaluated).
    """
    make_prompt = build_boolq_prompt_en if lang == "en" else build_boolq_prompt_it
    hits = 0
    latencies = []
    for row in load_boolq(num_samples=num_samples):
        passage = row["passage"]
        question = row["question"]
        gold = row["answer"]  # True/False
        prompt = make_prompt(passage, question)
        started = time.time()
        reply = generate_text(tokenizer, model, device, prompt, max_new_tokens=5)
        latencies.append(time.time() - started)
        verdict = parse_yes_no(reply)
        if verdict is not None and verdict == gold:
            hits += 1
    evaluated = len(latencies)
    return {
        "model_name": model_name,
        "dataset": "BoolQ (en)" if lang == "en" else "BoolQ (it)",
        "num_samples": evaluated,
        "accuracy": hits / evaluated if evaluated > 0 else 0.0,
        "avg_time_per_sample_sec": sum(latencies) / evaluated if latencies else None,
    }
def evaluate_on_squad_it(model_name, tokenizer, model, device, num_samples=DEFAULT_NUM_SAMPLES):
    """Score a model on SQuAD-it using a lenient substring match.

    After normalisation, a prediction counts as correct when it contains one
    of the gold answers or is contained in one of them. Empty predictions and
    empty gold answers never match: the empty string is a substring of every
    string, so without this guard a model that generates nothing would be
    scored as correct on every example.

    Returns:
        A dict with ``model_name``, ``dataset``, ``num_samples``, ``accuracy``
        and ``avg_time_per_sample_sec`` (``None`` when nothing was evaluated).
    """
    ds = load_squad_it(num_samples=num_samples)
    correct = 0
    total = 0
    times = []
    for example in ds:
        context = example["context"]
        question = example["question"]
        answers = example.get("answers", {})
        # Gold answers are read from answers["text"] when `answers` is a dict;
        # any other shape is treated as "no gold answer available".
        gold_answers = answers.get("text", []) if isinstance(answers, dict) else []
        prompt = build_squad_it_prompt(context, question)
        t0 = time.time()
        gen_text = generate_text(tokenizer, model, device, prompt, max_new_tokens=32)
        t1 = time.time()
        pred = normalize_text(gen_text)
        total += 1
        times.append(t1 - t0)
        # Discard gold answers that normalise to "" so they cannot match
        # everything, and require a non-empty prediction for the same reason.
        gold_norm = [g for g in (normalize_text(a) for a in gold_answers) if g]
        if pred and any(g in pred or pred in g for g in gold_norm):
            correct += 1
    accuracy = correct / total if total > 0 else 0.0
    avg_time = sum(times) / len(times) if times else None
    return {
        "model_name": model_name,
        "dataset": "SQuAD-it (it)",
        "num_samples": total,
        "accuracy": accuracy,
        "avg_time_per_sample_sec": avg_time,
    }
def evaluate_on_pawsx_it(model_name, tokenizer, model, device, num_samples=DEFAULT_NUM_SAMPLES):
    """Score a model on PAWS-X (it) paraphrase detection.

    An example counts as correct only when the parsed yes/no answer is
    recognised and agrees with ``label == 1``.

    Returns:
        A dict with ``model_name``, ``dataset``, ``num_samples``, ``accuracy``
        and ``avg_time_per_sample_sec`` (``None`` when nothing was evaluated).
    """
    hits = 0
    latencies = []
    for row in load_pawsx_it(num_samples=num_samples):
        first = row["sentence1"]
        second = row["sentence2"]
        gold = row["label"]  # 0: not a paraphrase, 1: paraphrase
        prompt = build_pawsx_it_prompt(first, second)
        started = time.time()
        reply = generate_text(tokenizer, model, device, prompt, max_new_tokens=5)
        latencies.append(time.time() - started)
        verdict = parse_yes_no(reply)
        if verdict is not None and verdict == (gold == 1):
            hits += 1
    evaluated = len(latencies)
    return {
        "model_name": model_name,
        "dataset": "PAWS-X (it)",
        "num_samples": evaluated,
        "accuracy": hits / evaluated if evaluated > 0 else 0.0,
        "avg_time_per_sample_sec": sum(latencies) / evaluated if latencies else None,
    }
def evaluate_on_sentiment_it(model_name, tokenizer, model, device, num_samples=DEFAULT_NUM_SAMPLES):
    """Score a model on binary Italian sentiment classification.

    An example counts as correct only when the parsed answer is recognised
    and agrees with ``label == 1``.

    Returns:
        A dict with ``model_name``, ``dataset``, ``num_samples``, ``accuracy``
        and ``avg_time_per_sample_sec`` (``None`` when nothing was evaluated).
    """
    hits = 0
    latencies = []
    for row in load_sentiment_it(num_samples=num_samples):
        text = row["text"]
        # Assumes 0 = negative, 1 = positive -- TODO confirm against the dataset schema.
        gold = row["label"]
        prompt = build_sentiment_it_prompt(text)
        started = time.time()
        reply = generate_text(tokenizer, model, device, prompt, max_new_tokens=5)
        latencies.append(time.time() - started)
        verdict = parse_sentiment_it(reply)
        if verdict is not None and verdict == (gold == 1):
            hits += 1
    evaluated = len(latencies)
    return {
        "model_name": model_name,
        "dataset": "Sentiment-it (it)",
        "num_samples": evaluated,
        "accuracy": hits / evaluated if evaluated > 0 else 0.0,
        "avg_time_per_sample_sec": sum(latencies) / evaluated if latencies else None,
    }
def evaluate_model_on_dataset(model_name, tokenizer, model, device, dataset_key, num_samples):
    """Run the evaluator that matches ``dataset_key`` and time the whole run.

    Returns:
        The evaluator's result dict with an extra ``total_time_sec`` entry
        holding the wall-clock duration of the call.

    Raises:
        ValueError: If ``dataset_key`` is not one of the supported keys.
    """

    def _run():
        # Pick the evaluator for the requested dataset key.
        if dataset_key == "boolq_en":
            return evaluate_on_boolq(model_name, tokenizer, model, device, num_samples=num_samples, lang="en")
        if dataset_key == "squad_it":
            return evaluate_on_squad_it(model_name, tokenizer, model, device, num_samples=num_samples)
        if dataset_key == "pawsx_it":
            return evaluate_on_pawsx_it(model_name, tokenizer, model, device, num_samples=num_samples)
        if dataset_key == "sentiment_it":
            return evaluate_on_sentiment_it(model_name, tokenizer, model, device, num_samples=num_samples)
        raise ValueError(f"Dataset non supportato: {dataset_key}")

    started = time.time()
    outcome = _run()
    outcome["total_time_sec"] = time.time() - started
    return outcome
# =========================
# Funzioni per la UI
# =========================
def add_model_field(current_count):
    """Return the model-field count after one "add" click.

    The count grows by one while it is below ``MAX_MODELS``; otherwise it is
    returned unchanged.
    """
    return current_count + 1 if current_count < MAX_MODELS else current_count
def get_visible_textboxes(model_count):
    """Build one visibility update per model slot (``MAX_MODELS`` in total).

    Slots numbered 1..``model_count`` are marked visible, the rest hidden.
    """
    return [
        gr.update(visible=(slot <= model_count))
        for slot in range(1, MAX_MODELS + 1)
    ]
def add_dataset_field(current_count):
    """Return the dataset-field count after one "add" click.

    The count grows by one while it is below ``MAX_DATASETS``; otherwise it is
    returned unchanged.
    """
    return current_count + 1 if current_count < MAX_DATASETS else current_count
def get_visible_datasets(dataset_count):
    """Build one visibility update per dataset slot (``MAX_DATASETS`` in total).

    Slots numbered 1..``dataset_count`` are marked visible, the rest hidden.
    """
    return [
        gr.update(visible=(slot <= dataset_count))
        for slot in range(1, MAX_DATASETS + 1)
    ]
def run_benchmark_ui(
    model_1,
    model_2,
    model_3,
    model_4,
    model_5,
    model_count,
    dataset_1,
    dataset_2,
    dataset_3,
    dataset_4,
    dataset_5,
    dataset_count,
    num_samples,
):
    """Run every selected model on every selected dataset and report the results.

    Only the first ``model_count`` model fields and the first
    ``dataset_count`` dataset fields are considered. Blank model names and
    labels that are not in ``LABEL_TO_KEY`` are ignored. At least two models
    and one dataset are required.

    A failure while loading a model skips that model; a failure while
    evaluating a dataset skips that dataset. Both are recorded in the log
    instead of aborting the whole run.

    Returns:
        A ``(dataframe, log_text)`` tuple. The dataframe holds one row per
        successful (model, dataset) pair, sorted by dataset and then by
        descending accuracy; it is empty when validation fails or nothing
        succeeded.
    """
    # The slider value may arrive as a float (e.g. 50.0); dataset slicing
    # uses range(), which only accepts integers.
    num_samples = int(num_samples)

    # Collect models. Slicing (rather than indexing) tolerates a count larger
    # than the number of fields.
    all_models = [model_1, model_2, model_3, model_4, model_5]
    stripped_names = ((raw or "").strip() for raw in all_models[:model_count])
    model_names = [name for name in stripped_names if name]

    # Collect datasets.
    all_datasets = [dataset_1, dataset_2, dataset_3, dataset_4, dataset_5]
    dataset_labels = [
        label for label in all_datasets[:dataset_count] if label in LABEL_TO_KEY
    ]

    if len(model_names) < 2:
        return pd.DataFrame(), "Devi specificare almeno due modelli validi."
    if len(dataset_labels) < 1:
        return pd.DataFrame(), "Devi selezionare almeno un dataset."

    logs = []
    results = []
    logs.append(f"Avvio benchmark con {num_samples} esempi per dataset...")
    logs.append(f"Modelli: {', '.join(model_names)}")
    logs.append(f"Dataset: {', '.join(dataset_labels)}")
    logs.append("Device: " + get_device())
    logs.append("====================================")

    for model_name in model_names:
        logs.append(f"\n[MODELLO] {model_name}")
        try:
            tokenizer, model, device = load_model(model_name)
        except Exception as e:
            logs.append(f" ERRORE nel caricamento del modello: {e}")
            continue
        try:
            for dlabel in dataset_labels:
                dkey = LABEL_TO_KEY[dlabel]
                logs.append(f" [DATASET] {dlabel}")
                try:
                    res = evaluate_model_on_dataset(
                        model_name, tokenizer, model, device, dkey, num_samples
                    )
                    results.append(res)
                    avg_time_str = (
                        f"{res['avg_time_per_sample_sec']:.3f}"
                        if res["avg_time_per_sample_sec"] is not None
                        else "N/A"
                    )
                    logs.append(
                        f" - Esempi valutati: {res['num_samples']}\n"
                        f" - Accuracy: {res['accuracy']:.3f}\n"
                        f" - Tempo medio per esempio (s): {avg_time_str}\n"
                        f" - Tempo totale (s): {res['total_time_sec']:.3f}"
                    )
                except Exception as e:
                    logs.append(f" ERRORE durante il benchmark: {e}")
        finally:
            # Drop this model before the next one is loaded so that two
            # models are never held at the same time, and hand cached GPU
            # memory back to the driver.
            del tokenizer, model
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

    if results:
        df = pd.DataFrame(results)
        df = df.sort_values(by=["dataset", "accuracy"], ascending=[True, False])
    else:
        df = pd.DataFrame()
    log_text = "\n".join(str(entry) for entry in logs)
    return df, log_text
# =========================
# Interfaccia Gradio
# =========================
# Builds the whole UI: inputs in the left column, results in the right one,
# followed by the event wiring for the three buttons.
with gr.Blocks(title="LLM Benchmark Space - Multi-dataset") as demo:
    gr.Markdown(
        """
# 🔍 LLM Benchmark Space (multi-dataset)
Inserisci i nomi dei modelli Hugging Face (es. `Mattimax/DAC4.3`)
e confrontali su uno o più dataset selezionabili da menu a tendina.
- Minimo **2 modelli**
- Puoi aggiungere fino a **5 modelli** con il pulsante **"+ Aggiungi modello"**
- Puoi selezionare **1 o più dataset** (fino a 5) con il pulsante **"+ Aggiungi dataset"**
- Output: tabella con **modello**, **dataset**, **accuracy**, numero di esempi e tempi
Dataset disponibili:
- BoolQ (en)
- SQuAD-it (it)
- PAWS-X (it)
- Sentiment-it (it)
"""
    )
    with gr.Row():
        with gr.Column():
            # State: number of model fields currently shown (starts at two,
            # matching the two fields created visible below).
            model_count_state = gr.State(value=2)
            model_1 = gr.Textbox(
                label="Modello 1",
                placeholder="es. Mattimax/DACMini-IT",
                value="",
                visible=True,
            )
            model_2 = gr.Textbox(
                label="Modello 2",
                placeholder="es. Mattimax/DAC60M",
                value="",
                visible=True,
            )
            # Optional slots: created hidden and revealed one at a time by
            # the "add model" button.
            model_3 = gr.Textbox(
                label="Modello 3",
                placeholder="Modello opzionale",
                value="",
                visible=False,
            )
            model_4 = gr.Textbox(
                label="Modello 4",
                placeholder="Modello opzionale",
                value="",
                visible=False,
            )
            model_5 = gr.Textbox(
                label="Modello 5",
                placeholder="Modello opzionale",
                value="",
                visible=False,
            )
            add_model_button = gr.Button("+ Aggiungi modello")
            # State: number of dataset selectors currently shown (starts at one).
            dataset_count_state = gr.State(value=1)
            dataset_1 = gr.Dropdown(
                label="Dataset 1",
                choices=DATASET_LABELS,
                value="BoolQ (en)",
                visible=True,
            )
            # Extra selectors: created hidden and revealed one at a time by
            # the "add dataset" button.
            dataset_2 = gr.Dropdown(
                label="Dataset 2",
                choices=DATASET_LABELS,
                value="SQuAD-it (it)",
                visible=False,
            )
            dataset_3 = gr.Dropdown(
                label="Dataset 3",
                choices=DATASET_LABELS,
                value="PAWS-X (it)",
                visible=False,
            )
            dataset_4 = gr.Dropdown(
                label="Dataset 4",
                choices=DATASET_LABELS,
                value="Sentiment-it (it)",
                visible=False,
            )
            # NOTE(review): this default repeats the one of "Dataset 1", so
            # revealing all five selectors without changing them selects
            # BoolQ twice.
            dataset_5 = gr.Dropdown(
                label="Dataset 5",
                choices=DATASET_LABELS,
                value="BoolQ (en)",
                visible=False,
            )
            add_dataset_button = gr.Button("+ Aggiungi dataset")
            num_samples = gr.Slider(
                minimum=10,
                maximum=200,
                step=10,
                value=DEFAULT_NUM_SAMPLES,
                label="Numero di esempi per dataset",
            )
            run_button = gr.Button("🚀 Esegui benchmark", variant="primary")
        with gr.Column():
            # Headers mirror the keys of the result dicts returned by the
            # evaluation functions.
            results_df = gr.Dataframe(
                headers=[
                    "model_name",
                    "dataset",
                    "num_samples",
                    "accuracy",
                    "avg_time_per_sample_sec",
                    "total_time_sec",
                ],
                label="Risultati benchmark",
                interactive=False,
            )
            logs_box = gr.Textbox(
                label="Log esecuzione",
                lines=25,
                interactive=False,
            )

    # "+ Add model" logic
    def on_add_model(model_count):
        # Returns the new count followed by one visibility update per model
        # textbox, in the same order as the `outputs` list below.
        new_count = add_model_field(model_count)
        visibility_updates = get_visible_textboxes(new_count)
        return [new_count] + visibility_updates

    add_model_button.click(
        fn=on_add_model,
        inputs=[model_count_state],
        outputs=[model_count_state, model_1, model_2, model_3, model_4, model_5],
    )

    # "+ Add dataset" logic
    def on_add_dataset(dataset_count):
        # Returns the new count followed by one visibility update per dataset
        # dropdown, in the same order as the `outputs` list below.
        new_count = add_dataset_field(dataset_count)
        visibility_updates = get_visible_datasets(new_count)
        return [new_count] + visibility_updates

    add_dataset_button.click(
        fn=on_add_dataset,
        inputs=[dataset_count_state],
        outputs=[dataset_count_state, dataset_1, dataset_2, dataset_3, dataset_4, dataset_5],
    )

    # "Run benchmark" logic: the input order must match the parameter order
    # of run_benchmark_ui.
    run_button.click(
        fn=run_benchmark_ui,
        inputs=[
            model_1,
            model_2,
            model_3,
            model_4,
            model_5,
            model_count_state,
            dataset_1,
            dataset_2,
            dataset_3,
            dataset_4,
            dataset_5,
            dataset_count_state,
            num_samples,
        ],
        outputs=[results_df, logs_box],
    )

# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()