Spaces:
Running
Running
File size: 9,606 Bytes
c745a99 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 | """
Tests for multi-tenancy: dynamic Account ID derived from AWS_ACCESS_KEY_ID.
When the access key is a 12-digit number, MiniStack uses it as the Account ID
in all ARN generation. Non-numeric keys (like "test") fall back to the default
000000000000.
"""
import os
import boto3
import pytest
from botocore.config import Config
ENDPOINT = os.environ.get("MINISTACK_ENDPOINT", "http://localhost:4566")
REGION = "us-east-1"
def _client(service, access_key="test"):
"""Create a boto3 client with a specific access key."""
return boto3.client(
service,
endpoint_url=ENDPOINT,
aws_access_key_id=access_key,
aws_secret_access_key="test",
region_name=REGION,
config=Config(region_name=REGION, retries={"max_attempts": 0}),
)
# ββ STS GetCallerIdentity βββββββββββββββββββββββββββββββββ
def test_default_account_id():
"""Non-numeric access key falls back to 000000000000."""
sts = _client("sts", access_key="test")
resp = sts.get_caller_identity()
assert resp["Account"] == "000000000000"
def test_12_digit_access_key_becomes_account_id():
"""A 12-digit numeric access key is used as the Account ID."""
sts = _client("sts", access_key="123456789012")
resp = sts.get_caller_identity()
assert resp["Account"] == "123456789012"
def test_different_12_digit_keys_get_different_accounts():
"""Two different 12-digit keys produce different account IDs."""
sts_a = _client("sts", access_key="111111111111")
sts_b = _client("sts", access_key="222222222222")
assert sts_a.get_caller_identity()["Account"] == "111111111111"
assert sts_b.get_caller_identity()["Account"] == "222222222222"
def test_non_12_digit_numeric_falls_back():
"""A numeric key that isn't exactly 12 digits uses the default."""
sts = _client("sts", access_key="12345")
resp = sts.get_caller_identity()
assert resp["Account"] == "000000000000"
# ββ S3: ARN isolation βββββββββββββββββββββββββββββββββββββ
def test_sqs_queue_arn_uses_dynamic_account():
"""SQS queue ARN reflects the 12-digit access key as account ID."""
sqs = _client("sqs", access_key="048408301323")
q = sqs.create_queue(QueueName="mt-test-queue")
try:
attrs = sqs.get_queue_attributes(
QueueUrl=q["QueueUrl"], AttributeNames=["QueueArn"]
)
arn = attrs["Attributes"]["QueueArn"]
assert "048408301323" in arn, f"Expected account 048408301323 in ARN: {arn}"
finally:
sqs.delete_queue(QueueUrl=q["QueueUrl"])
def test_sqs_queues_isolated_by_account():
"""Queues created with different account keys are separate namespaces."""
sqs_a = _client("sqs", access_key="111111111111")
sqs_b = _client("sqs", access_key="222222222222")
q_a = sqs_a.create_queue(QueueName="isolation-test")
try:
q_b = sqs_b.create_queue(QueueName="isolation-test")
try:
# Both should get their own queue with their own account in the ARN
attrs_a = sqs_a.get_queue_attributes(
QueueUrl=q_a["QueueUrl"], AttributeNames=["QueueArn"]
)
attrs_b = sqs_b.get_queue_attributes(
QueueUrl=q_b["QueueUrl"], AttributeNames=["QueueArn"]
)
assert "111111111111" in attrs_a["Attributes"]["QueueArn"]
assert "222222222222" in attrs_b["Attributes"]["QueueArn"]
finally:
sqs_b.delete_queue(QueueUrl=q_b["QueueUrl"])
finally:
sqs_a.delete_queue(QueueUrl=q_a["QueueUrl"])
# ββ Lambda: ARN uses dynamic account ββββββββββββββββββββββ
def test_lambda_function_arn_uses_dynamic_account():
"""Lambda function ARN reflects the 12-digit access key."""
lam = _client("lambda", access_key="999888777666")
try:
lam.create_function(
FunctionName="mt-func",
Runtime="python3.12",
Role="arn:aws:iam::999888777666:role/test",
Handler="index.handler",
Code={"ZipFile": b"fake"},
)
resp = lam.get_function(FunctionName="mt-func")
arn = resp["Configuration"]["FunctionArn"]
assert "999888777666" in arn, f"Expected account in ARN: {arn}"
finally:
try:
lam.delete_function(FunctionName="mt-func")
except Exception:
pass
# ββ SSM: ARN uses dynamic account ββββββββββββββββββββββββ
def test_ssm_parameter_arn_uses_dynamic_account():
"""SSM parameter ARN reflects the 12-digit access key."""
ssm = _client("ssm", access_key="048408301323")
ssm.put_parameter(
Name="/mt-test/param1",
Value="hello",
Type="String",
)
try:
resp = ssm.get_parameter(Name="/mt-test/param1")
arn = resp["Parameter"]["ARN"]
assert "048408301323" in arn, f"Expected account in ARN: {arn}"
finally:
ssm.delete_parameter(Name="/mt-test/param1")
# βββββββββββββββββββ Cross-account isolation (1.3.3 CRITICAL fixes) βββββββββββββββββββ
# Each test below creates the same resource name in two accounts and asserts
# list/describe operations in one account do NOT see the other account's data.
def test_cloudwatch_metrics_isolated_per_account():
"""PutMetricData from account A is invisible to ListMetrics in account B."""
import uuid
ns = f"ms-mt-{uuid.uuid4().hex[:8]}"
cw_a = _client("cloudwatch", access_key="111111111111")
cw_b = _client("cloudwatch", access_key="222222222222")
cw_a.put_metric_data(Namespace=ns, MetricData=[{"MetricName": "leak", "Value": 1.0}])
metrics_a = cw_a.list_metrics(Namespace=ns)["Metrics"]
metrics_b = cw_b.list_metrics(Namespace=ns)["Metrics"]
assert any(m["MetricName"] == "leak" for m in metrics_a)
assert all(m["MetricName"] != "leak" for m in metrics_b), \
f"CRITICAL: CloudWatch metrics leaking cross-account; B saw: {metrics_b}"
def test_athena_workgroups_isolated_per_account():
"""CreateWorkGroup in account A does NOT appear in ListWorkGroups for account B."""
import uuid
wg = f"mt-wg-{uuid.uuid4().hex[:8]}"
a = _client("athena", access_key="111111111111")
b = _client("athena", access_key="222222222222")
try:
a.create_work_group(Name=wg, Description="A's workgroup")
names_a = [w["Name"] for w in a.list_work_groups()["WorkGroups"]]
names_b = [w["Name"] for w in b.list_work_groups()["WorkGroups"]]
assert wg in names_a
assert wg not in names_b, \
f"CRITICAL: Athena workgroup leaking cross-account; B saw: {names_b}"
finally:
try: a.delete_work_group(WorkGroup=wg)
except Exception: pass
def test_ses_sent_emails_isolated_per_account():
"""Account A's sent emails must not appear in account B's GetSendStatistics."""
a = _client("ses", access_key="111111111111")
b = _client("ses", access_key="222222222222")
# Verify identity first
a.verify_email_identity(EmailAddress="mt-a@example.com")
b.verify_email_identity(EmailAddress="mt-b@example.com")
a.send_email(
Source="mt-a@example.com",
Destination={"ToAddresses": ["recip@example.com"]},
Message={"Subject": {"Data": "A"}, "Body": {"Text": {"Data": "A"}}},
)
stats_a = a.get_send_statistics()["SendDataPoints"]
stats_b = b.get_send_statistics()["SendDataPoints"]
attempts_a = sum(p.get("DeliveryAttempts", 0) for p in stats_a)
attempts_b = sum(p.get("DeliveryAttempts", 0) for p in stats_b)
assert attempts_a >= 1
assert attempts_b == 0, \
f"CRITICAL: SES send stats leaking cross-account; B saw {attempts_b} attempts"
def test_eventbridge_default_bus_has_caller_account_arn():
"""Each account's 'default' bus ARN must reflect the caller's account id."""
a = _client("events", access_key="111111111111")
b = _client("events", access_key="222222222222")
arn_a = a.describe_event_bus(Name="default")["Arn"]
arn_b = b.describe_event_bus(Name="default")["Arn"]
assert ":111111111111:" in arn_a
assert ":222222222222:" in arn_b
assert arn_a != arn_b
def test_apigateway_v1_stages_isolated_per_account():
"""Account A's REST API stages are invisible to account B."""
import uuid
name = f"mt-api-{uuid.uuid4().hex[:8]}"
a = _client("apigateway", access_key="111111111111")
b = _client("apigateway", access_key="222222222222")
a_api = a.create_rest_api(name=name)["id"]
try:
# Must create a deployment before a stage
a_res = a.get_resources(restApiId=a_api)["items"][0]["id"]
a.put_method(restApiId=a_api, resourceId=a_res, httpMethod="GET", authorizationType="NONE")
a.put_integration(restApiId=a_api, resourceId=a_res, httpMethod="GET", type="MOCK")
dep = a.create_deployment(restApiId=a_api, stageName="prod")
# A can see its stage
stages_a = a.get_stages(restApiId=a_api)["item"]
assert any(s["stageName"] == "prod" for s in stages_a)
# B MUST NOT see A's api at all
apis_b = b.get_rest_apis()["items"]
assert all(api["id"] != a_api for api in apis_b), \
f"CRITICAL: APIGW v1 REST api leaking cross-account; B saw: {apis_b}"
finally:
try: a.delete_rest_api(restApiId=a_api)
except Exception: pass
|