import datetime
import json
import os
import traceback

import gradio as gr
import pandas as pd
import requests
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import HfApi

from content import (
    CITATION_BUTTON_LABEL,
    CITATION_BUTTON_TEXT,
    INTRODUCTION_TEXT,
    TITLE,
    SUBMISSION_TEXT,
    styled_error,
    styled_warning,
    model_hyperlink,
    format_log,
)
from score import load_agent_output_dataset
from loguru import logger
from datasets import load_dataset, VerificationMode
from utils import parse_eval_dataset, parseaddr
from env import (
    CONTACT_DATASET_FILE,
    VERSION,
    BENCHMARK_INTERNAL_EVALUATE_DATASET_FILE,
    EVALUATE_RESULT_DATASET_FILE,
    SUBMISSION_DATASET,
    INTERNAL_DATASET,
    EVALUATE_RESULT_DATASET,
    REPO_ID,
    CONTACT_DATASET,
)

TOKEN = os.getenv("HF_TOKEN")
API = HfApi(token=TOKEN)
LOCAL_DEBUG = False
TYPES = ["markdown", "number", "number", "number", "number", "str", "str", "str"]

benchmark_internal_evaluate_dataset = load_dataset(
    INTERNAL_DATASET,
    data_files=BENCHMARK_INTERNAL_EVALUATE_DATASET_FILE,
    token=TOKEN,
    verification_mode=VerificationMode.NO_CHECKS,
    download_mode="reuse_cache_if_exists",
    trust_remote_code=True,
)
print(EVALUATE_RESULT_DATASET_FILE)
eval_results = load_dataset(
    EVALUATE_RESULT_DATASET,
    data_files=EVALUATE_RESULT_DATASET_FILE,
    token=TOKEN,
    verification_mode=VerificationMode.NO_CHECKS,
    download_mode="reuse_cache_if_exists",
    trust_remote_code=True,
)
benchmark_dataset = parse_eval_dataset(benchmark_internal_evaluate_dataset)  # type: ignore


def save_contact_info(contact_info):
    import tempfile

    # Load the existing contact list, then append the new entry.
    try:
        contact_infos = load_dataset(
            CONTACT_DATASET,
            data_files=CONTACT_DATASET_FILE,
            token=TOKEN,
            download_mode="force_redownload",
            verification_mode=VerificationMode.NO_CHECKS,
            trust_remote_code=True,
        )
        print(f"load {CONTACT_DATASET_FILE} success, {contact_infos}")
        contact_info_list = list(contact_infos["train"])
        print("contact_info_list:", contact_info_list)
        contact_info_list.append(contact_info)
    except Exception as e:
        # If the dataset cannot be loaded, still keep the new contact rather
        # than uploading an empty list that would wipe the existing file.
        print(f"Error loading contact info: {e}")
        contact_info_list = [contact_info]

    # Write the merged list to a temporary file and push it to the Hub.
    with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as temp_file:
        json.dump(contact_info_list, temp_file, default=str, indent=4)
        temp_file_path = temp_file.name

    API.upload_file(
        path_or_fileobj=temp_file_path,
        path_in_repo=CONTACT_DATASET_FILE,
        repo_id=CONTACT_DATASET,
        repo_type="dataset",
        token=TOKEN,
        commit_message=f"Add new contact: {contact_info['model']} by {contact_info['organisation']}",
    )
    print(f"upload {temp_file_path} success")
    os.unlink(temp_file_path)
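# Illustrative only: save_contact_info() expects the dict shape built in
# add_new_eval() below, e.g.
#
#   save_contact_info({
#       "model": "my-agent",
#       "url": "https://example.com/agent",
#       "organisation": "ACME",
#       "username": "hf-user",
#       "mail": "user@example.com",
#       "date": pd.Timestamp(datetime.datetime.now()),
#   })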
def get_dataframe_from_results(eval_results, split: str = "train"):
    try:
        if hasattr(eval_results, "__getitem__"):
            local_df = eval_results[split]
            if hasattr(local_df, "to_pandas"):
                df = local_df.to_pandas()
            else:
                df = pd.DataFrame(local_df.data if hasattr(local_df, "data") else local_df)
        else:
            df = pd.DataFrame(eval_results if isinstance(eval_results, list) else [])
        print(df)

        # Turn the model name into a hyperlink when a URL is available.
        try:
            if "url" in df.columns and "model" in df.columns:
                df["model"] = df.apply(lambda row: model_hyperlink(row["url"], row["model"]), axis=1)
        except Exception as e:
            print(f"Error applying model hyperlink: {e}")

        column_renames = {
            "agent_name": "Agent name",
            "total_score": "Average score (%)",
            "answer_score": "Answer score (%)",
            "reasoning_score": "Reasoning score (%)",
            "tool_use_score": "Tool-use score (%)",
            "date": "Submission date",
        }
        rename_cols = {k: v for k, v in column_renames.items() if k in df.columns}
        if rename_cols:
            df = df.rename(columns=rename_cols)

        try:
            cols_to_drop = [col for col in ["system_prompt", "url"] if col in df.columns]
            if cols_to_drop:
                df = df.drop(columns=cols_to_drop)
        except Exception:
            pass

        try:
            if "Average score (%)" in df.columns:
                df = df.sort_values(by=["Average score (%)"], ascending=False)
        except Exception:
            pass

        try:
            numeric_cols = [c for c in df.columns if "score" in c.lower()]
            if numeric_cols:
                df[numeric_cols] = df[numeric_cols].round(decimals=2)
        except Exception as e:
            print(f"Error processing numeric columns: {e}")

        print(df)
        return df
    except Exception as e:
        print(f"Error in get_dataframe_from_results: {e}")
        return pd.DataFrame({})


# Build the initial leaderboard dataframe.
try:
    eval_dataframe = get_dataframe_from_results(eval_results=eval_results)
except Exception as e:
    print(f"Error creating dataframes: {e}")
    eval_dataframe = pd.DataFrame({})


def refresh():
    logger.info("refreshing...")
    try:
        eval_results = load_dataset(
            EVALUATE_RESULT_DATASET,
            data_files=EVALUATE_RESULT_DATASET_FILE,
            token=TOKEN,
            verification_mode=VerificationMode.NO_CHECKS,
            download_mode="force_redownload",
            trust_remote_code=True,
        )
        return get_dataframe_from_results(eval_results=eval_results)
    except Exception:
        logger.error(f"Error in refresh: {traceback.format_exc()}")
        return pd.DataFrame({})


def restart_space():
    try:
        API.restart_space(repo_id=REPO_ID, token=TOKEN)
    except Exception as e:
        print(f"Error restarting space: {e}")
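# Illustrative only: with the renames in get_dataframe_from_results(), a
# populated leaderboard frame is expected to carry columns such as
# "Agent name", "Average score (%)", "Answer score (%)",
# "Reasoning score (%)", "Tool-use score (%)" and "Submission date"
# (plus any unrenamed columns from the results dataset), e.g.
#
#   df = refresh()
#   print(df[["Agent name", "Average score (%)"]].head())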
def add_new_eval(
    model: str,
    url: str,
    path_to_file,
    organisation: str,
    mail: str,
    profile: gr.OAuthProfile,
):
    try:
        # Validate that the upload is well-formed JSON before anything else.
        try:
            with open(path_to_file, "r", encoding="utf-8") as f:
                json.load(f)
        except json.JSONDecodeError:
            return styled_error("Please upload a valid JSON file.")
        except Exception:
            return styled_error(f"File read error: {traceback.format_exc()}")

        if not LOCAL_DEBUG:
            # Reject accounts created less than 10 days ago.
            user_data = requests.get(f"https://huggingface.co/api/users/{profile.username}/overview")
            print(user_data.content)
            creation_date = json.loads(user_data.content)["createdAt"]
            if datetime.datetime.now() - datetime.datetime.strptime(creation_date, "%Y-%m-%dT%H:%M:%S.%fZ") < datetime.timedelta(days=10):
                return styled_error("This account is not authorized to submit on CAIA.")

            # Rate limit: one submission per user per day.
            contact_infos = load_dataset(
                CONTACT_DATASET,
                data_files=CONTACT_DATASET_FILE,
                token=TOKEN,
                download_mode="force_redownload",
                verification_mode=VerificationMode.NO_CHECKS,
                trust_remote_code=True,
            )
            user_submission_dates = []
            try:
                contact_info_list = list(contact_infos["train"])  # type: ignore
                print("contact info:", contact_info_list)
                for row in contact_info_list:
                    if row.get("username") == profile.username:
                        user_submission_dates.append(row.get("date"))
            except Exception as e:
                print(f"Error getting user submission dates: {e}")
            user_submission_dates = sorted(user_submission_dates)
            user_submission_dates = [
                date.strftime("%Y-%m-%d")
                if isinstance(date, pd.Timestamp)
                else datetime.datetime.strptime(str(date), "%Y-%m-%d %H:%M:%S.%f").strftime("%Y-%m-%d")
                for date in user_submission_dates
                if date
            ]
            print("submission_dates: ", user_submission_dates)
            if len(user_submission_dates) > 0 and user_submission_dates[-1] == datetime.datetime.today().strftime("%Y-%m-%d"):
                return styled_error("You already submitted once today, please try again tomorrow.")

        # Very basic email parsing.
        _, parsed_mail = parseaddr(mail)
        if "@" not in parsed_mail:
            return styled_warning("Please provide a valid email address.")

        print("Adding new eval")

        if not LOCAL_DEBUG:
            # Warn if this model/organisation combination was already submitted.
            model_exists = False
            try:
                eval_results_list = list(eval_results["train"])
                for row in eval_results_list:
                    if row.get("model", "").lower() == model.lower() and row.get("organisation", "").lower() == organisation.lower():
                        model_exists = True
                        break
            except Exception as e:
                print(f"Error checking model existence: {e}")
            if model_exists:
                return styled_warning("This model has already been submitted.")

        if path_to_file is None:
            return styled_warning("Please attach a file.")
        file_path = path_to_file.name if hasattr(path_to_file, "name") else path_to_file

        # SAVE CONTACT
        contact_info = {
            "model": model,
            "url": url,
            "organisation": organisation,
            "username": profile.username,
            "mail": mail,
            "date": pd.Timestamp(datetime.datetime.now()),
        }
        if LOCAL_DEBUG:
            print("mock uploaded contact info")
        else:
            save_contact_info(contact_info)

        # SCORE SUBMISSION
        print("Simulate submission...")
        agent_output = load_agent_output_dataset(dataset_path=file_path)
        agent_output_task_ids = {output.task_id for output in agent_output}
        benchmark_task_ids = {item.task_id for item in benchmark_dataset}
        if agent_output_task_ids != benchmark_task_ids:
            return styled_error("The task IDs in the agent outputs do not match the task IDs in the benchmark dataset.")

        # Upload the submission file to SUBMISSION_DATASET.
        if not LOCAL_DEBUG:
            try:
                API.upload_file(
                    path_or_fileobj=file_path,
                    path_in_repo=f"{VERSION}/<{str(model)}>_<{str(organisation)}>_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
                    repo_id=SUBMISSION_DATASET,
                    repo_type="dataset",
                    token=TOKEN,
                    commit_message=f"Add submission: {model} by {organisation}",
                )
                print(f"Successfully uploaded submission file to {SUBMISSION_DATASET}")
            except Exception as e:
                print(f"Error uploading submission file: {e}")
                return styled_error(f"upload file failed: {str(e)}")
        else:
            print("mock uploaded submission file")

        return format_log(
            f"Model {model} submitted successfully by {organisation}.\n"
            "Please wait a few hours and refresh the leaderboard to see your score."
        )
    except Exception:
        print(traceback.format_exc())
        return styled_error("An error occurred, please open a discussion and indicate at what time you encountered the error.\n")
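# Illustrative only -- the exact record schema is owned by
# score.load_agent_output_dataset; the check above only requires that the
# set of task_ids in the upload equals the benchmark's. A submission file
# passing that check might look like this (field names beyond task_id are
# hypothetical):
#
#   [
#       {"task_id": "task_001", "answer": "...", "reasoning": "..."},
#       {"task_id": "task_002", "answer": "...", "reasoning": "..."}
#   ]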
def new_submission():
    return styled_error("This feature is not available yet.")


demo = gr.Blocks()
with demo:
    gr.HTML(TITLE)
    gr.Markdown(INTRODUCTION_TEXT, elem_classes="markdown-text")

    with gr.Tab("Results"):
        leaderboard_table = gr.Dataframe(
            value=eval_dataframe,
            interactive=False,
            column_widths=["20%"],
            datatype=TYPES,
        )
        refresh_button = gr.Button("Refresh")
        refresh_button.click(
            refresh,
            inputs=[],
            outputs=[leaderboard_table],
        )

    with gr.Accordion("Submission"):
        with gr.Row():
            gr.Markdown(SUBMISSION_TEXT, elem_classes="markdown-text")
        with gr.Row():
            model_name_textbox = gr.Textbox(label="Agent name")
        with gr.Row():
            url_textbox = gr.Textbox(label="Url to assistant/agent information")
        with gr.Row():
            organisation = gr.Textbox(label="Organization")
        with gr.Row():
            mail = gr.Textbox(label="Contact email (will be stored privately & used if there is an issue with your submission)")
        with gr.Row():
            file_output = gr.File()
        with gr.Row():
            gr.LoginButton()

        submit_button = gr.Button("Submit Eval")
        submission_result = gr.Markdown()
        submit_button.click(
            add_new_eval,
            [
                model_name_textbox,
                url_textbox,
                file_output,
                organisation,
                mail,
            ],
            submission_result,
        )

scheduler = BackgroundScheduler()
scheduler.add_job(restart_space, "interval", seconds=3600)
scheduler.start()
demo.launch(debug=True)
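# Local run (sketch): the Hub calls above need a token with read/write access
# to the datasets configured in env.py; alternatively set LOCAL_DEBUG = True
# near the top of this file to mock the upload paths while testing the UI.
#
#   HF_TOKEN=hf_xxx python app.py   # assuming this file is saved as app.py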