Spaces:
Running
Running
| import datetime | |
| import io | |
| import json | |
| import os | |
| import re | |
| from urllib.parse import urlparse | |
| import gradio as gr | |
| import pandas as pd | |
| from huggingface_hub import HfApi, hf_hub_download | |
# Application name used in UI titles and commit messages.
APP_NAME = "miniapp"
# Configure via Space Secrets:
# - HF_TOKEN: token with write access to the dataset (Settings -> Variables and secrets -> Secrets)
# - LEADERBOARD_DATASET: e.g. "your-username/miniapp-leaderboard" (repo_type=dataset)
HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("TOKEN") or os.environ.get("HUGGINGFACE_TOKEN")
LEADERBOARD_DATASET = os.environ.get("LEADERBOARD_DATASET", "").strip()
# Detect whether the app is running on Hugging Face Spaces.
# NOTE(review): the last clause reads os.environ.get("system"); Spaces
# conventionally sets SYSTEM=spaces (uppercase) — confirm this key.
IN_SPACES = bool(
    os.environ.get("SPACE_ID")
    or os.environ.get("SPACE_REPO_NAME")
    or os.environ.get("SPACE_AUTHOR_NAME")
    or os.environ.get("system", "") == "spaces"
)
# Maximum number of submission files loaded into the leaderboard table.
MAX_ENTRIES = int(os.environ.get("MAX_ENTRIES", "200"))
| def _is_valid_http_url(url: str) -> bool: | |
| try: | |
| parsed = urlparse(url) | |
| return parsed.scheme in ("http", "https") and bool(parsed.netloc) | |
| except Exception: | |
| return False | |
| def _slug(s: str, max_len: int = 60) -> str: | |
| s = (s or "").strip().lower() | |
| s = re.sub(r"[^a-z0-9]+", "-", s) | |
| s = re.sub(r"-{2,}", "-", s).strip("-") | |
| return (s[:max_len] or "model") | |
def _api() -> HfApi:
    """Build an HfApi client authenticated with the configured write token."""
    return HfApi(token=HF_TOKEN)
def _ensure_dataset_repo():
    """Ensure the leaderboard dataset repository exists, creating it if absent.

    Raises RuntimeError when HF_TOKEN or LEADERBOARD_DATASET is not configured.
    """
    if not HF_TOKEN:
        raise RuntimeError("未配置 HF_TOKEN(Space Secrets)。")
    if not LEADERBOARD_DATASET:
        raise RuntimeError("未配置 LEADERBOARD_DATASET(例如:your-username/miniapp-leaderboard)。")
    client = _api()
    try:
        client.repo_info(repo_id=LEADERBOARD_DATASET, repo_type="dataset")
    except Exception:
        # Repo lookup failed, so create it (public dataset; it can also be
        # created by hand and set to private instead).
        client.create_repo(repo_id=LEADERBOARD_DATASET, repo_type="dataset", private=False, exist_ok=True)
| def _empty_df() -> pd.DataFrame: | |
| return pd.DataFrame(columns=["submitted_at", "username", "model_name", "model_api", "notes"]) | |
def _load_submissions_df() -> pd.DataFrame:
    """Fetch up to MAX_ENTRIES submission JSON files and return a DataFrame.

    Returns an empty frame when the Space is unconfigured or the repo listing
    fails; individual unreadable files are skipped silently.
    """
    if not (HF_TOKEN and LEADERBOARD_DATASET):
        return _empty_df()
    client = _api()
    try:
        repo_files = client.list_repo_files(repo_id=LEADERBOARD_DATASET, repo_type="dataset")
    except Exception:
        return _empty_df()
    candidates = [name for name in repo_files if name.startswith("submissions/") and name.endswith(".json")]
    candidates.sort(reverse=True)  # filenames embed the timestamp, so newest come first
    records = []
    for name in candidates[:MAX_ENTRIES]:
        try:
            local_path = hf_hub_download(
                repo_id=LEADERBOARD_DATASET,
                repo_type="dataset",
                filename=name,
                token=HF_TOKEN,
            )
            with open(local_path, "r", encoding="utf-8") as handle:
                records.append(json.load(handle))
        except Exception:
            # Best-effort: a corrupt entry must not break the whole table.
            continue
    if not records:
        return _empty_df()
    frame = pd.DataFrame(records)
    expected = ["submitted_at", "username", "model_name", "model_api", "notes"]
    for column in expected:
        if column not in frame.columns:
            frame[column] = ""
    frame = frame[expected]
    return frame.sort_values(by=["submitted_at"], ascending=False, kind="stable")
def refresh():
    """Reload the leaderboard table (wired to the refresh button)."""
    return _load_submissions_df()
def submit(model_name: str, model_api: str, notes: str, username: str | None):
    """Validate a submission and persist it as a JSON file in the dataset repo.

    Returns a tuple of (status markdown, refreshed leaderboard DataFrame) so it
    can be wired directly to the Gradio submit button outputs.
    """
    model_name = (model_name or "").strip()
    model_api = (model_api or "").strip()
    notes = (notes or "").strip()
    username = (username or "").strip() or "anonymous"
    # Field-level validation; each failure re-renders the current leaderboard.
    if not model_name:
        return "请填写 **模型名称**。", _load_submissions_df()
    if not model_api:
        return "请填写 **模型 API**。", _load_submissions_df()
    if not _is_valid_http_url(model_api):
        return "**模型 API** 需要是合法的 `http(s)://...` URL。", _load_submissions_df()
    if not HF_TOKEN:
        return "Space 未配置 **HF_TOKEN**(Secrets),无法写入排行榜。", _load_submissions_df()
    if not LEADERBOARD_DATASET:
        return "Space 未配置 **LEADERBOARD_DATASET**(例如:`your-username/miniapp-leaderboard`)。", _load_submissions_df()
    _ensure_dataset_repo()
    api = _api()
    # FIX: datetime.utcnow() is deprecated since Python 3.12; build the same
    # "YYYY-MM-DDTHH:MM:SSZ" stamp from a timezone-aware UTC datetime.
    now = (
        datetime.datetime.now(datetime.timezone.utc)
        .replace(microsecond=0, tzinfo=None)
        .isoformat()
        + "Z"
    )
    safe_model = _slug(model_name)
    safe_user = _slug(username)
    # One JSON file per submission, grouped by day; the timestamp prefix keeps
    # lexicographic order equal to chronological order.
    path_in_repo = f"submissions/{now[:10]}/{now}-{safe_user}-{safe_model}.json"
    payload = {
        "submitted_at": now,
        "username": username,
        "model_name": model_name,
        "model_api": model_api,
        "notes": notes,
    }
    data = (json.dumps(payload, ensure_ascii=False, indent=2) + "\n").encode("utf-8")
    bio = io.BytesIO(data)
    api.upload_file(
        repo_id=LEADERBOARD_DATASET,
        repo_type="dataset",
        path_or_fileobj=bio,
        path_in_repo=path_in_repo,
        commit_message=f"miniapp: submit {username}/{model_name}",
        token=HF_TOKEN,
    )
    return "已提交并写入 leaderboard。", _load_submissions_df()
def build_demo() -> gr.Blocks:
    """Assemble the Gradio UI: submission form on the left, leaderboard table on the right."""
    with gr.Blocks(title=f"{APP_NAME} leaderboard") as demo:
        gr.Markdown(
            f"## {APP_NAME} leaderboard\n\n"
            "提交你的模型信息后,会写入一个 Hugging Face **Dataset**,并在下方表格展示。\n\n"
            f"- 当前 `LEADERBOARD_DATASET`: `{LEADERBOARD_DATASET or '(未配置)'}`\n"
        )
        with gr.Row():
            with gr.Column(scale=2):
                model_name = gr.Textbox(label="模型名称(必填)", placeholder="例如:my-agent-v1")
                model_api = gr.Textbox(
                    label="模型 API(必填)",
                    placeholder="例如:https://api.example.com/v1/chat/completions",
                )
                notes = gr.Textbox(label="备注(可选)", lines=4)
                # Frontend-only build: OAuth is not enforced. Add a LoginButton
                # later if submissions should require a logged-in user.
                if IN_SPACES:
                    username = gr.Textbox(
                        label="用户名(可选)",
                        placeholder="建议填你的 HF 用户名(也可留空)",
                    )
                else:
                    username = gr.Textbox(label="用户名(本地调试用)", value="local")
                submit_btn = gr.Button("提交", variant="primary")
                status = gr.Markdown()
            with gr.Column(scale=3):
                leaderboard = gr.Dataframe(
                    label="Leaderboard(按提交时间倒序)",
                    value=_load_submissions_df(),  # initial contents at app start
                    interactive=False,
                    wrap=True,
                )
                refresh_btn = gr.Button("刷新")
        # Event wiring: submit writes a new entry then refreshes status + table.
        submit_btn.click(
            submit,
            inputs=[model_name, model_api, notes, username],
            outputs=[status, leaderboard],
        )
        refresh_btn.click(refresh, inputs=[], outputs=[leaderboard])
    return demo
# Module-level Blocks instance (Spaces can also launch this directly).
demo = build_demo()
def main():
    """Entry point: launch the Gradio app with default settings."""
    demo.launch()

if __name__ == "__main__":
    main()
| import datetime | |
| import io | |
| import json | |
| import os | |
| import re | |
| from urllib.parse import urlparse | |
| import gradio as gr | |
| import pandas as pd | |
| from huggingface_hub import HfApi, hf_hub_download | |
# Application name used in UI titles and commit messages.
APP_NAME = "miniapp"
# Configure via Space Secrets:
# - HF_TOKEN: token with write access to the dataset (Settings -> Variables and secrets -> Secrets)
# - LEADERBOARD_DATASET: e.g. "your-username/miniapp-leaderboard" (repo_type=dataset)
HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("TOKEN") or os.environ.get("HUGGINGFACE_TOKEN")
LEADERBOARD_DATASET = os.environ.get("LEADERBOARD_DATASET", "").strip()
# Detect whether the app is running on Hugging Face Spaces.
# NOTE(review): the last clause reads os.environ.get("system"); Spaces
# conventionally sets SYSTEM=spaces (uppercase) — confirm this key.
IN_SPACES = bool(
    os.environ.get("SPACE_ID")
    or os.environ.get("SPACE_REPO_NAME")
    or os.environ.get("SPACE_AUTHOR_NAME")
    or os.environ.get("system", "") == "spaces"
)
# Maximum number of submission files loaded into the leaderboard table.
MAX_ENTRIES = int(os.environ.get("MAX_ENTRIES", "200"))
| def _is_valid_http_url(url: str) -> bool: | |
| try: | |
| parsed = urlparse(url) | |
| return parsed.scheme in ("http", "https") and bool(parsed.netloc) | |
| except Exception: | |
| return False | |
| def _slug(s: str, max_len: int = 60) -> str: | |
| s = (s or "").strip().lower() | |
| s = re.sub(r"[^a-z0-9]+", "-", s) | |
| s = re.sub(r"-{2,}", "-", s).strip("-") | |
| return (s[:max_len] or "model") | |
def _api() -> HfApi:
    """Build an HfApi client authenticated with the configured write token."""
    return HfApi(token=HF_TOKEN)
def _ensure_dataset_repo():
    """Ensure the leaderboard dataset repository exists, creating it if absent.

    Raises RuntimeError when HF_TOKEN or LEADERBOARD_DATASET is not configured.
    """
    if not HF_TOKEN:
        raise RuntimeError("未配置 HF_TOKEN(Space Secrets)。")
    if not LEADERBOARD_DATASET:
        raise RuntimeError("未配置 LEADERBOARD_DATASET(例如:your-username/miniapp-leaderboard)。")
    client = _api()
    try:
        client.repo_info(repo_id=LEADERBOARD_DATASET, repo_type="dataset")
    except Exception:
        # Repo lookup failed, so create it (public dataset; it can also be
        # created by hand and set to private instead).
        client.create_repo(repo_id=LEADERBOARD_DATASET, repo_type="dataset", private=False, exist_ok=True)
def _load_submissions_df() -> pd.DataFrame:
    """Load up to MAX_ENTRIES stored submissions into a DataFrame.

    Improvement: the empty-table literal was duplicated three times in the
    original body; it is now produced by a single local helper.
    Returns an empty frame when the Space is unconfigured or the repo listing
    fails; individual unreadable files are skipped silently.
    """
    columns = ["submitted_at", "username", "model_name", "model_api", "notes"]

    def _empty() -> pd.DataFrame:
        # Single source of truth for the empty leaderboard shape.
        return pd.DataFrame(columns=columns)

    if not HF_TOKEN or not LEADERBOARD_DATASET:
        return _empty()
    api = _api()
    try:
        files = api.list_repo_files(repo_id=LEADERBOARD_DATASET, repo_type="dataset")
    except Exception:
        return _empty()
    sub_files = sorted(
        [f for f in files if f.startswith("submissions/") and f.endswith(".json")],
        reverse=True,  # timestamped names: newest first
    )[:MAX_ENTRIES]
    rows = []
    for filename in sub_files:
        try:
            path = hf_hub_download(
                repo_id=LEADERBOARD_DATASET,
                repo_type="dataset",
                filename=filename,
                token=HF_TOKEN,
            )
            with open(path, "r", encoding="utf-8") as fp:
                rows.append(json.load(fp))
        except Exception:
            # Best-effort: a corrupt entry must not break the whole table.
            continue
    if not rows:
        return _empty()
    df = pd.DataFrame(rows)
    # Normalize column set and order before sorting.
    for col in columns:
        if col not in df.columns:
            df[col] = ""
    df = df[columns]
    df = df.sort_values(by=["submitted_at"], ascending=False, kind="stable")
    return df
def refresh():
    """Reload the leaderboard table (wired to the refresh button)."""
    return _load_submissions_df()
def submit(model_name: str, model_api: str, notes: str, username: str | None):
    """Validate a submission and persist it as a JSON file in the dataset repo.

    Returns a tuple of (status markdown, refreshed leaderboard DataFrame) so it
    can be wired directly to the Gradio submit button outputs.
    """
    model_name = (model_name or "").strip()
    model_api = (model_api or "").strip()
    notes = (notes or "").strip()
    username = (username or "").strip() or "anonymous"
    # Field-level validation; each failure re-renders the current leaderboard.
    if not model_name:
        return "请填写 **模型名称**。", _load_submissions_df()
    if not model_api:
        return "请填写 **模型 API**。", _load_submissions_df()
    if not _is_valid_http_url(model_api):
        return "**模型 API** 需要是合法的 `http(s)://...` URL。", _load_submissions_df()
    if not HF_TOKEN:
        return "Space 未配置 **HF_TOKEN**(Secrets),无法写入排行榜。", _load_submissions_df()
    if not LEADERBOARD_DATASET:
        return "Space 未配置 **LEADERBOARD_DATASET**(例如:`your-username/miniapp-leaderboard`)。", _load_submissions_df()
    _ensure_dataset_repo()
    api = _api()
    # FIX: datetime.utcnow() is deprecated since Python 3.12; build the same
    # "YYYY-MM-DDTHH:MM:SSZ" stamp from a timezone-aware UTC datetime.
    now = (
        datetime.datetime.now(datetime.timezone.utc)
        .replace(microsecond=0, tzinfo=None)
        .isoformat()
        + "Z"
    )
    safe_model = _slug(model_name)
    safe_user = _slug(username)
    # One JSON file per submission, grouped by day; the timestamp prefix keeps
    # lexicographic order equal to chronological order.
    path_in_repo = f"submissions/{now[:10]}/{now}-{safe_user}-{safe_model}.json"
    payload = {
        "submitted_at": now,
        "username": username,
        "model_name": model_name,
        "model_api": model_api,
        "notes": notes,
    }
    data = (json.dumps(payload, ensure_ascii=False, indent=2) + "\n").encode("utf-8")
    bio = io.BytesIO(data)
    api.upload_file(
        repo_id=LEADERBOARD_DATASET,
        repo_type="dataset",
        path_or_fileobj=bio,
        path_in_repo=path_in_repo,
        commit_message=f"miniapp: submit {username}/{model_name}",
        token=HF_TOKEN,
    )
    return "已提交并写入 leaderboard。", _load_submissions_df()
# Top-level UI: submission form on the left, leaderboard table on the right.
with gr.Blocks(title=f"{APP_NAME} leaderboard") as demo:
    gr.Markdown(
        f"## {APP_NAME} leaderboard\n\n"
        "提交你的模型信息后,会写入一个 Hugging Face **Dataset**,并在下方表格展示。\n\n"
        f"- 当前 `LEADERBOARD_DATASET`: `{LEADERBOARD_DATASET or '(未配置)'}`\n"
    )
    with gr.Row():
        with gr.Column(scale=2):
            model_name = gr.Textbox(label="模型名称(必填)", placeholder="例如:my-agent-v1")
            model_api = gr.Textbox(
                label="模型 API(必填)",
                placeholder="例如:https://api.example.com/v1/chat/completions",
            )
            notes = gr.Textbox(label="备注(可选)", lines=4)
            # Frontend-only build: OAuth is not enforced; add a LoginButton in
            # the Space if submissions should require authentication.
            if IN_SPACES:
                username = gr.Textbox(
                    label="用户名(可选)",
                    placeholder="建议填你的 HF 用户名(也可留空)",
                )
            else:
                username = gr.Textbox(label="用户名(本地调试用)", value="local")
            submit_btn = gr.Button("提交", variant="primary")
            status = gr.Markdown()
        with gr.Column(scale=3):
            leaderboard = gr.Dataframe(
                label="Leaderboard(按提交时间倒序)",
                value=_load_submissions_df(),  # initial contents at app start
                interactive=False,
                wrap=True,
            )
            refresh_btn = gr.Button("刷新")
    # Event wiring: submit writes a new entry then refreshes status + table.
    submit_btn.click(
        submit,
        inputs=[model_name, model_api, notes, username],
        outputs=[status, leaderboard],
    )
    refresh_btn.click(refresh, inputs=[], outputs=[leaderboard])
def main():
    """Entry point: launch the Gradio app with default settings."""
    demo.launch()

if __name__ == "__main__":
    main()
# Display the results
# NOTE(review): this section references names not defined in this file
# (HAS_TOKEN, LOCAL_DEBUG, load_dataset, RESULTS_DATASET, YEAR_VERSION, TOKEN,
# CONTACT_DATASET, VerificationMode) — presumably provided by imports or
# constants elsewhere in the original project; confirm before running.
if HAS_TOKEN and not LOCAL_DEBUG:
    try:
        # Force a re-download so the leaderboard reflects the latest results.
        eval_results = load_dataset(
            RESULTS_DATASET,
            YEAR_VERSION,
            token=TOKEN,
            download_mode="force_redownload",
            verification_mode=VerificationMode.NO_CHECKS,
        )
    except Exception as e:
        print(e)
        eval_results = None
    try:
        contact_infos = load_dataset(
            CONTACT_DATASET,
            YEAR_VERSION,
            token=TOKEN,
            download_mode="force_redownload",
            verification_mode=VerificationMode.NO_CHECKS,
        )
    except Exception as e:
        print(e)
        contact_infos = None
else:
    # Local / tokenless mode: run the UI without any remote data.
    eval_results = None
    contact_infos = None
def get_dataframe_from_results(eval_results, split):
    """Convert one split of the results dataset into a display-ready DataFrame.

    Renames columns to human-readable headers, drops private fields, sorts by
    average score, and scales score columns from fractions to percentages.
    Returns an empty frame when *eval_results* is None.
    """
    if eval_results is None:
        return pd.DataFrame(columns=EMPTY_LEADERBOARD_COLUMNS)
    local_df = eval_results[split]
    # Replace the plain model name with a markdown hyperlink to its URL.
    local_df = local_df.map(lambda row: {"model": model_hyperlink(row["url"], row["model"])})
    local_df = local_df.remove_columns(["system_prompt", "url"])
    local_df = local_df.rename_column("model", "Agent name")
    local_df = local_df.rename_column("model_family", "Model family")
    local_df = local_df.rename_column("score", "Average score (%)")
    for i in [1, 2, 3]:
        local_df = local_df.rename_column(f"score_level{i}", f"Level {i} score (%)")
    local_df = local_df.rename_column("date", "Submission date")
    df = pd.DataFrame(local_df)
    df = df.sort_values(by=["Average score (%)"], ascending=False)
    # Scores are stored as fractions; display them as percentages (2 decimals).
    numeric_cols = [c for c in local_df.column_names if "score" in c]
    df[numeric_cols] = df[numeric_cols].multiply(100).round(decimals=2)
    #df = df.style.format("{:.2%}", subset=numeric_cols)
    return df
#eval_dataframe_val = get_dataframe_from_results(eval_results=eval_results, split="validation")
# Test-split leaderboard rendered by the UI below.
eval_dataframe_test = get_dataframe_from_results(eval_results=eval_results, split="test")
# Gold answers
# NOTE(review): INTERNAL_DATA_DATASET / load_dataset / TOKEN / HAS_TOKEN /
# LOCAL_DEBUG are not defined in this file — confirm they are in scope.
if HAS_TOKEN and not LOCAL_DEBUG:
    gold_dataset = load_dataset(
        INTERNAL_DATA_DATASET,
        f"{YEAR_VERSION}_all",
        token=TOKEN,
    )
    # Index gold rows by task_id per split for O(1) lookup during scoring.
    gold_results = {
        split: {row["task_id"]: row for row in gold_dataset[split]}
        for split in ["test", "validation"]
    }
else:
    # No gold data in local mode: scoring lookups will simply find nothing.
    gold_results = {"test": {}, "validation": {}}
def restart_space():
    """Restart the Space so the leaderboard is rebuilt from fresh data.

    NOTE(review): relies on module-level `api`, `LEADERBOARD_PATH`, `TOKEN`
    and `HAS_TOKEN` that are not defined in this file — confirm they exist.
    """
    if IN_SPACES and HAS_TOKEN:
        api.restart_space(repo_id=LEADERBOARD_PATH, token=TOKEN)
# Column datatypes for the leaderboard gr.Dataframe; order must match the table columns.
TYPES = ["markdown", "number", "number", "number", "number", "str", "str", "str"]
def add_new_eval(
    #val_or_test: str,
    model: str,
    model_family: str,
    system_prompt: str,
    url: str,
    path_to_file: str,
    organisation: str,
    mail: str,
    profile: gr.OAuthProfile,
):
    """Validate, store, score and publish a new leaderboard submission.

    Returns a formatted markdown string describing success or the first
    validation failure; unexpected exceptions are printed and reported as a
    generic error so no stack trace leaks to the UI.

    NOTE(review): this function depends on many module-level names defined
    elsewhere (HAS_TOKEN, LOCAL_DEBUG, requests, parseaddr, eval_results,
    gold_results, question_scorer, format_error/format_warning/format_log,
    ref_level_len, ref_scores_len, api, TOKEN, dataset constants) and assumes
    a writable local "scored/" directory exists — confirm in deployment.
    """
    val_or_test = "test"  # validation submissions are currently disabled
    try:
        if not HAS_TOKEN or LOCAL_DEBUG:
            return format_error(
                "Submissions are disabled in local mode. Set env TOKEN (Hugging Face token) and rerun to enable submissions."
            )
        # Anti-abuse: reject accounts created less than 2 months ago.
        user_data = requests.get(f"https://huggingface.co/api/users/{profile.username}/overview")
        creation_date = json.loads(user_data.content)["createdAt"]
        if datetime.datetime.now() - datetime.datetime.strptime(creation_date, '%Y-%m-%dT%H:%M:%S.%fZ') < datetime.timedelta(days=60):
            return format_error("This account is not authorized to submit on GAIA.")
        # Rate limit: at most one submission per user per day.
        contact_infos = load_dataset(CONTACT_DATASET, YEAR_VERSION, token=TOKEN, download_mode="force_redownload", verification_mode=VerificationMode.NO_CHECKS, trust_remote_code=True)
        user_submission_dates = sorted(row["date"] for row in contact_infos[val_or_test] if row["username"] == profile.username)
        if len(user_submission_dates) > 0 and user_submission_dates[-1] == datetime.datetime.today().strftime('%Y-%m-%d'):
            return format_error("You already submitted once today, please try again tomorrow.")
        is_validation = val_or_test == "validation"
        # Very basic email parsing
        _, parsed_mail = parseaddr(mail)
        if "@" not in parsed_mail:
            return format_warning("Please provide a valid email adress.")
        print("Adding new eval")
        # Reject a model/organisation combination that was already submitted.
        # NOTE(review): this tests the two sets independently, so model X from
        # org A plus model Y from org A can block a new (X, B) pair — confirm
        # whether per-pair matching was intended.
        if model.lower() in set([m.lower() for m in eval_results[val_or_test]["model"]]) and organisation.lower() in set([o.lower() for o in eval_results[val_or_test]["organisation"]]):
            return format_warning("This model has been already submitted.")
        if path_to_file is None:
            return format_warning("Please attach a file.")
        # SAVE UNSCORED SUBMISSION
        if LOCAL_DEBUG:
            print("mock uploaded submission")
        else:
            api.upload_file(
                repo_id=SUBMISSION_DATASET,
                path_or_fileobj=path_to_file.name,
                path_in_repo=f"{organisation}/{model}/{YEAR_VERSION}_{val_or_test}_raw_{datetime.datetime.today()}.jsonl",
                repo_type="dataset",
                token=TOKEN
            )
        # SAVE CONTACT
        contact_info = {
            "model": model,
            "model_family": model_family,
            "url": url,
            "organisation": organisation,
            "username": profile.username,
            "mail": mail,
            "date": datetime.datetime.today().strftime('%Y-%m-%d')
        }
        contact_infos[val_or_test] = contact_infos[val_or_test].add_item(contact_info)
        if LOCAL_DEBUG:
            print("mock uploaded contact info")
        else:
            contact_infos.push_to_hub(CONTACT_DATASET, config_name=YEAR_VERSION, token=TOKEN)
        # SCORE SUBMISSION
        file_path = path_to_file.name
        scores = {"all": 0, 1: 0, 2: 0, 3: 0}
        num_questions = {"all": 0, 1: 0, 2: 0, 3: 0}
        task_ids = []
        with open(f"scored/{organisation}_{model}.jsonl", "w") as scored_file:
            with open(file_path, 'r') as f:
                for ix, line in enumerate(f):
                    try:
                        task = json.loads(line)
                    except Exception:
                        return format_error(f"Line {ix} is incorrectly formatted. Please fix it and resubmit your file.")
                    if "model_answer" not in task:
                        return format_error(f"Line {ix} contains no model_answer key. Please fix it and resubmit your file.")
                    answer = task["model_answer"]
                    task_id = task["task_id"]
                    try:
                        level = int(gold_results[val_or_test][task_id]["Level"])
                    except KeyError:
                        return format_error(f"{task_id} not found in split {val_or_test}. Are you sure you submitted the correct file?")
                    score = question_scorer(task['model_answer'], gold_results[val_or_test][task_id]["Final answer"])
                    scored_file.write(
                        json.dumps({
                            "id": task_id,
                            "model_answer": answer,
                            "score": score,
                            "level": level
                        }) + "\n"
                    )
                    task_ids.append(task_id)
                    scores["all"] += score
                    scores[level] += score
                    num_questions["all"] += 1
                    num_questions[level] += 1
        # Check if there's any duplicate in the submission
        if len(task_ids) != len(set(task_ids)):
            return format_error("There are duplicates in your submission. Please check your file and resubmit it.")
        if any([num_questions[level] != ref_level_len[val_or_test][level] for level in [1, 2, 3]]):
            return format_error(f"Your submission has {num_questions[1]} questions for level 1, {num_questions[2]} for level 2, and {num_questions[3]} for level 3, but it should have {ref_level_len[val_or_test][1]}, {ref_level_len[val_or_test][2]}, and {ref_level_len[val_or_test][3]} respectively. Please check your submission.")
        # SAVE SCORED SUBMISSION
        if LOCAL_DEBUG:
            print("mock uploaded scored submission")
        else:
            api.upload_file(
                repo_id=SUBMISSION_DATASET,
                path_or_fileobj=f"scored/{organisation}_{model}.jsonl",
                path_in_repo=f"{organisation}/{model}/{YEAR_VERSION}_{val_or_test}_scored_{datetime.datetime.today()}.jsonl",
                repo_type="dataset",
                token=TOKEN
            )
            # Validation runs are mirrored to the public submissions dataset.
            if is_validation:
                api.upload_file(
                    repo_id=SUBMISSION_DATASET_PUBLIC,
                    path_or_fileobj=f"scored/{organisation}_{model}.jsonl",
                    path_in_repo=f"{organisation}/{model}/{YEAR_VERSION}_{val_or_test}_scored_{datetime.datetime.today()}.jsonl",
                    repo_type="dataset",
                    token=TOKEN
                )
        # SAVE TO LEADERBOARD DATA
        eval_entry = {
            "model": model,
            "model_family": model_family,
            "system_prompt": system_prompt,
            "url": url,
            "organisation": organisation,
            "score": scores["all"] / ref_scores_len[val_or_test],
            "score_level1": scores[1] / num_questions[1],
            "score_level2": scores[2] / num_questions[2],
            "score_level3": scores[3] / num_questions[3],
            "date": datetime.datetime.today().strftime('%Y-%m-%d')
        }
        if num_questions[1] + num_questions[2] + num_questions[3] != ref_scores_len[val_or_test]:
            # BUGFIX: the original interpolated len(scores['all']) — scores['all']
            # is an int, so formatting this message raised TypeError instead of
            # reporting the count mismatch. Use the question counter instead.
            return format_error(f"Your submission has {num_questions['all']} questions for the {val_or_test} set, but it should have {ref_scores_len[val_or_test]}. Please check your submission.")
        # Catching spam submissions of 100%
        if all((eval_entry[k] == 1 for k in ["score_level1", "score_level2", "score_level3"])):
            return format_error(f"There was a problem with your submission. Please open a discussion.")
        # Testing for duplicates - to see if we want to add something like it as it would allow people to try to see the content of other submissions
        #eval_entry_no_date = {k: v for k, v in eval_entry if k != "date"}
        #columns_no_date = [c for c in eval_results[val_or_test].column_names if c != "date"]
        #if eval_entry_no_date in eval_results[val_or_test].select_columns(columns_no_date):
        #    return format_error(f"Your submission is an exact duplicate from an existing submission.")
        eval_results[val_or_test] = eval_results[val_or_test].add_item(eval_entry)
        print(eval_results)
        if LOCAL_DEBUG:
            print("mock uploaded results to lb")
        else:
            eval_results.push_to_hub(RESULTS_DATASET, config_name=YEAR_VERSION, token=TOKEN)
        return format_log(f"Model {model} submitted by {organisation} successfully.\nPlease wait a few hours and refresh the leaderboard to see your score displayed.")
    except Exception as e:
        print(e)
        return format_error(f"An error occurred, please open a discussion and indicate at what time you encountered the error.\n")
def refresh():
    """Re-download the results dataset and rebuild the test leaderboard table.

    Falls back to an empty table when no token is configured, when running in
    local debug mode, or when the download fails.
    """
    if HAS_TOKEN and not LOCAL_DEBUG:
        try:
            eval_results = load_dataset(
                RESULTS_DATASET,
                YEAR_VERSION,
                token=TOKEN,
                download_mode="force_redownload",
                verification_mode=VerificationMode.NO_CHECKS,
            )
        except Exception as e:
            print(e)
            eval_results = None
    else:
        eval_results = None
    return get_dataframe_from_results(eval_results=eval_results, split="test")
def upload_file(files):
    """Return the local path (``.name``) of each uploaded file object."""
    return [item.name for item in files]
# Top-level UI: leaderboard table plus the submission accordion.
demo = gr.Blocks()
with demo:
    gr.HTML(TITLE)
    gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")
    with gr.Row():
        with gr.Accordion("📙 Citation", open=False):
            citation_button = gr.Textbox(
                value=CITATION_BUTTON_TEXT,
                label=CITATION_BUTTON_LABEL,
                elem_id="citation-button",
            ) #.style(show_copy_button=True)
    gr.Markdown("Results: Test")
    leaderboard_table_test = gr.components.Dataframe(
        value=eval_dataframe_test, datatype=TYPES, interactive=False,
        column_widths=["20%"]
    )
    #with gr.Tab("Results: Validation"):
    #    leaderboard_table_val = gr.components.Dataframe(
    #        value=eval_dataframe_val, datatype=TYPES, interactive=False,
    #        column_widths=["20%"]
    #    )
    refresh_button = gr.Button("Refresh")
    refresh_button.click(
        refresh,
        inputs=[],
        outputs=[
            #leaderboard_table_val,
            leaderboard_table_test,
        ],
    )
    with gr.Accordion("Submit a new model for evaluation"):
        with gr.Row():
            gr.Markdown(SUBMISSION_TEXT, elem_classes="markdown-text")
        with gr.Row():
            with gr.Column():
                #level_of_test = gr.Radio(["test"], value="test", label="Split")
                model_name_textbox = gr.Textbox(label="Agent name")
                model_family_textbox = gr.Textbox(label="Model family")
                system_prompt_textbox = gr.Textbox(label="System prompt example")
                url_textbox = gr.Textbox(label="Url to model information")
            with gr.Column():
                organisation = gr.Textbox(label="Organisation")
                mail = gr.Textbox(label="Contact email (will be stored privately, & used if there is an issue with your submission)")
                file_output = gr.File()
        with gr.Row():
            gr.LoginButton()
            submit_button = gr.Button("Submit Eval On Test")
        submission_result = gr.Markdown()
        # The OAuth profile parameter of add_new_eval is injected by Gradio and
        # must NOT appear in the inputs list.
        submit_button.click(
            add_new_eval,
            [
                #level_of_test,
                model_name_textbox,
                model_family_textbox,
                system_prompt_textbox,
                url_textbox,
                file_output,
                organisation,
                # BUGFIX: `mail` was missing from the inputs, so add_new_eval
                # received one positional argument too few and every submission
                # failed with a TypeError before any validation ran.
                mail,
            ],
            submission_result,
        )
# On Spaces with a token, restart the app every hour so the leaderboard data
# is re-downloaded periodically.
# NOTE(review): BackgroundScheduler (APScheduler) is not imported in this
# file — confirm the import exists in the full module.
if IN_SPACES and HAS_TOKEN:
    scheduler = BackgroundScheduler()
    scheduler.add_job(restart_space, "interval", seconds=3600)
    scheduler.start()
demo.launch(debug=True)