| """ |
| Energy balance engine — the core physics of the microgrid. |
| |
| KEY DESIGN (per Gemini review): |
| - Agent controls: battery dispatch, diesel, demand shedding |
| - Grid is the SLACK variable (absorbs residual, capped at ±200 kW) |
| - VoLL penalty (Rs 150/kWh) replaces hard reliability gate |
| - Battery degradation cost (Rs 2.5/kWh throughput) |
| - Diesel startup cost (Rs 100 if was off last step) |
| - Demand shedding rebound (100% of shed kWh deferred to next hour) |
| """ |
|
|
| from __future__ import annotations |
|
|
| from dataclasses import dataclass |
|
|
| import numpy as np |
|
|
|
|
| |
|
|
# --- Battery ---------------------------------------------------------------
BATTERY_CAPACITY_KWH: float = 500.0   # upper bound for the state of charge
BATTERY_MAX_POWER_KW: float = 100.0   # power reached at a dispatch action of +/-1
# Approximately BATTERY_CHARGE_EFF * BATTERY_DISCHARGE_EFF (0.949 * 0.949 ~= 0.90).
# Not referenced by step() or _narrate() in this file.
BATTERY_EFFICIENCY: float = 0.90
BATTERY_CHARGE_EFF: float = 0.949     # fraction of charging energy that is stored
BATTERY_DISCHARGE_EFF: float = 0.949  # fraction of drawn energy that is delivered
BATTERY_DEGRADATION_RS: float = 2.5   # Rs charged per kWh of battery throughput

# --- Grid (slack variable) -------------------------------------------------
GRID_MAX_KW: float = 200.0            # symmetric import/export limit

# --- Diesel generator ------------------------------------------------------
DIESEL_MAX_KW: float = 100.0          # output at a diesel action of 1
DIESEL_COST_PER_KWH: float = 25.0     # cost per kWh generated
DIESEL_STARTUP_COST: float = 100.0    # charged when the generator was off last step

# --- Demand response -------------------------------------------------------
DEMAND_SHED_MAX_FRAC: float = 0.20    # largest share of demand that can be shed
SHED_REBOUND_FRAC: float = 1.00       # share of shed energy deferred to the next step

# --- Miscellaneous ---------------------------------------------------------
DIESEL_TANK_KWH: float = 2400.0       # full-tank reference used for "% fuel left"
VOLL: float = 150.0                   # value of lost load, Rs per unmet kWh
DT: float = 1.0                       # step length in hours
|
|
|
|
@dataclass
class MicrogridState:
    """Running state of the microgrid; ``step`` updates an instance in place."""

    # Number of steps completed so far.
    hour: int = 0
    # Energy currently stored in the battery.
    battery_soc_kwh: float = 250.0
    # Diesel energy still available in the tank.
    diesel_fuel_kwh: float = 2400.0
    # Whether the generator produced power on the previous step
    # (decides if the startup cost applies).
    diesel_was_on: bool = False
    # Shed energy carried over and added to the next step's demand.
    shed_rebound_kwh: float = 0.0

    # Episode totals.
    cumulative_cost: float = 0.0
    cumulative_blackout_kwh: float = 0.0
    cumulative_diesel_kwh: float = 0.0
    cumulative_battery_throughput_kwh: float = 0.0
    total_demand_kwh: float = 0.0

    # Values recorded for the most recent step.
    last_blackout_kwh: float = 0.0
    last_cost: float = 0.0
    last_reward: float = 0.0
    last_grid_kw: float = 0.0
|
|
|
|
@dataclass
class StepFlows:
    """Per-step breakdown of power flows, every value expressed in kW."""

    # Supply side.
    solar_kw: float = 0.0
    grid_import_kw: float = 0.0
    battery_discharge_kw: float = 0.0
    diesel_kw: float = 0.0

    # Consumption side.
    effective_demand_kw: float = 0.0
    grid_export_kw: float = 0.0
    battery_charge_kw: float = 0.0
    blackout_kw: float = 0.0
    curtailed_kw: float = 0.0

    # Totals and demand-response bookkeeping.
    total_supply_kw: float = 0.0
    total_consumption_kw: float = 0.0
    shed_kw: float = 0.0
    rebound_kw: float = 0.0
|
|
|
|
@dataclass
class StepResult:
    """Bundle returned by ``step()`` to the environment.

    Attributes:
        state: The microgrid state after the step.
        reward: Scalar reward for the step.
        done: True once the episode has ended.
        narration: Human-readable summary of the step.
        flows: Detailed power flows for the step, or ``None`` when the
            caller did not supply them.
    """

    state: MicrogridState
    reward: float
    done: bool
    narration: str
    # The default is None, so the annotation must be optional.
    flows: StepFlows | None = None
|
|
|
|
def step(
    state: MicrogridState,
    battery_dispatch_norm: float,
    diesel_norm: float,
    shed_norm: float,
    solar_kw: float,
    demand_kw: float,
    grid_price: float,
    diesel_fuel_cap: float = DIESEL_TANK_KWH,
    grid_available: bool = True,
) -> StepResult:
    """
    Advance the microgrid by one hour.

    Actions (agent controls):
        battery_dispatch_norm: -1 (charge 100kW) to +1 (discharge 100kW)
        diesel_norm: 0 (off) to 1 (100kW)
        shed_norm: 0 (none) to 1 (shed 20%)

    Grid is the SLACK — absorbs residual up to ±200 kW.

    Args:
        state: Current microgrid state. It is mutated in place and the same
            object is returned inside the result.
        battery_dispatch_norm: Battery action, clipped to [-1, 1]. Positive
            values discharge, zero or negative values charge.
        diesel_norm: Diesel action, clipped to [0, 1]; further limited by the
            fuel remaining in ``state.diesel_fuel_kwh``.
        shed_norm: Demand-shedding action, clipped to [0, 1] and scaled by
            ``DEMAND_SHED_MAX_FRAC``.
        solar_kw: Solar generation for this step.
        demand_kw: Base demand for this step, before rebound and shedding.
        grid_price: Price per kWh, charged on import and credited on export.
        diesel_fuel_cap: Not used by this function.
            NOTE(review): presumably the tank size for the scenario; kept so
            existing callers keep working — confirm the intended use.
        grid_available: When False the grid limit is 0 (no import/export).

    Returns:
        StepResult with the updated state, the reward, the ``done`` flag
        (True once ``state.hour`` reaches 72), a narration string and the
        detailed flows.
    """
    # --- Decode normalised actions into physical set-points ---------------
    battery_cmd_kw = float(np.clip(battery_dispatch_norm, -1, 1)) * BATTERY_MAX_POWER_KW
    diesel_kw = float(np.clip(diesel_norm, 0, 1)) * DIESEL_MAX_KW
    shed_frac = float(np.clip(shed_norm, 0, 1)) * DEMAND_SHED_MAX_FRAC

    # --- Demand: add last step's deferred load, then apply shedding -------
    # Capture the rebound BEFORE the state field is overwritten below, so it
    # can be reported in the flows.
    rebound_kw = state.shed_rebound_kwh / DT
    actual_demand = demand_kw + rebound_kw
    effective_demand = actual_demand * (1.0 - shed_frac)
    shed_kwh = actual_demand * shed_frac * DT
    state.shed_rebound_kwh = shed_kwh * SHED_REBOUND_FRAC

    # --- Diesel: cannot burn more fuel than is left in the tank -----------
    available_diesel_kwh = state.diesel_fuel_kwh
    diesel_kw = min(diesel_kw, available_diesel_kwh / DT)
    diesel_kw = max(0.0, diesel_kw)

    # --- Battery ----------------------------------------------------------
    if battery_cmd_kw > 0:
        # Discharge: limited by stored energy; losses reduce what is delivered.
        max_discharge = min(battery_cmd_kw, state.battery_soc_kwh / DT)
        battery_kw = max(0.0, max_discharge)
        delivered_kw = battery_kw * BATTERY_DISCHARGE_EFF
        state.battery_soc_kwh -= battery_kw * DT
    else:
        # Charge: limited by headroom; losses reduce what is stored.
        charge_cmd = abs(battery_cmd_kw)
        headroom = (BATTERY_CAPACITY_KWH - state.battery_soc_kwh) / BATTERY_CHARGE_EFF
        max_charge = min(charge_cmd, headroom / DT)
        battery_kw = -max(0.0, max_charge)
        delivered_kw = battery_kw  # negative: the battery acts as a load
        state.battery_soc_kwh += abs(battery_kw) * BATTERY_CHARGE_EFF * DT

    state.battery_soc_kwh = float(np.clip(state.battery_soc_kwh, 0, BATTERY_CAPACITY_KWH))
    battery_throughput = abs(battery_kw) * DT
    state.cumulative_battery_throughput_kwh += battery_throughput

    # --- Grid as slack: absorbs the residual within its limit -------------
    grid_cap = GRID_MAX_KW if grid_available else 0.0
    residual = effective_demand - solar_kw - delivered_kw - diesel_kw
    grid_kw = float(np.clip(residual, -grid_cap, grid_cap))

    # --- Whatever the grid cannot absorb is blackout or curtailment -------
    blackout_kwh = 0.0
    curtailed_kw = 0.0
    if residual > grid_cap:
        blackout_kwh = (residual - grid_cap) * DT
    elif residual < -grid_cap:
        curtailed_kw = abs(residual) - grid_cap

    # --- Detailed flows for reporting -------------------------------------
    grid_import = max(0.0, grid_kw)
    grid_export = max(0.0, -grid_kw)
    batt_discharge = max(0.0, delivered_kw)
    batt_charge = max(0.0, -delivered_kw)

    flows = StepFlows(
        solar_kw=solar_kw,
        grid_import_kw=grid_import,
        battery_discharge_kw=batt_discharge,
        diesel_kw=diesel_kw,
        effective_demand_kw=effective_demand,
        grid_export_kw=grid_export,
        battery_charge_kw=batt_charge,
        blackout_kw=blackout_kwh / DT,
        curtailed_kw=curtailed_kw,
        total_supply_kw=solar_kw + grid_import + batt_discharge + diesel_kw,
        total_consumption_kw=effective_demand + grid_export + batt_charge,
        shed_kw=actual_demand * shed_frac,
        # Deferred load from the previous step that was added to demand now.
        rebound_kw=rebound_kw,
    )

    # --- Cost -------------------------------------------------------------
    step_cost = 0.0

    # Grid: pay for import, earn the same price for export.
    if grid_kw > 0:
        step_cost += grid_price * grid_kw * DT
    else:
        step_cost -= grid_price * abs(grid_kw) * DT

    # Diesel fuel.
    step_cost += DIESEL_COST_PER_KWH * diesel_kw * DT

    # Diesel startup, only when the generator was off on the previous step.
    if diesel_kw > 0 and not state.diesel_was_on:
        step_cost += DIESEL_STARTUP_COST
    state.diesel_was_on = (diesel_kw > 0)

    # Battery degradation.
    step_cost += BATTERY_DEGRADATION_RS * battery_throughput

    # Value of lost load.
    step_cost += VOLL * blackout_kwh

    # Demand-shedding penalty: fixed 40.0 per shed kWh.
    step_cost += 40.0 * shed_kwh

    # --- Fuel bookkeeping -------------------------------------------------
    state.diesel_fuel_kwh -= diesel_kw * DT
    state.diesel_fuel_kwh = max(0.0, state.diesel_fuel_kwh)

    # --- Cumulative and last-step tracking --------------------------------
    state.cumulative_cost += step_cost
    state.cumulative_blackout_kwh += blackout_kwh
    state.cumulative_diesel_kwh += diesel_kw * DT
    state.total_demand_kwh += effective_demand * DT
    state.last_blackout_kwh = blackout_kwh
    state.last_cost = step_cost
    state.last_grid_kw = grid_kw

    # --- Reward: weighted mix of cost, reliability and diesel use ---------
    cost_signal = -step_cost / 500.0
    # The denominator is floored at 1.0 to avoid dividing by (near) zero demand.
    reliability_signal = -2.0 * (blackout_kwh / max(effective_demand * DT, 1.0))
    green_signal = -0.5 * (diesel_kw / DIESEL_MAX_KW) if diesel_kw > 0 else 0.0
    reward = 0.50 * cost_signal + 0.25 * reliability_signal + 0.25 * green_signal
    state.last_reward = reward

    # --- Advance the clock; the episode ends after 72 steps ---------------
    state.hour += 1
    done = state.hour >= 72

    # --- Narration (state.hour has already been advanced) -----------------
    narration = _narrate(state, solar_kw, actual_demand, grid_price, blackout_kwh,
                         diesel_kw, shed_frac, grid_kw, delivered_kw, grid_available)

    return StepResult(state=state, reward=reward, done=done, narration=narration, flows=flows)
|
|
|
|
def _narrate(
    s: MicrogridState,
    solar: float,
    demand: float,
    price: float,
    blackout: float,
    diesel: float,
    shed: float,
    grid_kw: float,
    battery_kw: float,
    grid_available: bool = True,
) -> str:
    """Build a one-line, human-readable description of the step just taken.

    ``s.hour`` is expected to have been advanced already, so the step being
    described is ``s.hour - 1``; the clock starts at 06:00 on day 1.
    """
    first_hour = 6
    elapsed = (s.hour - 1) + first_hour
    day_index, hour_of_day = divmod(elapsed, 24)
    day = day_index + 1
    soc_pct = s.battery_soc_kwh / BATTERY_CAPACITY_KWH * 100

    sentences = [f"Day {day}, {hour_of_day:02d}:00."]
    say = sentences.append

    if not grid_available:
        say("GRID OUTAGE — islanding mode! No grid import/export.")

    # At most one headline, chosen by priority.
    if blackout > 0:
        headline = f"BLACKOUT: {blackout:.0f} kWh unmet!"
    elif demand > 200:
        headline = "Peak demand period."
    elif solar > 150:
        headline = "Strong solar generation."
    elif hour_of_day >= 18:
        headline = "Evening approaching — solar fading."
    elif hour_of_day < 6:
        headline = "Night — low demand, no solar."
    else:
        headline = None
    if headline is not None:
        say(headline)

    # Grid exchange.
    if grid_kw > 150:
        say(f"Grid import near limit ({grid_kw:.0f}/{GRID_MAX_KW:.0f} kW).")
    elif grid_kw < -50:
        say(f"Exporting {abs(grid_kw):.0f} kW to grid at Rs {price:.1f}.")

    # Price level.
    if price > 12:
        say(f"Grid price high (Rs {price:.1f}/kWh).")
    elif price < 5:
        say(f"Grid price low (Rs {price:.1f}/kWh).")

    # Battery state of charge.
    if soc_pct < 20:
        say(f"Battery low ({soc_pct:.0f}%).")
    elif soc_pct > 80:
        say(f"Battery well-charged ({soc_pct:.0f}%).")

    # Battery activity.
    if battery_kw > 10:
        say(f"Battery discharging {battery_kw:.0f} kW.")
    elif battery_kw < -10:
        say(f"Battery charging {abs(battery_kw):.0f} kW.")

    # Diesel generator.
    if diesel > 0:
        fuel_pct = s.diesel_fuel_kwh / DIESEL_TANK_KWH * 100
        say(f"Diesel running ({fuel_pct:.0f}% fuel left).")

    # Demand response and the load it defers.
    if shed > 0:
        say(f"Demand response active ({shed * 100:.0f}% shed).")
        if s.shed_rebound_kwh > 1:
            say(f"Rebound: +{s.shed_rebound_kwh:.0f} kW next hour.")

    return " ".join(sentences)
|
|