# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
Smart Grid Environment Implementation.
"""

import math
import random
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State

try:
    from ..models import SmartGridAction, SmartGridObservation
except ImportError:
    from models import SmartGridAction, SmartGridObservation


class SmartGridEnvironment(Environment):
    """
    Smart Grid simulation environment with renewable generation,
    battery storage, and time-varying demand.

    Each call to ``step`` advances the simulated clock by one hour; the
    observation reports ``done`` once the hour counter reaches 24.

    Optional keys read from ``self.config`` (all have defaults):
        demand_noise: "low" (default), "high", or anything else for medium.
        peak_demand_multiplier: demand factor applied for hours 16-21.
        solar_scale: multiplier on the solar generation curve.
        wind_variation: "low", "high", or anything else for medium (default).
    """

    # Enable concurrent WebSocket sessions.
    # Set to True if your environment isolates state between instances.
    # When True, multiple WebSocket clients can connect simultaneously, each
    # getting their own environment instance (when using factory mode in app.py).
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self):
        """Initialize the smart_grid environment."""
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._reset_count = 0
        self.hour = 0
        self.battery_level = 50
        self.battery_capacity = 100
        self.config = {
            "base_demands": [20.0, 30.0, 25.0],
        }
        # Populate the per-hour demand and supply up front so that step()
        # works even when it is called before the first reset().
        self.demands = self._generate_demand()
        self._supply = self._generate_supply()

    def _generate_demand(self):
        """
        Generate the per-region demands for the current hour.

        Each base demand is scaled by a deterministic time-of-day term plus
        uniform random noise, optionally spiked (high-noise mode only) and
        multiplied by the configured peak multiplier for hours 16-21.

        Returns:
            A list with one non-negative demand per entry of
            ``config["base_demands"]``.
        """
        base = self.config["base_demands"]

        # The .get() default is "low"; any value other than "low"/"high"
        # (e.g. "medium") falls through to the medium factor.
        noise_mode = self.config.get("demand_noise", "low")
        if noise_mode == "low":
            noise_factor = 0.03
        elif noise_mode == "high":
            noise_factor = 0.1
        else:  # medium noise, or any unrecognised value
            noise_factor = 0.06

        # Depends only on the hour, so compute it once for all regions.
        # NOTE(review): the second term has a 6-hour period
        # (hour / 12 * 4*pi); confirm that is the intended shape.
        daily_variation = (
            0.2 * math.sin((self.hour / 24) * 2 * math.pi)
            + 0.1 * math.sin((self.hour / 12) * 4 * math.pi)
        )
        in_peak_window = 16 <= self.hour <= 21

        demands = []
        for b in base:
            random_noise = random.uniform(-noise_factor, noise_factor)
            demand = b * (1 + daily_variation + random_noise)

            # Unexpected 30% spike with 15% chance (high noise mode only).
            if noise_mode == "high" and random.random() < 0.15:
                demand *= 1.3

            if in_peak_window:
                demand *= self.config.get("peak_demand_multiplier", 1.0)

            demands.append(max(0, demand))

        return demands

    def _generate_supply(self):
        """
        Generate the renewable supply for the current hour.

        Solar follows a half-sine that is zero at hour 6, peaks at hour 12
        and is clamped to zero whenever the sine is negative. Wind is a
        base of 15 plus a uniform random variation whose range depends on
        ``config["wind_variation"]``.

        Returns:
            A ``(solar, wind)`` tuple of non-negative values.
        """
        solar_scale = self.config.get("solar_scale", 1.0)
        solar = max(0, solar_scale * 50 * math.sin((self.hour - 6) * math.pi / 12))

        wind_mode = self.config.get("wind_variation", "medium")
        if wind_mode == "low":
            var = random.uniform(-3, 3)
        elif wind_mode == "high":
            var = random.uniform(-15, 10)
        else:  # medium wind variation, or any unrecognised value
            var = random.uniform(-7, 5)
        wind = max(0, 15 + var)

        return solar, wind

    def _make_observation(self, **extra) -> SmartGridObservation:
        """
        Build an observation from the current hour, demands, cached supply
        and battery state.

        Args:
            **extra: Additional observation fields (e.g. ``reward``,
                ``done``) forwarded unchanged to ``SmartGridObservation``.
        """
        solar, wind = self._supply
        return SmartGridObservation(
            hour=self.hour,
            demand_r1=self.demands[0],
            demand_r2=self.demands[1],
            demand_r3=self.demands[2],
            solar_generation=solar,
            wind_generation=wind,
            battery_level=self.battery_level,
            battery_capacity=self.battery_capacity,
            **extra,
        )

    def reset(self) -> SmartGridObservation:
        """
        Reset the environment.

        Starts a new episode (fresh episode id, step count 0), rewinds the
        clock to hour 0, restores the battery to 50 and samples the demand
        and supply for the first hour.

        Returns:
            The initial observation.
        """
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._reset_count += 1
        self.hour = 0
        self.battery_level = 50
        self.demands = self._generate_demand()
        # Cache the sampled supply so the next step() settles against the
        # same values the agent is shown here.
        self._supply = self._generate_supply()
        return self._make_observation()

    def step(self, action: SmartGridAction) -> SmartGridObservation:
        """
        Execute a step in the environment.

        Args:
            action: Requested battery charge (positive) or discharge
                (negative) via ``charge_battery``, and the supply requested
                for each region via ``supply_r1``/``supply_r2``/``supply_r3``.

        Returns:
            The observation for the next hour, carrying the reward earned
            by this action and the ``done`` flag.
        """
        self._state.step_count += 1

        # Settle against the supply the agent actually observed, not a
        # fresh random sample.
        solar, wind = self._supply
        gen = solar + wind

        # Positive charge_battery charges, negative discharges.
        battery_charge = max(0, action.charge_battery)
        battery_discharge_req = max(0, -action.charge_battery)
        requested_level = self.battery_level + battery_charge - battery_discharge_req

        penalty = 0
        if requested_level < 0 or requested_level > self.battery_capacity:
            # Penalty for overcharging or overdischarging the battery.
            penalty = 0.2
        new_level = max(0, min(requested_level, self.battery_capacity))

        # Energy that really moved: > 0 was stored, < 0 was released.
        # Using the clamped delta means an empty battery cannot supply
        # energy and a full one cannot absorb it.
        battery_delta = new_level - self.battery_level
        self.battery_level = new_level

        avail_energy = gen - battery_delta

        req_supply = action.supply_r1 + action.supply_r2 + action.supply_r3
        total_supply = min(avail_energy, req_supply)
        total_demand = sum(self.demands)

        unmet_demand = max(0, total_demand - total_supply)
        wasted_energy = max(0, avail_energy - total_demand)

        if total_demand > 0:
            reward = 1 - (unmet_demand / total_demand)
        else:
            # No demand to serve: nothing can be unmet, avoid dividing by 0.
            reward = 1.0
        reward -= penalty
        reward -= wasted_energy * 0.01

        # Advance the clock and sample the next hour's conditions.
        self.hour += 1
        self.demands = self._generate_demand()
        self._supply = self._generate_supply()
        done = self.hour >= 24

        return self._make_observation(reward=reward, done=done)

    @property
    def state(self) -> State:
        """
        Get the current environment state.

        Returns:
            Current State with episode_id and step_count
        """
        return self._state