File size: 6,441 Bytes
665e529 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 | """CaPAgent — the core code-as-policy agent.
Generates Python robot control code, executes in sandbox, iterates
with multi-turn feedback. Orchestrates VDM, ensemble, and skill library.
"""
from __future__ import annotations
import argparse
import logging
import time
from pathlib import Path
from typing import Any
import numpy as np
from anima_naka.agent.ensemble import EnsembleReasoner
from anima_naka.agent.llm_client import LLMClient
from anima_naka.agent.prompts import build_initial_prompt, build_multiturn_prompt
from anima_naka.agent.vdm import VisualDifferencingModule
from anima_naka.config import NakaConfig
from anima_naka.constants import SINGLE_TURN_TIERS
from anima_naka.executor.code_parser import CodeParser
from anima_naka.skills.library import SkillLibrary
from anima_naka.types import TrialResult
logger = logging.getLogger("anima_naka.agent")
class CaPAgent:
"""Code-as-Policy agent — NAKA's heart.
Takes task description + API docs -> generates Python code -> executes -> iterates.
"""
def __init__(self, config: NakaConfig):
self.config = config
self.llm = LLMClient(model=config.agent_model, temperature=config.agent_temperature)
self.vdm: VisualDifferencingModule | None = None
if config.use_visual_differencing:
self.vdm = VisualDifferencingModule(self.llm)
self.skill_library: SkillLibrary | None = None
if config.use_skill_library:
self.skill_library = SkillLibrary(config.skill_library_path)
self.ensemble: EnsembleReasoner | None = None
if config.use_parallel_ensemble:
self.ensemble = EnsembleReasoner()
def run_trial(self, env: Any) -> TrialResult:
"""Execute a complete trial: reset -> code gen -> execute -> multi-turn -> result."""
start_time = time.perf_counter()
obs, info = env.reset()
skill_code = ""
if self.skill_library:
skill_code = self.skill_library.get_prompt_injection()
messages = build_initial_prompt(
task_description=info["task_description"],
api_documentation=info["api_documentation"],
skill_library_code=skill_code,
)
if self.vdm and "robot0_robotview" in obs:
rgb = self._get_rgb(obs)
if rgb is not None:
scene_desc = self.vdm.describe_scene(rgb, info["task_description"])
messages[-1]["content"] += f"\n\nCurrent scene observation:\n{scene_desc}"
response = self._generate(messages)
code_blocks: list[str] = []
execution_results = []
prev_rgb = self._get_rgb(obs)
reward = 0.0
for _ in range(self.config.multiturn_limit):
code_blocks_raw = CodeParser.extract_code_blocks(response)
if not code_blocks_raw:
break
code = code_blocks_raw[0]
code_blocks.append(code)
obs, reward, terminated, truncated, exec_info = env.step(code)
execution_results.append(exec_info["execution_result"])
if terminated or reward >= 1.0:
break
if truncated:
break
if self.config.tier in SINGLE_TURN_TIERS:
break
visual_diff = None
if self.vdm and prev_rgb is not None:
curr_rgb = self._get_rgb(obs)
if curr_rgb is not None:
visual_diff = self.vdm.describe_changes(
prev_rgb,
curr_rgb,
info["task_description"],
)
decision_messages = build_multiturn_prompt(
executed_code=code,
stdout=exec_info["stdout"],
stderr=exec_info["stderr"],
visual_diff=visual_diff,
reward=reward,
task_completed=exec_info.get("task_completed", False),
)
decision_response = self._generate(decision_messages)
decision, new_code = CodeParser.parse_decision(decision_response)
if decision == "finish" or new_code is None:
break
response = new_code
prev_rgb = self._get_rgb(obs)
if reward >= 1.0 and self.skill_library:
self.skill_library.extract_and_add(code_blocks, info.get("task_description", ""))
duration = time.perf_counter() - start_time
return TrialResult(
trial_id=0,
reward=reward,
task_completed=reward >= 1.0,
turns=len(code_blocks),
code_blocks=code_blocks,
execution_results=execution_results,
duration_s=duration,
)
def _generate(self, messages: list[dict[str, str]]) -> str:
"""Generate code using LLM (single or ensemble)."""
if self.ensemble:
return self.ensemble.generate(messages)
return self.llm.query(messages)
@staticmethod
def _get_rgb(obs: dict) -> np.ndarray | None:
"""Extract RGB image from observation dict."""
try:
return obs["robot0_robotview"]["images"]["rgb"]
except (KeyError, TypeError):
return None
def _normalize_simulator(value: str | None) -> str:
if value is None:
return "mock"
normalized = value.strip().lower()
if normalized in {"real", "robosuite"}:
return "robosuite"
if normalized in {"mock", "none", "sim", ""}:
return "mock"
raise ValueError(f"Unsupported simulator '{value}'. Use mock or robosuite.")
def main() -> None:
"""Entrypoint for module execution.
Performs a smoke-load check and prints resolved configuration.
"""
parser = argparse.ArgumentParser(description="Run CaP-Agent smoke check")
parser.add_argument("--config", type=str, default="configs/debug.toml")
parser.add_argument("--sim", type=_normalize_simulator, default=None)
args = parser.parse_args()
cfg_path = Path(args.config)
config = NakaConfig.from_toml(cfg_path) if cfg_path.exists() else NakaConfig()
if args.sim is not None:
config = config.model_copy(update={"simulator": args.sim})
print(
f"CaP-Agent initialized | config={args.config} | "
f"simulator={config.simulator} | task={config.task} | tier={config.tier}"
)
if __name__ == "__main__":
main()
|