import io import json import os import time import zipfile from urllib.parse import urlparse import pytest from botocore.exceptions import ClientError import uuid as _uuid_mod def test_logs_put_get(logs): logs.create_log_group(logGroupName="/test/ministack") logs.create_log_stream(logGroupName="/test/ministack", logStreamName="stream1") logs.put_log_events( logGroupName="/test/ministack", logStreamName="stream1", logEvents=[ {"timestamp": int(time.time() * 1000), "message": "Hello from MiniStack"}, {"timestamp": int(time.time() * 1000), "message": "Second log line"}, ], ) resp = logs.get_log_events(logGroupName="/test/ministack", logStreamName="stream1") assert len(resp["events"]) == 2 def test_logs_filter(logs): resp = logs.filter_log_events(logGroupName="/test/ministack", filterPattern="MiniStack") assert len(resp["events"]) >= 1 def test_logs_create_group_v2(logs): logs.create_log_group(logGroupName="/cwl/cg-v2") resp = logs.describe_log_groups(logGroupNamePrefix="/cwl/cg-v2") assert any(g["logGroupName"] == "/cwl/cg-v2" for g in resp["logGroups"]) def test_logs_create_group_duplicate_v2(logs): logs.create_log_group(logGroupName="/cwl/dup-v2") with pytest.raises(ClientError) as exc: logs.create_log_group(logGroupName="/cwl/dup-v2") assert exc.value.response["Error"]["Code"] == "ResourceAlreadyExistsException" def test_logs_delete_group_v2(logs): logs.create_log_group(logGroupName="/cwl/del-v2") logs.delete_log_group(logGroupName="/cwl/del-v2") resp = logs.describe_log_groups(logGroupNamePrefix="/cwl/del-v2") assert not any(g["logGroupName"] == "/cwl/del-v2" for g in resp["logGroups"]) def test_logs_describe_groups_v2(logs): logs.create_log_group(logGroupName="/cwl/dg-a") logs.create_log_group(logGroupName="/cwl/dg-b") resp = logs.describe_log_groups(logGroupNamePrefix="/cwl/dg-") names = [g["logGroupName"] for g in resp["logGroups"]] assert "/cwl/dg-a" in names assert "/cwl/dg-b" in names def test_logs_create_stream_v2(logs): logs.create_log_group(logGroupName="/cwl/str-v2") logs.create_log_stream(logGroupName="/cwl/str-v2", logStreamName="stream-a") logs.create_log_stream(logGroupName="/cwl/str-v2", logStreamName="stream-b") resp = logs.describe_log_streams(logGroupName="/cwl/str-v2") names = [s["logStreamName"] for s in resp["logStreams"]] assert "stream-a" in names assert "stream-b" in names def test_logs_put_get_events_v2(logs): logs.create_log_group(logGroupName="/cwl/pge-v2") logs.create_log_stream(logGroupName="/cwl/pge-v2", logStreamName="s1") now = int(time.time() * 1000) logs.put_log_events( logGroupName="/cwl/pge-v2", logStreamName="s1", logEvents=[ {"timestamp": now, "message": "first line"}, {"timestamp": now + 1, "message": "second line"}, {"timestamp": now + 2, "message": "third line"}, ], ) resp = logs.get_log_events(logGroupName="/cwl/pge-v2", logStreamName="s1") assert len(resp["events"]) == 3 assert resp["events"][0]["message"] == "first line" assert resp["events"][2]["message"] == "third line" def test_logs_filter_events_v2(logs): logs.create_log_group(logGroupName="/cwl/flt-v2") logs.create_log_stream(logGroupName="/cwl/flt-v2", logStreamName="s1") now = int(time.time() * 1000) logs.put_log_events( logGroupName="/cwl/flt-v2", logStreamName="s1", logEvents=[ {"timestamp": now, "message": "ERROR disk full"}, {"timestamp": now + 1, "message": "INFO all clear"}, {"timestamp": now + 2, "message": "ERROR timeout"}, ], ) resp = logs.filter_log_events(logGroupName="/cwl/flt-v2", filterPattern="ERROR") assert len(resp["events"]) == 2 msgs = [e["message"] for e in resp["events"]] assert "ERROR disk full" in msgs assert "ERROR timeout" in msgs def test_logs_retention_policy_v2(logs): logs.create_log_group(logGroupName="/cwl/ret-v2") logs.put_retention_policy(logGroupName="/cwl/ret-v2", retentionInDays=30) resp = logs.describe_log_groups(logGroupNamePrefix="/cwl/ret-v2") grp = next(g for g in resp["logGroups"] if g["logGroupName"] == "/cwl/ret-v2") assert grp["retentionInDays"] == 30 logs.delete_retention_policy(logGroupName="/cwl/ret-v2") resp2 = logs.describe_log_groups(logGroupNamePrefix="/cwl/ret-v2") grp2 = next(g for g in resp2["logGroups"] if g["logGroupName"] == "/cwl/ret-v2") assert "retentionInDays" not in grp2 def test_logs_tags_v2(logs): logs.create_log_group(logGroupName="/cwl/tag-v2", tags={"env": "prod"}) resp = logs.list_tags_log_group(logGroupName="/cwl/tag-v2") assert resp["tags"]["env"] == "prod" logs.tag_log_group(logGroupName="/cwl/tag-v2", tags={"team": "infra"}) resp2 = logs.list_tags_log_group(logGroupName="/cwl/tag-v2") assert resp2["tags"]["env"] == "prod" assert resp2["tags"]["team"] == "infra" logs.untag_log_group(logGroupName="/cwl/tag-v2", tags=["env"]) resp3 = logs.list_tags_log_group(logGroupName="/cwl/tag-v2") assert "env" not in resp3["tags"] assert resp3["tags"]["team"] == "infra" def test_logs_put_requires_group_v2(logs): with pytest.raises(ClientError) as exc: logs.put_log_events( logGroupName="/cwl/nonexistent-xyz", logStreamName="s1", logEvents=[{"timestamp": int(time.time() * 1000), "message": "fail"}], ) assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException" def test_logs_retention_policy(logs): import uuid as _uuid group = f"/intg/retention/{_uuid.uuid4().hex[:8]}" logs.create_log_group(logGroupName=group) logs.put_retention_policy(logGroupName=group, retentionInDays=7) groups = logs.describe_log_groups(logGroupNamePrefix=group)["logGroups"] assert groups[0].get("retentionInDays") == 7 logs.delete_retention_policy(logGroupName=group) groups2 = logs.describe_log_groups(logGroupNamePrefix=group)["logGroups"] assert groups2[0].get("retentionInDays") is None def test_logs_subscription_filter(logs): import uuid as _uuid group = f"/intg/subfilter/{_uuid.uuid4().hex[:8]}" logs.create_log_group(logGroupName=group) logs.put_subscription_filter( logGroupName=group, filterName="my-filter", filterPattern="ERROR", destinationArn="arn:aws:lambda:us-east-1:000000000000:function:log-handler", ) resp = logs.describe_subscription_filters(logGroupName=group) assert any(f["filterName"] == "my-filter" for f in resp["subscriptionFilters"]) logs.delete_subscription_filter(logGroupName=group, filterName="my-filter") resp2 = logs.describe_subscription_filters(logGroupName=group) assert not any(f["filterName"] == "my-filter" for f in resp2["subscriptionFilters"]) def test_logs_metric_filter(logs): import uuid as _uuid group = f"/intg/metricfilter/{_uuid.uuid4().hex[:8]}" logs.create_log_group(logGroupName=group) logs.put_metric_filter( logGroupName=group, filterName="error-count", filterPattern="[ERROR]", metricTransformations=[ { "metricName": "ErrorCount", "metricNamespace": "MyApp", "metricValue": "1", } ], ) resp = logs.describe_metric_filters(logGroupName=group) assert any(f["filterName"] == "error-count" for f in resp["metricFilters"]) logs.delete_metric_filter(logGroupName=group, filterName="error-count") resp2 = logs.describe_metric_filters(logGroupName=group) assert not any(f["filterName"] == "error-count" for f in resp2.get("metricFilters", [])) def test_logs_tag_log_group(logs): import uuid as _uuid group = f"/intg/tagging/{_uuid.uuid4().hex[:8]}" logs.create_log_group(logGroupName=group) logs.tag_log_group(logGroupName=group, tags={"project": "ministack", "env": "test"}) resp = logs.list_tags_log_group(logGroupName=group) assert resp["tags"].get("project") == "ministack" logs.untag_log_group(logGroupName=group, tags=["project"]) resp2 = logs.list_tags_log_group(logGroupName=group) assert "project" not in resp2["tags"] def test_logs_insights_start_query(logs): import uuid as _uuid group = f"/intg/insights/{_uuid.uuid4().hex[:8]}" logs.create_log_group(logGroupName=group) resp = logs.start_query( logGroupName=group, startTime=int(time.time()) - 3600, endTime=int(time.time()), queryString="fields @timestamp, @message | limit 10", ) assert "queryId" in resp results = logs.get_query_results(queryId=resp["queryId"]) assert results["status"] in ("Complete", "Running", "Scheduled") def test_logs_filter_with_wildcard(logs): """FilterLogEvents with wildcard pattern matches correctly.""" logs.create_log_group(logGroupName="/qa/logs/wildcard") logs.create_log_stream(logGroupName="/qa/logs/wildcard", logStreamName="stream1") logs.put_log_events( logGroupName="/qa/logs/wildcard", logStreamName="stream1", logEvents=[ {"timestamp": int(time.time() * 1000), "message": "ERROR: disk full"}, {"timestamp": int(time.time() * 1000), "message": "INFO: all good"}, {"timestamp": int(time.time() * 1000), "message": "ERROR: timeout"}, ], ) resp = logs.filter_log_events(logGroupName="/qa/logs/wildcard", filterPattern="ERROR*") messages = [e["message"] for e in resp["events"]] assert all("ERROR" in m for m in messages) assert len(messages) == 2 def test_logs_describe_log_groups_prefix(logs): """DescribeLogGroups with logGroupNamePrefix filters correctly.""" logs.create_log_group(logGroupName="/qa/logs/prefix/alpha") logs.create_log_group(logGroupName="/qa/logs/prefix/beta") logs.create_log_group(logGroupName="/qa/logs/other/gamma") resp = logs.describe_log_groups(logGroupNamePrefix="/qa/logs/prefix") names = [g["logGroupName"] for g in resp["logGroups"]] assert "/qa/logs/prefix/alpha" in names assert "/qa/logs/prefix/beta" in names assert "/qa/logs/other/gamma" not in names def test_logs_retention_policy_invalid_value(logs): """PutRetentionPolicy with invalid days raises InvalidParameterException.""" logs.create_log_group(logGroupName="/qa/logs/retention-invalid") with pytest.raises(ClientError) as exc: logs.put_retention_policy(logGroupName="/qa/logs/retention-invalid", retentionInDays=999) assert exc.value.response["Error"]["Code"] == "InvalidParameterException" def test_logs_list_tags_for_resource_arn_without_star(logs): name = "/tf/regression/arn-no-star" logs.create_log_group(logGroupName=name, tags={"env": "test"}) # Get the ARN as stored (includes :*) groups = logs.describe_log_groups(logGroupNamePrefix=name)["logGroups"] stored_arn = groups[0]["arn"] assert stored_arn.endswith(":*"), f"Expected stored ARN to end with :*, got {stored_arn}" # Terraform sends the ARN without :* — this must not raise ResourceNotFoundException arn_no_star = stored_arn[:-2] # strip ':*' resp = logs.list_tags_for_resource(resourceArn=arn_no_star) assert resp["tags"]["env"] == "test" logs.delete_log_group(logGroupName=name) def test_logs_get_log_events_pagination_stops(logs): """GetLogEvents must return the caller's token when at end of stream to stop SDK pagination.""" group = "/test/pagination-stop" stream = "s1" logs.create_log_group(logGroupName=group) logs.create_log_stream(logGroupName=group, logStreamName=stream) logs.put_log_events( logGroupName=group, logStreamName=stream, logEvents=[ {"timestamp": 1000, "message": "msg1"}, {"timestamp": 2000, "message": "msg2"}, ], ) # First call — get all events resp = logs.get_log_events(logGroupName=group, logStreamName=stream, startFromHead=True) assert len(resp["events"]) == 2 fwd_token = resp["nextForwardToken"] # Second call with forward token — no more events, token must match what we sent resp2 = logs.get_log_events(logGroupName=group, logStreamName=stream, nextToken=fwd_token) assert len(resp2["events"]) == 0 assert resp2["nextForwardToken"] == fwd_token # same token = stop paginating # --------------------------------------------------------------------------- # Destination operations # --------------------------------------------------------------------------- def test_logs_put_destination(logs): """PutDestination creates a destination and returns its metadata.""" uid = _uuid_mod.uuid4().hex[:8] dest_name = f"test-dest-{uid}" target_arn = f"arn:aws:kinesis:us-east-1:000000000000:stream/dest-stream-{uid}" role_arn = f"arn:aws:iam::000000000000:role/dest-role-{uid}" resp = logs.put_destination( destinationName=dest_name, targetArn=target_arn, roleArn=role_arn, ) dest = resp["destination"] assert dest["destinationName"] == dest_name assert dest["targetArn"] == target_arn assert dest["roleArn"] == role_arn assert "arn" in dest assert "creationTime" in dest # cleanup logs.delete_destination(destinationName=dest_name) def test_logs_delete_destination(logs): """DeleteDestination removes a destination; deleting again raises ResourceNotFoundException.""" uid = _uuid_mod.uuid4().hex[:8] dest_name = f"test-dest-del-{uid}" logs.put_destination( destinationName=dest_name, targetArn="arn:aws:kinesis:us-east-1:000000000000:stream/s1", roleArn="arn:aws:iam::000000000000:role/r1", ) logs.delete_destination(destinationName=dest_name) with pytest.raises(ClientError) as exc: logs.delete_destination(destinationName=dest_name) assert exc.value.response["Error"]["Code"] == "ResourceNotFoundException" def test_logs_describe_destinations(logs): """DescribeDestinations lists destinations filtered by prefix.""" uid = _uuid_mod.uuid4().hex[:8] name_a = f"desc-dest-{uid}-alpha" name_b = f"desc-dest-{uid}-beta" name_c = f"other-dest-{uid}" for n in (name_a, name_b, name_c): logs.put_destination( destinationName=n, targetArn="arn:aws:kinesis:us-east-1:000000000000:stream/s1", roleArn="arn:aws:iam::000000000000:role/r1", ) resp = logs.describe_destinations(DestinationNamePrefix=f"desc-dest-{uid}") names = [d["destinationName"] for d in resp["destinations"]] assert name_a in names assert name_b in names assert name_c not in names # cleanup for n in (name_a, name_b, name_c): logs.delete_destination(destinationName=n) def test_logs_put_destination_policy(logs): """PutDestinationPolicy updates the accessPolicy on an existing destination.""" uid = _uuid_mod.uuid4().hex[:8] dest_name = f"test-dest-pol-{uid}" logs.put_destination( destinationName=dest_name, targetArn="arn:aws:kinesis:us-east-1:000000000000:stream/s1", roleArn="arn:aws:iam::000000000000:role/r1", ) policy = json.dumps({"Statement": [{"Effect": "Allow", "Principal": "*", "Action": "logs:PutSubscriptionFilter"}]}) logs.put_destination_policy(destinationName=dest_name, accessPolicy=policy) resp = logs.describe_destinations(DestinationNamePrefix=dest_name) dest = next(d for d in resp["destinations"] if d["destinationName"] == dest_name) assert dest["accessPolicy"] == policy # cleanup logs.delete_destination(destinationName=dest_name) # --------------------------------------------------------------------------- # ARN-based tagging operations (TagResource / UntagResource) # --------------------------------------------------------------------------- def test_logs_tag_resource(logs): """TagResource adds tags to a log group resolved by ARN.""" uid = _uuid_mod.uuid4().hex[:8] group = f"/intg/tag-resource/{uid}" logs.create_log_group(logGroupName=group) groups = logs.describe_log_groups(logGroupNamePrefix=group)["logGroups"] arn = groups[0]["arn"] logs.tag_resource(resourceArn=arn, tags={"team": "platform", "env": "staging"}) resp = logs.list_tags_for_resource(resourceArn=arn) assert resp["tags"]["team"] == "platform" assert resp["tags"]["env"] == "staging" # cleanup logs.delete_log_group(logGroupName=group) def test_logs_untag_resource(logs): """UntagResource removes tags from a log group resolved by ARN.""" uid = _uuid_mod.uuid4().hex[:8] group = f"/intg/untag-resource/{uid}" logs.create_log_group(logGroupName=group, tags={"keep": "yes", "remove": "me"}) groups = logs.describe_log_groups(logGroupNamePrefix=group)["logGroups"] arn = groups[0]["arn"] logs.untag_resource(resourceArn=arn, tagKeys=["remove"]) resp = logs.list_tags_for_resource(resourceArn=arn) assert resp["tags"]["keep"] == "yes" assert "remove" not in resp["tags"] # cleanup logs.delete_log_group(logGroupName=group) # --------------------------------------------------------------------------- # StopQuery # --------------------------------------------------------------------------- def test_logs_stop_query(logs): """StopQuery cancels a running query and sets its status to Cancelled.""" uid = _uuid_mod.uuid4().hex[:8] group = f"/intg/stop-query/{uid}" logs.create_log_group(logGroupName=group) start_resp = logs.start_query( logGroupName=group, startTime=int(time.time()) - 3600, endTime=int(time.time()), queryString="fields @timestamp | limit 5", ) query_id = start_resp["queryId"] stop_resp = logs.stop_query(queryId=query_id) assert stop_resp["success"] is True results = logs.get_query_results(queryId=query_id) assert results["status"] == "Cancelled" # cleanup logs.delete_log_group(logGroupName=group)