NeerajCodz's picture
feat: v3 models - XGBoost R2=0.9866, GradientBoosting R2=0.9860 as default
d3996f2
"""
src.data.features
=================
Feature engineering for battery lifecycle prediction.
Derived features
----------------
- **SOC** (State of Charge) via Coulomb counting per cycle
- **SOH** (State of Health) as percentage of nominal capacity
- **RUL** (Remaining Useful Life) in cycles until EOL
- **Per-cycle scalar features** for classical ML models:
peak_voltage, min_voltage, voltage_range, avg_current, avg_temp,
temp_rise, discharge_time, charge_time, coulombic_efficiency,
Re_at_cycle, Rct_at_cycle, delta_capacity
"""
from __future__ import annotations
import numpy as np
import pandas as pd
from src.data.loader import (
NOMINAL_CAPACITY_AH,
get_eol_threshold,
load_cycle_csv,
load_discharge_capacities,
load_impedance_scalars,
load_metadata,
)
# ── SOC via Coulomb counting ─────────────────────────────────────────────────
def compute_soc(
    cycle_df: pd.DataFrame,
    nominal_capacity_ah: float = NOMINAL_CAPACITY_AH,
) -> pd.Series:
    """Compute State of Charge (%) for a single cycle via Coulomb counting.

    SOC starts at 100% and decreases as charge is consumed::

        dQ_i  = |I_i| * dt_i / 3600          (Ah, with dt in seconds)
        SOC_i = 100 * (1 - cumsum(dQ)_i / nominal_capacity)

    The absolute value of the current is integrated, so every sample counts
    as charge removed regardless of its sign.  The result is *not* clipped:
    if the integrated charge exceeds ``nominal_capacity_ah`` the SOC goes
    below 0.

    Parameters
    ----------
    cycle_df : pd.DataFrame
        Must contain ``Current_measured`` (A) and ``Time`` (s).
    nominal_capacity_ah : float
        Capacity in Ah used as the 100% reference
        (defaults to ``NOMINAL_CAPACITY_AH``).

    Returns
    -------
    pd.Series
        SOC in percent, named ``"SoC"``, same length and index as
        *cycle_df*.  An empty *cycle_df* yields an empty series.

    Raises
    ------
    KeyError
        If either required column is missing.
    """
    current = cycle_df["Current_measured"].to_numpy(dtype=float)
    time_s = cycle_df["Time"].to_numpy(dtype=float)

    # Per-sample time step; the first sample has no predecessor so its step
    # is 0.  Writing into a pre-zeroed buffer (instead of indexing
    # ``time_s[0]``) keeps this valid for an empty cycle as well.
    dt = np.zeros_like(time_s)
    dt[1:] = np.diff(time_s)

    # Charge consumed (Ah); discharge current is negative, hence the abs().
    dq = np.abs(current) * dt / 3600.0
    cumulative_q = np.cumsum(dq)

    soc = 100.0 * (1.0 - cumulative_q / nominal_capacity_ah)
    return pd.Series(soc, index=cycle_df.index, name="SoC")
# ── SOH ──────────────────────────────────────────────────────────────────────
def compute_soh(
    measured_capacity: float | np.ndarray | pd.Series,
    nominal_capacity_ah: float = NOMINAL_CAPACITY_AH,
) -> float | np.ndarray | pd.Series:
    """Return State of Health (%): measured capacity relative to nominal.

    Works element-wise, so a scalar, NumPy array or pandas Series comes back
    as the same kind of object.
    """
    capacity_fraction = measured_capacity / nominal_capacity_ah
    soh_percent = capacity_fraction * 100.0
    return soh_percent
# ── RUL ──────────────────────────────────────────────────────────────────────
def compute_rul_series(
    capacity_series: pd.Series,
    eol_threshold: float,
) -> pd.Series:
    """Compute Remaining Useful Life (in cycles) along a capacity-fade series.

    The end-of-life (EOL) position is the first entry whose capacity is
    strictly below *eol_threshold*.  Each entry's RUL is the number of
    positions left until that point, floored at 0 once EOL has been reached.
    When no entry falls below the threshold the series length is used as a
    censored EOL position.

    Positions are counted by order in the series, not by index label; the
    returned series is named ``"RUL"`` and reuses the input index.
    """
    capacities = capacity_series.values
    n_cycles = len(capacities)

    # Positions (0-based) of every cycle already below the EOL threshold.
    below_threshold = np.nonzero(capacities < eol_threshold)[0]

    if below_threshold.size == 0:
        # Censored: the battery never reached EOL within the recorded data.
        eol_position = n_cycles
    else:
        eol_position = below_threshold[0]

    cycles_left = eol_position - np.arange(n_cycles)
    cycles_left = np.maximum(cycles_left, 0)  # no negative RUL after EOL
    return pd.Series(cycles_left, index=capacity_series.index, name="RUL")
# ── Degradation state classification ────────────────────────────────────────
def classify_degradation_state(soh: float | np.ndarray) -> str | np.ndarray:
"""Classify battery degradation into 4 states based on SOH %.
States:
0 – Healthy (SOH β‰₯ 90%)
1 – Aging (80% ≀ SOH < 90%)
2 – Near-EOL (70% ≀ SOH < 80%)
3 – EOL (SOH < 70%)
"""
soh_arr = np.asarray(soh)
labels = np.full(soh_arr.shape, 3, dtype=int) # default EOL
labels[soh_arr >= 90] = 0
labels[(soh_arr >= 80) & (soh_arr < 90)] = 1
labels[(soh_arr >= 70) & (soh_arr < 80)] = 2
if soh_arr.ndim == 0:
return int(labels)
return labels
# Display names for the integer state codes listed in the
# ``classify_degradation_state`` docstring.
DEGRADATION_LABELS = {
    0: "Healthy",
    1: "Aging",
    2: "Near-EOL",
    3: "EOL",
}
# ── Per-cycle scalar feature extraction ─────────────────────────────────────
def extract_cycle_features(cycle_df: pd.DataFrame) -> dict:
    """Summarise one cycle's raw time-series as a dict of scalar features.

    Parameters
    ----------
    cycle_df : pd.DataFrame
        Raw time-series for one cycle.  The columns ``Voltage_measured``,
        ``Current_measured``, ``Temperature_measured`` and ``Time`` are each
        optional; features derived from a missing or empty column are
        simply left out of the result.

    Returns
    -------
    dict
        Python floats under (a subset of) the keys:
        peak_voltage, min_voltage, voltage_range, avg_current,
        avg_temp, temp_rise, cycle_duration
    """

    def _usable(column_name):
        # Return the column, or None when it is absent or has no rows.
        column = cycle_df.get(column_name)
        if column is None or len(column) == 0:
            return None
        return column

    features: dict = {}

    voltage = _usable("Voltage_measured")
    if voltage is not None:
        v_high = voltage.max()
        v_low = voltage.min()
        features["peak_voltage"] = float(v_high)
        features["min_voltage"] = float(v_low)
        features["voltage_range"] = float(v_high - v_low)

    current = _usable("Current_measured")
    if current is not None:
        # Mean magnitude, so the sign convention of the current is irrelevant.
        features["avg_current"] = float(current.abs().mean())

    temperature = _usable("Temperature_measured")
    if temperature is not None:
        features["avg_temp"] = float(temperature.mean())
        features["temp_rise"] = float(temperature.max() - temperature.min())

    timestamps = _usable("Time")
    if timestamps is not None:
        # Last sample minus first sample, in the units of the Time column.
        features["cycle_duration"] = float(timestamps.iloc[-1] - timestamps.iloc[0])

    return features
def _nearest_impedance(
    cap_df: pd.DataFrame,
    imp_df: pd.DataFrame,
) -> tuple[np.ndarray, np.ndarray]:
    """Return per-row ``(Re, Rct)`` from each battery's nearest impedance test.

    Both arrays are positionally aligned with *cap_df*.  Rows whose battery
    has no impedance measurement stay NaN.  "Nearest" means the smallest
    absolute difference in ``cycle_number``; ties go to the measurement that
    appears first in *imp_df*.
    """
    re_vals = np.full(len(cap_df), np.nan)
    rct_vals = np.full(len(cap_df), np.nan)
    if imp_df.empty or len(cap_df) == 0:
        return re_vals, rct_vals

    cap_cycles = cap_df["cycle_number"].to_numpy(dtype=float)
    imp_by_battery = dict(iter(imp_df.groupby("battery_id")))

    # ``indices`` maps each battery to the row *positions* it occupies in
    # cap_df, so the write-back below does not depend on cap_df's index.
    for bid, positions in cap_df.groupby("battery_id").indices.items():
        imp = imp_by_battery.get(bid)
        if imp is None or imp.empty:
            continue
        imp_cycles = imp["cycle_number"].to_numpy(dtype=float)
        # (n_discharge_cycles, n_impedance_tests) matrix of cycle distances.
        gaps = np.abs(imp_cycles[np.newaxis, :] - cap_cycles[positions][:, np.newaxis])
        nearest = gaps.argmin(axis=1)
        re_vals[positions] = imp["Re"].to_numpy(dtype=float)[nearest]
        rct_vals[positions] = imp["Rct"].to_numpy(dtype=float)[nearest]
    return re_vals, rct_vals


def build_battery_feature_dataset(
    *,
    exclude_corrupt: bool = True,
    verbose: bool = True,
) -> pd.DataFrame:
    """Build the full per-cycle feature dataset across all batteries.

    Combines:
    - Capacity fade information from ``load_discharge_capacities``
    - Impedance scalars (``Re``, ``Rct``) copied from the impedance test whose
      ``cycle_number`` is nearest to each discharge cycle (NaN when a battery
      has no impedance data; the columns are always present)
    - Per-cycle scalar features extracted from the raw discharge CSVs
    - Derived targets: ``SoH`` (%), ``RUL`` (cycles), ``degradation_state`` (0-3)
    - ``delta_capacity`` (change from the previous cycle of the same battery,
      0 for the first) and a NaN ``coulombic_efficiency`` placeholder

    Feature extraction is best-effort: a cycle whose CSV cannot be loaded or
    parsed contributes no extracted features (NaN in the result).  Failures
    are logged rather than raised.

    Parameters
    ----------
    exclude_corrupt : bool
        Forwarded unchanged to the loader functions.
    verbose : bool
        Show a ``tqdm`` progress bar while reading cycle CSVs.  ``tqdm`` is
        only imported when this is true.

    Returns
    -------
    pd.DataFrame
        One row per discharge cycle, with all features and targets, on a
        fresh 0..n-1 index.
    """
    import logging

    logger = logging.getLogger(__name__)

    # 1. Load capacity fade data
    cap_df = load_discharge_capacities(exclude_corrupt=exclude_corrupt)
    cap_df["SoH"] = compute_soh(cap_df["Capacity"])

    # 2. Compute RUL per battery (each battery has its own EOL threshold)
    rul_parts: list[pd.Series] = [
        compute_rul_series(group["Capacity"], get_eol_threshold(bid))
        for bid, group in cap_df.groupby("battery_id")
    ]
    # pd.concat refuses an empty list, so guard the no-data case.
    cap_df["RUL"] = pd.concat(rul_parts) if rul_parts else pd.Series(dtype="int64")

    # 3. Degradation state
    cap_df["degradation_state"] = classify_degradation_state(cap_df["SoH"].values)

    # 4. Impedance scalars: nearest impedance measurement per discharge cycle.
    #    Always create both columns so downstream code can rely on them.
    imp_df = load_impedance_scalars(exclude_corrupt=exclude_corrupt)
    cap_df["Re"], cap_df["Rct"] = _nearest_impedance(cap_df, imp_df)

    # 5. Extract per-cycle features from raw discharge CSVs
    meta = load_metadata(exclude_corrupt=exclude_corrupt, parse_dates=False)
    dis_meta = meta[meta["type"] == "discharge"].copy()
    dis_meta = dis_meta.sort_values(["battery_id", "test_id"]).reset_index(drop=True)
    # Number the discharge tests 0, 1, 2, ... within each battery.
    # NOTE(review): assumes this numbering matches cap_df["cycle_number"] - confirm.
    dis_meta["cycle_number"] = dis_meta.groupby("battery_id").cumcount()
    uid_lookup = dis_meta.set_index(["battery_id", "cycle_number"])["uid"].to_dict()

    cycle_keys = zip(cap_df["battery_id"], cap_df["cycle_number"])
    if verbose:
        from tqdm import tqdm

        cycle_keys = tqdm(cycle_keys, total=len(cap_df), desc="Extracting features")

    extra_features: list[dict] = []
    n_failed = 0
    for key in cycle_keys:
        uid = uid_lookup.get(key)
        feats: dict = {}
        if uid is not None:
            try:
                feats = extract_cycle_features(load_cycle_csv(uid))
            except Exception as exc:  # best-effort: one bad cycle must not abort the build
                n_failed += 1
                logger.debug("Feature extraction failed for cycle uid=%r: %s", uid, exc)
        extra_features.append(feats)
    if n_failed:
        logger.warning(
            "Feature extraction failed for %d of %d cycles; their features are NaN",
            n_failed,
            len(cap_df),
        )

    feat_df = pd.DataFrame(extra_features, index=cap_df.index)
    result = pd.concat([cap_df, feat_df], axis=1)

    # 6. Compute delta_capacity (capacity change from previous cycle)
    result["delta_capacity"] = result.groupby("battery_id")["Capacity"].diff().fillna(0)

    # 7. Coulombic efficiency placeholder: needs charge data too, NaN for now
    if "coulombic_efficiency" not in result.columns:
        result["coulombic_efficiency"] = np.nan

    return result.reset_index(drop=True)
# ── v3 enhanced features ────────────────────────────────────────────────────
def add_v3_features(df: pd.DataFrame) -> pd.DataFrame:
    """Add v3 physics-informed features on top of the base feature dataset.

    New features (6 total), all computed per ``battery_id`` in row order:
    - capacity_retention: Capacity divided by the battery's first non-null
      Capacity (NaN when that first value is 0)
    - cumulative_energy: running sum of Capacity (Ah throughput proxy)
    - dRe_dn: change in Re versus the previous row (backward difference,
      0 for a battery's first row or where the difference is undefined)
    - dRct_dn: same as dRe_dn, for Rct
    - soh_rolling_mean: trailing rolling mean of SoH over up to 5 rows
    - voltage_slope: change versus the previous row of the mid voltage
      ``(peak_voltage + min_voltage) / 2``

    Features whose source columns are absent (``Re``, ``Rct``, or the
    ``peak_voltage``/``min_voltage`` pair) are filled with 0.0 instead of
    raising.

    Parameters
    ----------
    df : pd.DataFrame
        Output from ``build_battery_feature_dataset()``.  Must contain
        ``battery_id``, ``Capacity`` and ``SoH``.  It is not modified.

    Returns
    -------
    pd.DataFrame
        A copy of *df* with the 6 new columns appended.
    """
    out = df.copy()
    battery = out["battery_id"]

    # capacity_retention: Q_n / Q_1 per battery (guard against Q_1 == 0)
    first_cap = out["Capacity"].groupby(battery).transform("first")
    out["capacity_retention"] = out["Capacity"] / first_cap.replace(0, np.nan)

    # cumulative_energy: cumulative Ah throughput
    out["cumulative_energy"] = out["Capacity"].groupby(battery).cumsum()

    # impedance growth rates (dRe/dn, dRct/dn); the same 0.0 fallback that
    # voltage_slope uses applies when the impedance column is missing.
    for source, target in (("Re", "dRe_dn"), ("Rct", "dRct_dn")):
        if source in out.columns:
            out[target] = out[source].groupby(battery).diff().fillna(0)
        else:
            out[target] = 0.0

    # SOH rolling mean (trailing 5-row window, shorter at a battery's start)
    out["soh_rolling_mean"] = out["SoH"].groupby(battery).transform(
        lambda s: s.rolling(window=5, min_periods=1, center=False).mean()
    )

    # voltage_slope: row-over-row mid-voltage change
    if "peak_voltage" in out.columns and "min_voltage" in out.columns:
        v_mid = (out["peak_voltage"] + out["min_voltage"]) / 2.0
        out["voltage_slope"] = v_mid.groupby(battery).diff().fillna(0)
    else:
        out["voltage_slope"] = 0.0

    return out
def impute_features(df: pd.DataFrame) -> pd.DataFrame:
    """Fill NaNs in every numeric column without mutating *df*.

    Two passes are applied to a copy of the input:

    1. Within each ``battery_id`` group, forward-fill and then back-fill, so
       gaps take the value of the nearest earlier (else later) row of the
       same battery.
    2. Anything still missing is replaced by the column's median over all
       rows (across batteries).  If that median is itself NaN, because the
       whole column is empty, 0 is used.

    Replaces the v2 behaviour of a blanket ``fillna(0)``, which is
    physically impossible for Re/Rct.
    """

    def _fill_from_neighbours(series: pd.Series) -> pd.Series:
        # Prefer the previous value; fall back to the next one at the start.
        return series.ffill().bfill()

    imputed = df.copy()
    numeric_columns = list(imputed.select_dtypes(include=[np.number]).columns)

    # Pass 1: temporal continuity inside each battery.
    for name in numeric_columns:
        per_battery = imputed.groupby("battery_id")[name]
        imputed[name] = per_battery.transform(_fill_from_neighbours)

    # Pass 2: whole-column median for batteries that had no value at all.
    for name in numeric_columns:
        column = imputed[name]
        if not column.isna().any():
            continue
        fallback = column.median()
        if not pd.notna(fallback):
            fallback = 0
        imputed[name] = column.fillna(fallback)

    return imputed