"""
Gym-agnostic Agent Runner — connects an LLM to any OpenEnv environment.

This module is the CORE of the evaluation platform. It:
  1. Receives a pre-connected OpenEnv client (from AutoEnv discovery)
  2. Discovers tools via list_tools()
  3. Gives the LLM a scenario prompt + available tools
  4. Loops: LLM reasons → agent calls env.step() → observation → LLM reasons again
  5. Collects an EpisodeLog with timestamps for reward calculation + trajectory logging

Usage:
    from openenv import AutoEnv

    env = AutoEnv.from_env("visual_memory", base_url="http://localhost:8000")
    runner = AgentRunner(model="gpt-4o", env_client=env)
    episode, breakdown = runner.run_scenario(scenario, checker)
"""

import json
import logging
import time
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, List, Optional, Tuple

from openenv.core.mcp_client import MCPToolClient
from openenv.core.env_server.mcp_types import CallToolAction, CallToolObservation, Tool

from rewards.base import (
    EpisodeLog,
    RewardBreakdown,
    RewardCalculator,
    Scenario,
    OpenEnvRewardCalculator,
)

from .llm import LLMClient

logger = logging.getLogger(__name__)

# Fixed UTC+05:30 offset; every per-step timestamp in the EpisodeLog uses it.
IST = timezone(timedelta(hours=5, minutes=30))

# Bookkeeping tool: queried once per episode for the session id and hidden
# from the tool list offered to the LLM.
_SESSION_TOOL = "get_session_info"

SYSTEM_PROMPT = """\
You are an AI agent interacting with an environment through tools.

Your job:
1. Read the task description carefully.
2. Use the available tools to complete the task.
3. Call tools one at a time. Wait for each result before deciding the next step.
4. When the task is complete, respond with a plain text summary of what you did.
   Do NOT call any more tools after you're done.

Rules:
- Only use tools that are listed as available.
- Provide all required arguments for each tool call.
- If a tool call fails, read the error and decide how to recover.
- Be efficient — complete the task in as few steps as possible.
- When you're done, clearly state what you accomplished.
"""


def mcp_tools_to_openai(tools: List[Tool]) -> List[Dict[str, Any]]:
    """Convert OpenEnv MCP tool definitions to OpenAI function-calling format.

    A missing or empty ``input_schema`` becomes an empty object schema.
    Missing ``type`` / ``properties`` keys are filled in.

    The schema is shallow-copied before it is patched. This keeps the ``Tool``
    objects passed in unmodified; the client may hand the same objects out
    again from its ``list_tools`` cache.

    Args:
        tools: Tool definitions as returned by ``MCPToolClient.list_tools``.

    Returns:
        One ``{"type": "function", "function": {...}}`` entry per tool, in
        input order.
    """
    openai_tools = []
    for tool in tools:
        schema = dict(tool.input_schema or {})
        schema.setdefault("type", "object")
        schema.setdefault("properties", {})
        openai_tools.append({
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description or "",
                "parameters": schema,
            },
        })
    return openai_tools


def _unwrap_result(result: Any) -> Any:
    """Return the ``data`` payload of an MCP tool result, or *result* unchanged.

    Handles both attribute-style results (``result.data``) and plain dicts
    carrying a ``"data"`` key.
    """
    if hasattr(result, "data"):
        return result.data
    if isinstance(result, dict) and "data" in result:
        return result["data"]
    return result


def _observation_to_str(step_result) -> str:
    """Convert an OpenEnv step result to a string the LLM can read.

    - A ``CallToolObservation`` with an error becomes ``{"error": <message>}``.
    - A ``CallToolObservation`` without an error has its result unwrapped
      (see ``_unwrap_result``) and JSON-encoded. Non-serialisable values are
      stringified, with a plain ``str()`` fallback.
    - Any other observation renders its ``metadata`` when non-empty, and
      ``str(obs)`` otherwise.
    """
    obs = step_result.observation
    if isinstance(obs, CallToolObservation):
        if obs.error:
            return json.dumps({"error": obs.error.message}, indent=2)
        result = _unwrap_result(obs.result)
        try:
            return json.dumps(result, indent=2, default=str)
        except (TypeError, ValueError):
            return str(result)
    if getattr(obs, "metadata", None):
        return json.dumps(obs.metadata, indent=2, default=str)
    return str(obs)


def _fetch_session_id(env: MCPToolClient) -> Optional[str]:
    """Ask the environment for its session id via the session bookkeeping tool.

    Accepts a result payload that is a dict, or a JSON string encoding a dict.
    The lookup is best-effort: any failure is logged as a warning, and
    anything that does not yield a dict results in ``None``.
    """
    try:
        session_result = env.step(
            CallToolAction(tool_name=_SESSION_TOOL, arguments={})
        )
        obs = session_result.observation
        if not (isinstance(obs, CallToolObservation) and obs.result):
            return None
        payload = _unwrap_result(obs.result)
        if isinstance(payload, str):
            try:
                payload = json.loads(payload)
            except (ValueError, TypeError):
                return None
        if isinstance(payload, dict):
            return payload.get("session_id")
    except Exception as e:
        logger.warning("Could not get session_id: %s", e)
    return None


class AgentRunner:
    """
    Gym-agnostic agent that connects an LLM to any OpenEnv environment.

    Reward modes:
      - "custom" (default): Episode-level reward via RewardCalculator
      - "openenv": Per-step reward via Transform + ground truth

    Any ``reward_mode`` value other than ``"openenv"`` is scored with the
    custom ``RewardCalculator``.
    """

    def __init__(
        self,
        model: str,
        env_client: MCPToolClient,
        temperature: float = 0.0,
        max_tokens: int = 1024,
        reward_mode: str = "custom",
        transform: Any = None,
    ):
        """Create the runner.

        Args:
            model: Model name passed to ``LLMClient``.
            env_client: Pre-connected OpenEnv MCP client used for every episode.
            temperature: Sampling temperature for the LLM.
            max_tokens: Per-response token limit for the LLM.
            reward_mode: ``"openenv"`` for per-step rewards; anything else
                selects episode-level ``RewardCalculator`` scoring.
            transform: Optional callable. It is applied to each observation
                in ``"openenv"`` mode and must return an object with a
                ``reward`` attribute. If it defines ``set_scenario``, that
                method is called at the start of every episode.
        """
        self.llm = LLMClient(
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        self.env_client = env_client
        self.reward_mode = reward_mode
        self.transform = transform
        self.calculator = RewardCalculator()
        if reward_mode == "openenv":
            self.openenv_calculator = OpenEnvRewardCalculator()

    def run_scenario(
        self,
        scenario: Scenario,
        checker: Any,
    ) -> Tuple[EpisodeLog, RewardBreakdown]:
        """Run a single scenario through the LLM agent.

        Args:
            scenario: Supplies ``prompt``, ``max_steps`` and ``outcome_checks``.
            checker: Object exposing ``check_all(outcome_checks)``. Optional
                ``set_session(session_id)`` and ``set_episode(episode)``
                hooks are called when present.

        Returns:
            ``(episode, breakdown)``: the recorded trajectory and its reward.
        """
        return self._execute(scenario, checker, self.env_client)

    def _execute(
        self,
        scenario: Scenario,
        checker: Any,
        env: MCPToolClient,
    ) -> Tuple[EpisodeLog, RewardBreakdown]:
        """Run one full episode on *env*: reset, bind session, agent loop, score."""
        env.reset()

        session_id = _fetch_session_id(env)
        if session_id and hasattr(checker, "set_session"):
            checker.set_session(session_id)
            logger.info("Session-scoped checker -> %s", session_id)

        if self.transform and hasattr(self.transform, "set_scenario"):
            self.transform.set_scenario(scenario)

        # Fetch the tool list uncached and hide the bookkeeping tool from the LLM.
        tools = [
            t for t in env.list_tools(use_cache=False) if t.name != _SESSION_TOOL
        ]
        openai_tools = mcp_tools_to_openai(tools)
        logger.info(
            "Discovered %d agent tools: %s", len(tools), [t.name for t in tools]
        )

        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": scenario.prompt},
        ]
        episode = EpisodeLog()
        step_rewards: List[float] = []

        for step_num in range(1, scenario.max_steps + 1):
            logger.info("Step %s/%s", step_num, scenario.max_steps)

            response = self.llm.chat(messages, tools=openai_tools)
            tool_calls = LLMClient.extract_tool_calls(response)

            # A response without tool calls is the agent's final answer.
            if not tool_calls:
                final_answer = LLMClient.get_text_response(response)
                logger.info(
                    "Agent done. Final answer: %s...", (final_answer or "")[:100]
                )
                break

            # The assistant turn must precede the tool results that answer it.
            messages.append(response.choices[0].message.model_dump())
            for tc in tool_calls:
                messages.append(
                    self._run_tool_call(env, tc, episode, step_rewards)
                )
        else:
            logger.warning(
                "Agent reached max_steps (%s) without a final answer",
                scenario.max_steps,
            )

        return episode, self._score(scenario, checker, episode, step_rewards)

    def _run_tool_call(
        self,
        env: MCPToolClient,
        tool_call: Dict[str, Any],
        episode: EpisodeLog,
        step_rewards: List[float],
    ) -> Dict[str, Any]:
        """Execute one LLM tool call and record it.

        Steps:
          1. Run the call against *env*.
          2. Record it in *episode*.
          3. In "openenv" mode with a transform, append a per-step reward to
             *step_rewards*.

        Exceptions raised by ``env.step`` (or while rendering its result) are
        recorded as a failed step rather than propagated.

        Returns:
            The ``role="tool"`` message to feed back to the LLM.
        """
        tool_name = tool_call["name"]
        arguments = tool_call["arguments"]
        logger.info(
            " Tool: %s(%s)", tool_name, json.dumps(arguments, default=str)[:100]
        )

        step_ts = datetime.now(IST).isoformat()
        # Monotonic clock: wall-clock adjustments must not distort durations.
        step_start = time.perf_counter()
        error_msg = None
        try:
            step_result = env.step(
                CallToolAction(tool_name=tool_name, arguments=arguments)
            )
            obs = step_result.observation
            is_error = isinstance(obs, CallToolObservation) and obs.error is not None
            result_str = _observation_to_str(step_result)
            if is_error:
                error_msg = obs.error.message
        except Exception as exc:
            is_error = True
            error_msg = str(exc)
            result_str = json.dumps({"error": error_msg})
            obs = None
        step_elapsed = time.perf_counter() - step_start

        if self.reward_mode == "openenv" and self.transform and obs is not None:
            transformed = self.transform(obs)
            step_rewards.append(
                transformed.reward if transformed.reward is not None else 0.0
            )

        episode.add_step(
            tool_name=tool_name,
            arguments=arguments,
            success=not is_error,
            result=result_str,
            error=error_msg,
            timestamp=step_ts,
            elapsed=step_elapsed,
        )
        logger.info(" -> success=%s (%.2fs)", not is_error, step_elapsed)

        return {
            "role": "tool",
            "tool_call_id": tool_call["id"],
            "content": result_str,
        }

    def _score(
        self,
        scenario: Scenario,
        checker: Any,
        episode: EpisodeLog,
        step_rewards: List[float],
    ) -> RewardBreakdown:
        """Run the scenario's outcome checks and compute the reward breakdown."""
        if hasattr(checker, "set_episode"):
            checker.set_episode(episode)
        outcome_results = checker.check_all(scenario.outcome_checks)

        if self.reward_mode == "openenv":
            return self.openenv_calculator.calculate(
                step_rewards=step_rewards,
                outcome_results=outcome_results,
                max_steps=scenario.max_steps,
                actual_steps=len(episode.steps),
            )
        return self.calculator.calculate(
            episode=episode,
            scenario=scenario,
            outcome_results=outcome_results,
        )