import gymnasium as gym
import numpy as np
from gymnasium import spaces


class SmartGridEnv(gym.Env):
    """Smart-grid home battery management environment (Gymnasium-compatible).

    Goal: minimize daily electricity cost by charging/discharging a home
    battery, using solar generation, and trading with the grid.

    An episode is 24 steps (one per hour of the day), so a power of X kW
    sustained for one step is accounted as X kWh.

    Observation (4 float32 values in physical units, NOT normalized;
    bounded by ``OBS_LOW`` / ``OBS_HIGH``):
        [battery_soc_kwh, solar_gen_kw, house_demand_kw, grid_price]

    Action space (Discrete 3):
        0 = Hold      -- battery idle; solar and grid balance the load
        1 = Charge    -- store up to CHARGE_RATE kWh in the battery
        2 = Discharge -- release up to CHARGE_RATE kWh from the battery

    Reward:
        ``-(grid_cost + health_penalty)`` per step.

        * All energy flows of the step (house demand, solar generation,
          battery charge/discharge) are netted first. A net import is paid
          at the grid price; a net export earns ``FEED_IN_TARIFF`` times the
          grid price.
        * Charging is lossy: storing E kWh consumes E / CHARGE_EFFICIENCY.
        * A small penalty applies when the state of charge is above 90 % or
          below 10 % of capacity.

    Time series:
        Solar, demand and price each drift toward a time-of-day mean with
        Gaussian noise, so consecutive hours are correlated.
    """

    metadata = {"render_modes": ["human"]}

    # Physical constants
    BATTERY_CAPACITY = 100.0   # kWh
    CHARGE_RATE = 10.0         # kW (max charge/discharge per step)
    CHARGE_EFFICIENCY = 0.90   # fraction of energy drawn for charging that is stored
    FEED_IN_TARIFF = 0.50      # exported energy is paid at 50% of grid price
    SOC_PENALTY_COEF = 0.005   # small penalty for extreme battery levels

    # Episode length in steps (one step per hour of the day).
    EPISODE_STEPS = 24

    # Observation limits (battery 0-100 kWh, solar 0-10 kW,
    # demand 0-10 kW, price 0-1.5 $/kWh to cover double-peak)
    OBS_HIGH = np.array([100.0, 10.0, 10.0, 1.5], dtype=np.float32)
    OBS_LOW = np.zeros(4, dtype=np.float32)

    def __init__(self, render_mode=None):
        """Create the environment.

        Args:
            render_mode: ``"human"`` to print a status line after every
                step, or ``None`` for no rendering.
        """
        super().__init__()
        self.render_mode = render_mode

        # --- Action / Observation Spaces ---
        self.action_space = spaces.Discrete(3)
        self.observation_space = spaces.Box(
            low=self.OBS_LOW,
            high=self.OBS_HIGH,
            shape=(4,),
            dtype=np.float32,
        )

        self._reset_internal_state()

    # ------------------------------------------------------------------
    # Gymnasium API
    # ------------------------------------------------------------------

    def reset(self, seed=None, options=None):
        """Start a new day with a half-full battery.

        Args:
            seed: Optional seed forwarded to ``gym.Env.reset`` (seeds
                ``self.np_random``).
            options: Accepted for API compatibility; unused.

        Returns:
            ``(observation, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        self._reset_internal_state()
        return self._state.copy(), {}

    def step(self, action):
        """Apply one battery action and advance the simulation by one hour.

        Args:
            action: 0 (hold), 1 (charge) or 2 (discharge).

        Returns:
            ``(observation, reward, terminated, truncated, info)``.
            ``terminated`` becomes True after ``EPISODE_STEPS`` steps, and
            the terminal observation is all zeros. ``truncated`` is always
            False. ``info`` reports the cost and the solar/demand/price
            values that were in effect for this step, plus the battery
            level after the action.
        """
        assert self.action_space.contains(action), f"Invalid action {action}"

        # Unpack as Python floats so the arithmetic below does not depend on
        # NumPy scalar promotion rules.
        battery, solar, demand, price = (float(v) for v in self._state)

        # ---- 1. Net load BEFORE the battery action ----
        # Positive -> house needs more than solar provides.
        # Negative -> solar surplus.
        net_load = demand - solar

        # ---- 2. Battery action -> net energy exchanged with the grid ----
        if action == 1:  # CHARGE
            headroom = self.BATTERY_CAPACITY - battery
            stored = min(self.CHARGE_RATE, headroom)
            # Charging losses: more energy is consumed than is stored.
            charge_draw = stored / self.CHARGE_EFFICIENCY
            self.current_battery = float(
                np.clip(battery + stored, 0.0, self.BATTERY_CAPACITY)
            )
            # Solar surplus (negative net_load) offsets the charging draw
            # before anything is bought from or sold to the grid.
            net_grid = net_load + charge_draw
        elif action == 2:  # DISCHARGE
            released = min(self.CHARGE_RATE, battery)
            self.current_battery = float(
                np.clip(battery - released, 0.0, self.BATTERY_CAPACITY)
            )
            # Battery output beyond the load is exported.
            net_grid = net_load - released
        else:  # HOLD -- let solar + grid balance the load
            net_grid = net_load

        # positive = paying, negative = earning
        grid_cost = self._grid_cost(net_grid, price)

        # ---- 3. Battery health penalty (discourages extreme SoC) ----
        soc_frac = self.current_battery / self.BATTERY_CAPACITY
        health_penalty = self.SOC_PENALTY_COEF * (
            max(0.0, soc_frac - 0.9) + max(0.0, 0.1 - soc_frac)
        )

        # ---- 4. Reward ----
        reward = -(grid_cost + health_penalty)

        # ---- 5. Advance time, generate next state ----
        self.current_step += 1
        terminated = self.current_step >= self.EPISODE_STEPS
        truncated = False

        if terminated:
            # Placeholder observation; the episode is over.
            self._state = np.zeros(4, dtype=np.float32)
        else:
            self._state = self._generate_next_state()

        info = {
            "cost": float(grid_cost),
            "battery_soc": float(self.current_battery),
            "solar_kw": float(solar),
            "demand_kw": float(demand),
            "price": float(price),
            "action": int(action),
        }

        if self.render_mode == "human":
            self.render()

        return self._state.copy(), float(reward), terminated, truncated, info

    def render(self):
        """Print the current step counter, battery level and stored state."""
        b, s, d, p = self._state
        print(
            f"[Hour {self.current_step:02d}] "
            f"Battery={self.current_battery:.1f}% | "
            f"Solar={s:.2f}kW | Demand={d:.2f}kW | Price=${p:.3f}/kWh"
        )

    def close(self):
        """Release resources (nothing to release for this environment)."""
        pass

    # ------------------------------------------------------------------
    # Helpers
    # ------------------------------------------------------------------

    def _reset_internal_state(self) -> None:
        """Set the step counter, battery and drift bases to episode-start values."""
        self.current_step = 0
        self.current_battery = self.BATTERY_CAPACITY * 0.5

        # Running bases for the correlated time-series generation.
        self._price_base = 0.2    # $/kWh (drifts each step)
        self._demand_base = 3.0   # kW
        self._solar_base = 0.0    # kW

        self._state = self._make_initial_state()

    def _grid_cost(self, net_energy: float, price: float) -> float:
        """Price the net grid exchange of one step.

        Args:
            net_energy: Energy imported (> 0) or exported (< 0) this step.
            price: Grid price for this step.

        Returns:
            Cost of the exchange: positive when paying for an import,
            negative when earning from an export. Exports are paid at
            ``FEED_IN_TARIFF`` times the grid price.
        """
        if net_energy > 0:
            return net_energy * price
        return net_energy * price * self.FEED_IN_TARIFF

    def _make_initial_state(self) -> np.ndarray:
        """Return the hour-0 observation: current battery and fixed defaults."""
        return np.array(
            [self.current_battery, 0.0, 2.5, 0.15], dtype=np.float32
        )

    def _generate_next_state(self) -> np.ndarray:
        """Generate the observation for the hour given by ``current_step``.

        Correlated time-series generation so successive hours are smooth:
        each running base moves a fixed fraction of the way toward a
        time-of-day mean, then Gaussian noise is added and the result is
        clipped to the observation range. Draw order (solar, demand, price)
        is fixed so a given seed reproduces the same series.
        """
        hour = self.current_step  # 0-23

        # --- Solar: bell-curve peaking at noon, zero at night ---
        if 6 <= hour <= 18:
            solar_mean = 5.0 * np.exp(-0.5 * ((hour - 12) / 3.5) ** 2)
        else:
            solar_mean = 0.0
        self._solar_base += 0.3 * (solar_mean - self._solar_base)
        next_solar = float(np.clip(
            self._solar_base + self.np_random.normal(0, 0.4), 0.0, 10.0
        ))

        # --- Demand: morning and evening peaks ---
        demand_mean = (
            2.5
            + 2.0 * np.exp(-0.5 * ((hour - 8) / 1.5) ** 2)    # morning
            + 3.0 * np.exp(-0.5 * ((hour - 19) / 2.0) ** 2)   # evening
        )
        self._demand_base += 0.3 * (demand_mean - self._demand_base)
        next_demand = float(np.clip(
            self._demand_base + self.np_random.normal(0, 0.5), 0.0, 10.0
        ))

        # --- Price: cheap at night, expensive at peak (17-21) ---
        if 17 <= hour <= 21:
            price_mean = 0.65
        elif 6 <= hour <= 9:
            price_mean = 0.30
        elif 23 <= hour or hour <= 5:
            price_mean = 0.12
        else:
            price_mean = 0.22
        self._price_base += 0.4 * (price_mean - self._price_base)
        next_price = float(np.clip(
            self._price_base + self.np_random.normal(0, 0.03), 0.05, 1.5
        ))

        return np.array(
            [self.current_battery, next_solar, next_demand, next_price],
            dtype=np.float32,
        )