Spaces:
Sleeping
Sleeping
| """ | |
| Stability Index Engine | |
| ====================== | |
| Computes a deterministic composite risk score SI β [0, 10] per minute | |
| from the generated power and water 1MIN CSV files. | |
| Formula (v1.0 β matches stability_index_spec.docx): | |
| SI(t) = SI_raw(t) Γ V_dep(t) | |
| SI_raw = 10 Γ (w_batΒ·C_bat + w_freshΒ·C_fresh + w_wasteΒ·C_waste | |
| + w_energyΒ·C_energy + w_stableΒ·C_stable) | |
| V_dep = clamp(ttc / horizon_hr, 0.5, 1.0) | |
| where ttc = min(time_to_crit_battery, time_to_crit_fresh) [hours] | |
| Output CSV columns: | |
| Time, SI, SI_raw, C_bat, C_fresh, C_waste, C_grey, C_black, | |
| C_energy, C_stable, V_dep, ttc_hr, band, warm_up, | |
| Battery_Level_Pct, FreshTank_Level_L, GreyTank_Level_L, BlackTank_Level_L | |
| Usage: | |
| python stability_index.py # uses ./output/ | |
| python stability_index.py --power_dir data/power --water_dir data/water | |
| python stability_index.py --config si_config.json --out si_scores.csv | |
| python stability_index.py --summary | |
| """ | |
| import csv | |
| import json | |
| import math | |
| import argparse | |
| import statistics | |
| from collections import deque | |
| from pathlib import Path | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # DEFAULT CONFIGURATION | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| DEFAULT_CONFIG = { | |
| # --- Component weights (must sum to 1.0) --- | |
| "w_bat": 0.35, | |
| "w_fresh": 0.25, | |
| "w_waste": 0.12, | |
| "w_energy": 0.18, | |
| "w_stable": 0.10, | |
| # --- Battery thresholds (%) --- | |
| "bat_crit_pct": 20.0, | |
| "bat_warn_pct": 50.0, | |
| # --- Fresh water thresholds (%) --- | |
| "fresh_crit_pct": 15.0, | |
| "fresh_warn_pct": 40.0, | |
| # --- Tank capacities (litres) --- | |
| "fresh_cap_L": 378.541, # 100 gal | |
| "grey_cap_L": 189.271, # 50 gal | |
| "black_cap_L": 170.344, # 45 gal | |
| # --- Waste tank penalty start (fill fraction 0β1) --- | |
| "grey_penalty_start": 0.70, | |
| "black_penalty_start": 0.60, | |
| # --- Waste composite weights --- | |
| "grey_weight_in_waste": 0.60, | |
| "black_weight_in_waste": 0.40, | |
| # --- Energy balance (C_energy) --- | |
| "energy_window_min": 60, | |
| "solar_cap_kW": 5.25, | |
| # --- Consumption stability (C_stable) --- | |
| "stable_window_min": 30, | |
| "stable_k": 2.0, | |
| "stable_mu_floor": 0.05, | |
| # --- Depletion velocity modifier (V_dep) --- | |
| "velocity_window_min": 60, | |
| "velocity_horizon_hr": 4.0, | |
| "velocity_min_factor": 0.5, | |
| # --- Score band thresholds --- | |
| "bands": [ | |
| {"min": 8.0, "max": 10.0, "label": "Excellent"}, | |
| {"min": 6.0, "max": 8.0, "label": "Good"}, | |
| {"min": 4.0, "max": 6.0, "label": "Fair"}, | |
| {"min": 2.0, "max": 4.0, "label": "Poor"}, | |
| {"min": 0.0, "max": 2.0, "label": "Critical"}, | |
| ], | |
| } | |
| LOAD_COLS = [ | |
| "HVAC_Flow_kW", "Lighting_Flow_kW", "Devices_Flow_kW", "Fridge_Flow_kW", | |
| "WaterPump_Flow_kW", "Cooking_Flow_kW", "Inverter_Flow_kW", "Unmetered_Flow_kW", | |
| ] | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # HELPERS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def clamp(val, lo, hi): | |
| return max(lo, min(hi, val)) | |
| def band_label(si, bands): | |
| for b in bands: | |
| if b["min"] <= si <= b["max"]: | |
| return b["label"] | |
| return "Critical" | |
| def load_config(path): | |
| cfg = dict(DEFAULT_CONFIG) | |
| if path: | |
| with open(path) as f: | |
| cfg.update(json.load(f)) | |
| total_w = cfg["w_bat"] + cfg["w_fresh"] + cfg["w_waste"] + cfg["w_energy"] + cfg["w_stable"] | |
| if abs(total_w - 1.0) > 1e-6: | |
| raise ValueError(f"Component weights must sum to 1.0, got {total_w:.6f}") | |
| return cfg | |
| def load_csv(path): | |
| with open(path, newline="") as f: | |
| return list(csv.DictReader(f)) | |
| def merge_rows(power_rows, water_rows): | |
| """Inner join power + water rows on Time. Both files must be from same 1MIN export.""" | |
| if len(power_rows) == len(water_rows): | |
| merged = [] | |
| for i, (p, w) in enumerate(zip(power_rows, water_rows)): | |
| if p["Time"] != w["Time"]: | |
| raise ValueError( | |
| f"Timestamp mismatch at row {i}: power={p['Time']} water={w['Time']}" | |
| ) | |
| merged.append({**p, **w}) | |
| return merged | |
| # Fallback: join by timestamp key | |
| water_idx = {r["Time"]: r for r in water_rows} | |
| return [{**p, **water_idx[p["Time"]]} for p in power_rows if p["Time"] in water_idx] | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # COMPONENT SCORE FUNCTIONS | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def c_bat(bat_pct, crit, warn): | |
| """ | |
| Piecewise linear Battery SOC score β [0, 1]. | |
| Zones (configurable via crit/warn thresholds): | |
| [0, crit) β [0.00, 0.30) steep penalty | |
| [crit, warn) β [0.30, 0.70) moderate penalty | |
| [warn, 100] β [0.70, 1.00] comfort zone | |
| """ | |
| p = clamp(bat_pct, 0.0, 100.0) / 100.0 | |
| c = crit / 100.0 | |
| w = warn / 100.0 | |
| if p < c: | |
| return 0.00 + 0.30 * (p / c) | |
| elif p < w: | |
| return 0.30 + 0.40 * ((p - c) / (w - c)) | |
| else: | |
| return 0.70 + 0.30 * ((p - w) / (1.0 - w)) | |
| def c_fresh(fresh_L, cap_L, crit_pct, warn_pct): | |
| """ | |
| Piecewise linear Fresh Water score β identical formula to c_bat, | |
| applied to fill percentage. β [0, 1]. | |
| """ | |
| pct = clamp(fresh_L / cap_L * 100.0, 0.0, 100.0) | |
| return c_bat(pct, crit_pct, warn_pct) | |
| def headroom_score(fill, penalty_start): | |
| """ | |
| Single-tank waste headroom score. | |
| fill : fraction of tank capacity used [0, 1] | |
| penalty_start : fill fraction above which extra penalty applies | |
| Linear headroom below penalty_start; steeply penalised above. | |
| Returns 1.0 (empty) β 0.0 (overflow). | |
| """ | |
| fill = clamp(fill, 0.0, 1.0) | |
| headroom = 1.0 - fill | |
| if fill <= penalty_start: | |
| return headroom | |
| excess = fill - penalty_start | |
| range_above = 1.0 - penalty_start | |
| factor = 1.0 - 3.0 * excess / range_above | |
| return clamp(headroom * factor, 0.0, 1.0) | |
| def c_waste(grey_L, black_L, grey_cap, black_cap, | |
| grey_penalty, black_penalty, grey_w, black_w): | |
| """ | |
| Composite waste headroom = grey_w Γ C_grey + black_w Γ C_black. | |
| Returns (C_waste, C_grey, C_black). | |
| """ | |
| cg = headroom_score(grey_L / grey_cap, grey_penalty) | |
| cb = headroom_score(black_L / black_cap, black_penalty) | |
| return grey_w * cg + black_w * cb, cg, cb | |
| def c_energy(gen_window, load_window, solar_cap_kw): | |
| """ | |
| Rolling-window energy balance. | |
| net_norm = clamp((mean_gen - mean_load) / solar_cap, -1, +1) | |
| C_energy = 0.5 + 0.5 Γ net_norm | |
| """ | |
| if not gen_window: | |
| return 0.5 | |
| gen_mean = sum(gen_window) / len(gen_window) | |
| load_mean = sum(load_window) / len(load_window) | |
| net_norm = clamp((gen_mean - load_mean) / max(solar_cap_kw, 0.01), -1.0, 1.0) | |
| return 0.5 + 0.5 * net_norm | |
| def c_stable(load_window, k, mu_floor): | |
| """ | |
| Consumption stability via Coefficient of Variation. | |
| CV = Ο / max(ΞΌ, mu_floor) | |
| C_stable = 1 / (1 + k Γ CV) | |
| """ | |
| n = len(load_window) | |
| if n < 2: | |
| return 1.0 | |
| mu = sum(load_window) / n | |
| if mu < 1e-6: | |
| return 1.0 | |
| variance = sum((x - mu) ** 2 for x in load_window) / (n - 1) | |
| sigma = math.sqrt(variance) | |
| cv = sigma / max(mu, mu_floor) | |
| return 1.0 / (1.0 + k * cv) | |
| def v_dep(bat_window, fresh_window, | |
| bat_pct_now, fresh_L_now, | |
| fresh_cap_L, bat_crit_pct, | |
| horizon_hr, v_floor): | |
| """ | |
| Depletion velocity modifier β (V_dep, ttc_hr). | |
| Computes velocity of battery (% / hr) and fresh water (L / hr), | |
| estimates time to critical threshold for each, takes the minimum, | |
| then maps to a [v_floor, 1.0] suppression factor over horizon_hr. | |
| """ | |
| N = len(bat_window) | |
| hrs = N / 60.0 | |
| INF = float("inf") | |
| # Battery velocity (%/hr) β negative means depleting | |
| v_bat = ((bat_pct_now - bat_window[0]) / hrs) if N >= 2 and hrs > 0 else 0.0 | |
| if v_bat < 0: | |
| gap_bat = bat_pct_now - bat_crit_pct | |
| ttc_bat = (gap_bat / max(abs(v_bat), 0.01)) if gap_bat > 0 else 0.0 | |
| else: | |
| ttc_bat = INF | |
| # Fresh water velocity (L/hr) β negative means depleting | |
| crit_fresh_L = 0.15 * fresh_cap_L | |
| v_fresh = ((fresh_L_now - fresh_window[0]) / hrs) if N >= 2 and hrs > 0 else 0.0 | |
| if v_fresh < 0: | |
| gap_fresh = fresh_L_now - crit_fresh_L | |
| ttc_fresh = (gap_fresh / max(abs(v_fresh), 0.1)) if gap_fresh > 0 else 0.0 | |
| else: | |
| ttc_fresh = INF | |
| ttc = min(ttc_bat, ttc_fresh) | |
| ttc_hr = round(ttc, 4) if ttc != INF else 9999.0 # 9999 = "no depletion risk" | |
| vd = clamp(ttc / horizon_hr, v_floor, 1.0) if ttc != INF else 1.0 | |
| return vd, ttc_hr | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # MAIN CALCULATION LOOP | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def calculate(power_path, water_path, cfg): | |
| """ | |
| Sequentially processes every 1-minute row. | |
| Maintains rolling windows for multi-row computations. | |
| Returns list of output dicts. | |
| """ | |
| rows = merge_rows(load_csv(power_path), load_csv(water_path)) | |
| N_energy = cfg["energy_window_min"] | |
| N_stable = cfg["stable_window_min"] | |
| N_velocity = cfg["velocity_window_min"] | |
| warm_up = max(N_energy, N_velocity) # longest window = warm-up horizon | |
| gen_win = deque(maxlen=N_energy) # Solar + Shore [kW] | |
| load_win_e = deque(maxlen=N_energy) # total load [kW] β energy window | |
| load_win_s = deque(maxlen=N_stable) # total load [kW] β stability window | |
| bat_win = deque(maxlen=N_velocity) # battery % β velocity window | |
| fresh_win = deque(maxlen=N_velocity) # fresh water L β velocity window | |
| results = [] | |
| for i, row in enumerate(rows): | |
| # ββ Parse ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| bat_pct = float(row["Battery_Level_Pct"]) | |
| fresh_L = float(row["FreshTank_Level_L"]) | |
| grey_L = float(row["GreyTank_Level_L"]) | |
| black_L = float(row["BlackTank_Level_L"]) | |
| solar_kw = float(row["Solar_Flow_kW"]) | |
| shore_kw = float(row["Shore_Flow_kW"]) | |
| total_load = sum(float(row[c]) for c in LOAD_COLS) | |
| # ββ Feed rolling windows βββββββββββββββββββββββββββββββββββββββββββ | |
| gen_win.append(solar_kw + shore_kw) | |
| load_win_e.append(total_load) | |
| load_win_s.append(total_load) | |
| bat_win.append(bat_pct) | |
| fresh_win.append(fresh_L) | |
| # ββ Component scores βββββββββββββββββββββββββββββββββββββββββββββββ | |
| sc_bat = c_bat(bat_pct, cfg["bat_crit_pct"], cfg["bat_warn_pct"]) | |
| sc_fresh = c_fresh( | |
| fresh_L, cfg["fresh_cap_L"], | |
| cfg["fresh_crit_pct"], cfg["fresh_warn_pct"] | |
| ) | |
| sc_waste, sc_grey, sc_black = c_waste( | |
| grey_L, black_L, | |
| cfg["grey_cap_L"], cfg["black_cap_L"], | |
| cfg["grey_penalty_start"], cfg["black_penalty_start"], | |
| cfg["grey_weight_in_waste"], cfg["black_weight_in_waste"] | |
| ) | |
| sc_energy = c_energy(gen_win, load_win_e, cfg["solar_cap_kW"]) | |
| sc_stable = c_stable(load_win_s, cfg["stable_k"], cfg["stable_mu_floor"]) | |
| # ββ Weighted sum βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| si_raw = 10.0 * ( | |
| cfg["w_bat"] * sc_bat + | |
| cfg["w_fresh"] * sc_fresh + | |
| cfg["w_waste"] * sc_waste + | |
| cfg["w_energy"] * sc_energy + | |
| cfg["w_stable"] * sc_stable | |
| ) | |
| si_raw = clamp(si_raw, 0.0, 10.0) | |
| # ββ Depletion velocity modifier ββββββββββββββββββββββββββββββββββββ | |
| vd, ttc_hr = v_dep( | |
| bat_win, fresh_win, | |
| bat_pct, fresh_L, | |
| cfg["fresh_cap_L"], cfg["bat_crit_pct"], | |
| cfg["velocity_horizon_hr"], cfg["velocity_min_factor"] | |
| ) | |
| si_final = clamp(si_raw * vd, 0.0, 10.0) | |
| results.append({ | |
| "Time": row["Time"], | |
| "SI": round(si_final, 4), | |
| "SI_raw": round(si_raw, 4), | |
| "C_bat": round(sc_bat, 4), | |
| "C_fresh": round(sc_fresh, 4), | |
| "C_waste": round(sc_waste, 4), | |
| "C_grey": round(sc_grey, 4), | |
| "C_black": round(sc_black, 4), | |
| "C_energy": round(sc_energy, 4), | |
| "C_stable": round(sc_stable, 4), | |
| "V_dep": round(vd, 4), | |
| "ttc_hr": ttc_hr, | |
| "band": band_label(si_final, cfg["bands"]), | |
| "warm_up": "true" if i < warm_up else "false", | |
| # Passthrough context columns | |
| "Battery_Level_Pct": round(bat_pct, 2), | |
| "FreshTank_Level_L": round(fresh_L, 2), | |
| "GreyTank_Level_L": round(grey_L, 2), | |
| "BlackTank_Level_L": round(black_L, 2), | |
| }) | |
| return results | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # SUMMARY REPORT | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def print_summary(results, cfg): | |
| warm = [r for r in results if r["warm_up"] == "false"] or results | |
| si_vals = [r["SI"] for r in warm] | |
| si_mean = statistics.mean(si_vals) | |
| si_min = min(si_vals) | |
| si_max = max(si_vals) | |
| si_std = statistics.stdev(si_vals) if len(si_vals) > 1 else 0.0 | |
| band_counts = {} | |
| for r in warm: | |
| band_counts[r["band"]] = band_counts.get(r["band"], 0) + 1 | |
| worst = min(warm, key=lambda r: r["SI"]) | |
| best = max(warm, key=lambda r: r["SI"]) | |
| def cmean(key): | |
| return round(statistics.mean(r[key] for r in warm), 3) | |
| W = 62 | |
| sep = "β" * W | |
| print(f"\n{'β' * W}") | |
| print(f" STABILITY INDEX β SUMMARY REPORT") | |
| print(f" Period : {results[0]['Time']} β {results[-1]['Time']}") | |
| print(f" Rows : {len(results):,} | Warm-up excluded: {len(results)-len(warm):,}") | |
| print(f"{'β' * W}") | |
| print(f"\n OVERALL SI") | |
| print(f" {sep}") | |
| print(f" Mean : {si_mean:6.3f} ({band_label(si_mean, cfg['bands'])})") | |
| print(f" Min : {si_min:6.3f} @ {worst['Time']} [{worst['band']}]") | |
| print(f" Max : {si_max:6.3f} @ {best['Time']} [{best['band']}]") | |
| print(f" Std : {si_std:6.3f}") | |
| print(f"\n BAND DISTRIBUTION") | |
| print(f" {sep}") | |
| total = len(warm) | |
| for b in ["Excellent", "Good", "Fair", "Poor", "Critical"]: | |
| n = band_counts.get(b, 0) | |
| pct = n / total * 100 if total else 0 | |
| bar = "β" * int(pct / 2) | |
| print(f" {b:<12} {n:>5} min ({pct:5.1f}%) {bar}") | |
| print(f"\n COMPONENT AVERAGES (0β1, higher = more stable)") | |
| print(f" {sep}") | |
| rows = [ | |
| ("C_bat", "Battery SOC ", cfg["w_bat"]), | |
| ("C_fresh", "Fresh Water ", cfg["w_fresh"]), | |
| ("C_waste", "Waste Headroom ", cfg["w_waste"]), | |
| ("C_grey", " β³ Grey tank ", cfg["grey_weight_in_waste"] * cfg["w_waste"]), | |
| ("C_black", " β³ Black tank ", cfg["black_weight_in_waste"] * cfg["w_waste"]), | |
| ("C_energy", "Energy Balance ", cfg["w_energy"]), | |
| ("C_stable", "Cons. Stability", cfg["w_stable"]), | |
| ("V_dep", "Depl. Velocity ", None), | |
| ] | |
| for key, label, weight in rows: | |
| val = cmean(key) | |
| bar = "β" * int(val * 20) | |
| wstr = f"w={weight:.2f}" if weight is not None else "modifier" | |
| print(f" {label} {val:.3f} {bar:<22} {wstr}") | |
| # Identify consecutive critical/poor runs | |
| crit = [r for r in warm if r["band"] in ("Critical", "Poor")] | |
| if crit: | |
| # Build index for consecutive-run detection | |
| idx_map = {r["Time"]: i for i, r in enumerate(results)} | |
| runs, run_start, prev_i = [], None, None | |
| for r in crit: | |
| ri = idx_map[r["Time"]] | |
| if prev_i is None or ri != prev_i + 1: | |
| if run_start is not None: | |
| runs.append((run_start, results[prev_i])) | |
| run_start = r | |
| prev_i = ri | |
| if run_start: | |
| runs.append((run_start, results[prev_i])) | |
| print(f"\n β POOR / CRITICAL EVENTS ({len(crit)} minutes across {len(runs)} run(s))") | |
| print(f" {sep}") | |
| for start, end in runs[:10]: | |
| dur = idx_map[end["Time"]] - idx_map[start["Time"]] + 1 | |
| print(f" {start['Time']} SI={start['SI']:.2f} " | |
| f"[{start['band']}] duration={dur} min " | |
| f"bat={start['Battery_Level_Pct']}% " | |
| f"fresh={start['FreshTank_Level_L']:.0f}L") | |
| if len(runs) > 10: | |
| print(f" β¦ and {len(runs)-10} more run(s)") | |
| else: | |
| print(f"\n β No Critical or Poor events detected.") | |
| print(f"\n{'β' * W}\n") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # CSV WRITER | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def write_csv(path, rows): | |
| if not rows: | |
| print(" No rows to write.") | |
| return | |
| Path(path).parent.mkdir(parents=True, exist_ok=True) | |
| with open(path, "w", newline="") as f: | |
| w = csv.DictWriter(f, fieldnames=rows[0].keys()) | |
| w.writeheader() | |
| w.writerows(rows) | |
| print(f" Wrote {len(rows):,} rows β {path}") | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| # ENTRY POINT | |
| # βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def main(): | |
| parser = argparse.ArgumentParser( | |
| description="EV Camper Stability Index Engine", | |
| formatter_class=argparse.RawDescriptionHelpFormatter, | |
| epilog=""" | |
| Examples: | |
| python stability_index.py --summary | |
| python stability_index.py --power_dir data/power --water_dir data/water | |
| python stability_index.py --config si_config.json --out scores/si.csv --summary | |
| python stability_index.py --w_bat 0.40 --w_fresh 0.20 --w_energy 0.20 --w_waste 0.10 --w_stable 0.10 | |
| """ | |
| ) | |
| # Paths | |
| parser.add_argument("--power_dir", default="output/power", | |
| help="Directory containing 1MIN.csv (default: output/power)") | |
| parser.add_argument("--water_dir", default="output/water", | |
| help="Directory containing 1MIN.csv (default: output/water)") | |
| parser.add_argument("--out", default="output/stability_index.csv", | |
| help="Output CSV path (default: output/stability_index.csv)") | |
| parser.add_argument("--config", default=None, | |
| help="JSON config file to override any DEFAULT_CONFIG value") | |
| # Inline weight overrides | |
| parser.add_argument("--w_bat", type=float, default=None, metavar="W", | |
| help="Battery SOC weight (overrides config)") | |
| parser.add_argument("--w_fresh", type=float, default=None, metavar="W") | |
| parser.add_argument("--w_waste", type=float, default=None, metavar="W") | |
| parser.add_argument("--w_energy", type=float, default=None, metavar="W") | |
| parser.add_argument("--w_stable", type=float, default=None, metavar="W") | |
| # Other inline overrides | |
| parser.add_argument("--solar_cap_kW", type=float, default=None) | |
| parser.add_argument("--energy_window_min", type=int, default=None) | |
| parser.add_argument("--stable_window_min", type=int, default=None) | |
| parser.add_argument("--velocity_horizon_hr", type=float, default=None) | |
| parser.add_argument("--fresh_cap_L", type=float, default=None) | |
| parser.add_argument("--grey_cap_L", type=float, default=None) | |
| parser.add_argument("--black_cap_L", type=float, default=None) | |
| parser.add_argument("--summary", action="store_true", | |
| help="Print a human-readable summary report to stdout") | |
| args = parser.parse_args() | |
| # Build config with optional file + CLI overrides | |
| cfg = load_config(args.config) | |
| for key in ["w_bat", "w_fresh", "w_waste", "w_energy", "w_stable", | |
| "solar_cap_kW", "energy_window_min", "stable_window_min", | |
| "velocity_horizon_hr", "fresh_cap_L", "grey_cap_L", "black_cap_L"]: | |
| val = getattr(args, key, None) | |
| if val is not None: | |
| cfg[key] = val | |
| total_w = cfg["w_bat"] + cfg["w_fresh"] + cfg["w_waste"] + cfg["w_energy"] + cfg["w_stable"] | |
| if abs(total_w - 1.0) > 1e-4: | |
| raise SystemExit( | |
| f"ERROR: weights must sum to 1.0 (got {total_w:.4f})\n" | |
| f" bat={cfg['w_bat']} fresh={cfg['w_fresh']} waste={cfg['w_waste']} " | |
| f"energy={cfg['w_energy']} stable={cfg['w_stable']}" | |
| ) | |
| power_path = str(Path(args.power_dir) / "1MIN.csv") | |
| water_path = str(Path(args.water_dir) / "1MIN.csv") | |
| print(f"\n{'='*62}") | |
| print(f" Stability Index Engine v1.0") | |
| print(f"{'='*62}") | |
| print(f" Power : {power_path}") | |
| print(f" Water : {water_path}") | |
| print(f" Weights: bat={cfg['w_bat']} fresh={cfg['w_fresh']} " | |
| f"waste={cfg['w_waste']} energy={cfg['w_energy']} stable={cfg['w_stable']}") | |
| print(f" Windows: energy={cfg['energy_window_min']}min " | |
| f"stable={cfg['stable_window_min']}min " | |
| f"velocity={cfg['velocity_window_min']}min") | |
| print() | |
| print(" Calculating...") | |
| results = calculate(power_path, water_path, cfg) | |
| write_csv(args.out, results) | |
| if args.summary: | |
| print_summary(results, cfg) | |
| print(f" Done.\n") | |
| if __name__ == "__main__": | |
| main() | |