Spaces:
Sleeping
Sleeping
Puneet Gopinath
feat: add reward and done fields; update observation return in environment
56cccac unverified | # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """ | |
| Smart Grid Environment Implementation. | |
| """ | |
| from uuid import uuid4 | |
| import math | |
| import random | |
| from openenv.core.env_server.interfaces import Environment | |
| from openenv.core.env_server.types import State | |
| try: | |
| from ..models import SmartGridAction, SmartGridObservation | |
| except ImportError: | |
| from models import SmartGridAction, SmartGridObservation | |
class SmartGridEnvironment(Environment):
    """
    Smart Grid simulation environment with renewable generation, battery
    storage, and time-varying demand.

    Each step covers one simulated hour. The agent chooses how much energy to
    send to three regions and how much to charge (positive) or discharge
    (negative) the battery; the reward reflects the fraction of demand served,
    minus penalties for battery limit violations and wasted energy.
    """

    # Enable concurrent WebSocket sessions.
    # Set to True if your environment isolates state between instances.
    # When True, multiple WebSocket clients can connect simultaneously, each
    # getting their own environment instance (when using factory mode in app.py).
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self):
        """Initialize the smart_grid environment."""
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._reset_count = 0
        self.hour = 0
        self.battery_level = 50
        self.battery_capacity = 100
        self.config = {
            "base_demands": [20.0, 30.0, 25.0],
        }
        # Populate demands up front so that step() does not fail with an
        # AttributeError when it is called before the first reset().
        self.demands = self._generate_demand()

    def _generate_demand(self):
        """
        Generate one demand value per entry of ``config["base_demands"]``.

        Each base demand is scaled by a sinusoidal daily variation derived
        from ``self.hour`` plus uniform random noise whose amplitude depends
        on ``config["demand_noise"]`` ("low", "high", anything else is treated
        as medium). In "high" mode each demand additionally has a 15% chance
        of a 30% spike. Between hours 16 and 21 (inclusive) the demand is
        multiplied by ``config["peak_demand_multiplier"]`` (default 1.0).

        Returns:
            List of non-negative demand values, in the same order as the
            configured base demands.
        """
        base = self.config["base_demands"]
        noise_mode = self.config.get("demand_noise", "low")
        if noise_mode == "low":
            noise_factor = 0.03
        elif noise_mode == "high":
            noise_factor = 0.1
        else:  # medium noise or default
            noise_factor = 0.06

        # These depend only on the hour, so compute them once instead of
        # once per region.
        daily_variation = (
            0.2 * math.sin((self.hour / 24) * 2 * math.pi) +
            0.1 * math.sin((self.hour / 12) * 4 * math.pi)
        )
        in_peak_window = 16 <= self.hour <= 21

        demands = []
        for b in base:
            random_noise = random.uniform(-noise_factor, noise_factor)
            demand = b * (1 + daily_variation + random_noise)
            if noise_mode == "high" and random.random() < 0.15:
                demand *= 1.3  # unexpected 30% spike with 15% chance (high noise mode)
            if in_peak_window:
                demand *= self.config.get("peak_demand_multiplier", 1.0)
            demands.append(max(0, demand))
        return demands

    def _generate_supply(self):
        """
        Generate the renewable supply for the current hour.

        Solar follows a half-sine that is positive between hours 6 and 18 and
        peaks at hour 12, scaled by ``config["solar_scale"]`` (default 1.0).
        Wind is a base of 15 plus a uniform random variation whose range is
        selected by ``config["wind_variation"]`` ("low", "high", anything else
        is treated as medium).

        Returns:
            Tuple ``(solar, wind)``, both non-negative.
        """
        solar_scale = self.config.get("solar_scale", 1.0)
        solar = max(0, solar_scale * 50 * math.sin((self.hour - 6) * math.pi / 12))

        wind_mode = self.config.get("wind_variation", "medium")
        if wind_mode == "low":
            var = random.uniform(-3, 3)
        elif wind_mode == "high":
            var = random.uniform(-15, 10)
        else:  # medium wind variation or default
            var = random.uniform(-7, 5)
        wind = max(0, 15 + var)
        return solar, wind

    def reset(self) -> SmartGridObservation:
        """
        Reset the environment.

        Starts a new episode (fresh episode id, step count 0), rewinds the
        clock to hour 0, restores the battery level to 50 and samples the
        initial demand and supply.

        Returns:
            Observation describing the initial hour.
        """
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._reset_count += 1
        self.hour = 0
        self.battery_level = 50
        self.demands = self._generate_demand()
        solar, wind = self._generate_supply()
        return SmartGridObservation(
            hour=self.hour,
            demand_r1=self.demands[0],
            demand_r2=self.demands[1],
            demand_r3=self.demands[2],
            solar_generation=solar,
            wind_generation=wind,
            battery_level=self.battery_level,
            battery_capacity=self.battery_capacity,
        )

    def step(self, action: SmartGridAction) -> SmartGridObservation:
        """
        Execute a step in the environment.

        Args:
            action: Requested supply for each of the three regions and the
                battery command ``charge_battery`` (positive charges the
                battery, negative discharges it).

        Returns:
            Observation for the next hour, carrying the reward earned by this
            action and a ``done`` flag that becomes true once the hour
            reaches 24.
        """
        self._state.step_count += 1

        # NOTE(review): generation is re-sampled here, so the random wind
        # component can differ from the value in the previous observation.
        solar, wind = self._generate_supply()
        gen = solar + wind

        battery_charge = max(0, action.charge_battery)
        battery_discharge_req = max(0, -action.charge_battery)

        level_before = self.battery_level
        new_level = level_before + battery_charge - battery_discharge_req
        penalty = 0
        if new_level < 0 or new_level > self.battery_capacity:
            penalty = 0.2  # Penalty for overcharging or overdischarging the battery
        new_level = max(0, min(new_level, self.battery_capacity))
        self.battery_level = new_level

        # Energy actually drawn from the battery: it cannot exceed what was
        # stored. (Previously `min(0, battery_discharge_req)` was used, which
        # is always 0, so discharging never contributed any energy.)
        battery_discharged = min(battery_discharge_req, level_before)
        avail_energy = gen - battery_charge + battery_discharged

        req_supply = (
            action.supply_r1 +
            action.supply_r2 +
            action.supply_r3
        )
        total_supply = min(avail_energy, req_supply)
        total_demand = sum(self.demands)
        unmet_demand = max(0, total_demand - total_supply)
        wasted_energy = max(0, avail_energy - total_demand)

        # With zero total demand nothing can be unmet; avoid dividing by zero.
        unmet_fraction = unmet_demand / total_demand if total_demand > 0 else 0.0
        reward = 1 - unmet_fraction
        reward -= penalty
        reward -= wasted_energy * 0.01

        # Advance the clock and sample the conditions for the next hour.
        self.hour += 1
        self.demands = self._generate_demand()
        next_solar, next_wind = self._generate_supply()
        done = self.hour >= 24

        return SmartGridObservation(
            hour=self.hour,
            demand_r1=self.demands[0],
            demand_r2=self.demands[1],
            demand_r3=self.demands[2],
            solar_generation=next_solar,
            wind_generation=next_wind,
            battery_level=self.battery_level,
            battery_capacity=self.battery_capacity,
            reward=reward,
            done=done,
        )

    # NOTE(review): the Environment base class may declare `state` as a
    # property; if so this should be decorated with @property. Confirm against
    # the OpenEnv interface before changing, since callers may use `state()`.
    def state(self) -> State:
        """
        Get the current environment state.

        Returns:
            Current State with episode_id and step_count
        """
        return self._state