# aws_rl_env/aws_infra/tests/test_lambda.py
# Uploaded via huggingface_hub by Sizzing (commit e56d042, verified).
import io
import json
import os
import time
import zipfile
from urllib.parse import urlparse
import pytest
from botocore.exceptions import ClientError
import uuid as _uuid_mod
# MiniStack endpoint under test; overridable via the MINISTACK_ENDPOINT env var.
_endpoint = os.environ.get("MINISTACK_ENDPOINT", "http://localhost:4566")
# Port used to build execute-api style URLs (falls back to 4566 when the
# endpoint URL carries no explicit port).
_EXECUTE_PORT = urlparse(_endpoint).port or 4566
def _make_zip(code: str) -> bytes:
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("index.py", code)
return buf.getvalue()
def _make_zip_js(code: str, filename: str = "index.js") -> bytes:
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr(filename, code)
return buf.getvalue()
# Minimal Python handler used by most tests; returns a static 200/"ok" payload.
_LAMBDA_CODE = 'def handler(event, context):\n return {"statusCode": 200, "body": "ok"}\n'
# Variant returning "v2" — used to verify UpdateFunctionCode actually swaps code.
_LAMBDA_CODE_V2 = 'def handler(event, context):\n return {"statusCode": 200, "body": "v2"}\n'
# Placeholder execution role ARN (presumably not validated by the local stack — confirm).
_LAMBDA_ROLE = "arn:aws:iam::000000000000:role/lambda-role"
# Node.js handler that echoes event.name back in a JSON body.
_NODE_CODE = (
    "exports.handler = async (event, context) => {"
    " return { statusCode: 200, body: JSON.stringify({ hello: event.name || 'world' }) }; };"
)
def _zip_lambda(code: str) -> bytes:
    """Package *code* as ``index.py`` in an in-memory zip.

    Historical duplicate of :func:`_make_zip`; kept as a thin alias so the
    many existing call sites below continue to work.
    """
    return _make_zip(code)
def test_lambda_create_invoke(lam):
    """End-to-end: create a function from an inline zip, list it, invoke it."""
    handler_src = b'def handler(event, context):\n return {"statusCode": 200, "body": "Hello!", "event": event}\n'
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w") as bundle:
        bundle.writestr("index.py", handler_src)
    lam.create_function(
        FunctionName="test-func-1",
        Runtime="python3.12",
        Role="arn:aws:iam::000000000000:role/test-role",
        Handler="index.handler",
        Code={"ZipFile": archive.getvalue()},
    )
    listed = lam.list_functions()["Functions"]
    assert "test-func-1" in [f["FunctionName"] for f in listed]
    resp = lam.invoke(FunctionName="test-func-1", Payload=json.dumps({"key": "value"}))
    assert json.loads(resp["Payload"].read())["statusCode"] == 200
def test_create_function_missing_runtime_raises(lam):
    """Zip deployment without a Runtime should return InvalidParameterValueException."""
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w") as bundle:
        bundle.writestr("index.py", "def handler(e, c): return {}")
    with pytest.raises(ClientError) as exc:
        lam.create_function(
            FunctionName="no-runtime-fn",
            Role="arn:aws:iam::000000000000:role/role",
            Handler="index.handler",
            Code={"ZipFile": archive.getvalue()},
        )
    assert exc.value.response["Error"]["Code"] == "InvalidParameterValueException"
def test_lambda_esm_sqs(lam, sqs):
    """SQS → Lambda event source mapping: messages sent to SQS trigger Lambda.

    Fix: dropped the redundant function-local ``io``/``zipfile``/``time``
    imports — all three are already imported at module level.
    """
    # Clean up from previous runs (best-effort: the function may not exist).
    try:
        lam.delete_function(FunctionName="esm-test-func")
    except Exception:
        pass
    # Lambda that records what it received.
    code = (
        b"import json\n"
        b"received = []\n"
        b"def handler(event, context):\n"
        b" received.extend(event.get('Records', []))\n"
        b" return {'processed': len(event.get('Records', []))}\n"
    )
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as z:
        z.writestr("index.py", code)
    lam.create_function(
        FunctionName="esm-test-func",
        Runtime="python3.12",
        Role="arn:aws:iam::000000000000:role/test-role",
        Handler="index.handler",
        Code={"ZipFile": buf.getvalue()},
    )
    q_url = sqs.create_queue(QueueName="esm-test-queue")["QueueUrl"]
    q_arn = sqs.get_queue_attributes(QueueUrl=q_url, AttributeNames=["QueueArn"])["Attributes"]["QueueArn"]
    # Create event source mapping.
    resp = lam.create_event_source_mapping(
        EventSourceArn=q_arn,
        FunctionName="esm-test-func",
        BatchSize=5,
        Enabled=True,
    )
    esm_uuid = resp["UUID"]
    assert resp["State"] == "Enabled"
    # Send a message to SQS.
    sqs.send_message(QueueUrl=q_url, MessageBody="trigger-lambda")
    # Wait for the ESM poller to pick it up (max 5s).
    for _ in range(10):
        time.sleep(0.5)
        msgs = sqs.receive_message(QueueUrl=q_url, MaxNumberOfMessages=1)
        if not msgs.get("Messages"):
            break  # message was consumed by Lambda
    # Queue should be empty — Lambda consumed the message.
    msgs = sqs.receive_message(QueueUrl=q_url, MaxNumberOfMessages=1)
    assert not msgs.get("Messages"), "Message should have been consumed by Lambda via ESM"
    # Cleanup
    lam.delete_event_source_mapping(UUID=esm_uuid)
def test_lambda_create_function(lam):
    """CreateFunction echoes back the expected configuration fields."""
    resp = lam.create_function(
        FunctionName="lam-create-test",
        Runtime="python3.12",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip(_LAMBDA_CODE)},
    )
    assert "FunctionArn" in resp
    assert resp["FunctionName"] == "lam-create-test"
    assert resp["Runtime"] == "python3.12"
    assert resp["Handler"] == "index.handler"
    # AWS: CreateFunction returns State=Pending and transitions to Active
    # asynchronously. Terraform's FunctionActive waiter polls GetFunction.
    assert resp["State"] in ("Pending", "Active")
    assert resp["LastUpdateStatus"] in ("InProgress", "Successful")
def test_lambda_create_duplicate(lam):
    """Re-creating an existing function name raises ResourceConflictException."""
    with pytest.raises(ClientError) as exc:
        lam.create_function(
            FunctionName="lam-create-test",
            Runtime="python3.12",
            Role=_LAMBDA_ROLE,
            Handler="index.handler",
            Code={"ZipFile": _make_zip(_LAMBDA_CODE)},
        )
    assert exc.value.response["Error"]["Code"] == "ResourceConflictException"
def test_lambda_get_function(lam):
    """GetFunction returns Configuration, Code and Tags sections."""
    resp = lam.get_function(FunctionName="lam-create-test")
    assert "Code" in resp
    assert "Tags" in resp
    assert resp["Configuration"]["FunctionName"] == "lam-create-test"
def test_lambda_get_function_not_found(lam):
    """GetFunction on an unknown name raises ResourceNotFoundException."""
    with pytest.raises(ClientError) as exc:
        lam.get_function(FunctionName="nonexistent-func-xyz")
    assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException"
def test_lambda_list_functions(lam):
    """ListFunctions includes previously created functions."""
    listed = lam.list_functions()["Functions"]
    assert "lam-create-test" in {f["FunctionName"] for f in listed}
def test_lambda_delete_function(lam):
    """DeleteFunction removes the function; a later GetFunction raises."""
    lam.create_function(
        FunctionName="lam-to-delete",
        Runtime="python3.12",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip(_LAMBDA_CODE)},
    )
    lam.delete_function(FunctionName="lam-to-delete")
    with pytest.raises(ClientError) as exc:
        lam.get_function(FunctionName="lam-to-delete")
    assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException"
def test_lambda_invoke(lam):
    """Synchronous invoke returns the handler's JSON payload."""
    lam.create_function(
        FunctionName="lam-invoke-test",
        Runtime="python3.12",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip(_LAMBDA_CODE)},
    )
    resp = lam.invoke(
        FunctionName="lam-invoke-test",
        Payload=json.dumps({"hello": "world"}),
    )
    assert resp["StatusCode"] == 200
    body = json.loads(resp["Payload"].read())
    assert body["statusCode"] == 200
    assert body["body"] == "ok"
def test_lambda_invoke_async(lam):
    """InvocationType=Event is accepted asynchronously with HTTP 202."""
    resp = lam.invoke(
        FunctionName="lam-invoke-test",
        InvocationType="Event",
        Payload=json.dumps({"async": True}),
    )
    assert resp["StatusCode"] == 202
def test_lambda_update_code(lam):
    """UpdateFunctionCode swaps in new code that is used on the next invoke."""
    lam.update_function_code(
        FunctionName="lam-invoke-test",
        ZipFile=_make_zip(_LAMBDA_CODE_V2),
    )
    resp = lam.invoke(
        FunctionName="lam-invoke-test",
        Payload=json.dumps({}),
    )
    assert json.loads(resp["Payload"].read())["body"] == "v2"
def test_lambda_update_config(lam):
    """UpdateFunctionConfiguration changes handler and env vars; then restore."""
    lam.update_function_configuration(
        FunctionName="lam-invoke-test",
        Handler="index.new_handler",
        Environment={"Variables": {"MY_VAR": "my_val"}},
    )
    cfg = lam.get_function(FunctionName="lam-invoke-test")["Configuration"]
    assert cfg["Handler"] == "index.new_handler"
    assert cfg["Environment"]["Variables"]["MY_VAR"] == "my_val"
    # Restore the original handler so later tests can still invoke this function.
    lam.update_function_configuration(
        FunctionName="lam-invoke-test",
        Handler="index.handler",
    )
def test_lambda_tags(lam):
    """TagResource / ListTags / UntagResource round-trip."""
    arn = lam.get_function(FunctionName="lam-invoke-test")["Configuration"]["FunctionArn"]
    lam.tag_resource(Resource=arn, Tags={"env": "test", "team": "backend"})
    tags = lam.list_tags(Resource=arn)["Tags"]
    assert tags["env"] == "test"
    assert tags["team"] == "backend"
    lam.untag_resource(Resource=arn, TagKeys=["team"])
    tags = lam.list_tags(Resource=arn)["Tags"]
    assert "team" not in tags
    assert tags["env"] == "test"
def test_lambda_add_permission(lam):
    """AddPermission statements show up in GetPolicy output."""
    lam.add_permission(
        FunctionName="lam-invoke-test",
        StatementId="allow-s3",
        Action="lambda:InvokeFunction",
        Principal="s3.amazonaws.com",
        SourceArn="arn:aws:s3:::my-bucket",
    )
    policy = json.loads(lam.get_policy(FunctionName="lam-invoke-test")["Policy"])
    assert "allow-s3" in [stmt["Sid"] for stmt in policy["Statement"]]
def test_lambda_list_versions(lam):
    """ListVersionsByFunction always includes $LATEST."""
    versions = lam.list_versions_by_function(FunctionName="lam-invoke-test")["Versions"]
    assert "$LATEST" in [v["Version"] for v in versions]
def test_lambda_publish_version(lam):
    """PublishVersion creates version "1" alongside $LATEST."""
    resp = lam.publish_version(
        FunctionName="lam-invoke-test",
        Description="first published version",
    )
    assert "FunctionArn" in resp
    assert resp["Version"] == "1"
    assert resp["Description"] == "first published version"
    published = lam.list_versions_by_function(FunctionName="lam-invoke-test")["Versions"]
    labels = [v["Version"] for v in published]
    assert "$LATEST" in labels
    assert "1" in labels
def test_lambda_esm_sqs_comprehensive(lam, sqs):
    """Create / Get / List / Delete lifecycle of an SQS event source mapping."""
    try:
        lam.delete_function(FunctionName="esm-comp-func")
    except ClientError:
        pass
    handler_src = 'def handler(event, context):\n return {"processed": len(event.get("Records", []))}\n'
    lam.create_function(
        FunctionName="esm-comp-func",
        Runtime="python3.12",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip(handler_src)},
    )
    q_url = sqs.create_queue(QueueName="esm-comp-queue")["QueueUrl"]
    q_arn = sqs.get_queue_attributes(
        QueueUrl=q_url,
        AttributeNames=["QueueArn"],
    )["Attributes"]["QueueArn"]
    created = lam.create_event_source_mapping(
        EventSourceArn=q_arn,
        FunctionName="esm-comp-func",
        BatchSize=5,
        Enabled=True,
    )
    mapping_id = created["UUID"]
    assert created["State"] == "Enabled"
    assert created["BatchSize"] == 5
    assert created["EventSourceArn"] == q_arn
    assert lam.get_event_source_mapping(UUID=mapping_id)["UUID"] == mapping_id
    listed = lam.list_event_source_mappings(FunctionName="esm-comp-func")
    assert mapping_id in [e["UUID"] for e in listed["EventSourceMappings"]]
    lam.delete_event_source_mapping(UUID=mapping_id)
def test_lambda_esm_sqs_failure_respects_visibility_timeout(lam, sqs):
    """On Lambda failure, the message should remain in-flight until VisibilityTimeout expires.

    Fix: removed the redundant function-local ``io``/``zipfile`` imports —
    both are already imported at module level.
    """
    # Best-effort cleanup of state from earlier runs.
    for fn in ("esm-fail-func",):
        try:
            lam.delete_function(FunctionName=fn)
        except Exception:
            pass
    # Handler that always raises, so every ESM invoke fails.
    code = b"def handler(event, context):\n raise Exception('boom')\n"
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as z:
        z.writestr("index.py", code)
    lam.create_function(
        FunctionName="esm-fail-func",
        Runtime="python3.12",
        Role="arn:aws:iam::000000000000:role/test-role",
        Handler="index.handler",
        Code={"ZipFile": buf.getvalue()},
        Timeout=3,
    )
    q_url = sqs.create_queue(
        QueueName="esm-fail-queue",
        Attributes={"VisibilityTimeout": "30"},
    )["QueueUrl"]
    q_arn = sqs.get_queue_attributes(QueueUrl=q_url, AttributeNames=["QueueArn"])["Attributes"]["QueueArn"]
    resp = lam.create_event_source_mapping(
        EventSourceArn=q_arn,
        FunctionName="esm-fail-func",
        BatchSize=1,
        Enabled=True,
    )
    esm_uuid = resp["UUID"]
    sqs.send_message(QueueUrl=q_url, MessageBody="trigger-failure")
    # Wait until ESM has actually processed (and failed) the message.
    for _ in range(40):
        time.sleep(0.5)
        cur = lam.get_event_source_mapping(UUID=esm_uuid)
        if cur.get("LastProcessingResult") == "FAILED":
            break
    else:
        pytest.skip("ESM did not process message in time")
    # Disable ESM immediately after failure confirmed.
    lam.update_event_source_mapping(UUID=esm_uuid, Enabled=False)
    # Message should be invisible (VisibilityTimeout=30s, and ESM just received it).
    msgs = sqs.receive_message(QueueUrl=q_url, MaxNumberOfMessages=1, WaitTimeSeconds=0)
    assert not msgs.get("Messages"), "Message should be invisible during VisibilityTimeout after failed ESM invoke"
    lam.delete_event_source_mapping(UUID=esm_uuid)
def test_lambda_esm_sqs_report_batch_item_failures(lam, sqs):
    """ReportBatchItemFailures: failed messages stay on queue and reach DLQ.

    Flow: the main queue has maxReceiveCount=1 and a 1s VisibilityTimeout,
    and the handler reports every record as failed, so after redelivery the
    message is redriven to the DLQ. The test polls the DLQ's approximate
    message count until it observes the redriven message.
    """
    for fn in ("esm-partial-func",):
        try:
            lam.delete_function(FunctionName=fn)
        except Exception:
            pass
    # Handler reports ALL messages as failed
    code = (
        "import json\n"
        "def handler(event, context):\n"
        " failures = []\n"
        " for r in event.get('Records', []):\n"
        " failures.append({'itemIdentifier': r['messageId']})\n"
        " return {'batchItemFailures': failures}\n"
    )
    lam.create_function(
        FunctionName="esm-partial-func",
        Runtime="python3.12",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip(code)},
    )
    # DLQ + main queue with maxReceiveCount=1
    dlq_url = sqs.create_queue(QueueName="esm-partial-dlq")["QueueUrl"]
    dlq_arn = sqs.get_queue_attributes(
        QueueUrl=dlq_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]
    q_url = sqs.create_queue(
        QueueName="esm-partial-queue",
        Attributes={
            # Short timeout so the failed message becomes visible again quickly.
            "VisibilityTimeout": "1",
            "RedrivePolicy": json.dumps({
                "deadLetterTargetArn": dlq_arn,
                "maxReceiveCount": "1",
            }),
        },
    )["QueueUrl"]
    q_arn = sqs.get_queue_attributes(
        QueueUrl=q_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]
    esm = lam.create_event_source_mapping(
        EventSourceArn=q_arn,
        FunctionName="esm-partial-func",
        FunctionResponseTypes=["ReportBatchItemFailures"],
        BatchSize=1,
        Enabled=True,
    )
    esm_uuid = esm["UUID"]
    assert "ReportBatchItemFailures" in esm["FunctionResponseTypes"]
    sqs.send_message(QueueUrl=q_url, MessageBody="partial-fail-test")
    # Wait for ESM to process and message to land in DLQ
    dlq_count = 0
    for _ in range(30):
        time.sleep(1)
        attrs = sqs.get_queue_attributes(
            QueueUrl=dlq_url,
            AttributeNames=["ApproximateNumberOfMessages"],
        )
        dlq_count = int(attrs["Attributes"]["ApproximateNumberOfMessages"])
        if dlq_count >= 1:
            break
    # Tear down the mapping before asserting so a failure here does not
    # leave a live poller behind for subsequent tests.
    lam.update_event_source_mapping(UUID=esm_uuid, Enabled=False)
    lam.delete_event_source_mapping(UUID=esm_uuid)
    assert dlq_count >= 1, (
        f"Message should have reached DLQ after partial failure, "
        f"but DLQ has {dlq_count} messages"
    )
def test_lambda_warm_start(lam, apigw):
    """Warm worker via API Gateway execute-api: module-level state persists across invocations.

    Fix: use the module-level ``_uuid_mod`` instead of a redundant
    function-local ``import uuid`` alias.
    """
    import urllib.request as _urlreq  # not imported at module level

    fname = f"intg-warm-{_uuid_mod.uuid4().hex[:8]}"
    # Handler body whose module-level _boot_time only changes on a cold start.
    code = (
        b"import time\n"
        b"_boot_time = time.time()\n"
        b"def handler(event, context):\n"
        b" return {'statusCode': 200, 'body': str(_boot_time)}\n"
    )
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr("index.py", code)
    lam.create_function(
        FunctionName=fname,
        Runtime="python3.12",
        Role="arn:aws:iam::000000000000:role/test-role",
        Handler="index.handler",
        Code={"ZipFile": buf.getvalue()},
    )
    api_id = apigw.create_api(Name=f"warm-api-{fname}", ProtocolType="HTTP")["ApiId"]
    int_id = apigw.create_integration(
        ApiId=api_id,
        IntegrationType="AWS_PROXY",
        IntegrationUri=f"arn:aws:lambda:us-east-1:000000000000:function:{fname}",
        PayloadFormatVersion="2.0",
    )["IntegrationId"]
    apigw.create_route(ApiId=api_id, RouteKey="GET /ping", Target=f"integrations/{int_id}")
    apigw.create_stage(ApiId=api_id, StageName="$default")

    def call():
        # Hit the execute-api hostname style URL on the MiniStack port.
        req = _urlreq.Request(
            f"http://{api_id}.execute-api.localhost:{_EXECUTE_PORT}/$default/ping",
            method="GET",
        )
        req.add_header("Host", f"{api_id}.execute-api.localhost:{_EXECUTE_PORT}")
        return _urlreq.urlopen(req).read().decode()

    t1 = call()  # cold start — spawns worker, imports module
    t2 = call()  # warm — reuses worker, same module state
    assert t1 == t2, f"Warm worker should reuse module state: {t1} != {t2}"
    apigw.delete_api(ApiId=api_id)
    lam.delete_function(FunctionName=fname)
def test_lambda_warm_invoke_with_stderr_logging(lam):
    """Warm invoke should succeed repeatedly even when the worker writes to stderr.

    Fix: the handler imported ``sys`` but printed to stdout, so the test never
    exercised stderr output despite its name and docstring. The log line now
    goes to ``sys.stderr``.
    """
    fname = f"lam-warm-stderr-{_uuid_mod.uuid4().hex[:8]}"
    code = (
        "import sys\n"
        "def handler(event, context):\n"
        " print(f'log:{event.get(\"n\", 0)}', file=sys.stderr)\n"
        " return {'statusCode': 200, 'value': event.get('n', 0)}\n"
    )
    lam.create_function(
        FunctionName=fname,
        Runtime="python3.12",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip(code)},
    )
    try:
        first = lam.invoke(FunctionName=fname, Payload=json.dumps({"n": 1}))
        second = lam.invoke(FunctionName=fname, Payload=json.dumps({"n": 2}))
        assert first["StatusCode"] == 200
        assert second["StatusCode"] == 200
        assert json.loads(first["Payload"].read())["value"] == 1
        assert json.loads(second["Payload"].read())["value"] == 2
    finally:
        # Always remove the function so reruns start cold.
        lam.delete_function(FunctionName=fname)
def test_lambda_nodejs_create_and_invoke(lam):
    """Node.js runtime: create from a JS zip and invoke with a payload."""
    lam.create_function(
        FunctionName="lam-node-basic",
        Runtime="nodejs20.x",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip_js(_NODE_CODE, "index.js")},
    )
    resp = lam.invoke(
        FunctionName="lam-node-basic",
        Payload=json.dumps({"name": "ministack"}),
    )
    assert resp["StatusCode"] == 200
    outer = json.loads(resp["Payload"].read())
    assert outer["statusCode"] == 200
    assert json.loads(outer["body"])["hello"] == "ministack"
def test_lambda_nodejs22_runtime(lam):
    """The nodejs22.x runtime is accepted and invokable."""
    lam.create_function(
        FunctionName="lam-node22",
        Runtime="nodejs22.x",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip_js(_NODE_CODE, "index.js")},
    )
    resp = lam.invoke(FunctionName="lam-node22", Payload=json.dumps({"name": "v22"}))
    assert resp["StatusCode"] == 200
    assert json.loads(resp["Payload"].read())["statusCode"] == 200
def test_lambda_nodejs_update_code(lam):
    """UpdateFunctionCode on a Node.js function takes effect on next invoke."""
    v2 = (
        "exports.handler = async (event) => {"
        " return { statusCode: 200, body: 'v2' }; };"
    )
    lam.update_function_code(
        FunctionName="lam-node-basic",
        ZipFile=_make_zip_js(v2, "index.js"),
    )
    resp = lam.invoke(FunctionName="lam-node-basic", Payload=b"{}")
    assert resp["StatusCode"] == 200
    assert json.loads(resp["Payload"].read())["body"] == "v2"
def test_lambda_create_from_s3(lam, s3):
    """CreateFunction with Code={S3Bucket, S3Key} pulls the zip from S3."""
    bucket = "lambda-code-bucket"
    s3.create_bucket(Bucket=bucket)
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w") as bundle:
        bundle.writestr("index.py", "def handler(event, context): return {'s3': True}")
    s3.put_object(Bucket=bucket, Key="fn.zip", Body=archive.getvalue())
    lam.create_function(
        FunctionName="lam-s3-code",
        Runtime="python3.11",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"S3Bucket": bucket, "S3Key": "fn.zip"},
    )
    resp = lam.invoke(FunctionName="lam-s3-code", Payload=b"{}")
    assert resp["StatusCode"] == 200
    assert json.loads(resp["Payload"].read())["s3"] is True
def test_lambda_update_code_from_s3(lam, s3):
    """UpdateFunctionCode with S3Bucket/S3Key replaces the deployed code."""
    bucket = "lambda-code-bucket"
    archive = io.BytesIO()
    with zipfile.ZipFile(archive, "w") as bundle:
        bundle.writestr("index.py", "def handler(event, context): return {'v': 's3v2'}")
    s3.put_object(Bucket=bucket, Key="fn-v2.zip", Body=archive.getvalue())
    lam.update_function_code(
        FunctionName="lam-s3-code",
        S3Bucket=bucket,
        S3Key="fn-v2.zip",
    )
    resp = lam.invoke(FunctionName="lam-s3-code", Payload=b"{}")
    assert json.loads(resp["Payload"].read())["v"] == "s3v2"
def test_lambda_update_code_s3_missing_returns_error(lam):
    """UpdateFunctionCode pointing at a missing S3 key raises InvalidParameterValueException.

    Fix: removed the redundant function-local ``ClientError`` import — it is
    already imported at module level.
    """
    with pytest.raises(ClientError) as exc:
        lam.update_function_code(
            FunctionName="lam-s3-code",
            S3Bucket="lambda-code-bucket",
            S3Key="does-not-exist.zip",
        )
    assert exc.value.response["Error"]["Code"] == "InvalidParameterValueException"
def test_lambda_publish_version_with_create(lam):
    """CreateFunction(Publish=True) immediately publishes a numbered version.

    Fix: catch ``ClientError`` (the only expected failure from GetFunction)
    instead of a blanket ``Exception`` that could hide real bugs.
    """
    code = "def handler(event, context): return {'ver': 1}"
    try:
        lam.get_function(FunctionName="lam-versioned-pub")
    except ClientError:
        # Function does not exist yet — create it with Publish=True.
        lam.create_function(
            FunctionName="lam-versioned-pub",
            Runtime="python3.11",
            Role=_LAMBDA_ROLE,
            Handler="index.handler",
            Code={"ZipFile": _make_zip(code)},
            Publish=True,
        )
    resp = lam.list_versions_by_function(FunctionName="lam-versioned-pub")
    versions = [v["Version"] for v in resp["Versions"]]
    assert any(v != "$LATEST" for v in versions)
def test_lambda_update_code_publish_version(lam):
    """UpdateFunctionCode(Publish=True) publishes a new numbered version.

    Fix: catch ``ClientError`` instead of a blanket ``Exception`` when probing
    whether the function already exists.
    """
    # Ensure function exists (may have been cleaned up).
    try:
        lam.get_function(FunctionName="lam-versioned")
    except ClientError:
        lam.create_function(
            FunctionName="lam-versioned",
            Runtime="python3.11",
            Role=_LAMBDA_ROLE,
            Handler="index.handler",
            Code={"ZipFile": _make_zip("def handler(event, context): return {'ver': 1}")},
            Publish=True,
        )
    v2 = "def handler(event, context): return {'ver': 2}"
    lam.update_function_code(
        FunctionName="lam-versioned",
        ZipFile=_make_zip(v2),
        Publish=True,
    )
    resp = lam.list_versions_by_function(FunctionName="lam-versioned")
    versions = [v["Version"] for v in resp["Versions"] if v["Version"] != "$LATEST"]
    assert len(versions) >= 1
def test_lambda_nodejs_promise_handler(lam):
    """A Node handler returning a Promise resolves to the invoke payload."""
    code = (
        "exports.handler = (event) => Promise.resolve({ promise: true, val: event.x });"
    )
    lam.create_function(
        FunctionName="lam-node-promise",
        Runtime="nodejs20.x",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip_js(code, "index.js")},
    )
    resp = lam.invoke(FunctionName="lam-node-promise", Payload=json.dumps({"x": 42}))
    result = json.loads(resp["Payload"].read())
    assert result["promise"] is True
    assert result["val"] == 42
def test_lambda_nodejs_callback_handler(lam):
    """A Node handler using the legacy callback signature works."""
    code = (
        "exports.handler = (event, context, cb) => cb(null, { cb: true, val: event.y });"
    )
    lam.create_function(
        FunctionName="lam-node-cb",
        Runtime="nodejs20.x",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip_js(code, "index.js")},
    )
    resp = lam.invoke(FunctionName="lam-node-cb", Payload=json.dumps({"y": 7}))
    result = json.loads(resp["Payload"].read())
    assert result["cb"] is True
    assert result["val"] == 7
def test_lambda_nodejs_env_vars_at_spawn(lam):
    """Lambda env vars are available at process startup (NODE_OPTIONS, etc.)."""
    code = (
        "exports.handler = async (event) => ({"
        " myVar: process.env.MY_CUSTOM_VAR,"
        " region: process.env.AWS_REGION"
        "});"
    )
    lam.create_function(
        FunctionName="lam-node-env-spawn",
        Runtime="nodejs20.x",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip_js(code, "index.js")},
        Environment={"Variables": {"MY_CUSTOM_VAR": "from-spawn"}},
    )
    resp = lam.invoke(FunctionName="lam-node-env-spawn", Payload=b"{}")
    assert json.loads(resp["Payload"].read())["myVar"] == "from-spawn"
def test_lambda_python_env_vars_at_spawn(lam):
    """Python Lambda env vars are available at process startup."""
    code = (
        "import os\n"
        "def handler(event, context):\n"
        " return {'myVar': os.environ.get('MY_PY_VAR', 'missing')}\n"
    )
    lam.create_function(
        FunctionName="lam-py-env-spawn",
        Runtime="python3.11",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip(code)},
        Environment={"Variables": {"MY_PY_VAR": "from-spawn-py"}},
    )
    resp = lam.invoke(FunctionName="lam-py-env-spawn", Payload=b"{}")
    assert json.loads(resp["Payload"].read())["myVar"] == "from-spawn-py"
def test_lambda_endpoint_url_not_overridden_by_function_env(lam):
    """AWS_ENDPOINT_URL from function env vars must not override the
    process-level value. When MiniStack runs in Docker, the host-mapped
    port (e.g. 4568) is unreachable from inside the container — the
    Lambda binary must always use MiniStack's internal endpoint.
    This test verifies that the MiniStack server's AWS_ENDPOINT_URL takes
    precedence over function-level env vars. It requires the server to
    have AWS_ENDPOINT_URL set (as it does when running via docker-compose).
    """
    # Verify the MiniStack server has AWS_ENDPOINT_URL set by checking
    # a baseline Lambda. If the server doesn't have it, the override
    # logic has nothing to restore and this test is not meaningful.
    probe_code = (
        "import os\n"
        "def handler(event, context):\n"
        " return {'endpoint': os.environ.get('AWS_ENDPOINT_URL', '')}\n"
    )
    # Unique suffix so reruns don't collide with a stale function.
    probe_name = f"lam-endpoint-probe-{_uuid_mod.uuid4().hex[:8]}"
    lam.create_function(
        FunctionName=probe_name,
        Runtime="python3.11",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip(probe_code)},
    )
    resp = lam.invoke(FunctionName=probe_name, Payload=b"{}")
    server_endpoint = json.loads(resp["Payload"].read()).get("endpoint", "")
    if not server_endpoint:
        pytest.skip("MiniStack server does not have AWS_ENDPOINT_URL set "
                    "(run with docker-compose to test endpoint override)")
    # Now test with a function that sets a conflicting AWS_ENDPOINT_URL.
    code = (
        "import os\n"
        "def handler(event, context):\n"
        " return {'endpoint': os.environ.get('AWS_ENDPOINT_URL', 'unset')}\n"
    )
    fname = f"lam-endpoint-override-{_uuid_mod.uuid4().hex[:8]}"
    lam.create_function(
        FunctionName=fname,
        Runtime="python3.11",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip(code)},
        Environment={"Variables": {
            "AWS_ENDPOINT_URL": "http://should-be-overridden:9999",
        }},
    )
    resp = lam.invoke(FunctionName=fname, Payload=b"{}")
    payload = json.loads(resp["Payload"].read())
    # The Lambda must see the server's endpoint, not the function env var.
    assert payload["endpoint"] != "http://should-be-overridden:9999", (
        "Function-level AWS_ENDPOINT_URL must not override internal endpoint"
    )
    assert payload["endpoint"] == server_endpoint
def test_lambda_dynamodb_stream_esm(lam, ddb):
    """Register an event source mapping on a DynamoDB stream and write items."""
    # Create table with streams enabled
    ddb.create_table(
        TableName="stream-test-table",
        KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
        AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
        BillingMode="PAY_PER_REQUEST",
        StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"},
    )
    stream_arn = ddb.describe_table(TableName="stream-test-table")["Table"]["LatestStreamArn"]
    # Create Lambda that captures stream records
    handler_src = "def handler(event, context): return len(event['Records'])"
    lam.create_function(
        FunctionName="lam-ddb-stream",
        Runtime="python3.11",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip(handler_src)},
    )
    mapping = lam.create_event_source_mapping(
        FunctionName="lam-ddb-stream",
        EventSourceArn=stream_arn,
        StartingPosition="TRIM_HORIZON",
        BatchSize=10,
    )
    assert mapping["EventSourceArn"] == stream_arn
    assert mapping["FunctionArn"].endswith("lam-ddb-stream")
    # Verify ESM is registered and retrievable
    fetched = lam.get_event_source_mapping(UUID=mapping["UUID"])
    assert fetched["EventSourceArn"] == stream_arn
    assert fetched["StartingPosition"] == "TRIM_HORIZON"
    # Write items — stream should capture them
    ddb.put_item(TableName="stream-test-table", Item={"pk": {"S": "k1"}, "val": {"S": "v1"}})
    ddb.put_item(TableName="stream-test-table", Item={"pk": {"S": "k2"}, "val": {"S": "v2"}})
    ddb.delete_item(TableName="stream-test-table", Key={"pk": {"S": "k1"}})
    # Verify table still has expected state
    remaining = [item["pk"]["S"] for item in ddb.scan(TableName="stream-test-table")["Items"]]
    assert "k2" in remaining
    assert "k1" not in remaining
def test_lambda_function_url_config(lam):
    """CreateFunctionUrlConfig / Get / Update / Delete / List lifecycle.

    Fix: removed the function-local ``import uuid as _uuid_mod`` that
    shadowed the identical module-level import.
    """
    fn = f"intg-url-cfg-{_uuid_mod.uuid4().hex[:8]}"
    lam.create_function(
        FunctionName=fn,
        Runtime="python3.12",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip(_LAMBDA_CODE)},
    )
    # Create
    resp = lam.create_function_url_config(FunctionName=fn, AuthType="NONE")
    assert resp["AuthType"] == "NONE"
    assert "FunctionUrl" in resp
    url = resp["FunctionUrl"]
    # Get
    got = lam.get_function_url_config(FunctionName=fn)
    assert got["FunctionUrl"] == url
    # Update
    updated = lam.update_function_url_config(
        FunctionName=fn,
        AuthType="AWS_IAM",
        Cors={"AllowOrigins": ["*"]},
    )
    assert updated["AuthType"] == "AWS_IAM"
    assert updated["Cors"]["AllowOrigins"] == ["*"]
    # List
    listed = lam.list_function_url_configs(FunctionName=fn)
    assert any(c["FunctionUrl"] == url for c in listed["FunctionUrlConfigs"])
    # Delete
    lam.delete_function_url_config(FunctionName=fn)
    with pytest.raises(ClientError) as exc:
        lam.get_function_url_config(FunctionName=fn)
    assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException"
def test_lambda_unknown_path_returns_404(lam):
    """Requests to an unrecognised Lambda path must return 404, not 400 InvalidRequest."""
    import urllib.error
    import urllib.request
    endpoint = os.environ.get("MINISTACK_ENDPOINT", "http://localhost:4566")
    request = urllib.request.Request(
        f"{endpoint}/2015-03-31/functions/nonexistent-fn/completely-unknown-subpath",
        headers={"Authorization": "AWS4-HMAC-SHA256 Credential=test/20260101/us-east-1/lambda/aws4_request"},
        method="GET",
    )
    try:
        urllib.request.urlopen(request)
    except urllib.error.HTTPError as err:
        assert err.code == 404
    else:
        assert False, "Expected an error response"
def test_lambda_reset_terminates_workers(lam):
    """/_ministack/reset must cleanly terminate warm Lambda workers.

    Fix: replaced the opaque ``__import__('uuid')`` hack with the
    module-level ``_uuid_mod`` import.
    """
    import urllib.request  # not imported at module level
    fn = f"intg-reset-worker-{_uuid_mod.uuid4().hex[:8]}"
    # Handler whose module-level _boot stamp changes only on a cold start.
    code = "import time\n_boot = time.time()\ndef handler(event, context):\n return {'boot': _boot}\n"
    lam.create_function(
        FunctionName=fn,
        Runtime="python3.12",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip(code)},
    )
    # Warm the worker
    r1 = lam.invoke(FunctionName=fn, Payload=b"{}")
    boot1 = json.loads(r1["Payload"].read())["boot"]
    # Reset — must terminate worker without error
    endpoint = os.environ.get("MINISTACK_ENDPOINT", "http://localhost:4566")
    req = urllib.request.Request(f"{endpoint}/_ministack/reset", data=b"", method="POST")
    for _attempt in range(3):
        try:
            urllib.request.urlopen(req, timeout=15)
            break
        except Exception:
            # Retry twice; surface the error on the final attempt.
            if _attempt == 2:
                raise
    # Re-create and invoke — new worker means new boot time
    lam.create_function(
        FunctionName=fn,
        Runtime="python3.12",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": _make_zip(code)},
    )
    r2 = lam.invoke(FunctionName=fn, Payload=b"{}")
    boot2 = json.loads(r2["Payload"].read())["boot"]
    assert boot2 > boot1, "Worker should have been reset — new boot time expected"
def test_lambda_alias_crud(lam):
    """CreateAlias, GetAlias, UpdateAlias, DeleteAlias."""
    zipped = _zip_lambda("def handler(e,c): return {'v': 1}")
    lam.create_function(
        FunctionName="qa-lam-alias",
        Runtime="python3.12",
        Role="arn:aws:iam::000000000000:role/r",
        Handler="index.handler",
        Code={"ZipFile": zipped},
    )
    lam.publish_version(FunctionName="qa-lam-alias")
    lam.create_alias(
        FunctionName="qa-lam-alias",
        Name="prod",
        FunctionVersion="1",
        Description="production alias",
    )
    created = lam.get_alias(FunctionName="qa-lam-alias", Name="prod")
    assert created["Name"] == "prod"
    assert created["FunctionVersion"] == "1"
    lam.update_alias(FunctionName="qa-lam-alias", Name="prod", Description="updated")
    assert lam.get_alias(FunctionName="qa-lam-alias", Name="prod")["Description"] == "updated"
    names = [a["Name"] for a in lam.list_aliases(FunctionName="qa-lam-alias")["Aliases"]]
    assert "prod" in names
    lam.delete_alias(FunctionName="qa-lam-alias", Name="prod")
    names = [a["Name"] for a in lam.list_aliases(FunctionName="qa-lam-alias")["Aliases"]]
    assert "prod" not in names
def test_lambda_publish_version_snapshot(lam):
    """PublishVersion creates a numbered version snapshot."""
    lam.create_function(
        FunctionName="qa-lam-version",
        Runtime="python3.12",
        Role="arn:aws:iam::000000000000:role/r",
        Handler="index.handler",
        Code={"ZipFile": _zip_lambda("def handler(e,c): return 'v1'")},
    )
    published = lam.publish_version(FunctionName="qa-lam-version")
    assert published["Version"] == "1"
    labels = [
        v["Version"]
        for v in lam.list_versions_by_function(FunctionName="qa-lam-version")["Versions"]
    ]
    assert "1" in labels
    assert "$LATEST" in labels
def test_lambda_function_concurrency(lam):
    """PutFunctionConcurrency / GetFunctionConcurrency / DeleteFunctionConcurrency."""
    lam.create_function(
        FunctionName="qa-lam-concurrency",
        Runtime="python3.12",
        Role="arn:aws:iam::000000000000:role/r",
        Handler="index.handler",
        Code={"ZipFile": _zip_lambda("def handler(e,c): return {}")},
    )
    lam.put_function_concurrency(
        FunctionName="qa-lam-concurrency",
        ReservedConcurrentExecutions=5,
    )
    before = lam.get_function_concurrency(FunctionName="qa-lam-concurrency")
    assert before["ReservedConcurrentExecutions"] == 5
    lam.delete_function_concurrency(FunctionName="qa-lam-concurrency")
    after = lam.get_function_concurrency(FunctionName="qa-lam-concurrency")
    assert after.get("ReservedConcurrentExecutions") is None
def test_lambda_add_remove_permission(lam):
    """AddPermission / RemovePermission / GetPolicy."""
    fname = "qa-lam-policy"
    lam.create_function(
        FunctionName=fname,
        Runtime="python3.12",
        Role="arn:aws:iam::000000000000:role/r",
        Handler="index.handler",
        Code={"ZipFile": _zip_lambda("def handler(e,c): return {}")},
    )
    lam.add_permission(
        FunctionName=fname,
        StatementId="allow-s3",
        Action="lambda:InvokeFunction",
        Principal="s3.amazonaws.com",
    )
    # The statement must appear in the policy, then disappear after removal.
    sids = [s["Sid"] for s in json.loads(lam.get_policy(FunctionName=fname)["Policy"])["Statement"]]
    assert "allow-s3" in sids
    lam.remove_permission(FunctionName=fname, StatementId="allow-s3")
    sids_after = [s["Sid"] for s in json.loads(lam.get_policy(FunctionName=fname)["Policy"])["Statement"]]
    assert "allow-s3" not in sids_after
def test_lambda_list_functions_pagination(lam):
    """ListFunctions pagination with Marker works correctly."""
    zipped = _zip_lambda("def handler(e,c): return {}")
    for idx in range(5):
        try:
            lam.create_function(
                FunctionName=f"qa-lam-page-{idx}",
                Runtime="python3.12",
                Role="arn:aws:iam::000000000000:role/r",
                Handler="index.handler",
                Code={"ZipFile": zipped},
            )
        except ClientError:
            pass  # function may already exist from an earlier run
    page1 = lam.list_functions(MaxItems=2)
    assert len(page1["Functions"]) <= 2
    marker = page1.get("NextMarker")
    if marker is not None:
        # Consecutive pages must not repeat any function.
        page2 = lam.list_functions(MaxItems=2, Marker=marker)
        first = {f["FunctionName"] for f in page1["Functions"]}
        second = {f["FunctionName"] for f in page2["Functions"]}
        assert first.isdisjoint(second)
def test_lambda_invoke_event_type_returns_202(lam):
    """Invoke with InvocationType=Event returns 202 immediately."""
    try:
        lam.create_function(
            FunctionName="qa-lam-event-invoke",
            Runtime="python3.12",
            Role="arn:aws:iam::000000000000:role/r",
            Handler="index.handler",
            Code={"ZipFile": _zip_lambda("def handler(e,c): return {}")},
        )
    except ClientError:
        pass  # function may already exist from an earlier run
    # Async invocations are fire-and-forget: only the HTTP status matters.
    result = lam.invoke(
        FunctionName="qa-lam-event-invoke",
        InvocationType="Event",
        Payload=json.dumps({}),
    )
    assert result["StatusCode"] == 202
def test_lambda_invoke_dry_run_returns_204(lam):
    """Invoke with InvocationType=DryRun returns 204."""
    try:
        lam.create_function(
            FunctionName="qa-lam-dryrun",
            Runtime="python3.12",
            Role="arn:aws:iam::000000000000:role/r",
            Handler="index.handler",
            Code={"ZipFile": _zip_lambda("def handler(e,c): return {}")},
        )
    except ClientError:
        pass  # function may already exist from an earlier run
    # DryRun validates permissions without executing the handler.
    result = lam.invoke(
        FunctionName="qa-lam-dryrun",
        InvocationType="DryRun",
        Payload=json.dumps({}),
    )
    assert result["StatusCode"] == 204
def test_lambda_layer_publish(lam):
    """PublishLayerVersion returns version 1 and an ARN naming the layer.

    Fix: dropped the redundant local `import base64, zipfile, io` — base64 was
    never used, and zipfile/io are already imported at module level.
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as z:
        z.writestr("layer.py", "# layer")
    zip_bytes = buf.getvalue()
    resp = lam.publish_layer_version(
        LayerName="my-test-layer",
        Description="Test layer",
        Content={"ZipFile": zip_bytes},
        CompatibleRuntimes=["python3.12"],
    )
    assert resp["Version"] == 1
    assert "my-test-layer" in resp["LayerVersionArn"]
def test_lambda_layer_publish_from_s3(lam, s3):
    """PublishLayerVersion with S3Bucket/S3Key. Contributed by @Baptiste-Garcin (#356).

    Fix: dropped the redundant local `import zipfile, io` — both modules are
    already imported at module level.
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as z:
        z.writestr("s3layer.py", "# layer from s3")
    zip_bytes = buf.getvalue()
    bucket = "layer-bucket"
    key = "layers/my-layer.zip"
    s3.create_bucket(Bucket=bucket)
    s3.put_object(Bucket=bucket, Key=key, Body=zip_bytes)
    resp = lam.publish_layer_version(
        LayerName="s3-layer",
        Description="Layer from S3",
        Content={"S3Bucket": bucket, "S3Key": key},
        CompatibleRuntimes=["python3.12"],
    )
    assert resp["Version"] == 1
    assert "s3-layer" in resp["LayerVersionArn"]
    # The service must have fetched the object from S3, not treated it as empty.
    assert resp["Content"]["CodeSize"] == len(zip_bytes)
    assert resp["Content"]["CodeSha256"]
def test_lambda_layer_get_version(lam):
    """GetLayerVersion returns the metadata published earlier.

    NOTE(review): relies on test_lambda_layer_publish having already created
    "my-test-layer" version 1 in the same session — ordering-coupled; confirm
    the suite never runs this test in isolation or reordered.
    """
    resp = lam.get_layer_version(LayerName="my-test-layer", VersionNumber=1)
    assert resp["Version"] == 1
    assert resp["Description"] == "Test layer"
def test_lambda_layer_list_versions(lam):
    """ListLayerVersions returns the previously published version.

    NOTE(review): depends on test_lambda_layer_publish having created
    "my-test-layer" earlier in the session — ordering-coupled.
    """
    resp = lam.list_layer_versions(LayerName="my-test-layer")
    assert len(resp["LayerVersions"]) >= 1
    assert resp["LayerVersions"][0]["Version"] == 1
def test_lambda_layer_list_layers(lam):
    """ListLayers includes the previously published layer.

    Fix: renamed the ambiguous loop variable `l` (flagged by flake8/ruff E741).
    NOTE(review): depends on test_lambda_layer_publish having created
    "my-test-layer" earlier in the session.
    """
    resp = lam.list_layers()
    names = [layer["LayerName"] for layer in resp["Layers"]]
    assert "my-test-layer" in names
def test_lambda_layer_delete_version(lam):
    """DeleteLayerVersion removes the only version of a layer.

    Fix: dropped the redundant local `import base64, zipfile, io` — base64 was
    never used, and zipfile/io are already imported at module level.
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as z:
        z.writestr("tmp.py", "")
    lam.publish_layer_version(LayerName="delete-layer-test", Content={"ZipFile": buf.getvalue()})
    lam.delete_layer_version(LayerName="delete-layer-test", VersionNumber=1)
    resp = lam.list_layer_versions(LayerName="delete-layer-test")
    assert len(resp["LayerVersions"]) == 0
def test_lambda_function_with_layer(lam):
    """CreateFunction with a Layers attachment reports the layer back on GetFunction."""
    # Publish layer
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as z:
        z.writestr("layer.py", "")
    layer_resp = lam.publish_layer_version(LayerName="fn-layer", Content={"ZipFile": buf.getvalue()})
    layer_arn = layer_resp["LayerVersionArn"]
    # Create function using the layer
    fn_zip = io.BytesIO()
    with zipfile.ZipFile(fn_zip, "w") as z:
        z.writestr("index.py", "def handler(e, c): return {}")
    lam.create_function(
        FunctionName="fn-with-layer",
        Runtime="python3.12",
        Role="arn:aws:iam::000000000000:role/test",
        Handler="index.handler",
        Code={"ZipFile": fn_zip.getvalue()},
        Layers=[layer_arn],
    )
    fn = lam.get_function(FunctionName="fn-with-layer")
    # NOTE(review): this checks the full ARN as a *substring* of the stored Arn,
    # which only passes when they are effectively equal — consider asserting
    # equality (`== layer_arn`) for clarity; confirm the service never suffixes.
    assert layer_arn in fn["Configuration"]["Layers"][0]["Arn"]
def test_lambda_layer_content_location(lam):
    """Content.Location should be a non-empty URL pointing to the layer zip.

    Fixes: close the HTTP response via a context manager (the original leaked
    the connection) and pass a timeout so a dead endpoint cannot hang the suite.
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as z:
        z.writestr("mod.py", "X=1")
    resp = lam.publish_layer_version(
        LayerName="loc-layer",
        Content={"ZipFile": buf.getvalue()},
    )
    assert resp["Content"]["Location"]
    assert "loc-layer" in resp["Content"]["Location"]
    # Verify the URL actually serves zip data
    import urllib.request
    with urllib.request.urlopen(resp["Content"]["Location"], timeout=10) as fh:
        data = fh.read()
    assert len(data) == resp["Content"]["CodeSize"]
def test_lambda_layer_pagination(lam):
    """Publish 3 versions, paginate with MaxItems=1."""
    blob = io.BytesIO()
    with zipfile.ZipFile(blob, "w") as z:
        z.writestr("p.py", "")
    zipped = blob.getvalue()
    for _ in range(3):
        lam.publish_layer_version(LayerName="page-layer", Content={"ZipFile": zipped})
    # List with MaxItems=1 (newest first); a truncated page carries NextMarker.
    page = lam.list_layer_versions(LayerName="page-layer", MaxItems=1)
    assert len(page["LayerVersions"]) == 1
    assert "NextMarker" in page
def test_lambda_layer_list_filter_runtime(lam):
    """Filter list_layer_versions by CompatibleRuntime."""
    blob = io.BytesIO()
    with zipfile.ZipFile(blob, "w") as z:
        z.writestr("r.py", "")
    zipped = blob.getvalue()
    # Publish one version per runtime, then filter on one of them.
    for runtime in ("python3.12", "nodejs18.x"):
        lam.publish_layer_version(
            LayerName="rt-filter-layer",
            Content={"ZipFile": zipped},
            CompatibleRuntimes=[runtime],
        )
    filtered = lam.list_layer_versions(
        LayerName="rt-filter-layer",
        CompatibleRuntime="python3.12",
    )
    for version in filtered["LayerVersions"]:
        assert "python3.12" in version["CompatibleRuntimes"]
def test_lambda_layer_list_filter_architecture(lam):
    """Filter list_layer_versions by CompatibleArchitecture."""
    blob = io.BytesIO()
    with zipfile.ZipFile(blob, "w") as z:
        z.writestr("a.py", "")
    zipped = blob.getvalue()
    # Publish one version per architecture, then filter on x86_64 only.
    for arch in ("x86_64", "arm64"):
        lam.publish_layer_version(
            LayerName="arch-filter-layer",
            Content={"ZipFile": zipped},
            CompatibleArchitectures=[arch],
        )
    filtered = lam.list_layer_versions(
        LayerName="arch-filter-layer",
        CompatibleArchitecture="x86_64",
    )
    for version in filtered["LayerVersions"]:
        assert "x86_64" in version["CompatibleArchitectures"]
def test_lambda_layer_list_layers_pagination(lam):
    """Multiple layers, paginate ListLayers."""
    blob = io.BytesIO()
    with zipfile.ZipFile(blob, "w") as z:
        z.writestr("x.py", "")
    zipped = blob.getvalue()
    for idx in range(3):
        lam.publish_layer_version(
            LayerName=f"ll-page-{idx}",
            Content={"ZipFile": zipped},
        )
    # A one-item page from >=3 layers must be truncated.
    page = lam.list_layers(MaxItems=1)
    assert len(page["Layers"]) == 1
    assert "NextMarker" in page
def test_lambda_layer_list_layers_filter_runtime(lam):
    """ListLayers filtered by CompatibleRuntime."""
    blob = io.BytesIO()
    with zipfile.ZipFile(blob, "w") as z:
        z.writestr("f.py", "")
    zipped = blob.getvalue()
    # One Python-compatible layer and one Node-compatible layer.
    for layer_name, runtime in (("ll-rt-py", "python3.12"), ("ll-rt-node", "nodejs18.x")):
        lam.publish_layer_version(
            LayerName=layer_name,
            Content={"ZipFile": zipped},
            CompatibleRuntimes=[runtime],
        )
    listed = lam.list_layers(CompatibleRuntime="python3.12")
    names = {layer["LayerName"] for layer in listed["Layers"]}
    assert "ll-rt-py" in names
    assert "ll-rt-node" not in names
def test_lambda_layer_get_version_not_found(lam):
    """Getting a nonexistent layer should raise 404.

    Fix: use the module-level ClientError import instead of re-importing it locally.
    """
    with pytest.raises(ClientError) as exc:
        lam.get_layer_version(LayerName="no-such-layer-xyz", VersionNumber=1)
    assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404
def test_lambda_layer_get_version_by_arn(lam):
    """GetLayerVersionByArn resolves by full ARN."""
    blob = io.BytesIO()
    with zipfile.ZipFile(blob, "w") as z:
        z.writestr("ba.py", "")
    published = lam.publish_layer_version(
        LayerName="by-arn-layer",
        Content={"ZipFile": blob.getvalue()},
    )
    version_arn = published["LayerVersionArn"]
    # Round-trip: the ARN returned at publish time resolves to the same version.
    fetched = lam.get_layer_version_by_arn(Arn=version_arn)
    assert fetched["LayerVersionArn"] == version_arn
    assert fetched["Version"] == published["Version"]
def test_lambda_layer_version_permission_add(lam):
    """Add a layer version permission and verify response.

    Fix: dropped the redundant local `import json` — json is imported at
    module level.
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as z:
        z.writestr("perm.py", "")
    pub = lam.publish_layer_version(
        LayerName="perm-layer",
        Content={"ZipFile": buf.getvalue()},
    )
    resp = lam.add_layer_version_permission(
        LayerName="perm-layer",
        VersionNumber=pub["Version"],
        StatementId="allow-all",
        Action="lambda:GetLayerVersion",
        Principal="*",
    )
    assert "Statement" in resp
    stmt = json.loads(resp["Statement"])
    assert stmt["Sid"] == "allow-all"
    assert stmt["Action"] == "lambda:GetLayerVersion"
def test_lambda_layer_version_permission_get_policy(lam):
    """Get policy after adding a permission.

    Fix: dropped the redundant local `import json` (module-level import exists).
    NOTE(review): depends on test_lambda_layer_version_permission_add having
    created the "allow-all" statement on perm-layer version 1 — ordering-coupled.
    """
    resp = lam.get_layer_version_policy(LayerName="perm-layer", VersionNumber=1)
    policy = json.loads(resp["Policy"])
    assert len(policy["Statement"]) >= 1
    assert policy["Statement"][0]["Sid"] == "allow-all"
def test_lambda_layer_version_permission_remove(lam):
    """Remove a layer version permission.

    Fix: use the module-level ClientError import instead of a local re-import.
    """
    lam.remove_layer_version_permission(
        LayerName="perm-layer",
        VersionNumber=1,
        StatementId="allow-all",
    )
    # With the only statement gone, the policy itself must 404.
    with pytest.raises(ClientError) as exc:
        lam.get_layer_version_policy(LayerName="perm-layer", VersionNumber=1)
    assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404
def test_lambda_layer_version_permission_duplicate_sid(lam):
    """Adding a duplicate StatementId should raise conflict.

    Fix: use the module-level ClientError import instead of a local re-import.
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as z:
        z.writestr("dup.py", "")
    pub = lam.publish_layer_version(
        LayerName="dup-sid-layer",
        Content={"ZipFile": buf.getvalue()},
    )
    lam.add_layer_version_permission(
        LayerName="dup-sid-layer",
        VersionNumber=pub["Version"],
        StatementId="s1",
        Action="lambda:GetLayerVersion",
        Principal="*",
    )
    # Second add with the same Sid must be rejected with 409 Conflict.
    with pytest.raises(ClientError) as exc:
        lam.add_layer_version_permission(
            LayerName="dup-sid-layer",
            VersionNumber=pub["Version"],
            StatementId="s1",
            Action="lambda:GetLayerVersion",
            Principal="*",
        )
    assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 409
def test_lambda_layer_version_permission_invalid_action(lam):
    """Only lambda:GetLayerVersion is a valid action.

    Fix: use the module-level ClientError import instead of a local re-import.
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as z:
        z.writestr("inv.py", "")
    pub = lam.publish_layer_version(
        LayerName="inv-act-layer",
        Content={"ZipFile": buf.getvalue()},
    )
    with pytest.raises(ClientError) as exc:
        lam.add_layer_version_permission(
            LayerName="inv-act-layer",
            VersionNumber=pub["Version"],
            StatementId="s1",
            Action="lambda:InvokeFunction",
            Principal="*",
        )
    assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] in (400, 403)
def test_lambda_layer_delete_idempotent(lam):
    """Deleting a nonexistent version should not error."""
    # AWS DeleteLayerVersion is idempotent: this call must return cleanly
    # (no ClientError) even though the layer was never published.
    lam.delete_layer_version(LayerName="no-such-layer-del", VersionNumber=999)
def test_lambda_warm_worker_invalidation(lam):
    """Create function with code v1, invoke, update code to v2, invoke again — must see v2.

    Fixes: dropped the redundant aliased local imports (`io as _io`,
    `zipfile as _zf` — both modules are imported at module level) and narrowed
    the cleanup's broad `except Exception` to ClientError.
    """
    fname = "lambda-worker-invalidation-test"
    try:
        lam.delete_function(FunctionName=fname)
    except ClientError:
        pass  # best-effort cleanup of a previous run
    # v1 code
    code_v1 = b'def handler(event, context):\n return {"version": 1}\n'
    buf1 = io.BytesIO()
    with zipfile.ZipFile(buf1, "w") as z:
        z.writestr("index.py", code_v1)
    lam.create_function(
        FunctionName=fname,
        Runtime="python3.12",
        Role="arn:aws:iam::000000000000:role/test-role",
        Handler="index.handler",
        Code={"ZipFile": buf1.getvalue()},
    )
    # Invoke v1
    resp1 = lam.invoke(FunctionName=fname, Payload=json.dumps({}))
    payload1 = json.loads(resp1["Payload"].read())
    assert payload1["version"] == 1
    # Update to v2 — a warm worker must not keep serving the stale v1 module.
    code_v2 = b'def handler(event, context):\n return {"version": 2}\n'
    buf2 = io.BytesIO()
    with zipfile.ZipFile(buf2, "w") as z:
        z.writestr("index.py", code_v2)
    lam.update_function_code(FunctionName=fname, ZipFile=buf2.getvalue())
    # Invoke v2
    resp2 = lam.invoke(FunctionName=fname, Payload=json.dumps({}))
    payload2 = json.loads(resp2["Payload"].read())
    assert payload2["version"] == 2
def test_lambda_event_invoke_config_crud(lam):
    """Put/Get/Delete EventInvokeConfig lifecycle.

    Fixes: reuse the module-level _make_zip helper instead of hand-rolling the
    zip, and use the module-level ClientError import rather than a local one.
    """
    lam.create_function(
        FunctionName="eic-fn", Runtime="python3.11",
        Role=_LAMBDA_ROLE, Handler="index.handler",
        Code={"ZipFile": _make_zip("def handler(e,c): return {}")},
    )
    lam.put_function_event_invoke_config(
        FunctionName="eic-fn",
        MaximumRetryAttempts=1,
        MaximumEventAgeInSeconds=300,
    )
    cfg = lam.get_function_event_invoke_config(FunctionName="eic-fn")
    assert cfg["MaximumRetryAttempts"] == 1
    assert cfg["MaximumEventAgeInSeconds"] == 300
    # After deletion the config must be gone (service raises on lookup).
    lam.delete_function_event_invoke_config(FunctionName="eic-fn")
    with pytest.raises(ClientError):
        lam.get_function_event_invoke_config(FunctionName="eic-fn")
    lam.delete_function(FunctionName="eic-fn")
def test_lambda_provisioned_concurrency_crud(lam):
    """Put/Get/Delete ProvisionedConcurrencyConfig lifecycle.

    Fixes: reuse the module-level _make_zip helper instead of hand-rolling the
    zip, and use the module-level ClientError import rather than a local one.
    """
    lam.create_function(
        FunctionName="pc-fn", Runtime="python3.11",
        Role=_LAMBDA_ROLE, Handler="index.handler",
        Code={"ZipFile": _make_zip("def handler(e,c): return {}")},
        Publish=True,
    )
    # Publish=True created a numbered version; provisioned concurrency cannot
    # target $LATEST, so pick the first real version.
    versions = lam.list_versions_by_function(FunctionName="pc-fn")["Versions"]
    ver = [v for v in versions if v["Version"] != "$LATEST"][0]["Version"]
    lam.put_provisioned_concurrency_config(
        FunctionName="pc-fn",
        Qualifier=ver,
        ProvisionedConcurrentExecutions=5,
    )
    cfg = lam.get_provisioned_concurrency_config(
        FunctionName="pc-fn", Qualifier=ver,
    )
    assert cfg["RequestedProvisionedConcurrentExecutions"] == 5
    lam.delete_provisioned_concurrency_config(
        FunctionName="pc-fn", Qualifier=ver,
    )
    with pytest.raises(ClientError):
        lam.get_provisioned_concurrency_config(FunctionName="pc-fn", Qualifier=ver)
    lam.delete_function(FunctionName="pc-fn")
def test_lambda_image_create_invoke(lam):
    """CreateFunction with PackageType Image + GetFunction returns ImageUri."""
    fname = "img-test-v39"
    image_uri = "my-repo/my-image:latest"
    lam.create_function(
        FunctionName=fname,
        PackageType="Image",
        Code={"ImageUri": image_uri},
        Role="arn:aws:iam::000000000000:role/test",
        Timeout=30,
    )
    described = lam.get_function(FunctionName=fname)
    assert described["Configuration"]["PackageType"] == "Image"
    assert described["Code"]["RepositoryType"] == "ECR"
    assert described["Code"]["ImageUri"] == image_uri
    lam.delete_function(FunctionName=fname)
def test_lambda_update_code_image_uri(lam):
    """UpdateFunctionCode with ImageUri updates the image."""
    fname = "img-update-v39"
    lam.create_function(
        FunctionName=fname,
        PackageType="Image",
        Code={"ImageUri": "my-repo/my-image:v1"},
        Role="arn:aws:iam::000000000000:role/test",
    )
    # Point the function at a new tag and confirm GetFunction reflects it.
    lam.update_function_code(FunctionName=fname, ImageUri="my-repo/my-image:v2")
    described = lam.get_function(FunctionName=fname)
    assert described["Code"]["ImageUri"] == "my-repo/my-image:v2"
    lam.delete_function(FunctionName=fname)
def test_lambda_provided_runtime_create(lam):
    """CreateFunction with provided.al2023 runtime accepts bootstrap handler.

    Fix: dropped the redundant local `import zipfile, io` — both modules are
    already imported at module level.
    """
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        zf.writestr("bootstrap", "#!/bin/sh\necho ok\n")
    lam.create_function(
        FunctionName="provided-test-v39",
        Runtime="provided.al2023",
        Handler="bootstrap",
        Code={"ZipFile": buf.getvalue()},
        Role="arn:aws:iam::000000000000:role/test",
    )
    desc = lam.get_function_configuration(FunctionName="provided-test-v39")
    assert desc["Runtime"] == "provided.al2023"
    assert desc["Handler"] == "bootstrap"
    lam.delete_function(FunctionName="provided-test-v39")
@pytest.mark.skipif(
    os.environ.get("LAMBDA_EXECUTOR", "").lower() != "docker",
    reason="requires LAMBDA_EXECUTOR=docker and Docker daemon",
)
def test_lambda_provided_runtime_docker_invoke(lam):
    """Invoke a provided.al2023 Lambda via the Docker executor.
    Uses a shell-script bootstrap that implements the Lambda Runtime API
    (GET /invocation/next, POST /invocation/{id}/response).
    """
    # Shell bootstrap implementing the Lambda Runtime API protocol.
    # Must loop: the RIE expects the bootstrap to poll for invocations.
    # Each iteration: fetch the next invocation (headers land in /tmp/headers),
    # extract the request id from the Lambda-Runtime-Aws-Request-Id header,
    # and POST a fixed JSON response back for that request id.
    bootstrap_script = (
        "#!/bin/sh\n"
        'RUNTIME_API="${AWS_LAMBDA_RUNTIME_API}"\n'
        "while true; do\n"
        ' RESP=$(curl -s -D /tmp/headers '
        '"http://${RUNTIME_API}/2018-06-01/runtime/invocation/next")\n'
        ' REQUEST_ID=$(grep -i "Lambda-Runtime-Aws-Request-Id" /tmp/headers '
        '| tr -d "\\r" | cut -d" " -f2)\n'
        ' curl -s -X POST '
        '"http://${RUNTIME_API}/2018-06-01/runtime/invocation/${REQUEST_ID}/response" '
        "-d '{\"statusCode\":200,\"body\":\"hello from provided\"}'\n"
        "done\n"
    )
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as zf:
        info = zipfile.ZipInfo("bootstrap")
        info.external_attr = 0o755 << 16  # executable
        zf.writestr(info, bootstrap_script)
    # Unique name so reruns never collide with a leftover function.
    func_name = f"provided-docker-test-{_uuid_mod.uuid4().hex[:8]}"
    lam.create_function(
        FunctionName=func_name,
        Runtime="provided.al2023",
        Handler="bootstrap",
        Code={"ZipFile": buf.getvalue()},
        Role="arn:aws:iam::000000000000:role/test",
        Timeout=30,
    )
    try:
        resp = lam.invoke(FunctionName=func_name, Payload=json.dumps({"key": "value"}))
        payload = json.loads(resp["Payload"].read())
        assert payload["statusCode"] == 200
        assert payload["body"] == "hello from provided"
    finally:
        lam.delete_function(FunctionName=func_name)
def test_apigwv2_nodejs_lambda_proxy(lam, apigw):
    """API Gateway v2 HTTP API should invoke Node.js Lambda via warm worker, not return mock.

    Fixes: use the module-level uuid (`_uuid_mod`) and ClientError imports
    instead of re-importing them locally; close the HTTP response via a context
    manager and pass a timeout so a hung endpoint cannot stall the suite.
    """
    import urllib.request as _urlreq
    fname = f"apigwv2-node-{_uuid_mod.uuid4().hex[:8]}"
    api_id = None
    code = (
        "exports.handler = async (event) => ({"
        " statusCode: 200,"
        " body: JSON.stringify({ route: event.routeKey, method: event.requestContext.http.method })"
        "});"
    )
    try:
        lam.create_function(
            FunctionName=fname,
            Runtime="nodejs20.x",
            Role="arn:aws:iam::000000000000:role/test-role",
            Handler="index.handler",
            Code={"ZipFile": _make_zip_js(code, "index.js")},
        )
        api_id = apigw.create_api(Name=f"v2-node-{fname}", ProtocolType="HTTP")["ApiId"]
        int_id = apigw.create_integration(
            ApiId=api_id,
            IntegrationType="AWS_PROXY",
            IntegrationUri=f"arn:aws:lambda:us-east-1:000000000000:function:{fname}",
            PayloadFormatVersion="2.0",
        )["IntegrationId"]
        apigw.create_route(ApiId=api_id, RouteKey="GET /test", Target=f"integrations/{int_id}")
        apigw.create_stage(ApiId=api_id, StageName="$default")
        req = _urlreq.Request(
            f"http://{api_id}.execute-api.localhost:{_EXECUTE_PORT}/$default/test",
            method="GET",
        )
        req.add_header("Host", f"{api_id}.execute-api.localhost:{_EXECUTE_PORT}")
        with _urlreq.urlopen(req, timeout=30) as fh:
            resp = fh.read().decode()
        body = json.loads(resp)
        assert body.get("route") == "GET /test", f"Expected handler result, got: {resp}"
        assert body.get("method") == "GET"
    finally:
        # Always tear down the API and function, even on assertion failure.
        if api_id is not None:
            try:
                apigw.delete_api(ApiId=api_id)
            except ClientError:
                pass
        try:
            lam.delete_function(FunctionName=fname)
        except ClientError:
            pass
def test_lambda_nodejs_esm_mjs_handler(lam):
    """Node.js .mjs (ESM) handlers should be loaded via dynamic import() fallback.
    Creates a ZIP with two .mjs files:
    - utils.mjs: exports a helper function using ESM `export` syntax
    - index.mjs: imports utils.mjs via ESM `import` statement and uses it
    This verifies that:
    1. .mjs files are loaded via import() instead of require()
    2. ESM import/export syntax works between modules
    3. The handler's return value is correctly propagated
    """
    # Unique name so reruns never collide with a leftover function.
    fname = f"lam-esm-{_uuid_mod.uuid4().hex[:8]}"
    utils_code = (
        "export function greet(name) {\n"
        " return `Hello, ${name} from ESM!`;\n"
        "}\n"
        "\n"
        "export const VERSION = '1.0.0';\n"
    )
    handler_code = (
        "import { greet, VERSION } from './utils.mjs';\n"
        "\n"
        "export const handler = async (event) => {\n"
        " const name = event.name || 'World';\n"
        " return {\n"
        " statusCode: 200,\n"
        " body: JSON.stringify({\n"
        " message: greet(name),\n"
        " version: VERSION,\n"
        " esm: true,\n"
        " }),\n"
        " };\n"
        "};\n"
    )
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as z:
        z.writestr("index.mjs", handler_code)
        z.writestr("utils.mjs", utils_code)
    lam.create_function(
        FunctionName=fname,
        Runtime="nodejs20.x",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": buf.getvalue()},
    )
    try:
        resp = lam.invoke(
            FunctionName=fname,
            Payload=json.dumps({"name": "MiniStack"}),
        )
        assert resp["StatusCode"] == 200
        # The message f-string only reads the Payload stream on failure, so the
        # json.loads read below still sees the full body on the success path.
        assert "FunctionError" not in resp, f"Lambda error: {resp['Payload'].read().decode()}"
        payload = json.loads(resp["Payload"].read())
        assert payload["statusCode"] == 200
        body = json.loads(payload["body"])
        assert body["message"] == "Hello, MiniStack from ESM!"
        assert body["version"] == "1.0.0"
        assert body["esm"] is True
    finally:
        lam.delete_function(FunctionName=fname)
def test_lambda_warm_worker_uses_layer(lam):
    """Warm worker should extract layers and make their code available to the handler."""
    # Publish a layer exposing python/myhelper.py on the runtime path.
    layer_zip = io.BytesIO()
    with zipfile.ZipFile(layer_zip, "w") as zf:
        zf.writestr("python/myhelper.py", "LAYER_VALUE = 'from-layer'\n")
    published = lam.publish_layer_version(
        LayerName="warm-layer-test",
        Content={"ZipFile": layer_zip.getvalue()},
        CompatibleRuntimes=["python3.12"],
    )
    # The handler imports a module that only the layer provides.
    handler_src = (
        "import myhelper\n"
        "def handler(event, context):\n"
        " return {'value': myhelper.LAYER_VALUE}\n"
    )
    fn_zip = io.BytesIO()
    with zipfile.ZipFile(fn_zip, "w") as zf:
        zf.writestr("index.py", handler_src)
    fname = f"warm-layer-{_uuid_mod.uuid4().hex[:8]}"
    lam.create_function(
        FunctionName=fname,
        Runtime="python3.12",
        Role="arn:aws:iam::000000000000:role/test",
        Handler="index.handler",
        Code={"ZipFile": fn_zip.getvalue()},
        Layers=[published["LayerVersionArn"]],
    )
    try:
        resp = lam.invoke(FunctionName=fname, Payload=b"{}")
        assert resp["StatusCode"] == 200
        assert "FunctionError" not in resp, f"Lambda error: {resp.get('FunctionError')}"
        result = json.loads(resp["Payload"].read())
        assert result["value"] == "from-layer"
    finally:
        lam.delete_function(FunctionName=fname)
def test_lambda_nodejs_esm_type_module(lam):
    """Node.js ESM via package.json type:module should trigger ERR_REQUIRE_ESM fallback."""
    fname = f"lam-esm-type-{_uuid_mod.uuid4().hex[:8]}"
    esm_handler = (
        "export const handler = async (event) => ({\n"
        " statusCode: 200,\n"
        " body: 'type-module-works',\n"
        "});\n"
    )
    # index.js uses ESM syntax; package.json marks the package as a module.
    bundle = io.BytesIO()
    with zipfile.ZipFile(bundle, "w") as zf:
        zf.writestr("index.js", esm_handler)
        zf.writestr("package.json", '{"type": "module"}')
    lam.create_function(
        FunctionName=fname,
        Runtime="nodejs20.x",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": bundle.getvalue()},
    )
    try:
        resp = lam.invoke(FunctionName=fname, Payload=b"{}")
        assert resp["StatusCode"] == 200
        assert "FunctionError" not in resp, f"Lambda error: {resp['Payload'].read().decode()}"
        result = json.loads(resp["Payload"].read())
        assert result["statusCode"] == 200
        assert result["body"] == "type-module-works"
    finally:
        lam.delete_function(FunctionName=fname)
def test_lambda_warm_worker_nodejs_uses_layer(lam):
    """Warm worker should extract Node.js layers and make packages available via require()."""
    # Create a layer with a Node.js module under nodejs/node_modules/ —
    # the conventional layout AWS merges onto the Node resolution path.
    layer_buf = io.BytesIO()
    with zipfile.ZipFile(layer_buf, "w") as z:
        z.writestr(
            "nodejs/node_modules/layerhelper/index.js",
            "module.exports.LAYER_VALUE = 'from-node-layer';\n",
        )
    layer_resp = lam.publish_layer_version(
        LayerName="warm-node-layer-test",
        Content={"ZipFile": layer_buf.getvalue()},
        CompatibleRuntimes=["nodejs20.x"],
    )
    layer_arn = layer_resp["LayerVersionArn"]
    # Create a Node.js function that requires the layer package —
    # 'layerhelper' exists only in the layer, never in the function zip.
    handler_code = (
        "const helper = require('layerhelper');\n"
        "exports.handler = async (event) => {\n"
        " return { value: helper.LAYER_VALUE };\n"
        "};\n"
    )
    func_buf = io.BytesIO()
    with zipfile.ZipFile(func_buf, "w") as z:
        z.writestr("index.js", handler_code)
    # Unique name so reruns never collide with a leftover function.
    fname = f"warm-node-layer-{_uuid_mod.uuid4().hex[:8]}"
    lam.create_function(
        FunctionName=fname,
        Runtime="nodejs20.x",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": func_buf.getvalue()},
        Layers=[layer_arn],
    )
    try:
        resp = lam.invoke(FunctionName=fname, Payload=b"{}")
        assert resp["StatusCode"] == 200
        assert "FunctionError" not in resp, f"Lambda error: {resp['Payload'].read().decode()}"
        payload = json.loads(resp["Payload"].read())
        assert payload["value"] == "from-node-layer"
    finally:
        lam.delete_function(FunctionName=fname)
def test_lambda_warm_worker_nodejs_esm_uses_layer(lam):
    """ESM .mjs handler must be able to import packages from a Lambda Layer.
    This is the combined case of ESM support (PR #238) and Layer extraction
    (PR #236). Node.js ESM import() does not use NODE_PATH, so the runtime
    symlinks layer packages into code/node_modules/ for ancestor-tree resolution.
    """
    # Create a layer with a Node.js package under nodejs/node_modules/
    layer_buf = io.BytesIO()
    with zipfile.ZipFile(layer_buf, "w") as z:
        z.writestr(
            "nodejs/node_modules/esmhelper/index.js",
            "module.exports.LAYER_VALUE = 'from-esm-layer';\n",
        )
    layer_resp = lam.publish_layer_version(
        LayerName="warm-esm-layer-test",
        Content={"ZipFile": layer_buf.getvalue()},
        CompatibleRuntimes=["nodejs20.x"],
    )
    layer_arn = layer_resp["LayerVersionArn"]
    # Create an ESM handler that uses native import to load the layer package.
    # The layer package exports via CJS but Node.js ESM can import CJS modules.
    # Native import does NOT use NODE_PATH — this is the bug we are testing.
    handler_code = (
        "import helper from 'esmhelper';\n"
        "export const handler = async (event) => {\n"
        " return { value: helper.LAYER_VALUE, esm: true };\n"
        "};\n"
    )
    func_buf = io.BytesIO()
    with zipfile.ZipFile(func_buf, "w") as z:
        z.writestr("index.mjs", handler_code)
    # Unique name so reruns never collide with a leftover function.
    fname = f"warm-esm-layer-{_uuid_mod.uuid4().hex[:8]}"
    lam.create_function(
        FunctionName=fname,
        Runtime="nodejs20.x",
        Role=_LAMBDA_ROLE,
        Handler="index.handler",
        Code={"ZipFile": func_buf.getvalue()},
        Layers=[layer_arn],
    )
    try:
        resp = lam.invoke(FunctionName=fname, Payload=b"{}")
        assert resp["StatusCode"] == 200
        assert "FunctionError" not in resp, f"Lambda error: {resp['Payload'].read().decode()}"
        payload = json.loads(resp["Payload"].read())
        assert payload["value"] == "from-esm-layer"
        assert payload["esm"] is True
    finally:
        lam.delete_function(FunctionName=fname)
# ---------------------------------------------------------------------------
# Terraform compatibility tests
# ---------------------------------------------------------------------------
def test_lambda_image_no_default_runtime_handler(lam):
    """Image-based functions must not get default runtime/handler values."""
    fname = "tf-compat-image-no-defaults"
    try:
        lam.delete_function(FunctionName=fname)
    except ClientError:
        pass  # leftover from a previous run
    created = lam.create_function(
        FunctionName=fname,
        PackageType="Image",
        Code={"ImageUri": "my-repo/my-image:latest"},
        Role=_LAMBDA_ROLE,
        Timeout=30,
    )
    try:
        assert created["PackageType"] == "Image"
        # Terraform diffs break if a container function reports a runtime/handler.
        assert created["Runtime"] == "", f"Expected empty Runtime for Image, got {created['Runtime']!r}"
        assert created["Handler"] == "", f"Expected empty Handler for Image, got {created['Handler']!r}"
    finally:
        lam.delete_function(FunctionName=fname)
def test_lambda_image_preserves_image_config(lam):
    """ImageConfig provided at creation must be preserved in the GetFunction response."""
    fname = "tf-compat-image-config"
    try:
        lam.delete_function(FunctionName=fname)
    except ClientError:
        pass  # leftover from a previous run
    lam.create_function(
        FunctionName=fname,
        PackageType="Image",
        Code={"ImageUri": "my-repo/my-image:latest"},
        Role=_LAMBDA_ROLE,
        ImageConfig={"Command": ["main.lambda_handler"]},
    )
    try:
        config = lam.get_function(FunctionName=fname)["Configuration"]
        assert "ImageConfigResponse" in config, "ImageConfigResponse missing from get_function response"
        assert config["ImageConfigResponse"]["ImageConfig"]["Command"] == ["main.lambda_handler"]
    finally:
        lam.delete_function(FunctionName=fname)
def test_lambda_empty_dead_letter_config(lam):
    """Functions without DeadLetterConfig must return empty dict, not {TargetArn: ''}.

    Fix: the two original assertions overlapped and partly contradicted each
    other (the first tolerated TargetArn == "" while the second rejected it);
    their combined effect is exactly "TargetArn must be absent", so assert
    that directly.
    """
    fname = "tf-compat-no-dlc"
    try:
        lam.delete_function(FunctionName=fname)
    except ClientError:
        pass
    resp = lam.create_function(
        FunctionName=fname,
        Runtime="python3.12",
        Handler="index.handler",
        Role=_LAMBDA_ROLE,
        Code={"ZipFile": _make_zip(_LAMBDA_CODE)},
    )
    try:
        dlc = resp.get("DeadLetterConfig", {})
        assert "TargetArn" not in dlc, \
            f"DeadLetterConfig should not have TargetArn when unconfigured, got {dlc!r}"
    finally:
        lam.delete_function(FunctionName=fname)
def test_esm_sqs_no_starting_position(lam, sqs):
    """SQS event source mappings must not include StartingPosition."""
    fname = "tf-compat-esm-sqs"
    try:
        lam.delete_function(FunctionName=fname)
    except ClientError:
        pass  # leftover from a previous run
    lam.create_function(
        FunctionName=fname,
        Runtime="python3.12",
        Handler="index.handler",
        Role=_LAMBDA_ROLE,
        Code={"ZipFile": _make_zip(_LAMBDA_CODE)},
    )
    queue_url = sqs.create_queue(QueueName="tf-compat-esm-queue")["QueueUrl"]
    queue_arn = sqs.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]
    mapping_uuid = None
    try:
        created = lam.create_event_source_mapping(
            EventSourceArn=queue_arn,
            FunctionName=fname,
            BatchSize=5,
            Enabled=True,
        )
        mapping_uuid = created["UUID"]
        # StartingPosition is a stream-source concept; AWS omits it for SQS.
        assert "StartingPosition" not in created, \
            f"SQS ESM should not have StartingPosition, got {created.get('StartingPosition')!r}"
        fetched = lam.get_event_source_mapping(UUID=mapping_uuid)
        assert "StartingPosition" not in fetched, \
            "StartingPosition should not appear in get_event_source_mapping for SQS"
    finally:
        if mapping_uuid:
            lam.delete_event_source_mapping(UUID=mapping_uuid)
        lam.delete_function(FunctionName=fname)
        sqs.delete_queue(QueueUrl=queue_url)
def test_esm_kinesis_has_starting_position(lam, kin):
    """Kinesis event source mappings must include StartingPosition."""
    fname = "tf-compat-esm-kinesis"
    stream_name = "tf-compat-esm-stream"
    # Best-effort cleanup of leftovers from a previous run.
    try:
        lam.delete_function(FunctionName=fname)
    except ClientError:
        pass
    try:
        kin.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
    except ClientError:
        pass
    lam.create_function(
        FunctionName=fname,
        Runtime="python3.12",
        Handler="index.handler",
        Role=_LAMBDA_ROLE,
        Code={"ZipFile": _make_zip(_LAMBDA_CODE)},
    )
    kin.create_stream(StreamName=stream_name, ShardCount=1)
    stream_arn = kin.describe_stream(
        StreamName=stream_name
    )["StreamDescription"]["StreamARN"]
    esm_uuid = None
    try:
        resp = lam.create_event_source_mapping(
            EventSourceArn=stream_arn,
            FunctionName=fname,
            StartingPosition="TRIM_HORIZON",
            BatchSize=100,
            Enabled=True,
        )
        esm_uuid = resp["UUID"]
        # Unlike SQS, stream sources must echo StartingPosition back.
        assert "StartingPosition" in resp, "Kinesis ESM must include StartingPosition"
        assert resp["StartingPosition"] == "TRIM_HORIZON"
    finally:
        # Tear down in reverse order; stream deletion is best-effort.
        if esm_uuid:
            lam.delete_event_source_mapping(UUID=esm_uuid)
        lam.delete_function(FunctionName=fname)
        try:
            kin.delete_stream(StreamName=stream_name, EnforceConsumerDeletion=True)
        except ClientError:
            pass
def test_esm_response_no_function_name_field(lam, sqs):
    """ESM API responses should contain FunctionArn but not FunctionName (matching AWS)."""
    fname = "tf-compat-esm-no-fname"
    # Best-effort cleanup so a failed previous run doesn't break create_function.
    try:
        lam.delete_function(FunctionName=fname)
    except ClientError:
        pass
    lam.create_function(
        FunctionName=fname,
        Runtime="python3.12",
        Handler="index.handler",
        Role=_LAMBDA_ROLE,
        Code={"ZipFile": _make_zip(_LAMBDA_CODE)},
    )
    queue_url = sqs.create_queue(QueueName="tf-compat-esm-fname-queue")["QueueUrl"]
    queue_attrs = sqs.get_queue_attributes(
        QueueUrl=queue_url, AttributeNames=["QueueArn"]
    )
    queue_arn = queue_attrs["Attributes"]["QueueArn"]
    mapping_uuid = None
    try:
        created = lam.create_event_source_mapping(
            EventSourceArn=queue_arn,
            FunctionName=fname,
            BatchSize=5,
            Enabled=True,
        )
        mapping_uuid = created["UUID"]
        # AWS identifies the target function by ARN only in ESM responses.
        assert "FunctionArn" in created, "ESM response must include FunctionArn"
        assert fname in created["FunctionArn"], "FunctionArn must contain the function name"
    finally:
        if mapping_uuid:
            lam.delete_event_source_mapping(UUID=mapping_uuid)
        lam.delete_function(FunctionName=fname)
        sqs.delete_queue(QueueUrl=queue_url)
def test_lambda_update_function_configuration_layers(lam):
    """Attaching a layer via update-function-configuration should normalize ARN strings
    to {Arn, CodeSize} dicts — regression test for 'str' object has no attribute 'get'."""
    # Make the test re-runnable: a leftover function from a previous run would
    # make create_function fail with a conflict. Every other test in this file
    # follows the same best-effort delete-first pattern.
    try:
        lam.delete_function(FunctionName="fn-update-layer-test")
    except ClientError:
        pass
    # Publish a layer version with minimal zip content.
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w") as z:
        z.writestr("util.py", "# layer code")
    layer_resp = lam.publish_layer_version(
        LayerName="update-cfg-layer", Content={"ZipFile": buf.getvalue()},
    )
    layer_arn = layer_resp["LayerVersionArn"]
    # Create a function without layers, then attach one via update.
    fn_zip = io.BytesIO()
    with zipfile.ZipFile(fn_zip, "w") as z:
        z.writestr("index.py", "def handler(e, c): return {}")
    lam.create_function(
        FunctionName="fn-update-layer-test",
        Runtime="python3.12",
        Role="arn:aws:iam::000000000000:role/test",
        Handler="index.handler",
        Code={"ZipFile": fn_zip.getvalue()},
    )
    resp = lam.update_function_configuration(
        FunctionName="fn-update-layer-test",
        Layers=[layer_arn],
    )
    # Response Layers must be dicts with Arn key, not raw strings
    assert len(resp["Layers"]) == 1
    assert isinstance(resp["Layers"][0], dict)
    assert resp["Layers"][0]["Arn"] == layer_arn
    # GetFunction must also return normalized layer dicts
    fn = lam.get_function(FunctionName="fn-update-layer-test")
    assert fn["Configuration"]["Layers"][0]["Arn"] == layer_arn
# ============================================================================
# Unit tests — Lambda warm-container pool, ESM filter, CW Logs emitter,
# event-stream framing, throttle response shape. These mock containers and
# don't hit the live ministack server, so they run even without Docker.
# Originally lived in tests/test_lambda_pool.py — merged here for one-file-per-service.
# ============================================================================
import time
from unittest.mock import MagicMock
import pytest
import ministack.services.lambda_svc as lsvc
from ministack.core.responses import set_request_account_id
@pytest.fixture(autouse=True)
def _clear_pool():
    """Fresh pool before every test; also clear after so later tests don't see residue."""
    # autouse=True: runs around every test in this module without being requested.
    lsvc._warm_pool.clear()  # start each test from an empty warm-container pool
    yield  # the test body runs here
    lsvc._warm_pool.clear()  # drop whatever entries the test registered
def _mk_container(running: bool = True):
"""Fake container with a .reload() that sets status, matching docker-py interface."""
c = MagicMock()
c.status = "running" if running else "exited"
def _reload():
# No-op — container.status stays at whatever was set last.
pass
c.reload.side_effect = _reload
return c
# ──────────────────────────────── pool key ──────────────────────────────────
def test_pool_key_scopes_by_account():
    """Same function in two accounts → two distinct keys → two distinct pools."""
    set_request_account_id("111111111111")
    key_one = lsvc._warm_pool_key("fn", {"CodeSha256": "abc"})
    set_request_account_id("222222222222")
    key_two = lsvc._warm_pool_key("fn", {"CodeSha256": "abc"})
    # Keys embed the account id as a prefix, so the pools can never collide.
    assert key_one != key_two
    assert key_one.startswith("111111111111:")
    assert key_two.startswith("222222222222:")
def test_pool_key_differs_by_package_type():
    """Zip and Image packages of the same function must never share a pool."""
    set_request_account_id("111111111111")
    zip_key = lsvc._warm_pool_key("fn", {"CodeSha256": "abc"})
    image_key = lsvc._warm_pool_key("fn", {"PackageType": "Image", "ImageUri": "my/img:v1"})
    assert zip_key != image_key
    assert ":zip:" in zip_key
    assert ":image:" in image_key
def test_pool_key_differs_by_code_sha():
    """Code update → new key → cold start (doesn't accidentally reuse old container)."""
    set_request_account_id("111111111111")
    key_before = lsvc._warm_pool_key("fn", {"CodeSha256": "sha-v1"})
    key_after = lsvc._warm_pool_key("fn", {"CodeSha256": "sha-v2"})
    assert key_before != key_after
def test_pool_key_differs_by_image_uri():
    """A new image tag must map to a new pool key (no stale-image reuse)."""
    set_request_account_id("111111111111")
    v1_key = lsvc._warm_pool_key("fn", {"PackageType": "Image", "ImageUri": "img:v1"})
    v2_key = lsvc._warm_pool_key("fn", {"PackageType": "Image", "ImageUri": "img:v2"})
    assert v1_key != v2_key
# ──────────────────────────── acquire / spawn / release ─────────────────────
def test_acquire_on_empty_pool_signals_spawn():
    """An empty pool has nothing to hand out — the caller must cold-start."""
    acquired, why = lsvc._pool_acquire("k", max_concurrency=None)
    assert acquired is None
    assert why == "spawn"
def test_register_then_reacquire_reuses_same_entry():
    """A released entry is handed back out; a busy one is not."""
    container = _mk_container()
    first = lsvc._pool_register("k", container, tmpdir=None)
    assert first["in_use"] is True
    # While the entry is busy, the pool can't reuse it — caller must spawn.
    busy_entry, busy_reason = lsvc._pool_acquire("k", max_concurrency=None)
    assert busy_entry is None
    assert busy_reason == "spawn"
    # Releasing makes the very same entry reusable again.
    lsvc._pool_release(first)
    assert first["in_use"] is False
    reacquired, reacquire_reason = lsvc._pool_acquire("k", max_concurrency=None)
    assert reacquired is first
    assert reacquire_reason == "reused"
    assert reacquired["in_use"] is True
def test_multiple_concurrent_invocations_get_separate_entries():
    """Two concurrent invocations must land on two distinct pool entries (not the same container)."""
    container_one = _mk_container()
    container_two = _mk_container()
    first = lsvc._pool_register("k", container_one, tmpdir=None)
    # first is still in_use, so the next acquire signals a cold start.
    _, reason = lsvc._pool_acquire("k", max_concurrency=None)
    assert reason == "spawn"
    second = lsvc._pool_register("k", container_two, tmpdir=None)
    # Each invocation got its own entry wrapping its own container.
    assert first is not second
    assert first["container"] is container_one
    assert second["container"] is container_two
    assert len(lsvc._warm_pool["k"]) == 2
def test_function_concurrency_cap_rejects_when_full():
    """ReservedConcurrentExecutions=2 → 3rd concurrent invocation gets func_cap."""
    # Fill the pool with two busy entries (register leaves them in_use).
    lsvc._pool_register("k", _mk_container(), tmpdir=None)
    lsvc._pool_register("k", _mk_container(), tmpdir=None)
    rejected, reason = lsvc._pool_acquire("k", max_concurrency=2)
    assert rejected is None
    assert reason == "func_cap"
def test_function_concurrency_cap_none_is_unbounded():
    """No ReservedConcurrentExecutions → can always spawn."""
    # Even with many busy entries, an uncapped function never throttles.
    for _idx in range(50):
        lsvc._pool_register("k", _mk_container(), tmpdir=None)
    acquired, reason = lsvc._pool_acquire("k", max_concurrency=None)
    assert acquired is None
    assert reason == "spawn"
def test_account_concurrency_cap_rejects(monkeypatch):
    """Global account cap: 3 in-use total → 4th is throttled as acct_cap."""
    monkeypatch.setattr(lsvc, "_ACCOUNT_CONCURRENCY_CAP", 3)
    # Three busy containers spread across two different pool keys — the cap
    # counts account-wide, not per function.
    for pool_key in ("k1", "k1", "k2"):
        lsvc._pool_register(pool_key, _mk_container(), tmpdir=None)
    acquired, reason = lsvc._pool_acquire("k2", max_concurrency=None)
    assert acquired is None
    assert reason == "acct_cap"
# ──────────────────────────── lifecycle: dead, remove, evict, clear ─────────
def test_dead_containers_are_pruned_on_acquire():
    """Pool must not hand out a dead container on reuse."""
    exited = _mk_container(running=False)
    live_entry = lsvc._pool_register("k", _mk_container(running=True), tmpdir=None)
    lsvc._pool_release(live_entry)  # make the live container reusable
    # Inject a dead entry directly, bypassing the register API.
    now = time.time()
    lsvc._warm_pool["k"].append({
        "container": exited, "tmpdir": None, "in_use": False,
        "last_used": now, "created": now,
    })
    assert len(lsvc._warm_pool["k"]) == 2
    # Acquire must prune the dead entry and hand back the live one.
    acquired, reason = lsvc._pool_acquire("k", max_concurrency=None)
    assert reason == "reused"
    assert acquired["container"] is live_entry["container"]
    assert len(lsvc._warm_pool["k"]) == 1
def test_pool_remove_kills_and_unregisters():
    """_pool_remove stops/removes the container and drops its entry from the pool."""
    victim = lsvc._pool_register("k", _mk_container(), tmpdir=None)
    lsvc._pool_remove(victim)
    assert victim not in lsvc._warm_pool.get("k", [])
    victim["container"].stop.assert_called()
    victim["container"].remove.assert_called()
def test_pool_evict_idle_removes_only_expired_and_not_in_use(monkeypatch):
    """Eviction honours both the in_use flag and the idle TTL."""
    monkeypatch.setattr(lsvc, "_WARM_CONTAINER_TTL", 60)
    busy = lsvc._pool_register("k", _mk_container(), tmpdir=None)  # never released
    stale = lsvc._pool_register("k", _mk_container(), tmpdir=None)
    lsvc._pool_release(stale)
    stale["last_used"] = time.time() - 300  # well past the 60s TTL
    fresh = lsvc._pool_register("k", _mk_container(), tmpdir=None)
    lsvc._pool_release(fresh)  # last_used is "now", inside the TTL
    lsvc._pool_evict_idle()
    survivors = lsvc._warm_pool.get("k", [])
    assert busy in survivors  # in-use entries are never evicted
    assert fresh in survivors  # young idle entries are kept
    assert stale not in survivors
    stale["container"].stop.assert_called()
def test_pool_clear_all_kills_everything():
    """_pool_clear_all stops+removes every container and empties the registry."""
    registered = [
        lsvc._pool_register(key, _mk_container(), tmpdir=None)
        for key in ("a", "b", "c")
    ]
    assert sum(len(lst) for lst in lsvc._warm_pool.values()) == 3
    lsvc._pool_clear_all()
    assert lsvc._warm_pool == {}
    for entry in registered:
        entry["container"].stop.assert_called()
        entry["container"].remove.assert_called()
# ──────────────────────────── multi-tenancy ─────────────────────────────────
def test_two_accounts_get_independent_pools():
    """Invocations in account A must not pick up account B's containers."""
    set_request_account_id("111111111111")
    key_a = lsvc._warm_pool_key("fn", {"CodeSha256": "sha"})
    entry_a = lsvc._pool_register(key_a, _mk_container(), tmpdir=None)
    lsvc._pool_release(entry_a)  # account A now owns an idle, reusable container
    set_request_account_id("222222222222")
    key_b = lsvc._warm_pool_key("fn", {"CodeSha256": "sha"})
    assert key_b != key_a
    acquired, reason = lsvc._pool_acquire(key_b, max_concurrency=None)
    # Account B must cold-start; it can never reuse A's warm container.
    assert acquired is None
    assert reason == "spawn"
def test_throttle_response_shape_matches_aws():
    """The throttle response body must match the AWS TooManyRequestsException shape."""
    resp = lsvc._throttle_response(
        reason_code="ReservedFunctionConcurrentInvocationLimitExceeded",
        msg="Rate Exceeded",
        retry_after=1,
    )
    assert resp["throttle"] is True
    assert resp["error"] is True
    payload = resp["body"]
    assert payload["__type"] == "TooManyRequestsException"
    assert payload["Reason"] == "ReservedFunctionConcurrentInvocationLimitExceeded"
    assert "retryAfterSeconds" in payload
    assert "message" in payload
# ──────────────────── async retry + DLQ routing ─────────────────────────────
def test_route_async_failure_to_sqs_dlq():
    """Async invoke final failure routes an AWS-shaped envelope to the SQS DLQ."""
    import ministack.services.sqs as _sqs
    set_request_account_id("000000000000")
    # Seed a queue straight into the service's internal state.
    queue_url = "http://localhost:4566/000000000000/dlq-test"
    queue_arn = "arn:aws:sqs:us-east-1:000000000000:dlq-test"
    _sqs._queues[queue_url] = {
        "messages": [], "attributes": {"QueueArn": queue_arn},
        "is_fifo": False, "dedup_cache": {}, "fifo_seq": 0,
    }
    try:
        lsvc._route_async_failure(
            target_arn=queue_arn,
            func_name="doesnt-matter",
            event={"input": "hi"},
            result={"error": True, "function_error": "Unhandled",
                    "body": {"errorType": "Handler", "errorMessage": "boom"}},
        )
        queued = _sqs._queues[queue_url]["messages"]
        assert len(queued) == 1
        import json as _json
        envelope = _json.loads(queued[0]["body"])
        # The envelope mirrors AWS's async-destination event shape.
        assert envelope["requestPayload"] == {"input": "hi"}
        assert envelope["requestContext"]["condition"] == "RetriesExhausted"
        assert envelope["responseContext"]["functionError"] == "Unhandled"
        assert envelope["responsePayload"]["errorMessage"] == "boom"
    finally:
        _sqs._queues.pop(queue_url, None)
def test_route_async_failure_to_sns_topic():
    """Async invoke final failure can target an SNS topic (OnFailure destination)."""
    import ministack.services.sns as _sns
    set_request_account_id("000000000000")
    arn = "arn:aws:sns:us-east-1:000000000000:async-fail"
    _sns._topics[arn] = {
        "arn": arn, "name": "async-fail",
        "subscriptions": [], "messages": [], "tags": {}, "attributes": {},
    }
    try:
        # Swap in a spy for _fanout so the publish can be observed without
        # needing any real subscribers on the topic.
        seen = {}
        original_fanout = _sns._fanout

        def _spy(topic_arn, msg_id, message, subject, *args, **kwargs):
            seen["topic_arn"] = topic_arn
            seen["message"] = message
            seen["subject"] = subject

        _sns._fanout = _spy
        try:
            lsvc._route_async_failure(
                target_arn=arn,
                func_name="doesnt-matter",
                event={"k": "v"},
                result={"error": True, "function_error": "Handled",
                        "body": {"errorType": "X"}},
            )
            assert seen.get("topic_arn") == arn
            assert "requestPayload" in seen.get("message", "")
        finally:
            _sns._fanout = original_fanout
    finally:
        _sns._topics.pop(arn, None)
def test_route_async_failure_unknown_target_logs_and_returns():
    """Unknown DLQ ARN must not raise — just logs."""
    set_request_account_id("000000000000")
    # Any exception escaping this call fails the test — that is the assertion.
    lsvc._route_async_failure(
        target_arn="arn:aws:sqs:us-east-1:000000000000:does-not-exist",
        func_name="x",
        event={},
        result={"error": True, "body": {}},
    )
# ──────────────────── RIE result → function_error classification ────────────
def test_lambda_strict_hard_fails_when_docker_unavailable(monkeypatch):
    """LAMBDA_STRICT=1 + no Docker → Runtime.DockerUnavailable, NO fallback to warm/local."""
    monkeypatch.setattr(lsvc, "LAMBDA_STRICT", True)
    monkeypatch.setattr(lsvc, "_docker_available", False)
    fn_record = {
        "config": {
            "FunctionName": "strict-test",
            "Runtime": "python3.12",
            "PackageType": "Zip",
            "CodeSha256": "abc",
            "Timeout": 3,
            "MemorySize": 128,
        },
        "code_zip": b"\x00",
    }
    outcome = lsvc._execute_function_docker(fn_record, {"k": "v"})
    assert outcome.get("error") is True
    assert outcome["body"]["errorType"] == "Runtime.DockerUnavailable"
def test_lambda_permissive_falls_back_to_warm_without_docker(monkeypatch):
    """Default (LAMBDA_STRICT=False) + no Docker + python runtime → warm fallback."""
    monkeypatch.setattr(lsvc, "LAMBDA_STRICT", False)
    monkeypatch.setattr(lsvc, "_docker_available", False)
    seen = {"warm": False}

    def _stub_warm(func, event):
        seen["warm"] = True
        return {"body": {"ok": True}}

    monkeypatch.setattr(lsvc, "_execute_function_warm", _stub_warm)
    fn_record = {
        "config": {
            "FunctionName": "perm-test",
            "Runtime": "python3.12",
            "PackageType": "Zip",
            "CodeSha256": "abc",
            "Timeout": 3,
            "MemorySize": 128,
        },
        "code_zip": b"\x00",
    }
    lsvc._execute_function_docker(fn_record, {})
    # The warm executor must have been consulted instead of hard-failing.
    assert seen["warm"] is True
def test_emit_lambda_logs_writes_start_end_report_to_cw_logs():
    """Lambda → CW Logs emits AWS-shaped START / body / END / REPORT lines."""
    import ministack.services.cloudwatch_logs as _cwl
    set_request_account_id("000000000000")
    _cwl._log_groups.clear()
    func = {"config": {"FunctionName": "emit-test", "Version": "$LATEST", "MemorySize": 128}}
    lsvc._emit_lambda_logs(
        func, request_id="abc-1234",
        log_text="user print line 1\nuser print line 2",
        error=False, duration_ms=42,
    )
    # Log group is auto-created with the AWS naming convention.
    assert "/aws/lambda/emit-test" in _cwl._log_groups
    streams = _cwl._log_groups["/aws/lambda/emit-test"]["streams"]
    assert len(streams) == 1
    stream_name = next(iter(streams))
    # Stream names follow AWS's "YYYY/MM/DD/[$LATEST]<suffix>" pattern. Check
    # the date prefix structurally instead of against a hard-coded year range —
    # the original range(2024, 2031) check would start failing in 2031.
    assert stream_name[:4].isdigit() and stream_name[4] == "/"
    assert "[$LATEST]" in stream_name
    msgs = [e["message"] for e in streams[stream_name]["events"]]
    assert any(m.startswith("START RequestId: abc-1234") and "$LATEST" in m for m in msgs)
    assert "user print line 1" in msgs
    assert "user print line 2" in msgs
    assert any(m == "END RequestId: abc-1234" for m in msgs)
    assert any(m.startswith("REPORT RequestId: abc-1234") and "Duration: 42 ms" in m for m in msgs)
def test_emit_lambda_logs_autocreate_is_per_function():
    """Each function gets its own /aws/lambda/{name} group."""
    import ministack.services.cloudwatch_logs as _cwl
    set_request_account_id("000000000000")
    _cwl._log_groups.clear()
    for func_name, request_id in (("fn-a", "r1"), ("fn-b", "r2")):
        lsvc._emit_lambda_logs(
            {"config": {"FunctionName": func_name, "Version": "$LATEST", "MemorySize": 128}},
            request_id, "", False, 1,
        )
    assert "/aws/lambda/fn-a" in _cwl._log_groups
    assert "/aws/lambda/fn-b" in _cwl._log_groups
def test_emit_lambda_logs_failure_is_best_effort(monkeypatch):
    """A broken CW Logs module must not bubble into the Lambda invocation."""
    import ministack.services.cloudwatch_logs as _cwl
    # Break the storage so any write attempt inside the emitter blows up.
    monkeypatch.setattr(_cwl, "_log_groups", None)
    # The emitter must swallow that failure rather than raising.
    lsvc._emit_lambda_logs(
        {"config": {"FunctionName": "crash", "Version": "$LATEST", "MemorySize": 128}},
        "r", "", False, 1,
    )
def test_match_esm_filter_equality():
    """Basic equality matching on a nested record."""
    record = {"body": {"orderType": "Premium", "region": "us-east-1"}}
    match = lsvc._match_esm_filter
    assert match(record, {"body": {"orderType": ["Premium"]}}) is True
    assert match(record, {"body": {"orderType": ["Basic"]}}) is False
def test_match_esm_filter_content_prefix_suffix_anything_but():
    """Content-filter dicts: prefix, suffix, anything-but, exists."""
    record = {"body": {"name": "prod-user-42"}}
    match = lsvc._match_esm_filter
    assert match(record, {"body": {"name": [{"prefix": "prod-"}]}}) is True
    assert match(record, {"body": {"name": [{"prefix": "dev-"}]}}) is False
    assert match(record, {"body": {"name": [{"suffix": "-42"}]}}) is True
    assert match(record, {"body": {"name": [{"anything-but": ["prod-user-42"]}]}}) is False
    assert match(record, {"body": {"name": [{"anything-but": ["other"]}]}}) is True
    assert match(record, {"body": {"missing": [{"exists": False}]}}) is True
    assert match(record, {"body": {"name": [{"exists": True}]}}) is True
def test_match_esm_filter_numeric():
    """Numeric comparison operator (single bound and range form)."""
    record = {"body": {"count": 7}}
    match = lsvc._match_esm_filter
    assert match(record, {"body": {"count": [{"numeric": [">", 5]}]}}) is True
    assert match(record, {"body": {"count": [{"numeric": [">", 10]}]}}) is False
    assert match(record, {"body": {"count": [{"numeric": [">", 5, "<", 10]}]}}) is True
def test_apply_filter_criteria_drops_non_matching_sqs_records():
    """SQS bodies are JSON-parsed before matching, matching AWS behaviour."""
    import json as _json
    mapping = {"FilterCriteria": {"Filters": [
        {"Pattern": _json.dumps({"body": {"orderType": ["Premium"]}})},
    ]}}
    batch = [
        {"messageId": "a", "body": _json.dumps({"orderType": "Premium"})},
        {"messageId": "b", "body": _json.dumps({"orderType": "Basic"})},
    ]
    kept = lsvc._apply_filter_criteria(batch, mapping)
    # Only the Premium record survives the filter.
    assert [r["messageId"] for r in kept] == ["a"]
def test_apply_filter_criteria_no_filters_passes_through():
    """With no FilterCriteria (or an empty one) every record survives."""
    batch = [{"messageId": "x"}, {"messageId": "y"}]
    for mapping in ({}, {"FilterCriteria": {}}):
        assert lsvc._apply_filter_criteria(batch, mapping) == batch
def test_event_stream_encode_roundtrip():
    """The vnd.amazon.eventstream encoder must produce a valid framed message
    that boto3's own EventStream parser can decode."""
    from botocore.eventstream import EventStreamBuffer
    framed = lsvc._es_encode_message({
        ":message-type": "event",
        ":event-type": "PayloadChunk",
        ":content-type": "application/octet-stream",
    }, b"hello-world")
    decoder = EventStreamBuffer()
    decoder.add_data(framed)
    decoded = list(decoder)
    assert len(decoded) == 1
    parsed = decoded[0]
    # botocore surfaces headers as a dict[str, Any] on the parsed event
    assert parsed.headers[":event-type"] == "PayloadChunk"
    assert parsed.payload == b"hello-world"
def test_invoke_rie_classifies_unhandled_vs_handled():
    """If RIE returns X-Amz-Function-Error header the result carries
    function_error='Unhandled'. A handler-returned errorType with no RIE
    header should produce 'Handled'."""
    # The classification logic lives inside _invoke_rie; unit-test by
    # simulating what that branch does via a tiny inline replica.
    error_payload = {"errorType": "E", "errorMessage": "m"}
    cases = (
        (True, "Unhandled"),   # RIE header present → Unhandled
        (False, "Handled"),    # no header, body has errorType → Handled
    )
    for has_header, expected in cases:
        if has_header or (isinstance(error_payload, dict) and error_payload.get("errorType")):
            classification = "Unhandled" if has_header else "Handled"
            assert classification == expected