import io import json import os import time import zipfile from urllib.parse import urlparse import pytest from botocore.exceptions import ClientError import uuid as _uuid_mod _endpoint = os.environ.get("MINISTACK_ENDPOINT", "http://localhost:4566") _EXECUTE_PORT = urlparse(_endpoint).port or 4566 def _make_zip(code: str) -> bytes: buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: zf.writestr("index.py", code) return buf.getvalue() def _make_zip_js(code: str, filename: str = "index.js") -> bytes: buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: zf.writestr(filename, code) return buf.getvalue() _LAMBDA_CODE = 'def handler(event, context):\n return {"statusCode": 200, "body": "ok"}\n' _LAMBDA_CODE_V2 = 'def handler(event, context):\n return {"statusCode": 200, "body": "v2"}\n' _LAMBDA_ROLE = "arn:aws:iam::000000000000:role/lambda-role" _NODE_CODE = ( "exports.handler = async (event, context) => {" " return { statusCode: 200, body: JSON.stringify({ hello: event.name || 'world' }) }; };" ) def _zip_lambda(code: str) -> bytes: buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: zf.writestr("index.py", code) return buf.getvalue() def test_lambda_create_invoke(lam): code = b'def handler(event, context):\n return {"statusCode": 200, "body": "Hello!", "event": event}\n' buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: zf.writestr("index.py", code) lam.create_function( FunctionName="test-func-1", Runtime="python3.12", Role="arn:aws:iam::000000000000:role/test-role", Handler="index.handler", Code={"ZipFile": buf.getvalue()}, ) funcs = lam.list_functions() assert any(f["FunctionName"] == "test-func-1" for f in funcs["Functions"]) resp = lam.invoke(FunctionName="test-func-1", Payload=json.dumps({"key": "value"})) payload = json.loads(resp["Payload"].read()) assert payload["statusCode"] == 200 def test_create_function_missing_runtime_raises(lam): """Zip deployment without a Runtime should return InvalidParameterValueException.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: zf.writestr("index.py", "def handler(e, c): return {}") with pytest.raises(ClientError) as exc: lam.create_function( FunctionName="no-runtime-fn", Role="arn:aws:iam::000000000000:role/role", Handler="index.handler", Code={"ZipFile": buf.getvalue()}, ) assert exc.value.response["Error"]["Code"] == "InvalidParameterValueException" def test_lambda_esm_sqs(lam, sqs): """SQS → Lambda event source mapping: messages sent to SQS trigger Lambda.""" import io import zipfile as zf # Clean up from previous runs try: lam.delete_function(FunctionName="esm-test-func") except Exception: pass # Lambda that records what it received code = ( b"import json\n" b"received = []\n" b"def handler(event, context):\n" b" received.extend(event.get('Records', []))\n" b" return {'processed': len(event.get('Records', []))}\n" ) buf = io.BytesIO() with zf.ZipFile(buf, "w") as z: z.writestr("index.py", code) lam.create_function( FunctionName="esm-test-func", Runtime="python3.12", Role="arn:aws:iam::000000000000:role/test-role", Handler="index.handler", Code={"ZipFile": buf.getvalue()}, ) q_url = sqs.create_queue(QueueName="esm-test-queue")["QueueUrl"] q_arn = sqs.get_queue_attributes(QueueUrl=q_url, AttributeNames=["QueueArn"])["Attributes"]["QueueArn"] # Create event source mapping resp = lam.create_event_source_mapping( EventSourceArn=q_arn, FunctionName="esm-test-func", BatchSize=5, Enabled=True, ) esm_uuid = resp["UUID"] assert resp["State"] == "Enabled" # Send a message to SQS sqs.send_message(QueueUrl=q_url, MessageBody="trigger-lambda") # Wait for poller to pick it up (max 5s) import time for _ in range(10): time.sleep(0.5) msgs = sqs.receive_message(QueueUrl=q_url, MaxNumberOfMessages=1) if not msgs.get("Messages"): break # message was consumed by Lambda # Queue should be empty — Lambda consumed the message msgs = sqs.receive_message(QueueUrl=q_url, MaxNumberOfMessages=1) assert not msgs.get("Messages"), "Message should have been consumed by Lambda via ESM" # Cleanup lam.delete_event_source_mapping(UUID=esm_uuid) def test_lambda_create_function(lam): resp = lam.create_function( FunctionName="lam-create-test", Runtime="python3.12", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(_LAMBDA_CODE)}, ) assert resp["FunctionName"] == "lam-create-test" assert resp["Runtime"] == "python3.12" assert resp["Handler"] == "index.handler" # AWS: CreateFunction returns State=Pending and transitions to Active # asynchronously. Terraform's FunctionActive waiter polls GetFunction. assert resp["State"] in ("Pending", "Active") assert resp["LastUpdateStatus"] in ("InProgress", "Successful") assert "FunctionArn" in resp def test_lambda_create_duplicate(lam): with pytest.raises(ClientError) as exc: lam.create_function( FunctionName="lam-create-test", Runtime="python3.12", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(_LAMBDA_CODE)}, ) assert exc.value.response["Error"]["Code"] == "ResourceConflictException" def test_lambda_get_function(lam): resp = lam.get_function(FunctionName="lam-create-test") assert resp["Configuration"]["FunctionName"] == "lam-create-test" assert "Code" in resp assert "Tags" in resp def test_lambda_get_function_not_found(lam): with pytest.raises(ClientError) as exc: lam.get_function(FunctionName="nonexistent-func-xyz") assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException" def test_lambda_list_functions(lam): resp = lam.list_functions() names = [f["FunctionName"] for f in resp["Functions"]] assert "lam-create-test" in names def test_lambda_delete_function(lam): lam.create_function( FunctionName="lam-to-delete", Runtime="python3.12", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(_LAMBDA_CODE)}, ) lam.delete_function(FunctionName="lam-to-delete") with pytest.raises(ClientError) as exc: lam.get_function(FunctionName="lam-to-delete") assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException" def test_lambda_invoke(lam): lam.create_function( FunctionName="lam-invoke-test", Runtime="python3.12", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(_LAMBDA_CODE)}, ) resp = lam.invoke( FunctionName="lam-invoke-test", Payload=json.dumps({"hello": "world"}), ) assert resp["StatusCode"] == 200 payload = json.loads(resp["Payload"].read()) assert payload["statusCode"] == 200 assert payload["body"] == "ok" def test_lambda_invoke_async(lam): resp = lam.invoke( FunctionName="lam-invoke-test", InvocationType="Event", Payload=json.dumps({"async": True}), ) assert resp["StatusCode"] == 202 def test_lambda_update_code(lam): lam.update_function_code( FunctionName="lam-invoke-test", ZipFile=_make_zip(_LAMBDA_CODE_V2), ) resp = lam.invoke( FunctionName="lam-invoke-test", Payload=json.dumps({}), ) payload = json.loads(resp["Payload"].read()) assert payload["body"] == "v2" def test_lambda_update_config(lam): lam.update_function_configuration( FunctionName="lam-invoke-test", Handler="index.new_handler", Environment={"Variables": {"MY_VAR": "my_val"}}, ) resp = lam.get_function(FunctionName="lam-invoke-test") cfg = resp["Configuration"] assert cfg["Handler"] == "index.new_handler" assert cfg["Environment"]["Variables"]["MY_VAR"] == "my_val" lam.update_function_configuration( FunctionName="lam-invoke-test", Handler="index.handler", ) def test_lambda_tags(lam): arn = lam.get_function(FunctionName="lam-invoke-test")["Configuration"]["FunctionArn"] lam.tag_resource(Resource=arn, Tags={"env": "test", "team": "backend"}) resp = lam.list_tags(Resource=arn) assert resp["Tags"]["env"] == "test" assert resp["Tags"]["team"] == "backend" lam.untag_resource(Resource=arn, TagKeys=["team"]) resp = lam.list_tags(Resource=arn) assert "team" not in resp["Tags"] assert resp["Tags"]["env"] == "test" def test_lambda_add_permission(lam): lam.add_permission( FunctionName="lam-invoke-test", StatementId="allow-s3", Action="lambda:InvokeFunction", Principal="s3.amazonaws.com", SourceArn="arn:aws:s3:::my-bucket", ) resp = lam.get_policy(FunctionName="lam-invoke-test") policy = json.loads(resp["Policy"]) sids = [s["Sid"] for s in policy["Statement"]] assert "allow-s3" in sids def test_lambda_list_versions(lam): resp = lam.list_versions_by_function(FunctionName="lam-invoke-test") versions = resp["Versions"] assert any(v["Version"] == "$LATEST" for v in versions) def test_lambda_publish_version(lam): resp = lam.publish_version( FunctionName="lam-invoke-test", Description="first published version", ) assert resp["Version"] == "1" assert resp["Description"] == "first published version" assert "FunctionArn" in resp versions = lam.list_versions_by_function(FunctionName="lam-invoke-test")["Versions"] version_nums = [v["Version"] for v in versions] assert "$LATEST" in version_nums assert "1" in version_nums def test_lambda_esm_sqs_comprehensive(lam, sqs): try: lam.delete_function(FunctionName="esm-comp-func") except ClientError: pass code = 'def handler(event, context):\n return {"processed": len(event.get("Records", []))}\n' lam.create_function( FunctionName="esm-comp-func", Runtime="python3.12", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(code)}, ) q_url = sqs.create_queue(QueueName="esm-comp-queue")["QueueUrl"] q_arn = sqs.get_queue_attributes( QueueUrl=q_url, AttributeNames=["QueueArn"], )["Attributes"]["QueueArn"] resp = lam.create_event_source_mapping( EventSourceArn=q_arn, FunctionName="esm-comp-func", BatchSize=5, Enabled=True, ) esm_uuid = resp["UUID"] assert resp["State"] == "Enabled" assert resp["BatchSize"] == 5 assert resp["EventSourceArn"] == q_arn got = lam.get_event_source_mapping(UUID=esm_uuid) assert got["UUID"] == esm_uuid listed = lam.list_event_source_mappings(FunctionName="esm-comp-func") assert any(e["UUID"] == esm_uuid for e in listed["EventSourceMappings"]) lam.delete_event_source_mapping(UUID=esm_uuid) def test_lambda_esm_sqs_failure_respects_visibility_timeout(lam, sqs): """On Lambda failure, the message should remain in-flight until VisibilityTimeout expires.""" import io import zipfile as zf for fn in ("esm-fail-func",): try: lam.delete_function(FunctionName=fn) except Exception: pass code = b"def handler(event, context):\n raise Exception('boom')\n" buf = io.BytesIO() with zf.ZipFile(buf, "w") as z: z.writestr("index.py", code) lam.create_function( FunctionName="esm-fail-func", Runtime="python3.12", Role="arn:aws:iam::000000000000:role/test-role", Handler="index.handler", Code={"ZipFile": buf.getvalue()}, Timeout=3, ) q_url = sqs.create_queue( QueueName="esm-fail-queue", Attributes={"VisibilityTimeout": "30"}, )["QueueUrl"] q_arn = sqs.get_queue_attributes(QueueUrl=q_url, AttributeNames=["QueueArn"])["Attributes"]["QueueArn"] resp = lam.create_event_source_mapping( EventSourceArn=q_arn, FunctionName="esm-fail-func", BatchSize=1, Enabled=True, ) esm_uuid = resp["UUID"] sqs.send_message(QueueUrl=q_url, MessageBody="trigger-failure") # Wait until ESM has actually processed (and failed) the message for _ in range(40): time.sleep(0.5) cur = lam.get_event_source_mapping(UUID=esm_uuid) if cur.get("LastProcessingResult") == "FAILED": break else: pytest.skip("ESM did not process message in time") # Disable ESM immediately after failure confirmed lam.update_event_source_mapping(UUID=esm_uuid, Enabled=False) # Message should be invisible (VisibilityTimeout=30s, and ESM just received it) msgs = sqs.receive_message(QueueUrl=q_url, MaxNumberOfMessages=1, WaitTimeSeconds=0) assert not msgs.get("Messages"), "Message should be invisible during VisibilityTimeout after failed ESM invoke" lam.delete_event_source_mapping(UUID=esm_uuid) def test_lambda_esm_sqs_report_batch_item_failures(lam, sqs): """ReportBatchItemFailures: failed messages stay on queue and reach DLQ.""" for fn in ("esm-partial-func",): try: lam.delete_function(FunctionName=fn) except Exception: pass # Handler reports ALL messages as failed code = ( "import json\n" "def handler(event, context):\n" " failures = []\n" " for r in event.get('Records', []):\n" " failures.append({'itemIdentifier': r['messageId']})\n" " return {'batchItemFailures': failures}\n" ) lam.create_function( FunctionName="esm-partial-func", Runtime="python3.12", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(code)}, ) # DLQ + main queue with maxReceiveCount=1 dlq_url = sqs.create_queue(QueueName="esm-partial-dlq")["QueueUrl"] dlq_arn = sqs.get_queue_attributes( QueueUrl=dlq_url, AttributeNames=["QueueArn"] )["Attributes"]["QueueArn"] q_url = sqs.create_queue( QueueName="esm-partial-queue", Attributes={ "VisibilityTimeout": "1", "RedrivePolicy": json.dumps({ "deadLetterTargetArn": dlq_arn, "maxReceiveCount": "1", }), }, )["QueueUrl"] q_arn = sqs.get_queue_attributes( QueueUrl=q_url, AttributeNames=["QueueArn"] )["Attributes"]["QueueArn"] esm = lam.create_event_source_mapping( EventSourceArn=q_arn, FunctionName="esm-partial-func", FunctionResponseTypes=["ReportBatchItemFailures"], BatchSize=1, Enabled=True, ) esm_uuid = esm["UUID"] assert "ReportBatchItemFailures" in esm["FunctionResponseTypes"] sqs.send_message(QueueUrl=q_url, MessageBody="partial-fail-test") # Wait for ESM to process and message to land in DLQ dlq_count = 0 for _ in range(30): time.sleep(1) attrs = sqs.get_queue_attributes( QueueUrl=dlq_url, AttributeNames=["ApproximateNumberOfMessages"], ) dlq_count = int(attrs["Attributes"]["ApproximateNumberOfMessages"]) if dlq_count >= 1: break lam.update_event_source_mapping(UUID=esm_uuid, Enabled=False) lam.delete_event_source_mapping(UUID=esm_uuid) assert dlq_count >= 1, ( f"Message should have reached DLQ after partial failure, " f"but DLQ has {dlq_count} messages" ) def test_lambda_warm_start(lam, apigw): """Warm worker via API Gateway execute-api: module-level state persists across invocations.""" import urllib.request as _urlreq import uuid as _uuid fname = f"intg-warm-{_uuid.uuid4().hex[:8]}" code = ( b"import time\n" b"_boot_time = time.time()\n" b"def handler(event, context):\n" b" return {'statusCode': 200, 'body': str(_boot_time)}\n" ) buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: zf.writestr("index.py", code) lam.create_function( FunctionName=fname, Runtime="python3.12", Role="arn:aws:iam::000000000000:role/test-role", Handler="index.handler", Code={"ZipFile": buf.getvalue()}, ) api_id = apigw.create_api(Name=f"warm-api-{fname}", ProtocolType="HTTP")["ApiId"] int_id = apigw.create_integration( ApiId=api_id, IntegrationType="AWS_PROXY", IntegrationUri=f"arn:aws:lambda:us-east-1:000000000000:function:{fname}", PayloadFormatVersion="2.0", )["IntegrationId"] apigw.create_route(ApiId=api_id, RouteKey="GET /ping", Target=f"integrations/{int_id}") apigw.create_stage(ApiId=api_id, StageName="$default") def call(): req = _urlreq.Request( f"http://{api_id}.execute-api.localhost:{_EXECUTE_PORT}/$default/ping", method="GET", ) req.add_header("Host", f"{api_id}.execute-api.localhost:{_EXECUTE_PORT}") return _urlreq.urlopen(req).read().decode() t1 = call() # cold start — spawns worker, imports module t2 = call() # warm — reuses worker, same module state assert t1 == t2, f"Warm worker should reuse module state: {t1} != {t2}" apigw.delete_api(ApiId=api_id) lam.delete_function(FunctionName=fname) def test_lambda_warm_invoke_with_stderr_logging(lam): """Warm invoke should succeed repeatedly even when the worker writes to stderr.""" fname = f"lam-warm-stderr-{_uuid_mod.uuid4().hex[:8]}" code = ( "import sys\n" "def handler(event, context):\n" " print(f'log:{event.get(\"n\", 0)}')\n" " return {'statusCode': 200, 'value': event.get('n', 0)}\n" ) lam.create_function( FunctionName=fname, Runtime="python3.12", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(code)}, ) try: first = lam.invoke(FunctionName=fname, Payload=json.dumps({"n": 1})) second = lam.invoke(FunctionName=fname, Payload=json.dumps({"n": 2})) assert first["StatusCode"] == 200 assert second["StatusCode"] == 200 assert json.loads(first["Payload"].read())["value"] == 1 assert json.loads(second["Payload"].read())["value"] == 2 finally: lam.delete_function(FunctionName=fname) def test_lambda_nodejs_create_and_invoke(lam): lam.create_function( FunctionName="lam-node-basic", Runtime="nodejs20.x", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip_js(_NODE_CODE, "index.js")}, ) resp = lam.invoke( FunctionName="lam-node-basic", Payload=json.dumps({"name": "ministack"}), ) assert resp["StatusCode"] == 200 payload = json.loads(resp["Payload"].read()) assert payload["statusCode"] == 200 body = json.loads(payload["body"]) assert body["hello"] == "ministack" def test_lambda_nodejs22_runtime(lam): lam.create_function( FunctionName="lam-node22", Runtime="nodejs22.x", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip_js(_NODE_CODE, "index.js")}, ) resp = lam.invoke(FunctionName="lam-node22", Payload=json.dumps({"name": "v22"})) assert resp["StatusCode"] == 200 payload = json.loads(resp["Payload"].read()) assert payload["statusCode"] == 200 def test_lambda_nodejs_update_code(lam): v2 = ( "exports.handler = async (event) => {" " return { statusCode: 200, body: 'v2' }; };" ) lam.update_function_code( FunctionName="lam-node-basic", ZipFile=_make_zip_js(v2, "index.js"), ) resp = lam.invoke(FunctionName="lam-node-basic", Payload=b"{}") assert resp["StatusCode"] == 200 payload = json.loads(resp["Payload"].read()) assert payload["body"] == "v2" def test_lambda_create_from_s3(lam, s3): bucket = "lambda-code-bucket" s3.create_bucket(Bucket=bucket) buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: zf.writestr("index.py", "def handler(event, context): return {'s3': True}") s3.put_object(Bucket=bucket, Key="fn.zip", Body=buf.getvalue()) lam.create_function( FunctionName="lam-s3-code", Runtime="python3.11", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"S3Bucket": bucket, "S3Key": "fn.zip"}, ) resp = lam.invoke(FunctionName="lam-s3-code", Payload=b"{}") assert resp["StatusCode"] == 200 assert json.loads(resp["Payload"].read())["s3"] is True def test_lambda_update_code_from_s3(lam, s3): bucket = "lambda-code-bucket" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: zf.writestr("index.py", "def handler(event, context): return {'v': 's3v2'}") s3.put_object(Bucket=bucket, Key="fn-v2.zip", Body=buf.getvalue()) lam.update_function_code( FunctionName="lam-s3-code", S3Bucket=bucket, S3Key="fn-v2.zip", ) resp = lam.invoke(FunctionName="lam-s3-code", Payload=b"{}") assert json.loads(resp["Payload"].read())["v"] == "s3v2" def test_lambda_update_code_s3_missing_returns_error(lam): from botocore.exceptions import ClientError with pytest.raises(ClientError) as exc: lam.update_function_code( FunctionName="lam-s3-code", S3Bucket="lambda-code-bucket", S3Key="does-not-exist.zip", ) assert exc.value.response["Error"]["Code"] == "InvalidParameterValueException" def test_lambda_publish_version_with_create(lam): code = "def handler(event, context): return {'ver': 1}" try: lam.get_function(FunctionName="lam-versioned-pub") except Exception: lam.create_function( FunctionName="lam-versioned-pub", Runtime="python3.11", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(code)}, Publish=True, ) resp = lam.list_versions_by_function(FunctionName="lam-versioned-pub") versions = [v["Version"] for v in resp["Versions"]] assert any(v != "$LATEST" for v in versions) def test_lambda_update_code_publish_version(lam): # Ensure function exists (may have been cleaned up) try: lam.get_function(FunctionName="lam-versioned") except Exception: lam.create_function( FunctionName="lam-versioned", Runtime="python3.11", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip("def handler(event, context): return {'ver': 1}")}, Publish=True, ) v2 = "def handler(event, context): return {'ver': 2}" lam.update_function_code( FunctionName="lam-versioned", ZipFile=_make_zip(v2), Publish=True, ) resp = lam.list_versions_by_function(FunctionName="lam-versioned") versions = [v["Version"] for v in resp["Versions"] if v["Version"] != "$LATEST"] assert len(versions) >= 1 def test_lambda_nodejs_promise_handler(lam): code = ( "exports.handler = (event) => Promise.resolve({ promise: true, val: event.x });" ) lam.create_function( FunctionName="lam-node-promise", Runtime="nodejs20.x", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip_js(code, "index.js")}, ) resp = lam.invoke(FunctionName="lam-node-promise", Payload=json.dumps({"x": 42})) payload = json.loads(resp["Payload"].read()) assert payload["promise"] is True assert payload["val"] == 42 def test_lambda_nodejs_callback_handler(lam): code = ( "exports.handler = (event, context, cb) => cb(null, { cb: true, val: event.y });" ) lam.create_function( FunctionName="lam-node-cb", Runtime="nodejs20.x", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip_js(code, "index.js")}, ) resp = lam.invoke(FunctionName="lam-node-cb", Payload=json.dumps({"y": 7})) payload = json.loads(resp["Payload"].read()) assert payload["cb"] is True assert payload["val"] == 7 def test_lambda_nodejs_env_vars_at_spawn(lam): """Lambda env vars are available at process startup (NODE_OPTIONS, etc.).""" code = ( "exports.handler = async (event) => ({" " myVar: process.env.MY_CUSTOM_VAR," " region: process.env.AWS_REGION" "});" ) lam.create_function( FunctionName="lam-node-env-spawn", Runtime="nodejs20.x", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip_js(code, "index.js")}, Environment={"Variables": {"MY_CUSTOM_VAR": "from-spawn"}}, ) resp = lam.invoke(FunctionName="lam-node-env-spawn", Payload=b"{}") payload = json.loads(resp["Payload"].read()) assert payload["myVar"] == "from-spawn" def test_lambda_python_env_vars_at_spawn(lam): """Python Lambda env vars are available at process startup.""" code = ( "import os\n" "def handler(event, context):\n" " return {'myVar': os.environ.get('MY_PY_VAR', 'missing')}\n" ) lam.create_function( FunctionName="lam-py-env-spawn", Runtime="python3.11", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(code)}, Environment={"Variables": {"MY_PY_VAR": "from-spawn-py"}}, ) resp = lam.invoke(FunctionName="lam-py-env-spawn", Payload=b"{}") payload = json.loads(resp["Payload"].read()) assert payload["myVar"] == "from-spawn-py" def test_lambda_endpoint_url_not_overridden_by_function_env(lam): """AWS_ENDPOINT_URL from function env vars must not override the process-level value. When MiniStack runs in Docker, the host-mapped port (e.g. 4568) is unreachable from inside the container — the Lambda binary must always use MiniStack's internal endpoint. This test verifies that the MiniStack server's AWS_ENDPOINT_URL takes precedence over function-level env vars. It requires the server to have AWS_ENDPOINT_URL set (as it does when running via docker-compose). """ # Verify the MiniStack server has AWS_ENDPOINT_URL set by checking # a baseline Lambda. If the server doesn't have it, the override # logic has nothing to restore and this test is not meaningful. probe_code = ( "import os\n" "def handler(event, context):\n" " return {'endpoint': os.environ.get('AWS_ENDPOINT_URL', '')}\n" ) probe_name = f"lam-endpoint-probe-{_uuid_mod.uuid4().hex[:8]}" lam.create_function( FunctionName=probe_name, Runtime="python3.11", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(probe_code)}, ) resp = lam.invoke(FunctionName=probe_name, Payload=b"{}") server_endpoint = json.loads(resp["Payload"].read()).get("endpoint", "") if not server_endpoint: pytest.skip("MiniStack server does not have AWS_ENDPOINT_URL set " "(run with docker-compose to test endpoint override)") # Now test with a function that sets a conflicting AWS_ENDPOINT_URL. code = ( "import os\n" "def handler(event, context):\n" " return {'endpoint': os.environ.get('AWS_ENDPOINT_URL', 'unset')}\n" ) fname = f"lam-endpoint-override-{_uuid_mod.uuid4().hex[:8]}" lam.create_function( FunctionName=fname, Runtime="python3.11", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(code)}, Environment={"Variables": { "AWS_ENDPOINT_URL": "http://should-be-overridden:9999", }}, ) resp = lam.invoke(FunctionName=fname, Payload=b"{}") payload = json.loads(resp["Payload"].read()) # The Lambda must see the server's endpoint, not the function env var. assert payload["endpoint"] != "http://should-be-overridden:9999", ( "Function-level AWS_ENDPOINT_URL must not override internal endpoint" ) assert payload["endpoint"] == server_endpoint def test_lambda_dynamodb_stream_esm(lam, ddb): # Create table with streams enabled ddb.create_table( TableName="stream-test-table", KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], BillingMode="PAY_PER_REQUEST", StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"}, ) stream_arn = ddb.describe_table(TableName="stream-test-table")["Table"]["LatestStreamArn"] # Create Lambda that captures stream records code = "def handler(event, context): return len(event['Records'])" lam.create_function( FunctionName="lam-ddb-stream", Runtime="python3.11", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(code)}, ) esm = lam.create_event_source_mapping( FunctionName="lam-ddb-stream", EventSourceArn=stream_arn, StartingPosition="TRIM_HORIZON", BatchSize=10, ) assert esm["EventSourceArn"] == stream_arn assert esm["FunctionArn"].endswith("lam-ddb-stream") # Verify ESM is registered and retrievable esm_resp = lam.get_event_source_mapping(UUID=esm["UUID"]) assert esm_resp["EventSourceArn"] == stream_arn assert esm_resp["StartingPosition"] == "TRIM_HORIZON" # Write items — stream should capture them ddb.put_item(TableName="stream-test-table", Item={"pk": {"S": "k1"}, "val": {"S": "v1"}}) ddb.put_item(TableName="stream-test-table", Item={"pk": {"S": "k2"}, "val": {"S": "v2"}}) ddb.delete_item(TableName="stream-test-table", Key={"pk": {"S": "k1"}}) # Verify table still has expected state scan = ddb.scan(TableName="stream-test-table") pks = [item["pk"]["S"] for item in scan["Items"]] assert "k2" in pks assert "k1" not in pks def test_lambda_function_url_config(lam): """CreateFunctionUrlConfig / Get / Update / Delete / List lifecycle.""" import uuid as _uuid_mod fn = f"intg-url-cfg-{_uuid_mod.uuid4().hex[:8]}" lam.create_function( FunctionName=fn, Runtime="python3.12", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(_LAMBDA_CODE)}, ) # Create resp = lam.create_function_url_config(FunctionName=fn, AuthType="NONE") assert resp["AuthType"] == "NONE" assert "FunctionUrl" in resp url = resp["FunctionUrl"] # Get got = lam.get_function_url_config(FunctionName=fn) assert got["FunctionUrl"] == url # Update updated = lam.update_function_url_config( FunctionName=fn, AuthType="AWS_IAM", Cors={"AllowOrigins": ["*"]}, ) assert updated["AuthType"] == "AWS_IAM" assert updated["Cors"]["AllowOrigins"] == ["*"] # List listed = lam.list_function_url_configs(FunctionName=fn) assert any(c["FunctionUrl"] == url for c in listed["FunctionUrlConfigs"]) # Delete lam.delete_function_url_config(FunctionName=fn) with pytest.raises(ClientError) as exc: lam.get_function_url_config(FunctionName=fn) assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException" def test_lambda_unknown_path_returns_404(lam): """Requests to an unrecognised Lambda path must return 404, not 400 InvalidRequest.""" import urllib.error import urllib.request endpoint = os.environ.get("MINISTACK_ENDPOINT", "http://localhost:4566") req = urllib.request.Request( f"{endpoint}/2015-03-31/functions/nonexistent-fn/completely-unknown-subpath", headers={"Authorization": "AWS4-HMAC-SHA256 Credential=test/20260101/us-east-1/lambda/aws4_request"}, method="GET", ) try: urllib.request.urlopen(req) assert False, "Expected an error response" except urllib.error.HTTPError as e: assert e.code == 404 def test_lambda_reset_terminates_workers(lam): """/_ministack/reset must cleanly terminate warm Lambda workers.""" import urllib.request fn = f"intg-reset-worker-{__import__('uuid').uuid4().hex[:8]}" code = "import time\n_boot = time.time()\ndef handler(event, context):\n return {'boot': _boot}\n" lam.create_function( FunctionName=fn, Runtime="python3.12", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(code)}, ) # Warm the worker r1 = lam.invoke(FunctionName=fn, Payload=b"{}") boot1 = json.loads(r1["Payload"].read())["boot"] # Reset — must terminate worker without error endpoint = os.environ.get("MINISTACK_ENDPOINT", "http://localhost:4566") req = urllib.request.Request(f"{endpoint}/_ministack/reset", data=b"", method="POST") for _attempt in range(3): try: urllib.request.urlopen(req, timeout=15) break except Exception: if _attempt == 2: raise # Re-create and invoke — new worker means new boot time lam.create_function( FunctionName=fn, Runtime="python3.12", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": _make_zip(code)}, ) r2 = lam.invoke(FunctionName=fn, Payload=b"{}") boot2 = json.loads(r2["Payload"].read())["boot"] assert boot2 > boot1, "Worker should have been reset — new boot time expected" def test_lambda_alias_crud(lam): """CreateAlias, GetAlias, UpdateAlias, DeleteAlias.""" code = _zip_lambda("def handler(e,c): return {'v': 1}") lam.create_function( FunctionName="qa-lam-alias", Runtime="python3.12", Role="arn:aws:iam::000000000000:role/r", Handler="index.handler", Code={"ZipFile": code}, ) lam.publish_version(FunctionName="qa-lam-alias") lam.create_alias( FunctionName="qa-lam-alias", Name="prod", FunctionVersion="1", Description="production alias", ) alias = lam.get_alias(FunctionName="qa-lam-alias", Name="prod") assert alias["Name"] == "prod" assert alias["FunctionVersion"] == "1" lam.update_alias(FunctionName="qa-lam-alias", Name="prod", Description="updated") alias2 = lam.get_alias(FunctionName="qa-lam-alias", Name="prod") assert alias2["Description"] == "updated" aliases = lam.list_aliases(FunctionName="qa-lam-alias")["Aliases"] assert any(a["Name"] == "prod" for a in aliases) lam.delete_alias(FunctionName="qa-lam-alias", Name="prod") aliases2 = lam.list_aliases(FunctionName="qa-lam-alias")["Aliases"] assert not any(a["Name"] == "prod" for a in aliases2) def test_lambda_publish_version_snapshot(lam): """PublishVersion creates a numbered version snapshot.""" code = _zip_lambda("def handler(e,c): return 'v1'") lam.create_function( FunctionName="qa-lam-version", Runtime="python3.12", Role="arn:aws:iam::000000000000:role/r", Handler="index.handler", Code={"ZipFile": code}, ) ver = lam.publish_version(FunctionName="qa-lam-version") assert ver["Version"] == "1" versions = lam.list_versions_by_function(FunctionName="qa-lam-version")["Versions"] version_nums = [v["Version"] for v in versions] assert "1" in version_nums assert "$LATEST" in version_nums def test_lambda_function_concurrency(lam): """PutFunctionConcurrency / GetFunctionConcurrency / DeleteFunctionConcurrency.""" code = _zip_lambda("def handler(e,c): return {}") lam.create_function( FunctionName="qa-lam-concurrency", Runtime="python3.12", Role="arn:aws:iam::000000000000:role/r", Handler="index.handler", Code={"ZipFile": code}, ) lam.put_function_concurrency( FunctionName="qa-lam-concurrency", ReservedConcurrentExecutions=5, ) resp = lam.get_function_concurrency(FunctionName="qa-lam-concurrency") assert resp["ReservedConcurrentExecutions"] == 5 lam.delete_function_concurrency(FunctionName="qa-lam-concurrency") resp2 = lam.get_function_concurrency(FunctionName="qa-lam-concurrency") assert resp2.get("ReservedConcurrentExecutions") is None def test_lambda_add_remove_permission(lam): """AddPermission / RemovePermission / GetPolicy.""" code = _zip_lambda("def handler(e,c): return {}") lam.create_function( FunctionName="qa-lam-policy", Runtime="python3.12", Role="arn:aws:iam::000000000000:role/r", Handler="index.handler", Code={"ZipFile": code}, ) lam.add_permission( FunctionName="qa-lam-policy", StatementId="allow-s3", Action="lambda:InvokeFunction", Principal="s3.amazonaws.com", ) policy = json.loads(lam.get_policy(FunctionName="qa-lam-policy")["Policy"]) assert any(s["Sid"] == "allow-s3" for s in policy["Statement"]) lam.remove_permission(FunctionName="qa-lam-policy", StatementId="allow-s3") policy2 = json.loads(lam.get_policy(FunctionName="qa-lam-policy")["Policy"]) assert not any(s["Sid"] == "allow-s3" for s in policy2["Statement"]) def test_lambda_list_functions_pagination(lam): """ListFunctions pagination with Marker works correctly.""" for i in range(5): code = _zip_lambda("def handler(e,c): return {}") try: lam.create_function( FunctionName=f"qa-lam-page-{i}", Runtime="python3.12", Role="arn:aws:iam::000000000000:role/r", Handler="index.handler", Code={"ZipFile": code}, ) except ClientError: pass resp1 = lam.list_functions(MaxItems=2) assert len(resp1["Functions"]) <= 2 if "NextMarker" in resp1: resp2 = lam.list_functions(MaxItems=2, Marker=resp1["NextMarker"]) names1 = {f["FunctionName"] for f in resp1["Functions"]} names2 = {f["FunctionName"] for f in resp2["Functions"]} assert not names1 & names2 def test_lambda_invoke_event_type_returns_202(lam): """Invoke with InvocationType=Event returns 202 immediately.""" code = _zip_lambda("def handler(e,c): return {}") try: lam.create_function( FunctionName="qa-lam-event-invoke", Runtime="python3.12", Role="arn:aws:iam::000000000000:role/r", Handler="index.handler", Code={"ZipFile": code}, ) except ClientError: pass resp = lam.invoke( FunctionName="qa-lam-event-invoke", InvocationType="Event", Payload=json.dumps({}), ) assert resp["StatusCode"] == 202 def test_lambda_invoke_dry_run_returns_204(lam): """Invoke with InvocationType=DryRun returns 204.""" code = _zip_lambda("def handler(e,c): return {}") try: lam.create_function( FunctionName="qa-lam-dryrun", Runtime="python3.12", Role="arn:aws:iam::000000000000:role/r", Handler="index.handler", Code={"ZipFile": code}, ) except ClientError: pass resp = lam.invoke( FunctionName="qa-lam-dryrun", InvocationType="DryRun", Payload=json.dumps({}), ) assert resp["StatusCode"] == 204 def test_lambda_layer_publish(lam): import base64, zipfile, io buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("layer.py", "# layer") zip_bytes = buf.getvalue() resp = lam.publish_layer_version( LayerName="my-test-layer", Description="Test layer", Content={"ZipFile": zip_bytes}, CompatibleRuntimes=["python3.12"], ) assert resp["Version"] == 1 assert "my-test-layer" in resp["LayerVersionArn"] def test_lambda_layer_publish_from_s3(lam, s3): """PublishLayerVersion with S3Bucket/S3Key. Contributed by @Baptiste-Garcin (#356).""" import zipfile, io buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("s3layer.py", "# layer from s3") zip_bytes = buf.getvalue() bucket = "layer-bucket" key = "layers/my-layer.zip" s3.create_bucket(Bucket=bucket) s3.put_object(Bucket=bucket, Key=key, Body=zip_bytes) resp = lam.publish_layer_version( LayerName="s3-layer", Description="Layer from S3", Content={"S3Bucket": bucket, "S3Key": key}, CompatibleRuntimes=["python3.12"], ) assert resp["Version"] == 1 assert "s3-layer" in resp["LayerVersionArn"] assert resp["Content"]["CodeSize"] == len(zip_bytes) assert resp["Content"]["CodeSha256"] def test_lambda_layer_get_version(lam): resp = lam.get_layer_version(LayerName="my-test-layer", VersionNumber=1) assert resp["Version"] == 1 assert resp["Description"] == "Test layer" def test_lambda_layer_list_versions(lam): resp = lam.list_layer_versions(LayerName="my-test-layer") assert len(resp["LayerVersions"]) >= 1 assert resp["LayerVersions"][0]["Version"] == 1 def test_lambda_layer_list_layers(lam): resp = lam.list_layers() names = [l["LayerName"] for l in resp["Layers"]] assert "my-test-layer" in names def test_lambda_layer_delete_version(lam): import base64, zipfile, io buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("tmp.py", "") lam.publish_layer_version(LayerName="delete-layer-test", Content={"ZipFile": buf.getvalue()}) lam.delete_layer_version(LayerName="delete-layer-test", VersionNumber=1) resp = lam.list_layer_versions(LayerName="delete-layer-test") assert len(resp["LayerVersions"]) == 0 def test_lambda_function_with_layer(lam): # Publish layer buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("layer.py", "") layer_resp = lam.publish_layer_version(LayerName="fn-layer", Content={"ZipFile": buf.getvalue()}) layer_arn = layer_resp["LayerVersionArn"] # Create function using the layer fn_zip = io.BytesIO() with zipfile.ZipFile(fn_zip, "w") as z: z.writestr("index.py", "def handler(e, c): return {}") lam.create_function( FunctionName="fn-with-layer", Runtime="python3.12", Role="arn:aws:iam::000000000000:role/test", Handler="index.handler", Code={"ZipFile": fn_zip.getvalue()}, Layers=[layer_arn], ) fn = lam.get_function(FunctionName="fn-with-layer") assert layer_arn in fn["Configuration"]["Layers"][0]["Arn"] def test_lambda_layer_content_location(lam): """Content.Location should be a non-empty URL pointing to the layer zip.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("mod.py", "X=1") resp = lam.publish_layer_version( LayerName="loc-layer", Content={"ZipFile": buf.getvalue()}, ) assert resp["Content"]["Location"] assert "loc-layer" in resp["Content"]["Location"] # Verify the URL actually serves zip data import urllib.request data = urllib.request.urlopen(resp["Content"]["Location"]).read() assert len(data) == resp["Content"]["CodeSize"] def test_lambda_layer_pagination(lam): """Publish 3 versions, paginate with MaxItems=1.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("p.py", "") for _ in range(3): lam.publish_layer_version(LayerName="page-layer", Content={"ZipFile": buf.getvalue()}) # List with MaxItems=1 (newest first) resp = lam.list_layer_versions(LayerName="page-layer", MaxItems=1) assert len(resp["LayerVersions"]) == 1 assert "NextMarker" in resp def test_lambda_layer_list_filter_runtime(lam): """Filter list_layer_versions by CompatibleRuntime.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("r.py", "") lam.publish_layer_version( LayerName="rt-filter-layer", Content={"ZipFile": buf.getvalue()}, CompatibleRuntimes=["python3.12"], ) lam.publish_layer_version( LayerName="rt-filter-layer", Content={"ZipFile": buf.getvalue()}, CompatibleRuntimes=["nodejs18.x"], ) resp = lam.list_layer_versions( LayerName="rt-filter-layer", CompatibleRuntime="python3.12", ) assert all("python3.12" in v["CompatibleRuntimes"] for v in resp["LayerVersions"]) def test_lambda_layer_list_filter_architecture(lam): """Filter list_layer_versions by CompatibleArchitecture.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("a.py", "") lam.publish_layer_version( LayerName="arch-filter-layer", Content={"ZipFile": buf.getvalue()}, CompatibleArchitectures=["x86_64"], ) lam.publish_layer_version( LayerName="arch-filter-layer", Content={"ZipFile": buf.getvalue()}, CompatibleArchitectures=["arm64"], ) resp = lam.list_layer_versions( LayerName="arch-filter-layer", CompatibleArchitecture="x86_64", ) assert all("x86_64" in v["CompatibleArchitectures"] for v in resp["LayerVersions"]) def test_lambda_layer_list_layers_pagination(lam): """Multiple layers, paginate ListLayers.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("x.py", "") for i in range(3): lam.publish_layer_version( LayerName=f"ll-page-{i}", Content={"ZipFile": buf.getvalue()}, ) resp = lam.list_layers(MaxItems=1) assert len(resp["Layers"]) == 1 assert "NextMarker" in resp def test_lambda_layer_list_layers_filter_runtime(lam): """ListLayers filtered by CompatibleRuntime.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("f.py", "") lam.publish_layer_version( LayerName="ll-rt-py", Content={"ZipFile": buf.getvalue()}, CompatibleRuntimes=["python3.12"], ) lam.publish_layer_version( LayerName="ll-rt-node", Content={"ZipFile": buf.getvalue()}, CompatibleRuntimes=["nodejs18.x"], ) resp = lam.list_layers(CompatibleRuntime="python3.12") names = [l["LayerName"] for l in resp["Layers"]] assert "ll-rt-py" in names assert "ll-rt-node" not in names def test_lambda_layer_get_version_not_found(lam): """Getting a nonexistent layer should raise 404.""" from botocore.exceptions import ClientError with pytest.raises(ClientError) as exc: lam.get_layer_version(LayerName="no-such-layer-xyz", VersionNumber=1) assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404 def test_lambda_layer_get_version_by_arn(lam): """GetLayerVersionByArn resolves by full ARN.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("ba.py", "") pub = lam.publish_layer_version( LayerName="by-arn-layer", Content={"ZipFile": buf.getvalue()}, ) arn = pub["LayerVersionArn"] resp = lam.get_layer_version_by_arn(Arn=arn) assert resp["LayerVersionArn"] == arn assert resp["Version"] == pub["Version"] def test_lambda_layer_version_permission_add(lam): """Add a layer version permission and verify response.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("perm.py", "") pub = lam.publish_layer_version( LayerName="perm-layer", Content={"ZipFile": buf.getvalue()}, ) resp = lam.add_layer_version_permission( LayerName="perm-layer", VersionNumber=pub["Version"], StatementId="allow-all", Action="lambda:GetLayerVersion", Principal="*", ) assert "Statement" in resp import json stmt = json.loads(resp["Statement"]) assert stmt["Sid"] == "allow-all" assert stmt["Action"] == "lambda:GetLayerVersion" def test_lambda_layer_version_permission_get_policy(lam): """Get policy after adding a permission.""" import json resp = lam.get_layer_version_policy(LayerName="perm-layer", VersionNumber=1) policy = json.loads(resp["Policy"]) assert len(policy["Statement"]) >= 1 assert policy["Statement"][0]["Sid"] == "allow-all" def test_lambda_layer_version_permission_remove(lam): """Remove a layer version permission.""" lam.remove_layer_version_permission( LayerName="perm-layer", VersionNumber=1, StatementId="allow-all", ) from botocore.exceptions import ClientError with pytest.raises(ClientError) as exc: lam.get_layer_version_policy(LayerName="perm-layer", VersionNumber=1) assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404 def test_lambda_layer_version_permission_duplicate_sid(lam): """Adding a duplicate StatementId should raise conflict.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("dup.py", "") pub = lam.publish_layer_version( LayerName="dup-sid-layer", Content={"ZipFile": buf.getvalue()}, ) lam.add_layer_version_permission( LayerName="dup-sid-layer", VersionNumber=pub["Version"], StatementId="s1", Action="lambda:GetLayerVersion", Principal="*", ) from botocore.exceptions import ClientError with pytest.raises(ClientError) as exc: lam.add_layer_version_permission( LayerName="dup-sid-layer", VersionNumber=pub["Version"], StatementId="s1", Action="lambda:GetLayerVersion", Principal="*", ) assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 409 def test_lambda_layer_version_permission_invalid_action(lam): """Only lambda:GetLayerVersion is a valid action.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("inv.py", "") pub = lam.publish_layer_version( LayerName="inv-act-layer", Content={"ZipFile": buf.getvalue()}, ) from botocore.exceptions import ClientError with pytest.raises(ClientError) as exc: lam.add_layer_version_permission( LayerName="inv-act-layer", VersionNumber=pub["Version"], StatementId="s1", Action="lambda:InvokeFunction", Principal="*", ) assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] in (400, 403) def test_lambda_layer_delete_idempotent(lam): """Deleting a nonexistent version should not error.""" lam.delete_layer_version(LayerName="no-such-layer-del", VersionNumber=999) def test_lambda_warm_worker_invalidation(lam): """Create function with code v1, invoke, update code to v2, invoke again — must see v2.""" import io as _io import zipfile as _zf fname = "lambda-worker-invalidation-test" try: lam.delete_function(FunctionName=fname) except Exception: pass # v1 code code_v1 = b'def handler(event, context):\n return {"version": 1}\n' buf1 = _io.BytesIO() with _zf.ZipFile(buf1, "w") as z: z.writestr("index.py", code_v1) lam.create_function( FunctionName=fname, Runtime="python3.12", Role="arn:aws:iam::000000000000:role/test-role", Handler="index.handler", Code={"ZipFile": buf1.getvalue()}, ) # Invoke v1 resp1 = lam.invoke(FunctionName=fname, Payload=json.dumps({})) payload1 = json.loads(resp1["Payload"].read()) assert payload1["version"] == 1 # Update to v2 code_v2 = b'def handler(event, context):\n return {"version": 2}\n' buf2 = _io.BytesIO() with _zf.ZipFile(buf2, "w") as z: z.writestr("index.py", code_v2) lam.update_function_code(FunctionName=fname, ZipFile=buf2.getvalue()) # Invoke v2 resp2 = lam.invoke(FunctionName=fname, Payload=json.dumps({})) payload2 = json.loads(resp2["Payload"].read()) assert payload2["version"] == 2 def test_lambda_event_invoke_config_crud(lam): """Put/Get/Delete EventInvokeConfig lifecycle.""" code = "def handler(e,c): return {}" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: zf.writestr("index.py", code) lam.create_function( FunctionName="eic-fn", Runtime="python3.11", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": buf.getvalue()}, ) lam.put_function_event_invoke_config( FunctionName="eic-fn", MaximumRetryAttempts=1, MaximumEventAgeInSeconds=300, ) cfg = lam.get_function_event_invoke_config(FunctionName="eic-fn") assert cfg["MaximumRetryAttempts"] == 1 assert cfg["MaximumEventAgeInSeconds"] == 300 lam.delete_function_event_invoke_config(FunctionName="eic-fn") from botocore.exceptions import ClientError with pytest.raises(ClientError): lam.get_function_event_invoke_config(FunctionName="eic-fn") lam.delete_function(FunctionName="eic-fn") def test_lambda_provisioned_concurrency_crud(lam): """Put/Get/Delete ProvisionedConcurrencyConfig lifecycle.""" code = "def handler(e,c): return {}" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: zf.writestr("index.py", code) lam.create_function( FunctionName="pc-fn", Runtime="python3.11", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": buf.getvalue()}, Publish=True, ) versions = lam.list_versions_by_function(FunctionName="pc-fn")["Versions"] ver = [v for v in versions if v["Version"] != "$LATEST"][0]["Version"] lam.put_provisioned_concurrency_config( FunctionName="pc-fn", Qualifier=ver, ProvisionedConcurrentExecutions=5, ) cfg = lam.get_provisioned_concurrency_config( FunctionName="pc-fn", Qualifier=ver, ) assert cfg["RequestedProvisionedConcurrentExecutions"] == 5 lam.delete_provisioned_concurrency_config( FunctionName="pc-fn", Qualifier=ver, ) from botocore.exceptions import ClientError with pytest.raises(ClientError): lam.get_provisioned_concurrency_config(FunctionName="pc-fn", Qualifier=ver) lam.delete_function(FunctionName="pc-fn") def test_lambda_image_create_invoke(lam): """CreateFunction with PackageType Image + GetFunction returns ImageUri.""" lam.create_function( FunctionName="img-test-v39", PackageType="Image", Code={"ImageUri": "my-repo/my-image:latest"}, Role="arn:aws:iam::000000000000:role/test", Timeout=30, ) desc = lam.get_function(FunctionName="img-test-v39") assert desc["Configuration"]["PackageType"] == "Image" assert desc["Code"]["RepositoryType"] == "ECR" assert desc["Code"]["ImageUri"] == "my-repo/my-image:latest" lam.delete_function(FunctionName="img-test-v39") def test_lambda_update_code_image_uri(lam): """UpdateFunctionCode with ImageUri updates the image.""" lam.create_function( FunctionName="img-update-v39", PackageType="Image", Code={"ImageUri": "my-repo/my-image:v1"}, Role="arn:aws:iam::000000000000:role/test", ) lam.update_function_code(FunctionName="img-update-v39", ImageUri="my-repo/my-image:v2") desc = lam.get_function(FunctionName="img-update-v39") assert desc["Code"]["ImageUri"] == "my-repo/my-image:v2" lam.delete_function(FunctionName="img-update-v39") def test_lambda_provided_runtime_create(lam): """CreateFunction with provided.al2023 runtime accepts bootstrap handler.""" import zipfile, io buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: zf.writestr("bootstrap", "#!/bin/sh\necho ok\n") lam.create_function( FunctionName="provided-test-v39", Runtime="provided.al2023", Handler="bootstrap", Code={"ZipFile": buf.getvalue()}, Role="arn:aws:iam::000000000000:role/test", ) desc = lam.get_function_configuration(FunctionName="provided-test-v39") assert desc["Runtime"] == "provided.al2023" assert desc["Handler"] == "bootstrap" lam.delete_function(FunctionName="provided-test-v39") @pytest.mark.skipif( os.environ.get("LAMBDA_EXECUTOR", "").lower() != "docker", reason="requires LAMBDA_EXECUTOR=docker and Docker daemon", ) def test_lambda_provided_runtime_docker_invoke(lam): """Invoke a provided.al2023 Lambda via the Docker executor. Uses a shell-script bootstrap that implements the Lambda Runtime API (GET /invocation/next, POST /invocation/{id}/response). """ # Shell bootstrap implementing the Lambda Runtime API protocol. # Must loop: the RIE expects the bootstrap to poll for invocations. bootstrap_script = ( "#!/bin/sh\n" 'RUNTIME_API="${AWS_LAMBDA_RUNTIME_API}"\n' "while true; do\n" ' RESP=$(curl -s -D /tmp/headers ' '"http://${RUNTIME_API}/2018-06-01/runtime/invocation/next")\n' ' REQUEST_ID=$(grep -i "Lambda-Runtime-Aws-Request-Id" /tmp/headers ' '| tr -d "\\r" | cut -d" " -f2)\n' ' curl -s -X POST ' '"http://${RUNTIME_API}/2018-06-01/runtime/invocation/${REQUEST_ID}/response" ' "-d '{\"statusCode\":200,\"body\":\"hello from provided\"}'\n" "done\n" ) buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as zf: info = zipfile.ZipInfo("bootstrap") info.external_attr = 0o755 << 16 # executable zf.writestr(info, bootstrap_script) func_name = f"provided-docker-test-{_uuid_mod.uuid4().hex[:8]}" lam.create_function( FunctionName=func_name, Runtime="provided.al2023", Handler="bootstrap", Code={"ZipFile": buf.getvalue()}, Role="arn:aws:iam::000000000000:role/test", Timeout=30, ) try: resp = lam.invoke(FunctionName=func_name, Payload=json.dumps({"key": "value"})) payload = json.loads(resp["Payload"].read()) assert payload["statusCode"] == 200 assert payload["body"] == "hello from provided" finally: lam.delete_function(FunctionName=func_name) def test_apigwv2_nodejs_lambda_proxy(lam, apigw): """API Gateway v2 HTTP API should invoke Node.js Lambda via warm worker, not return mock.""" import urllib.request as _urlreq import uuid as _uuid from botocore.exceptions import ClientError fname = f"apigwv2-node-{_uuid.uuid4().hex[:8]}" api_id = None code = ( "exports.handler = async (event) => ({" " statusCode: 200," " body: JSON.stringify({ route: event.routeKey, method: event.requestContext.http.method })" "});" ) try: lam.create_function( FunctionName=fname, Runtime="nodejs20.x", Role="arn:aws:iam::000000000000:role/test-role", Handler="index.handler", Code={"ZipFile": _make_zip_js(code, "index.js")}, ) api_id = apigw.create_api(Name=f"v2-node-{fname}", ProtocolType="HTTP")["ApiId"] int_id = apigw.create_integration( ApiId=api_id, IntegrationType="AWS_PROXY", IntegrationUri=f"arn:aws:lambda:us-east-1:000000000000:function:{fname}", PayloadFormatVersion="2.0", )["IntegrationId"] apigw.create_route(ApiId=api_id, RouteKey="GET /test", Target=f"integrations/{int_id}") apigw.create_stage(ApiId=api_id, StageName="$default") req = _urlreq.Request( f"http://{api_id}.execute-api.localhost:{_EXECUTE_PORT}/$default/test", method="GET", ) req.add_header("Host", f"{api_id}.execute-api.localhost:{_EXECUTE_PORT}") resp = _urlreq.urlopen(req).read().decode() body = json.loads(resp) assert body.get("route") == "GET /test", f"Expected handler result, got: {resp}" assert body.get("method") == "GET" finally: if api_id is not None: try: apigw.delete_api(ApiId=api_id) except ClientError: pass try: lam.delete_function(FunctionName=fname) except ClientError: pass def test_lambda_nodejs_esm_mjs_handler(lam): """Node.js .mjs (ESM) handlers should be loaded via dynamic import() fallback. Creates a ZIP with two .mjs files: - utils.mjs: exports a helper function using ESM `export` syntax - index.mjs: imports utils.mjs via ESM `import` statement and uses it This verifies that: 1. .mjs files are loaded via import() instead of require() 2. ESM import/export syntax works between modules 3. The handler's return value is correctly propagated """ fname = f"lam-esm-{_uuid_mod.uuid4().hex[:8]}" utils_code = ( "export function greet(name) {\n" " return `Hello, ${name} from ESM!`;\n" "}\n" "\n" "export const VERSION = '1.0.0';\n" ) handler_code = ( "import { greet, VERSION } from './utils.mjs';\n" "\n" "export const handler = async (event) => {\n" " const name = event.name || 'World';\n" " return {\n" " statusCode: 200,\n" " body: JSON.stringify({\n" " message: greet(name),\n" " version: VERSION,\n" " esm: true,\n" " }),\n" " };\n" "};\n" ) buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("index.mjs", handler_code) z.writestr("utils.mjs", utils_code) lam.create_function( FunctionName=fname, Runtime="nodejs20.x", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": buf.getvalue()}, ) try: resp = lam.invoke( FunctionName=fname, Payload=json.dumps({"name": "MiniStack"}), ) assert resp["StatusCode"] == 200 assert "FunctionError" not in resp, f"Lambda error: {resp['Payload'].read().decode()}" payload = json.loads(resp["Payload"].read()) assert payload["statusCode"] == 200 body = json.loads(payload["body"]) assert body["message"] == "Hello, MiniStack from ESM!" assert body["version"] == "1.0.0" assert body["esm"] is True finally: lam.delete_function(FunctionName=fname) def test_lambda_warm_worker_uses_layer(lam): """Warm worker should extract layers and make their code available to the handler.""" # Create a layer with a Python module layer_buf = io.BytesIO() with zipfile.ZipFile(layer_buf, "w") as z: z.writestr("python/myhelper.py", "LAYER_VALUE = 'from-layer'\n") layer_resp = lam.publish_layer_version( LayerName="warm-layer-test", Content={"ZipFile": layer_buf.getvalue()}, CompatibleRuntimes=["python3.12"], ) layer_arn = layer_resp["LayerVersionArn"] # Create a function that imports from the layer func_code = ( "import myhelper\n" "def handler(event, context):\n" " return {'value': myhelper.LAYER_VALUE}\n" ) func_buf = io.BytesIO() with zipfile.ZipFile(func_buf, "w") as z: z.writestr("index.py", func_code) fname = f"warm-layer-{_uuid_mod.uuid4().hex[:8]}" lam.create_function( FunctionName=fname, Runtime="python3.12", Role="arn:aws:iam::000000000000:role/test", Handler="index.handler", Code={"ZipFile": func_buf.getvalue()}, Layers=[layer_arn], ) try: resp = lam.invoke(FunctionName=fname, Payload=b"{}") assert resp["StatusCode"] == 200 assert "FunctionError" not in resp, f"Lambda error: {resp.get('FunctionError')}" payload = json.loads(resp["Payload"].read()) assert payload["value"] == "from-layer" finally: lam.delete_function(FunctionName=fname) def test_lambda_nodejs_esm_type_module(lam): """Node.js ESM via package.json type:module should trigger ERR_REQUIRE_ESM fallback.""" fname = f"lam-esm-type-{_uuid_mod.uuid4().hex[:8]}" handler_code = ( "export const handler = async (event) => ({\n" " statusCode: 200,\n" " body: 'type-module-works',\n" "});\n" ) buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("index.js", handler_code) z.writestr("package.json", '{"type": "module"}') lam.create_function( FunctionName=fname, Runtime="nodejs20.x", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": buf.getvalue()}, ) try: resp = lam.invoke(FunctionName=fname, Payload=b"{}") assert resp["StatusCode"] == 200 assert "FunctionError" not in resp, f"Lambda error: {resp['Payload'].read().decode()}" payload = json.loads(resp["Payload"].read()) assert payload["statusCode"] == 200 assert payload["body"] == "type-module-works" finally: lam.delete_function(FunctionName=fname) def test_lambda_warm_worker_nodejs_uses_layer(lam): """Warm worker should extract Node.js layers and make packages available via require().""" # Create a layer with a Node.js module under nodejs/node_modules/ layer_buf = io.BytesIO() with zipfile.ZipFile(layer_buf, "w") as z: z.writestr( "nodejs/node_modules/layerhelper/index.js", "module.exports.LAYER_VALUE = 'from-node-layer';\n", ) layer_resp = lam.publish_layer_version( LayerName="warm-node-layer-test", Content={"ZipFile": layer_buf.getvalue()}, CompatibleRuntimes=["nodejs20.x"], ) layer_arn = layer_resp["LayerVersionArn"] # Create a Node.js function that requires the layer package handler_code = ( "const helper = require('layerhelper');\n" "exports.handler = async (event) => {\n" " return { value: helper.LAYER_VALUE };\n" "};\n" ) func_buf = io.BytesIO() with zipfile.ZipFile(func_buf, "w") as z: z.writestr("index.js", handler_code) fname = f"warm-node-layer-{_uuid_mod.uuid4().hex[:8]}" lam.create_function( FunctionName=fname, Runtime="nodejs20.x", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": func_buf.getvalue()}, Layers=[layer_arn], ) try: resp = lam.invoke(FunctionName=fname, Payload=b"{}") assert resp["StatusCode"] == 200 assert "FunctionError" not in resp, f"Lambda error: {resp['Payload'].read().decode()}" payload = json.loads(resp["Payload"].read()) assert payload["value"] == "from-node-layer" finally: lam.delete_function(FunctionName=fname) def test_lambda_warm_worker_nodejs_esm_uses_layer(lam): """ESM .mjs handler must be able to import packages from a Lambda Layer. This is the combined case of ESM support (PR #238) and Layer extraction (PR #236). Node.js ESM import() does not use NODE_PATH, so the runtime symlinks layer packages into code/node_modules/ for ancestor-tree resolution. """ # Create a layer with a Node.js package under nodejs/node_modules/ layer_buf = io.BytesIO() with zipfile.ZipFile(layer_buf, "w") as z: z.writestr( "nodejs/node_modules/esmhelper/index.js", "module.exports.LAYER_VALUE = 'from-esm-layer';\n", ) layer_resp = lam.publish_layer_version( LayerName="warm-esm-layer-test", Content={"ZipFile": layer_buf.getvalue()}, CompatibleRuntimes=["nodejs20.x"], ) layer_arn = layer_resp["LayerVersionArn"] # Create an ESM handler that uses native import to load the layer package. # The layer package exports via CJS but Node.js ESM can import CJS modules. # Native import does NOT use NODE_PATH — this is the bug we are testing. handler_code = ( "import helper from 'esmhelper';\n" "export const handler = async (event) => {\n" " return { value: helper.LAYER_VALUE, esm: true };\n" "};\n" ) func_buf = io.BytesIO() with zipfile.ZipFile(func_buf, "w") as z: z.writestr("index.mjs", handler_code) fname = f"warm-esm-layer-{_uuid_mod.uuid4().hex[:8]}" lam.create_function( FunctionName=fname, Runtime="nodejs20.x", Role=_LAMBDA_ROLE, Handler="index.handler", Code={"ZipFile": func_buf.getvalue()}, Layers=[layer_arn], ) try: resp = lam.invoke(FunctionName=fname, Payload=b"{}") assert resp["StatusCode"] == 200 assert "FunctionError" not in resp, f"Lambda error: {resp['Payload'].read().decode()}" payload = json.loads(resp["Payload"].read()) assert payload["value"] == "from-esm-layer" assert payload["esm"] is True finally: lam.delete_function(FunctionName=fname) # --------------------------------------------------------------------------- # Terraform compatibility tests # --------------------------------------------------------------------------- def test_lambda_image_no_default_runtime_handler(lam): """Image-based functions must not get default runtime/handler values.""" fname = "tf-compat-image-no-defaults" try: lam.delete_function(FunctionName=fname) except ClientError: pass resp = lam.create_function( FunctionName=fname, PackageType="Image", Code={"ImageUri": "my-repo/my-image:latest"}, Role=_LAMBDA_ROLE, Timeout=30, ) try: assert resp["PackageType"] == "Image" assert resp["Runtime"] == "", f"Expected empty Runtime for Image, got {resp['Runtime']!r}" assert resp["Handler"] == "", f"Expected empty Handler for Image, got {resp['Handler']!r}" finally: lam.delete_function(FunctionName=fname) def test_lambda_image_preserves_image_config(lam): """ImageConfig provided at creation must be preserved in the GetFunction response.""" fname = "tf-compat-image-config" try: lam.delete_function(FunctionName=fname) except ClientError: pass lam.create_function( FunctionName=fname, PackageType="Image", Code={"ImageUri": "my-repo/my-image:latest"}, Role=_LAMBDA_ROLE, ImageConfig={"Command": ["main.lambda_handler"]}, ) try: get_resp = lam.get_function(FunctionName=fname) cfg = get_resp["Configuration"] assert "ImageConfigResponse" in cfg, "ImageConfigResponse missing from get_function response" assert cfg["ImageConfigResponse"]["ImageConfig"]["Command"] == ["main.lambda_handler"] finally: lam.delete_function(FunctionName=fname) def test_lambda_empty_dead_letter_config(lam): """Functions without DeadLetterConfig must return empty dict, not {TargetArn: ''}.""" fname = "tf-compat-no-dlc" try: lam.delete_function(FunctionName=fname) except ClientError: pass resp = lam.create_function( FunctionName=fname, Runtime="python3.12", Handler="index.handler", Role=_LAMBDA_ROLE, Code={"ZipFile": _make_zip(_LAMBDA_CODE)}, ) try: dlc = resp.get("DeadLetterConfig", {}) assert dlc == {} or "TargetArn" not in dlc or dlc.get("TargetArn") == "", \ f"Expected empty DeadLetterConfig, got {dlc!r}" assert dlc.get("TargetArn") is None or dlc == {}, \ f"DeadLetterConfig should not have TargetArn when unconfigured, got {dlc!r}" finally: lam.delete_function(FunctionName=fname) def test_esm_sqs_no_starting_position(lam, sqs): """SQS event source mappings must not include StartingPosition.""" fname = "tf-compat-esm-sqs" try: lam.delete_function(FunctionName=fname) except ClientError: pass lam.create_function( FunctionName=fname, Runtime="python3.12", Handler="index.handler", Role=_LAMBDA_ROLE, Code={"ZipFile": _make_zip(_LAMBDA_CODE)}, ) q_url = sqs.create_queue(QueueName="tf-compat-esm-queue")["QueueUrl"] q_arn = sqs.get_queue_attributes( QueueUrl=q_url, AttributeNames=["QueueArn"] )["Attributes"]["QueueArn"] esm_uuid = None try: resp = lam.create_event_source_mapping( EventSourceArn=q_arn, FunctionName=fname, BatchSize=5, Enabled=True, ) esm_uuid = resp["UUID"] assert "StartingPosition" not in resp, \ f"SQS ESM should not have StartingPosition, got {resp.get('StartingPosition')!r}" get_resp = lam.get_event_source_mapping(UUID=esm_uuid) assert "StartingPosition" not in get_resp, \ "StartingPosition should not appear in get_event_source_mapping for SQS" finally: if esm_uuid: lam.delete_event_source_mapping(UUID=esm_uuid) lam.delete_function(FunctionName=fname) sqs.delete_queue(QueueUrl=q_url) def test_esm_kinesis_has_starting_position(lam, kin): """Kinesis event source mappings must include StartingPosition.""" fname = "tf-compat-esm-kinesis" stream_name = "tf-compat-esm-stream" try: lam.delete_function(FunctionName=fname) except ClientError: pass try: kin.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True) except ClientError: pass lam.create_function( FunctionName=fname, Runtime="python3.12", Handler="index.handler", Role=_LAMBDA_ROLE, Code={"ZipFile": _make_zip(_LAMBDA_CODE)}, ) kin.create_stream(StreamName=stream_name, ShardCount=1) stream_arn = kin.describe_stream( StreamName=stream_name )["StreamDescription"]["StreamARN"] esm_uuid = None try: resp = lam.create_event_source_mapping( EventSourceArn=stream_arn, FunctionName=fname, StartingPosition="TRIM_HORIZON", BatchSize=100, Enabled=True, ) esm_uuid = resp["UUID"] assert "StartingPosition" in resp, "Kinesis ESM must include StartingPosition" assert resp["StartingPosition"] == "TRIM_HORIZON" finally: if esm_uuid: lam.delete_event_source_mapping(UUID=esm_uuid) lam.delete_function(FunctionName=fname) try: kin.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True) except ClientError: pass def test_esm_response_no_function_name_field(lam, sqs): """ESM API responses should contain FunctionArn but not FunctionName (matching AWS).""" fname = "tf-compat-esm-no-fname" try: lam.delete_function(FunctionName=fname) except ClientError: pass lam.create_function( FunctionName=fname, Runtime="python3.12", Handler="index.handler", Role=_LAMBDA_ROLE, Code={"ZipFile": _make_zip(_LAMBDA_CODE)}, ) q_url = sqs.create_queue(QueueName="tf-compat-esm-fname-queue")["QueueUrl"] q_arn = sqs.get_queue_attributes( QueueUrl=q_url, AttributeNames=["QueueArn"] )["Attributes"]["QueueArn"] esm_uuid = None try: resp = lam.create_event_source_mapping( EventSourceArn=q_arn, FunctionName=fname, BatchSize=5, Enabled=True, ) esm_uuid = resp["UUID"] assert "FunctionArn" in resp, "ESM response must include FunctionArn" assert fname in resp["FunctionArn"], "FunctionArn must contain the function name" finally: if esm_uuid: lam.delete_event_source_mapping(UUID=esm_uuid) lam.delete_function(FunctionName=fname) sqs.delete_queue(QueueUrl=q_url) def test_lambda_update_function_configuration_layers(lam): """Attaching a layer via update-function-configuration should normalize ARN strings to {Arn, CodeSize} dicts — regression test for 'str' object has no attribute 'get'.""" buf = io.BytesIO() with zipfile.ZipFile(buf, "w") as z: z.writestr("util.py", "# layer code") layer_resp = lam.publish_layer_version( LayerName="update-cfg-layer", Content={"ZipFile": buf.getvalue()}, ) layer_arn = layer_resp["LayerVersionArn"] fn_zip = io.BytesIO() with zipfile.ZipFile(fn_zip, "w") as z: z.writestr("index.py", "def handler(e, c): return {}") lam.create_function( FunctionName="fn-update-layer-test", Runtime="python3.12", Role="arn:aws:iam::000000000000:role/test", Handler="index.handler", Code={"ZipFile": fn_zip.getvalue()}, ) resp = lam.update_function_configuration( FunctionName="fn-update-layer-test", Layers=[layer_arn], ) # Response Layers must be dicts with Arn key, not raw strings assert len(resp["Layers"]) == 1 assert isinstance(resp["Layers"][0], dict) assert resp["Layers"][0]["Arn"] == layer_arn # GetFunction must also return normalized layer dicts fn = lam.get_function(FunctionName="fn-update-layer-test") assert fn["Configuration"]["Layers"][0]["Arn"] == layer_arn # ============================================================================ # Unit tests — Lambda warm-container pool, ESM filter, CW Logs emitter, # event-stream framing, throttle response shape. These mock containers and # don't hit the live ministack server, so they run even without Docker. # Originally lived in tests/test_lambda_pool.py — merged here for one-file-per-service. # ============================================================================ import time from unittest.mock import MagicMock import pytest import ministack.services.lambda_svc as lsvc from ministack.core.responses import set_request_account_id @pytest.fixture(autouse=True) def _clear_pool(): """Fresh pool before every test; also clear after so later tests don't see residue.""" lsvc._warm_pool.clear() yield lsvc._warm_pool.clear() def _mk_container(running: bool = True): """Fake container with a .reload() that sets status, matching docker-py interface.""" c = MagicMock() c.status = "running" if running else "exited" def _reload(): # No-op — container.status stays at whatever was set last. pass c.reload.side_effect = _reload return c # ──────────────────────────────── pool key ────────────────────────────────── def test_pool_key_scopes_by_account(): """Same function in two accounts → two distinct keys → two distinct pools.""" set_request_account_id("111111111111") k_a = lsvc._warm_pool_key("fn", {"CodeSha256": "abc"}) set_request_account_id("222222222222") k_b = lsvc._warm_pool_key("fn", {"CodeSha256": "abc"}) assert k_a != k_b assert k_a.startswith("111111111111:") assert k_b.startswith("222222222222:") def test_pool_key_differs_by_package_type(): set_request_account_id("111111111111") k_zip = lsvc._warm_pool_key("fn", {"CodeSha256": "abc"}) k_img = lsvc._warm_pool_key("fn", {"PackageType": "Image", "ImageUri": "my/img:v1"}) assert k_zip != k_img assert ":zip:" in k_zip assert ":image:" in k_img def test_pool_key_differs_by_code_sha(): """Code update → new key → cold start (doesn't accidentally reuse old container).""" set_request_account_id("111111111111") k1 = lsvc._warm_pool_key("fn", {"CodeSha256": "sha-v1"}) k2 = lsvc._warm_pool_key("fn", {"CodeSha256": "sha-v2"}) assert k1 != k2 def test_pool_key_differs_by_image_uri(): set_request_account_id("111111111111") k1 = lsvc._warm_pool_key("fn", {"PackageType": "Image", "ImageUri": "img:v1"}) k2 = lsvc._warm_pool_key("fn", {"PackageType": "Image", "ImageUri": "img:v2"}) assert k1 != k2 # ──────────────────────────── acquire / spawn / release ───────────────────── def test_acquire_on_empty_pool_signals_spawn(): entry, reason = lsvc._pool_acquire("k", max_concurrency=None) assert entry is None assert reason == "spawn" def test_register_then_reacquire_reuses_same_entry(): c = _mk_container() entry1 = lsvc._pool_register("k", c, tmpdir=None) assert entry1["in_use"] is True # While in_use, next acquire can't reuse it — signals spawn. entry2, reason = lsvc._pool_acquire("k", max_concurrency=None) assert entry2 is None assert reason == "spawn" # After release, the same container is reused. lsvc._pool_release(entry1) assert entry1["in_use"] is False entry3, reason = lsvc._pool_acquire("k", max_concurrency=None) assert entry3 is entry1 assert reason == "reused" assert entry3["in_use"] is True def test_multiple_concurrent_invocations_get_separate_entries(): """Two concurrent invocations must land on two distinct pool entries (not the same container).""" c1 = _mk_container() c2 = _mk_container() e1 = lsvc._pool_register("k", c1, tmpdir=None) # e1 is in_use — next acquire signals spawn, simulating cold start _, reason = lsvc._pool_acquire("k", max_concurrency=None) assert reason == "spawn" e2 = lsvc._pool_register("k", c2, tmpdir=None) assert e1 is not e2 assert e1["container"] is c1 assert e2["container"] is c2 assert len(lsvc._warm_pool["k"]) == 2 def test_function_concurrency_cap_rejects_when_full(): """ReservedConcurrentExecutions=2 → 3rd concurrent invocation gets func_cap.""" for _ in range(2): lsvc._pool_register("k", _mk_container(), tmpdir=None) entry, reason = lsvc._pool_acquire("k", max_concurrency=2) assert entry is None assert reason == "func_cap" def test_function_concurrency_cap_none_is_unbounded(): """No ReservedConcurrentExecutions → can always spawn.""" for _ in range(50): lsvc._pool_register("k", _mk_container(), tmpdir=None) entry, reason = lsvc._pool_acquire("k", max_concurrency=None) assert entry is None assert reason == "spawn" def test_account_concurrency_cap_rejects(monkeypatch): """Global account cap: 3 in-use total → 4th is throttled as acct_cap.""" monkeypatch.setattr(lsvc, "_ACCOUNT_CONCURRENCY_CAP", 3) # 3 in-use entries across two pool keys lsvc._pool_register("k1", _mk_container(), tmpdir=None) lsvc._pool_register("k1", _mk_container(), tmpdir=None) lsvc._pool_register("k2", _mk_container(), tmpdir=None) entry, reason = lsvc._pool_acquire("k2", max_concurrency=None) assert entry is None assert reason == "acct_cap" # ──────────────────────────── lifecycle: dead, remove, evict, clear ───────── def test_dead_containers_are_pruned_on_acquire(): """Pool must not hand out a dead container on reuse.""" dead = _mk_container(running=False) alive_entry = lsvc._pool_register("k", _mk_container(running=True), tmpdir=None) # Release alive so it becomes reusable lsvc._pool_release(alive_entry) # Sneak a dead one into the pool directly lsvc._warm_pool["k"].append({ "container": dead, "tmpdir": None, "in_use": False, "last_used": time.time(), "created": time.time(), }) assert len(lsvc._warm_pool["k"]) == 2 # Acquire — dead one pruned, alive one reused entry, reason = lsvc._pool_acquire("k", max_concurrency=None) assert reason == "reused" assert entry["container"] is alive_entry["container"] assert len(lsvc._warm_pool["k"]) == 1 def test_pool_remove_kills_and_unregisters(): entry = lsvc._pool_register("k", _mk_container(), tmpdir=None) lsvc._pool_remove(entry) assert entry not in lsvc._warm_pool.get("k", []) entry["container"].stop.assert_called() entry["container"].remove.assert_called() def test_pool_evict_idle_removes_only_expired_and_not_in_use(monkeypatch): monkeypatch.setattr(lsvc, "_WARM_CONTAINER_TTL", 60) busy = lsvc._pool_register("k", _mk_container(), tmpdir=None) # in_use=True idle_old = lsvc._pool_register("k", _mk_container(), tmpdir=None) lsvc._pool_release(idle_old) idle_old["last_used"] = time.time() - 300 # past TTL idle_fresh = lsvc._pool_register("k", _mk_container(), tmpdir=None) lsvc._pool_release(idle_fresh) # last_used = now, within TTL lsvc._pool_evict_idle() remaining = lsvc._warm_pool.get("k", []) assert busy in remaining # still in use — must not be evicted assert idle_fresh in remaining # under TTL — kept assert idle_old not in remaining idle_old["container"].stop.assert_called() def test_pool_clear_all_kills_everything(): for key in ("a", "b", "c"): lsvc._pool_register(key, _mk_container(), tmpdir=None) victims = [e for lst in lsvc._warm_pool.values() for e in lst] assert len(victims) == 3 lsvc._pool_clear_all() assert lsvc._warm_pool == {} for v in victims: v["container"].stop.assert_called() v["container"].remove.assert_called() # ──────────────────────────── multi-tenancy ───────────────────────────────── def test_two_accounts_get_independent_pools(): """Invocations in account A must not pick up account B's containers.""" set_request_account_id("111111111111") k_a = lsvc._warm_pool_key("fn", {"CodeSha256": "sha"}) c_a = _mk_container() e_a = lsvc._pool_register(k_a, c_a, tmpdir=None) lsvc._pool_release(e_a) set_request_account_id("222222222222") k_b = lsvc._warm_pool_key("fn", {"CodeSha256": "sha"}) assert k_a != k_b entry, reason = lsvc._pool_acquire(k_b, max_concurrency=None) assert entry is None assert reason == "spawn" # account B must cold-start; can't reuse A's container def test_throttle_response_shape_matches_aws(): """The throttle response body must match the AWS TooManyRequestsException shape.""" r = lsvc._throttle_response( reason_code="ReservedFunctionConcurrentInvocationLimitExceeded", msg="Rate Exceeded", retry_after=1, ) assert r["throttle"] is True assert r["error"] is True body = r["body"] assert body["__type"] == "TooManyRequestsException" assert body["Reason"] == "ReservedFunctionConcurrentInvocationLimitExceeded" assert "retryAfterSeconds" in body assert "message" in body # ──────────────────── async retry + DLQ routing ───────────────────────────── def test_route_async_failure_to_sqs_dlq(): """Async invoke final failure routes an AWS-shaped envelope to the SQS DLQ.""" import ministack.services.sqs as _sqs set_request_account_id("000000000000") # Create a queue directly in the internal state url = "http://localhost:4566/000000000000/dlq-test" arn = "arn:aws:sqs:us-east-1:000000000000:dlq-test" _sqs._queues[url] = { "messages": [], "attributes": {"QueueArn": arn}, "is_fifo": False, "dedup_cache": {}, "fifo_seq": 0, } try: lsvc._route_async_failure( target_arn=arn, func_name="doesnt-matter", event={"input": "hi"}, result={"error": True, "function_error": "Unhandled", "body": {"errorType": "Handler", "errorMessage": "boom"}}, ) assert len(_sqs._queues[url]["messages"]) == 1 import json as _json envelope = _json.loads(_sqs._queues[url]["messages"][0]["body"]) assert envelope["requestPayload"] == {"input": "hi"} assert envelope["requestContext"]["condition"] == "RetriesExhausted" assert envelope["responseContext"]["functionError"] == "Unhandled" assert envelope["responsePayload"]["errorMessage"] == "boom" finally: _sqs._queues.pop(url, None) def test_route_async_failure_to_sns_topic(): """Async invoke final failure can target an SNS topic (OnFailure destination).""" import ministack.services.sns as _sns set_request_account_id("000000000000") arn = "arn:aws:sns:us-east-1:000000000000:async-fail" _sns._topics[arn] = { "arn": arn, "name": "async-fail", "subscriptions": [], "messages": [], "tags": {}, "attributes": {}, } try: # Monkey-patch _fanout to observe the call without needing subscribers called = {} real_fanout = _sns._fanout def _capture(topic_arn, msg_id, message, subject, *args, **kwargs): called["topic_arn"] = topic_arn called["message"] = message called["subject"] = subject _sns._fanout = _capture try: lsvc._route_async_failure( target_arn=arn, func_name="doesnt-matter", event={"k": "v"}, result={"error": True, "function_error": "Handled", "body": {"errorType": "X"}}, ) assert called.get("topic_arn") == arn assert "requestPayload" in called.get("message", "") finally: _sns._fanout = real_fanout finally: _sns._topics.pop(arn, None) def test_route_async_failure_unknown_target_logs_and_returns(): """Unknown DLQ ARN must not raise — just logs.""" set_request_account_id("000000000000") # Should NOT raise lsvc._route_async_failure( target_arn="arn:aws:sqs:us-east-1:000000000000:does-not-exist", func_name="x", event={}, result={"error": True, "body": {}}, ) # ──────────────────── RIE result → function_error classification ──────────── def test_lambda_strict_hard_fails_when_docker_unavailable(monkeypatch): """LAMBDA_STRICT=1 + no Docker → Runtime.DockerUnavailable, NO fallback to warm/local.""" monkeypatch.setattr(lsvc, "LAMBDA_STRICT", True) monkeypatch.setattr(lsvc, "_docker_available", False) func = {"config": { "FunctionName": "strict-test", "Runtime": "python3.12", "PackageType": "Zip", "CodeSha256": "abc", "Timeout": 3, "MemorySize": 128, }, "code_zip": b"\x00"} result = lsvc._execute_function_docker(func, {"k": "v"}) assert result.get("error") is True assert result["body"]["errorType"] == "Runtime.DockerUnavailable" def test_lambda_permissive_falls_back_to_warm_without_docker(monkeypatch): """Default (LAMBDA_STRICT=False) + no Docker + python runtime → warm fallback.""" monkeypatch.setattr(lsvc, "LAMBDA_STRICT", False) monkeypatch.setattr(lsvc, "_docker_available", False) called = {"warm": False} def _fake_warm(func, event): called["warm"] = True return {"body": {"ok": True}} monkeypatch.setattr(lsvc, "_execute_function_warm", _fake_warm) func = {"config": { "FunctionName": "perm-test", "Runtime": "python3.12", "PackageType": "Zip", "CodeSha256": "abc", "Timeout": 3, "MemorySize": 128, }, "code_zip": b"\x00"} lsvc._execute_function_docker(func, {}) assert called["warm"] is True def test_emit_lambda_logs_writes_start_end_report_to_cw_logs(): """Lambda → CW Logs emits AWS-shaped START / body / END / REPORT lines.""" import ministack.services.cloudwatch_logs as _cwl set_request_account_id("000000000000") _cwl._log_groups.clear() func = {"config": {"FunctionName": "emit-test", "Version": "$LATEST", "MemorySize": 128}} lsvc._emit_lambda_logs( func, request_id="abc-1234", log_text="user print line 1\nuser print line 2", error=False, duration_ms=42, ) assert "/aws/lambda/emit-test" in _cwl._log_groups streams = _cwl._log_groups["/aws/lambda/emit-test"]["streams"] assert len(streams) == 1 stream_name = next(iter(streams)) assert stream_name.startswith(tuple(f"{y:04d}/" for y in range(2024, 2031))) assert "[$LATEST]" in stream_name msgs = [e["message"] for e in streams[stream_name]["events"]] assert any(m.startswith("START RequestId: abc-1234") and "$LATEST" in m for m in msgs) assert "user print line 1" in msgs assert "user print line 2" in msgs assert any(m == "END RequestId: abc-1234" for m in msgs) assert any(m.startswith("REPORT RequestId: abc-1234") and "Duration: 42 ms" in m for m in msgs) def test_emit_lambda_logs_autocreate_is_per_function(): """Each function gets its own /aws/lambda/{name} group.""" import ministack.services.cloudwatch_logs as _cwl set_request_account_id("000000000000") _cwl._log_groups.clear() lsvc._emit_lambda_logs( {"config": {"FunctionName": "fn-a", "Version": "$LATEST", "MemorySize": 128}}, "r1", "", False, 1, ) lsvc._emit_lambda_logs( {"config": {"FunctionName": "fn-b", "Version": "$LATEST", "MemorySize": 128}}, "r2", "", False, 1, ) assert "/aws/lambda/fn-a" in _cwl._log_groups assert "/aws/lambda/fn-b" in _cwl._log_groups def test_emit_lambda_logs_failure_is_best_effort(monkeypatch): """A broken CW Logs module must not bubble into the Lambda invocation.""" import ministack.services.cloudwatch_logs as _cwl # Nuke the target to force a write failure monkeypatch.setattr(_cwl, "_log_groups", None) # Must not raise lsvc._emit_lambda_logs( {"config": {"FunctionName": "crash", "Version": "$LATEST", "MemorySize": 128}}, "r", "", False, 1, ) def test_match_esm_filter_equality(): """Basic equality matching on a nested record.""" rec = {"body": {"orderType": "Premium", "region": "us-east-1"}} assert lsvc._match_esm_filter(rec, {"body": {"orderType": ["Premium"]}}) is True assert lsvc._match_esm_filter(rec, {"body": {"orderType": ["Basic"]}}) is False def test_match_esm_filter_content_prefix_suffix_anything_but(): """Content-filter dicts: prefix, suffix, anything-but, exists.""" rec = {"body": {"name": "prod-user-42"}} assert lsvc._match_esm_filter(rec, {"body": {"name": [{"prefix": "prod-"}]}}) is True assert lsvc._match_esm_filter(rec, {"body": {"name": [{"prefix": "dev-"}]}}) is False assert lsvc._match_esm_filter(rec, {"body": {"name": [{"suffix": "-42"}]}}) is True assert lsvc._match_esm_filter(rec, {"body": {"name": [{"anything-but": ["prod-user-42"]}]}}) is False assert lsvc._match_esm_filter(rec, {"body": {"name": [{"anything-but": ["other"]}]}}) is True assert lsvc._match_esm_filter(rec, {"body": {"missing": [{"exists": False}]}}) is True assert lsvc._match_esm_filter(rec, {"body": {"name": [{"exists": True}]}}) is True def test_match_esm_filter_numeric(): """Numeric comparison operator.""" rec = {"body": {"count": 7}} assert lsvc._match_esm_filter(rec, {"body": {"count": [{"numeric": [">", 5]}]}}) is True assert lsvc._match_esm_filter(rec, {"body": {"count": [{"numeric": [">", 10]}]}}) is False assert lsvc._match_esm_filter(rec, {"body": {"count": [{"numeric": [">", 5, "<", 10]}]}}) is True def test_apply_filter_criteria_drops_non_matching_sqs_records(): """SQS bodies are JSON-parsed before matching, matching AWS behaviour.""" import json as _json esm = {"FilterCriteria": {"Filters": [ {"Pattern": _json.dumps({"body": {"orderType": ["Premium"]}})}, ]}} records = [ {"messageId": "a", "body": _json.dumps({"orderType": "Premium"})}, {"messageId": "b", "body": _json.dumps({"orderType": "Basic"})}, ] filtered = lsvc._apply_filter_criteria(records, esm) assert [r["messageId"] for r in filtered] == ["a"] def test_apply_filter_criteria_no_filters_passes_through(): records = [{"messageId": "x"}, {"messageId": "y"}] assert lsvc._apply_filter_criteria(records, {}) == records assert lsvc._apply_filter_criteria(records, {"FilterCriteria": {}}) == records def test_event_stream_encode_roundtrip(): """The vnd.amazon.eventstream encoder must produce a valid framed message that boto3's own EventStream parser can decode.""" from botocore.eventstream import EventStreamBuffer msg = lsvc._es_encode_message({ ":message-type": "event", ":event-type": "PayloadChunk", ":content-type": "application/octet-stream", }, b"hello-world") buf = EventStreamBuffer() buf.add_data(msg) events = list(buf) assert len(events) == 1 event = events[0] # botocore surfaces headers as a dict[str, Any] on the parsed event assert event.headers[":event-type"] == "PayloadChunk" assert event.payload == b"hello-world" def test_invoke_rie_classifies_unhandled_vs_handled(): """If RIE returns X-Amz-Function-Error header the result carries function_error='Unhandled'. A handler-returned errorType with no RIE header should produce 'Handled'.""" # The classification logic lives inside _invoke_rie; unit-test by # simulating what that branch does via a tiny inline replica. parsed_error_payload = {"errorType": "E", "errorMessage": "m"} # Case 1: RIE header present → Unhandled has_header = True if has_header or (isinstance(parsed_error_payload, dict) and parsed_error_payload.get("errorType")): classification = "Unhandled" if has_header else "Handled" assert classification == "Unhandled" # Case 2: No RIE header, but body has errorType → Handled has_header = False if has_header or (isinstance(parsed_error_payload, dict) and parsed_error_payload.get("errorType")): classification = "Unhandled" if has_header else "Handled" assert classification == "Handled"