Spaces:
Running
Running
| import pandas as pd | |
| import json | |
| import numpy as np | |
# Order in which games are processed and in which their "<game> Score"
# columns are added to the combined tables.
GAME_ORDER = [
    "Super Mario Bros",
    "Sokoban",
    "2048",
    "Candy Crush",
    "Tetris",
    "Ace Attorney",
]
def get_organization(model_name):
    """Map a model name to the organization key used on the leaderboard.

    Matching is a case-insensitive substring test. Rules are tried in a
    fixed order and the first rule that matches wins, so e.g. a name
    containing both "deepseek" and "qwen" resolves to "deepseek".

    Args:
        model_name: Model identifier string.

    Returns:
        str: Lowercase organization key, or "unknown" when no rule matches
        or when ``model_name`` is not a string (e.g. ``None`` or a NaN
        coming from a DataFrame column with missing names).
    """
    # Missing names surface as None/NaN; they have no .lower(), so bail out
    # instead of raising AttributeError.
    if not isinstance(model_name, str):
        return "unknown"

    lowered = model_name.lower()

    # (substrings, organization) pairs; order defines precedence.
    rules = (
        (("claude",), "anthropic"),
        (("gemini",), "google"),
        (("o1", "gpt", "o3", "o4"), "openai"),
        (("deepseek",), "deepseek"),
        (("llama",), "meta"),
        (("grok",), "xai"),
        (("qwen",), "alibaba"),
        (("glm",), "zhipu"),
        (("kimi",), "moonshot"),
    )
    for needles, organization in rules:
        if any(needle in lowered for needle in needles):
            return organization
    return "unknown"
def get_sokoban_leaderboard(rank_data, limit_to_top_n=None):
    """Build the Sokoban leaderboard table, best score first.

    Args:
        rank_data (dict): Rank data; the records are read from
            ``rank_data["Sokoban"]["results"]`` (missing keys are treated
            as "no results").
        limit_to_top_n (int, optional): Keep only the first N rows after
            sorting. ``None`` means no limit.

    Returns:
        pd.DataFrame: Columns (when present in the data) ``Player``,
        ``Organization``, ``Score``, ``Levels Cracked``,
        ``Detail Box On Target``, ``Steps``. When there are no results, or
        the records carry no ``model`` field, an empty frame with all of
        these columns is returned.
    """
    columns_to_keep = [
        "Player",
        "Organization",
        "Score",
        "Levels Cracked",
        "Detail Box On Target",
        "Steps",
    ]

    data = rank_data.get("Sokoban", {}).get("results", [])
    df = pd.DataFrame(data).rename(columns={
        "model": "Player",
        "score": "Score",
        "steps": "Steps",
        "detail_box_on_target": "Detail Box On Target",
        "cracked_levels": "Levels Cracked",
    })

    # Empty/absent results produce a frame without a "Player" column;
    # return a correctly shaped empty table instead of raising KeyError.
    if "Player" not in df.columns:
        return pd.DataFrame(columns=columns_to_keep)

    df["Organization"] = df["Player"].apply(get_organization)

    # Keep only the known columns that exist; copy so the assignment below
    # does not write into a slice of the original frame.
    df = df[[col for col in columns_to_keep if col in df.columns]].copy()

    if "Score" in df.columns:
        # Unparseable scores become NaN and sort to the bottom.
        df["Score"] = pd.to_numeric(df["Score"], errors='coerce')
        df = df.sort_values("Score", ascending=False)

    if limit_to_top_n is not None:
        df = df.head(limit_to_top_n)
    return df
def get_2048_leaderboard(rank_data, limit_to_top_n=None):
    """Build the 2048 leaderboard table, best score first.

    Unlike the other per-game tables, the returned frame always has the
    full fixed column set; columns absent from the data are filled with
    NaN.

    Args:
        rank_data (dict): Rank data; the records are read from
            ``rank_data["2048"]["results"]`` (missing keys are treated as
            "no results").
        limit_to_top_n (int, optional): Keep only the first N rows after
            sorting. ``None`` means no limit.

    Returns:
        pd.DataFrame: Columns ``Player``, ``Organization``, ``Score``,
        ``Highest Tail``, ``Details``, sorted by ``Score`` descending.
    """
    columns_to_keep = ["Player", "Organization", "Score", "Highest Tail", "Details"]

    data = rank_data.get("2048", {}).get("results", [])
    df = pd.DataFrame(data).rename(columns={
        "model": "Player",
        "score": "Score",
        "details": "Details",
        "highest_tail": "Highest Tail",
    })

    if "Player" in df.columns:
        df["Organization"] = df["Player"].apply(get_organization)
    else:
        # No model names available (e.g. empty results): nothing to map.
        df["Organization"] = "unknown"

    # Guarantee the full column set so the selection below cannot fail.
    for column in columns_to_keep:
        if column not in df.columns:
            df[column] = np.nan

    # Copy so the Score assignment does not write into a slice of the
    # original frame.
    df = df[columns_to_keep].copy()

    # Unparseable scores become NaN and sort to the bottom.
    df["Score"] = pd.to_numeric(df["Score"], errors='coerce')
    df = df.sort_values("Score", ascending=False)

    if limit_to_top_n is not None:
        df = df.head(limit_to_top_n)
    return df
def get_candy_leaderboard(rank_data, limit_to_top_n=None):
    """Build the Candy Crush leaderboard table, best score first.

    Args:
        rank_data (dict): Rank data; the records are read from
            ``rank_data["Candy Crush"]["results"]`` (missing keys are
            treated as "no results").
        limit_to_top_n (int, optional): Keep only the first N rows after
            sorting. ``None`` means no limit.

    Returns:
        pd.DataFrame: Columns (when present in the data) ``Player``,
        ``Organization``, ``Score``, ``Details``. When there are no
        results, or the records carry no ``model`` field, an empty frame
        with all of these columns is returned.
    """
    columns_to_keep = ["Player", "Organization", "Score", "Details"]

    data = rank_data.get("Candy Crush", {}).get("results", [])
    df = pd.DataFrame(data).rename(columns={
        "model": "Player",
        "score": "Score",
        "details": "Details",
    })

    # Empty/absent results produce a frame without a "Player" column;
    # return a correctly shaped empty table instead of raising KeyError.
    if "Player" not in df.columns:
        return pd.DataFrame(columns=columns_to_keep)

    df["Organization"] = df["Player"].apply(get_organization)

    # Copy so the Score assignment does not write into a slice.
    df = df[[col for col in columns_to_keep if col in df.columns]].copy()

    if "Score" in df.columns:
        # Unparseable scores become NaN and sort to the bottom.
        df["Score"] = pd.to_numeric(df["Score"], errors='coerce')
        df = df.sort_values("Score", ascending=False)

    if limit_to_top_n is not None:
        df = df.head(limit_to_top_n)
    return df
def get_tetris_planning_leaderboard(rank_data, limit_to_top_n=None):
    """Build the Tetris leaderboard table, best score first.

    Args:
        rank_data (dict): Rank data; the records are read from
            ``rank_data["Tetris"]["results"]`` (missing keys are treated
            as "no results").
        limit_to_top_n (int, optional): Keep only the first N rows after
            sorting. ``None`` means no limit.

    Returns:
        pd.DataFrame: Columns (when present in the data) ``Player``,
        ``Organization``, ``Score``, ``Details``. When there are no
        results, or the records carry no ``model`` field, an empty frame
        with all of these columns is returned.
    """
    columns_to_keep = ["Player", "Organization", "Score", "Details"]

    data = rank_data.get("Tetris", {}).get("results", [])
    df = pd.DataFrame(data).rename(columns={
        "model": "Player",
        "score": "Score",
        "details": "Details",
    })

    # Empty/absent results produce a frame without a "Player" column;
    # return a correctly shaped empty table instead of raising KeyError.
    if "Player" not in df.columns:
        return pd.DataFrame(columns=columns_to_keep)

    df["Organization"] = df["Player"].apply(get_organization)

    # Copy so the Score assignment does not write into a slice.
    df = df[[col for col in columns_to_keep if col in df.columns]].copy()

    if "Score" in df.columns:
        # Unparseable scores become NaN and sort to the bottom.
        df["Score"] = pd.to_numeric(df["Score"], errors='coerce')
        df = df.sort_values("Score", ascending=False)

    if limit_to_top_n is not None:
        df = df.head(limit_to_top_n)
    return df
def get_ace_attorney_leaderboard(rank_data, limit_to_top_n=None):
    """Build the Ace Attorney leaderboard table, best score first.

    Args:
        rank_data (dict): Rank data; the records are read from
            ``rank_data["Ace Attorney"]["results"]`` (missing keys are
            treated as "no results").
        limit_to_top_n (int, optional): Keep only the first N rows after
            sorting. ``None`` means no limit.

    Returns:
        pd.DataFrame: Columns (when present in the data) ``Player``,
        ``Organization``, ``Score``, ``Progress``. When there are no
        results, or the records carry no ``model`` field, an empty frame
        with all of these columns is returned.
    """
    columns_to_keep = ["Player", "Organization", "Score", "Progress"]

    data = rank_data.get("Ace Attorney", {}).get("results", [])
    df = pd.DataFrame(data).rename(columns={
        "model": "Player",
        "score": "Score",
        "progress": "Progress",
    })

    # Empty/absent results produce a frame without a "Player" column;
    # return a correctly shaped empty table instead of raising KeyError.
    if "Player" not in df.columns:
        return pd.DataFrame(columns=columns_to_keep)

    df["Organization"] = df["Player"].apply(get_organization)

    # Copy so the Score assignment does not write into a slice.
    df = df[[col for col in columns_to_keep if col in df.columns]].copy()

    if "Score" in df.columns:
        # Higher score is better; unparseable scores become NaN and sort
        # to the bottom.
        df["Score"] = pd.to_numeric(df["Score"], errors='coerce')
        df = df.sort_values("Score", ascending=False)

    if limit_to_top_n is not None:
        df = df.head(limit_to_top_n)
    return df
def get_mario_planning_leaderboard(rank_data, limit_to_top_n=None):
    """Build the Super Mario Bros leaderboard table, best score first.

    Args:
        rank_data (dict): Rank data; the records are read from
            ``rank_data["Super Mario Bros"]["results"]`` (missing keys are
            treated as "no results").
        limit_to_top_n (int, optional): Keep only the first N rows after
            sorting. ``None`` means no limit.

    Returns:
        pd.DataFrame: Columns (when present in the data) ``Player``,
        ``Organization``, ``Score``, ``Progress``, ``Detail Data``. When
        there are no results, or the records carry no ``model`` field, an
        empty frame with all of these columns is returned.
    """
    columns_to_keep = ["Player", "Organization", "Score", "Progress", "Detail Data"]

    data = rank_data.get("Super Mario Bros", {}).get("results", [])
    df = pd.DataFrame(data).rename(columns={
        "model": "Player",
        "score": "Score",
        "detail_data": "Detail Data",
        "progress": "Progress",
    })

    # Empty/absent results produce a frame without a "Player" column;
    # return a correctly shaped empty table instead of raising KeyError.
    if "Player" not in df.columns:
        return pd.DataFrame(columns=columns_to_keep)

    df["Organization"] = df["Player"].apply(get_organization)

    # Copy so the Score assignment does not write into a slice.
    df = df[[col for col in columns_to_keep if col in df.columns]].copy()

    if "Score" in df.columns:
        # Unparseable scores become NaN and sort to the bottom.
        df["Score"] = pd.to_numeric(df["Score"], errors='coerce')
        df = df.sort_values("Score", ascending=False)

    if limit_to_top_n is not None:
        df = df.head(limit_to_top_n)
    return df
def calculate_rank_and_completeness(rank_data, selected_games):
    """Build a per-player score table ordered by average rank.

    For every selected game each player's rank is 1 + the number of
    entries with a strictly higher score (ties share a rank). Players are
    ordered by their mean rank across the games they appear in
    (ascending), then by the number of games played (descending). The two
    sorting keys are dropped before returning.

    Args:
        rank_data (dict): Rank data passed through to the per-game
            leaderboard builders.
        selected_games (dict): Game name -> truthy flag; only games with a
            truthy flag are included.

    Returns:
        pd.DataFrame: One row per player with ``Player``,
        ``Organization`` and one ``"<game> Score"`` column per selected
        game (``'n/a'`` where the player has no entry). Empty when no game
        is selected or no players are found.
    """
    # Per-game leaderboards for the selected games only.
    game_dfs = {}
    if selected_games.get("Super Mario Bros"):
        game_dfs["Super Mario Bros"] = get_mario_planning_leaderboard(rank_data)
    if selected_games.get("Sokoban"):
        game_dfs["Sokoban"] = get_sokoban_leaderboard(rank_data)
    if selected_games.get("2048"):
        game_dfs["2048"] = get_2048_leaderboard(rank_data)
    if selected_games.get("Candy Crush"):
        game_dfs["Candy Crush"] = get_candy_leaderboard(rank_data)
    if selected_games.get("Tetris"):
        game_dfs["Tetris"] = get_tetris_planning_leaderboard(rank_data)
    if selected_games.get("Ace Attorney"):
        game_dfs["Ace Attorney"] = get_ace_attorney_leaderboard(rank_data)

    # Union of players across all selected games.
    all_players = set()
    for df in game_dfs.values():
        all_players.update(df["Player"].unique())

    results = []
    for player in sorted(all_players):
        player_data = {
            "Player": player,
            "Organization": get_organization(player),
        }
        ranks = []

        for game in GAME_ORDER:
            if game not in game_dfs:
                continue
            df = game_dfs[game]
            player_rows = df[df["Player"] == player]
            if player_rows.empty:
                player_data[f"{game} Score"] = 'n/a'
                continue

            # First matching row wins if a player is listed more than once.
            player_score = player_rows["Score"].iloc[0]
            if pd.isna(player_score):
                # A NaN score compares False against everything, which
                # would otherwise yield rank 1; place it below every entry
                # that has a valid score instead.
                rank = int(df["Score"].notna().sum()) + 1
            else:
                rank = int((df["Score"] > player_score).sum()) + 1

            ranks.append(rank)
            player_data[f"{game} Score"] = player_score

        # Sorting keys; one rank is recorded per game played.
        if ranks:
            player_data["Average Rank"] = round(np.mean(ranks), 2)
            player_data["Games Played"] = len(ranks)
        else:
            player_data["Average Rank"] = float('inf')
            player_data["Games Played"] = 0
        results.append(player_data)

    df_results = pd.DataFrame(results)
    if not df_results.empty:
        # Best (lowest) average rank first; more games played breaks ties.
        df_results = df_results.sort_values(
            by=["Average Rank", "Games Played"],
            ascending=[True, False],
        )
        # The sorting keys are internal and not part of the returned table.
        df_results = df_results.drop(["Average Rank", "Games Played"], axis=1)
    return df_results
def get_combined_leaderboard(rank_data, selected_games, limit_to_top_n=None):
    """Get the combined leaderboard for the selected games.

    Each player gets one ``"<game> Score"`` column per selected game
    (``'n/a'`` where the player has no entry) plus an
    ``Avg Normalized Score`` column, which is the mean of the per-game
    normalized scores. A missing score contributes a normalized score of
    0, and a game whose valid scores do not sum to more than 0
    contributes 0 for every player. Rows are sorted by
    ``Avg Normalized Score`` descending.

    Args:
        rank_data (dict): Dictionary containing rank data.
        selected_games (dict): Dictionary of game names and their
            selection status.
        limit_to_top_n (int, optional): Limit results to top N entries.
            None means no limit.

    Returns:
        pd.DataFrame: Combined leaderboard DataFrame. Empty when no game
        is selected or no players are found.
    """
    # Per-game leaderboards for the selected games only.
    game_dfs = {}
    if selected_games.get("Super Mario Bros"):
        game_dfs["Super Mario Bros"] = get_mario_planning_leaderboard(rank_data)
    if selected_games.get("Sokoban"):
        game_dfs["Sokoban"] = get_sokoban_leaderboard(rank_data)
    if selected_games.get("2048"):
        game_dfs["2048"] = get_2048_leaderboard(rank_data)
    if selected_games.get("Candy Crush"):
        game_dfs["Candy Crush"] = get_candy_leaderboard(rank_data)
    if selected_games.get("Tetris"):
        game_dfs["Tetris"] = get_tetris_planning_leaderboard(rank_data)
    if selected_games.get("Ace Attorney"):
        game_dfs["Ace Attorney"] = get_ace_attorney_leaderboard(rank_data)

    # Union of players across all selected games.
    all_players = set()
    for df in game_dfs.values():
        all_players.update(df["Player"].unique())

    # One row per player with the raw score for each selected game.
    results = []
    for player in sorted(all_players):
        player_data = {
            "Player": player,
            "Organization": get_organization(player),
        }
        for game in GAME_ORDER:
            if game not in game_dfs:
                continue
            df = game_dfs[game]
            player_rows = df[df["Player"] == player]
            if player_rows.empty:
                player_data[f"{game} Score"] = 'n/a'
            else:
                # First matching row wins if a player is listed twice.
                player_data[f"{game} Score"] = player_rows["Score"].iloc[0]
        results.append(player_data)

    df_results = pd.DataFrame(results)

    if not df_results.empty:
        # Imported lazily, only when there is something to normalize.
        from data_visualization import normalize_values

        game_score_columns = []
        for game in GAME_ORDER:
            score_col = f"{game} Score"
            if score_col not in df_results.columns:
                continue
            game_score_columns.append(score_col)

            # Numeric view of the column: 'n/a' and unparseable -> NaN.
            # where() avoids the downcasting FutureWarning of replace().
            series = df_results[score_col]
            numeric_scores = pd.to_numeric(
                series.where(series != 'n/a', np.nan), errors='coerce'
            )
            valid_scores = numeric_scores.dropna()

            if len(valid_scores) > 0 and valid_scores.sum() > 0:
                mean = valid_scores.mean()
                # Sample std is undefined for a single value; use 0.
                std = valid_scores.std() if len(valid_scores) > 1 else 0
                df_results[f"norm_{score_col}"] = [
                    0 if pd.isna(value)
                    else normalize_values([float(value)], mean, std)[0]
                    for value in numeric_scores
                ]
            else:
                # No usable scores for this game.
                df_results[f"norm_{score_col}"] = 0

        normalized_columns = [
            f"norm_{col}" for col in game_score_columns
            if f"norm_{col}" in df_results.columns
        ]
        if normalized_columns:
            df_results["Avg Normalized Score"] = (
                df_results[normalized_columns].mean(axis=1).round(2)
            )
        else:
            df_results["Avg Normalized Score"] = 0.0

        # Final layout: identity columns, the average, then raw game
        # scores; the helper norm_* columns are dropped.
        base_columns = ["Player", "Organization", "Avg Normalized Score"]
        game_columns = [
            col for col in df_results.columns
            if col.endswith(" Score")
            and not col.startswith("norm_")
            and col != "Avg Normalized Score"
        ]
        other_columns = [
            col for col in df_results.columns
            if col not in base_columns + game_columns
            and not col.startswith("norm_")
        ]
        df_results = df_results[base_columns + game_columns + other_columns]

        df_results = df_results.sort_values("Avg Normalized Score", ascending=False)

    if limit_to_top_n is not None:
        df_results = df_results.head(limit_to_top_n)
    return df_results