import streamlit as st
import polars as pl
import pandas as pd  # Altair charts below receive pandas DataFrames
import altair as alt
from huggingface_hub import hf_hub_download
import traceback
# --- Configuration and Setup ---
st.set_page_config(layout="wide")
alt.data_transformers.enable(
"default"
) # Use default, vegafusion might need server extension
# alt.data_transformers.enable("vegafusion") # Optional: if vegafusion works in your env
st.title("Search Arena V1 Dataset Analysis")
# --- Hugging Face Authentication & Data Loading ---
@st.cache_data(ttl=3600) # Cache data for 1 hour
def load_data():
"""Loads the dataset from Hugging Face Hub."""
try:
repo_id = "lmarena-ai/search-arena-v1-7k"
filename = "data/search-arena-v1-preference-7k.parquet"
# Download the parquet file; hf_hub_download picks up the HF_TOKEN secret from the environment if one is set
# hf_hub_download handles caching locally within the Space's container
local_path = hf_hub_download(
repo_id=repo_id, filename=filename, repo_type="dataset"
)
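# If the repo ever required explicit authentication, the token could also be passed
# directly (sketch only; assumes the HF_TOKEN secret is exposed as an environment
# variable and that `import os` is added at the top):
#   local_path = hf_hub_download(
#       repo_id=repo_id, filename=filename, repo_type="dataset",
#       token=os.environ.get("HF_TOKEN"),
#   )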
df = pl.read_parquet(local_path)
df = df.with_columns(pl.col("timestamp").dt.date().alias("date"))
return df
except Exception as e:
st.error(f"Error loading data: {e}")
st.warning(
"Ensure you have added your Hugging Face token (with read access) as a secret named 'HF_TOKEN' in your Space settings."
)
return None
df = load_data()
if df is None:
st.stop() # Stop execution if data loading failed
# --- Data Processing and Visualization ---
st.header("Dataset Overview")
# **Date Histogram**
st.subheader("Data Collection Timeline")
try:
date_counts = df.group_by("date").agg(pl.len().alias("count")).sort("date")
chart_dates = (
alt.Chart(date_counts.to_pandas()) # Convert to Pandas for Altair/Streamlit
.mark_bar()
.encode(
x=alt.X(
"date:T", axis=alt.Axis(labelAngle=0, title="Date")
), # Adjusted angle
y=alt.Y("count:Q", axis=alt.Axis(title="Frequency")),
tooltip=["date:T", "count:Q"],
)
.properties(title="Histogram of Dates")
)
st.altair_chart(chart_dates, use_container_width=True)
except Exception as e:
st.error(f"Error generating date histogram: {e}")
# --- Model Performance Analysis ---
st.header("Model Performance")
# **Calculate Model Statistics**
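# Note: the "winner" column is assumed to take one of "model_a", "model_b",
# "tie", or "tie (bothbad)"; the win/loss/tie counts below depend on these exact labels.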
@st.cache_data(ttl=3600)
def calculate_model_stats(_df):
"""Calculates wins, losses, ties, and win rate for each model."""
unique_models = (
pl.concat([_df["model_a"], _df["model_b"]]).unique().sort().to_list()
)
results = []
for model in unique_models:
# Wins
wins_as_a = _df.filter(
(pl.col("model_a") == model) & (pl.col("winner") == "model_a")
).height
wins_as_b = _df.filter(
(pl.col("model_b") == model) & (pl.col("winner") == "model_b")
).height
total_wins = wins_as_a + wins_as_b
# Losses
losses_as_a = _df.filter(
(pl.col("model_a") == model) & (pl.col("winner") == "model_b")
).height
losses_as_b = _df.filter(
(pl.col("model_b") == model) & (pl.col("winner") == "model_a")
).height
total_losses = losses_as_a + losses_as_b
# Ties (including bothbad)
ties_as_a = _df.filter(
(pl.col("model_a") == model) & (pl.col("winner").str.contains("tie"))
).height
ties_as_b = _df.filter(
(pl.col("model_b") == model) & (pl.col("winner").str.contains("tie"))
).height
total_ties = ties_as_a + ties_as_b
# Total Matches
total_matches = _df.filter(
(pl.col("model_a") == model) | (pl.col("model_b") == model)
).height
# Win Rate
win_rate = (
round(total_wins / total_matches * 100, 2) if total_matches > 0 else 0
)
results.append(
{
"model": model,
"wins": total_wins,
"losses": total_losses,
"ties": total_ties,
"total_matches": total_matches,
"win_rate (%)": win_rate,
}
)
results_df = pl.DataFrame(results).sort("win_rate (%)", descending=True)
return results_df, unique_models
results_df, unique_models = calculate_model_stats(df)
st.subheader("Overall Win Rates")
st.dataframe(results_df.to_pandas(), use_container_width=True) # Display as dataframe
# **Head-to-Head Analysis**
st.subheader("Head-to-Head Matchups (Wins-Losses-Ties-BothBad)")
@st.cache_data(ttl=3600)
def calculate_head_to_head(_df, _unique_models):
"""Calculates head-to-head results."""
head_to_head = []
for model_1 in _unique_models:
row = {"model": model_1}
for model_2 in _unique_models:
if model_1 == model_2:
row[model_2] = "N/A"
continue
matches_ab = _df.filter(
(pl.col("model_a") == model_1) & (pl.col("model_b") == model_2)
)
matches_ba = _df.filter(
(pl.col("model_a") == model_2) & (pl.col("model_b") == model_1)
)
wins_1 = (
matches_ab.filter(pl.col("winner") == "model_a").height
+ matches_ba.filter(pl.col("winner") == "model_b").height
)
wins_2 = (
matches_ab.filter(pl.col("winner") == "model_b").height
+ matches_ba.filter(pl.col("winner") == "model_a").height
)
ties = (
matches_ab.filter(pl.col("winner") == "tie").height
+ matches_ba.filter(pl.col("winner") == "tie").height
)
bothbad = (
matches_ab.filter(pl.col("winner") == "tie (bothbad)").height
+ matches_ba.filter(pl.col("winner") == "tie (bothbad)").height
)
total = wins_1 + wins_2 + ties + bothbad
row[model_2] = (
f"{wins_1}-{wins_2}-{ties}-{bothbad}" if total > 0 else "0-0-0-0"
)
head_to_head.append(row)
return pl.DataFrame(head_to_head)
head_to_head_df = calculate_head_to_head(df, unique_models)
st.dataframe(head_to_head_df.to_pandas(), use_container_width=True)
# **Heatmaps**
st.subheader("Head-to-Head Heatmaps")
@st.cache_data(ttl=3600)
def prepare_heatmap_data(_head_to_head_df):
"""Prepares data for heatmaps."""
melted_df = _head_to_head_df.unpivot(index=["model"], variable_name="opponent")
parsed_data = []
for row in melted_df.iter_rows(named=True):
model, opponent, value = row["model"], row["opponent"], row["value"]
if value != "N/A":
try:
parts = value.split("-")
wins, losses, ties, bothbad = map(int, parts)
parsed_data.extend(
[
{
"model": model,
"opponent": opponent,
"metric": "wins",
"value": wins,
},
{
"model": model,
"opponent": opponent,
"metric": "losses",
"value": losses,
},
{
"model": model,
"opponent": opponent,
"metric": "ties",
"value": ties,
},
{
"model": model,
"opponent": opponent,
"metric": "bothbad",
"value": bothbad,
},
]
)
except (ValueError, IndexError):
st.warning(
f"Could not parse head-to-head value: '{value}' for {model} vs {opponent}"
)
continue # Skip malformed entries
return pl.DataFrame(parsed_data)
metrics_df = prepare_heatmap_data(head_to_head_df)
def create_heatmap(data_pd, metric, color_scheme):
"""Helper function to create an Altair heatmap."""
if data_pd.empty:
return alt.Chart(pd.DataFrame({"x": [], "y": [], "value": []})).mark_text(
text=f"No data for {metric}"
)
median_value = data_pd["value"].median()
heatmap = (
alt.Chart(data_pd)
.mark_rect()
.encode(
x=alt.X(
"opponent:N",
title="Opponent",
sort=unique_models,
axis=alt.Axis(labelLimit=0, labelAngle=90),
), # Ensure consistent sorting
y=alt.Y(
"model:N", title="Model", sort=unique_models
), # Ensure consistent sorting
color=alt.Color(
"value:Q",
scale=alt.Scale(scheme=color_scheme),
title=f"{metric.capitalize()}",
),
tooltip=["model", "opponent", "value"],
)
.properties(
title=f"{metric.capitalize()}", width=alt.Step(40), height=alt.Step(40)
) # Adjust step for size
)
text = heatmap.mark_text(baseline="middle").encode(
text="value:Q",
color=alt.condition(
f"datum.value > {median_value}", alt.value("white"), alt.value("black")
),
tooltip=["model", "opponent", "value"], # Ensure tooltip is on text too
)
return heatmap + text
# Filter and convert data for each heatmap within Streamlit rendering flow
col1, col2 = st.columns(2)
with col1:
try:
wins_pd = metrics_df.filter(pl.col("metric") == "wins").to_pandas()
if not wins_pd.empty:
wins_heatmap = create_heatmap(wins_pd, "wins", "greens")
st.altair_chart(wins_heatmap, use_container_width=True)
else:
st.write("No 'wins' data for heatmap.")
except Exception as e:
st.error(f"Error creating wins heatmap: {e}")
try:
ties_pd = metrics_df.filter(pl.col("metric") == "ties").to_pandas()
if not ties_pd.empty:
ties_heatmap = create_heatmap(ties_pd, "ties", "purples")
st.altair_chart(ties_heatmap, use_container_width=True)
else:
st.write("No 'ties' data for heatmap.")
except Exception as e:
st.error(f"Error creating ties heatmap: {e}")
with col2:
try:
losses_pd = metrics_df.filter(pl.col("metric") == "losses").to_pandas()
if not losses_pd.empty:
losses_heatmap = create_heatmap(losses_pd, "losses", "oranges")
st.altair_chart(losses_heatmap, use_container_width=True)
else:
st.write("No 'losses' data for heatmap.")
except Exception as e:
st.error(f"Error creating losses heatmap: {e}")
try:
bothbad_pd = metrics_df.filter(pl.col("metric") == "bothbad").to_pandas()
if not bothbad_pd.empty:
bothbad_heatmap = create_heatmap(bothbad_pd, "bothbad", "reds")
st.altair_chart(bothbad_heatmap, use_container_width=True)
else:
st.write("No 'bothbad' data for heatmap.")
except Exception as e:
st.error(f"Error creating bothbad heatmap: {e}")
# --- Detailed Outcome Analysis ---
st.header("Detailed Outcome Analysis")
# **Reshape Data for Detailed Analysis**
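# Each battle is split into two rows (one per participating model), carrying that
# model's outcome plus its own response_length and num_citations from conv_metadata.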
@st.cache_data(ttl=3600)
def reshape_data_for_analysis(_df):
"""Reshapes data for outcome analysis per model."""
# Helper to safely extract struct fields, returning None if field doesn't exist or is null
def safe_struct_field(col_name, field_name):
return (
pl.when(pl.col(col_name).struct.field(field_name).is_not_null())
.then(pl.col(col_name).struct.field(field_name))
.otherwise(None)
) # Or pl.lit(0) or appropriate default
df_model_a = _df.select(
[
pl.col("model_a").alias("model"),
pl.when(pl.col("winner") == "model_a")
.then(pl.lit("win"))
.when(pl.col("winner") == "tie")
.then(pl.lit("tie"))
.when(pl.col("winner") == "tie (bothbad)")
.then(pl.lit("tie (bothbad)"))
.otherwise(pl.lit("loss"))
.alias("outcome"),
"language",
"turn",
"date",
safe_struct_field("conv_metadata", "response_length_a").alias(
"response_length"
),
safe_struct_field("conv_metadata", "num_citations_a").alias(
"num_citations"
),
]
)
df_model_b = _df.select(
[
pl.col("model_b").alias("model"),
pl.when(pl.col("winner") == "model_b")
.then(pl.lit("win"))
.when(pl.col("winner") == "tie")
.then(pl.lit("tie"))
.when(pl.col("winner") == "tie (bothbad)")
.then(pl.lit("tie (bothbad)"))
.otherwise(pl.lit("loss"))
.alias("outcome"),
"language",
"turn",
"date",
safe_struct_field("conv_metadata", "response_length_b").alias(
"response_length"
),
safe_struct_field("conv_metadata", "num_citations_b").alias(
"num_citations"
),
]
)
# Combine the per-model views and fill missing metadata values
df_models = pl.concat([df_model_a, df_model_b], how="vertical")
df_models = df_models.fill_null(0)  # e.g. absent response_length / num_citations become 0
return df_models
df_models = reshape_data_for_analysis(df)
# **Calculate Rates**
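# weighted_rate scores each match as 1 for a win, 0.5 for a tie, and 0.25 for a
# "tie (bothbad)", normalized by the model's total number of matches.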
@st.cache_data(ttl=3600)
def calculate_rates(_df_models):
"""Calculates various outcome rates per model."""
df_rates = (
_df_models.group_by("model")
.agg(
wins=pl.col("outcome").eq("win").sum(),
losses=pl.col("outcome").eq("loss").sum(),
ties=pl.col("outcome").eq("tie").sum(),
tie_bothbad=pl.col("outcome").eq("tie (bothbad)").sum(),
)
.with_columns(
total=pl.sum_horizontal(["wins", "losses", "ties", "tie_bothbad"]),
)
.filter(pl.col("total") > 0) # Avoid division by zero
.with_columns(
win_rate=pl.col("wins") / pl.col("total"),
loss_rate=pl.col("losses") / pl.col("total"),
tie_rate=pl.col("ties") / pl.col("total"),
tie_bothbad_rate=pl.col("tie_bothbad") / pl.col("total"),
weighted_rate=(
(pl.col("wins") + 0.5 * pl.col("ties") + 0.25 * pl.col("tie_bothbad"))
/ pl.col("total")
),
)
)
return df_rates
df_rates = calculate_rates(df_models)
# **Outcome Distribution Chart**
st.subheader("Outcome Distribution by Model")
outcome_order = ["win", "loss", "tie", "tie (bothbad)"]  # Reused by later facet/sort encodings
try:
winner_bar = (
alt.Chart(df_models.to_pandas())
.mark_bar()
.encode(
x=alt.X(
"model:N",
title="Model",
sort=unique_models,
axis=alt.Axis(labelLimit=0),
),
y=alt.Y("count():Q", title="Count"),
color=alt.Color(
"outcome:N",
title="Outcome",
scale=alt.Scale(
domain=outcome_order,
range=["#1f77b4", "#d62728", "#2ca02c", "#9467bd"],
),
sort=outcome_order,
), # Explicit colors/order
tooltip=["model", "outcome", "count()"],
order=alt.Order(
"color_outcome_sort_index:Q"
), # Ensure stack order matches legend
)
.transform_calculate(
# Create a field for sorting based on the domain order
color_outcome_sort_index=f"{{'win': 0, 'loss': 1, 'tie': 2, 'tie (bothbad)': 3}}[datum.outcome]"
)
)
st.altair_chart(winner_bar, use_container_width=True)
except Exception as e:
st.error(f"Error generating outcome distribution chart: {e}")
# **Rates Chart**
st.subheader("Outcome Rates by Model")
try:
df_rates_long = (
df_rates.select(
["model", "win_rate", "loss_rate", "tie_rate", "tie_bothbad_rate"]
)
.unpivot(
index=["model"],
variable_name="rate_type",
value_name="rate_value",
)
.with_columns(
# Clean up rate type names for display (e.g. "tie_bothbad_rate" -> "Tie Bothbad")
pl.col("rate_type")
.str.replace("_rate", "")
.str.replace("_", " ")
.str.to_titlecase()
)
)
rate_order = ["Win", "Loss", "Tie", "Tie Bothbad"] # Order for stacking and legend
stacked_bar = (
alt.Chart(df_rates_long.to_pandas())
.mark_bar()
.encode(
x=alt.X(
"model:N",
title="Model",
sort=unique_models,
axis=alt.Axis(labelLimit=0),
),
y=alt.Y(
"rate_value:Q",
title="Rate",
stack="normalize",
axis=alt.Axis(format="%"),
), # Normalize stack
color=alt.Color("rate_type:N", title="Rate Type", sort=rate_order),
order=alt.Order(
"color_rate_type_sort_index:Q"
), # Use calculation for stack order
tooltip=["model", "rate_type", alt.Tooltip("rate_value:Q", format=".1%")],
)
.transform_calculate(
# Create a field for sorting based on the domain order
color_rate_type_sort_index=f"{{'Win': 0, 'Loss': 1, 'Tie': 2, 'Tie Bothbad': 3}}[datum.rate_type]"
)
)
weighted_line = (
alt.Chart(df_rates.to_pandas())
.mark_line(point=True, color="orange", strokeDash=[5, 5]) # Dashed line
.encode(
x=alt.X("model:N", title="Model", sort=unique_models),
y=alt.Y(
"weighted_rate:Q", title="Weighted Rate", axis=alt.Axis(format=".1%")
),
tooltip=[
"model",
alt.Tooltip("weighted_rate:Q", title="Weighted Rate", format=".1%"),
],
)
)
rates_chart = (
(stacked_bar + weighted_line)
.properties(title="Stacked Outcome Rates by Model (Weighted Rate Overlay)")
.resolve_scale(y="independent")
) # Independent Y-axis for line vs bars
st.altair_chart(rates_chart, use_container_width=True)
except Exception as e:
st.error(f"Error generating rates chart: {e}")
# --- Multilingual Performance ---
st.header("Multilingual Performance")
# **Language Frequency**
st.subheader("Language Distribution")
try:
# Calculate language frequency from df_models which has one row per model appearance
lang_freq_df = (
df_models["language"].value_counts().rename({"count": "total_samples"})
)
language_freq_chart = (
alt.Chart(lang_freq_df.to_pandas())
.mark_bar()
.encode(
x=alt.X("language:N", sort="-y", title="Language"),
y=alt.Y("total_samples:Q", title="Number of Comparisons"),
tooltip=["language", "total_samples"],
)
)
st.altair_chart(language_freq_chart, use_container_width=True)
except Exception as e:
st.error(f"Error generating language frequency chart: {e}")
# **Win Rate by Language Heatmap**
st.subheader("Win Rate by Model and Language")
try:
win_rate_lang_df = (
df_models.group_by(["model", "language"])
.agg(wins=(pl.col("outcome") == "win").sum(), total=pl.len())
.filter(pl.col("total") > 0) # Avoid division by zero
.with_columns(win_rate=pl.col("wins") / pl.col("total"))
)
win_rate_language_heatmap = (
alt.Chart(win_rate_lang_df.to_pandas())
.mark_rect()
.encode(
x=alt.X(
"model:N",
title="Model",
sort=unique_models,
axis=alt.Axis(labelLimit=0),
),
y=alt.Y("language:N", title="Language"),
color=alt.Color(
"win_rate:Q",
title="Win Rate",
scale=alt.Scale(scheme="blues"),
legend=alt.Legend(format=".0%"),
),
tooltip=[
"model",
"language",
alt.Tooltip("win_rate:Q", format=".1%"),
"total",
],
)
)
st.altair_chart(win_rate_language_heatmap, use_container_width=True)
except Exception as e:
st.error(f"Error generating win rate by language heatmap: {e}")
# **Outcome Distribution by Language and Model (Faceted & Wrapped)**
st.subheader("Outcome Distribution by Language and Model")
try:
# --- Data Prep for Annotation ---
@st.cache_data(ttl=3600)
def prepare_data_for_language_facet(_df_models):
df_language_totals = _df_models.group_by("language").agg(total_samples=pl.len())
df_models_with_totals = _df_models.join(
df_language_totals, on="language", how="left"
)
return df_models_with_totals
df_models_for_facet = prepare_data_for_language_facet(df_models)
# Convert to Pandas for Altair
df_models_pd = df_models_for_facet.to_pandas()
# --- Base Bar Chart (using the full dataset) ---
bar_chart = (
alt.Chart(df_models_pd) # Use the common DataFrame
.mark_bar()
.encode(
x=alt.X(
"model:N",
title="Model",
axis=alt.Axis(labels=True, labelLimit=0),
), # Show model labels without truncation
# --- Choose one Y encoding ---
# For absolute counts (like example):
y=alt.Y("count():Q", title="Count", stack="zero"),
# For normalized bars:
# y=alt.Y("count():Q", title="%", stack="normalize", axis=alt.Axis(format="%")),
# -----------------------------
color=alt.Color(
"outcome:N",
title="Outcome",
sort=outcome_order, # Use the existing outcome_order list
scale=alt.Scale(
domain=outcome_order,
range=[
"green",
"orange",
"lightblue",
"red",
],
),
),
tooltip=[
"model",
"language",
"outcome",
"count()",
alt.Tooltip("total_samples:Q", title="Total Samples in Language"),
],
order=alt.Order( # Important for consistent stacking, esp. if normalized
"color_outcome_sort_index:Q"
),
)
.transform_calculate(
# Field for sorting stack order
color_outcome_sort_index=f"{{'win': 0, 'loss': 1, 'tie': 2, 'tie (bothbad)': 3}}[datum.outcome]"
)
)
# --- Text Annotation (using the full dataset with transform_aggregate) ---
text_chart = (
alt.Chart(df_models_pd) # Use the *same* common DataFrame
.mark_text(
align="left", baseline="middle", dx=5, dy=-5, color="black", fontSize=10
) # Adjusted position/appearance
.encode(
# Position text relative to the top-left corner of the facet using values
x=alt.value(5), # Small offset from the left edge
y=alt.value(15), # Small offset from the top edge
text=alt.Text("total_samples:Q", format=",d"),
color=alt.value("black"), # Explicit text color
# Tooltip for the text itself (optional)
tooltip=[alt.Tooltip("total_samples:Q", title="Total Samples in Language")],
)
.transform_aggregate(
# Aggregate within Altair to get max total_samples per language
# Note: total_samples is constant per language, so max() is just a way to get it once
total_samples="max(total_samples)",
groupby=["language"], # Group by the faceting variable
)
)
# --- Layer the charts, then facet ---
language_outcome_chart = (
# Layer first using '+'
(bar_chart + text_chart)
.facet(
# Then facet the layered chart
facet=alt.Facet(
"language:N",
title="Language",
header=alt.Header(titleOrient="top", labelOrient="top"),
),
columns=5, # Wrap facets into 5 columns
)
.resolve_scale(
y="independent" # Resolve Y scale because text has different positioning logic
)
)
st.altair_chart(language_outcome_chart, use_container_width=True)
except Exception as e:
st.error(
f"Error generating faceted outcome distribution by language: {e}\n{traceback.format_exc()}"
)
# --- Performance Over Turns and Time ---
st.header("Performance Dynamics")
# **Win Rate by Turn Heatmap**
st.subheader("Win Rate by Turn")
try:
win_rate_turn_df = (
df_models.group_by(["model", "turn"])
.agg(wins=(pl.col("outcome") == "win").sum(), total=pl.len())
.filter(pl.col("total") > 0)
.with_columns(win_rate=pl.col("wins") / pl.col("total"))
)
win_rate_turn_heatmap = (
alt.Chart(win_rate_turn_df.to_pandas())
.mark_rect()
.encode(
x=alt.X("turn:O", title="Turn"), # Treat turn as ordinal
y=alt.Y(
"model:N",
title="Model",
sort=unique_models,
axis=alt.Axis(labelLimit=0),
),
color=alt.Color(
"win_rate:Q",
title="Win Rate",
scale=alt.Scale(scheme="blues"),
legend=alt.Legend(format=".0%"),
),
tooltip=[
"model",
"turn",
alt.Tooltip("win_rate:Q", format=".1%"),
"total",
],
)
.properties(title="Win Rate by Model and Turn")
)
st.altair_chart(win_rate_turn_heatmap, use_container_width=True)
except Exception as e:
st.error(f"Error generating win rate by turn heatmap: {e}")
# **Wins Over Time Line Chart**
st.subheader("Wins Over Time")
try:
wins_time_df = (
df_models.filter(pl.col("outcome") == "win")
.group_by(["date", "model"])
.agg(win_count=pl.len())
.sort("date")
)
time_line = (
alt.Chart(wins_time_df.to_pandas())
.mark_line(point=True)
.encode(
x=alt.X("date:T", title="Date"),
y=alt.Y("win_count:Q", title="Daily Win Count"),
color=alt.Color("model:N", title="Model"),
tooltip=["model", "date", "win_count"],
)
.properties(title="Wins by Model Over Time")
)
st.altair_chart(time_line, use_container_width=True)
except Exception as e:
st.error(f"Error generating wins over time chart: {e}")
# --- Response Characteristics ---
st.header("Response Characteristics vs. Outcome")
# **Response Length Boxplot (Faceted, No Outliers, Adjusted Y-Axis, Detailed Tooltip)**
st.subheader(
"Response Length Distribution by Outcome (Box Plot - Adjusted Scale, No Outliers)"
)
try:
# Filter using .and_() for logical AND
response_length_df = df_models.filter(
pl.col("response_length").is_not_null().and_(pl.col("response_length") >= 0)
)
# Convert to Pandas for Altair and percentile calculation
response_length_pd = response_length_df.to_pandas()
# --- Calculate a reasonable upper limit for the Y-axis ---
valid_lengths = response_length_pd["response_length"].dropna()
valid_lengths = valid_lengths[valid_lengths >= 0]
if not valid_lengths.empty:
upper_limit = valid_lengths.quantile(0.99)
upper_limit = max(upper_limit, 100) # Example minimum range
else:
upper_limit = 1000 # Default if no valid data
st.write(
f"(Note: Y-axis for Response Length capped at ~{int(upper_limit)} [99th percentile] to improve visibility)"
)
response_facet_boxplot = (
alt.Chart(response_length_pd)
.mark_boxplot(
extent="min-max", # Keep whiskers extending to min/max within 1.5*IQR
outliers=False, # Keep outliers hidden
)
.encode(
x=alt.X(
"model:N",
title="Model",
sort=unique_models,
axis=alt.Axis(labelLimit=0),
), # Keep axis formatting
y=alt.Y(
"response_length:Q",
title="Response Length",
scale=alt.Scale(domain=[0, upper_limit], clamp=True), # Keep clamping
),
color=alt.Color(
"model:N", title="Model", legend=None
), # Color by model for clarity within facet
)
.facet(
# Keep the same faceting by outcome
column=alt.Column("outcome:N", title="Outcome", sort=outcome_order)
)
.properties(
title="Response Length Distribution by Model per Outcome (Box Plot, Adjusted Scale, No Outliers)"
)
)
st.altair_chart(response_facet_boxplot, use_container_width=True)
except Exception as e:
st.error(f"Error generating response length boxplot: {e}\n{traceback.format_exc()}")
# --- Citations Scatter Plot (Faceted) ---
st.subheader("Number of Citations by Outcome")
try:
# Corrected filter using .and_() for logical AND
citation_df = df_models.filter(
pl.col("num_citations").is_not_null().and_(pl.col("num_citations") >= 0)
)
# Aggregate counts for bubble size
citation_agg = citation_df.group_by(["model", "outcome", "num_citations"]).agg(
count=pl.len()
)
# Convert to Pandas if needed
citation_agg_pd = citation_agg.to_pandas()
citation_facet = (
alt.Chart(citation_agg_pd)
.mark_circle()
.encode(
x=alt.X(
"model:N",
title="Model",
sort=unique_models,
axis=alt.Axis(labelLimit=0),
), # Keep axis formatting
y=alt.Y(
"num_citations:Q",
title="Number of Citations",
axis=alt.Axis(tickMinStep=1),
), # Ensure integer ticks
# --- Modify the legend within alt.Size ---
size=alt.Size(
"count:Q",
title="Number of Responses",
legend=alt.Legend(
symbolFillColor="lightblue", # Light blue legend symbols for visibility
symbolStrokeColor="lightblue", # Matching symbol outline
),
),
# ------------------------------------------
color=alt.Color(
"model:N", title="Model", legend=None
), # Keep model legend hidden
tooltip=["model", "outcome", "num_citations", "count"],
)
.facet(column=alt.Column("outcome:N", title="Outcome", sort=outcome_order))
.properties(title="Citations by Model per Outcome")
)
st.altair_chart(citation_facet, use_container_width=True)
except Exception as e:
st.error(f"Error generating citation facet plot: {e}\n{traceback.format_exc()}")
# --- Language Leaderboard ---
st.header("Top/Worst Models per Language")
@st.cache_data(ttl=3600)
def calculate_language_ranks(_df_models):
"""Calculates top/worst models based on win rate per language."""
df_win_rates = (
_df_models.group_by(["language", "model"])
.agg(wins=pl.col("outcome").eq("win").sum(), total=pl.len())
.filter(pl.col("total") > 5) # Require minimum samples for ranking
.with_columns(win_rate=pl.col("wins") / pl.col("total"))
)
if df_win_rates.is_empty():
return pl.DataFrame() # Return empty if no language meets criteria
df_ranked = df_win_rates.with_columns(
rank_top=pl.col("win_rate")
.rank(method="min", descending=True)
.over("language"),
rank_worst=pl.col("win_rate")
.rank(method="min", descending=False)
.over("language"),
)
df_top_1 = (
df_ranked.filter(pl.col("rank_top") == 1)
.group_by("language")
.agg(
top_model_1=pl.col("model").first(),
win_rate_1=pl.col("win_rate").first(),
total_samples=pl.col("total").sum(), # Comparisons for the top-ranked model(s)
)
)
df_top_2 = (
df_ranked.filter(pl.col("rank_top") == 2)
.group_by("language")
.agg(top_model_2=pl.col("model").first(), win_rate_2=pl.col("win_rate").first())
)
df_worst_1 = (
df_ranked.filter(pl.col("rank_worst") == 1)
.group_by("language")
.agg(
worst_model_1=pl.col("model").first(),
worst_win_rate_1=pl.col("win_rate").first(),
)
)
df_worst_2 = (
df_ranked.filter(pl.col("rank_worst") == 2)
.group_by("language")
.agg(
worst_model_2=pl.col("model").first(),
worst_win_rate_2=pl.col("win_rate").first(),
)
)
# Combine using left joins to handle languages where some ranks don't exist (e.g., too few models)
df_table = (
df_top_1.join(df_top_2, on="language", how="left")
.join(df_worst_1, on="language", how="left")
.join(df_worst_2, on="language", how="left")
.select(
[
"language",
"total_samples",
"top_model_1",
pl.col("win_rate_1").round(3).alias("WR #1"),
"top_model_2",
pl.col("win_rate_2").round(3).alias("WR #2"),
"worst_model_1",
pl.col("worst_win_rate_1").round(3).alias("WR Worst #1"),
"worst_model_2",
pl.col("worst_win_rate_2").round(3).alias("WR Worst #2"),
]
)
.sort("language")
.fill_null("N/A")
) # Fill missing ranks with N/A
return df_table
df_language_table = calculate_language_ranks(df_models)
if not df_language_table.is_empty():
st.subheader("Top & Bottom 2 Models by Win Rate per Language (Min 5 Comparisons)")
st.dataframe(df_language_table.to_pandas(), use_container_width=True)
else:
st.subheader("Top & Bottom 2 Models by Win Rate per Language")
st.write(
"Insufficient data: no model has at least 5 comparisons in any language, so rankings cannot be generated."
)
# **Model Ranking Counts**
if not df_language_table.is_empty():
st.subheader("How Often Models Rank Top/Worst Across Languages")
try:
df_top_1_counts = (
df_language_table["top_model_1"]
.value_counts()
.rename({"top_model_1": "model", "count": "Rank 1 Count"})
.sort("Rank 1 Count", descending=True)
)
df_worst_1_counts = (
df_language_table["worst_model_1"]
.value_counts()
.rename({"worst_model_1": "model", "count": "Worst Rank Count"})
.sort("Worst Rank Count", descending=True)
)
col1, col2 = st.columns(2)
with col1:
st.write("**Times Ranked #1**")
st.dataframe(df_top_1_counts.to_pandas(), use_container_width=True)
with col2:
st.write("**Times Ranked Worst**")
st.dataframe(df_worst_1_counts.to_pandas(), use_container_width=True)
except Exception as e:
st.error(f"Error generating model ranking counts: {e}")
# --- Footer ---
st.markdown("---")
st.markdown(
"Analysis based on the `lmarena-ai/search-arena-v1-7k` dataset on Hugging Face."
)