aws_rl_env / tests_tasks /test_intermediate_tasks.py
Sizzing's picture
Upload folder using huggingface_hub
e56d042 verified
"""Tests for intermediate-tier tasks — verifies multi-step command sequences and grading.
Intermediate tasks require the agent to execute multiple AWS CLI commands in order.
The grader checks that each step's operation + resource has been executed successfully
via the EpisodeTracker.
Each test resets MiniStack, executes the full command sequence, and asserts the grader
returns task_achieved=True with reward=1.0.
Run inside Docker:
docker exec aws-rl-env python -m pytest tests/test_intermediate_tasks.py -v
"""
import json
import pytest
import yaml
from pathlib import Path
from models import SuccessCriteria, Task, TaskID, TaskDifficulty, SetupCommand
from server.services.simulator_strategy import SimulatorStrategy
from server.services.task_grader import TaskGrader
from server.services.episode_tracker import EpisodeTracker
TASKS_FILE = (
Path(__file__).resolve().parent.parent
/ "server"
/ "services"
/ "tasks"
/ "intermediate.yaml"
)
# Mapping of task_id -> ordered list of AWS CLI commands to complete the task
INTERMEDIATE_COMMANDS: dict[int, list[str]] = {
11: [
"aws s3api create-bucket --bucket data-pipeline",
"aws s3api put-object --bucket data-pipeline --key test.txt --content-type text/plain",
],
12: [
(
"aws dynamodb create-table --table-name orders "
"--key-schema AttributeName=order_id,KeyType=HASH "
"--attribute-definitions AttributeName=order_id,AttributeType=S "
"--billing-mode PAY_PER_REQUEST"
),
(
"aws dynamodb put-item --table-name orders "
'--item \'{"order_id":{"S":"001"},"status":{"S":"pending"}}\''
),
],
13: [
"aws sns create-topic --name alerts",
"aws sqs create-queue --queue-name alert-inbox",
(
"aws sns subscribe --topic-arn arn:aws:sns:us-east-1:000000000000:alerts "
"--protocol sqs "
"--notification-endpoint arn:aws:sqs:us-east-1:000000000000:alert-inbox"
),
],
14: [
(
"aws iam create-role --role-name lambda-exec-role "
"--assume-role-policy-document "
'\'{"Version":"2012-10-17","Statement":[{"Effect":"Allow",'
'"Principal":{"Service":"lambda.amazonaws.com"},"Action":"sts:AssumeRole"}]}\''
),
(
"aws iam attach-role-policy --role-name lambda-exec-role "
"--policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
),
],
66: [
"aws s3api create-bucket --bucket app-assets",
(
"aws iam create-policy --policy-name app-assets-read-policy "
"--policy-document "
'\'{"Version":"2012-10-17","Statement":[{"Effect":"Allow",'
'"Action":"s3:GetObject","Resource":"arn:aws:s3:::app-assets/*"}]}\''
),
],
67: [
(
"aws dynamodb create-table --table-name user-sessions "
"--key-schema AttributeName=session_id,KeyType=HASH "
"--attribute-definitions AttributeName=session_id,AttributeType=S "
"--billing-mode PAY_PER_REQUEST"
),
"aws s3api create-bucket --bucket session-exports",
],
68: [
(
"aws iam create-role --role-name data-processor-role "
"--assume-role-policy-document "
'\'{"Version":"2012-10-17","Statement":[{"Effect":"Allow",'
'"Principal":{"Service":"lambda.amazonaws.com"},"Action":"sts:AssumeRole"}]}\''
),
(
"aws lambda create-function --function-name data-processor "
"--runtime python3.12 --handler index.handler "
"--role arn:aws:iam::000000000000:role/data-processor-role "
"--code S3Bucket=dummy,S3Key=dummy.zip"
),
],
69: [
"aws sqs create-queue --queue-name order-events",
"aws sns create-topic --name order-notifications",
(
"aws sns subscribe "
"--topic-arn arn:aws:sns:us-east-1:000000000000:order-notifications "
"--protocol sqs "
"--notification-endpoint arn:aws:sqs:us-east-1:000000000000:order-events"
),
],
70: [
(
"aws secretsmanager create-secret --name db-credentials "
'--secret-string \'{"username":"admin","password":"secret123"}\''
),
(
"aws iam create-role --role-name secret-reader-role "
"--assume-role-policy-document "
'\'{"Version":"2012-10-17","Statement":[{"Effect":"Allow",'
'"Principal":{"Service":"lambda.amazonaws.com"},"Action":"sts:AssumeRole"}]}\''
),
],
71: [
(
"aws ssm put-parameter --name /app/config/db-host "
"--type String --value db.internal.local"
),
(
"aws lambda create-function --function-name config-loader "
"--runtime python3.12 --handler index.handler "
"--role arn:aws:iam::000000000000:role/lambda-exec-role "
"--code S3Bucket=dummy,S3Key=dummy.zip"
),
],
72: [
(
"aws lambda create-function --function-name scheduled-task "
"--runtime python3.12 --handler index.handler "
"--role arn:aws:iam::000000000000:role/lambda-exec-role "
"--code S3Bucket=dummy,S3Key=dummy.zip"
),
'aws events put-rule --name every-five-minutes --schedule-expression "rate(5 minutes)"',
(
"aws events put-targets --rule every-five-minutes "
"--targets Id=1,Arn=arn:aws:lambda:us-east-1:000000000000:function:scheduled-task"
),
],
73: [
(
"aws iam create-role --role-name ecs-task-role "
"--assume-role-policy-document "
'\'{"Version":"2012-10-17","Statement":[{"Effect":"Allow",'
'"Principal":{"Service":"ecs-tasks.amazonaws.com"},"Action":"sts:AssumeRole"}]}\''
),
(
"aws iam attach-role-policy --role-name ecs-task-role "
"--policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"
),
],
74: [
(
"aws secretsmanager create-secret --name rds-master-password "
"--secret-string "
'\'{"host":"db.local","port":"3306","username":"admin","password":"secret"}\''
),
(
"aws rds create-db-instance --db-instance-identifier app-database "
"--engine mysql --db-instance-class db.t3.micro "
"--master-username admin --master-user-password secret"
),
],
75: [
(
"aws elbv2 create-target-group --name web-targets "
"--protocol HTTP --port 80 --vpc-id vpc-00000001"
),
(
"aws route53 create-hosted-zone --name app.example.com "
"--caller-reference unique-ref-75"
),
],
76: [
"aws cognito-idp create-user-pool --pool-name app-users",
# second command placeholder — needs dynamic user-pool-id (see DYNAMIC_TASKS)
],
77: [
"aws efs create-file-system --creation-token app-storage",
(
"aws ec2 create-security-group --group-name efs-mount-sg "
'--description "Allow NFS access for EFS mount"'
),
],
78: [
"aws ec2 create-volume --size 20 --availability-zone us-east-1a --volume-type gp3 "
"--tag-specifications ResourceType=volume,Tags=[{Key=Name,Value=data-volume}]",
# second command placeholder — needs dynamic volume-id (see DYNAMIC_TASKS)
],
79: [
(
"aws elasticache create-cache-subnet-group "
"--cache-subnet-group-name cache-subnets "
'--cache-subnet-group-description "Cache subnets" '
"--subnet-ids subnet-00000001 subnet-00000002"
),
(
"aws elasticache create-cache-cluster --cache-cluster-id session-cache "
"--engine redis --cache-node-type cache.t3.micro --num-cache-nodes 1"
),
],
80: [
'aws glue create-database --database-input \'{"Name":"analytics-db"}\'',
(
"aws glue create-crawler --name raw-data-crawler "
"--role arn:aws:iam::000000000000:role/glue-role "
"--database-name analytics-db "
'--targets \'{"S3Targets":[{"Path":"s3://data-bucket/raw/"}]}\''
),
],
81: [
(
"aws cloudformation create-stack --stack-name vpc-stack "
'--template-body \'{"AWSTemplateFormatVersion":"2010-09-09","Resources":{}}\''
),
"aws cloudformation describe-stacks --stack-name vpc-stack",
],
82: [
"aws apigatewayv2 create-api --name products-api --protocol-type HTTP",
# second command placeholder — needs dynamic api-id (see DYNAMIC_TASKS)
],
83: [
"aws s3api create-bucket --bucket firehose-delivery",
(
"aws firehose create-delivery-stream --delivery-stream-name event-stream "
"--s3-destination-configuration "
"RoleARN=arn:aws:iam::000000000000:role/firehose-role,"
"BucketARN=arn:aws:s3:::firehose-delivery"
),
],
84: [
"aws sqs create-queue --queue-name task-queue",
# second command placeholder — needs dynamic queue-url (see DYNAMIC_TASKS)
],
85: [
(
"aws dynamodb create-table --table-name products "
"--key-schema AttributeName=product_id,KeyType=HASH "
"AttributeName=category,KeyType=RANGE "
"--attribute-definitions AttributeName=product_id,AttributeType=S "
"AttributeName=category,AttributeType=S "
"--billing-mode PAY_PER_REQUEST"
),
(
"aws dynamodb put-item --table-name products "
'--item \'{"product_id":{"S":"P001"},"category":{"S":"electronics"},'
'"name":{"S":"Wireless Mouse"}}\''
),
],
86: [
(
"aws iam create-role --role-name firehose-delivery-role "
"--assume-role-policy-document "
'\'{"Version":"2012-10-17","Statement":[{"Effect":"Allow",'
'"Principal":{"Service":"firehose.amazonaws.com"},"Action":"sts:AssumeRole"}]}\''
),
(
"aws iam create-policy --policy-name s3-write-policy "
"--policy-document "
'\'{"Version":"2012-10-17","Statement":[{"Effect":"Allow",'
'"Action":"s3:PutObject","Resource":"*"}]}\''
),
(
"aws iam attach-role-policy --role-name firehose-delivery-role "
"--policy-arn arn:aws:iam::000000000000:policy/s3-write-policy"
),
],
}
def _resolve_dynamic_commands(task_id: int, outputs: list[str]) -> list[str]:
"""Generate additional commands for tasks that need dynamic IDs from prior outputs.
Returns extra commands to append after the static ones have run.
"""
if task_id == 76:
# create-user-pool-client needs the user-pool-id from create-user-pool output
data = json.loads(outputs[0])
pool_id = data["UserPool"]["Id"]
return [
f"aws cognito-idp create-user-pool-client --user-pool-id {pool_id} "
f"--client-name web-app-client"
]
if task_id == 78:
# create-tags needs the volume-id from create-volume output
data = json.loads(outputs[0])
vol_id = data["VolumeId"]
return [
f"aws ec2 create-tags --resources {vol_id} "
f"--tags Key=Name,Value=data-volume"
]
if task_id == 82:
# create-route needs the api-id from create-api output
data = json.loads(outputs[0])
api_id = data["ApiId"]
return [
f"aws apigatewayv2 create-route --api-id {api_id} "
f'--route-key "GET /products-api"'
]
if task_id == 84:
# send-message needs the queue-url from create-queue output
data = json.loads(outputs[0])
queue_url = data["QueueUrl"]
return [
f"aws sqs send-message --queue-url {queue_url} "
f'--message-body \'{{"task":"process","id":"task-queue-001"}}\''
]
return []
# Tasks that have placeholder entries and need dynamic command resolution
_DYNAMIC_TASK_IDS = {76, 78, 82, 84}
def _execute_all_commands(
task_id: int, backend: SimulatorStrategy
) -> list[tuple[str, bool, str, str]]:
"""Execute static commands, resolve dynamic follow-ups, return all (cmd, ok, out, err)."""
static_cmds = INTERMEDIATE_COMMANDS[task_id]
results: list[tuple[str, bool, str, str]] = []
for cmd in static_cmds:
success, stdout, stderr = backend.execute_command(cmd)
results.append((cmd, success, stdout, stderr))
if task_id in _DYNAMIC_TASK_IDS:
outputs = [r[2] for r in results]
extra_cmds = _resolve_dynamic_commands(task_id, outputs)
for cmd in extra_cmds:
success, stdout, stderr = backend.execute_command(cmd)
results.append((cmd, success, stdout, stderr))
return results
@pytest.fixture(scope="module")
def backend() -> SimulatorStrategy:
return SimulatorStrategy()
@pytest.fixture(scope="module")
def grader(backend: SimulatorStrategy) -> TaskGrader:
return TaskGrader(backend)
@pytest.fixture(scope="module")
def intermediate_tasks() -> list[dict]:
with open(TASKS_FILE) as f:
return yaml.safe_load(f)
def _build_task(entry: dict) -> Task:
"""Build a Task model from a raw YAML entry."""
return Task(
task_id=TaskID(entry["task_id"]),
difficulty=TaskDifficulty.INTERMEDIATE,
description=entry["description"],
success_criteria=SuccessCriteria(**entry.get("success_criteria", {})),
setup_commands=[
SetupCommand(command=cmd) if isinstance(cmd, str) else SetupCommand(**cmd)
for cmd in entry.get("setup_commands", [])
],
)
def test_all_intermediate_tasks_have_commands(intermediate_tasks: list[dict]) -> None:
"""Every intermediate task in the YAML must have a corresponding test command sequence."""
missing = [
t["task_id"]
for t in intermediate_tasks
if t["task_id"] not in INTERMEDIATE_COMMANDS
]
assert not missing, f"No test commands mapped for task_ids: {missing}"
@pytest.mark.parametrize(
"task_id",
sorted(INTERMEDIATE_COMMANDS.keys()),
ids=[f"task_{tid}" for tid in sorted(INTERMEDIATE_COMMANDS.keys())],
)
def test_intermediate_task_commands_execute(
task_id: int,
backend: SimulatorStrategy,
) -> None:
"""All commands in the sequence must execute successfully against MiniStack."""
backend.reset_environment()
results = _execute_all_commands(task_id, backend)
for i, (cmd, success, stdout, stderr) in enumerate(results):
assert success, (
f"Command {i + 1}/{len(results)} failed for task {task_id}.\n"
f" Command: {cmd}\n"
f" Stderr: {stderr}"
)
@pytest.mark.parametrize(
"task_id",
sorted(INTERMEDIATE_COMMANDS.keys()),
ids=[f"task_{tid}" for tid in sorted(INTERMEDIATE_COMMANDS.keys())],
)
def test_intermediate_task_grading(
task_id: int,
intermediate_tasks: list[dict],
backend: SimulatorStrategy,
grader: TaskGrader,
) -> None:
"""Execute the full command sequence and verify the grader marks the task as achieved."""
entry = next((t for t in intermediate_tasks if t["task_id"] == task_id), None)
assert entry is not None, f"task_id {task_id} not found in intermediate.yaml"
backend.reset_environment()
task = _build_task(entry)
results = _execute_all_commands(task_id, backend)
tracker = EpisodeTracker()
for cmd, success, stdout, stderr in results:
step = tracker.record_step(cmd, success, stdout, stderr)
result = grader.grade(task, tracker, step)
all_cmds = [r[0] for r in results]
assert result.task_achieved, (
f"Task {task_id} not achieved.\n"
f" Description: {entry['description']}\n"
f" Commands: {all_cmds}\n"
f" Reason: {result.reason}\n"
f" Reward: {result.reward}"
)
assert result.reward == 1.0, f"Expected reward=1.0, got {result.reward}"
@pytest.mark.parametrize(
"task_id",
sorted(INTERMEDIATE_COMMANDS.keys()),
ids=[f"task_{tid}_partial" for tid in sorted(INTERMEDIATE_COMMANDS.keys())],
)
def test_intermediate_task_partial_gives_no_completion(
task_id: int,
intermediate_tasks: list[dict],
backend: SimulatorStrategy,
grader: TaskGrader,
) -> None:
"""Executing only the first command of a multi-step task should not achieve it."""
entry = next((t for t in intermediate_tasks if t["task_id"] == task_id), None)
assert entry is not None
steps = entry.get("success_criteria", {}).get("steps", [])
if len(steps) < 2:
pytest.skip("Single-step task — partial test not applicable")
backend.reset_environment()
task = _build_task(entry)
cmd = INTERMEDIATE_COMMANDS[task_id][0]
success, stdout, stderr = backend.execute_command(cmd)
tracker = EpisodeTracker()
step = tracker.record_step(cmd, success, stdout, stderr)
result = grader.grade(task, tracker, step)
assert not result.task_achieved, (
f"Task {task_id} should NOT be achieved with only the first command.\n"
f" Command: {cmd}\n"
f" Reason: {result.reason}"
)
assert result.reward < 1.0