"""
TimeSeriesDataSet builder for pytorch_forecasting.
Wraps the feature_store output into train / validation / test splits
with proper temporal ordering (no leakage).
"""
from __future__ import annotations
import logging
import os
from typing import Optional
import numpy as np
import pandas as pd
from deep_learning.config import TFTASROConfig, get_tft_config
logger = logging.getLogger(__name__)
def build_datasets(
    master_df: pd.DataFrame,
    time_varying_unknown_reals: list[str],
    time_varying_known_reals: list[str],
    target_cols: list[str],
    cfg: Optional[TFTASROConfig] = None,
):
    """
    Create pytorch_forecasting TimeSeriesDataSet objects for train / val / test.

    Uses chronological splitting:
        [train | val | test]

    Split sizes are row counts derived from ``cfg.training.val_ratio`` and
    ``cfg.training.test_ratio``. The cutoffs are read from the *sorted*
    ``time_idx`` values, so the split stays chronological even when the rows
    of ``master_df`` are not ordered by time (e.g. sorted by group first).

    Args:
        master_df: Frame containing at least the ``time_idx`` and ``group_id``
            columns, the target column and every listed feature column.
        time_varying_unknown_reals: Passed through to TimeSeriesDataSet.
        time_varying_known_reals: Passed through to TimeSeriesDataSet.
        target_cols: Candidate target columns; only the first entry is used.
            Falls back to ``"target"`` when the list is empty.
        cfg: Configuration object; ``get_tft_config()`` is used when omitted.

    Returns:
        (training_dataset, validation_dataset, test_dataset)

    Raises:
        ValueError: If the training split has fewer rows than
            ``max_encoder_length + max_prediction_length``.
    """
    if cfg is None:
        cfg = get_tft_config()

    n = len(master_df)
    test_size = int(n * cfg.training.test_ratio)
    val_size = int(n * cfg.training.val_ratio)
    train_size = n - val_size - test_size

    if train_size < cfg.model.max_encoder_length + cfg.model.max_prediction_length:
        raise ValueError(
            f"Not enough data for TFT: {train_size} train rows, "
            f"need at least {cfg.model.max_encoder_length + cfg.model.max_prediction_length}"
        )

    # Imported only after the cheap size validation so that unusable input
    # fails fast without loading the heavy forecasting stack.
    from pytorch_forecasting import TimeSeriesDataSet

    time_idx = master_df["time_idx"]
    # Read the cutoffs from the sorted time axis rather than from raw row
    # positions: positional lookup is only chronological if the caller has
    # pre-sorted the frame by time_idx.
    ordered_time = time_idx.sort_values(kind="stable")
    train_cutoff = ordered_time.iloc[train_size - 1]
    val_cutoff = ordered_time.iloc[train_size + val_size - 1]

    logger.info(
        "Data split: train=%d (idx<=%.0f), val=%d (idx<=%.0f), test=%d",
        train_size, train_cutoff, val_size, val_cutoff, test_size,
    )

    # Only the first target column is modelled; any further entries are ignored.
    target = target_cols[0] if target_cols else "target"

    training = TimeSeriesDataSet(
        master_df[time_idx <= train_cutoff],
        time_idx="time_idx",
        target=target,
        group_ids=["group_id"],
        max_encoder_length=cfg.model.max_encoder_length,
        max_prediction_length=cfg.model.max_prediction_length,
        time_varying_unknown_reals=time_varying_unknown_reals,
        time_varying_known_reals=time_varying_known_reals,
        static_categoricals=["group_id"],
        add_relative_time_idx=True,
        add_target_scales=True,
        add_encoder_length=True,
        allow_missing_timesteps=True,
    )

    # The validation frame reaches max_encoder_length steps back past the
    # training cutoff - presumably so the first predictions after the cutoff
    # have encoder history available.
    validation = TimeSeriesDataSet.from_dataset(
        training,
        master_df[
            (time_idx > train_cutoff - cfg.model.max_encoder_length)
            & (time_idx <= val_cutoff)
        ],
        stop_randomization=True,
    )

    # Same look-back for the test frame, relative to the validation cutoff.
    test = TimeSeriesDataSet.from_dataset(
        training,
        master_df[time_idx > val_cutoff - cfg.model.max_encoder_length],
        stop_randomization=True,
    )

    logger.info(
        "Datasets created: train=%d samples, val=%d, test=%d | "
        "encoder_len=%d, prediction_len=%d | "
        "%d unknown reals, %d known reals",
        len(training),
        len(validation),
        len(test),
        cfg.model.max_encoder_length,
        cfg.model.max_prediction_length,
        len(time_varying_unknown_reals),
        len(time_varying_known_reals),
    )

    return training, validation, test
def _resolve_num_workers(configured: int) -> int:
    """
    Return a safe num_workers value for the current platform.

    On Windows (os.name == 'nt'), PyTorch DataLoader multiprocessing requires
    the script to be inside an ``if __name__ == '__main__'`` guard, which is
    not the case in training scripts. Force 0 to avoid deadlocks.

    On Linux/macOS (GitHub Actions, HF Spaces), use the configured value;
    default to 2 when the config still carries the old 0.

    Args:
        configured: The num_workers value taken from the training config.

    Returns:
        0 on Windows; otherwise ``configured`` when it is positive, else 2.
    """
    if os.name == "nt":
        return 0
    # On POSIX: honour any explicit positive setting (including 1). Only the
    # legacy 0 (or a non-positive value) is upgraded to 2.
    return configured if configured > 0 else 2
def create_dataloaders(
    training_dataset,
    validation_dataset,
    test_dataset=None,
    cfg: Optional[TFTASROConfig] = None,
):
    """
    Turn the given dataset objects into dataloaders via ``to_dataloader``.

    Batch size and the configured worker count come from ``cfg.training``
    (``get_tft_config()`` is used when ``cfg`` is omitted); the worker count
    is passed through ``_resolve_num_workers`` before use.

    Returns:
        (train_dl, val_dl, test_dl) - ``test_dl`` is None when no
        ``test_dataset`` is supplied.
    """
    settings = get_tft_config() if cfg is None else cfg

    configured_workers = settings.training.num_workers
    worker_count = _resolve_num_workers(configured_workers)
    logger.info(
        "DataLoader workers: %d (platform=%s, configured=%d)",
        worker_count, os.name, configured_workers,
    )

    def _as_loader(dataset, *, train):
        # Every split shares batch size and worker count; only ``train`` varies.
        return dataset.to_dataloader(
            train=train,
            batch_size=settings.training.batch_size,
            num_workers=worker_count,
        )

    train_loader = _as_loader(training_dataset, train=True)
    val_loader = _as_loader(validation_dataset, train=False)
    test_loader = (
        None if test_dataset is None else _as_loader(test_dataset, train=False)
    )
    return train_loader, val_loader, test_loader
|