""" GridOps OpenEnv Environment — wires physics + scenarios + grading together. """ from __future__ import annotations from typing import Any, Optional from uuid import uuid4 import numpy as np from openenv.core.env_server.interfaces import Environment from openenv.core.env_server.types import EnvironmentMetadata, State from gridops.models import GridOpsAction, GridOpsObservation from gridops.simulation import physics, scenarios from gridops.simulation.physics import ( BATTERY_CAPACITY_KWH, DIESEL_TANK_KWH, MicrogridState, StepFlows, ) from gridops.simulation.scenarios import ScenarioConfig, make_forecast from gridops.tasks.definitions import TASKS from gridops.tasks.graders import grade_episode class GridOpsState(State): """Extended state exposed via GET /state.""" task_id: str = "task_1_normal" hour: int = 0 done: bool = False grade: dict | None = None history: list[dict] = [] class GridOpsEnvironment(Environment): """Community microgrid RL environment.""" SUPPORTS_CONCURRENT_SESSIONS: bool = True def __init__(self): super().__init__() self._task_id = "task_1_normal" self._cfg: ScenarioConfig = TASKS[self._task_id] self._rng = np.random.default_rng(42) self._demand = np.zeros(72) self._solar = np.zeros(72) self._price = np.zeros(72) self._micro = MicrogridState() self._episode_id = str(uuid4()) self._history: list[dict] = [] self._grade: dict | None = None def reset( self, seed: Optional[int] = None, episode_id: Optional[str] = None, **kwargs: Any, ) -> GridOpsObservation: task_id = kwargs.get("task_id", "task_1_normal") if task_id not in TASKS: task_id = "task_1_normal" self._task_id = task_id self._cfg = TASKS[task_id] self._episode_id = episode_id or str(uuid4()) s = seed if seed is not None else 42 self._rng = np.random.default_rng(s) self._demand = scenarios.generate_demand(self._cfg, self._rng) self._solar = scenarios.generate_solar(self._cfg, self._rng) self._price = scenarios.generate_price(self._cfg, self._rng) self._micro = MicrogridState( diesel_fuel_kwh=self._cfg.diesel_fuel_capacity * DIESEL_TANK_KWH, ) self._history = [] self._grade = None # Compute initial flows so dashboard shows real data on reset h0_demand = float(self._demand[0]) h0_solar = float(self._solar[0]) h0_grid = max(0.0, h0_demand - h0_solar) self._last_flows = StepFlows( solar_kw=h0_solar, grid_import_kw=h0_grid, effective_demand_kw=h0_demand, total_supply_kw=h0_solar + h0_grid, total_consumption_kw=h0_demand, ) return self._make_observation(reward=0.0, done=False, narration="Episode started. Day 1, 06:00. Make your first decision.") def step( self, action: GridOpsAction, timeout_s: Optional[float] = None, **kwargs: Any, ) -> GridOpsObservation: if self._micro.hour >= 72: return self._make_observation( reward=0.0, done=True, narration="Episode already finished.", ) h = self._micro.hour # Check if grid is available this hour (outage = islanding mode) outage_hours = self._cfg.grid_outage_hours or [] grid_up = h not in outage_hours result = physics.step( self._micro, battery_dispatch_norm=action.battery_dispatch, diesel_norm=action.diesel_dispatch, shed_norm=action.demand_shedding, solar_kw=float(self._solar[h]), demand_kw=float(self._demand[h]), grid_price=float(self._price[h]), diesel_fuel_cap=self._cfg.diesel_fuel_capacity * DIESEL_TANK_KWH, grid_available=grid_up, ) self._last_flows = result.flows self._history.append({ "hour": h, "demand": float(self._demand[h]), "solar": float(self._solar[h]), "price": float(self._price[h]), "battery_soc": self._micro.battery_soc_kwh / BATTERY_CAPACITY_KWH, "blackout": result.state.last_blackout_kwh, "cost": result.state.last_cost, "reward": result.reward, "grid_kw": result.state.last_grid_kw, "battery_dispatch": action.battery_dispatch, "diesel": action.diesel_dispatch, "shedding": action.demand_shedding, }) if result.done: self._grade = grade_episode( self._micro, self._demand, self._solar, self._price, grid_outage_hours=self._cfg.grid_outage_hours, ) obs = self._make_observation( reward=result.reward, done=result.done, narration=result.narration, ) if result.done and self._grade: obs.metadata["grade"] = self._grade return obs @property def state(self) -> GridOpsState: return GridOpsState( episode_id=self._episode_id, step_count=self._micro.hour, task_id=self._task_id, hour=self._micro.hour, done=self._micro.hour >= 72, grade=self._grade, history=self._history, ) def get_metadata(self) -> EnvironmentMetadata: readme = None try: from pathlib import Path readme_path = Path(__file__).parent.parent.parent / "README.md" if readme_path.exists(): readme = readme_path.read_text() except Exception: pass return EnvironmentMetadata( name="GridOps", description="Community microgrid bridge operator — balance solar, battery, diesel, and grid across 3-day episodes.", version="0.2.0", readme_content=readme, ) def _make_observation(self, reward: float, done: bool, narration: str) -> GridOpsObservation: h = min(self._micro.hour, 71) rng = self._rng return GridOpsObservation( hour=float(self._micro.hour), demand_kw=float(self._demand[h]), solar_kw=float(self._solar[h]), battery_soc=self._micro.battery_soc_kwh / BATTERY_CAPACITY_KWH, grid_price=float(self._price[h]), diesel_fuel_remaining=self._micro.diesel_fuel_kwh / DIESEL_TANK_KWH, diesel_is_on=self._micro.diesel_was_on, demand_forecast_4h=make_forecast(self._demand, h, 4, self._cfg.forecast_noise, rng), solar_forecast_4h=make_forecast(self._solar, h, 4, self._cfg.forecast_noise, rng), price_forecast_4h=make_forecast(self._price, h, 4, self._cfg.forecast_noise, rng), cumulative_blackout_kwh=self._micro.cumulative_blackout_kwh, cumulative_cost=self._micro.cumulative_cost, day_of_episode=(self._micro.hour // 24) + 1, blackout_this_step=self._micro.last_blackout_kwh, cost_this_step=self._micro.last_cost, grid_kw_this_step=self._micro.last_grid_kw, narration=narration, flow_solar=self._last_flows.solar_kw, flow_grid_import=self._last_flows.grid_import_kw, flow_grid_export=self._last_flows.grid_export_kw, flow_battery_discharge=self._last_flows.battery_discharge_kw, flow_battery_charge=self._last_flows.battery_charge_kw, flow_diesel=self._last_flows.diesel_kw, flow_demand=self._last_flows.effective_demand_kw, flow_blackout=self._last_flows.blackout_kw, flow_shed=self._last_flows.shed_kw, flow_total_supply=self._last_flows.total_supply_kw, flow_total_consumption=self._last_flows.total_consumption_kw, done=done, reward=reward, )