Spaces:
Running
Running
| import io | |
| import json | |
| import os | |
| import time | |
| import zipfile | |
| from urllib.parse import urlparse | |
| import pytest | |
| from botocore.exceptions import ClientError | |
| import uuid as _uuid_mod | |
| def _make_zip(code: str) -> bytes: | |
| buf = io.BytesIO() | |
| with zipfile.ZipFile(buf, "w") as zf: | |
| zf.writestr("index.py", code) | |
| return buf.getvalue() | |
| _LAMBDA_ROLE = "arn:aws:iam::000000000000:role/lambda-role" | |
| def test_dynamodb_basic(ddb): | |
| try: | |
| ddb.delete_table(TableName="TestTable1") | |
| except Exception: | |
| pass | |
| ddb.create_table( | |
| TableName="TestTable1", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.put_item(TableName="TestTable1", Item={"pk": {"S": "key1"}, "data": {"S": "value1"}}) | |
| resp = ddb.get_item(TableName="TestTable1", Key={"pk": {"S": "key1"}}) | |
| assert resp["Item"]["data"]["S"] == "value1" | |
| ddb.delete_item(TableName="TestTable1", Key={"pk": {"S": "key1"}}) | |
| resp = ddb.get_item(TableName="TestTable1", Key={"pk": {"S": "key1"}}) | |
| assert "Item" not in resp | |
| def test_dynamodb_scan(ddb): | |
| try: | |
| ddb.delete_table(TableName="ScanTable") | |
| except Exception: | |
| pass | |
| ddb.create_table( | |
| TableName="ScanTable", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| for i in range(10): | |
| ddb.put_item(TableName="ScanTable", Item={"pk": {"S": f"key{i}"}, "val": {"N": str(i)}}) | |
| resp = ddb.scan(TableName="ScanTable") | |
| assert resp["Count"] == 10 | |
| def test_dynamodb_batch(ddb): | |
| try: | |
| ddb.delete_table(TableName="BatchTable") | |
| except Exception: | |
| pass | |
| ddb.create_table( | |
| TableName="BatchTable", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.batch_write_item( | |
| RequestItems={ | |
| "BatchTable": [{"PutRequest": {"Item": {"pk": {"S": f"bk{i}"}, "v": {"S": f"bv{i}"}}}} for i in range(5)] | |
| } | |
| ) | |
| resp = ddb.scan(TableName="BatchTable") | |
| assert resp["Count"] == 5 | |
| def test_dynamodb_describe_continuous_backups(ddb): | |
| ddb.create_table( | |
| TableName="ddb-pitr-tbl", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| resp = ddb.describe_continuous_backups(TableName="ddb-pitr-tbl") | |
| assert resp["ContinuousBackupsDescription"]["ContinuousBackupsStatus"] == "ENABLED" | |
| pitr = resp["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"] | |
| assert pitr["PointInTimeRecoveryStatus"] == "DISABLED" | |
| def test_dynamodb_update_continuous_backups(ddb): | |
| ddb.update_continuous_backups( | |
| TableName="ddb-pitr-tbl", | |
| PointInTimeRecoverySpecification={"PointInTimeRecoveryEnabled": True}, | |
| ) | |
| resp = ddb.describe_continuous_backups(TableName="ddb-pitr-tbl") | |
| pitr = resp["ContinuousBackupsDescription"]["PointInTimeRecoveryDescription"] | |
| assert pitr["PointInTimeRecoveryStatus"] == "ENABLED" | |
| def test_dynamodb_describe_endpoints(ddb): | |
| resp = ddb.describe_endpoints() | |
| assert len(resp["Endpoints"]) > 0 | |
| assert "Address" in resp["Endpoints"][0] | |
| def test_dynamodb_batch_write_consumed_capacity(ddb): | |
| ddb.create_table( | |
| TableName="batch-cap-regression", | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| resp = ddb.batch_write_item( | |
| RequestItems={ | |
| "batch-cap-regression": [ | |
| {"PutRequest": {"Item": {"pk": {"S": "k1"}}}}, | |
| ] | |
| }, | |
| ReturnConsumedCapacity="TOTAL", | |
| ) | |
| assert "ConsumedCapacity" in resp, "ConsumedCapacity must be present when ReturnConsumedCapacity=TOTAL" | |
| assert isinstance(resp["ConsumedCapacity"], list), "ConsumedCapacity must be a list for BatchWriteItem" | |
| assert resp["ConsumedCapacity"][0]["TableName"] == "batch-cap-regression" | |
| assert resp["ConsumedCapacity"][0]["CapacityUnits"] == 1.0 | |
| ddb.delete_table(TableName="batch-cap-regression") | |
| def test_dynamodb_put_item_gsi_capacity(ddb): | |
| """PutItem on a table with 1 GSI must return CapacityUnits=2.0 (table + GSI).""" | |
| ddb.create_table( | |
| TableName="gsi-cap-put", | |
| AttributeDefinitions=[ | |
| {"AttributeName": "pk", "AttributeType": "S"}, | |
| {"AttributeName": "sk", "AttributeType": "S"}, | |
| {"AttributeName": "last_name", "AttributeType": "S"}, | |
| ], | |
| KeySchema=[ | |
| {"AttributeName": "pk", "KeyType": "HASH"}, | |
| {"AttributeName": "sk", "KeyType": "RANGE"}, | |
| ], | |
| GlobalSecondaryIndexes=[ | |
| { | |
| "IndexName": "last_name-index", | |
| "KeySchema": [{"AttributeName": "last_name", "KeyType": "HASH"}], | |
| "Projection": {"ProjectionType": "ALL"}, | |
| } | |
| ], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| resp = ddb.put_item( | |
| TableName="gsi-cap-put", | |
| Item={"pk": {"S": "p1"}, "sk": {"S": "s1"}, "last_name": {"S": "Smith"}}, | |
| ReturnConsumedCapacity="TOTAL", | |
| ) | |
| assert resp["ConsumedCapacity"]["CapacityUnits"] == 2.0 | |
| ddb.delete_table(TableName="gsi-cap-put") | |
| def test_dynamodb_batch_write_gsi_capacity(ddb): | |
| """BatchWriteItem with 2 items on a table with 1 GSI must return CapacityUnits=4.0.""" | |
| ddb.create_table( | |
| TableName="gsi-cap-batch", | |
| AttributeDefinitions=[ | |
| {"AttributeName": "pk", "AttributeType": "S"}, | |
| {"AttributeName": "sk", "AttributeType": "S"}, | |
| {"AttributeName": "age", "AttributeType": "N"}, | |
| ], | |
| KeySchema=[ | |
| {"AttributeName": "pk", "KeyType": "HASH"}, | |
| {"AttributeName": "sk", "KeyType": "RANGE"}, | |
| ], | |
| GlobalSecondaryIndexes=[ | |
| { | |
| "IndexName": "age-index", | |
| "KeySchema": [{"AttributeName": "age", "KeyType": "HASH"}], | |
| "Projection": {"ProjectionType": "ALL"}, | |
| } | |
| ], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| resp = ddb.batch_write_item( | |
| RequestItems={ | |
| "gsi-cap-batch": [ | |
| {"PutRequest": {"Item": {"pk": {"S": "p2"}, "sk": {"S": "s2"}, "age": {"N": "25"}}}}, | |
| {"PutRequest": {"Item": {"pk": {"S": "p3"}, "sk": {"S": "s3"}, "age": {"N": "26"}}}}, | |
| ] | |
| }, | |
| ReturnConsumedCapacity="TOTAL", | |
| ) | |
| assert resp["ConsumedCapacity"][0]["CapacityUnits"] == 4.0 | |
| ddb.delete_table(TableName="gsi-cap-batch") | |
| def test_dynamodb_streams_table_has_stream_arn(ddb): | |
| """Table with StreamSpecification returns LatestStreamArn and operations succeed.""" | |
| table_name = "stream-arn-test" | |
| resp = ddb.create_table( | |
| TableName=table_name, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"}, | |
| ) | |
| desc = ddb.describe_table(TableName=table_name)["Table"] | |
| assert desc.get("LatestStreamArn") or desc.get("StreamSpecification", {}).get("StreamEnabled") | |
| # All write operations should succeed with streams enabled | |
| ddb.put_item(TableName=table_name, Item={"pk": {"S": "k1"}, "val": {"S": "v1"}}) | |
| ddb.update_item( | |
| TableName=table_name, | |
| Key={"pk": {"S": "k1"}}, | |
| UpdateExpression="SET val = :v", | |
| ExpressionAttributeValues={":v": {"S": "v2"}}, | |
| ) | |
| ddb.delete_item(TableName=table_name, Key={"pk": {"S": "k1"}}) | |
| # Verify item is gone | |
| get_resp = ddb.get_item(TableName=table_name, Key={"pk": {"S": "k1"}}) | |
| assert "Item" not in get_resp | |
| def test_dynamodb_tag_untag_resource(ddb): | |
| """Create table, tag it, list tags, untag, verify.""" | |
| table_name = "ddb-tag-test" | |
| try: | |
| ddb.delete_table(TableName=table_name) | |
| except Exception: | |
| pass | |
| resp = ddb.create_table( | |
| TableName=table_name, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| arn = resp["TableDescription"]["TableArn"] | |
| # Tag | |
| ddb.tag_resource(ResourceArn=arn, Tags=[ | |
| {"Key": "env", "Value": "test"}, | |
| {"Key": "team", "Value": "platform"}, | |
| ]) | |
| tags = ddb.list_tags_of_resource(ResourceArn=arn)["Tags"] | |
| tag_keys = {t["Key"] for t in tags} | |
| assert "env" in tag_keys | |
| assert "team" in tag_keys | |
| # Untag | |
| ddb.untag_resource(ResourceArn=arn, TagKeys=["team"]) | |
| tags2 = ddb.list_tags_of_resource(ResourceArn=arn)["Tags"] | |
| tag_keys2 = {t["Key"] for t in tags2} | |
| assert "env" in tag_keys2 | |
| assert "team" not in tag_keys2 | |
| def test_dynamodb_stream_to_lambda(lam, ddb): | |
| """DynamoDB stream records are delivered to Lambda via event source mapping.""" | |
| table_name = "intg-ddbstream-tbl" | |
| fn_name = "intg-ddbstream-fn" | |
| ddb.create_table( | |
| TableName=table_name, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"}, | |
| ) | |
| stream_arn = ddb.describe_table(TableName=table_name)["Table"]["LatestStreamArn"] | |
| assert stream_arn is not None | |
| code = ( | |
| "import json\n" | |
| "def handler(event, context):\n" | |
| " records = event.get('Records', [])\n" | |
| " return {'processed': len(records)}\n" | |
| ) | |
| lam.create_function( | |
| FunctionName=fn_name, | |
| Runtime="python3.11", | |
| Role=_LAMBDA_ROLE, | |
| Handler="index.handler", | |
| Code={"ZipFile": _make_zip(code)}, | |
| ) | |
| esm = lam.create_event_source_mapping( | |
| FunctionName=fn_name, | |
| EventSourceArn=stream_arn, | |
| StartingPosition="TRIM_HORIZON", | |
| BatchSize=10, | |
| ) | |
| assert esm["EventSourceArn"] == stream_arn | |
| assert esm["FunctionArn"].endswith(fn_name) | |
| assert esm["State"] in ("Creating", "Enabled") | |
| # Write items to trigger stream records | |
| ddb.put_item(TableName=table_name, Item={"pk": {"S": "a1"}, "data": {"S": "hello"}}) | |
| ddb.put_item(TableName=table_name, Item={"pk": {"S": "a2"}, "data": {"S": "world"}}) | |
| ddb.delete_item(TableName=table_name, Key={"pk": {"S": "a1"}}) | |
| # Allow background poller to process | |
| time.sleep(3) | |
| # Verify the ESM is still active | |
| esm_resp = lam.get_event_source_mapping(UUID=esm["UUID"]) | |
| assert esm_resp["EventSourceArn"] == stream_arn | |
| # Verify DynamoDB state is correct after stream operations | |
| scan = ddb.scan(TableName=table_name) | |
| pks = {item["pk"]["S"] for item in scan["Items"]} | |
| assert "a2" in pks | |
| assert "a1" not in pks | |
| # Cleanup ESM | |
| lam.delete_event_source_mapping(UUID=esm["UUID"]) | |
| # Migrated from test_ddb.py | |
| def test_dynamodb_create_table(ddb): | |
| resp = ddb.create_table( | |
| TableName="t_hash_only", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| desc = resp["TableDescription"] | |
| assert desc["TableName"] == "t_hash_only" | |
| assert desc["TableStatus"] == "ACTIVE" | |
| assert any(k["KeyType"] == "HASH" for k in desc["KeySchema"]) | |
| def test_dynamodb_create_table_composite(ddb): | |
| resp = ddb.create_table( | |
| TableName="t_composite", | |
| KeySchema=[ | |
| {"AttributeName": "pk", "KeyType": "HASH"}, | |
| {"AttributeName": "sk", "KeyType": "RANGE"}, | |
| ], | |
| AttributeDefinitions=[ | |
| {"AttributeName": "pk", "AttributeType": "S"}, | |
| {"AttributeName": "sk", "AttributeType": "S"}, | |
| ], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ks = resp["TableDescription"]["KeySchema"] | |
| types = {k["KeyType"] for k in ks} | |
| assert types == {"HASH", "RANGE"} | |
| def test_dynamodb_create_table_duplicate(ddb): | |
| with pytest.raises(ClientError) as exc: | |
| ddb.create_table( | |
| TableName="t_hash_only", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| assert exc.value.response["Error"]["Code"] == "ResourceInUseException" | |
| def test_dynamodb_delete_table(ddb): | |
| ddb.create_table( | |
| TableName="t_to_delete", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| resp = ddb.delete_table(TableName="t_to_delete") | |
| assert resp["TableDescription"]["TableStatus"] == "DELETING" | |
| tables = ddb.list_tables()["TableNames"] | |
| assert "t_to_delete" not in tables | |
| def test_dynamodb_delete_table_not_found(ddb): | |
| with pytest.raises(ClientError) as exc: | |
| ddb.delete_table(TableName="t_nonexistent_xyz") | |
| assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException" | |
| def test_dynamodb_describe_table(ddb): | |
| ddb.create_table( | |
| TableName="t_describe_gsi", | |
| KeySchema=[ | |
| {"AttributeName": "pk", "KeyType": "HASH"}, | |
| {"AttributeName": "sk", "KeyType": "RANGE"}, | |
| ], | |
| AttributeDefinitions=[ | |
| {"AttributeName": "pk", "AttributeType": "S"}, | |
| {"AttributeName": "sk", "AttributeType": "S"}, | |
| {"AttributeName": "gsi_pk", "AttributeType": "S"}, | |
| ], | |
| GlobalSecondaryIndexes=[ | |
| { | |
| "IndexName": "gsi1", | |
| "KeySchema": [{"AttributeName": "gsi_pk", "KeyType": "HASH"}], | |
| "Projection": {"ProjectionType": "ALL"}, | |
| } | |
| ], | |
| LocalSecondaryIndexes=[ | |
| { | |
| "IndexName": "lsi1", | |
| "KeySchema": [ | |
| {"AttributeName": "pk", "KeyType": "HASH"}, | |
| {"AttributeName": "sk", "KeyType": "RANGE"}, | |
| ], | |
| "Projection": {"ProjectionType": "ALL"}, | |
| } | |
| ], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| resp = ddb.describe_table(TableName="t_describe_gsi") | |
| table = resp["Table"] | |
| assert table["TableName"] == "t_describe_gsi" | |
| assert len(table["GlobalSecondaryIndexes"]) == 1 | |
| assert table["GlobalSecondaryIndexes"][0]["IndexName"] == "gsi1" | |
| assert len(table["LocalSecondaryIndexes"]) == 1 | |
| assert table["LocalSecondaryIndexes"][0]["IndexName"] == "lsi1" | |
| def test_dynamodb_list_tables(ddb): | |
| for i in range(3): | |
| try: | |
| ddb.create_table( | |
| TableName=f"t_list_{i}", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| except ClientError: | |
| pass | |
| resp = ddb.list_tables(Limit=2) | |
| assert len(resp["TableNames"]) <= 2 | |
| if "LastEvaluatedTableName" in resp: | |
| resp2 = ddb.list_tables(ExclusiveStartTableName=resp["LastEvaluatedTableName"], Limit=100) | |
| assert len(resp2["TableNames"]) >= 1 | |
| def test_dynamodb_put_get_item(ddb): | |
| ddb.put_item( | |
| TableName="t_hash_only", | |
| Item={ | |
| "pk": {"S": "allTypes"}, | |
| "str_attr": {"S": "hello"}, | |
| "num_attr": {"N": "42"}, | |
| "bool_attr": {"BOOL": True}, | |
| "null_attr": {"NULL": True}, | |
| "list_attr": {"L": [{"S": "a"}, {"N": "1"}]}, | |
| "map_attr": {"M": {"nested": {"S": "value"}}}, | |
| "ss_attr": {"SS": ["x", "y"]}, | |
| "ns_attr": {"NS": ["1", "2", "3"]}, | |
| }, | |
| ) | |
| resp = ddb.get_item(TableName="t_hash_only", Key={"pk": {"S": "allTypes"}}) | |
| item = resp["Item"] | |
| assert item["str_attr"]["S"] == "hello" | |
| assert item["num_attr"]["N"] == "42" | |
| assert item["bool_attr"]["BOOL"] is True | |
| assert item["null_attr"]["NULL"] is True | |
| assert len(item["list_attr"]["L"]) == 2 | |
| assert item["map_attr"]["M"]["nested"]["S"] == "value" | |
| assert set(item["ss_attr"]["SS"]) == {"x", "y"} | |
| assert set(item["ns_attr"]["NS"]) == {"1", "2", "3"} | |
| def test_dynamodb_put_item_condition(ddb): | |
| ddb.put_item( | |
| TableName="t_hash_only", | |
| Item={"pk": {"S": "cond_new"}, "val": {"S": "first"}}, | |
| ConditionExpression="attribute_not_exists(pk)", | |
| ) | |
| resp = ddb.get_item(TableName="t_hash_only", Key={"pk": {"S": "cond_new"}}) | |
| assert resp["Item"]["val"]["S"] == "first" | |
| def test_dynamodb_put_item_condition_fail(ddb): | |
| ddb.put_item(TableName="t_hash_only", Item={"pk": {"S": "cond_fail"}, "val": {"S": "v1"}}) | |
| with pytest.raises(ClientError) as exc: | |
| ddb.put_item( | |
| TableName="t_hash_only", | |
| Item={"pk": {"S": "cond_fail"}, "val": {"S": "v2"}}, | |
| ConditionExpression="attribute_not_exists(pk)", | |
| ) | |
| assert exc.value.response["Error"]["Code"] == "ConditionalCheckFailedException" | |
| def test_dynamodb_delete_item(ddb): | |
| ddb.put_item(TableName="t_hash_only", Item={"pk": {"S": "to_del"}, "v": {"S": "gone"}}) | |
| ddb.delete_item(TableName="t_hash_only", Key={"pk": {"S": "to_del"}}) | |
| resp = ddb.get_item(TableName="t_hash_only", Key={"pk": {"S": "to_del"}}) | |
| assert "Item" not in resp | |
| def test_dynamodb_delete_item_return_old(ddb): | |
| ddb.put_item( | |
| TableName="t_hash_only", | |
| Item={"pk": {"S": "ret_old"}, "data": {"S": "precious"}}, | |
| ) | |
| resp = ddb.delete_item( | |
| TableName="t_hash_only", | |
| Key={"pk": {"S": "ret_old"}}, | |
| ReturnValues="ALL_OLD", | |
| ) | |
| assert resp["Attributes"]["data"]["S"] == "precious" | |
| def test_dynamodb_update_item_set(ddb): | |
| ddb.put_item(TableName="t_hash_only", Item={"pk": {"S": "upd_set"}, "count": {"N": "0"}}) | |
| resp = ddb.update_item( | |
| TableName="t_hash_only", | |
| Key={"pk": {"S": "upd_set"}}, | |
| UpdateExpression="SET #c = :val", | |
| ExpressionAttributeNames={"#c": "count"}, | |
| ExpressionAttributeValues={":val": {"N": "10"}}, | |
| ReturnValues="ALL_NEW", | |
| ) | |
| assert resp["Attributes"]["count"]["N"] == "10" | |
| def test_dynamodb_update_item_remove(ddb): | |
| ddb.put_item( | |
| TableName="t_hash_only", | |
| Item={"pk": {"S": "upd_rem"}, "extra": {"S": "bye"}, "keep": {"S": "stay"}}, | |
| ) | |
| resp = ddb.update_item( | |
| TableName="t_hash_only", | |
| Key={"pk": {"S": "upd_rem"}}, | |
| UpdateExpression="REMOVE extra", | |
| ReturnValues="ALL_NEW", | |
| ) | |
| assert "extra" not in resp["Attributes"] | |
| assert resp["Attributes"]["keep"]["S"] == "stay" | |
| def test_dynamodb_update_item_condition_on_missing_item_fails(ddb): | |
| """Missing item + attribute_exists(...) condition must fail with ConditionalCheckFailedException.""" | |
| try: | |
| ddb.delete_table(TableName="t_update_cond_missing") | |
| except Exception: | |
| pass | |
| ddb.create_table( | |
| TableName="t_update_cond_missing", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| missing_key = {"pk": {"S": "missing-update-item"}} | |
| with pytest.raises(ClientError) as exc: | |
| ddb.update_item( | |
| TableName="t_update_cond_missing", | |
| Key=missing_key, | |
| UpdateExpression="SET v = :v", | |
| ExpressionAttributeValues={":v": {"S": "x"}}, | |
| ConditionExpression="attribute_exists(pk)", | |
| ReturnValues="ALL_NEW", | |
| ) | |
| assert exc.value.response["Error"]["Code"] == "ConditionalCheckFailedException" | |
| def test_dynamodb_get_item_missing_sort_key_fails_validation(ddb): | |
| try: | |
| ddb.delete_table(TableName="t_get_missing_sk") | |
| except Exception: | |
| pass | |
| ddb.create_table( | |
| TableName="t_get_missing_sk", | |
| KeySchema=[ | |
| {"AttributeName": "pk", "KeyType": "HASH"}, | |
| {"AttributeName": "sk", "KeyType": "RANGE"}, | |
| ], | |
| AttributeDefinitions=[ | |
| {"AttributeName": "pk", "AttributeType": "S"}, | |
| {"AttributeName": "sk", "AttributeType": "S"}, | |
| ], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| with pytest.raises(ClientError) as exc: | |
| ddb.get_item(TableName="t_get_missing_sk", Key={"pk": {"S": "q_pk"}}) | |
| assert exc.value.response["Error"]["Code"] == "ValidationException" | |
| assert exc.value.response["Error"]["Message"] == "The provided key element does not match the schema" | |
| def test_dynamodb_get_item_wrong_key_type_fails_validation(ddb): | |
| try: | |
| ddb.delete_table(TableName="t_get_wrong_type") | |
| except Exception: | |
| pass | |
| ddb.create_table( | |
| TableName="t_get_wrong_type", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.put_item(TableName="t_get_wrong_type", Item={"pk": {"S": "typed-key"}}) | |
| with pytest.raises(ClientError) as exc: | |
| ddb.get_item(TableName="t_get_wrong_type", Key={"pk": {"N": "123"}}) | |
| assert exc.value.response["Error"]["Code"] == "ValidationException" | |
| assert exc.value.response["Error"]["Message"] == "The provided key element does not match the schema" | |
| def test_dynamodb_update_item_extra_key_attribute_fails_validation(ddb): | |
| try: | |
| ddb.delete_table(TableName="t_update_extra_key") | |
| except Exception: | |
| pass | |
| ddb.create_table( | |
| TableName="t_update_extra_key", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| with pytest.raises(ClientError) as exc: | |
| ddb.update_item( | |
| TableName="t_update_extra_key", | |
| Key={"pk": {"S": "k1"}, "sk": {"S": "unexpected"}}, | |
| UpdateExpression="SET v = :v", | |
| ExpressionAttributeValues={":v": {"S": "x"}}, | |
| ) | |
| assert exc.value.response["Error"]["Code"] == "ValidationException" | |
| assert exc.value.response["Error"]["Message"] == "The provided key element does not match the schema" | |
| def test_dynamodb_update_item_add(ddb): | |
| ddb.put_item(TableName="t_hash_only", Item={"pk": {"S": "upd_add"}, "counter": {"N": "5"}}) | |
| resp = ddb.update_item( | |
| TableName="t_hash_only", | |
| Key={"pk": {"S": "upd_add"}}, | |
| UpdateExpression="ADD counter :inc", | |
| ExpressionAttributeValues={":inc": {"N": "3"}}, | |
| ReturnValues="ALL_NEW", | |
| ) | |
| assert resp["Attributes"]["counter"]["N"] == "8" | |
| def test_dynamodb_update_item_all_old(ddb): | |
| ddb.put_item(TableName="t_hash_only", Item={"pk": {"S": "upd_old"}, "v": {"N": "1"}}) | |
| resp = ddb.update_item( | |
| TableName="t_hash_only", | |
| Key={"pk": {"S": "upd_old"}}, | |
| UpdateExpression="SET v = :new", | |
| ExpressionAttributeValues={":new": {"N": "99"}}, | |
| ReturnValues="ALL_OLD", | |
| ) | |
| assert resp["Attributes"]["v"]["N"] == "1" | |
| def test_dynamodb_query_pk_only(ddb): | |
| for i in range(3): | |
| ddb.put_item( | |
| TableName="t_composite", | |
| Item={"pk": {"S": "q_pk"}, "sk": {"S": f"sk_{i}"}, "n": {"N": str(i)}}, | |
| ) | |
| resp = ddb.query( | |
| TableName="t_composite", | |
| KeyConditionExpression="pk = :pk", | |
| ExpressionAttributeValues={":pk": {"S": "q_pk"}}, | |
| ) | |
| assert resp["Count"] == 3 | |
| def test_dynamodb_query_pk_sk(ddb): | |
| for i in range(5): | |
| ddb.put_item( | |
| TableName="t_composite", | |
| Item={"pk": {"S": "q_sk"}, "sk": {"S": f"item_{i:03d}"}}, | |
| ) | |
| resp_bw = ddb.query( | |
| TableName="t_composite", | |
| KeyConditionExpression="pk = :pk AND begins_with(sk, :prefix)", | |
| ExpressionAttributeValues={ | |
| ":pk": {"S": "q_sk"}, | |
| ":prefix": {"S": "item_00"}, | |
| }, | |
| ) | |
| assert resp_bw["Count"] >= 1 | |
| for item in resp_bw["Items"]: | |
| assert item["sk"]["S"].startswith("item_00") | |
| resp_bt = ddb.query( | |
| TableName="t_composite", | |
| KeyConditionExpression="pk = :pk AND sk BETWEEN :lo AND :hi", | |
| ExpressionAttributeValues={ | |
| ":pk": {"S": "q_sk"}, | |
| ":lo": {"S": "item_001"}, | |
| ":hi": {"S": "item_003"}, | |
| }, | |
| ) | |
| assert resp_bt["Count"] >= 1 | |
| for item in resp_bt["Items"]: | |
| assert "item_001" <= item["sk"]["S"] <= "item_003" | |
| def test_dynamodb_query_filter(ddb): | |
| for i in range(5): | |
| ddb.put_item( | |
| TableName="t_composite", | |
| Item={"pk": {"S": "q_filt"}, "sk": {"S": f"f_{i}"}, "val": {"N": str(i)}}, | |
| ) | |
| resp = ddb.query( | |
| TableName="t_composite", | |
| KeyConditionExpression="pk = :pk", | |
| FilterExpression="val > :min", | |
| ExpressionAttributeValues={":pk": {"S": "q_filt"}, ":min": {"N": "2"}}, | |
| ) | |
| assert resp["Count"] == 2 | |
| assert resp["ScannedCount"] == 5 | |
| def test_dynamodb_query_pagination(ddb): | |
| for i in range(6): | |
| ddb.put_item( | |
| TableName="t_composite", | |
| Item={"pk": {"S": "q_page"}, "sk": {"S": f"p_{i:03d}"}}, | |
| ) | |
| resp1 = ddb.query( | |
| TableName="t_composite", | |
| KeyConditionExpression="pk = :pk", | |
| ExpressionAttributeValues={":pk": {"S": "q_page"}}, | |
| Limit=3, | |
| ) | |
| assert resp1["Count"] == 3 | |
| assert "LastEvaluatedKey" in resp1 | |
| resp2 = ddb.query( | |
| TableName="t_composite", | |
| KeyConditionExpression="pk = :pk", | |
| ExpressionAttributeValues={":pk": {"S": "q_page"}}, | |
| ExclusiveStartKey=resp1["LastEvaluatedKey"], | |
| Limit=3, | |
| ) | |
| assert resp2["Count"] == 3 | |
| page1_sks = {it["sk"]["S"] for it in resp1["Items"]} | |
| page2_sks = {it["sk"]["S"] for it in resp2["Items"]} | |
| assert page1_sks.isdisjoint(page2_sks) | |
| def test_dynamodb_scan_from_ddb(ddb): | |
| ddb.create_table( | |
| TableName="t_scan", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| for i in range(8): | |
| ddb.put_item(TableName="t_scan", Item={"pk": {"S": f"sc_{i}"}, "n": {"N": str(i)}}) | |
| resp = ddb.scan(TableName="t_scan") | |
| assert resp["Count"] == 8 | |
| assert len(resp["Items"]) == 8 | |
| def test_dynamodb_scan_filter(ddb): | |
| resp = ddb.scan( | |
| TableName="t_scan", | |
| FilterExpression="n >= :min", | |
| ExpressionAttributeValues={":min": {"N": "5"}}, | |
| ) | |
| assert resp["Count"] == 3 | |
| for item in resp["Items"]: | |
| assert int(item["n"]["N"]) >= 5 | |
| def test_dynamodb_batch_write(ddb): | |
| ddb.create_table( | |
| TableName="t_bw", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.batch_write_item( | |
| RequestItems={ | |
| "t_bw": [{"PutRequest": {"Item": {"pk": {"S": f"bw_{i}"}, "data": {"S": f"d{i}"}}}} for i in range(10)] | |
| } | |
| ) | |
| resp = ddb.scan(TableName="t_bw") | |
| assert resp["Count"] == 10 | |
| def test_dynamodb_batch_get(ddb): | |
| resp = ddb.batch_get_item( | |
| RequestItems={ | |
| "t_bw": { | |
| "Keys": [{"pk": {"S": f"bw_{i}"}} for i in range(5)], | |
| } | |
| } | |
| ) | |
| assert len(resp["Responses"]["t_bw"]) == 5 | |
| def test_dynamodb_transact_write(ddb): | |
| ddb.create_table( | |
| TableName="t_tx", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.transact_write_items( | |
| TransactItems=[ | |
| { | |
| "Put": { | |
| "TableName": "t_tx", | |
| "Item": {"pk": {"S": "tx1"}, "v": {"S": "a"}}, | |
| } | |
| }, | |
| { | |
| "Put": { | |
| "TableName": "t_tx", | |
| "Item": {"pk": {"S": "tx2"}, "v": {"S": "b"}}, | |
| } | |
| }, | |
| { | |
| "Put": { | |
| "TableName": "t_tx", | |
| "Item": {"pk": {"S": "tx3"}, "v": {"S": "c"}}, | |
| } | |
| }, | |
| ] | |
| ) | |
| resp = ddb.scan(TableName="t_tx") | |
| assert resp["Count"] == 3 | |
| ddb.transact_write_items( | |
| TransactItems=[ | |
| {"Delete": {"TableName": "t_tx", "Key": {"pk": {"S": "tx3"}}}}, | |
| { | |
| "Update": { | |
| "TableName": "t_tx", | |
| "Key": {"pk": {"S": "tx1"}}, | |
| "UpdateExpression": "SET v = :new", | |
| "ExpressionAttributeValues": {":new": {"S": "updated"}}, | |
| }, | |
| }, | |
| ] | |
| ) | |
| item = ddb.get_item(TableName="t_tx", Key={"pk": {"S": "tx1"}})["Item"] | |
| assert item["v"]["S"] == "updated" | |
| gone = ddb.get_item(TableName="t_tx", Key={"pk": {"S": "tx3"}}) | |
| assert "Item" not in gone | |
| def test_dynamodb_transact_get(ddb): | |
| resp = ddb.transact_get_items( | |
| TransactItems=[ | |
| {"Get": {"TableName": "t_tx", "Key": {"pk": {"S": "tx1"}}}}, | |
| {"Get": {"TableName": "t_tx", "Key": {"pk": {"S": "tx2"}}}}, | |
| ] | |
| ) | |
| assert len(resp["Responses"]) == 2 | |
| assert resp["Responses"][0]["Item"]["pk"]["S"] == "tx1" | |
| assert resp["Responses"][1]["Item"]["pk"]["S"] == "tx2" | |
| def test_dynamodb_gsi_query(ddb): | |
| ddb.create_table( | |
| TableName="t_gsi_q", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[ | |
| {"AttributeName": "pk", "AttributeType": "S"}, | |
| {"AttributeName": "gsi_pk", "AttributeType": "S"}, | |
| ], | |
| GlobalSecondaryIndexes=[ | |
| { | |
| "IndexName": "gsi_index", | |
| "KeySchema": [{"AttributeName": "gsi_pk", "KeyType": "HASH"}], | |
| "Projection": {"ProjectionType": "ALL"}, | |
| } | |
| ], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| for i in range(4): | |
| ddb.put_item( | |
| TableName="t_gsi_q", | |
| Item={ | |
| "pk": {"S": f"main_{i}"}, | |
| "gsi_pk": {"S": "shared_gsi"}, | |
| "data": {"N": str(i)}, | |
| }, | |
| ) | |
| ddb.put_item( | |
| TableName="t_gsi_q", | |
| Item={ | |
| "pk": {"S": "main_other"}, | |
| "gsi_pk": {"S": "other_gsi"}, | |
| "data": {"N": "99"}, | |
| }, | |
| ) | |
| resp = ddb.query( | |
| TableName="t_gsi_q", | |
| IndexName="gsi_index", | |
| KeyConditionExpression="gsi_pk = :gpk", | |
| ExpressionAttributeValues={":gpk": {"S": "shared_gsi"}}, | |
| ) | |
| assert resp["Count"] == 4 | |
| for item in resp["Items"]: | |
| assert item["gsi_pk"]["S"] == "shared_gsi" | |
| def test_dynamodb_ttl(ddb): | |
| import uuid as _uuid | |
| table = f"intg-ttl-{_uuid.uuid4().hex[:8]}" | |
| ddb.create_table( | |
| TableName=table, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| # Initially disabled | |
| resp = ddb.describe_time_to_live(TableName=table) | |
| assert resp["TimeToLiveDescription"]["TimeToLiveStatus"] == "DISABLED" | |
| # Enable TTL | |
| ddb.update_time_to_live( | |
| TableName=table, | |
| TimeToLiveSpecification={"Enabled": True, "AttributeName": "expires_at"}, | |
| ) | |
| resp = ddb.describe_time_to_live(TableName=table) | |
| assert resp["TimeToLiveDescription"]["TimeToLiveStatus"] == "ENABLED" | |
| assert resp["TimeToLiveDescription"]["AttributeName"] == "expires_at" | |
| # Disable TTL | |
| ddb.update_time_to_live( | |
| TableName=table, | |
| TimeToLiveSpecification={"Enabled": False, "AttributeName": "expires_at"}, | |
| ) | |
| resp = ddb.describe_time_to_live(TableName=table) | |
| assert resp["TimeToLiveDescription"]["TimeToLiveStatus"] == "DISABLED" | |
| ddb.delete_table(TableName=table) | |
| def test_dynamodb_update_table(ddb): | |
| import uuid as _uuid | |
| table = f"intg-updtbl-{_uuid.uuid4().hex[:8]}" | |
| ddb.create_table( | |
| TableName=table, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| resp = ddb.update_table( | |
| TableName=table, | |
| BillingMode="PROVISIONED", | |
| ProvisionedThroughput={"ReadCapacityUnits": 5, "WriteCapacityUnits": 5}, | |
| ) | |
| assert resp["TableDescription"]["TableName"] == table | |
| ddb.delete_table(TableName=table) | |
| def test_dynamodb_ttl_expiry(ddb): | |
| """TTL setting is stored and reported correctly; expiry enforcement is in the background reaper.""" | |
| import uuid as _uuid_mod | |
| table = f"intg-ttl-exp-{_uuid_mod.uuid4().hex[:8]}" | |
| ddb.create_table( | |
| TableName=table, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.update_time_to_live( | |
| TableName=table, | |
| TimeToLiveSpecification={"Enabled": True, "AttributeName": "expires_at"}, | |
| ) | |
| past = int(time.time()) - 10 | |
| ddb.put_item( | |
| TableName=table, | |
| Item={ | |
| "pk": {"S": "expired-item"}, | |
| "expires_at": {"N": str(past)}, | |
| "data": {"S": "should-be-gone"}, | |
| }, | |
| ) | |
| # Item present immediately (reaper hasn't run yet) | |
| resp = ddb.get_item(TableName=table, Key={"pk": {"S": "expired-item"}}) | |
| assert "Item" in resp | |
| # TTL setting is correctly reflected in DescribeTimeToLive | |
| desc = ddb.describe_time_to_live(TableName=table)["TimeToLiveDescription"] | |
| assert desc["TimeToLiveStatus"] == "ENABLED" | |
| assert desc["AttributeName"] == "expires_at" | |
| def test_dynamodb_query_pagination_hash_only(ddb): | |
| """Pagination on a hash-only table (no sort key) must return results after the ESK.""" | |
| table = "t_hash_paginate" | |
| ddb.create_table( | |
| TableName=table, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| for i in range(5): | |
| ddb.put_item(TableName=table, Item={"pk": {"S": f"item_{i:03d}"}, "v": {"N": str(i)}}) | |
| resp1 = ddb.scan(TableName=table, Limit=3) | |
| assert resp1["Count"] == 3 | |
| assert "LastEvaluatedKey" in resp1 | |
| resp2 = ddb.scan(TableName=table, Limit=3, ExclusiveStartKey=resp1["LastEvaluatedKey"]) | |
| assert resp2["Count"] == 2 | |
| all_pks = {it["pk"]["S"] for it in resp1["Items"]} | {it["pk"]["S"] for it in resp2["Items"]} | |
| assert len(all_pks) == 5 | |
| def test_dynamodb_update_item_updated_new(ddb): | |
| """UpdateItem ReturnValues=UPDATED_NEW returns only changed attributes.""" | |
| ddb.create_table( | |
| TableName="qa-ddb-updated-new", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.put_item( | |
| TableName="qa-ddb-updated-new", | |
| Item={"pk": {"S": "k1"}, "a": {"S": "old"}, "b": {"N": "1"}}, | |
| ) | |
| resp = ddb.update_item( | |
| TableName="qa-ddb-updated-new", | |
| Key={"pk": {"S": "k1"}}, | |
| UpdateExpression="SET a = :new", | |
| ExpressionAttributeValues={":new": {"S": "new"}}, | |
| ReturnValues="UPDATED_NEW", | |
| ) | |
| assert "Attributes" in resp | |
| assert resp["Attributes"]["a"]["S"] == "new" | |
| assert "b" not in resp["Attributes"] | |
| def test_dynamodb_update_item_updated_old(ddb): | |
| """UpdateItem ReturnValues=UPDATED_OLD returns old values of changed attributes.""" | |
| ddb.create_table( | |
| TableName="qa-ddb-updated-old", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.put_item(TableName="qa-ddb-updated-old", Item={"pk": {"S": "k1"}, "score": {"N": "10"}}) | |
| resp = ddb.update_item( | |
| TableName="qa-ddb-updated-old", | |
| Key={"pk": {"S": "k1"}}, | |
| UpdateExpression="SET score = :new", | |
| ExpressionAttributeValues={":new": {"N": "20"}}, | |
| ReturnValues="UPDATED_OLD", | |
| ) | |
| assert resp["Attributes"]["score"]["N"] == "10" | |
| def test_dynamodb_conditional_put_fails(ddb): | |
| """PutItem with attribute_not_exists condition fails if item already exists.""" | |
| ddb.create_table( | |
| TableName="qa-ddb-cond-put", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.put_item(TableName="qa-ddb-cond-put", Item={"pk": {"S": "exists"}}) | |
| with pytest.raises(ClientError) as exc: | |
| ddb.put_item( | |
| TableName="qa-ddb-cond-put", | |
| Item={"pk": {"S": "exists"}, "data": {"S": "new"}}, | |
| ConditionExpression="attribute_not_exists(pk)", | |
| ) | |
| assert exc.value.response["Error"]["Code"] == "ConditionalCheckFailedException" | |
| def test_dynamodb_query_with_filter_expression(ddb): | |
| """Query with FilterExpression reduces Count but not ScannedCount.""" | |
| ddb.create_table( | |
| TableName="qa-ddb-filter", | |
| KeySchema=[ | |
| {"AttributeName": "pk", "KeyType": "HASH"}, | |
| {"AttributeName": "sk", "KeyType": "RANGE"}, | |
| ], | |
| AttributeDefinitions=[ | |
| {"AttributeName": "pk", "AttributeType": "S"}, | |
| {"AttributeName": "sk", "AttributeType": "N"}, | |
| ], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| for i in range(5): | |
| ddb.put_item( | |
| TableName="qa-ddb-filter", | |
| Item={ | |
| "pk": {"S": "user1"}, | |
| "sk": {"N": str(i)}, | |
| "active": {"BOOL": i % 2 == 0}, | |
| }, | |
| ) | |
| resp = ddb.query( | |
| TableName="qa-ddb-filter", | |
| KeyConditionExpression="pk = :pk", | |
| FilterExpression="active = :t", | |
| ExpressionAttributeValues={":pk": {"S": "user1"}, ":t": {"BOOL": True}}, | |
| ) | |
| assert resp["Count"] == 3 | |
| assert resp["ScannedCount"] == 5 | |
| def test_dynamodb_scan_with_limit_and_pagination(ddb): | |
| """Scan with Limit returns LastEvaluatedKey and pagination works.""" | |
| ddb.create_table( | |
| TableName="qa-ddb-scan-page", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| for i in range(10): | |
| ddb.put_item(TableName="qa-ddb-scan-page", Item={"pk": {"S": f"item{i:02d}"}}) | |
| all_items = [] | |
| lek = None | |
| while True: | |
| kwargs = {"TableName": "qa-ddb-scan-page", "Limit": 3} | |
| if lek: | |
| kwargs["ExclusiveStartKey"] = lek | |
| resp = ddb.scan(**kwargs) | |
| all_items.extend(resp["Items"]) | |
| lek = resp.get("LastEvaluatedKey") | |
| if not lek: | |
| break | |
| assert len(all_items) == 10 | |
| def test_dynamodb_transact_write_condition_cancel(ddb): | |
| """TransactWriteItems cancels entire transaction if one condition fails.""" | |
| ddb.create_table( | |
| TableName="qa-ddb-transact", | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.put_item(TableName="qa-ddb-transact", Item={"pk": {"S": "existing"}}) | |
| with pytest.raises(ClientError) as exc: | |
| ddb.transact_write_items( | |
| TransactItems=[ | |
| { | |
| "Put": { | |
| "TableName": "qa-ddb-transact", | |
| "Item": {"pk": {"S": "new-item"}}, | |
| } | |
| }, | |
| { | |
| "Put": { | |
| "TableName": "qa-ddb-transact", | |
| "Item": {"pk": {"S": "existing"}, "data": {"S": "x"}}, | |
| "ConditionExpression": "attribute_not_exists(pk)", | |
| } | |
| }, | |
| ] | |
| ) | |
| assert exc.value.response["Error"]["Code"] == "TransactionCanceledException" | |
| resp = ddb.get_item(TableName="qa-ddb-transact", Key={"pk": {"S": "new-item"}}) | |
| assert "Item" not in resp | |
| def test_dynamodb_batch_get_missing_table(ddb): | |
| """BatchGetItem with non-existent table returns it in UnprocessedKeys.""" | |
| resp = ddb.batch_get_item(RequestItems={"qa-ddb-nonexistent-xyz": {"Keys": [{"pk": {"S": "k1"}}]}}) | |
| assert "qa-ddb-nonexistent-xyz" in resp["UnprocessedKeys"] | |
| def test_dynamodb_scan_filter_legacy(ddb): | |
| """Scan with legacy ScanFilter (ComparisonOperator style) returns matching items.""" | |
| table = "intg-ddb-scanfilter" | |
| ddb.create_table( | |
| TableName=table, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| for i in range(5): | |
| ddb.put_item(TableName=table, Item={ | |
| "pk": {"S": f"sf_{i}"}, | |
| "color": {"S": "red" if i % 2 == 0 else "blue"}, | |
| }) | |
| resp = ddb.scan( | |
| TableName=table, | |
| ScanFilter={ | |
| "color": { | |
| "AttributeValueList": [{"S": "red"}], | |
| "ComparisonOperator": "EQ", | |
| } | |
| }, | |
| ) | |
| assert resp["Count"] == 3 | |
| for item in resp["Items"]: | |
| assert item["color"]["S"] == "red" | |
| def test_dynamodb_query_filter_legacy(ddb): | |
| """Query with legacy QueryFilter (ComparisonOperator style) returns matching items.""" | |
| table = "intg-ddb-queryfilter" | |
| ddb.create_table( | |
| TableName=table, | |
| KeySchema=[ | |
| {"AttributeName": "pk", "KeyType": "HASH"}, | |
| {"AttributeName": "sk", "KeyType": "RANGE"}, | |
| ], | |
| AttributeDefinitions=[ | |
| {"AttributeName": "pk", "AttributeType": "S"}, | |
| {"AttributeName": "sk", "AttributeType": "S"}, | |
| ], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| for i in range(5): | |
| ddb.put_item(TableName=table, Item={ | |
| "pk": {"S": "qf_pk"}, | |
| "sk": {"S": f"sk_{i}"}, | |
| "status": {"S": "active" if i < 3 else "inactive"}, | |
| }) | |
| resp = ddb.query( | |
| TableName=table, | |
| KeyConditionExpression="pk = :pk", | |
| ExpressionAttributeValues={":pk": {"S": "qf_pk"}}, | |
| QueryFilter={ | |
| "status": { | |
| "AttributeValueList": [{"S": "active"}], | |
| "ComparisonOperator": "EQ", | |
| } | |
| }, | |
| ) | |
| assert resp["Count"] == 3 | |
| assert resp["ScannedCount"] == 5 | |
| for item in resp["Items"]: | |
| assert item["status"]["S"] == "active" | |
| # --------------------------------------------------------------------------- | |
| # Terraform compatibility tests | |
| # --------------------------------------------------------------------------- | |
| def test_dynamodb_pay_per_request_provisioned_throughput(ddb): | |
| """PAY_PER_REQUEST tables must return ProvisionedThroughput with zero values.""" | |
| tname = "tf-compat-ondemand" | |
| try: | |
| ddb.delete_table(TableName=tname) | |
| except ClientError: | |
| pass | |
| ddb.create_table( | |
| TableName=tname, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| try: | |
| desc = ddb.describe_table(TableName=tname)["Table"] | |
| pt = desc["ProvisionedThroughput"] | |
| assert pt["ReadCapacityUnits"] == 0, \ | |
| f"Expected ReadCapacityUnits=0 for PAY_PER_REQUEST, got {pt['ReadCapacityUnits']}" | |
| assert pt["WriteCapacityUnits"] == 0, \ | |
| f"Expected WriteCapacityUnits=0 for PAY_PER_REQUEST, got {pt['WriteCapacityUnits']}" | |
| finally: | |
| ddb.delete_table(TableName=tname) | |
| def test_dynamodb_provisioned_keeps_capacity(ddb): | |
| """PROVISIONED tables must keep their configured throughput values.""" | |
| tname = "tf-compat-provisioned" | |
| try: | |
| ddb.delete_table(TableName=tname) | |
| except ClientError: | |
| pass | |
| ddb.create_table( | |
| TableName=tname, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PROVISIONED", | |
| ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 5}, | |
| ) | |
| try: | |
| desc = ddb.describe_table(TableName=tname)["Table"] | |
| pt = desc["ProvisionedThroughput"] | |
| assert pt["ReadCapacityUnits"] == 10 | |
| assert pt["WriteCapacityUnits"] == 5 | |
| finally: | |
| ddb.delete_table(TableName=tname) | |
| def test_dynamodb_pay_per_request_gsi_zero_throughput(ddb): | |
| """GSIs on PAY_PER_REQUEST tables must have zero ProvisionedThroughput.""" | |
| tname = "tf-compat-ondemand-gsi" | |
| try: | |
| ddb.delete_table(TableName=tname) | |
| except ClientError: | |
| pass | |
| ddb.create_table( | |
| TableName=tname, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[ | |
| {"AttributeName": "pk", "AttributeType": "S"}, | |
| {"AttributeName": "gsi_key", "AttributeType": "S"}, | |
| ], | |
| BillingMode="PAY_PER_REQUEST", | |
| GlobalSecondaryIndexes=[ | |
| { | |
| "IndexName": "gsi-test", | |
| "KeySchema": [{"AttributeName": "gsi_key", "KeyType": "HASH"}], | |
| "Projection": {"ProjectionType": "ALL"}, | |
| } | |
| ], | |
| ) | |
| try: | |
| desc = ddb.describe_table(TableName=tname)["Table"] | |
| gsis = desc.get("GlobalSecondaryIndexes", []) | |
| assert len(gsis) == 1, f"Expected 1 GSI, got {len(gsis)}" | |
| gsi_pt = gsis[0]["ProvisionedThroughput"] | |
| assert gsi_pt["ReadCapacityUnits"] == 0, \ | |
| f"Expected GSI ReadCapacityUnits=0 for PAY_PER_REQUEST, got {gsi_pt['ReadCapacityUnits']}" | |
| assert gsi_pt["WriteCapacityUnits"] == 0, \ | |
| f"Expected GSI WriteCapacityUnits=0 for PAY_PER_REQUEST, got {gsi_pt['WriteCapacityUnits']}" | |
| finally: | |
| ddb.delete_table(TableName=tname) | |
| def test_dynamodb_update_to_pay_per_request_zeroes_throughput(ddb): | |
| """Updating billing mode to PAY_PER_REQUEST should zero out throughput.""" | |
| tname = "tf-compat-update-billing" | |
| try: | |
| ddb.delete_table(TableName=tname) | |
| except ClientError: | |
| pass | |
| ddb.create_table( | |
| TableName=tname, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PROVISIONED", | |
| ProvisionedThroughput={"ReadCapacityUnits": 10, "WriteCapacityUnits": 5}, | |
| ) | |
| try: | |
| ddb.update_table(TableName=tname, BillingMode="PAY_PER_REQUEST") | |
| desc = ddb.describe_table(TableName=tname)["Table"] | |
| pt = desc["ProvisionedThroughput"] | |
| assert pt["ReadCapacityUnits"] == 0, \ | |
| f"Expected ReadCapacityUnits=0 after switching to PAY_PER_REQUEST, got {pt['ReadCapacityUnits']}" | |
| assert pt["WriteCapacityUnits"] == 0, \ | |
| f"Expected WriteCapacityUnits=0 after switching to PAY_PER_REQUEST, got {pt['WriteCapacityUnits']}" | |
| finally: | |
| ddb.delete_table(TableName=tname) | |
| # --------------------------------------------------------------------------- | |
| # ExecuteStatement (PartiQL) | |
| # --------------------------------------------------------------------------- | |
| def test_partiql_select_all(ddb): | |
| """SELECT * FROM table — the IntelliJ use case.""" | |
| tname = "partiql-select-all" | |
| try: | |
| ddb.delete_table(TableName=tname) | |
| except ClientError: | |
| pass | |
| ddb.create_table( | |
| TableName=tname, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.put_item(TableName=tname, Item={"pk": {"S": "a"}, "val": {"S": "1"}}) | |
| ddb.put_item(TableName=tname, Item={"pk": {"S": "b"}, "val": {"S": "2"}}) | |
| resp = ddb.execute_statement(Statement=f'SELECT * FROM "{tname}"') | |
| assert len(resp["Items"]) == 2 | |
| pks = sorted(it["pk"]["S"] for it in resp["Items"]) | |
| assert pks == ["a", "b"] | |
| def test_partiql_select_with_where(ddb): | |
| """SELECT with WHERE clause and ? parameter binding.""" | |
| tname = "partiql-select-where" | |
| try: | |
| ddb.delete_table(TableName=tname) | |
| except ClientError: | |
| pass | |
| ddb.create_table( | |
| TableName=tname, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.put_item(TableName=tname, Item={"pk": {"S": "x"}, "status": {"S": "active"}}) | |
| ddb.put_item(TableName=tname, Item={"pk": {"S": "y"}, "status": {"S": "inactive"}}) | |
| resp = ddb.execute_statement( | |
| Statement=f'SELECT * FROM "{tname}" WHERE pk = ?', | |
| Parameters=[{"S": "x"}], | |
| ) | |
| assert len(resp["Items"]) == 1 | |
| assert resp["Items"][0]["pk"]["S"] == "x" | |
| def test_partiql_select_projection(ddb): | |
| """SELECT specific columns.""" | |
| tname = "partiql-select-proj" | |
| try: | |
| ddb.delete_table(TableName=tname) | |
| except ClientError: | |
| pass | |
| ddb.create_table( | |
| TableName=tname, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.put_item(TableName=tname, Item={"pk": {"S": "k1"}, "a": {"S": "1"}, "b": {"S": "2"}}) | |
| resp = ddb.execute_statement(Statement=f'SELECT pk, a FROM "{tname}"') | |
| assert len(resp["Items"]) == 1 | |
| item = resp["Items"][0] | |
| assert "pk" in item | |
| assert "a" in item | |
| assert "b" not in item | |
| def test_partiql_insert(ddb): | |
| """INSERT INTO table VALUE {...}.""" | |
| tname = "partiql-insert" | |
| try: | |
| ddb.delete_table(TableName=tname) | |
| except ClientError: | |
| pass | |
| ddb.create_table( | |
| TableName=tname, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.execute_statement( | |
| Statement=f"INSERT INTO \"{tname}\" VALUE {{'pk': ?, 'data': ?}}", | |
| Parameters=[{"S": "ins1"}, {"S": "hello"}], | |
| ) | |
| resp = ddb.get_item(TableName=tname, Key={"pk": {"S": "ins1"}}) | |
| assert resp["Item"]["data"]["S"] == "hello" | |
| def test_partiql_insert_duplicate_rejected(ddb): | |
| """INSERT with duplicate key should fail.""" | |
| tname = "partiql-ins-dup" | |
| try: | |
| ddb.delete_table(TableName=tname) | |
| except ClientError: | |
| pass | |
| ddb.create_table( | |
| TableName=tname, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.put_item(TableName=tname, Item={"pk": {"S": "dup"}}) | |
| with pytest.raises(ClientError) as exc: | |
| ddb.execute_statement( | |
| Statement=f"INSERT INTO \"{tname}\" VALUE {{'pk': ?}}", | |
| Parameters=[{"S": "dup"}], | |
| ) | |
| assert "ConditionalCheckFailed" in exc.value.response["Error"]["Code"] | |
| def test_partiql_update(ddb): | |
| """UPDATE table SET attr = val WHERE pk = val.""" | |
| tname = "partiql-update" | |
| try: | |
| ddb.delete_table(TableName=tname) | |
| except ClientError: | |
| pass | |
| ddb.create_table( | |
| TableName=tname, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.put_item(TableName=tname, Item={"pk": {"S": "u1"}, "status": {"S": "old"}}) | |
| ddb.execute_statement( | |
| Statement=f"UPDATE \"{tname}\" SET status = ? WHERE pk = ?", | |
| Parameters=[{"S": "new"}, {"S": "u1"}], | |
| ) | |
| resp = ddb.get_item(TableName=tname, Key={"pk": {"S": "u1"}}) | |
| assert resp["Item"]["status"]["S"] == "new" | |
| def test_partiql_delete(ddb): | |
| """DELETE FROM table WHERE pk = val.""" | |
| tname = "partiql-delete" | |
| try: | |
| ddb.delete_table(TableName=tname) | |
| except ClientError: | |
| pass | |
| ddb.create_table( | |
| TableName=tname, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.put_item(TableName=tname, Item={"pk": {"S": "d1"}, "val": {"S": "x"}}) | |
| ddb.execute_statement( | |
| Statement=f'DELETE FROM "{tname}" WHERE pk = ?', | |
| Parameters=[{"S": "d1"}], | |
| ) | |
| resp = ddb.get_item(TableName=tname, Key={"pk": {"S": "d1"}}) | |
| assert "Item" not in resp | |
| def test_partiql_nonexistent_table(ddb): | |
| """ExecuteStatement on a nonexistent table should return ResourceNotFoundException.""" | |
| with pytest.raises(ClientError) as exc: | |
| ddb.execute_statement(Statement='SELECT * FROM "no-such-table-partiql"') | |
| assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException" | |
| def test_partiql_select_where_number(ddb): | |
| """WHERE clause with numeric comparison.""" | |
| tname = "partiql-num-where" | |
| try: | |
| ddb.delete_table(TableName=tname) | |
| except ClientError: | |
| pass | |
| ddb.create_table( | |
| TableName=tname, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| ) | |
| ddb.put_item(TableName=tname, Item={"pk": {"S": "n1"}, "age": {"N": "25"}}) | |
| ddb.put_item(TableName=tname, Item={"pk": {"S": "n2"}, "age": {"N": "30"}}) | |
| resp = ddb.execute_statement( | |
| Statement=f'SELECT * FROM "{tname}" WHERE age > ?', | |
| Parameters=[{"N": "27"}], | |
| ) | |
| assert len(resp["Items"]) == 1 | |
| assert resp["Items"][0]["pk"]["S"] == "n2" | |
| def test_dynamodb_stream_arn_stable(ddb): | |
| """LatestStreamArn should be stable across DescribeTable calls.""" | |
| tname = f"stream-stable-{_uuid_mod.uuid4().hex[:8]}" | |
| ddb.create_table( | |
| TableName=tname, | |
| KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}], | |
| AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}], | |
| BillingMode="PAY_PER_REQUEST", | |
| StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_AND_OLD_IMAGES"}, | |
| ) | |
| desc1 = ddb.describe_table(TableName=tname)["Table"] | |
| desc2 = ddb.describe_table(TableName=tname)["Table"] | |
| assert desc1["LatestStreamArn"] == desc2["LatestStreamArn"] | |
| assert desc1["LatestStreamLabel"] == desc2["LatestStreamLabel"] | |
| ddb.delete_table(TableName=tname) | |