File size: 2,860 Bytes
e181764
ec17c6d
 
 
 
 
 
 
e181764
ec17c6d
 
e181764
 
ec17c6d
 
e181764
ec17c6d
e181764
 
ec17c6d
 
e181764
ec17c6d
e181764
ec17c6d
e181764
 
 
 
 
ec17c6d
 
e181764
ec17c6d
 
e181764
 
 
 
ec17c6d
 
 
 
e181764
 
ec17c6d
e181764
 
ec17c6d
 
e181764
 
ec17c6d
e181764
 
 
 
 
 
 
 
ec17c6d
 
 
 
 
 
 
 
 
 
 
 
e181764
ec17c6d
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""HR Onboarding/Offboarding Environment Client."""

from typing import Dict

from openenv.core.client_types import StepResult
from openenv.core.env_server.types import State
from openenv.core import EnvClient

from .models import HROnboardingAction, HROnboardingObservation


class HROnboardingEnv(
    EnvClient[HROnboardingAction, HROnboardingObservation]
):
    """
    Client for the HR Onboarding/Offboarding Environment.

    Maintains a persistent WebSocket connection to the environment server.
    Each client instance has its own dedicated environment session.

    Example:
        >>> with HROnboardingEnv(base_url="http://localhost:7860") as client:
        ...     result = client.reset()
        ...     print(result.observation.instruction)
        ...
        ...     result = client.step(HROnboardingAction(
        ...         tool_name="hr_read_employee",
        ...         arguments={"emp_id": "emp_0001"}
        ...     ))
        ...     print(result.observation.tool_result)

    Example with Docker:
        >>> client = HROnboardingEnv.from_docker_image("hr-onboarding-env:latest")
        >>> try:
        ...     result = client.reset()
        ...     result = client.step(HROnboardingAction(
        ...         tool_name="hr_search_employees",
        ...         arguments={"department": "Engineering"}
        ...     ))
        ... finally:
        ...     client.close()
    """

    def _step_payload(self, action: HROnboardingAction) -> Dict:
        """Convert HROnboardingAction to JSON payload for step message."""
        return {
            "tool_name": action.tool_name,
            "arguments": action.arguments,
        }

    def _parse_result(self, payload: Dict) -> StepResult[HROnboardingObservation]:
        """Parse server response into StepResult[HROnboardingObservation]."""
        obs_data = payload.get("observation", {})
        observation = HROnboardingObservation(
            task_id=obs_data.get("task_id", ""),
            instruction=obs_data.get("instruction", ""),
            tool_name=obs_data.get("tool_name", ""),
            tool_result=obs_data.get("tool_result", {}),
            step=obs_data.get("step", 0),
            max_steps=obs_data.get("max_steps", 15),
            available_tools=obs_data.get("available_tools", []),
            done=payload.get("done", False),
            reward=payload.get("reward"),
            metadata=obs_data.get("metadata", {}),
        )

        return StepResult(
            observation=observation,
            reward=payload.get("reward"),
            done=payload.get("done", False),
        )

    def _parse_state(self, payload: Dict) -> State:
        """Parse server response into State object."""
        return State(
            episode_id=payload.get("episode_id"),
            step_count=payload.get("step_count", 0),
        )