Spaces:
Runtime error
Runtime error
| """Tests for RangeEnvironment lifecycle — all run without Docker.""" | |
| from unittest.mock import patch | |
| import pytest | |
| from open_range.protocols import ( | |
| FlagSpec, | |
| GoldenPathStep, | |
| SnapshotSpec, | |
| TaskSpec, | |
| TruthGraph, | |
| Vulnerability, | |
| ) | |
| from open_range.server.environment import RangeEnvironment, _extract_command_name | |
| from open_range.server.models import RangeAction, RangeObservation, RangeState | |
| # Minimal snapshot for tests that just need reset() to work | |
| _MINIMAL_SNAPSHOT = SnapshotSpec( | |
| topology={"hosts": ["attacker", "siem"]}, | |
| flags=[], | |
| golden_path=[], | |
| task=TaskSpec(red_briefing="Test mode.", blue_briefing="Test mode."), | |
| ) | |
| class TestCommandExtraction: | |
| """Helper: extracting base command name from full command strings.""" | |
| def test_simple(self): | |
| assert _extract_command_name("nmap -sV web") == "nmap" | |
| def test_with_path(self): | |
| assert _extract_command_name("/usr/bin/nmap -sV web") == "nmap" | |
| def test_piped(self): | |
| assert _extract_command_name("curl http://web | grep flag") == "curl" | |
| def test_empty(self): | |
| assert _extract_command_name("") == "" | |
| class TestReset: | |
| """reset() returns a RangeObservation with briefing.""" | |
| def test_reset_returns_observation(self): | |
| env = RangeEnvironment(docker_available=False) | |
| obs = env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| assert isinstance(obs, RangeObservation) | |
| assert "Range ready" in obs.stdout | |
| def test_reset_sets_episode_id(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT, episode_id="ep_42") | |
| assert env.state.episode_id == "ep_42" | |
| def test_reset_clears_step_count(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| env.step(RangeAction(command="nmap -sV web", mode="red")) | |
| assert env.state.step_count == 1 | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| assert env.state.step_count == 0 | |
| def test_reset_with_snapshot(self, sample_snapshot_spec): | |
| env = RangeEnvironment(docker_available=False) | |
| obs = env.reset(snapshot=sample_snapshot_spec) | |
| assert isinstance(obs, RangeObservation) | |
| assert env.snapshot is not None | |
| def test_reset_initializes_services_status_from_topology_hosts(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| # In mock mode service health is unknown, but hosts should be tracked. | |
| assert set(env.state.services_status.keys()) == {"attacker", "siem"} | |
| def test_auto_without_docker_uses_mock_docker_mode(self): | |
| def fake_get_docker(self): | |
| self._docker_available = False | |
| return None | |
| with patch.object(RangeEnvironment, "_get_docker", fake_get_docker): | |
| env = RangeEnvironment(docker_available=None, execution_mode="auto") | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| obs = env.step(RangeAction(command="printf test", mode="red")) | |
| assert env._execution_mode == "docker" | |
| assert obs.stderr == "" | |
| assert "[mock] executed on attacker" in obs.stdout | |
| class TestTargetResolution: | |
| """Target selection should honor manifest-compiled metadata.""" | |
| def test_resolve_target_uses_host_catalog_roles(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset( | |
| snapshot=SnapshotSpec( | |
| topology={ | |
| "hosts": ["web", "kali1", "socbox"], | |
| "host_catalog": { | |
| "web": {"role": "web", "zone": "dmz"}, | |
| "kali1": {"role": "attacker", "zone": "external"}, | |
| "socbox": {"role": "siem", "zone": "management"}, | |
| }, | |
| }, | |
| task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."), | |
| ) | |
| ) | |
| assert env._resolve_target(RangeAction(command="id", mode="red")) == "kali1" | |
| assert env._resolve_target(RangeAction(command="id", mode="blue")) == "socbox" | |
| def test_resolve_target_uses_zone_mapping_for_string_hosts(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset( | |
| snapshot=SnapshotSpec( | |
| topology={ | |
| "hosts": ["web", "kali1", "socbox"], | |
| "zones": { | |
| "dmz": ["web"], | |
| "external": ["kali1"], | |
| "management": ["socbox"], | |
| }, | |
| }, | |
| task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."), | |
| ) | |
| ) | |
| assert env._resolve_target(RangeAction(command="id", mode="red")) == "kali1" | |
| assert env._resolve_target(RangeAction(command="id", mode="blue")) == "socbox" | |
| class TestRedStep: | |
| """Red agent actions.""" | |
| def test_red_step_returns_observation(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| action = RangeAction(command="nmap -sV web", mode="red") | |
| obs = env.step(action) | |
| assert isinstance(obs, RangeObservation) | |
| def test_red_step_increments_counter(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| env.step(RangeAction(command="nmap -sV web", mode="red")) | |
| assert env.state.step_count == 1 | |
| env.step(RangeAction(command="curl http://web", mode="red")) | |
| assert env.state.step_count == 2 | |
| def test_red_any_command_forwarded(self): | |
| """No artificial allowlist — commands route to the attacker container.""" | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| obs = env.step(RangeAction(command="iptables -L", mode="red")) | |
| # In mock mode, this runs on attacker container (not rejected) | |
| assert obs.stderr == "" | |
| assert "attacker" in obs.stdout # mock output includes container name | |
| def test_red_action_logged(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| env.step(RangeAction(command="nmap -sV web", mode="red")) | |
| assert len(env.red_history) >= 1 | |
| class TestBlueStep: | |
| """Blue agent actions.""" | |
| def test_blue_step_returns_observation(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| obs = env.step(RangeAction(command="tail_log /var/log/syslog", mode="blue")) | |
| assert isinstance(obs, RangeObservation) | |
| def test_blue_submit_finding(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| obs = env.step(RangeAction(command="submit_finding SQL injection detected", mode="blue")) | |
| assert "recorded" in obs.stdout.lower() or "submitted" in obs.stdout.lower() | |
| def test_blue_submit_finding_marks_grounded_when_matching_red_activity(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| env.step(RangeAction(command="nmap -sV web", mode="red")) | |
| env.step(RangeAction(command="submit_finding nmap scan detected on web host", mode="blue")) | |
| assert env.blue_history | |
| assert env.blue_history[-1].get("grounded") is True | |
| def test_blue_submit_finding_marks_ungrounded_for_generic_claim(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| env.step(RangeAction(command="nmap -sV web", mode="red")) | |
| env.step(RangeAction(command="submit_finding attack found", mode="blue")) | |
| assert env.blue_history | |
| assert env.blue_history[-1].get("grounded") is False | |
| def test_blue_any_command_forwarded(self): | |
| """No artificial allowlist — commands route to the siem container.""" | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| obs = env.step(RangeAction(command="nmap -sV web", mode="blue")) | |
| # In mock mode, this runs on siem container (not rejected) | |
| assert obs.stderr == "" | |
| assert "siem" in obs.stdout # mock output includes container name | |
| def test_blue_empty_command_rejected(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| obs = env.step(RangeAction(command="", mode="blue")) | |
| assert obs.stderr != "" | |
| def test_blue_alerts_fall_back_to_synthetic_red_history(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| env.step(RangeAction(command="nmap -sV web", mode="red")) | |
| obs = env.step(RangeAction(command="tail -n 50 /var/log/siem/all.log", mode="blue")) | |
| assert obs.alerts | |
| assert any("synthetic" in alert.lower() for alert in obs.alerts) | |
| def test_step_passes_timeout_override_to_executor(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| seen = {} | |
| def fake_exec(container_name, command, timeout_s=None): | |
| seen["container_name"] = container_name | |
| seen["command"] = command | |
| seen["timeout_s"] = timeout_s | |
| return "ok", "" | |
| env._exec_in_container = fake_exec # type: ignore[method-assign] | |
| env.step(RangeAction(command="nmap -sV web", mode="red"), timeout_s=7.5) | |
| assert seen["timeout_s"] == 7.5 | |
| class TestFlagSubmission: | |
| """Flag submission triggers correct rewards.""" | |
| def test_correct_flag_accepted(self): | |
| env = RangeEnvironment(docker_available=False) | |
| snapshot = SnapshotSpec( | |
| topology={"hosts": ["attacker", "web"]}, | |
| flags=[FlagSpec(id="f1", value="FLAG{correct}", path="/f.txt", host="web")], | |
| golden_path=[], | |
| task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."), | |
| ) | |
| env.reset(snapshot=snapshot) | |
| obs = env.step(RangeAction(command="submit_flag FLAG{correct}", mode="red")) | |
| assert "FLAG{correct}" in env.state.flags_found | |
| assert "Correct" in obs.stdout or "correct" in obs.stdout.lower() | |
| def test_hallucinated_flag_rejected(self): | |
| env = RangeEnvironment(docker_available=False) | |
| snapshot = SnapshotSpec( | |
| topology={"hosts": ["attacker", "web"]}, | |
| flags=[FlagSpec(id="f1", value="FLAG{real}", path="/f.txt", host="web")], | |
| golden_path=[], | |
| task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."), | |
| ) | |
| env.reset(snapshot=snapshot) | |
| obs = env.step(RangeAction(command="submit_flag FLAG{fake}", mode="red")) | |
| assert "FLAG{fake}" not in env.state.flags_found | |
| assert "invalid" in obs.stdout.lower() or "Invalid" in obs.stdout | |
| def test_all_flags_ends_episode(self): | |
| env = RangeEnvironment(docker_available=False) | |
| snapshot = SnapshotSpec( | |
| topology={"hosts": ["attacker", "web"]}, | |
| flags=[FlagSpec(id="f1", value="FLAG{a}", path="/f.txt", host="web")], | |
| golden_path=[], | |
| task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."), | |
| ) | |
| env.reset(snapshot=snapshot) | |
| obs = env.step(RangeAction(command="submit_flag FLAG{a}", mode="red")) | |
| assert obs.done is True | |
| class TestTermination: | |
| """Environment terminates at max_steps.""" | |
| def test_max_steps_terminates(self): | |
| env = RangeEnvironment(docker_available=False, max_steps=3) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT) | |
| env.step(RangeAction(command="nmap -sV web", mode="red")) | |
| env.step(RangeAction(command="curl http://web", mode="red")) | |
| obs = env.step(RangeAction(command="curl http://web/login", mode="red")) | |
| assert obs.done is True | |
| class TestStateProperty: | |
| """state property returns current RangeState.""" | |
| def test_state_reflects_episode(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_MINIMAL_SNAPSHOT, episode_id="test_ep") | |
| assert env.state.episode_id == "test_ep" | |
| assert env.state.step_count == 0 | |
| env.step(RangeAction(command="nmap -sV web", mode="red")) | |
| assert env.state.step_count == 1 | |
| # ------------------------------------------------------------------- | |
| # Task engine (#17) | |
| # ------------------------------------------------------------------- | |
| def _make_multistep_snapshot(): | |
| """Helper: snapshot with a multi_step task and milestones.""" | |
| return SnapshotSpec( | |
| topology={ | |
| "hosts": ["attacker", "web", "db"], | |
| "users": [ | |
| {"username": "admin", "password": "admin123", "hosts": ["web", "db"]}, | |
| ], | |
| }, | |
| flags=[FlagSpec(id="f1", value="FLAG{ms}", path="/f.txt", host="db")], | |
| golden_path=[], | |
| task=TaskSpec( | |
| red_briefing="Multi-step challenge.", | |
| blue_briefing="Watch.", | |
| task_type="multi_step", | |
| milestones=["port scan complete", "credentials found", "database accessed"], | |
| ), | |
| ) | |
| class TestTaskEngine: | |
| """Milestone checking for multi_step tasks (#17).""" | |
| def test_milestone_detected_in_output(self): | |
| env = RangeEnvironment(docker_available=False) | |
| snapshot = _make_multistep_snapshot() | |
| env.reset(snapshot=snapshot) | |
| # Mock mode returns "[mock] executed on attacker: ..." which won't match. | |
| # We need to check that _check_milestone works with the right output. | |
| ms = env._check_milestone("Port scan complete -- found open ports") | |
| assert ms == "port scan complete" | |
| def test_milestone_not_duplicated(self): | |
| env = RangeEnvironment(docker_available=False) | |
| snapshot = _make_multistep_snapshot() | |
| env.reset(snapshot=snapshot) | |
| # Simulate first milestone completion | |
| env._state.milestones_completed.append("port scan complete") | |
| ms = env._check_milestone("Port scan complete again") | |
| assert ms is None # Already completed | |
| def test_milestone_returns_none_for_exploit_task(self): | |
| env = RangeEnvironment(docker_available=False) | |
| snapshot = SnapshotSpec( | |
| topology={"hosts": ["attacker", "web"]}, | |
| flags=[], | |
| golden_path=[], | |
| task=TaskSpec(red_briefing="Go.", blue_briefing="Watch.", task_type="exploit"), | |
| ) | |
| env.reset(snapshot=snapshot) | |
| ms = env._check_milestone("anything here") | |
| assert ms is None | |
| def test_milestone_returns_none_for_no_match(self): | |
| env = RangeEnvironment(docker_available=False) | |
| snapshot = _make_multistep_snapshot() | |
| env.reset(snapshot=snapshot) | |
| ms = env._check_milestone("nothing relevant here") | |
| assert ms is None | |
| def test_milestones_tracked_in_state(self): | |
| env = RangeEnvironment(docker_available=False) | |
| snapshot = _make_multistep_snapshot() | |
| env.reset(snapshot=snapshot) | |
| assert env.state.milestones_completed == [] | |
| # Manually add a milestone (simulating what step() does) | |
| env._state.milestones_completed.append("port scan complete") | |
| assert env.state.milestones_completed == ["port scan complete"] | |
| def test_task_type_field_on_task_spec(self): | |
| ts = TaskSpec(task_type="multi_step", milestones=["a", "b"]) | |
| assert ts.task_type == "multi_step" | |
| assert ts.milestones == ["a", "b"] | |
| def test_success_conditions_on_task_spec(self): | |
| ts = TaskSpec( | |
| success_conditions=[ | |
| {"type": "flag", "value": "FLAG{x}"}, | |
| {"type": "endpoint", "url": "/api/data", "expect": "secret"}, | |
| ], | |
| ) | |
| assert len(ts.success_conditions) == 2 | |
| assert ts.success_conditions[0]["type"] == "flag" | |
| # ------------------------------------------------------------------- | |
| # Auth scenario (#25) | |
| # ------------------------------------------------------------------- | |
| def _make_auth_snapshot(): | |
| """Helper: snapshot with users for auth testing.""" | |
| return SnapshotSpec( | |
| topology={ | |
| "hosts": ["attacker", "web", "db"], | |
| "users": [ | |
| {"username": "admin", "password": "admin123", "hosts": ["web", "db"]}, | |
| {"username": "guest", "password": "guest", "hosts": ["web"]}, | |
| ], | |
| }, | |
| flags=[FlagSpec(id="f1", value="FLAG{auth}", path="/f.txt", host="db")], | |
| golden_path=[], | |
| task=TaskSpec(red_briefing="Auth challenge.", blue_briefing="Watch."), | |
| ) | |
| class TestAuthScenario: | |
| """Auth and logout commands update session tracking (#25).""" | |
| def test_auth_success(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_make_auth_snapshot()) | |
| obs = env.step(RangeAction(command="auth web admin admin123", mode="red")) | |
| assert "Authenticated" in obs.stdout | |
| assert env.state.active_sessions["web"] == "admin" | |
| def test_auth_failure(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_make_auth_snapshot()) | |
| obs = env.step(RangeAction(command="auth web admin wrongpass", mode="red")) | |
| assert "failed" in obs.stderr.lower() | |
| assert "web" not in env.state.active_sessions | |
| def test_auth_wrong_host(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_make_auth_snapshot()) | |
| obs = env.step(RangeAction(command="auth db guest guest", mode="red")) | |
| # guest only has access to web, not db | |
| assert "failed" in obs.stderr.lower() | |
| assert "db" not in env.state.active_sessions | |
| def test_auth_attempt_logged(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_make_auth_snapshot()) | |
| env.step(RangeAction(command="auth web admin admin123", mode="red")) | |
| assert len(env.state.auth_attempts) == 1 | |
| assert env.state.auth_attempts[0]["success"] is True | |
| def test_auth_failure_logged(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_make_auth_snapshot()) | |
| env.step(RangeAction(command="auth web admin wrong", mode="red")) | |
| assert len(env.state.auth_attempts) == 1 | |
| assert env.state.auth_attempts[0]["success"] is False | |
| def test_logout_success(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_make_auth_snapshot()) | |
| env.step(RangeAction(command="auth web admin admin123", mode="red")) | |
| assert "web" in env.state.active_sessions | |
| obs = env.step(RangeAction(command="logout web", mode="red")) | |
| assert "Logged out" in obs.stdout | |
| assert "web" not in env.state.active_sessions | |
| def test_logout_no_session(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_make_auth_snapshot()) | |
| obs = env.step(RangeAction(command="logout web", mode="red")) | |
| assert "No active session" in obs.stderr | |
| def test_auth_missing_args(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_make_auth_snapshot()) | |
| obs = env.step(RangeAction(command="auth web admin", mode="red")) | |
| assert "Usage" in obs.stderr | |
| def test_logout_missing_args(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_make_auth_snapshot()) | |
| obs = env.step(RangeAction(command="logout", mode="red")) | |
| assert "Usage" in obs.stderr | |
| def test_auth_creates_access_grant(self): | |
| env = RangeEnvironment(docker_available=False) | |
| env.reset(snapshot=_make_auth_snapshot()) | |
| env.step(RangeAction(command="auth web admin admin123", mode="red")) | |
| assert "web:shell" in env.state.access_grants | |
| # ------------------------------------------------------------------- | |
| # Pivot mechanics (#26) | |
| # ------------------------------------------------------------------- | |
| class TestPivotMechanics: | |
| """Access grants and pivot tracking (#26).""" | |
| def test_pivot_detected_from_credential_leak(self): | |
| """When command output contains credentials matching the truth graph, | |
| access_grants and pivot_history are updated.""" | |
| env = RangeEnvironment(docker_available=False) | |
| snapshot = SnapshotSpec( | |
| topology={ | |
| "hosts": ["attacker", "web", "db"], | |
| "users": [ | |
| {"username": "dbadmin", "password": "s3cret!", "hosts": ["db"]}, | |
| ], | |
| }, | |
| flags=[], | |
| golden_path=[], | |
| task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."), | |
| ) | |
| env.reset(snapshot=snapshot) | |
| # Simulate checking pivot on command output that contains credentials | |
| env._check_pivot( | |
| RangeAction(command="cat /etc/app/config.ini", mode="red"), | |
| "db_user = dbadmin\ndb_pass = s3cret!\nhost = db", | |
| ) | |
| assert "db:credential" in env.state.access_grants | |
| assert len(env.state.pivot_history) == 1 | |
| assert env.state.pivot_history[0]["to"] == "db" | |
| assert env.state.pivot_history[0]["via"] == "credential_reuse" | |
| def test_no_pivot_without_matching_creds(self): | |
| env = RangeEnvironment(docker_available=False) | |
| snapshot = SnapshotSpec( | |
| topology={ | |
| "hosts": ["attacker", "web"], | |
| "users": [ | |
| {"username": "admin", "password": "secret", "hosts": ["web"]}, | |
| ], | |
| }, | |
| flags=[], | |
| golden_path=[], | |
| task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."), | |
| ) | |
| env.reset(snapshot=snapshot) | |
| env._check_pivot( | |
| RangeAction(command="ls", mode="red"), | |
| "no credentials here", | |
| ) | |
| assert env.state.access_grants == [] | |
| assert env.state.pivot_history == [] | |
| def test_pivot_not_duplicated(self): | |
| env = RangeEnvironment(docker_available=False) | |
| snapshot = SnapshotSpec( | |
| topology={ | |
| "hosts": ["attacker", "web", "db"], | |
| "users": [ | |
| {"username": "admin", "password": "pass", "hosts": ["db"]}, | |
| ], | |
| }, | |
| flags=[], | |
| golden_path=[], | |
| task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."), | |
| ) | |
| env.reset(snapshot=snapshot) | |
| action = RangeAction(command="cat config", mode="red") | |
| env._check_pivot(action, "admin pass db") | |
| env._check_pivot(action, "admin pass db") | |
| # Should only appear once | |
| assert env.state.access_grants.count("db:credential") == 1 | |
| def test_state_has_access_grants_field(self): | |
| state = RangeState() | |
| assert state.access_grants == [] | |
| assert state.pivot_history == [] | |
| def test_state_has_auth_fields(self): | |
| state = RangeState() | |
| assert state.active_sessions == {} | |
| assert state.auth_attempts == [] | |
| assert state.milestones_completed == [] | |