File size: 5,069 Bytes
a283aa6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 |
import gymnasium as gym
from gymnasium import spaces
import numpy as np
class EVChargeEnv(gym.Env):
    """
    EV charging environment.

    Goal:
        - Reach full battery (charge = 1.0)
        - Minimize cost
        - Avoid stressing the grid

    Observation (Box, shape (4,), all components in [0, 1]):
        [charge_level, price, grid_load, time_step_norm]
    Action (Box, shape (1,)):
        continuous charging rate in [0.0, 1.0]
    """

    metadata = {"render_modes": ["human"]}

    def __init__(self, max_steps: int = 48, scenario: str = "medium"):
        """
        Args:
            max_steps: episode length before truncation.
            scenario: difficulty preset — "easy", "medium", or "hard".

        Raises:
            ValueError: if `scenario` is not a known preset.
        """
        super().__init__()
        # Validate with a real exception: `assert` is stripped under `python -O`.
        if scenario not in ("easy", "medium", "hard"):
            raise ValueError(f"scenario must be 'easy', 'medium' or 'hard', got {scenario!r}")
        self.scenario = scenario

        # Observation: charge, price, load, normalized time.
        self.observation_space = spaces.Box(
            low=np.array([0.0, 0.0, 0.0, 0.0], dtype=np.float32),
            high=np.array([1.0, 1.0, 1.0, 1.0], dtype=np.float32),
            dtype=np.float32,
        )
        # Action: charge rate between 0 and 1.
        self.action_space = spaces.Box(
            low=np.array([0.0], dtype=np.float32),
            high=np.array([1.0], dtype=np.float32),
            dtype=np.float32,
        )

        self.max_steps = max_steps
        self.step_count = 0

        # Internal state (re-randomized in reset()).
        self.charge = 0.0
        self.price = 0.0
        self.grid_load = 0.0

        # Scenario parameters (overwritten in reset() via _set_scenario_params()).
        self.base_price = 0.3
        self.base_load = 0.5
        self.load_threshold = 0.8      # above this → overload penalty
        self.charge_rate_scale = 0.08  # battery fill per unit action per step

    def _set_scenario_params(self):
        """Set dynamics/penalty parameters based on the difficulty scenario."""
        if self.scenario == "easy":
            self.base_price = 0.25
            self.base_load = 0.4
            self.load_threshold = 0.9
            self.charge_rate_scale = 0.10
        elif self.scenario == "medium":
            self.base_price = 0.30
            self.base_load = 0.5
            self.load_threshold = 0.85
            self.charge_rate_scale = 0.08
        else:  # hard
            self.base_price = 0.35
            self.base_load = 0.6
            self.load_threshold = 0.8
            self.charge_rate_scale = 0.06

    def reset(self, seed=None, options=None):
        """Reset the episode state.

        Returns:
            (obs, info): initial observation and an empty info dict.
        """
        # super().reset(seed=seed) (re)seeds the env-local `self.np_random`.
        # Do NOT call np.random.seed() here: seeding the *global* NumPy RNG
        # leaks determinism into unrelated code and correlates parallel envs.
        super().reset(seed=seed)
        self._set_scenario_params()
        self.step_count = 0
        rng = self.np_random
        # Random initial charge, slightly low.
        self.charge = float(rng.uniform(0.1, 0.4))
        # Start price/load around the scenario base with small noise.
        self.price = float(np.clip(self.base_price + rng.normal(0, 0.05), 0.0, 1.0))
        self.grid_load = float(np.clip(self.base_load + rng.normal(0, 0.05), 0.0, 1.0))
        return self._get_obs(), {}

    def _get_obs(self):
        """Return the observation vector [charge, price, load, time_norm]."""
        time_step_norm = self.step_count / max(1, self.max_steps - 1)
        return np.array(
            [self.charge, self.price, self.grid_load, time_step_norm],
            dtype=np.float32,
        )

    def step(self, action):
        """Advance one timestep given a charging-rate action in [0, 1]."""
        self.step_count += 1
        # Clamp action into valid range (accepts shape-(1,) arrays per action_space).
        a = float(np.clip(action[0], 0.0, 1.0))

        # --- Dynamics ---
        # Battery charging; remember the previous level so the reward can use
        # the *actual* charge gained rather than the commanded rate.
        prev_charge = self.charge
        self.charge = float(np.clip(self.charge + a * self.charge_rate_scale, 0.0, 1.0))

        # Price & load: mean-reverting noisy processes around base values.
        # Use the env-local RNG so seeded episodes are reproducible.
        rng = self.np_random
        self.price = float(
            np.clip(
                self.price * 0.7
                + self.base_price * 0.3
                + rng.normal(0, 0.05),
                0.0,
                1.0,
            )
        )
        self.grid_load = float(
            np.clip(
                self.grid_load * 0.6
                + self.base_load * 0.4
                + rng.normal(0, 0.07),
                0.0,
                1.0,
            )
        )

        # --- Reward ---
        # Progress reward on realized charge delta: charging a full battery
        # earns nothing (previously `a * charge_rate_scale` rewarded charging
        # past 1.0, an exploitable loophole).
        progress = self.charge - prev_charge
        progress_reward = progress * 5.0  # scaled up
        # Cost penalty (higher price * more charging = worse).
        cost_penalty = self.price * a * 4.0
        # Grid overload penalty if we charge hard while load is high.
        effective_load = self.grid_load + a * 0.2
        overload = max(0.0, effective_load - self.load_threshold)
        overload_penalty = overload * 6.0
        # Small time penalty to encourage faster completion.
        time_penalty = 0.01
        reward = progress_reward - cost_penalty - overload_penalty - time_penalty

        # Episode done? Cast to plain bool for gymnasium API compliance.
        terminated = bool(self.charge >= 0.999)
        truncated = bool(self.step_count >= self.max_steps)

        obs = self._get_obs()
        info = {
            "progress_reward": progress_reward,
            "cost_penalty": cost_penalty,
            "overload_penalty": overload_penalty,
        }
        return obs, reward, terminated, truncated, info

    def render(self):
        """Print a one-line human-readable summary of the current state."""
        print(
            f"step={self.step_count} charge={self.charge:.3f} "
            f"price={self.price:.3f} load={self.grid_load:.3f}"
        )
|