| |
|
| |
|
| |
|
| |
|
| |
|
| |
|
| | """My Env Environment Client."""
|
| |
|
| | from typing import Dict
|
| |
|
| | from openenv.core.client_types import StepResult
|
| | from openenv.core.env_server.types import State
|
| | from openenv.core import EnvClient
|
| |
|
| | from .models import MyAction, MyObservation
|
| |
|
| |
|
| | class MyEnv(
|
| | EnvClient[MyAction, MyObservation]
|
| | ):
|
| | """
|
| | Client for the My Env Environment.
|
| |
|
| | This client maintains a persistent WebSocket connection to the environment server,
|
| | enabling efficient multi-step interactions with lower latency.
|
| | Each client instance has its own dedicated environment session on the server.
|
| |
|
| | Example:
|
| | >>> # Connect to a running server
|
| | >>> with MyEnv(base_url="http://localhost:8000") as client:
|
| | ... result = client.reset()
|
| | ... print(result.observation.echoed_message)
|
| | ...
|
| | ... result = client.step(MyAction(message="Hello!"))
|
| | ... print(result.observation.echoed_message)
|
| |
|
| | Example with Docker:
|
| | >>> # Automatically start container and connect
|
| | >>> client = MyEnv.from_docker_image("my_env-env:latest")
|
| | >>> try:
|
| | ... result = client.reset()
|
| | ... result = client.step(MyAction(message="Test"))
|
| | ... finally:
|
| | ... client.close()
|
| | """
|
| |
|
| | def _step_payload(self, action: MyAction) -> Dict:
|
| | """
|
| | Convert MyAction to JSON payload for step message.
|
| |
|
| | Args:
|
| | action: MyAction instance
|
| |
|
| | Returns:
|
| | Dictionary representation suitable for JSON encoding
|
| | """
|
| | return {
|
| | "message": action.message,
|
| | }
|
| |
|
| | def _parse_result(self, payload: Dict) -> StepResult[MyObservation]:
|
| | """
|
| | Parse server response into StepResult[MyObservation].
|
| |
|
| | Args:
|
| | payload: JSON response data from server
|
| |
|
| | Returns:
|
| | StepResult with MyObservation
|
| | """
|
| | obs_data = payload.get("observation", {})
|
| | observation = MyObservation(
|
| | echoed_message=obs_data.get("echoed_message", ""),
|
| | message_length=obs_data.get("message_length", 0),
|
| | done=payload.get("done", False),
|
| | reward=payload.get("reward"),
|
| | metadata=obs_data.get("metadata", {}),
|
| | )
|
| |
|
| | return StepResult(
|
| | observation=observation,
|
| | reward=payload.get("reward"),
|
| | done=payload.get("done", False),
|
| | )
|
| |
|
| | def _parse_state(self, payload: Dict) -> State:
|
| | """
|
| | Parse server response into State object.
|
| |
|
| | Args:
|
| | payload: JSON response from state request
|
| |
|
| | Returns:
|
| | State object with episode_id and step_count
|
| | """
|
| | return State(
|
| | episode_id=payload.get("episode_id"),
|
| | step_count=payload.get("step_count", 0),
|
| | )
|
| |
|