import io
import json
import os
import time
import zipfile
from urllib.parse import urlparse

import pytest
from botocore.exceptions import ClientError
import uuid as _uuid_mod


def test_s3_create_bucket(s3):
    """A freshly created bucket is reported by ListBuckets."""
    bucket = "intg-s3-create"
    s3.create_bucket(Bucket=bucket)
    listed_names = [entry["Name"] for entry in s3.list_buckets()["Buckets"]]
    assert bucket in listed_names


def test_s3_create_bucket_already_exists(s3):
    """Creating the same bucket twice must succeed both times."""
    # Real AWS: creating a bucket you already own is idempotent — returns 200
    bucket = "intg-s3-dup"
    for _attempt in range(2):
        s3.create_bucket(Bucket=bucket)  # must not raise


def test_s3_delete_bucket(s3):
    """A deleted bucket no longer appears in ListBuckets."""
    bucket = "intg-s3-delbkt"
    s3.create_bucket(Bucket=bucket)
    s3.delete_bucket(Bucket=bucket)
    remaining = [entry["Name"] for entry in s3.list_buckets()["Buckets"]]
    assert bucket not in remaining


def test_s3_delete_bucket_not_empty(s3):
    """Deleting a bucket that still holds an object fails with BucketNotEmpty."""
    bucket = "intg-s3-notempty"
    s3.create_bucket(Bucket=bucket)
    s3.put_object(Bucket=bucket, Key="file.txt", Body=b"data")
    with pytest.raises(ClientError) as err:
        s3.delete_bucket(Bucket=bucket)
    error_code = err.value.response["Error"]["Code"]
    assert error_code == "BucketNotEmpty"


def test_s3_delete_bucket_not_found(s3):
    """Deleting an unknown bucket fails with NoSuchBucket."""
    with pytest.raises(ClientError) as err:
        s3.delete_bucket(Bucket="intg-s3-nonexistent-xyz")
    error_code = err.value.response["Error"]["Code"]
    assert error_code == "NoSuchBucket"


def test_s3_head_bucket(s3):
    """HeadBucket answers 200 for an existing bucket and 404 for a missing one."""
    bucket = "intg-s3-headbkt"
    s3.create_bucket(Bucket=bucket)
    found = s3.head_bucket(Bucket=bucket)
    assert found["ResponseMetadata"]["HTTPStatusCode"] == 200

    with pytest.raises(ClientError) as err:
        s3.head_bucket(Bucket="intg-s3-headbkt-missing")
    status = err.value.response["ResponseMetadata"]["HTTPStatusCode"]
    assert status == 404


def test_s3_put_get_object(s3):
    """An object body written with PutObject is returned unchanged by GetObject."""
    bucket = "intg-s3-putget"
    s3.create_bucket(Bucket=bucket)
    s3.put_object(Bucket=bucket, Key="hello.txt", Body=b"Hello, World!")
    fetched = s3.get_object(Bucket=bucket, Key="hello.txt")
    assert fetched["Body"].read() == b"Hello, World!"
def test_s3_put_object_no_bucket(s3):
    """PutObject into a bucket that does not exist fails with NoSuchBucket."""
    with pytest.raises(ClientError) as exc:
        s3.put_object(Bucket="intg-s3-nobucket-xyz", Key="k", Body=b"x")
    assert exc.value.response["Error"]["Code"] == "NoSuchBucket"


def test_s3_put_get_json_chunked(s3):
    """AWS SDK v2 sends PutObject with chunked Transfer-Encoding — body must be decoded cleanly.

    The PUT is issued with a hand-built ``aws-chunked`` body (one data chunk
    plus the zero-length terminator chunk) and the object is then read back
    through the regular client to check that only the payload was stored.
    """
    import urllib.request

    bucket = "intg-s3-chunked"
    s3.create_bucket(Bucket=bucket)

    document = {"hello": "world", "number": 42}
    chunk_body = json.dumps(document).encode()

    # Simulate AWS chunked encoding: one chunk + terminator
    chunk_size = f"{len(chunk_body):x}".encode()
    fake_sig = b"abc123"
    chunked = b"".join(
        [
            chunk_size, b";chunk-signature=", fake_sig, b"\r\n",
            chunk_body, b"\r\n",
            b"0;chunk-signature=", fake_sig, b"\r\n\r\n",
        ]
    )

    # Target the same endpoint the fixture's client talks to, so the raw PUT
    # and the get_object() below cannot end up on different servers.
    base_url = s3.meta.endpoint_url.rstrip("/")
    endpoint = f"{base_url}/{bucket}/test.json"
    req = urllib.request.Request(
        endpoint,
        data=chunked,
        method="PUT",
        headers={
            "x-amz-content-sha256": "STREAMING-AWS4-HMAC-SHA256-PAYLOAD",
            "Content-Type": "application/json",
            "Authorization": "AWS4-HMAC-SHA256 Credential=test/20240101/us-east-1/s3/aws4_request, SignedHeaders=host, Signature=fake",
        },
    )
    with urllib.request.urlopen(req) as r:
        assert r.status == 200

    resp = s3.get_object(Bucket=bucket, Key="test.json")
    body = resp["Body"].read().decode()
    assert json.loads(body) == document


def test_s3_head_object(s3):
    """HeadObject reports the stored length, content type and an ETag."""
    s3.create_bucket(Bucket="intg-s3-headobj")
    s3.put_object(
        Bucket="intg-s3-headobj",
        Key="data.bin",
        Body=b"0123456789",
        ContentType="application/octet-stream",
    )
    resp = s3.head_object(Bucket="intg-s3-headobj", Key="data.bin")
    assert resp["ContentLength"] == 10
    assert resp["ContentType"] == "application/octet-stream"
    assert "ETag" in resp


def test_s3_head_object_not_found(s3):
    """HeadObject on a missing key answers with HTTP 404."""
    s3.create_bucket(Bucket="intg-s3-headobj404")
    with pytest.raises(ClientError) as exc:
        s3.head_object(Bucket="intg-s3-headobj404", Key="missing.txt")
    assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 404
def test_s3_delete_object(s3): s3.create_bucket(Bucket="intg-s3-delobj") s3.put_object(Bucket="intg-s3-delobj", Key="bye.txt", Body=b"bye") s3.delete_object(Bucket="intg-s3-delobj", Key="bye.txt") with pytest.raises(ClientError): s3.get_object(Bucket="intg-s3-delobj", Key="bye.txt") def test_s3_delete_object_idempotent(s3): s3.create_bucket(Bucket="intg-s3-delidempotent") resp = s3.delete_object(Bucket="intg-s3-delidempotent", Key="nonexistent.txt") assert resp["ResponseMetadata"]["HTTPStatusCode"] == 204 def test_s3_copy_object(s3): s3.create_bucket(Bucket="intg-s3-copysrc") s3.create_bucket(Bucket="intg-s3-copydst") s3.put_object(Bucket="intg-s3-copysrc", Key="original.txt", Body=b"copy me") s3.copy_object( CopySource={"Bucket": "intg-s3-copysrc", "Key": "original.txt"}, Bucket="intg-s3-copydst", Key="copied.txt", ) resp = s3.get_object(Bucket="intg-s3-copydst", Key="copied.txt") assert resp["Body"].read() == b"copy me" def test_s3_copy_object_metadata_replace(s3): bkt = "intg-s3-copymeta" s3.create_bucket(Bucket=bkt) s3.put_object( Bucket=bkt, Key="src.txt", Body=b"metadata test", Metadata={"original-key": "original-value"}, ) s3.copy_object( CopySource={"Bucket": bkt, "Key": "src.txt"}, Bucket=bkt, Key="dst.txt", MetadataDirective="REPLACE", Metadata={"replaced-key": "replaced-value"}, ) resp = s3.head_object(Bucket=bkt, Key="dst.txt") assert resp["Metadata"].get("replaced-key") == "replaced-value" assert "original-key" not in resp["Metadata"] def test_s3_list_objects_v1(s3): bkt = "intg-s3-listv1" s3.create_bucket(Bucket=bkt) for key in [ "photos/2023/a.jpg", "photos/2023/b.jpg", "photos/2024/c.jpg", "docs/readme.md", ]: s3.put_object(Bucket=bkt, Key=key, Body=b"x") resp = s3.list_objects(Bucket=bkt, Prefix="photos/", Delimiter="/") prefixes = [p["Prefix"] for p in resp.get("CommonPrefixes", [])] assert "photos/2023/" in prefixes assert "photos/2024/" in prefixes assert len(resp.get("Contents", [])) == 0 def test_s3_list_objects_v2(s3): bkt = "intg-s3-listv2" 
s3.create_bucket(Bucket=bkt) for key in ["a/1.txt", "a/2.txt", "b/3.txt"]: s3.put_object(Bucket=bkt, Key=key, Body=b"v2") resp = s3.list_objects_v2(Bucket=bkt, Prefix="a/") assert resp["KeyCount"] == 2 keys = [c["Key"] for c in resp["Contents"]] assert "a/1.txt" in keys assert "a/2.txt" in keys def test_s3_list_objects_pagination(s3): bkt = "intg-s3-listpage" s3.create_bucket(Bucket=bkt) for i in range(7): s3.put_object(Bucket=bkt, Key=f"item-{i:02d}.txt", Body=b"p") resp = s3.list_objects_v2(Bucket=bkt, MaxKeys=3) assert resp["IsTruncated"] is True assert resp["KeyCount"] == 3 token = resp["NextContinuationToken"] all_keys = [c["Key"] for c in resp["Contents"]] while resp["IsTruncated"]: resp = s3.list_objects_v2( Bucket=bkt, MaxKeys=3, ContinuationToken=token, ) all_keys.extend(c["Key"] for c in resp["Contents"]) token = resp.get("NextContinuationToken", "") assert len(all_keys) == 7 def test_s3_delete_objects_batch(s3): bkt = "intg-s3-batchdel" s3.create_bucket(Bucket=bkt) keys = [f"obj-{i}.txt" for i in range(5)] for k in keys: s3.put_object(Bucket=bkt, Key=k, Body=b"batch") resp = s3.delete_objects( Bucket=bkt, Delete={"Objects": [{"Key": k} for k in keys], "Quiet": False}, ) assert len(resp.get("Deleted", [])) == 5 listing = s3.list_objects_v2(Bucket=bkt) assert listing["KeyCount"] == 0 def test_s3_multipart_upload(s3): bkt = "intg-s3-multipart" s3.create_bucket(Bucket=bkt) key = "large.bin" mpu = s3.create_multipart_upload(Bucket=bkt, Key=key) upload_id = mpu["UploadId"] p1 = s3.upload_part( Bucket=bkt, Key=key, UploadId=upload_id, PartNumber=1, Body=b"A" * 100, ) p2 = s3.upload_part( Bucket=bkt, Key=key, UploadId=upload_id, PartNumber=2, Body=b"B" * 100, ) s3.complete_multipart_upload( Bucket=bkt, Key=key, UploadId=upload_id, MultipartUpload={ "Parts": [ {"PartNumber": 1, "ETag": p1["ETag"]}, {"PartNumber": 2, "ETag": p2["ETag"]}, ] }, ) resp = s3.get_object(Bucket=bkt, Key=key) assert resp["Body"].read() == b"A" * 100 + b"B" * 100 def 
test_s3_abort_multipart_upload(s3): bkt = "intg-s3-abortmpu" s3.create_bucket(Bucket=bkt) key = "aborted.bin" mpu = s3.create_multipart_upload(Bucket=bkt, Key=key) upload_id = mpu["UploadId"] s3.upload_part( Bucket=bkt, Key=key, UploadId=upload_id, PartNumber=1, Body=b"X" * 50, ) s3.abort_multipart_upload(Bucket=bkt, Key=key, UploadId=upload_id) with pytest.raises(ClientError) as exc: s3.get_object(Bucket=bkt, Key=key) assert exc.value.response["Error"]["Code"] == "NoSuchKey" def test_s3_get_object_range(s3): bkt = "intg-s3-range" s3.create_bucket(Bucket=bkt) s3.put_object(Bucket=bkt, Key="ranged.txt", Body=b"0123456789") resp = s3.get_object(Bucket=bkt, Key="ranged.txt", Range="bytes=2-5") assert resp["Body"].read() == b"2345" assert resp["ContentLength"] == 4 assert "bytes" in resp.get("ContentRange", "") assert resp["ResponseMetadata"]["HTTPStatusCode"] == 206 def test_s3_object_metadata(s3): bkt = "intg-s3-meta" s3.create_bucket(Bucket=bkt) s3.put_object( Bucket=bkt, Key="meta.txt", Body=b"metadata", Metadata={"custom-key": "custom-value", "another": "data"}, ) resp = s3.head_object(Bucket=bkt, Key="meta.txt") assert resp["Metadata"]["custom-key"] == "custom-value" assert resp["Metadata"]["another"] == "data" def test_s3_bucket_tagging(s3): bkt = "intg-s3-bkttags" s3.create_bucket(Bucket=bkt) s3.put_bucket_tagging( Bucket=bkt, Tagging={ "TagSet": [ {"Key": "env", "Value": "test"}, {"Key": "team", "Value": "platform"}, ] }, ) resp = s3.get_bucket_tagging(Bucket=bkt) tags = {t["Key"]: t["Value"] for t in resp["TagSet"]} assert tags["env"] == "test" assert tags["team"] == "platform" s3.delete_bucket_tagging(Bucket=bkt) with pytest.raises(ClientError) as exc: s3.get_bucket_tagging(Bucket=bkt) assert exc.value.response["Error"]["Code"] == "NoSuchTagSet" def test_s3_control_list_tags_for_resource(s3): """S3 Control ListTagsForResource must return tags set via PutBucketTagging. 
Regression: Terraform AWS Provider >= 5 calls s3control:ListTagsForResource when a `tags` block is set on aws_s3_bucket. The handler was returning an empty list regardless of bucket tags, causing perpetual drift. """ from conftest import make_client bkt = "intg-s3control-tags" account_id = "123456789012" s3.create_bucket(Bucket=bkt) s3.put_bucket_tagging( Bucket=bkt, Tagging={"TagSet": [{"Key": "name", "Value": "ministack-test"}]}, ) s3control = make_client("s3control") arn = f"arn:aws:s3:::{bkt}" resp = s3control.list_tags_for_resource(AccountId=account_id, ResourceArn=arn) tags = {t["Key"]: t["Value"] for t in resp.get("Tags", [])} assert tags.get("name") == "ministack-test" def test_s3_control_list_tags_via_s3_control_host(s3): """S3 Control requests via s3-control.localhost host must not be intercepted by S3 vhost.""" import urllib.request, urllib.parse bkt = "intg-s3control-host" s3.create_bucket(Bucket=bkt) s3.put_bucket_tagging( Bucket=bkt, Tagging={"TagSet": [{"Key": "env", "Value": "test"}]}, ) arn = urllib.parse.quote(f"arn:aws:s3:::{bkt}", safe="") req = urllib.request.Request( f"http://localhost:4566/v20180820/tags/{arn}", method="GET", headers={ "x-amz-account-id": "000000000000", "Host": "s3-control.localhost:4566", }, ) with urllib.request.urlopen(req) as r: assert r.status == 200 body = r.read().decode() assert "env" in body assert "test" in body def test_s3_bucket_policy(s3): bkt = "intg-s3-policy" s3.create_bucket(Bucket=bkt) policy = json.dumps( { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": "*", "Action": "s3:GetObject", "Resource": f"arn:aws:s3:::{bkt}/*", } ], } ) s3.put_bucket_policy(Bucket=bkt, Policy=policy) resp = s3.get_bucket_policy(Bucket=bkt) stored = json.loads(resp["Policy"]) assert stored["Version"] == "2012-10-17" assert len(stored["Statement"]) == 1 def test_s3_object_tagging(s3): bkt = "intg-s3-objtags" s3.create_bucket(Bucket=bkt) s3.put_object(Bucket=bkt, Key="tagged.txt", Body=b"tagged") 
s3.put_object_tagging( Bucket=bkt, Key="tagged.txt", Tagging={ "TagSet": [ {"Key": "status", "Value": "active"}, {"Key": "priority", "Value": "high"}, ] }, ) resp = s3.get_object_tagging(Bucket=bkt, Key="tagged.txt") tags = {t["Key"]: t["Value"] for t in resp["TagSet"]} assert tags["status"] == "active" assert tags["priority"] == "high" def test_s3_public_access_block(s3): bkt = "intg-s3-pab" s3.create_bucket(Bucket=bkt) s3.put_public_access_block( Bucket=bkt, PublicAccessBlockConfiguration={ "BlockPublicAcls": True, "IgnorePublicAcls": True, "BlockPublicPolicy": False, "RestrictPublicBuckets": False, }, ) resp = s3.get_public_access_block(Bucket=bkt) cfg = resp["PublicAccessBlockConfiguration"] assert cfg["BlockPublicAcls"] is True assert cfg["BlockPublicPolicy"] is False s3.delete_public_access_block(Bucket=bkt) def test_s3_ownership_controls(s3): bkt = "intg-s3-ownership" s3.create_bucket(Bucket=bkt) s3.put_bucket_ownership_controls( Bucket=bkt, OwnershipControls={"Rules": [{"ObjectOwnership": "BucketOwnerPreferred"}]}, ) resp = s3.get_bucket_ownership_controls(Bucket=bkt) assert resp["OwnershipControls"]["Rules"][0]["ObjectOwnership"] == "BucketOwnerPreferred" s3.delete_bucket_ownership_controls(Bucket=bkt) def test_s3_object_lock_configuration(s3): bkt = "intg-s3-objlock-cfg" s3.create_bucket( Bucket=bkt, ObjectLockEnabledForBucket=True, ) resp = s3.get_object_lock_configuration(Bucket=bkt) assert resp["ObjectLockConfiguration"]["ObjectLockEnabled"] == "Enabled" s3.put_object_lock_configuration( Bucket=bkt, ObjectLockConfiguration={ "ObjectLockEnabled": "Enabled", "Rule": { "DefaultRetention": { "Mode": "GOVERNANCE", "Days": 30, } }, }, ) resp = s3.get_object_lock_configuration(Bucket=bkt) ret = resp["ObjectLockConfiguration"]["Rule"]["DefaultRetention"] assert ret["Mode"] == "GOVERNANCE" assert ret["Days"] == 30 def test_s3_object_lock_requires_versioning(s3): bkt = "intg-s3-objlock-nover" s3.create_bucket(Bucket=bkt) with pytest.raises(ClientError) as exc: 
s3.put_object_lock_configuration( Bucket=bkt, ObjectLockConfiguration={ "ObjectLockEnabled": "Enabled", }, ) assert exc.value.response["Error"]["Code"] == "InvalidBucketState" def test_s3_object_retention(s3): bkt = "intg-s3-retention" s3.create_bucket(Bucket=bkt, ObjectLockEnabledForBucket=True) s3.put_object(Bucket=bkt, Key="doc.txt", Body=b"hello") from datetime import datetime, timezone, timedelta retain_until = datetime.now(timezone.utc) + timedelta(days=1) s3.put_object_retention( Bucket=bkt, Key="doc.txt", Retention={"Mode": "GOVERNANCE", "RetainUntilDate": retain_until}, ) resp = s3.get_object_retention(Bucket=bkt, Key="doc.txt") assert resp["Retention"]["Mode"] == "GOVERNANCE" assert "RetainUntilDate" in resp["Retention"] def test_s3_object_legal_hold(s3): bkt = "intg-s3-legalhold" s3.create_bucket(Bucket=bkt, ObjectLockEnabledForBucket=True) s3.put_object(Bucket=bkt, Key="evidence.txt", Body=b"data") s3.put_object_legal_hold( Bucket=bkt, Key="evidence.txt", LegalHold={"Status": "ON"}, ) resp = s3.get_object_legal_hold(Bucket=bkt, Key="evidence.txt") assert resp["LegalHold"]["Status"] == "ON" s3.put_object_legal_hold( Bucket=bkt, Key="evidence.txt", LegalHold={"Status": "OFF"}, ) resp = s3.get_object_legal_hold(Bucket=bkt, Key="evidence.txt") assert resp["LegalHold"]["Status"] == "OFF" def test_s3_object_lock_prevents_delete(s3): bkt = "intg-s3-lock-del" s3.create_bucket(Bucket=bkt, ObjectLockEnabledForBucket=True) s3.put_object(Bucket=bkt, Key="locked.txt", Body=b"immutable") s3.put_object_legal_hold( Bucket=bkt, Key="locked.txt", LegalHold={"Status": "ON"}, ) with pytest.raises(ClientError) as exc: s3.delete_object(Bucket=bkt, Key="locked.txt") assert exc.value.response["Error"]["Code"] == "AccessDenied" # Remove legal hold, add governance retention s3.put_object_legal_hold( Bucket=bkt, Key="locked.txt", LegalHold={"Status": "OFF"}, ) from datetime import datetime, timezone, timedelta retain_until = datetime.now(timezone.utc) + timedelta(days=1) 
s3.put_object_retention( Bucket=bkt, Key="locked.txt", Retention={"Mode": "GOVERNANCE", "RetainUntilDate": retain_until}, ) with pytest.raises(ClientError) as exc: s3.delete_object(Bucket=bkt, Key="locked.txt") assert exc.value.response["Error"]["Code"] == "AccessDenied" # Bypass governance retention s3.delete_object( Bucket=bkt, Key="locked.txt", BypassGovernanceRetention=True, ) with pytest.raises(ClientError): s3.head_object(Bucket=bkt, Key="locked.txt") def test_s3_bucket_replication(s3): src = "intg-s3-repl-src" s3.create_bucket(Bucket=src) s3.put_bucket_versioning(Bucket=src, VersioningConfiguration={"Status": "Enabled"}) s3.put_bucket_replication( Bucket=src, ReplicationConfiguration={ "Role": "arn:aws:iam::012345678901:role/repl", "Rules": [ { "Status": "Enabled", "Destination": {"Bucket": "arn:aws:s3:::intg-s3-repl-dst"}, } ], }, ) resp = s3.get_bucket_replication(Bucket=src) assert resp["ReplicationConfiguration"]["Role"] == "arn:aws:iam::012345678901:role/repl" assert len(resp["ReplicationConfiguration"]["Rules"]) == 1 s3.delete_bucket_replication(Bucket=src) with pytest.raises(ClientError) as exc: s3.get_bucket_replication(Bucket=src) assert exc.value.response["Error"]["Code"] == "ReplicationConfigurationNotFoundError" def test_s3_replication_requires_versioning(s3): bkt = "intg-s3-repl-nover" s3.create_bucket(Bucket=bkt) with pytest.raises(ClientError) as exc: s3.put_bucket_replication( Bucket=bkt, ReplicationConfiguration={ "Role": "arn:aws:iam::012345678901:role/repl", "Rules": [ { "Status": "Enabled", "Destination": {"Bucket": "arn:aws:s3:::somewhere"}, } ], }, ) assert exc.value.response["Error"]["Code"] == "InvalidRequest" def test_s3_put_object_with_lock_headers(s3): bkt = "intg-s3-put-lock-hdr" s3.create_bucket(Bucket=bkt, ObjectLockEnabledForBucket=True) from datetime import datetime, timezone, timedelta retain_until = datetime.now(timezone.utc) + timedelta(days=5) s3.put_object( Bucket=bkt, Key="locked-via-header.txt", Body=b"data", 
ObjectLockMode="GOVERNANCE", ObjectLockRetainUntilDate=retain_until, ObjectLockLegalHoldStatus="ON", ) ret = s3.get_object_retention(Bucket=bkt, Key="locked-via-header.txt") assert ret["Retention"]["Mode"] == "GOVERNANCE" hold = s3.get_object_legal_hold(Bucket=bkt, Key="locked-via-header.txt") assert hold["LegalHold"]["Status"] == "ON" def test_s3_put_object_with_tagging_header(s3): bkt = "intg-s3-put-tag-hdr" s3.create_bucket(Bucket=bkt) s3.put_object( Bucket=bkt, Key="tagged-inline.txt", Body=b"hello", Tagging="env=prod&team=backend", ) resp = s3.get_object_tagging(Bucket=bkt, Key="tagged-inline.txt") tags = {t["Key"]: t["Value"] for t in resp["TagSet"]} assert tags["env"] == "prod" assert tags["team"] == "backend" def test_s3_default_retention_applied(s3): bkt = "intg-s3-default-ret" s3.create_bucket(Bucket=bkt, ObjectLockEnabledForBucket=True) s3.put_object_lock_configuration( Bucket=bkt, ObjectLockConfiguration={ "ObjectLockEnabled": "Enabled", "Rule": { "DefaultRetention": { "Mode": "COMPLIANCE", "Days": 7, } }, }, ) s3.put_object(Bucket=bkt, Key="auto-locked.txt", Body=b"data") ret = s3.get_object_retention(Bucket=bkt, Key="auto-locked.txt") assert ret["Retention"]["Mode"] == "COMPLIANCE" assert "RetainUntilDate" in ret["Retention"] def test_s3_batch_delete_enforces_lock(s3): bkt = "intg-s3-batch-lock" s3.create_bucket(Bucket=bkt, ObjectLockEnabledForBucket=True) s3.put_object(Bucket=bkt, Key="a.txt", Body=b"a") s3.put_object(Bucket=bkt, Key="b.txt", Body=b"b") s3.put_object_legal_hold(Bucket=bkt, Key="a.txt", LegalHold={"Status": "ON"}) resp = s3.delete_objects( Bucket=bkt, Delete={"Objects": [{"Key": "a.txt"}, {"Key": "b.txt"}]}, ) deleted_keys = [d["Key"] for d in resp.get("Deleted", [])] error_keys = [e["Key"] for e in resp.get("Errors", [])] assert "b.txt" in deleted_keys assert "a.txt" in error_keys def test_s3_copy_preserves_tags_and_lock(s3): src = "intg-s3-copy-tag-src" dst = "intg-s3-copy-tag-dst" s3.create_bucket(Bucket=src, 
ObjectLockEnabledForBucket=True) s3.create_bucket(Bucket=dst, ObjectLockEnabledForBucket=True) s3.put_object(Bucket=src, Key="orig.txt", Body=b"data") s3.put_object_tagging( Bucket=src, Key="orig.txt", Tagging={"TagSet": [{"Key": "env", "Value": "staging"}]}, ) s3.put_object_legal_hold(Bucket=src, Key="orig.txt", LegalHold={"Status": "ON"}) s3.copy_object(Bucket=dst, Key="copy.txt", CopySource=f"{src}/orig.txt") tags = s3.get_object_tagging(Bucket=dst, Key="copy.txt") tag_map = {t["Key"]: t["Value"] for t in tags["TagSet"]} assert tag_map["env"] == "staging" hold = s3.get_object_legal_hold(Bucket=dst, Key="copy.txt") assert hold["LegalHold"]["Status"] == "ON" def test_s3_copy_replace_tags(s3): bkt = "intg-s3-copy-repl-tag" s3.create_bucket(Bucket=bkt) s3.put_object(Bucket=bkt, Key="src.txt", Body=b"data") s3.put_object_tagging( Bucket=bkt, Key="src.txt", Tagging={"TagSet": [{"Key": "old", "Value": "val"}]}, ) s3.copy_object( Bucket=bkt, Key="dst.txt", CopySource=f"{bkt}/src.txt", TaggingDirective="REPLACE", Tagging="new=val2", ) tags = s3.get_object_tagging(Bucket=bkt, Key="dst.txt") tag_map = {t["Key"]: t["Value"] for t in tags["TagSet"]} assert "old" not in tag_map assert tag_map["new"] == "val2" def test_s3_tag_count_limit(s3): bkt = "intg-s3-tag-limit" s3.create_bucket(Bucket=bkt) s3.put_object(Bucket=bkt, Key="toomany.txt", Body=b"x") with pytest.raises(ClientError) as exc: s3.put_object_tagging( Bucket=bkt, Key="toomany.txt", Tagging={"TagSet": [{"Key": f"k{i}", "Value": f"v{i}"} for i in range(11)]}, ) assert exc.value.response["Error"]["Code"] == "BadRequest" def test_s3_replication_validates_dest_versioning(s3): src = "intg-s3-repl-val-src" dst = "intg-s3-repl-val-dst" s3.create_bucket(Bucket=src) s3.create_bucket(Bucket=dst) s3.put_bucket_versioning(Bucket=src, VersioningConfiguration={"Status": "Enabled"}) # dst has no versioning with pytest.raises(ClientError) as exc: s3.put_bucket_replication( Bucket=src, ReplicationConfiguration={ "Role": 
"arn:aws:iam::012345678901:role/repl", "Rules": [ { "Status": "Enabled", "Destination": {"Bucket": f"arn:aws:s3:::{dst}"}, } ], }, ) assert exc.value.response["Error"]["Code"] == "InvalidRequest" def test_s3_head_object_returns_lock_headers(s3): bkt = "intg-s3-head-lock-hdr" s3.create_bucket(Bucket=bkt, ObjectLockEnabledForBucket=True) from datetime import datetime, timezone, timedelta retain_until = datetime.now(timezone.utc) + timedelta(days=3) s3.put_object( Bucket=bkt, Key="locked.txt", Body=b"data", ObjectLockMode="GOVERNANCE", ObjectLockRetainUntilDate=retain_until, ObjectLockLegalHoldStatus="ON", ) resp = s3.head_object(Bucket=bkt, Key="locked.txt") assert resp["ObjectLockMode"] == "GOVERNANCE" assert "ObjectLockRetainUntilDate" in resp assert resp["ObjectLockLegalHoldStatus"] == "ON" get_resp = s3.get_object(Bucket=bkt, Key="locked.txt") assert get_resp["ObjectLockMode"] == "GOVERNANCE" assert get_resp["ObjectLockLegalHoldStatus"] == "ON" def test_s3_event_notification_to_sqs(s3, sqs): s3.create_bucket(Bucket="s3-evt-bkt") queue_url = sqs.create_queue(QueueName="s3-evt-queue")["QueueUrl"] queue_arn = sqs.get_queue_attributes( QueueUrl=queue_url, AttributeNames=["QueueArn"], )["Attributes"]["QueueArn"] s3.put_bucket_notification_configuration( Bucket="s3-evt-bkt", NotificationConfiguration={ "QueueConfigurations": [{"QueueArn": queue_arn, "Events": ["s3:ObjectCreated:*"]}], }, ) s3.put_object(Bucket="s3-evt-bkt", Key="test-notify.txt", Body=b"hello") time.sleep(0.5) msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2) assert "Messages" in msgs and len(msgs["Messages"]) > 0 body = json.loads(msgs["Messages"][0]["Body"]) assert body["Records"][0]["eventSource"] == "aws:s3" assert body["Records"][0]["s3"]["object"]["key"] == "test-notify.txt" def test_s3_event_notification_filter(s3, sqs): s3.create_bucket(Bucket="s3-evt-filter-bkt") queue_url = sqs.create_queue(QueueName="s3-evt-filter-q")["QueueUrl"] queue_arn = 
sqs.get_queue_attributes( QueueUrl=queue_url, AttributeNames=["QueueArn"], )["Attributes"]["QueueArn"] s3.put_bucket_notification_configuration( Bucket="s3-evt-filter-bkt", NotificationConfiguration={ "QueueConfigurations": [ { "QueueArn": queue_arn, "Events": ["s3:ObjectCreated:*"], "Filter": {"Key": {"FilterRules": [{"Name": "suffix", "Value": ".csv"}]}}, } ], }, ) s3.put_object(Bucket="s3-evt-filter-bkt", Key="data.txt", Body=b"no match") s3.put_object(Bucket="s3-evt-filter-bkt", Key="data.csv", Body=b"match") time.sleep(0.5) msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2) keys = [json.loads(m["Body"])["Records"][0]["s3"]["object"]["key"] for m in msgs.get("Messages", [])] assert "data.csv" in keys assert "data.txt" not in keys def test_s3_event_notification_delete(s3, sqs): s3.create_bucket(Bucket="s3-evt-del-bkt") queue_url = sqs.create_queue(QueueName="s3-evt-del-q")["QueueUrl"] queue_arn = sqs.get_queue_attributes( QueueUrl=queue_url, AttributeNames=["QueueArn"], )["Attributes"]["QueueArn"] s3.put_bucket_notification_configuration( Bucket="s3-evt-del-bkt", NotificationConfiguration={ "QueueConfigurations": [{"QueueArn": queue_arn, "Events": ["s3:ObjectRemoved:*"]}], }, ) s3.put_object(Bucket="s3-evt-del-bkt", Key="to-del.txt", Body=b"bye") s3.delete_object(Bucket="s3-evt-del-bkt", Key="to-del.txt") time.sleep(0.5) msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2) assert "Messages" in msgs and len(msgs["Messages"]) > 0 body = json.loads(msgs["Messages"][0]["Body"]) assert "ObjectRemoved" in body["Records"][0]["eventName"] def test_s3_eventbridge_notification(s3, sqs, eb): """S3 EventBridgeConfiguration sends events to EventBridge, routed to SQS via rule.""" s3.create_bucket(Bucket="s3-eb-bkt") queue_url = sqs.create_queue(QueueName="s3-eb-target-q")["QueueUrl"] queue_arn = sqs.get_queue_attributes( QueueUrl=queue_url, AttributeNames=["QueueArn"], )["Attributes"]["QueueArn"] # 
Enable EventBridge on bucket s3.put_bucket_notification_configuration( Bucket="s3-eb-bkt", NotificationConfiguration={"EventBridgeConfiguration": {}}, ) # Create EventBridge rule matching S3 events → SQS target eb.put_rule( Name="s3-to-sqs-rule", EventPattern=json.dumps({"source": ["aws.s3"]}), State="ENABLED", ) eb.put_targets( Rule="s3-to-sqs-rule", Targets=[{"Id": "sqs-target", "Arn": queue_arn}], ) # Upload object — should trigger S3 → EventBridge → SQS s3.put_object(Bucket="s3-eb-bkt", Key="hello.txt", Body=b"world") time.sleep(0.5) msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2) assert "Messages" in msgs and len(msgs["Messages"]) > 0 body = json.loads(msgs["Messages"][0]["Body"]) assert body["source"] == "aws.s3" assert body["detail"]["bucket"]["name"] == "s3-eb-bkt" assert body["detail"]["object"]["key"] == "hello.txt" def test_s3_list_object_versions(s3): s3.create_bucket(Bucket="s3-ver-bkt") s3.put_object(Bucket="s3-ver-bkt", Key="v1.txt", Body=b"v1") s3.put_object(Bucket="s3-ver-bkt", Key="v2.txt", Body=b"v2") resp = s3.list_object_versions(Bucket="s3-ver-bkt") versions = resp.get("Versions", []) assert len(versions) >= 2 keys = [v["Key"] for v in versions] assert "v1.txt" in keys and "v2.txt" in keys def test_s3_list_object_versions_multiple_puts_same_key(s3): """Multiple PUTs to the same key with versioning enabled should return all versions.""" bkt = "s3-ver-multi" s3.create_bucket(Bucket=bkt) s3.put_bucket_versioning(Bucket=bkt, VersioningConfiguration={"Status": "Enabled"}) r1 = s3.put_object(Bucket=bkt, Key="doc.txt", Body=b"v1") r2 = s3.put_object(Bucket=bkt, Key="doc.txt", Body=b"v2") r3 = s3.put_object(Bucket=bkt, Key="doc.txt", Body=b"v3") assert r1["VersionId"] != r2["VersionId"] assert r2["VersionId"] != r3["VersionId"] resp = s3.list_object_versions(Bucket=bkt) versions = resp.get("Versions", []) assert len(versions) == 3 version_ids = [v["VersionId"] for v in versions] assert r1["VersionId"] in 
version_ids assert r2["VersionId"] in version_ids assert r3["VersionId"] in version_ids latest = [v for v in versions if v["IsLatest"]] assert len(latest) == 1 assert latest[0]["VersionId"] == r3["VersionId"] def test_s3_multipart_upload_returns_version_id(s3): """CompleteMultipartUpload should return VersionId when versioning is enabled.""" bkt = "s3-ver-mpu" s3.create_bucket(Bucket=bkt) s3.put_bucket_versioning(Bucket=bkt, VersioningConfiguration={"Status": "Enabled"}) mpu = s3.create_multipart_upload(Bucket=bkt, Key="big.bin") upload_id = mpu["UploadId"] part = s3.upload_part(Bucket=bkt, Key="big.bin", UploadId=upload_id, PartNumber=1, Body=b"x" * 1000) resp = s3.complete_multipart_upload( Bucket=bkt, Key="big.bin", UploadId=upload_id, MultipartUpload={"Parts": [{"PartNumber": 1, "ETag": part["ETag"]}]}, ) assert "VersionId" in resp, "CompleteMultipartUpload must return VersionId" first_vid = resp["VersionId"] # Second multipart to same key — different version mpu2 = s3.create_multipart_upload(Bucket=bkt, Key="big.bin") part2 = s3.upload_part(Bucket=bkt, Key="big.bin", UploadId=mpu2["UploadId"], PartNumber=1, Body=b"y" * 1000) resp2 = s3.complete_multipart_upload( Bucket=bkt, Key="big.bin", UploadId=mpu2["UploadId"], MultipartUpload={"Parts": [{"PartNumber": 1, "ETag": part2["ETag"]}]}, ) assert resp2["VersionId"] != first_vid # Both versions should appear in list_object_versions versions = s3.list_object_versions(Bucket=bkt).get("Versions", []) vids = [v["VersionId"] for v in versions] assert first_vid in vids assert resp2["VersionId"] in vids latest = [v for v in versions if v["IsLatest"]] assert len(latest) == 1 assert latest[0]["VersionId"] == resp2["VersionId"] def test_s3_copy_object_returns_version_id(s3): """CopyObject should return VersionId and track versions when versioning is enabled.""" bkt = "s3-ver-copy" s3.create_bucket(Bucket=bkt) s3.put_bucket_versioning(Bucket=bkt, VersioningConfiguration={"Status": "Enabled"}) s3.put_object(Bucket=bkt, 
Key="src.txt", Body=b"original") resp = s3.copy_object(Bucket=bkt, Key="dst.txt", CopySource=f"{bkt}/src.txt") assert "VersionId" in resp, "CopyObject must return VersionId" first_vid = resp["VersionId"] # Copy again — different version resp2 = s3.copy_object(Bucket=bkt, Key="dst.txt", CopySource=f"{bkt}/src.txt") assert resp2["VersionId"] != first_vid versions = s3.list_object_versions(Bucket=bkt, Prefix="dst.txt").get("Versions", []) assert len(versions) == 2, f"Expected 2 versions for dst.txt, got {len(versions)}" latest = [v for v in versions if v["IsLatest"]] assert len(latest) == 1 def test_s3_multipart_no_version_without_versioning(s3): """CompleteMultipartUpload should NOT return VersionId when versioning is disabled.""" bkt = "s3-nover-mpu" s3.create_bucket(Bucket=bkt) mpu = s3.create_multipart_upload(Bucket=bkt, Key="file.bin") part = s3.upload_part(Bucket=bkt, Key="file.bin", UploadId=mpu["UploadId"], PartNumber=1, Body=b"data") resp = s3.complete_multipart_upload( Bucket=bkt, Key="file.bin", UploadId=mpu["UploadId"], MultipartUpload={"Parts": [{"PartNumber": 1, "ETag": part["ETag"]}]}, ) assert "VersionId" not in resp, "Should not return VersionId without versioning" def test_s3_bucket_website(s3): s3.create_bucket(Bucket="s3-web-bkt") s3.put_bucket_website( Bucket="s3-web-bkt", WebsiteConfiguration={"IndexDocument": {"Suffix": "index.html"}}, ) resp = s3.get_bucket_website(Bucket="s3-web-bkt") assert resp["IndexDocument"]["Suffix"] == "index.html" s3.delete_bucket_website(Bucket="s3-web-bkt") with pytest.raises(ClientError): s3.get_bucket_website(Bucket="s3-web-bkt") def test_s3_put_bucket_logging(s3): s3.create_bucket(Bucket="s3-log-bkt") s3.put_bucket_logging( Bucket="s3-log-bkt", BucketLoggingStatus={ "LoggingEnabled": {"TargetBucket": "s3-log-bkt", "TargetPrefix": "logs/"}, }, ) resp = s3.get_bucket_logging(Bucket="s3-log-bkt") assert "LoggingEnabled" in resp def test_s3_bucket_versioning(s3): s3.create_bucket(Bucket="intg-s3-versioning") 
s3.put_bucket_versioning( Bucket="intg-s3-versioning", VersioningConfiguration={"Status": "Enabled"}, ) resp = s3.get_bucket_versioning(Bucket="intg-s3-versioning") assert resp["Status"] == "Enabled" def test_s3_put_object_returns_version_id(s3): s3.create_bucket(Bucket="intg-s3-ver-put") s3.put_bucket_versioning( Bucket="intg-s3-ver-put", VersioningConfiguration={"Status": "Enabled"}, ) resp = s3.put_object(Bucket="intg-s3-ver-put", Key="hello.txt", Body=b"v1") assert "VersionId" in resp assert len(resp["VersionId"]) > 0 # Second put should get a different version resp2 = s3.put_object(Bucket="intg-s3-ver-put", Key="hello.txt", Body=b"v2") assert resp2["VersionId"] != resp["VersionId"] def test_s3_put_object_no_version_id_without_versioning(s3): s3.create_bucket(Bucket="intg-s3-nover-put") resp = s3.put_object(Bucket="intg-s3-nover-put", Key="hello.txt", Body=b"data") assert "VersionId" not in resp def test_s3_bucket_encryption(s3): s3.create_bucket(Bucket="intg-s3-enc") s3.put_bucket_encryption( Bucket="intg-s3-enc", ServerSideEncryptionConfiguration={ "Rules": [{"ApplyServerSideEncryptionByDefault": {"SSEAlgorithm": "AES256"}}] }, ) resp = s3.get_bucket_encryption(Bucket="intg-s3-enc") rules = resp["ServerSideEncryptionConfiguration"]["Rules"] assert rules[0]["ApplyServerSideEncryptionByDefault"]["SSEAlgorithm"] == "AES256" s3.delete_bucket_encryption(Bucket="intg-s3-enc") with pytest.raises(ClientError) as exc: s3.get_bucket_encryption(Bucket="intg-s3-enc") assert exc.value.response["Error"]["Code"] == "ServerSideEncryptionConfigurationNotFoundError" def test_s3_bucket_lifecycle(s3): s3.create_bucket(Bucket="intg-s3-lifecycle") s3.put_bucket_lifecycle_configuration( Bucket="intg-s3-lifecycle", LifecycleConfiguration={ "Rules": [ { "ID": "expire-old", "Status": "Enabled", "Filter": {"Prefix": "logs/"}, "Expiration": {"Days": 30}, } ] }, ) resp = s3.get_bucket_lifecycle_configuration(Bucket="intg-s3-lifecycle") assert resp["Rules"][0]["ID"] == "expire-old" 
s3.delete_bucket_lifecycle(Bucket="intg-s3-lifecycle") with pytest.raises(ClientError) as exc: s3.get_bucket_lifecycle_configuration(Bucket="intg-s3-lifecycle") assert exc.value.response["Error"]["Code"] == "NoSuchLifecycleConfiguration" def test_s3_bucket_cors(s3): s3.create_bucket(Bucket="intg-s3-cors") s3.put_bucket_cors( Bucket="intg-s3-cors", CORSConfiguration={ "CORSRules": [ { "AllowedHeaders": ["*"], "AllowedMethods": ["GET", "PUT"], "AllowedOrigins": ["https://example.com"], "MaxAgeSeconds": 3000, } ] }, ) resp = s3.get_bucket_cors(Bucket="intg-s3-cors") assert resp["CORSRules"][0]["AllowedOrigins"] == ["https://example.com"] s3.delete_bucket_cors(Bucket="intg-s3-cors") with pytest.raises(ClientError) as exc: s3.get_bucket_cors(Bucket="intg-s3-cors") assert exc.value.response["Error"]["Code"] == "NoSuchCORSConfiguration" def test_s3_bucket_acl(s3): s3.create_bucket(Bucket="intg-s3-acl") resp = s3.get_bucket_acl(Bucket="intg-s3-acl") assert "Owner" in resp assert "Grants" in resp def test_s3_range_suffix(s3): """Range: bytes=-N returns last N bytes.""" s3.create_bucket(Bucket="qa-s3-range-suffix") s3.put_object(Bucket="qa-s3-range-suffix", Key="data.txt", Body=b"0123456789") resp = s3.get_object(Bucket="qa-s3-range-suffix", Key="data.txt", Range="bytes=-3") assert resp["Body"].read() == b"789" assert resp["ResponseMetadata"]["HTTPStatusCode"] == 206 def test_s3_range_beyond_end(s3): """Range start beyond file size returns 416.""" s3.create_bucket(Bucket="qa-s3-range-beyond") s3.put_object(Bucket="qa-s3-range-beyond", Key="small.txt", Body=b"hello") with pytest.raises(ClientError) as exc: s3.get_object(Bucket="qa-s3-range-beyond", Key="small.txt", Range="bytes=100-200") assert exc.value.response["ResponseMetadata"]["HTTPStatusCode"] == 416 def test_s3_list_v1_marker_pagination(s3): """ListObjects v1 Marker pagination returns correct pages.""" s3.create_bucket(Bucket="qa-s3-marker") keys = [f"file{i:03d}.txt" for i in range(10)] for k in keys: 
s3.put_object(Bucket="qa-s3-marker", Key=k, Body=b"x") # NextMarker only returned when Delimiter is set (AWS spec) resp1 = s3.list_objects(Bucket="qa-s3-marker", MaxKeys=4, Delimiter="/") assert resp1["IsTruncated"] is True assert len(resp1["Contents"]) == 4 marker = resp1["NextMarker"] resp2 = s3.list_objects(Bucket="qa-s3-marker", MaxKeys=4, Marker=marker, Delimiter="/") page2_keys = [o["Key"] for o in resp2["Contents"]] page1_keys = [o["Key"] for o in resp1["Contents"]] assert not any(k in page1_keys for k in page2_keys) def test_s3_delete_objects_returns_deleted(s3): """DeleteObjects returns each deleted key in Deleted list.""" s3.create_bucket(Bucket="qa-s3-batch-del") for i in range(3): s3.put_object(Bucket="qa-s3-batch-del", Key=f"obj{i}.txt", Body=b"x") resp = s3.delete_objects( Bucket="qa-s3-batch-del", Delete={"Objects": [{"Key": f"obj{i}.txt"} for i in range(3)]}, ) assert len(resp["Deleted"]) == 3 assert not resp.get("Errors") def test_s3_put_object_content_type_preserved(s3): """Content-Type set on PutObject is returned on GetObject.""" s3.create_bucket(Bucket="qa-s3-ct") s3.put_object( Bucket="qa-s3-ct", Key="page.html", Body=b"", ContentType="text/html; charset=utf-8", ) resp = s3.get_object(Bucket="qa-s3-ct", Key="page.html") assert "text/html" in resp["ContentType"] def test_s3_head_object_returns_content_length(s3): """HeadObject must return correct ContentLength.""" s3.create_bucket(Bucket="qa-s3-head-len") body = b"exactly twenty bytes" s3.put_object(Bucket="qa-s3-head-len", Key="f.bin", Body=body) resp = s3.head_object(Bucket="qa-s3-head-len", Key="f.bin") assert resp["ContentLength"] == len(body) def test_s3_copy_preserves_metadata(s3): """CopyObject with MetadataDirective=COPY preserves source metadata.""" s3.create_bucket(Bucket="qa-s3-copy-meta") s3.put_object( Bucket="qa-s3-copy-meta", Key="src.txt", Body=b"data", Metadata={"x-custom": "value123"}, ) s3.copy_object( CopySource={"Bucket": "qa-s3-copy-meta", "Key": "src.txt"}, 
Bucket="qa-s3-copy-meta", Key="dst.txt", MetadataDirective="COPY", ) resp = s3.head_object(Bucket="qa-s3-copy-meta", Key="dst.txt") assert resp["Metadata"].get("x-custom") == "value123" def test_s3_multipart_list_parts(s3): """ListParts returns uploaded parts before completion.""" s3.create_bucket(Bucket="qa-s3-listparts") mpu = s3.create_multipart_upload(Bucket="qa-s3-listparts", Key="big.bin") uid = mpu["UploadId"] p1 = s3.upload_part( Bucket="qa-s3-listparts", Key="big.bin", UploadId=uid, PartNumber=1, Body=b"A" * 50, ) p2 = s3.upload_part( Bucket="qa-s3-listparts", Key="big.bin", UploadId=uid, PartNumber=2, Body=b"B" * 50, ) parts = s3.list_parts(Bucket="qa-s3-listparts", Key="big.bin", UploadId=uid)["Parts"] assert len(parts) == 2 assert parts[0]["PartNumber"] == 1 assert parts[1]["PartNumber"] == 2 s3.complete_multipart_upload( Bucket="qa-s3-listparts", Key="big.bin", UploadId=uid, MultipartUpload={ "Parts": [ {"PartNumber": 1, "ETag": p1["ETag"]}, {"PartNumber": 2, "ETag": p2["ETag"]}, ] }, ) def test_s3_list_multipart_uploads(s3): """ListMultipartUploads returns in-progress uploads.""" s3.create_bucket(Bucket="qa-s3-list-mpu") uid1 = s3.create_multipart_upload(Bucket="qa-s3-list-mpu", Key="a.bin")["UploadId"] uid2 = s3.create_multipart_upload(Bucket="qa-s3-list-mpu", Key="b.bin")["UploadId"] resp = s3.list_multipart_uploads(Bucket="qa-s3-list-mpu") upload_ids = {u["UploadId"] for u in resp.get("Uploads", [])} assert uid1 in upload_ids assert uid2 in upload_ids s3.abort_multipart_upload(Bucket="qa-s3-list-mpu", Key="a.bin", UploadId=uid1) s3.abort_multipart_upload(Bucket="qa-s3-list-mpu", Key="b.bin", UploadId=uid2) def test_s3_get_object_with_version_id(s3): """Enable versioning, put 2 versions of same key, verify version IDs differ.""" bucket = "s3-version-get-test" s3.create_bucket(Bucket=bucket) s3.put_bucket_versioning( Bucket=bucket, VersioningConfiguration={"Status": "Enabled"}, ) # Put version 1 r1 = s3.put_object(Bucket=bucket, Key="file.txt", 
Body=b"version-1") vid1 = r1.get("VersionId") assert vid1 is not None # Put version 2 r2 = s3.put_object(Bucket=bucket, Key="file.txt", Body=b"version-2") vid2 = r2.get("VersionId") assert vid2 is not None assert vid1 != vid2 # GetObject returns latest version with its VersionId get_resp = s3.get_object(Bucket=bucket, Key="file.txt") assert get_resp["Body"].read() == b"version-2" assert get_resp.get("VersionId") == vid2 def test_s3_eventbridge_notification_on_delete(s3, sqs, eb): """S3 delete_object should send EventBridge event when EventBridgeConfiguration is enabled.""" bucket = "s3-eb-del-bkt" s3.create_bucket(Bucket=bucket) queue_url = sqs.create_queue(QueueName="s3-eb-del-target-q")["QueueUrl"] queue_arn = sqs.get_queue_attributes( QueueUrl=queue_url, AttributeNames=["QueueArn"], )["Attributes"]["QueueArn"] # Enable EventBridge on bucket s3.put_bucket_notification_configuration( Bucket=bucket, NotificationConfiguration={"EventBridgeConfiguration": {}}, ) # Create EventBridge rule matching S3 events -> SQS target eb.put_rule( Name="s3-del-to-sqs-rule", EventPattern=json.dumps({"source": ["aws.s3"]}), State="ENABLED", ) eb.put_targets( Rule="s3-del-to-sqs-rule", Targets=[{"Id": "sqs-del-target", "Arn": queue_arn}], ) # Put then delete object s3.put_object(Bucket=bucket, Key="del-test.txt", Body=b"data") # Drain the put event time.sleep(0.5) sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=1) # Now delete s3.delete_object(Bucket=bucket, Key="del-test.txt") time.sleep(0.5) msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2) assert "Messages" in msgs and len(msgs["Messages"]) > 0 body = json.loads(msgs["Messages"][0]["Body"]) assert body["source"] == "aws.s3" assert body["detail"]["bucket"]["name"] == bucket assert body["detail"]["object"]["key"] == "del-test.txt" def test_s3_upload_part_copy(s3): """Multipart upload with UploadPartCopy (x-amz-copy-source) produces correct final object.""" bkt = 
"intg-s3-partcopy" s3.create_bucket(Bucket=bkt) src_key = "source-obj.txt" dst_key = "dest-obj.txt" src_data = b"COPIED-DATA-FROM-SOURCE" s3.put_object(Bucket=bkt, Key=src_key, Body=src_data) mpu = s3.create_multipart_upload(Bucket=bkt, Key=dst_key) upload_id = mpu["UploadId"] copy_resp = s3.upload_part_copy( Bucket=bkt, Key=dst_key, UploadId=upload_id, PartNumber=1, CopySource={"Bucket": bkt, "Key": src_key}, ) etag = copy_resp["CopyPartResult"]["ETag"] s3.complete_multipart_upload( Bucket=bkt, Key=dst_key, UploadId=upload_id, MultipartUpload={ "Parts": [{"PartNumber": 1, "ETag": etag}] }, ) resp = s3.get_object(Bucket=bkt, Key=dst_key) assert resp["Body"].read() == src_data def test_s3_event_to_sqs(s3, sqs): """S3 notification delivers event to SQS on object creation and deletion.""" bucket = "intg-s3evt-sqs" queue_name = "intg-s3evt-sqs-q" s3.create_bucket(Bucket=bucket) queue_url = sqs.create_queue(QueueName=queue_name)["QueueUrl"] queue_arn = sqs.get_queue_attributes( QueueUrl=queue_url, AttributeNames=["QueueArn"] )["Attributes"]["QueueArn"] s3.put_bucket_notification_configuration( Bucket=bucket, NotificationConfiguration={ "QueueConfigurations": [ { "QueueArn": queue_arn, "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"], } ], }, ) # Put an object — should fire ObjectCreated event s3.put_object(Bucket=bucket, Key="hello.txt", Body=b"world") time.sleep(1) msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2) assert "Messages" in msgs and len(msgs["Messages"]) >= 1 body = json.loads(msgs["Messages"][0]["Body"]) assert body["Records"][0]["eventSource"] == "aws:s3" assert body["Records"][0]["eventName"].startswith("ObjectCreated:") assert body["Records"][0]["s3"]["bucket"]["name"] == bucket assert body["Records"][0]["s3"]["object"]["key"] == "hello.txt" # Delete receipts so queue is clean for m in msgs["Messages"]: sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=m["ReceiptHandle"]) # Delete the object — should fire 
ObjectRemoved event s3.delete_object(Bucket=bucket, Key="hello.txt") time.sleep(1) msgs = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=2) assert "Messages" in msgs and len(msgs["Messages"]) >= 1 del_body = json.loads(msgs["Messages"][0]["Body"]) assert del_body["Records"][0]["eventName"].startswith("ObjectRemoved:") def test_s3_lifecycle_transition_round_trip(s3): """PUT lifecycle with Transition, verify GET returns canonical XML with correct fields.""" bucket = "intg-s3-lc-transition" s3.create_bucket(Bucket=bucket) s3.put_bucket_lifecycle_configuration( Bucket=bucket, LifecycleConfiguration={ "Rules": [{ "ID": "archive-rule", "Status": "Enabled", "Filter": {"Prefix": "data/"}, "Transitions": [ {"Days": 30, "StorageClass": "STANDARD_IA"}, {"Days": 90, "StorageClass": "GLACIER"}, ], "Expiration": {"Days": 365}, }] }, ) resp = s3.get_bucket_lifecycle_configuration(Bucket=bucket) rule = resp["Rules"][0] assert rule["ID"] == "archive-rule" assert rule["Status"] == "Enabled" assert rule["Filter"]["Prefix"] == "data/" transitions = rule["Transitions"] assert len(transitions) == 2 assert transitions[0]["Days"] == 30 assert transitions[0]["StorageClass"] == "STANDARD_IA" assert transitions[1]["Days"] == 90 assert transitions[1]["StorageClass"] == "GLACIER" assert rule["Expiration"]["Days"] == 365 def test_s3_lifecycle_noncurrent_version(s3): """PUT lifecycle with NoncurrentVersionExpiration, verify round-trip.""" bucket = "intg-s3-lc-noncurrent" s3.create_bucket(Bucket=bucket) s3.put_bucket_lifecycle_configuration( Bucket=bucket, LifecycleConfiguration={ "Rules": [{ "ID": "noncurrent-cleanup", "Status": "Enabled", "Filter": {"Prefix": ""}, "NoncurrentVersionExpiration": {"NoncurrentDays": 30}, }] }, ) resp = s3.get_bucket_lifecycle_configuration(Bucket=bucket) rule = resp["Rules"][0] assert rule["NoncurrentVersionExpiration"]["NoncurrentDays"] == 30 def test_s3_lifecycle_multiple_rules(s3): """Multiple lifecycle rules survive PUT/GET 
round-trip.""" bucket = "intg-s3-lc-multi" s3.create_bucket(Bucket=bucket) s3.put_bucket_lifecycle_configuration( Bucket=bucket, LifecycleConfiguration={ "Rules": [ {"ID": "rule-1", "Status": "Enabled", "Filter": {"Prefix": "a/"}, "Expiration": {"Days": 10}}, {"ID": "rule-2", "Status": "Disabled", "Filter": {"Prefix": "b/"}, "Expiration": {"Days": 20}}, {"ID": "rule-3", "Status": "Enabled", "Filter": {"Prefix": "c/"}, "Expiration": {"Days": 30}}, ] }, ) resp = s3.get_bucket_lifecycle_configuration(Bucket=bucket) assert len(resp["Rules"]) == 3 ids = [r["ID"] for r in resp["Rules"]] assert "rule-1" in ids assert "rule-2" in ids assert "rule-3" in ids disabled = [r for r in resp["Rules"] if r["ID"] == "rule-2"][0] assert disabled["Status"] == "Disabled" def test_s3_lifecycle_abort_multipart(s3): """AbortIncompleteMultipartUpload round-trip.""" bucket = "intg-s3-lc-abort" s3.create_bucket(Bucket=bucket) s3.put_bucket_lifecycle_configuration( Bucket=bucket, LifecycleConfiguration={ "Rules": [{ "ID": "abort-uploads", "Status": "Enabled", "Filter": {"Prefix": ""}, "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 7}, }] }, ) resp = s3.get_bucket_lifecycle_configuration(Bucket=bucket) assert resp["Rules"][0]["AbortIncompleteMultipartUpload"]["DaysAfterInitiation"] == 7