# aws_rl_env / inference-complete.py
# (uploaded via huggingface_hub, revision 0f8f2c1)
"""
Complete Inference Loop — Sweeps All Difficulty Tiers
=====================================================
Runs the LLM agent through the full curriculum (warmup -> expert),
printing per-step rewards and per-tier summaries. The environment's
curriculum handles task selection and tier promotion automatically.
Prerequisites:
- Environment accessible: Docker image (LOCAL_IMAGE_NAME) or running server (SERVER_URL)
- LLM API accessible: API_BASE_URL + HF_TOKEN/API_KEY
Env vars:
SERVER_URL Server endpoint (default: http://localhost:8000)
LOCAL_IMAGE_NAME Docker image name (uses Docker if set)
API_BASE_URL LLM API endpoint (default: https://router.huggingface.co/v1)
MODEL_NAME Model identifier (default: Qwen/Qwen2.5-72B-Instruct)
HF_TOKEN HuggingFace API token
API_KEY Fallback API key
MAX_STEPS Max steps per episode (default: 15)
MAX_EPISODES Max total episodes (default: 200)
TEMPERATURE Generation temperature (default: 0.7)
MAX_TOKENS Max generation tokens (default: 512)
CONVERGENCE_WINDOW Episodes for plateau check (default: 10)
CONVERGENCE_EPSILON Min improvement threshold (default: 0.01)
CONVERGENCE_PATIENCE Consecutive plateau checks (default: 2)
Stopping: Runs until reward convergence is detected at expert tier,
or MAX_EPISODES is reached. If the task is not complete the curriculum
re-prioritises it (weakness gets higher priority), so the agent will
retry weak tasks automatically.
Output format:
============================================================
Episode 1 — Task 0: List all S3 buckets (tier: warmup)
[Step 1] cmd="aws s3 ls" reward=1.00 success=True achieved=True
Result: PASSED (steps=1, max_reward=1.00)
...
FINAL RESULTS
=== TIER: warmup — 6/6 passed (100.0%) ===
=== OVERALL: 18/21 (85.7%) ===
"""
import asyncio
import os
import textwrap
from collections import defaultdict
from typing import List, Optional
from dotenv import load_dotenv
from openai import OpenAI
from client import AwsRlEnv
from models import AwsRlAction
load_dotenv()
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Every knob is an environment variable so the sweep can be tuned without
# code changes; defaults are documented in the module docstring.
SERVER_URL = os.getenv("SERVER_URL", "http://localhost:8000")  # environment server endpoint
LOCAL_IMAGE_NAME = os.getenv("LOCAL_IMAGE_NAME")  # if set, a Docker image is used instead of SERVER_URL
API_BASE_URL = os.getenv("API_BASE_URL", "https://router.huggingface.co/v1")  # OpenAI-compatible LLM endpoint
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen2.5-72B-Instruct")  # LLM identifier sent to the API
HF_TOKEN = os.getenv("HF_TOKEN")  # preferred credential
API_KEY = os.getenv("API_KEY")  # fallback credential when HF_TOKEN is unset
MAX_STEPS = int(os.getenv("MAX_STEPS", "15"))  # per-episode step budget
MAX_EPISODES = int(os.getenv("MAX_EPISODES", "200"))  # hard cap on total episodes
TEMPERATURE = float(os.getenv("TEMPERATURE", "0.7"))  # LLM sampling temperature
MAX_TOKENS = int(os.getenv("MAX_TOKENS", "512"))  # LLM generation token cap
# Convergence at expert level
CONVERGENCE_WINDOW = int(os.getenv("CONVERGENCE_WINDOW", "10"))  # episodes compared per plateau check
CONVERGENCE_EPSILON = float(os.getenv("CONVERGENCE_EPSILON", "0.01"))  # min avg-reward improvement
CONVERGENCE_PATIENCE = int(os.getenv("CONVERGENCE_PATIENCE", "2"))  # consecutive plateau checks required before stopping
ALL_TIERS = ["warmup", "beginner", "intermediate", "advanced", "expert"]  # curriculum order, easiest -> hardest
# System message sent on every model call. It constrains the model to emit
# exactly ONE raw AWS CLI command per turn so replies can be passed straight
# to the environment without parsing. Fix: "When ever" -> "Whenever" typo in
# the hint rule.
SYSTEM_PROMPT = textwrap.dedent(
    """
    You are an AWS cloud engineer interacting with a real AWS environment via CLI.
    Each turn you must send exactly ONE valid AWS CLI command (starting with 'aws').
    You will be given a task to accomplish. Read the task description carefully.
    Use the command output and error messages to guide your next action.
    Rules:
    - Only send AWS CLI commands (e.g. 'aws s3 ls', 'aws dynamodb create-table ...')
    - One command per turn — no pipes, no shell syntax, no chaining
    - Reply with ONLY the command, nothing else — no explanations, no quotes
    - If unsure, use 'aws help' to get unstuck, but try to be specific to the service if possible (e.g. 'aws s3 help')
    - Whenever you need a hint, use 'aws help --task-hint' to get a task-specific hint (you can use this multiple times for more hints, but hints reduce your reward)
    """
).strip()
# ---------------------------------------------------------------------------
# Prompt building & model calling (reused from inference.py patterns)
# ---------------------------------------------------------------------------
def build_user_prompt(
    task_description: str,
    step: int,
    last_output: str,
    last_error: str,
    last_reward: float,
    history: List[str],
) -> str:
    """Assemble the per-turn user message for the LLM.

    Joins flush-left lines directly instead of calling ``textwrap.dedent`` on
    an interpolated f-string: dedent runs AFTER interpolation, so a multi-line
    ``history`` block used to break the common indentation prefix and leave
    the whole prompt inconsistently indented. Building from explicit lines
    keeps the prompt flush-left regardless of the history contents; output is
    unchanged for the empty-history case.

    Args:
        task_description: Natural-language task the agent must accomplish.
        step: 1-based step number within the episode.
        last_output: Output of the previous command (repr'd into the prompt).
        last_error: Error text of the previous command (repr'd).
        last_reward: Reward obtained on the previous step.
        history: Prior step summaries; "None" is shown when empty.

    Returns:
        The complete user prompt as a single newline-joined string.
    """
    history_block = "\n".join(history) if history else "None"
    lines = [
        f"TASK: {task_description}",
        f"Step: {step}",
        f"Last command output: {last_output!r}",
        f"Last error: {last_error!r}",
        f"Last reward: {last_reward:.2f}",
        "Previous steps:",
        history_block,
        "Send your next AWS CLI command.",
    ]
    return "\n".join(lines)
def get_model_command(
    client: OpenAI,
    task_description: str,
    step: int,
    last_output: str,
    last_error: str,
    last_reward: float,
    history: List[str],
) -> str:
    """Query the LLM for the next AWS CLI command; degrade to 'aws help'.

    Any API failure, or a reply that is not an AWS CLI command, falls back to
    'aws help' so the episode loop never crashes on a bad model response.
    """
    prompt = build_user_prompt(
        task_description, step, last_output, last_error, last_reward, history
    )
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                {"role": "system", "content": SYSTEM_PROMPT},
                {"role": "user", "content": prompt},
            ],
            temperature=TEMPERATURE,
            max_tokens=MAX_TOKENS,
            stream=False,
        )
        reply = (response.choices[0].message.content or "").strip()
        # Models sometimes wrap the command in a markdown code fence; drop
        # every fence line and keep whatever remains.
        if reply.startswith("```"):
            kept = [ln for ln in reply.split("\n") if not ln.startswith("```")]
            reply = "\n".join(kept).strip()
        if reply.startswith("aws "):
            return reply
        return "aws help"
    except Exception as exc:
        print(f" [WARN] Model request failed: {exc}", flush=True)
        return "aws help"
# ---------------------------------------------------------------------------
# Episode runner
# ---------------------------------------------------------------------------
async def run_episode(env: AwsRlEnv, llm_client: OpenAI) -> Optional[dict]:
    """Run a single episode: reset -> step loop -> return results.

    Resets the environment (the curriculum picks the task), then alternates
    model calls and environment steps until the task is achieved, the
    environment reports done, or MAX_STEPS is exhausted.

    Returns:
        A summary dict (task_id, tier, achieved, steps, rewards, max_reward),
        or None when the reset produced no task.
    """
    result = await env.reset()
    obs = result.observation
    task = obs.task
    episode_num = obs.episode_id
    if task is None:
        # Nothing to do this episode; caller skips it.
        print(f"Episode {episode_num} : No task assigned, skipping")
        return None
    tier = str(task.difficulty)
    task_desc = task.description
    task_id = int(task.task_id)
    print(f"\n{'=' * 60}")
    print(f"Episode {episode_num} -- Task {task_id}: {task_desc} (tier: {tier})")
    # NOTE(review): leading "\n" prints a blank line before the second rule —
    # appears to be intentional spacing; confirm against desired output format.
    print(f"\n{'=' * 60}")
    history: List[str] = []  # human-readable step summaries fed back into the prompt
    last_output = obs.command_output
    last_error = ""
    last_reward = 0.0
    rewards: List[float] = []  # per-step rewards; len(rewards) == steps taken
    achieved = False
    for step in range(1, MAX_STEPS + 1):
        # `result` is the reset result on the first iteration, then the last
        # step result — stop as soon as the environment says the episode ended.
        if result.done:
            break
        command = get_model_command(
            llm_client,
            task_desc,
            step,
            last_output,
            last_error,
            last_reward,
            history,
        )
        result = await env.step(AwsRlAction(command=command))
        obs = result.observation
        reward = result.reward or 0.0  # treat a missing reward as 0
        success = obs.command_success
        task_achieved = obs.task_achieved
        rewards.append(reward)
        print()
        print(f"\n{'-' * 60}")
        print(
            f' [Step {step}] cmd="{command}" command_output={obs.command_output!r} '
            f"reward={reward:.2f} command_success={success} achieved={task_achieved}"
        )
        print(f"\n{'-' * 60}")
        print()
        status = "OK" if success else "FAIL"
        # Record the full step (command, output, error, reward) for the next prompt.
        history.append(
            f"Step {step} [{status}]: {command} [command_output]={obs.command_output!r} [error]={obs.error!r} -> reward={reward:.2f}"
        )
        last_output = obs.command_output
        last_error = obs.error
        last_reward = reward
        if task_achieved:
            achieved = True
            break
    max_reward = max(rewards) if rewards else 0.0
    result_str = "PASSED" if achieved else "FAILED"
    print(f" Result: {result_str} (steps={len(rewards)}, max_reward={max_reward:.2f})")
    return {
        "task_id": task_id,
        "tier": tier,
        "achieved": achieved,
        "steps": len(rewards),
        "rewards": rewards,
        "max_reward": max_reward,
    }
# ---------------------------------------------------------------------------
# Convergence detector
# ---------------------------------------------------------------------------
class ConvergenceDetector:
    """Tracks reward history at the expert tier and detects a plateau.

    A plateau check compares the mean of the most recent ``window // 2``
    expert-tier max rewards against the mean of the preceding half-window.
    When the absolute improvement stays below ``epsilon`` for ``patience``
    consecutive calls to :meth:`is_converged`, convergence is declared.

    The thresholds were previously hard-wired to the module-level
    CONVERGENCE_* constants; they are now constructor parameters that default
    to those constants, so existing no-argument callers are unchanged while
    the detector becomes reusable and testable in isolation.
    """

    def __init__(
        self,
        window: Optional[int] = None,
        epsilon: Optional[float] = None,
        patience: Optional[int] = None,
    ) -> None:
        # Fall back to module-level config at call time, not at import time,
        # so explicit arguments never touch the globals.
        self.window = CONVERGENCE_WINDOW if window is None else window
        self.epsilon = CONVERGENCE_EPSILON if epsilon is None else epsilon
        self.patience = CONVERGENCE_PATIENCE if patience is None else patience
        self.expert_rewards: List[float] = []  # max reward per expert episode
        self._plateau_count: int = 0  # consecutive below-epsilon checks

    def record(self, tier: str, max_reward: float) -> None:
        """Record an episode's max reward; non-expert tiers are ignored."""
        if tier == "expert":
            self.expert_rewards.append(max_reward)

    def is_converged(self) -> bool:
        """Return True once improvement stays below epsilon for `patience` checks.

        Note: the plateau counter advances once per call, so this is expected
        to be called once per recorded episode (as main() does).
        """
        if len(self.expert_rewards) < self.window:
            return False
        half = self.window // 2
        recent = self.expert_rewards[-half:]
        older = self.expert_rewards[-(half * 2) : -half]
        if not older:
            # window < 2 leaves nothing to compare against.
            return False
        recent_avg = sum(recent) / len(recent)
        older_avg = sum(older) / len(older)
        improvement = abs(recent_avg - older_avg)
        if improvement < self.epsilon:
            self._plateau_count += 1
        else:
            self._plateau_count = 0
        return self._plateau_count >= self.patience
# ---------------------------------------------------------------------------
# Summary printer
# ---------------------------------------------------------------------------
def print_summary(tier_results: dict[str, list], tiers: Optional[List[str]] = None) -> None:
    """Print per-tier and overall pass statistics.

    Previously hard-wired to the module-level ALL_TIERS list; the tier order
    is now a parameter defaulting to ALL_TIERS, so existing callers are
    unchanged while the function can be reused for partial sweeps and tested
    in isolation.

    Args:
        tier_results: Mapping of tier name to a list of episode-result dicts
            (each with "task_id", "achieved", "steps", "max_reward").
        tiers: Tier names in display order; defaults to ALL_TIERS.
    """
    if tiers is None:
        tiers = ALL_TIERS
    total_passed = 0
    total_tasks = 0
    print(f"\n{'=' * 60}")
    print("FINAL RESULTS")
    print(f"{'=' * 60}")
    for tier in tiers:
        results = tier_results.get(tier, [])
        if not results:
            # The curriculum never promoted the agent this far.
            print(f"\n=== TIER: {tier} -- not reached ===")
            continue
        passed = sum(1 for r in results if r["achieved"])
        total = len(results)
        pct = (passed / total * 100) if total > 0 else 0
        print(f"\n=== TIER: {tier} -- {passed}/{total} passed ({pct:.1f}%) ===")
        for r in results:
            status = "PASS" if r["achieved"] else "FAIL"
            print(
                f" Task {r['task_id']}: {status} "
                f"(steps={r['steps']}, reward={r['max_reward']:.2f})"
            )
        total_passed += passed
        total_tasks += total
    overall_pct = (total_passed / total_tasks * 100) if total_tasks > 0 else 0
    print(f"\n=== OVERALL: {total_passed}/{total_tasks} ({overall_pct:.1f}%) ===")
# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
async def main() -> None:
    """Entry point: connect to the env and LLM, then sweep episodes.

    Runs up to MAX_EPISODES episodes, recording per-tier results, and stops
    early once the ConvergenceDetector reports a reward plateau at the
    expert tier. Always attempts to close the environment on exit.
    """
    # Prefer HF_TOKEN; API_KEY is the documented fallback credential.
    key = HF_TOKEN if HF_TOKEN else API_KEY
    if not key:
        print("ERROR: Set HF_TOKEN or API_KEY for the LLM API.")
        return
    llm_client = OpenAI(base_url=API_BASE_URL, api_key=key)
    # Connect to environment: Docker image takes precedence over a server URL.
    if LOCAL_IMAGE_NAME:
        print(f"Starting environment from Docker image: {LOCAL_IMAGE_NAME}")
        env = await AwsRlEnv.from_docker_image(LOCAL_IMAGE_NAME)
    else:
        print(f"Connecting to server: {SERVER_URL}")
        env = AwsRlEnv(base_url=SERVER_URL)
    tier_results: dict[str, list] = defaultdict(list)  # tier -> episode summaries
    convergence = ConvergenceDetector()
    try:
        for episode in range(1, MAX_EPISODES + 1):
            ep_result = await run_episode(env, llm_client)
            if ep_result is None:
                # Episode had no task assigned; nothing to record.
                continue
            tier = ep_result["tier"]
            tier_results[tier].append(ep_result)
            # Track convergence (only expert-tier rewards are recorded).
            convergence.record(tier, ep_result["max_reward"])
            if convergence.is_converged():
                print(
                    f"\nConvergence detected at expert level "
                    f"after {episode} episodes "
                    f"(plateau for {CONVERGENCE_PATIENCE} consecutive checks). "
                    f"Stopping."
                )
                break
        print_summary(tier_results)
    finally:
        # Best-effort teardown: never let a close() failure mask the run.
        try:
            await env.close()
        except Exception as e:
            print(f"[WARN] env.close() error: {e}")
# Script entry point: run the full curriculum sweep on the asyncio event loop.
if __name__ == "__main__":
    asyncio.run(main())