Spaces:
Runtime error
Runtime error
File size: 10,338 Bytes
e37c7b1 c30abe9 3ea4118 c30abe9 3ea4118 e37c7b1 c30abe9 e37c7b1 c30abe9 ad5992a c30abe9 e37c7b1 3ea4118 c30abe9 ad5992a c30abe9 3ea4118 c30abe9 3ea4118 e37c7b1 3ea4118 c30abe9 3ea4118 c30abe9 3ea4118 ad5992a c30abe9 3ea4118 c30abe9 ad5992a c30abe9 e37c7b1 c30abe9 3ea4118 ad5992a c30abe9 3ea4118 ad5992a c30abe9 3ea4118 c30abe9 e37c7b1 c30abe9 3ea4118 c30abe9 3ea4118 c30abe9 eaa2876 ad5992a eaa2876 e37c7b1 eaa2876 e37c7b1 eaa2876 c30abe9 3ea4118 c30abe9 3ea4118 c30abe9 e37c7b1 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 | """Tests for the operator debugging console.
Uses Starlette's TestClient against the OpenEnv app with console router.
No Docker dependency.
The console prefers live WebSocket session state, then published reset/step
state from real traffic, then a local fallback env for dev/test use.
"""
from __future__ import annotations
from unittest.mock import patch
import pytest
from starlette.testclient import TestClient
from open_range.protocols import SnapshotSpec
from open_range.server.app import create_app
from open_range.server.console import clear_episode, clear_history, record_action
from open_range.server.environment import RangeEnvironment
_TEST_SNAPSHOT = SnapshotSpec(
topology={"hosts": ["attacker", "siem"]},
flags=[],
golden_path=[],
task={
"red_briefing": "Console test mode.",
"blue_briefing": "Console test mode.",
},
)
@pytest.fixture()
def client():
"""Create a TestClient with a shared env on app.state for console API."""
app = create_app()
# Store a shared env so console API endpoints can access state
env = RangeEnvironment(docker_available=False)
app.state.env = env
clear_episode()
clear_history()
yield TestClient(app)
clear_episode()
clear_history()
@pytest.fixture()
def env(client: TestClient) -> RangeEnvironment:
"""Return the shared env stored on app.state."""
return client.app.state.env
# ===================================================================
# GET /console -- HTML page
# ===================================================================
class TestConsolePage:
def test_returns_html(self, client: TestClient):
resp = client.get("/console")
assert resp.status_code == 200
assert "text/html" in resp.headers["content-type"]
def test_html_contains_title(self, client: TestClient):
resp = client.get("/console")
assert "OpenRange Operator Console" in resp.text
def test_trailing_slash(self, client: TestClient):
resp = client.get("/console/")
assert resp.status_code == 200
assert "text/html" in resp.headers["content-type"]
# ===================================================================
# GET /console/api/snapshot
# ===================================================================
class TestSnapshotAPI:
def test_returns_json(self, client: TestClient):
resp = client.get("/console/api/snapshot")
assert resp.status_code == 200
data = resp.json()
assert isinstance(data, dict)
def test_snapshot_before_reset(self, client: TestClient):
"""Before reset, snapshot should have null id."""
data = client.get("/console/api/snapshot").json()
assert data["id"] is None
def test_snapshot_after_reset(self, client: TestClient, env: RangeEnvironment):
env.reset(snapshot=_TEST_SNAPSHOT, episode_id="snap_test_1")
data = client.get("/console/api/snapshot").json()
assert data["id"] == "snap_test_1"
assert "hosts" in data
assert "zones" in data
assert "vuln_count" in data
assert "tier" in data
def test_snapshot_no_truth_graph_or_flags(self, client: TestClient, env: RangeEnvironment):
"""Snapshot API must NOT leak truth_graph or flag values."""
env.reset(snapshot=_TEST_SNAPSHOT)
data = client.get("/console/api/snapshot").json()
assert "truth_graph" not in data
assert "flags" not in data
assert "golden_path" not in data
def test_http_reset_publishes_snapshot_to_console(self):
with patch(
"open_range.server.environment.RangeEnvironment._select_snapshot",
return_value=_TEST_SNAPSHOT,
), patch(
"open_range.server.environment.RangeEnvironment._ensure_clean_reset_path",
):
app = create_app()
clear_episode()
clear_history()
client = TestClient(app)
resp = client.post("/reset", json={"episode_id": "http_reset_1"})
assert resp.status_code == 200
data = client.get("/console/api/snapshot").json()
assert data["id"] == "http_reset_1"
assert data["state_scope"] == "published_episode"
assert data["hosts"] == ["attacker", "siem"]
clear_episode()
clear_history()
# ===================================================================
# GET /console/api/episode
# ===================================================================
class TestEpisodeAPI:
def test_returns_json(self, client: TestClient):
resp = client.get("/console/api/episode")
assert resp.status_code == 200
data = resp.json()
assert isinstance(data, dict)
def test_episode_fields(self, client: TestClient, env: RangeEnvironment):
env.reset(snapshot=_TEST_SNAPSHOT)
data = client.get("/console/api/episode").json()
assert "step_count" in data
assert "flags_found" in data
assert "mode" in data
assert "services_status" in data
def test_episode_step_count_updates(self, client: TestClient, env: RangeEnvironment):
from open_range.server.models import RangeAction
env.reset(snapshot=_TEST_SNAPSHOT)
data = client.get("/console/api/episode").json()
assert data["step_count"] == 0
env.step(RangeAction(command="nmap web", mode="red"))
data = client.get("/console/api/episode").json()
assert data["step_count"] == 1
def test_http_reset_publishes_episode_to_console(self):
with patch(
"open_range.server.environment.RangeEnvironment._select_snapshot",
return_value=_TEST_SNAPSHOT,
), patch(
"open_range.server.environment.RangeEnvironment._ensure_clean_reset_path",
):
app = create_app()
clear_episode()
clear_history()
client = TestClient(app)
client.post("/reset", json={"episode_id": "http_reset_2"})
data = client.get("/console/api/episode").json()
assert data["step_count"] == 0
assert data["mode"] == "red"
assert data["state_scope"] == "published_episode"
clear_episode()
clear_history()
# ===================================================================
# GET /console/api/history
# ===================================================================
class TestHistoryAPI:
def test_returns_list(self, client: TestClient):
resp = client.get("/console/api/history")
assert resp.status_code == 200
data = resp.json()
assert isinstance(data, list)
def test_history_empty_initially(self, client: TestClient):
data = client.get("/console/api/history").json()
assert data == []
def test_history_records_actions(self, client: TestClient):
import time
record_action({"step": 1, "command": "nmap -sV web", "mode": "red", "time": time.time()})
record_action({"step": 2, "command": "tail -f /var/log/syslog", "mode": "blue", "time": time.time()})
data = client.get("/console/api/history").json()
assert len(data) == 2
# Newest first
assert data[0]["mode"] == "blue"
assert data[1]["mode"] == "red"
def test_history_has_timestamps(self, client: TestClient):
import time
record_action({"step": 1, "command": "nmap web", "mode": "red", "time": time.time()})
data = client.get("/console/api/history").json()
assert len(data) == 1
assert "time" in data[0]
assert isinstance(data[0]["time"], float)
def test_history_updates_from_environment_steps(self, client: TestClient, env: RangeEnvironment):
from open_range.server.models import RangeAction
env.reset(snapshot=_TEST_SNAPSHOT)
env.step(RangeAction(command="nmap -sV web", mode="red"))
data = client.get("/console/api/history").json()
assert len(data) == 2
assert data[0]["command"] == "nmap -sV web"
assert data[0]["mode"] == "red"
assert data[1]["command"] == "reset"
assert data[1]["mode"] == "system"
def test_history_records_meta_step_commands(self, client: TestClient, env: RangeEnvironment):
from open_range.server.models import RangeAction
env.reset(snapshot=_TEST_SNAPSHOT)
env.step(RangeAction(command="submit_finding suspicious scan on web", mode="blue"))
data = client.get("/console/api/history").json()
assert data[0]["command"] == "submit_finding suspicious scan on web"
assert data[0]["mode"] == "blue"
def test_history_reset_clears_prior_entries_and_records_reset(self, client: TestClient, env: RangeEnvironment):
import time
record_action({"step": 99, "command": "old", "mode": "red", "time": time.time()})
env.reset(snapshot=_TEST_SNAPSHOT, episode_id="history_reset")
data = client.get("/console/api/history").json()
assert len(data) == 1
assert data[0]["command"] == "reset"
assert data[0]["mode"] == "system"
assert data[0]["episode_id"] == "history_reset"
def test_history_max_20(self, client: TestClient):
"""History API should return at most 20 entries."""
import time
for i in range(25):
record_action({"step": i, "command": f"cmd_{i}", "mode": "red", "time": time.time()})
data = client.get("/console/api/history").json()
assert len(data) == 20
def test_http_reset_records_history(self):
with patch(
"open_range.server.environment.RangeEnvironment._select_snapshot",
return_value=_TEST_SNAPSHOT,
), patch(
"open_range.server.environment.RangeEnvironment._ensure_clean_reset_path",
):
app = create_app()
clear_episode()
clear_history()
client = TestClient(app)
client.post("/reset", json={"episode_id": "http_reset_3"})
data = client.get("/console/api/history").json()
assert len(data) == 1
assert data[0]["command"] == "reset"
assert data[0]["mode"] == "system"
clear_episode()
clear_history()
|