aws_rl_env / aws_infra /tests /test_multitenancy.py
Sizzing's picture
Upload folder using huggingface_hub
c745a99 verified
"""
Tests for multi-tenancy: dynamic Account ID derived from AWS_ACCESS_KEY_ID.
When the access key is a 12-digit number, MiniStack uses it as the Account ID
in all ARN generation. Non-numeric keys (like "test") fall back to the default
000000000000.
"""
import os
import boto3
import pytest
from botocore.config import Config
ENDPOINT = os.environ.get("MINISTACK_ENDPOINT", "http://localhost:4566")
REGION = "us-east-1"
def _client(service, access_key="test"):
"""Create a boto3 client with a specific access key."""
return boto3.client(
service,
endpoint_url=ENDPOINT,
aws_access_key_id=access_key,
aws_secret_access_key="test",
region_name=REGION,
config=Config(region_name=REGION, retries={"max_attempts": 0}),
)
# ── STS GetCallerIdentity ─────────────────────────────────
def test_default_account_id():
"""Non-numeric access key falls back to 000000000000."""
sts = _client("sts", access_key="test")
resp = sts.get_caller_identity()
assert resp["Account"] == "000000000000"
def test_12_digit_access_key_becomes_account_id():
"""A 12-digit numeric access key is used as the Account ID."""
sts = _client("sts", access_key="123456789012")
resp = sts.get_caller_identity()
assert resp["Account"] == "123456789012"
def test_different_12_digit_keys_get_different_accounts():
"""Two different 12-digit keys produce different account IDs."""
sts_a = _client("sts", access_key="111111111111")
sts_b = _client("sts", access_key="222222222222")
assert sts_a.get_caller_identity()["Account"] == "111111111111"
assert sts_b.get_caller_identity()["Account"] == "222222222222"
def test_non_12_digit_numeric_falls_back():
"""A numeric key that isn't exactly 12 digits uses the default."""
sts = _client("sts", access_key="12345")
resp = sts.get_caller_identity()
assert resp["Account"] == "000000000000"
# ── S3: ARN isolation ─────────────────────────────────────
def test_sqs_queue_arn_uses_dynamic_account():
"""SQS queue ARN reflects the 12-digit access key as account ID."""
sqs = _client("sqs", access_key="048408301323")
q = sqs.create_queue(QueueName="mt-test-queue")
try:
attrs = sqs.get_queue_attributes(
QueueUrl=q["QueueUrl"], AttributeNames=["QueueArn"]
)
arn = attrs["Attributes"]["QueueArn"]
assert "048408301323" in arn, f"Expected account 048408301323 in ARN: {arn}"
finally:
sqs.delete_queue(QueueUrl=q["QueueUrl"])
def test_sqs_queues_isolated_by_account():
"""Queues created with different account keys are separate namespaces."""
sqs_a = _client("sqs", access_key="111111111111")
sqs_b = _client("sqs", access_key="222222222222")
q_a = sqs_a.create_queue(QueueName="isolation-test")
try:
q_b = sqs_b.create_queue(QueueName="isolation-test")
try:
# Both should get their own queue with their own account in the ARN
attrs_a = sqs_a.get_queue_attributes(
QueueUrl=q_a["QueueUrl"], AttributeNames=["QueueArn"]
)
attrs_b = sqs_b.get_queue_attributes(
QueueUrl=q_b["QueueUrl"], AttributeNames=["QueueArn"]
)
assert "111111111111" in attrs_a["Attributes"]["QueueArn"]
assert "222222222222" in attrs_b["Attributes"]["QueueArn"]
finally:
sqs_b.delete_queue(QueueUrl=q_b["QueueUrl"])
finally:
sqs_a.delete_queue(QueueUrl=q_a["QueueUrl"])
# ── Lambda: ARN uses dynamic account ──────────────────────
def test_lambda_function_arn_uses_dynamic_account():
"""Lambda function ARN reflects the 12-digit access key."""
lam = _client("lambda", access_key="999888777666")
try:
lam.create_function(
FunctionName="mt-func",
Runtime="python3.12",
Role="arn:aws:iam::999888777666:role/test",
Handler="index.handler",
Code={"ZipFile": b"fake"},
)
resp = lam.get_function(FunctionName="mt-func")
arn = resp["Configuration"]["FunctionArn"]
assert "999888777666" in arn, f"Expected account in ARN: {arn}"
finally:
try:
lam.delete_function(FunctionName="mt-func")
except Exception:
pass
# ── SSM: ARN uses dynamic account ────────────────────────
def test_ssm_parameter_arn_uses_dynamic_account():
"""SSM parameter ARN reflects the 12-digit access key."""
ssm = _client("ssm", access_key="048408301323")
ssm.put_parameter(
Name="/mt-test/param1",
Value="hello",
Type="String",
)
try:
resp = ssm.get_parameter(Name="/mt-test/param1")
arn = resp["Parameter"]["ARN"]
assert "048408301323" in arn, f"Expected account in ARN: {arn}"
finally:
ssm.delete_parameter(Name="/mt-test/param1")
# ─────────────────── Cross-account isolation (1.3.3 CRITICAL fixes) ───────────────────
# Each test below creates the same resource name in two accounts and asserts
# list/describe operations in one account do NOT see the other account's data.
def test_cloudwatch_metrics_isolated_per_account():
"""PutMetricData from account A is invisible to ListMetrics in account B."""
import uuid
ns = f"ms-mt-{uuid.uuid4().hex[:8]}"
cw_a = _client("cloudwatch", access_key="111111111111")
cw_b = _client("cloudwatch", access_key="222222222222")
cw_a.put_metric_data(Namespace=ns, MetricData=[{"MetricName": "leak", "Value": 1.0}])
metrics_a = cw_a.list_metrics(Namespace=ns)["Metrics"]
metrics_b = cw_b.list_metrics(Namespace=ns)["Metrics"]
assert any(m["MetricName"] == "leak" for m in metrics_a)
assert all(m["MetricName"] != "leak" for m in metrics_b), \
f"CRITICAL: CloudWatch metrics leaking cross-account; B saw: {metrics_b}"
def test_athena_workgroups_isolated_per_account():
"""CreateWorkGroup in account A does NOT appear in ListWorkGroups for account B."""
import uuid
wg = f"mt-wg-{uuid.uuid4().hex[:8]}"
a = _client("athena", access_key="111111111111")
b = _client("athena", access_key="222222222222")
try:
a.create_work_group(Name=wg, Description="A's workgroup")
names_a = [w["Name"] for w in a.list_work_groups()["WorkGroups"]]
names_b = [w["Name"] for w in b.list_work_groups()["WorkGroups"]]
assert wg in names_a
assert wg not in names_b, \
f"CRITICAL: Athena workgroup leaking cross-account; B saw: {names_b}"
finally:
try: a.delete_work_group(WorkGroup=wg)
except Exception: pass
def test_ses_sent_emails_isolated_per_account():
"""Account A's sent emails must not appear in account B's GetSendStatistics."""
a = _client("ses", access_key="111111111111")
b = _client("ses", access_key="222222222222")
# Verify identity first
a.verify_email_identity(EmailAddress="mt-a@example.com")
b.verify_email_identity(EmailAddress="mt-b@example.com")
a.send_email(
Source="mt-a@example.com",
Destination={"ToAddresses": ["recip@example.com"]},
Message={"Subject": {"Data": "A"}, "Body": {"Text": {"Data": "A"}}},
)
stats_a = a.get_send_statistics()["SendDataPoints"]
stats_b = b.get_send_statistics()["SendDataPoints"]
attempts_a = sum(p.get("DeliveryAttempts", 0) for p in stats_a)
attempts_b = sum(p.get("DeliveryAttempts", 0) for p in stats_b)
assert attempts_a >= 1
assert attempts_b == 0, \
f"CRITICAL: SES send stats leaking cross-account; B saw {attempts_b} attempts"
def test_eventbridge_default_bus_has_caller_account_arn():
"""Each account's 'default' bus ARN must reflect the caller's account id."""
a = _client("events", access_key="111111111111")
b = _client("events", access_key="222222222222")
arn_a = a.describe_event_bus(Name="default")["Arn"]
arn_b = b.describe_event_bus(Name="default")["Arn"]
assert ":111111111111:" in arn_a
assert ":222222222222:" in arn_b
assert arn_a != arn_b
def test_apigateway_v1_stages_isolated_per_account():
"""Account A's REST API stages are invisible to account B."""
import uuid
name = f"mt-api-{uuid.uuid4().hex[:8]}"
a = _client("apigateway", access_key="111111111111")
b = _client("apigateway", access_key="222222222222")
a_api = a.create_rest_api(name=name)["id"]
try:
# Must create a deployment before a stage
a_res = a.get_resources(restApiId=a_api)["items"][0]["id"]
a.put_method(restApiId=a_api, resourceId=a_res, httpMethod="GET", authorizationType="NONE")
a.put_integration(restApiId=a_api, resourceId=a_res, httpMethod="GET", type="MOCK")
dep = a.create_deployment(restApiId=a_api, stageName="prod")
# A can see its stage
stages_a = a.get_stages(restApiId=a_api)["item"]
assert any(s["stageName"] == "prod" for s in stages_a)
# B MUST NOT see A's api at all
apis_b = b.get_rest_apis()["items"]
assert all(api["id"] != a_api for api in apis_b), \
f"CRITICAL: APIGW v1 REST api leaking cross-account; B saw: {apis_b}"
finally:
try: a.delete_rest_api(restApiId=a_api)
except Exception: pass