Spaces:
Sleeping
Sleeping
| # Copyright (c) Meta Platforms, Inc. and affiliates. | |
| # All rights reserved. | |
| # | |
| # This source code is licensed under the BSD-style license found in the | |
| # LICENSE file in the root directory of this source tree. | |
| """Citeguardian Environment Client.""" | |
| from typing import Dict | |
| from openenv.core import EnvClient | |
| from openenv.core.client_types import StepResult | |
| from openenv.core.env_server.types import State | |
| from .models import CiteguardianAction, CiteguardianObservation | |
| class CiteguardianEnv( | |
| EnvClient[CiteguardianAction, CiteguardianObservation, State] | |
| ): | |
| """ | |
| Client for the CiteGuardian Environment. | |
| Maintains a persistent WebSocket connection to the environment server. | |
| Each instance gets its own dedicated session on the server. | |
| Example: | |
| >>> async with CiteguardianEnv(base_url="http://localhost:8000") as client: | |
| ... result = await client.reset() | |
| ... result = await client.step(CiteguardianAction(action_type="SUBMIT")) | |
| Example with Docker: | |
| >>> client = await CiteguardianEnv.from_docker_image("openenv-citeguardian") | |
| >>> result = await client.reset() | |
| >>> result = await client.step(CiteguardianAction(action_type="GO_TO", section_name="Methods")) | |
| """ | |
| def _step_payload(self, action: CiteguardianAction) -> Dict: | |
| """Convert CiteguardianAction to JSON payload for the step request.""" | |
| return action.model_dump(exclude_none=True) | |
| def _parse_result(self, payload: Dict) -> StepResult[CiteguardianObservation]: | |
| """Parse server response into StepResult[CiteguardianObservation].""" | |
| obs_data = payload.get("observation", payload) | |
| observation = CiteguardianObservation( | |
| current_view=obs_data.get("current_view", ""), | |
| metadata=obs_data.get("metadata", {}), | |
| audit_log=obs_data.get("audit_log", []), | |
| tool_result=obs_data.get("tool_result"), | |
| message=obs_data.get("message", ""), | |
| task_level=obs_data.get("task_level", "A"), | |
| done=payload.get("done", obs_data.get("done", False)), | |
| reward=payload.get("reward", obs_data.get("reward", 0.0)), | |
| ) | |
| return StepResult( | |
| observation=observation, | |
| reward=payload.get("reward", obs_data.get("reward", 0.0)), | |
| done=payload.get("done", obs_data.get("done", False)), | |
| ) | |
| def _parse_state(self, payload: Dict) -> State: | |
| """ | |
| Parse server response into State object. | |
| Args: | |
| payload: JSON response from state request | |
| Returns: | |
| State object with episode_id and step_count | |
| """ | |
| return State( | |
| episode_id=payload.get("episode_id"), | |
| step_count=payload.get("step_count", 0), | |
| ) | |