# preprocess.py - Fixed version
# Key change: targets are now CLASS LABELS (which ETF had highest next-day return)
# not raw log-returns. This prevents the model from collapsing to near-zero predictions.
import os
import numpy as np
import pandas as pd
import pywt
from sklearn.preprocessing import StandardScaler
import joblib
import config
os.makedirs(config.DATA_DIR, exist_ok=True)
os.makedirs(config.MODELS_DIR, exist_ok=True)
# ─── Column normalisation ─────────────────────────────────────────────────────
def flatten_columns(df: pd.DataFrame) -> pd.DataFrame:
if isinstance(df.columns, pd.MultiIndex):
df.columns = [col[1] if col[1] else col[0] for col in df.columns]
df.columns = [str(c).strip() for c in df.columns]
return df
def normalize_etf_columns(df: pd.DataFrame) -> pd.DataFrame:
df = flatten_columns(df)
rename = {}
for col in df.columns:
for etf in config.ETFS + config.BENCHMARKS:
if col.startswith(etf):
rename[col] = etf
if rename:
df = df.rename(columns=rename)
df = df.loc[:, ~df.columns.duplicated()]
return df
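# Example (illustrative; assumes a yfinance-style download and that "SPY" is in
# config.ETFS): flatten_columns turns a MultiIndex column ("Close", "SPY") into
# "SPY", and normalize_etf_columns renames a flat column such as "SPY Adj Close"
# to "SPY" via the startswith match, dropping any duplicate columns that result.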
# ─── Wavelet decomposition ────────────────────────────────────────────────────
def wavelet_decompose_1d(series: np.ndarray,
wavelet: str = config.WAVELET,
level: int = config.WAVELET_LEVELS) -> np.ndarray:
series = np.array(series).flatten().astype(float)
coeffs = pywt.wavedec(series, wavelet, level=level)
reconstructed = []
for i, c in enumerate(coeffs):
zeros = [np.zeros_like(cc) for cc in coeffs]
zeros[i] = c
rec = pywt.waverec(zeros, wavelet)[:len(series)]
if len(rec) < len(series):
rec = np.pad(rec, (0, len(series) - len(rec)))
reconstructed.append(rec)
out = np.stack(reconstructed, axis=1)
assert out.ndim == 2, f"wavelet shape error: {out.shape}"
return out
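# Example (assumes config.WAVELET_LEVELS == 3): a series of length 256 yields an
# array of shape (256, 4) whose columns are the reconstructions of the A3
# approximation and the D3, D2, D1 details. Since wavedec/waverec are linear,
# the columns sum back to the original series up to boundary effects.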
def apply_wavelet_to_df(df: pd.DataFrame) -> pd.DataFrame:
parts = []
for col in df.columns:
series = df[col].values.flatten().astype(float)
decomp = wavelet_decompose_1d(series)
labels = [f"{col}_A3", f"{col}_D3", f"{col}_D2", f"{col}_D1"]
parts.append(pd.DataFrame(decomp, index=df.index, columns=labels))
return pd.concat(parts, axis=1)
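# Example (assumes config.WAVELET_LEVELS == 3 and a hypothetical ticker "SPY"):
# input columns ["SPY", "SPY_vol"] expand into eight feature columns, SPY_A3,
# SPY_D3, SPY_D2, SPY_D1 plus the same four bands for SPY_vol, all aligned on
# the original DatetimeIndex.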
# ─── Feature builder ─────────────────────────────────────────────────────────
def build_features(data: dict) -> pd.DataFrame:
etf_ret = normalize_etf_columns(data["etf_ret"].copy())
etf_vol = normalize_etf_columns(data["etf_vol"].copy())
macro = flatten_columns(data["macro"].copy())
etf_cols = [c for c in config.ETFS if c in etf_ret.columns]
if not etf_cols:
print(f" WARNING: No ETF cols found. Available: {list(etf_ret.columns)}")
etf_ret = etf_ret[etf_cols] if etf_cols else etf_ret
etf_vol = etf_vol[[c for c in config.ETFS if c in etf_vol.columns]]
etf_vol.columns = [f"{c}_vol" for c in etf_vol.columns]
combined = pd.concat([etf_ret, etf_vol, macro], axis=1).dropna()
return apply_wavelet_to_df(combined)
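# Example (assumes 5 ETFs, 3 macro series, config.WAVELET_LEVELS == 3): the
# combined frame has 5 + 5 + 3 = 13 columns, which apply_wavelet_to_df expands
# into 13 * 4 = 52 feature columns.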
# ─── TARGET: CLASS LABEL (not raw return) ────────────────────────────────────
def build_targets_classification(data: dict) -> pd.Series:
"""
For each day, the target is the INTEGER INDEX of the ETF with the
highest next-day log-return. Shape: (T,) dtype int32.
This converts the problem from regression (near-zero MSE trap)
    to classification (cross-entropy loss -> meaningful softmax output).
"""
ret = normalize_etf_columns(data["etf_ret"].copy())
etf_cols = [c for c in config.ETFS if c in ret.columns]
ret = ret[etf_cols]
# shift(-1): tomorrow's return is the target for today's features
fwd = ret.shift(-1).dropna()
    # argmax across ETFs -> integer class label in [0, len(etf_cols) - 1]
labels = fwd.values.argmax(axis=1)
return pd.Series(labels, index=fwd.index, dtype=np.int32)
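# Example (assumes config.ETFS == ["SPY", "QQQ", "IWM", "EFA", "TLT"], a
# hypothetical ordering): if QQQ has the highest log-return tomorrow, today's
# label is 1. The final row is dropped by shift(-1).dropna() because it has no
# next day to score.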
# ─── Sequence windowing ───────────────────────────────────────────────────────
def make_sequences(features: pd.DataFrame,
targets: pd.Series,
lookback: int):
common = features.index.intersection(targets.index)
features = features.loc[common]
targets = targets.loc[common]
feat_arr = features.values.astype(np.float32)
tgt_arr = targets.values.astype(np.int32)
dates = features.index
X, y, d = [], [], []
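    # Each window spans days i-lookback .. i-1, while the label at index i was
    # built with shift(-1) (tomorrow's argmax), so a window never contains the
    # return it is asked to predict.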
for i in range(lookback, len(feat_arr)):
X.append(feat_arr[i - lookback : i])
y.append(tgt_arr[i])
d.append(dates[i])
return (np.array(X, dtype=np.float32),
np.array(y, dtype=np.int32),
np.array(d))
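# Example: with lookback=30 and, say, 52 feature columns over T aligned days,
# make_sequences returns X of shape (T-30, 30, 52), y of shape (T-30,), and the
# matching window-end dates.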
# ─── Split ────────────────────────────────────────────────────────────────────
def split_data(X, y, dates,
train_frac = config.TRAIN_SPLIT,
val_frac = config.VAL_SPLIT):
N = len(X)
t_end = int(N * train_frac)
v_end = int(N * (train_frac + val_frac))
return (X[:t_end], y[:t_end], dates[:t_end],
X[t_end:v_end], y[t_end:v_end], dates[t_end:v_end],
X[v_end:], y[v_end:], dates[v_end:])
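# The split is chronological (no shuffling), so train strictly precedes val,
# which precedes test. Example (assumes TRAIN_SPLIT=0.70, VAL_SPLIT=0.15):
# N=1000 windows split into 700 / 150 / 150.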
# ─── Scaler ───────────────────────────────────────────────────────────────────
def fit_scaler(X_train: np.ndarray) -> StandardScaler:
    # flatten the (N, L, F) training windows to (N*L, F) and fit per-feature stats
    _, _, F = X_train.shape
    scaler = StandardScaler()
    scaler.fit(X_train.reshape(-1, F))
return scaler
def apply_scaler(X: np.ndarray, scaler: StandardScaler) -> np.ndarray:
N, L, F = X.shape
return scaler.transform(X.reshape(-1, F)).reshape(N, L, F)
def save_scaler(scaler, lookback):
path = os.path.join(config.MODELS_DIR, f"scaler_lb{lookback}.pkl")
joblib.dump(scaler, path)
def load_scaler(lookback):
path = os.path.join(config.MODELS_DIR, f"scaler_lb{lookback}.pkl")
return joblib.load(path)
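# Example (assumes config.MODELS_DIR == "models", illustrative only): with
# lookback=30 the fitted scaler is persisted to models/scaler_lb30.pkl and
# restored by load_scaler(30).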
# ─── Full pipeline ────────────────────────────────────────────────────────────
def run_preprocessing(data: dict, lookback: int) -> dict:
print(f"\nPreprocessing with lookback={lookback}d ...")
features = build_features(data)
targets = build_targets_classification(data) # integer class labels
assert len(targets) > 0, "No targets generated"
print(f" Class distribution: {pd.Series(targets).value_counts().to_dict()}")
X, y, dates = make_sequences(features, targets, lookback)
(X_tr, y_tr, d_tr,
X_va, y_va, d_va,
X_te, y_te, d_te) = split_data(X, y, dates)
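    # Fit the scaler on the training windows only, then apply the same
    # transform to val/test so their statistics never leak into the scaling.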
scaler = fit_scaler(X_tr)
X_tr_sc = apply_scaler(X_tr, scaler)
X_va_sc = apply_scaler(X_va, scaler)
X_te_sc = apply_scaler(X_te, scaler)
save_scaler(scaler, lookback)
    n_etf_raw = len(config.ETFS) * 2                           # returns + vols per ETF
    n_etf_features = n_etf_raw * (config.WAVELET_LEVELS + 1)   # x (A + D) wavelet bands
print(f" X shape: {X_tr_sc.shape} y shape: {y_tr.shape} "
f"(int class labels 0-{len(config.ETFS)-1})")
print(f" Train={len(X_tr)} Val={len(X_va)} Test={len(X_te)}")
return dict(
X_tr=X_tr_sc, y_tr=y_tr, d_tr=d_tr,
X_va=X_va_sc, y_va=y_va, d_va=d_va,
X_te=X_te_sc, y_te=y_te, d_te=d_te,
scaler=scaler,
n_features = X_tr_sc.shape[2],
n_etf_features= n_etf_features,
lookback = lookback,
n_classes = len(config.ETFS),
feature_names = list(features.columns),
)
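# ─── Smoke test ───────────────────────────────────────────────────────────────
# A minimal sketch for local sanity checking, not part of the production
# pipeline. It assumes config defines ETFS, BENCHMARKS, WAVELET, WAVELET_LEVELS,
# TRAIN_SPLIT and VAL_SPLIT as used above; the Gaussian data below is synthetic
# and carries no signal.
if __name__ == "__main__":
    rng = np.random.default_rng(0)
    idx = pd.bdate_range("2020-01-01", periods=400)
    etfs = list(config.ETFS)
    data = {
        "etf_ret": pd.DataFrame(rng.normal(0.0, 0.01, (len(idx), len(etfs))),
                                index=idx, columns=etfs),
        "etf_vol": pd.DataFrame(rng.lognormal(0.0, 0.2, (len(idx), len(etfs))),
                                index=idx, columns=etfs),
        "macro":   pd.DataFrame(rng.normal(0.0, 1.0, (len(idx), 2)),
                                index=idx, columns=["macro_a", "macro_b"]),
    }
    out = run_preprocessing(data, lookback=30)
    print("Smoke test OK:", out["X_tr"].shape, out["n_classes"], "classes")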