aws_rl_env / aws_infra /tests /test_rds_data.py
Sizzing's picture
Upload folder using huggingface_hub
c745a99 verified
"""
Tests for RDS Data API service emulator.
Since no real DB containers are available in CI, these tests focus on:
- API routing (requests reach the handler, not 404)
- Parameter validation (missing resourceArn, missing sql, etc.)
- Transaction lifecycle error paths
- Invalid resource ARN handling
"""
import json
import urllib.request
import os
import pytest
from botocore.exceptions import ClientError
ENDPOINT = os.environ.get("MINISTACK_ENDPOINT", "http://localhost:4566")
REGION = "us-east-1"
ACCOUNT_ID = "000000000000"
FAKE_CLUSTER_ARN = f"arn:aws:rds:{REGION}:{ACCOUNT_ID}:cluster:nonexistent-cluster"
FAKE_SECRET_ARN = f"arn:aws:secretsmanager:{REGION}:{ACCOUNT_ID}:secret:nonexistent-secret"
def _raw_post(path, body):
"""Send a raw POST to the MiniStack endpoint (bypassing boto3 since
rds-data uses REST paths like /Execute)."""
data = json.dumps(body).encode()
req = urllib.request.Request(
f"{ENDPOINT}{path}",
data=data,
headers={"Content-Type": "application/json"},
method="POST",
)
try:
resp = urllib.request.urlopen(req, timeout=10)
return resp.status, json.loads(resp.read())
except urllib.error.HTTPError as e:
return e.code, json.loads(e.read())
# ── Routing tests ──────────────────────────────────────────
def test_execute_route_exists():
"""POST /Execute reaches the rds-data handler (not a 404)."""
status, body = _raw_post("/Execute", {})
# Should get a 400 (missing params), not 404
assert status == 400
assert "BadRequestException" in str(body) or "resourceArn" in str(body)
def test_begin_transaction_route_exists():
"""POST /BeginTransaction reaches the rds-data handler."""
status, body = _raw_post("/BeginTransaction", {})
assert status == 400
def test_commit_transaction_route_exists():
"""POST /CommitTransaction reaches the rds-data handler."""
status, body = _raw_post("/CommitTransaction", {})
assert status == 400
def test_rollback_transaction_route_exists():
"""POST /RollbackTransaction reaches the rds-data handler."""
status, body = _raw_post("/RollbackTransaction", {})
assert status == 400
def test_batch_execute_route_exists():
"""POST /BatchExecute reaches the rds-data handler."""
status, body = _raw_post("/BatchExecute", {})
assert status == 400
# ── Parameter validation ───────────────────────────────────
def test_execute_missing_resource_arn():
status, body = _raw_post("/Execute", {
"secretArn": FAKE_SECRET_ARN,
"sql": "SELECT 1",
})
assert status == 400
assert "resourceArn" in body.get("message", body.get("Message", ""))
def test_execute_missing_secret_arn():
status, body = _raw_post("/Execute", {
"resourceArn": FAKE_CLUSTER_ARN,
"sql": "SELECT 1",
})
assert status == 400
assert "secretArn" in body.get("message", body.get("Message", ""))
def test_execute_missing_sql():
status, body = _raw_post("/Execute", {
"resourceArn": FAKE_CLUSTER_ARN,
"secretArn": FAKE_SECRET_ARN,
})
assert status == 400
assert "sql" in body.get("message", body.get("Message", ""))
def test_batch_execute_missing_sql():
status, body = _raw_post("/BatchExecute", {
"resourceArn": FAKE_CLUSTER_ARN,
"secretArn": FAKE_SECRET_ARN,
})
assert status == 400
assert "sql" in body.get("message", body.get("Message", ""))
# ── Invalid ARN ────────────────────────────────────────────
def test_execute_nonexistent_cluster():
"""ExecuteStatement with a non-existent cluster ARN returns an error."""
status, body = _raw_post("/Execute", {
"resourceArn": FAKE_CLUSTER_ARN,
"secretArn": FAKE_SECRET_ARN,
"sql": "SELECT 1",
})
assert status == 400
assert "not found" in body.get("message", body.get("Message", "")).lower()
def test_begin_transaction_nonexistent_cluster():
"""BeginTransaction with a non-existent cluster ARN returns an error."""
status, body = _raw_post("/BeginTransaction", {
"resourceArn": FAKE_CLUSTER_ARN,
"secretArn": FAKE_SECRET_ARN,
})
assert status == 400
assert "not found" in body.get("message", body.get("Message", "")).lower()
def test_batch_execute_nonexistent_cluster():
status, body = _raw_post("/BatchExecute", {
"resourceArn": FAKE_CLUSTER_ARN,
"secretArn": FAKE_SECRET_ARN,
"sql": "INSERT INTO t VALUES (1)",
})
assert status == 400
assert "not found" in body.get("message", body.get("Message", "")).lower()
# ── Transaction lifecycle (error paths) ────────────────────
def test_commit_missing_transaction_id():
status, body = _raw_post("/CommitTransaction", {})
assert status == 400
assert "transactionId" in body.get("message", body.get("Message", ""))
def test_rollback_missing_transaction_id():
status, body = _raw_post("/RollbackTransaction", {})
assert status == 400
assert "transactionId" in body.get("message", body.get("Message", ""))
def test_commit_nonexistent_transaction():
status, body = _raw_post("/CommitTransaction", {
"transactionId": "nonexistent-txn-id",
})
assert status == 404
assert "not found" in body.get("message", body.get("Message", "")).lower()
def test_rollback_nonexistent_transaction():
status, body = _raw_post("/RollbackTransaction", {
"transactionId": "nonexistent-txn-id",
})
assert status == 404
assert "not found" in body.get("message", body.get("Message", "")).lower()
# ── Invalid JSON ───────────────────────────────────────────
def test_execute_invalid_json():
"""Malformed JSON body returns BadRequestException."""
req = urllib.request.Request(
f"{ENDPOINT}/Execute",
data=b"not-json{{{",
headers={"Content-Type": "application/json"},
method="POST",
)
try:
resp = urllib.request.urlopen(req, timeout=10)
status = resp.status
body = json.loads(resp.read())
except urllib.error.HTTPError as e:
status = e.code
body = json.loads(e.read())
assert status == 400
assert "Invalid JSON" in body.get("message", body.get("Message", ""))
# ── Parameter conversion (unit tests) ─────────────────────
def test_convert_parameters_all_types():
"""_convert_parameters handles all RDS Data API value types."""
from ministack.services.rds_data import _convert_parameters
params = [
{"name": "s", "value": {"stringValue": "hello"}},
{"name": "n", "value": {"longValue": 42}},
{"name": "d", "value": {"doubleValue": 3.14}},
{"name": "b", "value": {"booleanValue": True}},
{"name": "null_val", "value": {"isNull": True}},
{"name": "blob", "value": {"blobValue": "AQID"}}, # base64 of b'\x01\x02\x03'
]
result = _convert_parameters(params)
assert result["s"] == "hello"
assert result["n"] == 42
assert result["d"] == 3.14
assert result["b"] is True
assert result["null_val"] is None
assert result["blob"] == b"\x01\x02\x03"
def test_convert_parameters_empty():
"""_convert_parameters returns empty dict for empty/None input."""
from ministack.services.rds_data import _convert_parameters
assert _convert_parameters([]) == {}
assert _convert_parameters(None) == {}
def test_convert_parameters_missing_name_skipped():
"""Parameters without a name are skipped."""
from ministack.services.rds_data import _convert_parameters
params = [
{"value": {"stringValue": "no-name"}},
{"name": "valid", "value": {"stringValue": "ok"}},
]
result = _convert_parameters(params)
assert len(result) == 1
assert result["valid"] == "ok"
def test_convert_parameters_empty_value():
"""Parameter with empty value object returns None."""
from ministack.services.rds_data import _convert_parameters
result = _convert_parameters([{"name": "x", "value": {}}])
assert result["x"] is None
# ── Stub mode tests ────────────────────────────────────────
def _setup_stub_cluster(rds, sm):
"""Create an RDS cluster (no real DB container) and a secret for stub testing."""
import uuid as _uuid
cluster_id = f"stub-test-{_uuid.uuid4().hex[:8]}"
rds.create_db_cluster(
DBClusterIdentifier=cluster_id,
Engine="aurora-mysql",
MasterUsername="admin",
MasterUserPassword="testpass123",
)
secret_arn = sm.create_secret(
Name=f"stub-secret-{_uuid.uuid4().hex[:8]}",
SecretString='{"username":"admin","password":"testpass123"}',
)["ARN"]
cluster_arn = f"arn:aws:rds:{REGION}:{ACCOUNT_ID}:cluster:{cluster_id}"
return cluster_arn, secret_arn
def _exec(cluster_arn, secret_arn, sql):
"""Execute a SQL statement via the stub and return (status, body)."""
return _raw_post("/Execute", {
"resourceArn": cluster_arn,
"secretArn": secret_arn,
"sql": sql,
})
def test_rds_data_stub_create_and_query_databases(rds, sm):
"""CREATE DATABASE via stub, then query information_schema.schemata."""
cluster_arn, secret_arn = _setup_stub_cluster(rds, sm)
status, _ = _exec(cluster_arn, secret_arn, "CREATE DATABASE myappdb")
assert status == 200
status, body = _exec(
cluster_arn, secret_arn,
"SELECT schema_name FROM information_schema.schemata WHERE schema_name IN ('myappdb')",
)
assert status == 200
names = [r[0]["stringValue"] for r in body.get("records", [])]
assert "myappdb" in names
def test_rds_data_stub_create_and_query_users(rds, sm):
"""CREATE USER via stub, then query mysql.user."""
cluster_arn, secret_arn = _setup_stub_cluster(rds, sm)
status, _ = _exec(cluster_arn, secret_arn, "CREATE USER 'appuser'@'%' IDENTIFIED BY 'pass'")
assert status == 200
status, body = _exec(
cluster_arn, secret_arn,
"SELECT User FROM mysql.user WHERE User='appuser'",
)
assert status == 200
names = [r[0]["stringValue"] for r in body.get("records", [])]
assert "appuser" in names
def test_rds_data_stub_grant_and_show_grants(rds, sm):
"""GRANT privileges, then SHOW GRANTS FOR."""
cluster_arn, secret_arn = _setup_stub_cluster(rds, sm)
_exec(cluster_arn, secret_arn, "CREATE USER 'grantee'@'%' IDENTIFIED BY 'pass'")
status, _ = _exec(
cluster_arn, secret_arn,
"GRANT ALL PRIVILEGES ON mydb.* TO 'grantee'@'%'",
)
assert status == 200
status, body = _exec(cluster_arn, secret_arn, "SHOW GRANTS FOR 'grantee'")
assert status == 200
grants = [r[0]["stringValue"] for r in body.get("records", [])]
assert any("GRANT" in g and "grantee" in g for g in grants)
def test_rds_data_stub_drop_database(rds, sm):
"""CREATE then DROP DATABASE, verify gone from queries."""
cluster_arn, secret_arn = _setup_stub_cluster(rds, sm)
_exec(cluster_arn, secret_arn, "CREATE DATABASE dropme")
_exec(cluster_arn, secret_arn, "DROP DATABASE dropme")
status, body = _exec(
cluster_arn, secret_arn,
"SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'dropme'",
)
assert status == 200
names = [r[0]["stringValue"] for r in body.get("records", [])]
assert "dropme" not in names
def test_rds_data_stub_drop_user(rds, sm):
"""CREATE then DROP USER, verify gone from queries."""
cluster_arn, secret_arn = _setup_stub_cluster(rds, sm)
_exec(cluster_arn, secret_arn, "CREATE USER 'tempuser'@'%' IDENTIFIED BY 'pass'")
_exec(cluster_arn, secret_arn, "DROP USER 'tempuser'@'%'")
status, body = _exec(
cluster_arn, secret_arn,
"SELECT User FROM mysql.user WHERE User='tempuser'",
)
assert status == 200
# Should return no records (empty records list from _stub_success)
records = body.get("records", [])
names = [r[0]["stringValue"] for r in records] if records else []
assert "tempuser" not in names
def test_rds_data_secret_credentials_parsing():
"""_get_secret_credentials extracts username and password from secret."""
from ministack.services import secretsmanager, rds_data
from ministack.core.responses import set_request_account_id
set_request_account_id("test")
# Create a secret with JSON credentials
secretsmanager._secrets["test-cred-secret"] = {
"ARN": "arn:aws:secretsmanager:us-east-1:000000000000:secret:test-cred",
"Name": "test-cred-secret",
"Versions": {
"v1": {
"Stages": ["AWSCURRENT"],
"SecretString": '{"username":"app_rw","password":"p@ss123"}',
}
},
}
user, pw = rds_data._get_secret_credentials(
"arn:aws:secretsmanager:us-east-1:000000000000:secret:test-cred")
assert user == "app_rw"
assert pw == "p@ss123"
# Clean up
del secretsmanager._secrets["test-cred-secret"]
def test_rds_data_secret_credentials_no_username():
"""_get_secret_credentials returns None username for password-only secret."""
from ministack.services import secretsmanager, rds_data
from ministack.core.responses import set_request_account_id
set_request_account_id("test")
secretsmanager._secrets["pw-only-secret"] = {
"ARN": "arn:aws:secretsmanager:us-east-1:000000000000:secret:pw-only",
"Name": "pw-only-secret",
"Versions": {
"v1": {
"Stages": ["AWSCURRENT"],
"SecretString": '{"password":"just-a-password"}',
}
},
}
user, pw = rds_data._get_secret_credentials(
"arn:aws:secretsmanager:us-east-1:000000000000:secret:pw-only")
assert user is None
assert pw == "just-a-password"
del secretsmanager._secrets["pw-only-secret"]