# NOTE(review): the lines that followed the scrape header here ("Spaces:" /
# "Running on CPU Upgrade") were hosting-page chrome, not source code; they
# are preserved as this comment so the module remains importable.
| from __future__ import annotations | |
| from typing import Any, Final, Optional | |
| from openenv.core import Environment as BaseEnvironment | |
| from fusion_lab.models import ( | |
| ActionMonitor, | |
| ActionIntent, | |
| LowDimBoundaryParams, | |
| MagnitudeName, | |
| RewardBreakdown, | |
| StellaratorAction, | |
| StellaratorObservation, | |
| StellaratorState, | |
| ) | |
| from server.contract import N_FIELD_PERIODS, RESET_SEEDS | |
| from server.physics import ( | |
| ASPECT_RATIO_MAX, | |
| AVERAGE_TRIANGULARITY_MAX, | |
| EDGE_IOTA_OVER_NFP_MIN, | |
| FEASIBILITY_TOLERANCE, | |
| EvaluationMetrics, | |
| build_boundary_from_params, | |
| evaluate_boundary, | |
| ) | |
# Total number of evaluations an episode may spend, including the final submit.
BUDGET: Final[int] = 6
# Inclusive (lower, upper) clamp bounds per tunable boundary parameter
# (see _clamp / _apply_action).
PARAMETER_RANGES: Final[dict[str, tuple[float, float]]] = {
    "aspect_ratio": (3.2, 3.8),
    "elongation": (1.2, 1.8),
    "rotational_transform": (1.2, 1.9),
    "triangularity_scale": (0.4, 0.7),
}
# Step size per parameter for each action magnitude.
PARAMETER_DELTAS: Final[dict[str, dict[str, float]]] = {
    "aspect_ratio": {"small": 0.05, "medium": 0.1, "large": 0.2},
    "elongation": {"small": 0.05, "medium": 0.1, "large": 0.2},
    "rotational_transform": {"small": 0.05, "medium": 0.1, "large": 0.2},
    "triangularity_scale": {"small": 0.02, "medium": 0.05, "large": 0.1},
}
# Task description surfaced to the agent in every observation. Runtime string:
# its exact wording is part of the environment's observable behavior.
TARGET_SPEC: Final[str] = (
    "Optimize the P1 benchmark using a custom low-dimensional boundary family derived "
    "from a rotating-ellipse seed. Constraints: aspect ratio <= 4.0, average "
    "triangularity <= -0.5, abs(edge rotational transform / n_field_periods) >= 0.3. "
    "All actions use low-fidelity verification. Submit ends the episode with an explicit "
    "terminal evaluation and reward bonus. Budget: 6 evaluations including submit."
)
# Reward-shaping weights and penalties (all consumed in _compute_reward_breakdown).
FAILURE_PENALTY: Final[float] = -2.0  # flat penalty for a failed evaluation
FEASIBILITY_DELTA_WEIGHT: Final[float] = 2.0  # weight on per-step feasibility change
TRIANGULARITY_REPAIR_WEIGHT: Final[float] = 2.0
ASPECT_RATIO_REPAIR_WEIGHT: Final[float] = 1.0
IOTA_REPAIR_WEIGHT: Final[float] = 1.0
BEST_FEASIBILITY_BONUS_WEIGHT: Final[float] = 1.5
BEST_SCORE_BONUS_WEIGHT: Final[float] = 0.75
NEAR_FEASIBILITY_THRESHOLD: Final[float] = 0.02  # feasibility <= this is "near-feasible"
NEAR_FEASIBILITY_BONUS: Final[float] = 1.0
NO_PROGRESS_STEP_THRESHOLD: Final[int] = 3  # consecutive stalls before penalizing
NO_PROGRESS_PENALTY: Final[float] = -0.2
REPEAT_STATE_PENALTY: Final[float] = -0.15
# Per-step cost scales with move magnitude; restore_best has its own flat cost.
STEP_COST_BY_MAGNITUDE: Final[dict[MagnitudeName, float]] = {
    "small": -0.05,
    "medium": -0.1,
    "large": -0.2,
}
RESTORE_STEP_COST: Final[float] = -0.1
class StellaratorEnvironment(
    BaseEnvironment[StellaratorAction, StellaratorObservation, StellaratorState]
):
    """Budgeted, low-fidelity stellarator boundary-optimization environment.

    Each episode starts from a frozen seed design; the agent spends a fixed
    evaluation budget on parameter moves, restores, and one terminal submit.
    """

    def __init__(self) -> None:
        super().__init__()
        # Mutable per-episode state; replaced wholesale by reset().
        self._state = StellaratorState()
        # Metrics from the most recent evaluation (including failed ones).
        self._last_metrics: EvaluationMetrics | None = None
        # Metrics from the most recent evaluation that did NOT fail; used as
        # a reference when the last evaluation failed.
        self._last_successful_metrics: EvaluationMetrics | None = None
    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> StellaratorObservation:
        """Start a new episode from a frozen low-dimensional seed design.

        Args:
            seed: Optional selector into the frozen reset seeds; ``None``
                picks the first seed (see ``_initial_params``).
            episode_id: Opaque identifier stored on the episode state.
            **kwargs: Accepted for interface compatibility; unused here.

        Returns:
            Initial observation built from a low-fidelity evaluation of the seed.
        """
        params = self._initial_params(seed)
        # The seed evaluation both scores the start point and fixes the
        # baseline (initial_low_fidelity_score) for terminal bonuses.
        metrics = self._evaluate_params(params, fidelity="low")
        self._state = StellaratorState(
            episode_id=episode_id,
            step_count=0,
            initial_params=params,
            current_params=params,
            best_params=params,
            initial_low_fidelity_score=metrics.p1_score,
            best_low_fidelity_score=metrics.p1_score,
            best_low_fidelity_feasibility=metrics.p1_feasibility,
            budget_total=BUDGET,
            budget_remaining=BUDGET,
            episode_done=False,
            constraints_satisfied=metrics.constraints_satisfied,
            total_reward=0.0,
        )
        # The seed counts as visited so an immediate return to it is flagged
        # as a repeat state.
        self._state.visited_state_keys = [self._state_key(params)]
        self._last_metrics = metrics
        # A failed seed evaluation leaves no successful reference metrics.
        self._last_successful_metrics = None if metrics.evaluation_failed else metrics
        return self._build_observation(
            metrics,
            action_summary="Episode started from a frozen low-dimensional seed.",
            reward_breakdown=RewardBreakdown(intent="run"),
            action_monitor=self._action_monitor_for_no_op(StellaratorAction(intent="run")),
        )
| def step( | |
| self, | |
| action: StellaratorAction, | |
| timeout_s: Optional[float] = None, | |
| **kwargs: Any, | |
| ) -> StellaratorObservation: | |
| if self._state.episode_done or self._state.budget_remaining <= 0: | |
| metrics = self._last_metrics or self._evaluate_params( | |
| self._state.current_params, | |
| fidelity="low", | |
| ) | |
| reward_breakdown = RewardBreakdown(intent=action.intent) | |
| return self._build_observation( | |
| metrics, | |
| action_summary="Episode already ended. Call reset() before sending more actions.", | |
| reward=0.0, | |
| reward_breakdown=reward_breakdown, | |
| action_monitor=self._action_monitor_for_no_op(action), | |
| done=True, | |
| ) | |
| self._state.step_count += 1 | |
| if action.intent == "submit": | |
| return self._handle_submit() | |
| if action.intent == "restore_best": | |
| return self._handle_restore() | |
| return self._handle_run(action) | |
    def state(self) -> StellaratorState:
        """Return the live (mutable) episode state object."""
        return self._state
    def _handle_run(self, action: StellaratorAction) -> StellaratorObservation:
        """Apply a parameter move, evaluate it, and score the step.

        A run action missing any of parameter/direction/magnitude is routed
        to ``_handle_invalid_run`` (which still consumes budget).
        """
        if not all([action.parameter, action.direction, action.magnitude]):
            return self._handle_invalid_run()
        self._state.budget_remaining -= 1
        params_before = self._state.current_params
        params, clamped, no_op = self._apply_action(
            params=self._state.current_params,
            parameter=action.parameter,
            direction=action.direction,
            magnitude=action.magnitude,
        )
        # Must run before the new state is recorded as visited (recording
        # happens inside _advance_low_fidelity_progress).
        repeat_state = self._is_repeat_state(params)
        action_monitor = self._build_action_monitor(
            action=action,
            params_before=params_before,
            params_after=params,
            clamped=clamped,
            no_op=no_op,
            repeat_state=repeat_state,
        )
        metrics = self._evaluate_params(params, fidelity="low")
        self._state.current_params = params
        self._state.constraints_satisfied = metrics.constraints_satisfied
        # Snapshot of best-so-far values from BEFORE this step, needed by the
        # reward shaping below.
        (
            best_low_fidelity_feasibility_before,
            best_low_fidelity_score_before,
            step_improved,
            no_progress_steps,
        ) = self._advance_low_fidelity_progress(params, metrics)
        done = self._state.budget_remaining <= 0
        reward_breakdown = self._compute_reward_breakdown(
            metrics,
            action.intent,
            done,
            magnitude=action.magnitude,
            best_low_fidelity_feasibility_before=best_low_fidelity_feasibility_before,
            best_low_fidelity_score_before=best_low_fidelity_score_before,
            step_improved=step_improved,
            no_progress_steps=no_progress_steps,
            repeat_state=repeat_state,
        )
        reward = reward_breakdown.total
        # Summary is built BEFORE _last_metrics is overwritten so reference
        # comparisons inside _summary_run still see the previous step.
        summary = self._summary_run(action, metrics, action_monitor)
        self._state.history.append(summary)
        self._state.total_reward = round(self._state.total_reward + reward, 4)
        self._last_metrics = metrics
        if not metrics.evaluation_failed:
            self._last_successful_metrics = metrics
        self._state.episode_done = done
        return self._build_observation(
            metrics,
            action_summary=summary,
            reward=reward,
            reward_breakdown=reward_breakdown,
            action_monitor=action_monitor,
            done=done,
        )
    def _handle_submit(self) -> StellaratorObservation:
        """Terminal submit: evaluate the current design and end the episode."""
        self._state.budget_remaining -= 1
        action = StellaratorAction(intent="submit")
        # Submit never moves the design, so before/after params are identical.
        action_monitor = self._build_action_monitor(
            action=action,
            params_before=self._state.current_params,
            params_after=self._state.current_params,
        )
        metrics = self._evaluate_params(self._state.current_params, fidelity="low")
        self._state.constraints_satisfied = metrics.constraints_satisfied
        # done=True activates the terminal bonus/penalty terms.
        reward_breakdown = self._compute_reward_breakdown(
            metrics,
            "submit",
            done=True,
        )
        reward = reward_breakdown.total
        summary = self._summary_submit(metrics)
        self._state.history.append(summary)
        self._state.total_reward = round(self._state.total_reward + reward, 4)
        self._state.episode_done = True
        self._last_metrics = metrics
        if not metrics.evaluation_failed:
            self._last_successful_metrics = metrics
        return self._build_observation(
            metrics,
            action_summary=summary,
            reward=reward,
            reward_breakdown=reward_breakdown,
            action_monitor=action_monitor,
            done=True,
        )
    def _handle_restore(self) -> StellaratorObservation:
        """Replace the current design with the best-known one and re-evaluate.

        Costs one budget unit even when the current design already is the best.
        """
        self._state.budget_remaining -= 1
        params_before = self._state.current_params
        self._state.current_params = self._state.best_params
        # Restoring an already-visited design normally reads as a repeat;
        # the repeat penalty is gated on step_improved downstream.
        repeat_state = self._is_repeat_state(self._state.current_params)
        action = StellaratorAction(intent="restore_best")
        action_monitor = self._build_action_monitor(
            action=action,
            params_before=params_before,
            params_after=self._state.current_params,
            no_op=params_before == self._state.current_params,
            repeat_state=repeat_state,
            used_best_params=True,
        )
        metrics = self._evaluate_params(self._state.current_params, fidelity="low")
        self._state.constraints_satisfied = metrics.constraints_satisfied
        # Best-so-far snapshot from BEFORE this step, for reward shaping.
        (
            best_low_fidelity_feasibility_before,
            best_low_fidelity_score_before,
            step_improved,
            no_progress_steps,
        ) = self._advance_low_fidelity_progress(self._state.current_params, metrics)
        done = self._state.budget_remaining <= 0
        reward_breakdown = self._compute_reward_breakdown(
            metrics,
            "restore_best",
            done,
            best_low_fidelity_feasibility_before=best_low_fidelity_feasibility_before,
            best_low_fidelity_score_before=best_low_fidelity_score_before,
            step_improved=step_improved,
            no_progress_steps=no_progress_steps,
            repeat_state=repeat_state,
        )
        reward = reward_breakdown.total
        summary = self._summary_restore(metrics, action_monitor)
        self._state.history.append(summary)
        self._state.total_reward = round(self._state.total_reward + reward, 4)
        self._last_metrics = metrics
        if not metrics.evaluation_failed:
            self._last_successful_metrics = metrics
        self._state.episode_done = done
        return self._build_observation(
            metrics,
            action_summary=summary,
            reward=reward,
            reward_breakdown=reward_breakdown,
            action_monitor=action_monitor,
            done=done,
        )
| def _handle_invalid_run(self) -> StellaratorObservation: | |
| self._state.budget_remaining -= 1 | |
| metrics = self._last_metrics or self._evaluate_params( | |
| self._state.current_params, | |
| fidelity="low", | |
| ) | |
| done = self._state.budget_remaining <= 0 | |
| summary = "Invalid run action: parameter, direction, and magnitude are required." | |
| self._state.history.append(summary) | |
| reward_breakdown = RewardBreakdown(intent="run", invalid_action_penalty=-1.0, total=-1.0) | |
| action_monitor = self._action_monitor_for_no_op(StellaratorAction(intent="run")) | |
| self._state.total_reward = round(self._state.total_reward - 1.0, 4) | |
| self._state.episode_done = done | |
| return self._build_observation( | |
| metrics, | |
| action_summary=summary, | |
| reward=-1.0, | |
| reward_breakdown=reward_breakdown, | |
| action_monitor=action_monitor, | |
| done=done, | |
| ) | |
    def _compute_reward_breakdown(
        self,
        metrics: EvaluationMetrics,
        intent: ActionIntent,
        done: bool,
        magnitude: MagnitudeName | None = None,
        initial_reference_score: float | None = None,
        reference_metrics: EvaluationMetrics | None = None,
        best_low_fidelity_feasibility_before: float | None = None,
        best_low_fidelity_score_before: float | None = None,
        step_improved: bool = False,
        no_progress_steps: int = 0,
        repeat_state: bool = False,
    ) -> RewardBreakdown:
        """Build the shaped reward breakdown for one action.

        Shaping is additive (summed in ``_reward_total``) and roughly layered:
        step costs, failure handling, feasibility-crossing bonuses/penalties,
        either objective-improvement terms (both designs feasible) or
        feasibility-repair terms (otherwise), and terminal terms when the
        episode ends. ``*_before`` arguments are best-so-far snapshots taken
        before this step; when ``None`` the stored bests are used.
        """
        recovered_from_failure = self._recovered_from_failed_evaluation(metrics)
        # Reference = most recent successful metrics unless explicitly given.
        previous_metrics = reference_metrics or self._reference_metrics(metrics)
        best_low_fidelity_feasibility_before = (
            self._state.best_low_fidelity_feasibility
            if best_low_fidelity_feasibility_before is None
            else best_low_fidelity_feasibility_before
        )
        best_low_fidelity_score_before = (
            self._state.best_low_fidelity_score
            if best_low_fidelity_score_before is None
            else best_low_fidelity_score_before
        )
        breakdown = RewardBreakdown(
            intent=intent,
            evaluation_failed=metrics.evaluation_failed,
            recovered_from_failure=recovered_from_failure,
            reference_constraints_satisfied=previous_metrics.constraints_satisfied,
            reference_score=previous_metrics.p1_score,
            reference_feasibility=previous_metrics.p1_feasibility,
            reference_max_elongation=previous_metrics.max_elongation,
            initial_reference_score=initial_reference_score,
        )
        self._apply_step_penalties(
            breakdown,
            intent=intent,
            magnitude=magnitude,
            no_progress_steps=no_progress_steps,
            repeat_state=repeat_state,
            step_improved=step_improved,
        )
        # Failed evaluations short-circuit: flat penalty, plus extra when the
        # failure occurred on submit or on the final budgeted step.
        if metrics.evaluation_failed:
            breakdown.failure_penalty = FAILURE_PENALTY
            if intent == "submit":
                breakdown.failure_submit_penalty = -1.0
            elif done:
                breakdown.failure_budget_penalty = -0.5
            breakdown.total = self._reward_total(breakdown)
            return breakdown
        # Crossing into / out of the feasible region dominates the shaping.
        if metrics.constraints_satisfied and not previous_metrics.constraints_satisfied:
            breakdown.feasibility_crossing_bonus = 3.0
        if previous_metrics.constraints_satisfied and not metrics.constraints_satisfied:
            breakdown.feasibility_regression_penalty = -3.0
        if (
            previous_metrics.p1_feasibility > NEAR_FEASIBILITY_THRESHOLD
            and metrics.p1_feasibility <= NEAR_FEASIBILITY_THRESHOLD
        ):
            breakdown.near_feasible_bonus = NEAR_FEASIBILITY_BONUS
        if metrics.constraints_satisfied and previous_metrics.constraints_satisfied:
            # Both designs feasible: optimize the objective (lower elongation).
            breakdown.objective_delta_reward = (
                previous_metrics.max_elongation - metrics.max_elongation
            ) * 10.0
            if intent != "submit" and best_low_fidelity_feasibility_before <= FEASIBILITY_TOLERANCE:
                breakdown.best_score_bonus = (
                    max(
                        0.0,
                        metrics.p1_score - best_low_fidelity_score_before,
                    )
                    * BEST_SCORE_BONUS_WEIGHT
                )
        else:
            # At least one side infeasible: reward feasibility repair.
            breakdown.feasibility_delta_reward = (
                previous_metrics.p1_feasibility - metrics.p1_feasibility
            ) * FEASIBILITY_DELTA_WEIGHT
            if (
                intent != "submit"
                and not metrics.constraints_satisfied
                and best_low_fidelity_feasibility_before > FEASIBILITY_TOLERANCE
            ):
                breakdown.best_feasibility_bonus = (
                    max(
                        0.0,
                        best_low_fidelity_feasibility_before - metrics.p1_feasibility,
                    )
                    * BEST_FEASIBILITY_BONUS_WEIGHT
                )
            # Per-constraint repair terms, weighted independently.
            breakdown.triangularity_repair_reward = (
                previous_metrics.triangularity_violation - metrics.triangularity_violation
            ) * TRIANGULARITY_REPAIR_WEIGHT
            breakdown.aspect_ratio_repair_reward = (
                previous_metrics.aspect_ratio_violation - metrics.aspect_ratio_violation
            ) * ASPECT_RATIO_REPAIR_WEIGHT
            breakdown.iota_repair_reward = (
                previous_metrics.iota_violation - metrics.iota_violation
            ) * IOTA_REPAIR_WEIGHT
        if recovered_from_failure:
            breakdown.recovery_bonus = 1.0
        if intent == "submit" or done:
            # Terminal terms: compare the final score to the episode's baseline.
            base_score = (
                initial_reference_score
                if initial_reference_score is not None
                else self._state.initial_low_fidelity_score
            )
            breakdown.initial_reference_score = base_score
            improved = metrics.constraints_satisfied and metrics.p1_score > base_score
            if improved:
                # Improvement normalized by the headroom remaining up to 1.0.
                ratio = (metrics.p1_score - base_score) / max(1.0 - base_score, 1e-6)
                breakdown.terminal_score_ratio = ratio
                if intent == "submit":
                    breakdown.terminal_improvement_bonus = 5.0 * ratio
                    # Finishing under budget is rewarded only on explicit submit.
                    breakdown.terminal_budget_bonus = (
                        self._state.budget_remaining / self._state.budget_total
                    )
                else:
                    breakdown.terminal_improvement_bonus = 2.0 * ratio
            else:
                breakdown.terminal_no_improvement_penalty = -1.0 if intent == "submit" else -0.5
        breakdown.total = self._reward_total(breakdown)
        return breakdown
    def _build_observation(
        self,
        metrics: EvaluationMetrics,
        action_summary: str,
        reward: float | None = None,
        reward_breakdown: RewardBreakdown | None = None,
        action_monitor: ActionMonitor | None = None,
        done: bool = False,
    ) -> StellaratorObservation:
        """Render metrics plus episode state into a StellaratorObservation.

        The observation carries both structured fields and a human-readable
        ``diagnostics_text`` block assembled from the same values.
        """
        # Default to empty/no-op records so every observation is fully populated.
        reward_breakdown = reward_breakdown or RewardBreakdown()
        action_monitor = action_monitor or self._action_monitor_for_no_op(
            StellaratorAction(intent="run")
        )
        best_low_fidelity_score = self._state.best_low_fidelity_score
        best_low_fidelity_feasibility = self._state.best_low_fidelity_feasibility
        trajectory_summary = self._trajectory_summary()
        text_lines = [
            action_summary,
            "",
            f"evaluation_fidelity={metrics.evaluation_fidelity}",
            f"evaluation_status={'FAILED' if metrics.evaluation_failed else 'OK'}",
        ]
        # Failure reason line appears only for failed evaluations.
        if metrics.evaluation_failed:
            text_lines.append(f"failure_reason={metrics.failure_reason}")
        text_lines.extend(
            [
                f"max_elongation={metrics.max_elongation:.4f}",
                f"aspect_ratio={metrics.aspect_ratio:.4f} (<= {ASPECT_RATIO_MAX:.1f})",
                f"average_triangularity={metrics.average_triangularity:.4f} (<= {AVERAGE_TRIANGULARITY_MAX:.1f})",
                (
                    "edge_iota_over_nfp="
                    f"{metrics.edge_iota_over_nfp:.4f} (abs(.) >= {EDGE_IOTA_OVER_NFP_MIN:.1f})"
                ),
                f"feasibility={metrics.p1_feasibility:.6f}",
                f"aspect_ratio_violation={metrics.aspect_ratio_violation:.6f}",
                f"triangularity_violation={metrics.triangularity_violation:.6f}",
                f"iota_violation={metrics.iota_violation:.6f}",
                f"dominant_constraint={metrics.dominant_constraint}",
                f"best_low_fidelity_score={best_low_fidelity_score:.6f}",
                f"best_low_fidelity_feasibility={best_low_fidelity_feasibility:.6f}",
                f"no_progress_steps={self._state.no_progress_steps}",
                f"vacuum_well={metrics.vacuum_well:.4f}",
                f"constraints={'SATISFIED' if metrics.constraints_satisfied else 'VIOLATED'}",
                f"step={self._state.step_count} | budget={self._state.budget_remaining}/{self._state.budget_total}",
                f"reward_total={reward_breakdown.total:+.4f}",
                f"reward_terms={self._reward_terms_text(reward_breakdown)}",
                f"action_clamped={action_monitor.clamped}",
                f"action_no_op={action_monitor.no_op}",
                f"action_repeat_state={action_monitor.repeat_state}",
                f"episode_total_reward={self._state.total_reward:+.4f}",
            ]
        )
        return StellaratorObservation(
            diagnostics_text="\n".join(text_lines),
            max_elongation=metrics.max_elongation,
            aspect_ratio=metrics.aspect_ratio,
            average_triangularity=metrics.average_triangularity,
            edge_iota_over_nfp=metrics.edge_iota_over_nfp,
            aspect_ratio_violation=metrics.aspect_ratio_violation,
            triangularity_violation=metrics.triangularity_violation,
            iota_violation=metrics.iota_violation,
            dominant_constraint=metrics.dominant_constraint,
            p1_score=metrics.p1_score,
            p1_feasibility=metrics.p1_feasibility,
            vacuum_well=metrics.vacuum_well,
            evaluation_fidelity=metrics.evaluation_fidelity,
            evaluation_failed=metrics.evaluation_failed,
            failure_reason=metrics.failure_reason,
            step_number=self._state.step_count,
            budget_remaining=self._state.budget_remaining,
            no_progress_steps=self._state.no_progress_steps,
            best_low_fidelity_score=best_low_fidelity_score,
            best_low_fidelity_feasibility=best_low_fidelity_feasibility,
            constraints_satisfied=metrics.constraints_satisfied,
            target_spec=TARGET_SPEC,
            reward=reward,
            reward_breakdown=reward_breakdown,
            action_monitor=action_monitor,
            episode_total_reward=self._state.total_reward,
            trajectory_summary=trajectory_summary,
            done=done,
        )
    def _summary_run(
        self,
        action: StellaratorAction,
        metrics: EvaluationMetrics,
        action_monitor: ActionMonitor,
    ) -> str:
        """One-line history summary for a run action.

        Called before ``_last_metrics`` is updated, so reference comparisons
        here are against the previous step's evaluation.
        """
        # Validity was checked by _handle_run before calling this.
        assert action.parameter is not None
        assert action.direction is not None
        assert action.magnitude is not None
        action_note = self._action_monitor_note(action_monitor)
        if metrics.evaluation_failed:
            return (
                f"Applied {action.parameter} {action.direction} {action.magnitude}. "
                f"{action_note}"
                f"Low-fidelity evaluation failed: {metrics.failure_reason}"
            )
        if self._recovered_from_failed_evaluation(metrics):
            return (
                f"Applied {action.parameter} {action.direction} {action.magnitude}. "
                f"{action_note}"
                "Low-fidelity evaluation recovered from the previous failed evaluation. "
                f"feasibility={metrics.p1_feasibility:.6f}."
            )
        previous_metrics = self._reference_metrics(metrics)
        # Feasible-to-feasible steps report the objective; otherwise report
        # the feasibility change and the dominant violated constraint.
        if metrics.constraints_satisfied and previous_metrics.constraints_satisfied:
            delta = previous_metrics.max_elongation - metrics.max_elongation
            objective_summary = (
                f"max_elongation changed by {delta:+.4f} to {metrics.max_elongation:.4f}."
            )
        else:
            delta = previous_metrics.p1_feasibility - metrics.p1_feasibility
            objective_summary = (
                f"feasibility changed by {delta:+.6f} to {metrics.p1_feasibility:.6f}; "
                f"dominant_constraint={metrics.dominant_constraint}."
            )
        return (
            f"Applied {action.parameter} {action.direction} {action.magnitude}. "
            f"{action_note}"
            f"Low-fidelity evaluation. {objective_summary}"
        )
| def _summary_submit( | |
| self, | |
| metrics: EvaluationMetrics, | |
| ) -> str: | |
| if metrics.evaluation_failed: | |
| return f"Submit failed during low-fidelity evaluation: {metrics.failure_reason}" | |
| return ( | |
| f"Submitted current_score={metrics.p1_score:.6f}, " | |
| f"best_score={self._state.best_low_fidelity_score:.6f}, " | |
| f"best_feasibility={self._state.best_low_fidelity_feasibility:.6f}, " | |
| f"constraints={'SATISFIED' if metrics.constraints_satisfied else 'VIOLATED'}." | |
| ) | |
    def _summary_restore(self, metrics: EvaluationMetrics, action_monitor: ActionMonitor) -> str:
        """One-line history summary for a restore_best action."""
        action_note = self._action_monitor_note(action_monitor)
        if metrics.evaluation_failed:
            return (
                f"Restore-best failed during low-fidelity evaluation: {metrics.failure_reason}. "
                f"{action_note}"
            )
        return (
            "Restored the best-known design. "
            f"{action_note}"
            f"Low-fidelity score={metrics.p1_score:.6f}, feasibility={metrics.p1_feasibility:.6f}."
        )
| def _initial_params(self, seed: int | None) -> LowDimBoundaryParams: | |
| if seed is None: | |
| return RESET_SEEDS[0] | |
| return RESET_SEEDS[seed % len(RESET_SEEDS)] | |
    def _apply_action(
        self,
        params: LowDimBoundaryParams,
        parameter: str,
        direction: str,
        magnitude: str,
    ) -> tuple[LowDimBoundaryParams, bool, bool]:
        """Apply one signed, range-clamped move to a single parameter.

        Returns:
            ``(new_params, clamped, no_op)`` — ``clamped`` means the requested
            value was clipped to the parameter's range; ``no_op`` means the
            final value equals the current one (move stuck at a bound).
        """
        delta = PARAMETER_DELTAS[parameter][magnitude]
        # Any direction other than "increase" is treated as a decrease.
        signed_delta = delta if direction == "increase" else -delta
        current_value = getattr(params, parameter)
        requested_value = current_value + signed_delta
        next_value = self._clamp(
            requested_value,
            parameter=parameter,
        )
        next_values = params.model_dump()
        next_values[parameter] = next_value
        # Exact float equality is intentional: clamping either replaces the
        # value with a bound or leaves the identical float untouched.
        clamped = next_value != requested_value
        no_op = next_value == current_value
        return LowDimBoundaryParams.model_validate(next_values), clamped, no_op
| def _clamp(self, value: float, *, parameter: str) -> float: | |
| lower, upper = PARAMETER_RANGES[parameter] | |
| return min(max(value, lower), upper) | |
    def _evaluate_params(
        self,
        params: LowDimBoundaryParams,
        *,
        fidelity: str,
    ) -> EvaluationMetrics:
        """Build the plasma boundary for ``params`` and score it at ``fidelity``."""
        boundary = build_boundary_from_params(
            params,
            n_field_periods=N_FIELD_PERIODS,
        )
        return evaluate_boundary(boundary, fidelity=fidelity)
| def _reference_metrics(self, fallback: EvaluationMetrics) -> EvaluationMetrics: | |
| if self._last_metrics is not None and not self._last_metrics.evaluation_failed: | |
| return self._last_metrics | |
| if self._last_successful_metrics is not None: | |
| return self._last_successful_metrics | |
| return fallback | |
| def _best_low_fidelity_snapshot(self) -> tuple[float, float]: | |
| return ( | |
| self._state.best_low_fidelity_feasibility, | |
| self._state.best_low_fidelity_score, | |
| ) | |
    def _advance_low_fidelity_progress(
        self,
        params: LowDimBoundaryParams,
        metrics: EvaluationMetrics,
    ) -> tuple[float, float, bool, int]:
        """Update best/visited/no-progress bookkeeping for one evaluation.

        Returns:
            ``(best_feasibility_before, best_score_before, step_improved,
            no_progress_steps)`` — the first two are the bests from BEFORE
            this step (the reward shaping compares against them).
        """
        # Snapshot must be taken before _update_best mutates the bests.
        best_low_fidelity_feasibility_before, best_low_fidelity_score_before = (
            self._best_low_fidelity_snapshot()
        )
        # Improvement is judged against the immediately preceding step's
        # metrics (even a failed one), not against the best-so-far.
        step_improved = self._is_better_than_reference(
            metrics,
            self._previous_step_metrics(metrics),
        )
        self._update_best(params, metrics)
        no_progress_steps = self._advance_no_progress(step_improved=step_improved)
        self._record_visited_state(params)
        return (
            best_low_fidelity_feasibility_before,
            best_low_fidelity_score_before,
            step_improved,
            no_progress_steps,
        )
| def _previous_step_metrics(self, fallback: EvaluationMetrics) -> EvaluationMetrics: | |
| if self._last_metrics is not None: | |
| return self._last_metrics | |
| return fallback | |
| def _recovered_from_failed_evaluation(self, metrics: EvaluationMetrics) -> bool: | |
| return ( | |
| not metrics.evaluation_failed | |
| and self._last_metrics is not None | |
| and self._last_metrics.evaluation_failed | |
| ) | |
    def _build_action_monitor(
        self,
        *,
        action: StellaratorAction,
        params_before: LowDimBoundaryParams,
        params_after: LowDimBoundaryParams,
        clamped: bool = False,
        no_op: bool = False,
        repeat_state: bool = False,
        used_best_params: bool = False,
    ) -> ActionMonitor:
        """Assemble the ActionMonitor record describing one applied action."""
        return ActionMonitor(
            intent=action.intent,
            parameter=action.parameter,
            direction=action.direction,
            magnitude=action.magnitude,
            params_before=params_before,
            params_after=params_after,
            clamped=clamped,
            no_op=no_op,
            repeat_state=repeat_state,
            used_best_params=used_best_params,
        )
| def _action_monitor_for_no_op(self, action: StellaratorAction) -> ActionMonitor: | |
| params = self._state.current_params | |
| return self._build_action_monitor( | |
| action=action, | |
| params_before=params, | |
| params_after=params, | |
| no_op=True, | |
| ) | |
| def _action_monitor_note(self, action_monitor: ActionMonitor) -> str: | |
| if action_monitor.used_best_params and action_monitor.no_op: | |
| return "Already at the best-known design. " | |
| if action_monitor.no_op: | |
| return "The requested move hit a parameter bound and produced no state change. " | |
| if action_monitor.clamped: | |
| return "The requested move was clipped to stay inside the allowed parameter range. " | |
| return "" | |
| def _apply_step_penalties( | |
| self, | |
| breakdown: RewardBreakdown, | |
| *, | |
| intent: ActionIntent, | |
| magnitude: MagnitudeName | None, | |
| no_progress_steps: int, | |
| repeat_state: bool, | |
| step_improved: bool, | |
| ) -> None: | |
| if intent == "submit": | |
| return | |
| breakdown.step_cost = self._step_cost(intent=intent, magnitude=magnitude) | |
| if intent == "run" and no_progress_steps >= NO_PROGRESS_STEP_THRESHOLD: | |
| breakdown.no_progress_penalty = NO_PROGRESS_PENALTY | |
| if intent == "run" and repeat_state and not step_improved: | |
| breakdown.repeat_state_penalty = REPEAT_STATE_PENALTY | |
| def _step_cost(self, *, intent: ActionIntent, magnitude: MagnitudeName | None) -> float: | |
| if intent == "restore_best": | |
| return RESTORE_STEP_COST | |
| if magnitude is None: | |
| return STEP_COST_BY_MAGNITUDE["medium"] | |
| return STEP_COST_BY_MAGNITUDE[magnitude] | |
| def _reward_total(self, breakdown: RewardBreakdown) -> float: | |
| total = ( | |
| breakdown.invalid_action_penalty | |
| + breakdown.failure_penalty | |
| + breakdown.failure_submit_penalty | |
| + breakdown.failure_budget_penalty | |
| + breakdown.feasibility_crossing_bonus | |
| + breakdown.feasibility_regression_penalty | |
| + breakdown.feasibility_delta_reward | |
| + breakdown.best_feasibility_bonus | |
| + breakdown.near_feasible_bonus | |
| + breakdown.aspect_ratio_repair_reward | |
| + breakdown.triangularity_repair_reward | |
| + breakdown.iota_repair_reward | |
| + breakdown.objective_delta_reward | |
| + breakdown.best_score_bonus | |
| + breakdown.step_cost | |
| + breakdown.no_progress_penalty | |
| + breakdown.repeat_state_penalty | |
| + breakdown.recovery_bonus | |
| + breakdown.terminal_improvement_bonus | |
| + breakdown.terminal_budget_bonus | |
| + breakdown.terminal_no_improvement_penalty | |
| ) | |
| return round(total, 4) | |
| def _reward_terms_text(self, breakdown: RewardBreakdown) -> str: | |
| terms = [ | |
| ("invalid_action_penalty", breakdown.invalid_action_penalty), | |
| ("failure_penalty", breakdown.failure_penalty), | |
| ("failure_submit_penalty", breakdown.failure_submit_penalty), | |
| ("failure_budget_penalty", breakdown.failure_budget_penalty), | |
| ("feasibility_crossing_bonus", breakdown.feasibility_crossing_bonus), | |
| ("feasibility_regression_penalty", breakdown.feasibility_regression_penalty), | |
| ("feasibility_delta_reward", breakdown.feasibility_delta_reward), | |
| ("best_feasibility_bonus", breakdown.best_feasibility_bonus), | |
| ("near_feasible_bonus", breakdown.near_feasible_bonus), | |
| ("aspect_ratio_repair_reward", breakdown.aspect_ratio_repair_reward), | |
| ("triangularity_repair_reward", breakdown.triangularity_repair_reward), | |
| ("iota_repair_reward", breakdown.iota_repair_reward), | |
| ("objective_delta_reward", breakdown.objective_delta_reward), | |
| ("best_score_bonus", breakdown.best_score_bonus), | |
| ("step_cost", breakdown.step_cost), | |
| ("no_progress_penalty", breakdown.no_progress_penalty), | |
| ("repeat_state_penalty", breakdown.repeat_state_penalty), | |
| ("recovery_bonus", breakdown.recovery_bonus), | |
| ("terminal_improvement_bonus", breakdown.terminal_improvement_bonus), | |
| ("terminal_budget_bonus", breakdown.terminal_budget_bonus), | |
| ("terminal_no_improvement_penalty", breakdown.terminal_no_improvement_penalty), | |
| ] | |
| non_zero_terms = [f"{name}={value:+.4f}" for name, value in terms if value != 0.0] | |
| if not non_zero_terms: | |
| return "none" | |
| return ", ".join(non_zero_terms) | |
| def _trajectory_summary(self) -> str: | |
| if not self._state.history: | |
| return "No actions taken yet." | |
| return " | ".join(self._state.history) | |
| def _update_best(self, params: LowDimBoundaryParams, metrics: EvaluationMetrics) -> None: | |
| if metrics.evaluation_failed: | |
| return | |
| if self._is_better_than_best( | |
| metrics, | |
| best_low_fidelity_feasibility=self._state.best_low_fidelity_feasibility, | |
| best_low_fidelity_score=self._state.best_low_fidelity_score, | |
| ): | |
| self._state.best_params = params | |
| self._state.best_low_fidelity_score = metrics.p1_score | |
| self._state.best_low_fidelity_feasibility = metrics.p1_feasibility | |
| def _is_better_than_best( | |
| self, | |
| metrics: EvaluationMetrics, | |
| *, | |
| best_low_fidelity_feasibility: float, | |
| best_low_fidelity_score: float, | |
| ) -> bool: | |
| current = ( | |
| (1, metrics.p1_score) if metrics.constraints_satisfied else (0, -metrics.p1_feasibility) | |
| ) | |
| best = ( | |
| (1, best_low_fidelity_score) | |
| if best_low_fidelity_feasibility <= FEASIBILITY_TOLERANCE | |
| else (0, -best_low_fidelity_feasibility) | |
| ) | |
| return current > best | |
| def _is_better_than_reference( | |
| self, | |
| metrics: EvaluationMetrics, | |
| reference_metrics: EvaluationMetrics, | |
| ) -> bool: | |
| return self._metrics_rank(metrics) > self._metrics_rank(reference_metrics) | |
| def _metrics_rank(self, metrics: EvaluationMetrics) -> tuple[int, float]: | |
| if metrics.evaluation_failed: | |
| return (-1, float("-inf")) | |
| if metrics.constraints_satisfied: | |
| return (1, metrics.p1_score) | |
| return (0, -metrics.p1_feasibility) | |
| def _advance_no_progress(self, *, step_improved: bool) -> int: | |
| if step_improved: | |
| self._state.no_progress_steps = 0 | |
| else: | |
| self._state.no_progress_steps += 1 | |
| return self._state.no_progress_steps | |
| def _is_repeat_state(self, params: LowDimBoundaryParams) -> bool: | |
| return self._state_key(params) in self._state.visited_state_keys | |
| def _record_visited_state(self, params: LowDimBoundaryParams) -> None: | |
| self._state.visited_state_keys.append(self._state_key(params)) | |
| def _state_key(self, params: LowDimBoundaryParams) -> str: | |
| return ( | |
| f"{params.aspect_ratio:.6f}|{params.elongation:.6f}|" | |
| f"{params.rotational_transform:.6f}|{params.triangularity_scale:.6f}" | |
| ) | |