from shiny import App, render, reactive, ui import pandas as pd import numpy as np import random import io import csv import urllib3 from io import StringIO import os # HTTP & API import requests import urllib3 from huggingface_hub import login, hf_hub_download # Constants CSV_FILE = "hf://datasets/TJStatsApps/scouting_reports/elo_rankings.csv" CSV_FILE_NAME = "elo_rankings.csv" HF_REPO_ID = "TJStatsApps/scouting_reports" HF_FILENAME = "elo_rankings.csv" PITCH_TYPES = ["Hit", "Decisions", "Power", "Speed", "Defense"] EVAL_CATEGORIES = ["ETA", "FV"] # Hugging Face login using access token from environment variable ACCESS_TOKEN = os.environ.get("ACCESS_TOKEN") if ACCESS_TOKEN: login(token=ACCESS_TOKEN) ADMIN_PW = os.getenv("ADMIN_PW") VALID_PASSWORDS = [] VALID_PASSWORDS.append(ADMIN_PW) # Initial ELO rating for new players INITIAL_ELO = 1200 K_FACTOR = 32 # ELO adjustment factor # Pre-defined list of 20 players (fallback if CSV can't be loaded) df_player_names = pd.read_csv('top_100_prospects_canonical_names.csv') DEFAULT_PLAYERS = df_player_names['canonical_name'].to_list() def load_rankings_from_hf(): """Load ELO rankings from Hugging Face dataset""" try: http = urllib3.PoolManager() response = http.request( 'GET', "https://huggingface.co/datasets/TJStatsApps/scouting_reports/resolve/main/elo_rankings.csv" ) if response.status == 200: csv_data = StringIO(response.data.decode('utf-8')) df = pd.read_csv( csv_data, quoting=csv.QUOTE_ALL, escapechar='\\', on_bad_lines='warn' ) # Convert back to internal format (Player -> name, etc.) df = df.rename(columns={ "Player": "name", "ELO": "elo", "Matches": "matches", "Wins": "wins" }) # Remove rank and win% columns if they exist df = df[["name", "elo", "matches", "wins"]] return df else: print(f"Failed to load rankings: HTTP {response.status}") return None except Exception as e: print(f"Error loading rankings from HF: {e}") return None # Define the login UI login_ui = ui.page_fluid( ui.card( ui.input_password("password", "Enter Password:", width="50%"), ui.tags.input( type="checkbox", id="authenticated", value=False, disabled=True ), ui.input_action_button("login", "Login", class_="btn-primary"), ui.output_text("login_message"), ) ) main_ui = ui.page_sidebar( ui.sidebar( ui.h3("Current Matchup"), ui.output_ui("current_matchup"), ui.br(), ui.input_action_button("new_matchup", "Generate New Matchup", class_="btn-primary"), ui.br(), ui.br(), ui.input_action_button("reset_all", "Reset All Data", class_="btn-danger"), width='600px' ), ui.div( ui.h2("ELO Rankings"), ui.div( ui.input_action_button("update_rankings", "Update Rankings", class_="btn-info", style="margin-right: 10px;"), ui.input_action_button("upload_rankings", "Upload to HF", class_="btn-warning"), style="float: right; margin-bottom: 10px;" ), style="overflow: hidden;" ), ui.card( ui.output_data_frame("rankings_table") ), ui.h2("Match History"), ui.card( ui.output_data_frame("match_history") ) ) # Combined UI with conditional panel app_ui = ui.page_fluid( ui.tags.head( ui.tags.script(src="script.js") ), ui.panel_conditional( "!input.authenticated", login_ui ), ui.panel_conditional( "input.authenticated", main_ui ) ) def server(input, output, session): # Try to load rankings from Hugging Face, fallback to default players @reactive.Effect @reactive.event(input.login) def check_password(): if input.password() in VALID_PASSWORDS: ui.update_checkbox("authenticated", value=True) ui.update_text("login_message", value="") else: ui.update_text("login_message", value="Invalid password!") ui.update_text("password", value="") @output @render.text def login_message(): return "" loaded_df = load_rankings_from_hf() if loaded_df is not None: initial_df = loaded_df ui.notification_show("Rankings loaded from Hugging Face!", type="message") else: initial_df = pd.DataFrame({ "name": DEFAULT_PLAYERS, "elo": [INITIAL_ELO] * len(DEFAULT_PLAYERS), "matches": [0] * len(DEFAULT_PLAYERS), "wins": [0] * len(DEFAULT_PLAYERS) }) ui.notification_show("Using default players (couldn't load from HF)", type="warning") # Reactive values to store data players_data = reactive.value(initial_df) matches_data = reactive.value(pd.DataFrame(columns=["player1", "player2", "winner", "player1_elo_before", "player2_elo_before", "player1_elo_after", "player2_elo_after"])) current_player1 = reactive.value("") current_player2 = reactive.value("") rankings_update_trigger = reactive.value(0) # Trigger for manual rankings update def calculate_elo_change(rating_a, rating_b, score_a): """Calculate ELO rating change for player A""" expected_a = 1 / (1 + 10 ** ((rating_b - rating_a) / 400)) return K_FACTOR * (score_a - expected_a) # def generate_random_matchup(): # """Generate a random matchup between two players, prioritizing those with fewer matchups""" # df = players_data() # if len(df) >= 2: # # Calculate weights - players with fewer matchups get higher weights # # Add 1 to avoid division by zero, then invert so fewer matchups = higher weight # max_matchups = df["matches"].max() if df["matches"].max() > 0 else 1 # weights = (max_matchups + 1) - df["matches"] # # Convert to probability weights # total_weight = weights.sum() # probabilities = weights / total_weight # # Select first player based on weights # p1 = np.random.choice(df["name"].tolist(), p=probabilities.tolist()) # # For second player, exclude the first player and recalculate weights # remaining_df = df[df["name"] != p1].copy() # if len(remaining_df) > 0: # remaining_weights = (max_matchups + 1) - remaining_df["matches"] # remaining_total = remaining_weights.sum() # remaining_probs = remaining_weights / remaining_total # p2 = np.random.choice(remaining_df["name"].tolist(), p=remaining_probs.tolist()) # else: # # Fallback if somehow no other players available # p2 = df[df["name"] != p1]["name"].iloc[0] # current_player1.set(p1) # current_player2.set(p2) # # Generate initial matchup when app starts # @reactive.effect # def _(): # if current_player1() == "" and current_player2() == "": # generate_random_matchup() def generate_random_matchup(): """Generate a random matchup between two players, prioritizing those with more wins""" df = players_data() if len(df) >= 2: df_small = df.copy() # Calculate weights - players with more wins get higher weights # Add 1 to give all players a base chance, then add wins for scaling weights = 1 + (df_small["wins"] ** 1) # Convert to probability weights total_weight = weights.sum() probabilities = weights / total_weight # Select first player based on weights p1 = np.random.choice(df_small["name"].tolist(), p=probabilities.tolist()) # For second player, exclude the first player and recalculate weights remaining_df = df[(df["name"] != p1)].copy() if len(remaining_df) > 0: remaining_weights = 1 + remaining_df["wins"] ** 1 remaining_total = remaining_weights.sum() remaining_probs = remaining_weights / remaining_total p2 = np.random.choice(remaining_df["name"].tolist(), p=remaining_probs.tolist()) else: # Fallback if somehow no other players available p2 = df[df["name"] != p1]["name"].iloc[0] current_player1.set(p1) current_player2.set(p2) # Generate initial matchup when app starts @reactive.effect def _(): if current_player1() == "" and current_player2() == "": generate_random_matchup() @reactive.effect @reactive.event(input.new_matchup) def generate_new_matchup(): """Generate a new random matchup""" generate_random_matchup() @reactive.effect @reactive.event(input.update_rankings) def trigger_rankings_update(): """Manually trigger rankings table update""" rankings_update_trigger.set(rankings_update_trigger() + 1) ui.notification_show("Rankings table updated!", type="message") @reactive.effect @reactive.event(input.upload_rankings) def upload_to_huggingface(): """Upload current rankings to Hugging Face""" try: df = players_data() # Sort by ELO rating (descending) df_sorted = df.sort_values("elo", ascending=False).reset_index(drop=True) # Add rank column df_sorted["rank"] = range(1, len(df_sorted) + 1) # Calculate win percentage df_sorted["win_pct"] = (df_sorted["wins"] / df_sorted["matches"] * 100).round(1) df_sorted.loc[df_sorted["matches"] == 0, "win_pct"] = 0 # Format columns for upload upload_df = df_sorted[["rank", "name", "elo", "matches", "wins", "win_pct"]].copy() upload_df["elo"] = upload_df["elo"].round().astype(int) upload_df.columns = ["Rank", "Player", "ELO", "Matches", "Wins", "Win %"] # Upload updated CSV to Hugging Face from huggingface_hub import HfApi csv_buffer = io.BytesIO() upload_df.to_csv(csv_buffer, index=False, quoting=csv.QUOTE_MINIMAL, escapechar="\\") csv_buffer.seek(0) # Reset buffer position api = HfApi() api.upload_file( path_or_fileobj=csv_buffer, path_in_repo="elo_rankings.csv", repo_id="TJStatsApps/scouting_reports", repo_type="dataset" ) ui.notification_show("Rankings successfully uploaded to Hugging Face!", type="message") except Exception as e: ui.notification_show(f"Upload failed: {str(e)}", type="error") @render.ui def current_matchup(): """Display the current matchup with winner buttons""" p1 = current_player1() p2 = current_player2() if not p1 or not p2: return ui.div("Generating matchup...") # Get current ELO ratings for display df = players_data() p1_elo = int(df[df["name"] == p1]["elo"].iloc[0]) p2_elo = int(df[df["name"] == p2]["elo"].iloc[0]) return ui.div( ui.h4("Who wins?", style="text-align: center; margin-bottom: 20px;"), ui.div( ui.input_action_button( "winner_p1", f"{p1}\n(ELO: {p1_elo})", class_="btn-success btn-lg", style="width: 100%; margin-bottom: 10px; white-space: pre-line;" ), ui.h5("VS", style="text-align: center; margin: 10px 0;"), ui.input_action_button( "winner_p2", f"{p2}\n(ELO: {p2_elo})", class_="btn-success btn-lg", style="width: 100%; white-space: pre-line;" ), style="text-align: center;" ) ) @reactive.effect @reactive.event(input.winner_p1) def record_p1_win(): """Record player 1 as winner""" record_match_result(current_player1(), current_player2(), current_player1()) @reactive.effect @reactive.event(input.winner_p2) def record_p2_win(): """Record player 2 as winner""" record_match_result(current_player1(), current_player2(), current_player2()) def record_match_result(p1, p2, winner): """Record a match result and update ELO ratings""" if not p1 or not p2: return df = players_data() matches_df = matches_data() # Get current ELO ratings p1_elo = df[df["name"] == p1]["elo"].iloc[0] p2_elo = df[df["name"] == p2]["elo"].iloc[0] # Calculate new ELO ratings if winner == p1: p1_score = 1 p2_score = 0 else: p1_score = 0 p2_score = 1 p1_elo_change = calculate_elo_change(p1_elo, p2_elo, p1_score) p2_elo_change = calculate_elo_change(p2_elo, p1_elo, p2_score) new_p1_elo = p1_elo + p1_elo_change new_p2_elo = p2_elo + p2_elo_change # Update player data df.loc[df["name"] == p1, "elo"] = new_p1_elo df.loc[df["name"] == p2, "elo"] = new_p2_elo df.loc[df["name"] == p1, "matches"] += 1 df.loc[df["name"] == p2, "matches"] += 1 df.loc[df["name"] == winner, "wins"] += 1 players_data.set(df) # Record match history new_match = pd.DataFrame({ "player1": [p1], "player2": [p2], "winner": [winner], "player1_elo_before": [round(p1_elo)], "player2_elo_before": [round(p2_elo)], "player1_elo_after": [round(new_p1_elo)], "player2_elo_after": [round(new_p2_elo)] }) updated_matches_df = pd.concat([new_match, matches_df], ignore_index=True) matches_data.set(updated_matches_df) # Show notification and generate new matchup loser = p1 if winner == p2 else p2 elo_change = abs(round(p1_elo_change)) if winner == p1 else abs(round(p2_elo_change)) ui.notification_show(f"{winner} defeats {loser}! (+{elo_change} ELO)", type="message") # Auto-generate next matchup generate_random_matchup() @reactive.effect @reactive.event(input.reset_all) def reset_all_data(): """Reset all player and match data""" initial_df = pd.DataFrame({ "name": DEFAULT_PLAYERS, "elo": [INITIAL_ELO] * len(DEFAULT_PLAYERS), "matches": [0] * len(DEFAULT_PLAYERS), "wins": [0] * len(DEFAULT_PLAYERS) }) players_data.set(initial_df) matches_data.set(pd.DataFrame(columns=["player1", "player2", "winner", "player1_elo_before", "player2_elo_before", "player1_elo_after", "player2_elo_after"])) generate_random_matchup() ui.notification_show("All data has been reset!", type="message") @render.data_frame def rankings_table(): """Display current ELO rankings""" # Include the update trigger to force refresh when button is clicked rankings_update_trigger() df = players_data() # Sort by ELO rating (descending) df_sorted = df.sort_values("elo", ascending=False).reset_index(drop=True) # Add rank column df_sorted["rank"] = range(1, len(df_sorted) + 1) # Calculate win percentage df_sorted["win_pct"] = (df_sorted["wins"] / df_sorted["matches"] * 100).round(1) df_sorted.loc[df_sorted["matches"] == 0, "win_pct"] = 0 # Format and reorder columns display_df = df_sorted[["rank", "name", "elo", "matches", "wins", "win_pct"]].copy() display_df["elo"] = display_df["elo"].round().astype(int) display_df.columns = ["Rank", "Player", "ELO", "Matches", "Wins", "Win %"] return display_df @render.data_frame def match_history(): """Display match history""" df = matches_data() if df.empty: return pd.DataFrame({"Message": ["No matches recorded yet. Click on a winner to start!"]}) # Format display display_df = df.copy() display_df.columns = ["Player 1", "Player 2", "Winner", "P1 ELO Before", "P2 ELO Before", "P1 ELO After", "P2 ELO After"] return display_df app = App(app_ui, server)