Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """My Env Environment Client.""" | |
| import os | |
| import sys | |
| # Ensure the 'my_env' directory is in sys.path | |
| _current_dir = os.path.dirname(os.path.abspath(__file__)) | |
| if _current_dir not in sys.path: | |
| sys.path.insert(0, _current_dir) | |
| from typing import Dict | |
| from openenv.core import EnvClient | |
| from openenv.core.client_types import StepResult | |
| from models import MyAction, MyObservation, MyState | |
| class MyEnv( | |
| EnvClient[MyAction, MyObservation, MyState] | |
| ): | |
| def _step_payload(self, action: MyAction) -> Dict: | |
| """ | |
| Convert MyAction to JSON payload for step message. | |
| Args: | |
| action: MyAction instance | |
| Returns: | |
| Dictionary representation suitable for JSON encoding | |
| """ | |
| return { | |
| 'action_type':action.action_type, | |
| 'path':action.path, | |
| 'content':action.content | |
| } | |
| def _parse_result(self, payload: Dict) -> StepResult[MyObservation]: | |
| """ | |
| Parse server response into StepResult[MyObservation]. | |
| Args: | |
| payload: JSON response data from server | |
| Returns: | |
| StepResult with MyObservation | |
| """ | |
| obs_data = payload.get("observation", {}) | |
| observation = MyObservation( | |
| task_type=obs_data.get("task_type", None), | |
| task_description=obs_data.get("task_description", None), | |
| cwd=obs_data.get("cwd", None), | |
| cwd_contents=obs_data.get("cwd_contents", []), | |
| file_contents=obs_data.get("file_contents", None), | |
| message=obs_data.get("message", None), | |
| remaining_steps=obs_data.get("remaining_steps", None), | |
| available_actions=obs_data.get("available_actions", []), | |
| done=payload.get("done", False), | |
| reward=payload.get("reward", 0.0), | |
| ) | |
| return StepResult( | |
| observation=observation, | |
| reward=payload.get("reward"), | |
| done=payload.get("done", False), | |
| ) | |
| def _parse_state(self, payload: Dict) -> MyState: | |
| """ | |
| Parse server response into State object. | |
| Args: | |
| payload: JSON response from state request | |
| Returns: | |
| State object with episode_id and step_count | |
| """ | |
| return MyState( | |
| episode_id=payload.get("episode_id"), | |
| step_count=payload.get("step_count", 0), | |
| cwd=payload.get("cwd", None), | |
| max_steps=payload.get("max_steps", 0), | |
| task_type=payload.get("task_type", None), | |
| all_task_details=payload.get("all_task_details", {}), | |
| dir_structure=payload.get("dir_structure", None), | |
| cwd_contents=payload.get("cwd_contents", []), | |
| file_contents=payload.get("file_contents", None), | |
| ) | |
| def grade(self, episode_id: str) -> float: | |
| """ | |
| Fetch the normalized grade (0.0 – 1.0) for a given episode. | |
| Calls GET /grade?episode_id=<episode_id> on the server. | |
| Can be called at any point after reset() — reflects live task | |
| completion + efficiency snapshot. | |
| Args: | |
| episode_id: The episode_id passed to reset(). | |
| Returns: | |
| float in [0.0, 1.0] | |
| Raises: | |
| RuntimeError: If the server returns a non-200 status. | |
| """ | |
| import httpx | |
| # EnvClient stores _ws_url ("ws://host:port/ws"); convert to HTTP base | |
| http_base = ( | |
| self._ws_url | |
| .replace("wss://", "https://") | |
| .replace("ws://", "http://") | |
| ) | |
| if http_base.endswith("/ws"): | |
| http_base = http_base[:-3] | |
| url = f"{http_base}/grade" | |
| response = httpx.get(url, params={"episode_id": episode_id}, timeout=10.0) | |
| if response.status_code == 404: | |
| raise RuntimeError( | |
| f"No episode found for episode_id '{episode_id}'. " | |
| "Did you call reset() first?" | |
| ) | |
| if response.status_code != 200: | |
| raise RuntimeError( | |
| f"Grade request failed (HTTP {response.status_code}): {response.text}" | |
| ) | |
| return float(response.json()["grade"]) | |