77ethers's picture
Release GridOps demo Space
7d7b92e verified
"""
Energy balance engine — the core physics of the microgrid.
KEY DESIGN (per Gemini review):
- Agent controls: battery dispatch, diesel, demand shedding
- Grid is the SLACK variable (absorbs residual, capped at ±200 kW)
- VoLL penalty (Rs 150/kWh) replaces hard reliability gate
- Battery degradation cost (Rs 2.5/kWh throughput)
- Diesel startup cost (Rs 100 if was off last step)
- Demand shedding rebound (100% of shed kWh deferred to next hour)
"""
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
# ── Constants ────────────────────────────────────────────────────────────
BATTERY_CAPACITY_KWH = 500.0
BATTERY_MAX_POWER_KW = 100.0
BATTERY_EFFICIENCY = 0.90 # round-trip (applied as √0.9 each way)
BATTERY_CHARGE_EFF = 0.949 # √0.90 ≈ 0.949
BATTERY_DISCHARGE_EFF = 0.949 # agent gets 94.9% of what battery releases
BATTERY_DEGRADATION_RS = 2.5 # Rs per kWh of throughput (charge or discharge)
GRID_MAX_KW = 200.0
DIESEL_MAX_KW = 100.0
DIESEL_COST_PER_KWH = 25.0
DIESEL_STARTUP_COST = 100.0 # Rs, one-time when turning on from off
DEMAND_SHED_MAX_FRAC = 0.20
SHED_REBOUND_FRAC = 1.00 # 100% of shed energy rebounds next hour (deferred, not destroyed)
DIESEL_TANK_KWH = 2400.0 # total fuel capacity
VOLL = 150.0 # Value of Lost Load (Rs/kWh)
DT = 1.0 # 1 hour per step
@dataclass
class MicrogridState:
"""Mutable internal state of the microgrid."""
hour: int = 0
battery_soc_kwh: float = 250.0 # start half-charged
diesel_fuel_kwh: float = 2400.0
diesel_was_on: bool = False # for startup cost
shed_rebound_kwh: float = 0.0 # deferred load from previous shedding
cumulative_cost: float = 0.0
cumulative_blackout_kwh: float = 0.0
cumulative_diesel_kwh: float = 0.0
cumulative_battery_throughput_kwh: float = 0.0
total_demand_kwh: float = 0.0
# Per-step bookkeeping
last_blackout_kwh: float = 0.0
last_cost: float = 0.0
last_reward: float = 0.0
last_grid_kw: float = 0.0
@dataclass
class StepFlows:
"""Detailed energy flows for one step (all in kW)."""
# Supply side (positive = providing power to the bus)
solar_kw: float = 0.0
grid_import_kw: float = 0.0 # grid importing INTO community
battery_discharge_kw: float = 0.0 # power delivered from battery (after efficiency loss)
diesel_kw: float = 0.0
# Demand side (positive = consuming power from the bus)
effective_demand_kw: float = 0.0 # demand after shedding + rebound
grid_export_kw: float = 0.0 # surplus exported to grid
battery_charge_kw: float = 0.0 # power consumed to charge battery (before efficiency)
blackout_kw: float = 0.0 # unmet demand
curtailed_kw: float = 0.0 # excess supply that goes nowhere
# Derived
total_supply_kw: float = 0.0
total_consumption_kw: float = 0.0
shed_kw: float = 0.0 # how much was shed
rebound_kw: float = 0.0 # how much rebounded from last step
@dataclass
class StepResult:
"""What physics.step() returns to the environment."""
state: MicrogridState
reward: float
done: bool
narration: str
flows: StepFlows = None
def step(
state: MicrogridState,
battery_dispatch_norm: float,
diesel_norm: float,
shed_norm: float,
solar_kw: float,
demand_kw: float,
grid_price: float,
diesel_fuel_cap: float = DIESEL_TANK_KWH,
grid_available: bool = True,
) -> StepResult:
"""
Advance the microgrid by one hour.
Actions (agent controls):
battery_dispatch_norm: -1 (charge 100kW) to +1 (discharge 100kW)
diesel_norm: 0 (off) to 1 (100kW)
shed_norm: 0 (none) to 1 (shed 20%)
Grid is the SLACK — absorbs residual up to ±200 kW.
"""
# ── Scale actions ────────────────────────────────────────────────
battery_cmd_kw = float(np.clip(battery_dispatch_norm, -1, 1)) * BATTERY_MAX_POWER_KW
diesel_kw = float(np.clip(diesel_norm, 0, 1)) * DIESEL_MAX_KW
shed_frac = float(np.clip(shed_norm, 0, 1)) * DEMAND_SHED_MAX_FRAC
# ── Demand (with shedding rebound from last step) ────────────────
actual_demand = demand_kw + state.shed_rebound_kwh / DT # rebound is kWh, convert to kW
effective_demand = actual_demand * (1.0 - shed_frac)
shed_kwh = actual_demand * shed_frac * DT
state.shed_rebound_kwh = shed_kwh * SHED_REBOUND_FRAC # 100% rebounds next hour
# ── Diesel fuel constraint ───────────────────────────────────────
available_diesel_kwh = state.diesel_fuel_kwh
diesel_kw = min(diesel_kw, available_diesel_kwh / DT)
diesel_kw = max(0.0, diesel_kw)
# ── Battery physics ──────────────────────────────────────────────
if battery_cmd_kw > 0:
# Discharge: agent wants power FROM battery
max_discharge = min(battery_cmd_kw, state.battery_soc_kwh / DT)
battery_kw = max(0.0, max_discharge)
delivered_kw = battery_kw * BATTERY_DISCHARGE_EFF
state.battery_soc_kwh -= battery_kw * DT
else:
# Charge: agent wants to push power INTO battery
charge_cmd = abs(battery_cmd_kw)
headroom = (BATTERY_CAPACITY_KWH - state.battery_soc_kwh) / BATTERY_CHARGE_EFF
max_charge = min(charge_cmd, headroom / DT)
battery_kw = -max(0.0, max_charge) # negative = charging
delivered_kw = battery_kw # charging consumes power (negative delivery)
state.battery_soc_kwh += abs(battery_kw) * BATTERY_CHARGE_EFF * DT
state.battery_soc_kwh = float(np.clip(state.battery_soc_kwh, 0, BATTERY_CAPACITY_KWH))
battery_throughput = abs(battery_kw) * DT
state.cumulative_battery_throughput_kwh += battery_throughput
# ── Grid as slack variable ───────────────────────────────────────
# residual = what the community still needs after solar + battery + diesel
# positive → grid must import; negative → surplus exported
grid_cap = GRID_MAX_KW if grid_available else 0.0
residual = effective_demand - solar_kw - delivered_kw - diesel_kw
grid_kw = float(np.clip(residual, -grid_cap, grid_cap))
# ── Blackout / curtailment detection ─────────────────────────────
blackout_kwh = 0.0
curtailed_kw = 0.0
if residual > grid_cap:
blackout_kwh = (residual - grid_cap) * DT
elif residual < -grid_cap:
curtailed_kw = abs(residual) - grid_cap # excess that can't be exported
# ── Build flow snapshot ──────────────────────────────────────────
grid_import = max(0.0, grid_kw)
grid_export = max(0.0, -grid_kw)
batt_discharge = max(0.0, delivered_kw)
batt_charge = max(0.0, -delivered_kw) # power drawn from bus to charge
flows = StepFlows(
solar_kw=solar_kw,
grid_import_kw=grid_import,
battery_discharge_kw=batt_discharge,
diesel_kw=diesel_kw,
effective_demand_kw=effective_demand,
grid_export_kw=grid_export,
battery_charge_kw=batt_charge,
blackout_kw=blackout_kwh / DT,
curtailed_kw=curtailed_kw,
total_supply_kw=solar_kw + grid_import + batt_discharge + diesel_kw,
total_consumption_kw=effective_demand + grid_export + batt_charge,
shed_kw=actual_demand * shed_frac,
rebound_kw=state.shed_rebound_kwh / SHED_REBOUND_FRAC if shed_frac == 0 else 0,
)
# ── Cost accounting ──────────────────────────────────────────────
step_cost = 0.0
# Grid cost (import costs money, export earns revenue)
if grid_kw > 0:
step_cost += grid_price * grid_kw * DT
else:
step_cost -= grid_price * abs(grid_kw) * DT # revenue
# Diesel cost
step_cost += DIESEL_COST_PER_KWH * diesel_kw * DT
# Diesel startup cost
if diesel_kw > 0 and not state.diesel_was_on:
step_cost += DIESEL_STARTUP_COST
state.diesel_was_on = (diesel_kw > 0)
# Battery degradation cost
step_cost += BATTERY_DEGRADATION_RS * battery_throughput
# VoLL penalty (replaces hard reliability gate)
step_cost += VOLL * blackout_kwh
# Shedding penalty (comfort + political cost — Rs 40/kWh shed)
# More expensive than diesel (Rs 25), so only used as true emergency
step_cost += 40.0 * shed_kwh
# ── Fuel accounting ──────────────────────────────────────────────
state.diesel_fuel_kwh -= diesel_kw * DT
state.diesel_fuel_kwh = max(0.0, state.diesel_fuel_kwh)
# ── Cumulative tracking ──────────────────────────────────────────
state.cumulative_cost += step_cost
state.cumulative_blackout_kwh += blackout_kwh
state.cumulative_diesel_kwh += diesel_kw * DT
state.total_demand_kwh += effective_demand * DT
state.last_blackout_kwh = blackout_kwh
state.last_cost = step_cost
state.last_grid_kw = grid_kw
# ── Per-step reward (aligned with episode grader weights) ────────
# Grader = 50% cost_efficiency + 25% reliability + 25% green_score
# Step reward mirrors these proportions for consistent learning signal.
cost_signal = -step_cost / 500.0
reliability_signal = -2.0 * (blackout_kwh / max(effective_demand * DT, 1.0))
green_signal = -0.5 * (diesel_kw / DIESEL_MAX_KW) if diesel_kw > 0 else 0.0
reward = 0.50 * cost_signal + 0.25 * reliability_signal + 0.25 * green_signal
state.last_reward = reward
# ── Advance clock ────────────────────────────────────────────────
state.hour += 1
done = state.hour >= 72
# ── Narration ────────────────────────────────────────────────────
narration = _narrate(state, solar_kw, actual_demand, grid_price, blackout_kwh,
diesel_kw, shed_frac, grid_kw, delivered_kw, grid_available)
return StepResult(state=state, reward=reward, done=done, narration=narration, flows=flows)
def _narrate(
s: MicrogridState,
solar: float,
demand: float,
price: float,
blackout: float,
diesel: float,
shed: float,
grid_kw: float,
battery_kw: float,
grid_available: bool = True,
) -> str:
"""Generate a short human-readable situation summary."""
START_HOUR = 6
clock = (s.hour - 1) + START_HOUR # absolute hour since midnight Day 1
hour_of_day = clock % 24
day = clock // 24 + 1
soc_pct = s.battery_soc_kwh / BATTERY_CAPACITY_KWH * 100
parts = [f"Day {day}, {hour_of_day:02d}:00."]
if not grid_available:
parts.append("GRID OUTAGE — islanding mode! No grid import/export.")
if blackout > 0:
parts.append(f"BLACKOUT: {blackout:.0f} kWh unmet!")
elif demand > 200:
parts.append("Peak demand period.")
elif solar > 150:
parts.append("Strong solar generation.")
elif hour_of_day >= 18:
parts.append("Evening approaching — solar fading.")
elif hour_of_day < 6:
parts.append("Night — low demand, no solar.")
if grid_kw > 150:
parts.append(f"Grid import near limit ({grid_kw:.0f}/{GRID_MAX_KW:.0f} kW).")
elif grid_kw < -50:
parts.append(f"Exporting {abs(grid_kw):.0f} kW to grid at Rs {price:.1f}.")
if price > 12:
parts.append(f"Grid price high (Rs {price:.1f}/kWh).")
elif price < 5:
parts.append(f"Grid price low (Rs {price:.1f}/kWh).")
if soc_pct < 20:
parts.append(f"Battery low ({soc_pct:.0f}%).")
elif soc_pct > 80:
parts.append(f"Battery well-charged ({soc_pct:.0f}%).")
if battery_kw > 10:
parts.append(f"Battery discharging {battery_kw:.0f} kW.")
elif battery_kw < -10:
parts.append(f"Battery charging {abs(battery_kw):.0f} kW.")
if diesel > 0:
fuel_pct = s.diesel_fuel_kwh / DIESEL_TANK_KWH * 100
parts.append(f"Diesel running ({fuel_pct:.0f}% fuel left).")
if shed > 0:
parts.append(f"Demand response active ({shed * 100:.0f}% shed).")
if s.shed_rebound_kwh > 1:
parts.append(f"Rebound: +{s.shed_rebound_kwh:.0f} kW next hour.")
return " ".join(parts)