Spaces:
Running
Running
| """ | |
| Complete Inference Loop — Sweeps All Difficulty Tiers | |
| ===================================================== | |
| Runs the LLM agent through the full curriculum (warmup -> expert), | |
| printing per-step rewards and per-tier summaries. The environment's | |
| curriculum handles task selection and tier promotion automatically. | |
| Prerequisites: | |
| - Environment accessible: Docker image (LOCAL_IMAGE_NAME) or running server (SERVER_URL) | |
| - LLM API accessible: API_BASE_URL + HF_TOKEN/API_KEY | |
| Env vars: | |
| SERVER_URL Server endpoint (default: http://localhost:8000) | |
| LOCAL_IMAGE_NAME Docker image name (uses Docker if set) | |
| API_BASE_URL LLM API endpoint (default: https://router.huggingface.co/v1) | |
| MODEL_NAME Model identifier (default: Qwen/Qwen2.5-72B-Instruct) | |
| HF_TOKEN HuggingFace API token | |
| API_KEY Fallback API key | |
| MAX_STEPS Max steps per episode (default: 15) | |
| MAX_EPISODES Max total episodes (default: 200) | |
| TEMPERATURE Generation temperature (default: 0.7) | |
| MAX_TOKENS Max generation tokens (default: 512) | |
| CONVERGENCE_WINDOW Episodes for plateau check (default: 10) | |
| CONVERGENCE_EPSILON Min improvement threshold (default: 0.01) | |
| CONVERGENCE_PATIENCE Consecutive plateau checks (default: 3) | |
| Stopping: Runs until reward convergence is detected at expert tier, | |
| or MAX_EPISODES is reached. If the task is not complete the curriculum | |
| re-prioritises it (weakness gets higher priority), so the agent will | |
| retry weak tasks automatically. | |
| Output format: | |
| ============================================================ | |
| Episode 1 — Task 0: List all S3 buckets (tier: warmup) | |
| [Step 1] cmd="aws s3 ls" reward=1.00 success=True achieved=True | |
| Result: PASSED (steps=1, max_reward=1.00) | |
| ... | |
| FINAL RESULTS | |
| === TIER: warmup — 6/6 passed (100.0%) === | |
| === OVERALL: 18/21 (85.7%) === | |
| """ | |
| import asyncio | |
| import os | |
| import textwrap | |
| from collections import defaultdict | |
| from typing import List, Optional | |
| from dotenv import load_dotenv | |
| from openai import OpenAI | |
| from client import AwsRlEnv | |
| from models import AwsRlAction | |
| load_dotenv() | |
| # --------------------------------------------------------------------------- | |
| # Config | |
| # --------------------------------------------------------------------------- | |
| SERVER_URL = os.getenv("SERVER_URL", "http://localhost:8000") | |
| LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME") | |
| API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1") | |
| MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct") | |
| HF_TOKEN = os.getenv("HF_TOKEN") | |
| API_KEY = os.getenv("API_KEY") | |
| MAX_STEPS = int(os.getenv("MAX_STEPS", "15")) | |
| MAX_EPISODES = int(os.getenv("MAX_EPISODES", "200")) | |
| TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7")) | |
| MAX_TOKENS = int(os.getenv("MAX_TOKENS", "512")) | |
| # Convergence at expert level | |
| CONVERGENCE_WINDOW = int(os.getenv("CONVERGENCE_WINDOW", "10")) | |
| CONVERGENCE_EPSILON = float(os.getenv("CONVERGENCE_EPSILON", "0.01")) | |
| CONVERGENCE_PATIENCE = int(os.getenv("CONVERGENCE_PATIENCE", "2")) | |
| ALL_TIERS = ["warmup", "beginner", "intermediate", "advanced", "expert"] | |
| SYSTEM_PROMPT = textwrap.dedent( | |
| """ | |
| You are an AWS cloud engineer interacting with a real AWS environment via CLI. | |
| Each turn you must send exactly ONE valid AWS CLI command (starting with 'aws'). | |
| You will be given a task to accomplish. Read the task description carefully. | |
| Use the command output and error messages to guide your next action. | |
| Rules: | |
| - Only send AWS CLI commands (e.g. 'aws s3 ls', 'aws dynamodb create-table ...') | |
| - One command per turn — no pipes, no shell syntax, no chaining | |
| - Reply with ONLY the command, nothing else — no explanations, no quotes | |
| - If unsure, use 'aws help' to get unstuck, but try to be specific to the service if possible (e.g. 'aws s3 help') | |
| - When ever you need a hint, use 'aws help --task-hint' to get a task-specific hint (you can use this multiple times for more hints, but hints reduce your reward) | |
| """ | |
| ).strip() | |
| # --------------------------------------------------------------------------- | |
| # Prompt building & model calling (reused from inference.py patterns) | |
| # --------------------------------------------------------------------------- | |
| def build_user_prompt( | |
| task_description: str, | |
| step: int, | |
| last_output: str, | |
| last_error: str, | |
| last_reward: float, | |
| history: List[str], | |
| ) -> str: | |
| history_block = "\n".join(history) if history else "None" | |
| return textwrap.dedent( | |
| f""" | |
| TASK: {task_description} | |
| Step: {step} | |
| Last command output: {last_output!r} | |
| Last error: {last_error!r} | |
| Last reward: {last_reward:.2f} | |
| Previous steps: | |
| {history_block} | |
| Send your next AWS CLI command. | |
| """ | |
| ).strip() | |
| def get_model_command( | |
| client: OpenAI, | |
| task_description: str, | |
| step: int, | |
| last_output: str, | |
| last_error: str, | |
| last_reward: float, | |
| history: List[str], | |
| ) -> str: | |
| user_prompt = build_user_prompt( | |
| task_description, step, last_output, last_error, last_reward, history | |
| ) | |
| try: | |
| completion = client.chat.completions.create( | |
| model=MODEL_NAME, | |
| messages=[ | |
| {"role": "system", "content": SYSTEM_PROMPT}, | |
| {"role": "user", "content": user_prompt}, | |
| ], | |
| temperature=TEMPERATURE, | |
| max_tokens=MAX_TOKENS, | |
| stream=False, | |
| ) | |
| text = (completion.choices[0].message.content or "").strip() | |
| # Strip markdown code fences if the model wraps the command | |
| if text.startswith("```"): | |
| lines = text.split("\n") | |
| text = "\n".join( | |
| line for line in lines if not line.startswith("```") | |
| ).strip() | |
| return text if text.startswith("aws ") else "aws help" | |
| except Exception as exc: | |
| print(f" [WARN] Model request failed: {exc}", flush=True) | |
| return "aws help" | |
| # --------------------------------------------------------------------------- | |
| # Episode runner | |
| # --------------------------------------------------------------------------- | |
| async def run_episode(env: AwsRlEnv, llm_client: OpenAI) -> Optional[dict]: | |
| """Run a single episode: reset -> step loop -> return results.""" | |
| result = await env.reset() | |
| obs = result.observation | |
| task = obs.task | |
| episode_num = obs.episode_id | |
| if task is None: | |
| print(f"Episode {episode_num} : No task assigned, skipping") | |
| return None | |
| tier = str(task.difficulty) | |
| task_desc = task.description | |
| task_id = int(task.task_id) | |
| print(f"\n{'=' * 60}") | |
| print(f"Episode {episode_num} -- Task {task_id}: {task_desc} (tier: {tier})") | |
| print(f"\n{'=' * 60}") | |
| history: List[str] = [] | |
| last_output = obs.command_output | |
| last_error = "" | |
| last_reward = 0.0 | |
| rewards: List[float] = [] | |
| achieved = False | |
| for step in range(1, MAX_STEPS + 1): | |
| if result.done: | |
| break | |
| command = get_model_command( | |
| llm_client, | |
| task_desc, | |
| step, | |
| last_output, | |
| last_error, | |
| last_reward, | |
| history, | |
| ) | |
| result = await env.step(AwsRlAction(command=command)) | |
| obs = result.observation | |
| reward = result.reward or 0.0 | |
| success = obs.command_success | |
| task_achieved = obs.task_achieved | |
| rewards.append(reward) | |
| print() | |
| print(f"\n{'-' * 60}") | |
| print( | |
| f' [Step {step}] cmd="{command}" command_output={obs.command_output!r} ' | |
| f"reward={reward:.2f} command_success={success} achieved={task_achieved}" | |
| ) | |
| print(f"\n{'-' * 60}") | |
| print() | |
| status = "OK" if success else "FAIL" | |
| history.append( | |
| f"Step {step} [{status}]: {command} [command_output]={obs.command_output!r} [error]={obs.error!r} -> reward={reward:.2f}" | |
| ) | |
| last_output = obs.command_output | |
| last_error = obs.error | |
| last_reward = reward | |
| if task_achieved: | |
| achieved = True | |
| break | |
| max_reward = max(rewards) if rewards else 0.0 | |
| result_str = "PASSED" if achieved else "FAILED" | |
| print(f" Result: {result_str} (steps={len(rewards)}, max_reward={max_reward:.2f})") | |
| return { | |
| "task_id": task_id, | |
| "tier": tier, | |
| "achieved": achieved, | |
| "steps": len(rewards), | |
| "rewards": rewards, | |
| "max_reward": max_reward, | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Convergence detector | |
| # --------------------------------------------------------------------------- | |
| class ConvergenceDetector: | |
| """Tracks reward history at expert tier and detects plateau.""" | |
| def __init__(self) -> None: | |
| self.expert_rewards: List[float] = [] | |
| self._plateau_count: int = 0 | |
| def record(self, tier: str, max_reward: float) -> None: | |
| if tier == "expert": | |
| self.expert_rewards.append(max_reward) | |
| def is_converged(self) -> bool: | |
| if len(self.expert_rewards) < CONVERGENCE_WINDOW: | |
| return False | |
| half = CONVERGENCE_WINDOW // 2 | |
| recent = self.expert_rewards[-half:] | |
| older = self.expert_rewards[-(half * 2) : -half] | |
| if not older: | |
| return False | |
| recent_avg = sum(recent) / len(recent) | |
| older_avg = sum(older) / len(older) | |
| improvement = abs(recent_avg - older_avg) | |
| if improvement < CONVERGENCE_EPSILON: | |
| self._plateau_count += 1 | |
| else: | |
| self._plateau_count = 0 | |
| return self._plateau_count >= CONVERGENCE_PATIENCE | |
| # --------------------------------------------------------------------------- | |
| # Summary printer | |
| # --------------------------------------------------------------------------- | |
| def print_summary(tier_results: dict[str, list]) -> None: | |
| total_passed = 0 | |
| total_tasks = 0 | |
| print(f"\n{'=' * 60}") | |
| print("FINAL RESULTS") | |
| print(f"{'=' * 60}") | |
| for tier in ALL_TIERS: | |
| results = tier_results.get(tier, []) | |
| if not results: | |
| print(f"\n=== TIER: {tier} -- not reached ===") | |
| continue | |
| passed = sum(1 for r in results if r["achieved"]) | |
| total = len(results) | |
| pct = (passed / total * 100) if total > 0 else 0 | |
| print(f"\n=== TIER: {tier} -- {passed}/{total} passed ({pct:.1f}%) ===") | |
| for r in results: | |
| status = "PASS" if r["achieved"] else "FAIL" | |
| print( | |
| f" Task {r['task_id']}: {status} " | |
| f"(steps={r['steps']}, reward={r['max_reward']:.2f})" | |
| ) | |
| total_passed += passed | |
| total_tasks += total | |
| overall_pct = (total_passed / total_tasks * 100) if total_tasks > 0 else 0 | |
| print(f"\n=== OVERALL: {total_passed}/{total_tasks} ({overall_pct:.1f}%) ===") | |
| # --------------------------------------------------------------------------- | |
| # Main | |
| # --------------------------------------------------------------------------- | |
| async def main() -> None: | |
| key = HF_TOKEN if HF_TOKEN else API_KEY | |
| if not key: | |
| print("ERROR: Set HF_TOKEN or API_KEY for the LLM API.") | |
| return | |
| llm_client = OpenAI(base_url=API_BASE_URL, api_key=key) | |
| # Connect to environment | |
| if LOCAL_IMAGE_NAME: | |
| print(f"Starting environment from Docker image: {LOCAL_IMAGE_NAME}") | |
| env = await AwsRlEnv.from_docker_image(LOCAL_IMAGE_NAME) | |
| else: | |
| print(f"Connecting to server: {SERVER_URL}") | |
| env = AwsRlEnv(base_url=SERVER_URL) | |
| tier_results: dict[str, list] = defaultdict(list) | |
| convergence = ConvergenceDetector() | |
| try: | |
| for episode in range(1, MAX_EPISODES + 1): | |
| ep_result = await run_episode(env, llm_client) | |
| if ep_result is None: | |
| continue | |
| tier = ep_result["tier"] | |
| tier_results[tier].append(ep_result) | |
| # Track convergence | |
| convergence.record(tier, ep_result["max_reward"]) | |
| if convergence.is_converged(): | |
| print( | |
| f"\nConvergence detected at expert level " | |
| f"after {episode} episodes " | |
| f"(plateau for {CONVERGENCE_PATIENCE} consecutive checks). " | |
| f"Stopping." | |
| ) | |
| break | |
| print_summary(tier_results) | |
| finally: | |
| try: | |
| await env.close() | |
| except Exception as e: | |
| print(f"[WARN] env.close() error: {e}") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |