File size: 6,003 Bytes
e181764
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""
HR Onboarding/Offboarding Environment Implementation.

An OpenEnv environment that simulates enterprise HR workflows.
The agent calls tools (hr_create_employee, it_assign_asset, etc.)
to complete onboarding/offboarding tasks. Reward is computed via rubrics.
"""

import json
import random
from typing import Any, Dict, List, Optional
from uuid import uuid4

from openenv.core.env_server.interfaces import Environment
from openenv.core.env_server.types import State

from models import HROnboardingAction, HROnboardingObservation

try:
    from .world import WorldState
    from .tools import ToolRegistry, TOOL_DEFINITIONS
    from .tasks import TaskGenerator
    from .rubrics import RubricEvaluator
except ImportError:
    from world import WorldState
    from tools import ToolRegistry, TOOL_DEFINITIONS
    from tasks import TaskGenerator
    from rubrics import RubricEvaluator


class HROnboardingEnvironment(Environment):
    """
    OpenEnv environment for HR onboarding/offboarding workflows.

    Models an enterprise HR system (200+ employees, 8 departments, RBAC,
    approval chains, IT provisioning). On every step the agent invokes one
    of the 25 registered tools. An episode lasts ``max_steps`` steps; the
    rubric evaluator scores the recorded actions on the last step, and all
    earlier steps report a reward of 0.0.

    Example:
        >>> env = HROnboardingEnvironment()
        >>> obs = env.reset()
        >>> print(obs.instruction)  # "Onboard Priya Sharma to Engineering..."
        >>>
        >>> obs = env.step(HROnboardingAction(
        ...     tool_name="hr_create_employee",
        ...     arguments={"name": "Priya Sharma", "department": "Engineering",
        ...                "level": "L2", "role": "Software Engineer"}
        ... ))
        >>> print(obs.tool_result)  # {"success": true, "employee": {...}}
        >>> print(obs.reward)       # 0.0 (intermediate) or 0.85 (final)
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    def __init__(self, seed: int = 42, max_steps: int = 15):
        """
        Build the world, the tool registry, the evaluator and the task pool.

        Args:
            seed: Seed used for the environment RNG and the task generator.
            max_steps: Number of steps after which an episode is finished.
        """
        # Configuration
        self._seed, self._max_steps = seed, max_steps
        self._rng = random.Random(seed)

        # Simulated company plus the tools and rubric operating on it
        world = WorldState()
        self.world = world
        self.tool_registry = ToolRegistry(world)
        self.evaluator = RubricEvaluator()

        # Task pool: generated once up front, handed out in order by reset()
        generator = TaskGenerator(world, seed=seed)
        self._task_gen = generator
        self._tasks = generator.generate_all_tasks()
        self._task_idx = 0
        self._current_task = None

        # Per-episode bookkeeping
        self._state = State(episode_id=str(uuid4()), step_count=0)
        self._done = False
        self._tool_names = [definition["name"] for definition in TOOL_DEFINITIONS]

    def reset(self) -> HROnboardingObservation:
        """
        Begin a new episode.

        Resets the world, selects the next task from the pool (wrapping
        around at the end), runs the task's setup hook when it has one, and
        starts a fresh ``State`` with a new episode id.

        Returns:
            The initial observation: task id and instruction, the list of
            tool names, step 0, reward 0.0, and the task's difficulty,
            category and context in ``metadata``.
        """
        self.world.reset()
        self._done = False

        # Round-robin over the task pool
        position = self._task_idx % len(self._tasks)
        task = self._tasks[position]
        self._current_task = task
        self._task_idx += 1

        # Some tasks need the world prepared before the agent starts
        setup = task.setup_fn
        if setup:
            setup(self.world)

        self._state = State(episode_id=str(uuid4()), step_count=0)

        observation = HROnboardingObservation(
            task_id=task.task_id,
            instruction=task.instruction,
            tool_name="",
            tool_result={},
            step=0,
            max_steps=self._max_steps,
            available_tools=self._tool_names,
            done=False,
            reward=0.0,
            metadata={
                "difficulty": task.difficulty,
                "category": task.category,
                "context": task.context,
            },
        )
        return observation

    def step(self, action: HROnboardingAction) -> HROnboardingObservation:  # type: ignore[override]
        """
        Run the requested tool once and report the outcome.

        Once the episode is finished, further calls execute nothing and do
        not advance the step counter; they return an error observation with
        ``done=True`` and reward 0.0.

        Args:
            action: HROnboardingAction naming the tool and its arguments.

        Returns:
            HROnboardingObservation carrying the tool result. ``done`` turns
            True when the step counter reaches ``max_steps``; on that step
            (and only if a task is active) ``reward`` is the evaluator's
            ``"score"`` and the evaluation is attached under
            ``metadata["evaluation"]``.
        """
        if self._done:
            finished_task = self._current_task
            return HROnboardingObservation(
                task_id=finished_task.task_id if finished_task else "",
                instruction="",
                tool_name=action.tool_name,
                tool_result={"error": "Episode already finished"},
                step=self._state.step_count,
                max_steps=self._max_steps,
                available_tools=self._tool_names,
                done=True,
                reward=0.0,
            )

        state = self._state
        state.step_count += 1

        # Dispatch the tool call against the world
        outcome = self.tool_registry.execute(action.tool_name, action.arguments)

        # The step budget is the only termination condition
        finished = state.step_count >= self._max_steps
        self._done = finished

        task = self._current_task
        score = 0.0
        info = {"step": state.step_count}
        if finished and task:
            # Score the whole episode from the world's recorded action log
            evaluation = self.evaluator.evaluate(task, self.world.action_log)
            score = evaluation["score"]
            if evaluation:
                info["evaluation"] = evaluation

        return HROnboardingObservation(
            task_id=task.task_id if task else "",
            instruction=task.instruction if task else "",
            tool_name=action.tool_name,
            tool_result=outcome,
            step=state.step_count,
            max_steps=self._max_steps,
            available_tools=self._tool_names,
            done=finished,
            reward=score,
            metadata=info,
        )

    @property
    def state(self) -> State:
        """Return the ``State`` object of the current episode."""
        return self._state