Spaces:
Sleeping
Sleeping
| import json | |
| import os | |
| from datetime import datetime, timedelta, timezone | |
| import pandas as pd | |
| from huggingface_hub import HfApi | |
| import gradio as gr | |
| import html | |
| import re | |
# Shared HuggingFace API client and the dataset repos this Space reads/writes.
API = HfApi()
SUBMISSIONS_REPO = "roc-hci/turing-bench-submissions"
RESULTS_REPO = "roc-hci/turing-bench-results"
# Write token for the private datasets; configured in the Space's secrets.
HF_TOKEN = os.environ.get("HF_TOKEN")
# Gold labels are injected via the PRIVATE_LABELS secret. Fall back to "{}" so
# a missing secret yields an empty dict (submit_prediction then reports a clear
# "Server error" message) instead of json.loads(None) raising an opaque
# TypeError at import time.
GOLD_LABELS = json.loads(os.environ.get("PRIVATE_LABELS") or "{}")
def submit_prediction(model_name: str, predictions_file, profile: gr.OAuthProfile | None) -> str:
    """Validate and upload a leaderboard submission to the submissions dataset.

    Pipeline, in order: login/input checks, a 30-second per-user rate limit,
    CSV schema validation, row-count check against the gold labels, then
    upload of the predictions file plus a JSON metadata record.

    Args:
        model_name: Human-readable model name entered by the submitter.
        predictions_file: Uploaded file object (Gradio File); must be a CSV
            with a `who_is_human` column containing only 'A', 'B', or 'NA'.
        profile: OAuth profile of the logged-in user, or None when logged out.

    Returns:
        A user-facing status message (success, or the reason for rejection).
    """
    if profile is None:
        return "You must be logged in with your HuggingFace account to submit."
    if not model_name.strip():
        return "Please enter a model name."
    if predictions_file is None:
        return "Please upload a predictions file."
    # NOTE(review): this is the display name; confirm profile.username would
    # not be the more stable per-account identifier.
    username = profile.name
    # Sanitize to a single safe path segment. Used for BOTH the rate-limit
    # prefix match and the stored filenames, so a display name containing
    # spaces or '/' can no longer produce filenames that dodge the
    # startswith() rate-limit filter below.
    safe_username = re.sub(r"[^A-Za-z0-9._-]", "_", username)
    # --- 1. Rate limiting: reject if same user submitted within the last 30 seconds ---
    try:
        tree = API.list_repo_tree(
            repo_id=SUBMISSIONS_REPO,
            path_in_repo="metadata",
            repo_type="dataset",
            expand=True,  # populates last_commit on each RepoFile
            token=HF_TOKEN,
        )
        cutoff = datetime.now(tz=timezone.utc) - timedelta(seconds=30)
        for entry in tree:
            # entry is a RepoFile or RepoFolder; skip folders
            if not hasattr(entry, "rfilename"):
                continue
            filename = entry.rfilename.split("/")[-1]  # e.g. "alice_mymodel_2024-...json"
            if not filename.startswith(f"{safe_username}_"):
                continue
            if entry.last_commit is None:
                continue
            last_modified = entry.last_commit.date  # aware datetime (UTC)
            if last_modified >= cutoff:
                # Seconds left until the 30s window expires, rounded up.
                wait_secs = int((last_modified - cutoff).total_seconds()) + 1
                return (
                    f"Rate limit exceeded. You already submitted within the last 30 seconds. "
                    f"Please wait {wait_secs} second(s) before resubmitting."
                )
    except Exception as e:
        return f"Failed to check rate limit: {e}"
    # --- 2. Validate the predictions file ---
    try:
        df = pd.read_csv(predictions_file.name)
    except Exception as e:
        return f"Failed to read predictions file: {e}. Ensure it is a valid .csv file."
    if "who_is_human" not in df.columns:
        return "Invalid predictions file: missing required column `who_is_human`."
    valid_values = {"A", "B", "NA"}
    invalid_mask = ~df["who_is_human"].astype(str).isin(valid_values)
    if invalid_mask.any():
        bad = df.loc[invalid_mask, "who_is_human"].unique().tolist()
        return (
            f"Invalid predictions file: `who_is_human` contains unexpected values: {bad}. "
            "Only 'A', 'B', or 'NA' (string) are allowed."
        )
    # --- 3. Check length against gold labels ---
    try:
        gold_labels = GOLD_LABELS["who_is_human"]
    except KeyError:
        return "Server error: GOLD_LABELS does not contain the key 'who_is_human'."
    if len(df) != len(gold_labels):
        return (
            f"Invalid predictions file: expected {len(gold_labels)} rows to match the evaluation set, "
            f"but got {len(df)}."
        )
    # --- All checks passed: upload submission ---
    # Timezone-aware UTC timestamp, consistent with the rate-limit cutoff
    # above (previously a naive local datetime.now()).
    timestamp = datetime.now(tz=timezone.utc).isoformat()
    # Sanitize the full id so model names with spaces/'/' etc. stay one path
    # segment (previously only spaces were replaced).
    submission_id = re.sub(r"[^A-Za-z0-9._-]", "_", f"{safe_username}_{model_name}_{timestamp}")
    # Bug fix: the uploaded file is a CSV (read via pd.read_csv above), so
    # store it with a .csv extension rather than the misleading .jsonl.
    predictions_path = f"predictions/{submission_id}.csv"
    try:
        API.upload_file(
            path_or_fileobj=predictions_file.name,
            path_in_repo=predictions_path,
            repo_id=SUBMISSIONS_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    except Exception as e:
        return f"Failed to upload predictions file: {e}"
    metadata = {
        "model_name": model_name,
        "submitted_by": username,
        "submission_time": timestamp,
        "predictions_file": predictions_path,
    }
    try:
        API.upload_file(
            path_or_fileobj=json.dumps(metadata).encode(),
            path_in_repo=f"metadata/{submission_id}.json",
            repo_id=SUBMISSIONS_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
        )
    except Exception as e:
        return f"Predictions uploaded but failed to save metadata: {e}"
    return (
        f"Submitted as **{username}**! Your submission ID is `{submission_id}`. "
        "Results will appear on the leaderboard once evaluation is complete."
    )
def load_results() -> pd.DataFrame:
    """Load the latest leaderboard results from the results dataset.

    Reads every `results/*.json` record, sorts by accuracy (descending),
    rounds accuracies to 4 decimal places, reformats timestamps, and renames
    columns for display. On any failure (network error, missing repo, bad
    JSON) returns an empty DataFrame with the display columns so the UI
    still renders a consistent table.
    """
    display_columns = ["Model", "User", "Time", "Accuracy"]
    try:
        from huggingface_hub import HfFileSystem
        fs = HfFileSystem(token=HF_TOKEN)
        files = fs.glob(f"datasets/{RESULTS_REPO}/results/*.json", refresh=True)
        records = [json.loads(fs.read_text(f, refresh=True)) for f in files]
        df = pd.DataFrame(records)
        # Guard: sort_values on a missing column raises, which previously
        # forced every empty/malformed result set through the except path.
        if "accuracy" in df.columns:
            df = df.sort_values("accuracy", ascending=False).reset_index(drop=True)
            # Round accuracy to 4 decimal places
            df["accuracy"] = df["accuracy"].round(4)
        # Reformat submission_time to "YYYY-MM-DD HH:MM:SS"
        if "submission_time" in df.columns:
            df["submission_time"] = pd.to_datetime(
                df["submission_time"], errors="coerce"
            ).dt.strftime("%Y-%m-%d %H:%M:%S")
        # Rename columns for display
        df = df.rename(columns={
            "model_name": "Model",
            "submitted_by": "User",
            "submission_time": "Time",
            "accuracy": "Accuracy",
        })
        return df
    except Exception as e:
        print(f"Error loading results: {e}")
        # Bug fix: the original fallback was missing a comma -- "Time" "Accuracy"
        # concatenated into a single bogus "TimeAccuracy" column.
        return pd.DataFrame(columns=display_columns)
| def _format_inline(text: str) -> str: | |
| escaped = html.escape(text.strip()) | |
| escaped = re.sub(r"\*\*(.+?)\*\*", r"<strong>\1</strong>", escaped) | |
| escaped = re.sub(r"`([^`]+)`", r"<code>\1</code>", escaped) | |
| return escaped | |
def markdown_to_html(markdown: str, elem_classes: str = "html-block") -> str:
    """Render a limited Markdown subset to a single classed HTML <div>.

    Supported constructs: ATX headings (# through ######), fenced code blocks
    (``` with an optional language tag), unordered "- " lists, paragraphs, and
    the inline **bold** / `code` handled by _format_inline. Implemented as a
    line-by-line state machine: text accumulates into the paragraph /
    list_items / code_lines buffers, which are flushed into `blocks` whenever
    the construct they belong to ends.

    Args:
        markdown: Source markdown text.
        elem_classes: CSS class name(s) placed on the wrapping <div>.

    Returns:
        The rendered HTML as one string, wrapped in <div class="...">.
    """
    lines = markdown.strip().splitlines()
    blocks: list[str] = []      # completed top-level HTML fragments, in order
    paragraph: list[str] = []   # pending plain-text lines of the current paragraph
    list_items: list[str] = []  # pending (already inline-formatted) <li> contents
    code_lines: list[str] = []  # raw lines inside the current fenced code block
    code_language = ""
    in_code_block = False
    def flush_paragraph():
        # Join buffered lines into one <p>, applying inline formatting; a
        # buffer of only-whitespace lines produces nothing.
        if paragraph:
            content = " ".join(part.strip() for part in paragraph if part.strip())
            if content:
                blocks.append(f"<p>{_format_inline(content)}</p>")
            paragraph.clear()
    def flush_list():
        # Emit buffered items as a single <ul>; items were formatted on entry.
        if list_items:
            items_html = "".join(f"<li>{item}</li>" for item in list_items)
            blocks.append(f"<ul>{items_html}</ul>")
            list_items.clear()
    for raw_line in lines:
        stripped = raw_line.strip()
        if stripped.startswith("```"):
            flush_paragraph()
            flush_list()
            if in_code_block:
                # Closing fence: emit the collected code, escaped, with an
                # optional language-<lang> class for syntax highlighters.
                code_html = html.escape("\n".join(code_lines))
                language_class = f' class="language-{code_language}"' if code_language else ""
                blocks.append(f"<pre><code{language_class}>{code_html}</code></pre>")
                code_lines.clear()
                code_language = ""
                in_code_block = False
            else:
                # Opening fence: remember the language tag (text after ```).
                in_code_block = True
                code_language = stripped.removeprefix("```").strip()
            continue
        if in_code_block:
            # Inside a fence: keep the line verbatim (minus trailing
            # whitespace); no markdown processing applies.
            code_lines.append(raw_line.rstrip())
            continue
        if not stripped:
            # Blank line terminates any open paragraph and/or list.
            flush_paragraph()
            flush_list()
            continue
        heading_match = re.match(r"^(#{1,6})\s+(.*)$", stripped)
        if heading_match:
            flush_paragraph()
            flush_list()
            level = len(heading_match.group(1))  # number of '#' = heading level
            blocks.append(f"<h{level}>{_format_inline(heading_match.group(2))}</h{level}>")
            continue
        if stripped.startswith("- "):
            # List item: a paragraph directly above ends here, but an already
            # open list keeps accumulating items.
            flush_paragraph()
            list_items.append(_format_inline(stripped[2:]))
            continue
        # Ordinary text line: ends any open list, joins the paragraph buffer.
        flush_list()
        paragraph.append(stripped)
    # End of input: flush whatever is still buffered.
    # NOTE(review): an unterminated ``` fence silently discards its collected
    # code_lines here -- confirm that is acceptable for the inputs used.
    flush_paragraph()
    flush_list()
    return f'<div class="{elem_classes}">{"".join(blocks)}</div>'
| def _format_accuracy(value) -> str: | |
| if pd.isna(value): | |
| return "N/A" | |
| return f"{float(value):.4f}" |