gridops-demo / gridops /server /environment.py
77ethers's picture
Release GridOps demo Space
7d7b92e verified
"""
GridOps OpenEnv Environment — wires physics + scenarios + grading together.
"""
from __future__ import annotations
from typing import Any, Optional
from uuid import uuid4
import numpy as np
from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import EnvironmentMetadata, State
from gridops.models import GridOpsAction, GridOpsObservation
from gridops.simulation import physics, scenarios
from gridops.simulation.physics import (
BATTERY_CAPACITY_KWH,
DIESEL_TANK_KWH,
MicrogridState,
StepFlows,
)
from gridops.simulation.scenarios import ScenarioConfig, make_forecast
from gridops.tasks.definitions import TASKS
from gridops.tasks.graders import grade_episode
class GridOpsState(State):
"""Extended state exposed via GET /state."""
task_id: str = "task_1_normal"
hour: int = 0
done: bool = False
grade: dict | None = None
history: list[dict] = []
class GridOpsEnvironment(Environment):
"""Community microgrid RL environment."""
SUPPORTS_CONCURRENT_SESSIONS: bool = True
def __init__(self):
super().__init__()
self._task_id = "task_1_normal"
self._cfg: ScenarioConfig = TASKS[self._task_id]
self._rng = np.random.default_rng(42)
self._demand = np.zeros(72)
self._solar = np.zeros(72)
self._price = np.zeros(72)
self._micro = MicrogridState()
self._episode_id = str(uuid4())
self._history: list[dict] = []
self._grade: dict | None = None
def reset(
self,
seed: Optional[int] = None,
episode_id: Optional[str] = None,
**kwargs: Any,
) -> GridOpsObservation:
task_id = kwargs.get("task_id", "task_1_normal")
if task_id not in TASKS:
task_id = "task_1_normal"
self._task_id = task_id
self._cfg = TASKS[task_id]
self._episode_id = episode_id or str(uuid4())
s = seed if seed is not None else 42
self._rng = np.random.default_rng(s)
self._demand = scenarios.generate_demand(self._cfg, self._rng)
self._solar = scenarios.generate_solar(self._cfg, self._rng)
self._price = scenarios.generate_price(self._cfg, self._rng)
self._micro = MicrogridState(
diesel_fuel_kwh=self._cfg.diesel_fuel_capacity * DIESEL_TANK_KWH,
)
self._history = []
self._grade = None
# Compute initial flows so dashboard shows real data on reset
h0_demand = float(self._demand[0])
h0_solar = float(self._solar[0])
h0_grid = max(0.0, h0_demand - h0_solar)
self._last_flows = StepFlows(
solar_kw=h0_solar,
grid_import_kw=h0_grid,
effective_demand_kw=h0_demand,
total_supply_kw=h0_solar + h0_grid,
total_consumption_kw=h0_demand,
)
return self._make_observation(reward=0.0, done=False, narration="Episode started. Day 1, 06:00. Make your first decision.")
def step(
self,
action: GridOpsAction,
timeout_s: Optional[float] = None,
**kwargs: Any,
) -> GridOpsObservation:
if self._micro.hour >= 72:
return self._make_observation(
reward=0.0, done=True,
narration="Episode already finished.",
)
h = self._micro.hour
# Check if grid is available this hour (outage = islanding mode)
outage_hours = self._cfg.grid_outage_hours or []
grid_up = h not in outage_hours
result = physics.step(
self._micro,
battery_dispatch_norm=action.battery_dispatch,
diesel_norm=action.diesel_dispatch,
shed_norm=action.demand_shedding,
solar_kw=float(self._solar[h]),
demand_kw=float(self._demand[h]),
grid_price=float(self._price[h]),
diesel_fuel_cap=self._cfg.diesel_fuel_capacity * DIESEL_TANK_KWH,
grid_available=grid_up,
)
self._last_flows = result.flows
self._history.append({
"hour": h,
"demand": float(self._demand[h]),
"solar": float(self._solar[h]),
"price": float(self._price[h]),
"battery_soc": self._micro.battery_soc_kwh / BATTERY_CAPACITY_KWH,
"blackout": result.state.last_blackout_kwh,
"cost": result.state.last_cost,
"reward": result.reward,
"grid_kw": result.state.last_grid_kw,
"battery_dispatch": action.battery_dispatch,
"diesel": action.diesel_dispatch,
"shedding": action.demand_shedding,
})
if result.done:
self._grade = grade_episode(
self._micro, self._demand, self._solar, self._price,
grid_outage_hours=self._cfg.grid_outage_hours,
)
obs = self._make_observation(
reward=result.reward,
done=result.done,
narration=result.narration,
)
if result.done and self._grade:
obs.metadata["grade"] = self._grade
return obs
@property
def state(self) -> GridOpsState:
return GridOpsState(
episode_id=self._episode_id,
step_count=self._micro.hour,
task_id=self._task_id,
hour=self._micro.hour,
done=self._micro.hour >= 72,
grade=self._grade,
history=self._history,
)
def get_metadata(self) -> EnvironmentMetadata:
readme = None
try:
from pathlib import Path
readme_path = Path(__file__).parent.parent.parent / "README.md"
if readme_path.exists():
readme = readme_path.read_text()
except Exception:
pass
return EnvironmentMetadata(
name="GridOps",
description="Community microgrid bridge operator — balance solar, battery, diesel, and grid across 3-day episodes.",
version="0.2.0",
readme_content=readme,
)
def _make_observation(self, reward: float, done: bool, narration: str) -> GridOpsObservation:
h = min(self._micro.hour, 71)
rng = self._rng
return GridOpsObservation(
hour=float(self._micro.hour),
demand_kw=float(self._demand[h]),
solar_kw=float(self._solar[h]),
battery_soc=self._micro.battery_soc_kwh / BATTERY_CAPACITY_KWH,
grid_price=float(self._price[h]),
diesel_fuel_remaining=self._micro.diesel_fuel_kwh / DIESEL_TANK_KWH,
diesel_is_on=self._micro.diesel_was_on,
demand_forecast_4h=make_forecast(self._demand, h, 4, self._cfg.forecast_noise, rng),
solar_forecast_4h=make_forecast(self._solar, h, 4, self._cfg.forecast_noise, rng),
price_forecast_4h=make_forecast(self._price, h, 4, self._cfg.forecast_noise, rng),
cumulative_blackout_kwh=self._micro.cumulative_blackout_kwh,
cumulative_cost=self._micro.cumulative_cost,
day_of_episode=(self._micro.hour // 24) + 1,
blackout_this_step=self._micro.last_blackout_kwh,
cost_this_step=self._micro.last_cost,
grid_kw_this_step=self._micro.last_grid_kw,
narration=narration,
flow_solar=self._last_flows.solar_kw,
flow_grid_import=self._last_flows.grid_import_kw,
flow_grid_export=self._last_flows.grid_export_kw,
flow_battery_discharge=self._last_flows.battery_discharge_kw,
flow_battery_charge=self._last_flows.battery_charge_kw,
flow_diesel=self._last_flows.diesel_kw,
flow_demand=self._last_flows.effective_demand_kw,
flow_blackout=self._last_flows.blackout_kw,
flow_shed=self._last_flows.shed_kw,
flow_total_supply=self._last_flows.total_supply_kw,
flow_total_consumption=self._last_flows.total_consumption_kw,
done=done,
reward=reward,
)