# visual_memory/agent/runner.py
# NOTE: Hugging Face Hub page header (uploader kdemon1011, revision 15503f9,
# "Upload folder using huggingface_hub") converted to comments so the module
# remains valid Python.
"""
Gym-agnostic Agent Runner — connects an LLM to any OpenEnv environment.
This module is the CORE of the evaluation platform. It:
1. Receives a pre-connected OpenEnv client (from AutoEnv discovery)
2. Discovers tools via list_tools()
3. Gives the LLM a scenario prompt + available tools
4. Loops: LLM reasons → agent calls env.step() → observation → LLM reasons again
5. Collects an EpisodeLog with timestamps for reward calculation + trajectory logging
Usage:
from openenv import AutoEnv
env = AutoEnv.from_env("visual_memory", base_url="http://localhost:8000")
runner = AgentRunner(model="gpt-4o", env_client=env)
episode, breakdown = runner.run_scenario(scenario, checker)
"""
import json
import logging
import time
from datetime import datetime, timezone, timedelta
from typing import Any, Dict, List, Tuple
IST = timezone(timedelta(hours=5, minutes=30))
from openenv.core.mcp_client import MCPToolClient
from openenv.core.env_server.mcp_types import CallToolAction, CallToolObservation, Tool
from rewards.base import (
EpisodeLog,
RewardBreakdown,
RewardCalculator,
Scenario,
OpenEnvRewardCalculator,
)
from .llm import LLMClient
logger = logging.getLogger(__name__)
SYSTEM_PROMPT = """\
You are an AI agent interacting with an environment through tools.
Your job:
1. Read the task description carefully.
2. Use the available tools to complete the task.
3. Call tools one at a time. Wait for each result before deciding the next step.
4. When the task is complete, respond with a plain text summary of what you did.
Do NOT call any more tools after you're done.
Rules:
- Only use tools that are listed as available.
- Provide all required arguments for each tool call.
- If a tool call fails, read the error and decide how to recover.
- Be efficient — complete the task in as few steps as possible.
- When you're done, clearly state what you accomplished.
"""
def mcp_tools_to_openai(tools: List[Tool]) -> List[Dict[str, Any]]:
    """Convert OpenEnv MCP tool definitions to OpenAI function-calling format.

    Args:
        tools: MCP ``Tool`` objects exposing ``name``, ``description`` and
            ``input_schema`` (a JSON-schema dict, possibly ``None``/empty).

    Returns:
        A list of dicts in the shape expected by the OpenAI ``tools=[...]``
        parameter: ``{"type": "function", "function": {...}}``.

    Fix: the previous implementation back-filled missing ``type`` /
    ``properties`` keys directly on ``tool.input_schema``, mutating the
    shared (and possibly cached) tool definition. The schema is now copied
    before defaults are applied.
    """
    openai_tools: List[Dict[str, Any]] = []
    for tool in tools:
        # Shallow copy: we only ever add top-level keys, so a deep copy is
        # unnecessary, and the caller's schema object stays untouched.
        schema = dict(tool.input_schema) if tool.input_schema else {}
        schema.setdefault("type", "object")
        schema.setdefault("properties", {})
        openai_tools.append({
            "type": "function",
            "function": {
                "name": tool.name,
                "description": tool.description or "",
                "parameters": schema,
            },
        })
    return openai_tools
def _observation_to_str(step_result) -> str:
    """Render an OpenEnv step result as text suitable for an LLM message.

    Tool-call observations become pretty-printed JSON (errors are rendered
    as ``{"error": ...}``); any other observation falls back to a JSON dump
    of its ``metadata`` dict when present, else plain ``str()``.
    """
    observation = step_result.observation
    if not isinstance(observation, CallToolObservation):
        # Non-tool observation: prefer its metadata dict, if it has one.
        metadata = getattr(observation, "metadata", None)
        if metadata:
            return json.dumps(metadata, indent=2, default=str)
        return str(observation)
    if observation.error:
        return json.dumps({"error": observation.error.message}, indent=2)
    payload = observation.result
    # Unwrap a "data" attribute (typed result) or a "data" key (raw dict).
    if hasattr(payload, "data"):
        payload = payload.data
    elif isinstance(payload, dict) and "data" in payload:
        payload = payload["data"]
    try:
        return json.dumps(payload, indent=2, default=str)
    except (TypeError, ValueError):
        return str(payload)
class AgentRunner:
"""
Gym-agnostic agent that connects an LLM to any OpenEnv environment.
Reward modes:
- "custom" (default): Episode-level reward via RewardCalculator
- "openenv": Per-step reward via Transform + ground truth
"""
def __init__(
self,
model: str,
env_client: MCPToolClient,
temperature: float = 0.0,
max_tokens: int = 1024,
reward_mode: str = "custom",
transform=None,
):
self.llm = LLMClient(
model=model,
temperature=temperature,
max_tokens=max_tokens,
)
self.env_client = env_client
self.reward_mode = reward_mode
self.transform = transform
self.calculator = RewardCalculator()
if reward_mode == "openenv":
self.openenv_calculator = OpenEnvRewardCalculator()
def run_scenario(
self,
scenario: Scenario,
checker: Any,
) -> Tuple[EpisodeLog, RewardBreakdown]:
"""Run a single scenario through the LLM agent."""
return self._execute(scenario, checker, self.env_client)
def _execute(
self,
scenario: Scenario,
checker: Any,
env: MCPToolClient,
) -> Tuple[EpisodeLog, RewardBreakdown]:
env.reset()
session_id = None
try:
session_result = env.step(
CallToolAction(tool_name="get_session_info", arguments={})
)
obs = session_result.observation
if isinstance(obs, CallToolObservation) and obs.result:
result_data = obs.result
if hasattr(result_data, "data"):
result_data = result_data.data
elif isinstance(result_data, dict) and "data" in result_data:
result_data = result_data["data"]
if isinstance(result_data, dict):
session_id = result_data.get("session_id")
elif isinstance(result_data, str):
import json as _json
try:
parsed = _json.loads(result_data)
session_id = parsed.get("session_id")
except (ValueError, TypeError):
pass
except Exception as e:
logger.warning(f"Could not get session_id: {e}")
if session_id and hasattr(checker, "set_session"):
checker.set_session(session_id)
logger.info(f"Session-scoped checker -> {session_id}")
if self.transform and hasattr(self.transform, "set_scenario"):
self.transform.set_scenario(scenario)
all_tools = env.list_tools(use_cache=False)
tools = [t for t in all_tools if t.name != "get_session_info"]
openai_tools = mcp_tools_to_openai(tools)
tool_names = [t.name for t in tools]
logger.info(f"Discovered {len(tools)} agent tools: {tool_names}")
messages = [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": scenario.prompt},
]
episode = EpisodeLog()
step_rewards = []
final_answer = None
for step_num in range(1, scenario.max_steps + 1):
logger.info(f"Step {step_num}/{scenario.max_steps}")
response = self.llm.chat(messages, tools=openai_tools)
tool_calls = LLMClient.extract_tool_calls(response)
if not tool_calls:
final_answer = LLMClient.get_text_response(response)
logger.info(f"Agent done. Final answer: {(final_answer or '')[:100]}...")
break
messages.append(response.choices[0].message.model_dump())
for tc in tool_calls:
tool_name = tc["name"]
arguments = tc["arguments"]
call_id = tc["id"]
logger.info(f" Tool: {tool_name}({json.dumps(arguments, default=str)[:100]})")
step_ts = datetime.now(IST).isoformat()
step_start = time.time()
error_msg = None
try:
step_result = env.step(
CallToolAction(tool_name=tool_name, arguments=arguments)
)
obs = step_result.observation
is_error = (
isinstance(obs, CallToolObservation)
and obs.error is not None
)
result_str = _observation_to_str(step_result)
if is_error and isinstance(obs, CallToolObservation):
error_msg = obs.error.message
except Exception as exc:
is_error = True
error_msg = str(exc)
result_str = json.dumps({"error": error_msg})
obs = None
step_elapsed = time.time() - step_start
if self.reward_mode == "openenv" and self.transform and obs is not None:
transformed = self.transform(obs)
step_rewards.append(
transformed.reward if transformed.reward is not None else 0.0
)
episode.add_step(
tool_name=tool_name,
arguments=arguments,
success=not is_error,
result=result_str,
error=error_msg,
timestamp=step_ts,
elapsed=step_elapsed,
)
logger.info(f" -> success={not is_error} ({step_elapsed:.2f}s)")
messages.append({
"role": "tool",
"tool_call_id": call_id,
"content": result_str,
})
if hasattr(checker, "set_episode"):
checker.set_episode(episode)
outcome_results = checker.check_all(scenario.outcome_checks)
if self.reward_mode == "openenv":
breakdown = self.openenv_calculator.calculate(
step_rewards=step_rewards,
outcome_results=outcome_results,
max_steps=scenario.max_steps,
actual_steps=len(episode.steps),
)
else:
breakdown = self.calculator.calculate(
episode=episode,
scenario=scenario,
outcome_results=outcome_results,
)
return episode, breakdown