# Smart-Grid-System / smart_grid_env.py
# chinmay0805's picture
# initial commit
# 06f9287
import gymnasium as gym
import numpy as np
from gymnasium import spaces
class SmartGridEnv(gym.Env):
    """
    Smart Grid Battery Management Environment (Gymnasium-compatible).

    Goal: Minimize daily electricity cost by intelligently charging/discharging
    a home battery, using solar generation, and interacting with the grid.

    An episode is EPISODE_LENGTH (24) steps; each step stands for one hour, so
    a power of X kW applied for one step moves X kWh of energy.

    Observation (4 float32 values in raw units, NOT normalized; bounded by
    OBS_LOW / OBS_HIGH):
        [battery_level, solar_gen_kw, house_demand_kw, grid_price]
        battery_level is the stored energy in kWh (0..100; numerically equal
        to percent because BATTERY_CAPACITY is 100 kWh).

    Action space (Discrete 3):
        0 = Hold      — do nothing beyond covering net load from grid/solar
        1 = Charge    — buy from grid to fill battery (10 kW rate)
        2 = Discharge — draw from battery to cover load (10 kW rate)

    Reward:
        Negative net cost per step (agent learns to minimize cost).
        Includes sell-back revenue when surplus energy is fed to the grid.
        Includes a small battery health penalty for extreme SoC operation.

    Fixes vs. original:
        - Observation space bounds match actual value ranges
        - Solar/demand energy balance applied in all action branches
        - Grid sell-back (feed-in tariff) modeled in HOLD branch
        - Battery SoC clamped to [0, battery_capacity]
        - Charging efficiency loss (90%) modeled
        - Correlated time-series for price/solar/demand (no more i.i.d. jumps)
        - Battery health penalty for operating near 0% or 100% SoC
        - render() method added
        - Fully compatible with check_env() and VecNormalize
    """

    metadata = {"render_modes": ["human"]}

    # Physical constants
    BATTERY_CAPACITY = 100.0   # kWh
    CHARGE_RATE = 10.0         # kW (max charge/discharge per step)
    CHARGE_EFFICIENCY = 0.90   # stored energy / grid energy bought when charging
    FEED_IN_TARIFF = 0.50      # sell surplus energy at 50% of grid price
    SOC_PENALTY_COEF = 0.005   # small penalty for extreme battery levels
    EPISODE_LENGTH = 24        # steps per episode (one step per hour)

    # Observation high limits (battery 0-100, solar 0-10 kW,
    # demand 0-10 kW, price 0-1.5 $/kWh to cover double-peak)
    OBS_HIGH = np.array([100.0, 10.0, 10.0, 1.5], dtype=np.float32)
    OBS_LOW = np.zeros(4, dtype=np.float32)

    def __init__(self, render_mode=None):
        """Create the environment.

        Args:
            render_mode: ``None`` or ``"human"``. With ``"human"``, every
                call to ``step`` also calls ``render``.
        """
        super().__init__()
        self.render_mode = render_mode

        # --- Action / Observation Spaces ---
        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Box(
            low=self.OBS_LOW,
            high=self.OBS_HIGH,
            shape=(4,),
            dtype=np.float32,
        )

        self._reset_internal_state()

    # ------------------------------------------------------------------
    # Gymnasium API
    # ------------------------------------------------------------------
    def reset(self, seed=None, options=None):
        """Start a new episode from the fixed initial condition.

        Args:
            seed: Forwarded to ``gym.Env.reset`` to (re)seed ``np_random``.
            options: Accepted for API compatibility; not used.

        Returns:
            ``(observation, info)`` where ``observation`` is a copy of the
            initial state and ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        self._reset_internal_state()
        return self._state.copy(), {}

    def step(self, action):
        """Apply one battery action for the current hour and advance time.

        Args:
            action: 0 (hold), 1 (charge from grid) or 2 (discharge).

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``reward`` is ``-(grid_cost + health_penalty)``; ``terminated``
            becomes True once EPISODE_LENGTH steps have been taken;
            ``truncated`` is always False. ``info`` reports the cost, the
            post-action battery level and the solar/demand/price values that
            were in effect during this step.

        Raises:
            AssertionError: If ``action`` is not in the action space.
        """
        assert self.action_space.contains(action), f"Invalid action {action}"

        battery, solar, demand, price = self._state

        # ---- 1. Compute net load BEFORE battery action ----
        # Positive → house needs more than solar provides (must buy or discharge)
        # Negative → solar surplus (sold back to the grid)
        net_load = demand - solar
        grid_cost = 0.0  # positive = paying, negative = earning

        # ---- 2. Execute battery action ----
        if action == 1:  # CHARGE from grid
            # Never store more than the remaining headroom.
            headroom = self.BATTERY_CAPACITY - battery
            charge_requested = min(self.CHARGE_RATE, headroom)
            # Efficiency: we buy more from the grid than is actually stored.
            grid_draw = charge_requested / self.CHARGE_EFFICIENCY
            # float() keeps current_battery a plain Python float, as in reset().
            self.current_battery = float(np.clip(
                battery + charge_requested,
                0.0, self.BATTERY_CAPACITY,
            ))
            # The house deficit (if any) is also bought from the grid ...
            grid_cost = (max(0.0, net_load) + grid_draw) * price
            # ... while any solar surplus earns the sell-back credit.
            grid_cost -= max(0.0, -net_load) * price * self.FEED_IN_TARIFF

        elif action == 2:  # DISCHARGE battery to cover load
            # Limited by both the rate and the energy actually stored.
            discharge_requested = min(self.CHARGE_RATE, battery)
            self.current_battery = float(np.clip(
                battery - discharge_requested,
                0.0, self.BATTERY_CAPACITY,
            ))
            # Remaining load after the battery contribution.
            residual_load = net_load - discharge_requested
            if residual_load > 0:
                grid_cost = residual_load * price  # still need some grid
            else:
                # Negative residual is exported, so this term is <= 0 (revenue).
                grid_cost = residual_load * price * self.FEED_IN_TARIFF

        else:  # HOLD — let solar + grid balance the load
            if net_load > 0:
                grid_cost = net_load * price  # buy deficit from grid
            else:
                grid_cost = net_load * price * self.FEED_IN_TARIFF  # sell surplus

        # ---- 3. Battery health penalty (discourages extreme SoC) ----
        # Linear in how far the post-action SoC lies above 90% or below 10%.
        soc_frac = self.current_battery / self.BATTERY_CAPACITY
        health_penalty = self.SOC_PENALTY_COEF * (
            max(0.0, soc_frac - 0.9) + max(0.0, 0.1 - soc_frac)
        )

        # ---- 4. Reward ----
        reward = -(grid_cost + health_penalty)

        # ---- 5. Advance time, generate next state ----
        self.current_step += 1
        terminated = self.current_step >= self.EPISODE_LENGTH
        truncated = False

        if not terminated:
            self._state = self._generate_next_state()
        else:
            # No further hour is simulated, so solar/demand/price are zeroed,
            # but the battery field keeps the real level so the final
            # observation agrees with info["battery_soc"].
            self._state = np.array(
                [self.current_battery, 0.0, 0.0, 0.0],
                dtype=np.float32,
            )

        info = {
            "cost": float(grid_cost),
            "battery_soc": float(self.current_battery),
            "solar_kw": float(solar),
            "demand_kw": float(demand),
            "price": float(price),
            "action": int(action),
        }

        if self.render_mode == "human":
            self.render()

        return self._state.copy(), float(reward), terminated, truncated, info

    def render(self):
        """Print the current step index, battery level and the current state.

        When called from ``step`` this shows the state generated for the
        upcoming hour, not the one the action was just applied to.
        """
        b, s, d, p = self._state
        print(
            f"[Hour {self.current_step:02d}] "
            f"Battery={self.current_battery:.1f}% | "
            f"Solar={s:.2f}kW | Demand={d:.2f}kW | Price=${p:.3f}/kWh"
        )

    def close(self):
        """Nothing to release."""
        pass

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------
    def _reset_internal_state(self) -> None:
        """Set the step counter, battery, drift bases and state to episode start."""
        self.current_step = 0
        self.current_battery = self.BATTERY_CAPACITY * 0.5

        # Slowly drifting levels behind the correlated time-series.
        self._price_base = 0.2   # $/kWh (drifts each step)
        self._demand_base = 3.0  # kW
        self._solar_base = 0.0   # kW

        self._state = self._make_initial_state()

    def _make_initial_state(self) -> np.ndarray:
        """Return the fixed first observation: current battery, no solar,
        2.5 kW demand and a price of 0.15."""
        return np.array(
            [self.current_battery, 0.0, 2.5, 0.15],
            dtype=np.float32,
        )

    def _generate_next_state(self) -> np.ndarray:
        """
        Correlated time-series generation so successive hours are smooth.

        Each variable's base level moves a fixed fraction of the way toward a
        time-of-day mean (updating ``_solar_base`` / ``_demand_base`` /
        ``_price_base``); the emitted value is that base plus Gaussian noise
        drawn from ``self.np_random``, clipped to the observation bounds.

        Returns:
            float32 array ``[battery, solar, demand, price]`` for the hour
            given by ``self.current_step``.
        """
        hour = self.current_step  # 0–23

        # --- Solar: bell-curve peaking at noon, zero at night ---
        if 6 <= hour <= 18:
            solar_mean = 5.0 * np.exp(-0.5 * ((hour - 12) / 3.5) ** 2)
        else:
            solar_mean = 0.0
        self._solar_base += 0.3 * (solar_mean - self._solar_base)
        next_solar = float(np.clip(
            self._solar_base + self.np_random.normal(0, 0.4),
            0.0, 10.0,
        ))

        # --- Demand: morning and evening peaks ---
        demand_mean = (
            2.5
            + 2.0 * np.exp(-0.5 * ((hour - 8) / 1.5) ** 2)   # morning
            + 3.0 * np.exp(-0.5 * ((hour - 19) / 2.0) ** 2)  # evening
        )
        self._demand_base += 0.3 * (demand_mean - self._demand_base)
        next_demand = float(np.clip(
            self._demand_base + self.np_random.normal(0, 0.5),
            0.0, 10.0,
        ))

        # --- Price: cheap at night, expensive at peak (17–21) ---
        if 17 <= hour <= 21:
            price_mean = 0.65
        elif 6 <= hour <= 9:
            price_mean = 0.30
        elif 23 <= hour or hour <= 5:
            price_mean = 0.12
        else:
            price_mean = 0.22
        self._price_base += 0.4 * (price_mean - self._price_base)
        next_price = float(np.clip(
            self._price_base + self.np_random.normal(0, 0.03),
            0.05, 1.5,
        ))

        return np.array(
            [self.current_battery, next_solar, next_demand, next_price],
            dtype=np.float32,
        )