# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""Code Review Environment Client."""
from typing import Dict
from openenv.core import EnvClient
from openenv.core.client_types import StepResult
from openenv.core.env_server.types import State
from .models import (
CodeReviewAction,
CodeReviewObservation,
CodeReviewReward,
CodeReviewPullRequest,
)
class CodeReviewEnv(EnvClient[CodeReviewAction, CodeReviewObservation, State]):
    """Client for the Code Review Environment.

    This client maintains a persistent WebSocket connection to the environment
    server, enabling efficient multi-step interactions with lower latency.
    Each client instance has its own dedicated environment session on the
    server.

    Example:
        >>> # Connect to a running server
        >>> with CodeReviewEnv(base_url="http://localhost:8000") as client:
        ...     result = client.reset()
        ...     print(result.observation.pr)
        ...
        ...     # Field values are illustrative; check CodeReviewAction in
        ...     # .models for the values the server actually accepts.
        ...     action = CodeReviewAction(
        ...         action_type="comment",
        ...         comment="Consider handling the empty-input case.",
        ...     )
        ...     result = client.step(action)
        ...     print(result.reward, result.done)

    Example with Docker:
        >>> # Automatically start container and connect
        >>> client = CodeReviewEnv.from_docker_image("code_review-env:latest")
        >>> try:
        ...     result = client.reset()
        ...     print(result.observation.step_count)
        ... finally:
        ...     client.close()
    """

    # Fields copied from an action into the step request, in wire order.
    _ACTION_FIELDS = ("action_type", "comment", "suggested_code", "decision")

    # Fallback used when the server response does not report ``max_steps``.
    _DEFAULT_MAX_STEPS = 3

    def _step_payload(self, action: CodeReviewAction) -> Dict:
        """Convert an action into the JSON-serialisable step payload.

        Args:
            action: The action to send. A plain ``dict`` with the same keys is
                also accepted; keys missing from the dict are sent as ``None``.

        Returns:
            Dict with exactly the keys ``action_type``, ``comment``,
            ``suggested_code`` and ``decision``.

        Raises:
            AttributeError: If a non-dict ``action`` lacks one of the fields.
        """
        if isinstance(action, dict):
            return {name: action.get(name) for name in self._ACTION_FIELDS}
        return {name: getattr(action, name) for name in self._ACTION_FIELDS}

    def _parse_result(self, payload: Dict) -> StepResult[CodeReviewObservation]:
        """Parse a server response into ``StepResult[CodeReviewObservation]``.

        Args:
            payload: JSON response data from the server.

        Returns:
            StepResult holding the parsed observation, the reward as a float
            (``None`` when absent or not numeric) and the ``done`` flag.

        Raises:
            ValueError: If the payload has no usable observation, i.e. the
                observation is missing, is not a dict, or has no dict ``pr``.
        """
        obs_data = payload.get("observation") or {}
        # The observation may arrive wrapped in a second "observation" key
        # (nested case); unwrap one level when that happens.
        if isinstance(obs_data, dict) and "observation" in obs_data:
            obs_data = obs_data["observation"]

        if not isinstance(obs_data, dict) or "pr" not in obs_data:
            raise ValueError(f"Invalid observation payload: {payload}")

        pr_data = obs_data["pr"]
        if not isinstance(pr_data, dict):
            # ``**pr_data`` below needs a mapping; fail with the same error
            # used for every other malformed observation.
            raise ValueError(f"Invalid observation payload: {payload}")

        observation = CodeReviewObservation(
            pr=CodeReviewPullRequest(**pr_data),
            previous_comments=obs_data.get("previous_comments") or [],
            step_count=obs_data.get("step_count", 0),
            max_steps=obs_data.get("max_steps", self._DEFAULT_MAX_STEPS),
        )

        # The reward may be absent or null (reset vs step); a value that
        # cannot be converted is treated as "no reward" rather than an error.
        raw_reward = payload.get("reward")
        reward = None
        if raw_reward is not None:
            try:
                reward = float(raw_reward)
            except (TypeError, ValueError, OverflowError):
                reward = None

        return StepResult(
            observation=observation,
            reward=reward,
            done=bool(payload.get("done", False)),
        )

    def _parse_state(self, payload: Dict) -> State:
        """Parse a server response into a ``State`` object.

        Args:
            payload: JSON response from the state request.

        Returns:
            State built from the payload's ``episode_id`` (``None`` when
            missing) and ``step_count`` (``0`` when missing).
        """
        return State(
            episode_id=payload.get("episode_id"),
            step_count=payload.get("step_count", 0),
        )
|