Spaces:
Running
Running
| import io | |
| import json | |
| import os | |
| import time | |
| import zipfile | |
| from urllib.parse import urlparse | |
| import pytest | |
| from botocore.exceptions import ClientError | |
| import uuid as _uuid_mod | |
def test_secretsmanager_resource_policy(sm):
    """Attach a resource policy to a secret, read it back, then remove it."""
    secret_id = "sm-pol-sec"
    sm.create_secret(Name=secret_id, SecretString="secret-val")

    allow_get_value = {
        "Effect": "Allow",
        "Principal": "*",
        "Action": "secretsmanager:GetSecretValue",
        "Resource": "*",
    }
    policy_doc = json.dumps({"Version": "2012-10-17", "Statement": [allow_get_value]})
    sm.put_resource_policy(SecretId=secret_id, ResourcePolicy=policy_doc)

    fetched = sm.get_resource_policy(SecretId=secret_id)
    assert fetched["Name"] == secret_id
    assert "ResourcePolicy" in fetched

    sm.delete_resource_policy(SecretId=secret_id)
def test_secretsmanager_validate_resource_policy(sm):
    """A policy document with an empty statement list is reported as valid."""
    empty_policy = {"Version": "2012-10-17", "Statement": []}
    outcome = sm.validate_resource_policy(ResourcePolicy=json.dumps(empty_policy))
    assert outcome["PolicyValidationPassed"] is True
def test_secretsmanager_rotate_secret(sm):
    """RotateSecret returns a version id and records the rotation settings.

    After the call, DescribeSecret must show rotation enabled with the given
    Lambda ARN, and the AWSCURRENT value is expected to still read "original".
    """
    secret_id = "rotate-test-v39"
    rotator_arn = "arn:aws:lambda:us-east-1:000000000000:function:rotator"
    sm.create_secret(Name=secret_id, SecretString="original")

    rotation = sm.rotate_secret(
        SecretId=secret_id,
        RotationLambdaARN=rotator_arn,
        RotationRules={"AutomaticallyAfterDays": 30},
    )
    assert "VersionId" in rotation

    described = sm.describe_secret(SecretId=secret_id)
    assert described["RotationEnabled"] is True
    assert described["RotationLambdaARN"] == rotator_arn

    live = sm.get_secret_value(SecretId=secret_id, VersionStage="AWSCURRENT")
    assert live["SecretString"] == "original"

    # Remove the secret immediately, skipping the recovery window.
    sm.delete_secret(SecretId=secret_id, ForceDeleteWithoutRecovery=True)
| # Migrated from test_secrets.py | |
def test_secretsmanager_create_get(sm):
    """A JSON secret string round-trips through create and get."""
    secret_id = "test-secret-1"
    sm.create_secret(Name=secret_id, SecretString='{"user":"admin"}')
    fetched = sm.get_secret_value(SecretId=secret_id)
    payload = json.loads(fetched["SecretString"])
    assert payload["user"] == "admin"
def test_secretsmanager_update_list(sm):
    """UpdateSecret replaces the value and the secret appears in ListSecrets."""
    secret_id = "test-secret-2"
    sm.create_secret(Name=secret_id, SecretString="original")
    sm.update_secret(SecretId=secret_id, SecretString="updated")

    fetched = sm.get_secret_value(SecretId=secret_id)
    assert fetched["SecretString"] == "updated"

    listing = sm.list_secrets()
    assert secret_id in (entry["Name"] for entry in listing["SecretList"])
def test_secretsmanager_create_get_v2(sm):
    """String and binary secrets both read back exactly as stored."""
    sm.create_secret(Name="sm-cg-v2", SecretString='{"user":"admin","pass":"s3cr3t"}')
    text_resp = sm.get_secret_value(SecretId="sm-cg-v2")
    credentials = json.loads(text_resp["SecretString"])
    assert credentials["user"] == "admin"
    assert credentials["pass"] == "s3cr3t"
    # The response must also carry the version id and the ARN.
    for field in ("VersionId", "ARN"):
        assert field in text_resp

    raw_bytes = b"\x00\x01\x02"
    sm.create_secret(Name="sm-cg-bin", SecretBinary=raw_bytes)
    binary_resp = sm.get_secret_value(SecretId="sm-cg-bin")
    assert binary_resp["SecretBinary"] == raw_bytes
def test_secretsmanager_update_v2(sm):
    """UpdateSecret changes both the stored value and the description."""
    secret_id = "sm-upd-v2"
    sm.create_secret(Name=secret_id, SecretString="original")
    sm.update_secret(SecretId=secret_id, SecretString="updated", Description="new desc")

    assert sm.get_secret_value(SecretId=secret_id)["SecretString"] == "updated"
    assert sm.describe_secret(SecretId=secret_id)["Description"] == "new desc"
def test_secretsmanager_list_v2(sm):
    """Both secrets created here are reported by ListSecrets."""
    for secret_name, secret_value in (("sm-list-a", "a"), ("sm-list-b", "b")):
        sm.create_secret(Name=secret_name, SecretString=secret_value)

    reported = [entry["Name"] for entry in sm.list_secrets()["SecretList"]]
    assert "sm-list-a" in reported
    assert "sm-list-b" in reported
def test_secretsmanager_delete_v2(sm):
    """Reading a force-deleted secret raises ResourceNotFoundException."""
    secret_id = "sm-del-v2"
    sm.create_secret(Name=secret_id, SecretString="gone")
    sm.delete_secret(SecretId=secret_id, ForceDeleteWithoutRecovery=True)

    with pytest.raises(ClientError) as raised:
        sm.get_secret_value(SecretId=secret_id)
    error_code = raised.value.response["Error"]["Code"]
    assert error_code == "ResourceNotFoundException"
def test_secretsmanager_delete_with_recovery(sm):
    """A secret deleted with a recovery window is unreadable until restored."""
    secret_id = "sm-del-rec"
    sm.create_secret(Name=secret_id, SecretString="recoverable")
    sm.delete_secret(SecretId=secret_id, RecoveryWindowInDays=7)

    with pytest.raises(ClientError) as raised:
        sm.get_secret_value(SecretId=secret_id)
    error = raised.value.response["Error"]
    # Either the descriptive message or the error code is accepted.
    assert (
        "marked for deletion" in error["Message"].lower()
        or error["Code"] == "InvalidRequestException"
    )

    # While pending deletion the secret is still describable.
    pending = sm.describe_secret(SecretId=secret_id)
    assert "DeletedDate" in pending

    sm.restore_secret(SecretId=secret_id)
    restored = sm.get_secret_value(SecretId=secret_id)
    assert restored["SecretString"] == "recoverable"
def test_secretsmanager_put_value_version_stages_v2(sm):
    """PutSecretValue leaves one AWSCURRENT and one distinct AWSPREVIOUS version."""
    secret_id = "sm-pvs-v2"
    sm.create_secret(Name=secret_id, SecretString="v1")
    sm.put_secret_value(SecretId=secret_id, SecretString="v2")

    stage_map = sm.describe_secret(SecretId=secret_id)["VersionIdsToStages"]

    def holders(label):
        # Version ids whose stage list contains the given label.
        return [version_id for version_id, labels in stage_map.items() if label in labels]

    current_ids = holders("AWSCURRENT")
    previous_ids = holders("AWSPREVIOUS")
    assert len(current_ids) == 1
    assert len(previous_ids) == 1
    assert current_ids[0] != previous_ids[0]

    newest = sm.get_secret_value(SecretId=secret_id, VersionStage="AWSCURRENT")
    assert newest["SecretString"] == "v2"
    older = sm.get_secret_value(SecretId=secret_id, VersionStage="AWSPREVIOUS")
    assert older["SecretString"] == "v1"
def test_secretsmanager_describe_v2(sm):
    """DescribeSecret echoes name, description and tags, plus version stages and ARN."""
    secret_id = "sm-dsc-v2"
    sm.create_secret(
        Name=secret_id,
        SecretString="val",
        Description="detailed desc",
        Tags=[{"Key": "Env", "Value": "dev"}],
    )

    described = sm.describe_secret(SecretId=secret_id)
    assert described["Name"] == secret_id
    assert described["Description"] == "detailed desc"
    assert "Env" in (tag["Key"] for tag in described["Tags"])
    assert "VersionIdsToStages" in described
    assert "ARN" in described
def test_secretsmanager_tags_v2(sm):
    """TagResource accumulates tags; UntagResource removes only the named key."""
    secret_id = "sm-tag-v2"
    sm.create_secret(Name=secret_id, SecretString="val")
    # Tags are added in two separate calls; both must be present afterwards.
    sm.tag_resource(SecretId=secret_id, Tags=[{"Key": "team", "Value": "backend"}])
    sm.tag_resource(SecretId=secret_id, Tags=[{"Key": "env", "Value": "prod"}])

    tags_before = sm.describe_secret(SecretId=secret_id)["Tags"]
    assert any(tag["Key"] == "team" and tag["Value"] == "backend" for tag in tags_before)
    assert any(tag["Key"] == "env" and tag["Value"] == "prod" for tag in tags_before)

    sm.untag_resource(SecretId=secret_id, TagKeys=["team"])

    after = sm.describe_secret(SecretId=secret_id)
    assert all(tag["Key"] != "team" for tag in after.get("Tags", []))
    assert "env" in (tag["Key"] for tag in after.get("Tags", []))
def test_secretsmanager_get_random_password_v2(sm):
    """GetRandomPassword honours the requested length and excluded characters."""
    plain = sm.get_random_password(PasswordLength=32)
    assert len(plain["RandomPassword"]) == 32

    restricted = sm.get_random_password(PasswordLength=20, ExcludeCharacters="aeiou")
    password = restricted["RandomPassword"]
    assert len(password) == 20
    assert all(vowel not in password for vowel in "aeiou")
| # Migrated from test_sm.py | |
def test_secretsmanager_put_secret_value_stages(sm):
    """After two PutSecretValue calls, AWSCURRENT is the latest and AWSPREVIOUS the one before."""
    secret_id = "qa-sm-stages"
    sm.create_secret(Name=secret_id, SecretString="v1")
    for newer_value in ("v2", "v3"):
        sm.put_secret_value(SecretId=secret_id, SecretString=newer_value)

    for stage, expected in (("AWSCURRENT", "v3"), ("AWSPREVIOUS", "v2")):
        staged = sm.get_secret_value(SecretId=secret_id, VersionStage=stage)
        assert staged["SecretString"] == expected
def test_secretsmanager_list_secret_version_ids(sm):
    """ListSecretVersionIds reports at least the two versions written here."""
    secret_id = "qa-sm-versions"
    sm.create_secret(Name=secret_id, SecretString="initial")
    sm.put_secret_value(SecretId=secret_id, SecretString="second")

    listing = sm.list_secret_version_ids(SecretId=secret_id)
    version_count = len(listing["Versions"])
    assert version_count >= 2
def test_secretsmanager_update_secret_version_stage_moves_current(sm):
    """UpdateSecretVersionStage can hand AWSCURRENT back to an older version.

    After the move, the version that gave up AWSCURRENT is expected to carry
    AWSPREVIOUS and nothing else.
    """
    secret_id = "qa-sm-stage-move-current"
    older_vid = sm.create_secret(Name=secret_id, SecretString="v1")["VersionId"]
    # The test relies on the request token being used as the new version's id.
    newer_vid = "22222222-2222-2222-2222-222222222222"
    sm.put_secret_value(
        SecretId=secret_id,
        SecretString="v2",
        ClientRequestToken=newer_vid,
    )

    sm.update_secret_version_stage(
        SecretId=secret_id,
        VersionStage="AWSCURRENT",
        RemoveFromVersionId=newer_vid,
        MoveToVersionId=older_vid,
    )

    for stage, expected in (("AWSCURRENT", "v1"), ("AWSPREVIOUS", "v2")):
        staged = sm.get_secret_value(SecretId=secret_id, VersionStage=stage)
        assert staged["SecretString"] == expected

    listing = sm.list_secret_version_ids(SecretId=secret_id)["Versions"]
    labels_by_version = {
        entry["VersionId"]: set(entry["VersionStages"]) for entry in listing
    }
    assert labels_by_version[older_vid] == {"AWSCURRENT"}
    assert labels_by_version[newer_vid] == {"AWSPREVIOUS"}
def test_secretsmanager_update_secret_version_stage_moves_and_removes_custom_label(sm):
    """A custom stage label can be moved between versions and then detached."""
    secret_id = "qa-sm-stage-custom"
    label = "BLUE"
    older_vid = sm.create_secret(Name=secret_id, SecretString="v1")["VersionId"]
    # The test relies on the request token being used as the new version's id.
    newer_vid = "33333333-3333-3333-3333-333333333333"
    sm.put_secret_value(
        SecretId=secret_id,
        SecretString="v2",
        ClientRequestToken=newer_vid,
        VersionStages=[label],
    )

    # Initially the label points at the version that was just written.
    initial = sm.get_secret_value(SecretId=secret_id, VersionStage=label)
    assert initial["SecretString"] == "v2"

    # Step 1: move the label over to the older version.
    sm.update_secret_version_stage(
        SecretId=secret_id,
        VersionStage=label,
        RemoveFromVersionId=newer_vid,
        MoveToVersionId=older_vid,
    )
    relocated = sm.get_secret_value(SecretId=secret_id, VersionStage=label)
    assert relocated["SecretString"] == "v1"

    # Step 2: detach the label without attaching it anywhere else.
    sm.update_secret_version_stage(
        SecretId=secret_id,
        VersionStage=label,
        RemoveFromVersionId=older_vid,
    )
    listing = sm.list_secret_version_ids(SecretId=secret_id)["Versions"]
    labels_by_version = {
        entry["VersionId"]: set(entry["VersionStages"]) for entry in listing
    }
    assert label not in labels_by_version[older_vid]
    assert label not in labels_by_version[newer_vid]

    with pytest.raises(ClientError) as raised:
        sm.get_secret_value(SecretId=secret_id, VersionStage=label)
    error_code = raised.value.response["Error"]["Code"]
    assert error_code == "ResourceNotFoundException"
def test_secretsmanager_update_secret_version_stage_requires_matching_remove_version(sm):
    """Moving AWSCURRENT without RemoveFromVersionId is rejected as an invalid parameter."""
    secret_id = "qa-sm-stage-guard"
    older_vid = sm.create_secret(Name=secret_id, SecretString="v1")["VersionId"]
    newer_vid = "44444444-4444-4444-4444-444444444444"
    sm.put_secret_value(
        SecretId=secret_id,
        SecretString="v2",
        ClientRequestToken=newer_vid,
    )

    # RemoveFromVersionId is deliberately omitted from this call.
    with pytest.raises(ClientError) as raised:
        sm.update_secret_version_stage(
            SecretId=secret_id,
            VersionStage="AWSCURRENT",
            MoveToVersionId=older_vid,
        )
    error_code = raised.value.response["Error"]["Code"]
    assert error_code == "InvalidParameterException"
def test_secretsmanager_delete_and_restore(sm):
    """A secret scheduled for deletion is unreadable until RestoreSecret is called."""
    secret_id = "qa-sm-restore"
    sm.create_secret(Name=secret_id, SecretString="data")
    sm.delete_secret(SecretId=secret_id, RecoveryWindowInDays=7)

    with pytest.raises(ClientError) as raised:
        sm.get_secret_value(SecretId=secret_id)
    error_code = raised.value.response["Error"]["Code"]
    assert error_code == "InvalidRequestException"

    sm.restore_secret(SecretId=secret_id)
    restored = sm.get_secret_value(SecretId=secret_id)
    assert restored["SecretString"] == "data"
def test_secretsmanager_get_random_password(sm):
    """GetRandomPassword returns the requested length with no digits when excluded."""
    generated = sm.get_random_password(PasswordLength=24, ExcludeNumbers=True)
    password = generated["RandomPassword"]
    assert len(password) == 24
    assert all(not char.isdigit() for char in password)
def test_secretsmanager_batch_get_secret_value(sm):
    """BatchGetSecretValue returns every requested secret and no errors."""
    sm.create_secret(Name="batch-s1", SecretString="val1")
    sm.create_secret(Name="batch-s2", SecretString="val2")

    batch = sm.batch_get_secret_value(SecretIdList=["batch-s1", "batch-s2"])
    values = batch["SecretValues"]
    assert len(values) == 2

    returned_names = {entry["Name"] for entry in values}
    assert "batch-s1" in returned_names
    assert "batch-s2" in returned_names

    # The Errors key may be absent altogether.
    assert len(batch.get("Errors", [])) == 0
def test_secretsmanager_batch_get_secret_value_with_missing(sm):
    """BatchGetSecretValue returns found secrets and reports missing ids under Errors.

    The test creates its own uniquely named secret instead of relying on one
    left behind by another test, so it passes when run alone or in any order.
    """
    import uuid as _uuid

    existing = f"batch-missing-{_uuid.uuid4().hex[:8]}"
    sm.create_secret(Name=existing, SecretString="val1")

    resp = sm.batch_get_secret_value(SecretIdList=[existing, "nonexistent-secret"])

    assert len(resp["SecretValues"]) == 1
    assert resp["SecretValues"][0]["Name"] == existing
    assert len(resp["Errors"]) == 1
    assert resp["Errors"][0]["SecretId"] == "nonexistent-secret"
def test_secretsmanager_kms_key_id_on_create_and_describe(sm):
    """The KmsKeyId given at creation is echoed back by DescribeSecret."""
    secret_id = "kms-test-secret"
    key_alias = "alias/my-key"
    sm.create_secret(Name=secret_id, SecretString="val", KmsKeyId=key_alias)
    described = sm.describe_secret(SecretId=secret_id)
    assert described["KmsKeyId"] == key_alias
def test_secretsmanager_kms_key_id_on_update(sm):
    """UpdateSecret can change the KmsKeyId reported by DescribeSecret.

    The test creates its own uniquely named secret instead of reusing one made
    by another test, so it passes when run alone or in any order.
    """
    import uuid as _uuid

    name = f"kms-update-secret-{_uuid.uuid4().hex[:8]}"
    sm.create_secret(Name=name, SecretString="val", KmsKeyId="alias/my-key")

    sm.update_secret(SecretId=name, KmsKeyId="alias/other-key")

    resp = sm.describe_secret(SecretId=name)
    assert resp["KmsKeyId"] == "alias/other-key"
def test_secretsmanager_get_by_partial_arn(sm):
    """GetSecretValue must resolve a secret from an ARN with its trailing suffix removed."""
    import uuid as _uuid

    expected_value = "partial-arn-value"
    secret_name = f"partial-arn-test/{_uuid.uuid4().hex[:8]}"
    creation = sm.create_secret(Name=secret_name, SecretString=expected_value)
    complete_arn = creation["ARN"]

    # Baseline: lookup by the complete ARN.
    by_full = sm.get_secret_value(SecretId=complete_arn)
    assert by_full["SecretString"] == expected_value

    # Cut the ARN at its last hyphen to drop the trailing suffix.
    trimmed_arn = complete_arn.rsplit("-", maxsplit=1)[0]
    assert trimmed_arn != complete_arn

    by_partial = sm.get_secret_value(SecretId=trimmed_arn)
    assert by_partial["SecretString"] == expected_value