# Streamlit dashboard for the `lmarena-ai/search-arena-v1-7k` preference dataset.
#
# The script runs top to bottom on every Streamlit rerun: it downloads the
# parquet file, derives per-model statistics with Polars and renders Altair
# charts. Expensive steps are wrapped in `st.cache_data`.

import traceback

import altair as alt
import pandas as pd
import polars as pl
import streamlit as st
from huggingface_hub import hf_hub_download

# --- Configuration and Setup ---
st.set_page_config(layout="wide")
# Use the default data transformer. "vegafusion" is an optional alternative,
# but it might need a server extension in the hosting environment.
alt.data_transformers.enable("default")

st.title("Search Arena V1 Dataset Analysis")

DATASET_REPO_ID = "lmarena-ai/search-arena-v1-7k"
DATASET_FILENAME = "data/search-arena-v1-preference-7k.parquet"

# Display/stacking order of the per-model outcomes, shared by several charts.
outcome_order = ["win", "loss", "tie", "tie (bothbad)"]

# Vega expression mapping an outcome label to its position in `outcome_order`,
# so stacked bars are drawn in the same order as the legend.
OUTCOME_SORT_EXPR = "{'win': 0, 'loss': 1, 'tie': 2, 'tie (bothbad)': 3}[datum.outcome]"


# --- Hugging Face Authentication & Data Loading ---
@st.cache_data(ttl=3600)  # Cache data for 1 hour
def _fetch_dataset():
    """Download the dataset parquet file and return it as a Polars DataFrame.

    A `date` column (calendar date of `timestamp`) is added. Any exception
    from the download or the parquet read propagates to the caller, so a
    failed attempt is never stored in the cache.
    """
    # No token is passed explicitly; presumably `hf_hub_download` picks up
    # credentials from the environment (e.g. an HF_TOKEN secret) -- verify.
    local_path = hf_hub_download(
        repo_id=DATASET_REPO_ID, filename=DATASET_FILENAME, repo_type="dataset"
    )
    frame = pl.read_parquet(local_path)
    return frame.with_columns(pl.col("timestamp").dt.date().alias("date"))


def load_data():
    """Loads the dataset from Hugging Face Hub.

    Returns:
        The dataset as a Polars DataFrame, or ``None`` when loading failed
        (an error and a hint are shown in the app in that case).
    """
    # The try/except lives OUTSIDE the cached function on purpose: returning
    # None from inside it would cache the failure for the whole TTL.
    try:
        return _fetch_dataset()
    except Exception as e:
        st.error(f"Error loading data: {e}")
        st.warning(
            "Ensure you have added your Hugging Face token (with read access) as a secret named 'HF_TOKEN' in your Space settings."
        )
        return None


df = load_data()
if df is None:
    st.stop()  # Stop execution if data loading failed

# --- Data Processing and Visualization ---
st.header("Dataset Overview")

# **Date Histogram**
st.subheader("Data Collection Timeline")
try:
    date_counts = df.group_by("date").agg(pl.len().alias("count")).sort("date")
    chart_dates = (
        alt.Chart(date_counts.to_pandas())  # Convert to Pandas for Altair/Streamlit
        .mark_bar()
        .encode(
            x=alt.X("date:T", axis=alt.Axis(labelAngle=0, title="Date")),
            y=alt.Y("count:Q", axis=alt.Axis(title="Frequency")),
            tooltip=["date:T", "count:Q"],
        )
        .properties(title="Histogram of Dates")
    )
    st.altair_chart(chart_dates, use_container_width=True)
except Exception as e:
    st.error(f"Error generating date histogram: {e}")

# --- Model Performance Analysis ---
st.header("Model Performance")


# **Calculate Model Statistics**
@st.cache_data(ttl=3600)
def calculate_model_stats(_df):
    """Calculates wins, losses, ties, and win rate for each model.

    Args:
        _df: Battle-level DataFrame with `model_a`, `model_b` and `winner`
            columns. The leading underscore keeps Streamlit from hashing it.

    Returns:
        A tuple ``(results_df, unique_models)``: one row per model sorted by
        descending win rate, and the alphabetically sorted list of model names.
    """
    unique_models = pl.concat([_df["model_a"], _df["model_b"]]).unique().sort().to_list()
    winner = pl.col("winner")
    # Any winner label containing "tie" counts as a tie (including bothbad).
    is_tie = winner.str.contains("tie")

    results = []
    for model in unique_models:
        as_a = _df.filter(pl.col("model_a") == model)
        as_b = _df.filter(pl.col("model_b") == model)

        total_wins = (
            as_a.filter(winner == "model_a").height + as_b.filter(winner == "model_b").height
        )
        total_losses = (
            as_a.filter(winner == "model_b").height + as_b.filter(winner == "model_a").height
        )
        total_ties = as_a.filter(is_tie).height + as_b.filter(is_tie).height
        # Counted with OR so a row is counted once even if both sides match.
        total_matches = _df.filter(
            (pl.col("model_a") == model) | (pl.col("model_b") == model)
        ).height

        win_rate = round(total_wins / total_matches * 100, 2) if total_matches > 0 else 0.0
        results.append(
            {
                "model": model,
                "wins": total_wins,
                "losses": total_losses,
                "ties": total_ties,
                "total_matches": total_matches,
                "win_rate (%)": win_rate,
            }
        )

    results_df = pl.DataFrame(results).sort("win_rate (%)", descending=True)
    return results_df, unique_models


results_df, unique_models = calculate_model_stats(df)

st.subheader("Overall Win Rates")
st.dataframe(results_df.to_pandas(), use_container_width=True)  # Display as dataframe

# **Head-to-Head Analysis**
st.subheader("Head-to-Head Matchups (Wins-Losses-Ties-BothBad)")


@st.cache_data(ttl=3600)
def calculate_head_to_head(_df, _unique_models):
    """Calculates head-to-head results.

    Args:
        _df: Battle-level DataFrame with `model_a`, `model_b`, `winner`.
        _unique_models: Model names; used for both rows and columns.

    Returns:
        A DataFrame with a `model` column plus one column per opponent. Each
        cell is the string ``"wins-losses-ties-bothbad"`` from the row model's
        point of view, or ``"N/A"`` on the diagonal.
    """

    def count(frame, label):
        # Number of battles in `frame` whose winner column equals `label`.
        return frame.filter(pl.col("winner") == label).height

    head_to_head = []
    for model_1 in _unique_models:
        row = {"model": model_1}
        for model_2 in _unique_models:
            if model_1 == model_2:
                row[model_2] = "N/A"
                continue

            # The pair can meet with model_1 on either side of the battle.
            matches_ab = _df.filter(
                (pl.col("model_a") == model_1) & (pl.col("model_b") == model_2)
            )
            matches_ba = _df.filter(
                (pl.col("model_a") == model_2) & (pl.col("model_b") == model_1)
            )

            wins_1 = count(matches_ab, "model_a") + count(matches_ba, "model_b")
            wins_2 = count(matches_ab, "model_b") + count(matches_ba, "model_a")
            ties = count(matches_ab, "tie") + count(matches_ba, "tie")
            bothbad = count(matches_ab, "tie (bothbad)") + count(matches_ba, "tie (bothbad)")

            # With no matches this naturally yields "0-0-0-0".
            row[model_2] = f"{wins_1}-{wins_2}-{ties}-{bothbad}"
        head_to_head.append(row)
    return pl.DataFrame(head_to_head)


head_to_head_df = calculate_head_to_head(df, unique_models)
st.dataframe(head_to_head_df.to_pandas(), use_container_width=True)

# **Heatmaps**
st.subheader("Head-to-Head Heatmaps")


@st.cache_data(ttl=3600)
def prepare_heatmap_data(_head_to_head_df):
    """Prepares data for heatmaps.

    Turns the wide head-to-head table into long form with one row per
    (model, opponent, metric), where metric is one of ``wins``, ``losses``,
    ``ties``, ``bothbad``. Diagonal ``"N/A"`` cells are dropped; cells that
    cannot be parsed are reported with a warning and skipped.
    """
    metric_names = ("wins", "losses", "ties", "bothbad")
    melted_df = _head_to_head_df.unpivot(index=["model"], variable_name="opponent")

    parsed_data = []
    for row in melted_df.iter_rows(named=True):
        model, opponent, value = row["model"], row["opponent"], row["value"]
        if value == "N/A":
            continue
        try:
            # Unpacking into four names rejects cells with a wrong field count.
            wins, losses, ties, bothbad = map(int, value.split("-"))
        except (ValueError, IndexError):
            st.warning(
                f"Could not parse head-to-head value: '{value}' for {model} vs {opponent}"
            )
            continue  # Skip malformed entries
        parsed_data.extend(
            {"model": model, "opponent": opponent, "metric": name, "value": count}
            for name, count in zip(metric_names, (wins, losses, ties, bothbad))
        )

    # An explicit schema keeps the columns present even when nothing parsed,
    # so downstream filters on "metric" do not fail on an empty frame.
    return pl.DataFrame(
        parsed_data,
        schema={
            "model": pl.String,
            "opponent": pl.String,
            "metric": pl.String,
            "value": pl.Int64,
        },
    )


metrics_df = prepare_heatmap_data(head_to_head_df)


def create_heatmap(data_pd, metric, color_scheme):
    """Helper function to create an Altair heatmap.

    Args:
        data_pd: pandas DataFrame with `model`, `opponent` and `value` columns.
        metric: Metric name; used (capitalised) as chart and legend title.
        color_scheme: Name of the Vega colour scheme for the cells.

    Returns:
        A layered chart (coloured cells plus value labels), or a placeholder
        text chart when `data_pd` is empty. Axes are ordered by the
        module-level `unique_models` list.
    """
    if data_pd.empty:
        return alt.Chart(pd.DataFrame({"x": [], "y": [], "value": []})).mark_text(
            text=f"No data for {metric}"
        )

    median_value = data_pd["value"].median()
    heatmap = (
        alt.Chart(data_pd)
        .mark_rect()
        .encode(
            # Sorting both axes by unique_models keeps the four heatmaps aligned.
            x=alt.X(
                "opponent:N",
                title="Opponent",
                sort=unique_models,
                axis=alt.Axis(labelLimit=0, labelAngle=90),
            ),
            y=alt.Y("model:N", title="Model", sort=unique_models),
            color=alt.Color(
                "value:Q",
                scale=alt.Scale(scheme=color_scheme),
                title=f"{metric.capitalize()}",
            ),
            tooltip=["model", "opponent", "value"],
        )
        # Step sizing gives every cell a fixed 40px square.
        .properties(title=f"{metric.capitalize()}", width=alt.Step(40), height=alt.Step(40))
    )

    # Labels flip to white above the median so they stay readable on dark cells.
    text = heatmap.mark_text(baseline="middle").encode(
        text="value:Q",
        color=alt.condition(
            f"datum.value > {median_value}", alt.value("white"), alt.value("black")
        ),
        tooltip=["model", "opponent", "value"],  # Ensure tooltip is on text too
    )
    return heatmap + text


def _render_heatmap(metric, color_scheme):
    """Render the heatmap for one metric, or a message if there is no data."""
    try:
        metric_pd = metrics_df.filter(pl.col("metric") == metric).to_pandas()
        if not metric_pd.empty:
            st.altair_chart(
                create_heatmap(metric_pd, metric, color_scheme), use_container_width=True
            )
        else:
            st.write(f"No '{metric}' data for heatmap.")
    except Exception as e:
        st.error(f"Error creating {metric} heatmap: {e}")


# Two-column layout: wins/ties on the left, losses/bothbad on the right.
col1, col2 = st.columns(2)
with col1:
    _render_heatmap("wins", "greens")
    _render_heatmap("ties", "purples")
with col2:
    _render_heatmap("losses", "oranges")
    _render_heatmap("bothbad", "reds")

# --- Detailed Outcome Analysis ---
st.header("Detailed Outcome Analysis")


# **Reshape Data for Detailed Analysis**
@st.cache_data(ttl=3600)
def reshape_data_for_analysis(_df):
    """Reshapes data for outcome analysis per model.

    Every battle row becomes two rows, one from each model's perspective,
    with columns `model`, `outcome` (``win``/``loss``/``tie``/
    ``tie (bothbad)``), `language`, `turn`, `date`, `response_length` and
    `num_citations`. The last two are read from the `conv_metadata` struct
    fields suffixed ``_a`` / ``_b``. Nulls are filled with 0 after stacking.
    """

    def perspective(side):
        # `side` is "a" or "b": selects that model's column, winner label and
        # metadata fields. A missing struct field raises (it is not defaulted).
        metadata = pl.col("conv_metadata").struct
        return _df.select(
            [
                pl.col(f"model_{side}").alias("model"),
                pl.when(pl.col("winner") == f"model_{side}")
                .then(pl.lit("win"))
                .when(pl.col("winner") == "tie")
                .then(pl.lit("tie"))
                .when(pl.col("winner") == "tie (bothbad)")
                .then(pl.lit("tie (bothbad)"))
                .otherwise(pl.lit("loss"))
                .alias("outcome"),
                "language",
                "turn",
                "date",
                metadata.field(f"response_length_{side}").alias("response_length"),
                metadata.field(f"num_citations_{side}").alias("num_citations"),
            ]
        )

    df_models = pl.concat([perspective("a"), perspective("b")], how="vertical")
    return df_models.fill_null(0)


df_models = reshape_data_for_analysis(df)


# **Calculate Rates**
@st.cache_data(ttl=3600)
def calculate_rates(_df_models):
    """Calculates various outcome rates per model.

    Returns one row per model with the outcome counts, their total, each
    outcome's share of the total, and `weighted_rate` =
    (wins + 0.5 * ties + 0.25 * tie_bothbad) / total.
    """
    outcome = pl.col("outcome")
    total = pl.col("total")
    return (
        _df_models.group_by("model")
        .agg(
            wins=outcome.eq("win").sum(),
            losses=outcome.eq("loss").sum(),
            ties=outcome.eq("tie").sum(),
            tie_bothbad=outcome.eq("tie (bothbad)").sum(),
        )
        .with_columns(total=pl.sum_horizontal(["wins", "losses", "ties", "tie_bothbad"]))
        .filter(total > 0)  # Avoid division by zero
        .with_columns(
            win_rate=pl.col("wins") / total,
            loss_rate=pl.col("losses") / total,
            tie_rate=pl.col("ties") / total,
            tie_bothbad_rate=pl.col("tie_bothbad") / total,
            weighted_rate=(
                (pl.col("wins") + 0.5 * pl.col("ties") + 0.25 * pl.col("tie_bothbad")) / total
            ),
        )
    )


df_rates = calculate_rates(df_models)

# **Outcome Distribution Chart**
st.subheader("Outcome Distribution by Model")
try:
    winner_bar = (
        alt.Chart(df_models.to_pandas())
        .mark_bar()
        .encode(
            x=alt.X(
                "model:N",
                title="Model",
                sort=unique_models,
                axis=alt.Axis(labelLimit=0),
            ),
            y=alt.Y("count():Q", title="Count"),
            # Explicit colors/order
            color=alt.Color(
                "outcome:N",
                title="Outcome",
                scale=alt.Scale(
                    domain=outcome_order,
                    range=["#1f77b4", "#d62728", "#2ca02c", "#9467bd"],
                ),
                sort=outcome_order,
            ),
            tooltip=["model", "outcome", "count()"],
            # Ensure stack order matches legend
            order=alt.Order("color_outcome_sort_index:Q"),
        )
        .transform_calculate(color_outcome_sort_index=OUTCOME_SORT_EXPR)
    )
    st.altair_chart(winner_bar, use_container_width=True)
except Exception as e:
    st.error(f"Error generating outcome distribution chart: {e}")

# **Rates Chart**
st.subheader("Outcome Rates by Model")
try:
    df_rates_long = (
        df_rates.select(["model", "win_rate", "loss_rate", "tie_rate", "tie_bothbad_rate"])
        .unpivot(
            index=["model"],
            variable_name="rate_type",
            value_name="rate_value",
        )
        .with_columns(
            # Clean up rate type names for display. Underscores must become
            # spaces BEFORE title-casing, otherwise "tie_bothbad" turns into
            # "Tie_Bothbad" and never matches "Tie Bothbad" below.
            pl.col("rate_type")
            .str.replace("_rate", "", literal=True)
            .str.replace_all("_", " ", literal=True)
            .str.to_titlecase()
        )
    )

    rate_order = ["Win", "Loss", "Tie", "Tie Bothbad"]  # Order for stacking and legend

    stacked_bar = (
        alt.Chart(df_rates_long.to_pandas())
        .mark_bar()
        .encode(
            x=alt.X(
                "model:N",
                title="Model",
                sort=unique_models,
                axis=alt.Axis(labelLimit=0),
            ),
            # Normalize stack
            y=alt.Y(
                "rate_value:Q",
                title="Rate",
                stack="normalize",
                axis=alt.Axis(format="%"),
            ),
            color=alt.Color("rate_type:N", title="Rate Type", sort=rate_order),
            # Use calculation for stack order
            order=alt.Order("color_rate_type_sort_index:Q"),
            tooltip=["model", "rate_type", alt.Tooltip("rate_value:Q", format=".1%")],
        )
        .transform_calculate(
            # Position of each label in rate_order, used for the stack order.
            color_rate_type_sort_index=(
                "{'Win': 0, 'Loss': 1, 'Tie': 2, 'Tie Bothbad': 3}[datum.rate_type]"
            )
        )
    )

    weighted_line = (
        alt.Chart(df_rates.to_pandas())
        .mark_line(point=True, color="orange", strokeDash=[5, 5])  # Dashed line
        .encode(
            x=alt.X("model:N", title="Model", sort=unique_models),
            y=alt.Y("weighted_rate:Q", title="Weighted Rate", axis=alt.Axis(format=".1%")),
            tooltip=[
                "model",
                alt.Tooltip("weighted_rate:Q", title="Weighted Rate", format=".1%"),
            ],
        )
    )

    # Independent Y-axis for line vs bars
    rates_chart = (
        (stacked_bar + weighted_line)
        .properties(title="Stacked Outcome Rates by Model (Weighted Rate Overlay)")
        .resolve_scale(y="independent")
    )
    st.altair_chart(rates_chart, use_container_width=True)
except Exception as e:
    st.error(f"Error generating rates chart: {e}")

# --- Multilingual Performance ---
st.header("Multilingual Performance")

# **Language Frequency**
st.subheader("Language Distribution")
try:
    # Language frequency is taken from df_models, which has one row per model
    # appearance.
    # NOTE(review): each battle therefore contributes two rows, so the bars are
    # twice the number of battles -- confirm this matches the axis title.
    lang_freq_df = df_models["language"].value_counts().rename({"count": "total_samples"})
    language_freq_chart = (
        alt.Chart(lang_freq_df.to_pandas())
        .mark_bar()
        .encode(
            x=alt.X("language:N", sort="-y", title="Language"),
            y=alt.Y("total_samples:Q", title="Number of Comparisons"),
            tooltip=["language", "total_samples"],
        )
    )
    st.altair_chart(language_freq_chart, use_container_width=True)
except Exception as e:
    st.error(f"Error generating language frequency chart: {e}")

# **Win Rate by Language Heatmap**
st.subheader("Win Rate by Model and Language")
try:
    win_rate_lang_df = (
        df_models.group_by(["model", "language"])
        .agg(wins=(pl.col("outcome") == "win").sum(), total=pl.len())
        .filter(pl.col("total") > 0)  # Avoid division by zero
        .with_columns(win_rate=pl.col("wins") / pl.col("total"))
    )
    win_rate_language_heatmap = (
        alt.Chart(win_rate_lang_df.to_pandas())
        .mark_rect()
        .encode(
            x=alt.X(
                "model:N",
                title="Model",
                sort=unique_models,
                axis=alt.Axis(labelLimit=0),
            ),
            y=alt.Y("language:N", title="Language"),
            color=alt.Color(
                "win_rate:Q",
                title="Win Rate",
                scale=alt.Scale(scheme="blues"),
                legend=alt.Legend(format=".0%"),
            ),
            tooltip=[
                "model",
                "language",
                alt.Tooltip("win_rate:Q", format=".1%"),
                "total",
            ],
        )
    )
    st.altair_chart(win_rate_language_heatmap, use_container_width=True)
except Exception as e:
    st.error(f"Error generating win rate by language heatmap: {e}")

# **Outcome Distribution by Language and Model (Faceted & Wrapped)**
st.subheader("Outcome Distribution by Language and Model")


@st.cache_data(ttl=3600)
def prepare_data_for_language_facet(_df_models):
    """Attach a `total_samples` column holding each row's per-language row count."""
    df_language_totals = _df_models.group_by("language").agg(total_samples=pl.len())
    return _df_models.join(df_language_totals, on="language", how="left")


try:
    df_models_for_facet = prepare_data_for_language_facet(df_models)

    # Convert to Pandas for Altair; both layers below share this one frame,
    # which is required for layering before faceting.
    df_models_pd = df_models_for_facet.to_pandas()

    # --- Base Bar Chart (absolute counts, stacked from zero) ---
    bar_chart = (
        alt.Chart(df_models_pd)
        .mark_bar()
        .encode(
            x=alt.X(
                "model:N",
                title="Model",
                axis=alt.Axis(labels=True, labelLimit=0),
            ),
            y=alt.Y("count():Q", title="Count", stack="zero"),
            color=alt.Color(
                "outcome:N",
                title="Outcome",
                sort=outcome_order,
                scale=alt.Scale(
                    domain=outcome_order,
                    range=["green", "orange", "lightblue", "red"],
                ),
            ),
            tooltip=[
                "model",
                "language",
                "outcome",
                "count()",
                alt.Tooltip("total_samples:Q", title="Total Samples in Language"),
            ],
            # Important for consistent stacking, esp. if normalized
            order=alt.Order("color_outcome_sort_index:Q"),
        )
        .transform_calculate(color_outcome_sort_index=OUTCOME_SORT_EXPR)
    )

    # --- Text Annotation: per-language sample total in each facet's corner ---
    text_chart = (
        alt.Chart(df_models_pd)
        .mark_text(align="left", baseline="middle", dx=5, dy=-5, color="black", fontSize=10)
        .encode(
            # Fixed pixel position relative to the facet's top-left corner.
            x=alt.value(5),
            y=alt.value(15),
            text=alt.Text("total_samples:Q", format=",d"),
            color=alt.value("black"),
            tooltip=[alt.Tooltip("total_samples:Q", title="Total Samples in Language")],
        )
        .transform_aggregate(
            # total_samples is constant per language, so max() is just a way
            # to collapse it to a single value per facet.
            total_samples="max(total_samples)",
            groupby=["language"],
        )
    )

    # --- Layer the charts first, THEN facet the layered chart ---
    language_outcome_chart = (
        (bar_chart + text_chart)
        .facet(
            facet=alt.Facet(
                "language:N",
                title="Language",
                header=alt.Header(titleOrient="top", labelOrient="top"),
            ),
            columns=5,  # Wrap into 5 columns
        )
        # Resolve Y scale because text has different positioning logic
        .resolve_scale(y="independent")
    )
    st.altair_chart(language_outcome_chart, use_container_width=True)
except Exception as e:
    st.error(
        f"Error generating faceted outcome distribution by language: {e}\n{traceback.format_exc()}"
    )

# --- Performance Over Turns and Time ---
st.header("Performance Dynamics")

# **Win Rate by Turn Heatmap**
st.subheader("Win Rate by Turn")
# Win rate of every model at each conversation turn, drawn as a heatmap.
try:
    win_rate_turn_df = (
        df_models.group_by(["model", "turn"])
        .agg(wins=(pl.col("outcome") == "win").sum(), total=pl.len())
        .filter(pl.col("total") > 0)
        .with_columns(win_rate=pl.col("wins") / pl.col("total"))
    )
    win_rate_turn_heatmap = (
        alt.Chart(win_rate_turn_df.to_pandas())
        .mark_rect()
        .encode(
            x=alt.X("turn:O", title="Turn"),  # Treat turn as ordinal
            y=alt.Y(
                "model:N",
                title="Model",
                sort=unique_models,
                axis=alt.Axis(labelLimit=0),
            ),
            color=alt.Color(
                "win_rate:Q",
                title="Win Rate",
                scale=alt.Scale(scheme="blues"),
                legend=alt.Legend(format=".0%"),
            ),
            tooltip=[
                "model",
                "turn",
                alt.Tooltip("win_rate:Q", format=".1%"),
                "total",
            ],
        )
        .properties(title="Win Rate by Model and Turn")
    )
    st.altair_chart(win_rate_turn_heatmap, use_container_width=True)
except Exception as e:
    st.error(f"Error generating win rate by turn heatmap: {e}")

# **Wins Over Time Line Chart**
st.subheader("Wins Over Time")
try:
    wins_time_df = (
        df_models.filter(pl.col("outcome") == "win")
        .group_by(["date", "model"])
        .agg(win_count=pl.len())
        .sort("date")
    )
    time_line = (
        alt.Chart(wins_time_df.to_pandas())
        .mark_line(point=True)
        .encode(
            x=alt.X("date:T", title="Date"),
            y=alt.Y("win_count:Q", title="Daily Win Count"),
            color=alt.Color("model:N", title="Model"),
            tooltip=["model", "date", "win_count"],
        )
        .properties(title="Wins by Model Over Time")
    )
    st.altair_chart(time_line, use_container_width=True)
except Exception as e:
    st.error(f"Error generating wins over time chart: {e}")

# --- Response Characteristics ---
st.header("Response Characteristics vs. Outcome")

# **Response Length Boxplot (Faceted, No Outliers, Adjusted Y-Axis)**
st.subheader(
    "Response Length Distribution by Outcome (Box Plot - Adjusted Scale, No Outliers)"
)
try:
    response_length_df = df_models.filter(
        pl.col("response_length").is_not_null().and_(pl.col("response_length") >= 0)
    )

    # Convert to Pandas for Altair and percentile calculation
    response_length_pd = response_length_df.to_pandas()

    # --- Calculate a reasonable upper limit for the Y-axis ---
    # The frame is already restricted to non-null, non-negative lengths above.
    valid_lengths = response_length_pd["response_length"].dropna()
    if not valid_lengths.empty:
        # Cap at the 99th percentile, but never show less than a 0-100 range.
        upper_limit = max(valid_lengths.quantile(0.99), 100)
    else:
        upper_limit = 1000  # Default if no valid data

    st.write(
        f"(Note: Y-axis for Response Length capped at ~{int(upper_limit)} [99th percentile] to improve visibility)"
    )

    response_facet_boxplot = (
        alt.Chart(response_length_pd)
        .mark_boxplot(
            extent="min-max",  # Whiskers span the full min..max range
            outliers=False,  # No separate outlier points
        )
        .encode(
            x=alt.X(
                "model:N",
                title="Model",
                sort=unique_models,
                axis=alt.Axis(labelLimit=0),
            ),
            y=alt.Y(
                "response_length:Q",
                title="Response Length",
                # clamp=True pins anything beyond the cap to the axis edge.
                scale=alt.Scale(domain=[0, upper_limit], clamp=True),
            ),
            # Color by model for clarity within facet
            color=alt.Color("model:N", title="Model", legend=None),
        )
        .facet(column=alt.Column("outcome:N", title="Outcome", sort=outcome_order))
        .properties(
            title="Response Length Distribution by Model per Outcome (Box Plot, Adjusted Scale, No Outliers)"
        )
    )
    st.altair_chart(response_facet_boxplot, use_container_width=True)
except Exception as e:
    st.error(f"Error generating response length boxplot: {e}\n{traceback.format_exc()}")

# --- Citations Scatter Plot (Faceted) ---
st.subheader("Number of Citations by Outcome")
try:
    citation_df = df_models.filter(
        pl.col("num_citations").is_not_null().and_(pl.col("num_citations") >= 0)
    )

    # Aggregate counts for bubble size
    citation_agg = citation_df.group_by(["model", "outcome", "num_citations"]).agg(
        count=pl.len()
    )
    citation_agg_pd = citation_agg.to_pandas()

    citation_facet = (
        alt.Chart(citation_agg_pd)
        .mark_circle()
        .encode(
            x=alt.X(
                "model:N",
                title="Model",
                sort=unique_models,
                axis=alt.Axis(labelLimit=0),
            ),
            # tickMinStep=1 ensures integer ticks
            y=alt.Y(
                "num_citations:Q",
                title="Number of Citations",
                axis=alt.Axis(tickMinStep=1),
            ),
            size=alt.Size(
                "count:Q",
                title="Number of Responses",
                # Size-legend symbols are drawn in light blue (fill and stroke).
                legend=alt.Legend(
                    symbolFillColor="lightblue",
                    symbolStrokeColor="lightblue",
                ),
            ),
            # Keep model legend hidden
            color=alt.Color("model:N", title="Model", legend=None),
            tooltip=["model", "outcome", "num_citations", "count"],
        )
        .facet(column=alt.Column("outcome:N", title="Outcome", sort=outcome_order))
        .properties(title="Citations by Model per Outcome")
    )
    st.altair_chart(citation_facet, use_container_width=True)
except Exception as e:
    st.error(f"Error generating citation facet plot: {e}\n{traceback.format_exc()}")

# --- Language Leaderboard ---
st.header("Top/Worst Models per Language")

# Minimum number of rows a (language, model) pair needs to be ranked. Kept in
# one place so the filter and the user-facing messages cannot drift apart.
MIN_LANGUAGE_SAMPLES = 5


@st.cache_data(ttl=3600)
def calculate_language_ranks(_df_models, min_samples=MIN_LANGUAGE_SAMPLES):
    """Calculates top/worst models based on win rate per language.

    Args:
        _df_models: Per-model-appearance DataFrame with `language`, `model`
            and `outcome` columns (not hashed by Streamlit).
        min_samples: Minimum number of rows a (language, model) pair must
            have to take part in the ranking (inclusive).

    Returns:
        One row per language with the two best and two worst models and their
        rounded win rates, sorted by language. Missing string cells are
        filled with "N/A". An empty DataFrame is returned when no
        (language, model) pair reaches `min_samples`.
    """
    df_win_rates = (
        _df_models.group_by(["language", "model"])
        .agg(wins=pl.col("outcome").eq("win").sum(), total=pl.len())
        # Inclusive threshold, matching the "Min N Comparisons" wording.
        .filter(pl.col("total") >= min_samples)
        .with_columns(win_rate=pl.col("wins") / pl.col("total"))
        # group_by output order is not stable; sorting makes the `.first()`
        # tie-break below deterministic (alphabetical by model).
        .sort(["language", "model"])
    )

    if df_win_rates.is_empty():
        return pl.DataFrame()  # Return empty if no language meets criteria

    win_rate = pl.col("win_rate")
    df_ranked = df_win_rates.with_columns(
        rank_top=win_rate.rank(method="min", descending=True).over("language"),
        rank_worst=win_rate.rank(method="min", descending=False).over("language"),
    )

    def pick(rank_col, rank, model_alias, rate_alias):
        # One row per language: the first model holding `rank` in `rank_col`.
        return (
            df_ranked.filter(pl.col(rank_col) == rank)
            .group_by("language")
            .agg(
                **{
                    model_alias: pl.col("model").first(),
                    rate_alias: pl.col("win_rate").first(),
                }
            )
        )

    df_top_1 = (
        df_ranked.filter(pl.col("rank_top") == 1)
        .group_by("language")
        .agg(
            top_model_1=pl.col("model").first(),
            win_rate_1=pl.col("win_rate").first(),
            # NOTE(review): this sums `total` over the rank-1 model(s) only,
            # not over every model in the language -- confirm intent.
            total_samples=pl.col("total").sum(),
        )
    )
    df_top_2 = pick("rank_top", 2, "top_model_2", "win_rate_2")
    df_worst_1 = pick("rank_worst", 1, "worst_model_1", "worst_win_rate_1")
    df_worst_2 = pick("rank_worst", 2, "worst_model_2", "worst_win_rate_2")

    # Left joins from the rank-1 table keep every ranked language even when a
    # rank is absent (e.g. a tie at rank 1 leaves no rank 2).
    df_table = (
        df_top_1.join(df_top_2, on="language", how="left")
        .join(df_worst_1, on="language", how="left")
        .join(df_worst_2, on="language", how="left")
        .select(
            [
                "language",
                "total_samples",
                "top_model_1",
                pl.col("win_rate_1").round(3).alias("WR #1"),
                "top_model_2",
                pl.col("win_rate_2").round(3).alias("WR #2"),
                "worst_model_1",
                pl.col("worst_win_rate_1").round(3).alias("WR Worst #1"),
                "worst_model_2",
                pl.col("worst_win_rate_2").round(3).alias("WR Worst #2"),
            ]
        )
        .sort("language")
        .fill_null("N/A")  # Fill missing ranks with N/A
    )
    return df_table


df_language_table = calculate_language_ranks(df_models)

if not df_language_table.is_empty():
    st.subheader(
        f"Top & Bottom 2 Models by Win Rate per Language (Min {MIN_LANGUAGE_SAMPLES} Comparisons)"
    )
    st.dataframe(df_language_table.to_pandas(), use_container_width=True)
else:
    st.subheader("Top & Bottom 2 Models by Win Rate per Language")
    st.write(
        f"Insufficient data (fewer than {MIN_LANGUAGE_SAMPLES} comparisons) for one or more languages to generate rankings."
    )

# **Model Ranking Counts**
if not df_language_table.is_empty():
    st.subheader("How Often Models Rank Top/Worst Across Languages")
    try:
        # Number of languages in which each model is listed as best / worst.
        df_top_1_counts = (
            df_language_table["top_model_1"]
            .value_counts()
            .rename({"top_model_1": "model", "count": "Rank 1 Count"})
            .sort("Rank 1 Count", descending=True)
        )
        df_worst_1_counts = (
            df_language_table["worst_model_1"]
            .value_counts()
            .rename({"worst_model_1": "model", "count": "Worst Rank Count"})
            .sort("Worst Rank Count", descending=True)
        )

        col1, col2 = st.columns(2)
        with col1:
            st.write("**Times Ranked #1**")
            st.dataframe(df_top_1_counts.to_pandas(), use_container_width=True)
        with col2:
            st.write("**Times Ranked Worst**")
            st.dataframe(df_worst_1_counts.to_pandas(), use_container_width=True)
    except Exception as e:
        st.error(f"Error generating model ranking counts: {e}")

# --- Footer ---
st.markdown("---")
st.markdown(
    "Analysis based on the `lmarena-ai/search-arena-v1-7k` dataset on Hugging Face."
)