import json
import os
import pandas as pd
from datetime import datetime, timedelta, timezone
import dateutil.parser
from src.display.formatting import has_no_nan_values, make_clickable_model
from src.display.utils import AutoEvalColumn, EvalQueueColumn
from src.leaderboard.read_evals import get_raw_eval_results


def get_leaderboard_df(results_path: str, requests_path: str, cols: list, benchmark_cols: list) -> pd.DataFrame:
    """Creates a dataframe from all the individual experiment results."""
    raw_data = get_raw_eval_results(results_path, requests_path)
    all_data_json = [v.to_dict() for v in raw_data]

    df = pd.DataFrame.from_records(all_data_json)
    if df.empty:
        print("No evaluation results found. Returning empty DataFrame with correct columns.")
        return pd.DataFrame(columns=cols)

    df = df.sort_values(by=[AutoEvalColumn().average.name], ascending=False)
    df = df[cols].round(decimals=4)
    df = df[has_no_nan_values(df, benchmark_cols)]
    return df
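
# A minimal usage sketch for get_leaderboard_df. It assumes the surrounding
# leaderboard app defines COLS and BENCHMARK_COLS column lists; the paths below
# are hypothetical examples, not values defined in this module:
#
#     leaderboard_df = get_leaderboard_df(
#         results_path="./eval-results",
#         requests_path="./eval-queue",
#         cols=COLS,
#         benchmark_cols=BENCHMARK_COLS,
#     )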


def get_evaluation_queue_df(save_path: str, cols: list) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Creates the different dataframes for the evaluation queue requests."""
    all_evals = []

    # Threshold to identify "stuck" jobs: anything still RUNNING after an hour.
    # Use an aware UTC datetime so the comparison with parsed ISO timestamps is valid.
    time_threshold = datetime.now(timezone.utc) - timedelta(hours=1)

    # Use os.walk for a robust way to find all request files recursively.
    for root, _, files in os.walk(save_path):
        for filename in files:
            if not filename.endswith(".json"):
                continue
            file_path = os.path.join(root, filename)
            try:
                with open(file_path, "r") as fp:
                    data = json.load(fp)

                # Check for "stuck" jobs.
                if data.get("status") == "RUNNING":
                    submitted_time_str = data.get("submitted_at")
                    if submitted_time_str:
                        submitted_time = dateutil.parser.isoparse(submitted_time_str)
                        # Treat naive timestamps as UTC so they can be compared
                        # against the aware threshold without raising TypeError.
                        if submitted_time.tzinfo is None:
                            submitted_time = submitted_time.replace(tzinfo=timezone.utc)
                        if submitted_time < time_threshold:
                            print(f"Stuck job detected for {data['model']}. Changing status to PENDING.")
                            data["status"] = "PENDING"

                data[EvalQueueColumn.model.name] = make_clickable_model(data["model"])
                data[EvalQueueColumn.revision.name] = data.get("revision", "main")
                all_evals.append(data)
            except Exception as e:
                print(f"Error processing file {file_path}: {e}")
                continue

    pending_list = [e for e in all_evals if e.get("status") in ["PENDING", "RERUN"]]
    running_list = [e for e in all_evals if e.get("status") == "RUNNING"]
    finished_list = [
        e
        for e in all_evals
        if str(e.get("status", "")).startswith("FINISHED") or e.get("status") == "PENDING_NEW_EVAL"
    ]

    df_pending = pd.DataFrame.from_records(pending_list, columns=cols) if pending_list else pd.DataFrame(columns=cols)
    df_running = pd.DataFrame.from_records(running_list, columns=cols) if running_list else pd.DataFrame(columns=cols)
    df_finished = pd.DataFrame.from_records(finished_list, columns=cols) if finished_list else pd.DataFrame(columns=cols)

    return df_finished[cols], df_running[cols], df_pending[cols]
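

if __name__ == "__main__":
    # Minimal smoke test, not part of the pipeline. It assumes the leaderboard
    # template's folder layout: one JSON request file per model under a local
    # "./eval-queue" directory (a hypothetical example path, not defined here).
    finished_df, running_df, pending_df = get_evaluation_queue_df(
        save_path="./eval-queue",
        cols=[EvalQueueColumn.model.name, EvalQueueColumn.revision.name, "status"],
    )
    print(f"finished={len(finished_df)} running={len(running_df)} pending={len(pending_df)}")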