Spaces:
Running
Running
| """ | |
| Pytest fixtures for MiniStack integration tests. | |
| """ | |
| import os | |
| import urllib.request | |
| import boto3 | |
| import pytest | |
| from botocore.config import Config | |
| ENDPOINT = os.environ.get("MINISTACK_ENDPOINT", "http://localhost:4566") | |
| REGION = "us-east-1" | |
| _kwargs = dict( | |
| endpoint_url=ENDPOINT, | |
| aws_access_key_id="test", | |
| aws_secret_access_key="test", | |
| region_name=REGION, | |
| # Hardcoded retry and pool settings to reduce transient connection flakes | |
| config=Config( | |
| region_name=REGION, | |
| retries={"mode": "standard"}, | |
| max_pool_connections=50, | |
| ), | |
| ) | |
| def make_client(service): | |
| return boto3.client(service, **_kwargs) | |
| _SERIAL_TESTS = { | |
| "tests/test_athena.py::test_athena_engine_mock_via_config", | |
| "tests/test_lambda.py::test_lambda_reset_terminates_workers", | |
| "tests/test_ministack.py::test_ministack_config_invalid_key_ignored", | |
| "tests/test_sfn.py::test_sfn_mock_config_return", | |
| "tests/test_sfn.py::test_sfn_mock_config_throw", | |
| "tests/test_ec2.py::test_ec2_create_default_vpc", | |
| "tests/test_sfn.py::test_sfn_wait_scale_zero_skips_wait", | |
| "tests/test_sfn.py::test_sfn_wait_scale_zero_does_not_timeout_lambda_tasks", | |
| "tests/test_eks.py::test_eks_create_describe_delete_cluster", | |
| "tests/test_eks.py::test_eks_cfn_cluster", | |
| } | |
| def pytest_configure(config): | |
| config.addinivalue_line( | |
| "markers", | |
| "serial: test must run in a dedicated sequential phase", | |
| ) | |
| def pytest_collection_modifyitems(config, items): | |
| for item in items: | |
| nodeid = item.nodeid.split("[", 1)[0] | |
| if nodeid in _SERIAL_TESTS: | |
| item.add_marker("serial") | |
| def reset_server(): | |
| """Reset all server state once before the test session starts.""" | |
| req = urllib.request.Request( | |
| f"{ENDPOINT}/_ministack/reset", | |
| data=b"", | |
| method="POST", | |
| ) | |
| try: | |
| urllib.request.urlopen(req, timeout=5) | |
| except Exception: | |
| pass # server may not be up yet; individual tests will fail naturally | |
| def s3(): | |
| return make_client("s3") | |
| def sqs(): | |
| return make_client("sqs") | |
| def sns(): | |
| return make_client("sns") | |
| def ddb(): | |
| return make_client("dynamodb") | |
| def sts(): | |
| return make_client("sts") | |
| def sm(): | |
| return make_client("secretsmanager") | |
| def logs(): | |
| return make_client("logs") | |
| def lam(): | |
| return make_client("lambda") | |
| def iam(): | |
| return make_client("iam") | |
| def ssm(): | |
| return make_client("ssm") | |
| def eb(): | |
| return make_client("events") | |
| def kin(): | |
| return make_client("kinesis") | |
| def cw(): | |
| return make_client("cloudwatch") | |
| def ses(): | |
| return make_client("ses") | |
| def sfn(): | |
| return make_client("stepfunctions") | |
| def ecs(): | |
| return make_client("ecs") | |
| def rds(): | |
| return make_client("rds") | |
| def ecr(): | |
| return make_client("ecr") | |
| def ec(): | |
| return make_client("elasticache") | |
| def glue(): | |
| return make_client("glue") | |
| def athena(): | |
| return make_client("athena") | |
| def _ministack_config(settings): | |
| """Set runtime config on the running server via POST /_ministack/config.""" | |
| import json | |
| req = urllib.request.Request( | |
| f"{ENDPOINT}/_ministack/config", | |
| data=json.dumps(settings).encode(), | |
| headers={"Content-Type": "application/json"}, | |
| method="POST", | |
| ) | |
| urllib.request.urlopen(req, timeout=5) | |
| def fh(): | |
| return make_client("firehose") | |
| def apigw(): | |
| return make_client("apigatewayv2") | |
| def apigw_v1(): | |
| return make_client("apigateway") | |
| def r53(): | |
| return make_client("route53") | |
| def cognito_idp(): | |
| return make_client("cognito-idp") | |
| def cognito_identity(): | |
| return make_client("cognito-identity") | |
| def ec2(): | |
| return make_client("ec2") | |
| def emr(): | |
| return make_client("emr") | |
| def elbv2(): | |
| return make_client("elbv2") | |
| def ebs(): | |
| return make_client("ec2") | |
| def efs(): | |
| return make_client("efs") | |
| def acm_client(): | |
| return make_client("acm") | |
| def wafv2(): | |
| return make_client("wafv2") | |
| def sesv2(): | |
| return make_client("sesv2") | |
| def cfn(): | |
| return make_client("cloudformation") | |
| def kms_client(): | |
| return make_client("kms") | |
| def sfn_sync(): | |
| """SFN client for StartSyncExecution — forces same endpoint (boto3 normally prefixes sync-).""" | |
| from botocore.config import Config as BotoConfig | |
| return boto3.client( | |
| "stepfunctions", | |
| endpoint_url=ENDPOINT, | |
| aws_access_key_id="test", | |
| aws_secret_access_key="test", | |
| region_name=REGION, | |
| config=BotoConfig( | |
| region_name=REGION, | |
| retries={"mode": "standard"}, | |
| max_pool_connections=50, | |
| inject_host_prefix=False, | |
| ), | |
| ) | |
| def cloudfront(): | |
| return make_client("cloudfront") | |
| def rds_data(): | |
| return make_client("rds-data") | |
| def appconfig_client(): | |
| return make_client("appconfig") | |
| def appconfigdata_client(): | |
| return make_client("appconfigdata") | |
| def sd(): | |
| """SD client for DiscoverInstances — forces same endpoint (boto3 normally prefixes data-).""" | |
| from botocore.config import Config as BotoConfig | |
| return boto3.client( | |
| "servicediscovery", | |
| endpoint_url=ENDPOINT, | |
| aws_access_key_id="test", | |
| aws_secret_access_key="test", | |
| region_name=REGION, | |
| config=BotoConfig( | |
| region_name=REGION, | |
| retries={"mode": "standard"}, | |
| max_pool_connections=50, | |
| inject_host_prefix=False, | |
| ), | |
| ) | |
| def codebuild(): | |
| return make_client("codebuild") | |
| def autoscaling(): | |
| return make_client("autoscaling") | |
| def transfer(): | |
| return make_client("transfer") | |
| def eks(): | |
| return make_client("eks") | |
| def appsync(): | |
| return make_client("appsync") | |
| def scheduler(): | |
| return make_client("scheduler") | |
| def tagging(): | |
| return make_client("resourcegroupstaggingapi") | |