# llmlagbench / src/utils.py
# (hosted-file-view header artifacts preserved as comments:
#  author "fzarnecki", commit "Display fix", 71eefff)
import math
import json
import pandas as pd
from statistics import mean
from datetime import datetime
from collections import defaultdict
### UTILITY
def _safe_numeric(value):
"""Convert value to float, returning NaN for invalid values."""
try:
if value is None or (isinstance(value, float) and math.isnan(value)):
return float("nan")
return float(value)
except Exception:
return float("nan")
def aggregate_weekly_to_monthly(dates, values):
    """
    Average weekly data points into calendar-month buckets.

    Args:
        dates: Date strings in 'YYYY-MM-DD' format.
        values: Measurements aligned positionally with *dates*.

    Returns:
        Dict mapping 'YYYY_MM' to that month's mean value, rounded to
        2 decimal places. Entries with unparseable dates are skipped.
    """
    buckets = defaultdict(list)
    for raw_date, measurement in zip(dates, values):
        try:
            parsed = datetime.strptime(raw_date, '%Y-%m-%d')
        except Exception:
            continue  # Skip invalid dates
        buckets[f"{parsed.year}_{parsed.month:02d}"].append(measurement)
    # Collapse each bucket to its rounded average.
    return {key: round(mean(vals), 2) for key, vals in buckets.items() if vals}
def extract_provider_from_model_name(model_name):
    """
    Extract the provider prefix from a 'provider/model' path.

    Args:
        model_name: String like 'CohereForAI/c4ai-command-a-03-2025'.

    Returns:
        The text before the first '/', or '' when the name has no '/'.
    """
    provider, separator, _ = model_name.partition('/')
    return provider if separator else ""
def convert_changepoints_to_monthly(changepoint_dates):
    """
    Re-key changepoint dates from 'YYYY-MM-DD' to 'YYYY_MM'.

    Args:
        changepoint_dates: List of 'YYYY-MM-DD' date strings.

    Returns:
        List of 'YYYY_MM' strings; unparseable entries are dropped.
    """
    converted = []
    for raw in changepoint_dates:
        try:
            parsed = datetime.strptime(raw, '%Y-%m-%d')
        except Exception:
            continue  # Skip invalid dates
        converted.append(f"{parsed.year}_{parsed.month:02d}")
    return converted
def calc_year_avg():
    """TODO: Calculate yearly average. Stub — currently returns the constant 1."""
    return 1
def avg_smoothing():
    """TODO: Apply smoothing to averages. Stub — currently a no-op returning None."""
    return None
def calculate_cumulative_average(values):
    """
    Compute the running (cumulative) mean of a numeric sequence.

    Args:
        values: List of numeric values.

    Returns:
        List where element i equals mean(values[0:i+1]); [] for empty input.
    """
    averages = []
    total = 0.0
    count = 0
    for item in values:
        count += 1
        total += item
        averages.append(total / count)
    return averages
### DATA LOADING
def load_data(filepath):
    """Parse and return the JSON document stored at *filepath*."""
    with open(filepath, "r") as handle:
        return json.load(handle)
def load_model_metadata(filepath):
    """Read model metadata (a JSON document) from *filepath* and return it."""
    with open(filepath, "r") as handle:
        return json.load(handle)
def load_raw_model_data(filepath, model_name):
    """
    Fetch one model's raw entry from leaderboard_graph_data.json.

    Args:
        filepath: Path to the JSON file (a dict keyed by model name).
        model_name: Name of the model whose entry to return.

    Returns:
        That model's dict — per the file format it carries: dates,
        faithfulness, cumulative_refusals, segments, changepoint_dates,
        total_observations — or {} when the model is absent.
    """
    with open(filepath, "r") as handle:
        payload = json.load(handle)
    return payload.get(model_name, {})
def prepare_mappings(data):
    """Build the id->name model lookup and the ordered list of metric ids."""
    models_map = {entry["id"]: entry["name"] for entry in data["models"]}
    metric_ids = [entry["id"] for entry in data["metrics"]]
    return models_map, metric_ids
def build_year_column_mapping(years, months):
    """Map each year to its list of 'YYYY_MM' aggregated month column names."""
    mapping = {}
    for year in years:
        mapping[year] = [f"{year}_{month}" for month in months]
    return mapping
### DATA TRANSFORMATION
def validate_equal_measurements(data):
    """
    Check whether every model carries the same number of measurements.

    Args:
        data: Dict of model_name -> model payload (each with a 'dates' list).

    Returns:
        tuple (is_valid, measurement_counts_dict, message):
        - is_valid: Always True (differing counts are tolerated).
        - measurement_counts_dict: model_name -> number of dates.
        - message: Info/warning string describing the counts.
    """
    counts = {name: len(payload.get('dates', [])) for name, payload in data.items()}
    if not counts:
        return True, {}, "No models found in data"
    min_count = min(counts.values())
    max_count = max(counts.values())
    if min_count == max_count:
        # Every model has the same number of samples.
        return True, counts, f"All models have {max_count} measurements"
    # Mixed counts: build a warning listing the short models first.
    warning_msg = f"⚠️ Models have different measurement counts (range: {min_count}-{max_count}):\n"
    short_models = [
        f" {model}: {count} samples (missing {max_count - count})"
        for model, count in sorted(counts.items(), key=lambda item: item[1])
        if count < max_count
    ]
    if short_models:
        warning_msg += "\n".join(short_models)
    warning_msg += f"\n\nModels with maximum samples ({max_count}):\n"
    for model, count in counts.items():
        if count == max_count:
            warning_msg += f" {model}\n"
    return True, counts, warning_msg
def transform_leaderboard_data_to_dataframe(data, years, months, model_metadata=None):
"""
Transform new leaderboard_graph_data.json format into DataFrame-compatible structure.
Args:
data: Dictionary with model names as keys
years: List of year strings (e.g., ["2021", "2022", ...])
months: List of month strings (e.g., ["01", "02", ...])
model_metadata: Optional dictionary with model metadata (parameters, release date, etc.)
Returns:
List of row dictionaries ready for DataFrame creation
"""
# Validate measurements and get counts per model
is_valid, measurement_counts, message = validate_equal_measurements(data)
# print(message)
rows = []
for model_name, model_data in data.items():
# Convert changepoints to monthly format
changepoints = convert_changepoints_to_monthly(
model_data.get("changepoint_dates", [])
)
row = {
"Model": model_name,
"1st Detected cutoff": changepoints[0].replace("_", ".") if len(changepoints) > 0 else "",
"2nd Detected cutoff": changepoints[1].replace("_", ".") if len(changepoints) > 1 else "",
"trend_changepoints": changepoints # Keep for chart rendering (in YYYY_MM format)
}
# Add metadata if available
# Try exact match first, then try with provider prefix
metadata = None
if model_metadata:
if model_name in model_metadata:
metadata = model_metadata[model_name]
else:
# Try adding provider prefix
provider = extract_provider_from_model_name(model_name)
if provider:
prefixed_name = f"{provider}/{model_name}"
metadata = model_metadata.get(prefixed_name)
else:
# Try all possible provider prefixes for models without /
for key in model_metadata.keys():
if key.endswith(f"/{model_name}") or key == model_name:
metadata = model_metadata[key]
break
if metadata:
row["Provider"] = metadata.get("Provider", "")
row["Parameters"] = metadata.get("Parameters", "")
row["Provider cutoff"] = metadata.get("Provider cutoff", "")
row["Release date"] = metadata.get("Release date", "")
row["Self-declared cutoff"] = metadata.get("Model cutoff", "")
else:
# Set empty values if metadata not available
# Fall back to extracting provider from model name if no metadata
row["Provider"] = extract_provider_from_model_name(model_name)
row["Parameters"] = ""
row["Provider cutoff"] = ""
row["Release date"] = ""
row["Self-declared cutoff"] = ""
# Aggregate faithfulness data to monthly averages
dates = model_data.get("dates", [])
faithfulness = model_data.get("faithfulness", [])
monthly_averages = aggregate_weekly_to_monthly(dates, faithfulness)
# Calculate evaluation period (min and max dates)
if dates:
try:
date_objects = [datetime.strptime(d, '%Y-%m-%d') for d in dates]
min_date = min(date_objects).strftime('%Y-%m-%d')
max_date = max(date_objects).strftime('%Y-%m-%d')
row["Evaluation period"] = f"{min_date} - {max_date}"
except Exception:
row["Evaluation period"] = ""
else:
row["Evaluation period"] = ""
# Add monthly columns (e.g., "2021_01", "2021_02", ...)
for month_key, avg_value in monthly_averages.items():
row[month_key] = avg_value
# Calculate yearly averages
all_years_values = [] # Collect all monthly values for overall average (specific to this model)
for year in years:
year_values = []
for month in months:
month_key = f"{year}_{month}"
if month_key in monthly_averages:
year_values.append(monthly_averages[month_key])
# Add aggregated year column
row[year] = round(mean(year_values), 2) if year_values else None
# Collect for overall average calculation
all_years_values.extend(year_values)
# Calculate overall average across all years
# Note: This is calculated from the model's actual sample count
# Models with fewer samples will have their average based only on their available data
row["Overall Average"] = round(mean(all_years_values), 2) if all_years_values else None
rows.append(row)
return rows
def extract_metric_value(month_data, metric):
    """
    Pull a metric's value out of a month's data dict.

    Looks up the 'avg_<metric>' key (metric name lowercased); returns None
    when the key is absent.
    """
    return month_data.get(f"avg_{metric.lower()}")
def process_month_data(result, year, month, metrics):
    """
    Collect one month's metric values for a single result entry.

    Returns:
        (row_updates, numeric_vals): row_updates holds the per-metric columns
        ('<metric>_<year>_<month>') plus the month's aggregated column
        ('<year>_<month>': mean across metrics rounded to 2 dp, or None);
        numeric_vals lists the numeric values found, for the caller's yearly
        aggregation.
    """
    row_updates = {}
    numeric_vals = []
    month_data = result.get("results", {}).get(year, {}).get(month, {})
    for metric in metrics:
        value = extract_metric_value(month_data, metric)
        # Store metric-specific column (e.g., "accuracy_2023_01").
        row_updates[f"{metric}_{year}_{month}"] = value
        if value is None:
            continue
        try:
            numeric_vals.append(float(value))
        except Exception:
            pass  # Non-numeric values stay in their column but skip aggregation
    # Aggregated month column: average across the metrics present.
    row_updates[f"{year}_{month}"] = (
        round(mean(numeric_vals), 2) if numeric_vals else None
    )
    return row_updates, numeric_vals
def process_result_row(result, models_map, metrics, years, months):
    """
    Flatten one result entry into a single leaderboard row dict.

    The row carries the display name (falling back to the raw id), any
    provider/metadata columns, the trend breakpoint, every per-month column,
    and one averaged column per year.
    """
    model_id = result["id"]
    row = {"Model": models_map.get(model_id, model_id)}
    # Keep any provider/metadata columns supplied with the result.
    row.update(result.get("columns", {}))
    row["trend_breakpoint"] = result.get("trend_breakpoint")
    for year in years:
        collected = []
        for month in months:
            month_updates, month_numeric = process_month_data(
                result, year, month, metrics
            )
            row.update(month_updates)
            collected.extend(month_numeric)
        # Aggregated year column: mean over every numeric monthly value.
        row[year] = round(mean(collected), 2) if collected else None
    return row
def create_dataframe(cfg, data, models_map=None, metrics=None, model_metadata=None):
    """
    Build a pandas DataFrame from loaded leaderboard data.

    Handles two input layouts: the new one (data is a dict keyed directly by
    model name; models_map/metrics omitted) and the old one (data carries a
    'results' list interpreted via models_map and metrics).
    """
    years = cfg.get("years")
    months = cfg.get("months")
    if models_map is None and metrics is None:
        # New format: data already maps model name -> payload.
        rows = transform_leaderboard_data_to_dataframe(
            data, years, months, model_metadata
        )
    else:
        # Old format: iterate the "results" entries.
        rows = [
            process_result_row(entry, models_map, metrics, years, months)
            for entry in data["results"]
        ]
    return pd.DataFrame(rows)
### COLUMN DEFINITIONS
def get_aggregated_columns(years, year_to_columns):
    """Return (year columns, flattened month columns) for the given years."""
    month_columns = []
    for year in years:
        month_columns.extend(year_to_columns[year])
    return years, month_columns