Spaces:
Sleeping
Sleeping
| """ | |
| DC-OpenEnv: Data Centre Operations Environment β V2 | |
| Fully OpenEnv-compliant environment. | |
| Manages episodes, steps, rewards, and observations for all three DC cooling tasks. | |
| Responsibilities: | |
| - Task registry (easy / medium / hard) | |
| - Episode lifecycle: reset(), step(), state() | |
| - Delegates physics to simulation.py | |
| - Delegates scoring to graders/ | |
| - Constructs DCObservation from FacilityState | |
| """ | |
| import math | |
| import random | |
| from collections import deque | |
| from typing import Any, Deque, Dict, List, Optional | |
| from openenv.core.env_server.interfaces import Environment | |
| from .models import ( | |
| DCAction, | |
| DCObservation, | |
| DCReward, | |
| ZoneAdjustment, | |
| ZoneObservation, | |
| ) | |
| from .simulation import ( | |
| DCAction as SimDCAction, | |
| FacilityState, | |
| ZoneAdjustment as SimZoneAdjustment, | |
| ) | |
| from .scenarios.easy import build_easy_scenario | |
| from .scenarios.medium import build_medium_scenario | |
| from .scenarios.hard import build_hard_scenario | |
| from .graders.grader_easy import EasyGraderState | |
| from .graders.grader_medium import MediumGrader | |
| from .graders.grader_hard import HardGrader | |
| # ββ Constants βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| SAFE_TEMP_MIN = 18.0 | |
| SAFE_TEMP_MAX = 27.0 | |
| # Hard termination thresholds (per spec Β§4 Episode Boundaries) | |
| MEDIUM_MAX_CONSECUTIVE_VIOLATIONS = 10 # any zone unsafe for 10+ consecutive steps | |
| HARD_CRITICAL_TEMP_THRESHOLD = 32.0 # Β°C | |
| HARD_CRITICAL_CONSECUTIVE_STEPS = 5 # sustained breach β episode ends, score = 0 | |
| # History buffer depth (for temporal observation) | |
| HISTORY_BUFFER_DEPTH = 3 | |
| # Chiller fault observable: COP drop below this fraction of base triggers the flag | |
| CHILLER_FAULT_COP_FRACTION = 0.60 | |
| def _reward_detail_as_dict(detail: Any) -> Dict[str, Any]: | |
| """Graders may return either a dict or a DCReward instance.""" | |
| if isinstance(detail, DCReward): | |
| return detail.model_dump() | |
| if isinstance(detail, dict): | |
| return detail | |
| return {} | |
| # ββ Task registry βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| TASK_CONFIGS: Dict[str, Dict[str, Any]] = { | |
| "easy-single-zone": { | |
| "max_steps": 20, # 20 steps Γ 12 min/step = 4 hr (full 14:00β18:00 arc) | |
| "step_scale": 2.4, # condense original 48-step plan: idx = step * 2.4 | |
| "scenario_builder": build_easy_scenario, | |
| "grader_class": EasyGraderState, | |
| "description": "Single-zone thermal runaway recovery under steady load", | |
| "hard_termination": False, | |
| }, | |
| "medium-multi-zone": { | |
| "max_steps": 30, # 30 steps Γ 24 min/step = 12 hr (full 06:00β18:00 arc) | |
| "step_scale": 4.8, # condense original 144-step plan: idx = step * 4.8 | |
| "scenario_builder": build_medium_scenario, | |
| "grader_class": MediumGrader, | |
| "description": "3-zone load surge with faulty sensor and diurnal variation", | |
| "hard_termination": True, | |
| "hard_term_mode": "violation_streak", # 10+ consecutive steps any zone unsafe | |
| }, | |
| "hard-cascading-failure": { | |
| "max_steps": 40, # 40 steps Γ 36 min/step = 24 hr (full 00:00β24:00 arc) | |
| "step_scale": 7.2, # condense original 288-step plan: idx = step * 7.2 | |
| "scenario_builder": build_hard_scenario, | |
| "grader_class": HardGrader, | |
| "description": "4-zone cascading chiller failure with carbon-aware triage", | |
| "hard_termination": True, | |
| "hard_term_mode": "critical_breach", # critical zone >32Β°C for 5+ steps β 0.0 | |
| }, | |
| } | |
| # ββ Environment βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class DCEnvironment(Environment): | |
| """ | |
| OpenEnv-compliant Data Centre environment β V2. | |
| Supports all three tasks via the TASK_CONFIGS registry. | |
| Physics delegation β simulation.py | |
| Scoring delegation β graders/ | |
| """ | |
| SUPPORTS_CONCURRENT_SESSIONS: bool = True | |
| def __init__(self, task: str = "easy-single-zone", seed: Optional[int] = None): | |
| if task not in TASK_CONFIGS: | |
| raise ValueError(f"Unknown task '{task}'. Valid: {list(TASK_CONFIGS)}") | |
| self.task = task | |
| self.seed = seed | |
| self.config = TASK_CONFIGS[task] | |
| self.max_steps: int = self.config["max_steps"] | |
| # Runtime state (populated on reset) | |
| self._facility: Optional[FacilityState] = None | |
| self._grader: Optional[Any] = None | |
| self._step_count: int = 0 | |
| self._done: bool = False | |
| self._episode_rewards: List[float] = [] | |
| # Last action (needed for rate-limiting in simulation.step()) | |
| self._last_action: Optional[SimDCAction] = None | |
| # Per-zone streak counters | |
| # consecutive_safe[zone_id] = steps in safe band | |
| # consecutive_violation[zone_id] = steps outside safe band | |
| self._consecutive_safe: Dict[str, int] = {} | |
| self._consecutive_violation: Dict[str, int] = {} | |
| # Facility-wide SLA violation streak (any zone) | |
| self._sla_violation_streak: int = 0 | |
| # Temporal history buffer: deque of per-step dicts (newest at right) | |
| self._history: Deque[Dict[str, Any]] = deque(maxlen=HISTORY_BUFFER_DEPTH) | |
| # Base chiller COP (captured at reset for fault detection) | |
| self._base_chiller_cop: float = 3.5 | |
| # Timeline condensation scale factor (set properly in reset()) | |
| self._step_scale: float = self.config.get("step_scale", 1.0) | |
| # ββ OpenEnv interface ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def reset(self, seed: Optional[int] = None, episode_id: Optional[str] = None, **kwargs) -> DCObservation: | |
| """Reset the environment and return initial observation (OpenEnv interface: returns Observation directly).""" | |
| _seed = seed if seed is not None else (self.seed if self.seed is not None else random.randint(0, 99_999)) | |
| self._facility = self.config["scenario_builder"](seed=_seed) | |
| self._grader = self.config["grader_class"]() | |
| self._step_count = 0 | |
| self._done = False | |
| self._episode_rewards = [] | |
| self._history.clear() | |
| # Configure timeline condensation: each env step covers step_scaleΓ5 sim minutes. | |
| # This drives the clock, load, carbon and sensor drift at the right rate so the | |
| # full scenario arc (4 / 12 / 24 hr) is covered within the reduced step budget. | |
| # Thermal physics keep their 5-min granularity (step_thermal uses SECONDS_PER_STEP). | |
| self._step_scale: float = self.config.get("step_scale", 1.0) | |
| self._facility.minutes_per_step = 5.0 * self._step_scale | |
| # Rescale scenario events that are indexed by raw step number so they | |
| # fire at the proportionally correct point in the condensed timeline. | |
| if self._facility.chiller_fault_step > 0: | |
| scaled_fault = max(3, round(self._facility.chiller_fault_step / self._step_scale)) | |
| self._facility.chiller_fault_step = scaled_fault | |
| # Capture base COP for fault-detection heuristic | |
| self._base_chiller_cop = self._facility.chiller_cop | |
| # Initialise per-zone streak counters | |
| self._consecutive_safe = {z.zone_id: 0 for z in self._facility.zones} | |
| self._consecutive_violation = {z.zone_id: 0 for z in self._facility.zones} | |
| self._sla_violation_streak = 0 | |
| # Seed a neutral last action so rate-limiting has a valid reference | |
| self._last_action = self._neutral_sim_action() | |
| obs = self._make_observation() | |
| obs.done = False | |
| obs.reward = None | |
| return obs | |
| def step(self, action: DCAction, timeout_s: Optional[float] = None, **kwargs) -> DCObservation: # type: ignore[override] | |
| """Apply agent action, advance simulation, compute reward, return DCObservation.""" | |
| if self._done: | |
| raise RuntimeError("Episode is done. Call reset() first.") | |
| if self._facility is None: | |
| raise RuntimeError("Call reset() before step().") | |
| # Convert Pydantic DCAction β simulation duck-type (rate-limiting compatible) | |
| sim_action = self._to_sim_action(action) | |
| # Advance simulation (rate-limiting + physics + time inside .step()) | |
| step_info = self._facility.step(sim_action, self._last_action) | |
| # Apply diurnal weather curve if the scenario provided one. | |
| # Multiply step_count by step_scale so that the full curve (144 or 288 elements) | |
| # is traversed within the condensed step budget, covering the complete scenario arc. | |
| if hasattr(self._facility, '_outside_temp_curve'): | |
| idx = min( | |
| int(self._step_count * self._step_scale), | |
| len(self._facility._outside_temp_curve) - 1, | |
| ) | |
| self._facility.outside_temp_c = self._facility._outside_temp_curve[idx] | |
| self._facility.wet_bulb_temp_c = self._facility._wet_bulb_curve[idx] | |
| self._last_action = sim_action | |
| self._step_count += 1 | |
| # Update per-zone streak counters | |
| any_violation = self._update_streaks() | |
| # Update facility-wide SLA violation streak | |
| if any_violation: | |
| self._sla_violation_streak += 1 | |
| else: | |
| self._sla_violation_streak = 0 | |
| # Push snapshot to history buffer | |
| self._push_history_snapshot() | |
| # Check hard termination conditions | |
| terminated_early, terminal_score = self._check_hard_termination() | |
| # Compute reward via grader | |
| grader_input = self._build_grader_input(action, step_info) | |
| step_reward, raw_reward_detail = self._grader.step(grader_input) | |
| reward_detail = _reward_detail_as_dict(raw_reward_detail) | |
| if terminated_early: | |
| step_reward = terminal_score | |
| self._done = True | |
| # Notify the grader so final_score() can return 0.0 on catastrophic failure | |
| if hasattr(self._grader, "mark_sla_terminated"): | |
| self._grader.mark_sla_terminated() | |
| elif self._step_count >= self.max_steps: | |
| self._done = True | |
| self._episode_rewards.append(step_reward) | |
| bd = reward_detail.get("breakdown") | |
| if not isinstance(bd, dict): | |
| bd = reward_detail | |
| reward_model = DCReward( | |
| total=step_reward, | |
| temp_reward=reward_detail.get("temp_reward", 0.0), | |
| pue_reward=reward_detail.get("pue_reward", 0.0), | |
| carbon_reward=reward_detail.get("carbon_reward", 0.0), | |
| safety_penalty=reward_detail.get("safety_penalty", 0.0), | |
| roughness_penalty=reward_detail.get("roughness_penalty", 0.0), | |
| stability_bonus=reward_detail.get("stability_bonus", 0.0), | |
| # Legacy fields | |
| temperature_penalty=reward_detail.get("safety_penalty", 0.0), | |
| humidity_penalty=0.0, | |
| breakdown=bd, | |
| ) | |
| info: Dict[str, Any] = { | |
| "action_clipped": step_info.get("action_clipped", {}), | |
| "terminated_early": terminated_early, | |
| "sla_violation_streak": self._sla_violation_streak, | |
| "consecutive_safe": dict(self._consecutive_safe), | |
| "consecutive_violation": dict(self._consecutive_violation), | |
| } | |
| if self._done: | |
| final_score = self._grader.final_score() | |
| info["final_score"] = final_score | |
| info["episode_rewards"] = list(self._episode_rewards) | |
| obs = self._make_observation() | |
| obs.done = self._done | |
| obs.reward = float(step_reward) | |
| obs.metadata = info | |
| return obs | |
| def state(self) -> dict: | |
| """Return full internal state for debugging/inspection.""" | |
| if self._facility is None: | |
| return {"status": "not_initialized"} | |
| return { | |
| "task": self.task, | |
| "step": self._step_count, | |
| "done": self._done, | |
| "facility": self._facility.to_observation_dict(), | |
| "consecutive_safe": dict(self._consecutive_safe), | |
| "consecutive_violation": dict(self._consecutive_violation), | |
| "sla_violation_streak": self._sla_violation_streak, | |
| "episode_rewards": list(self._episode_rewards), | |
| "history_depth": len(self._history), | |
| } | |
| # ββ Streak tracking ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _update_streaks(self) -> bool: | |
| """ | |
| Update per-zone consecutive safe / violation counters. | |
| Returns True if any zone is currently in violation. | |
| """ | |
| any_violation = False | |
| for zone in self._facility.zones: | |
| zid = zone.zone_id | |
| in_safe = SAFE_TEMP_MIN <= zone.temp_c <= SAFE_TEMP_MAX | |
| if in_safe: | |
| self._consecutive_safe[zid] = self._consecutive_safe.get(zid, 0) + 1 | |
| self._consecutive_violation[zid] = 0 | |
| else: | |
| self._consecutive_violation[zid] = self._consecutive_violation.get(zid, 0) + 1 | |
| self._consecutive_safe[zid] = 0 | |
| any_violation = True | |
| return any_violation | |
| # ββ Hard termination βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _check_hard_termination(self): | |
| """ | |
| Check task-specific hard termination conditions. | |
| Returns | |
| ------- | |
| terminated : bool | |
| terminal_score : float (0.0 on catastrophic failure, ignored if not terminated) | |
| """ | |
| cfg = self.config | |
| if not cfg.get("hard_termination", False): | |
| return False, 0.0 | |
| mode = cfg.get("hard_term_mode", "") | |
| if mode == "violation_streak": | |
| # Medium task: any zone unsafe for 10+ consecutive steps β terminate | |
| if self._sla_violation_streak >= MEDIUM_MAX_CONSECUTIVE_VIOLATIONS: | |
| return True, 0.0 | |
| elif mode == "critical_breach": | |
| # Hard task: any critical zone > 32Β°C for 5+ consecutive steps β score = 0 | |
| for zone in self._facility.zones: | |
| if zone.zone_priority < 2: # only critical zones (priority == 2) | |
| continue | |
| zid = zone.zone_id | |
| if ( | |
| zone.temp_c > HARD_CRITICAL_TEMP_THRESHOLD | |
| and self._consecutive_violation.get(zid, 0) >= HARD_CRITICAL_CONSECUTIVE_STEPS | |
| ): | |
| return True, 0.0 | |
| return False, 0.0 | |
| # ββ History buffer βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _push_history_snapshot(self): | |
| """Append a compact per-zone snapshot to the rolling history buffer. | |
| For sensor-faulty zones, records the reported (potentially drifted) | |
| cold-aisle reading rather than the internal ground-truth value, so the | |
| history presented to the agent is consistent with the per-step observation. | |
| """ | |
| snapshot = { | |
| "step": self._step_count, | |
| "pue": self._facility.pue, | |
| "zones": { | |
| z.zone_id: { | |
| # Mirror the observation rule: faulty sensors show reported reading. | |
| "cold_aisle_temp_c": ( | |
| z.reported_temp_c if z.sensor_faulty else round(z.temp_c, 3) | |
| ), | |
| "hot_aisle_temp_c": round(z.hot_aisle_temp_c, 3), | |
| "fan_speed_pct": z.fan_speed_pct, | |
| } | |
| for z in self._facility.zones | |
| }, | |
| } | |
| self._history.append(snapshot) | |
| def _history_as_list(self) -> List[Dict[str, Any]]: | |
| """Return history buffer as a list ordered oldest β newest (t-3, t-2, t-1).""" | |
| return list(self._history) | |
| # ββ Grader input construction ββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _build_grader_input(self, action: DCAction, step_info: Dict) -> Dict[str, Any]: | |
| """ | |
| Assemble all data the grader needs for one step. | |
| Graders are stateless between calls β they receive everything here. | |
| """ | |
| f = self._facility | |
| return { | |
| "step": self._step_count, | |
| "zones": [ | |
| { | |
| "zone_id": z.zone_id, | |
| "temp_c": z.temp_c, | |
| "hot_aisle_temp_c": z.hot_aisle_temp_c, | |
| "it_load_kw": z.it_load_kw, | |
| "it_load_pct": z.it_load_pct, | |
| "fan_speed_pct": z.fan_speed_pct, | |
| "supply_air_temp_setpoint_c": z.supply_air_temp_setpoint_c, | |
| "zone_priority": z.zone_priority, | |
| "consecutive_safe": self._consecutive_safe.get(z.zone_id, 0), | |
| "consecutive_violation": self._consecutive_violation.get(z.zone_id, 0), | |
| } | |
| for z in f.zones | |
| ], | |
| "current_pue": f.pue, | |
| "pid_baseline_pue": f.pid_baseline_pue, | |
| "carbon_intensity_normalized": f.grid_carbon_intensity_normalized, | |
| "carbon_intensity_label": f.grid_carbon_intensity, | |
| "chiller_active": f.chiller_active, | |
| "chiller_setpoint_c": f.chiller_setpoint_c, | |
| "sla_violation_streak": self._sla_violation_streak, | |
| "action": action, | |
| "last_action": self._last_action, | |
| "action_clipped": step_info.get("action_clipped", {}), | |
| "reasoning": action.reasoning, | |
| } | |
| # ββ Observation construction βββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _make_observation(self) -> DCObservation: | |
| """Convert FacilityState β DCObservation (V2 full schema).""" | |
| f = self._facility | |
| raw = f.to_observation_dict() | |
| zone_obs = [ | |
| ZoneObservation( | |
| zone_id=z["zone_id"], | |
| cold_aisle_temp_c=z["cold_aisle_temp_c"], | |
| hot_aisle_temp_c=z["hot_aisle_temp_c"], | |
| reported_temp_c=z["reported_temp_c"], | |
| supply_air_temp_c=z["supply_air_temp_c"], | |
| supply_air_temp_setpoint_c=z["supply_air_temp_setpoint_c"], | |
| it_load_kw=z["it_load_kw"], | |
| it_load_pct=z["it_load_pct"], | |
| fan_speed_pct=z["fan_speed_pct"], | |
| cooling_capacity_kw=z["cooling_capacity_kw"], | |
| humidity_pct=z["humidity_pct"], | |
| sensor_confidence=z["sensor_confidence"], | |
| zone_priority=z["zone_priority"], | |
| load_forecast_next_hour=self._forecast_load(z["zone_id"]), | |
| ) | |
| for z in raw["zones"] | |
| ] | |
| return DCObservation( | |
| step=raw["step"], | |
| timestamp_hour=raw["timestamp_hour"], | |
| timestamp_day_sin=raw["timestamp_day_sin"], | |
| timestamp_day_cos=raw["timestamp_day_cos"], | |
| outside_temp_c=raw["outside_temp_c"], | |
| wet_bulb_temp_c=raw["wet_bulb_temp_c"], | |
| chiller_active=raw["chiller_active"], | |
| chiller_setpoint_c=raw["chiller_setpoint_c"], | |
| chiller_cop=raw["chiller_cop"], | |
| chiller_fault_detected=self._chiller_fault_detected(), | |
| chiller_fault_status=self._chiller_fault_status(), | |
| ups_efficiency=raw["ups_efficiency"], | |
| current_pue=raw["current_pue"], | |
| free_cooling_potential=raw["free_cooling_potential"], | |
| grid_carbon_intensity=raw["grid_carbon_intensity"], | |
| carbon_intensity_normalized=raw["grid_carbon_intensity_normalized"], | |
| load_curve_phase=self._load_curve_phase(), | |
| zones=zone_obs, | |
| history=self._history_as_list(), | |
| sla_violation_streak=self._sla_violation_streak, | |
| maintenance_active=any( | |
| "maintenance" in note.lower() | |
| for note in raw.get("maintenance_notes", []) | |
| ), | |
| maintenance_notes=raw.get("maintenance_notes", []), | |
| upcoming_events=raw.get("upcoming_events", []), | |
| active_alerts=self._compute_active_alerts(), | |
| ) | |
| # ββ Helper: chiller fault detection (observable) βββββββββββββββββββββββββββ | |
| def _chiller_fault_detected(self) -> bool: | |
| """ | |
| Returns True if the chiller COP has dropped below 60 % of its baseline β | |
| an observable anomaly signal (not ground truth). | |
| """ | |
| if not self._facility.chiller_active: | |
| return True | |
| if self._base_chiller_cop <= 0: | |
| return False | |
| current_cop = self._facility.chiller_cop | |
| return current_cop < (self._base_chiller_cop * CHILLER_FAULT_COP_FRACTION) | |
| def _chiller_fault_status(self) -> str: | |
| """ | |
| Returns a human-readable chiller health label: | |
| 'nominal' β chiller running normally | |
| 'degrading' β COP has dropped below 60 % of baseline; still provides cooling | |
| 'offline' β chiller is fully failed; chiller_active=True in actions is ignored | |
| """ | |
| if not self._facility.chiller_active: | |
| return "offline" | |
| if self._base_chiller_cop > 0: | |
| current_cop = self._facility.chiller_cop | |
| if current_cop < (self._base_chiller_cop * CHILLER_FAULT_COP_FRACTION): | |
| return "degrading" | |
| return "nominal" | |
| # ββ Helper: load curve phase βββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _load_curve_phase(self) -> str: | |
| """ | |
| Classify the current hour into a diurnal load phase. | |
| ramp_up : 06:00-10:00 | |
| peak : 10:00-17:00 | |
| ramp_down : 17:00-22:00 | |
| idle : 22:00-06:00 | |
| """ | |
| hour = self._facility.timestamp_hour | |
| if 6 <= hour < 10: | |
| return "ramp_up" | |
| elif 10 <= hour < 17: | |
| return "peak" | |
| elif 17 <= hour < 22: | |
| return "ramp_down" | |
| else: | |
| return "idle" | |
| # ββ Alert generation (environment-level, not inference-script concern) ββββ | |
| _SAFE_TEMP_MIN = 18.0 | |
| _SAFE_TEMP_MAX = 27.0 | |
| def _compute_active_alerts(self) -> List[str]: | |
| """ | |
| Derive structured alert strings from the current facility state. | |
| These are part of the observation so every client (inference script, | |
| HTTP evaluator, trained policy) sees them without extra computation. | |
| Alerts cover: chiller faults, temperature violations / near-misses, | |
| sensor faults, rising temperature trends, efficiency hints, SLA streaks. | |
| """ | |
| alerts: List[str] = [] | |
| f = self._facility | |
| # ββ Chiller state βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| fault_status = self._chiller_fault_status() | |
| if fault_status == "offline": | |
| alerts.append( | |
| "CRITICAL: Chiller is OFFLINE β fans are the ONLY cooling available. " | |
| "Set critical-zone fans to 90β100 % immediately. " | |
| "Setting chiller_active=true in your action will have NO effect." | |
| ) | |
| elif fault_status == "degrading": | |
| alerts.append( | |
| f"WARNING: Chiller is DEGRADING (COP={f.chiller_cop:.2f}, " | |
| f"nominal {self._base_chiller_cop:.2f}). " | |
| "Cooling capacity reduced. Ramp fans up now before chiller fails." | |
| ) | |
| # ββ Per-zone alerts βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| for z in f.zones: | |
| # Use observation-level temperature (faulty = reported, healthy = true) | |
| obs_temp = z.reported_temp_c if z.sensor_faulty else z.temp_c | |
| conf = z.sensor_confidence | |
| fan = z.fan_speed_pct | |
| # Active temperature violations | |
| if obs_temp > self._SAFE_TEMP_MAX: | |
| alerts.append( | |
| f"VIOLATION: {z.zone_id} OVERHEATING at {obs_temp:.1f} Β°C " | |
| f"(limit {self._SAFE_TEMP_MAX} Β°C). Immediate max cooling required." | |
| ) | |
| elif obs_temp < self._SAFE_TEMP_MIN: | |
| alerts.append( | |
| f"VIOLATION: {z.zone_id} OVERCOOLING at {obs_temp:.1f} Β°C " | |
| f"(floor {self._SAFE_TEMP_MIN} Β°C). Raise supply setpoint +2 Β°C, cut fan 15β20 %." | |
| ) | |
| # Near-boundary warnings | |
| elif self._SAFE_TEMP_MAX - 1.0 < obs_temp <= self._SAFE_TEMP_MAX: | |
| alerts.append( | |
| f"WARNING: {z.zone_id} at {obs_temp:.1f} Β°C β within 1 Β°C of " | |
| f"violation limit. Increase cooling now." | |
| ) | |
| elif self._SAFE_TEMP_MIN <= obs_temp < self._SAFE_TEMP_MIN + 1.0: | |
| alerts.append( | |
| f"WARNING: {z.zone_id} at {obs_temp:.1f} Β°C β within 1 Β°C of " | |
| "overcooling floor. Reduce fan or raise supply setpoint." | |
| ) | |
| # Sensor fault | |
| if conf < 0.5: | |
| bias = round(z.reported_temp_c - z.temp_c, 1) | |
| alerts.append( | |
| f"SENSOR FAULT: {z.zone_id} sensor_confidence={conf:.2f}. " | |
| f"Cold-aisle reading may be off by ~{abs(bias):.1f} Β°C. " | |
| "Cross-check with hot_aisle_temp_c and supply_air_temp_c " | |
| "to estimate true thermal state." | |
| ) | |
| # Efficiency hint: safe + stable zone with wastefully high fans | |
| prev_snap = list(self._history)[-1] if self._history else None | |
| if prev_snap and z.zone_id in prev_snap.get("zones", {}): | |
| prev_temp = prev_snap["zones"][z.zone_id].get("cold_aisle_temp_c", obs_temp) | |
| delta = obs_temp - prev_temp | |
| if ( | |
| self._SAFE_TEMP_MIN + 0.5 < obs_temp < self._SAFE_TEMP_MAX - 2.0 | |
| and abs(delta) < 0.3 | |
| and fan > 70.0 | |
| ): | |
| alerts.append( | |
| f"EFFICIENCY: {z.zone_id} stable at {obs_temp:.1f} Β°C with fan " | |
| f"at {fan:.0f} % β reduce to 45β65 % to improve PUE." | |
| ) | |
| # ββ Carbon ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| if f.grid_carbon_intensity_normalized > 0.80: | |
| alerts.append( | |
| f"CARBON CRITICAL ({f.grid_carbon_intensity_normalized:.2f}): Grid at " | |
| "peak emissions. Reduce fan speeds on safe zones β every % counts." | |
| ) | |
| # ββ SLA streak ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| if self._sla_violation_streak >= 5: | |
| alerts.append( | |
| f"SLA ALERT: {self._sla_violation_streak} consecutive violation steps. " | |
| "Hard termination triggers at 10. Take urgent corrective action NOW." | |
| ) | |
| return alerts | |
| # ββ Helper: load forecast ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _forecast_load(self, zone_id: str) -> float: | |
| """ | |
| Simple 1-hour-ahead load forecast using the facility's load curve. | |
| Returns predicted IT load in kW for the given zone. | |
| """ | |
| f = self._facility | |
| if not f.load_curve: | |
| return 0.0 | |
| zone = next((z for z in f.zones if z.zone_id == zone_id), None) | |
| if zone is None: | |
| return 0.0 | |
| future_hour = (f.timestamp_hour + 1.0) % 24.0 | |
| future_idx = int(future_hour) % 24 | |
| future_normalised = f.load_curve[future_idx] | |
| return round(zone.base_it_load_kw * future_normalised, 1) | |
| # ββ Action conversion ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _to_sim_action(self, action: DCAction) -> SimDCAction: | |
| """Convert Pydantic DCAction β simulation duck-type SimDCAction.""" | |
| sim_adjustments = [ | |
| SimZoneAdjustment( | |
| zone_id=adj.zone_id, | |
| fan_speed_pct=adj.fan_speed_pct, | |
| supply_air_temp_setpoint_c=adj.supply_air_temp_setpoint_c, | |
| ) | |
| for adj in action.zone_adjustments | |
| ] | |
| return SimDCAction( | |
| zone_adjustments=sim_adjustments, | |
| chiller_setpoint_c=action.chiller_setpoint_c, | |
| chiller_active=action.chiller_active, | |
| reasoning=action.reasoning, | |
| ) | |
| def _neutral_sim_action(self) -> SimDCAction: | |
| """ | |
| Build a neutral (do-nothing) SimDCAction from the current facility state. | |
| Used as the reference point for rate-limiting on the first real step. | |
| """ | |
| if self._facility is None: | |
| return SimDCAction(zone_adjustments=[], chiller_setpoint_c=10.0, chiller_active=True) | |
| sim_adjustments = [ | |
| SimZoneAdjustment( | |
| zone_id=z.zone_id, | |
| fan_speed_pct=z.fan_speed_pct, | |
| supply_air_temp_setpoint_c=z.supply_air_temp_setpoint_c, | |
| ) | |
| for z in self._facility.zones | |
| ] | |
| return SimDCAction( | |
| zone_adjustments=sim_adjustments, | |
| chiller_setpoint_c=self._facility.chiller_setpoint_c, | |
| chiller_active=self._facility.chiller_active, | |
| ) |