# open-range/tests/test_service_spec.py
# Aaron Brown
# fix(services): use Popen for daemon starts, remove fallbacks, dedup rsyslogd
# ad5992a
"""Tests for ServiceSpec, ReadinessCheck, and generate_service_specs().
Covers:
- ServiceSpec / ReadinessCheck serialization round-trips
- generate_service_specs() with compose input (tier-1 and tier-3 services)
- generate_service_specs() with topology fallback (no compose)
- Backward compatibility: SnapshotSpec without services field
- Unknown images produce no specs (graceful skip)
- Environment service lifecycle integration
- Renderer generates services field in snapshot
"""
from __future__ import annotations
import json
import tempfile
from pathlib import Path
from unittest.mock import MagicMock, patch
import pytest
from open_range.builder.service_manifest import (
_HOST_NAME_HINTS,
_IMAGE_SERVICE_HINTS,
_match_image_hint,
generate_service_specs,
)
from open_range.protocols import (
ReadinessCheck,
ServiceSpec,
SnapshotSpec,
TaskSpec,
)
# ---------------------------------------------------------------------------
# ServiceSpec / ReadinessCheck serialization
# ---------------------------------------------------------------------------
class TestReadinessCheck:
    """ReadinessCheck model basics and serialization."""

    def test_defaults(self):
        """A ReadinessCheck built with no arguments exposes the expected defaults."""
        rc = ReadinessCheck()
        assert rc.type == "tcp"
        assert rc.port == 0
        assert rc.url == ""
        assert rc.command == ""
        assert rc.timeout_s == 30
        assert rc.interval_s == 1.0

    def test_tcp_check(self):
        """TCP checks keep the type, port and timeout they were given."""
        rc = ReadinessCheck(type="tcp", port=80, timeout_s=10)
        assert rc.type == "tcp"
        assert rc.port == 80
        # The timeout is passed explicitly, so verify it is stored too.
        assert rc.timeout_s == 10

    def test_http_check(self):
        """HTTP checks keep the URL they were given."""
        rc = ReadinessCheck(type="http", url="http://localhost:8080/health")
        assert rc.type == "http"
        assert rc.url == "http://localhost:8080/health"

    def test_command_check(self):
        """Command checks keep the command string they were given."""
        rc = ReadinessCheck(type="command", command="pgrep -x nginx")
        assert rc.type == "command"
        assert rc.command == "pgrep -x nginx"

    def test_roundtrip_json(self):
        """Serializing to JSON and parsing back preserves the check's fields."""
        rc = ReadinessCheck(type="http", url="http://localhost:9090", timeout_s=15)
        # Go through an actual JSON string (not just a dict dump) so the test
        # matches its name and the other round-trip tests in this module.
        data = json.loads(rc.model_dump_json())
        rc2 = ReadinessCheck(**data)
        assert rc2.type == rc.type
        assert rc2.url == rc.url
        assert rc2.timeout_s == rc.timeout_s
class TestServiceSpec:
    """Construction, defaults and JSON round-trip of ServiceSpec."""

    def test_required_fields(self):
        """host, daemon and start_command are stored as given."""
        spec = ServiceSpec(host="web", daemon="nginx", start_command="nginx &")
        stored = (spec.host, spec.daemon, spec.start_command)
        assert stored == ("web", "nginx", "nginx &")

    def test_defaults(self):
        """Optional fields are empty and readiness is a ReadinessCheck."""
        minimal = ServiceSpec(host="web", daemon="nginx", start_command="nginx &")
        assert minimal.packages == []
        assert minimal.init_commands == []
        assert minimal.env_vars == {}
        assert minimal.log_dir == ""
        assert isinstance(minimal.readiness, ReadinessCheck)

    def test_full_spec(self):
        """A spec with every field supplied exposes those values."""
        probe = ReadinessCheck(
            type="command",
            command="mysqladmin ping",
            timeout_s=30,
        )
        db_spec = ServiceSpec(
            host="db",
            daemon="mysqld",
            packages=["default-mysql-server"],
            init_commands=["mkdir -p /var/run/mysqld"],
            start_command="mysqld --user=mysql &",
            readiness=probe,
            log_dir="/var/log/siem",
            env_vars={"MYSQL_ROOT_PASSWORD": "secret"},
        )
        assert db_spec.daemon == "mysqld"
        assert len(db_spec.init_commands) == 1
        assert db_spec.readiness.type == "command"
        assert db_spec.env_vars["MYSQL_ROOT_PASSWORD"] == "secret"

    def test_roundtrip_json(self):
        """Dumping to JSON and re-parsing keeps the checked fields intact."""
        original = ServiceSpec(
            host="web",
            daemon="nginx",
            packages=["nginx"],
            init_commands=["mkdir -p /var/log/nginx"],
            start_command="nginx -g 'daemon off;' &",
            readiness=ReadinessCheck(type="tcp", port=80),
            log_dir="/var/log/siem",
            env_vars={"SERVER_NAME": "web.corp.local"},
        )
        payload = original.model_dump_json()
        restored = ServiceSpec(**json.loads(payload))
        assert restored.host == original.host
        assert restored.daemon == original.daemon
        assert restored.packages == original.packages
        assert restored.readiness.port == 80
        assert restored.env_vars == original.env_vars
# ---------------------------------------------------------------------------
# SnapshotSpec backward compatibility
# ---------------------------------------------------------------------------
class TestSnapshotSpecServices:
    """Default value and serialization of the SnapshotSpec.services field."""

    def test_default_empty(self):
        """A SnapshotSpec built without arguments has no services."""
        assert SnapshotSpec().services == []

    def test_with_services(self):
        """Services passed to the constructor are stored on the spec."""
        nginx = ServiceSpec(host="web", daemon="nginx", start_command="nginx &")
        snapshot = SnapshotSpec(topology={"hosts": ["web"]}, services=[nginx])
        assert len(snapshot.services) == 1
        assert snapshot.services[0].daemon == "nginx"

    def test_roundtrip_preserves_services(self):
        """Services survive a dump-to-JSON / re-parse cycle."""
        mysql = ServiceSpec(
            host="db",
            daemon="mysqld",
            start_command="mysqld &",
            readiness=ReadinessCheck(type="tcp", port=3306),
        )
        snapshot = SnapshotSpec(topology={"hosts": ["db"]}, services=[mysql])
        payload = snapshot.model_dump_json()
        restored = SnapshotSpec(**json.loads(payload))
        assert len(restored.services) == 1
        first = restored.services[0]
        assert first.daemon == "mysqld"
        assert first.readiness.port == 3306

    def test_old_snapshot_without_services_parses(self):
        """Simulate loading a JSON snapshot that predates the services field."""
        legacy_payload = {
            "topology": {"hosts": ["web", "db"]},
            "flags": [],
            "golden_path": [],
        }
        snapshot = SnapshotSpec(**legacy_payload)
        assert snapshot.services == []
# ---------------------------------------------------------------------------
# generate_service_specs() — compose input
# ---------------------------------------------------------------------------
class TestGenerateFromCompose:
    """generate_service_specs() with compose services dict."""

    def test_tier1_basic_compose(self):
        """Tier 1 compose with common services maps correctly."""
        compose = {
            "services": {
                "web": {"image": "nginx:1.25"},
                "db": {"image": "mysql:8.0"},
                "ldap": {"image": "osixia/openldap:1.5"},
                "siem": {"image": "rsyslog:latest"},
                "files": {"image": "samba:latest"},
                "mail": {"image": "postfix:latest"},
                "attacker": {"image": "kali:latest"},
            }
        }
        topology = {"hosts": ["attacker", "web", "db", "ldap", "siem", "files", "mail"]}
        specs = generate_service_specs(compose, topology)
        daemon_names = {s.daemon for s in specs}
        assert "nginx" in daemon_names
        assert "mysqld" in daemon_names
        assert "slapd" in daemon_names
        assert "rsyslogd" in daemon_names
        assert "smbd" in daemon_names
        assert "master" in daemon_names  # postfix

    def test_tier3_compose_with_extra_services(self):
        """Tier 3 compose with redis, postgres, jenkins."""
        compose = {
            "services": {
                "web": {"image": "nginx:1.25"},
                "cache": {"image": "redis:7"},
                "db": {"image": "postgres:16"},
                "ci_cd": {"image": "jenkins/jenkins:lts"},
                "monitoring": {"image": "prometheus:latest"},
            }
        }
        topology = {"hosts": ["web", "cache", "db", "ci_cd", "monitoring"]}
        specs = generate_service_specs(compose, topology)
        daemon_names = {s.daemon for s in specs}
        assert "nginx" in daemon_names
        assert "redis-server" in daemon_names
        assert "postgres" in daemon_names
        assert "java" in daemon_names  # jenkins
        assert "prometheus" in daemon_names

    def test_unknown_image_skipped(self):
        """Custom images with no hint produce no specs."""
        compose = {
            "services": {
                "custom_app": {"image": "mycompany/custom-app:1.0"},
                "web": {"image": "nginx:1.25"},
            }
        }
        specs = generate_service_specs(compose, {"hosts": []})
        assert len(specs) == 1
        assert specs[0].daemon == "nginx"

    def test_empty_compose(self):
        """Empty compose falls through to topology."""
        specs = generate_service_specs({}, {"hosts": ["web", "db"]})
        daemon_names = {s.daemon for s in specs}
        assert "nginx" in daemon_names
        assert "mysqld" in daemon_names

    def test_compose_env_vars_extracted(self):
        """Environment variables from compose are passed to ServiceSpec."""
        compose = {
            "services": {
                "db": {
                    "image": "mysql:8.0",
                    "environment": {"MYSQL_ROOT_PASSWORD": "secret"},
                },
            }
        }
        specs = generate_service_specs(compose, {"hosts": []})
        assert len(specs) == 1
        assert specs[0].env_vars.get("MYSQL_ROOT_PASSWORD") == "secret"

    def test_compose_env_vars_list_form(self):
        """Environment in list form (KEY=VALUE) is handled."""
        compose = {
            "services": {
                "db": {
                    "image": "mysql:8.0",
                    "environment": ["MYSQL_ROOT_PASSWORD=secret", "MYSQL_DATABASE=app"],
                },
            }
        }
        specs = generate_service_specs(compose, {"hosts": []})
        # Check the count before indexing so a missing spec fails as an
        # assertion (as in the dict-form test) rather than an IndexError.
        assert len(specs) == 1
        assert specs[0].env_vars["MYSQL_ROOT_PASSWORD"] == "secret"
        assert specs[0].env_vars["MYSQL_DATABASE"] == "app"

    def test_repeated_daemons_on_different_hosts_are_preserved(self):
        """Two hosts may intentionally run the same daemon family."""
        compose = {
            "services": {
                "siem": {"image": "rsyslog:latest"},
                "firewall": {"image": "rsyslog:latest"},
            }
        }
        specs = generate_service_specs(compose, {"hosts": []})
        assert len(specs) == 2
        assert {spec.host for spec in specs} == {"siem", "firewall"}
        assert all(spec.daemon == "rsyslogd" for spec in specs)

    def test_same_daemon_across_multiple_web_hosts(self):
        """Two web hosts using the same nginx image each get their own spec."""
        compose = {
            "services": {
                "web1": {"image": "nginx:1.25"},
                "web2": {"image": "nginx:1.25"},
            }
        }
        specs = generate_service_specs(compose, {"hosts": ["web1", "web2"]})
        assert len(specs) == 2
        assert {spec.host for spec in specs} == {"web1", "web2"}
        assert all(spec.daemon == "nginx" for spec in specs)
# ---------------------------------------------------------------------------
# generate_service_specs() — topology fallback
# ---------------------------------------------------------------------------
class TestGenerateFromTopology:
    """generate_service_specs() driven by topology hosts with an empty compose."""

    def test_basic_topology_hosts(self):
        """Well-known host names yield the expected daemons."""
        host_names = ["attacker", "web", "db", "ldap", "siem", "files", "mail"]
        generated = generate_service_specs({}, {"hosts": host_names})
        found = {entry.daemon for entry in generated}
        for expected in ("nginx", "mysqld", "slapd", "rsyslogd", "smbd", "master"):
            assert expected in found

    def test_unknown_host_skipped(self):
        """Hosts named 'attacker' and 'custom_box' produce no specs."""
        generated = generate_service_specs({}, {"hosts": ["attacker", "custom_box"]})
        assert len(generated) == 0

    def test_dict_hosts(self):
        """Hosts as dicts with 'name' key."""
        host_entries = [
            {"name": "web", "zone": "dmz"},
            {"name": "db", "zone": "internal"},
        ]
        generated = generate_service_specs({}, {"hosts": host_entries})
        found = {entry.daemon for entry in generated}
        assert "nginx" in found
        assert "mysqld" in found

    def test_empty_topology(self):
        """Empty compose plus empty topology gives an empty list."""
        assert generate_service_specs({}, {}) == []
# ---------------------------------------------------------------------------
# _match_image_hint internals
# ---------------------------------------------------------------------------
class TestMatchImageHint:
    """_match_image_hint resolves Docker image strings to hint entries."""

    def test_exact_match(self):
        """A bare image name resolves to its hint."""
        matched = _match_image_hint("nginx")
        assert matched is not None
        daemon = matched[0]
        assert daemon == "nginx"

    def test_tagged_image(self):
        """An image with a tag resolves to its hint."""
        matched = _match_image_hint("mysql:8.0")
        assert matched is not None
        daemon = matched[0]
        assert daemon == "mysqld"

    def test_namespaced_image(self):
        """A namespaced, tagged image resolves to its hint."""
        matched = _match_image_hint("osixia/openldap:1.5")
        assert matched is not None
        daemon = matched[0]
        assert daemon == "slapd"

    def test_basename_fallback(self):
        """bitnami/redis:7 should match via basename 'redis'."""
        matched = _match_image_hint("bitnami/redis:7")
        assert matched is not None
        daemon = matched[0]
        assert daemon == "redis-server"

    def test_unknown_image(self):
        """An image with no hint yields None."""
        assert _match_image_hint("mycompany/custom-service:v2") is None

    def test_empty_image(self):
        """An empty image string yields None."""
        assert _match_image_hint("") is None
# ---------------------------------------------------------------------------
# Environment integration: service lifecycle methods
# ---------------------------------------------------------------------------
class TestEnvironmentServiceLifecycle:
    """RangeEnvironment service lifecycle methods."""

    def test_start_snapshot_services_noop_in_docker_mode(self):
        """_start_snapshot_services is a no-op when execution_mode != subprocess."""
        from open_range.server.environment import RangeEnvironment

        env = RangeEnvironment(docker_available=False)
        # execution_mode defaults to "docker" when docker_available=False (mock)
        snapshot = SnapshotSpec(
            topology={"hosts": ["web"]},
            services=[ServiceSpec(host="web", daemon="nginx", start_command="nginx &")],
        )
        # Patch the process-spawning entry points so "no-op" is verified
        # instead of merely "did not raise".
        with patch("subprocess.Popen") as mock_popen, patch("subprocess.run") as mock_run:
            env._start_snapshot_services(snapshot)
        mock_popen.assert_not_called()
        mock_run.assert_not_called()

    @patch("subprocess.Popen")
    @patch("subprocess.run")
    def test_start_snapshot_services_subprocess_mode(self, mock_run, mock_popen):
        """_start_snapshot_services starts declared services in subprocess mode."""
        from open_range.server.environment import RangeEnvironment

        # Mock Popen to return an object with a wait() method
        mock_proc = MagicMock()
        mock_popen.return_value = mock_proc
        env = RangeEnvironment(docker_available=False, execution_mode="subprocess")
        snapshot = SnapshotSpec(
            topology={"hosts": ["web"]},
            services=[
                ServiceSpec(
                    host="web",
                    daemon="nginx",
                    init_commands=["mkdir -p /var/log/nginx"],
                    start_command="nginx &",
                    readiness=ReadinessCheck(type="tcp", port=80, timeout_s=0),
                ),
            ],
        )
        env._start_snapshot_services(snapshot)
        # Init commands use subprocess.run, daemon start uses Popen
        assert mock_run.call_count >= 1  # init command
        assert mock_popen.call_count >= 1  # daemon start

    def test_start_services_empty_skips(self):
        """When no services are declared, logs and skips provisioning."""
        from open_range.server.environment import RangeEnvironment

        env = RangeEnvironment(docker_available=False, execution_mode="subprocess")
        snapshot = SnapshotSpec(
            topology={"hosts": ["web", "db"]},
            services=[],  # empty
        )
        # Should not raise — just logs and returns
        env._start_snapshot_services(snapshot)

    @patch("subprocess.run")
    def test_stop_services_uses_snapshot_daemons(self, mock_run):
        """_stop_services uses daemon names from snapshot.services."""
        from open_range.server.environment import RangeEnvironment

        env = RangeEnvironment(docker_available=False, execution_mode="subprocess")
        env._snapshot = SnapshotSpec(
            topology={"hosts": ["web"]},
            services=[
                ServiceSpec(host="web", daemon="nginx", start_command="nginx &"),
                ServiceSpec(host="db", daemon="mysqld", start_command="mysqld &"),
            ],
        )
        env._stop_services()
        # Should have called pkill for each daemon (either individually or via bash -c)
        all_call_strs = []
        for recorded in mock_run.call_args_list:
            command = recorded.args[0] if recorded.args else recorded.kwargs.get("args", [])
            if isinstance(command, str):
                # A shell-string command must be kept whole; joining it
                # character by character would hide the daemon names.
                all_call_strs.append(command)
            else:
                all_call_strs.append(" ".join(str(part) for part in command))
        combined = " ".join(all_call_strs)
        assert "nginx" in combined
        assert "mysqld" in combined

    def test_stop_services_no_services_skips_pkill(self):
        """_stop_services skips pkill when snapshot has no services."""
        from open_range.server.environment import RangeEnvironment

        env = RangeEnvironment(docker_available=False, execution_mode="subprocess")
        env._snapshot = SnapshotSpec(topology={"hosts": ["web"]})
        # Should not raise — just skips pkill since no service specs
        env._stop_services()

    def test_stop_services_no_snapshot(self):
        """_stop_services handles None snapshot gracefully."""
        from open_range.server.environment import RangeEnvironment

        env = RangeEnvironment(docker_available=False, execution_mode="subprocess")
        env._snapshot = None
        # Should not raise
        env._stop_services()

    def test_probe_readiness_tcp_unreachable(self):
        """TCP probe returns False for unreachable port."""
        import socket

        from open_range.server.environment import RangeEnvironment

        # Ask the OS for a currently unused port instead of hard-coding one,
        # so an unrelated local listener cannot make the test flaky.
        # NOTE(review): assumes the probe targets the local host — confirm.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as placeholder:
            placeholder.bind(("127.0.0.1", 0))
            free_port = placeholder.getsockname()[1]
        # The socket is closed here, so nothing is listening on free_port.
        check = ReadinessCheck(type="tcp", port=free_port)
        assert RangeEnvironment._probe_readiness(check) is False

    def test_probe_readiness_command_success(self):
        """Command probe returns True for 'true' command."""
        from open_range.server.environment import RangeEnvironment

        check = ReadinessCheck(type="command", command="true")
        assert RangeEnvironment._probe_readiness(check) is True

    def test_probe_readiness_command_failure(self):
        """Command probe returns False for 'false' command."""
        from open_range.server.environment import RangeEnvironment

        check = ReadinessCheck(type="command", command="false")
        assert RangeEnvironment._probe_readiness(check) is False

    def test_reset_calls_service_lifecycle(self):
        """reset() calls _stop_services and _start_snapshot_services."""
        from open_range.server.environment import RangeEnvironment

        env = RangeEnvironment(docker_available=False)
        stop_called = []
        start_called = []
        # Replace the lifecycle hooks with recorders so the calls can be counted.
        env._stop_services = lambda: stop_called.append(True)  # type: ignore
        env._start_snapshot_services = lambda s: start_called.append(s)  # type: ignore
        snapshot = SnapshotSpec(
            topology={"hosts": ["attacker", "web"]},
            task=TaskSpec(red_briefing="Test.", blue_briefing="Test."),
        )
        env.reset(snapshot=snapshot)
        assert len(stop_called) == 1
        assert len(start_called) == 1
# ---------------------------------------------------------------------------
# Renderer generates services in snapshot
# ---------------------------------------------------------------------------
class TestRendererServiceGeneration:
    """Rendering a snapshot fills in spec.services unless already provided."""

    def test_renderer_populates_services_from_topology(self):
        """Rendering a spec without services derives them from the topology."""
        from open_range.builder.renderer import SnapshotRenderer

        topology = {
            "hosts": ["web", "db", "ldap"],
            "zones": {"dmz": ["web"], "internal": ["db", "ldap"]},
            "users": [],
            "firewall_rules": [],
        }
        snapshot = SnapshotSpec(topology=topology)
        renderer = SnapshotRenderer()
        with tempfile.TemporaryDirectory() as scratch:
            out_dir = Path(scratch) / "out"
            renderer.render(snapshot, out_dir)
        # After rendering, services should be populated
        assert len(snapshot.services) >= 2
        found = {entry.daemon for entry in snapshot.services}
        assert "nginx" in found
        assert "mysqld" in found

    def test_renderer_skips_if_services_already_present(self):
        """Rendering leaves a pre-populated services list untouched."""
        from open_range.builder.renderer import SnapshotRenderer

        provided = ServiceSpec(host="web", daemon="nginx", start_command="nginx &")
        topology = {
            "hosts": ["web", "db"],
            "zones": {"dmz": ["web"], "internal": ["db"]},
            "users": [],
            "firewall_rules": [],
        }
        snapshot = SnapshotSpec(topology=topology, services=[provided])
        renderer = SnapshotRenderer()
        with tempfile.TemporaryDirectory() as scratch:
            out_dir = Path(scratch) / "out"
            renderer.render(snapshot, out_dir)
        # Should not have overwritten — still just the one we provided
        assert len(snapshot.services) == 1
        assert snapshot.services[0].daemon == "nginx"
# ---------------------------------------------------------------------------
# Hint table coverage
# ---------------------------------------------------------------------------
class TestHintTableCoverage:
    """Every key in both hint tables yields a usable ServiceSpec."""

    @pytest.mark.parametrize("image_key", list(_IMAGE_SERVICE_HINTS))
    def test_hint_produces_valid_spec(self, image_key):
        """Each entry in the hint table produces a valid ServiceSpec."""
        single_service = {"services": {"svc": {"image": image_key}}}
        generated = generate_service_specs(single_service, {"hosts": []})
        assert len(generated) == 1
        produced = generated[0]
        assert produced.daemon
        assert produced.start_command
        assert isinstance(produced.readiness, ReadinessCheck)

    @pytest.mark.parametrize("host_name", list(_HOST_NAME_HINTS))
    def test_host_hint_produces_valid_spec(self, host_name):
        """Each entry in the host-name hint table produces a valid ServiceSpec."""
        generated = generate_service_specs({}, {"hosts": [host_name]})
        assert len(generated) >= 1
        produced = generated[0]
        assert produced.daemon
        assert produced.start_command