aws_rl_env / aws_infra /tests /test_logs.py
Sizzing's picture
Upload folder using huggingface_hub
c745a99 verified
import io
import json
import os
import time
import zipfile
from urllib.parse import urlparse
import pytest
from botocore.exceptions import ClientError
import uuid as _uuid_mod
def test_logs_put_get(logs):
    """Write two events to a new stream and read both of them back."""
    group_name = "/test/ministack"
    stream_name = "stream1"
    logs.create_log_group(logGroupName=group_name)
    logs.create_log_stream(logGroupName=group_name, logStreamName=stream_name)

    # Each event is stamped (epoch milliseconds) at the moment it is built.
    events = [
        {"timestamp": int(time.time() * 1000), "message": text}
        for text in ("Hello from MiniStack", "Second log line")
    ]
    logs.put_log_events(
        logGroupName=group_name,
        logStreamName=stream_name,
        logEvents=events,
    )

    fetched = logs.get_log_events(logGroupName=group_name, logStreamName=stream_name)
    assert len(fetched["events"]) == 2
def test_logs_filter(logs):
    """FilterLogEvents returns at least one event for a term present in the group.

    The test seeds its own uniquely named log group instead of relying on
    events written by another test, so it passes when run in isolation or in
    any order. The group is removed afterwards even if an assertion fails.
    """
    group = f"/test/ministack-filter/{_uuid_mod.uuid4().hex[:8]}"
    stream = "stream1"
    logs.create_log_group(logGroupName=group)
    try:
        logs.create_log_stream(logGroupName=group, logStreamName=stream)
        now = int(time.time() * 1000)
        logs.put_log_events(
            logGroupName=group,
            logStreamName=stream,
            logEvents=[
                {"timestamp": now, "message": "Hello from MiniStack"},
                {"timestamp": now + 1, "message": "Second log line"},
            ],
        )
        resp = logs.filter_log_events(logGroupName=group, filterPattern="MiniStack")
        assert len(resp["events"]) >= 1
    finally:
        logs.delete_log_group(logGroupName=group)
def test_logs_create_group_v2(logs):
    """A newly created log group is listed by DescribeLogGroups."""
    group_name = "/cwl/cg-v2"
    logs.create_log_group(logGroupName=group_name)

    listing = logs.describe_log_groups(logGroupNamePrefix=group_name)
    listed_names = {entry["logGroupName"] for entry in listing["logGroups"]}
    assert group_name in listed_names
def test_logs_create_group_duplicate_v2(logs):
    """Creating the same log group twice is rejected as already existing."""
    group_name = "/cwl/dup-v2"
    logs.create_log_group(logGroupName=group_name)

    # The second create with the identical name must fail.
    with pytest.raises(ClientError) as err_info:
        logs.create_log_group(logGroupName=group_name)

    error_code = err_info.value.response["Error"]["Code"]
    assert error_code == "ResourceAlreadyExistsException"
def test_logs_delete_group_v2(logs):
    """A deleted log group no longer appears in DescribeLogGroups."""
    group_name = "/cwl/del-v2"
    logs.create_log_group(logGroupName=group_name)
    logs.delete_log_group(logGroupName=group_name)

    listing = logs.describe_log_groups(logGroupNamePrefix=group_name)
    remaining = {entry["logGroupName"] for entry in listing["logGroups"]}
    assert group_name not in remaining
def test_logs_describe_groups_v2(logs):
    """DescribeLogGroups lists every group that shares the requested prefix."""
    expected_groups = ("/cwl/dg-a", "/cwl/dg-b")
    for group_name in expected_groups:
        logs.create_log_group(logGroupName=group_name)

    listing = logs.describe_log_groups(logGroupNamePrefix="/cwl/dg-")
    listed_names = [entry["logGroupName"] for entry in listing["logGroups"]]
    for group_name in expected_groups:
        assert group_name in listed_names
def test_logs_create_stream_v2(logs):
    """Streams created in a group are returned by DescribeLogStreams."""
    group_name = "/cwl/str-v2"
    expected_streams = ("stream-a", "stream-b")

    logs.create_log_group(logGroupName=group_name)
    for stream_name in expected_streams:
        logs.create_log_stream(logGroupName=group_name, logStreamName=stream_name)

    listing = logs.describe_log_streams(logGroupName=group_name)
    listed_streams = [entry["logStreamName"] for entry in listing["logStreams"]]
    for stream_name in expected_streams:
        assert stream_name in listed_streams
def test_logs_put_get_events_v2(logs):
    """Three events written in order come back with first and last in place."""
    group_name = "/cwl/pge-v2"
    stream_name = "s1"
    logs.create_log_group(logGroupName=group_name)
    logs.create_log_stream(logGroupName=group_name, logStreamName=stream_name)

    # Consecutive millisecond timestamps starting from the current time.
    base_ts = int(time.time() * 1000)
    lines = ("first line", "second line", "third line")
    logs.put_log_events(
        logGroupName=group_name,
        logStreamName=stream_name,
        logEvents=[
            {"timestamp": base_ts + offset, "message": line}
            for offset, line in enumerate(lines)
        ],
    )

    fetched = logs.get_log_events(logGroupName=group_name, logStreamName=stream_name)
    assert len(fetched["events"]) == 3
    assert fetched["events"][0]["message"] == "first line"
    assert fetched["events"][2]["message"] == "third line"
def test_logs_filter_events_v2(logs):
    """FilterLogEvents with pattern ERROR returns only the two ERROR lines."""
    group_name = "/cwl/flt-v2"
    stream_name = "s1"
    logs.create_log_group(logGroupName=group_name)
    logs.create_log_stream(logGroupName=group_name, logStreamName=stream_name)

    base_ts = int(time.time() * 1000)
    lines = ("ERROR disk full", "INFO all clear", "ERROR timeout")
    logs.put_log_events(
        logGroupName=group_name,
        logStreamName=stream_name,
        logEvents=[
            {"timestamp": base_ts + offset, "message": line}
            for offset, line in enumerate(lines)
        ],
    )

    matched = logs.filter_log_events(logGroupName=group_name, filterPattern="ERROR")
    assert len(matched["events"]) == 2
    matched_messages = [event["message"] for event in matched["events"]]
    assert "ERROR disk full" in matched_messages
    assert "ERROR timeout" in matched_messages
def test_logs_retention_policy_v2(logs):
    """Retention is reported after PutRetentionPolicy and absent after delete."""
    group_name = "/cwl/ret-v2"

    def describe_group():
        # Look the group up by prefix and pick the exact-name entry.
        listing = logs.describe_log_groups(logGroupNamePrefix=group_name)
        return next(
            entry for entry in listing["logGroups"]
            if entry["logGroupName"] == group_name
        )

    logs.create_log_group(logGroupName=group_name)
    logs.put_retention_policy(logGroupName=group_name, retentionInDays=30)
    with_policy = describe_group()
    assert with_policy["retentionInDays"] == 30

    logs.delete_retention_policy(logGroupName=group_name)
    without_policy = describe_group()
    assert "retentionInDays" not in without_policy
def test_logs_tags_v2(logs):
    """Tags can be set at creation, extended, and removed by key."""
    group_name = "/cwl/tag-v2"

    # Tag supplied at creation time.
    logs.create_log_group(logGroupName=group_name, tags={"env": "prod"})
    initial = logs.list_tags_log_group(logGroupName=group_name)
    assert initial["tags"]["env"] == "prod"

    # Adding a second tag keeps the first one.
    logs.tag_log_group(logGroupName=group_name, tags={"team": "infra"})
    after_tag = logs.list_tags_log_group(logGroupName=group_name)
    assert after_tag["tags"]["env"] == "prod"
    assert after_tag["tags"]["team"] == "infra"

    # Removing one key leaves the other untouched.
    logs.untag_log_group(logGroupName=group_name, tags=["env"])
    after_untag = logs.list_tags_log_group(logGroupName=group_name)
    assert "env" not in after_untag["tags"]
    assert after_untag["tags"]["team"] == "infra"
def test_logs_put_requires_group_v2(logs):
    """PutLogEvents against a missing log group raises ResourceNotFoundException."""
    single_event = {"timestamp": int(time.time() * 1000), "message": "fail"}

    with pytest.raises(ClientError) as err_info:
        logs.put_log_events(
            logGroupName="/cwl/nonexistent-xyz",
            logStreamName="s1",
            logEvents=[single_event],
        )

    error_code = err_info.value.response["Error"]["Code"]
    assert error_code == "ResourceNotFoundException"
def test_logs_retention_policy(logs):
    """Retention set to 7 days is reported, then cleared by DeleteRetentionPolicy."""
    import uuid as _uuid

    # Random suffix keeps the group name unique per run.
    group_name = f"/intg/retention/{_uuid.uuid4().hex[:8]}"
    logs.create_log_group(logGroupName=group_name)

    logs.put_retention_policy(logGroupName=group_name, retentionInDays=7)
    with_policy = logs.describe_log_groups(logGroupNamePrefix=group_name)["logGroups"]
    assert with_policy[0].get("retentionInDays") == 7

    logs.delete_retention_policy(logGroupName=group_name)
    without_policy = logs.describe_log_groups(logGroupNamePrefix=group_name)["logGroups"]
    assert without_policy[0].get("retentionInDays") is None
def test_logs_subscription_filter(logs):
    """A subscription filter is listed after put and gone after delete."""
    import uuid as _uuid

    group_name = f"/intg/subfilter/{_uuid.uuid4().hex[:8]}"
    filter_name = "my-filter"
    logs.create_log_group(logGroupName=group_name)

    logs.put_subscription_filter(
        logGroupName=group_name,
        filterName=filter_name,
        filterPattern="ERROR",
        destinationArn="arn:aws:lambda:us-east-1:000000000000:function:log-handler",
    )
    listed = logs.describe_subscription_filters(logGroupName=group_name)
    assert any(
        entry["filterName"] == filter_name
        for entry in listed["subscriptionFilters"]
    )

    logs.delete_subscription_filter(logGroupName=group_name, filterName=filter_name)
    listed_after = logs.describe_subscription_filters(logGroupName=group_name)
    assert not any(
        entry["filterName"] == filter_name
        for entry in listed_after["subscriptionFilters"]
    )
def test_logs_metric_filter(logs):
    """A metric filter is listed after put and gone after delete."""
    import uuid as _uuid

    group_name = f"/intg/metricfilter/{_uuid.uuid4().hex[:8]}"
    filter_name = "error-count"
    transformation = {
        "metricName": "ErrorCount",
        "metricNamespace": "MyApp",
        "metricValue": "1",
    }
    logs.create_log_group(logGroupName=group_name)

    logs.put_metric_filter(
        logGroupName=group_name,
        filterName=filter_name,
        filterPattern="[ERROR]",
        metricTransformations=[transformation],
    )
    listed = logs.describe_metric_filters(logGroupName=group_name)
    assert any(entry["filterName"] == filter_name for entry in listed["metricFilters"])

    logs.delete_metric_filter(logGroupName=group_name, filterName=filter_name)
    listed_after = logs.describe_metric_filters(logGroupName=group_name)
    # The key is read with a default, so a response without it is accepted.
    assert not any(
        entry["filterName"] == filter_name
        for entry in listed_after.get("metricFilters", [])
    )
def test_logs_tag_log_group(logs):
    """TagLogGroup adds tags and UntagLogGroup removes the named key."""
    import uuid as _uuid

    group_name = f"/intg/tagging/{_uuid.uuid4().hex[:8]}"
    logs.create_log_group(logGroupName=group_name)

    logs.tag_log_group(
        logGroupName=group_name,
        tags={"project": "ministack", "env": "test"},
    )
    tagged = logs.list_tags_log_group(logGroupName=group_name)
    assert tagged["tags"].get("project") == "ministack"

    logs.untag_log_group(logGroupName=group_name, tags=["project"])
    untagged = logs.list_tags_log_group(logGroupName=group_name)
    assert "project" not in untagged["tags"]
def test_logs_insights_start_query(logs):
    """StartQuery returns a queryId whose results report a recognised status."""
    import uuid as _uuid

    group_name = f"/intg/insights/{_uuid.uuid4().hex[:8]}"
    logs.create_log_group(logGroupName=group_name)

    # Query window: the hour leading up to now, in epoch seconds.
    started = logs.start_query(
        logGroupName=group_name,
        startTime=int(time.time()) - 3600,
        endTime=int(time.time()),
        queryString="fields @timestamp, @message | limit 10",
    )
    assert "queryId" in started

    outcome = logs.get_query_results(queryId=started["queryId"])
    accepted_statuses = ("Complete", "Running", "Scheduled")
    assert outcome["status"] in accepted_statuses
def test_logs_filter_with_wildcard(logs):
    """A trailing-wildcard filter pattern selects only the matching events."""
    group_name = "/qa/logs/wildcard"
    stream_name = "stream1"
    logs.create_log_group(logGroupName=group_name)
    logs.create_log_stream(logGroupName=group_name, logStreamName=stream_name)

    # Every event is stamped individually at construction time.
    events = [
        {"timestamp": int(time.time() * 1000), "message": text}
        for text in ("ERROR: disk full", "INFO: all good", "ERROR: timeout")
    ]
    logs.put_log_events(
        logGroupName=group_name,
        logStreamName=stream_name,
        logEvents=events,
    )

    matched = logs.filter_log_events(logGroupName=group_name, filterPattern="ERROR*")
    matched_messages = [event["message"] for event in matched["events"]]
    assert all("ERROR" in text for text in matched_messages)
    assert len(matched_messages) == 2
def test_logs_describe_log_groups_prefix(logs):
    """logGroupNamePrefix limits DescribeLogGroups to groups under that prefix."""
    inside_prefix = ("/qa/logs/prefix/alpha", "/qa/logs/prefix/beta")
    outside_prefix = "/qa/logs/other/gamma"
    for group_name in inside_prefix:
        logs.create_log_group(logGroupName=group_name)
    logs.create_log_group(logGroupName=outside_prefix)

    listing = logs.describe_log_groups(logGroupNamePrefix="/qa/logs/prefix")
    listed_names = [entry["logGroupName"] for entry in listing["logGroups"]]
    for group_name in inside_prefix:
        assert group_name in listed_names
    assert outside_prefix not in listed_names
def test_logs_retention_policy_invalid_value(logs):
    """An unsupported retentionInDays value raises InvalidParameterException."""
    group_name = "/qa/logs/retention-invalid"
    logs.create_log_group(logGroupName=group_name)

    # 999 is the value this test expects the service to reject.
    with pytest.raises(ClientError) as err_info:
        logs.put_retention_policy(logGroupName=group_name, retentionInDays=999)

    error_code = err_info.value.response["Error"]["Code"]
    assert error_code == "InvalidParameterException"
def test_logs_list_tags_for_resource_arn_without_star(logs):
    """ListTagsForResource accepts a log-group ARN without the trailing ':*'.

    The group uses a fixed name, so it is deleted in a ``finally`` block: a
    failed assertion would otherwise leave it behind and make the next run's
    create call fail.
    """
    name = "/tf/regression/arn-no-star"
    logs.create_log_group(logGroupName=name, tags={"env": "test"})
    try:
        # Get the ARN as stored (includes :*)
        groups = logs.describe_log_groups(logGroupNamePrefix=name)["logGroups"]
        stored_arn = groups[0]["arn"]
        assert stored_arn.endswith(":*"), f"Expected stored ARN to end with :*, got {stored_arn}"

        # Terraform sends the ARN without :* — this must not raise ResourceNotFoundException
        arn_no_star = stored_arn[:-2]  # strip ':*'
        resp = logs.list_tags_for_resource(resourceArn=arn_no_star)
        assert resp["tags"]["env"] == "test"
    finally:
        logs.delete_log_group(logGroupName=name)
def test_logs_get_log_events_pagination_stops(logs):
    """At end of stream GetLogEvents echoes back the caller's forward token.

    Receiving the same token that was sent is the signal for SDK pagination
    to stop.
    """
    group_name = "/test/pagination-stop"
    stream_name = "s1"
    logs.create_log_group(logGroupName=group_name)
    logs.create_log_stream(logGroupName=group_name, logStreamName=stream_name)

    seeded_events = [
        {"timestamp": stamp, "message": text}
        for stamp, text in ((1000, "msg1"), (2000, "msg2"))
    ]
    logs.put_log_events(
        logGroupName=group_name,
        logStreamName=stream_name,
        logEvents=seeded_events,
    )

    # Page 1: reading from the head returns everything that was written.
    first_page = logs.get_log_events(
        logGroupName=group_name,
        logStreamName=stream_name,
        startFromHead=True,
    )
    assert len(first_page["events"]) == 2
    forward_token = first_page["nextForwardToken"]

    # Page 2: nothing left, and the token handed back equals the one sent.
    second_page = logs.get_log_events(
        logGroupName=group_name,
        logStreamName=stream_name,
        nextToken=forward_token,
    )
    assert len(second_page["events"]) == 0
    assert second_page["nextForwardToken"] == forward_token
# ---------------------------------------------------------------------------
# Destination operations
# ---------------------------------------------------------------------------
def test_logs_put_destination(logs):
    """PutDestination creates a destination and returns its metadata.

    The destination is deleted in a ``finally`` block so a failed assertion
    does not leave it behind.
    """
    uid = _uuid_mod.uuid4().hex[:8]
    dest_name = f"test-dest-{uid}"
    target_arn = f"arn:aws:kinesis:us-east-1:000000000000:stream/dest-stream-{uid}"
    role_arn = f"arn:aws:iam::000000000000:role/dest-role-{uid}"
    resp = logs.put_destination(
        destinationName=dest_name,
        targetArn=target_arn,
        roleArn=role_arn,
    )
    try:
        dest = resp["destination"]
        # The response must echo the inputs back...
        assert dest["destinationName"] == dest_name
        assert dest["targetArn"] == target_arn
        assert dest["roleArn"] == role_arn
        # ...and carry the service-assigned fields.
        assert "arn" in dest
        assert "creationTime" in dest
    finally:
        logs.delete_destination(destinationName=dest_name)
def test_logs_delete_destination(logs):
    """Deleting a destination succeeds once; a second delete reports not found."""
    suffix = _uuid_mod.uuid4().hex[:8]
    destination = f"test-dest-del-{suffix}"
    logs.put_destination(
        destinationName=destination,
        targetArn="arn:aws:kinesis:us-east-1:000000000000:stream/s1",
        roleArn="arn:aws:iam::000000000000:role/r1",
    )

    # First delete removes the destination.
    logs.delete_destination(destinationName=destination)

    # Second delete targets something that no longer exists.
    with pytest.raises(ClientError) as err_info:
        logs.delete_destination(destinationName=destination)

    error_code = err_info.value.response["Error"]["Code"]
    assert error_code == "ResourceNotFoundException"
def test_logs_describe_destinations(logs):
    """DescribeDestinations lists destinations filtered by prefix.

    Destinations are recorded as they are created and removed in a
    ``finally`` block, so a failure part-way through still cleans up exactly
    the ones that exist.
    """
    uid = _uuid_mod.uuid4().hex[:8]
    name_a = f"desc-dest-{uid}-alpha"
    name_b = f"desc-dest-{uid}-beta"
    name_c = f"other-dest-{uid}"
    created = []
    try:
        for n in (name_a, name_b, name_c):
            logs.put_destination(
                destinationName=n,
                targetArn="arn:aws:kinesis:us-east-1:000000000000:stream/s1",
                roleArn="arn:aws:iam::000000000000:role/r1",
            )
            created.append(n)
        resp = logs.describe_destinations(DestinationNamePrefix=f"desc-dest-{uid}")
        names = [d["destinationName"] for d in resp["destinations"]]
        assert name_a in names
        assert name_b in names
        assert name_c not in names
    finally:
        # Only delete what was actually created, so cleanup cannot mask the
        # original failure with a not-found error.
        for n in created:
            logs.delete_destination(destinationName=n)
def test_logs_put_destination_policy(logs):
    """PutDestinationPolicy updates the accessPolicy on an existing destination.

    The destination is deleted in a ``finally`` block so a failed assertion
    does not leave it behind.
    """
    uid = _uuid_mod.uuid4().hex[:8]
    dest_name = f"test-dest-pol-{uid}"
    logs.put_destination(
        destinationName=dest_name,
        targetArn="arn:aws:kinesis:us-east-1:000000000000:stream/s1",
        roleArn="arn:aws:iam::000000000000:role/r1",
    )
    try:
        policy = json.dumps({"Statement": [{"Effect": "Allow", "Principal": "*", "Action": "logs:PutSubscriptionFilter"}]})
        logs.put_destination_policy(destinationName=dest_name, accessPolicy=policy)
        resp = logs.describe_destinations(DestinationNamePrefix=dest_name)
        dest = next(d for d in resp["destinations"] if d["destinationName"] == dest_name)
        # The stored policy must be the exact JSON string that was submitted.
        assert dest["accessPolicy"] == policy
    finally:
        logs.delete_destination(destinationName=dest_name)
# ---------------------------------------------------------------------------
# ARN-based tagging operations (TagResource / UntagResource)
# ---------------------------------------------------------------------------
def test_logs_tag_resource(logs):
    """TagResource adds tags to a log group resolved by ARN.

    The log group is deleted in a ``finally`` block so a failed assertion
    does not leave it behind.
    """
    uid = _uuid_mod.uuid4().hex[:8]
    group = f"/intg/tag-resource/{uid}"
    logs.create_log_group(logGroupName=group)
    try:
        groups = logs.describe_log_groups(logGroupNamePrefix=group)["logGroups"]
        arn = groups[0]["arn"]
        logs.tag_resource(resourceArn=arn, tags={"team": "platform", "env": "staging"})
        resp = logs.list_tags_for_resource(resourceArn=arn)
        assert resp["tags"]["team"] == "platform"
        assert resp["tags"]["env"] == "staging"
    finally:
        logs.delete_log_group(logGroupName=group)
def test_logs_untag_resource(logs):
    """UntagResource removes tags from a log group resolved by ARN.

    The log group is deleted in a ``finally`` block so a failed assertion
    does not leave it behind.
    """
    uid = _uuid_mod.uuid4().hex[:8]
    group = f"/intg/untag-resource/{uid}"
    logs.create_log_group(logGroupName=group, tags={"keep": "yes", "remove": "me"})
    try:
        groups = logs.describe_log_groups(logGroupNamePrefix=group)["logGroups"]
        arn = groups[0]["arn"]
        logs.untag_resource(resourceArn=arn, tagKeys=["remove"])
        resp = logs.list_tags_for_resource(resourceArn=arn)
        # Only the requested key is removed; the other tag survives.
        assert resp["tags"]["keep"] == "yes"
        assert "remove" not in resp["tags"]
    finally:
        logs.delete_log_group(logGroupName=group)
# ---------------------------------------------------------------------------
# StopQuery
# ---------------------------------------------------------------------------
def test_logs_stop_query(logs):
    """StopQuery cancels a running query and sets its status to Cancelled.

    The log group is deleted in a ``finally`` block so a failed assertion
    does not leave it behind.
    """
    uid = _uuid_mod.uuid4().hex[:8]
    group = f"/intg/stop-query/{uid}"
    logs.create_log_group(logGroupName=group)
    try:
        # Query window: the hour leading up to now, in epoch seconds.
        start_resp = logs.start_query(
            logGroupName=group,
            startTime=int(time.time()) - 3600,
            endTime=int(time.time()),
            queryString="fields @timestamp | limit 5",
        )
        query_id = start_resp["queryId"]
        stop_resp = logs.stop_query(queryId=query_id)
        assert stop_resp["success"] is True
        results = logs.get_query_results(queryId=query_id)
        assert results["status"] == "Cancelled"
    finally:
        logs.delete_log_group(logGroupName=group)