Spaces:
Sleeping
Sleeping
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional

from huggingface_hub import HfApi, hf_hub_download
# Configuration
SCRIPT_DIR = Path(__file__).parent

# Prefer a votes.jsonl sitting next to this script (the HF Space layout);
# otherwise fall back to the copy two directories up (the local layout).
VOTES_FILE = SCRIPT_DIR / "votes.jsonl"
if not VOTES_FILE.exists():
    VOTES_FILE = SCRIPT_DIR.parent.parent / "votes.jsonl"

# Settings read from the environment; each is None when the variable is unset.
DATASET_REPO_ID = os.environ.get("DATASET_REPO_ID")
HF_TOKEN = os.environ.get("HF_TOKEN")

# True when the SPACE_ID environment variable is present.
IS_SPACE = "SPACE_ID" in os.environ
def sync_votes_file() -> Path:
    """
    Resolve the votes.jsonl file that readers should use.

    When running in a Space with a dataset repo configured, the file is
    fetched with hf_hub_download and the path it reports is returned.
    In every other case, including a failed download, the local
    VOTES_FILE path is returned.
    """
    if not (IS_SPACE and DATASET_REPO_ID):
        return VOTES_FILE

    try:
        downloaded = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename="votes.jsonl",
            repo_type="dataset",
            token=HF_TOKEN,
        )
        return Path(downloaded)
    except Exception as err:
        # Best effort: report the failure and use the local file instead.
        print(f"Error syncing votes from HF: {err}")

    return VOTES_FILE
def log_vote(
    model_a: str,
    model_b: str,
    winner: str,
    sentence_id: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> None:
    """Append one vote to the local JSONL file and try to push it to the Hub.

    Args:
        model_a: Identifier of the first model shown.
        model_b: Identifier of the second model shown.
        winner: Outcome label stored as-is in the entry.
        sentence_id: Identifier of the sentence that was judged.
        metadata: Optional extra fields; stored as ``{}`` when omitted or empty.

    The entry is always written to VOTES_FILE. The upload only happens when
    both DATASET_REPO_ID and HF_TOKEN are set, and an upload failure is
    printed rather than raised so that the local write is never undone.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo keeps
    # the stored string in the same naive-UTC ISO format as earlier entries.
    timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    vote_entry = {
        "timestamp": timestamp,
        "model_a": model_a,
        "model_b": model_b,
        "winner": winner,
        "sentence_id": sentence_id,
        "metadata": metadata or {},
    }

    with open(VOTES_FILE, "a", encoding="utf-8") as f:
        f.write(json.dumps(vote_entry) + "\n")

    if not (DATASET_REPO_ID and HF_TOKEN):
        return

    try:
        # NOTE(review): this uploads the whole local file as votes.jsonl. If the
        # dataset copy holds votes the local file lacks, they would presumably
        # be replaced - confirm against how the Space seeds VOTES_FILE.
        api = HfApi(token=HF_TOKEN)
        api.upload_file(
            path_or_fileobj=str(VOTES_FILE),
            path_in_repo="votes.jsonl",
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            commit_message=f"Log vote: {model_a} vs {model_b}",
        )
    except Exception as e:
        # Best effort: the vote is already stored locally.
        print(f"Error syncing to HF: {e}")
def load_data(file_path: str) -> List[Dict[str, Any]]:
    """Read and return the parsed contents of a JSON configuration file.

    A missing file is not an error: a warning is printed and an empty list
    is returned. Any other failure (for example malformed JSON) propagates
    to the caller.
    """
    try:
        handle = open(file_path, "r", encoding="utf-8")
    except FileNotFoundError:
        print(f"Warning: File not found: {file_path}")
        return []

    with handle:
        return json.load(handle)
def get_leaderboard(models_data: List[Dict[str, Any]]) -> List[List[Any]]:
    """Build the leaderboard table from votes.jsonl using Glicko-2 ratings.

    Every model in ``models_data`` starts at rating 1500, RD 200 and
    volatility 0.06. The votes file is replayed line by line and both models
    of a vote are updated immediately after it. Votes naming a model id that
    is not in ``models_data`` are skipped; lines that raise while being
    processed are reported and skipped.

    Args:
        models_data: Model descriptors. Each must provide ``"id"`` and
            ``"name"``; ``"active"`` (default True) and ``"reason"`` are
            optional and only affect the status column.

    Returns:
        Rows of ``[name, version, "rating ± interval", votes, status]``,
        ordered by rating with the highest first.
    """
    import glicko2

    votes_path = sync_votes_file()

    players = {}
    for m in models_data:
        players[m["id"]] = glicko2.Player(rating=1500, rd=200, vol=0.06)

    if not votes_path.exists():
        print(f"No votes file found at {votes_path}")
    else:
        with open(votes_path, "r", encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    vote = json.loads(line)
                    m_a, m_b = vote["model_a"], vote["model_b"]
                    winner = vote["winner"]
                    if m_a not in players or m_b not in players:
                        continue

                    # Glicko-2 expects matches grouped in a rating period, but
                    # updating after each match gives a real-time leaderboard.
                    # Score from model_a's side: 1 = win, 0 = loss, 0.5 = tie
                    # (any other winner value counts as a tie).
                    score_a = 0.5
                    if winner == "model_a":
                        score_a = 1.0
                    elif winner == "model_b":
                        score_a = 0.0

                    # Snapshot both players first so each update sees the
                    # opponent's pre-match state.
                    rating_a, rd_a = players[m_a].rating, players[m_a].rd
                    rating_b, rd_b = players[m_b].rating, players[m_b].rd

                    players[m_a].update_player([rating_b], [rd_b], [score_a])
                    players[m_b].update_player([rating_a], [rd_a], [1.0 - score_a])
                except Exception as e:
                    print(f"Error parsing vote: {e}")

    # Read the vote counts once; doing it per model would re-sync and re-read
    # the whole votes file for every row.
    vote_counts = get_vote_counts(models_data)

    # Rank the models themselves instead of looking rows up by display name
    # afterwards: names are not guaranteed unique, and a name lookup would
    # give every same-named row the rating of the first match.
    ranked_models = sorted(
        models_data,
        key=lambda m: players[m["id"]].rating,
        reverse=True,
    )

    leaderboard = []
    for m in ranked_models:
        m_id = m["id"]
        player = players[m_id]
        version = "Original" if m_id.endswith("-original") else "Transformed"
        votes = vote_counts.get(m_id, 0)

        # rd * 1.96 is shown as the 95% confidence interval half-width.
        rating_str = f"{round(player.rating, 1)} ± {round(player.rd * 1.96, 1)}"

        # Inactive models show their custom reason when one is provided.
        if not m.get("active", True):
            status = m.get("reason", "Inactive")
        else:
            status = "Active"

        leaderboard.append([
            m["name"],
            version,
            rating_str,
            votes,
            status,
        ])

    return leaderboard
def get_vote_counts(models_data: List[Dict[str, Any]]) -> Dict[str, int]:
    """Count, for each known model id, how many votes it appeared in.

    A vote adds one to its ``model_a`` and one to its ``model_b`` when those
    ids are present in ``models_data``. Blank lines and lines that cannot be
    processed are skipped silently; a failure while reading the file is
    printed and the counts gathered so far are returned.
    """
    tally = dict.fromkeys((model["id"] for model in models_data), 0)

    source = sync_votes_file()
    if source.exists():
        try:
            with open(source, "r", encoding="utf-8") as handle:
                for raw in handle:
                    if raw.strip():
                        try:
                            record = json.loads(raw)
                            for side in ("model_a", "model_b"):
                                model_id = record.get(side)
                                if model_id in tally:
                                    tally[model_id] += 1
                        except Exception:
                            # Malformed entries are ignored for counting.
                            continue
        except Exception as err:
            print(f"Error reading votes for counts: {err}")

    return tally