llm_topic_modelling / cdk /test /test_s3_batch_lambda_synth.py
seanpedrickcase's picture
Sync: Updated CDK code to include possibility of exporting DynamoDB logs to S3 csv output on automated basis
373207f
Raw
History Blame Contribute Delete
3.74 kB
"""Synth assertions for optional S3 batch ECS trigger Lambda."""
import sys
from pathlib import Path
import pytest
CDK_DIR = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(CDK_DIR))
def test_batch_trigger_express_mutual_exclusion():
enable_batch = "True"
use_express = "True"
with pytest.raises(ValueError, match="ENABLE_S3_BATCH_ECS_TRIGGER"):
if enable_batch == "True" and use_express == "True":
raise ValueError(
"ENABLE_S3_BATCH_ECS_TRIGGER=True requires the legacy Fargate task definition "
"for ecs.run_task. Set USE_ECS_EXPRESS_MODE=False or disable the batch trigger."
)
def test_s3_batch_lambda_synth_resources():
from aws_cdk import App, Environment, Stack
from aws_cdk import aws_ec2 as ec2
from aws_cdk import aws_ecs as ecs
from aws_cdk import aws_iam as iam
from aws_cdk import aws_s3 as s3
from cdk_functions import create_s3_batch_ecs_trigger_lambda
app = App()
stack = Stack(
app,
"BatchLambdaTest",
env=Environment(account="123456789012", region="eu-west-2"),
)
vpc = ec2.Vpc(stack, "Vpc", max_azs=2)
output_bucket = s3.Bucket(stack, "OutputBucket")
config_bucket = s3.Bucket(stack, "ConfigBucket")
execution_role = iam.Role(
stack,
"ExecutionRole",
assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
)
task_role = iam.Role(
stack,
"TaskRole",
assumed_by=iam.ServicePrincipal("ecs-tasks.amazonaws.com"),
)
task_def = ecs.FargateTaskDefinition(
stack, "TaskDef", memory_limit_mib=2048, cpu=1024
)
task_def.add_container(
"docsummarisation",
image=ecs.ContainerImage.from_registry("nginx:latest"),
)
lambda_asset = str(CDK_DIR / "config" / "lambda")
create_s3_batch_ecs_trigger_lambda(
stack,
"S3BatchEcsTrigger",
function_name=None,
lambda_asset_path=lambda_asset,
output_bucket=output_bucket,
config_bucket=config_bucket,
cluster_name="test-cluster",
task_definition_arn=task_def.task_definition_arn,
container_name="docsummarisation",
subnet_ids=[vpc.private_subnets[0].subnet_id],
security_group_id=ec2.SecurityGroup(
stack, "EcsSg", vpc=vpc, allow_all_outbound=True
).security_group_id,
execution_role=execution_role,
task_role=task_role,
env_prefix="input/config/",
env_suffix=".env",
input_prefix="input/",
config_prefix="",
default_params_key="general-config/app_defaults.env",
general_env_prefix="general-config/",
default_task_type="extract",
)
template = app.synth().get_stack_by_name("BatchLambdaTest").template
resources = template["Resources"]
types = {r["Type"] for r in resources.values()}
assert "AWS::Lambda::Function" in types
assert "AWS::Lambda::Permission" in types
assert "Custom::S3BucketNotifications" in types or any(
r.get("Type") == "AWS::S3::Bucket"
and "NotificationConfiguration" in r.get("Properties", {})
for r in resources.values()
)
batch_lambda = next(
r
for r in resources.values()
if r["Type"] == "AWS::Lambda::Function"
and r["Properties"].get("Handler") == "lambda_function.lambda_handler"
)
env_vars = batch_lambda["Properties"]["Environment"]["Variables"]
assert env_vars.get("DEFAULT_TASK_TYPE") == "extract"
assert env_vars.get("BUCKET")
assert "RUN_DIRECT_MODE" not in env_vars
assert env_vars.get("ENV_PREFIX") == "input/config/"
assert env_vars.get("ECS_ASSIGN_PUBLIC_IP") == "DISABLED"