Spaces:
Running
Running
| """ | |
| src.data.features | |
| ================= | |
| Feature engineering for battery lifecycle prediction. | |
| Derived features | |
| ---------------- | |
| - **SOC** (State of Charge) via Coulomb counting per cycle | |
| - **SOH** (State of Health) as percentage of nominal capacity | |
| - **RUL** (Remaining Useful Life) in cycles until EOL | |
| - **Per-cycle scalar features** for classical ML models: | |
| peak_voltage, min_voltage, voltage_range, avg_current, avg_temp, | |
| temp_rise, discharge_time, charge_time, coulombic_efficiency, | |
| Re_at_cycle, Rct_at_cycle, delta_capacity | |
| """ | |
| from __future__ import annotations | |
| import numpy as np | |
| import pandas as pd | |
| from src.data.loader import ( | |
| NOMINAL_CAPACITY_AH, | |
| get_eol_threshold, | |
| load_cycle_csv, | |
| load_discharge_capacities, | |
| load_impedance_scalars, | |
| load_metadata, | |
| ) | |
| # ββ SOC via Coulomb counting βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def compute_soc( | |
| cycle_df: pd.DataFrame, | |
| nominal_capacity_ah: float = NOMINAL_CAPACITY_AH, | |
| ) -> pd.Series: | |
| """Compute State of Charge (%) for a single discharge cycle via Coulomb counting. | |
| SOC starts at 100% and decreases as charge is consumed: | |
| ΞQ_i = I_i Β· Ξt_i / 3600 (Ah, with Ξt in seconds) | |
| SOC_i = 100 Γ (1 β cumulative_Q / nominal_capacity) | |
| Parameters | |
| ---------- | |
| cycle_df : pd.DataFrame | |
| Must contain ``Current_measured`` (A) and ``Time`` (s). | |
| nominal_capacity_ah : float | |
| Nameplate capacity in Ah (default 2.0). | |
| Returns | |
| ------- | |
| pd.Series | |
| SOC in percent [0, 100], same length as *cycle_df*. | |
| """ | |
| current = cycle_df["Current_measured"].values | |
| time_s = cycle_df["Time"].values | |
| # Time deltas (first delta = 0) | |
| dt = np.diff(time_s, prepend=time_s[0]) | |
| dt[0] = 0.0 | |
| # Charge consumed (Ah); use absolute current for discharge (current < 0) | |
| dq = np.abs(current) * dt / 3600.0 | |
| cumulative_q = np.cumsum(dq) | |
| soc = 100.0 * (1.0 - cumulative_q / nominal_capacity_ah) | |
| return pd.Series(soc, index=cycle_df.index, name="SoC") | |
| # ββ SOH ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def compute_soh( | |
| measured_capacity: float | np.ndarray | pd.Series, | |
| nominal_capacity_ah: float = NOMINAL_CAPACITY_AH, | |
| ) -> float | np.ndarray | pd.Series: | |
| """Compute State of Health (%) as measured capacity / nominal Γ 100.""" | |
| return (measured_capacity / nominal_capacity_ah) * 100.0 | |
| # ββ RUL ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def compute_rul_series( | |
| capacity_series: pd.Series, | |
| eol_threshold: float, | |
| ) -> pd.Series: | |
| """Compute Remaining Useful Life (cycles) for a capacity-fade series. | |
| For each cycle *i*, RUL_i = (first cycle where capacity < eol_threshold) β i. | |
| If the battery never reaches EOL, use the last available cycle as a censored estimate. | |
| """ | |
| cap = capacity_series.values | |
| # Find EOL cycle index | |
| eol_indices = np.where(cap < eol_threshold)[0] | |
| if len(eol_indices) > 0: | |
| eol_cycle = eol_indices[0] | |
| else: | |
| eol_cycle = len(cap) # censored β battery didn't reach EOL | |
| rul = eol_cycle - np.arange(len(cap)) | |
| rul = np.clip(rul, 0, None) | |
| return pd.Series(rul, index=capacity_series.index, name="RUL") | |
| # ββ Degradation state classification ββββββββββββββββββββββββββββββββββββββββ | |
| def classify_degradation_state(soh: float | np.ndarray) -> str | np.ndarray: | |
| """Classify battery degradation into 4 states based on SOH %. | |
| States: | |
| 0 β Healthy (SOH β₯ 90%) | |
| 1 β Aging (80% β€ SOH < 90%) | |
| 2 β Near-EOL (70% β€ SOH < 80%) | |
| 3 β EOL (SOH < 70%) | |
| """ | |
| soh_arr = np.asarray(soh) | |
| labels = np.full(soh_arr.shape, 3, dtype=int) # default EOL | |
| labels[soh_arr >= 90] = 0 | |
| labels[(soh_arr >= 80) & (soh_arr < 90)] = 1 | |
| labels[(soh_arr >= 70) & (soh_arr < 80)] = 2 | |
| if soh_arr.ndim == 0: | |
| return int(labels) | |
| return labels | |
| DEGRADATION_LABELS = {0: "Healthy", 1: "Aging", 2: "Near-EOL", 3: "EOL"} | |
| # ββ Per-cycle scalar feature extraction βββββββββββββββββββββββββββββββββββββ | |
| def extract_cycle_features(cycle_df: pd.DataFrame) -> dict: | |
| """Extract scalar features from a single discharge or charge cycle. | |
| Parameters | |
| ---------- | |
| cycle_df : pd.DataFrame | |
| Raw time-series for one cycle. | |
| Returns | |
| ------- | |
| dict | |
| Feature dictionary with keys: | |
| peak_voltage, min_voltage, voltage_range, avg_current, | |
| avg_temp, temp_rise, cycle_duration | |
| """ | |
| v = cycle_df.get("Voltage_measured") | |
| i = cycle_df.get("Current_measured") | |
| t = cycle_df.get("Temperature_measured") | |
| time = cycle_df.get("Time") | |
| features: dict = {} | |
| if v is not None and len(v) > 0: | |
| features["peak_voltage"] = float(v.max()) | |
| features["min_voltage"] = float(v.min()) | |
| features["voltage_range"] = float(v.max() - v.min()) | |
| if i is not None and len(i) > 0: | |
| features["avg_current"] = float(np.abs(i).mean()) | |
| if t is not None and len(t) > 0: | |
| features["avg_temp"] = float(t.mean()) | |
| features["temp_rise"] = float(t.max() - t.min()) | |
| if time is not None and len(time) > 0: | |
| features["cycle_duration"] = float(time.iloc[-1] - time.iloc[0]) | |
| return features | |
| def build_battery_feature_dataset( | |
| *, | |
| exclude_corrupt: bool = True, | |
| verbose: bool = True, | |
| ) -> pd.DataFrame: | |
| """Build full per-cycle feature dataset across all batteries. | |
| Combines: | |
| - Capacity fade information from metadata | |
| - Impedance scalars (Re, Rct) from impedance tests (nearest-cycle interpolated) | |
| - Per-cycle scalar features extracted from raw discharge CSVs | |
| - Derived targets: SOH (%), RUL (cycles), degradation_state (0β3) | |
| Returns | |
| ------- | |
| pd.DataFrame | |
| One row per discharge cycle, with all features and targets. | |
| """ | |
| from tqdm import tqdm | |
| # 1. Load capacity fade data | |
| cap_df = load_discharge_capacities(exclude_corrupt=exclude_corrupt) | |
| cap_df["SoH"] = compute_soh(cap_df["Capacity"]) | |
| # 2. Compute RUL per battery | |
| rul_parts: list[pd.Series] = [] | |
| for bid, group in cap_df.groupby("battery_id"): | |
| eol = get_eol_threshold(bid) | |
| rul = compute_rul_series(group["Capacity"], eol) | |
| rul_parts.append(rul) | |
| cap_df["RUL"] = pd.concat(rul_parts) | |
| # 3. Degradation state | |
| cap_df["degradation_state"] = classify_degradation_state(cap_df["SoH"].values) | |
| # 4. Impedance scalars β merge nearest impedance measurement per cycle | |
| imp_df = load_impedance_scalars(exclude_corrupt=exclude_corrupt) | |
| if not imp_df.empty: | |
| # For each battery, forward-fill impedance values across discharge cycles | |
| imp_pivot = imp_df.groupby("battery_id").apply( | |
| lambda g: g.set_index("cycle_number")[["Re", "Rct"]], include_groups=False | |
| ) | |
| re_map: dict[str, pd.Series] = {} | |
| rct_map: dict[str, pd.Series] = {} | |
| for bid in imp_df["battery_id"].unique(): | |
| if bid in imp_pivot.index.get_level_values(0): | |
| sub = imp_pivot.loc[bid] | |
| re_map[bid] = sub["Re"] | |
| rct_map[bid] = sub["Rct"] | |
| re_vals, rct_vals = [], [] | |
| for _, row in cap_df.iterrows(): | |
| bid = row["battery_id"] | |
| cn = row["cycle_number"] | |
| if bid in re_map and len(re_map[bid]) > 0: | |
| # Nearest impedance cycle | |
| idx = re_map[bid].index | |
| nearest = idx[np.argmin(np.abs(idx - cn))] | |
| re_vals.append(float(re_map[bid].loc[nearest])) | |
| rct_vals.append(float(rct_map[bid].loc[nearest])) | |
| else: | |
| re_vals.append(np.nan) | |
| rct_vals.append(np.nan) | |
| cap_df["Re"] = re_vals | |
| cap_df["Rct"] = rct_vals | |
| # 5. Extract per-cycle features from raw discharge CSVs | |
| meta = load_metadata(exclude_corrupt=exclude_corrupt, parse_dates=False) | |
| dis_meta = meta[meta["type"] == "discharge"].copy() | |
| dis_meta = dis_meta.sort_values(["battery_id", "test_id"]).reset_index(drop=True) | |
| dis_meta["cycle_number"] = dis_meta.groupby("battery_id").cumcount() | |
| # Build a uid lookup | |
| uid_lookup = dis_meta.set_index(["battery_id", "cycle_number"])["uid"].to_dict() | |
| extra_features: list[dict] = [] | |
| iterator = tqdm(cap_df.iterrows(), total=len(cap_df), desc="Extracting features") if verbose else cap_df.iterrows() | |
| for _, row in iterator: | |
| uid = uid_lookup.get((row["battery_id"], row["cycle_number"])) | |
| if uid is not None: | |
| try: | |
| cdf = load_cycle_csv(uid) | |
| feats = extract_cycle_features(cdf) | |
| except (FileNotFoundError, Exception): | |
| feats = {} | |
| else: | |
| feats = {} | |
| extra_features.append(feats) | |
| feat_df = pd.DataFrame(extra_features, index=cap_df.index) | |
| result = pd.concat([cap_df, feat_df], axis=1) | |
| # 6. Compute delta_capacity (capacity change from previous cycle) | |
| result["delta_capacity"] = result.groupby("battery_id")["Capacity"].diff().fillna(0) | |
| # 7. Coulombic efficiency placeholder β needs charge data too, fill NaN for now | |
| if "coulombic_efficiency" not in result.columns: | |
| result["coulombic_efficiency"] = np.nan | |
| return result.reset_index(drop=True) | |
| # ββ v3 enhanced features ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def add_v3_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """Add v3 physics-informed features on top of the base feature dataset. | |
| New features (6 total): | |
| - capacity_retention: Q_n / Q_1 per battery (0-1, monotonically decreasing) | |
| - cumulative_energy: cumulative Ah throughput (proxy for total energy cycled) | |
| - dRe_dn: impedance growth rate (ΞRe per cycle, forward diff) | |
| - dRct_dn: impedance growth rate (ΞRct per cycle) | |
| - soh_rolling_mean: 5-cycle rolling mean SOH (noise-smoothed degradation) | |
| - voltage_slope: cycle-over-cycle voltage midpoint slope (dV_mid/dn) | |
| Parameters | |
| ---------- | |
| df : pd.DataFrame | |
| Output from ``build_battery_feature_dataset()``. | |
| Returns | |
| ------- | |
| pd.DataFrame | |
| Same dataframe with 6 new columns appended. | |
| """ | |
| out = df.copy() | |
| # ββ capacity_retention: Q_n / Q_1 per battery βββββββββββββββββββββββββββ | |
| first_cap = out.groupby("battery_id")["Capacity"].transform("first") | |
| out["capacity_retention"] = out["Capacity"] / first_cap.replace(0, np.nan) | |
| # ββ cumulative_energy: cumulative Ah throughput βββββββββββββββββββββββββ | |
| out["cumulative_energy"] = out.groupby("battery_id")["Capacity"].cumsum() | |
| # ββ impedance growth rates (dRe/dn, dRct/dn) βββββββββββββββββββββββββββ | |
| out["dRe_dn"] = out.groupby("battery_id")["Re"].diff().fillna(0) | |
| out["dRct_dn"] = out.groupby("battery_id")["Rct"].diff().fillna(0) | |
| # ββ SOH rolling mean (5-cycle window) βββββββββββββββββββββββββββββββββββ | |
| out["soh_rolling_mean"] = out.groupby("battery_id")["SoH"].transform( | |
| lambda s: s.rolling(window=5, min_periods=1, center=False).mean() | |
| ) | |
| # ββ voltage_slope: cycle-over-cycle mid-voltage change ββββββββββββββββββ | |
| if "peak_voltage" in out.columns and "min_voltage" in out.columns: | |
| v_mid = (out["peak_voltage"] + out["min_voltage"]) / 2.0 | |
| out["voltage_slope"] = v_mid.groupby(out["battery_id"]).diff().fillna(0) | |
| else: | |
| out["voltage_slope"] = 0.0 | |
| return out | |
| def impute_features(df: pd.DataFrame) -> pd.DataFrame: | |
| """Fix NaN handling: forward-fill within battery, then group median. | |
| Bug fix for v2 which used ``fillna(0)`` β physically impossible for Re/Rct. | |
| """ | |
| out = df.copy() | |
| numeric_cols = out.select_dtypes(include=[np.number]).columns | |
| # Step 1: forward fill within each battery (temporal continuity) | |
| for col in numeric_cols: | |
| out[col] = out.groupby("battery_id")[col].transform( | |
| lambda s: s.ffill().bfill() | |
| ) | |
| # Step 2: remaining NaN β global median (cross-battery) | |
| for col in numeric_cols: | |
| if out[col].isna().any(): | |
| median_val = out[col].median() | |
| out[col] = out[col].fillna(median_val if pd.notna(median_val) else 0) | |
| return out | |