| """CaPAgent — the core code-as-policy agent. |
| |
| Generates Python robot control code, executes in sandbox, iterates |
| with multi-turn feedback. Orchestrates VDM, ensemble, and skill library. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import argparse |
| import logging |
| import time |
| from pathlib import Path |
| from typing import Any |
|
|
| import numpy as np |
|
|
| from anima_naka.agent.ensemble import EnsembleReasoner |
| from anima_naka.agent.llm_client import LLMClient |
| from anima_naka.agent.prompts import build_initial_prompt, build_multiturn_prompt |
| from anima_naka.agent.vdm import VisualDifferencingModule |
| from anima_naka.config import NakaConfig |
| from anima_naka.constants import SINGLE_TURN_TIERS |
| from anima_naka.executor.code_parser import CodeParser |
| from anima_naka.skills.library import SkillLibrary |
| from anima_naka.types import TrialResult |
|
|
| logger = logging.getLogger("anima_naka.agent") |
|
|
|
|
| class CaPAgent: |
| """Code-as-Policy agent — NAKA's heart. |
| |
| Takes task description + API docs -> generates Python code -> executes -> iterates. |
| """ |
|
|
| def __init__(self, config: NakaConfig): |
| self.config = config |
| self.llm = LLMClient(model=config.agent_model, temperature=config.agent_temperature) |
|
|
| self.vdm: VisualDifferencingModule | None = None |
| if config.use_visual_differencing: |
| self.vdm = VisualDifferencingModule(self.llm) |
|
|
| self.skill_library: SkillLibrary | None = None |
| if config.use_skill_library: |
| self.skill_library = SkillLibrary(config.skill_library_path) |
|
|
| self.ensemble: EnsembleReasoner | None = None |
| if config.use_parallel_ensemble: |
| self.ensemble = EnsembleReasoner() |
|
|
| def run_trial(self, env: Any) -> TrialResult: |
| """Execute a complete trial: reset -> code gen -> execute -> multi-turn -> result.""" |
| start_time = time.perf_counter() |
| obs, info = env.reset() |
|
|
| skill_code = "" |
| if self.skill_library: |
| skill_code = self.skill_library.get_prompt_injection() |
|
|
| messages = build_initial_prompt( |
| task_description=info["task_description"], |
| api_documentation=info["api_documentation"], |
| skill_library_code=skill_code, |
| ) |
|
|
| if self.vdm and "robot0_robotview" in obs: |
| rgb = self._get_rgb(obs) |
| if rgb is not None: |
| scene_desc = self.vdm.describe_scene(rgb, info["task_description"]) |
| messages[-1]["content"] += f"\n\nCurrent scene observation:\n{scene_desc}" |
|
|
| response = self._generate(messages) |
|
|
| code_blocks: list[str] = [] |
| execution_results = [] |
| prev_rgb = self._get_rgb(obs) |
| reward = 0.0 |
|
|
| for _ in range(self.config.multiturn_limit): |
| code_blocks_raw = CodeParser.extract_code_blocks(response) |
| if not code_blocks_raw: |
| break |
| code = code_blocks_raw[0] |
| code_blocks.append(code) |
|
|
| obs, reward, terminated, truncated, exec_info = env.step(code) |
| execution_results.append(exec_info["execution_result"]) |
|
|
| if terminated or reward >= 1.0: |
| break |
|
|
| if truncated: |
| break |
|
|
| if self.config.tier in SINGLE_TURN_TIERS: |
| break |
|
|
| visual_diff = None |
| if self.vdm and prev_rgb is not None: |
| curr_rgb = self._get_rgb(obs) |
| if curr_rgb is not None: |
| visual_diff = self.vdm.describe_changes( |
| prev_rgb, |
| curr_rgb, |
| info["task_description"], |
| ) |
|
|
| decision_messages = build_multiturn_prompt( |
| executed_code=code, |
| stdout=exec_info["stdout"], |
| stderr=exec_info["stderr"], |
| visual_diff=visual_diff, |
| reward=reward, |
| task_completed=exec_info.get("task_completed", False), |
| ) |
| decision_response = self._generate(decision_messages) |
| decision, new_code = CodeParser.parse_decision(decision_response) |
| if decision == "finish" or new_code is None: |
| break |
|
|
| response = new_code |
| prev_rgb = self._get_rgb(obs) |
|
|
| if reward >= 1.0 and self.skill_library: |
| self.skill_library.extract_and_add(code_blocks, info.get("task_description", "")) |
|
|
| duration = time.perf_counter() - start_time |
| return TrialResult( |
| trial_id=0, |
| reward=reward, |
| task_completed=reward >= 1.0, |
| turns=len(code_blocks), |
| code_blocks=code_blocks, |
| execution_results=execution_results, |
| duration_s=duration, |
| ) |
|
|
| def _generate(self, messages: list[dict[str, str]]) -> str: |
| """Generate code using LLM (single or ensemble).""" |
| if self.ensemble: |
| return self.ensemble.generate(messages) |
| return self.llm.query(messages) |
|
|
| @staticmethod |
| def _get_rgb(obs: dict) -> np.ndarray | None: |
| """Extract RGB image from observation dict.""" |
| try: |
| return obs["robot0_robotview"]["images"]["rgb"] |
| except (KeyError, TypeError): |
| return None |
|
|
|
|
| def _normalize_simulator(value: str | None) -> str: |
| if value is None: |
| return "mock" |
| normalized = value.strip().lower() |
| if normalized in {"real", "robosuite"}: |
| return "robosuite" |
| if normalized in {"mock", "none", "sim", ""}: |
| return "mock" |
| raise ValueError(f"Unsupported simulator '{value}'. Use mock or robosuite.") |
|
|
|
|
| def main() -> None: |
| """Entrypoint for module execution. |
| |
| Performs a smoke-load check and prints resolved configuration. |
| """ |
| parser = argparse.ArgumentParser(description="Run CaP-Agent smoke check") |
| parser.add_argument("--config", type=str, default="configs/debug.toml") |
| parser.add_argument("--sim", type=_normalize_simulator, default=None) |
| args = parser.parse_args() |
|
|
| cfg_path = Path(args.config) |
| config = NakaConfig.from_toml(cfg_path) if cfg_path.exists() else NakaConfig() |
| if args.sim is not None: |
| config = config.model_copy(update={"simulator": args.sim}) |
|
|
| print( |
| f"CaP-Agent initialized | config={args.config} | " |
| f"simulator={config.simulator} | task={config.task} | tier={config.tier}" |
| ) |
|
|
|
|
| if __name__ == "__main__": |
| main() |
|
|