"""
src.data.preprocessing
======================
Data preprocessing, windowing, splitting, and scaler management.
Provides:
- Battery-grouped train/test split (no data leakage between batteries)
- Sliding-window sequence builder for sequential models (LSTM, Transformer)
- Scaler fitting / saving / loading (StandardScaler / MinMaxScaler)
- Down-sampling of per-cycle time-series to fixed-length bins
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Literal
import joblib
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from src.data.loader import ARTIFACTS_DIR
# Directory where fitted scalers are persisted; created eagerly at import time
# so save/load helpers below can assume it exists.
SCALER_DIR = ARTIFACTS_DIR / "scalers"
SCALER_DIR.mkdir(parents=True, exist_ok=True)
# ── Train/test split by battery groups ───────────────────────────────────────
def group_battery_split(
    df: pd.DataFrame,
    train_ratio: float = 0.8,
    random_state: int = 42,
    battery_col: str = "battery_id",
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Partition *df* into train/test sets at battery granularity.

    Every cycle belonging to a given battery lands entirely in one split,
    which prevents per-battery information from leaking across the boundary.

    Parameters
    ----------
    df : pd.DataFrame
    train_ratio : float
        Fraction of batteries assigned to the training split.
    random_state : int
    battery_col : str

    Returns
    -------
    (train_df, test_df) : tuple of pd.DataFrame
    """
    # Sort the IDs before shuffling so the split is reproducible regardless
    # of row/insertion order in df.
    ids = np.array(sorted(df[battery_col].unique()))
    np.random.RandomState(random_state).shuffle(ids)
    # Always keep at least one battery for training.
    cutoff = max(1, int(train_ratio * len(ids)))
    in_train = df[battery_col].isin(set(ids[:cutoff]))
    train_df = df[in_train].reset_index(drop=True)
    test_df = df[~in_train].reset_index(drop=True)
    return train_df, test_df
# ── Leave-one-battery-out split ──────────────────────────────────────────────
def leave_one_battery_out(
    df: pd.DataFrame,
    test_battery: str,
    battery_col: str = "battery_id",
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """Hold out a single battery as the test set (zero-shot generalization).

    Parameters
    ----------
    df : pd.DataFrame
    test_battery : str
        Battery ID to hold out (e.g. "B0005").
    battery_col : str
        Column identifying which battery each row belongs to.

    Returns
    -------
    (train_df, test_df) : tuple of pd.DataFrame
    """
    held_out = df[battery_col].eq(test_battery)
    return (
        df.loc[~held_out].reset_index(drop=True),
        df.loc[held_out].reset_index(drop=True),
    )
# ── Sliding window sequences ─────────────────────────────────────────────────
def make_sliding_windows(
    values: np.ndarray,
    window_size: int = 32,
    stride: int = 1,
) -> tuple[np.ndarray, np.ndarray]:
    """Create overlapping sliding windows from a 1D or 2D array.

    For a 1D input of shape ``(T,)`` → windows of shape ``(N, window_size, 1)``
    and targets of shape ``(N,)`` (the element right after each window).
    For a 2D input of shape ``(T, F)`` → windows ``(N, window_size, F)``
    and targets ``(N, F)``.

    Parameters
    ----------
    values : np.ndarray
        Shape ``(T,)`` or ``(T, F)``.
    window_size : int
        Number of consecutive timesteps per window.
    stride : int
        Step between the starts of consecutive windows.

    Returns
    -------
    (X, y) : tuple of np.ndarray
        When ``T <= window_size`` no window fits; correctly shaped empty
        arrays are returned (previously the shapes degenerated to ``(0,)``,
        breaking downstream code that indexes trailing dimensions).
    """
    if values.ndim == 1:
        values = values.reshape(-1, 1)
    T, F = values.shape
    X, y = [], []
    # Stop before T - window_size so the target index i + window_size is
    # always a valid row.
    for i in range(0, T - window_size, stride):
        X.append(values[i : i + window_size])
        y.append(values[i + window_size])
    if X:
        X = np.array(X)
        y = np.array(y)
    else:
        # Edge case: input too short for even one window+target pair.
        X = np.empty((0, window_size, F), dtype=values.dtype)
        y = np.empty((0, F), dtype=values.dtype)
    if F == 1:
        y = y.ravel()
    return X, y
def make_multistep_windows(
    values: np.ndarray,
    input_window: int = 32,
    output_window: int = 8,
    stride: int = 1,
) -> tuple[np.ndarray, np.ndarray]:
    """Create sliding windows with multi-step targets.

    Parameters
    ----------
    values : np.ndarray
        Shape ``(T,)`` or ``(T, F)``.
    input_window : int
        Number of timesteps fed to the model.
    output_window : int
        Number of future timesteps to predict.
    stride : int
        Step between the starts of consecutive windows.

    Returns
    -------
    (X, y) : tuple of np.ndarray
        X shape: ``(N, input_window, F)``, y shape: ``(N, output_window, F)``
        or ``(N, output_window)`` when ``F == 1``.  When the input is too
        short for a single (input, output) pair, correctly shaped empty
        arrays are returned.  (Previously ``np.array([])`` produced a 1-D
        ``y`` and ``squeeze(-1)`` raised ``ValueError`` on the size-0 axis.)
    """
    if values.ndim == 1:
        values = values.reshape(-1, 1)
    T, F = values.shape
    X, y = [], []
    # Last valid start leaves room for both the input window and the full
    # output horizon.
    for i in range(0, T - input_window - output_window + 1, stride):
        X.append(values[i : i + input_window])
        y.append(values[i + input_window : i + input_window + output_window])
    if X:
        X = np.array(X)
        y = np.array(y)
    else:
        X = np.empty((0, input_window, F), dtype=values.dtype)
        y = np.empty((0, output_window, F), dtype=values.dtype)
    if F == 1:
        y = y.squeeze(-1)
    return X, y
# ── Fixed-length bin downsampling ────────────────────────────────────────────
def downsample_to_bins(
cycle_df: pd.DataFrame,
n_bins: int = 20,
columns: list[str] | None = None,
) -> pd.DataFrame:
"""Downsample a single-cycle DataFrame to exactly *n_bins* rows.
Each bin is the mean of a roughly equal-sized chunk.
"""
if columns is not None:
cycle_df = cycle_df[columns]
chunks = np.array_split(cycle_df.values, n_bins)
binned = np.array([chunk.mean(axis=0) for chunk in chunks])
return pd.DataFrame(binned, columns=cycle_df.columns if columns is None else columns)
# ── Scaler utilities ─────────────────────────────────────────────────────────
def fit_and_save_scaler(
    data: np.ndarray | pd.DataFrame,
    scaler_type: Literal["standard", "minmax"] = "standard",
    name: str = "default",
) -> StandardScaler | MinMaxScaler:
    """Fit a scaler on training data and persist it under SCALER_DIR.

    Parameters
    ----------
    data : array-like
        Training data.
    scaler_type : {"standard", "minmax"}
    name : str
        Filename stem for saved scaler.

    Returns
    -------
    Fitted scaler object.
    """
    if scaler_type == "standard":
        scaler = StandardScaler()
    else:
        scaler = MinMaxScaler()
    arr = data.values if isinstance(data, pd.DataFrame) else data
    # sklearn scalers expect a 2-D (n_samples, n_features) input.
    if arr.ndim == 1:
        arr = arr.reshape(-1, 1)
    scaler.fit(arr)
    path = SCALER_DIR / f"{name}_{scaler_type}.joblib"
    joblib.dump(scaler, path)
    return scaler
def load_scaler(name: str, scaler_type: Literal["standard", "minmax"] = "standard"):
    """Load a previously saved scaler from disk.

    Raises
    ------
    FileNotFoundError
        If no ``{name}_{scaler_type}.joblib`` artifact exists in SCALER_DIR.
    """
    path = SCALER_DIR / f"{name}_{scaler_type}.joblib"
    if path.exists():
        return joblib.load(path)
    raise FileNotFoundError(f"Scaler not found: {path}")
# ── Feature/target column definitions ────────────────────────────────────────
# Per-cycle scalar feature column names (one value per cycle).
# NOTE(review): Re/Rct presumably come from impedance measurements — confirm
# against the loader that produces these columns.
FEATURE_COLS_SCALAR = [
    "cycle_number",
    "ambient_temperature",
    "peak_voltage",
    "min_voltage",
    "voltage_range",
    "avg_current",
    "avg_temp",
    "temp_rise",
    "cycle_duration",
    "Re",
    "Rct",
    "delta_capacity",
]
# Target column names for the different prediction tasks.
TARGET_SOH = "SoH"
TARGET_RUL = "RUL"
TARGET_DEGRADATION = "degradation_state"
# Per-timestep channel names used when building sequence inputs for
# sequential models (see the sliding-window builders above).
SEQUENCE_FEATURE_COLS = [
    "Voltage_measured",
    "Current_measured",
    "Temperature_measured",
    "SoC",
]