File size: 9,946 Bytes
15503f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
"""
Gym-agnostic Agent Runner — connects an LLM to any OpenEnv environment.

This module is the CORE of the evaluation platform. It:
  1. Receives a pre-connected OpenEnv client (from AutoEnv discovery)
  2. Discovers tools via list_tools()
  3. Gives the LLM a scenario prompt + available tools
  4. Loops: LLM reasons → agent calls env.step() → observation → LLM reasons again
  5. Collects an EpisodeLog with timestamps for reward calculation + trajectory logging

Usage:
    from openenv import AutoEnv
    env = AutoEnv.from_env("visual_memory", base_url="http://localhost:8000")
    runner = AgentRunner(model="gpt-4o", env_client=env)
    episode, breakdown = runner.run_scenario(scenario, checker)
"""

import json
import logging
import time
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, List, Tuple

IST = timezone(timedelta(hours=5, minutes=30))

from openenv.core.mcp_client import MCPToolClient
from openenv.core.env_server.mcp_types import CallToolAction, CallToolObservation, Tool

from rewards.base import (
    EpisodeLog,
    RewardBreakdown,
    RewardCalculator,
    Scenario,
    OpenEnvRewardCalculator,
)
from .llm import LLMClient

logger = logging.getLogger(__name__)


SYSTEM_PROMPT = """\
You are an AI agent interacting with an environment through tools.

Your job:
1. Read the task description carefully.
2. Use the available tools to complete the task.
3. Call tools one at a time. Wait for each result before deciding the next step.
4. When the task is complete, respond with a plain text summary of what you did.
   Do NOT call any more tools after you're done.

Rules:
- Only use tools that are listed as available.
- Provide all required arguments for each tool call.
- If a tool call fails, read the error and decide how to recover.
- Be efficient — complete the task in as few steps as possible.
- When you're done, clearly state what you accomplished.
"""


def mcp_tools_to_openai(tools: List[Tool]) -> List[Dict[str, Any]]:
    """Convert OpenEnv MCP tool definitions to OpenAI function-calling format."""
    openai_tools = []
    for tool in tools:
        schema = tool.input_schema or {"type": "object", "properties": {}}
        if "type" not in schema:
            schema["type"] = "object"
        if "properties" not in schema:
            schema["properties"] = {}

        openai_tools.append({
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description or "",
                "parameters": schema,
            },
        })
    return openai_tools


def _observation_to_str(step_result) -> str:
    """Convert an OpenEnv step result to a string the LLM can read."""
    obs = step_result.observation
    if isinstance(obs, CallToolObservation):
        if obs.error:
            return json.dumps({"error": obs.error.message}, indent=2)
        result = obs.result
        if hasattr(result, "data"):
            result = result.data
        elif isinstance(result, dict) and "data" in result:
            result = result["data"]
        try:
            return json.dumps(result, indent=2, default=str)
        except (TypeError, ValueError):
            return str(result)
    if hasattr(obs, "metadata") and obs.metadata:
        return json.dumps(obs.metadata, indent=2, default=str)
    return str(obs)


class AgentRunner:
    """
    Gym-agnostic agent that connects an LLM to any OpenEnv environment.

    Reward modes:
      - "custom"  (default): Episode-level reward via RewardCalculator
      - "openenv": Per-step reward via Transform + ground truth
    """

    def __init__(
        self,
        model: str,
        env_client: MCPToolClient,
        temperature: float = 0.0,
        max_tokens: int = 1024,
        reward_mode: str = "custom",
        transform=None,
    ):
        self.llm = LLMClient(
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        self.env_client = env_client
        self.reward_mode = reward_mode
        self.transform = transform

        self.calculator = RewardCalculator()

        if reward_mode == "openenv":
            self.openenv_calculator = OpenEnvRewardCalculator()

    def run_scenario(
        self,
        scenario: Scenario,
        checker: Any,
    ) -> Tuple[EpisodeLog, RewardBreakdown]:
        """Run a single scenario through the LLM agent."""
        return self._execute(scenario, checker, self.env_client)

    def _execute(
        self,
        scenario: Scenario,
        checker: Any,
        env: MCPToolClient,
    ) -> Tuple[EpisodeLog, RewardBreakdown]:

        env.reset()

        session_id = None
        try:
            session_result = env.step(
                CallToolAction(tool_name="get_session_info", arguments={})
            )
            obs = session_result.observation
            if isinstance(obs, CallToolObservation) and obs.result:
                result_data = obs.result
                if hasattr(result_data, "data"):
                    result_data = result_data.data
                elif isinstance(result_data, dict) and "data" in result_data:
                    result_data = result_data["data"]
                if isinstance(result_data, dict):
                    session_id = result_data.get("session_id")
                elif isinstance(result_data, str):
                    import json as _json
                    try:
                        parsed = _json.loads(result_data)
                        session_id = parsed.get("session_id")
                    except (ValueError, TypeError):
                        pass
        except Exception as e:
            logger.warning(f"Could not get session_id: {e}")

        if session_id and hasattr(checker, "set_session"):
            checker.set_session(session_id)
            logger.info(f"Session-scoped checker -> {session_id}")

        if self.transform and hasattr(self.transform, "set_scenario"):
            self.transform.set_scenario(scenario)

        all_tools = env.list_tools(use_cache=False)
        tools = [t for t in all_tools if t.name != "get_session_info"]
        openai_tools = mcp_tools_to_openai(tools)
        tool_names = [t.name for t in tools]
        logger.info(f"Discovered {len(tools)} agent tools: {tool_names}")

        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": scenario.prompt},
        ]

        episode = EpisodeLog()
        step_rewards = []
        final_answer = None

        for step_num in range(1, scenario.max_steps + 1):
            logger.info(f"Step {step_num}/{scenario.max_steps}")

            response = self.llm.chat(messages, tools=openai_tools)
            tool_calls = LLMClient.extract_tool_calls(response)

            if not tool_calls:
                final_answer = LLMClient.get_text_response(response)
                logger.info(f"Agent done. Final answer: {(final_answer or '')[:100]}...")
                break

            messages.append(response.choices[0].message.model_dump())

            for tc in tool_calls:
                tool_name = tc["name"]
                arguments = tc["arguments"]
                call_id = tc["id"]

                logger.info(f"  Tool: {tool_name}({json.dumps(arguments, default=str)[:100]})")

                step_ts = datetime.now(IST).isoformat()
                step_start = time.time()
                error_msg = None
                try:
                    step_result = env.step(
                        CallToolAction(tool_name=tool_name, arguments=arguments)
                    )
                    obs = step_result.observation
                    is_error = (
                        isinstance(obs, CallToolObservation)
                        and obs.error is not None
                    )
                    result_str = _observation_to_str(step_result)
                    if is_error and isinstance(obs, CallToolObservation):
                        error_msg = obs.error.message
                except Exception as exc:
                    is_error = True
                    error_msg = str(exc)
                    result_str = json.dumps({"error": error_msg})
                    obs = None

                step_elapsed = time.time() - step_start

                if self.reward_mode == "openenv" and self.transform and obs is not None:
                    transformed = self.transform(obs)
                    step_rewards.append(
                        transformed.reward if transformed.reward is not None else 0.0
                    )

                episode.add_step(
                    tool_name=tool_name,
                    arguments=arguments,
                    success=not is_error,
                    result=result_str,
                    error=error_msg,
                    timestamp=step_ts,
                    elapsed=step_elapsed,
                )

                logger.info(f"    -> success={not is_error} ({step_elapsed:.2f}s)")

                messages.append({
                    "role": "tool",
                    "tool_call_id": call_id,
                    "content": result_str,
                })

        if hasattr(checker, "set_episode"):
            checker.set_episode(episode)

        outcome_results = checker.check_all(scenario.outcome_checks)

        if self.reward_mode == "openenv":
            breakdown = self.openenv_calculator.calculate(
                step_rewards=step_rewards,
                outcome_results=outcome_results,
                max_steps=scenario.max_steps,
                actual_steps=len(episode.steps),
            )
        else:
            breakdown = self.calculator.calculate(
                episode=episode,
                scenario=scenario,
                outcome_results=outcome_results,
            )

        return episode, breakdown