import json
import os
import time
from datetime import datetime, timezone

import gradio as gr
import pandas as pd
from huggingface_hub import HfApi, hf_hub_download

# ---------------------------------------------------------------------------
# CONFIGURATION
# ---------------------------------------------------------------------------
SCORES_REPO = "CatsCanWrite/mewsicbench-scores"      # Public dataset
REQUESTS_REPO = "CatsCanWrite/mewsicbench-requests"  # Public dataset
HF_TOKEN = os.environ.get("HF_TOKEN")
API = HfApi()

# Rate limiting: minimum seconds between submissions from the same IP
RATE_LIMIT_SECONDS = 10

# In-memory rate limit tracker: {client_ip: last_request_timestamp}
_rate_limit_store: dict[str, float] = {}


def _get_client_ip(request) -> str:
    """Extract the client IP from a Gradio request object."""
    try:
        if hasattr(request, "client"):
            host = request.client.host
            if host:
                return str(host)
        if hasattr(request, "request") and hasattr(request.request, "client"):
            host = request.request.client.host
            if host:
                return str(host)
    except Exception:
        pass
    return "unknown"


def _is_rate_limited(client_ip: str) -> bool:
    """Check whether the given IP has submitted too recently."""
    now = time.time()
    last = _rate_limit_store.get(client_ip, 0)
    if now - last < RATE_LIMIT_SECONDS:
        return True
    _rate_limit_store[client_ip] = now
    return False
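

# _rate_limit_store grows without bound over the life of the Space. A minimal
# pruning sketch (a hypothetical helper, not part of the original app): call
# it from request_model() to evict stale entries once the store gets large.
def _prune_rate_limit_store(max_entries: int = 10_000) -> None:
    """Drop IPs whose last request is already outside the rate-limit window."""
    if len(_rate_limit_store) <= max_entries:
        return
    cutoff = time.time() - RATE_LIMIT_SECONDS
    for ip in [ip for ip, ts in _rate_limit_store.items() if ts < cutoff]:
        del _rate_limit_store[ip]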


def load_leaderboard():
    """Load the latest scores from the scores dataset.

    Returns a pandas Styler on success, or an empty DataFrame when the token
    is missing or the dataset cannot be loaded.
    """
    if not HF_TOKEN:
        return empty_leaderboard()
    try:
        from datasets import load_dataset

        ds = load_dataset(SCORES_REPO, token=HF_TOKEN)
        df = ds["train"].to_pandas()

        # Build column mapping
        col_map = {
            "model_id": "Model ID",
            "overall_score": "Overall",
            "meter_score": "Meter",
            "verse_score": "Verse",
            "focus_score": "Focus",
            "avg_think_tokens": "Thinking",
            "evaluated_at": "Evaluated At",
        }
        display_cols = [c for c in col_map if c in df.columns]
        df = df[display_cols].copy()

        # Sort by overall score descending before formatting
        if "overall_score" in df.columns:
            df = df.sort_values("overall_score", ascending=False).reset_index(drop=True)
        df.insert(0, "Rank", range(1, len(df) + 1))

        # Color raw 0-1 scores; NaN compares False everywhere, so guard it
        def color_score(val):
            if pd.isna(val):
                return ""
            if val >= 0.9:
                return "color: #008000;"  # green
            elif val >= 0.75:
                return "color: #606e00;"  # yellow-green
            elif val >= 0.5:
                return "color: #ff6f00;"  # orange
            else:
                return "color: #9f0000;"  # red

        # Percentage formatting is handled by the Styler's .format() below.

        # Format dates
        if "evaluated_at" in df.columns:
            df["evaluated_at"] = pd.to_datetime(df["evaluated_at"], errors="coerce")
            df["evaluated_at"] = df["evaluated_at"].dt.strftime("%Y-%m-%d %H:%M")

        # Rename to human-readable names
        df.rename(columns=col_map, inplace=True)

        # Ensure column order matches the empty leaderboard
        final_cols = [c for c in empty_leaderboard().columns if c in df.columns]
        df = df[final_cols]

        # Style only the score columns that actually made it into the frame
        score_cols = [c for c in ["Overall", "Meter", "Verse", "Focus"] if c in df.columns]
        styled = (
            df.style
            .map(color_score, subset=score_cols)
            .format({
                "Overall": "{:.1%}",
                "Meter": "{:.1%}",
                "Verse": "{:.1%}",
                "Focus": "{:.1%}",
                "Thinking": "{:0.0f}",
            })
            .hide(axis="index")
        )
        return styled
    except Exception as e:
        print(f"Could not load scores dataset: {e}")
        return empty_leaderboard()
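
# NOTE: scores are read in two ways: load_dataset() above for the aggregate
# table, and per-model JSON files under scores/ in request_model() below.
# The scores repo is therefore assumed to publish both layouts.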


def empty_leaderboard() -> pd.DataFrame:
    return pd.DataFrame(
        columns=[
            "Rank",
            "Model ID",
            "Overall",
            "Meter",
            "Verse",
            "Focus",
            "Thinking",
            "Evaluated At",
        ]
    )


def _fmt_pct(value) -> str:
    """Format a 0-1 score as a percentage, tolerating missing values."""
    return f"{value:.0%}" if isinstance(value, (int, float)) else "N/A"


def request_model(model_id: str, request: gr.Request) -> str:
    """
    Handle a model evaluation request.

    1. If the model already has a score, report it.
    2. If the model is already in the requests dataset, report that.
    3. Otherwise, add it to the requests dataset.
    """
    if not HF_TOKEN:
        return "**Error:** `HF_TOKEN` is not configured in this Space."

    # Normalize: score and request files are keyed by the lowercased ID
    model_id = model_id.strip().lower()
    if not model_id:
        return "**Error:** Please enter a Model ID."

    # Rate limiting
    client_ip = _get_client_ip(request)
    if _is_rate_limited(client_ip):
        return (
            f"**Rate limit:** Please wait at least {RATE_LIMIT_SECONDS} seconds "
            "between requests."
        )

    # Case 1: Already evaluated
    try:
        path = hf_hub_download(
            repo_id=SCORES_REPO,
            filename=f"scores/{model_id}.json",
            repo_type="dataset",
            token=HF_TOKEN,
        )
        with open(path, "r", encoding="utf-8") as f:
            score = json.load(f)
        return (
            f"**{model_id}** has already been evaluated!\n\n"
            f"- **Overall Score:** {_fmt_pct(score.get('overall_score'))}\n"
            f"- **Meter:** {_fmt_pct(score.get('meter_score'))}\n"
            f"- **Verse:** {_fmt_pct(score.get('verse_score'))}\n"
            f"- **Focus:** {_fmt_pct(score.get('focus_score'))}\n"
            f"- **Evaluated At:** {score.get('evaluated_at', 'N/A')}"
        )
    except Exception:
        pass  # Not in the scores dataset

    # Case 2: Already requested
    try:
        path = hf_hub_download(
            repo_id=REQUESTS_REPO,
            filename=f"requests/{model_id}.json",
            repo_type="dataset",
            token=HF_TOKEN,
        )
        with open(path, "r", encoding="utf-8") as f:
            req = json.load(f)
        return (
            f"**{model_id}** has already been requested for evaluation.\n\n"
            f"- **Requested At:** {req.get('requested_at', 'N/A')}\n\n"
            "Please check back later for results."
        )
    except Exception:
        pass  # Not in the requests dataset

    # Case 3: New request
    timestamp = datetime.now(timezone.utc).isoformat()
    record = {
        "model_id": model_id,
        "requested_at": timestamp,
    }
    # upload_file accepts raw bytes directly, so no temp file is needed
    data = json.dumps(record, indent=2).encode("utf-8")
    try:
        API.upload_file(
            path_or_fileobj=data,
            path_in_repo=f"requests/{model_id}.json",
            repo_id=REQUESTS_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        return (
            f"**Request submitted!** {model_id} has been added to the evaluation queue.\n\n"
            f"- **Requested At:** {timestamp}\n\n"
            "Results will appear on the leaderboard once evaluation is complete."
        )
    except Exception as e:
        return f"**Error:** Could not submit request: {e}"