# Copyright (c) Meta Platforms, Inc. and affiliates. # All rights reserved. # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. """LeanMigrate Environment Client.""" from typing import Dict from openenv.core import EnvClient from openenv.core.client_types import StepResult from openenv.core.env_server.types import State from .env.models import LeanMigrateAction, LeanMigrateObservation class LeanMigrateEnv(EnvClient[LeanMigrateAction, LeanMigrateObservation, State]): """ Client for the Lean Migrate Environment. This client maintains a persistent WebSocket connection to the environment server, enabling efficient multi-step interactions with lower latency. Each client instance has its own dedicated environment session on the server. Example: >>> # Connect to a running server >>> with LeanMigrateEnv(base_url="http://localhost:8000") as client: ... result = client.reset() ... print(result.observation.echoed_message) ... ... result = client.step(LeanMigrateAction(message="Hello!")) ... print(result.observation.echoed_message) Example with Docker: >>> # Automatically start container and connect >>> client = LeanMigrateEnv.from_docker_image("lean_migrate-env:latest") >>> try: ... result = client.reset() ... result = client.step(LeanMigrateAction(message="Test")) ... finally: ... client.close() """ def _step_payload(self, action: LeanMigrateAction) -> Dict: return action.model_dump() def _parse_result(self, payload: Dict) -> StepResult[LeanMigrateObservation]: obs_data = payload.get("observation", {}) observation = LeanMigrateObservation.model_validate(obs_data) return StepResult( observation=observation, reward=payload.get("reward"), done=payload.get("done", False), ) def _parse_state(self, payload: Dict) -> State: return State.model_validate(payload)