Spaces:
Sleeping
Sleeping
| """OpenEnv-compatible FastAPI server exposing DarkGuard.""" | |
| from __future__ import annotations | |
| import threading | |
| from openenv.core.env_server.http_server import create_app | |
| from openenv.core.env_server.interfaces import Environment | |
| from darkguard_openenv.environment import DarkGuardEnvironment | |
| from darkguard_openenv.models import ( | |
| DarkGuardOpenEnvAction, | |
| DarkGuardOpenEnvObservation, | |
| DarkGuardOpenEnvState, | |
| ) | |
| class DarkGuardOpenEnvAdapter(Environment[DarkGuardOpenEnvAction, DarkGuardOpenEnvObservation, DarkGuardOpenEnvState]): | |
| """Adapter wrapping dict-based core env into OpenEnv Environment interface.""" | |
| SUPPORTS_CONCURRENT_SESSIONS: bool = True | |
| _shared_env = DarkGuardEnvironment() | |
| _shared_lock = threading.RLock() | |
| def __init__(self) -> None: | |
| super().__init__() | |
| self._env = self._shared_env | |
| def reset(self, seed: int | None = None, episode_id: str | None = None, **kwargs: object) -> DarkGuardOpenEnvObservation: | |
| payload = dict(kwargs) | |
| if seed is not None: | |
| payload["seed"] = seed | |
| if episode_id is not None: | |
| payload["episode_id"] = episode_id | |
| with self._shared_lock: | |
| obs = self._env.reset(**payload) | |
| return DarkGuardOpenEnvObservation(**obs) | |
| def step( | |
| self, | |
| action: DarkGuardOpenEnvAction, | |
| timeout_s: float | None = None, | |
| **kwargs: object, | |
| ) -> DarkGuardOpenEnvObservation: | |
| _ = timeout_s, kwargs | |
| action_payload = action.model_dump(exclude_none=True) | |
| # OpenEnv may include auxiliary metadata on action objects; DarkGuard | |
| # validator is strict and should only see canonical action fields. | |
| action_payload.pop("metadata", None) | |
| with self._shared_lock: | |
| try: | |
| obs = self._env.step(action_payload) | |
| except RuntimeError: | |
| # Defensive fallback for stateless request paths. | |
| self._env.reset() | |
| obs = self._env.step(action_payload) | |
| return DarkGuardOpenEnvObservation(**obs) | |
| def state(self) -> DarkGuardOpenEnvState: | |
| with self._shared_lock: | |
| return DarkGuardOpenEnvState(**self._env.state()) | |
| app = create_app( | |
| env=DarkGuardOpenEnvAdapter, | |
| action_cls=DarkGuardOpenEnvAction, | |
| observation_cls=DarkGuardOpenEnvObservation, | |
| env_name="darkguard-openenv", | |
| ) | |