Spaces:
Running
Running
| import logging | |
| from pathlib import Path | |
| from typing import Iterable, Optional | |
| import datasets | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| import typer | |
| import yaml | |
| from gluonts.dataset.split import split | |
| from gluonts.ev.metrics import MASE, MeanWeightedSumQuantileLoss | |
| from gluonts.itertools import batcher | |
| from gluonts.model.evaluation import evaluate_forecasts | |
| from gluonts.model.forecast import QuantileForecast, SampleForecast | |
| from tqdm.auto import tqdm | |
| from chronos import BaseChronosPipeline, Chronos2Pipeline, ChronosBoltPipeline, ChronosPipeline, ForecastType | |
# Command-line entry point; the evaluation commands below are registered on it.
app = typer.Typer(pretty_exceptions_enable=False)

# Quantile levels every model is evaluated at: 0.1, 0.2, ..., 0.9.
# Integer true division is correctly rounded, so each entry is bit-identical to
# the corresponding decimal literal (e.g. 3 / 10 == 0.3).
QUANTILES = [decile / 10 for decile in range(1, 10)]
def to_gluonts_univariate(hf_dataset: datasets.Dataset):
    """Flatten a Hugging Face time-series dataset into univariate GluonTS entries.

    Every sequence-typed column other than ``"timestamp"`` is treated as a
    separate target series, so each row of ``hf_dataset`` yields one entry per
    such column.

    Parameters
    ----------
    hf_dataset : datasets.Dataset
        Dataset with a ``"timestamp"`` sequence column and one or more
        sequence-valued target columns.

    Returns
    -------
    list of dict
        Entries of the form ``{"start": pd.Period, "target": <column values>}``,
        ordered row by row and, within a row, in column order. An empty dataset
        yields an empty list.

    Raises
    ------
    ValueError
        If the dataset has no ``"timestamp"`` sequence column.
    """
    # ``datasets`` has historically exposed ``Sequence`` as a feature class; newer
    # releases add ``List`` and may turn ``Sequence`` into a factory function,
    # which ``isinstance`` rejects. Accept whichever of the two is a class.
    # NOTE(review): confirm against the pinned ``datasets`` version.
    sequence_types = tuple(
        feature_type
        for feature_type in (getattr(datasets, "Sequence", None), getattr(datasets, "List", None))
        if isinstance(feature_type, type)
    )
    series_fields = [col for col in hf_dataset.features if isinstance(hf_dataset.features[col], sequence_types)]
    if "timestamp" not in series_fields:
        raise ValueError("Dataset must contain a 'timestamp' sequence column")
    series_fields.remove("timestamp")

    # Use the actual row count rather than ``info.splits`` metadata so that
    # selected/filtered datasets are handled correctly.
    if len(hf_dataset) == 0:
        return []

    # Assumes that all time series in the dataset have the same frequency
    dataset_freq = pd.DatetimeIndex(hf_dataset[0]["timestamp"]).to_period()[0].freqstr

    gts_dataset = []
    for hf_entry in hf_dataset:
        # All target columns of a row share the row's timestamps, hence its start.
        start = pd.Period(hf_entry["timestamp"][0], freq=dataset_freq)
        for field in series_fields:
            gts_dataset.append({"start": start, "target": hf_entry[field]})
    return gts_dataset
def load_and_split_dataset(backtest_config: dict):
    """Load one benchmark dataset from the Hub and build its rolling test windows.

    Parameters
    ----------
    backtest_config : dict
        One entry of the evaluation config. Keys read: ``hf_repo``, ``name``,
        ``offset``, ``prediction_length`` and ``num_rolls``.

    Returns
    -------
    The test instances produced by GluonTS ``generate_instances`` (callers
    read its ``.input`` attribute).
    """
    repo_id = backtest_config["hf_repo"]
    config_name = backtest_config["name"]
    split_offset = backtest_config["offset"]
    horizon = backtest_config["prediction_length"]
    n_windows = backtest_config["num_rolls"]

    # Datasets in autogluon/chronos_datasets_extra cannot be distributed due to
    # license restrictions and must be generated on the fly by a loading script,
    # which requires trusting remote code.
    needs_remote_code = repo_id == "autogluon/chronos_datasets_extra"
    hf_ds = datasets.load_dataset(repo_id, config_name, split="train", trust_remote_code=needs_remote_code)
    hf_ds.set_format("numpy")

    # Everything after ``offset`` becomes test windows of length ``horizon``.
    _, template = split(to_gluonts_univariate(hf_ds), offset=split_offset)
    return template.generate_instances(horizon, windows=n_windows)
def _to_numpy(array) -> np.ndarray:
    """Return ``array`` as a NumPy array; torch tensors are detached, moved to CPU and cast to float32."""
    if isinstance(array, torch.Tensor):
        # NumPy has no bfloat16, so cast before converting.
        return array.detach().to(device="cpu", dtype=torch.float32).numpy()
    return np.asarray(array)


def generate_forecasts(
    test_data_input: Iterable,
    pipeline: BaseChronosPipeline,
    prediction_length: int,
    batch_size: int,
    **predict_kwargs,
):
    """Forecast every series in ``test_data_input`` and wrap the results for GluonTS.

    Parameters
    ----------
    test_data_input : Iterable
        Entries with a ``"target"`` array (used as context) and a ``"start"``
        period. It is consumed twice (prediction, then wrapping), so it is
        materialized into a list first; one-shot iterables are therefore safe.
    pipeline : BaseChronosPipeline
        Chronos pipeline exposing ``predict_quantiles``.
    prediction_length : int
        Forecast horizon.
    batch_size : int
        Number of series sent to the pipeline per call.
    **predict_kwargs
        Extra keyword arguments forwarded to ``pipeline.predict_quantiles``.

    Returns
    -------
    list of QuantileForecast
        One forecast per input entry, in input order, keyed by ``QUANTILES``.
        Empty input yields an empty list.
    """
    entries = list(test_data_input)
    if not entries:
        return []

    forecast_outputs = []
    for batch in tqdm(batcher(entries, batch_size=batch_size)):
        context = [torch.tensor(entry["target"]) for entry in batch]
        quantiles, _ = pipeline.predict_quantiles(
            context,
            prediction_length=prediction_length,
            quantile_levels=QUANTILES,
            **predict_kwargs,
        )
        if isinstance(quantiles, list):
            # Chronos-2 returns one tensor per series, presumably shaped
            # (1 variate, prediction_length, n_quantiles); stack and drop the variate axis.
            quantiles = np.stack([_to_numpy(q) for q in quantiles]).squeeze(axis=1)
        else:
            quantiles = _to_numpy(quantiles)
        # Swap the last two axes so each item is (n_quantiles, prediction_length),
        # the row-per-key layout used for QuantileForecast.forecast_arrays below.
        forecast_outputs.append(quantiles.swapaxes(-1, -2))
    forecast_outputs = np.concatenate(forecast_outputs)

    # predict_quantiles returns quantile levels for every pipeline type, including
    # sample-based ones, so every result is a QuantileForecast. Wrapping these
    # arrays in SampleForecast would treat the quantile levels as Monte-Carlo
    # samples and distort the quantiles GluonTS derives from them.
    forecasts = []
    for item, ts in zip(forecast_outputs, entries):
        forecast_start_date = ts["start"] + len(ts["target"])
        forecasts.append(
            QuantileForecast(
                forecast_arrays=item,
                forecast_keys=[str(level) for level in QUANTILES],
                start_date=forecast_start_date,
            )
        )
    return forecasts
def eval_pipeline_and_save_results(
    pipeline: BaseChronosPipeline,
    config_path: Path,
    metrics_path: Path,
    model_id: str,
    batch_size: int,
    **predict_kwargs,
):
    """Evaluate ``pipeline`` on every dataset of a backtest config and write metrics to CSV.

    Parameters
    ----------
    pipeline : BaseChronosPipeline
        Loaded Chronos pipeline to evaluate.
    config_path : Path
        YAML file containing a list of backtest configs (each with at least
        ``name`` and ``prediction_length``).
    metrics_path : Path
        Destination CSV; one row per dataset with columns ``dataset``,
        ``model``, ``MASE`` and ``WQL``, sorted by dataset.
    model_id : str
        Identifier written to the ``model`` column.
    batch_size : int
        Batch size for inference.
    **predict_kwargs
        Forwarded to ``generate_forecasts`` (and on to ``predict_quantiles``).

    Raises
    ------
    ValueError
        If the config file contains no backtest entries.
    """
    # Resolve the logger locally: the module-level ``logger`` is only bound under
    # ``if __name__ == "__main__"``, so importing this module and calling the
    # function would otherwise raise NameError. Same name => same logger object.
    logger = logging.getLogger("Chronos Evaluation")

    # Load backtest configs
    with open(config_path, encoding="utf-8") as fp:
        backtest_configs = yaml.safe_load(fp)
    if not backtest_configs:
        raise ValueError(f"No backtest configs found in {config_path}")

    result_rows = []
    for config in backtest_configs:
        dataset_name = config["name"]
        prediction_length = config["prediction_length"]

        logger.info(f"Loading {dataset_name}")
        test_data = load_and_split_dataset(backtest_config=config)

        logger.info(f"Generating forecasts for {dataset_name} ({len(test_data.input)} time series)")
        forecasts = generate_forecasts(
            test_data.input,
            pipeline=pipeline,
            prediction_length=prediction_length,
            batch_size=batch_size,
            **predict_kwargs,
        )

        logger.info(f"Evaluating forecasts for {dataset_name}")
        metrics = (
            evaluate_forecasts(
                forecasts,
                test_data=test_data,
                metrics=[
                    MASE(),
                    MeanWeightedSumQuantileLoss(QUANTILES),
                ],
                batch_size=5000,
            )
            .reset_index(drop=True)
            .to_dict(orient="records")
        )
        # evaluate_forecasts aggregates over all series, so only the first record is used.
        result_rows.append({"dataset": dataset_name, "model": model_id, **metrics[0]})

    # Save results to a CSV file
    results_df = (
        pd.DataFrame(result_rows)
        .rename(
            {"MASE[0.5]": "MASE", "mean_weighted_sum_quantile_loss": "WQL"},
            axis="columns",
        )
        .sort_values(by="dataset")
    )
    results_df.to_csv(metrics_path, index=False)
@app.command()
def chronos(
    config_path: Path,
    metrics_path: Path,
    model_id: str = "amazon/chronos-t5-small",
    device: str = "cuda",
    torch_dtype: str = "bfloat16",
    batch_size: int = 32,
    num_samples: int = 20,
    temperature: Optional[float] = None,
    top_k: Optional[int] = None,
    top_p: Optional[float] = None,
):
    """Evaluate Chronos models.

    Parameters
    ----------
    config_path : Path
        Path to the evaluation config. See ./configs/.
    metrics_path : Path
        Path to the CSV file where metrics will be saved.
    model_id : str, optional, default = "amazon/chronos-t5-small"
        HuggingFace ID of the Chronos model or local path
        Available models IDs:
            - amazon/chronos-t5-tiny
            - amazon/chronos-t5-mini
            - amazon/chronos-t5-small
            - amazon/chronos-t5-base
            - amazon/chronos-t5-large
    device : str, optional, default = "cuda"
        Device on which inference will be performed
    torch_dtype : str, optional, default = "bfloat16"
        Name of the torch dtype used for the model
    batch_size : int, optional, default = 32
        Batch size for inference
    num_samples : int, optional, default = 20
        Number of samples to draw when using the original Chronos models
    temperature : Optional[float], optional, default = None
        Softmax temperature used for sampling. None is forwarded as-is
        (presumably deferring to the model's own default).
    top_k : Optional[int], optional, default = None
        Top-K sampling. None is forwarded as-is.
    top_p : Optional[float], optional, default = None
        Top-p (nucleus) sampling. None is forwarded as-is.

    Raises
    ------
    ValueError
        If ``torch_dtype`` does not name a torch dtype.
    TypeError
        If ``model_id`` does not load as a ChronosPipeline.
    """
    resolved_dtype = getattr(torch, torch_dtype, None) if isinstance(torch_dtype, str) else torch_dtype
    # Explicit checks instead of ``assert`` so validation survives ``python -O``.
    if not isinstance(resolved_dtype, torch.dtype):
        raise ValueError(f"Unsupported torch_dtype: {torch_dtype!r}")

    # Load Chronos
    pipeline = BaseChronosPipeline.from_pretrained(model_id, device_map=device, torch_dtype=resolved_dtype)
    if not isinstance(pipeline, ChronosPipeline):
        raise TypeError(f"{model_id!r} loaded as {type(pipeline).__name__}, expected ChronosPipeline")

    eval_pipeline_and_save_results(
        pipeline=pipeline,
        config_path=config_path,
        metrics_path=metrics_path,
        model_id=model_id,
        batch_size=batch_size,
        num_samples=num_samples,
        temperature=temperature,
        top_k=top_k,
        top_p=top_p,
    )
@app.command()
def chronos_bolt(
    config_path: Path,
    metrics_path: Path,
    model_id: str = "amazon/chronos-bolt-base",
    device: str = "cuda",
    torch_dtype: str = "float32",
    batch_size: int = 32,
):
    """Evaluate Chronos-Bolt models.

    Parameters
    ----------
    config_path : Path
        Path to the evaluation config. See ./configs/.
    metrics_path : Path
        Path to the CSV file where metrics will be saved.
    model_id : str, optional, default = "amazon/chronos-bolt-base"
        HuggingFace ID of the Chronos model or local path
        Available model IDs:
            - amazon/chronos-bolt-tiny
            - amazon/chronos-bolt-mini
            - amazon/chronos-bolt-small
            - amazon/chronos-bolt-base
    device : str, optional, default = "cuda"
        Device on which inference will be performed
    torch_dtype : str, optional, default = "float32"
        Name of the torch dtype used for the model
    batch_size : int, optional, default = 32
        Batch size for inference. For Chronos-Bolt models, significantly larger
        batch sizes can be used

    Raises
    ------
    ValueError
        If ``torch_dtype`` does not name a torch dtype.
    TypeError
        If ``model_id`` does not load as a ChronosBoltPipeline.
    """
    resolved_dtype = getattr(torch, torch_dtype, None) if isinstance(torch_dtype, str) else torch_dtype
    # Explicit checks instead of ``assert`` so validation survives ``python -O``.
    if not isinstance(resolved_dtype, torch.dtype):
        raise ValueError(f"Unsupported torch_dtype: {torch_dtype!r}")

    # Load Chronos-Bolt
    pipeline = BaseChronosPipeline.from_pretrained(model_id, device_map=device, torch_dtype=resolved_dtype)
    if not isinstance(pipeline, ChronosBoltPipeline):
        raise TypeError(f"{model_id!r} loaded as {type(pipeline).__name__}, expected ChronosBoltPipeline")

    eval_pipeline_and_save_results(
        pipeline=pipeline,
        config_path=config_path,
        metrics_path=metrics_path,
        model_id=model_id,
        batch_size=batch_size,
    )
@app.command()
def chronos_2(
    config_path: Path,
    metrics_path: Path,
    model_id: str = "amazon/chronos-2",
    device: str = "cuda",
    torch_dtype: str = "float32",
    batch_size: int = 32,
    cross_learning: bool = False,
):
    """Evaluate Chronos-2 models.

    Parameters
    ----------
    config_path : Path
        Path to the evaluation config. See ./configs/.
    metrics_path : Path
        Path to the CSV file where metrics will be saved.
    model_id : str, optional, default = "amazon/chronos-2"
        HuggingFace ID of the Chronos model or local path
        Available model IDs:
            - amazon/chronos-2
    device : str, optional, default = "cuda"
        Device on which inference will be performed
    torch_dtype : str, optional, default = "float32"
        Name of the torch dtype used for the model
    batch_size : int, optional, default = 32
        Batch size for inference
    cross_learning : bool, optional, default = False
        If True, cross-learning is enabled and the model makes joint predictions
        for all items in the batch

    Raises
    ------
    ValueError
        If ``torch_dtype`` does not name a torch dtype.
    TypeError
        If ``model_id`` does not load as a Chronos2Pipeline.
    """
    resolved_dtype = getattr(torch, torch_dtype, None) if isinstance(torch_dtype, str) else torch_dtype
    # Explicit checks instead of ``assert`` so validation survives ``python -O``.
    if not isinstance(resolved_dtype, torch.dtype):
        raise ValueError(f"Unsupported torch_dtype: {torch_dtype!r}")

    # Load Chronos-2
    pipeline = BaseChronosPipeline.from_pretrained(model_id, device_map=device, torch_dtype=resolved_dtype)
    if not isinstance(pipeline, Chronos2Pipeline):
        raise TypeError(f"{model_id!r} loaded as {type(pipeline).__name__}, expected Chronos2Pipeline")

    eval_pipeline_and_save_results(
        pipeline=pipeline,
        config_path=config_path,
        metrics_path=metrics_path,
        model_id=model_id,
        batch_size=batch_size,
        cross_learning=cross_learning,
    )
if __name__ == "__main__":
    # CLI entry point: configure log formatting, then dispatch to the Typer app.
    log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    logging.basicConfig(format=log_format)
    # Bound at module scope so the evaluation helpers can log through it.
    logger = logging.getLogger("Chronos Evaluation")
    logger.setLevel(logging.INFO)
    app()