"""CaPAgent — the core code-as-policy agent. Generates Python robot control code, executes in sandbox, iterates with multi-turn feedback. Orchestrates VDM, ensemble, and skill library. """ from __future__ import annotations import argparse import logging import time from pathlib import Path from typing import Any import numpy as np from anima_naka.agent.ensemble import EnsembleReasoner from anima_naka.agent.llm_client import LLMClient from anima_naka.agent.prompts import build_initial_prompt, build_multiturn_prompt from anima_naka.agent.vdm import VisualDifferencingModule from anima_naka.config import NakaConfig from anima_naka.constants import SINGLE_TURN_TIERS from anima_naka.executor.code_parser import CodeParser from anima_naka.skills.library import SkillLibrary from anima_naka.types import TrialResult logger = logging.getLogger("anima_naka.agent") class CaPAgent: """Code-as-Policy agent — NAKA's heart. Takes task description + API docs -> generates Python code -> executes -> iterates. """ def __init__(self, config: NakaConfig): self.config = config self.llm = LLMClient(model=config.agent_model, temperature=config.agent_temperature) self.vdm: VisualDifferencingModule | None = None if config.use_visual_differencing: self.vdm = VisualDifferencingModule(self.llm) self.skill_library: SkillLibrary | None = None if config.use_skill_library: self.skill_library = SkillLibrary(config.skill_library_path) self.ensemble: EnsembleReasoner | None = None if config.use_parallel_ensemble: self.ensemble = EnsembleReasoner() def run_trial(self, env: Any) -> TrialResult: """Execute a complete trial: reset -> code gen -> execute -> multi-turn -> result.""" start_time = time.perf_counter() obs, info = env.reset() skill_code = "" if self.skill_library: skill_code = self.skill_library.get_prompt_injection() messages = build_initial_prompt( task_description=info["task_description"], api_documentation=info["api_documentation"], skill_library_code=skill_code, ) if self.vdm and "robot0_robotview" in obs: rgb = self._get_rgb(obs) if rgb is not None: scene_desc = self.vdm.describe_scene(rgb, info["task_description"]) messages[-1]["content"] += f"\n\nCurrent scene observation:\n{scene_desc}" response = self._generate(messages) code_blocks: list[str] = [] execution_results = [] prev_rgb = self._get_rgb(obs) reward = 0.0 for _ in range(self.config.multiturn_limit): code_blocks_raw = CodeParser.extract_code_blocks(response) if not code_blocks_raw: break code = code_blocks_raw[0] code_blocks.append(code) obs, reward, terminated, truncated, exec_info = env.step(code) execution_results.append(exec_info["execution_result"]) if terminated or reward >= 1.0: break if truncated: break if self.config.tier in SINGLE_TURN_TIERS: break visual_diff = None if self.vdm and prev_rgb is not None: curr_rgb = self._get_rgb(obs) if curr_rgb is not None: visual_diff = self.vdm.describe_changes( prev_rgb, curr_rgb, info["task_description"], ) decision_messages = build_multiturn_prompt( executed_code=code, stdout=exec_info["stdout"], stderr=exec_info["stderr"], visual_diff=visual_diff, reward=reward, task_completed=exec_info.get("task_completed", False), ) decision_response = self._generate(decision_messages) decision, new_code = CodeParser.parse_decision(decision_response) if decision == "finish" or new_code is None: break response = new_code prev_rgb = self._get_rgb(obs) if reward >= 1.0 and self.skill_library: self.skill_library.extract_and_add(code_blocks, info.get("task_description", "")) duration = time.perf_counter() - start_time return TrialResult( trial_id=0, reward=reward, task_completed=reward >= 1.0, turns=len(code_blocks), code_blocks=code_blocks, execution_results=execution_results, duration_s=duration, ) def _generate(self, messages: list[dict[str, str]]) -> str: """Generate code using LLM (single or ensemble).""" if self.ensemble: return self.ensemble.generate(messages) return self.llm.query(messages) @staticmethod def _get_rgb(obs: dict) -> np.ndarray | None: """Extract RGB image from observation dict.""" try: return obs["robot0_robotview"]["images"]["rgb"] except (KeyError, TypeError): return None def _normalize_simulator(value: str | None) -> str: if value is None: return "mock" normalized = value.strip().lower() if normalized in {"real", "robosuite"}: return "robosuite" if normalized in {"mock", "none", "sim", ""}: return "mock" raise ValueError(f"Unsupported simulator '{value}'. Use mock or robosuite.") def main() -> None: """Entrypoint for module execution. Performs a smoke-load check and prints resolved configuration. """ parser = argparse.ArgumentParser(description="Run CaP-Agent smoke check") parser.add_argument("--config", type=str, default="configs/debug.toml") parser.add_argument("--sim", type=_normalize_simulator, default=None) args = parser.parse_args() cfg_path = Path(args.config) config = NakaConfig.from_toml(cfg_path) if cfg_path.exists() else NakaConfig() if args.sim is not None: config = config.model_copy(update={"simulator": args.sim}) print( f"CaP-Agent initialized | config={args.config} | " f"simulator={config.simulator} | task={config.task} | tier={config.tier}" ) if __name__ == "__main__": main()