my_space / client.py
Aditya6263's picture
Upload folder using huggingface_hub
367cb1e verified
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""My Env Environment Client."""
import os
import sys
# Put the directory that contains this file at the front of sys.path (unless
# it is already listed) before the `from models import ...` line below runs.
# NOTE(review): presumably `models.py` sits next to this file and is meant to
# be importable as a top-level module — confirm against the package layout.
_current_dir = os.path.dirname(os.path.abspath(__file__))
if _current_dir not in sys.path:
    sys.path.insert(0, _current_dir)
from typing import Dict
from openenv.core import EnvClient
from openenv.core.client_types import StepResult
from models import MyAction, MyObservation, MyState
class MyEnv(EnvClient[MyAction, MyObservation, MyState]):
    """Client for the My Env environment.

    Provides the conversion methods ``_step_payload``, ``_parse_result`` and
    ``_parse_state`` in terms of ``MyAction`` / ``MyObservation`` / ``MyState``,
    plus :meth:`grade` for querying an episode's score over HTTP.
    """

    def _step_payload(self, action: MyAction) -> Dict:
        """Serialize an action into the dictionary sent with a step message.

        Args:
            action: MyAction instance to serialize.

        Returns:
            Dictionary with the keys ``action_type``, ``path`` and ``content``,
            each holding the attribute of the same name read from ``action``.
        """
        return {
            field: getattr(action, field)
            for field in ("action_type", "path", "content")
        }

    def _parse_result(self, payload: Dict) -> StepResult[MyObservation]:
        """Build a ``StepResult[MyObservation]`` from a server response.

        Observation fields are read from ``payload["observation"]`` (an empty
        dict is used when that key is absent); ``done`` and ``reward`` are read
        from the top level of ``payload``.

        Args:
            payload: JSON response data from the server.

        Returns:
            StepResult wrapping the parsed MyObservation.
        """
        obs = payload.get("observation", {})
        done = payload.get("done", False)

        # Observation fields that fall back to None when the server omits them.
        none_defaulted = {
            key: obs.get(key)
            for key in (
                "task_type",
                "task_description",
                "cwd",
                "file_contents",
                "message",
                "remaining_steps",
            )
        }
        observation = MyObservation(
            # List-valued fields get a fresh empty list per call when omitted.
            cwd_contents=obs.get("cwd_contents", []),
            available_actions=obs.get("available_actions", []),
            done=done,
            # A missing reward becomes 0.0 on the observation ...
            reward=payload.get("reward", 0.0),
            **none_defaulted,
        )
        # ... but stays None on the StepResult itself.
        return StepResult(
            observation=observation,
            reward=payload.get("reward"),
            done=done,
        )

    def _parse_state(self, payload: Dict) -> MyState:
        """Build a ``MyState`` from the response to a state request.

        Every field is looked up by name at the top level of ``payload``;
        missing keys fall back to ``None``, ``0``, ``[]`` or ``{}`` depending
        on the field.

        Args:
            payload: JSON response from the state request.

        Returns:
            MyState populated from ``payload``.
        """
        none_defaulted = {
            key: payload.get(key)
            for key in (
                "episode_id",
                "cwd",
                "task_type",
                "dir_structure",
                "file_contents",
            )
        }
        zero_defaulted = {
            key: payload.get(key, 0) for key in ("step_count", "max_steps")
        }
        return MyState(
            # Container defaults are created per call, never shared.
            all_task_details=payload.get("all_task_details", {}),
            cwd_contents=payload.get("cwd_contents", []),
            **zero_defaulted,
            **none_defaulted,
        )

    def grade(self, episode_id: str) -> float:
        """Fetch the grade the server reports for an episode.

        Issues ``GET <http base>/grade?episode_id=<episode_id>`` with a
        10-second timeout, where the HTTP base is derived from
        ``self._ws_url``. The value is expected to be normalized to
        [0.0, 1.0]; this method converts it with ``float()`` and does not
        clamp or validate the range.

        Args:
            episode_id: Identifier of the episode to grade (the one used with
                reset()).

        Returns:
            The ``"grade"`` entry of the JSON response body, as a float.

        Raises:
            RuntimeError: If the server answers with any status other than
                200 (a 404 gets a dedicated "no episode found" message).
            KeyError: If a 200 response body has no ``"grade"`` entry.

        Errors raised by ``httpx`` itself (connection failures, timeouts) are
        not caught here.
        """
        import httpx

        # self._ws_url is expected to look like "ws://host:port/ws"; swap the
        # WebSocket scheme for its HTTP counterpart ("wss" first, then "ws").
        http_base = self._ws_url
        for ws_scheme, http_scheme in (
            ("wss://", "https://"),
            ("ws://", "http://"),
        ):
            http_base = http_base.replace(ws_scheme, http_scheme)

        # Drop the trailing WebSocket path segment, if present.
        ws_suffix = "/ws"
        if http_base.endswith(ws_suffix):
            http_base = http_base[: -len(ws_suffix)]

        response = httpx.get(
            f"{http_base}/grade",
            params={"episode_id": episode_id},
            timeout=10.0,
        )

        if response.status_code == 200:
            return float(response.json()["grade"])
        if response.status_code == 404:
            raise RuntimeError(
                f"No episode found for episode_id '{episode_id}'. "
                "Did you call reset() first?"
            )
        raise RuntimeError(
            f"Grade request failed (HTTP {response.status_code}): {response.text}"
        )