Spaces:
Running
Running
| import io | |
| import json | |
| import os | |
| import time | |
| import zipfile | |
| from urllib.parse import urlparse | |
| import pytest | |
| from botocore.exceptions import ClientError | |
| import uuid as _uuid_mod | |
| def test_s3_create_bucket(s3): | |
| s3.create_bucket(Bucket="intg-s3-create") | |
| buckets = s3.list_buckets()["Buckets"] | |
| assert any(b["Name"] == "intg-s3-create" for b in buckets) | |
| def test_s3_create_bucket_already_exists(s3): | |
| # Real AWS: creating a bucket you already own is idempotent — returns 200 | |
| s3.create_bucket(Bucket="intg-s3-dup") | |
| s3.create_bucket(Bucket="intg-s3-dup") # must not raise | |
| def test_s3_delete_bucket(s3): | |
| s3.create_bucket(Bucket="intg-s3-delbkt") | |
| s3.delete_bucket(Bucket="intg-s3-delbkt") | |
| buckets = [b["Name"] for b in s3.list_buckets()["Buckets"]] | |
| assert "intg-s3-delbkt" not in buckets | |
| def test_s3_delete_bucket_not_empty(s3): | |
| s3.create_bucket(Bucket="intg-s3-notempty") | |
| s3.put_object(Bucket="intg-s3-notempty", Key="file.txt", Body=b"data") | |
| with pytest.raises(ClientError) as exc: | |
| s3.delete_bucket(Bucket="intg-s3-notempty") | |
| assert exc.value.response["Error"]["Code"] == "BucketNotEmpty" | |
| def test_s3_delete_bucket_not_found(s3): | |
| with pytest.raises(ClientError) as exc: | |
| s3.delete_bucket(Bucket="intg-s3-nonexistent-xyz") | |
| assert exc.value.response["Error"]["Code"] == "NoSuchBucket" | |
| def test_s3_head_bucket(s3): | |
| s3.create_bucket(Bucket="intg-s3-headbkt") | |
| resp = s3.head_bucket(Bucket="intg-s3-headbkt") | |
| assert resp["ResponseMetadata"]["HTTPStatusCode"] == 200 | |
| with pytest.raises(ClientError) as exc: | |
| s3.head_bucket(Bucket="intg-s3-headbkt-missing") | |
| assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404 | |
| def test_s3_put_get_object(s3): | |
| s3.create_bucket(Bucket="intg-s3-putget") | |
| s3.put_object(Bucket="intg-s3-putget", Key="hello.txt", Body=b"Hello, World!") | |
| resp = s3.get_object(Bucket="intg-s3-putget", Key="hello.txt") | |
| assert resp["Body"].read() == b"Hello, World!" | |
| def test_s3_put_object_no_bucket(s3): | |
| with pytest.raises(ClientError) as exc: | |
| s3.put_object(Bucket="intg-s3-nobucket-xyz", Key="k", Body=b"x") | |
| assert exc.value.response["Error"]["Code"] == "NoSuchBucket" | |
| def test_s3_put_get_json_chunked(s3): | |
| """AWS SDK v2 sends PutObject with chunked Transfer-Encoding — body must be decoded cleanly.""" | |
| import urllib.request, urllib.parse, json as _json | |
| bucket = "intg-s3-chunked" | |
| s3.create_bucket(Bucket=bucket) | |
| payload = _json.dumps({"hello": "world", "number": 42}) | |
| # Simulate AWS chunked encoding: one chunk + terminator | |
| chunk_body = payload.encode() | |
| chunk_size = f"{len(chunk_body):x}".encode() | |
| fake_sig = b"abc123" | |
| chunked = ( | |
| chunk_size + b";chunk-signature=" + fake_sig + b"\r\n" + | |
| chunk_body + b"\r\n" + | |
| b"0;chunk-signature=" + fake_sig + b"\r\n\r\n" | |
| ) | |
| endpoint = "http://localhost:4566/" + bucket + "/test.json" | |
| req = urllib.request.Request(endpoint, data=chunked, method="PUT", headers={ | |
| "x-amz-content-sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD", | |
| "Content-Type": "application/json", | |
| "Authorization": "AWS4-HMAC-SHA256 Credential=test/20240101/us-east-1/s3/aws4_request, SignedHeaders=host, Signature=fake", | |
| }) | |
| with urllib.request.urlopen(req) as r: | |
| assert r.status == 200 | |
| resp = s3.get_object(Bucket=bucket, Key="test.json") | |
| body = resp["Body"].read().decode() | |
| assert _json.loads(body) == {"hello": "world", "number": 42} | |
| def test_s3_head_object(s3): | |
| s3.create_bucket(Bucket="intg-s3-headobj") | |
| s3.put_object( | |
| Bucket="intg-s3-headobj", | |
| Key="data.bin", | |
| Body=b"0123456789", | |
| ContentType="application/octet-stream", | |
| ) | |
| resp = s3.head_object(Bucket="intg-s3-headobj", Key="data.bin") | |
| assert resp["ContentLength"] == 10 | |
| assert resp["ContentType"] == "application/octet-stream" | |
| assert "ETag" in resp | |
| def test_s3_head_object_not_found(s3): | |
| s3.create_bucket(Bucket="intg-s3-headobj404") | |
| with pytest.raises(ClientError) as exc: | |
| s3.head_object(Bucket="intg-s3-headobj404", Key="missing.txt") | |
| assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404 | |
| def test_s3_delete_object(s3): | |
| s3.create_bucket(Bucket="intg-s3-delobj") | |
| s3.put_object(Bucket="intg-s3-delobj", Key="bye.txt", Body=b"bye") | |
| s3.delete_object(Bucket="intg-s3-delobj", Key="bye.txt") | |
| with pytest.raises(ClientError): | |
| s3.get_object(Bucket="intg-s3-delobj", Key="bye.txt") | |
| def test_s3_delete_object_idempotent(s3): | |
| s3.create_bucket(Bucket="intg-s3-delidempotent") | |
| resp = s3.delete_object(Bucket="intg-s3-delidempotent", Key="nonexistent.txt") | |
| assert resp["ResponseMetadata"]["HTTPStatusCode"] == 204 | |
| def test_s3_copy_object(s3): | |
| s3.create_bucket(Bucket="intg-s3-copysrc") | |
| s3.create_bucket(Bucket="intg-s3-copydst") | |
| s3.put_object(Bucket="intg-s3-copysrc", Key="original.txt", Body=b"copy me") | |
| s3.copy_object( | |
| CopySource={"Bucket": "intg-s3-copysrc", "Key": "original.txt"}, | |
| Bucket="intg-s3-copydst", | |
| Key="copied.txt", | |
| ) | |
| resp = s3.get_object(Bucket="intg-s3-copydst", Key="copied.txt") | |
| assert resp["Body"].read() == b"copy me" | |
| def test_s3_copy_object_metadata_replace(s3): | |
| bkt = "intg-s3-copymeta" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_object( | |
| Bucket=bkt, | |
| Key="src.txt", | |
| Body=b"metadata test", | |
| Metadata={"original-key": "original-value"}, | |
| ) | |
| s3.copy_object( | |
| CopySource={"Bucket": bkt, "Key": "src.txt"}, | |
| Bucket=bkt, | |
| Key="dst.txt", | |
| MetadataDirective="REPLACE", | |
| Metadata={"replaced-key": "replaced-value"}, | |
| ) | |
| resp = s3.head_object(Bucket=bkt, Key="dst.txt") | |
| assert resp["Metadata"].get("replaced-key") == "replaced-value" | |
| assert "original-key" not in resp["Metadata"] | |
| def test_s3_list_objects_v1(s3): | |
| bkt = "intg-s3-listv1" | |
| s3.create_bucket(Bucket=bkt) | |
| for key in [ | |
| "photos/2023/a.jpg", | |
| "photos/2023/b.jpg", | |
| "photos/2024/c.jpg", | |
| "docs/readme.md", | |
| ]: | |
| s3.put_object(Bucket=bkt, Key=key, Body=b"x") | |
| resp = s3.list_objects(Bucket=bkt, Prefix="photos/", Delimiter="/") | |
| prefixes = [p["Prefix"] for p in resp.get("CommonPrefixes", [])] | |
| assert "photos/2023/" in prefixes | |
| assert "photos/2024/" in prefixes | |
| assert len(resp.get("Contents", [])) == 0 | |
| def test_s3_list_objects_v2(s3): | |
| bkt = "intg-s3-listv2" | |
| s3.create_bucket(Bucket=bkt) | |
| for key in ["a/1.txt", "a/2.txt", "b/3.txt"]: | |
| s3.put_object(Bucket=bkt, Key=key, Body=b"v2") | |
| resp = s3.list_objects_v2(Bucket=bkt, Prefix="a/") | |
| assert resp["KeyCount"] == 2 | |
| keys = [c["Key"] for c in resp["Contents"]] | |
| assert "a/1.txt" in keys | |
| assert "a/2.txt" in keys | |
| def test_s3_list_objects_pagination(s3): | |
| bkt = "intg-s3-listpage" | |
| s3.create_bucket(Bucket=bkt) | |
| for i in range(7): | |
| s3.put_object(Bucket=bkt, Key=f"item-{i:02d}.txt", Body=b"p") | |
| resp = s3.list_objects_v2(Bucket=bkt, MaxKeys=3) | |
| assert resp["IsTruncated"] is True | |
| assert resp["KeyCount"] == 3 | |
| token = resp["NextContinuationToken"] | |
| all_keys = [c["Key"] for c in resp["Contents"]] | |
| while resp["IsTruncated"]: | |
| resp = s3.list_objects_v2( | |
| Bucket=bkt, | |
| MaxKeys=3, | |
| ContinuationToken=token, | |
| ) | |
| all_keys.extend(c["Key"] for c in resp["Contents"]) | |
| token = resp.get("NextContinuationToken", "") | |
| assert len(all_keys) == 7 | |
| def test_s3_delete_objects_batch(s3): | |
| bkt = "intg-s3-batchdel" | |
| s3.create_bucket(Bucket=bkt) | |
| keys = [f"obj-{i}.txt" for i in range(5)] | |
| for k in keys: | |
| s3.put_object(Bucket=bkt, Key=k, Body=b"batch") | |
| resp = s3.delete_objects( | |
| Bucket=bkt, | |
| Delete={"Objects": [{"Key": k} for k in keys], "Quiet": False}, | |
| ) | |
| assert len(resp.get("Deleted", [])) == 5 | |
| listing = s3.list_objects_v2(Bucket=bkt) | |
| assert listing["KeyCount"] == 0 | |
| def test_s3_multipart_upload(s3): | |
| bkt = "intg-s3-multipart" | |
| s3.create_bucket(Bucket=bkt) | |
| key = "large.bin" | |
| mpu = s3.create_multipart_upload(Bucket=bkt, Key=key) | |
| upload_id = mpu["UploadId"] | |
| p1 = s3.upload_part( | |
| Bucket=bkt, | |
| Key=key, | |
| UploadId=upload_id, | |
| PartNumber=1, | |
| Body=b"A" * 100, | |
| ) | |
| p2 = s3.upload_part( | |
| Bucket=bkt, | |
| Key=key, | |
| UploadId=upload_id, | |
| PartNumber=2, | |
| Body=b"B" * 100, | |
| ) | |
| s3.complete_multipart_upload( | |
| Bucket=bkt, | |
| Key=key, | |
| UploadId=upload_id, | |
| MultipartUpload={ | |
| "Parts": [ | |
| {"PartNumber": 1, "ETag": p1["ETag"]}, | |
| {"PartNumber": 2, "ETag": p2["ETag"]}, | |
| ] | |
| }, | |
| ) | |
| resp = s3.get_object(Bucket=bkt, Key=key) | |
| assert resp["Body"].read() == b"A" * 100 + b"B" * 100 | |
| def test_s3_abort_multipart_upload(s3): | |
| bkt = "intg-s3-abortmpu" | |
| s3.create_bucket(Bucket=bkt) | |
| key = "aborted.bin" | |
| mpu = s3.create_multipart_upload(Bucket=bkt, Key=key) | |
| upload_id = mpu["UploadId"] | |
| s3.upload_part( | |
| Bucket=bkt, | |
| Key=key, | |
| UploadId=upload_id, | |
| PartNumber=1, | |
| Body=b"X" * 50, | |
| ) | |
| s3.abort_multipart_upload(Bucket=bkt, Key=key, UploadId=upload_id) | |
| with pytest.raises(ClientError) as exc: | |
| s3.get_object(Bucket=bkt, Key=key) | |
| assert exc.value.response["Error"]["Code"] == "NoSuchKey" | |
| def test_s3_get_object_range(s3): | |
| bkt = "intg-s3-range" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_object(Bucket=bkt, Key="ranged.txt", Body=b"0123456789") | |
| resp = s3.get_object(Bucket=bkt, Key="ranged.txt", Range="bytes=2-5") | |
| assert resp["Body"].read() == b"2345" | |
| assert resp["ContentLength"] == 4 | |
| assert "bytes" in resp.get("ContentRange", "") | |
| assert resp["ResponseMetadata"]["HTTPStatusCode"] == 206 | |
| def test_s3_object_metadata(s3): | |
| bkt = "intg-s3-meta" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_object( | |
| Bucket=bkt, | |
| Key="meta.txt", | |
| Body=b"metadata", | |
| Metadata={"custom-key": "custom-value", "another": "data"}, | |
| ) | |
| resp = s3.head_object(Bucket=bkt, Key="meta.txt") | |
| assert resp["Metadata"]["custom-key"] == "custom-value" | |
| assert resp["Metadata"]["another"] == "data" | |
| def test_s3_bucket_tagging(s3): | |
| bkt = "intg-s3-bkttags" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_bucket_tagging( | |
| Bucket=bkt, | |
| Tagging={ | |
| "TagSet": [ | |
| {"Key": "env", "Value": "test"}, | |
| {"Key": "team", "Value": "platform"}, | |
| ] | |
| }, | |
| ) | |
| resp = s3.get_bucket_tagging(Bucket=bkt) | |
| tags = {t["Key"]: t["Value"] for t in resp["TagSet"]} | |
| assert tags["env"] == "test" | |
| assert tags["team"] == "platform" | |
| s3.delete_bucket_tagging(Bucket=bkt) | |
| with pytest.raises(ClientError) as exc: | |
| s3.get_bucket_tagging(Bucket=bkt) | |
| assert exc.value.response["Error"]["Code"] == "NoSuchTagSet" | |
| def test_s3_control_list_tags_for_resource(s3): | |
| """S3 Control ListTagsForResource must return tags set via PutBucketTagging. | |
| Regression: Terraform AWS Provider >= 5 calls s3control:ListTagsForResource | |
| when a `tags` block is set on aws_s3_bucket. The handler was returning an | |
| empty list regardless of bucket tags, causing perpetual drift. | |
| """ | |
| from conftest import make_client | |
| bkt = "intg-s3control-tags" | |
| account_id = "123456789012" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_bucket_tagging( | |
| Bucket=bkt, | |
| Tagging={"TagSet": [{"Key": "name", "Value": "ministack-test"}]}, | |
| ) | |
| s3control = make_client("s3control") | |
| arn = f"arn:aws:s3:::{bkt}" | |
| resp = s3control.list_tags_for_resource(AccountId=account_id, ResourceArn=arn) | |
| tags = {t["Key"]: t["Value"] for t in resp.get("Tags", [])} | |
| assert tags.get("name") == "ministack-test" | |
| def test_s3_control_list_tags_via_s3_control_host(s3): | |
| """S3 Control requests via s3-control.localhost host must not be intercepted by S3 vhost.""" | |
| import urllib.request, urllib.parse | |
| bkt = "intg-s3control-host" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_bucket_tagging( | |
| Bucket=bkt, | |
| Tagging={"TagSet": [{"Key": "env", "Value": "test"}]}, | |
| ) | |
| arn = urllib.parse.quote(f"arn:aws:s3:::{bkt}", safe="") | |
| req = urllib.request.Request( | |
| f"http://localhost:4566/v20180820/tags/{arn}", | |
| method="GET", | |
| headers={ | |
| "x-amz-account-id": "000000000000", | |
| "Host": "s3-control.localhost:4566", | |
| }, | |
| ) | |
| with urllib.request.urlopen(req) as r: | |
| assert r.status == 200 | |
| body = r.read().decode() | |
| assert "env" in body | |
| assert "test" in body | |
| def test_s3_bucket_policy(s3): | |
| bkt = "intg-s3-policy" | |
| s3.create_bucket(Bucket=bkt) | |
| policy = json.dumps( | |
| { | |
| "Version": "2012-10-17", | |
| "Statement": [ | |
| { | |
| "Effect": "Allow", | |
| "Principal": "*", | |
| "Action": "s3:GetObject", | |
| "Resource": f"arn:aws:s3:::{bkt}/*", | |
| } | |
| ], | |
| } | |
| ) | |
| s3.put_bucket_policy(Bucket=bkt, Policy=policy) | |
| resp = s3.get_bucket_policy(Bucket=bkt) | |
| stored = json.loads(resp["Policy"]) | |
| assert stored["Version"] == "2012-10-17" | |
| assert len(stored["Statement"]) == 1 | |
| def test_s3_object_tagging(s3): | |
| bkt = "intg-s3-objtags" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_object(Bucket=bkt, Key="tagged.txt", Body=b"tagged") | |
| s3.put_object_tagging( | |
| Bucket=bkt, | |
| Key="tagged.txt", | |
| Tagging={ | |
| "TagSet": [ | |
| {"Key": "status", "Value": "active"}, | |
| {"Key": "priority", "Value": "high"}, | |
| ] | |
| }, | |
| ) | |
| resp = s3.get_object_tagging(Bucket=bkt, Key="tagged.txt") | |
| tags = {t["Key"]: t["Value"] for t in resp["TagSet"]} | |
| assert tags["status"] == "active" | |
| assert tags["priority"] == "high" | |
| def test_s3_public_access_block(s3): | |
| bkt = "intg-s3-pab" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_public_access_block( | |
| Bucket=bkt, | |
| PublicAccessBlockConfiguration={ | |
| "BlockPublicAcls": True, | |
| "IgnorePublicAcls": True, | |
| "BlockPublicPolicy": False, | |
| "RestrictPublicBuckets": False, | |
| }, | |
| ) | |
| resp = s3.get_public_access_block(Bucket=bkt) | |
| cfg = resp["PublicAccessBlockConfiguration"] | |
| assert cfg["BlockPublicAcls"] is True | |
| assert cfg["BlockPublicPolicy"] is False | |
| s3.delete_public_access_block(Bucket=bkt) | |
| def test_s3_ownership_controls(s3): | |
| bkt = "intg-s3-ownership" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_bucket_ownership_controls( | |
| Bucket=bkt, | |
| OwnershipControls={"Rules": [{"ObjectOwnership": "BucketOwnerPreferred"}]}, | |
| ) | |
| resp = s3.get_bucket_ownership_controls(Bucket=bkt) | |
| assert resp["OwnershipControls"]["Rules"][0]["ObjectOwnership"] == "BucketOwnerPreferred" | |
| s3.delete_bucket_ownership_controls(Bucket=bkt) | |
| def test_s3_object_lock_configuration(s3): | |
| bkt = "intg-s3-objlock-cfg" | |
| s3.create_bucket( | |
| Bucket=bkt, | |
| ObjectLockEnabledForBucket=True, | |
| ) | |
| resp = s3.get_object_lock_configuration(Bucket=bkt) | |
| assert resp["ObjectLockConfiguration"]["ObjectLockEnabled"] == "Enabled" | |
| s3.put_object_lock_configuration( | |
| Bucket=bkt, | |
| ObjectLockConfiguration={ | |
| "ObjectLockEnabled": "Enabled", | |
| "Rule": { | |
| "DefaultRetention": { | |
| "Mode": "GOVERNANCE", | |
| "Days": 30, | |
| } | |
| }, | |
| }, | |
| ) | |
| resp = s3.get_object_lock_configuration(Bucket=bkt) | |
| ret = resp["ObjectLockConfiguration"]["Rule"]["DefaultRetention"] | |
| assert ret["Mode"] == "GOVERNANCE" | |
| assert ret["Days"] == 30 | |
| def test_s3_object_lock_requires_versioning(s3): | |
| bkt = "intg-s3-objlock-nover" | |
| s3.create_bucket(Bucket=bkt) | |
| with pytest.raises(ClientError) as exc: | |
| s3.put_object_lock_configuration( | |
| Bucket=bkt, | |
| ObjectLockConfiguration={ | |
| "ObjectLockEnabled": "Enabled", | |
| }, | |
| ) | |
| assert exc.value.response["Error"]["Code"] == "InvalidBucketState" | |
| def test_s3_object_retention(s3): | |
| bkt = "intg-s3-retention" | |
| s3.create_bucket(Bucket=bkt, ObjectLockEnabledForBucket=True) | |
| s3.put_object(Bucket=bkt, Key="doc.txt", Body=b"hello") | |
| from datetime import datetime, timezone, timedelta | |
| retain_until = datetime.now(timezone.utc) + timedelta(days=1) | |
| s3.put_object_retention( | |
| Bucket=bkt, | |
| Key="doc.txt", | |
| Retention={"Mode": "GOVERNANCE", "RetainUntilDate": retain_until}, | |
| ) | |
| resp = s3.get_object_retention(Bucket=bkt, Key="doc.txt") | |
| assert resp["Retention"]["Mode"] == "GOVERNANCE" | |
| assert "RetainUntilDate" in resp["Retention"] | |
| def test_s3_object_legal_hold(s3): | |
| bkt = "intg-s3-legalhold" | |
| s3.create_bucket(Bucket=bkt, ObjectLockEnabledForBucket=True) | |
| s3.put_object(Bucket=bkt, Key="evidence.txt", Body=b"data") | |
| s3.put_object_legal_hold( | |
| Bucket=bkt, | |
| Key="evidence.txt", | |
| LegalHold={"Status": "ON"}, | |
| ) | |
| resp = s3.get_object_legal_hold(Bucket=bkt, Key="evidence.txt") | |
| assert resp["LegalHold"]["Status"] == "ON" | |
| s3.put_object_legal_hold( | |
| Bucket=bkt, | |
| Key="evidence.txt", | |
| LegalHold={"Status": "OFF"}, | |
| ) | |
| resp = s3.get_object_legal_hold(Bucket=bkt, Key="evidence.txt") | |
| assert resp["LegalHold"]["Status"] == "OFF" | |
| def test_s3_object_lock_prevents_delete(s3): | |
| bkt = "intg-s3-lock-del" | |
| s3.create_bucket(Bucket=bkt, ObjectLockEnabledForBucket=True) | |
| s3.put_object(Bucket=bkt, Key="locked.txt", Body=b"immutable") | |
| s3.put_object_legal_hold( | |
| Bucket=bkt, | |
| Key="locked.txt", | |
| LegalHold={"Status": "ON"}, | |
| ) | |
| with pytest.raises(ClientError) as exc: | |
| s3.delete_object(Bucket=bkt, Key="locked.txt") | |
| assert exc.value.response["Error"]["Code"] == "AccessDenied" | |
| # Remove legal hold, add governance retention | |
| s3.put_object_legal_hold( | |
| Bucket=bkt, | |
| Key="locked.txt", | |
| LegalHold={"Status": "OFF"}, | |
| ) | |
| from datetime import datetime, timezone, timedelta | |
| retain_until = datetime.now(timezone.utc) + timedelta(days=1) | |
| s3.put_object_retention( | |
| Bucket=bkt, | |
| Key="locked.txt", | |
| Retention={"Mode": "GOVERNANCE", "RetainUntilDate": retain_until}, | |
| ) | |
| with pytest.raises(ClientError) as exc: | |
| s3.delete_object(Bucket=bkt, Key="locked.txt") | |
| assert exc.value.response["Error"]["Code"] == "AccessDenied" | |
| # Bypass governance retention | |
| s3.delete_object( | |
| Bucket=bkt, | |
| Key="locked.txt", | |
| BypassGovernanceRetention=True, | |
| ) | |
| with pytest.raises(ClientError): | |
| s3.head_object(Bucket=bkt, Key="locked.txt") | |
| def test_s3_bucket_replication(s3): | |
| src = "intg-s3-repl-src" | |
| s3.create_bucket(Bucket=src) | |
| s3.put_bucket_versioning(Bucket=src, VersioningConfiguration={"Status": "Enabled"}) | |
| s3.put_bucket_replication( | |
| Bucket=src, | |
| ReplicationConfiguration={ | |
| "Role": "arn:aws:iam::012345678901:role/repl", | |
| "Rules": [ | |
| { | |
| "Status": "Enabled", | |
| "Destination": {"Bucket": "arn:aws:s3:::intg-s3-repl-dst"}, | |
| } | |
| ], | |
| }, | |
| ) | |
| resp = s3.get_bucket_replication(Bucket=src) | |
| assert resp["ReplicationConfiguration"]["Role"] == "arn:aws:iam::012345678901:role/repl" | |
| assert len(resp["ReplicationConfiguration"]["Rules"]) == 1 | |
| s3.delete_bucket_replication(Bucket=src) | |
| with pytest.raises(ClientError) as exc: | |
| s3.get_bucket_replication(Bucket=src) | |
| assert exc.value.response["Error"]["Code"] == "ReplicationConfigurationNotFoundError" | |
| def test_s3_replication_requires_versioning(s3): | |
| bkt = "intg-s3-repl-nover" | |
| s3.create_bucket(Bucket=bkt) | |
| with pytest.raises(ClientError) as exc: | |
| s3.put_bucket_replication( | |
| Bucket=bkt, | |
| ReplicationConfiguration={ | |
| "Role": "arn:aws:iam::012345678901:role/repl", | |
| "Rules": [ | |
| { | |
| "Status": "Enabled", | |
| "Destination": {"Bucket": "arn:aws:s3:::somewhere"}, | |
| } | |
| ], | |
| }, | |
| ) | |
| assert exc.value.response["Error"]["Code"] == "InvalidRequest" | |
| def test_s3_put_object_with_lock_headers(s3): | |
| bkt = "intg-s3-put-lock-hdr" | |
| s3.create_bucket(Bucket=bkt, ObjectLockEnabledForBucket=True) | |
| from datetime import datetime, timezone, timedelta | |
| retain_until = datetime.now(timezone.utc) + timedelta(days=5) | |
| s3.put_object( | |
| Bucket=bkt, | |
| Key="locked-via-header.txt", | |
| Body=b"data", | |
| ObjectLockMode="GOVERNANCE", | |
| ObjectLockRetainUntilDate=retain_until, | |
| ObjectLockLegalHoldStatus="ON", | |
| ) | |
| ret = s3.get_object_retention(Bucket=bkt, Key="locked-via-header.txt") | |
| assert ret["Retention"]["Mode"] == "GOVERNANCE" | |
| hold = s3.get_object_legal_hold(Bucket=bkt, Key="locked-via-header.txt") | |
| assert hold["LegalHold"]["Status"] == "ON" | |
| def test_s3_put_object_with_tagging_header(s3): | |
| bkt = "intg-s3-put-tag-hdr" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_object( | |
| Bucket=bkt, | |
| Key="tagged-inline.txt", | |
| Body=b"hello", | |
| Tagging="env=prod&team=backend", | |
| ) | |
| resp = s3.get_object_tagging(Bucket=bkt, Key="tagged-inline.txt") | |
| tags = {t["Key"]: t["Value"] for t in resp["TagSet"]} | |
| assert tags["env"] == "prod" | |
| assert tags["team"] == "backend" | |
| def test_s3_default_retention_applied(s3): | |
| bkt = "intg-s3-default-ret" | |
| s3.create_bucket(Bucket=bkt, ObjectLockEnabledForBucket=True) | |
| s3.put_object_lock_configuration( | |
| Bucket=bkt, | |
| ObjectLockConfiguration={ | |
| "ObjectLockEnabled": "Enabled", | |
| "Rule": { | |
| "DefaultRetention": { | |
| "Mode": "COMPLIANCE", | |
| "Days": 7, | |
| } | |
| }, | |
| }, | |
| ) | |
| s3.put_object(Bucket=bkt, Key="auto-locked.txt", Body=b"data") | |
| ret = s3.get_object_retention(Bucket=bkt, Key="auto-locked.txt") | |
| assert ret["Retention"]["Mode"] == "COMPLIANCE" | |
| assert "RetainUntilDate" in ret["Retention"] | |
| def test_s3_batch_delete_enforces_lock(s3): | |
| bkt = "intg-s3-batch-lock" | |
| s3.create_bucket(Bucket=bkt, ObjectLockEnabledForBucket=True) | |
| s3.put_object(Bucket=bkt, Key="a.txt", Body=b"a") | |
| s3.put_object(Bucket=bkt, Key="b.txt", Body=b"b") | |
| s3.put_object_legal_hold(Bucket=bkt, Key="a.txt", LegalHold={"Status": "ON"}) | |
| resp = s3.delete_objects( | |
| Bucket=bkt, | |
| Delete={"Objects": [{"Key": "a.txt"}, {"Key": "b.txt"}]}, | |
| ) | |
| deleted_keys = [d["Key"] for d in resp.get("Deleted", [])] | |
| error_keys = [e["Key"] for e in resp.get("Errors", [])] | |
| assert "b.txt" in deleted_keys | |
| assert "a.txt" in error_keys | |
| def test_s3_copy_preserves_tags_and_lock(s3): | |
| src = "intg-s3-copy-tag-src" | |
| dst = "intg-s3-copy-tag-dst" | |
| s3.create_bucket(Bucket=src, ObjectLockEnabledForBucket=True) | |
| s3.create_bucket(Bucket=dst, ObjectLockEnabledForBucket=True) | |
| s3.put_object(Bucket=src, Key="orig.txt", Body=b"data") | |
| s3.put_object_tagging( | |
| Bucket=src, | |
| Key="orig.txt", | |
| Tagging={"TagSet": [{"Key": "env", "Value": "staging"}]}, | |
| ) | |
| s3.put_object_legal_hold(Bucket=src, Key="orig.txt", LegalHold={"Status": "ON"}) | |
| s3.copy_object(Bucket=dst, Key="copy.txt", CopySource=f"{src}/orig.txt") | |
| tags = s3.get_object_tagging(Bucket=dst, Key="copy.txt") | |
| tag_map = {t["Key"]: t["Value"] for t in tags["TagSet"]} | |
| assert tag_map["env"] == "staging" | |
| hold = s3.get_object_legal_hold(Bucket=dst, Key="copy.txt") | |
| assert hold["LegalHold"]["Status"] == "ON" | |
| def test_s3_copy_replace_tags(s3): | |
| bkt = "intg-s3-copy-repl-tag" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_object(Bucket=bkt, Key="src.txt", Body=b"data") | |
| s3.put_object_tagging( | |
| Bucket=bkt, | |
| Key="src.txt", | |
| Tagging={"TagSet": [{"Key": "old", "Value": "val"}]}, | |
| ) | |
| s3.copy_object( | |
| Bucket=bkt, | |
| Key="dst.txt", | |
| CopySource=f"{bkt}/src.txt", | |
| TaggingDirective="REPLACE", | |
| Tagging="new=val2", | |
| ) | |
| tags = s3.get_object_tagging(Bucket=bkt, Key="dst.txt") | |
| tag_map = {t["Key"]: t["Value"] for t in tags["TagSet"]} | |
| assert "old" not in tag_map | |
| assert tag_map["new"] == "val2" | |
| def test_s3_tag_count_limit(s3): | |
| bkt = "intg-s3-tag-limit" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_object(Bucket=bkt, Key="toomany.txt", Body=b"x") | |
| with pytest.raises(ClientError) as exc: | |
| s3.put_object_tagging( | |
| Bucket=bkt, | |
| Key="toomany.txt", | |
| Tagging={"TagSet": [{"Key": f"k{i}", "Value": f"v{i}"} for i in range(11)]}, | |
| ) | |
| assert exc.value.response["Error"]["Code"] == "BadRequest" | |
| def test_s3_replication_validates_dest_versioning(s3): | |
| src = "intg-s3-repl-val-src" | |
| dst = "intg-s3-repl-val-dst" | |
| s3.create_bucket(Bucket=src) | |
| s3.create_bucket(Bucket=dst) | |
| s3.put_bucket_versioning(Bucket=src, VersioningConfiguration={"Status": "Enabled"}) | |
| # dst has no versioning | |
| with pytest.raises(ClientError) as exc: | |
| s3.put_bucket_replication( | |
| Bucket=src, | |
| ReplicationConfiguration={ | |
| "Role": "arn:aws:iam::012345678901:role/repl", | |
| "Rules": [ | |
| { | |
| "Status": "Enabled", | |
| "Destination": {"Bucket": f"arn:aws:s3:::{dst}"}, | |
| } | |
| ], | |
| }, | |
| ) | |
| assert exc.value.response["Error"]["Code"] == "InvalidRequest" | |
| def test_s3_head_object_returns_lock_headers(s3): | |
| bkt = "intg-s3-head-lock-hdr" | |
| s3.create_bucket(Bucket=bkt, ObjectLockEnabledForBucket=True) | |
| from datetime import datetime, timezone, timedelta | |
| retain_until = datetime.now(timezone.utc) + timedelta(days=3) | |
| s3.put_object( | |
| Bucket=bkt, | |
| Key="locked.txt", | |
| Body=b"data", | |
| ObjectLockMode="GOVERNANCE", | |
| ObjectLockRetainUntilDate=retain_until, | |
| ObjectLockLegalHoldStatus="ON", | |
| ) | |
| resp = s3.head_object(Bucket=bkt, Key="locked.txt") | |
| assert resp["ObjectLockMode"] == "GOVERNANCE" | |
| assert "ObjectLockRetainUntilDate" in resp | |
| assert resp["ObjectLockLegalHoldStatus"] == "ON" | |
| get_resp = s3.get_object(Bucket=bkt, Key="locked.txt") | |
| assert get_resp["ObjectLockMode"] == "GOVERNANCE" | |
| assert get_resp["ObjectLockLegalHoldStatus"] == "ON" | |
| def test_s3_event_notification_to_sqs(s3, sqs): | |
| s3.create_bucket(Bucket="s3-evt-bkt") | |
| queue_url = sqs.create_queue(QueueName="s3-evt-queue")["QueueUrl"] | |
| queue_arn = sqs.get_queue_attributes( | |
| QueueUrl=queue_url, | |
| AttributeNames=["QueueArn"], | |
| )["Attributes"]["QueueArn"] | |
| s3.put_bucket_notification_configuration( | |
| Bucket="s3-evt-bkt", | |
| NotificationConfiguration={ | |
| "QueueConfigurations": [{"QueueArn": queue_arn, "Events": ["s3:ObjectCreated:*"]}], | |
| }, | |
| ) | |
| s3.put_object(Bucket="s3-evt-bkt", Key="test-notify.txt", Body=b"hello") | |
| time.sleep(0.5) | |
| msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2) | |
| assert "Messages" in msgs and len(msgs["Messages"]) > 0 | |
| body = json.loads(msgs["Messages"][0]["Body"]) | |
| assert body["Records"][0]["eventSource"] == "aws:s3" | |
| assert body["Records"][0]["s3"]["object"]["key"] == "test-notify.txt" | |
| def test_s3_event_notification_filter(s3, sqs): | |
| s3.create_bucket(Bucket="s3-evt-filter-bkt") | |
| queue_url = sqs.create_queue(QueueName="s3-evt-filter-q")["QueueUrl"] | |
| queue_arn = sqs.get_queue_attributes( | |
| QueueUrl=queue_url, | |
| AttributeNames=["QueueArn"], | |
| )["Attributes"]["QueueArn"] | |
| s3.put_bucket_notification_configuration( | |
| Bucket="s3-evt-filter-bkt", | |
| NotificationConfiguration={ | |
| "QueueConfigurations": [ | |
| { | |
| "QueueArn": queue_arn, | |
| "Events": ["s3:ObjectCreated:*"], | |
| "Filter": {"Key": {"FilterRules": [{"Name": "suffix", "Value": ".csv"}]}}, | |
| } | |
| ], | |
| }, | |
| ) | |
| s3.put_object(Bucket="s3-evt-filter-bkt", Key="data.txt", Body=b"no match") | |
| s3.put_object(Bucket="s3-evt-filter-bkt", Key="data.csv", Body=b"match") | |
| time.sleep(0.5) | |
| msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2) | |
| keys = [json.loads(m["Body"])["Records"][0]["s3"]["object"]["key"] for m in msgs.get("Messages", [])] | |
| assert "data.csv" in keys | |
| assert "data.txt" not in keys | |
| def test_s3_event_notification_delete(s3, sqs): | |
| s3.create_bucket(Bucket="s3-evt-del-bkt") | |
| queue_url = sqs.create_queue(QueueName="s3-evt-del-q")["QueueUrl"] | |
| queue_arn = sqs.get_queue_attributes( | |
| QueueUrl=queue_url, | |
| AttributeNames=["QueueArn"], | |
| )["Attributes"]["QueueArn"] | |
| s3.put_bucket_notification_configuration( | |
| Bucket="s3-evt-del-bkt", | |
| NotificationConfiguration={ | |
| "QueueConfigurations": [{"QueueArn": queue_arn, "Events": ["s3:ObjectRemoved:*"]}], | |
| }, | |
| ) | |
| s3.put_object(Bucket="s3-evt-del-bkt", Key="to-del.txt", Body=b"bye") | |
| s3.delete_object(Bucket="s3-evt-del-bkt", Key="to-del.txt") | |
| time.sleep(0.5) | |
| msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2) | |
| assert "Messages" in msgs and len(msgs["Messages"]) > 0 | |
| body = json.loads(msgs["Messages"][0]["Body"]) | |
| assert "ObjectRemoved" in body["Records"][0]["eventName"] | |
| def test_s3_eventbridge_notification(s3, sqs, eb): | |
| """S3 EventBridgeConfiguration sends events to EventBridge, routed to SQS via rule.""" | |
| s3.create_bucket(Bucket="s3-eb-bkt") | |
| queue_url = sqs.create_queue(QueueName="s3-eb-target-q")["QueueUrl"] | |
| queue_arn = sqs.get_queue_attributes( | |
| QueueUrl=queue_url, AttributeNames=["QueueArn"], | |
| )["Attributes"]["QueueArn"] | |
| # Enable EventBridge on bucket | |
| s3.put_bucket_notification_configuration( | |
| Bucket="s3-eb-bkt", | |
| NotificationConfiguration={"EventBridgeConfiguration": {}}, | |
| ) | |
| # Create EventBridge rule matching S3 events → SQS target | |
| eb.put_rule( | |
| Name="s3-to-sqs-rule", | |
| EventPattern=json.dumps({"source": ["aws.s3"]}), | |
| State="ENABLED", | |
| ) | |
| eb.put_targets( | |
| Rule="s3-to-sqs-rule", | |
| Targets=[{"Id": "sqs-target", "Arn": queue_arn}], | |
| ) | |
| # Upload object — should trigger S3 → EventBridge → SQS | |
| s3.put_object(Bucket="s3-eb-bkt", Key="hello.txt", Body=b"world") | |
| time.sleep(0.5) | |
| msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2) | |
| assert "Messages" in msgs and len(msgs["Messages"]) > 0 | |
| body = json.loads(msgs["Messages"][0]["Body"]) | |
| assert body["source"] == "aws.s3" | |
| assert body["detail"]["bucket"]["name"] == "s3-eb-bkt" | |
| assert body["detail"]["object"]["key"] == "hello.txt" | |
| def test_s3_list_object_versions(s3): | |
| s3.create_bucket(Bucket="s3-ver-bkt") | |
| s3.put_object(Bucket="s3-ver-bkt", Key="v1.txt", Body=b"v1") | |
| s3.put_object(Bucket="s3-ver-bkt", Key="v2.txt", Body=b"v2") | |
| resp = s3.list_object_versions(Bucket="s3-ver-bkt") | |
| versions = resp.get("Versions", []) | |
| assert len(versions) >= 2 | |
| keys = [v["Key"] for v in versions] | |
| assert "v1.txt" in keys and "v2.txt" in keys | |
| def test_s3_list_object_versions_multiple_puts_same_key(s3): | |
| """Multiple PUTs to the same key with versioning enabled should return all versions.""" | |
| bkt = "s3-ver-multi" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_bucket_versioning(Bucket=bkt, VersioningConfiguration={"Status": "Enabled"}) | |
| r1 = s3.put_object(Bucket=bkt, Key="doc.txt", Body=b"v1") | |
| r2 = s3.put_object(Bucket=bkt, Key="doc.txt", Body=b"v2") | |
| r3 = s3.put_object(Bucket=bkt, Key="doc.txt", Body=b"v3") | |
| assert r1["VersionId"] != r2["VersionId"] | |
| assert r2["VersionId"] != r3["VersionId"] | |
| resp = s3.list_object_versions(Bucket=bkt) | |
| versions = resp.get("Versions", []) | |
| assert len(versions) == 3 | |
| version_ids = [v["VersionId"] for v in versions] | |
| assert r1["VersionId"] in version_ids | |
| assert r2["VersionId"] in version_ids | |
| assert r3["VersionId"] in version_ids | |
| latest = [v for v in versions if v["IsLatest"]] | |
| assert len(latest) == 1 | |
| assert latest[0]["VersionId"] == r3["VersionId"] | |
| def test_s3_multipart_upload_returns_version_id(s3): | |
| """CompleteMultipartUpload should return VersionId when versioning is enabled.""" | |
| bkt = "s3-ver-mpu" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_bucket_versioning(Bucket=bkt, VersioningConfiguration={"Status": "Enabled"}) | |
| mpu = s3.create_multipart_upload(Bucket=bkt, Key="big.bin") | |
| upload_id = mpu["UploadId"] | |
| part = s3.upload_part(Bucket=bkt, Key="big.bin", UploadId=upload_id, PartNumber=1, Body=b"x" * 1000) | |
| resp = s3.complete_multipart_upload( | |
| Bucket=bkt, Key="big.bin", UploadId=upload_id, | |
| MultipartUpload={"Parts": [{"PartNumber": 1, "ETag": part["ETag"]}]}, | |
| ) | |
| assert "VersionId" in resp, "CompleteMultipartUpload must return VersionId" | |
| first_vid = resp["VersionId"] | |
| # Second multipart to same key — different version | |
| mpu2 = s3.create_multipart_upload(Bucket=bkt, Key="big.bin") | |
| part2 = s3.upload_part(Bucket=bkt, Key="big.bin", UploadId=mpu2["UploadId"], PartNumber=1, Body=b"y" * 1000) | |
| resp2 = s3.complete_multipart_upload( | |
| Bucket=bkt, Key="big.bin", UploadId=mpu2["UploadId"], | |
| MultipartUpload={"Parts": [{"PartNumber": 1, "ETag": part2["ETag"]}]}, | |
| ) | |
| assert resp2["VersionId"] != first_vid | |
| # Both versions should appear in list_object_versions | |
| versions = s3.list_object_versions(Bucket=bkt).get("Versions", []) | |
| vids = [v["VersionId"] for v in versions] | |
| assert first_vid in vids | |
| assert resp2["VersionId"] in vids | |
| latest = [v for v in versions if v["IsLatest"]] | |
| assert len(latest) == 1 | |
| assert latest[0]["VersionId"] == resp2["VersionId"] | |
| def test_s3_copy_object_returns_version_id(s3): | |
| """CopyObject should return VersionId and track versions when versioning is enabled.""" | |
| bkt = "s3-ver-copy" | |
| s3.create_bucket(Bucket=bkt) | |
| s3.put_bucket_versioning(Bucket=bkt, VersioningConfiguration={"Status": "Enabled"}) | |
| s3.put_object(Bucket=bkt, Key="src.txt", Body=b"original") | |
| resp = s3.copy_object(Bucket=bkt, Key="dst.txt", CopySource=f"{bkt}/src.txt") | |
| assert "VersionId" in resp, "CopyObject must return VersionId" | |
| first_vid = resp["VersionId"] | |
| # Copy again — different version | |
| resp2 = s3.copy_object(Bucket=bkt, Key="dst.txt", CopySource=f"{bkt}/src.txt") | |
| assert resp2["VersionId"] != first_vid | |
| versions = s3.list_object_versions(Bucket=bkt, Prefix="dst.txt").get("Versions", []) | |
| assert len(versions) == 2, f"Expected 2 versions for dst.txt, got {len(versions)}" | |
| latest = [v for v in versions if v["IsLatest"]] | |
| assert len(latest) == 1 | |
| def test_s3_multipart_no_version_without_versioning(s3): | |
| """CompleteMultipartUpload should NOT return VersionId when versioning is disabled.""" | |
| bkt = "s3-nover-mpu" | |
| s3.create_bucket(Bucket=bkt) | |
| mpu = s3.create_multipart_upload(Bucket=bkt, Key="file.bin") | |
| part = s3.upload_part(Bucket=bkt, Key="file.bin", UploadId=mpu["UploadId"], PartNumber=1, Body=b"data") | |
| resp = s3.complete_multipart_upload( | |
| Bucket=bkt, Key="file.bin", UploadId=mpu["UploadId"], | |
| MultipartUpload={"Parts": [{"PartNumber": 1, "ETag": part["ETag"]}]}, | |
| ) | |
| assert "VersionId" not in resp, "Should not return VersionId without versioning" | |
| def test_s3_bucket_website(s3): | |
| s3.create_bucket(Bucket="s3-web-bkt") | |
| s3.put_bucket_website( | |
| Bucket="s3-web-bkt", | |
| WebsiteConfiguration={"IndexDocument": {"Suffix": "index.html"}}, | |
| ) | |
| resp = s3.get_bucket_website(Bucket="s3-web-bkt") | |
| assert resp["IndexDocument"]["Suffix"] == "index.html" | |
| s3.delete_bucket_website(Bucket="s3-web-bkt") | |
| with pytest.raises(ClientError): | |
| s3.get_bucket_website(Bucket="s3-web-bkt") | |
| def test_s3_put_bucket_logging(s3): | |
| s3.create_bucket(Bucket="s3-log-bkt") | |
| s3.put_bucket_logging( | |
| Bucket="s3-log-bkt", | |
| BucketLoggingStatus={ | |
| "LoggingEnabled": {"TargetBucket": "s3-log-bkt", "TargetPrefix": "logs/"}, | |
| }, | |
| ) | |
| resp = s3.get_bucket_logging(Bucket="s3-log-bkt") | |
| assert "LoggingEnabled" in resp | |
| def test_s3_bucket_versioning(s3): | |
| s3.create_bucket(Bucket="intg-s3-versioning") | |
| s3.put_bucket_versioning( | |
| Bucket="intg-s3-versioning", | |
| VersioningConfiguration={"Status": "Enabled"}, | |
| ) | |
| resp = s3.get_bucket_versioning(Bucket="intg-s3-versioning") | |
| assert resp["Status"] == "Enabled" | |
| def test_s3_put_object_returns_version_id(s3): | |
| s3.create_bucket(Bucket="intg-s3-ver-put") | |
| s3.put_bucket_versioning( | |
| Bucket="intg-s3-ver-put", | |
| VersioningConfiguration={"Status": "Enabled"}, | |
| ) | |
| resp = s3.put_object(Bucket="intg-s3-ver-put", Key="hello.txt", Body=b"v1") | |
| assert "VersionId" in resp | |
| assert len(resp["VersionId"]) > 0 | |
| # Second put should get a different version | |
| resp2 = s3.put_object(Bucket="intg-s3-ver-put", Key="hello.txt", Body=b"v2") | |
| assert resp2["VersionId"] != resp["VersionId"] | |
| def test_s3_put_object_no_version_id_without_versioning(s3): | |
| s3.create_bucket(Bucket="intg-s3-nover-put") | |
| resp = s3.put_object(Bucket="intg-s3-nover-put", Key="hello.txt", Body=b"data") | |
| assert "VersionId" not in resp | |
| def test_s3_bucket_encryption(s3): | |
| s3.create_bucket(Bucket="intg-s3-enc") | |
| s3.put_bucket_encryption( | |
| Bucket="intg-s3-enc", | |
| ServerSideEncryptionConfiguration={ | |
| "Rules": [{"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}] | |
| }, | |
| ) | |
| resp = s3.get_bucket_encryption(Bucket="intg-s3-enc") | |
| rules = resp["ServerSideEncryptionConfiguration"]["Rules"] | |
| assert rules[0]["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"] == "AES256" | |
| s3.delete_bucket_encryption(Bucket="intg-s3-enc") | |
| with pytest.raises(ClientError) as exc: | |
| s3.get_bucket_encryption(Bucket="intg-s3-enc") | |
| assert exc.value.response["Error"]["Code"] == "ServerSideEncryptionConfigurationNotFoundError" | |
| def test_s3_bucket_lifecycle(s3): | |
| s3.create_bucket(Bucket="intg-s3-lifecycle") | |
| s3.put_bucket_lifecycle_configuration( | |
| Bucket="intg-s3-lifecycle", | |
| LifecycleConfiguration={ | |
| "Rules": [ | |
| { | |
| "ID": "expire-old", | |
| "Status": "Enabled", | |
| "Filter": {"Prefix": "logs/"}, | |
| "Expiration": {"Days": 30}, | |
| } | |
| ] | |
| }, | |
| ) | |
| resp = s3.get_bucket_lifecycle_configuration(Bucket="intg-s3-lifecycle") | |
| assert resp["Rules"][0]["ID"] == "expire-old" | |
| s3.delete_bucket_lifecycle(Bucket="intg-s3-lifecycle") | |
| with pytest.raises(ClientError) as exc: | |
| s3.get_bucket_lifecycle_configuration(Bucket="intg-s3-lifecycle") | |
| assert exc.value.response["Error"]["Code"] == "NoSuchLifecycleConfiguration" | |
| def test_s3_bucket_cors(s3): | |
| s3.create_bucket(Bucket="intg-s3-cors") | |
| s3.put_bucket_cors( | |
| Bucket="intg-s3-cors", | |
| CORSConfiguration={ | |
| "CORSRules": [ | |
| { | |
| "AllowedHeaders": ["*"], | |
| "AllowedMethods": ["GET", "PUT"], | |
| "AllowedOrigins": ["https://example.com"], | |
| "MaxAgeSeconds": 3000, | |
| } | |
| ] | |
| }, | |
| ) | |
| resp = s3.get_bucket_cors(Bucket="intg-s3-cors") | |
| assert resp["CORSRules"][0]["AllowedOrigins"] == ["https://example.com"] | |
| s3.delete_bucket_cors(Bucket="intg-s3-cors") | |
| with pytest.raises(ClientError) as exc: | |
| s3.get_bucket_cors(Bucket="intg-s3-cors") | |
| assert exc.value.response["Error"]["Code"] == "NoSuchCORSConfiguration" | |
| def test_s3_bucket_acl(s3): | |
| s3.create_bucket(Bucket="intg-s3-acl") | |
| resp = s3.get_bucket_acl(Bucket="intg-s3-acl") | |
| assert "Owner" in resp | |
| assert "Grants" in resp | |
| def test_s3_range_suffix(s3): | |
| """Range: bytes=-N returns last N bytes.""" | |
| s3.create_bucket(Bucket="qa-s3-range-suffix") | |
| s3.put_object(Bucket="qa-s3-range-suffix", Key="data.txt", Body=b"0123456789") | |
| resp = s3.get_object(Bucket="qa-s3-range-suffix", Key="data.txt", Range="bytes=-3") | |
| assert resp["Body"].read() == b"789" | |
| assert resp["ResponseMetadata"]["HTTPStatusCode"] == 206 | |
| def test_s3_range_beyond_end(s3): | |
| """Range start beyond file size returns 416.""" | |
| s3.create_bucket(Bucket="qa-s3-range-beyond") | |
| s3.put_object(Bucket="qa-s3-range-beyond", Key="small.txt", Body=b"hello") | |
| with pytest.raises(ClientError) as exc: | |
| s3.get_object(Bucket="qa-s3-range-beyond", Key="small.txt", Range="bytes=100-200") | |
| assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 416 | |
| def test_s3_list_v1_marker_pagination(s3): | |
| """ListObjects v1 Marker pagination returns correct pages.""" | |
| s3.create_bucket(Bucket="qa-s3-marker") | |
| keys = [f"file{i:03d}.txt" for i in range(10)] | |
| for k in keys: | |
| s3.put_object(Bucket="qa-s3-marker", Key=k, Body=b"x") | |
| # NextMarker only returned when Delimiter is set (AWS spec) | |
| resp1 = s3.list_objects(Bucket="qa-s3-marker", MaxKeys=4, Delimiter="/") | |
| assert resp1["IsTruncated"] is True | |
| assert len(resp1["Contents"]) == 4 | |
| marker = resp1["NextMarker"] | |
| resp2 = s3.list_objects(Bucket="qa-s3-marker", MaxKeys=4, Marker=marker, Delimiter="/") | |
| page2_keys = [o["Key"] for o in resp2["Contents"]] | |
| page1_keys = [o["Key"] for o in resp1["Contents"]] | |
| assert not any(k in page1_keys for k in page2_keys) | |
| def test_s3_delete_objects_returns_deleted(s3): | |
| """DeleteObjects returns each deleted key in Deleted list.""" | |
| s3.create_bucket(Bucket="qa-s3-batch-del") | |
| for i in range(3): | |
| s3.put_object(Bucket="qa-s3-batch-del", Key=f"obj{i}.txt", Body=b"x") | |
| resp = s3.delete_objects( | |
| Bucket="qa-s3-batch-del", | |
| Delete={"Objects": [{"Key": f"obj{i}.txt"} for i in range(3)]}, | |
| ) | |
| assert len(resp["Deleted"]) == 3 | |
| assert not resp.get("Errors") | |
| def test_s3_put_object_content_type_preserved(s3): | |
| """Content-Type set on PutObject is returned on GetObject.""" | |
| s3.create_bucket(Bucket="qa-s3-ct") | |
| s3.put_object( | |
| Bucket="qa-s3-ct", | |
| Key="page.html", | |
| Body=b"<html/>", | |
| ContentType="text/html; charset=utf-8", | |
| ) | |
| resp = s3.get_object(Bucket="qa-s3-ct", Key="page.html") | |
| assert "text/html" in resp["ContentType"] | |
| def test_s3_head_object_returns_content_length(s3): | |
| """HeadObject must return correct ContentLength.""" | |
| s3.create_bucket(Bucket="qa-s3-head-len") | |
| body = b"exactly twenty bytes" | |
| s3.put_object(Bucket="qa-s3-head-len", Key="f.bin", Body=body) | |
| resp = s3.head_object(Bucket="qa-s3-head-len", Key="f.bin") | |
| assert resp["ContentLength"] == len(body) | |
| def test_s3_copy_preserves_metadata(s3): | |
| """CopyObject with MetadataDirective=COPY preserves source metadata.""" | |
| s3.create_bucket(Bucket="qa-s3-copy-meta") | |
| s3.put_object( | |
| Bucket="qa-s3-copy-meta", | |
| Key="src.txt", | |
| Body=b"data", | |
| Metadata={"x-custom": "value123"}, | |
| ) | |
| s3.copy_object( | |
| CopySource={"Bucket": "qa-s3-copy-meta", "Key": "src.txt"}, | |
| Bucket="qa-s3-copy-meta", | |
| Key="dst.txt", | |
| MetadataDirective="COPY", | |
| ) | |
| resp = s3.head_object(Bucket="qa-s3-copy-meta", Key="dst.txt") | |
| assert resp["Metadata"].get("x-custom") == "value123" | |
| def test_s3_multipart_list_parts(s3): | |
| """ListParts returns uploaded parts before completion.""" | |
| s3.create_bucket(Bucket="qa-s3-listparts") | |
| mpu = s3.create_multipart_upload(Bucket="qa-s3-listparts", Key="big.bin") | |
| uid = mpu["UploadId"] | |
| p1 = s3.upload_part( | |
| Bucket="qa-s3-listparts", | |
| Key="big.bin", | |
| UploadId=uid, | |
| PartNumber=1, | |
| Body=b"A" * 50, | |
| ) | |
| p2 = s3.upload_part( | |
| Bucket="qa-s3-listparts", | |
| Key="big.bin", | |
| UploadId=uid, | |
| PartNumber=2, | |
| Body=b"B" * 50, | |
| ) | |
| parts = s3.list_parts(Bucket="qa-s3-listparts", Key="big.bin", UploadId=uid)["Parts"] | |
| assert len(parts) == 2 | |
| assert parts[0]["PartNumber"] == 1 | |
| assert parts[1]["PartNumber"] == 2 | |
| s3.complete_multipart_upload( | |
| Bucket="qa-s3-listparts", | |
| Key="big.bin", | |
| UploadId=uid, | |
| MultipartUpload={ | |
| "Parts": [ | |
| {"PartNumber": 1, "ETag": p1["ETag"]}, | |
| {"PartNumber": 2, "ETag": p2["ETag"]}, | |
| ] | |
| }, | |
| ) | |
| def test_s3_list_multipart_uploads(s3): | |
| """ListMultipartUploads returns in-progress uploads.""" | |
| s3.create_bucket(Bucket="qa-s3-list-mpu") | |
| uid1 = s3.create_multipart_upload(Bucket="qa-s3-list-mpu", Key="a.bin")["UploadId"] | |
| uid2 = s3.create_multipart_upload(Bucket="qa-s3-list-mpu", Key="b.bin")["UploadId"] | |
| resp = s3.list_multipart_uploads(Bucket="qa-s3-list-mpu") | |
| upload_ids = {u["UploadId"] for u in resp.get("Uploads", [])} | |
| assert uid1 in upload_ids | |
| assert uid2 in upload_ids | |
| s3.abort_multipart_upload(Bucket="qa-s3-list-mpu", Key="a.bin", UploadId=uid1) | |
| s3.abort_multipart_upload(Bucket="qa-s3-list-mpu", Key="b.bin", UploadId=uid2) | |
| def test_s3_get_object_with_version_id(s3): | |
| """Enable versioning, put 2 versions of same key, verify version IDs differ.""" | |
| bucket = "s3-version-get-test" | |
| s3.create_bucket(Bucket=bucket) | |
| s3.put_bucket_versioning( | |
| Bucket=bucket, | |
| VersioningConfiguration={"Status": "Enabled"}, | |
| ) | |
| # Put version 1 | |
| r1 = s3.put_object(Bucket=bucket, Key="file.txt", Body=b"version-1") | |
| vid1 = r1.get("VersionId") | |
| assert vid1 is not None | |
| # Put version 2 | |
| r2 = s3.put_object(Bucket=bucket, Key="file.txt", Body=b"version-2") | |
| vid2 = r2.get("VersionId") | |
| assert vid2 is not None | |
| assert vid1 != vid2 | |
| # GetObject returns latest version with its VersionId | |
| get_resp = s3.get_object(Bucket=bucket, Key="file.txt") | |
| assert get_resp["Body"].read() == b"version-2" | |
| assert get_resp.get("VersionId") == vid2 | |
| def test_s3_eventbridge_notification_on_delete(s3, sqs, eb): | |
| """S3 delete_object should send EventBridge event when EventBridgeConfiguration is enabled.""" | |
| bucket = "s3-eb-del-bkt" | |
| s3.create_bucket(Bucket=bucket) | |
| queue_url = sqs.create_queue(QueueName="s3-eb-del-target-q")["QueueUrl"] | |
| queue_arn = sqs.get_queue_attributes( | |
| QueueUrl=queue_url, AttributeNames=["QueueArn"], | |
| )["Attributes"]["QueueArn"] | |
| # Enable EventBridge on bucket | |
| s3.put_bucket_notification_configuration( | |
| Bucket=bucket, | |
| NotificationConfiguration={"EventBridgeConfiguration": {}}, | |
| ) | |
| # Create EventBridge rule matching S3 events -> SQS target | |
| eb.put_rule( | |
| Name="s3-del-to-sqs-rule", | |
| EventPattern=json.dumps({"source": ["aws.s3"]}), | |
| State="ENABLED", | |
| ) | |
| eb.put_targets( | |
| Rule="s3-del-to-sqs-rule", | |
| Targets=[{"Id": "sqs-del-target", "Arn": queue_arn}], | |
| ) | |
| # Put then delete object | |
| s3.put_object(Bucket=bucket, Key="del-test.txt", Body=b"data") | |
| # Drain the put event | |
| time.sleep(0.5) | |
| sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=1) | |
| # Now delete | |
| s3.delete_object(Bucket=bucket, Key="del-test.txt") | |
| time.sleep(0.5) | |
| msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2) | |
| assert "Messages" in msgs and len(msgs["Messages"]) > 0 | |
| body = json.loads(msgs["Messages"][0]["Body"]) | |
| assert body["source"] == "aws.s3" | |
| assert body["detail"]["bucket"]["name"] == bucket | |
| assert body["detail"]["object"]["key"] == "del-test.txt" | |
| def test_s3_upload_part_copy(s3): | |
| """Multipart upload with UploadPartCopy (x-amz-copy-source) produces correct final object.""" | |
| bkt = "intg-s3-partcopy" | |
| s3.create_bucket(Bucket=bkt) | |
| src_key = "source-obj.txt" | |
| dst_key = "dest-obj.txt" | |
| src_data = b"COPIED-DATA-FROM-SOURCE" | |
| s3.put_object(Bucket=bkt, Key=src_key, Body=src_data) | |
| mpu = s3.create_multipart_upload(Bucket=bkt, Key=dst_key) | |
| upload_id = mpu["UploadId"] | |
| copy_resp = s3.upload_part_copy( | |
| Bucket=bkt, | |
| Key=dst_key, | |
| UploadId=upload_id, | |
| PartNumber=1, | |
| CopySource={"Bucket": bkt, "Key": src_key}, | |
| ) | |
| etag = copy_resp["CopyPartResult"]["ETag"] | |
| s3.complete_multipart_upload( | |
| Bucket=bkt, | |
| Key=dst_key, | |
| UploadId=upload_id, | |
| MultipartUpload={ | |
| "Parts": [{"PartNumber": 1, "ETag": etag}] | |
| }, | |
| ) | |
| resp = s3.get_object(Bucket=bkt, Key=dst_key) | |
| assert resp["Body"].read() == src_data | |
| def test_s3_event_to_sqs(s3, sqs): | |
| """S3 notification delivers event to SQS on object creation and deletion.""" | |
| bucket = "intg-s3evt-sqs" | |
| queue_name = "intg-s3evt-sqs-q" | |
| s3.create_bucket(Bucket=bucket) | |
| queue_url = sqs.create_queue(QueueName=queue_name)["QueueUrl"] | |
| queue_arn = sqs.get_queue_attributes( | |
| QueueUrl=queue_url, AttributeNames=["QueueArn"] | |
| )["Attributes"]["QueueArn"] | |
| s3.put_bucket_notification_configuration( | |
| Bucket=bucket, | |
| NotificationConfiguration={ | |
| "QueueConfigurations": [ | |
| { | |
| "QueueArn": queue_arn, | |
| "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"], | |
| } | |
| ], | |
| }, | |
| ) | |
| # Put an object — should fire ObjectCreated event | |
| s3.put_object(Bucket=bucket, Key="hello.txt", Body=b"world") | |
| time.sleep(1) | |
| msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2) | |
| assert "Messages" in msgs and len(msgs["Messages"]) >= 1 | |
| body = json.loads(msgs["Messages"][0]["Body"]) | |
| assert body["Records"][0]["eventSource"] == "aws:s3" | |
| assert body["Records"][0]["eventName"].startswith("ObjectCreated:") | |
| assert body["Records"][0]["s3"]["bucket"]["name"] == bucket | |
| assert body["Records"][0]["s3"]["object"]["key"] == "hello.txt" | |
| # Delete receipts so queue is clean | |
| for m in msgs["Messages"]: | |
| sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=m["ReceiptHandle"]) | |
| # Delete the object — should fire ObjectRemoved event | |
| s3.delete_object(Bucket=bucket, Key="hello.txt") | |
| time.sleep(1) | |
| msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2) | |
| assert "Messages" in msgs and len(msgs["Messages"]) >= 1 | |
| del_body = json.loads(msgs["Messages"][0]["Body"]) | |
| assert del_body["Records"][0]["eventName"].startswith("ObjectRemoved:") | |
| def test_s3_lifecycle_transition_round_trip(s3): | |
| """PUT lifecycle with Transition, verify GET returns canonical XML with correct fields.""" | |
| bucket = "intg-s3-lc-transition" | |
| s3.create_bucket(Bucket=bucket) | |
| s3.put_bucket_lifecycle_configuration( | |
| Bucket=bucket, | |
| LifecycleConfiguration={ | |
| "Rules": [{ | |
| "ID": "archive-rule", | |
| "Status": "Enabled", | |
| "Filter": {"Prefix": "data/"}, | |
| "Transitions": [ | |
| {"Days": 30, "StorageClass": "STANDARD_IA"}, | |
| {"Days": 90, "StorageClass": "GLACIER"}, | |
| ], | |
| "Expiration": {"Days": 365}, | |
| }] | |
| }, | |
| ) | |
| resp = s3.get_bucket_lifecycle_configuration(Bucket=bucket) | |
| rule = resp["Rules"][0] | |
| assert rule["ID"] == "archive-rule" | |
| assert rule["Status"] == "Enabled" | |
| assert rule["Filter"]["Prefix"] == "data/" | |
| transitions = rule["Transitions"] | |
| assert len(transitions) == 2 | |
| assert transitions[0]["Days"] == 30 | |
| assert transitions[0]["StorageClass"] == "STANDARD_IA" | |
| assert transitions[1]["Days"] == 90 | |
| assert transitions[1]["StorageClass"] == "GLACIER" | |
| assert rule["Expiration"]["Days"] == 365 | |
| def test_s3_lifecycle_noncurrent_version(s3): | |
| """PUT lifecycle with NoncurrentVersionExpiration, verify round-trip.""" | |
| bucket = "intg-s3-lc-noncurrent" | |
| s3.create_bucket(Bucket=bucket) | |
| s3.put_bucket_lifecycle_configuration( | |
| Bucket=bucket, | |
| LifecycleConfiguration={ | |
| "Rules": [{ | |
| "ID": "noncurrent-cleanup", | |
| "Status": "Enabled", | |
| "Filter": {"Prefix": ""}, | |
| "NoncurrentVersionExpiration": {"NoncurrentDays": 30}, | |
| }] | |
| }, | |
| ) | |
| resp = s3.get_bucket_lifecycle_configuration(Bucket=bucket) | |
| rule = resp["Rules"][0] | |
| assert rule["NoncurrentVersionExpiration"]["NoncurrentDays"] == 30 | |
| def test_s3_lifecycle_multiple_rules(s3): | |
| """Multiple lifecycle rules survive PUT/GET round-trip.""" | |
| bucket = "intg-s3-lc-multi" | |
| s3.create_bucket(Bucket=bucket) | |
| s3.put_bucket_lifecycle_configuration( | |
| Bucket=bucket, | |
| LifecycleConfiguration={ | |
| "Rules": [ | |
| {"ID": "rule-1", "Status": "Enabled", "Filter": {"Prefix": "a/"}, "Expiration": {"Days": 10}}, | |
| {"ID": "rule-2", "Status": "Disabled", "Filter": {"Prefix": "b/"}, "Expiration": {"Days": 20}}, | |
| {"ID": "rule-3", "Status": "Enabled", "Filter": {"Prefix": "c/"}, "Expiration": {"Days": 30}}, | |
| ] | |
| }, | |
| ) | |
| resp = s3.get_bucket_lifecycle_configuration(Bucket=bucket) | |
| assert len(resp["Rules"]) == 3 | |
| ids = [r["ID"] for r in resp["Rules"]] | |
| assert "rule-1" in ids | |
| assert "rule-2" in ids | |
| assert "rule-3" in ids | |
| disabled = [r for r in resp["Rules"] if r["ID"] == "rule-2"][0] | |
| assert disabled["Status"] == "Disabled" | |
| def test_s3_lifecycle_abort_multipart(s3): | |
| """AbortIncompleteMultipartUpload round-trip.""" | |
| bucket = "intg-s3-lc-abort" | |
| s3.create_bucket(Bucket=bucket) | |
| s3.put_bucket_lifecycle_configuration( | |
| Bucket=bucket, | |
| LifecycleConfiguration={ | |
| "Rules": [{ | |
| "ID": "abort-uploads", | |
| "Status": "Enabled", | |
| "Filter": {"Prefix": ""}, | |
| "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 7}, | |
| }] | |
| }, | |
| ) | |
| resp = s3.get_bucket_lifecycle_configuration(Bucket=bucket) | |
| assert resp["Rules"][0]["AbortIncompleteMultipartUpload"]["DaysAfterInitiation"] == 7 | |