Spaces:
Sleeping
Sleeping
File size: 9,946 Bytes
15503f9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 | """
Gym-agnostic Agent Runner — connects an LLM to any OpenEnv environment.
This module is the CORE of the evaluation platform. It:
1. Receives a pre-connected OpenEnv client (from AutoEnv discovery)
2. Discovers tools via list_tools()
3. Gives the LLM a scenario prompt + available tools
4. Loops: LLM reasons → agent calls env.step() → observation → LLM reasons again
5. Collects an EpisodeLog with timestamps for reward calculation + trajectory logging
Usage:
from openenv import AutoEnv
env = AutoEnv.from_env("visual_memory", base_url="http://localhost:8000")
runner = AgentRunner(model="gpt-4o", env_client=env)
episode, breakdown = runner.run_scenario(scenario, checker)
"""
import json
import logging
import time
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, List, Tuple
IST = timezone(timedelta(hours=5, minutes=30))
from openenv.core.mcp_client import MCPToolClient
from openenv.core.env_server.mcp_types import CallToolAction, CallToolObservation, Tool
from rewards.base import (
EpisodeLog,
RewardBreakdown,
RewardCalculator,
Scenario,
OpenEnvRewardCalculator,
)
from .llm import LLMClient
logger = logging.getLogger(__name__)
SYSTEM_PROMPT = """\
You are an AI agent interacting with an environment through tools.
Your job:
1. Read the task description carefully.
2. Use the available tools to complete the task.
3. Call tools one at a time. Wait for each result before deciding the next step.
4. When the task is complete, respond with a plain text summary of what you did.
Do NOT call any more tools after you're done.
Rules:
- Only use tools that are listed as available.
- Provide all required arguments for each tool call.
- If a tool call fails, read the error and decide how to recover.
- Be efficient — complete the task in as few steps as possible.
- When you're done, clearly state what you accomplished.
"""
def mcp_tools_to_openai(tools: List[Tool]) -> List[Dict[str, Any]]:
"""Convert OpenEnv MCP tool definitions to OpenAI function-calling format."""
openai_tools = []
for tool in tools:
schema = tool.input_schema or {"type": "object", "properties": {}}
if "type" not in schema:
schema["type"] = "object"
if "properties" not in schema:
schema["properties"] = {}
openai_tools.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description or "",
"parameters": schema,
},
})
return openai_tools
def _observation_to_str(step_result) -> str:
"""Convert an OpenEnv step result to a string the LLM can read."""
obs = step_result.observation
if isinstance(obs, CallToolObservation):
if obs.error:
return json.dumps({"error": obs.error.message}, indent=2)
result = obs.result
if hasattr(result, "data"):
result = result.data
elif isinstance(result, dict) and "data" in result:
result = result["data"]
try:
return json.dumps(result, indent=2, default=str)
except (TypeError, ValueError):
return str(result)
if hasattr(obs, "metadata") and obs.metadata:
return json.dumps(obs.metadata, indent=2, default=str)
return str(obs)
class AgentRunner:
"""
Gym-agnostic agent that connects an LLM to any OpenEnv environment.
Reward modes:
- "custom" (default): Episode-level reward via RewardCalculator
- "openenv": Per-step reward via Transform + ground truth
"""
def __init__(
self,
model: str,
env_client: MCPToolClient,
temperature: float = 0.0,
max_tokens: int = 1024,
reward_mode: str = "custom",
transform=None,
):
self.llm = LLMClient(
model=model,
temperature=temperature,
max_tokens=max_tokens,
)
self.env_client = env_client
self.reward_mode = reward_mode
self.transform = transform
self.calculator = RewardCalculator()
if reward_mode == "openenv":
self.openenv_calculator = OpenEnvRewardCalculator()
def run_scenario(
self,
scenario: Scenario,
checker: Any,
) -> Tuple[EpisodeLog, RewardBreakdown]:
"""Run a single scenario through the LLM agent."""
return self._execute(scenario, checker, self.env_client)
def _execute(
self,
scenario: Scenario,
checker: Any,
env: MCPToolClient,
) -> Tuple[EpisodeLog, RewardBreakdown]:
env.reset()
session_id = None
try:
session_result = env.step(
CallToolAction(tool_name="get_session_info", arguments={})
)
obs = session_result.observation
if isinstance(obs, CallToolObservation) and obs.result:
result_data = obs.result
if hasattr(result_data, "data"):
result_data = result_data.data
elif isinstance(result_data, dict) and "data" in result_data:
result_data = result_data["data"]
if isinstance(result_data, dict):
session_id = result_data.get("session_id")
elif isinstance(result_data, str):
import json as _json
try:
parsed = _json.loads(result_data)
session_id = parsed.get("session_id")
except (ValueError, TypeError):
pass
except Exception as e:
logger.warning(f"Could not get session_id: {e}")
if session_id and hasattr(checker, "set_session"):
checker.set_session(session_id)
logger.info(f"Session-scoped checker -> {session_id}")
if self.transform and hasattr(self.transform, "set_scenario"):
self.transform.set_scenario(scenario)
all_tools = env.list_tools(use_cache=False)
tools = [t for t in all_tools if t.name != "get_session_info"]
openai_tools = mcp_tools_to_openai(tools)
tool_names = [t.name for t in tools]
logger.info(f"Discovered {len(tools)} agent tools: {tool_names}")
messages = [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": scenario.prompt},
]
episode = EpisodeLog()
step_rewards = []
final_answer = None
for step_num in range(1, scenario.max_steps + 1):
logger.info(f"Step {step_num}/{scenario.max_steps}")
response = self.llm.chat(messages, tools=openai_tools)
tool_calls = LLMClient.extract_tool_calls(response)
if not tool_calls:
final_answer = LLMClient.get_text_response(response)
logger.info(f"Agent done. Final answer: {(final_answer or '')[:100]}...")
break
messages.append(response.choices[0].message.model_dump())
for tc in tool_calls:
tool_name = tc["name"]
arguments = tc["arguments"]
call_id = tc["id"]
logger.info(f" Tool: {tool_name}({json.dumps(arguments, default=str)[:100]})")
step_ts = datetime.now(IST).isoformat()
step_start = time.time()
error_msg = None
try:
step_result = env.step(
CallToolAction(tool_name=tool_name, arguments=arguments)
)
obs = step_result.observation
is_error = (
isinstance(obs, CallToolObservation)
and obs.error is not None
)
result_str = _observation_to_str(step_result)
if is_error and isinstance(obs, CallToolObservation):
error_msg = obs.error.message
except Exception as exc:
is_error = True
error_msg = str(exc)
result_str = json.dumps({"error": error_msg})
obs = None
step_elapsed = time.time() - step_start
if self.reward_mode == "openenv" and self.transform and obs is not None:
transformed = self.transform(obs)
step_rewards.append(
transformed.reward if transformed.reward is not None else 0.0
)
episode.add_step(
tool_name=tool_name,
arguments=arguments,
success=not is_error,
result=result_str,
error=error_msg,
timestamp=step_ts,
elapsed=step_elapsed,
)
logger.info(f" -> success={not is_error} ({step_elapsed:.2f}s)")
messages.append({
"role": "tool",
"tool_call_id": call_id,
"content": result_str,
})
if hasattr(checker, "set_episode"):
checker.set_episode(episode)
outcome_results = checker.check_all(scenario.outcome_checks)
if self.reward_mode == "openenv":
breakdown = self.openenv_calculator.calculate(
step_rewards=step_rewards,
outcome_results=outcome_results,
max_steps=scenario.max_steps,
actual_steps=len(episode.steps),
)
else:
breakdown = self.calculator.calculate(
episode=episode,
scenario=scenario,
outcome_results=outcome_results,
)
return episode, breakdown
|