Spaces:
Running
Running
File size: 5,564 Bytes
c745a99 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 | """
Chaos Injection Engine.
Silently mutates AWS state mid-episode to test agent resilience and
situational awareness. Perturbations are scoped to services the current
task uses and are selected from a per-service catalog of destructive
AWS CLI commands.
"""
import logging
import os
import random
import re
from models import AwsService, Task
from server.services.environment_strategy import EnvironmentStrategy
from server.services.episode_tracker import EpisodeTracker
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Resource-name extraction patterns (from successful AWS CLI commands)
# ---------------------------------------------------------------------------
_RESOURCE_PATTERNS: dict[AwsService, list[re.Pattern[str]]] = {
AwsService.S3: [
re.compile(r"aws\s+s3\s+mb\s+s3://([^\s]+)"),
re.compile(r"aws\s+s3api\s+create-bucket\s+--bucket\s+([^\s]+)"),
],
AwsService.DYNAMODB: [
re.compile(r"aws\s+dynamodb\s+create-table\s+.*--table-name\s+([^\s]+)"),
],
AwsService.LAMBDA: [
re.compile(r"aws\s+lambda\s+create-function\s+.*--function-name\s+([^\s]+)"),
],
AwsService.SQS: [
re.compile(r"aws\s+sqs\s+create-queue\s+.*--queue-name\s+([^\s]+)"),
],
AwsService.IAM: [
re.compile(
r"aws\s+iam\s+attach-role-policy\s+.*--role-name\s+([^\s]+)"
r"\s+.*--policy-arn\s+([^\s]+)"
),
re.compile(
r"aws\s+iam\s+attach-role-policy\s+.*--policy-arn\s+([^\s]+)"
r"\s+.*--role-name\s+([^\s]+)"
),
],
}
# ---------------------------------------------------------------------------
# Perturbation templates per service
# ---------------------------------------------------------------------------
_PERTURBATION_TEMPLATES: dict[AwsService, list[str]] = {
AwsService.S3: [
"aws s3 rb s3://{name} --force",
],
AwsService.DYNAMODB: [
"aws dynamodb delete-table --table-name {name}",
],
AwsService.LAMBDA: [
"aws lambda delete-function --function-name {name}",
],
AwsService.SQS: [
"aws sqs delete-queue --queue-url {name}",
],
AwsService.IAM: [
"aws iam detach-role-policy --role-name {name} --policy-arn {arn}",
],
}
class ChaosEngine:
"""Silently mutates AWS state mid-episode to test agent resilience."""
def __init__(self, backend: EnvironmentStrategy) -> None:
self._backend = backend
self._enabled = os.environ.get("ENABLE_CHAOS", "true").lower() == "true"
self._chaos_occurred = False
def reset(self) -> None:
"""Reset per-episode chaos state."""
self._chaos_occurred = False
@property
def chaos_occurred(self) -> bool:
"""Whether chaos was injected at any point during this episode."""
return self._chaos_occurred
def maybe_inject(
self,
task: Task,
tracker: EpisodeTracker,
probability: float,
) -> bool:
"""Roll dice and, if triggered, execute a task-relevant perturbation.
Returns True if a perturbation was actually executed.
"""
if not self._enabled or probability <= 0.0:
return False
if random.random() >= probability:
return False
perturbation = self._select_perturbation(task, tracker)
if perturbation is None:
return False
logger.info("Chaos injection: %s", perturbation)
self._backend.execute_command(perturbation)
self._chaos_occurred = True
return True
# -- Private helpers ------------------------------------------------------
def _select_perturbation(
self,
task: Task,
tracker: EpisodeTracker,
) -> str | None:
"""Pick a concrete perturbation command scoped to services the task uses."""
task_services = set(task.success_criteria.services)
if not task_services:
return None
# Collect all candidate (service, rendered_command) pairs
candidates: list[str] = []
for step in tracker.command_history:
if not step.success:
continue
for service in task_services:
for pattern in _RESOURCE_PATTERNS.get(service, []):
match = pattern.search(step.command)
if not match:
continue
templates = _PERTURBATION_TEMPLATES.get(service, [])
for template in templates:
rendered = self._render_template(template, match, service)
if rendered:
candidates.append(rendered)
if not candidates:
return None
return random.choice(candidates)
@staticmethod
def _render_template(
template: str,
match: re.Match[str],
service: AwsService,
) -> str | None:
"""Fill a perturbation template from regex match groups."""
groups = match.groups()
if not groups:
return None
if service == AwsService.IAM and len(groups) >= 2:
# IAM patterns capture (role_name, policy_arn) or vice-versa
# The first pattern has role first, second has arn first
if "role-name" in template and "policy-arn" in template:
return template.format(name=groups[0], arn=groups[1])
return None
return template.format(name=groups[0])
|