Spaces:
Sleeping
Sleeping
| """ | |
| Gym-agnostic Agent Runner — connects an LLM to any OpenEnv environment. | |
| This module is the CORE of the evaluation platform. It: | |
| 1. Receives a pre-connected OpenEnv client (from AutoEnv discovery) | |
| 2. Discovers tools via list_tools() | |
| 3. Gives the LLM a scenario prompt + available tools | |
| 4. Loops: LLM reasons → agent calls env.step() → observation → LLM reasons again | |
| 5. Collects an EpisodeLog with timestamps for reward calculation + trajectory logging | |
| Usage: | |
| from openenv import AutoEnv | |
| env = AutoEnv.from_env("visual_memory", base_url="http://localhost:8000") | |
| runner = AgentRunner(model="gpt-4o", env_client=env) | |
| episode, breakdown = runner.run_scenario(scenario, checker) | |
| """ | |
| import json | |
| import logging | |
| import time | |
| from datetime import datetime, timezone, timedelta | |
| from typing import Any, Dict, List, Tuple | |
# Indian Standard Time (UTC+05:30) — used to timestamp each episode step
# (see datetime.now(IST) in AgentRunner._execute).
IST = timezone(timedelta(hours=5, minutes=30))
| from openenv.core.mcp_client import MCPToolClient | |
| from openenv.core.env_server.mcp_types import CallToolAction, CallToolObservation, Tool | |
| from rewards.base import ( | |
| EpisodeLog, | |
| RewardBreakdown, | |
| RewardCalculator, | |
| Scenario, | |
| OpenEnvRewardCalculator, | |
| ) | |
| from .llm import LLMClient | |
| logger = logging.getLogger(__name__) | |
# System prompt sent as the first message of every scenario run. This text is
# runtime behavior (the LLM reads it verbatim) — edit deliberately.
SYSTEM_PROMPT = """\
You are an AI agent interacting with an environment through tools.
Your job:
1. Read the task description carefully.
2. Use the available tools to complete the task.
3. Call tools one at a time. Wait for each result before deciding the next step.
4. When the task is complete, respond with a plain text summary of what you did.
Do NOT call any more tools after you're done.
Rules:
- Only use tools that are listed as available.
- Provide all required arguments for each tool call.
- If a tool call fails, read the error and decide how to recover.
- Be efficient — complete the task in as few steps as possible.
- When you're done, clearly state what you accomplished.
"""
def mcp_tools_to_openai(tools: "List[Tool]") -> List[Dict[str, Any]]:
    """Convert OpenEnv MCP tool definitions to OpenAI function-calling format.

    Args:
        tools: MCP tool definitions exposing ``name``, ``description`` and
            ``input_schema`` attributes.

    Returns:
        One dict per tool, shaped for the OpenAI ``tools=`` request parameter.
    """
    openai_tools: List[Dict[str, Any]] = []
    for tool in tools:
        # Shallow-copy the schema before defaulting keys: the original code
        # wrote "type"/"properties" into tool.input_schema itself, silently
        # mutating the tool definition owned by the MCP client.
        schema = dict(tool.input_schema) if tool.input_schema else {}
        schema.setdefault("type", "object")
        schema.setdefault("properties", {})
        openai_tools.append({
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description or "",
                "parameters": schema,
            },
        })
    return openai_tools
def _observation_to_str(step_result) -> str:
    """Render an OpenEnv step result as a string the LLM can read."""
    observation = step_result.observation

    if not isinstance(observation, CallToolObservation):
        # Non-MCP observation: prefer its metadata when present, else repr it.
        metadata = getattr(observation, "metadata", None)
        if metadata:
            return json.dumps(metadata, indent=2, default=str)
        return str(observation)

    if observation.error:
        return json.dumps({"error": observation.error.message}, indent=2)

    # Unwrap either an object carrying a .data attribute or a {"data": ...}
    # envelope so the LLM sees the payload, not the wrapper.
    payload = observation.result
    if hasattr(payload, "data"):
        payload = payload.data
    elif isinstance(payload, dict) and "data" in payload:
        payload = payload["data"]

    try:
        return json.dumps(payload, indent=2, default=str)
    except (TypeError, ValueError):
        return str(payload)
class AgentRunner:
    """
    Gym-agnostic agent that connects an LLM to any OpenEnv environment.

    Reward modes:
    - "custom" (default): Episode-level reward via RewardCalculator
    - "openenv": Per-step reward via Transform + ground truth
    """

    def __init__(
        self,
        model: str,
        env_client: MCPToolClient,
        temperature: float = 0.0,
        max_tokens: int = 1024,
        reward_mode: str = "custom",
        transform=None,
    ):
        # LLM wrapper speaking OpenAI-style chat with tool calls.
        self.llm = LLMClient(
            model=model,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        self.env_client = env_client
        self.reward_mode = reward_mode
        # Optional per-observation transform; only consulted in "openenv"
        # reward mode (and for set_scenario, when it defines one).
        self.transform = transform
        self.calculator = RewardCalculator()
        # NOTE(review): openenv_calculator exists only when reward_mode is
        # "openenv"; reading it in any other mode raises AttributeError.
        if reward_mode == "openenv":
            self.openenv_calculator = OpenEnvRewardCalculator()

    def run_scenario(
        self,
        scenario: Scenario,
        checker: Any,
    ) -> Tuple[EpisodeLog, RewardBreakdown]:
        """Run a single scenario through the LLM agent."""
        return self._execute(scenario, checker, self.env_client)

    def _execute(
        self,
        scenario: Scenario,
        checker: Any,
        env: MCPToolClient,
    ) -> Tuple[EpisodeLog, RewardBreakdown]:
        """Drive the reason->act loop for one scenario and score the episode.

        Resets the environment, optionally scopes the checker to the env's
        session, runs up to scenario.max_steps LLM turns (each turn may issue
        several tool calls), then computes the reward breakdown.
        """
        env.reset()
        # Best-effort session probe: ask the env for a session id so the
        # checker can scope its outcome checks. Any failure is non-fatal.
        session_id = None
        try:
            session_result = env.step(
                CallToolAction(tool_name="get_session_info", arguments={})
            )
            obs = session_result.observation
            if isinstance(obs, CallToolObservation) and obs.result:
                result_data = obs.result
                # Unwrap an object with .data or a {"data": ...} envelope.
                if hasattr(result_data, "data"):
                    result_data = result_data.data
                elif isinstance(result_data, dict) and "data" in result_data:
                    result_data = result_data["data"]
                if isinstance(result_data, dict):
                    session_id = result_data.get("session_id")
                elif isinstance(result_data, str):
                    # Some servers return the payload as a JSON string.
                    import json as _json
                    try:
                        parsed = _json.loads(result_data)
                        session_id = parsed.get("session_id")
                    except (ValueError, TypeError):
                        pass
        except Exception as e:
            logger.warning(f"Could not get session_id: {e}")
        if session_id and hasattr(checker, "set_session"):
            checker.set_session(session_id)
            logger.info(f"Session-scoped checker -> {session_id}")
        if self.transform and hasattr(self.transform, "set_scenario"):
            self.transform.set_scenario(scenario)
        # Discover tools fresh for this episode; hide the bookkeeping
        # get_session_info tool from the LLM.
        all_tools = env.list_tools(use_cache=False)
        tools = [t for t in all_tools if t.name != "get_session_info"]
        openai_tools = mcp_tools_to_openai(tools)
        tool_names = [t.name for t in tools]
        logger.info(f"Discovered {len(tools)} agent tools: {tool_names}")
        messages = [
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": scenario.prompt},
        ]
        episode = EpisodeLog()
        step_rewards = []
        final_answer = None
        for step_num in range(1, scenario.max_steps + 1):
            logger.info(f"Step {step_num}/{scenario.max_steps}")
            response = self.llm.chat(messages, tools=openai_tools)
            tool_calls = LLMClient.extract_tool_calls(response)
            if not tool_calls:
                # No tool calls: the agent considers the task complete.
                final_answer = LLMClient.get_text_response(response)
                logger.info(f"Agent done. Final answer: {(final_answer or '')[:100]}...")
                break
            # Echo the assistant turn into history so each tool result can
            # be matched to its tool_call_id.
            messages.append(response.choices[0].message.model_dump())
            for tc in tool_calls:
                tool_name = tc["name"]
                arguments = tc["arguments"]
                call_id = tc["id"]
                logger.info(f"  Tool: {tool_name}({json.dumps(arguments, default=str)[:100]})")
                step_ts = datetime.now(IST).isoformat()
                step_start = time.time()
                error_msg = None
                try:
                    step_result = env.step(
                        CallToolAction(tool_name=tool_name, arguments=arguments)
                    )
                    obs = step_result.observation
                    is_error = (
                        isinstance(obs, CallToolObservation)
                        and obs.error is not None
                    )
                    result_str = _observation_to_str(step_result)
                    if is_error and isinstance(obs, CallToolObservation):
                        error_msg = obs.error.message
                except Exception as exc:
                    # Transport/runtime failure: surface the error to the
                    # LLM as a payload instead of aborting the episode.
                    is_error = True
                    error_msg = str(exc)
                    result_str = json.dumps({"error": error_msg})
                    obs = None
                step_elapsed = time.time() - step_start
                # "openenv" mode accumulates a reward for every tool call.
                if self.reward_mode == "openenv" and self.transform and obs is not None:
                    transformed = self.transform(obs)
                    step_rewards.append(
                        transformed.reward if transformed.reward is not None else 0.0
                    )
                episode.add_step(
                    tool_name=tool_name,
                    arguments=arguments,
                    success=not is_error,
                    result=result_str,
                    error=error_msg,
                    timestamp=step_ts,
                    elapsed=step_elapsed,
                )
                logger.info(f"  -> success={not is_error} ({step_elapsed:.2f}s)")
                messages.append({
                    "role": "tool",
                    "tool_call_id": call_id,
                    "content": result_str,
                })
        # Score the finished episode.
        if hasattr(checker, "set_episode"):
            checker.set_episode(episode)
        outcome_results = checker.check_all(scenario.outcome_checks)
        if self.reward_mode == "openenv":
            breakdown = self.openenv_calculator.calculate(
                step_rewards=step_rewards,
                outcome_results=outcome_results,
                max_steps=scenario.max_steps,
                actual_steps=len(episode.steps),
            )
        else:
            breakdown = self.calculator.calculate(
                episode=episode,
                scenario=scenario,
                outcome_results=outcome_results,
            )
        return episode, breakdown