aws_rl_env / aws_infra /tests /test_secretsmanager.py
Sizzing's picture
Upload folder using huggingface_hub
c745a99 verified
import io
import json
import os
import time
import zipfile
from urllib.parse import urlparse
import pytest
from botocore.exceptions import ClientError
import uuid as _uuid_mod
def test_secretsmanager_resource_policy(sm):
sm.create_secret(Name="sm-pol-sec", SecretString="secret-val")
policy = json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": "*",
"Action": "secretsmanager:GetSecretValue",
"Resource": "*",
}
],
}
)
sm.put_resource_policy(SecretId="sm-pol-sec", ResourcePolicy=policy)
resp = sm.get_resource_policy(SecretId="sm-pol-sec")
assert resp["Name"] == "sm-pol-sec"
assert "ResourcePolicy" in resp
sm.delete_resource_policy(SecretId="sm-pol-sec")
def test_secretsmanager_validate_resource_policy(sm):
policy = json.dumps({"Version": "2012-10-17", "Statement": []})
resp = sm.validate_resource_policy(ResourcePolicy=policy)
assert resp["PolicyValidationPassed"] is True
def test_secretsmanager_rotate_secret(sm):
"""RotateSecret creates a new version and promotes it to AWSCURRENT."""
sm.create_secret(Name="rotate-test-v39", SecretString="original")
resp = sm.rotate_secret(
SecretId="rotate-test-v39",
RotationLambdaARN="arn:aws:lambda:us-east-1:000000000000:function:rotator",
RotationRules={"AutomaticallyAfterDays": 30},
)
assert "VersionId" in resp
desc = sm.describe_secret(SecretId="rotate-test-v39")
assert desc["RotationEnabled"] is True
assert desc["RotationLambdaARN"] == "arn:aws:lambda:us-east-1:000000000000:function:rotator"
current = sm.get_secret_value(SecretId="rotate-test-v39", VersionStage="AWSCURRENT")
assert current["SecretString"] == "original"
sm.delete_secret(SecretId="rotate-test-v39", ForceDeleteWithoutRecovery=True)
# Migrated from test_secrets.py
def test_secretsmanager_create_get(sm):
sm.create_secret(Name="test-secret-1", SecretString='{"user":"admin"}')
resp = sm.get_secret_value(SecretId="test-secret-1")
assert json.loads(resp["SecretString"])["user"] == "admin"
def test_secretsmanager_update_list(sm):
sm.create_secret(Name="test-secret-2", SecretString="original")
sm.update_secret(SecretId="test-secret-2", SecretString="updated")
resp = sm.get_secret_value(SecretId="test-secret-2")
assert resp["SecretString"] == "updated"
listed = sm.list_secrets()
assert any(s["Name"] == "test-secret-2" for s in listed["SecretList"])
def test_secretsmanager_create_get_v2(sm):
sm.create_secret(Name="sm-cg-v2", SecretString='{"user":"admin","pass":"s3cr3t"}')
resp = sm.get_secret_value(SecretId="sm-cg-v2")
parsed = json.loads(resp["SecretString"])
assert parsed["user"] == "admin"
assert parsed["pass"] == "s3cr3t"
assert "VersionId" in resp
assert "ARN" in resp
sm.create_secret(Name="sm-cg-bin", SecretBinary=b"\x00\x01\x02")
resp_bin = sm.get_secret_value(SecretId="sm-cg-bin")
assert resp_bin["SecretBinary"] == b"\x00\x01\x02"
def test_secretsmanager_update_v2(sm):
sm.create_secret(Name="sm-upd-v2", SecretString="original")
sm.update_secret(SecretId="sm-upd-v2", SecretString="updated", Description="new desc")
resp = sm.get_secret_value(SecretId="sm-upd-v2")
assert resp["SecretString"] == "updated"
desc = sm.describe_secret(SecretId="sm-upd-v2")
assert desc["Description"] == "new desc"
def test_secretsmanager_list_v2(sm):
sm.create_secret(Name="sm-list-a", SecretString="a")
sm.create_secret(Name="sm-list-b", SecretString="b")
listed = sm.list_secrets()
names = [s["Name"] for s in listed["SecretList"]]
assert "sm-list-a" in names
assert "sm-list-b" in names
def test_secretsmanager_delete_v2(sm):
sm.create_secret(Name="sm-del-v2", SecretString="gone")
sm.delete_secret(SecretId="sm-del-v2", ForceDeleteWithoutRecovery=True)
with pytest.raises(ClientError) as exc:
sm.get_secret_value(SecretId="sm-del-v2")
assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException"
def test_secretsmanager_delete_with_recovery(sm):
sm.create_secret(Name="sm-del-rec", SecretString="recoverable")
sm.delete_secret(SecretId="sm-del-rec", RecoveryWindowInDays=7)
with pytest.raises(ClientError) as exc:
sm.get_secret_value(SecretId="sm-del-rec")
assert (
"marked for deletion" in exc.value.response["Error"]["Message"].lower()
or exc.value.response["Error"]["Code"] == "InvalidRequestException"
)
desc = sm.describe_secret(SecretId="sm-del-rec")
assert "DeletedDate" in desc
sm.restore_secret(SecretId="sm-del-rec")
resp = sm.get_secret_value(SecretId="sm-del-rec")
assert resp["SecretString"] == "recoverable"
def test_secretsmanager_put_value_version_stages_v2(sm):
sm.create_secret(Name="sm-pvs-v2", SecretString="v1")
sm.put_secret_value(SecretId="sm-pvs-v2", SecretString="v2")
desc = sm.describe_secret(SecretId="sm-pvs-v2")
stages = desc["VersionIdsToStages"]
current_vids = [vid for vid, s in stages.items() if "AWSCURRENT" in s]
previous_vids = [vid for vid, s in stages.items() if "AWSPREVIOUS" in s]
assert len(current_vids) == 1
assert len(previous_vids) == 1
assert current_vids[0] != previous_vids[0]
cur = sm.get_secret_value(SecretId="sm-pvs-v2", VersionStage="AWSCURRENT")
assert cur["SecretString"] == "v2"
prev = sm.get_secret_value(SecretId="sm-pvs-v2", VersionStage="AWSPREVIOUS")
assert prev["SecretString"] == "v1"
def test_secretsmanager_describe_v2(sm):
sm.create_secret(
Name="sm-dsc-v2",
SecretString="val",
Description="detailed desc",
Tags=[{"Key": "Env", "Value": "dev"}],
)
resp = sm.describe_secret(SecretId="sm-dsc-v2")
assert resp["Name"] == "sm-dsc-v2"
assert resp["Description"] == "detailed desc"
assert any(t["Key"] == "Env" for t in resp["Tags"])
assert "VersionIdsToStages" in resp
assert "ARN" in resp
def test_secretsmanager_tags_v2(sm):
sm.create_secret(Name="sm-tag-v2", SecretString="val")
sm.tag_resource(SecretId="sm-tag-v2", Tags=[{"Key": "team", "Value": "backend"}])
sm.tag_resource(SecretId="sm-tag-v2", Tags=[{"Key": "env", "Value": "prod"}])
desc = sm.describe_secret(SecretId="sm-tag-v2")
assert any(t["Key"] == "team" and t["Value"] == "backend" for t in desc["Tags"])
assert any(t["Key"] == "env" and t["Value"] == "prod" for t in desc["Tags"])
sm.untag_resource(SecretId="sm-tag-v2", TagKeys=["team"])
desc2 = sm.describe_secret(SecretId="sm-tag-v2")
assert not any(t["Key"] == "team" for t in desc2.get("Tags", []))
assert any(t["Key"] == "env" for t in desc2.get("Tags", []))
def test_secretsmanager_get_random_password_v2(sm):
resp = sm.get_random_password(PasswordLength=32)
assert len(resp["RandomPassword"]) == 32
resp2 = sm.get_random_password(PasswordLength=20, ExcludeCharacters="aeiou")
pw = resp2["RandomPassword"]
assert len(pw) == 20
for c in "aeiou":
assert c not in pw
# Migrated from test_sm.py
def test_secretsmanager_put_secret_value_stages(sm):
"""PutSecretValue stages manage AWSCURRENT/AWSPREVIOUS correctly."""
sm.create_secret(Name="qa-sm-stages", SecretString="v1")
sm.put_secret_value(SecretId="qa-sm-stages", SecretString="v2")
sm.put_secret_value(SecretId="qa-sm-stages", SecretString="v3")
current = sm.get_secret_value(SecretId="qa-sm-stages", VersionStage="AWSCURRENT")
assert current["SecretString"] == "v3"
previous = sm.get_secret_value(SecretId="qa-sm-stages", VersionStage="AWSPREVIOUS")
assert previous["SecretString"] == "v2"
def test_secretsmanager_list_secret_version_ids(sm):
"""ListSecretVersionIds returns all versions."""
sm.create_secret(Name="qa-sm-versions", SecretString="initial")
sm.put_secret_value(SecretId="qa-sm-versions", SecretString="second")
resp = sm.list_secret_version_ids(SecretId="qa-sm-versions")
assert len(resp["Versions"]) >= 2
def test_secretsmanager_update_secret_version_stage_moves_current(sm):
"""UpdateSecretVersionStage can move AWSCURRENT and refresh AWSPREVIOUS."""
first = sm.create_secret(Name="qa-sm-stage-move-current", SecretString="v1")
first_vid = first["VersionId"]
second_vid = "22222222-2222-2222-2222-222222222222"
sm.put_secret_value(
SecretId="qa-sm-stage-move-current",
SecretString="v2",
ClientRequestToken=second_vid,
)
sm.update_secret_version_stage(
SecretId="qa-sm-stage-move-current",
VersionStage="AWSCURRENT",
RemoveFromVersionId=second_vid,
MoveToVersionId=first_vid,
)
current = sm.get_secret_value(SecretId="qa-sm-stage-move-current", VersionStage="AWSCURRENT")
assert current["SecretString"] == "v1"
previous = sm.get_secret_value(SecretId="qa-sm-stage-move-current", VersionStage="AWSPREVIOUS")
assert previous["SecretString"] == "v2"
versions = sm.list_secret_version_ids(SecretId="qa-sm-stage-move-current")["Versions"]
version_stages = {v["VersionId"]: set(v["VersionStages"]) for v in versions}
assert version_stages[first_vid] == {"AWSCURRENT"}
assert version_stages[second_vid] == {"AWSPREVIOUS"}
def test_secretsmanager_update_secret_version_stage_moves_and_removes_custom_label(sm):
"""UpdateSecretVersionStage can move a custom label and then detach it."""
first = sm.create_secret(Name="qa-sm-stage-custom", SecretString="v1")
first_vid = first["VersionId"]
second_vid = "33333333-3333-3333-3333-333333333333"
sm.put_secret_value(
SecretId="qa-sm-stage-custom",
SecretString="v2",
ClientRequestToken=second_vid,
VersionStages=["BLUE"],
)
before = sm.get_secret_value(SecretId="qa-sm-stage-custom", VersionStage="BLUE")
assert before["SecretString"] == "v2"
sm.update_secret_version_stage(
SecretId="qa-sm-stage-custom",
VersionStage="BLUE",
RemoveFromVersionId=second_vid,
MoveToVersionId=first_vid,
)
moved = sm.get_secret_value(SecretId="qa-sm-stage-custom", VersionStage="BLUE")
assert moved["SecretString"] == "v1"
sm.update_secret_version_stage(
SecretId="qa-sm-stage-custom",
VersionStage="BLUE",
RemoveFromVersionId=first_vid,
)
versions = sm.list_secret_version_ids(SecretId="qa-sm-stage-custom")["Versions"]
version_stages = {v["VersionId"]: set(v["VersionStages"]) for v in versions}
assert "BLUE" not in version_stages[first_vid]
assert "BLUE" not in version_stages[second_vid]
with pytest.raises(ClientError) as exc:
sm.get_secret_value(SecretId="qa-sm-stage-custom", VersionStage="BLUE")
assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException"
def test_secretsmanager_update_secret_version_stage_requires_matching_remove_version(sm):
"""Moving an attached label requires RemoveFromVersionId to match the current owner."""
first = sm.create_secret(Name="qa-sm-stage-guard", SecretString="v1")
first_vid = first["VersionId"]
second_vid = "44444444-4444-4444-4444-444444444444"
sm.put_secret_value(
SecretId="qa-sm-stage-guard",
SecretString="v2",
ClientRequestToken=second_vid,
)
with pytest.raises(ClientError) as exc:
sm.update_secret_version_stage(
SecretId="qa-sm-stage-guard",
VersionStage="AWSCURRENT",
MoveToVersionId=first_vid,
)
assert exc.value.response["Error"]["Code"] == "InvalidParameterException"
def test_secretsmanager_delete_and_restore(sm):
"""DeleteSecret schedules deletion; RestoreSecret cancels it."""
sm.create_secret(Name="qa-sm-restore", SecretString="data")
sm.delete_secret(SecretId="qa-sm-restore", RecoveryWindowInDays=7)
with pytest.raises(ClientError) as exc:
sm.get_secret_value(SecretId="qa-sm-restore")
assert exc.value.response["Error"]["Code"] == "InvalidRequestException"
sm.restore_secret(SecretId="qa-sm-restore")
val = sm.get_secret_value(SecretId="qa-sm-restore")
assert val["SecretString"] == "data"
def test_secretsmanager_get_random_password(sm):
"""GetRandomPassword returns a password of the requested length."""
resp = sm.get_random_password(PasswordLength=24, ExcludeNumbers=True)
pwd = resp["RandomPassword"]
assert len(pwd) == 24
assert not any(c.isdigit() for c in pwd)
def test_secretsmanager_batch_get_secret_value(sm):
sm.create_secret(Name="batch-s1", SecretString="val1")
sm.create_secret(Name="batch-s2", SecretString="val2")
resp = sm.batch_get_secret_value(SecretIdList=["batch-s1", "batch-s2"])
assert len(resp["SecretValues"]) == 2
names = {s["Name"] for s in resp["SecretValues"]}
assert "batch-s1" in names
assert "batch-s2" in names
assert len(resp.get("Errors", [])) == 0
def test_secretsmanager_batch_get_secret_value_with_missing(sm):
resp = sm.batch_get_secret_value(SecretIdList=["batch-s1", "nonexistent-secret"])
assert len(resp["SecretValues"]) == 1
assert len(resp["Errors"]) == 1
assert resp["Errors"][0]["SecretId"] == "nonexistent-secret"
def test_secretsmanager_kms_key_id_on_create_and_describe(sm):
sm.create_secret(Name="kms-test-secret", SecretString="val", KmsKeyId="alias/my-key")
resp = sm.describe_secret(SecretId="kms-test-secret")
assert resp["KmsKeyId"] == "alias/my-key"
def test_secretsmanager_kms_key_id_on_update(sm):
sm.update_secret(SecretId="kms-test-secret", KmsKeyId="alias/other-key")
resp = sm.describe_secret(SecretId="kms-test-secret")
assert resp["KmsKeyId"] == "alias/other-key"
def test_secretsmanager_get_by_partial_arn(sm):
"""GetSecretValue with a partial ARN (no random suffix) must resolve the secret."""
import uuid as _uuid
name = f"partial-arn-test/{_uuid.uuid4().hex[:8]}"
created = sm.create_secret(Name=name, SecretString="partial-arn-value")
full_arn = created["ARN"]
# Full ARN works
assert sm.get_secret_value(SecretId=full_arn)["SecretString"] == "partial-arn-value"
# Partial ARN: strip the random suffix (last hyphen + 6 chars)
partial_arn = full_arn.rsplit("-", 1)[0]
assert partial_arn != full_arn
assert sm.get_secret_value(SecretId=partial_arn)["SecretString"] == "partial-arn-value"