Spaces:
Sleeping
Sleeping
File size: 9,202 Bytes
e0ce20c | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 | """
Load and prepare Store Sales - Time Series Forecasting dataset (Kaggle).
Dataset: CorporaciΓ³n Favorita (Ecuador grocery chain)
- 54 stores, 33 product families
- Daily unit sales + oil price + holidays + promotions
- Date range: 2013-01-01 to 2017-08-15 (~4.5 years)
We use a representative subset for the portfolio:
- Top N_SERIES series by total sales volume (well-established series)
- Long format: (unique_id, ds, y, [exog features...])
Fallback: if Store Sales CSV not found, automatically uses M5 via datasetsforecast.
Directory layout expected:
data/raw/
train.csv (from Kaggle zip)
test.csv
stores.csv
oil.csv
holidays_events.csv
transactions.csv
"""
from __future__ import annotations
import pandas as pd
import numpy as np
from src.config import (
DATA_RAW, DATA_PROC, TRAIN_PARQUET, TEST_PARQUET,
TARGET_COL, DATE_COL, ID_COL,
N_SERIES, HORIZON, RANDOM_SEED,
)
KAGGLE_TRAIN_CSV = DATA_RAW / "train.csv"
OIL_CSV = DATA_RAW / "oil.csv"
HOLIDAYS_CSV = DATA_RAW / "holidays_events.csv"
STORES_CSV = DATA_RAW / "stores.csv"
# ββ Store Sales loader βββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _load_store_sales() -> tuple[pd.DataFrame, pd.DataFrame]:
"""Load and reshape Kaggle Store Sales dataset into long format."""
print("Loading Store Sales (Kaggle Favorita)...")
train_raw = pd.read_csv(KAGGLE_TRAIN_CSV, parse_dates=["date"],
dtype={"store_nbr": str, "family": str})
# ββ Create unique_id = "store_{store_nbr}_{family}" βββββββββββββββββββ
train_raw[ID_COL] = ("store_" + train_raw["store_nbr"] + "_"
+ train_raw["family"].str.replace(" ", "_"))
train_raw[DATE_COL] = train_raw["date"]
train_raw[TARGET_COL] = train_raw["sales"].clip(lower=0)
print(f" Raw: {train_raw[ID_COL].nunique():,} series, "
f"{len(train_raw):,} rows")
# ββ Select top N_SERIES by total sales βββββββββββββββββββββββββββββββββ
top_ids = (
train_raw.groupby(ID_COL)[TARGET_COL]
.sum()
.nlargest(N_SERIES)
.index.tolist()
)
df = train_raw[train_raw[ID_COL].isin(top_ids)].copy()
# ββ Merge oil prices (external regressor) ββββββββββββββββββββββββββββββ
if OIL_CSV.exists():
oil = pd.read_csv(OIL_CSV, parse_dates=["date"])
oil = oil.rename(columns={"date": DATE_COL, "dcoilwtico": "oil_price"})
oil["oil_price"] = oil["oil_price"].interpolate() # fill weekends
df = df.merge(oil, on=DATE_COL, how="left")
# ββ Merge holiday flags βββββββββββββββββββββββββββββββββββββββββββββββββ
if HOLIDAYS_CSV.exists():
hol = pd.read_csv(HOLIDAYS_CSV, parse_dates=["date"])
hol = hol.rename(columns={"date": DATE_COL})
hol["is_holiday"] = (~hol["transferred"]).astype(int)
hol_agg = (hol.groupby(DATE_COL)["is_holiday"]
.max().reset_index())
df = df.merge(hol_agg, on=DATE_COL, how="left")
df["is_holiday"] = df["is_holiday"].fillna(0).astype(int)
# ββ Keep only essential columns βββββββββββββββββββββββββββββββββββββββββ
keep = [ID_COL, DATE_COL, TARGET_COL, "onpromotion"]
for col in ["oil_price", "is_holiday"]:
if col in df.columns:
keep.append(col)
df = df[keep].copy()
df = df.sort_values([ID_COL, DATE_COL]).reset_index(drop=True)
# ββ Fill date gaps (mlforecast requires complete daily sequences) ββββββββ
# Some product-store combos have missing days (store closed, out of stock).
# Fill with y=0 and forward-fill external regressors.
full_range = pd.date_range(df[DATE_COL].min(), df[DATE_COL].max(), freq="D")
filled_parts = []
exog_cols = [c for c in df.columns
if c not in [ID_COL, DATE_COL, TARGET_COL]]
for uid, grp in df.groupby(ID_COL, observed=True):
grp = grp.set_index(DATE_COL).reindex(full_range)
grp.index.name = DATE_COL
grp[ID_COL] = uid
grp[TARGET_COL] = grp[TARGET_COL].fillna(0)
for col in exog_cols:
if col in grp.columns:
grp[col] = grp[col].ffill().bfill()
filled_parts.append(grp.reset_index())
df = pd.concat(filled_parts, ignore_index=True)
df = df[[ID_COL, DATE_COL, TARGET_COL] + exog_cols]
# ββ Train / test split (last HORIZON days) βββββββββββββββββββββββββββββ
cutoff = df[DATE_COL].max() - pd.Timedelta(days=HORIZON)
train = df[df[DATE_COL] <= cutoff].copy()
test = df[df[DATE_COL] > cutoff].copy()
print(f" Subset: {df[ID_COL].nunique()} series (gaps filled with 0)")
print(f" Train: {train[DATE_COL].min().date()} -> {train[DATE_COL].max().date()} "
f"({len(train):,} rows)")
print(f" Test : {test[DATE_COL].min().date()} -> {test[DATE_COL].max().date()} "
f"({len(test):,} rows)")
return train, test
# ββ M5 fallback loader βββββββββββββββββββββββββββββββββββββββββββββββββββββ
def _load_m5_fallback() -> tuple[pd.DataFrame, pd.DataFrame]:
"""Fallback: load M5 via datasetsforecast if Store Sales CSV not found."""
print("Store Sales CSV not found - using M5 dataset (fallback).")
from datasetsforecast.m5 import M5
m5_dir = str(DATA_PROC / "m5_raw")
Y_df, X_df, _ = M5.load(directory=m5_dir)
print(f" Full M5: {Y_df[ID_COL].nunique():,} series")
# Sample N_SERIES series evenly across 3 stores
STORES = ["CA_1", "TX_1", "WI_1"]
per_store = N_SERIES // len(STORES)
rng = np.random.default_rng(RANDOM_SEED)
sampled = []
for store in STORES:
ids = Y_df[Y_df[ID_COL].str.endswith(f"_{store}")][ID_COL].unique()
n = min(per_store, len(ids))
sampled.extend(rng.choice(ids, size=n, replace=False).tolist())
Y_sub = Y_df[Y_df[ID_COL].isin(sampled)].copy()
if X_df is not None:
X_sub = X_df[X_df[ID_COL].isin(sampled)].copy()
df = Y_sub.merge(X_sub, on=[ID_COL, DATE_COL], how="left")
else:
df = Y_sub.copy()
df[DATE_COL] = pd.to_datetime(df[DATE_COL])
df = df.sort_values([ID_COL, DATE_COL]).reset_index(drop=True)
df[TARGET_COL] = df[TARGET_COL].fillna(0).clip(lower=0)
cutoff = df[DATE_COL].max() - pd.Timedelta(days=HORIZON)
train = df[df[DATE_COL] <= cutoff].copy()
test = df[df[DATE_COL] > cutoff].copy()
print(f" Subset: {df[ID_COL].nunique()} series ({per_store} per store)")
print(f" Train: {train[DATE_COL].min().date()} -> {train[DATE_COL].max().date()} "
f"({len(train):,} rows)")
print(f" Test : {test[DATE_COL].min().date()} -> {test[DATE_COL].max().date()} "
f"({len(test):,} rows)")
return train, test
# ββ Public API βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
def load_data(force_reload: bool = False) -> tuple[pd.DataFrame, pd.DataFrame]:
"""
Load dataset. Uses Store Sales (Kaggle) if CSV is present,
otherwise falls back to M5 via datasetsforecast.
Returns (train_df, test_df) in long format:
unique_id | ds | y | [optional exog]
"""
if TRAIN_PARQUET.exists() and TEST_PARQUET.exists() and not force_reload:
print(f"Loading cached data from {DATA_PROC}")
train = pd.read_parquet(TRAIN_PARQUET)
test = pd.read_parquet(TEST_PARQUET)
print(f" {train[ID_COL].nunique()} series, "
f"{len(train):,} train rows, {len(test):,} test rows")
return train, test
DATA_PROC.mkdir(parents=True, exist_ok=True)
if KAGGLE_TRAIN_CSV.exists():
train, test = _load_store_sales()
else:
train, test = _load_m5_fallback()
train.to_parquet(TRAIN_PARQUET, index=False)
test.to_parquet(TEST_PARQUET, index=False)
print(f" Saved -> {DATA_PROC}")
return train, test
# Alias for backward compat
load_m5 = load_data
def get_series_summary(df: pd.DataFrame) -> pd.DataFrame:
"""Summary stats per series."""
return (
df.groupby(ID_COL)[TARGET_COL]
.agg(n_obs="count",
mean=np.mean, std=np.std,
min=np.min, max=np.max,
pct_zeros=lambda x: (x == 0).mean())
.round(2)
.reset_index()
)
if __name__ == "__main__":
train, test = load_data()
print("\nSeries summary (first 5):")
print(get_series_summary(train).head().to_string())
|