Spaces:
Sleeping
Sleeping
| """Curator Environment Client.""" | |
| from typing import Dict | |
| from openenv.core import EnvClient | |
| from openenv.core.client_types import StepResult | |
| from openenv.core.env_server.types import State | |
| from models import ( | |
| ActionFeedback, | |
| ContentItem, | |
| CuratorAction, | |
| CuratorObservation, | |
| TaskInfo, | |
| UserProfile, | |
| ) | |
| class CuratorEnv(EnvClient[CuratorAction, CuratorObservation, State]): | |
| """ | |
| Client for the Curator Environment. | |
| Example: | |
| >>> async with CuratorEnv(base_url="http://localhost:8000") as client: | |
| ... result = await client.reset(task_id="easy") | |
| ... print(len(result.observation.items)) | |
| ... | |
| ... result = await client.step(CuratorAction( | |
| ... action_type="rank", | |
| ... rankings=["hn_123", "hn_456"] | |
| ... )) | |
| Example with Docker: | |
| >>> client = await CuratorEnv.from_docker_image("curator:latest") | |
| """ | |
| def _step_payload(self, action: CuratorAction) -> Dict: | |
| """Convert CuratorAction to JSON payload.""" | |
| payload = {"action_type": action.action_type} | |
| if action.item_ids: | |
| payload["item_ids"] = action.item_ids | |
| if action.categories is not None: | |
| payload["categories"] = action.categories | |
| if action.rankings is not None: | |
| payload["rankings"] = action.rankings | |
| if action.reasoning is not None: | |
| payload["reasoning"] = action.reasoning | |
| if action.metadata: | |
| payload["metadata"] = action.metadata | |
| return payload | |
| def _parse_result(self, payload: Dict) -> StepResult[CuratorObservation]: | |
| """Parse server response into StepResult[CuratorObservation].""" | |
| obs_data = payload.get("observation", {}) | |
| # Parse nested models | |
| items = [ContentItem(**it) for it in obs_data.get("items", [])] | |
| user_profile = None | |
| if obs_data.get("user_profile"): | |
| user_profile = UserProfile(**obs_data["user_profile"]) | |
| feedback = None | |
| if obs_data.get("feedback"): | |
| feedback = ActionFeedback(**obs_data["feedback"]) | |
| task_info = None | |
| if obs_data.get("task_info"): | |
| task_info = TaskInfo(**obs_data["task_info"]) | |
| observation = CuratorObservation( | |
| items=items, | |
| user_profile=user_profile, | |
| feedback=feedback, | |
| task_info=task_info, | |
| done=payload.get("done", False), | |
| reward=payload.get("reward"), | |
| metadata=obs_data.get("metadata", {}), | |
| ) | |
| return StepResult( | |
| observation=observation, | |
| reward=payload.get("reward"), | |
| done=payload.get("done", False), | |
| ) | |
| def _parse_state(self, payload: Dict) -> State: | |
| """Parse server response into State.""" | |
| return State( | |
| episode_id=payload.get("episode_id"), | |
| step_count=payload.get("step_count", 0), | |
| ) | |