aws_rl_env / aws_infra /tests /test_rds.py
Sizzing's picture
Upload folder using huggingface_hub
c745a99 verified
import io
import json
import os
import time
import uuid as _uuid_mod
import zipfile
from urllib.parse import urlparse
import pytest
from botocore.exceptions import ClientError
def test_rds_create(rds):
    """Create a PostgreSQL instance and verify DescribeDBInstances returns it."""
    create_args = {
        "DBInstanceIdentifier": "test-db",
        "DBInstanceClass": "db.t3.micro",
        "Engine": "postgres",
        "MasterUsername": "admin",
        "MasterUserPassword": "password123",
        "DBName": "testdb",
        "AllocatedStorage": 20,
    }
    rds.create_db_instance(**create_args)

    described = rds.describe_db_instances(DBInstanceIdentifier="test-db")["DBInstances"]
    assert len(described) == 1

    db = described[0]
    assert db["DBInstanceIdentifier"] == "test-db"
    assert db["Engine"] == "postgres"
    assert "Address" in db["Endpoint"]
def test_rds_engines(rds):
    """DescribeDBEngineVersions lists at least one version for postgres."""
    versions = rds.describe_db_engine_versions(Engine="postgres")["DBEngineVersions"]
    assert len(versions) > 0
def test_rds_cluster(rds):
    """Create an Aurora PostgreSQL cluster and look it up by identifier."""
    cluster_id = "test-cluster"
    rds.create_db_cluster(
        DBClusterIdentifier=cluster_id,
        Engine="aurora-postgresql",
        MasterUsername="admin",
        MasterUserPassword="password123",
    )

    clusters = rds.describe_db_clusters(DBClusterIdentifier=cluster_id)["DBClusters"]
    assert clusters[0]["DBClusterIdentifier"] == cluster_id
def test_rds_create_instance_v2(rds):
    """CreateDBInstance response carries identifier, status, engine and endpoint."""
    created = rds.create_db_instance(
        DBInstanceIdentifier="rds-ci-v2",
        DBInstanceClass="db.t3.micro",
        Engine="postgres",
        MasterUsername="admin",
        MasterUserPassword="pass123",
        AllocatedStorage=20,
        DBName="mydb",
    )["DBInstance"]

    assert created["DBInstanceIdentifier"] == "rds-ci-v2"
    assert created["DBInstanceStatus"] == "available"
    assert created["Engine"] == "postgres"

    endpoint = created["Endpoint"]
    assert "Address" in endpoint
    assert "Port" in endpoint
def test_rds_describe_instances_v2(rds):
    """DescribeDBInstances lists all instances and filters by identifier."""
    # (identifier, instance class, engine, allocated storage), created in this order.
    specs = (
        ("rds-di-v2a", "db.t3.micro", "mysql", 10),
        ("rds-di-v2b", "db.t3.small", "postgres", 20),
    )
    for db_id, db_class, engine, storage in specs:
        rds.create_db_instance(
            DBInstanceIdentifier=db_id,
            DBInstanceClass=db_class,
            Engine=engine,
            MasterUsername="admin",
            MasterUserPassword="pass",
            AllocatedStorage=storage,
        )

    # Unfiltered listing must contain both instances.
    listed_ids = [db["DBInstanceIdentifier"] for db in rds.describe_db_instances()["DBInstances"]]
    assert "rds-di-v2a" in listed_ids
    assert "rds-di-v2b" in listed_ids

    # Filtered listing must return exactly the requested instance.
    filtered = rds.describe_db_instances(DBInstanceIdentifier="rds-di-v2a")["DBInstances"]
    assert len(filtered) == 1
    assert filtered[0]["Engine"] == "mysql"
def test_rds_delete_instance_v2(rds):
    """A deleted instance is no longer describable (DBInstanceNotFoundFault)."""
    db_id = "rds-del-v2"
    rds.create_db_instance(
        DBInstanceIdentifier=db_id,
        DBInstanceClass="db.t3.micro",
        Engine="postgres",
        MasterUsername="admin",
        MasterUserPassword="pass",
        AllocatedStorage=10,
    )
    rds.delete_db_instance(DBInstanceIdentifier=db_id, SkipFinalSnapshot=True)

    with pytest.raises(ClientError) as excinfo:
        rds.describe_db_instances(DBInstanceIdentifier=db_id)
    error = excinfo.value.response["Error"]
    assert error["Code"] == "DBInstanceNotFoundFault"
def test_rds_modify_instance_v2(rds):
    """ModifyDBInstance changes instance class and allocated storage."""
    db_id = "rds-mod-v2"
    rds.create_db_instance(
        DBInstanceIdentifier=db_id,
        DBInstanceClass="db.t3.micro",
        Engine="postgres",
        MasterUsername="admin",
        MasterUserPassword="pass",
        AllocatedStorage=20,
    )

    changes = {"DBInstanceClass": "db.t3.small", "AllocatedStorage": 50}
    rds.modify_db_instance(DBInstanceIdentifier=db_id, ApplyImmediately=True, **changes)

    modified = rds.describe_db_instances(DBInstanceIdentifier=db_id)["DBInstances"][0]
    assert modified["DBInstanceClass"] == "db.t3.small"
    assert modified["AllocatedStorage"] == 50
def test_rds_create_cluster_v2(rds):
    """CreateDBCluster response is complete and the cluster is describable."""
    cluster_id = "rds-cc-v2"
    created = rds.create_db_cluster(
        DBClusterIdentifier=cluster_id,
        Engine="aurora-postgresql",
        MasterUsername="admin",
        MasterUserPassword="pass123",
    )["DBCluster"]

    assert created["DBClusterIdentifier"] == cluster_id
    assert created["Status"] == "available"
    assert created["Engine"] == "aurora-postgresql"
    assert "DBClusterArn" in created

    described = rds.describe_db_clusters(DBClusterIdentifier=cluster_id)["DBClusters"]
    assert described[0]["DBClusterIdentifier"] == cluster_id
def test_rds_engine_versions_v2(rds):
    """DescribeDBEngineVersions returns only versions of the requested engine."""
    for engine in ("postgres", "mysql"):
        versions = rds.describe_db_engine_versions(Engine=engine)["DBEngineVersions"]
        assert len(versions) > 0
        assert all(version["Engine"] == engine for version in versions)
def test_rds_snapshot_v2(rds):
    """Snapshot create / describe / delete round trip for a DB instance."""
    db_id = "rds-snap-v2"
    snapshot_id = "rds-snap-v2-s1"

    rds.create_db_instance(
        DBInstanceIdentifier=db_id,
        DBInstanceClass="db.t3.micro",
        Engine="postgres",
        MasterUsername="admin",
        MasterUserPassword="pass",
        AllocatedStorage=10,
    )

    created = rds.create_db_snapshot(
        DBSnapshotIdentifier=snapshot_id,
        DBInstanceIdentifier=db_id,
    )["DBSnapshot"]
    assert created["DBSnapshotIdentifier"] == snapshot_id
    assert created["Status"] == "available"

    found = rds.describe_db_snapshots(DBSnapshotIdentifier=snapshot_id)["DBSnapshots"]
    assert len(found) == 1

    # After deletion the snapshot lookup must fail.
    rds.delete_db_snapshot(DBSnapshotIdentifier=snapshot_id)
    with pytest.raises(ClientError) as excinfo:
        rds.describe_db_snapshots(DBSnapshotIdentifier=snapshot_id)
    assert excinfo.value.response["Error"]["Code"] == "DBSnapshotNotFound"
def test_rds_tags_v2(rds):
    """Tags set at creation can be listed, extended and removed via the ARN."""
    rds.create_db_instance(
        DBInstanceIdentifier="rds-tag-v2",
        DBInstanceClass="db.t3.micro",
        Engine="postgres",
        MasterUsername="admin",
        MasterUserPassword="pass",
        AllocatedStorage=10,
        Tags=[{"Key": "env", "Value": "dev"}],
    )
    instance = rds.describe_db_instances(DBInstanceIdentifier="rds-tag-v2")["DBInstances"][0]
    arn = instance["DBInstanceArn"]

    def current_tags():
        # Fetch the tag list fresh on every call so each step sees server state.
        return rds.list_tags_for_resource(ResourceName=arn)["TagList"]

    # Tag supplied at creation time is present.
    initial = current_tags()
    assert any(tag["Key"] == "env" and tag["Value"] == "dev" for tag in initial)

    # Adding a tag makes it visible.
    rds.add_tags_to_resource(ResourceName=arn, Tags=[{"Key": "team", "Value": "dba"}])
    after_add = current_tags()
    assert any(tag["Key"] == "team" and tag["Value"] == "dba" for tag in after_add)

    # Removing one key leaves the other untouched.
    rds.remove_tags_from_resource(ResourceName=arn, TagKeys=["env"])
    after_remove = current_tags()
    assert not any(tag["Key"] == "env" for tag in after_remove)
    assert any(tag["Key"] == "team" for tag in after_remove)
def test_rds_cluster_parameter_group(rds):
    """Create, describe and delete a DB cluster parameter group.

    Args:
        rds: RDS client fixture (boto3-style) pointed at the service under test.

    The delete runs in a ``finally`` so a failed assertion does not leave the
    "test-cpg" group behind for later runs.
    """
    rds.create_db_cluster_parameter_group(
        DBClusterParameterGroupName="test-cpg",
        DBParameterGroupFamily="aurora-mysql8.0",
        Description="Test cluster param group",
    )
    try:
        resp = rds.describe_db_cluster_parameter_groups(DBClusterParameterGroupName="test-cpg")
        groups = resp["DBClusterParameterGroups"]
        assert len(groups) >= 1
        assert groups[0]["DBClusterParameterGroupName"] == "test-cpg"
    finally:
        rds.delete_db_cluster_parameter_group(DBClusterParameterGroupName="test-cpg")
def test_rds_modify_db_parameter_group(rds):
    """ModifyDBParameterGroup echoes the group name in its response."""
    group_name = "test-mpg"
    rds.create_db_parameter_group(
        DBParameterGroupName=group_name,
        DBParameterGroupFamily="mysql8.0",
        Description="Test param group for modify",
    )

    override = {
        "ParameterName": "max_connections",
        "ParameterValue": "100",
        "ApplyMethod": "immediate",
    }
    result = rds.modify_db_parameter_group(
        DBParameterGroupName=group_name,
        Parameters=[override],
    )
    assert result["DBParameterGroupName"] == group_name
def test_rds_cluster_snapshot(rds):
    """Create, describe and delete a DB cluster snapshot.

    Args:
        rds: RDS client fixture (boto3-style) pointed at the service under test.

    The snapshot delete runs in a ``finally`` so a failed assertion does not
    leak the "snap-cl-snap" snapshot.
    """
    rds.create_db_cluster(
        DBClusterIdentifier="snap-cl",
        Engine="aurora-mysql",
        MasterUsername="admin",
        MasterUserPassword="password123",
    )
    rds.create_db_cluster_snapshot(
        DBClusterSnapshotIdentifier="snap-cl-snap",
        DBClusterIdentifier="snap-cl",
    )
    try:
        resp = rds.describe_db_cluster_snapshots(DBClusterSnapshotIdentifier="snap-cl-snap")
        snaps = resp["DBClusterSnapshots"]
        assert len(snaps) >= 1
        assert snaps[0]["DBClusterSnapshotIdentifier"] == "snap-cl-snap"
    finally:
        rds.delete_db_cluster_snapshot(DBClusterSnapshotIdentifier="snap-cl-snap")
def test_rds_option_group(rds):
    """Create, describe and delete an option group.

    Args:
        rds: RDS client fixture (boto3-style) pointed at the service under test.

    The delete runs in a ``finally`` so a failed assertion does not leave the
    "test-og" option group behind.
    """
    rds.create_option_group(
        OptionGroupName="test-og",
        EngineName="mysql",
        MajorEngineVersion="8.0",
        OptionGroupDescription="Test option group",
    )
    try:
        resp = rds.describe_option_groups(OptionGroupName="test-og")
        groups = resp["OptionGroupsList"]
        assert len(groups) >= 1
        assert groups[0]["OptionGroupName"] == "test-og"
    finally:
        rds.delete_option_group(OptionGroupName="test-og")
def test_rds_start_stop_cluster(rds):
    """StopDBCluster / StartDBCluster flip the cluster status."""
    cluster_id = "ss-cl"
    rds.create_db_cluster(
        DBClusterIdentifier=cluster_id,
        Engine="aurora-mysql",
        MasterUsername="admin",
        MasterUserPassword="password123",
    )

    def cluster_status():
        # Re-describe each time so the status reflects the latest transition.
        described = rds.describe_db_clusters(DBClusterIdentifier=cluster_id)
        return described["DBClusters"][0]["Status"]

    rds.stop_db_cluster(DBClusterIdentifier=cluster_id)
    assert cluster_status() == "stopped"

    rds.start_db_cluster(DBClusterIdentifier=cluster_id)
    assert cluster_status() == "available"
def test_rds_modify_subnet_group(rds):
    """ModifyDBSubnetGroup updates the stored description."""
    group_name = "test-mod-sg"
    rds.create_db_subnet_group(
        DBSubnetGroupName=group_name,
        DBSubnetGroupDescription="Test SG",
        SubnetIds=["subnet-111"],
    )
    rds.modify_db_subnet_group(
        DBSubnetGroupName=group_name,
        DBSubnetGroupDescription="Updated SG",
        SubnetIds=["subnet-222", "subnet-333"],
    )

    groups = rds.describe_db_subnet_groups(DBSubnetGroupName=group_name)["DBSubnetGroups"]
    assert groups[0]["DBSubnetGroupDescription"] == "Updated SG"
def test_rds_snapshot_crud(rds):
    """Exercise CreateDBSnapshot, DescribeDBSnapshots and DeleteDBSnapshot."""
    db_id = "qa-rds-snap-db"
    snapshot_id = "qa-rds-snap-1"

    rds.create_db_instance(
        DBInstanceIdentifier=db_id,
        DBInstanceClass="db.t3.micro",
        Engine="postgres",
        MasterUsername="admin",
        MasterUserPassword="password",
        AllocatedStorage=20,
    )
    try:
        rds.create_db_snapshot(DBSnapshotIdentifier=snapshot_id, DBInstanceIdentifier=db_id)

        found = rds.describe_db_snapshots(DBSnapshotIdentifier=snapshot_id)["DBSnapshots"]
        assert len(found) == 1
        snapshot = found[0]
        assert snapshot["DBSnapshotIdentifier"] == snapshot_id
        assert snapshot["Status"] == "available"

        # Once deleted, the snapshot must be absent from the unfiltered listing.
        rds.delete_db_snapshot(DBSnapshotIdentifier=snapshot_id)
        remaining = rds.describe_db_snapshots()["DBSnapshots"]
        assert not any(item["DBSnapshotIdentifier"] == snapshot_id for item in remaining)
    finally:
        rds.delete_db_instance(DBInstanceIdentifier=db_id, SkipFinalSnapshot=True)
def test_rds_deletion_protection(rds):
    """DeleteDBInstance fails when DeletionProtection=True.

    Args:
        rds: RDS client fixture (boto3-style) pointed at the service under test.

    The rejected delete passes ``SkipFinalSnapshot=True`` on purpose: AWS also
    reports ``InvalidParameterCombination`` when neither ``SkipFinalSnapshot``
    nor ``FinalDBSnapshotIdentifier`` is supplied, so omitting it would let the
    test pass for a reason unrelated to deletion protection.
    """
    rds.create_db_instance(
        DBInstanceIdentifier="qa-rds-protected",
        DBInstanceClass="db.t3.micro",
        Engine="postgres",
        MasterUsername="admin",
        MasterUserPassword="password",
        AllocatedStorage=20,
        DeletionProtection=True,
    )
    try:
        with pytest.raises(ClientError) as exc:
            rds.delete_db_instance(
                DBInstanceIdentifier="qa-rds-protected",
                SkipFinalSnapshot=True,
            )
        assert exc.value.response["Error"]["Code"] == "InvalidParameterCombination"
    finally:
        # Protection must be lifted before the instance can actually be removed.
        rds.modify_db_instance(
            DBInstanceIdentifier="qa-rds-protected",
            DeletionProtection=False,
            ApplyImmediately=True,
        )
        rds.delete_db_instance(DBInstanceIdentifier="qa-rds-protected", SkipFinalSnapshot=True)
def test_rds_global_cluster_lifecycle(rds):
    """Walk a global cluster through create, describe and delete."""
    global_id = "test-global-1"
    rds.create_global_cluster(
        GlobalClusterIdentifier=global_id,
        Engine="aurora-postgresql",
        EngineVersion="15.3",
    )
    try:
        found = rds.describe_global_clusters(GlobalClusterIdentifier=global_id)["GlobalClusters"]
        assert len(found) == 1

        described = found[0]
        assert described["GlobalClusterIdentifier"] == global_id
        assert described["Engine"] == "aurora-postgresql"
        assert described["Status"] == "available"
        assert "GlobalClusterArn" in described
        assert "GlobalClusterResourceId" in described
    finally:
        rds.delete_global_cluster(GlobalClusterIdentifier=global_id)

    # The deleted global cluster must no longer be describable.
    with pytest.raises(ClientError) as excinfo:
        rds.describe_global_clusters(GlobalClusterIdentifier=global_id)
    assert excinfo.value.response["Error"]["Code"] == "GlobalClusterNotFoundFault"
def test_rds_global_cluster_with_source(rds):
    """A global cluster created from a source cluster adopts its engine and lists it as writer."""
    source_id = "gc-source-cluster"
    global_id = "test-global-src"

    rds.create_db_cluster(
        DBClusterIdentifier=source_id,
        Engine="aurora-postgresql",
        MasterUsername="admin",
        MasterUserPassword="password123",
    )
    try:
        rds.create_global_cluster(
            GlobalClusterIdentifier=global_id,
            SourceDBClusterIdentifier=source_id,
        )

        described = rds.describe_global_clusters(GlobalClusterIdentifier=global_id)
        global_cluster = described["GlobalClusters"][0]
        assert global_cluster["Engine"] == "aurora-postgresql"

        member_list = global_cluster["GlobalClusterMembers"]
        assert len(member_list) == 1
        assert member_list[0]["IsWriter"] is True

        # Detach the only member; the global cluster can then be deleted.
        rds.remove_from_global_cluster(
            GlobalClusterIdentifier=global_id,
            DbClusterIdentifier=source_id,
        )
        after_removal = rds.describe_global_clusters(GlobalClusterIdentifier=global_id)
        assert len(after_removal["GlobalClusters"][0]["GlobalClusterMembers"]) == 0

        rds.delete_global_cluster(GlobalClusterIdentifier=global_id)
    finally:
        rds.delete_db_cluster(DBClusterIdentifier=source_id, SkipFinalSnapshot=True)
def test_rds_global_cluster_delete_with_members_fails(rds):
    """Deleting a global cluster that still has a writer member is rejected."""
    member_id = "gc-member-cluster"
    global_id = "test-global-members"

    rds.create_db_cluster(
        DBClusterIdentifier=member_id,
        Engine="aurora-postgresql",
        MasterUsername="admin",
        MasterUserPassword="password123",
    )
    rds.create_global_cluster(
        GlobalClusterIdentifier=global_id,
        SourceDBClusterIdentifier=member_id,
    )
    try:
        with pytest.raises(ClientError) as excinfo:
            rds.delete_global_cluster(GlobalClusterIdentifier=global_id)
        error_code = excinfo.value.response["Error"]["Code"]
        assert error_code == "InvalidGlobalClusterStateFault"
    finally:
        # Detach the member first, then tear down both resources.
        rds.remove_from_global_cluster(
            GlobalClusterIdentifier=global_id,
            DbClusterIdentifier=member_id,
        )
        rds.delete_global_cluster(GlobalClusterIdentifier=global_id)
        rds.delete_db_cluster(DBClusterIdentifier=member_id, SkipFinalSnapshot=True)
def test_rds_global_cluster_modify(rds):
    """ModifyGlobalCluster can rename and toggle DeletionProtection.

    Args:
        rds: RDS client fixture (boto3-style) pointed at the service under test.

    Cleanup is best-effort and tries both the original and the renamed
    identifier, because the cluster exists under whichever name was current
    when the test body stopped. Only ``ClientError`` is suppressed there, so
    unrelated failures still surface.
    """
    rds.create_global_cluster(
        GlobalClusterIdentifier="test-global-mod",
        Engine="aurora-postgresql",
    )
    try:
        rds.modify_global_cluster(
            GlobalClusterIdentifier="test-global-mod",
            DeletionProtection=True,
        )
        gc = rds.describe_global_clusters(
            GlobalClusterIdentifier="test-global-mod"
        )["GlobalClusters"][0]
        assert gc["DeletionProtection"] is True

        # Cannot delete while protected
        with pytest.raises(ClientError) as exc:
            rds.delete_global_cluster(GlobalClusterIdentifier="test-global-mod")
        assert exc.value.response["Error"]["Code"] == "InvalidParameterCombination"

        # Rename
        rds.modify_global_cluster(
            GlobalClusterIdentifier="test-global-mod",
            NewGlobalClusterIdentifier="test-global-renamed",
            DeletionProtection=False,
        )
        resp = rds.describe_global_clusters(GlobalClusterIdentifier="test-global-renamed")
        assert resp["GlobalClusters"][0]["GlobalClusterIdentifier"] == "test-global-renamed"

        # The old identifier must no longer resolve after the rename.
        with pytest.raises(ClientError):
            rds.describe_global_clusters(GlobalClusterIdentifier="test-global-mod")
    finally:
        for identifier in ("test-global-mod", "test-global-renamed"):
            try:
                # Protection may still be on if the test stopped before the rename.
                rds.modify_global_cluster(
                    GlobalClusterIdentifier=identifier,
                    DeletionProtection=False,
                )
                rds.delete_global_cluster(GlobalClusterIdentifier=identifier)
            except ClientError:
                # Expected for the identifier that is not currently in use.
                pass
def test_rds_modify_and_describe_db_parameters(rds):
    """Parameter overrides keep their value and ApplyMethod and are returned under Source=user."""
    group_name = "test-param-persist"
    rds.create_db_parameter_group(
        DBParameterGroupName=group_name,
        DBParameterGroupFamily="mysql8.0",
        Description="param persistence test",
    )

    overrides = [
        {
            "ParameterName": "max_connections",
            "ParameterValue": "200",
            "ApplyMethod": "immediate",
        },
        {
            "ParameterName": "custom_param_xyz",
            "ParameterValue": "hello",
            "ApplyMethod": "pending-reboot",
        },
    ]
    rds.modify_db_parameter_group(DBParameterGroupName=group_name, Parameters=overrides)

    # Source=user restricts the listing to parameters modified above.
    user_params = rds.describe_db_parameters(
        DBParameterGroupName=group_name, Source="user"
    )["Parameters"]

    returned_names = [param["ParameterName"] for param in user_params]
    assert "max_connections" in returned_names
    assert "custom_param_xyz" in returned_names

    max_conn = next(param for param in user_params if param["ParameterName"] == "max_connections")
    assert max_conn["ParameterValue"] == "200"
    assert max_conn["ApplyMethod"] == "immediate"

    custom = next(param for param in user_params if param["ParameterName"] == "custom_param_xyz")
    assert custom["ParameterValue"] == "hello"
    assert custom["ApplyMethod"] == "pending-reboot"
def test_rds_reset_db_parameters(rds):
    """ResetDBParameterGroup handles both a targeted reset and a full reset of user overrides."""
    group_name = "test-param-reset"

    def describe(source):
        # List the group's parameters restricted to the given Source filter.
        return rds.describe_db_parameters(
            DBParameterGroupName=group_name, Source=source
        )["Parameters"]

    rds.create_db_parameter_group(
        DBParameterGroupName=group_name,
        DBParameterGroupFamily="mysql8.0",
        Description="param reset test",
    )
    rds.modify_db_parameter_group(
        DBParameterGroupName=group_name,
        Parameters=[
            {
                "ParameterName": "max_connections",
                "ParameterValue": "200",
                "ApplyMethod": "immediate",
            },
            {
                "ParameterName": "custom_param_xyz",
                "ParameterValue": "hello",
                "ApplyMethod": "pending-reboot",
            },
        ],
    )

    # Targeted reset: only custom_param_xyz loses its override.
    rds.reset_db_parameter_group(
        DBParameterGroupName=group_name,
        Parameters=[
            {
                "ParameterName": "custom_param_xyz",
                "ApplyMethod": "pending-reboot",
            },
        ],
    )
    remaining_names = [param["ParameterName"] for param in describe("user")]
    assert "max_connections" in remaining_names
    assert "custom_param_xyz" not in remaining_names

    # Full reset: no user overrides remain.
    rds.reset_db_parameter_group(
        DBParameterGroupName=group_name,
        ResetAllParameters=True,
    )
    assert len(describe("user")) == 0

    # The engine default for max_connections is reported again.
    engine_defaults = describe("engine-default")
    default_max_conn = next(
        param for param in engine_defaults if param["ParameterName"] == "max_connections"
    )
    assert default_max_conn["ParameterValue"] == "151"
def test_rds_modify_and_describe_cluster_parameters(rds):
    """Cluster parameter overrides keep value and ApplyMethod and appear under Source=user."""
    group_name = "test-cparam-persist"
    rds.create_db_cluster_parameter_group(
        DBClusterParameterGroupName=group_name,
        DBParameterGroupFamily="aurora-mysql8.0",
        Description="cluster param persistence test",
    )

    override = {
        "ParameterName": "innodb_lock_wait_timeout",
        "ParameterValue": "60",
        "ApplyMethod": "immediate",
    }
    rds.modify_db_cluster_parameter_group(
        DBClusterParameterGroupName=group_name,
        Parameters=[override],
    )

    user_params = rds.describe_db_cluster_parameters(
        DBClusterParameterGroupName=group_name, Source="user"
    )["Parameters"]
    assert len(user_params) >= 1

    lock_wait = next(
        param for param in user_params if param["ParameterName"] == "innodb_lock_wait_timeout"
    )
    assert lock_wait["ParameterValue"] == "60"
    assert lock_wait["ApplyMethod"] == "immediate"

    # With no defaults tracked, the engine-default filter yields nothing.
    engine_defaults = rds.describe_db_cluster_parameters(
        DBClusterParameterGroupName=group_name, Source="engine-default"
    )["Parameters"]
    assert len(engine_defaults) == 0
def test_rds_reset_cluster_parameters(rds):
    """ResetDBClusterParameterGroup handles a targeted reset and a full reset."""
    group_name = "test-cparam-reset"

    def user_overrides():
        # Current user-sourced parameters of the group under test.
        return rds.describe_db_cluster_parameters(
            DBClusterParameterGroupName=group_name, Source="user"
        )["Parameters"]

    rds.create_db_cluster_parameter_group(
        DBClusterParameterGroupName=group_name,
        DBParameterGroupFamily="aurora-mysql8.0",
        Description="cluster param reset test",
    )
    rds.modify_db_cluster_parameter_group(
        DBClusterParameterGroupName=group_name,
        Parameters=[
            {
                "ParameterName": "innodb_lock_wait_timeout",
                "ParameterValue": "60",
                "ApplyMethod": "immediate",
            },
            {
                "ParameterName": "time_zone",
                "ParameterValue": "UTC",
                "ApplyMethod": "pending-reboot",
            },
        ],
    )

    # Targeted reset: only time_zone loses its override.
    rds.reset_db_cluster_parameter_group(
        DBClusterParameterGroupName=group_name,
        Parameters=[
            {
                "ParameterName": "time_zone",
                "ApplyMethod": "pending-reboot",
            },
        ],
    )
    remaining_names = [param["ParameterName"] for param in user_overrides()]
    assert "innodb_lock_wait_timeout" in remaining_names
    assert "time_zone" not in remaining_names

    # Full reset: nothing user-sourced is left.
    rds.reset_db_cluster_parameter_group(
        DBClusterParameterGroupName=group_name,
        ResetAllParameters=True,
    )
    assert len(user_overrides()) == 0
def test_rds_describe_engine_versions_family(rds):
    """The reported DBParameterGroupFamily must not repeat the engine prefix."""
    engine_versions = rds.describe_db_engine_versions(Engine="aurora-mysql")["DBEngineVersions"]
    assert len(engine_versions) >= 1

    # Expected shape is e.g. "aurora-mysql8.0", never "aurora-mysqlaurora-mysql8.0".
    bad_prefix = "aurora-mysqlaurora-"
    for family in (version["DBParameterGroupFamily"] for version in engine_versions):
        assert not family.startswith(bad_prefix), f"Double-prefixed family: {family}"
def test_rds_parse_member_list_both_formats():
    """_parse_member_list accepts Prefix.member.N as well as Prefix.MemberName.N keys."""
    from ministack.services.rds import _parse_member_list

    prefix = "SubnetIds"

    # Plain member.N keys, as sent by direct API calls.
    standard_params = {
        "SubnetIds.member.1": "subnet-aaa",
        "SubnetIds.member.2": "subnet-bbb",
    }
    assert _parse_member_list(standard_params, prefix) == ["subnet-aaa", "subnet-bbb"]

    # Prefix.MemberName.N keys, as produced by the botocore serializer (via SFN aws-sdk).
    botocore_params = {
        "SubnetIds.SubnetIdentifier.1": "subnet-xxx",
        "SubnetIds.SubnetIdentifier.2": "subnet-yyy",
        "SubnetIds.SubnetIdentifier.3": "subnet-zzz",
    }
    expected = ["subnet-xxx", "subnet-yyy", "subnet-zzz"]
    assert _parse_member_list(botocore_params, prefix) == expected

    # No matching keys at all yields an empty list.
    assert _parse_member_list({}, prefix) == []
def test_rds_describe_by_dbi_resource_id(rds):
    """DescribeDBInstances resolves an instance when given its DbiResourceId (AWS parity)."""
    created = rds.create_db_instance(
        DBInstanceIdentifier="resid-lookup-test",
        DBInstanceClass="db.t3.micro",
        Engine="postgres",
        MasterUsername="admin",
        MasterUserPassword="password123",
        AllocatedStorage=20,
    )["DBInstance"]

    dbi_resource_id = created["DbiResourceId"]
    assert dbi_resource_id.startswith("db-")

    # Look the instance up using the resource id in place of the identifier.
    matches = rds.describe_db_instances(DBInstanceIdentifier=dbi_resource_id)["DBInstances"]
    assert len(matches) == 1
    match = matches[0]
    assert match["DBInstanceIdentifier"] == "resid-lookup-test"
    assert match["DbiResourceId"] == dbi_resource_id
def test_rds_instance_inherits_cluster_username(rds):
    """An instance created inside a cluster reports the cluster's MasterUsername."""
    cluster_id = "inherit-cluster"
    member_id = "inherit-cluster-1"

    rds.create_db_cluster(
        DBClusterIdentifier=cluster_id,
        Engine="aurora-mysql",
        MasterUsername="myadmin",
        MasterUserPassword="s3cret!",
    )
    # No MasterUsername is passed here; it is expected to come from the cluster.
    rds.create_db_instance(
        DBInstanceIdentifier=member_id,
        DBClusterIdentifier=cluster_id,
        DBInstanceClass="db.r6g.large",
        Engine="aurora-mysql",
    )

    member = rds.describe_db_instances(DBInstanceIdentifier=member_id)["DBInstances"][0]
    assert member["MasterUsername"] == "myadmin"
    assert member["DBClusterIdentifier"] == cluster_id
def test_rds_modify_cluster_password(rds):
    """ModifyDBCluster accepts a new MasterUserPassword and the cluster stays describable."""
    cluster_id = "pw-mod-cluster"
    rds.create_db_cluster(
        DBClusterIdentifier=cluster_id,
        Engine="aurora-mysql",
        MasterUsername="admin",
        MasterUserPassword="old_pass",
    )
    rds.modify_db_cluster(
        DBClusterIdentifier=cluster_id,
        MasterUserPassword="new_pass",
    )

    described = rds.describe_db_clusters(DBClusterIdentifier=cluster_id)["DBClusters"][0]
    assert described["DBClusterIdentifier"] == cluster_id
def test_rds_modify_instance_password(rds):
    """ModifyDBInstance accepts a new MasterUserPassword and leaves other fields intact."""
    db_id = "pw-mod-inst"
    rds.create_db_instance(
        DBInstanceIdentifier=db_id,
        DBInstanceClass="db.t3.micro",
        Engine="postgres",
        MasterUsername="admin",
        MasterUserPassword="old_pass",
        AllocatedStorage=20,
    )

    # The password change itself must not raise.
    rds.modify_db_instance(
        DBInstanceIdentifier=db_id,
        MasterUserPassword="new_pass",
        ApplyImmediately=True,
    )

    current = rds.describe_db_instances(DBInstanceIdentifier=db_id)["DBInstances"][0]
    assert current["DBInstanceIdentifier"] == db_id

    # Fields unrelated to the password keep their original values.
    assert current["MasterUsername"] == "admin"
    assert current["Engine"] == "postgres"
    assert current["DBInstanceStatus"] == "available"
# ---------------------------------------------------------------------------
# Tests for the 8 previously-untested operations
# ---------------------------------------------------------------------------
def test_rds_create_read_replica(rds):
    """CreateDBInstanceReadReplica creates a replica linked to the source.

    Args:
        rds: RDS client fixture (boto3-style) pointed at the service under test.

    Cleanup callbacks are registered on an ``ExitStack`` right after each
    resource is created, so the replica delete is only attempted if the replica
    exists and the source is always removed. The stack unwinds in reverse
    order: replica first, then source.
    """
    from contextlib import ExitStack

    with ExitStack() as cleanup:
        rds.create_db_instance(
            DBInstanceIdentifier="rr-source",
            DBInstanceClass="db.t3.micro",
            Engine="postgres",
            MasterUsername="admin",
            MasterUserPassword="pass123",
            AllocatedStorage=20,
        )
        cleanup.callback(
            rds.delete_db_instance,
            DBInstanceIdentifier="rr-source",
            SkipFinalSnapshot=True,
        )

        resp = rds.create_db_instance_read_replica(
            DBInstanceIdentifier="rr-replica",
            SourceDBInstanceIdentifier="rr-source",
        )
        cleanup.callback(
            rds.delete_db_instance,
            DBInstanceIdentifier="rr-replica",
            SkipFinalSnapshot=True,
        )

        replica = resp["DBInstance"]
        assert replica["DBInstanceIdentifier"] == "rr-replica"
        assert replica["ReadReplicaSourceDBInstanceIdentifier"] == "rr-source"
        assert replica["DBInstanceStatus"] == "available"
        assert replica["Engine"] == "postgres"
        assert "Address" in replica["Endpoint"]

        # Source should list the replica
        source = rds.describe_db_instances(DBInstanceIdentifier="rr-source")["DBInstances"][0]
        assert "rr-replica" in source["ReadReplicaDBInstanceIdentifiers"]

        # Duplicate replica id should fail
        with pytest.raises(ClientError) as exc:
            rds.create_db_instance_read_replica(
                DBInstanceIdentifier="rr-replica",
                SourceDBInstanceIdentifier="rr-source",
            )
        assert exc.value.response["Error"]["Code"] == "DBInstanceAlreadyExistsFault"
def test_rds_create_read_replica_source_not_found(rds):
    """Creating a read replica of a missing source raises DBInstanceNotFoundFault."""
    replica_args = {
        "DBInstanceIdentifier": "rr-orphan",
        "SourceDBInstanceIdentifier": "rr-nonexistent",
    }
    with pytest.raises(ClientError) as excinfo:
        rds.create_db_instance_read_replica(**replica_args)
    error_code = excinfo.value.response["Error"]["Code"]
    assert error_code == "DBInstanceNotFoundFault"
def test_rds_reboot_db_instance(rds):
    """After RebootDBInstance the instance reports the available status."""
    db_id = "reboot-test"
    rds.create_db_instance(
        DBInstanceIdentifier=db_id,
        DBInstanceClass="db.t3.micro",
        Engine="postgres",
        MasterUsername="admin",
        MasterUserPassword="pass",
        AllocatedStorage=10,
    )
    try:
        rebooted = rds.reboot_db_instance(DBInstanceIdentifier=db_id)["DBInstance"]
        assert rebooted["DBInstanceStatus"] == "available"

        described = rds.describe_db_instances(DBInstanceIdentifier=db_id)["DBInstances"]
        assert described[0]["DBInstanceStatus"] == "available"
    finally:
        rds.delete_db_instance(DBInstanceIdentifier=db_id, SkipFinalSnapshot=True)
def test_rds_reboot_db_instance_not_found(rds):
    """Rebooting an unknown instance raises DBInstanceNotFoundFault."""
    with pytest.raises(ClientError) as excinfo:
        rds.reboot_db_instance(DBInstanceIdentifier="no-such-instance")
    error_code = excinfo.value.response["Error"]["Code"]
    assert error_code == "DBInstanceNotFoundFault"
def test_rds_restore_from_snapshot(rds):
    """RestoreDBInstanceFromDBSnapshot creates a new instance from a snapshot.

    Args:
        rds: RDS client fixture (boto3-style) pointed at the service under test.

    Cleanup callbacks are registered on an ``ExitStack`` as each resource is
    created, so a failure at any step (including snapshot creation) still
    removes whatever exists. The stack unwinds in reverse order: restored
    instance, snapshot, then source instance.
    """
    from contextlib import ExitStack

    with ExitStack() as cleanup:
        rds.create_db_instance(
            DBInstanceIdentifier="restore-src",
            DBInstanceClass="db.t3.micro",
            Engine="postgres",
            MasterUsername="admin",
            MasterUserPassword="pass",
            AllocatedStorage=20,
            DBName="srcdb",
        )
        cleanup.callback(
            rds.delete_db_instance,
            DBInstanceIdentifier="restore-src",
            SkipFinalSnapshot=True,
        )

        rds.create_db_snapshot(
            DBSnapshotIdentifier="restore-snap",
            DBInstanceIdentifier="restore-src",
        )
        cleanup.callback(rds.delete_db_snapshot, DBSnapshotIdentifier="restore-snap")

        resp = rds.restore_db_instance_from_db_snapshot(
            DBInstanceIdentifier="restored-db",
            DBSnapshotIdentifier="restore-snap",
            DBInstanceClass="db.t3.small",
        )
        cleanup.callback(
            rds.delete_db_instance,
            DBInstanceIdentifier="restored-db",
            SkipFinalSnapshot=True,
        )

        inst = resp["DBInstance"]
        assert inst["DBInstanceIdentifier"] == "restored-db"
        assert inst["DBInstanceStatus"] == "available"
        assert inst["Engine"] == "postgres"
        assert inst["DBInstanceClass"] == "db.t3.small"

        desc = rds.describe_db_instances(DBInstanceIdentifier="restored-db")
        assert len(desc["DBInstances"]) == 1

        # Duplicate target id should fail
        with pytest.raises(ClientError) as exc:
            rds.restore_db_instance_from_db_snapshot(
                DBInstanceIdentifier="restored-db",
                DBSnapshotIdentifier="restore-snap",
            )
        assert exc.value.response["Error"]["Code"] == "DBInstanceAlreadyExistsFault"
def test_rds_restore_from_snapshot_not_found(rds):
    """Restoring from an unknown snapshot raises DBSnapshotNotFound."""
    restore_args = {
        "DBInstanceIdentifier": "will-not-exist",
        "DBSnapshotIdentifier": "no-such-snap",
    }
    with pytest.raises(ClientError) as excinfo:
        rds.restore_db_instance_from_db_snapshot(**restore_args)
    error_code = excinfo.value.response["Error"]["Code"]
    assert error_code == "DBSnapshotNotFound"
def test_rds_start_db_instance(rds):
    """StartDBInstance brings a stopped instance back to available."""
    db_id = "start-test"

    def current_status():
        # Status as currently reported by DescribeDBInstances.
        return rds.describe_db_instances(DBInstanceIdentifier=db_id)["DBInstances"][0][
            "DBInstanceStatus"
        ]

    rds.create_db_instance(
        DBInstanceIdentifier=db_id,
        DBInstanceClass="db.t3.micro",
        Engine="mysql",
        MasterUsername="admin",
        MasterUserPassword="pass",
        AllocatedStorage=10,
    )
    try:
        # Stop first so there is a stopped instance to start.
        rds.stop_db_instance(DBInstanceIdentifier=db_id)
        assert current_status() == "stopped"

        start_result = rds.start_db_instance(DBInstanceIdentifier=db_id)
        assert start_result["DBInstance"]["DBInstanceStatus"] == "available"
        assert current_status() == "available"
    finally:
        rds.delete_db_instance(DBInstanceIdentifier=db_id, SkipFinalSnapshot=True)
def test_rds_start_db_instance_not_found(rds):
    """Starting an unknown instance raises DBInstanceNotFoundFault."""
    with pytest.raises(ClientError) as excinfo:
        rds.start_db_instance(DBInstanceIdentifier="ghost-instance")
    error_code = excinfo.value.response["Error"]["Code"]
    assert error_code == "DBInstanceNotFoundFault"
def test_rds_stop_db_instance(rds):
    """StopDBInstance moves an available instance to stopped."""
    db_id = "stop-test"
    rds.create_db_instance(
        DBInstanceIdentifier=db_id,
        DBInstanceClass="db.t3.micro",
        Engine="mysql",
        MasterUsername="admin",
        MasterUserPassword="pass",
        AllocatedStorage=10,
    )
    try:
        stop_result = rds.stop_db_instance(DBInstanceIdentifier=db_id)["DBInstance"]
        assert stop_result["DBInstanceStatus"] == "stopped"

        described = rds.describe_db_instances(DBInstanceIdentifier=db_id)["DBInstances"][0]
        assert described["DBInstanceStatus"] == "stopped"
    finally:
        rds.delete_db_instance(DBInstanceIdentifier=db_id, SkipFinalSnapshot=True)
def test_rds_stop_db_instance_not_found(rds):
    """Stopping an unknown instance raises DBInstanceNotFoundFault."""
    with pytest.raises(ClientError) as excinfo:
        rds.stop_db_instance(DBInstanceIdentifier="ghost-instance-2")
    error_code = excinfo.value.response["Error"]["Code"]
    assert error_code == "DBInstanceNotFoundFault"
def test_rds_describe_option_group_options(rds):
    """DescribeOptionGroupOptions responds with an empty OptionGroupOptions list (stub)."""
    result = rds.describe_option_group_options(EngineName="mysql")
    assert "OptionGroupOptions" in result
    options = result["OptionGroupOptions"]
    assert options == []
def test_rds_describe_orderable_db_instance_options(rds):
    """DescribeOrderableDBInstanceOptions lists classes per engine and honours the class filter."""
    postgres_options = rds.describe_orderable_db_instance_options(
        Engine="postgres"
    )["OrderableDBInstanceOptions"]
    assert len(postgres_options) > 0

    # Every returned option belongs to the requested engine.
    seen_engines = {option["Engine"] for option in postgres_options}
    assert seen_engines == {"postgres"}

    seen_classes = {option["DBInstanceClass"] for option in postgres_options}
    assert "db.t3.micro" in seen_classes
    assert "db.r5.large" in seen_classes

    # Narrowing by DBInstanceClass returns exactly one matching option.
    filtered = rds.describe_orderable_db_instance_options(
        Engine="mysql", DBInstanceClass="db.t3.micro",
    )["OrderableDBInstanceOptions"]
    assert len(filtered) == 1
    only_option = filtered[0]
    assert only_option["DBInstanceClass"] == "db.t3.micro"
    assert only_option["Engine"] == "mysql"
def test_rds_enable_http_endpoint(rds):
    """EnableHttpEndpoint turns on the Data API flag for an Aurora cluster."""
    cluster_id = "http-ep-cluster"

    def describe_cluster():
        # Fresh description of the cluster under test.
        return rds.describe_db_clusters(DBClusterIdentifier=cluster_id)["DBClusters"][0]

    rds.create_db_cluster(
        DBClusterIdentifier=cluster_id,
        Engine="aurora-mysql",
        MasterUsername="admin",
        MasterUserPassword="password123",
    )
    try:
        arn = describe_cluster()["DBClusterArn"]

        enabled = rds.enable_http_endpoint(ResourceArn=arn)
        assert enabled["ResourceArn"] == arn
        assert enabled["HttpEndpointEnabled"] is True

        # The flag must also be visible through DescribeDBClusters.
        assert describe_cluster()["HttpEndpointEnabled"] is True
    finally:
        rds.delete_db_cluster(DBClusterIdentifier=cluster_id, SkipFinalSnapshot=True)
def test_rds_enable_http_endpoint_not_found(rds):
    """EnableHttpEndpoint with an unknown cluster ARN raises DBClusterNotFoundFault."""
    missing_arn = "arn:aws:rds:us-east-1:123456789012:cluster:no-such-cluster"
    with pytest.raises(ClientError) as excinfo:
        rds.enable_http_endpoint(ResourceArn=missing_arn)
    error_code = excinfo.value.response["Error"]["Code"]
    assert error_code == "DBClusterNotFoundFault"