"""
Stability Index Engine
======================
Computes a deterministic composite risk score SI ∈ [0, 10] per minute
from the generated power and water 1MIN CSV files.

Formula (v1.0 — matches stability_index_spec.docx):
    SI(t)   = SI_raw(t) × V_dep(t)
    SI_raw  = 10 × (w_bat·C_bat + w_fresh·C_fresh + w_waste·C_waste
                    + w_energy·C_energy + w_stable·C_stable)
    V_dep   = clamp(ttc / horizon_hr, 0.5, 1.0)
              where ttc = min(time_to_crit_battery, time_to_crit_fresh) [hours]

Output CSV columns:
    Time, SI, SI_raw, C_bat, C_fresh, C_waste, C_grey, C_black,
    C_energy, C_stable, V_dep, ttc_hr, band, warm_up,
    Battery_Level_Pct, FreshTank_Level_L, GreyTank_Level_L, BlackTank_Level_L

Usage:
    python stability_index.py                       # uses ./output/
    python stability_index.py --power_dir data/power --water_dir data/water
    python stability_index.py --config si_config.json --out si_scores.csv
    python stability_index.py --summary
"""

import csv
import json
import math
import argparse
import statistics
from collections import deque
from pathlib import Path

# ─────────────────────────────────────────────────────────────────────────────
# DEFAULT CONFIGURATION
# ─────────────────────────────────────────────────────────────────────────────
DEFAULT_CONFIG = {
    # --- Component weights (must sum to 1.0) ---
    "w_bat": 0.35,
    "w_fresh": 0.25,
    "w_waste": 0.12,
    "w_energy": 0.18,
    "w_stable": 0.10,
    # --- Battery thresholds (%) ---
    "bat_crit_pct": 20.0,
    "bat_warn_pct": 50.0,
    # --- Fresh water thresholds (%) ---
    "fresh_crit_pct": 15.0,
    "fresh_warn_pct": 40.0,
    # --- Tank capacities (litres) ---
    "fresh_cap_L": 378.541,  # 100 gal
    "grey_cap_L": 189.271,   # 50 gal
    "black_cap_L": 170.344,  # 45 gal
    # --- Waste tank penalty start (fill fraction 0–1) ---
    "grey_penalty_start": 0.70,
    "black_penalty_start": 0.60,
    # --- Waste composite weights ---
    "grey_weight_in_waste": 0.60,
    "black_weight_in_waste": 0.40,
    # --- Energy balance (C_energy) ---
    "energy_window_min": 60,
    "solar_cap_kW": 5.25,
    # --- Consumption stability (C_stable) ---
    "stable_window_min": 30,
    "stable_k": 2.0,
    "stable_mu_floor": 0.05,
    # --- Depletion velocity modifier (V_dep) ---
    "velocity_window_min": 60,
    "velocity_horizon_hr": 4.0,
    "velocity_min_factor": 0.5,
    # --- Score band thresholds ---
    "bands": [
        {"min": 8.0, "max": 10.0, "label": "Excellent"},
        {"min": 6.0, "max": 8.0, "label": "Good"},
        {"min": 4.0, "max": 6.0, "label": "Fair"},
        {"min": 2.0, "max": 4.0, "label": "Poor"},
        {"min": 0.0, "max": 2.0, "label": "Critical"},
    ],
}

LOAD_COLS = [
    "HVAC_Flow_kW",
    "Lighting_Flow_kW",
    "Devices_Flow_kW",
    "Fridge_Flow_kW",
    "WaterPump_Flow_kW",
    "Cooking_Flow_kW",
    "Inverter_Flow_kW",
    "Unmetered_Flow_kW",
]

# Config keys of the five component weights that must sum to 1.0.
WEIGHT_KEYS = ("w_bat", "w_fresh", "w_waste", "w_energy", "w_stable")


# ─────────────────────────────────────────────────────────────────────────────
# HELPERS
# ─────────────────────────────────────────────────────────────────────────────
def clamp(val, lo, hi):
    """Return val limited to the closed interval [lo, hi]."""
    return max(lo, min(hi, val))


def band_label(si, bands):
    """
    Map a score to its band label.

    Bands are tested in list order with both bounds inclusive, so a score
    sitting on a shared boundary gets the label of whichever band is
    listed first. Returns "Critical" when no band matches.
    """
    for b in bands:
        if b["min"] <= si <= b["max"]:
            return b["label"]
    return "Critical"


def _weight_total(cfg):
    """Sum of the five component weights in cfg."""
    return sum(cfg[k] for k in WEIGHT_KEYS)


def load_config(path):
    """
    Build the effective configuration.

    Starts from DEFAULT_CONFIG and, when path is truthy, overlays the
    top-level keys of the JSON object stored there.

    Raises ValueError if the component weights do not sum to 1.0
    (tolerance 1e-6).
    """
    cfg = dict(DEFAULT_CONFIG)
    # Copy the band dicts so later mutation of cfg cannot leak into
    # the module-level defaults.
    cfg["bands"] = [dict(b) for b in DEFAULT_CONFIG["bands"]]
    if path:
        with open(path, encoding="utf-8") as f:
            cfg.update(json.load(f))
    total_w = _weight_total(cfg)
    if abs(total_w - 1.0) > 1e-6:
        raise ValueError(f"Component weights must sum to 1.0, got {total_w:.6f}")
    return cfg


def load_csv(path):
    """Read a CSV file into a list of dicts keyed by the header row."""
    # utf-8-sig reads plain UTF-8 too, but strips a leading BOM that would
    # otherwise end up glued to the first header name ("Time").
    with open(path, newline="", encoding="utf-8-sig") as f:
        return list(csv.DictReader(f))


def merge_rows(power_rows, water_rows):
    """
    Inner join power + water rows on Time.
    Both files must be from same 1MIN export.

    When both inputs have the same length they are paired positionally and
    a ValueError is raised on the first differing timestamp. Otherwise the
    rows are joined by timestamp and power rows without a water match are
    dropped. On duplicate column names the water value wins.
    """
    if len(power_rows) == len(water_rows):
        merged = []
        for i, (p, w) in enumerate(zip(power_rows, water_rows)):
            if p["Time"] != w["Time"]:
                raise ValueError(
                    f"Timestamp mismatch at row {i}: power={p['Time']} water={w['Time']}"
                )
            merged.append({**p, **w})
        return merged

    # Fallback: join by timestamp key
    water_idx = {r["Time"]: r for r in water_rows}
    return [
        {**p, **water_idx[p["Time"]]}
        for p in power_rows
        if p["Time"] in water_idx
    ]


# ─────────────────────────────────────────────────────────────────────────────
# COMPONENT SCORE FUNCTIONS
# ─────────────────────────────────────────────────────────────────────────────
def c_bat(bat_pct, crit, warn):
    """
    Piecewise linear Battery SOC score → [0, 1].

    Zones (configurable via crit/warn thresholds, both in percent):
        [0, crit)    → [0.00, 0.30)  steep penalty
        [crit, warn) → [0.30, 0.70)  moderate penalty
        [warn, 100]  → [0.70, 1.00]  comfort zone
    """
    p = clamp(bat_pct, 0.0, 100.0) / 100.0
    c = crit / 100.0
    w = warn / 100.0
    if p < c:
        return 0.00 + 0.30 * (p / c)
    if p < w:
        return 0.30 + 0.40 * ((p - c) / (w - c))
    if w >= 1.0:
        # warn == 100 %: the comfort zone collapses to the single point
        # p == 1.0; avoid the 0/0 the interpolation would hit.
        return 1.0
    return 0.70 + 0.30 * ((p - w) / (1.0 - w))


def c_fresh(fresh_L, cap_L, crit_pct, warn_pct):
    """
    Piecewise linear Fresh Water score — identical formula to c_bat,
    applied to fill percentage. → [0, 1].
    """
    pct = clamp(fresh_L / cap_L * 100.0, 0.0, 100.0)
    return c_bat(pct, crit_pct, warn_pct)


def headroom_score(fill, penalty_start):
    """
    Single-tank waste headroom score.

    fill          : fraction of tank capacity used (clamped to [0, 1])
    penalty_start : fill fraction above which extra penalty applies

    At or below penalty_start the score is the plain headroom (1 - fill).
    Above it the headroom is multiplied by a factor falling linearly from
    1 to -2 across the remaining range, and the product is clamped to
    [0, 1] — so the score reaches 0 once fill is a third of the way from
    penalty_start to full.
    """
    fill = clamp(fill, 0.0, 1.0)
    headroom = 1.0 - fill
    if fill <= penalty_start:
        return headroom
    excess = fill - penalty_start
    range_above = 1.0 - penalty_start
    factor = 1.0 - 3.0 * excess / range_above
    return clamp(headroom * factor, 0.0, 1.0)


def c_waste(grey_L, black_L, grey_cap, black_cap,
            grey_penalty, black_penalty, grey_w, black_w):
    """
    Composite waste headroom = grey_w × C_grey + black_w × C_black.
    Returns (C_waste, C_grey, C_black).
    """
    cg = headroom_score(grey_L / grey_cap, grey_penalty)
    cb = headroom_score(black_L / black_cap, black_penalty)
    return grey_w * cg + black_w * cb, cg, cb


def c_energy(gen_window, load_window, solar_cap_kw):
    """
    Rolling-window energy balance.

        net_norm = clamp((mean_gen - mean_load) / solar_cap, -1, +1)
        C_energy = 0.5 + 0.5 × net_norm

    Returns the neutral 0.5 when gen_window is empty. solar_cap_kw is
    floored at 0.01 to keep the division defined.
    """
    if not gen_window:
        return 0.5
    gen_mean = sum(gen_window) / len(gen_window)
    load_mean = sum(load_window) / len(load_window)
    net_norm = clamp((gen_mean - load_mean) / max(solar_cap_kw, 0.01), -1.0, 1.0)
    return 0.5 + 0.5 * net_norm


def c_stable(load_window, k, mu_floor):
    """
    Consumption stability via Coefficient of Variation.

        CV       = σ / max(μ, mu_floor)      (σ = sample std-dev, n - 1)
        C_stable = 1 / (1 + k × CV)

    Returns 1.0 when fewer than two samples are available or the mean
    load is effectively zero (< 1e-6).
    """
    n = len(load_window)
    if n < 2:
        return 1.0
    mu = sum(load_window) / n
    if mu < 1e-6:
        return 1.0
    variance = sum((x - mu) ** 2 for x in load_window) / (n - 1)
    sigma = math.sqrt(variance)
    cv = sigma / max(mu, mu_floor)
    return 1.0 / (1.0 + k * cv)


def v_dep(bat_window, fresh_window, bat_pct_now, fresh_L_now,
          fresh_cap_L, bat_crit_pct, horizon_hr, v_floor,
          fresh_crit_pct=15.0):
    """
    Depletion velocity modifier → (V_dep, ttc_hr).

    Computes velocity of battery (% / hr) and fresh water (L / hr) between
    the oldest window sample and the current reading, estimates time to the
    critical threshold for each, takes the minimum, then maps it to a
    [v_floor, 1.0] suppression factor over horizon_hr.

    bat_window / fresh_window : rolling 1-minute samples, oldest first,
                                with the current reading already appended
    fresh_crit_pct            : fresh-water critical level as a percentage
                                of fresh_cap_L (default 15.0)

    ttc_hr is 9999.0 when neither resource is depleting; V_dep is then 1.0.
    """
    n = len(bat_window)
    # n samples taken one minute apart span (n - 1) minutes, not n.
    hrs = (n - 1) / 60.0
    inf = float("inf")

    def time_to_crit(level_now, level_then, crit_level, min_rate):
        """Hours until crit_level at the observed rate; inf if not falling."""
        if n < 2:
            return inf
        velocity = (level_now - level_then) / hrs  # negative means depleting
        if velocity >= 0:
            return inf
        gap = level_now - crit_level
        if gap <= 0:
            return 0.0  # already at or below the critical level
        # min_rate floors tiny velocities so ttc stays bounded
        return gap / max(abs(velocity), min_rate)

    ttc_bat = time_to_crit(
        bat_pct_now, bat_window[0] if n else bat_pct_now, bat_crit_pct, 0.01
    )
    crit_fresh_L = fresh_crit_pct / 100.0 * fresh_cap_L
    ttc_fresh = time_to_crit(
        fresh_L_now, fresh_window[0] if n else fresh_L_now, crit_fresh_L, 0.1
    )

    ttc = min(ttc_bat, ttc_fresh)
    if ttc == inf:
        return 1.0, 9999.0  # 9999 = "no depletion risk"
    return clamp(ttc / horizon_hr, v_floor, 1.0), round(ttc, 4)


# ─────────────────────────────────────────────────────────────────────────────
# MAIN CALCULATION LOOP
# ─────────────────────────────────────────────────────────────────────────────
def calculate(power_path, water_path, cfg):
    """
    Sequentially processes every 1-minute row.
    Maintains rolling windows for multi-row computations.
    Returns list of output dicts (one per merged input row).

    Rows whose index is below the longest configured window are flagged
    warm_up="true" because at least one rolling window is still filling.
    """
    rows = merge_rows(load_csv(power_path), load_csv(water_path))

    n_energy = cfg["energy_window_min"]
    n_stable = cfg["stable_window_min"]
    n_velocity = cfg["velocity_window_min"]
    # longest window = warm-up horizon (all three windows must be full)
    warm_up = max(n_energy, n_stable, n_velocity)

    gen_win = deque(maxlen=n_energy)       # Solar + Shore [kW]
    load_win_e = deque(maxlen=n_energy)    # total load [kW] — energy window
    load_win_s = deque(maxlen=n_stable)    # total load [kW] — stability window
    bat_win = deque(maxlen=n_velocity)     # battery % — velocity window
    fresh_win = deque(maxlen=n_velocity)   # fresh water L — velocity window

    results = []
    for i, row in enumerate(rows):
        # ── Parse ──────────────────────────────────────────────────────────
        bat_pct = float(row["Battery_Level_Pct"])
        fresh_L = float(row["FreshTank_Level_L"])
        grey_L = float(row["GreyTank_Level_L"])
        black_L = float(row["BlackTank_Level_L"])
        solar_kw = float(row["Solar_Flow_kW"])
        shore_kw = float(row["Shore_Flow_kW"])
        total_load = sum(float(row[c]) for c in LOAD_COLS)

        # ── Feed rolling windows ───────────────────────────────────────────
        gen_win.append(solar_kw + shore_kw)
        load_win_e.append(total_load)
        load_win_s.append(total_load)
        bat_win.append(bat_pct)
        fresh_win.append(fresh_L)

        # ── Component scores ───────────────────────────────────────────────
        sc_bat = c_bat(bat_pct, cfg["bat_crit_pct"], cfg["bat_warn_pct"])
        sc_fresh = c_fresh(
            fresh_L, cfg["fresh_cap_L"],
            cfg["fresh_crit_pct"], cfg["fresh_warn_pct"],
        )
        sc_waste, sc_grey, sc_black = c_waste(
            grey_L, black_L,
            cfg["grey_cap_L"], cfg["black_cap_L"],
            cfg["grey_penalty_start"], cfg["black_penalty_start"],
            cfg["grey_weight_in_waste"], cfg["black_weight_in_waste"],
        )
        sc_energy = c_energy(gen_win, load_win_e, cfg["solar_cap_kW"])
        sc_stable = c_stable(load_win_s, cfg["stable_k"], cfg["stable_mu_floor"])

        # ── Weighted sum ───────────────────────────────────────────────────
        si_raw = 10.0 * (
            cfg["w_bat"] * sc_bat
            + cfg["w_fresh"] * sc_fresh
            + cfg["w_waste"] * sc_waste
            + cfg["w_energy"] * sc_energy
            + cfg["w_stable"] * sc_stable
        )
        si_raw = clamp(si_raw, 0.0, 10.0)

        # ── Depletion velocity modifier ────────────────────────────────────
        vd, ttc_hr = v_dep(
            bat_win, fresh_win, bat_pct, fresh_L,
            cfg["fresh_cap_L"], cfg["bat_crit_pct"],
            cfg["velocity_horizon_hr"], cfg["velocity_min_factor"],
            fresh_crit_pct=cfg["fresh_crit_pct"],
        )
        si_final = clamp(si_raw * vd, 0.0, 10.0)

        results.append({
            "Time": row["Time"],
            "SI": round(si_final, 4),
            "SI_raw": round(si_raw, 4),
            "C_bat": round(sc_bat, 4),
            "C_fresh": round(sc_fresh, 4),
            "C_waste": round(sc_waste, 4),
            "C_grey": round(sc_grey, 4),
            "C_black": round(sc_black, 4),
            "C_energy": round(sc_energy, 4),
            "C_stable": round(sc_stable, 4),
            "V_dep": round(vd, 4),
            "ttc_hr": ttc_hr,
            "band": band_label(si_final, cfg["bands"]),
            "warm_up": "true" if i < warm_up else "false",
            # Passthrough context columns
            "Battery_Level_Pct": round(bat_pct, 2),
            "FreshTank_Level_L": round(fresh_L, 2),
            "GreyTank_Level_L": round(grey_L, 2),
            "BlackTank_Level_L": round(black_L, 2),
        })

    return results


# ─────────────────────────────────────────────────────────────────────────────
# SUMMARY REPORT
# ─────────────────────────────────────────────────────────────────────────────
def print_summary(results, cfg):
    """
    Print a human-readable report of the scored rows to stdout.

    Statistics exclude warm-up rows; if every row is a warm-up row the
    whole result set is used instead. Does nothing beyond a short notice
    when results is empty.
    """
    if not results:
        print(" No results to summarise.")
        return

    # Positions (in results) of the rows that feed the statistics.
    scored_idx = [i for i, r in enumerate(results) if r["warm_up"] == "false"]
    if not scored_idx:
        scored_idx = list(range(len(results)))
    warm = [results[i] for i in scored_idx]

    si_vals = [r["SI"] for r in warm]
    si_mean = statistics.mean(si_vals)
    si_min = min(si_vals)
    si_max = max(si_vals)
    si_std = statistics.stdev(si_vals) if len(si_vals) > 1 else 0.0

    band_counts = {}
    for r in warm:
        band_counts[r["band"]] = band_counts.get(r["band"], 0) + 1

    worst = min(warm, key=lambda r: r["SI"])
    best = max(warm, key=lambda r: r["SI"])

    def cmean(key):
        return round(statistics.mean(r[key] for r in warm), 3)

    W = 62
    sep = "─" * W
    print(f"\n{'═' * W}")
    print(" STABILITY INDEX — SUMMARY REPORT")
    print(f" Period : {results[0]['Time']} → {results[-1]['Time']}")
    print(f" Rows : {len(results):,} | Warm-up excluded: {len(results)-len(warm):,}")
    print(f"{'═' * W}")

    print("\n OVERALL SI")
    print(f" {sep}")
    print(f" Mean : {si_mean:6.3f} ({band_label(si_mean, cfg['bands'])})")
    print(f" Min : {si_min:6.3f} @ {worst['Time']} [{worst['band']}]")
    print(f" Max : {si_max:6.3f} @ {best['Time']} [{best['band']}]")
    print(f" Std : {si_std:6.3f}")

    print("\n BAND DISTRIBUTION")
    print(f" {sep}")
    total = len(warm)
    # Labels come from the configured bands (in order, de-duplicated);
    # "Critical" is band_label's fallback, so always list it.
    labels = list(dict.fromkeys(b["label"] for b in cfg["bands"]))
    if "Critical" not in labels:
        labels.append("Critical")
    for b in labels:
        n = band_counts.get(b, 0)
        pct = n / total * 100 if total else 0
        bar = "█" * int(pct / 2)
        print(f" {b:<12} {n:>5} min ({pct:5.1f}%) {bar}")

    print("\n COMPONENT AVERAGES (0–1, higher = more stable)")
    print(f" {sep}")
    rows = [
        ("C_bat", "Battery SOC ", cfg["w_bat"]),
        ("C_fresh", "Fresh Water ", cfg["w_fresh"]),
        ("C_waste", "Waste Headroom ", cfg["w_waste"]),
        ("C_grey", " ↳ Grey tank ", cfg["grey_weight_in_waste"] * cfg["w_waste"]),
        ("C_black", " ↳ Black tank ", cfg["black_weight_in_waste"] * cfg["w_waste"]),
        ("C_energy", "Energy Balance ", cfg["w_energy"]),
        ("C_stable", "Cons. Stability", cfg["w_stable"]),
        ("V_dep", "Depl. Velocity ", None),
    ]
    for key, label, weight in rows:
        val = cmean(key)
        bar = "▓" * int(val * 20)
        wstr = f"w={weight:.2f}" if weight is not None else "modifier"
        print(f" {label} {val:.3f} {bar:<22} {wstr}")

    # Identify consecutive critical/poor runs as [first_idx, last_idx]
    # pairs over positions in results (robust to duplicate timestamps).
    crit_idx = [i for i in scored_idx if results[i]["band"] in ("Critical", "Poor")]
    if crit_idx:
        runs = []
        for i in crit_idx:
            if runs and i == runs[-1][1] + 1:
                runs[-1][1] = i
            else:
                runs.append([i, i])

        print(f"\n ⚠ POOR / CRITICAL EVENTS ({len(crit_idx)} minutes across {len(runs)} run(s))")
        print(f" {sep}")
        for first, last in runs[:10]:
            start = results[first]
            dur = last - first + 1
            print(f" {start['Time']} SI={start['SI']:.2f} "
                  f"[{start['band']}] duration={dur} min "
                  f"bat={start['Battery_Level_Pct']}% "
                  f"fresh={start['FreshTank_Level_L']:.0f}L")
        if len(runs) > 10:
            print(f" … and {len(runs)-10} more run(s)")
    else:
        print("\n ✓ No Critical or Poor events detected.")

    print(f"\n{'═' * W}\n")


# ─────────────────────────────────────────────────────────────────────────────
# CSV WRITER
# ─────────────────────────────────────────────────────────────────────────────
def write_csv(path, rows):
    """
    Write rows (a list of dicts sharing the same keys) to path as CSV,
    creating parent directories as needed. Column order follows the keys
    of the first row. Prints a notice and writes nothing when rows is empty.
    """
    if not rows:
        print(" No rows to write.")
        return
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    with open(path, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=list(rows[0].keys()))
        w.writeheader()
        w.writerows(rows)
    print(f" Wrote {len(rows):,} rows → {path}")


# ─────────────────────────────────────────────────────────────────────────────
# ENTRY POINT
# ─────────────────────────────────────────────────────────────────────────────
def main():
    """
    Command-line entry point: parse arguments, build the configuration
    (defaults ← JSON file ← CLI overrides), score the 1MIN CSVs, write the
    output CSV and optionally print the summary report.
    """
    parser = argparse.ArgumentParser(
        description="EV Camper Stability Index Engine",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  python stability_index.py --summary
  python stability_index.py --power_dir data/power --water_dir data/water
  python stability_index.py --config si_config.json --out scores/si.csv --summary
  python stability_index.py --w_bat 0.40 --w_fresh 0.20 --w_energy 0.20 --w_waste 0.10 --w_stable 0.10
""",
    )

    # Paths
    parser.add_argument("--power_dir", default="output/power",
                        help="Directory containing 1MIN.csv (default: output/power)")
    parser.add_argument("--water_dir", default="output/water",
                        help="Directory containing 1MIN.csv (default: output/water)")
    parser.add_argument("--out", default="output/stability_index.csv",
                        help="Output CSV path (default: output/stability_index.csv)")
    parser.add_argument("--config", default=None,
                        help="JSON config file to override any DEFAULT_CONFIG value")

    # Inline weight overrides
    parser.add_argument("--w_bat", type=float, default=None, metavar="W",
                        help="Battery SOC weight (overrides config)")
    parser.add_argument("--w_fresh", type=float, default=None, metavar="W")
    parser.add_argument("--w_waste", type=float, default=None, metavar="W")
    parser.add_argument("--w_energy", type=float, default=None, metavar="W")
    parser.add_argument("--w_stable", type=float, default=None, metavar="W")

    # Other inline overrides
    parser.add_argument("--solar_cap_kW", type=float, default=None)
    parser.add_argument("--energy_window_min", type=int, default=None)
    parser.add_argument("--stable_window_min", type=int, default=None)
    parser.add_argument("--velocity_horizon_hr", type=float, default=None)
    parser.add_argument("--fresh_cap_L", type=float, default=None)
    parser.add_argument("--grey_cap_L", type=float, default=None)
    parser.add_argument("--black_cap_L", type=float, default=None)

    parser.add_argument("--summary", action="store_true",
                        help="Print a human-readable summary report to stdout")

    args = parser.parse_args()

    # Build config with optional file + CLI overrides
    cfg = load_config(args.config)
    override_keys = list(WEIGHT_KEYS) + [
        "solar_cap_kW", "energy_window_min", "stable_window_min",
        "velocity_horizon_hr", "fresh_cap_L", "grey_cap_L", "black_cap_L",
    ]
    for key in override_keys:
        val = getattr(args, key, None)
        if val is not None:
            cfg[key] = val

    # Re-check after CLI overrides; looser tolerance than load_config so
    # hand-typed weights such as 0.3333 are accepted.
    total_w = _weight_total(cfg)
    if abs(total_w - 1.0) > 1e-4:
        raise SystemExit(
            f"ERROR: weights must sum to 1.0 (got {total_w:.4f})\n"
            f" bat={cfg['w_bat']} fresh={cfg['w_fresh']} waste={cfg['w_waste']} "
            f"energy={cfg['w_energy']} stable={cfg['w_stable']}"
        )

    power_path = str(Path(args.power_dir) / "1MIN.csv")
    water_path = str(Path(args.water_dir) / "1MIN.csv")

    print(f"\n{'='*62}")
    print(" Stability Index Engine v1.0")
    print(f"{'='*62}")
    print(f" Power : {power_path}")
    print(f" Water : {water_path}")
    print(f" Weights: bat={cfg['w_bat']} fresh={cfg['w_fresh']} "
          f"waste={cfg['w_waste']} energy={cfg['w_energy']} stable={cfg['w_stable']}")
    print(f" Windows: energy={cfg['energy_window_min']}min "
          f"stable={cfg['stable_window_min']}min "
          f"velocity={cfg['velocity_window_min']}min")
    print()

    print(" Calculating...")
    results = calculate(power_path, water_path, cfg)
    write_csv(args.out, results)

    if args.summary:
        print_summary(results, cfg)

    print(" Done.\n")


if __name__ == "__main__":
    main()