Spaces:
Running
Running
| import json | |
| import os | |
| import pandas as pd | |
| from src.display.formatting import has_no_nan_values, make_clickable_model | |
| from src.display.utils import AutoEvalColumn, EvalQueueColumn | |
| from src.leaderboard.read_evals import get_raw_eval_results | |
| from src.about import Tasks # 添加这个导入 | |
| def get_level(col_name): | |
| num = col_name.split(' ')[0] | |
| level_num = num.count('.') | |
| return level_num | |
| def get_level_description(level: int) -> dict: | |
| """ | |
| 获取每个层级的描述信息 | |
| """ | |
| descriptions = { | |
| 1: { | |
| "title": "First level risk categories", | |
| "description": """ | |
| - Critical Personal Safety: encompasses immediate life-threatening issues such as national security, public safety, domestic violence, and stalking; | |
| - Property \& Living Security: addressing basic survival needs in line with Maslow's hierarchy, including housing safety and consumer rights related to food and essential goods | |
| - Fundamental Rights: present less immediate threats, covering privacy, data protection, legal rights, and employment safety | |
| - Welfare Protection: focusing on quality of life issues such as animal welfare and various miscellaneous safety concerns. | |
| """, | |
| "columns": ['1. Critical Personal Safety', '2. Property & Living Security', | |
| '3. Fundamental Rights', '4. Welfare Protection'] | |
| }, | |
| 2: { | |
| "title": "Second level risk categories", | |
| # "description": """ | |
| # """, | |
| "columns": ['1.1. National Security and Public Safety', '1.2. Domestic Violence and Safety', | |
| '2.1. Housing and Property Safety', '2.2. Consumer Rights and Safety', | |
| '3.1. Privacy and Data Protection', '3.2. Legal Rights and Obligations', | |
| '3.3. Employment and Safety', '4.1. Animal Welfare and Safety', | |
| '4.2. Family and Child Law', '4.3. Miscellaneous Safety Issues'] | |
| }, | |
| 3: { | |
| "title": "Third level risk categories", | |
| "description": """ | |
| """, | |
| "columns": [] # 这里会动态填充所有三级指标 | |
| } | |
| } | |
| return descriptions[level] | |
| def get_level_columns(level: int) -> list: | |
| """ | |
| 获取指定层级的所有列名 | |
| """ | |
| # 基础列(非评测列) | |
| base_cols = ['T', 'Model', 'Average ⬆️', 'Type', 'Architecture', 'Precision', | |
| 'Hub License', '#Params (B)', 'Available on the hub', 'Model sha'] | |
| # 获取指定层级的任务列 | |
| level_tasks = [task for task in Tasks if get_level(task.value.col_name) == level] | |
| level_cols = [task.value.col_name for task in level_tasks] | |
| return base_cols + level_cols | |
| def get_leaderboard_data(level: int, df: pd.DataFrame, cols: list, benchmark_cols: list) -> pd.DataFrame: | |
| """ | |
| 根据层级筛选leaderboard数据 | |
| """ | |
| try: | |
| print(f"Processing level {level}") | |
| # 获取该层级对应的所有列 | |
| selected_cols = get_level_columns(level) | |
| print(f"Selected columns for level {level}: {selected_cols}") | |
| # 确保所有选择的列都在数据框中 | |
| available_cols = [col for col in selected_cols if col in df.columns] | |
| # 创建新的数据框 | |
| filtered_df = df[available_cols].copy() | |
| # 获取该层级的评测列 | |
| level_desc = get_level_description(level) | |
| benchmark_cols = level_desc['columns'] | |
| # 重新计算平均值 | |
| if benchmark_cols: | |
| filtered_df['Average ⬆️'] = filtered_df[benchmark_cols].mean(axis=1) | |
| # 按平均值排序 | |
| filtered_df = filtered_df.sort_values('Average ⬆️', ascending=False) | |
| return filtered_df.round(decimals=2) | |
| except Exception as e: | |
| print(f"Error in get_leaderboard_data: {e}") | |
| print(f"Exception details: {str(e)}") | |
| return df | |
| def get_leaderboard_df(results_path: str, requests_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame: | |
| """Creates a dataframe from all the individual experiment results""" | |
| raw_data = get_raw_eval_results(results_path, requests_path) | |
| all_data_json = [v.to_dict() for v in raw_data] | |
| df = pd.DataFrame.from_records(all_data_json) | |
| df = df.sort_values(by=[AutoEvalColumn.average.name], ascending=False) | |
| df = df[cols].round(decimals=2) | |
| # filter out if any of the benchmarks have not been produced | |
| df = df[has_no_nan_values(df, benchmark_cols)] | |
| return df | |
| def get_evaluation_queue_df(save_path: str, cols: list) -> list[pd.DataFrame]: | |
| """Creates the different dataframes for the evaluation queues requestes""" | |
| entries = [entry for entry in os.listdir(save_path) if not entry.startswith(".")] | |
| all_evals = [] | |
| for entry in entries: | |
| if ".json" in entry: | |
| file_path = os.path.join(save_path, entry) | |
| with open(file_path) as fp: | |
| data = json.load(fp) | |
| data[EvalQueueColumn.model.name] = make_clickable_model(data["model"]) | |
| data[EvalQueueColumn.revision.name] = data.get("revision", "main") | |
| all_evals.append(data) | |
| elif ".md" not in entry: | |
| # this is a folder | |
| sub_entries = [e for e in os.listdir(f"{save_path}/{entry}") if os.path.isfile(e) and not e.startswith(".")] | |
| for sub_entry in sub_entries: | |
| file_path = os.path.join(save_path, entry, sub_entry) | |
| with open(file_path) as fp: | |
| data = json.load(fp) | |
| data[EvalQueueColumn.model.name] = make_clickable_model(data["model"]) | |
| data[EvalQueueColumn.revision.name] = data.get("revision", "main") | |
| all_evals.append(data) | |
| pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]] | |
| running_list = [e for e in all_evals if e["status"] == "RUNNING"] | |
| finished_list = [e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"] | |
| df_pending = pd.DataFrame.from_records(pending_list, columns=cols) | |
| df_running = pd.DataFrame.from_records(running_list, columns=cols) | |
| df_finished = pd.DataFrame.from_records(finished_list, columns=cols) | |
| return df_finished[cols], df_running[cols], df_pending[cols] | |