Spaces:
Running
Running
| import os | |
| import pandas as pd | |
| import numpy as np | |
| import json | |
| from typing import List, Tuple, Optional | |
| import yaml | |
| from pathlib import Path | |
| from scipy import stats | |
| from timebench.evaluation.data import Dataset, get_dataset_settings, load_dataset_config | |
| from src.hf_config import get_datasets_root, get_config_root | |
def load_time_results(root_dir, model_name, dataset_with_freq, horizon):
    """
    Load TIME results from NPZ files for a specific model, dataset, and horizon.

    Args:
        root_dir: Root directory containing TIME results (e.g., "output/results")
        model_name: Model name (e.g., "moirai_small")
        dataset_with_freq: Dataset and freq combined (e.g., "Water_Quality_Darwin/15T")
        horizon: Horizon name (e.g., "short", "medium", "long")

    Returns:
        tuple: (metrics_dict, predictions_dict, config_dict), or (None, None, None)
        if either "metrics.npz" or "predictions.npz" is missing.
        config_dict is an empty dict when "config.json" does not exist.
    """
    horizon_dir = os.path.join(root_dir, model_name, dataset_with_freq, horizon)
    metrics_path = os.path.join(horizon_dir, "metrics.npz")
    predictions_path = os.path.join(horizon_dir, "predictions.npz")
    config_path = os.path.join(horizon_dir, "config.json")

    if not (os.path.exists(metrics_path) and os.path.exists(predictions_path)):
        return None, None, None

    # np.load on an .npz archive returns a lazy NpzFile that keeps the file
    # open; copy every member out inside the `with` so the handle is released.
    with np.load(metrics_path) as metrics:
        metrics_dict = {k: metrics[k] for k in metrics.files}
    with np.load(predictions_path) as predictions:
        predictions_dict = {k: predictions[k] for k in predictions.files}

    config_dict = {}
    if os.path.exists(config_path):
        with open(config_path, "r", encoding="utf-8") as f:
            config_dict = json.load(f)

    return metrics_dict, predictions_dict, config_dict
def _nanmean_or_nan(values):
    """Return np.nanmean(values), or NaN (without a RuntimeWarning) when there is nothing to average."""
    arr = np.asarray(values)
    if arr.size == 0 or np.isnan(arr).all():
        return np.nan
    return np.nanmean(arr)


def get_all_datasets_results(root_dir="output/results"):
    """
    Load dataset-level leaderboard by reading TIME NPZ files and aggregating.

    The directory layout walked is root_dir/model/dataset/freq/horizon/, with
    horizon in {"short", "medium", "long"}. Each metric array is reduced to a
    single NaN-ignoring mean; a metric missing from the NPZ file yields NaN.

    Args:
        root_dir (str): Path to the TIME results root directory (e.g., "output/results").

    Returns:
        pd.DataFrame: DataFrame containing dataset-level results with columns
        ["model", "dataset", "freq", "dataset_id", "horizon", "MASE", "CRPS", "MAE", "MSE"].
        - dataset: Original dataset name (e.g., "Traffic")
        - freq: Frequency string (e.g., "15T", "1H")
        - dataset_id: Unique identifier as "dataset/freq" (e.g., "Traffic/15T")
        One row per (model, dataset/freq, horizon) combination that has results.
        An empty frame with these columns is returned when nothing is found.
    """
    columns = ["model", "dataset", "freq", "dataset_id", "horizon", "MASE", "CRPS", "MAE", "MSE"]
    metric_names = ["MASE", "CRPS", "MAE", "MSE"]

    if not os.path.exists(root_dir):
        print(f"Error: root_dir={root_dir} does not exist")
        return pd.DataFrame(columns=columns)

    rows = []
    for model in os.listdir(root_dir):
        model_dir = os.path.join(root_dir, model)
        if not os.path.isdir(model_dir):
            continue
        for dataset in os.listdir(model_dir):
            dataset_dir = os.path.join(model_dir, dataset)
            if not os.path.isdir(dataset_dir):
                continue
            # Nested structure: model/dataset/freq/horizon/
            for freq_dir in os.listdir(dataset_dir):
                if not os.path.isdir(os.path.join(dataset_dir, freq_dir)):
                    continue
                # Unique identifier: dataset/freq (same for every horizon)
                dataset_with_freq = f"{dataset}/{freq_dir}"
                for horizon in ["short", "medium", "long"]:
                    metrics_dict, _, _ = load_time_results(root_dir, model, dataset_with_freq, horizon)
                    if metrics_dict is None:
                        continue
                    row = {
                        "model": model,
                        "dataset": dataset,
                        "freq": freq_dir,
                        "dataset_id": dataset_with_freq,
                        "horizon": horizon,
                    }
                    # Aggregate each metric over all of its entries.
                    for metric in metric_names:
                        row[metric] = _nanmean_or_nan(metrics_dict.get(metric, np.array([])))
                    rows.append(row)

    # Passing `columns` keeps the schema stable even when `rows` is empty.
    return pd.DataFrame(rows, columns=columns)
def get_dataset_display_map(datasets_df: pd.DataFrame) -> Tuple[dict, dict]:
    """
    Build display-name mappings for dataset identifiers.

    A dataset that appears with exactly one freq is shown by its bare name
    (e.g., "Australia_Solar"); a dataset that appears with several freqs is
    shown by its full "dataset/freq" id (e.g., "Traffic/15T").

    Args:
        datasets_df: DataFrame with 'dataset', 'freq', 'dataset_id' columns

    Returns:
        Tuple of:
        - id_to_display: dict mapping dataset_id -> display_name
        - display_to_id: dict mapping display_name -> dataset_id
    """
    id_to_display = {}
    display_to_id = {}
    if datasets_df.empty:
        return id_to_display, display_to_id

    # Number of distinct freqs observed for each dataset name
    freqs_per_dataset = datasets_df.groupby('dataset')['freq'].nunique()

    configs = datasets_df[['dataset', 'freq', 'dataset_id']].drop_duplicates()
    for name, ds_id in zip(configs['dataset'], configs['dataset_id']):
        # The full id is needed only to disambiguate multi-freq datasets
        label = ds_id if freqs_per_dataset[name] > 1 else name
        id_to_display[ds_id] = label
        display_to_id[label] = ds_id

    return id_to_display, display_to_id
def _load_variate_dataset_info(dataset_id):
    """
    Look up series names and variate names for *dataset_id* via a Dataset object.

    Any exception raised during loading is printed and swallowed (best effort),
    in which case whatever was resolved before the failure is returned and the
    remaining fields keep their defaults (None / False).

    Returns:
        dict with keys "series_names", "variate_names", "is_uts".
    """
    series_names = None
    variate_names = None
    is_uts = False
    try:
        hf_dataset_root = str(get_datasets_root())
        if os.path.exists(hf_dataset_root):
            config_root = get_config_root()
            config_path = config_root / "datasets.yaml"
            config = load_dataset_config(config_path) if config_path.exists() else {}
            # Names are looked up with the "short" term settings for every dataset.
            settings = get_dataset_settings(dataset_id, "short", config)
            dataset_obj = Dataset(
                name=dataset_id,
                term="short",
                prediction_length=settings.get("prediction_length"),
                test_length=settings.get("test_length"),
                storage_path=hf_dataset_root,
            )
            # Get series names
            if "item_id" in dataset_obj.hf_dataset.column_names:
                series_names = list(dataset_obj.hf_dataset["item_id"])
            else:
                series_names = [f"item_{i}" for i in range(len(dataset_obj.hf_dataset))]
            # Get variate names
            variate_names = dataset_obj.get_variate_names()
            if variate_names is None:
                # UTS mode: variate_names = series_names, and is_uts = True
                is_uts = True
                variate_names = series_names
            else:
                variate_names = list(variate_names)
    except Exception as e:
        print(f"[get_all_variates_results] Error loading Dataset info for {dataset_id}: {e}")
    return {
        "series_names": series_names,
        "variate_names": variate_names,
        "is_uts": is_uts,
    }


def _mean_over_windows(metric_arr, expected_shape):
    """
    Average a (series, windows, variates) metric array over the window axis.

    A metric that is missing or whose shape differs from *expected_shape* is
    reported as all-NaN instead of raising on indexing.
    """
    if metric_arr.shape != expected_shape:
        return np.full((expected_shape[0], expected_shape[2]), np.nan)
    return np.nanmean(metric_arr, axis=1)


def get_all_variates_results(root_dir: str = "output/results") -> pd.DataFrame:
    """
    Collect all variate-individual-level results from TIME NPZ files.

    Each (series, variate) combination is treated as an independent variate individual.
    Metrics are aggregated only across windows (not across series).
    Series and variate names come from Dataset objects when they can be loaded;
    otherwise "item_<idx>" and the variate index (as a string) are used.

    Args:
        root_dir (str): Path to the TIME results root directory (e.g., "output/results").

    Returns:
        pd.DataFrame: DataFrame with columns:
        ["dataset_id", "series_name", "variate_name", "is_uts", "model", "horizon", "MASE", "CRPS", "MAE", "MSE"]
        One row per (model, dataset/freq, horizon, series, variate) whose MASE and
        CRPS are not both NaN. An empty frame with these columns is returned when
        nothing is found.
    """
    columns = ["dataset_id", "series_name", "variate_name", "is_uts", "model", "horizon", "MASE", "CRPS", "MAE", "MSE"]

    if not os.path.exists(root_dir):
        print(f"[get_all_variates_results] root_dir={root_dir} does not exist")
        return pd.DataFrame(columns=columns)

    rows = []
    # Cache for dataset info (series_names, variate_names) to avoid repeated loading
    dataset_info_cache = {}

    for model in os.listdir(root_dir):
        model_dir = os.path.join(root_dir, model)
        if not os.path.isdir(model_dir):
            continue
        for dataset in os.listdir(model_dir):
            dataset_dir = os.path.join(model_dir, dataset)
            if not os.path.isdir(dataset_dir):
                continue
            # Nested structure: model/dataset/freq/horizon/
            for freq_dir in os.listdir(dataset_dir):
                if not os.path.isdir(os.path.join(dataset_dir, freq_dir)):
                    continue
                dataset_id = f"{dataset}/{freq_dir}"

                if dataset_id not in dataset_info_cache:
                    dataset_info_cache[dataset_id] = _load_variate_dataset_info(dataset_id)
                info = dataset_info_cache[dataset_id]
                series_names = info["series_names"]
                variate_names = info["variate_names"]
                is_uts = info["is_uts"]

                for horizon in ["short", "medium", "long"]:
                    metrics_dict, _, _ = load_time_results(root_dir, model, dataset_id, horizon)
                    if metrics_dict is None:
                        continue

                    # Metrics arrays are unpacked as (num_series, num_windows, num_variates)
                    mase_arr = metrics_dict.get("MASE", np.array([]))
                    if mase_arr.size == 0:
                        continue
                    shape = mase_arr.shape
                    num_series, _, num_variates = shape

                    # Aggregate only across windows, once per metric.
                    window_means = {
                        metric: _mean_over_windows(metrics_dict.get(metric, np.array([])), shape)
                        for metric in ("MASE", "CRPS", "MAE", "MSE")
                    }

                    # Iterate over each (series, variate) combination
                    for series_idx in range(num_series):
                        if series_names and series_idx < len(series_names):
                            series_name = series_names[series_idx]
                        else:
                            series_name = f"item_{series_idx}"
                        for variate_idx in range(num_variates):
                            if is_uts:
                                # For UTS each series is its own variate
                                variate_name = series_name
                            elif variate_names and variate_idx < len(variate_names):
                                variate_name = variate_names[variate_idx]
                            else:
                                variate_name = str(variate_idx)

                            mase = window_means["MASE"][series_idx, variate_idx]
                            crps = window_means["CRPS"][series_idx, variate_idx]
                            # Skip when both headline metrics are NaN
                            if np.isnan(mase) and np.isnan(crps):
                                continue

                            rows.append({
                                "dataset_id": dataset_id,
                                "series_name": series_name,
                                "variate_name": variate_name,
                                "is_uts": is_uts,
                                "model": model,
                                "horizon": horizon,
                                "MASE": mase,
                                "CRPS": crps,
                                "MAE": window_means["MAE"][series_idx, variate_idx],
                                "MSE": window_means["MSE"][series_idx, variate_idx],
                            })

    # Passing `columns` keeps the schema stable even when `rows` is empty.
    return pd.DataFrame(rows, columns=columns)
def get_all_domains_and_freq(conf_dir="conf/data", datasets=None):
    """
    Scan per-dataset YAML files and collect all unique domains and frequencies.

    For each name in *datasets*, "<conf_dir>/<name>.yaml" is read if it exists
    and its top-level "domain" and "freq" values are collected. Missing files,
    empty files, and files whose top level is not a mapping are skipped.

    Args:
        conf_dir (str): Directory containing the "<dataset>.yaml" files.
        datasets: Iterable of dataset names to look up. None is treated as empty.

    Returns:
        tuple: (sorted list of domains, sorted list of freqs)
    """
    domains, freqs = set(), set()
    # `datasets` defaults to None; treat that as "nothing to scan".
    for ds in datasets or ():
        yaml_path = os.path.join(conf_dir, f"{ds}.yaml")
        if not os.path.exists(yaml_path):
            continue
        with open(yaml_path, "r") as f:
            meta = yaml.safe_load(f)
        # safe_load returns None for an empty document and may return a
        # list/scalar; only a mapping can carry "domain"/"freq".
        if not isinstance(meta, dict):
            continue
        domain = meta.get("domain")
        freq = meta.get("freq")
        if domain:
            domains.add(domain)
        if freq:
            freqs.add(freq)
    return sorted(domains), sorted(freqs)
def get_dataset_choices(results_root="output/results") -> Tuple[List[str], dict, dict]:
    """
    List the datasets available under the TIME results tree, with display names.

    A (dataset, freq) pair counts as available when at least one horizon
    directory ("short", "medium", "long") below it contains a "config.json".
    Datasets seen with a single freq are displayed by bare name
    (e.g., "Australia_Solar"); datasets seen with several freqs are displayed
    as "dataset/freq" (e.g., "Traffic/15T").

    Args:
        results_root: Path to the TIME results root directory

    Returns:
        Tuple of:
        - display_names: Sorted list of display names for UI dropdown
        - display_to_id: dict mapping display_name -> dataset_id
        - id_to_display: dict mapping dataset_id -> display_name
    """
    from collections import Counter

    horizons = ("short", "medium", "long")

    def has_config(base_dir):
        # True when any horizon sub-directory of base_dir holds a config.json
        return any(
            os.path.exists(os.path.join(base_dir, h, "config.json")) for h in horizons
        )

    if not os.path.exists(results_root):
        return [], {}, {}

    # Every (dataset, freq) pair found across all models
    pairs = set()
    for model_name in os.listdir(results_root):
        model_path = os.path.join(results_root, model_name)
        if not os.path.isdir(model_path):
            continue
        for dataset_name in os.listdir(model_path):
            dataset_path = os.path.join(model_path, dataset_name)
            if not os.path.isdir(dataset_path):
                continue

            legacy_layout = any(
                os.path.isdir(os.path.join(dataset_path, h)) for h in horizons
            )
            if legacy_layout:
                # Legacy direct structure model/dataset/horizon/: record with an empty freq
                if has_config(dataset_path):
                    pairs.add((dataset_name, ""))
                continue

            # Nested structure: model/dataset/freq/horizon/
            for freq_name in os.listdir(dataset_path):
                freq_path = os.path.join(dataset_path, freq_name)
                if os.path.isdir(freq_path) and has_config(freq_path):
                    pairs.add((dataset_name, freq_name))

    if not pairs:
        return [], {}, {}

    # How many distinct freqs each dataset was seen with
    freqs_seen = Counter(name for name, _ in pairs)

    id_to_display = {}
    display_to_id = {}
    for dataset_name, freq_name in pairs:
        dataset_id = f"{dataset_name}/{freq_name}" if freq_name else dataset_name
        # Only multi-freq datasets need the freq in their label
        label = dataset_id if freqs_seen[dataset_name] > 1 else dataset_name
        id_to_display[dataset_id] = label
        display_to_id[label] = dataset_id

    return sorted(display_to_id.keys()), display_to_id, id_to_display
| def compute_ranks(df: pd.DataFrame, groupby_cols: str | List[str]) -> pd.DataFrame: | |
| """ | |
| Compute ranks for models across datasets based on MASE and CRPS. | |
| Args: | |
| df (pd.DataFrame): Dataset-level results with columns | |
| ["model", "dataset", "MASE", "CRPS"]. | |
| Returns: | |
| pd.DataFrame: Dataframe with ["model", "MASE_rank", "CRPS_rank"]. | |
| """ | |
| if isinstance(groupby_cols, str): | |
| groupby_cols = [groupby_cols] | |
| if df.empty: | |
| return pd.DataFrame(columns=["model", "MASE_rank", "CRPS_rank"]) | |
| df = df.copy() | |
| df["MASE_rank"] = df.groupby(groupby_cols)["MASE"].rank(method="first", ascending=True) | |
| df["CRPS_rank"] = df.groupby(groupby_cols)["CRPS"].rank(method="first", ascending=True) | |
| return df | |
def normalize_by_seasonal_naive(
    df: pd.DataFrame,
    baseline_model: str = "seasonal_naive",
    metrics: List[str] = None,
    groupby_cols: List[str] = None,
) -> pd.DataFrame:
    """
    Normalize metrics by Seasonal Naive baseline for each (dataset_id, horizon) group.

    For each group, divides each model's metric values by the baseline model's
    values, so the baseline itself becomes 1.0. Rows are matched to the baseline
    by position rather than by index label, so a non-unique index on df is safe.

    Args:
        df (pd.DataFrame): Dataset-level results with columns including
            ["model", "dataset_id", "horizon", "MASE", "CRPS", ...].
        baseline_model (str): Name of the baseline model. Defaults to "seasonal_naive".
        metrics (List[str]): List of metric columns to normalize. Defaults to ["MASE", "CRPS"].
        groupby_cols (List[str]): Columns to group by for normalization.
            Defaults to ["dataset_id", "horizon"].

    Returns:
        pd.DataFrame: Copy of df with normalized metric values.
        - Rows whose group has no baseline row are excluded.
        - A zero, NaN or None baseline, and any infinite ratio, yields NaN.
        - If df is empty or the baseline model is absent (a warning is printed),
          an unmodified copy of df is returned.
        - If a group has several baseline rows, the last one is used.
    """
    if metrics is None:
        metrics = ["MASE", "CRPS"]
    if groupby_cols is None:
        groupby_cols = ["dataset_id", "horizon"]

    if df.empty:
        return df.copy()

    # Check if baseline model exists
    if baseline_model not in df["model"].values:
        print(f"[normalize_by_seasonal_naive] Warning: baseline model '{baseline_model}' not found in data")
        return df.copy()

    def _group_keys(frame):
        # One hashable key per row, built from the grouping columns.
        if not groupby_cols:
            return [()] * len(frame)
        return list(zip(*(frame[col].tolist() for col in groupby_cols)))

    baseline_df = df[df["model"] == baseline_model]
    # group key -> row position inside baseline_df (later rows overwrite earlier ones)
    baseline_position = {key: pos for pos, key in enumerate(_group_keys(baseline_df))}

    # Position of the matching baseline row for every row of df, -1 when absent.
    matched = np.array([baseline_position.get(key, -1) for key in _group_keys(df)], dtype=int)
    has_baseline = matched >= 0

    # Keep only rows with a baseline; a boolean mask works with duplicate index labels.
    df_normalized = df.loc[has_baseline].copy()
    matched = matched[has_baseline]

    for metric in metrics:
        denominators = baseline_df[metric].to_numpy(dtype=float)[matched]
        numerators = df_normalized[metric].to_numpy(dtype=float)
        usable = ~np.isnan(denominators) & (denominators != 0)
        # Start from NaN so rows with an unusable baseline stay NaN.
        ratios = np.full(len(df_normalized), np.nan)
        np.divide(numerators, denominators, out=ratios, where=usable)
        # Handle any remaining inf values
        ratios[np.isinf(ratios)] = np.nan
        df_normalized[metric] = ratios

    return df_normalized
def load_features(root_dir: str = "features", category: str = "public-benchmarks", split: str = "test") -> pd.DataFrame:
    """
    Load time series features for all datasets (legacy function).

    Reads "<root_dir>/<category>/<dataset>/<split>.csv" for every entry found
    under the category directory and concatenates the results.

    Args:
        root_dir (str): Path to features root directory.
        category (str): Dataset category (e.g., "public-benchmarks").
        split (str): Which split to load ("full" or "test").

    Returns:
        pd.DataFrame: Concatenated DataFrame with "dataset" as the first column.
        A "unique_id" column, if present, is renamed to "variate_name".
        An empty DataFrame is returned when the category directory does not
        exist or contains no matching CSV files.
    """
    base_dir = os.path.join(root_dir, category)
    # Consistent with load_all_features: a missing directory is "no data", not an error.
    if not os.path.isdir(base_dir):
        print(f"[load_features] base_dir={base_dir} does not exist")
        return pd.DataFrame()

    all_data = []
    for dataset in os.listdir(base_dir):
        csv_path = os.path.join(base_dir, dataset, f"{split}.csv")
        if not os.path.exists(csv_path):
            continue
        df = pd.read_csv(csv_path)
        df["dataset"] = dataset  # add dataset name
        # Move the dataset column to the front
        cols = ["dataset"] + [c for c in df.columns if c != "dataset"]
        all_data.append(df[cols])

    if not all_data:
        return pd.DataFrame()

    df = pd.concat(all_data, ignore_index=True)
    if "unique_id" in df.columns:
        df = df.rename(columns={"unique_id": "variate_name"})
    return df
def load_all_features(features_root: str = "output/features", split: str = "test") -> pd.DataFrame:
    """
    Load per-variate time series features for every dataset under features_root.

    Expected structure: features_root/{dataset}/{freq}/{split}.csv
    When "{split}.csv" is absent in a freq directory, "full.csv" is tried instead.
    CSV files that fail to parse are reported on stdout and skipped.

    Args:
        features_root (str): Path to features root directory (e.g., "output/features").
        split (str): Which split to load ("full" or "test").

    Returns:
        pd.DataFrame: Row-wise concatenation of all CSV files that were loaded,
        or an empty DataFrame when features_root is missing or nothing was loaded.
    """
    if not os.path.exists(features_root):
        print(f"[load_all_features] features_root={features_root} does not exist")
        return pd.DataFrame()

    frames = []
    for dataset_name in os.listdir(features_root):
        dataset_path = os.path.join(features_root, dataset_name)
        if not os.path.isdir(dataset_path):
            continue
        for freq_name in os.listdir(dataset_path):
            freq_path = os.path.join(dataset_path, freq_name)
            if not os.path.isdir(freq_path):
                continue

            # Preferred file first, then the full.csv fallback
            candidates = (
                os.path.join(freq_path, f"{split}.csv"),
                os.path.join(freq_path, "full.csv"),
            )
            csv_path = next((p for p in candidates if os.path.exists(p)), None)
            if csv_path is None:
                continue

            try:
                frames.append(pd.read_csv(csv_path))
            except Exception as e:
                print(f"[load_all_features] Error loading {csv_path}: {e}")

    if not frames:
        print(f"[load_all_features] No features found in {features_root}")
        return pd.DataFrame()

    features_df = pd.concat(frames, ignore_index=True)
    print(f"[load_all_features] Loaded {len(features_df)} variate features from {len(frames)} datasets")
    return features_df
def binarize_features(df: pd.DataFrame, exclude: list) -> pd.DataFrame:
    """
    Turn feature columns into 0/1 indicators by thresholding at each column's median.

    A value becomes 1 when it is strictly greater than its column median and
    0 otherwise. Columns listed in exclude are left untouched.

    Args:
        df (pd.DataFrame): Input dataframe with feature values.
        exclude (list): Columns to exclude from binarization.

    Returns:
        pd.DataFrame: New dataframe (the input is not modified) where the
        selected feature columns are binarized (0/1).
    """
    # Columns that will be thresholded
    targets = [name for name in df.columns if name not in exclude]

    # Per-column medians of the original values
    thresholds = df[targets].median()

    # Work on a copy so the caller's frame stays intact
    result = df.copy()
    for name in targets:
        result[name] = df[name].gt(thresholds[name]).astype(int)
    return result