Spaces:
Running
Running
| """ | |
| Tests for multi-tenancy: dynamic Account ID derived from AWS_ACCESS_KEY_ID. | |
| When the access key is a 12-digit number, MiniStack uses it as the Account ID | |
| in all ARN generation. Non-numeric keys (like "test") fall back to the default | |
| 000000000000. | |
| """ | |
| import os | |
| import boto3 | |
| import pytest | |
| from botocore.config import Config | |
# Base URL every client in this module talks to; the MINISTACK_ENDPOINT
# environment variable overrides the localhost default.
ENDPOINT = os.getenv("MINISTACK_ENDPOINT", "http://localhost:4566")
# Region name handed to every client built by this module.
REGION = "us-east-1"
def _client(service, access_key="test"):
    """Return a boto3 client for ``service`` that targets ``ENDPOINT``.

    ``access_key`` is used as the AWS access key id (default ``"test"``);
    the secret key is always ``"test"``. The client uses ``REGION`` and a
    botocore config whose retry setting is ``max_attempts`` = 0.
    """
    client_config = Config(region_name=REGION, retries={"max_attempts": 0})
    connection = {
        "endpoint_url": ENDPOINT,
        "aws_access_key_id": access_key,
        "aws_secret_access_key": "test",
        "region_name": REGION,
        "config": client_config,
    }
    return boto3.client(service, **connection)
# ── STS GetCallerIdentity ─────────────────────────────────
def test_default_account_id():
    """The non-numeric access key "test" must report account 000000000000."""
    identity = _client("sts", access_key="test").get_caller_identity()
    assert identity["Account"] == "000000000000"
def test_12_digit_access_key_becomes_account_id():
    """GetCallerIdentity must echo a 12-digit numeric access key as the account."""
    identity = _client("sts", access_key="123456789012").get_caller_identity()
    assert identity["Account"] == "123456789012"
def test_different_12_digit_keys_get_different_accounts():
    """Each of two distinct 12-digit access keys must map to its own account id."""
    first = _client("sts", access_key="111111111111")
    second = _client("sts", access_key="222222222222")

    account_first = first.get_caller_identity()["Account"]
    assert account_first == "111111111111"

    account_second = second.get_caller_identity()["Account"]
    assert account_second == "222222222222"
def test_non_12_digit_numeric_falls_back():
    """A numeric access key shorter than 12 digits must report 000000000000."""
    identity = _client("sts", access_key="12345").get_caller_identity()
    assert identity["Account"] == "000000000000"
# ── SQS: ARN isolation ────────────────────────────────────
def test_sqs_queue_arn_uses_dynamic_account():
    """The QueueArn of a new queue must contain the 12-digit access key.

    The queue is deleted afterwards whether or not the assertion holds.
    """
    sqs = _client("sqs", access_key="048408301323")
    queue_url = sqs.create_queue(QueueName="mt-test-queue")["QueueUrl"]
    try:
        reply = sqs.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=["QueueArn"],
        )
        arn = reply["Attributes"]["QueueArn"]
        assert "048408301323" in arn, f"Expected account 048408301323 in ARN: {arn}"
    finally:
        sqs.delete_queue(QueueUrl=queue_url)
def test_sqs_queues_isolated_by_account():
    """The same queue name in two accounts must yield two account-specific ARNs.

    Both accounts create a queue named "isolation-test"; each queue's ARN
    must carry the access key of the account that created it. Both queues
    are deleted afterwards.
    """
    sqs_a = _client("sqs", access_key="111111111111")
    sqs_b = _client("sqs", access_key="222222222222")

    url_a = sqs_a.create_queue(QueueName="isolation-test")["QueueUrl"]
    try:
        url_b = sqs_b.create_queue(QueueName="isolation-test")["QueueUrl"]
        try:
            # Fetch both ARNs first, then check each against its own account.
            reply_a = sqs_a.get_queue_attributes(
                QueueUrl=url_a,
                AttributeNames=["QueueArn"],
            )
            reply_b = sqs_b.get_queue_attributes(
                QueueUrl=url_b,
                AttributeNames=["QueueArn"],
            )
            arn_a = reply_a["Attributes"]["QueueArn"]
            assert "111111111111" in arn_a
            arn_b = reply_b["Attributes"]["QueueArn"]
            assert "222222222222" in arn_b
        finally:
            sqs_b.delete_queue(QueueUrl=url_b)
    finally:
        sqs_a.delete_queue(QueueUrl=url_a)
# ── Lambda: ARN uses dynamic account ──────────────────────
def test_lambda_function_arn_uses_dynamic_account():
    """The FunctionArn returned by GetFunction must contain the 12-digit key.

    Creation happens inside the ``try`` so the cleanup below always runs;
    errors raised by that cleanup are ignored.
    """
    lam = _client("lambda", access_key="999888777666")
    function_spec = {
        "FunctionName": "mt-func",
        "Runtime": "python3.12",
        "Role": "arn:aws:iam::999888777666:role/test",
        "Handler": "index.handler",
        "Code": {"ZipFile": b"fake"},
    }
    try:
        lam.create_function(**function_spec)
        described = lam.get_function(FunctionName="mt-func")
        arn = described["Configuration"]["FunctionArn"]
        assert "999888777666" in arn, f"Expected account in ARN: {arn}"
    finally:
        # Best-effort delete: the function may not exist if creation failed.
        try:
            lam.delete_function(FunctionName="mt-func")
        except Exception:
            pass
# ── SSM: ARN uses dynamic account ─────────────────────────
def test_ssm_parameter_arn_uses_dynamic_account():
    """The ARN returned by GetParameter must contain the 12-digit access key.

    The parameter is deleted afterwards whether or not the assertion holds.
    """
    param_name = "/mt-test/param1"
    ssm = _client("ssm", access_key="048408301323")
    ssm.put_parameter(Name=param_name, Value="hello", Type="String")
    try:
        fetched = ssm.get_parameter(Name=param_name)
        arn = fetched["Parameter"]["ARN"]
        assert "048408301323" in arn, f"Expected account in ARN: {arn}"
    finally:
        ssm.delete_parameter(Name=param_name)
# ────────────────── Cross-account isolation (1.3.3 CRITICAL fixes) ──────────────────
| # Each test below creates the same resource name in two accounts and asserts | |
| # list/describe operations in one account do NOT see the other account's data. | |
def test_cloudwatch_metrics_isolated_per_account():
    """A metric put by account A must show up for A and stay hidden from B.

    A random namespace is used for every run.
    """
    import uuid

    namespace = f"ms-mt-{uuid.uuid4().hex[:8]}"
    cw_a = _client("cloudwatch", access_key="111111111111")
    cw_b = _client("cloudwatch", access_key="222222222222")

    datum = {"MetricName": "leak", "Value": 1.0}
    cw_a.put_metric_data(Namespace=namespace, MetricData=[datum])

    seen_by_a = cw_a.list_metrics(Namespace=namespace)["Metrics"]
    seen_by_b = cw_b.list_metrics(Namespace=namespace)["Metrics"]

    assert any(metric["MetricName"] == "leak" for metric in seen_by_a)
    assert not any(metric["MetricName"] == "leak" for metric in seen_by_b), \
        f"CRITICAL: CloudWatch metrics leaking cross-account; B saw: {seen_by_b}"
def test_athena_workgroups_isolated_per_account():
    """A workgroup created by account A must be listed for A but not for B.

    The workgroup name is random per run. Cleanup is best-effort and any
    error it raises is ignored.
    """
    import uuid

    workgroup = f"mt-wg-{uuid.uuid4().hex[:8]}"
    athena_a = _client("athena", access_key="111111111111")
    athena_b = _client("athena", access_key="222222222222")
    try:
        athena_a.create_work_group(Name=workgroup, Description="A's workgroup")

        listed_a = athena_a.list_work_groups()["WorkGroups"]
        names_a = [entry["Name"] for entry in listed_a]
        listed_b = athena_b.list_work_groups()["WorkGroups"]
        names_b = [entry["Name"] for entry in listed_b]

        assert workgroup in names_a
        assert workgroup not in names_b, \
            f"CRITICAL: Athena workgroup leaking cross-account; B saw: {names_b}"
    finally:
        # The workgroup may not exist if creation itself failed.
        try:
            athena_a.delete_work_group(WorkGroup=workgroup)
        except Exception:
            pass
def test_ses_sent_emails_isolated_per_account():
    """An email sent by account A must not count in B's GetSendStatistics.

    Each account verifies its own sender identity, then only A sends one
    message. A's summed DeliveryAttempts must be at least 1 and B's must
    be exactly 0.
    """
    ses_a = _client("ses", access_key="111111111111")
    ses_b = _client("ses", access_key="222222222222")

    # Identities are verified before anything is sent.
    ses_a.verify_email_identity(EmailAddress="mt-a@example.com")
    ses_b.verify_email_identity(EmailAddress="mt-b@example.com")

    message = {"Subject": {"Data": "A"}, "Body": {"Text": {"Data": "A"}}}
    ses_a.send_email(
        Source="mt-a@example.com",
        Destination={"ToAddresses": ["recip@example.com"]},
        Message=message,
    )

    points_a = ses_a.get_send_statistics()["SendDataPoints"]
    points_b = ses_b.get_send_statistics()["SendDataPoints"]
    attempts_a = sum(point.get("DeliveryAttempts", 0) for point in points_a)
    attempts_b = sum(point.get("DeliveryAttempts", 0) for point in points_b)

    assert attempts_a >= 1
    assert attempts_b == 0, \
        f"CRITICAL: SES send stats leaking cross-account; B saw {attempts_b} attempts"
def test_eventbridge_default_bus_has_caller_account_arn():
    """The "default" event bus ARN must embed the id of the calling account.

    Two accounts describe their own "default" bus; each ARN must contain
    that account's id between colons, and the two ARNs must differ.
    """
    events_a = _client("events", access_key="111111111111")
    events_b = _client("events", access_key="222222222222")

    bus_a = events_a.describe_event_bus(Name="default")
    arn_a = bus_a["Arn"]
    bus_b = events_b.describe_event_bus(Name="default")
    arn_b = bus_b["Arn"]

    assert ":111111111111:" in arn_a
    assert ":222222222222:" in arn_b
    assert arn_a != arn_b
def test_apigateway_v1_stages_isolated_per_account():
    """Account A's REST API and its deployed stage are invisible to account B.

    Account A creates a REST API with a random name, attaches a MOCK GET
    integration to the first resource returned by ``get_resources``, and
    deploys it to stage "prod". The test then checks that A can list that
    stage and that B's ``get_rest_apis`` result does not contain A's API id.
    The API is deleted afterwards on a best-effort basis.
    """
    import uuid

    name = f"mt-api-{uuid.uuid4().hex[:8]}"
    a = _client("apigateway", access_key="111111111111")
    b = _client("apigateway", access_key="222222222222")
    a_api = a.create_rest_api(name=name)["id"]
    try:
        # A method and integration are set up first so there is something to deploy.
        a_res = a.get_resources(restApiId=a_api)["items"][0]["id"]
        a.put_method(restApiId=a_api, resourceId=a_res, httpMethod="GET", authorizationType="NONE")
        a.put_integration(restApiId=a_api, resourceId=a_res, httpMethod="GET", type="MOCK")
        # The deployment response itself is not needed; the test only relies
        # on the "prod" stage this call is expected to create.
        a.create_deployment(restApiId=a_api, stageName="prod")
        # A can see its stage
        stages_a = a.get_stages(restApiId=a_api)["item"]
        assert any(s["stageName"] == "prod" for s in stages_a)
        # B MUST NOT see A's api at all
        apis_b = b.get_rest_apis()["items"]
        assert all(api["id"] != a_api for api in apis_b), \
            f"CRITICAL: APIGW v1 REST api leaking cross-account; B saw: {apis_b}"
    finally:
        # Best-effort cleanup: a failed delete must not mask the test outcome.
        try:
            a.delete_rest_api(restApiId=a_api)
        except Exception:
            pass