smart_grid_env / server /smart_grid_environment.py
Puneet Gopinath
feat: add reward and done fields; update observation return in environment
56cccac unverified
Raw
History Blame Contribute Delete
6.02 kB
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Smart Grid Environment Implementation.
"""
from uuid import uuid4
import math
import random
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State
try:
from ..models import SmartGridAction, SmartGridObservation
except ImportError:
from models import SmartGridAction, SmartGridObservation
class SmartGridEnvironment(Environment):
    """
    Smart Grid simulation environment with renewable generation, battery
    storage, and time-varying demand.

    One episode covers hours 0..24 in one-hour steps. On every step the agent
    chooses how much energy to send to each of three regions and how much to
    charge (positive) or discharge (negative) the battery. The reward is the
    fraction of total demand that was met, minus a penalty for pushing the
    battery outside its limits and a small cost for wasted energy.

    Recognised optional ``config`` keys (all read with ``dict.get``):
        demand_noise: "low" | "high" | anything else (treated as medium).
        peak_demand_multiplier: demand factor applied for hours 16..21.
        solar_scale: multiplier on the solar generation curve.
        wind_variation: "low" | "high" | anything else (treated as medium).
    """

    # Enable concurrent WebSocket sessions.
    # Set to True if your environment isolates state between instances.
    # When True, multiple WebSocket clients can connect simultaneously, each
    # getting their own environment instance (when using factory mode in app.py).
    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self):
        """Initialize the smart_grid environment."""
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._reset_count = 0
        self.hour = 0
        self.battery_level = 50
        self.battery_capacity = 100
        self.config = {
            "base_demands": [20.0, 30.0, 25.0],
        }
        # Populate demands up front so step() works even if the caller never
        # invoked reset() on this instance.
        self.demands = self._generate_demand()

    def _generate_demand(self):
        """
        Generate the per-region demand for the current hour.

        Each base demand is scaled by a deterministic daily curve (a 24-hour
        sinusoid of amplitude 0.2 plus a 6-hour sinusoid of amplitude 0.1) and
        by uniform random noise whose width depends on ``demand_noise``.

        Returns:
            A list with one non-negative demand per entry in
            ``config["base_demands"]``.
        """
        base = self.config["base_demands"]
        noise_mode = self.config.get("demand_noise", "low")
        if noise_mode == "low":
            noise_factor = 0.03
        elif noise_mode == "high":
            noise_factor = 0.1
        else:  # medium noise or default
            noise_factor = 0.06

        # The daily curve depends only on the hour, so compute it once rather
        # than once per region.
        daily_variation = (
            0.2 * math.sin((self.hour / 24) * 2 * math.pi)
            + 0.1 * math.sin((self.hour / 12) * 4 * math.pi)
        )
        in_peak_window = 16 <= self.hour <= 21

        demands = []
        for b in base:
            random_noise = random.uniform(-noise_factor, noise_factor)
            demand = b * (1 + daily_variation + random_noise)
            if noise_mode == "high" and random.random() < 0.15:
                demand *= 1.3  # unexpected 30% spike with 15% chance (high noise mode)
            if in_peak_window:
                demand *= self.config.get("peak_demand_multiplier", 1.0)
            demands.append(max(0, demand))
        return demands

    def _generate_supply(self):
        """
        Generate renewable supply for the current hour.

        Solar follows a half-sine that is positive between hours 6 and 18 and
        peaks at hour 12; wind is a constant 15 plus a random offset whose
        range depends on ``wind_variation``.

        Returns:
            A ``(solar, wind)`` tuple of non-negative values.
        """
        solar_scale = self.config.get("solar_scale", 1.0)
        solar = max(0, solar_scale * 50 * math.sin((self.hour - 6) * math.pi / 12))

        wind_mode = self.config.get("wind_variation", "medium")
        if wind_mode == "low":
            var = random.uniform(-3, 3)
        elif wind_mode == "high":
            var = random.uniform(-15, 10)
        else:  # medium wind variation or default
            var = random.uniform(-7, 5)
        wind = max(0, 15 + var)
        return solar, wind

    def reset(self) -> SmartGridObservation:
        """
        Reset the environment.

        Starts a new episode (fresh episode id, step count 0), rewinds the
        clock to hour 0 and restores the battery to 50.

        Returns:
            The initial observation for hour 0.
        """
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._reset_count += 1
        self.hour = 0
        self.battery_level = 50
        self.demands = self._generate_demand()
        solar, wind = self._generate_supply()
        return SmartGridObservation(
            hour=self.hour,
            demand_r1=self.demands[0],
            demand_r2=self.demands[1],
            demand_r3=self.demands[2],
            solar_generation=solar,
            wind_generation=wind,
            battery_level=self.battery_level,
            battery_capacity=self.battery_capacity,
        )

    def step(self, action: SmartGridAction) -> SmartGridObservation:
        """
        Execute a step in the environment.

        Args:
            action: Requested supply for the three regions plus
                ``charge_battery`` (positive charges, negative discharges).

        Returns:
            The observation for the next hour, carrying the reward earned by
            this action and ``done=True`` once the hour reaches 24.
        """
        self._state.step_count += 1
        solar, wind = self._generate_supply()
        gen = solar + wind

        # Apply the battery request, clamping to the physical limits. A request
        # that would leave the valid range is still penalised.
        previous_level = self.battery_level
        battery_charge_req = max(0, action.charge_battery)
        battery_discharge_req = max(0, -action.charge_battery)
        new_level = previous_level + battery_charge_req - battery_discharge_req
        penalty = 0
        if new_level < 0 or new_level > self.battery_capacity:
            penalty = 0.2  # Penalty for overcharging or overdischarging the battery
        new_level = max(0, min(new_level, self.battery_capacity))
        self.battery_level = new_level

        # Only the energy that actually moved in or out of the battery affects
        # the grid: charging consumes generation, discharging adds to it.
        actual_charge = max(0, new_level - previous_level)
        actual_discharge = max(0, previous_level - new_level)
        avail_energy = gen - actual_charge + actual_discharge

        req_supply = action.supply_r1 + action.supply_r2 + action.supply_r3
        # Delivered energy can exceed neither the request nor what is
        # available, and can never be negative.
        total_supply = max(0, min(avail_energy, req_supply))
        total_demand = sum(self.demands)
        unmet_demand = max(0, total_demand - total_supply)
        wasted_energy = max(0, avail_energy - total_demand)

        # With no demand at all there is nothing left unmet; avoid dividing by zero.
        if total_demand > 0:
            reward = 1 - (unmet_demand / total_demand)
        else:
            reward = 1.0
        reward -= penalty
        reward -= wasted_energy * 0.01

        # Advance the clock and build the observation for the next hour.
        self.hour += 1
        self.demands = self._generate_demand()
        next_solar, next_wind = self._generate_supply()
        done = self.hour >= 24
        return SmartGridObservation(
            hour=self.hour,
            demand_r1=self.demands[0],
            demand_r2=self.demands[1],
            demand_r3=self.demands[2],
            solar_generation=next_solar,
            wind_generation=next_wind,
            battery_level=self.battery_level,
            battery_capacity=self.battery_capacity,
            reward=reward,
            done=done,
        )

    @property
    def state(self) -> State:
        """
        Get the current environment state.

        Returns:
            Current State with episode_id and step_count
        """
        return self._state