Spaces:
Paused
Paused
| """ | |
| PERMANENCE — OpenEnv-compatible client. | |
| Uses ``openenv.core.SyncEnvClient`` for typed, WebSocket-based | |
| communication with a running PERMANENCE server. | |
| Usage: | |
| from client import PermanenceEnvClient | |
| from models import PermanenceAction | |
| client = PermanenceEnvClient("http://localhost:7860") | |
| obs = client.reset() | |
| obs = client.step(PermanenceAction(text="<action id='draft_internal_memo'/>...")) | |
| print(obs.text, obs.reward, obs.done) | |
| """ | |
| from __future__ import annotations | |
| import os | |
| from typing import Optional | |
| from openenv.core import SyncEnvClient | |
| from models import PermanenceAction, PermanenceObservation, PermanenceState | |
| DEFAULT_ENV_URL = os.getenv( | |
| "PERMANENCE_ENV_URL", | |
| "https://chane35-permanence.hf.space", | |
| ) | |
| class PermanenceEnvClient(SyncEnvClient[PermanenceAction, PermanenceObservation, PermanenceState]): | |
| """ | |
| Typed OpenEnv client for the PERMANENCE environment. | |
| Connects to a running PERMANENCE server and provides typed | |
| ``reset()``, ``step()``, and ``state`` access. | |
| """ | |
| action_type = PermanenceAction | |
| observation_type = PermanenceObservation | |
| state_type = PermanenceState | |
| def __init__(self, base_url: str = DEFAULT_ENV_URL): | |
| super().__init__(base_url=base_url) | |