Spaces:
Running
Running
import os
import tempfile

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from pytorch_forecasting import TimeSeriesDataSet, TemporalFusionTransformer, DeepAR, NHiTS, Baseline
from pytorch_forecasting.data import GroupNormalizer
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
from pytorch_lightning.loggers import CSVLogger
from sklearn.metrics import r2_score, root_mean_squared_error
from torch.utils.data import DataLoader
def prepare_forecasting_data(df, max_prediction_length, max_encoder_length):
    """Build train/validation TimeSeriesDataSets and dataloaders for one series.

    Parameters
    ----------
    df : pd.DataFrame
        Input series; after ``reset_index`` the first column becomes
        ``time_idx`` and, for a two-column frame, the second becomes
        ``target``. (Assumes the frame already carries a ``target``-named
        column in the wider case — TODO confirm with callers.)
    max_prediction_length : int
        Forecast horizon; the last ``max_prediction_length`` steps form the
        validation tail.
    max_encoder_length : int
        Look-back (encoder) window length.

    Returns
    -------
    tuple
        ``(training_dataset, train_dataloader, val_dataloader, prepared_df)``.
    """
    data = df.copy()
    data.reset_index(inplace=True)
    # Canonicalize column names: a two-column frame maps directly onto
    # (time_idx, target); otherwise only the former index is renamed.
    if data.shape[1] == 2:
        data.columns = ['time_idx', 'target']
    else:
        data.columns = ['time_idx'] + data.columns.tolist()[1:]
    # Single-series setup: one constant group id, contiguous integer time axis.
    data['group'] = "series_1"
    data['time_idx'] = np.arange(len(data))
    # Hold out the final prediction window for validation.
    cutoff = data["time_idx"].max() - max_prediction_length
    training = TimeSeriesDataSet(
        data[lambda x: x.time_idx <= cutoff],
        time_idx="time_idx",
        target="target",
        group_ids=["group"],
        max_encoder_length=max_encoder_length,
        max_prediction_length=max_prediction_length,
        static_categoricals=[],
        static_reals=[],
        time_varying_known_categoricals=[],
        time_varying_known_reals=["time_idx"],
        time_varying_unknown_categoricals=[],
        time_varying_unknown_reals=["target"],
        target_normalizer=GroupNormalizer(groups=["group"]),
    )
    # Validation set reuses the training configuration over the full frame.
    validation = TimeSeriesDataSet.from_dataset(
        training, data, predict=True, stop_randomization=True
    )
    loader_train = training.to_dataloader(train=True, batch_size=64, num_workers=0)
    loader_val = validation.to_dataloader(train=False, batch_size=64, num_workers=0)
    return training, loader_train, loader_val, data
def run_pytorch_forecasting(df, model_type, horizon, lag, future_horizon=0, device='cpu'):
    """Train a pytorch-forecasting model and evaluate it on the held-out tail.

    Parameters
    ----------
    df : pd.DataFrame
        Input series (see ``prepare_forecasting_data``).
    model_type : str
        One of ``"TFT"``, ``"DeepAR"``, ``"NHiTS"``.
    horizon : int
        Forecast length (``max_prediction_length``).
    lag : int
        Encoder look-back length (``max_encoder_length``).
    future_horizon : int, optional
        Currently unused — kept for interface compatibility with callers.
    device : str, optional
        ``'GPU'`` requests CUDA (used only if actually available).

    Returns
    -------
    tuple
        ``(actuals, predictions, fig, metrics, export_path)`` where actuals
        and predictions are flattened 1-D numpy arrays.

    Raises
    ------
    ValueError
        If ``model_type`` is not one of the supported names.
    """
    torch.set_float32_matmul_precision('high')
    use_cuda = device == 'GPU' and torch.cuda.is_available()
    torch_device = torch.device('cuda' if use_cuda else 'cpu')
    training, train_dataloader, val_dataloader, full_df = prepare_forecasting_data(df, horizon, lag)
    pl_trainer = Trainer(
        max_epochs=30,
        # BUG FIX: the `gpus=` argument was removed in pytorch-lightning 2.x;
        # accelerator/devices is the supported way to select hardware.
        accelerator="gpu" if torch_device.type == 'cuda' else "cpu",
        devices=1,
        logger=CSVLogger("lightning_logs"),
        enable_checkpointing=True,
        callbacks=[
            EarlyStopping(monitor="val_loss", patience=3, mode="min"),
            ModelCheckpoint(monitor="val_loss", mode="min", save_top_k=1)
        ],
        gradient_clip_val=0.1
    )
    if model_type == "TFT":
        model = TemporalFusionTransformer.from_dataset(training, learning_rate=0.03)
    elif model_type == "DeepAR":
        model = DeepAR.from_dataset(training, learning_rate=0.03)
    elif model_type == "NHiTS":
        model = NHiTS.from_dataset(training, learning_rate=0.03)
    else:
        raise ValueError(f"Unsupported model type: {model_type}")
    pl_trainer.fit(model, train_dataloaders=train_dataloader, val_dataloaders=val_dataloader)
    # Reload the best (lowest val_loss) checkpoint selected by ModelCheckpoint.
    best_model_path = pl_trainer.checkpoint_callback.best_model_path
    best_model = model.load_from_checkpoint(best_model_path)
    # Ground truth for every validation window; each batch y is (target, weight).
    actuals = torch.cat([y[0] for x, y in iter(val_dataloader)])
    predictions = best_model.predict(val_dataloader)
    # BUG FIX: predict() may return a CUDA tensor and both tensors are 2-D
    # (n_windows, horizon); move to CPU and flatten to 1-D so sklearn metrics,
    # matplotlib, and the pandas DataFrame below all accept them.
    actual_np = actuals.detach().cpu().numpy().flatten()
    pred_np = predictions.detach().cpu().numpy().flatten()
    # BUG FIX: root_mean_squared_error has no `squared` keyword (that belonged
    # to the deprecated mean_squared_error) — it already returns the root.
    rmse = root_mean_squared_error(actual_np, pred_np)
    r2 = r2_score(actual_np, pred_np)
    metrics = f"Test RMSE: {rmse:.3f}, Test R2: {r2:.3f}"
    fig = plt.figure(figsize=(12, 6))
    plt.plot(actual_np, label="Actual")
    plt.plot(pred_np, label="Predicted")
    plt.title(f"{model_type} Forecast Result")
    plt.legend()
    # Persist the aligned actual/predicted pairs for download/inspection.
    export_df = pd.DataFrame({"Actual": actual_np, "Predicted": pred_np})
    export_path = os.path.join(tempfile.gettempdir(), f"{model_type}_forecast.csv")
    export_df.to_csv(export_path, index=False)
    return actual_np, pred_np, fig, metrics, export_path