Spaces:
Runtime error
Runtime error
| """Tests for ServiceSpec, ReadinessCheck, and generate_service_specs(). | |
| Covers: | |
| - ServiceSpec / ReadinessCheck serialization round-trips | |
| - generate_service_specs() with compose input (tier-1 and tier-3 services) | |
| - generate_service_specs() with topology fallback (no compose) | |
| - Backward compatibility: SnapshotSpec without services field | |
| - Unknown images produce no specs (graceful skip) | |
| - Environment service lifecycle integration | |
| - Renderer generates services field in snapshot | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import tempfile | |
| from pathlib import Path | |
| from unittest.mock import MagicMock, patch | |
| import pytest | |
| from open_range.builder.service_manifest import ( | |
| _HOST_NAME_HINTS, | |
| _IMAGE_SERVICE_HINTS, | |
| _match_image_hint, | |
| generate_service_specs, | |
| ) | |
| from open_range.protocols import ( | |
| ReadinessCheck, | |
| ServiceSpec, | |
| SnapshotSpec, | |
| TaskSpec, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # ServiceSpec / ReadinessCheck serialization | |
| # --------------------------------------------------------------------------- | |
| class TestReadinessCheck: | |
| """ReadinessCheck model basics and serialization.""" | |
| def test_defaults(self): | |
| rc = ReadinessCheck() | |
| assert rc.type == "tcp" | |
| assert rc.port == 0 | |
| assert rc.url == "" | |
| assert rc.command == "" | |
| assert rc.timeout_s == 30 | |
| assert rc.interval_s == 1.0 | |
| def test_tcp_check(self): | |
| rc = ReadinessCheck(type="tcp", port=80, timeout_s=10) | |
| assert rc.type == "tcp" | |
| assert rc.port == 80 | |
| def test_http_check(self): | |
| rc = ReadinessCheck(type="http", url="http://localhost:8080/health") | |
| assert rc.type == "http" | |
| assert rc.url == "http://localhost:8080/health" | |
| def test_command_check(self): | |
| rc = ReadinessCheck(type="command", command="pgrep -x nginx") | |
| assert rc.type == "command" | |
| assert rc.command == "pgrep -x nginx" | |
| def test_roundtrip_json(self): | |
| rc = ReadinessCheck(type="http", url="http://localhost:9090", timeout_s=15) | |
| data = rc.model_dump() | |
| rc2 = ReadinessCheck(**data) | |
| assert rc2.type == rc.type | |
| assert rc2.url == rc.url | |
| assert rc2.timeout_s == rc.timeout_s | |
| class TestServiceSpec: | |
| """ServiceSpec model basics and serialization.""" | |
| def test_required_fields(self): | |
| svc = ServiceSpec(host="web", daemon="nginx", start_command="nginx &") | |
| assert svc.host == "web" | |
| assert svc.daemon == "nginx" | |
| assert svc.start_command == "nginx &" | |
| def test_defaults(self): | |
| svc = ServiceSpec(host="web", daemon="nginx", start_command="nginx &") | |
| assert svc.packages == [] | |
| assert svc.init_commands == [] | |
| assert svc.env_vars == {} | |
| assert svc.log_dir == "" | |
| assert isinstance(svc.readiness, ReadinessCheck) | |
| def test_full_spec(self): | |
| svc = ServiceSpec( | |
| host="db", | |
| daemon="mysqld", | |
| packages=["default-mysql-server"], | |
| init_commands=["mkdir -p /var/run/mysqld"], | |
| start_command="mysqld --user=mysql &", | |
| readiness=ReadinessCheck( | |
| type="command", | |
| command="mysqladmin ping", | |
| timeout_s=30, | |
| ), | |
| log_dir="/var/log/siem", | |
| env_vars={"MYSQL_ROOT_PASSWORD": "secret"}, | |
| ) | |
| assert svc.daemon == "mysqld" | |
| assert len(svc.init_commands) == 1 | |
| assert svc.readiness.type == "command" | |
| assert svc.env_vars["MYSQL_ROOT_PASSWORD"] == "secret" | |
| def test_roundtrip_json(self): | |
| svc = ServiceSpec( | |
| host="web", | |
| daemon="nginx", | |
| packages=["nginx"], | |
| init_commands=["mkdir -p /var/log/nginx"], | |
| start_command="nginx -g 'daemon off;' &", | |
| readiness=ReadinessCheck(type="tcp", port=80), | |
| log_dir="/var/log/siem", | |
| env_vars={"SERVER_NAME": "web.corp.local"}, | |
| ) | |
| data = json.loads(svc.model_dump_json()) | |
| svc2 = ServiceSpec(**data) | |
| assert svc2.host == svc.host | |
| assert svc2.daemon == svc.daemon | |
| assert svc2.packages == svc.packages | |
| assert svc2.readiness.port == 80 | |
| assert svc2.env_vars == svc.env_vars | |
| # --------------------------------------------------------------------------- | |
| # SnapshotSpec backward compatibility | |
| # --------------------------------------------------------------------------- | |
| class TestSnapshotSpecServices: | |
| """SnapshotSpec.services field: default and serialization.""" | |
| def test_default_empty(self): | |
| spec = SnapshotSpec() | |
| assert spec.services == [] | |
| def test_with_services(self): | |
| spec = SnapshotSpec( | |
| topology={"hosts": ["web"]}, | |
| services=[ | |
| ServiceSpec(host="web", daemon="nginx", start_command="nginx &"), | |
| ], | |
| ) | |
| assert len(spec.services) == 1 | |
| assert spec.services[0].daemon == "nginx" | |
| def test_roundtrip_preserves_services(self): | |
| svc = ServiceSpec( | |
| host="db", | |
| daemon="mysqld", | |
| start_command="mysqld &", | |
| readiness=ReadinessCheck(type="tcp", port=3306), | |
| ) | |
| spec = SnapshotSpec( | |
| topology={"hosts": ["db"]}, | |
| services=[svc], | |
| ) | |
| data = json.loads(spec.model_dump_json()) | |
| spec2 = SnapshotSpec(**data) | |
| assert len(spec2.services) == 1 | |
| assert spec2.services[0].daemon == "mysqld" | |
| assert spec2.services[0].readiness.port == 3306 | |
| def test_old_snapshot_without_services_parses(self): | |
| """Simulate loading a JSON snapshot that predates the services field.""" | |
| old_data = { | |
| "topology": {"hosts": ["web", "db"]}, | |
| "flags": [], | |
| "golden_path": [], | |
| } | |
| spec = SnapshotSpec(**old_data) | |
| assert spec.services == [] | |
| # --------------------------------------------------------------------------- | |
| # generate_service_specs() — compose input | |
| # --------------------------------------------------------------------------- | |
| class TestGenerateFromCompose: | |
| """generate_service_specs() with compose services dict.""" | |
| def test_tier1_basic_compose(self): | |
| """Tier 1 compose with common services maps correctly.""" | |
| compose = { | |
| "services": { | |
| "web": {"image": "nginx:1.25"}, | |
| "db": {"image": "mysql:8.0"}, | |
| "ldap": {"image": "osixia/openldap:1.5"}, | |
| "siem": {"image": "rsyslog:latest"}, | |
| "files": {"image": "samba:latest"}, | |
| "mail": {"image": "postfix:latest"}, | |
| "attacker": {"image": "kali:latest"}, | |
| } | |
| } | |
| topology = {"hosts": ["attacker", "web", "db", "ldap", "siem", "files", "mail"]} | |
| specs = generate_service_specs(compose, topology) | |
| daemon_names = {s.daemon for s in specs} | |
| assert "nginx" in daemon_names | |
| assert "mysqld" in daemon_names | |
| assert "slapd" in daemon_names | |
| assert "rsyslogd" in daemon_names | |
| assert "smbd" in daemon_names | |
| assert "master" in daemon_names # postfix | |
| def test_tier3_compose_with_extra_services(self): | |
| """Tier 3 compose with redis, postgres, jenkins.""" | |
| compose = { | |
| "services": { | |
| "web": {"image": "nginx:1.25"}, | |
| "cache": {"image": "redis:7"}, | |
| "db": {"image": "postgres:16"}, | |
| "ci_cd": {"image": "jenkins/jenkins:lts"}, | |
| "monitoring": {"image": "prometheus:latest"}, | |
| } | |
| } | |
| topology = {"hosts": ["web", "cache", "db", "ci_cd", "monitoring"]} | |
| specs = generate_service_specs(compose, topology) | |
| daemon_names = {s.daemon for s in specs} | |
| assert "nginx" in daemon_names | |
| assert "redis-server" in daemon_names | |
| assert "postgres" in daemon_names | |
| assert "java" in daemon_names # jenkins | |
| assert "prometheus" in daemon_names | |
| def test_unknown_image_skipped(self): | |
| """Custom images with no hint produce no specs.""" | |
| compose = { | |
| "services": { | |
| "custom_app": {"image": "mycompany/custom-app:1.0"}, | |
| "web": {"image": "nginx:1.25"}, | |
| } | |
| } | |
| specs = generate_service_specs(compose, {"hosts": []}) | |
| assert len(specs) == 1 | |
| assert specs[0].daemon == "nginx" | |
| def test_empty_compose(self): | |
| """Empty compose falls through to topology.""" | |
| specs = generate_service_specs({}, {"hosts": ["web", "db"]}) | |
| daemon_names = {s.daemon for s in specs} | |
| assert "nginx" in daemon_names | |
| assert "mysqld" in daemon_names | |
| def test_compose_env_vars_extracted(self): | |
| """Environment variables from compose are passed to ServiceSpec.""" | |
| compose = { | |
| "services": { | |
| "db": { | |
| "image": "mysql:8.0", | |
| "environment": {"MYSQL_ROOT_PASSWORD": "secret"}, | |
| }, | |
| } | |
| } | |
| specs = generate_service_specs(compose, {"hosts": []}) | |
| assert len(specs) == 1 | |
| assert specs[0].env_vars.get("MYSQL_ROOT_PASSWORD") == "secret" | |
| def test_compose_env_vars_list_form(self): | |
| """Environment in list form (KEY=VALUE) is handled.""" | |
| compose = { | |
| "services": { | |
| "db": { | |
| "image": "mysql:8.0", | |
| "environment": ["MYSQL_ROOT_PASSWORD=secret", "MYSQL_DATABASE=app"], | |
| }, | |
| } | |
| } | |
| specs = generate_service_specs(compose, {"hosts": []}) | |
| assert specs[0].env_vars["MYSQL_ROOT_PASSWORD"] == "secret" | |
| assert specs[0].env_vars["MYSQL_DATABASE"] == "app" | |
| def test_repeated_daemons_on_different_hosts_are_preserved(self): | |
| """Two hosts may intentionally run the same daemon family.""" | |
| compose = { | |
| "services": { | |
| "siem": {"image": "rsyslog:latest"}, | |
| "firewall": {"image": "rsyslog:latest"}, | |
| } | |
| } | |
| specs = generate_service_specs(compose, {"hosts": []}) | |
| assert len(specs) == 2 | |
| assert {spec.host for spec in specs} == {"siem", "firewall"} | |
| assert all(spec.daemon == "rsyslogd" for spec in specs) | |
| def test_same_daemon_across_multiple_web_hosts(self): | |
| compose = { | |
| "services": { | |
| "web1": {"image": "nginx:1.25"}, | |
| "web2": {"image": "nginx:1.25"}, | |
| } | |
| } | |
| specs = generate_service_specs(compose, {"hosts": ["web1", "web2"]}) | |
| assert len(specs) == 2 | |
| assert {spec.host for spec in specs} == {"web1", "web2"} | |
| assert all(spec.daemon == "nginx" for spec in specs) | |
| # --------------------------------------------------------------------------- | |
| # generate_service_specs() — topology fallback | |
| # --------------------------------------------------------------------------- | |
| class TestGenerateFromTopology: | |
| """generate_service_specs() falls back to topology when compose is empty.""" | |
| def test_basic_topology_hosts(self): | |
| topology = { | |
| "hosts": ["attacker", "web", "db", "ldap", "siem", "files", "mail"], | |
| } | |
| specs = generate_service_specs({}, topology) | |
| daemon_names = {s.daemon for s in specs} | |
| assert "nginx" in daemon_names | |
| assert "mysqld" in daemon_names | |
| assert "slapd" in daemon_names | |
| assert "rsyslogd" in daemon_names | |
| assert "smbd" in daemon_names | |
| assert "master" in daemon_names | |
| def test_unknown_host_skipped(self): | |
| topology = {"hosts": ["attacker", "custom_box"]} | |
| specs = generate_service_specs({}, topology) | |
| assert len(specs) == 0 | |
| def test_dict_hosts(self): | |
| """Hosts as dicts with 'name' key.""" | |
| topology = { | |
| "hosts": [ | |
| {"name": "web", "zone": "dmz"}, | |
| {"name": "db", "zone": "internal"}, | |
| ], | |
| } | |
| specs = generate_service_specs({}, topology) | |
| daemon_names = {s.daemon for s in specs} | |
| assert "nginx" in daemon_names | |
| assert "mysqld" in daemon_names | |
| def test_empty_topology(self): | |
| specs = generate_service_specs({}, {}) | |
| assert specs == [] | |
| # --------------------------------------------------------------------------- | |
| # _match_image_hint internals | |
| # --------------------------------------------------------------------------- | |
| class TestMatchImageHint: | |
| """_match_image_hint matches Docker image strings to hint entries.""" | |
| def test_exact_match(self): | |
| hint = _match_image_hint("nginx") | |
| assert hint is not None | |
| assert hint[0] == "nginx" | |
| def test_tagged_image(self): | |
| hint = _match_image_hint("mysql:8.0") | |
| assert hint is not None | |
| assert hint[0] == "mysqld" | |
| def test_namespaced_image(self): | |
| hint = _match_image_hint("osixia/openldap:1.5") | |
| assert hint is not None | |
| assert hint[0] == "slapd" | |
| def test_basename_fallback(self): | |
| """bitnami/redis:7 should match via basename 'redis'.""" | |
| hint = _match_image_hint("bitnami/redis:7") | |
| assert hint is not None | |
| assert hint[0] == "redis-server" | |
| def test_unknown_image(self): | |
| hint = _match_image_hint("mycompany/custom-service:v2") | |
| assert hint is None | |
| def test_empty_image(self): | |
| hint = _match_image_hint("") | |
| assert hint is None | |
| # --------------------------------------------------------------------------- | |
| # Environment integration: service lifecycle methods | |
| # --------------------------------------------------------------------------- | |
| class TestEnvironmentServiceLifecycle: | |
| """RangeEnvironment service lifecycle methods.""" | |
| def test_start_snapshot_services_noop_in_docker_mode(self): | |
| """_start_snapshot_services is a no-op when execution_mode != subprocess.""" | |
| from open_range.server.environment import RangeEnvironment | |
| env = RangeEnvironment(docker_available=False) | |
| # execution_mode defaults to "docker" when docker_available=False (mock) | |
| snapshot = SnapshotSpec( | |
| topology={"hosts": ["web"]}, | |
| services=[ServiceSpec(host="web", daemon="nginx", start_command="nginx &")], | |
| ) | |
| # Should not raise or attempt to start anything | |
| env._start_snapshot_services(snapshot) | |
| def test_start_snapshot_services_subprocess_mode(self, mock_run, mock_popen): | |
| """_start_snapshot_services starts declared services in subprocess mode.""" | |
| from open_range.server.environment import RangeEnvironment | |
| # Mock Popen to return an object with a wait() method | |
| mock_proc = MagicMock() | |
| mock_popen.return_value = mock_proc | |
| env = RangeEnvironment(docker_available=False, execution_mode="subprocess") | |
| snapshot = SnapshotSpec( | |
| topology={"hosts": ["web"]}, | |
| services=[ | |
| ServiceSpec( | |
| host="web", | |
| daemon="nginx", | |
| init_commands=["mkdir -p /var/log/nginx"], | |
| start_command="nginx &", | |
| readiness=ReadinessCheck(type="tcp", port=80, timeout_s=0), | |
| ), | |
| ], | |
| ) | |
| env._start_snapshot_services(snapshot) | |
| # Init commands use subprocess.run, daemon start uses Popen | |
| assert mock_run.call_count >= 1 # init command | |
| assert mock_popen.call_count >= 1 # daemon start | |
| def test_start_services_empty_skips(self): | |
| """When no services are declared, logs and skips provisioning.""" | |
| from open_range.server.environment import RangeEnvironment | |
| env = RangeEnvironment(docker_available=False, execution_mode="subprocess") | |
| snapshot = SnapshotSpec( | |
| topology={"hosts": ["web", "db"]}, | |
| services=[], # empty | |
| ) | |
| # Should not raise — just logs and returns | |
| env._start_snapshot_services(snapshot) | |
| def test_stop_services_uses_snapshot_daemons(self, mock_run): | |
| """_stop_services uses daemon names from snapshot.services.""" | |
| from open_range.server.environment import RangeEnvironment | |
| env = RangeEnvironment(docker_available=False, execution_mode="subprocess") | |
| env._snapshot = SnapshotSpec( | |
| topology={"hosts": ["web"]}, | |
| services=[ | |
| ServiceSpec(host="web", daemon="nginx", start_command="nginx &"), | |
| ServiceSpec(host="db", daemon="mysqld", start_command="mysqld &"), | |
| ], | |
| ) | |
| env._stop_services() | |
| # Should have called pkill for each daemon (either individually or via bash -c) | |
| all_call_strs = [] | |
| for call in mock_run.call_args_list: | |
| args = call[0][0] if call[0] else call.kwargs.get("args", []) | |
| all_call_strs.append(" ".join(str(a) for a in args)) | |
| combined = " ".join(all_call_strs) | |
| assert "nginx" in combined | |
| assert "mysqld" in combined | |
| def test_stop_services_no_services_skips_pkill(self): | |
| """_stop_services skips pkill when snapshot has no services.""" | |
| from open_range.server.environment import RangeEnvironment | |
| env = RangeEnvironment(docker_available=False, execution_mode="subprocess") | |
| env._snapshot = SnapshotSpec(topology={"hosts": ["web"]}) | |
| # Should not raise — just skips pkill since no service specs | |
| env._stop_services() | |
| def test_stop_services_no_snapshot(self): | |
| """_stop_services handles None snapshot gracefully.""" | |
| from open_range.server.environment import RangeEnvironment | |
| env = RangeEnvironment(docker_available=False, execution_mode="subprocess") | |
| env._snapshot = None | |
| # Should not raise | |
| env._stop_services() | |
| def test_probe_readiness_tcp_unreachable(self): | |
| """TCP probe returns False for unreachable port.""" | |
| from open_range.server.environment import RangeEnvironment | |
| check = ReadinessCheck(type="tcp", port=19999) | |
| assert RangeEnvironment._probe_readiness(check) is False | |
| def test_probe_readiness_command_success(self): | |
| """Command probe returns True for 'true' command.""" | |
| from open_range.server.environment import RangeEnvironment | |
| check = ReadinessCheck(type="command", command="true") | |
| assert RangeEnvironment._probe_readiness(check) is True | |
| def test_probe_readiness_command_failure(self): | |
| """Command probe returns False for 'false' command.""" | |
| from open_range.server.environment import RangeEnvironment | |
| check = ReadinessCheck(type="command", command="false") | |
| assert RangeEnvironment._probe_readiness(check) is False | |
| def test_reset_calls_service_lifecycle(self): | |
| """reset() calls _stop_services and _start_snapshot_services.""" | |
| from open_range.server.environment import RangeEnvironment | |
| env = RangeEnvironment(docker_available=False) | |
| stop_called = [] | |
| start_called = [] | |
| env._stop_services = lambda: stop_called.append(True) # type: ignore | |
| env._start_snapshot_services = lambda s: start_called.append(s) # type: ignore | |
| snapshot = SnapshotSpec( | |
| topology={"hosts": ["attacker", "web"]}, | |
| task=TaskSpec(red_briefing="Test.", blue_briefing="Test."), | |
| ) | |
| env.reset(snapshot=snapshot) | |
| assert len(stop_called) == 1 | |
| assert len(start_called) == 1 | |
| # --------------------------------------------------------------------------- | |
| # Renderer generates services in snapshot | |
| # --------------------------------------------------------------------------- | |
| class TestRendererServiceGeneration: | |
| """SnapshotRenderer._build_service_specs() populates spec.services.""" | |
| def test_renderer_populates_services_from_topology(self): | |
| from open_range.builder.renderer import SnapshotRenderer | |
| renderer = SnapshotRenderer() | |
| spec = SnapshotSpec( | |
| topology={ | |
| "hosts": ["web", "db", "ldap"], | |
| "zones": {"dmz": ["web"], "internal": ["db", "ldap"]}, | |
| "users": [], | |
| "firewall_rules": [], | |
| }, | |
| ) | |
| with tempfile.TemporaryDirectory() as tmpdir: | |
| renderer.render(spec, Path(tmpdir) / "out") | |
| # After rendering, services should be populated | |
| assert len(spec.services) >= 2 | |
| daemon_names = {s.daemon for s in spec.services} | |
| assert "nginx" in daemon_names | |
| assert "mysqld" in daemon_names | |
| def test_renderer_skips_if_services_already_present(self): | |
| from open_range.builder.renderer import SnapshotRenderer | |
| renderer = SnapshotRenderer() | |
| existing_svc = ServiceSpec(host="web", daemon="nginx", start_command="nginx &") | |
| spec = SnapshotSpec( | |
| topology={ | |
| "hosts": ["web", "db"], | |
| "zones": {"dmz": ["web"], "internal": ["db"]}, | |
| "users": [], | |
| "firewall_rules": [], | |
| }, | |
| services=[existing_svc], | |
| ) | |
| with tempfile.TemporaryDirectory() as tmpdir: | |
| renderer.render(spec, Path(tmpdir) / "out") | |
| # Should not have overwritten — still just the one we provided | |
| assert len(spec.services) == 1 | |
| assert spec.services[0].daemon == "nginx" | |
| # --------------------------------------------------------------------------- | |
| # Hint table coverage | |
| # --------------------------------------------------------------------------- | |
| class TestHintTableCoverage: | |
| """All image hints produce valid ServiceSpec entries.""" | |
| def test_hint_produces_valid_spec(self, image_key): | |
| """Each entry in the hint table produces a valid ServiceSpec.""" | |
| compose = {"services": {"svc": {"image": image_key}}} | |
| specs = generate_service_specs(compose, {"hosts": []}) | |
| assert len(specs) == 1 | |
| svc = specs[0] | |
| assert svc.daemon | |
| assert svc.start_command | |
| assert isinstance(svc.readiness, ReadinessCheck) | |
| def test_host_hint_produces_valid_spec(self, host_name): | |
| """Each entry in the host-name hint table produces a valid ServiceSpec.""" | |
| specs = generate_service_specs({}, {"hosts": [host_name]}) | |
| assert len(specs) >= 1 | |
| svc = specs[0] | |
| assert svc.daemon | |
| assert svc.start_command | |