Spaces:
Runtime error
Runtime error
| """Shared fixtures for OpenRange test suite.""" | |
| from pathlib import Path | |
| import pytest | |
| ROOT = Path(__file__).parent.parent | |
| def manifests_dir(): | |
| return ROOT / "manifests" | |
| def tier1_manifest(manifests_dir): | |
| """Load tier1_basic.yaml as dict.""" | |
| import yaml | |
| with open(manifests_dir / "tier1_basic.yaml") as f: | |
| return yaml.safe_load(f) | |
| def tier2_manifest(manifests_dir): | |
| """Load tier2_corporate.yaml as dict.""" | |
| import yaml | |
| with open(manifests_dir / "tier2_corporate.yaml") as f: | |
| return yaml.safe_load(f) | |
| def tier3_manifest(manifests_dir): | |
| """Load tier3_enterprise.yaml as dict.""" | |
| import yaml | |
| with open(manifests_dir / "tier3_enterprise.yaml") as f: | |
| return yaml.safe_load(f) | |
| def sample_snapshot_spec(): | |
| """A valid SnapshotSpec for testing.""" | |
| from open_range.protocols import ( | |
| EvidenceItem, | |
| FlagSpec, | |
| GoldenPathStep, | |
| NPCTrafficSpec, | |
| SnapshotSpec, | |
| TaskSpec, | |
| TruthGraph, | |
| Vulnerability, | |
| ) | |
| return SnapshotSpec( | |
| topology={ | |
| "hosts": [ | |
| "attacker", "firewall", "web", "mail", | |
| "db", "files", "ldap", "siem", | |
| ], | |
| "zones": { | |
| "external": ["attacker"], | |
| "dmz": ["web", "mail"], | |
| "internal": ["db", "files"], | |
| "management": ["ldap", "siem"], | |
| }, | |
| "users": [ | |
| { | |
| "username": "admin", | |
| "password": "admin123", | |
| "groups": ["admins"], | |
| "hosts": ["web", "db"], | |
| } | |
| ], | |
| "tier": 1, | |
| }, | |
| truth_graph=TruthGraph( | |
| vulns=[ | |
| Vulnerability( | |
| id="vuln1", | |
| type="sqli", | |
| host="web", | |
| service="php", | |
| injection_point="/search?q=", | |
| vulnerable_code=( | |
| "$conn->query(\"SELECT * FROM products " | |
| "WHERE name LIKE '%$_GET[q]%'\");" | |
| ), | |
| root_cause="Unsanitized input in SQL query", | |
| blast_radius="data exposure", | |
| remediation="Use parameterized queries", | |
| ) | |
| ], | |
| exploit_chain=[], | |
| ), | |
| flags=[ | |
| FlagSpec( | |
| id="flag1", | |
| value="FLAG{test_sqli_123}", | |
| path="/var/flags/flag1.txt", | |
| host="db", | |
| ) | |
| ], | |
| golden_path=[ | |
| GoldenPathStep( | |
| step=1, | |
| command="nmap -sV 10.0.1.0/24", | |
| expect_in_stdout="80/tcp open http", | |
| ), | |
| GoldenPathStep( | |
| step=2, | |
| command="curl 'http://web/search?q=test'", | |
| expect_in_stdout="products", | |
| ), | |
| GoldenPathStep( | |
| step=3, | |
| command=( | |
| "curl 'http://web/search?q=test%27+UNION+" | |
| "SELECT+flag+FROM+flags--'" | |
| ), | |
| expect_in_stdout="FLAG{test_sqli_123}", | |
| ), | |
| GoldenPathStep( | |
| step=4, | |
| command="nmap -sV 10.0.2.0/24", | |
| expect_in_stdout="3306/tcp open mysql", | |
| ), | |
| GoldenPathStep( | |
| step=5, | |
| command="curl http://web/", | |
| expect_in_stdout="Welcome", | |
| ), | |
| GoldenPathStep( | |
| step=6, | |
| command="curl http://web/login", | |
| expect_in_stdout="Login", | |
| ), | |
| GoldenPathStep( | |
| step=7, | |
| command="mysql -h db -u admin -padmin123 -e 'SHOW DATABASES'", | |
| expect_in_stdout="flags", | |
| ), | |
| GoldenPathStep( | |
| step=8, | |
| command="submit_flag FLAG{test_sqli_123}", | |
| expect_in_stdout="correct", | |
| ), | |
| ], | |
| evidence_spec=[ | |
| EvidenceItem( | |
| type="log_entry", | |
| location="web_access_log", | |
| pattern="SQL injection pattern in access log", | |
| ) | |
| ], | |
| npc_traffic=NPCTrafficSpec(level=0, rate_lambda=30.0), | |
| npc_personas=[], | |
| task=TaskSpec( | |
| red_briefing=( | |
| "Target network has web services. Find vulnerabilities " | |
| "and capture the flag." | |
| ), | |
| blue_briefing=( | |
| "Monitor SIEM for suspicious activity. Investigate " | |
| "and respond to threats." | |
| ), | |
| ), | |
| ) | |
| def mock_containers(): | |
| """Mock ContainerSet for testing without Docker.""" | |
| class MockContainerSet: | |
| def __init__(self): | |
| self.exec_results = {} # {(container, cmd_fragment): output} | |
| self.exec_status = {} # {(container, cmd_fragment): exit_code} | |
| self.healthy = set() | |
| self.restarted = [] # track restart calls: list of container names | |
| def _lookup(mapping, container: str, cmd: str): | |
| for (c, pattern), result in mapping.items(): | |
| if c == container and pattern in cmd: | |
| return result | |
| return None | |
| async def exec_run(self, container: str, cmd: str, **kwargs): | |
| from open_range.protocols import ExecResult | |
| output = self._lookup(self.exec_results, container, cmd) | |
| status = self._lookup(self.exec_status, container, cmd) | |
| text = output if isinstance(output, str) else "" | |
| code = int(status) if status is not None else 0 | |
| if code == 0: | |
| return ExecResult(stdout=text, exit_code=0) | |
| return ExecResult(stderr=text, exit_code=code) | |
| async def exec(self, container: str, cmd: str, **kwargs) -> str: | |
| result = await self.exec_run(container, cmd, **kwargs) | |
| return result.combined_output | |
| async def is_healthy(self, container: str) -> bool: | |
| return container in self.healthy | |
| async def cp(self, container, src, dest): | |
| pass | |
| async def restart(self, container: str, **kwargs) -> None: | |
| self.restarted.append(container) | |
| return MockContainerSet() | |