Spaces:
Running
Running
| import math | |
| import json | |
| import pandas as pd | |
| from statistics import mean | |
| from datetime import datetime | |
| from collections import defaultdict | |
| ### UTILITY | |
| def _safe_numeric(value): | |
| """Convert value to float, returning NaN for invalid values.""" | |
| try: | |
| if value is None or (isinstance(value, float) and math.isnan(value)): | |
| return float("nan") | |
| return float(value) | |
| except Exception: | |
| return float("nan") | |
def aggregate_weekly_to_monthly(dates, values):
    """
    Average weekly data points into calendar-month buckets.

    Args:
        dates: List of date strings in 'YYYY-MM-DD' format.
        values: List of corresponding values (e.g., faithfulness scores).

    Returns:
        Dict mapping 'YYYY_MM' to the rounded (2 dp) average for that month.
        Entries with unparseable dates are silently skipped.
    """
    buckets = defaultdict(list)
    for raw_date, sample in zip(dates, values):
        try:
            parsed = datetime.strptime(raw_date, '%Y-%m-%d')
        except Exception:
            continue  # skip invalid dates
        buckets[f"{parsed.year}_{parsed.month:02d}"].append(sample)
    # Buckets are never empty (values only ever appended), so each month
    # always produces an average.
    return {
        month_key: round(mean(samples), 2)
        for month_key, samples in buckets.items()
        if samples
    }
def extract_provider_from_model_name(model_name):
    """
    Extract the provider prefix from a model name path.

    Args:
        model_name: String like 'CohereForAI/c4ai-command-a-03-2025'.

    Returns:
        Provider string (e.g., 'CohereForAI'), or '' when no '/' is present.
    """
    provider, separator, _rest = model_name.partition('/')
    return provider if separator else ""
def convert_changepoints_to_monthly(changepoint_dates):
    """
    Convert changepoint dates to 'YYYY_MM' format.

    Args:
        changepoint_dates: List of date strings in 'YYYY-MM-DD' format.

    Returns:
        List of 'YYYY_MM' strings; unparseable dates are dropped.
    """
    def _to_month_key(raw):
        # Returns None for invalid dates so they can be filtered out.
        try:
            parsed = datetime.strptime(raw, '%Y-%m-%d')
        except Exception:
            return None
        return f"{parsed.year}_{parsed.month:02d}"

    return [key for key in map(_to_month_key, changepoint_dates) if key is not None]
def calc_year_avg():
    """TODO: Calculate yearly average.

    Unimplemented placeholder — currently returns the constant 1.
    NOTE(review): no caller is visible in this chunk; confirm the intended
    contract before implementing.
    """
    return 1
def avg_smoothing():
    """TODO: Apply smoothing to averages.

    Unimplemented placeholder — does nothing and returns None.
    """
    pass
def calculate_cumulative_average(values):
    """
    Compute the running (cumulative) average of a sequence.

    Args:
        values: List of numeric values.

    Returns:
        List where element i equals mean(values[0:i+1]). Empty input
        yields an empty list. Results are always floats (sum seeded at 0.0).
    """
    averages = []
    total = 0.0
    count = 0
    for sample in values:
        count += 1
        total += sample
        averages.append(total / count)
    return averages
| ### DATA LOADING | |
def load_data(filepath):
    """Read *filepath* and return its parsed JSON content."""
    with open(filepath, "r") as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
def load_model_metadata(filepath):
    """Parse the model-metadata JSON file at *filepath* and return it."""
    with open(filepath, "r") as handle:
        metadata = json.load(handle)
    return metadata
def load_raw_model_data(filepath, model_name):
    """
    Load raw data for one model from leaderboard_graph_data.json.

    Args:
        filepath: Path to the JSON file keyed by model name.
        model_name: Name of the model to load.

    Returns:
        The model's data dictionary (expected keys per the file format:
        dates, faithfulness, cumulative_refusals, segments,
        changepoint_dates, total_observations), or {} when the model
        is not present.
    """
    with open(filepath, "r") as handle:
        leaderboard = json.load(handle)
    return leaderboard.get(model_name, {})
def prepare_mappings(data):
    """Build the (id -> name) model map and the list of metric ids from *data*."""
    models_map = {}
    for model in data["models"]:
        models_map[model["id"]] = model["name"]
    metric_ids = [metric["id"] for metric in data["metrics"]]
    return models_map, metric_ids
def build_year_column_mapping(years, months):
    """Map each year to its list of aggregated '<year>_<month>' column names."""
    mapping = {}
    for year in years:
        mapping[year] = [f"{year}_{month}" for month in months]
    return mapping
| ### DATA TRANSFORMATION | |
def validate_equal_measurements(data):
    """
    Validate measurement counts across models and warn about discrepancies.

    Args:
        data: Dictionary with model names as keys.

    Returns:
        tuple: (is_valid, measurement_counts_dict, message)
            - is_valid: Always True (differing counts are allowed).
            - measurement_counts_dict: model_name -> number of dates.
            - message: Info/warning text describing the counts.
    """
    counts = {name: len(entry.get('dates', [])) for name, entry in data.items()}
    if not counts:
        return True, {}, "No models found in data"
    max_count = max(counts.values())
    min_count = min(counts.values())
    if min_count == max_count:
        # Uniform sampling across all models — nothing to warn about.
        return True, counts, f"All models have {max_count} measurements"
    # Mixed counts: build a warning listing under-sampled models first.
    warning_msg = f"⚠️ Models have different measurement counts (range: {min_count}-{max_count}):\n"
    shortfall_lines = [
        f"  {model}: {count} samples (missing {max_count - count})"
        for model, count in sorted(counts.items(), key=lambda item: item[1])
        if count < max_count
    ]
    if shortfall_lines:
        warning_msg += "\n".join(shortfall_lines)
        warning_msg += f"\n\nModels with maximum samples ({max_count}):\n"
        for model, count in counts.items():
            if count == max_count:
                warning_msg += f"  {model}\n"
    return True, counts, warning_msg
def _find_model_metadata(model_name, model_metadata):
    """
    Resolve metadata for *model_name*, tolerating provider-prefix differences.

    Lookup order: exact key match; for 'provider/model' names, the bare model
    name without its prefix; for bare names, any key ending in '/<model_name>'.
    Returns None when nothing matches or metadata is empty/None.
    """
    if not model_metadata:
        return None
    if model_name in model_metadata:
        return model_metadata[model_name]
    if '/' in model_name:
        # BUG FIX: the previous code re-prefixed an already-prefixed name
        # (producing "Provider/Provider/model"), so this fallback could
        # never match. Try the provider-stripped name instead.
        bare_name = model_name.split('/', 1)[1]
        return model_metadata.get(bare_name)
    # Name has no provider: accept any key whose suffix matches it.
    for key in model_metadata:
        if key == model_name or key.endswith(f"/{model_name}"):
            return model_metadata[key]
    return None

def transform_leaderboard_data_to_dataframe(data, years, months, model_metadata=None):
    """
    Transform the leaderboard_graph_data.json format into rows for a DataFrame.

    Args:
        data: Dictionary with model names as keys.
        years: List of year strings (e.g., ["2021", "2022", ...]).
        months: List of month strings (e.g., ["01", "02", ...]).
        model_metadata: Optional dictionary with model metadata
            (parameters, release date, etc.).

    Returns:
        List of row dictionaries ready for DataFrame creation. Each row has
        Model, detected-cutoff columns, metadata columns, an evaluation
        period, per-month average columns ('YYYY_MM'), per-year averages,
        and an "Overall Average".
    """
    # Validation is retained for its informational message; counts themselves
    # are not used below. (Message printing is currently disabled.)
    _is_valid, _measurement_counts, _message = validate_equal_measurements(data)
    rows = []
    for model_name, model_data in data.items():
        # Changepoints in 'YYYY_MM' form; the display columns use '.' instead of '_'.
        changepoints = convert_changepoints_to_monthly(
            model_data.get("changepoint_dates", [])
        )
        row = {
            "Model": model_name,
            "1st Detected cutoff": changepoints[0].replace("_", ".") if len(changepoints) > 0 else "",
            "2nd Detected cutoff": changepoints[1].replace("_", ".") if len(changepoints) > 1 else "",
            "trend_changepoints": changepoints  # Keep for chart rendering (in YYYY_MM format)
        }
        metadata = _find_model_metadata(model_name, model_metadata)
        if metadata:
            row["Provider"] = metadata.get("Provider", "")
            row["Parameters"] = metadata.get("Parameters", "")
            row["Provider cutoff"] = metadata.get("Provider cutoff", "")
            row["Release date"] = metadata.get("Release date", "")
            row["Self-declared cutoff"] = metadata.get("Model cutoff", "")
        else:
            # No metadata: fall back to deriving the provider from the name.
            row["Provider"] = extract_provider_from_model_name(model_name)
            row["Parameters"] = ""
            row["Provider cutoff"] = ""
            row["Release date"] = ""
            row["Self-declared cutoff"] = ""
        # Aggregate raw (weekly) faithfulness samples into monthly averages.
        dates = model_data.get("dates", [])
        faithfulness = model_data.get("faithfulness", [])
        monthly_averages = aggregate_weekly_to_monthly(dates, faithfulness)
        # Evaluation period spans the min and max observed dates.
        if dates:
            try:
                date_objects = [datetime.strptime(d, '%Y-%m-%d') for d in dates]
                min_date = min(date_objects).strftime('%Y-%m-%d')
                max_date = max(date_objects).strftime('%Y-%m-%d')
                row["Evaluation period"] = f"{min_date} - {max_date}"
            except Exception:
                row["Evaluation period"] = ""
        else:
            row["Evaluation period"] = ""
        # Monthly columns (e.g., "2021_01", "2021_02", ...).
        row.update(monthly_averages)
        # Yearly averages plus one overall average over every monthly value.
        all_years_values = []
        for year in years:
            year_values = [
                monthly_averages[f"{year}_{month}"]
                for month in months
                if f"{year}_{month}" in monthly_averages
            ]
            row[year] = round(mean(year_values), 2) if year_values else None
            all_years_values.extend(year_values)
        # Note: calculated from this model's actual sample count only —
        # models with fewer samples average over their available data.
        row["Overall Average"] = round(mean(all_years_values), 2) if all_years_values else None
        rows.append(row)
    return rows
def extract_metric_value(month_data, metric):
    """
    Return the value stored under the 'avg_<metric>' key of *month_data*
    (metric name lowercased), or None when the key is absent.
    """
    return month_data.get(f"avg_{metric.lower()}")
def process_month_data(result, year, month, metrics):
    """
    Collect per-metric values for a single month.

    Returns:
        (row_updates, numeric_values) where row_updates maps the per-metric
        columns ("<metric>_<year>_<month>") to raw values plus one
        aggregated "<year>_<month>" column (rounded mean of the numeric
        values, or None), and numeric_values lists the numeric samples for
        the caller's yearly aggregation.
    """
    updates = {}
    numeric_values = []
    month_data = result.get("results", {}).get(year, {}).get(month, {})
    for metric in metrics:
        val = extract_metric_value(month_data, metric)
        updates[f"{metric}_{year}_{month}"] = val
        if val is None:
            continue
        try:
            numeric_values.append(float(val))
        except Exception:
            pass  # Non-numeric values are stored but excluded from aggregation.
    updates[f"{year}_{month}"] = (
        round(mean(numeric_values), 2) if numeric_values else None
    )
    return updates, numeric_values
def process_result_row(result, models_map, metrics, years, months):
    """Flatten one result entry into a single dataframe row dictionary."""
    model_id = result["id"]
    # Fall back to the raw id when the model name is unknown.
    row = {"Model": models_map.get(model_id, model_id)}
    # Pass through any provider/metadata columns supplied with the result.
    row.update(result.get("columns", {}))
    row["trend_breakpoint"] = result.get("trend_breakpoint")
    for year in years:
        collected = []
        for month in months:
            month_updates, numeric_values = process_month_data(
                result, year, month, metrics
            )
            row.update(month_updates)
            collected.extend(numeric_values)
        # Aggregated year column: mean over every numeric monthly sample.
        row[year] = round(mean(collected), 2) if collected else None
    return row
def create_dataframe(cfg, data, models_map=None, metrics=None, model_metadata=None):
    """
    Transform loaded data into a pandas DataFrame.

    Supports the new format (data is the per-model dictionary; models_map
    and metrics omitted) and the old format (data carries a "results" list).
    """
    years = cfg.get("years")
    months = cfg.get("months")
    is_new_format = models_map is None and metrics is None
    if is_new_format:
        rows = transform_leaderboard_data_to_dataframe(
            data, years, months, model_metadata
        )
    else:
        rows = [
            process_result_row(entry, models_map, metrics, years, months)
            for entry in data["results"]
        ]
    return pd.DataFrame(rows)
| ### COLUMN DEFINITIONS | |
def get_aggregated_columns(years, year_to_columns):
    """Return (year columns, flattened per-month columns) for the given years."""
    month_columns = []
    for year in years:
        month_columns.extend(year_to_columns[year])
    return years, month_columns