aws_rl_env / aws_infra /tests /test_ecs.py
Sizzing's picture
Upload folder using huggingface_hub
c745a99 verified
import io
import json
import os
import time
import zipfile
from urllib.parse import urlparse
import pytest
from botocore.exceptions import ClientError
import uuid as _uuid_mod
def test_ecs_cluster(ecs):
ecs.create_cluster(clusterName="test-cluster")
clusters = ecs.list_clusters()
assert any("test-cluster" in arn for arn in clusters["clusterArns"])
def test_ecs_task_def(ecs):
resp = ecs.register_task_definition(
family="test-task",
containerDefinitions=[
{
"name": "web",
"image": "nginx:alpine",
"cpu": 128,
"memory": 256,
"portMappings": [{"containerPort": 80, "hostPort": 8080}],
}
],
requiresCompatibilities=["EC2"],
cpu="256",
memory="512",
)
assert resp["taskDefinition"]["family"] == "test-task"
assert resp["taskDefinition"]["revision"] == 1
def test_ecs_list_task_defs(ecs):
resp = ecs.list_task_definitions(familyPrefix="test-task")
assert len(resp["taskDefinitionArns"]) >= 1
def test_ecs_run_task_stops_after_exit(ecs):
"""DescribeTasks transitions to STOPPED after Docker container exits."""
ecs.create_cluster(clusterName="task-lifecycle")
ecs.register_task_definition(
family="short-lived",
containerDefinitions=[
{
"name": "worker",
"image": "alpine:latest",
"command": ["sh", "-c", "echo done"],
"essential": True,
}
],
)
resp = ecs.run_task(cluster="task-lifecycle", taskDefinition="short-lived")
task_arn = resp["tasks"][0]["taskArn"]
assert resp["tasks"][0]["lastStatus"] == "RUNNING"
# Poll until STOPPED (container exits almost immediately)
stopped = False
for _ in range(30):
time.sleep(2)
desc = ecs.describe_tasks(cluster="task-lifecycle", tasks=[task_arn])
task = desc["tasks"][0]
if task["lastStatus"] == "STOPPED":
stopped = True
assert task["desiredStatus"] == "STOPPED"
assert task["stopCode"] == "EssentialContainerExited"
assert task["containers"][0]["lastStatus"] == "STOPPED"
assert task["containers"][0]["exitCode"] == 0
break
assert stopped, "Task should transition to STOPPED after container exits"
def test_ecs_run_task_network_connectivity(ecs):
"""ECS container can reach Ministack (proves network detection works)."""
endpoint = os.environ.get("MINISTACK_ENDPOINT", "http://localhost:4566")
# Determine how a container can reach the host where Ministack runs.
# Docker Desktop (macOS/Windows): host.docker.internal works.
# Linux: use the Docker bridge gateway IP (typically 172.17.0.1).
host = os.environ.get("MINISTACK_HOST_FROM_CONTAINER", "")
if not host:
import platform
if platform.system() == "Linux":
# Docker bridge gateway — how containers reach the host on Linux
host = "172.17.0.1"
else:
host = "host.docker.internal"
parsed = urlparse(endpoint)
container_endpoint = f"{parsed.scheme}://{host}:{parsed.port}"
ecs.create_cluster(clusterName="net-test")
ecs.register_task_definition(
family="net-probe",
containerDefinitions=[
{
"name": "probe",
"image": "alpine:latest",
"command": ["sh", "-c", f"wget -q -O /dev/null {container_endpoint}/_ministack/health"],
"essential": True,
}
],
)
resp = ecs.run_task(cluster="net-test", taskDefinition="net-probe")
task_arn = resp["tasks"][0]["taskArn"]
assert resp["tasks"][0]["lastStatus"] == "RUNNING"
# Poll until STOPPED — wget should succeed (exit 0) if network is correct
success = False
for _ in range(30):
time.sleep(2)
desc = ecs.describe_tasks(cluster="net-test", tasks=[task_arn])
task = desc["tasks"][0]
if task["lastStatus"] == "STOPPED":
exit_code = task["containers"][0].get("exitCode")
assert exit_code == 0, (
f"Container could not reach Ministack at {container_endpoint} "
f"(exit code {exit_code}) — network detection may be broken"
)
success = True
break
assert success, "Task should transition to STOPPED"
def test_ecs_service(ecs):
ecs.create_service(
cluster="test-cluster",
serviceName="test-service",
taskDefinition="test-task",
desiredCount=1,
)
resp = ecs.describe_services(cluster="test-cluster", services=["test-service"])
assert len(resp["services"]) == 1
assert resp["services"][0]["serviceName"] == "test-service"
def test_ecs_create_cluster_v2(ecs):
resp = ecs.create_cluster(clusterName="ecs-cc-v2")
assert resp["cluster"]["clusterName"] == "ecs-cc-v2"
assert resp["cluster"]["status"] == "ACTIVE"
assert "clusterArn" in resp["cluster"]
def test_ecs_list_clusters_v2(ecs):
ecs.create_cluster(clusterName="ecs-lc-v2a")
ecs.create_cluster(clusterName="ecs-lc-v2b")
resp = ecs.list_clusters()
arns = resp["clusterArns"]
assert any("ecs-lc-v2a" in a for a in arns)
assert any("ecs-lc-v2b" in a for a in arns)
def test_ecs_register_task_def_v2(ecs):
resp = ecs.register_task_definition(
family="ecs-td-v2",
containerDefinitions=[
{
"name": "web",
"image": "nginx:alpine",
"cpu": 256,
"memory": 512,
"portMappings": [{"containerPort": 80, "hostPort": 8080}],
},
{"name": "sidecar", "image": "envoy:latest", "cpu": 128, "memory": 256},
],
requiresCompatibilities=["EC2"],
cpu="512",
memory="1024",
)
td = resp["taskDefinition"]
assert td["family"] == "ecs-td-v2"
assert td["revision"] == 1
assert td["status"] == "ACTIVE"
assert len(td["containerDefinitions"]) == 2
resp2 = ecs.register_task_definition(
family="ecs-td-v2",
containerDefinitions=[{"name": "web", "image": "nginx:latest", "cpu": 256, "memory": 512}],
)
assert resp2["taskDefinition"]["revision"] == 2
def test_ecs_list_task_defs_v2(ecs):
ecs.register_task_definition(
family="ecs-ltd-v2",
containerDefinitions=[{"name": "app", "image": "img", "cpu": 64, "memory": 128}],
)
resp = ecs.list_task_definitions(familyPrefix="ecs-ltd-v2")
assert len(resp["taskDefinitionArns"]) >= 1
assert all("ecs-ltd-v2" in a for a in resp["taskDefinitionArns"])
def test_ecs_create_service_v2(ecs):
ecs.create_cluster(clusterName="ecs-svc-v2c")
ecs.register_task_definition(
family="ecs-svc-v2td",
containerDefinitions=[{"name": "w", "image": "nginx", "cpu": 64, "memory": 128}],
)
resp = ecs.create_service(
cluster="ecs-svc-v2c",
serviceName="ecs-svc-v2",
taskDefinition="ecs-svc-v2td",
desiredCount=2,
)
svc = resp["service"]
assert svc["serviceName"] == "ecs-svc-v2"
assert svc["status"] == "ACTIVE"
assert svc["desiredCount"] == 2
def test_ecs_describe_services_v2(ecs):
ecs.create_cluster(clusterName="ecs-ds-v2c")
ecs.register_task_definition(
family="ecs-ds-v2td",
containerDefinitions=[{"name": "w", "image": "nginx", "cpu": 64, "memory": 128}],
)
ecs.create_service(
cluster="ecs-ds-v2c",
serviceName="ecs-ds-v2a",
taskDefinition="ecs-ds-v2td",
desiredCount=1,
)
ecs.create_service(
cluster="ecs-ds-v2c",
serviceName="ecs-ds-v2b",
taskDefinition="ecs-ds-v2td",
desiredCount=3,
)
resp = ecs.describe_services(cluster="ecs-ds-v2c", services=["ecs-ds-v2a", "ecs-ds-v2b"])
assert len(resp["services"]) == 2
svc_map = {s["serviceName"]: s for s in resp["services"]}
assert svc_map["ecs-ds-v2a"]["desiredCount"] == 1
assert svc_map["ecs-ds-v2b"]["desiredCount"] == 3
def test_ecs_update_service_v2(ecs):
ecs.create_cluster(clusterName="ecs-us-v2c")
ecs.register_task_definition(
family="ecs-us-v2td",
containerDefinitions=[{"name": "w", "image": "nginx", "cpu": 64, "memory": 128}],
)
ecs.create_service(
cluster="ecs-us-v2c",
serviceName="ecs-us-v2",
taskDefinition="ecs-us-v2td",
desiredCount=1,
)
ecs.update_service(cluster="ecs-us-v2c", service="ecs-us-v2", desiredCount=5)
resp = ecs.describe_services(cluster="ecs-us-v2c", services=["ecs-us-v2"])
assert resp["services"][0]["desiredCount"] == 5
def test_ecs_tags_v2(ecs):
resp = ecs.create_cluster(
clusterName="ecs-tag-v2c",
tags=[{"key": "env", "value": "staging"}],
)
arn = resp["cluster"]["clusterArn"]
tags = ecs.list_tags_for_resource(resourceArn=arn)["tags"]
assert any(t["key"] == "env" and t["value"] == "staging" for t in tags)
ecs.tag_resource(resourceArn=arn, tags=[{"key": "team", "value": "platform"}])
tags2 = ecs.list_tags_for_resource(resourceArn=arn)["tags"]
tag_map = {t["key"]: t["value"] for t in tags2}
assert tag_map["env"] == "staging"
assert tag_map["team"] == "platform"
ecs.untag_resource(resourceArn=arn, tagKeys=["env"])
tags3 = ecs.list_tags_for_resource(resourceArn=arn)["tags"]
assert not any(t["key"] == "env" for t in tags3)
assert any(t["key"] == "team" for t in tags3)
def test_ecs_capacity_provider(ecs):
resp = ecs.create_capacity_provider(
name="test-cp",
autoScalingGroupProvider={
"autoScalingGroupArn": "arn:aws:autoscaling:us-east-1:000000000000:autoScalingGroup:xxx:autoScalingGroupName/asg-1",
"managedScaling": {"status": "ENABLED"},
},
)
assert resp["capacityProvider"]["name"] == "test-cp"
desc = ecs.describe_capacity_providers(capacityProviders=["test-cp"])
assert any(cp["name"] == "test-cp" for cp in desc["capacityProviders"])
ecs.delete_capacity_provider(capacityProvider="test-cp")
def test_ecs_update_cluster(ecs):
ecs.create_cluster(clusterName="upd-cl")
resp = ecs.update_cluster(
cluster="upd-cl",
settings=[{"name": "containerInsights", "value": "enabled"}],
)
assert resp["cluster"]["clusterName"] == "upd-cl"
def test_ecs_timestamps_are_epoch(ecs):
"""ECS timestamps should be epoch numbers, not ISO strings."""
ecs.create_cluster(clusterName="ts-test-v44")
clusters = ecs.describe_clusters(clusters=["ts-test-v44"])
registered = clusters["clusters"][0].get("registeredContainerInstancesCount", 0)
# registeredAt might not be present on cluster, test on task def
ecs.register_task_definition(
family="ts-td-v44",
containerDefinitions=[{"name": "app", "image": "nginx", "memory": 256}],
)
td = ecs.describe_task_definition(taskDefinition="ts-td-v44")
registered_at = td["taskDefinition"].get("registeredAt")
if registered_at is not None:
from datetime import datetime
assert isinstance(registered_at, datetime), f"registeredAt should be datetime, got {type(registered_at)}"
# ---------------------------------------------------------------------------
# Service task spawning tests
# ---------------------------------------------------------------------------
def test_ecs_service_spawns_tasks(ecs):
"""Creating a service should spawn tasks matching desiredCount."""
cluster = "svc-spawn-c"
ecs.create_cluster(clusterName=cluster)
ecs.register_task_definition(
family="svc-spawn-td",
containerDefinitions=[{"name": "app", "image": "nginx", "cpu": 64, "memory": 128}],
)
ecs.create_service(
cluster=cluster,
serviceName="svc-spawn",
taskDefinition="svc-spawn-td",
desiredCount=2,
)
tasks = ecs.list_tasks(cluster=cluster, serviceName="svc-spawn")
assert len(tasks["taskArns"]) == 2
# Verify describe_tasks returns correct metadata
desc = ecs.describe_tasks(cluster=cluster, tasks=tasks["taskArns"])
for t in desc["tasks"]:
assert t["lastStatus"] == "RUNNING"
assert t["group"] == "service:svc-spawn"
assert t["startedBy"] == "svc-spawn"
def test_ecs_list_services(ecs):
"""list_services should return ARNs of services in the cluster."""
cluster = "ls-svc-c"
ecs.create_cluster(clusterName=cluster)
ecs.register_task_definition(
family="ls-svc-td",
containerDefinitions=[{"name": "app", "image": "nginx", "cpu": 64, "memory": 128}],
)
ecs.create_service(
cluster=cluster, serviceName="ls-svc-a", taskDefinition="ls-svc-td", desiredCount=1,
)
ecs.create_service(
cluster=cluster, serviceName="ls-svc-b", taskDefinition="ls-svc-td", desiredCount=1,
)
resp = ecs.list_services(cluster=cluster)
arns = resp["serviceArns"]
assert len(arns) == 2
assert any("ls-svc-a" in a for a in arns)
assert any("ls-svc-b" in a for a in arns)
def test_ecs_service_running_count(ecs):
"""Service runningCount should match the number of actual running tasks."""
cluster = "rc-c"
ecs.create_cluster(clusterName=cluster)
ecs.register_task_definition(
family="rc-td",
containerDefinitions=[{"name": "app", "image": "nginx", "cpu": 64, "memory": 128}],
)
ecs.create_service(
cluster=cluster, serviceName="rc-svc", taskDefinition="rc-td", desiredCount=3,
)
resp = ecs.describe_services(cluster=cluster, services=["rc-svc"])
svc = resp["services"][0]
assert svc["runningCount"] == 3
assert svc["desiredCount"] == 3
def test_ecs_service_scale_up(ecs):
"""Updating desiredCount should spawn additional tasks."""
cluster = "su-c"
ecs.create_cluster(clusterName=cluster)
ecs.register_task_definition(
family="su-td",
containerDefinitions=[{"name": "app", "image": "nginx", "cpu": 64, "memory": 128}],
)
ecs.create_service(
cluster=cluster, serviceName="su-svc", taskDefinition="su-td", desiredCount=1,
)
tasks_before = ecs.list_tasks(cluster=cluster, serviceName="su-svc")
assert len(tasks_before["taskArns"]) == 1
ecs.update_service(cluster=cluster, service="su-svc", desiredCount=3)
tasks_after = ecs.list_tasks(cluster=cluster, serviceName="su-svc")
assert len(tasks_after["taskArns"]) == 3
resp = ecs.describe_services(cluster=cluster, services=["su-svc"])
assert resp["services"][0]["runningCount"] == 3
def test_ecs_service_scale_down(ecs):
"""Scaling down desiredCount should stop excess tasks."""
cluster = "sd-c"
ecs.create_cluster(clusterName=cluster)
ecs.register_task_definition(
family="sd-td",
containerDefinitions=[{"name": "app", "image": "nginx", "cpu": 64, "memory": 128}],
)
ecs.create_service(
cluster=cluster, serviceName="sd-svc", taskDefinition="sd-td", desiredCount=3,
)
tasks_before = ecs.list_tasks(cluster=cluster, serviceName="sd-svc")
assert len(tasks_before["taskArns"]) == 3
ecs.update_service(cluster=cluster, service="sd-svc", desiredCount=1)
tasks_after = ecs.list_tasks(cluster=cluster, serviceName="sd-svc")
assert len(tasks_after["taskArns"]) == 1
resp = ecs.describe_services(cluster=cluster, services=["sd-svc"])
assert resp["services"][0]["runningCount"] == 1
def test_ecs_service_td_update_replaces_tasks(ecs):
"""Updating task definition should replace old tasks with new ones."""
cluster = "tdu-c"
ecs.create_cluster(clusterName=cluster)
ecs.register_task_definition(
family="tdu-td",
containerDefinitions=[{"name": "app", "image": "nginx:1.0", "cpu": 64, "memory": 128}],
)
ecs.create_service(
cluster=cluster, serviceName="tdu-svc", taskDefinition="tdu-td:1", desiredCount=2,
)
old_tasks = ecs.list_tasks(cluster=cluster, serviceName="tdu-svc")
assert len(old_tasks["taskArns"]) == 2
# Register new revision and update service
resp2 = ecs.register_task_definition(
family="tdu-td",
containerDefinitions=[{"name": "app", "image": "nginx:2.0", "cpu": 64, "memory": 128}],
)
new_td_arn = resp2["taskDefinition"]["taskDefinitionArn"]
ecs.update_service(cluster=cluster, service="tdu-svc", taskDefinition="tdu-td:2")
# New tasks should be on the new TD
new_tasks = ecs.list_tasks(cluster=cluster, serviceName="tdu-svc")
assert len(new_tasks["taskArns"]) == 2
# Verify all running tasks use the new task definition
desc = ecs.describe_tasks(cluster=cluster, tasks=new_tasks["taskArns"])
for t in desc["tasks"]:
assert t["taskDefinitionArn"] == new_td_arn, \
f"Task still on old TD: {t['taskDefinitionArn']}"
assert t["lastStatus"] == "RUNNING"
# Old tasks should be stopped
old_desc = ecs.describe_tasks(cluster=cluster, tasks=old_tasks["taskArns"])
for t in old_desc["tasks"]:
assert t["lastStatus"] == "STOPPED"
# Service should reflect correct counts
svc = ecs.describe_services(cluster=cluster, services=["tdu-svc"])
assert svc["services"][0]["runningCount"] == 2
def test_ecs_service_delete_stops_tasks(ecs):
"""Deleting a service should stop all its tasks."""
cluster = "del-c"
ecs.create_cluster(clusterName=cluster)
ecs.register_task_definition(
family="del-td",
containerDefinitions=[{"name": "app", "image": "nginx", "cpu": 64, "memory": 128}],
)
ecs.create_service(
cluster=cluster, serviceName="del-svc", taskDefinition="del-td", desiredCount=2,
)
tasks = ecs.list_tasks(cluster=cluster, serviceName="del-svc")
assert len(tasks["taskArns"]) == 2
ecs.delete_service(cluster=cluster, service="del-svc", force=True)
tasks_after = ecs.list_tasks(cluster=cluster, serviceName="del-svc")
assert len(tasks_after["taskArns"]) == 0
# Verify tasks are STOPPED, not deleted
desc = ecs.describe_tasks(cluster=cluster, tasks=tasks["taskArns"])
for t in desc["tasks"]:
assert t["lastStatus"] == "STOPPED"
def test_ecs_service_scale_to_zero(ecs):
"""Scaling to zero should stop all tasks without deleting the service."""
cluster = "z-c"
ecs.create_cluster(clusterName=cluster)
ecs.register_task_definition(
family="z-td",
containerDefinitions=[{"name": "app", "image": "nginx", "cpu": 64, "memory": 128}],
)
ecs.create_service(
cluster=cluster, serviceName="z-svc", taskDefinition="z-td", desiredCount=2,
)
ecs.update_service(cluster=cluster, service="z-svc", desiredCount=0)
tasks = ecs.list_tasks(cluster=cluster, serviceName="z-svc")
assert len(tasks["taskArns"]) == 0
resp = ecs.describe_services(cluster=cluster, services=["z-svc"])
svc = resp["services"][0]
assert svc["status"] == "ACTIVE"
assert svc["desiredCount"] == 0
assert svc["runningCount"] == 0
def test_ecs_cluster_task_counts(ecs):
"""Cluster runningTasksCount should reflect service-spawned tasks."""
cluster = "ct-c"
ecs.create_cluster(clusterName=cluster)
ecs.register_task_definition(
family="ct-td",
containerDefinitions=[{"name": "app", "image": "nginx", "cpu": 64, "memory": 128}],
)
ecs.create_service(
cluster=cluster, serviceName="ct-svc", taskDefinition="ct-td", desiredCount=3,
)
resp = ecs.describe_clusters(clusters=[cluster])
cl = resp["clusters"][0]
assert cl["runningTasksCount"] == 3
assert cl["activeServicesCount"] == 1
def test_ecs_cfn_service_visible(ecs, cfn):
"""Services created via CloudFormation should be visible in list-services and list-tasks."""
stack_name = "ecs-cfn-test"
template = json.dumps({
"AWSTemplateFormatVersion": "2010-09-09",
"Resources": {
"Cluster": {
"Type": "AWS::ECS::Cluster",
"Properties": {"ClusterName": "cfn-ecs-c"},
},
"TaskDef": {
"Type": "AWS::ECS::TaskDefinition",
"Properties": {
"Family": "cfn-ecs-td",
"ContainerDefinitions": [
{"Name": "app", "Image": "nginx", "Cpu": 64, "Memory": 128},
],
},
},
"Service": {
"Type": "AWS::ECS::Service",
"DependsOn": ["Cluster", "TaskDef"],
"Properties": {
"Cluster": {"Ref": "Cluster"},
"ServiceName": "cfn-ecs-svc",
"TaskDefinition": {"Ref": "TaskDef"},
"DesiredCount": 1,
"LaunchType": "EC2",
},
},
},
})
cfn.create_stack(StackName=stack_name, TemplateBody=template)
# Verify service is visible
svcs = ecs.list_services(cluster="cfn-ecs-c")
assert any("cfn-ecs-svc" in a for a in svcs["serviceArns"]), \
f"Service not found in list_services: {svcs['serviceArns']}"
# Verify tasks were spawned
tasks = ecs.list_tasks(cluster="cfn-ecs-c")
assert len(tasks["taskArns"]) >= 1, "No tasks spawned for CF-created service"
# Cleanup
cfn.delete_stack(StackName=stack_name)