import json
import os
import pandas as pd
from src.display.formatting import has_no_nan_values, make_clickable_model
from src.display.utils import AutoEvalColumn, EvalQueueColumn, benchmark_display_name, benchmark_internal_name


def _as_track_float(value):
    """Coerce a raw score to float, passing None through unchanged."""
    if value is None:
        return None
    return float(value)


def _format_track_pair(hard_value, easy_value):
    """Format a (hard, easy) score pair as "hard(easy)" with one decimal, or "-" if either is missing."""
    if hard_value is None or easy_value is None:
        return "-"
    return f"{hard_value:.1f}({easy_value:.1f})"


def _format_track_pair_for_rank(hard_value, easy_value, rank_by: str):
    """Format the pair so the ranking track comes first; rank_by="easy" swaps the display order."""
    rank_by = (rank_by or "hard").strip().lower()
    if rank_by == "easy":
        return _format_track_pair(easy_value, hard_value)
    return _format_track_pair(hard_value, easy_value)
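# Illustrative only: expected outputs for the two ranking tracks (hypothetical scores).
#   _format_track_pair_for_rank(41.2, 57.8, "hard") -> "41.2(57.8)"
#   _format_track_pair_for_rank(41.2, 57.8, "easy") -> "57.8(41.2)"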


def _get_track_pair(score_bucket):
    """Extract (hard, easy) floats from a score bucket dict, or (None, None) if it is missing or malformed."""
    if not isinstance(score_bucket, dict):
        return None, None
    return _as_track_float(score_bucket.get("hard")), _as_track_float(score_bucket.get("easy"))


def _lookup_track_pair(entry, column_name):
    """Find the (hard, easy) pair for a benchmark column, trying its raw, internal and display names
    against the "by_dimension" buckets first and the "by_domain" buckets second."""
    by_dimension = entry.get("by_dimension") or {}
    candidate_names = []
    for name in (column_name, benchmark_internal_name(column_name), benchmark_display_name(column_name)):
        if name not in candidate_names:
            candidate_names.append(name)

    for candidate_name in candidate_names:
        if candidate_name in by_dimension:
            return _get_track_pair(by_dimension[candidate_name])

    by_domain = entry.get("by_domain") or {}
    for candidate_name in candidate_names:
        if candidate_name in by_domain:
            return _get_track_pair(by_domain[candidate_name])

    return None, None
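# Illustrative only, with a hypothetical entry and assuming the name helpers pass
# unknown names through unchanged:
#   {"by_dimension": {"coding": {"hard": 38.5, "easy": 52.0}}}
# _lookup_track_pair(entry, "coding") returns (38.5, 52.0); a column found in
# neither bucket yields (None, None).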


def _compute_average_pair(entry, benchmark_cols):
    """Pick the (hard, easy) averages for an entry.

    Prefers the precomputed AverageByDomain/AverageByDimension buckets when they agree
    within 0.11 on both tracks, or when only one of them is present; otherwise averages
    the per-column pairs, falling back to the precomputed buckets (domain first) if the
    per-column scores are incomplete.
    """
    domain_average = _get_track_pair(entry.get("AverageByDomain"))
    dimension_average = _get_track_pair(entry.get("AverageByDimension"))

    if domain_average != (None, None) and dimension_average != (None, None):
        hard_close = abs(domain_average[0] - dimension_average[0]) <= 0.11
        easy_close = abs(domain_average[1] - dimension_average[1]) <= 0.11
        if hard_close and easy_close:
            return domain_average
    if domain_average != (None, None) and dimension_average == (None, None):
        return domain_average
    if dimension_average != (None, None) and domain_average == (None, None):
        return dimension_average

    hard_scores = []
    easy_scores = []
    for column_name in benchmark_cols:
        hard_score, easy_score = _lookup_track_pair(entry, column_name)
        if hard_score is not None:
            hard_scores.append(hard_score)
        if easy_score is not None:
            easy_scores.append(easy_score)
    if hard_scores and easy_scores:
        return sum(hard_scores) / len(hard_scores), sum(easy_scores) / len(easy_scores)

    if domain_average != (None, None):
        return domain_average
    return dimension_average
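# Illustrative only (hypothetical scores): for an entry carrying
#   "AverageByDomain":    {"hard": 40.0,  "easy": 55.0}
#   "AverageByDimension": {"hard": 40.05, "easy": 55.1}
# both tracks agree within the 0.11 tolerance, so _compute_average_pair returns
# (40.0, 55.0) without re-averaging the per-column scores.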


def get_commit_results_df(commit_results_path: str, cols: list, benchmark_cols: list, rank_by: str = "hard") -> pd.DataFrame:
    """Creates a dataframe from commit_results.jsonl for display-only leaderboards."""
    if not os.path.exists(commit_results_path):
        return pd.DataFrame(columns=cols)

    rank_by = (rank_by or "hard").strip().lower()
    all_rows = []
    with open(commit_results_path, encoding="utf-8") as fp:
        for line in fp:
            line = line.strip()
            if not line:
                continue
            entry = json.loads(line)
            hard_average, easy_average = _compute_average_pair(entry, benchmark_cols)
            row = {
                AutoEvalColumn.model.name: entry.get("Model", ""),
                AutoEvalColumn.average.name: _format_track_pair_for_rank(hard_average, easy_average, rank_by),
                "__hard_avg": hard_average,
                "__easy_avg": easy_average,
            }
            for column_name in benchmark_cols:
                hard_score, easy_score = _lookup_track_pair(entry, column_name)
                row[benchmark_display_name(column_name)] = _format_track_pair_for_rank(hard_score, easy_score, rank_by)
            all_rows.append(row)

    if not all_rows:
        return pd.DataFrame(columns=cols)

    df = pd.DataFrame.from_records(all_rows)
    if rank_by == "easy":
        sort_columns = ["__easy_avg", "__hard_avg", AutoEvalColumn.model.name]
    else:
        sort_columns = ["__hard_avg", "__easy_avg", AutoEvalColumn.model.name]
    df = df.sort_values(
        by=sort_columns,
        ascending=[False, False, True],
        na_position="last",
    )
    return df[cols]


def get_leaderboard_df(results_path: str, requests_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame:
    """Creates a dataframe from all the individual experiment results."""
    from src.leaderboard.read_evals import get_raw_eval_results

    raw_data = get_raw_eval_results(results_path, requests_path)
    if not raw_data:
        return pd.DataFrame(columns=cols)

    all_data_json = [v.to_dict() for v in raw_data]
    df = pd.DataFrame.from_records(all_data_json)
    if df.empty:
        return pd.DataFrame(columns=cols)

    df = df.rename(columns={column_name: benchmark_display_name(column_name) for column_name in benchmark_cols})
    df = df.sort_values(by=[AutoEvalColumn.average.name], ascending=False)
    df = df[cols].round(decimals=2)
    # filter out rows where any benchmark score is missing
    df = df[has_no_nan_values(df, benchmark_cols)]
    return df


def get_evaluation_queue_df(save_path: str, cols: list) -> list[pd.DataFrame]:
    """Creates the different dataframes for the evaluation queue requests."""
    entries = [entry for entry in os.listdir(save_path) if not entry.startswith(".")]
    all_evals = []

    for entry in entries:
        if ".json" in entry:
            file_path = os.path.join(save_path, entry)
            with open(file_path) as fp:
                data = json.load(fp)

            data[EvalQueueColumn.model.name] = make_clickable_model(data["model"])
            data[EvalQueueColumn.revision.name] = data.get("revision", "main")

            all_evals.append(data)
        elif ".md" not in entry:
            # this is a folder
            sub_entries = [
                e
                for e in os.listdir(f"{save_path}/{entry}")
                if os.path.isfile(os.path.join(save_path, entry, e)) and not e.startswith(".")
            ]
            for sub_entry in sub_entries:
                file_path = os.path.join(save_path, entry, sub_entry)
                with open(file_path) as fp:
                    data = json.load(fp)

                data[EvalQueueColumn.model.name] = make_clickable_model(data["model"])
                data[EvalQueueColumn.revision.name] = data.get("revision", "main")
                all_evals.append(data)

    pending_list = [e for e in all_evals if e["status"] in ["PENDING", "RERUN"]]
    running_list = [e for e in all_evals if e["status"] == "RUNNING"]
    finished_list = [e for e in all_evals if e["status"].startswith("FINISHED") or e["status"] == "PENDING_NEW_EVAL"]

    df_pending = pd.DataFrame.from_records(pending_list, columns=cols)
    df_running = pd.DataFrame.from_records(running_list, columns=cols)
    df_finished = pd.DataFrame.from_records(finished_list, columns=cols)
    return df_finished[cols], df_running[cols], df_pending[cols]
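

if __name__ == "__main__":
    # Minimal local smoke-test sketch, not part of the Space runtime. The file path
    # and the column lists below are illustrative placeholders; the real values come
    # from the project's display/column configuration.
    example_benchmark_cols = ["reasoning", "coding"]  # hypothetical benchmark names
    example_cols = [AutoEvalColumn.model.name, AutoEvalColumn.average.name] + [
        benchmark_display_name(name) for name in example_benchmark_cols
    ]
    demo_df = get_commit_results_df(
        "commit_results.jsonl",  # assumed local path to the JSONL results file
        cols=example_cols,
        benchmark_cols=example_benchmark_cols,
        rank_by="hard",
    )
    print(demo_df.head())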