Spaces:
Running
Running
| """ | |
| Tests for RDS Data API service emulator. | |
| Since no real DB containers are available in CI, these tests focus on: | |
| - API routing (requests reach the handler, not 404) | |
| - Parameter validation (missing resourceArn, missing sql, etc.) | |
| - Transaction lifecycle error paths | |
| - Invalid resource ARN handling | |
| """ | |
| import json | |
| import urllib.request | |
| import os | |
| import pytest | |
| from botocore.exceptions import ClientError | |
| ENDPOINT = os.environ.get("MINISTACK_ENDPOINT", "http://localhost:4566") | |
| REGION = "us-east-1" | |
| ACCOUNT_ID = "000000000000" | |
| FAKE_CLUSTER_ARN = f"arn:aws:rds:{REGION}:{ACCOUNT_ID}:cluster:nonexistent-cluster" | |
| FAKE_SECRET_ARN = f"arn:aws:secretsmanager:{REGION}:{ACCOUNT_ID}:secret:nonexistent-secret" | |
| def _raw_post(path, body): | |
| """Send a raw POST to the MiniStack endpoint (bypassing boto3 since | |
| rds-data uses REST paths like /Execute).""" | |
| data = json.dumps(body).encode() | |
| req = urllib.request.Request( | |
| f"{ENDPOINT}{path}", | |
| data=data, | |
| headers={"Content-Type": "application/json"}, | |
| method="POST", | |
| ) | |
| try: | |
| resp = urllib.request.urlopen(req, timeout=10) | |
| return resp.status, json.loads(resp.read()) | |
| except urllib.error.HTTPError as e: | |
| return e.code, json.loads(e.read()) | |
| # ── Routing tests ────────────────────────────────────────── | |
| def test_execute_route_exists(): | |
| """POST /Execute reaches the rds-data handler (not a 404).""" | |
| status, body = _raw_post("/Execute", {}) | |
| # Should get a 400 (missing params), not 404 | |
| assert status == 400 | |
| assert "BadRequestException" in str(body) or "resourceArn" in str(body) | |
| def test_begin_transaction_route_exists(): | |
| """POST /BeginTransaction reaches the rds-data handler.""" | |
| status, body = _raw_post("/BeginTransaction", {}) | |
| assert status == 400 | |
| def test_commit_transaction_route_exists(): | |
| """POST /CommitTransaction reaches the rds-data handler.""" | |
| status, body = _raw_post("/CommitTransaction", {}) | |
| assert status == 400 | |
| def test_rollback_transaction_route_exists(): | |
| """POST /RollbackTransaction reaches the rds-data handler.""" | |
| status, body = _raw_post("/RollbackTransaction", {}) | |
| assert status == 400 | |
| def test_batch_execute_route_exists(): | |
| """POST /BatchExecute reaches the rds-data handler.""" | |
| status, body = _raw_post("/BatchExecute", {}) | |
| assert status == 400 | |
| # ── Parameter validation ─────────────────────────────────── | |
| def test_execute_missing_resource_arn(): | |
| status, body = _raw_post("/Execute", { | |
| "secretArn": FAKE_SECRET_ARN, | |
| "sql": "SELECT 1", | |
| }) | |
| assert status == 400 | |
| assert "resourceArn" in body.get("message", body.get("Message", "")) | |
| def test_execute_missing_secret_arn(): | |
| status, body = _raw_post("/Execute", { | |
| "resourceArn": FAKE_CLUSTER_ARN, | |
| "sql": "SELECT 1", | |
| }) | |
| assert status == 400 | |
| assert "secretArn" in body.get("message", body.get("Message", "")) | |
| def test_execute_missing_sql(): | |
| status, body = _raw_post("/Execute", { | |
| "resourceArn": FAKE_CLUSTER_ARN, | |
| "secretArn": FAKE_SECRET_ARN, | |
| }) | |
| assert status == 400 | |
| assert "sql" in body.get("message", body.get("Message", "")) | |
| def test_batch_execute_missing_sql(): | |
| status, body = _raw_post("/BatchExecute", { | |
| "resourceArn": FAKE_CLUSTER_ARN, | |
| "secretArn": FAKE_SECRET_ARN, | |
| }) | |
| assert status == 400 | |
| assert "sql" in body.get("message", body.get("Message", "")) | |
| # ── Invalid ARN ──────────────────────────────────────────── | |
| def test_execute_nonexistent_cluster(): | |
| """ExecuteStatement with a non-existent cluster ARN returns an error.""" | |
| status, body = _raw_post("/Execute", { | |
| "resourceArn": FAKE_CLUSTER_ARN, | |
| "secretArn": FAKE_SECRET_ARN, | |
| "sql": "SELECT 1", | |
| }) | |
| assert status == 400 | |
| assert "not found" in body.get("message", body.get("Message", "")).lower() | |
| def test_begin_transaction_nonexistent_cluster(): | |
| """BeginTransaction with a non-existent cluster ARN returns an error.""" | |
| status, body = _raw_post("/BeginTransaction", { | |
| "resourceArn": FAKE_CLUSTER_ARN, | |
| "secretArn": FAKE_SECRET_ARN, | |
| }) | |
| assert status == 400 | |
| assert "not found" in body.get("message", body.get("Message", "")).lower() | |
| def test_batch_execute_nonexistent_cluster(): | |
| status, body = _raw_post("/BatchExecute", { | |
| "resourceArn": FAKE_CLUSTER_ARN, | |
| "secretArn": FAKE_SECRET_ARN, | |
| "sql": "INSERT INTO t VALUES (1)", | |
| }) | |
| assert status == 400 | |
| assert "not found" in body.get("message", body.get("Message", "")).lower() | |
| # ── Transaction lifecycle (error paths) ──────────────────── | |
| def test_commit_missing_transaction_id(): | |
| status, body = _raw_post("/CommitTransaction", {}) | |
| assert status == 400 | |
| assert "transactionId" in body.get("message", body.get("Message", "")) | |
| def test_rollback_missing_transaction_id(): | |
| status, body = _raw_post("/RollbackTransaction", {}) | |
| assert status == 400 | |
| assert "transactionId" in body.get("message", body.get("Message", "")) | |
| def test_commit_nonexistent_transaction(): | |
| status, body = _raw_post("/CommitTransaction", { | |
| "transactionId": "nonexistent-txn-id", | |
| }) | |
| assert status == 404 | |
| assert "not found" in body.get("message", body.get("Message", "")).lower() | |
| def test_rollback_nonexistent_transaction(): | |
| status, body = _raw_post("/RollbackTransaction", { | |
| "transactionId": "nonexistent-txn-id", | |
| }) | |
| assert status == 404 | |
| assert "not found" in body.get("message", body.get("Message", "")).lower() | |
| # ── Invalid JSON ─────────────────────────────────────────── | |
| def test_execute_invalid_json(): | |
| """Malformed JSON body returns BadRequestException.""" | |
| req = urllib.request.Request( | |
| f"{ENDPOINT}/Execute", | |
| data=b"not-json{{{", | |
| headers={"Content-Type": "application/json"}, | |
| method="POST", | |
| ) | |
| try: | |
| resp = urllib.request.urlopen(req, timeout=10) | |
| status = resp.status | |
| body = json.loads(resp.read()) | |
| except urllib.error.HTTPError as e: | |
| status = e.code | |
| body = json.loads(e.read()) | |
| assert status == 400 | |
| assert "Invalid JSON" in body.get("message", body.get("Message", "")) | |
| # ── Parameter conversion (unit tests) ───────────────────── | |
| def test_convert_parameters_all_types(): | |
| """_convert_parameters handles all RDS Data API value types.""" | |
| from ministack.services.rds_data import _convert_parameters | |
| params = [ | |
| {"name": "s", "value": {"stringValue": "hello"}}, | |
| {"name": "n", "value": {"longValue": 42}}, | |
| {"name": "d", "value": {"doubleValue": 3.14}}, | |
| {"name": "b", "value": {"booleanValue": True}}, | |
| {"name": "null_val", "value": {"isNull": True}}, | |
| {"name": "blob", "value": {"blobValue": "AQID"}}, # base64 of b'\x01\x02\x03' | |
| ] | |
| result = _convert_parameters(params) | |
| assert result["s"] == "hello" | |
| assert result["n"] == 42 | |
| assert result["d"] == 3.14 | |
| assert result["b"] is True | |
| assert result["null_val"] is None | |
| assert result["blob"] == b"\x01\x02\x03" | |
| def test_convert_parameters_empty(): | |
| """_convert_parameters returns empty dict for empty/None input.""" | |
| from ministack.services.rds_data import _convert_parameters | |
| assert _convert_parameters([]) == {} | |
| assert _convert_parameters(None) == {} | |
| def test_convert_parameters_missing_name_skipped(): | |
| """Parameters without a name are skipped.""" | |
| from ministack.services.rds_data import _convert_parameters | |
| params = [ | |
| {"value": {"stringValue": "no-name"}}, | |
| {"name": "valid", "value": {"stringValue": "ok"}}, | |
| ] | |
| result = _convert_parameters(params) | |
| assert len(result) == 1 | |
| assert result["valid"] == "ok" | |
| def test_convert_parameters_empty_value(): | |
| """Parameter with empty value object returns None.""" | |
| from ministack.services.rds_data import _convert_parameters | |
| result = _convert_parameters([{"name": "x", "value": {}}]) | |
| assert result["x"] is None | |
| # ── Stub mode tests ──────────────────────────────────────── | |
| def _setup_stub_cluster(rds, sm): | |
| """Create an RDS cluster (no real DB container) and a secret for stub testing.""" | |
| import uuid as _uuid | |
| cluster_id = f"stub-test-{_uuid.uuid4().hex[:8]}" | |
| rds.create_db_cluster( | |
| DBClusterIdentifier=cluster_id, | |
| Engine="aurora-mysql", | |
| MasterUsername="admin", | |
| MasterUserPassword="testpass123", | |
| ) | |
| secret_arn = sm.create_secret( | |
| Name=f"stub-secret-{_uuid.uuid4().hex[:8]}", | |
| SecretString='{"username":"admin","password":"testpass123"}', | |
| )["ARN"] | |
| cluster_arn = f"arn:aws:rds:{REGION}:{ACCOUNT_ID}:cluster:{cluster_id}" | |
| return cluster_arn, secret_arn | |
| def _exec(cluster_arn, secret_arn, sql): | |
| """Execute a SQL statement via the stub and return (status, body).""" | |
| return _raw_post("/Execute", { | |
| "resourceArn": cluster_arn, | |
| "secretArn": secret_arn, | |
| "sql": sql, | |
| }) | |
| def test_rds_data_stub_create_and_query_databases(rds, sm): | |
| """CREATE DATABASE via stub, then query information_schema.schemata.""" | |
| cluster_arn, secret_arn = _setup_stub_cluster(rds, sm) | |
| status, _ = _exec(cluster_arn, secret_arn, "CREATE DATABASE myappdb") | |
| assert status == 200 | |
| status, body = _exec( | |
| cluster_arn, secret_arn, | |
| "SELECT schema_name FROM information_schema.schemata WHERE schema_name IN ('myappdb')", | |
| ) | |
| assert status == 200 | |
| names = [r[0]["stringValue"] for r in body.get("records", [])] | |
| assert "myappdb" in names | |
| def test_rds_data_stub_create_and_query_users(rds, sm): | |
| """CREATE USER via stub, then query mysql.user.""" | |
| cluster_arn, secret_arn = _setup_stub_cluster(rds, sm) | |
| status, _ = _exec(cluster_arn, secret_arn, "CREATE USER 'appuser'@'%' IDENTIFIED BY 'pass'") | |
| assert status == 200 | |
| status, body = _exec( | |
| cluster_arn, secret_arn, | |
| "SELECT User FROM mysql.user WHERE User='appuser'", | |
| ) | |
| assert status == 200 | |
| names = [r[0]["stringValue"] for r in body.get("records", [])] | |
| assert "appuser" in names | |
| def test_rds_data_stub_grant_and_show_grants(rds, sm): | |
| """GRANT privileges, then SHOW GRANTS FOR.""" | |
| cluster_arn, secret_arn = _setup_stub_cluster(rds, sm) | |
| _exec(cluster_arn, secret_arn, "CREATE USER 'grantee'@'%' IDENTIFIED BY 'pass'") | |
| status, _ = _exec( | |
| cluster_arn, secret_arn, | |
| "GRANT ALL PRIVILEGES ON mydb.* TO 'grantee'@'%'", | |
| ) | |
| assert status == 200 | |
| status, body = _exec(cluster_arn, secret_arn, "SHOW GRANTS FOR 'grantee'") | |
| assert status == 200 | |
| grants = [r[0]["stringValue"] for r in body.get("records", [])] | |
| assert any("GRANT" in g and "grantee" in g for g in grants) | |
| def test_rds_data_stub_drop_database(rds, sm): | |
| """CREATE then DROP DATABASE, verify gone from queries.""" | |
| cluster_arn, secret_arn = _setup_stub_cluster(rds, sm) | |
| _exec(cluster_arn, secret_arn, "CREATE DATABASE dropme") | |
| _exec(cluster_arn, secret_arn, "DROP DATABASE dropme") | |
| status, body = _exec( | |
| cluster_arn, secret_arn, | |
| "SELECT schema_name FROM information_schema.schemata WHERE schema_name = 'dropme'", | |
| ) | |
| assert status == 200 | |
| names = [r[0]["stringValue"] for r in body.get("records", [])] | |
| assert "dropme" not in names | |
| def test_rds_data_stub_drop_user(rds, sm): | |
| """CREATE then DROP USER, verify gone from queries.""" | |
| cluster_arn, secret_arn = _setup_stub_cluster(rds, sm) | |
| _exec(cluster_arn, secret_arn, "CREATE USER 'tempuser'@'%' IDENTIFIED BY 'pass'") | |
| _exec(cluster_arn, secret_arn, "DROP USER 'tempuser'@'%'") | |
| status, body = _exec( | |
| cluster_arn, secret_arn, | |
| "SELECT User FROM mysql.user WHERE User='tempuser'", | |
| ) | |
| assert status == 200 | |
| # Should return no records (empty records list from _stub_success) | |
| records = body.get("records", []) | |
| names = [r[0]["stringValue"] for r in records] if records else [] | |
| assert "tempuser" not in names | |
| def test_rds_data_secret_credentials_parsing(): | |
| """_get_secret_credentials extracts username and password from secret.""" | |
| from ministack.services import secretsmanager, rds_data | |
| from ministack.core.responses import set_request_account_id | |
| set_request_account_id("test") | |
| # Create a secret with JSON credentials | |
| secretsmanager._secrets["test-cred-secret"] = { | |
| "ARN": "arn:aws:secretsmanager:us-east-1:000000000000:secret:test-cred", | |
| "Name": "test-cred-secret", | |
| "Versions": { | |
| "v1": { | |
| "Stages": ["AWSCURRENT"], | |
| "SecretString": '{"username":"app_rw","password":"p@ss123"}', | |
| } | |
| }, | |
| } | |
| user, pw = rds_data._get_secret_credentials( | |
| "arn:aws:secretsmanager:us-east-1:000000000000:secret:test-cred") | |
| assert user == "app_rw" | |
| assert pw == "p@ss123" | |
| # Clean up | |
| del secretsmanager._secrets["test-cred-secret"] | |
| def test_rds_data_secret_credentials_no_username(): | |
| """_get_secret_credentials returns None username for password-only secret.""" | |
| from ministack.services import secretsmanager, rds_data | |
| from ministack.core.responses import set_request_account_id | |
| set_request_account_id("test") | |
| secretsmanager._secrets["pw-only-secret"] = { | |
| "ARN": "arn:aws:secretsmanager:us-east-1:000000000000:secret:pw-only", | |
| "Name": "pw-only-secret", | |
| "Versions": { | |
| "v1": { | |
| "Stages": ["AWSCURRENT"], | |
| "SecretString": '{"password":"just-a-password"}', | |
| } | |
| }, | |
| } | |
| user, pw = rds_data._get_secret_credentials( | |
| "arn:aws:secretsmanager:us-east-1:000000000000:secret:pw-only") | |
| assert user is None | |
| assert pw == "just-a-password" | |
| del secretsmanager._secrets["pw-only-secret"] | |