open-range / tests /test_environment.py
Lars Talian
Fix lint packaging and fail closed runtime paths (#93)
6e4e622 unverified
"""Tests for RangeEnvironment lifecycle — all run without Docker."""
from unittest.mock import patch
import pytest
from open_range.protocols import (
FlagSpec,
GoldenPathStep,
SnapshotSpec,
TaskSpec,
TruthGraph,
Vulnerability,
)
from open_range.server.environment import RangeEnvironment, _extract_command_name
from open_range.server.models import RangeAction, RangeObservation, RangeState
# Smallest snapshot used by tests that only need reset() to succeed:
# two hosts, no flags, no golden path, placeholder briefings.
_MINIMAL_SNAPSHOT = SnapshotSpec(
    task=TaskSpec(blue_briefing="Test mode.", red_briefing="Test mode."),
    golden_path=[],
    flags=[],
    topology={"hosts": ["attacker", "siem"]},
)
class TestCommandExtraction:
    """``_extract_command_name`` reduces a full command line to its base command."""

    def test_simple(self):
        """A bare command followed by arguments yields the command itself."""
        extracted = _extract_command_name("nmap -sV web")
        assert extracted == "nmap"

    def test_with_path(self):
        """A leading directory path is dropped from the result."""
        extracted = _extract_command_name("/usr/bin/nmap -sV web")
        assert extracted == "nmap"

    def test_piped(self):
        """Only the first command of a pipeline is reported."""
        extracted = _extract_command_name("curl http://web | grep flag")
        assert extracted == "curl"

    def test_empty(self):
        """An empty command string maps to an empty name."""
        extracted = _extract_command_name("")
        assert extracted == ""
class TestReset:
    """``reset()`` hands back a RangeObservation and (re)initialises episode state."""

    def test_reset_returns_observation(self):
        """The first observation is a RangeObservation whose stdout says the range is ready."""
        environment = RangeEnvironment(docker_available=False)
        first_obs = environment.reset(snapshot=_MINIMAL_SNAPSHOT)
        assert isinstance(first_obs, RangeObservation)
        assert "Range ready" in first_obs.stdout

    def test_reset_sets_episode_id(self):
        """An explicit ``episode_id`` is stored on the state."""
        environment = RangeEnvironment(docker_available=False)
        environment.reset(snapshot=_MINIMAL_SNAPSHOT, episode_id="ep_42")
        assert environment.state.episode_id == "ep_42"

    def test_reset_clears_step_count(self):
        """A second reset brings the step counter back to zero."""
        environment = RangeEnvironment(docker_available=False)
        environment.reset(snapshot=_MINIMAL_SNAPSHOT)
        scan = RangeAction(command="nmap -sV web", mode="red")
        environment.step(scan)
        assert environment.state.step_count == 1

        environment.reset(snapshot=_MINIMAL_SNAPSHOT)
        assert environment.state.step_count == 0

    def test_reset_with_snapshot(self, sample_snapshot_spec):
        """Resetting with the fixture snapshot leaves a snapshot attached to the env."""
        environment = RangeEnvironment(docker_available=False)
        first_obs = environment.reset(snapshot=sample_snapshot_spec)
        assert isinstance(first_obs, RangeObservation)
        assert environment.snapshot is not None

    def test_reset_initializes_services_status_from_topology_hosts(self):
        """Every topology host gets an entry in ``services_status``."""
        environment = RangeEnvironment(docker_available=False)
        environment.reset(snapshot=_MINIMAL_SNAPSHOT)
        # Mock mode cannot know service health; only the host keys are checked.
        tracked_hosts = set(environment.state.services_status.keys())
        assert tracked_hosts == {"attacker", "siem"}

    def test_auto_without_docker_uses_mock_docker_mode(self):
        """With ``_get_docker`` stubbed out, "auto" reports docker mode and mock output."""

        def docker_unavailable(self):
            # Stand-in for _get_docker: flag Docker as missing and return no client.
            self._docker_available = False
            return None

        with patch.object(RangeEnvironment, "_get_docker", docker_unavailable):
            environment = RangeEnvironment(docker_available=None, execution_mode="auto")
            environment.reset(snapshot=_MINIMAL_SNAPSHOT)
            action = RangeAction(command="printf test", mode="red")
            result = environment.step(action)

            assert environment._execution_mode == "docker"
            assert result.stderr == ""
            assert "[mock] executed on attacker" in result.stdout
class TestTargetResolution:
    """Default target per mode is derived from metadata in the snapshot topology."""

    @staticmethod
    def _env_for_topology(topology):
        """Return a Docker-less environment reset onto a snapshot with ``topology``."""
        briefing = TaskSpec(red_briefing="Go.", blue_briefing="Watch.")
        environment = RangeEnvironment(docker_available=False)
        environment.reset(snapshot=SnapshotSpec(topology=topology, task=briefing))
        return environment

    def test_resolve_target_uses_host_catalog_roles(self):
        """Hosts with role "attacker" / "siem" in ``host_catalog`` serve red / blue."""
        catalog = {
            "web": {"role": "web", "zone": "dmz"},
            "kali1": {"role": "attacker", "zone": "external"},
            "socbox": {"role": "siem", "zone": "management"},
        }
        environment = self._env_for_topology(
            {"hosts": ["web", "kali1", "socbox"], "host_catalog": catalog}
        )

        red_target = environment._resolve_target(RangeAction(command="id", mode="red"))
        blue_target = environment._resolve_target(RangeAction(command="id", mode="blue"))
        assert red_target == "kali1"
        assert blue_target == "socbox"

    def test_resolve_target_uses_zone_mapping_for_string_hosts(self):
        """With plain string hosts, the ``zones`` mapping selects the targets."""
        zone_map = {
            "dmz": ["web"],
            "external": ["kali1"],
            "management": ["socbox"],
        }
        environment = self._env_for_topology(
            {"hosts": ["web", "kali1", "socbox"], "zones": zone_map}
        )

        red_target = environment._resolve_target(RangeAction(command="id", mode="red"))
        blue_target = environment._resolve_target(RangeAction(command="id", mode="blue"))
        assert red_target == "kali1"
        assert blue_target == "socbox"
class TestRedStep:
    """Stepping the environment with red-mode actions."""

    @staticmethod
    def _ready_env():
        """Return a Docker-less environment already reset onto the minimal snapshot."""
        environment = RangeEnvironment(docker_available=False)
        environment.reset(snapshot=_MINIMAL_SNAPSHOT)
        return environment

    def test_red_step_returns_observation(self):
        """``step()`` yields a RangeObservation for a red command."""
        environment = self._ready_env()
        result = environment.step(RangeAction(command="nmap -sV web", mode="red"))
        assert isinstance(result, RangeObservation)

    def test_red_step_increments_counter(self):
        """Each red step bumps ``step_count`` by one."""
        environment = self._ready_env()

        environment.step(RangeAction(command="nmap -sV web", mode="red"))
        assert environment.state.step_count == 1

        environment.step(RangeAction(command="curl http://web", mode="red"))
        assert environment.state.step_count == 2

    def test_red_any_command_forwarded(self):
        """Arbitrary commands are not filtered; mock output names the attacker container."""
        environment = self._ready_env()
        result = environment.step(RangeAction(command="iptables -L", mode="red"))
        # An empty stderr means the command was run rather than rejected.
        assert result.stderr == ""
        # The mock executor echoes the container it ran on.
        assert "attacker" in result.stdout

    def test_red_action_logged(self):
        """A red step leaves at least one entry in ``red_history``."""
        environment = self._ready_env()
        environment.step(RangeAction(command="nmap -sV web", mode="red"))
        assert len(environment.red_history) >= 1
class TestBlueStep:
    """Stepping the environment with blue-mode actions (plus one executor-timeout check)."""

    @staticmethod
    def _ready_env():
        """Return a Docker-less environment already reset onto the minimal snapshot."""
        environment = RangeEnvironment(docker_available=False)
        environment.reset(snapshot=_MINIMAL_SNAPSHOT)
        return environment

    def test_blue_step_returns_observation(self):
        """``step()`` yields a RangeObservation for a blue command."""
        environment = self._ready_env()
        action = RangeAction(command="tail_log /var/log/syslog", mode="blue")
        result = environment.step(action)
        assert isinstance(result, RangeObservation)

    def test_blue_submit_finding(self):
        """Submitting a finding is acknowledged on stdout."""
        environment = self._ready_env()
        action = RangeAction(command="submit_finding SQL injection detected", mode="blue")
        acknowledgement = environment.step(action).stdout.lower()
        assert any(word in acknowledgement for word in ("recorded", "submitted"))

    def test_blue_submit_finding_marks_grounded_when_matching_red_activity(self):
        """A finding that mentions prior red activity is recorded as grounded."""
        environment = self._ready_env()
        environment.step(RangeAction(command="nmap -sV web", mode="red"))
        environment.step(
            RangeAction(command="submit_finding nmap scan detected on web host", mode="blue")
        )
        assert environment.blue_history
        latest_entry = environment.blue_history[-1]
        assert latest_entry.get("grounded") is True

    def test_blue_submit_finding_marks_ungrounded_for_generic_claim(self):
        """A vague finding is recorded as not grounded."""
        environment = self._ready_env()
        environment.step(RangeAction(command="nmap -sV web", mode="red"))
        environment.step(RangeAction(command="submit_finding attack found", mode="blue"))
        assert environment.blue_history
        latest_entry = environment.blue_history[-1]
        assert latest_entry.get("grounded") is False

    def test_blue_any_command_forwarded(self):
        """Arbitrary commands are not filtered; mock output names the siem container."""
        environment = self._ready_env()
        result = environment.step(RangeAction(command="nmap -sV web", mode="blue"))
        # An empty stderr means the command was run rather than rejected.
        assert result.stderr == ""
        # The mock executor echoes the container it ran on.
        assert "siem" in result.stdout

    def test_blue_empty_command_rejected(self):
        """An empty blue command produces an error on stderr."""
        environment = self._ready_env()
        result = environment.step(RangeAction(command="", mode="blue"))
        assert result.stderr != ""

    def test_blue_alerts_fall_back_to_synthetic_red_history(self):
        """Reading SIEM logs after red activity surfaces alerts labelled synthetic."""
        environment = self._ready_env()
        environment.step(RangeAction(command="nmap -sV web", mode="red"))
        log_read = RangeAction(command="tail -n 50 /var/log/siem/all.log", mode="blue")
        result = environment.step(log_read)
        assert result.alerts
        lowered_alerts = [alert.lower() for alert in result.alerts]
        assert any("synthetic" in text for text in lowered_alerts)

    def test_step_passes_timeout_override_to_executor(self):
        """``timeout_s`` given to ``step()`` reaches ``_exec_in_container``."""
        environment = self._ready_env()
        captured = {}

        def recording_exec(container_name, command, timeout_s=None):
            # Remember what the environment handed to the executor.
            captured.update(
                container_name=container_name,
                command=command,
                timeout_s=timeout_s,
            )
            return "ok", ""

        environment._exec_in_container = recording_exec  # type: ignore[method-assign]
        environment.step(RangeAction(command="nmap -sV web", mode="red"), timeout_s=7.5)
        assert captured["timeout_s"] == 7.5
class TestFlagSubmission:
    """``submit_flag`` handling: valid flags are recorded, invalid ones rejected.

    Each test runs against a snapshot holding exactly one flag, so a single
    correct submission also captures every flag in the snapshot.
    """

    @staticmethod
    def _env_with_single_flag(flag_value):
        """Return a Docker-less environment reset onto a one-flag snapshot.

        Args:
            flag_value: The secret value of the only flag in the snapshot.

        Returns:
            A ``RangeEnvironment`` on which ``reset()`` has already been called.
        """
        snapshot = SnapshotSpec(
            topology={"hosts": ["attacker", "web"]},
            flags=[FlagSpec(id="f1", value=flag_value, path="/f.txt", host="web")],
            golden_path=[],
            task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."),
        )
        env = RangeEnvironment(docker_available=False)
        env.reset(snapshot=snapshot)
        return env

    def test_correct_flag_accepted(self):
        """A matching flag lands in ``flags_found`` and stdout says it was correct."""
        env = self._env_with_single_flag("FLAG{correct}")
        obs = env.step(RangeAction(command="submit_flag FLAG{correct}", mode="red"))
        assert "FLAG{correct}" in env.state.flags_found
        # The case-insensitive check already covers a capitalised "Correct".
        assert "correct" in obs.stdout.lower()

    def test_hallucinated_flag_rejected(self):
        """A flag that is not in the snapshot is not recorded and is reported invalid."""
        env = self._env_with_single_flag("FLAG{real}")
        obs = env.step(RangeAction(command="submit_flag FLAG{fake}", mode="red"))
        assert "FLAG{fake}" not in env.state.flags_found
        # The case-insensitive check already covers a capitalised "Invalid".
        assert "invalid" in obs.stdout.lower()

    def test_all_flags_ends_episode(self):
        """Submitting the only flag marks the observation as done."""
        env = self._env_with_single_flag("FLAG{a}")
        obs = env.step(RangeAction(command="submit_flag FLAG{a}", mode="red"))
        assert obs.done is True
class TestTermination:
    """The episode is flagged done once ``max_steps`` steps have been taken."""

    def test_max_steps_terminates(self):
        """With ``max_steps=3``, the third step's observation has ``done`` set."""
        environment = RangeEnvironment(docker_available=False, max_steps=3)
        environment.reset(snapshot=_MINIMAL_SNAPSHOT)

        commands = ("nmap -sV web", "curl http://web", "curl http://web/login")
        last_obs = None
        for cmd in commands:
            last_obs = environment.step(RangeAction(command=cmd, mode="red"))

        assert last_obs.done is True
class TestStateProperty:
    """The ``state`` property mirrors the running episode."""

    def test_state_reflects_episode(self):
        """Episode id and step count are visible through ``state`` as they change."""
        environment = RangeEnvironment(docker_available=False)
        environment.reset(snapshot=_MINIMAL_SNAPSHOT, episode_id="test_ep")

        # Straight after reset: id set, no steps taken yet.
        assert environment.state.episode_id == "test_ep"
        assert environment.state.step_count == 0

        scan = RangeAction(command="nmap -sV web", mode="red")
        environment.step(scan)
        assert environment.state.step_count == 1
# -------------------------------------------------------------------
# Task engine (#17)
# -------------------------------------------------------------------
def _make_multistep_snapshot():
    """Build a snapshot whose task is ``multi_step`` with three milestones."""
    admin_user = {"username": "admin", "password": "admin123", "hosts": ["web", "db"]}
    topology = {
        "hosts": ["attacker", "web", "db"],
        "users": [admin_user],
    }
    db_flag = FlagSpec(id="f1", value="FLAG{ms}", path="/f.txt", host="db")
    task = TaskSpec(
        red_briefing="Multi-step challenge.",
        blue_briefing="Watch.",
        task_type="multi_step",
        milestones=["port scan complete", "credentials found", "database accessed"],
    )
    return SnapshotSpec(
        topology=topology,
        flags=[db_flag],
        golden_path=[],
        task=task,
    )
class TestTaskEngine:
    """Milestone matching for ``multi_step`` tasks and related TaskSpec fields (#17)."""

    @staticmethod
    def _multistep_env():
        """Return a Docker-less environment reset onto the multi-step snapshot."""
        environment = RangeEnvironment(docker_available=False)
        environment.reset(snapshot=_make_multistep_snapshot())
        return environment

    def test_milestone_detected_in_output(self):
        """Output containing a milestone phrase (any case) returns that milestone."""
        environment = self._multistep_env()
        # Mock execution output ("[mock] executed on attacker: ...") would never
        # contain a milestone, so _check_milestone is fed matching text directly.
        matched = environment._check_milestone("Port scan complete -- found open ports")
        assert matched == "port scan complete"

    def test_milestone_not_duplicated(self):
        """A milestone already in ``milestones_completed`` is not reported again."""
        environment = self._multistep_env()
        # Pretend the first milestone was reached earlier in the episode.
        environment._state.milestones_completed.append("port scan complete")
        matched = environment._check_milestone("Port scan complete again")
        assert matched is None

    def test_milestone_returns_none_for_exploit_task(self):
        """An ``exploit`` task yields no milestone for any output."""
        exploit_snapshot = SnapshotSpec(
            topology={"hosts": ["attacker", "web"]},
            flags=[],
            golden_path=[],
            task=TaskSpec(red_briefing="Go.", blue_briefing="Watch.", task_type="exploit"),
        )
        environment = RangeEnvironment(docker_available=False)
        environment.reset(snapshot=exploit_snapshot)
        matched = environment._check_milestone("anything here")
        assert matched is None

    def test_milestone_returns_none_for_no_match(self):
        """Output that mentions no milestone yields ``None``."""
        environment = self._multistep_env()
        matched = environment._check_milestone("nothing relevant here")
        assert matched is None

    def test_milestones_tracked_in_state(self):
        """``milestones_completed`` starts empty and reflects appended entries."""
        environment = self._multistep_env()
        assert environment.state.milestones_completed == []
        # Append by hand, standing in for what step() would do.
        environment._state.milestones_completed.append("port scan complete")
        assert environment.state.milestones_completed == ["port scan complete"]

    def test_task_type_field_on_task_spec(self):
        """TaskSpec keeps the ``task_type`` and ``milestones`` it was given."""
        spec = TaskSpec(task_type="multi_step", milestones=["a", "b"])
        assert spec.task_type == "multi_step"
        assert spec.milestones == ["a", "b"]

    def test_success_conditions_on_task_spec(self):
        """TaskSpec keeps the ``success_conditions`` list in order."""
        conditions = [
            {"type": "flag", "value": "FLAG{x}"},
            {"type": "endpoint", "url": "/api/data", "expect": "secret"},
        ]
        spec = TaskSpec(success_conditions=conditions)
        assert len(spec.success_conditions) == 2
        assert spec.success_conditions[0]["type"] == "flag"
# -------------------------------------------------------------------
# Auth scenario (#25)
# -------------------------------------------------------------------
def _make_auth_snapshot():
    """Build a snapshot with two users (admin on web+db, guest on web only)."""
    accounts = [
        {"username": "admin", "password": "admin123", "hosts": ["web", "db"]},
        {"username": "guest", "password": "guest", "hosts": ["web"]},
    ]
    topology = {"hosts": ["attacker", "web", "db"], "users": accounts}
    db_flag = FlagSpec(id="f1", value="FLAG{auth}", path="/f.txt", host="db")
    briefing = TaskSpec(red_briefing="Auth challenge.", blue_briefing="Watch.")
    return SnapshotSpec(
        topology=topology,
        flags=[db_flag],
        golden_path=[],
        task=briefing,
    )
class TestAuthScenario:
    """``auth`` / ``logout`` commands and the session state they maintain (#25)."""

    @staticmethod
    def _auth_env():
        """Return a Docker-less environment reset onto the auth snapshot."""
        environment = RangeEnvironment(docker_available=False)
        environment.reset(snapshot=_make_auth_snapshot())
        return environment

    @staticmethod
    def _red(command):
        """Wrap ``command`` in a red-mode RangeAction."""
        return RangeAction(command=command, mode="red")

    def test_auth_success(self):
        """Valid credentials open a session for that user on the host."""
        environment = self._auth_env()
        result = environment.step(self._red("auth web admin admin123"))
        assert "Authenticated" in result.stdout
        assert environment.state.active_sessions["web"] == "admin"

    def test_auth_failure(self):
        """A wrong password reports failure and opens no session."""
        environment = self._auth_env()
        result = environment.step(self._red("auth web admin wrongpass"))
        assert "failed" in result.stderr.lower()
        assert "web" not in environment.state.active_sessions

    def test_auth_wrong_host(self):
        """Correct credentials on a host the user is not listed for still fail."""
        environment = self._auth_env()
        # The guest account is only listed for "web", so "db" must be refused.
        result = environment.step(self._red("auth db guest guest"))
        assert "failed" in result.stderr.lower()
        assert "db" not in environment.state.active_sessions

    def test_auth_attempt_logged(self):
        """A successful attempt is appended to ``auth_attempts`` with success True."""
        environment = self._auth_env()
        environment.step(self._red("auth web admin admin123"))
        attempts = environment.state.auth_attempts
        assert len(attempts) == 1
        assert attempts[0]["success"] is True

    def test_auth_failure_logged(self):
        """A failed attempt is appended to ``auth_attempts`` with success False."""
        environment = self._auth_env()
        environment.step(self._red("auth web admin wrong"))
        attempts = environment.state.auth_attempts
        assert len(attempts) == 1
        assert attempts[0]["success"] is False

    def test_logout_success(self):
        """Logging out removes the session that ``auth`` created."""
        environment = self._auth_env()
        environment.step(self._red("auth web admin admin123"))
        assert "web" in environment.state.active_sessions

        result = environment.step(self._red("logout web"))
        assert "Logged out" in result.stdout
        assert "web" not in environment.state.active_sessions

    def test_logout_no_session(self):
        """Logging out of a host without a session is reported on stderr."""
        environment = self._auth_env()
        result = environment.step(self._red("logout web"))
        assert "No active session" in result.stderr

    def test_auth_missing_args(self):
        """``auth`` without a password prints a usage message."""
        environment = self._auth_env()
        result = environment.step(self._red("auth web admin"))
        assert "Usage" in result.stderr

    def test_logout_missing_args(self):
        """``logout`` without a host prints a usage message."""
        environment = self._auth_env()
        result = environment.step(self._red("logout"))
        assert "Usage" in result.stderr

    def test_auth_creates_access_grant(self):
        """A successful ``auth`` adds a ``<host>:shell`` access grant."""
        environment = self._auth_env()
        environment.step(self._red("auth web admin admin123"))
        assert "web:shell" in environment.state.access_grants
# -------------------------------------------------------------------
# Pivot mechanics (#26)
# -------------------------------------------------------------------
class TestPivotMechanics:
    """Access-grant and pivot-history bookkeeping done by ``_check_pivot`` (#26)."""

    @staticmethod
    def _env_with_user(hosts, user):
        """Return a Docker-less environment reset onto a snapshot with one user.

        ``hosts`` is the topology host list and ``user`` the single user record.
        """
        snapshot = SnapshotSpec(
            topology={"hosts": hosts, "users": [user]},
            flags=[],
            golden_path=[],
            task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."),
        )
        environment = RangeEnvironment(docker_available=False)
        environment.reset(snapshot=snapshot)
        return environment

    def test_pivot_detected_from_credential_leak(self):
        """Output that leaks a known user's credentials records a grant and a pivot."""
        environment = self._env_with_user(
            ["attacker", "web", "db"],
            {"username": "dbadmin", "password": "s3cret!", "hosts": ["db"]},
        )
        # Feed _check_pivot output that contains the user's name and password.
        leaked_config = "db_user = dbadmin\ndb_pass = s3cret!\nhost = db"
        environment._check_pivot(
            RangeAction(command="cat /etc/app/config.ini", mode="red"),
            leaked_config,
        )

        state = environment.state
        assert "db:credential" in state.access_grants
        assert len(state.pivot_history) == 1
        pivot = state.pivot_history[0]
        assert pivot["to"] == "db"
        assert pivot["via"] == "credential_reuse"

    def test_no_pivot_without_matching_creds(self):
        """Output without any known credentials leaves grants and history empty."""
        environment = self._env_with_user(
            ["attacker", "web"],
            {"username": "admin", "password": "secret", "hosts": ["web"]},
        )
        environment._check_pivot(
            RangeAction(command="ls", mode="red"),
            "no credentials here",
        )
        assert environment.state.access_grants == []
        assert environment.state.pivot_history == []

    def test_pivot_not_duplicated(self):
        """Seeing the same leaked credentials twice adds the grant only once."""
        environment = self._env_with_user(
            ["attacker", "web", "db"],
            {"username": "admin", "password": "pass", "hosts": ["db"]},
        )
        read_config = RangeAction(command="cat config", mode="red")
        for _ in range(2):
            environment._check_pivot(read_config, "admin pass db")
        # The grant must be present exactly once despite two matching outputs.
        assert environment.state.access_grants.count("db:credential") == 1

    def test_state_has_access_grants_field(self):
        """A fresh RangeState starts with empty grants and pivot history."""
        fresh = RangeState()
        assert fresh.access_grants == []
        assert fresh.pivot_history == []

    def test_state_has_auth_fields(self):
        """A fresh RangeState starts with empty session, attempt and milestone fields."""
        fresh = RangeState()
        assert fresh.active_sessions == {}
        assert fresh.auth_attempts == []
        assert fresh.milestones_completed == []