| import gymnasium as gym | |
| from ..logging_utils import logger | |
| class FailAwareWrapper(gym.Wrapper): | |
| """ | |
| Uniformly catch all exception crashes (e.g. IK Fail) at the outermost layer. Convert thrown code Errors to status code info = {"status": "error"} and terminate the episode, ensuring all peripheral execution scripts do not need to write try-except manually. | |
| """ | |
| def __init__(self, env): | |
| super().__init__(env) | |
| self._last_obs = None | |
| def reset(self, **kwargs): | |
| obs, info = super().reset(**kwargs) | |
| self._last_obs = obs | |
| return obs, info | |
| def step(self, action): | |
| try: | |
| obs, reward, terminated, truncated, info = super().step(action) | |
| self._last_obs = obs | |
| return obs, reward, terminated, truncated, info | |
| except Exception as e: | |
| # Record exceptions for traceability and debugging | |
| logger.error(f"Environment execution interrupted due to exception: {str(e)}") | |
| # Directly trigger the terminated exit mechanism and inject an error flag into info | |
| return ( | |
| None, # no obs | |
| 0.0, # Punitive reward or maintain 0 | |
| True, # terminated: True, cut off loop | |
| False, # truncated | |
| { | |
| "status": "error", | |
| "error_message": f"FailAwareWrapper caught specific exception: {str(e)}", | |
| "exception_type": type(e).__name__ | |
| } | |
| ) | |