aws_rl_env / aws_infra /tests /test_dynamodb.py
Sizzing's picture
Upload folder using huggingface_hub
c745a99 verified
import io
import json
import os
import time
import zipfile
from urllib.parse import urlparse
import pytest
from botocore.exceptions import ClientError
import uuid as _uuid_mod
def _make_zip(code: str) -> bytes:
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
zf.writestr("index.py", code)
return buf.getvalue()
_LAMBDA_ROLE = "arn:aws:iam::000000000000:role/lambda-role"
def test_dynamodb_basic(ddb):
try:
ddb.delete_table(TableName="TestTable1")
except Exception:
pass
ddb.create_table(
TableName="TestTable1",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.put_item(TableName="TestTable1", Item={"pk": {"S": "key1"}, "data": {"S": "value1"}})
resp = ddb.get_item(TableName="TestTable1", Key={"pk": {"S": "key1"}})
assert resp["Item"]["data"]["S"] == "value1"
ddb.delete_item(TableName="TestTable1", Key={"pk": {"S": "key1"}})
resp = ddb.get_item(TableName="TestTable1", Key={"pk": {"S": "key1"}})
assert "Item" not in resp
def test_dynamodb_scan(ddb):
try:
ddb.delete_table(TableName="ScanTable")
except Exception:
pass
ddb.create_table(
TableName="ScanTable",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
for i in range(10):
ddb.put_item(TableName="ScanTable", Item={"pk": {"S": f"key{i}"}, "val": {"N": str(i)}})
resp = ddb.scan(TableName="ScanTable")
assert resp["Count"] == 10
def test_dynamodb_batch(ddb):
try:
ddb.delete_table(TableName="BatchTable")
except Exception:
pass
ddb.create_table(
TableName="BatchTable",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.batch_write_item(
RequestItems={
"BatchTable": [{"PutRequest": {"Item": {"pk": {"S": f"bk{i}"}, "v": {"S": f"bv{i}"}}}} for i in range(5)]
}
)
resp = ddb.scan(TableName="BatchTable")
assert resp["Count"] == 5
def test_dynamodb_describe_continuous_backups(ddb):
ddb.create_table(
TableName="ddb-pitr-tbl",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
resp = ddb.describe_continuous_backups(TableName="ddb-pitr-tbl")
assert resp["ContinuousBackupsDescription"]["ContinuousBackupsStatus"] == "ENABLED"
pitr = resp["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
assert pitr["PointInTimeRecoveryStatus"] == "DISABLED"
def test_dynamodb_update_continuous_backups(ddb):
ddb.update_continuous_backups(
TableName="ddb-pitr-tbl",
PointInTimeRecoverySpecification={"PointInTimeRecoveryEnabled": True},
)
resp = ddb.describe_continuous_backups(TableName="ddb-pitr-tbl")
pitr = resp["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"]
assert pitr["PointInTimeRecoveryStatus"] == "ENABLED"
def test_dynamodb_describe_endpoints(ddb):
resp = ddb.describe_endpoints()
assert len(resp["Endpoints"]) > 0
assert "Address" in resp["Endpoints"][0]
def test_dynamodb_batch_write_consumed_capacity(ddb):
ddb.create_table(
TableName="batch-cap-regression",
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
BillingMode="PAY_PER_REQUEST",
)
resp = ddb.batch_write_item(
RequestItems={
"batch-cap-regression": [
{"PutRequest": {"Item": {"pk": {"S": "k1"}}}},
]
},
ReturnConsumedCapacity="TOTAL",
)
assert "ConsumedCapacity" in resp, "ConsumedCapacity must be present when ReturnConsumedCapacity=TOTAL"
assert isinstance(resp["ConsumedCapacity"], list), "ConsumedCapacity must be a list for BatchWriteItem"
assert resp["ConsumedCapacity"][0]["TableName"] == "batch-cap-regression"
assert resp["ConsumedCapacity"][0]["CapacityUnits"] == 1.0
ddb.delete_table(TableName="batch-cap-regression")
def test_dynamodb_put_item_gsi_capacity(ddb):
"""PutItem on a table with 1 GSI must return CapacityUnits=2.0 (table + GSI)."""
ddb.create_table(
TableName="gsi-cap-put",
AttributeDefinitions=[
{"AttributeName": "pk", "AttributeType": "S"},
{"AttributeName": "sk", "AttributeType": "S"},
{"AttributeName": "last_name", "AttributeType": "S"},
],
KeySchema=[
{"AttributeName": "pk", "KeyType": "HASH"},
{"AttributeName": "sk", "KeyType": "RANGE"},
],
GlobalSecondaryIndexes=[
{
"IndexName": "last_name-index",
"KeySchema": [{"AttributeName": "last_name", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "ALL"},
}
],
BillingMode="PAY_PER_REQUEST",
)
resp = ddb.put_item(
TableName="gsi-cap-put",
Item={"pk": {"S": "p1"}, "sk": {"S": "s1"}, "last_name": {"S": "Smith"}},
ReturnConsumedCapacity="TOTAL",
)
assert resp["ConsumedCapacity"]["CapacityUnits"] == 2.0
ddb.delete_table(TableName="gsi-cap-put")
def test_dynamodb_batch_write_gsi_capacity(ddb):
"""BatchWriteItem with 2 items on a table with 1 GSI must return CapacityUnits=4.0."""
ddb.create_table(
TableName="gsi-cap-batch",
AttributeDefinitions=[
{"AttributeName": "pk", "AttributeType": "S"},
{"AttributeName": "sk", "AttributeType": "S"},
{"AttributeName": "age", "AttributeType": "N"},
],
KeySchema=[
{"AttributeName": "pk", "KeyType": "HASH"},
{"AttributeName": "sk", "KeyType": "RANGE"},
],
GlobalSecondaryIndexes=[
{
"IndexName": "age-index",
"KeySchema": [{"AttributeName": "age", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "ALL"},
}
],
BillingMode="PAY_PER_REQUEST",
)
resp = ddb.batch_write_item(
RequestItems={
"gsi-cap-batch": [
{"PutRequest": {"Item": {"pk": {"S": "p2"}, "sk": {"S": "s2"}, "age": {"N": "25"}}}},
{"PutRequest": {"Item": {"pk": {"S": "p3"}, "sk": {"S": "s3"}, "age": {"N": "26"}}}},
]
},
ReturnConsumedCapacity="TOTAL",
)
assert resp["ConsumedCapacity"][0]["CapacityUnits"] == 4.0
ddb.delete_table(TableName="gsi-cap-batch")
def test_dynamodb_streams_table_has_stream_arn(ddb):
"""Table with StreamSpecification returns LatestStreamArn and operations succeed."""
table_name = "stream-arn-test"
resp = ddb.create_table(
TableName=table_name,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"},
)
desc = ddb.describe_table(TableName=table_name)["Table"]
assert desc.get("LatestStreamArn") or desc.get("StreamSpecification", {}).get("StreamEnabled")
# All write operations should succeed with streams enabled
ddb.put_item(TableName=table_name, Item={"pk": {"S": "k1"}, "val": {"S": "v1"}})
ddb.update_item(
TableName=table_name,
Key={"pk": {"S": "k1"}},
UpdateExpression="SET val = :v",
ExpressionAttributeValues={":v": {"S": "v2"}},
)
ddb.delete_item(TableName=table_name, Key={"pk": {"S": "k1"}})
# Verify item is gone
get_resp = ddb.get_item(TableName=table_name, Key={"pk": {"S": "k1"}})
assert "Item" not in get_resp
def test_dynamodb_tag_untag_resource(ddb):
"""Create table, tag it, list tags, untag, verify."""
table_name = "ddb-tag-test"
try:
ddb.delete_table(TableName=table_name)
except Exception:
pass
resp = ddb.create_table(
TableName=table_name,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
arn = resp["TableDescription"]["TableArn"]
# Tag
ddb.tag_resource(ResourceArn=arn, Tags=[
{"Key": "env", "Value": "test"},
{"Key": "team", "Value": "platform"},
])
tags = ddb.list_tags_of_resource(ResourceArn=arn)["Tags"]
tag_keys = {t["Key"] for t in tags}
assert "env" in tag_keys
assert "team" in tag_keys
# Untag
ddb.untag_resource(ResourceArn=arn, TagKeys=["team"])
tags2 = ddb.list_tags_of_resource(ResourceArn=arn)["Tags"]
tag_keys2 = {t["Key"] for t in tags2}
assert "env" in tag_keys2
assert "team" not in tag_keys2
def test_dynamodb_stream_to_lambda(lam, ddb):
"""DynamoDB stream records are delivered to Lambda via event source mapping."""
table_name = "intg-ddbstream-tbl"
fn_name = "intg-ddbstream-fn"
ddb.create_table(
TableName=table_name,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"},
)
stream_arn = ddb.describe_table(TableName=table_name)["Table"]["LatestStreamArn"]
assert stream_arn is not None
code = (
"import json\n"
"def handler(event, context):\n"
" records = event.get('Records', [])\n"
" return {'processed': len(records)}\n"
)
lam.create_function(
FunctionName=fn_name,
Runtime="python3.11",
Role=_LAMBDA_ROLE,
Handler="index.handler",
Code={"ZipFile": _make_zip(code)},
)
esm = lam.create_event_source_mapping(
FunctionName=fn_name,
EventSourceArn=stream_arn,
StartingPosition="TRIM_HORIZON",
BatchSize=10,
)
assert esm["EventSourceArn"] == stream_arn
assert esm["FunctionArn"].endswith(fn_name)
assert esm["State"] in ("Creating", "Enabled")
# Write items to trigger stream records
ddb.put_item(TableName=table_name, Item={"pk": {"S": "a1"}, "data": {"S": "hello"}})
ddb.put_item(TableName=table_name, Item={"pk": {"S": "a2"}, "data": {"S": "world"}})
ddb.delete_item(TableName=table_name, Key={"pk": {"S": "a1"}})
# Allow background poller to process
time.sleep(3)
# Verify the ESM is still active
esm_resp = lam.get_event_source_mapping(UUID=esm["UUID"])
assert esm_resp["EventSourceArn"] == stream_arn
# Verify DynamoDB state is correct after stream operations
scan = ddb.scan(TableName=table_name)
pks = {item["pk"]["S"] for item in scan["Items"]}
assert "a2" in pks
assert "a1" not in pks
# Cleanup ESM
lam.delete_event_source_mapping(UUID=esm["UUID"])
# Migrated from test_ddb.py
def test_dynamodb_create_table(ddb):
resp = ddb.create_table(
TableName="t_hash_only",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
desc = resp["TableDescription"]
assert desc["TableName"] == "t_hash_only"
assert desc["TableStatus"] == "ACTIVE"
assert any(k["KeyType"] == "HASH" for k in desc["KeySchema"])
def test_dynamodb_create_table_composite(ddb):
resp = ddb.create_table(
TableName="t_composite",
KeySchema=[
{"AttributeName": "pk", "KeyType": "HASH"},
{"AttributeName": "sk", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "pk", "AttributeType": "S"},
{"AttributeName": "sk", "AttributeType": "S"},
],
BillingMode="PAY_PER_REQUEST",
)
ks = resp["TableDescription"]["KeySchema"]
types = {k["KeyType"] for k in ks}
assert types == {"HASH", "RANGE"}
def test_dynamodb_create_table_duplicate(ddb):
with pytest.raises(ClientError) as exc:
ddb.create_table(
TableName="t_hash_only",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
assert exc.value.response["Error"]["Code"] == "ResourceInUseException"
def test_dynamodb_delete_table(ddb):
ddb.create_table(
TableName="t_to_delete",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
resp = ddb.delete_table(TableName="t_to_delete")
assert resp["TableDescription"]["TableStatus"] == "DELETING"
tables = ddb.list_tables()["TableNames"]
assert "t_to_delete" not in tables
def test_dynamodb_delete_table_not_found(ddb):
with pytest.raises(ClientError) as exc:
ddb.delete_table(TableName="t_nonexistent_xyz")
assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException"
def test_dynamodb_describe_table(ddb):
ddb.create_table(
TableName="t_describe_gsi",
KeySchema=[
{"AttributeName": "pk", "KeyType": "HASH"},
{"AttributeName": "sk", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "pk", "AttributeType": "S"},
{"AttributeName": "sk", "AttributeType": "S"},
{"AttributeName": "gsi_pk", "AttributeType": "S"},
],
GlobalSecondaryIndexes=[
{
"IndexName": "gsi1",
"KeySchema": [{"AttributeName": "gsi_pk", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "ALL"},
}
],
LocalSecondaryIndexes=[
{
"IndexName": "lsi1",
"KeySchema": [
{"AttributeName": "pk", "KeyType": "HASH"},
{"AttributeName": "sk", "KeyType": "RANGE"},
],
"Projection": {"ProjectionType": "ALL"},
}
],
BillingMode="PAY_PER_REQUEST",
)
resp = ddb.describe_table(TableName="t_describe_gsi")
table = resp["Table"]
assert table["TableName"] == "t_describe_gsi"
assert len(table["GlobalSecondaryIndexes"]) == 1
assert table["GlobalSecondaryIndexes"][0]["IndexName"] == "gsi1"
assert len(table["LocalSecondaryIndexes"]) == 1
assert table["LocalSecondaryIndexes"][0]["IndexName"] == "lsi1"
def test_dynamodb_list_tables(ddb):
for i in range(3):
try:
ddb.create_table(
TableName=f"t_list_{i}",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
except ClientError:
pass
resp = ddb.list_tables(Limit=2)
assert len(resp["TableNames"]) <= 2
if "LastEvaluatedTableName" in resp:
resp2 = ddb.list_tables(ExclusiveStartTableName=resp["LastEvaluatedTableName"], Limit=100)
assert len(resp2["TableNames"]) >= 1
def test_dynamodb_put_get_item(ddb):
ddb.put_item(
TableName="t_hash_only",
Item={
"pk": {"S": "allTypes"},
"str_attr": {"S": "hello"},
"num_attr": {"N": "42"},
"bool_attr": {"BOOL": True},
"null_attr": {"NULL": True},
"list_attr": {"L": [{"S": "a"}, {"N": "1"}]},
"map_attr": {"M": {"nested": {"S": "value"}}},
"ss_attr": {"SS": ["x", "y"]},
"ns_attr": {"NS": ["1", "2", "3"]},
},
)
resp = ddb.get_item(TableName="t_hash_only", Key={"pk": {"S": "allTypes"}})
item = resp["Item"]
assert item["str_attr"]["S"] == "hello"
assert item["num_attr"]["N"] == "42"
assert item["bool_attr"]["BOOL"] is True
assert item["null_attr"]["NULL"] is True
assert len(item["list_attr"]["L"]) == 2
assert item["map_attr"]["M"]["nested"]["S"] == "value"
assert set(item["ss_attr"]["SS"]) == {"x", "y"}
assert set(item["ns_attr"]["NS"]) == {"1", "2", "3"}
def test_dynamodb_put_item_condition(ddb):
ddb.put_item(
TableName="t_hash_only",
Item={"pk": {"S": "cond_new"}, "val": {"S": "first"}},
ConditionExpression="attribute_not_exists(pk)",
)
resp = ddb.get_item(TableName="t_hash_only", Key={"pk": {"S": "cond_new"}})
assert resp["Item"]["val"]["S"] == "first"
def test_dynamodb_put_item_condition_fail(ddb):
ddb.put_item(TableName="t_hash_only", Item={"pk": {"S": "cond_fail"}, "val": {"S": "v1"}})
with pytest.raises(ClientError) as exc:
ddb.put_item(
TableName="t_hash_only",
Item={"pk": {"S": "cond_fail"}, "val": {"S": "v2"}},
ConditionExpression="attribute_not_exists(pk)",
)
assert exc.value.response["Error"]["Code"] == "ConditionalCheckFailedException"
def test_dynamodb_delete_item(ddb):
ddb.put_item(TableName="t_hash_only", Item={"pk": {"S": "to_del"}, "v": {"S": "gone"}})
ddb.delete_item(TableName="t_hash_only", Key={"pk": {"S": "to_del"}})
resp = ddb.get_item(TableName="t_hash_only", Key={"pk": {"S": "to_del"}})
assert "Item" not in resp
def test_dynamodb_delete_item_return_old(ddb):
ddb.put_item(
TableName="t_hash_only",
Item={"pk": {"S": "ret_old"}, "data": {"S": "precious"}},
)
resp = ddb.delete_item(
TableName="t_hash_only",
Key={"pk": {"S": "ret_old"}},
ReturnValues="ALL_OLD",
)
assert resp["Attributes"]["data"]["S"] == "precious"
def test_dynamodb_update_item_set(ddb):
ddb.put_item(TableName="t_hash_only", Item={"pk": {"S": "upd_set"}, "count": {"N": "0"}})
resp = ddb.update_item(
TableName="t_hash_only",
Key={"pk": {"S": "upd_set"}},
UpdateExpression="SET #c = :val",
ExpressionAttributeNames={"#c": "count"},
ExpressionAttributeValues={":val": {"N": "10"}},
ReturnValues="ALL_NEW",
)
assert resp["Attributes"]["count"]["N"] == "10"
def test_dynamodb_update_item_remove(ddb):
ddb.put_item(
TableName="t_hash_only",
Item={"pk": {"S": "upd_rem"}, "extra": {"S": "bye"}, "keep": {"S": "stay"}},
)
resp = ddb.update_item(
TableName="t_hash_only",
Key={"pk": {"S": "upd_rem"}},
UpdateExpression="REMOVE extra",
ReturnValues="ALL_NEW",
)
assert "extra" not in resp["Attributes"]
assert resp["Attributes"]["keep"]["S"] == "stay"
def test_dynamodb_update_item_condition_on_missing_item_fails(ddb):
"""Missing item + attribute_exists(...) condition must fail with ConditionalCheckFailedException."""
try:
ddb.delete_table(TableName="t_update_cond_missing")
except Exception:
pass
ddb.create_table(
TableName="t_update_cond_missing",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
missing_key = {"pk": {"S": "missing-update-item"}}
with pytest.raises(ClientError) as exc:
ddb.update_item(
TableName="t_update_cond_missing",
Key=missing_key,
UpdateExpression="SET v = :v",
ExpressionAttributeValues={":v": {"S": "x"}},
ConditionExpression="attribute_exists(pk)",
ReturnValues="ALL_NEW",
)
assert exc.value.response["Error"]["Code"] == "ConditionalCheckFailedException"
def test_dynamodb_get_item_missing_sort_key_fails_validation(ddb):
try:
ddb.delete_table(TableName="t_get_missing_sk")
except Exception:
pass
ddb.create_table(
TableName="t_get_missing_sk",
KeySchema=[
{"AttributeName": "pk", "KeyType": "HASH"},
{"AttributeName": "sk", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "pk", "AttributeType": "S"},
{"AttributeName": "sk", "AttributeType": "S"},
],
BillingMode="PAY_PER_REQUEST",
)
with pytest.raises(ClientError) as exc:
ddb.get_item(TableName="t_get_missing_sk", Key={"pk": {"S": "q_pk"}})
assert exc.value.response["Error"]["Code"] == "ValidationException"
assert exc.value.response["Error"]["Message"] == "The provided key element does not match the schema"
def test_dynamodb_get_item_wrong_key_type_fails_validation(ddb):
try:
ddb.delete_table(TableName="t_get_wrong_type")
except Exception:
pass
ddb.create_table(
TableName="t_get_wrong_type",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.put_item(TableName="t_get_wrong_type", Item={"pk": {"S": "typed-key"}})
with pytest.raises(ClientError) as exc:
ddb.get_item(TableName="t_get_wrong_type", Key={"pk": {"N": "123"}})
assert exc.value.response["Error"]["Code"] == "ValidationException"
assert exc.value.response["Error"]["Message"] == "The provided key element does not match the schema"
def test_dynamodb_update_item_extra_key_attribute_fails_validation(ddb):
try:
ddb.delete_table(TableName="t_update_extra_key")
except Exception:
pass
ddb.create_table(
TableName="t_update_extra_key",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
with pytest.raises(ClientError) as exc:
ddb.update_item(
TableName="t_update_extra_key",
Key={"pk": {"S": "k1"}, "sk": {"S": "unexpected"}},
UpdateExpression="SET v = :v",
ExpressionAttributeValues={":v": {"S": "x"}},
)
assert exc.value.response["Error"]["Code"] == "ValidationException"
assert exc.value.response["Error"]["Message"] == "The provided key element does not match the schema"
def test_dynamodb_update_item_add(ddb):
ddb.put_item(TableName="t_hash_only", Item={"pk": {"S": "upd_add"}, "counter": {"N": "5"}})
resp = ddb.update_item(
TableName="t_hash_only",
Key={"pk": {"S": "upd_add"}},
UpdateExpression="ADD counter :inc",
ExpressionAttributeValues={":inc": {"N": "3"}},
ReturnValues="ALL_NEW",
)
assert resp["Attributes"]["counter"]["N"] == "8"
def test_dynamodb_update_item_all_old(ddb):
ddb.put_item(TableName="t_hash_only", Item={"pk": {"S": "upd_old"}, "v": {"N": "1"}})
resp = ddb.update_item(
TableName="t_hash_only",
Key={"pk": {"S": "upd_old"}},
UpdateExpression="SET v = :new",
ExpressionAttributeValues={":new": {"N": "99"}},
ReturnValues="ALL_OLD",
)
assert resp["Attributes"]["v"]["N"] == "1"
def test_dynamodb_query_pk_only(ddb):
for i in range(3):
ddb.put_item(
TableName="t_composite",
Item={"pk": {"S": "q_pk"}, "sk": {"S": f"sk_{i}"}, "n": {"N": str(i)}},
)
resp = ddb.query(
TableName="t_composite",
KeyConditionExpression="pk = :pk",
ExpressionAttributeValues={":pk": {"S": "q_pk"}},
)
assert resp["Count"] == 3
def test_dynamodb_query_pk_sk(ddb):
for i in range(5):
ddb.put_item(
TableName="t_composite",
Item={"pk": {"S": "q_sk"}, "sk": {"S": f"item_{i:03d}"}},
)
resp_bw = ddb.query(
TableName="t_composite",
KeyConditionExpression="pk = :pk AND begins_with(sk, :prefix)",
ExpressionAttributeValues={
":pk": {"S": "q_sk"},
":prefix": {"S": "item_00"},
},
)
assert resp_bw["Count"] >= 1
for item in resp_bw["Items"]:
assert item["sk"]["S"].startswith("item_00")
resp_bt = ddb.query(
TableName="t_composite",
KeyConditionExpression="pk = :pk AND sk BETWEEN :lo AND :hi",
ExpressionAttributeValues={
":pk": {"S": "q_sk"},
":lo": {"S": "item_001"},
":hi": {"S": "item_003"},
},
)
assert resp_bt["Count"] >= 1
for item in resp_bt["Items"]:
assert "item_001" <= item["sk"]["S"] <= "item_003"
def test_dynamodb_query_filter(ddb):
for i in range(5):
ddb.put_item(
TableName="t_composite",
Item={"pk": {"S": "q_filt"}, "sk": {"S": f"f_{i}"}, "val": {"N": str(i)}},
)
resp = ddb.query(
TableName="t_composite",
KeyConditionExpression="pk = :pk",
FilterExpression="val > :min",
ExpressionAttributeValues={":pk": {"S": "q_filt"}, ":min": {"N": "2"}},
)
assert resp["Count"] == 2
assert resp["ScannedCount"] == 5
def test_dynamodb_query_pagination(ddb):
for i in range(6):
ddb.put_item(
TableName="t_composite",
Item={"pk": {"S": "q_page"}, "sk": {"S": f"p_{i:03d}"}},
)
resp1 = ddb.query(
TableName="t_composite",
KeyConditionExpression="pk = :pk",
ExpressionAttributeValues={":pk": {"S": "q_page"}},
Limit=3,
)
assert resp1["Count"] == 3
assert "LastEvaluatedKey" in resp1
resp2 = ddb.query(
TableName="t_composite",
KeyConditionExpression="pk = :pk",
ExpressionAttributeValues={":pk": {"S": "q_page"}},
ExclusiveStartKey=resp1["LastEvaluatedKey"],
Limit=3,
)
assert resp2["Count"] == 3
page1_sks = {it["sk"]["S"] for it in resp1["Items"]}
page2_sks = {it["sk"]["S"] for it in resp2["Items"]}
assert page1_sks.isdisjoint(page2_sks)
def test_dynamodb_scan_from_ddb(ddb):
ddb.create_table(
TableName="t_scan",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
for i in range(8):
ddb.put_item(TableName="t_scan", Item={"pk": {"S": f"sc_{i}"}, "n": {"N": str(i)}})
resp = ddb.scan(TableName="t_scan")
assert resp["Count"] == 8
assert len(resp["Items"]) == 8
def test_dynamodb_scan_filter(ddb):
resp = ddb.scan(
TableName="t_scan",
FilterExpression="n >= :min",
ExpressionAttributeValues={":min": {"N": "5"}},
)
assert resp["Count"] == 3
for item in resp["Items"]:
assert int(item["n"]["N"]) >= 5
def test_dynamodb_batch_write(ddb):
ddb.create_table(
TableName="t_bw",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.batch_write_item(
RequestItems={
"t_bw": [{"PutRequest": {"Item": {"pk": {"S": f"bw_{i}"}, "data": {"S": f"d{i}"}}}} for i in range(10)]
}
)
resp = ddb.scan(TableName="t_bw")
assert resp["Count"] == 10
def test_dynamodb_batch_get(ddb):
resp = ddb.batch_get_item(
RequestItems={
"t_bw": {
"Keys": [{"pk": {"S": f"bw_{i}"}} for i in range(5)],
}
}
)
assert len(resp["Responses"]["t_bw"]) == 5
def test_dynamodb_transact_write(ddb):
ddb.create_table(
TableName="t_tx",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.transact_write_items(
TransactItems=[
{
"Put": {
"TableName": "t_tx",
"Item": {"pk": {"S": "tx1"}, "v": {"S": "a"}},
}
},
{
"Put": {
"TableName": "t_tx",
"Item": {"pk": {"S": "tx2"}, "v": {"S": "b"}},
}
},
{
"Put": {
"TableName": "t_tx",
"Item": {"pk": {"S": "tx3"}, "v": {"S": "c"}},
}
},
]
)
resp = ddb.scan(TableName="t_tx")
assert resp["Count"] == 3
ddb.transact_write_items(
TransactItems=[
{"Delete": {"TableName": "t_tx", "Key": {"pk": {"S": "tx3"}}}},
{
"Update": {
"TableName": "t_tx",
"Key": {"pk": {"S": "tx1"}},
"UpdateExpression": "SET v = :new",
"ExpressionAttributeValues": {":new": {"S": "updated"}},
},
},
]
)
item = ddb.get_item(TableName="t_tx", Key={"pk": {"S": "tx1"}})["Item"]
assert item["v"]["S"] == "updated"
gone = ddb.get_item(TableName="t_tx", Key={"pk": {"S": "tx3"}})
assert "Item" not in gone
def test_dynamodb_transact_get(ddb):
resp = ddb.transact_get_items(
TransactItems=[
{"Get": {"TableName": "t_tx", "Key": {"pk": {"S": "tx1"}}}},
{"Get": {"TableName": "t_tx", "Key": {"pk": {"S": "tx2"}}}},
]
)
assert len(resp["Responses"]) == 2
assert resp["Responses"][0]["Item"]["pk"]["S"] == "tx1"
assert resp["Responses"][1]["Item"]["pk"]["S"] == "tx2"
def test_dynamodb_gsi_query(ddb):
ddb.create_table(
TableName="t_gsi_q",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[
{"AttributeName": "pk", "AttributeType": "S"},
{"AttributeName": "gsi_pk", "AttributeType": "S"},
],
GlobalSecondaryIndexes=[
{
"IndexName": "gsi_index",
"KeySchema": [{"AttributeName": "gsi_pk", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "ALL"},
}
],
BillingMode="PAY_PER_REQUEST",
)
for i in range(4):
ddb.put_item(
TableName="t_gsi_q",
Item={
"pk": {"S": f"main_{i}"},
"gsi_pk": {"S": "shared_gsi"},
"data": {"N": str(i)},
},
)
ddb.put_item(
TableName="t_gsi_q",
Item={
"pk": {"S": "main_other"},
"gsi_pk": {"S": "other_gsi"},
"data": {"N": "99"},
},
)
resp = ddb.query(
TableName="t_gsi_q",
IndexName="gsi_index",
KeyConditionExpression="gsi_pk = :gpk",
ExpressionAttributeValues={":gpk": {"S": "shared_gsi"}},
)
assert resp["Count"] == 4
for item in resp["Items"]:
assert item["gsi_pk"]["S"] == "shared_gsi"
def test_dynamodb_ttl(ddb):
import uuid as _uuid
table = f"intg-ttl-{_uuid.uuid4().hex[:8]}"
ddb.create_table(
TableName=table,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
# Initially disabled
resp = ddb.describe_time_to_live(TableName=table)
assert resp["TimeToLiveDescription"]["TimeToLiveStatus"] == "DISABLED"
# Enable TTL
ddb.update_time_to_live(
TableName=table,
TimeToLiveSpecification={"Enabled": True, "AttributeName": "expires_at"},
)
resp = ddb.describe_time_to_live(TableName=table)
assert resp["TimeToLiveDescription"]["TimeToLiveStatus"] == "ENABLED"
assert resp["TimeToLiveDescription"]["AttributeName"] == "expires_at"
# Disable TTL
ddb.update_time_to_live(
TableName=table,
TimeToLiveSpecification={"Enabled": False, "AttributeName": "expires_at"},
)
resp = ddb.describe_time_to_live(TableName=table)
assert resp["TimeToLiveDescription"]["TimeToLiveStatus"] == "DISABLED"
ddb.delete_table(TableName=table)
def test_dynamodb_update_table(ddb):
import uuid as _uuid
table = f"intg-updtbl-{_uuid.uuid4().hex[:8]}"
ddb.create_table(
TableName=table,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
resp = ddb.update_table(
TableName=table,
BillingMode="PROVISIONED",
ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5},
)
assert resp["TableDescription"]["TableName"] == table
ddb.delete_table(TableName=table)
def test_dynamodb_ttl_expiry(ddb):
"""TTL setting is stored and reported correctly; expiry enforcement is in the background reaper."""
import uuid as _uuid_mod
table = f"intg-ttl-exp-{_uuid_mod.uuid4().hex[:8]}"
ddb.create_table(
TableName=table,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.update_time_to_live(
TableName=table,
TimeToLiveSpecification={"Enabled": True, "AttributeName": "expires_at"},
)
past = int(time.time()) - 10
ddb.put_item(
TableName=table,
Item={
"pk": {"S": "expired-item"},
"expires_at": {"N": str(past)},
"data": {"S": "should-be-gone"},
},
)
# Item present immediately (reaper hasn't run yet)
resp = ddb.get_item(TableName=table, Key={"pk": {"S": "expired-item"}})
assert "Item" in resp
# TTL setting is correctly reflected in DescribeTimeToLive
desc = ddb.describe_time_to_live(TableName=table)["TimeToLiveDescription"]
assert desc["TimeToLiveStatus"] == "ENABLED"
assert desc["AttributeName"] == "expires_at"
def test_dynamodb_query_pagination_hash_only(ddb):
"""Pagination on a hash-only table (no sort key) must return results after the ESK."""
table = "t_hash_paginate"
ddb.create_table(
TableName=table,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
for i in range(5):
ddb.put_item(TableName=table, Item={"pk": {"S": f"item_{i:03d}"}, "v": {"N": str(i)}})
resp1 = ddb.scan(TableName=table, Limit=3)
assert resp1["Count"] == 3
assert "LastEvaluatedKey" in resp1
resp2 = ddb.scan(TableName=table, Limit=3, ExclusiveStartKey=resp1["LastEvaluatedKey"])
assert resp2["Count"] == 2
all_pks = {it["pk"]["S"] for it in resp1["Items"]} | {it["pk"]["S"] for it in resp2["Items"]}
assert len(all_pks) == 5
def test_dynamodb_update_item_updated_new(ddb):
"""UpdateItem ReturnValues=UPDATED_NEW returns only changed attributes."""
ddb.create_table(
TableName="qa-ddb-updated-new",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.put_item(
TableName="qa-ddb-updated-new",
Item={"pk": {"S": "k1"}, "a": {"S": "old"}, "b": {"N": "1"}},
)
resp = ddb.update_item(
TableName="qa-ddb-updated-new",
Key={"pk": {"S": "k1"}},
UpdateExpression="SET a = :new",
ExpressionAttributeValues={":new": {"S": "new"}},
ReturnValues="UPDATED_NEW",
)
assert "Attributes" in resp
assert resp["Attributes"]["a"]["S"] == "new"
assert "b" not in resp["Attributes"]
def test_dynamodb_update_item_updated_old(ddb):
"""UpdateItem ReturnValues=UPDATED_OLD returns old values of changed attributes."""
ddb.create_table(
TableName="qa-ddb-updated-old",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.put_item(TableName="qa-ddb-updated-old", Item={"pk": {"S": "k1"}, "score": {"N": "10"}})
resp = ddb.update_item(
TableName="qa-ddb-updated-old",
Key={"pk": {"S": "k1"}},
UpdateExpression="SET score = :new",
ExpressionAttributeValues={":new": {"N": "20"}},
ReturnValues="UPDATED_OLD",
)
assert resp["Attributes"]["score"]["N"] == "10"
def test_dynamodb_conditional_put_fails(ddb):
"""PutItem with attribute_not_exists condition fails if item already exists."""
ddb.create_table(
TableName="qa-ddb-cond-put",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.put_item(TableName="qa-ddb-cond-put", Item={"pk": {"S": "exists"}})
with pytest.raises(ClientError) as exc:
ddb.put_item(
TableName="qa-ddb-cond-put",
Item={"pk": {"S": "exists"}, "data": {"S": "new"}},
ConditionExpression="attribute_not_exists(pk)",
)
assert exc.value.response["Error"]["Code"] == "ConditionalCheckFailedException"
def test_dynamodb_query_with_filter_expression(ddb):
"""Query with FilterExpression reduces Count but not ScannedCount."""
ddb.create_table(
TableName="qa-ddb-filter",
KeySchema=[
{"AttributeName": "pk", "KeyType": "HASH"},
{"AttributeName": "sk", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "pk", "AttributeType": "S"},
{"AttributeName": "sk", "AttributeType": "N"},
],
BillingMode="PAY_PER_REQUEST",
)
for i in range(5):
ddb.put_item(
TableName="qa-ddb-filter",
Item={
"pk": {"S": "user1"},
"sk": {"N": str(i)},
"active": {"BOOL": i % 2 == 0},
},
)
resp = ddb.query(
TableName="qa-ddb-filter",
KeyConditionExpression="pk = :pk",
FilterExpression="active = :t",
ExpressionAttributeValues={":pk": {"S": "user1"}, ":t": {"BOOL": True}},
)
assert resp["Count"] == 3
assert resp["ScannedCount"] == 5
def test_dynamodb_scan_with_limit_and_pagination(ddb):
"""Scan with Limit returns LastEvaluatedKey and pagination works."""
ddb.create_table(
TableName="qa-ddb-scan-page",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
for i in range(10):
ddb.put_item(TableName="qa-ddb-scan-page", Item={"pk": {"S": f"item{i:02d}"}})
all_items = []
lek = None
while True:
kwargs = {"TableName": "qa-ddb-scan-page", "Limit": 3}
if lek:
kwargs["ExclusiveStartKey"] = lek
resp = ddb.scan(**kwargs)
all_items.extend(resp["Items"])
lek = resp.get("LastEvaluatedKey")
if not lek:
break
assert len(all_items) == 10
def test_dynamodb_transact_write_condition_cancel(ddb):
"""TransactWriteItems cancels entire transaction if one condition fails."""
ddb.create_table(
TableName="qa-ddb-transact",
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.put_item(TableName="qa-ddb-transact", Item={"pk": {"S": "existing"}})
with pytest.raises(ClientError) as exc:
ddb.transact_write_items(
TransactItems=[
{
"Put": {
"TableName": "qa-ddb-transact",
"Item": {"pk": {"S": "new-item"}},
}
},
{
"Put": {
"TableName": "qa-ddb-transact",
"Item": {"pk": {"S": "existing"}, "data": {"S": "x"}},
"ConditionExpression": "attribute_not_exists(pk)",
}
},
]
)
assert exc.value.response["Error"]["Code"] == "TransactionCanceledException"
resp = ddb.get_item(TableName="qa-ddb-transact", Key={"pk": {"S": "new-item"}})
assert "Item" not in resp
def test_dynamodb_batch_get_missing_table(ddb):
"""BatchGetItem with non-existent table returns it in UnprocessedKeys."""
resp = ddb.batch_get_item(RequestItems={"qa-ddb-nonexistent-xyz": {"Keys": [{"pk": {"S": "k1"}}]}})
assert "qa-ddb-nonexistent-xyz" in resp["UnprocessedKeys"]
def test_dynamodb_scan_filter_legacy(ddb):
"""Scan with legacy ScanFilter (ComparisonOperator style) returns matching items."""
table = "intg-ddb-scanfilter"
ddb.create_table(
TableName=table,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
for i in range(5):
ddb.put_item(TableName=table, Item={
"pk": {"S": f"sf_{i}"},
"color": {"S": "red" if i % 2 == 0 else "blue"},
})
resp = ddb.scan(
TableName=table,
ScanFilter={
"color": {
"AttributeValueList": [{"S": "red"}],
"ComparisonOperator": "EQ",
}
},
)
assert resp["Count"] == 3
for item in resp["Items"]:
assert item["color"]["S"] == "red"
def test_dynamodb_query_filter_legacy(ddb):
"""Query with legacy QueryFilter (ComparisonOperator style) returns matching items."""
table = "intg-ddb-queryfilter"
ddb.create_table(
TableName=table,
KeySchema=[
{"AttributeName": "pk", "KeyType": "HASH"},
{"AttributeName": "sk", "KeyType": "RANGE"},
],
AttributeDefinitions=[
{"AttributeName": "pk", "AttributeType": "S"},
{"AttributeName": "sk", "AttributeType": "S"},
],
BillingMode="PAY_PER_REQUEST",
)
for i in range(5):
ddb.put_item(TableName=table, Item={
"pk": {"S": "qf_pk"},
"sk": {"S": f"sk_{i}"},
"status": {"S": "active" if i < 3 else "inactive"},
})
resp = ddb.query(
TableName=table,
KeyConditionExpression="pk = :pk",
ExpressionAttributeValues={":pk": {"S": "qf_pk"}},
QueryFilter={
"status": {
"AttributeValueList": [{"S": "active"}],
"ComparisonOperator": "EQ",
}
},
)
assert resp["Count"] == 3
assert resp["ScannedCount"] == 5
for item in resp["Items"]:
assert item["status"]["S"] == "active"
# ---------------------------------------------------------------------------
# Terraform compatibility tests
# ---------------------------------------------------------------------------
def test_dynamodb_pay_per_request_provisioned_throughput(ddb):
"""PAY_PER_REQUEST tables must return ProvisionedThroughput with zero values."""
tname = "tf-compat-ondemand"
try:
ddb.delete_table(TableName=tname)
except ClientError:
pass
ddb.create_table(
TableName=tname,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
try:
desc = ddb.describe_table(TableName=tname)["Table"]
pt = desc["ProvisionedThroughput"]
assert pt["ReadCapacityUnits"] == 0, \
f"Expected ReadCapacityUnits=0 for PAY_PER_REQUEST, got {pt['ReadCapacityUnits']}"
assert pt["WriteCapacityUnits"] == 0, \
f"Expected WriteCapacityUnits=0 for PAY_PER_REQUEST, got {pt['WriteCapacityUnits']}"
finally:
ddb.delete_table(TableName=tname)
def test_dynamodb_provisioned_keeps_capacity(ddb):
"""PROVISIONED tables must keep their configured throughput values."""
tname = "tf-compat-provisioned"
try:
ddb.delete_table(TableName=tname)
except ClientError:
pass
ddb.create_table(
TableName=tname,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PROVISIONED",
ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 5},
)
try:
desc = ddb.describe_table(TableName=tname)["Table"]
pt = desc["ProvisionedThroughput"]
assert pt["ReadCapacityUnits"] == 10
assert pt["WriteCapacityUnits"] == 5
finally:
ddb.delete_table(TableName=tname)
def test_dynamodb_pay_per_request_gsi_zero_throughput(ddb):
"""GSIs on PAY_PER_REQUEST tables must have zero ProvisionedThroughput."""
tname = "tf-compat-ondemand-gsi"
try:
ddb.delete_table(TableName=tname)
except ClientError:
pass
ddb.create_table(
TableName=tname,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[
{"AttributeName": "pk", "AttributeType": "S"},
{"AttributeName": "gsi_key", "AttributeType": "S"},
],
BillingMode="PAY_PER_REQUEST",
GlobalSecondaryIndexes=[
{
"IndexName": "gsi-test",
"KeySchema": [{"AttributeName": "gsi_key", "KeyType": "HASH"}],
"Projection": {"ProjectionType": "ALL"},
}
],
)
try:
desc = ddb.describe_table(TableName=tname)["Table"]
gsis = desc.get("GlobalSecondaryIndexes", [])
assert len(gsis) == 1, f"Expected 1 GSI, got {len(gsis)}"
gsi_pt = gsis[0]["ProvisionedThroughput"]
assert gsi_pt["ReadCapacityUnits"] == 0, \
f"Expected GSI ReadCapacityUnits=0 for PAY_PER_REQUEST, got {gsi_pt['ReadCapacityUnits']}"
assert gsi_pt["WriteCapacityUnits"] == 0, \
f"Expected GSI WriteCapacityUnits=0 for PAY_PER_REQUEST, got {gsi_pt['WriteCapacityUnits']}"
finally:
ddb.delete_table(TableName=tname)
def test_dynamodb_update_to_pay_per_request_zeroes_throughput(ddb):
"""Updating billing mode to PAY_PER_REQUEST should zero out throughput."""
tname = "tf-compat-update-billing"
try:
ddb.delete_table(TableName=tname)
except ClientError:
pass
ddb.create_table(
TableName=tname,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PROVISIONED",
ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 5},
)
try:
ddb.update_table(TableName=tname, BillingMode="PAY_PER_REQUEST")
desc = ddb.describe_table(TableName=tname)["Table"]
pt = desc["ProvisionedThroughput"]
assert pt["ReadCapacityUnits"] == 0, \
f"Expected ReadCapacityUnits=0 after switching to PAY_PER_REQUEST, got {pt['ReadCapacityUnits']}"
assert pt["WriteCapacityUnits"] == 0, \
f"Expected WriteCapacityUnits=0 after switching to PAY_PER_REQUEST, got {pt['WriteCapacityUnits']}"
finally:
ddb.delete_table(TableName=tname)
# ---------------------------------------------------------------------------
# ExecuteStatement (PartiQL)
# ---------------------------------------------------------------------------
def test_partiql_select_all(ddb):
"""SELECT * FROM table — the IntelliJ use case."""
tname = "partiql-select-all"
try:
ddb.delete_table(TableName=tname)
except ClientError:
pass
ddb.create_table(
TableName=tname,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.put_item(TableName=tname, Item={"pk": {"S": "a"}, "val": {"S": "1"}})
ddb.put_item(TableName=tname, Item={"pk": {"S": "b"}, "val": {"S": "2"}})
resp = ddb.execute_statement(Statement=f'SELECT * FROM "{tname}"')
assert len(resp["Items"]) == 2
pks = sorted(it["pk"]["S"] for it in resp["Items"])
assert pks == ["a", "b"]
def test_partiql_select_with_where(ddb):
"""SELECT with WHERE clause and ? parameter binding."""
tname = "partiql-select-where"
try:
ddb.delete_table(TableName=tname)
except ClientError:
pass
ddb.create_table(
TableName=tname,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.put_item(TableName=tname, Item={"pk": {"S": "x"}, "status": {"S": "active"}})
ddb.put_item(TableName=tname, Item={"pk": {"S": "y"}, "status": {"S": "inactive"}})
resp = ddb.execute_statement(
Statement=f'SELECT * FROM "{tname}" WHERE pk = ?',
Parameters=[{"S": "x"}],
)
assert len(resp["Items"]) == 1
assert resp["Items"][0]["pk"]["S"] == "x"
def test_partiql_select_projection(ddb):
"""SELECT specific columns."""
tname = "partiql-select-proj"
try:
ddb.delete_table(TableName=tname)
except ClientError:
pass
ddb.create_table(
TableName=tname,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.put_item(TableName=tname, Item={"pk": {"S": "k1"}, "a": {"S": "1"}, "b": {"S": "2"}})
resp = ddb.execute_statement(Statement=f'SELECT pk, a FROM "{tname}"')
assert len(resp["Items"]) == 1
item = resp["Items"][0]
assert "pk" in item
assert "a" in item
assert "b" not in item
def test_partiql_insert(ddb):
"""INSERT INTO table VALUE {...}."""
tname = "partiql-insert"
try:
ddb.delete_table(TableName=tname)
except ClientError:
pass
ddb.create_table(
TableName=tname,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.execute_statement(
Statement=f"INSERT INTO \"{tname}\" VALUE {{'pk': ?, 'data': ?}}",
Parameters=[{"S": "ins1"}, {"S": "hello"}],
)
resp = ddb.get_item(TableName=tname, Key={"pk": {"S": "ins1"}})
assert resp["Item"]["data"]["S"] == "hello"
def test_partiql_insert_duplicate_rejected(ddb):
"""INSERT with duplicate key should fail."""
tname = "partiql-ins-dup"
try:
ddb.delete_table(TableName=tname)
except ClientError:
pass
ddb.create_table(
TableName=tname,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.put_item(TableName=tname, Item={"pk": {"S": "dup"}})
with pytest.raises(ClientError) as exc:
ddb.execute_statement(
Statement=f"INSERT INTO \"{tname}\" VALUE {{'pk': ?}}",
Parameters=[{"S": "dup"}],
)
assert "ConditionalCheckFailed" in exc.value.response["Error"]["Code"]
def test_partiql_update(ddb):
"""UPDATE table SET attr = val WHERE pk = val."""
tname = "partiql-update"
try:
ddb.delete_table(TableName=tname)
except ClientError:
pass
ddb.create_table(
TableName=tname,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.put_item(TableName=tname, Item={"pk": {"S": "u1"}, "status": {"S": "old"}})
ddb.execute_statement(
Statement=f"UPDATE \"{tname}\" SET status = ? WHERE pk = ?",
Parameters=[{"S": "new"}, {"S": "u1"}],
)
resp = ddb.get_item(TableName=tname, Key={"pk": {"S": "u1"}})
assert resp["Item"]["status"]["S"] == "new"
def test_partiql_delete(ddb):
"""DELETE FROM table WHERE pk = val."""
tname = "partiql-delete"
try:
ddb.delete_table(TableName=tname)
except ClientError:
pass
ddb.create_table(
TableName=tname,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.put_item(TableName=tname, Item={"pk": {"S": "d1"}, "val": {"S": "x"}})
ddb.execute_statement(
Statement=f'DELETE FROM "{tname}" WHERE pk = ?',
Parameters=[{"S": "d1"}],
)
resp = ddb.get_item(TableName=tname, Key={"pk": {"S": "d1"}})
assert "Item" not in resp
def test_partiql_nonexistent_table(ddb):
"""ExecuteStatement on a nonexistent table should return ResourceNotFoundException."""
with pytest.raises(ClientError) as exc:
ddb.execute_statement(Statement='SELECT * FROM "no-such-table-partiql"')
assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException"
def test_partiql_select_where_number(ddb):
"""WHERE clause with numeric comparison."""
tname = "partiql-num-where"
try:
ddb.delete_table(TableName=tname)
except ClientError:
pass
ddb.create_table(
TableName=tname,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
)
ddb.put_item(TableName=tname, Item={"pk": {"S": "n1"}, "age": {"N": "25"}})
ddb.put_item(TableName=tname, Item={"pk": {"S": "n2"}, "age": {"N": "30"}})
resp = ddb.execute_statement(
Statement=f'SELECT * FROM "{tname}" WHERE age > ?',
Parameters=[{"N": "27"}],
)
assert len(resp["Items"]) == 1
assert resp["Items"][0]["pk"]["S"] == "n2"
def test_dynamodb_stream_arn_stable(ddb):
"""LatestStreamArn should be stable across DescribeTable calls."""
tname = f"stream-stable-{_uuid_mod.uuid4().hex[:8]}"
ddb.create_table(
TableName=tname,
KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
BillingMode="PAY_PER_REQUEST",
StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"},
)
desc1 = ddb.describe_table(TableName=tname)["Table"]
desc2 = ddb.describe_table(TableName=tname)["Table"]
assert desc1["LatestStreamArn"] == desc2["LatestStreamArn"]
assert desc1["LatestStreamLabel"] == desc2["LatestStreamLabel"]
ddb.delete_table(TableName=tname)