# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""My Env Environment Client."""

import os
import sys
from typing import Dict

# Ensure the 'my_env' directory is in sys.path so the sibling ``models``
# module can be imported below regardless of the caller's working directory.
_current_dir = os.path.dirname(os.path.abspath(__file__))
if _current_dir not in sys.path:
    sys.path.insert(0, _current_dir)

from openenv.core import EnvClient
from openenv.core.client_types import StepResult
from models import MyAction, MyObservation, MyState


def _http_base_from_ws(ws_url: str) -> str:
    """
    Convert a WebSocket endpoint URL into the matching HTTP base URL.

    Only the leading scheme is rewritten (``wss://`` -> ``https://``,
    ``ws://`` -> ``http://``); any other scheme is left untouched. Trailing
    slashes and a final ``/ws`` path segment are removed so that a route can
    be appended with a single ``/``.

    Args:
        ws_url: URL such as ``"ws://host:port/ws"``.

    Returns:
        Base URL such as ``"http://host:port"``.
    """
    for ws_scheme, http_scheme in (("wss://", "https://"), ("ws://", "http://")):
        if ws_url.startswith(ws_scheme):
            ws_url = http_scheme + ws_url[len(ws_scheme):]
            break

    base = ws_url.rstrip("/")
    if base.endswith("/ws"):
        base = base[: -len("/ws")]
    return base


class MyEnv(EnvClient[MyAction, MyObservation, MyState]):
    """
    Client for the My Env environment.

    Provides the payload conversion hooks used by ``EnvClient`` (action ->
    JSON, JSON -> observation/state) plus a ``grade`` helper that queries the
    server's ``/grade`` HTTP route.
    """

    def _step_payload(self, action: MyAction) -> Dict:
        """
        Convert MyAction to JSON payload for step message.

        Args:
            action: MyAction instance

        Returns:
            Dictionary representation suitable for JSON encoding
        """
        return {
            "action_type": action.action_type,
            "path": action.path,
            "content": action.content,
        }

    def _parse_result(self, payload: Dict) -> StepResult[MyObservation]:
        """
        Parse server response into StepResult[MyObservation].

        Args:
            payload: JSON response data from server

        Returns:
            StepResult with MyObservation
        """
        # ``or {}`` also covers an explicit ``"observation": null`` in the
        # payload, which ``.get("observation", {})`` would pass through as
        # None and then fail on the attribute lookups below.
        obs_data = payload.get("observation") or {}
        done = payload.get("done", False)

        # NOTE(review): the observation defaults a missing reward to 0.0 while
        # the StepResult below passes it through as None — confirm intended.
        observation = MyObservation(
            task_type=obs_data.get("task_type", None),
            task_description=obs_data.get("task_description", None),
            cwd=obs_data.get("cwd", None),
            cwd_contents=obs_data.get("cwd_contents", []),
            file_contents=obs_data.get("file_contents", None),
            message=obs_data.get("message", None),
            remaining_steps=obs_data.get("remaining_steps", None),
            available_actions=obs_data.get("available_actions", []),
            done=done,
            reward=payload.get("reward", 0.0),
        )

        return StepResult(
            observation=observation,
            reward=payload.get("reward"),
            done=done,
        )

    def _parse_state(self, payload: Dict) -> MyState:
        """
        Parse server response into State object.

        Args:
            payload: JSON response from state request

        Returns:
            MyState populated from the payload; keys that are absent fall
            back to the defaults given below.
        """
        return MyState(
            episode_id=payload.get("episode_id"),
            step_count=payload.get("step_count", 0),
            cwd=payload.get("cwd", None),
            max_steps=payload.get("max_steps", 0),
            task_type=payload.get("task_type", None),
            all_task_details=payload.get("all_task_details", {}),
            dir_structure=payload.get("dir_structure", None),
            cwd_contents=payload.get("cwd_contents", []),
            file_contents=payload.get("file_contents", None),
        )

    def grade(self, episode_id: str) -> float:
        """
        Fetch the normalized grade (0.0 – 1.0) for a given episode.

        Calls GET /grade?episode_id= on the server.
        Can be called at any point after reset() — reflects live task
        completion + efficiency snapshot.

        Args:
            episode_id: The episode_id passed to reset().

        Returns:
            float in [0.0, 1.0]

        Raises:
            RuntimeError: If the server returns a non-200 status.
            KeyError: If the JSON response has no "grade" field.
        """
        # Imported lazily so httpx is only required when grading is used.
        import httpx

        # EnvClient stores _ws_url ("ws://host:port/ws"); convert to HTTP base
        url = f"{_http_base_from_ws(self._ws_url)}/grade"

        response = httpx.get(url, params={"episode_id": episode_id}, timeout=10.0)

        if response.status_code == 404:
            raise RuntimeError(
                f"No episode found for episode_id '{episode_id}'. "
                "Did you call reset() first?"
            )
        if response.status_code != 200:
            raise RuntimeError(
                f"Grade request failed (HTTP {response.status_code}): {response.text}"
            )

        return float(response.json()["grade"])