Spaces:
Running
Running
| import io | |
| import json | |
| import os | |
| import time | |
| import zipfile | |
| from urllib.parse import urlparse | |
| import pytest | |
| from botocore.exceptions import ClientError | |
| import uuid as _uuid_mod | |
| def test_kms_create_symmetric_key(kms_client): | |
| resp = kms_client.create_key( | |
| KeySpec="SYMMETRIC_DEFAULT", | |
| KeyUsage="ENCRYPT_DECRYPT", | |
| Description="test symmetric key", | |
| Tags=[{"TagKey": "env", "TagValue": "test"}], | |
| Policy="{}", | |
| ) | |
| meta = resp["KeyMetadata"] | |
| assert meta["KeyId"] | |
| assert meta["Arn"].startswith("arn:aws:kms:") | |
| assert meta["KeySpec"] == "SYMMETRIC_DEFAULT" | |
| assert meta["KeyUsage"] == "ENCRYPT_DECRYPT" | |
| assert meta["Enabled"] is True | |
| assert meta["KeyState"] == "Enabled" | |
| assert meta["Description"] == "test symmetric key" | |
| tags = kms_client.list_resource_tags(KeyId=meta["KeyId"])["Tags"] | |
| assert {"TagKey": "env", "TagValue": "test"} in tags | |
| policy = kms_client.get_key_policy(KeyId=meta["KeyId"], PolicyName="default")["Policy"] | |
| assert policy == "{}" | |
| def test_kms_create_rsa_2048_sign_key(kms_client): | |
| resp = kms_client.create_key( | |
| KeySpec="RSA_2048", | |
| KeyUsage="SIGN_VERIFY", | |
| Description="test RSA signing key", | |
| ) | |
| meta = resp["KeyMetadata"] | |
| assert meta["KeySpec"] == "RSA_2048" | |
| assert meta["KeyUsage"] == "SIGN_VERIFY" | |
| assert "RSASSA_PKCS1_V1_5_SHA_256" in meta["SigningAlgorithms"] | |
| def test_kms_create_rsa_4096_encrypt_key(kms_client): | |
| resp = kms_client.create_key( | |
| KeySpec="RSA_4096", | |
| KeyUsage="ENCRYPT_DECRYPT", | |
| ) | |
| meta = resp["KeyMetadata"] | |
| assert meta["KeySpec"] == "RSA_4096" | |
| assert "RSAES_OAEP_SHA_256" in meta["EncryptionAlgorithms"] | |
| def test_kms_list_keys(kms_client): | |
| created = kms_client.create_key(KeySpec="SYMMETRIC_DEFAULT") | |
| key_id = created["KeyMetadata"]["KeyId"] | |
| resp = kms_client.list_keys() | |
| key_ids = [k["KeyId"] for k in resp["Keys"]] | |
| assert key_id in key_ids | |
| def test_kms_describe_key(kms_client): | |
| created = kms_client.create_key( | |
| KeySpec="SYMMETRIC_DEFAULT", Description="describe me" | |
| ) | |
| key_id = created["KeyMetadata"]["KeyId"] | |
| resp = kms_client.describe_key(KeyId=key_id) | |
| assert resp["KeyMetadata"]["Description"] == "describe me" | |
| assert resp["KeyMetadata"]["KeyId"] == key_id | |
| def test_kms_describe_key_by_arn(kms_client): | |
| created = kms_client.create_key(KeySpec="SYMMETRIC_DEFAULT") | |
| arn = created["KeyMetadata"]["Arn"] | |
| resp = kms_client.describe_key(KeyId=arn) | |
| assert resp["KeyMetadata"]["Arn"] == arn | |
| def test_kms_describe_nonexistent_key(kms_client): | |
| with pytest.raises(ClientError) as exc_info: | |
| kms_client.describe_key(KeyId="nonexistent-key-id") | |
| assert "NotFoundException" in str(exc_info.value) | |
| def test_kms_sign_and_verify_pkcs1(kms_client): | |
| key = kms_client.create_key(KeySpec="RSA_2048", KeyUsage="SIGN_VERIFY") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| message = b"header.payload" | |
| sign_resp = kms_client.sign( | |
| KeyId=key_id, | |
| Message=message, | |
| MessageType="RAW", | |
| SigningAlgorithm="RSASSA_PKCS1_V1_5_SHA_256", | |
| ) | |
| assert key_id in sign_resp["KeyId"] # KeyId in response is the full ARN | |
| assert sign_resp["SigningAlgorithm"] == "RSASSA_PKCS1_V1_5_SHA_256" | |
| assert len(sign_resp["Signature"]) > 0 | |
| verify_resp = kms_client.verify( | |
| KeyId=key_id, | |
| Message=message, | |
| MessageType="RAW", | |
| Signature=sign_resp["Signature"], | |
| SigningAlgorithm="RSASSA_PKCS1_V1_5_SHA_256", | |
| ) | |
| assert verify_resp["SignatureValid"] is True | |
| def test_kms_sign_and_verify_pss(kms_client): | |
| key = kms_client.create_key(KeySpec="RSA_2048", KeyUsage="SIGN_VERIFY") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| message = b"test-pss-message" | |
| sign_resp = kms_client.sign( | |
| KeyId=key_id, | |
| Message=message, | |
| MessageType="RAW", | |
| SigningAlgorithm="RSASSA_PSS_SHA_256", | |
| ) | |
| verify_resp = kms_client.verify( | |
| KeyId=key_id, | |
| Message=message, | |
| MessageType="RAW", | |
| Signature=sign_resp["Signature"], | |
| SigningAlgorithm="RSASSA_PSS_SHA_256", | |
| ) | |
| assert verify_resp["SignatureValid"] is True | |
| def test_kms_verify_wrong_message(kms_client): | |
| key = kms_client.create_key(KeySpec="RSA_2048", KeyUsage="SIGN_VERIFY") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| sign_resp = kms_client.sign( | |
| KeyId=key_id, | |
| Message=b"original", | |
| MessageType="RAW", | |
| SigningAlgorithm="RSASSA_PKCS1_V1_5_SHA_256", | |
| ) | |
| # Real AWS raises KMSInvalidSignatureException on invalid signature | |
| import pytest | |
| with pytest.raises(kms_client.exceptions.KMSInvalidSignatureException): | |
| kms_client.verify( | |
| KeyId=key_id, | |
| Message=b"tampered", | |
| MessageType="RAW", | |
| Signature=sign_resp["Signature"], | |
| SigningAlgorithm="RSASSA_PKCS1_V1_5_SHA_256", | |
| ) | |
| def test_kms_jwt_signing_flow(kms_client): | |
| """Sign a JWT-style header.payload string and verify the signature.""" | |
| import base64 | |
| key = kms_client.create_key(KeySpec="RSA_2048", KeyUsage="SIGN_VERIFY") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| header = base64.urlsafe_b64encode( | |
| b'{"alg":"RS256","typ":"JWT"}' | |
| ).rstrip(b"=").decode() | |
| payload = base64.urlsafe_b64encode( | |
| b'{"sub":"user-2001","iss":"auth-service"}' | |
| ).rstrip(b"=").decode() | |
| signing_input = f"{header}.{payload}" | |
| sign_resp = kms_client.sign( | |
| KeyId=key_id, | |
| Message=signing_input.encode(), | |
| MessageType="RAW", | |
| SigningAlgorithm="RSASSA_PKCS1_V1_5_SHA_256", | |
| ) | |
| assert sign_resp["Signature"] | |
| verify_resp = kms_client.verify( | |
| KeyId=key_id, | |
| Message=signing_input.encode(), | |
| MessageType="RAW", | |
| Signature=sign_resp["Signature"], | |
| SigningAlgorithm="RSASSA_PKCS1_V1_5_SHA_256", | |
| ) | |
| assert verify_resp["SignatureValid"] is True | |
| def test_kms_encrypt_decrypt_roundtrip(kms_client): | |
| key = kms_client.create_key( | |
| KeySpec="SYMMETRIC_DEFAULT", KeyUsage="ENCRYPT_DECRYPT" | |
| ) | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| plaintext = b"sensitive document content" | |
| enc_resp = kms_client.encrypt(KeyId=key_id, Plaintext=plaintext) | |
| assert key_id in enc_resp["KeyId"] | |
| dec_resp = kms_client.decrypt(CiphertextBlob=enc_resp["CiphertextBlob"]) | |
| assert dec_resp["Plaintext"] == plaintext | |
| def test_kms_encrypt_decrypt_with_explicit_key(kms_client): | |
| key = kms_client.create_key( | |
| KeySpec="SYMMETRIC_DEFAULT", KeyUsage="ENCRYPT_DECRYPT" | |
| ) | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| plaintext = b"another secret" | |
| enc_resp = kms_client.encrypt(KeyId=key_id, Plaintext=plaintext) | |
| dec_resp = kms_client.decrypt( | |
| KeyId=key_id, CiphertextBlob=enc_resp["CiphertextBlob"] | |
| ) | |
| assert dec_resp["Plaintext"] == plaintext | |
| def test_kms_generate_data_key_aes_256(kms_client): | |
| key = kms_client.create_key( | |
| KeySpec="SYMMETRIC_DEFAULT", KeyUsage="ENCRYPT_DECRYPT" | |
| ) | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| resp = kms_client.generate_data_key(KeyId=key_id, KeySpec="AES_256") | |
| assert key_id in resp["KeyId"] | |
| assert len(resp["Plaintext"]) == 32 | |
| assert resp["CiphertextBlob"] | |
| def test_kms_generate_data_key_aes_128(kms_client): | |
| key = kms_client.create_key( | |
| KeySpec="SYMMETRIC_DEFAULT", KeyUsage="ENCRYPT_DECRYPT" | |
| ) | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| resp = kms_client.generate_data_key(KeyId=key_id, KeySpec="AES_128") | |
| assert len(resp["Plaintext"]) == 16 | |
| def test_kms_generate_data_key_decrypt_roundtrip(kms_client): | |
| """Encrypted data key should be decryptable back to the plaintext.""" | |
| key = kms_client.create_key( | |
| KeySpec="SYMMETRIC_DEFAULT", KeyUsage="ENCRYPT_DECRYPT" | |
| ) | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| gen_resp = kms_client.generate_data_key(KeyId=key_id, KeySpec="AES_256") | |
| dec_resp = kms_client.decrypt(CiphertextBlob=gen_resp["CiphertextBlob"]) | |
| assert dec_resp["Plaintext"] == gen_resp["Plaintext"] | |
| def test_kms_generate_data_key_without_plaintext(kms_client): | |
| key = kms_client.create_key( | |
| KeySpec="SYMMETRIC_DEFAULT", KeyUsage="ENCRYPT_DECRYPT" | |
| ) | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| resp = kms_client.generate_data_key_without_plaintext( | |
| KeyId=key_id, KeySpec="AES_256" | |
| ) | |
| assert key_id in resp["KeyId"] | |
| assert resp["CiphertextBlob"] | |
| assert "Plaintext" not in resp | |
| def test_kms_get_public_key(kms_client): | |
| key = kms_client.create_key(KeySpec="RSA_2048", KeyUsage="SIGN_VERIFY") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| resp = kms_client.get_public_key(KeyId=key_id) | |
| assert key_id in resp["KeyId"] | |
| assert resp["KeySpec"] == "RSA_2048" | |
| assert resp["PublicKey"] | |
| def test_kms_encrypt_decrypt_with_encryption_context(kms_client): | |
| """EncryptionContext must match between encrypt and decrypt.""" | |
| key = kms_client.create_key( | |
| KeySpec="SYMMETRIC_DEFAULT", KeyUsage="ENCRYPT_DECRYPT" | |
| ) | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| plaintext = b"context-sensitive data" | |
| context = {"service": "storage", "bucket": "documents"} | |
| enc_resp = kms_client.encrypt( | |
| KeyId=key_id, Plaintext=plaintext, EncryptionContext=context | |
| ) | |
| dec_resp = kms_client.decrypt( | |
| CiphertextBlob=enc_resp["CiphertextBlob"], | |
| EncryptionContext=context, | |
| ) | |
| assert dec_resp["Plaintext"] == plaintext | |
| def test_kms_decrypt_wrong_context_fails(kms_client): | |
| """Decrypt with wrong EncryptionContext should fail.""" | |
| key = kms_client.create_key( | |
| KeySpec="SYMMETRIC_DEFAULT", KeyUsage="ENCRYPT_DECRYPT" | |
| ) | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| enc_resp = kms_client.encrypt( | |
| KeyId=key_id, | |
| Plaintext=b"secret", | |
| EncryptionContext={"env": "prod"}, | |
| ) | |
| with pytest.raises(ClientError) as exc_info: | |
| kms_client.decrypt( | |
| CiphertextBlob=enc_resp["CiphertextBlob"], | |
| EncryptionContext={"env": "dev"}, | |
| ) | |
| assert "InvalidCiphertextException" in str(exc_info.value) | |
| def test_kms_create_and_list_alias(kms_client): | |
| key = kms_client.create_key(KeySpec="SYMMETRIC_DEFAULT") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| kms_client.create_alias(AliasName="alias/test-alias", TargetKeyId=key_id) | |
| resp = kms_client.list_aliases() | |
| alias_names = [a["AliasName"] for a in resp["Aliases"]] | |
| assert "alias/test-alias" in alias_names | |
| def test_kms_use_alias_for_encrypt(kms_client): | |
| """Encrypt/Decrypt using alias instead of key ID.""" | |
| key = kms_client.create_key(KeySpec="SYMMETRIC_DEFAULT", KeyUsage="ENCRYPT_DECRYPT") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| kms_client.create_alias(AliasName="alias/enc-alias", TargetKeyId=key_id) | |
| enc = kms_client.encrypt(KeyId="alias/enc-alias", Plaintext=b"via alias") | |
| dec = kms_client.decrypt(CiphertextBlob=enc["CiphertextBlob"]) | |
| assert dec["Plaintext"] == b"via alias" | |
| def test_kms_describe_key_by_alias(kms_client): | |
| key = kms_client.create_key(KeySpec="SYMMETRIC_DEFAULT") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| kms_client.create_alias(AliasName="alias/desc-alias", TargetKeyId=key_id) | |
| resp = kms_client.describe_key(KeyId="alias/desc-alias") | |
| assert resp["KeyMetadata"]["KeyId"] == key_id | |
| def test_kms_update_alias(kms_client): | |
| key1 = kms_client.create_key(KeySpec="SYMMETRIC_DEFAULT") | |
| key2 = kms_client.create_key(KeySpec="SYMMETRIC_DEFAULT") | |
| kms_client.create_alias(AliasName="alias/upd-alias", TargetKeyId=key1["KeyMetadata"]["KeyId"]) | |
| kms_client.update_alias(AliasName="alias/upd-alias", TargetKeyId=key2["KeyMetadata"]["KeyId"]) | |
| resp = kms_client.describe_key(KeyId="alias/upd-alias") | |
| assert resp["KeyMetadata"]["KeyId"] == key2["KeyMetadata"]["KeyId"] | |
| def test_kms_delete_alias(kms_client): | |
| key = kms_client.create_key(KeySpec="SYMMETRIC_DEFAULT") | |
| kms_client.create_alias(AliasName="alias/del-alias", TargetKeyId=key["KeyMetadata"]["KeyId"]) | |
| kms_client.delete_alias(AliasName="alias/del-alias") | |
| with pytest.raises(ClientError) as exc: | |
| kms_client.describe_key(KeyId="alias/del-alias") | |
| assert "NotFoundException" in str(exc.value) | |
| def test_kms_enable_disable_key_rotation(kms_client): | |
| """EnableKeyRotation / DisableKeyRotation / GetKeyRotationStatus.""" | |
| key = kms_client.create_key(KeyUsage="ENCRYPT_DECRYPT") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| status = kms_client.get_key_rotation_status(KeyId=key_id) | |
| assert status["KeyRotationEnabled"] is False | |
| kms_client.enable_key_rotation(KeyId=key_id) | |
| status = kms_client.get_key_rotation_status(KeyId=key_id) | |
| assert status["KeyRotationEnabled"] is True | |
| kms_client.disable_key_rotation(KeyId=key_id) | |
| status = kms_client.get_key_rotation_status(KeyId=key_id) | |
| assert status["KeyRotationEnabled"] is False | |
| kms_client.schedule_key_deletion(KeyId=key_id, PendingWindowInDays=7) | |
| def test_kms_get_put_key_policy(kms_client): | |
| """GetKeyPolicy / PutKeyPolicy.""" | |
| key = kms_client.create_key() | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| policy = kms_client.get_key_policy(KeyId=key_id, PolicyName="default") | |
| assert "Statement" in policy["Policy"] | |
| custom = '{"Version":"2012-10-17","Statement":[]}' | |
| kms_client.put_key_policy(KeyId=key_id, PolicyName="default", Policy=custom) | |
| got = kms_client.get_key_policy(KeyId=key_id, PolicyName="default") | |
| assert got["Policy"] == custom | |
| kms_client.schedule_key_deletion(KeyId=key_id, PendingWindowInDays=7) | |
| def test_kms_tag_untag_list_v2(kms_client): | |
| """TagResource / UntagResource / ListResourceTags.""" | |
| key = kms_client.create_key() | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| kms_client.tag_resource(KeyId=key_id, Tags=[ | |
| {"TagKey": "env", "TagValue": "test"}, | |
| {"TagKey": "team", "TagValue": "platform"}, | |
| ]) | |
| tags = kms_client.list_resource_tags(KeyId=key_id) | |
| tag_map = {t["TagKey"]: t["TagValue"] for t in tags["Tags"]} | |
| assert tag_map["env"] == "test" | |
| assert tag_map["team"] == "platform" | |
| kms_client.untag_resource(KeyId=key_id, TagKeys=["team"]) | |
| tags = kms_client.list_resource_tags(KeyId=key_id) | |
| assert len(tags["Tags"]) == 1 | |
| assert tags["Tags"][0]["TagKey"] == "env" | |
| kms_client.schedule_key_deletion(KeyId=key_id, PendingWindowInDays=7) | |
| def test_kms_enable_disable_key(kms_client): | |
| """EnableKey / DisableKey.""" | |
| key = kms_client.create_key() | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| assert key["KeyMetadata"]["KeyState"] == "Enabled" | |
| kms_client.disable_key(KeyId=key_id) | |
| desc = kms_client.describe_key(KeyId=key_id) | |
| assert desc["KeyMetadata"]["KeyState"] == "Disabled" | |
| kms_client.enable_key(KeyId=key_id) | |
| desc = kms_client.describe_key(KeyId=key_id) | |
| assert desc["KeyMetadata"]["KeyState"] == "Enabled" | |
| kms_client.schedule_key_deletion(KeyId=key_id, PendingWindowInDays=7) | |
| def test_kms_schedule_cancel_deletion(kms_client): | |
| """ScheduleKeyDeletion / CancelKeyDeletion.""" | |
| key = kms_client.create_key() | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| resp = kms_client.schedule_key_deletion(KeyId=key_id, PendingWindowInDays=7) | |
| assert resp["KeyState"] == "PendingDeletion" | |
| kms_client.cancel_key_deletion(KeyId=key_id) | |
| desc = kms_client.describe_key(KeyId=key_id) | |
| assert desc["KeyMetadata"]["KeyState"] == "Disabled" | |
| def test_kms_terraform_full_flow(kms_client): | |
| """Full Terraform aws_kms_key lifecycle.""" | |
| key = kms_client.create_key(KeySpec="SYMMETRIC_DEFAULT", KeyUsage="ENCRYPT_DECRYPT", Description="RDS key") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| kms_client.enable_key_rotation(KeyId=key_id) | |
| assert kms_client.get_key_rotation_status(KeyId=key_id)["KeyRotationEnabled"] is True | |
| pol = kms_client.get_key_policy(KeyId=key_id, PolicyName="default") | |
| assert len(pol["Policy"]) > 0 | |
| kms_client.tag_resource(KeyId=key_id, Tags=[{"TagKey": "Name", "TagValue": "rds-key"}]) | |
| assert kms_client.list_resource_tags(KeyId=key_id)["Tags"][0]["TagValue"] == "rds-key" | |
| desc = kms_client.describe_key(KeyId=key_id) | |
| assert desc["KeyMetadata"]["Description"] == "RDS key" | |
| kms_client.schedule_key_deletion(KeyId=key_id, PendingWindowInDays=7) | |
| def test_kms_list_key_policies(kms_client): | |
| """ListKeyPolicies returns default policy name.""" | |
| key = kms_client.create_key() | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| resp = kms_client.list_key_policies(KeyId=key_id) | |
| assert "default" in resp["PolicyNames"] | |
| kms_client.schedule_key_deletion(KeyId=key_id, PendingWindowInDays=7) | |
| def test_kms_create_ecc_secg_p256k1_key(kms_client): | |
| resp = kms_client.create_key( | |
| KeySpec="ECC_SECG_P256K1", | |
| KeyUsage="SIGN_VERIFY", | |
| Description="secp256k1 signing key", | |
| ) | |
| meta = resp["KeyMetadata"] | |
| assert meta["KeySpec"] == "ECC_SECG_P256K1" | |
| assert meta["KeyUsage"] == "SIGN_VERIFY" | |
| assert "ECDSA_SHA_256" in meta["SigningAlgorithms"] | |
| assert meta["EncryptionAlgorithms"] == [] | |
| def test_kms_ecc_sign_and_verify(kms_client): | |
| key = kms_client.create_key(KeySpec="ECC_SECG_P256K1", KeyUsage="SIGN_VERIFY") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| message = b"hello secp256k1" | |
| sign_resp = kms_client.sign( | |
| KeyId=key_id, | |
| Message=message, | |
| MessageType="RAW", | |
| SigningAlgorithm="ECDSA_SHA_256", | |
| ) | |
| assert key_id in sign_resp["KeyId"] # KeyId in response is the full ARN | |
| assert sign_resp["SigningAlgorithm"] == "ECDSA_SHA_256" | |
| assert len(sign_resp["Signature"]) > 0 | |
| verify_resp = kms_client.verify( | |
| KeyId=key_id, | |
| Message=message, | |
| MessageType="RAW", | |
| Signature=sign_resp["Signature"], | |
| SigningAlgorithm="ECDSA_SHA_256", | |
| ) | |
| assert verify_resp["SignatureValid"] is True | |
| def test_kms_ecc_verify_wrong_message(kms_client): | |
| key = kms_client.create_key(KeySpec="ECC_SECG_P256K1", KeyUsage="SIGN_VERIFY") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| sign_resp = kms_client.sign( | |
| KeyId=key_id, | |
| Message=b"original", | |
| MessageType="RAW", | |
| SigningAlgorithm="ECDSA_SHA_256", | |
| ) | |
| import pytest | |
| with pytest.raises(kms_client.exceptions.KMSInvalidSignatureException): | |
| kms_client.verify( | |
| KeyId=key_id, | |
| Message=b"tampered", | |
| MessageType="RAW", | |
| Signature=sign_resp["Signature"], | |
| SigningAlgorithm="ECDSA_SHA_256", | |
| ) | |
| def test_kms_ecc_get_public_key(kms_client): | |
| key = kms_client.create_key(KeySpec="ECC_SECG_P256K1", KeyUsage="SIGN_VERIFY") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| resp = kms_client.get_public_key(KeyId=key_id) | |
| assert key_id in resp["KeyId"] | |
| assert resp["KeySpec"] == "ECC_SECG_P256K1" | |
| assert resp["PublicKey"] | |
| assert "ECDSA_SHA_256" in resp["SigningAlgorithms"] | |
| def test_kms_ecc_nist_p256_sign_verify(kms_client): | |
| key = kms_client.create_key(KeySpec="ECC_NIST_P256", KeyUsage="SIGN_VERIFY") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| sign_resp = kms_client.sign( | |
| KeyId=key_id, | |
| Message=b"nist p256 message", | |
| MessageType="RAW", | |
| SigningAlgorithm="ECDSA_SHA_256", | |
| ) | |
| verify_resp = kms_client.verify( | |
| KeyId=key_id, | |
| Message=b"nist p256 message", | |
| MessageType="RAW", | |
| Signature=sign_resp["Signature"], | |
| SigningAlgorithm="ECDSA_SHA_256", | |
| ) | |
| assert verify_resp["SignatureValid"] is True | |
| def test_kms_ecc_nist_p384_sign_verify(kms_client): | |
| key = kms_client.create_key(KeySpec="ECC_NIST_P384", KeyUsage="SIGN_VERIFY") | |
| meta = key["KeyMetadata"] | |
| assert "ECDSA_SHA_384" in meta["SigningAlgorithms"] | |
| sign_resp = kms_client.sign( | |
| KeyId=meta["KeyId"], | |
| Message=b"nist p384 message", | |
| MessageType="RAW", | |
| SigningAlgorithm="ECDSA_SHA_384", | |
| ) | |
| verify_resp = kms_client.verify( | |
| KeyId=meta["KeyId"], | |
| Message=b"nist p384 message", | |
| MessageType="RAW", | |
| Signature=sign_resp["Signature"], | |
| SigningAlgorithm="ECDSA_SHA_384", | |
| ) | |
| assert verify_resp["SignatureValid"] is True | |
| def test_kms_ecc_nist_p521_sign_verify(kms_client): | |
| key = kms_client.create_key(KeySpec="ECC_NIST_P521", KeyUsage="SIGN_VERIFY") | |
| meta = key["KeyMetadata"] | |
| assert "ECDSA_SHA_512" in meta["SigningAlgorithms"] | |
| sign_resp = kms_client.sign( | |
| KeyId=meta["KeyId"], | |
| Message=b"nist p521 message", | |
| MessageType="RAW", | |
| SigningAlgorithm="ECDSA_SHA_512", | |
| ) | |
| verify_resp = kms_client.verify( | |
| KeyId=meta["KeyId"], | |
| Message=b"nist p521 message", | |
| MessageType="RAW", | |
| Signature=sign_resp["Signature"], | |
| SigningAlgorithm="ECDSA_SHA_512", | |
| ) | |
| assert verify_resp["SignatureValid"] is True | |
| def test_kms_ecc_sign_verify_digest_mode(kms_client): | |
| """Sign/Verify with MessageType=DIGEST (pre-hashed message).""" | |
| import hashlib | |
| key = kms_client.create_key(KeySpec="ECC_SECG_P256K1", KeyUsage="SIGN_VERIFY") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| message_digest = hashlib.sha256(b"original message").digest() | |
| sign_resp = kms_client.sign( | |
| KeyId=key_id, | |
| Message=message_digest, | |
| MessageType="DIGEST", | |
| SigningAlgorithm="ECDSA_SHA_256", | |
| ) | |
| assert sign_resp["SigningAlgorithm"] == "ECDSA_SHA_256" | |
| verify_resp = kms_client.verify( | |
| KeyId=key_id, | |
| Message=message_digest, | |
| MessageType="DIGEST", | |
| Signature=sign_resp["Signature"], | |
| SigningAlgorithm="ECDSA_SHA_256", | |
| ) | |
| assert verify_resp["SignatureValid"] is True | |
| # Wrong digest should fail with KMSInvalidSignatureException | |
| import pytest | |
| wrong_digest = hashlib.sha256(b"different message").digest() | |
| with pytest.raises(kms_client.exceptions.KMSInvalidSignatureException): | |
| kms_client.verify( | |
| KeyId=key_id, | |
| Message=wrong_digest, | |
| MessageType="DIGEST", | |
| Signature=sign_resp["Signature"], | |
| SigningAlgorithm="ECDSA_SHA_256", | |
| ) | |
| def test_kms_ecc_sign_via_alias(kms_client): | |
| """Sign and verify using an alias instead of key ID.""" | |
| key = kms_client.create_key(KeySpec="ECC_SECG_P256K1", KeyUsage="SIGN_VERIFY") | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| kms_client.create_alias(AliasName="alias/ecc-sign-alias", TargetKeyId=key_id) | |
| sign_resp = kms_client.sign( | |
| KeyId="alias/ecc-sign-alias", | |
| Message=b"alias signing test", | |
| MessageType="RAW", | |
| SigningAlgorithm="ECDSA_SHA_256", | |
| ) | |
| verify_resp = kms_client.verify( | |
| KeyId="alias/ecc-sign-alias", | |
| Message=b"alias signing test", | |
| MessageType="RAW", | |
| Signature=sign_resp["Signature"], | |
| SigningAlgorithm="ECDSA_SHA_256", | |
| ) | |
| assert verify_resp["SignatureValid"] is True | |
| def test_kms_key_rotation_with_period(kms_client): | |
| """EnableKeyRotation with custom RotationPeriodInDays.""" | |
| key = kms_client.create_key() | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| kms_client.enable_key_rotation(KeyId=key_id, RotationPeriodInDays=180) | |
| status = kms_client.get_key_rotation_status(KeyId=key_id) | |
| assert status["KeyRotationEnabled"] is True | |
| assert status["RotationPeriodInDays"] == 180 | |
| kms_client.schedule_key_deletion(KeyId=key_id, PendingWindowInDays=7) | |
| def test_kms_pending_deletion_blocks_encrypt(kms_client): | |
| """Encrypt on a PendingDeletion key should raise KMSInvalidStateException.""" | |
| import pytest | |
| key = kms_client.create_key() | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| kms_client.schedule_key_deletion(KeyId=key_id, PendingWindowInDays=7) | |
| with pytest.raises(kms_client.exceptions.KMSInvalidStateException): | |
| kms_client.encrypt(KeyId=key_id, Plaintext=b"test") | |
| def test_kms_disabled_key_blocks_encrypt(kms_client): | |
| """Encrypt on a disabled key should raise DisabledException.""" | |
| import pytest | |
| key = kms_client.create_key() | |
| key_id = key["KeyMetadata"]["KeyId"] | |
| kms_client.disable_key(KeyId=key_id) | |
| with pytest.raises(kms_client.exceptions.DisabledException): | |
| kms_client.encrypt(KeyId=key_id, Plaintext=b"test") | |