Spaces:
Runtime error
Runtime error
| """Tests for _parse_llm_response() — the critical LLM JSON -> SnapshotSpec mapper. | |
| Covers field name aliases, evidence spec formats, NPC persona parsing, | |
| files dict extraction, missing/minimal/malformed input, and a real LLM | |
| output fixture from snapshots/llm_tier1_test.json. | |
| """ | |
| import json | |
| from pathlib import Path | |
| import pytest | |
| from open_range.builder.builder import SnapshotParseError, _parse_llm_response | |
| from open_range.protocols import ( | |
| EvidenceItem, | |
| ExploitStep, | |
| FlagSpec, | |
| GoldenPathStep, | |
| NPCPersona, | |
| SnapshotSpec, | |
| Vulnerability, | |
| ) | |
| ROOT = Path(__file__).parent.parent | |
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
def _minimal_json(**overrides) -> str:
    """Serialize a skeleton payload for ``_parse_llm_response``.

    Every top-level section is present with an empty value. Keyword
    arguments replace (or add) top-level keys before serialization.
    """
    skeleton: dict = {
        "topology": {},
        "truth_graph": {"vulns": [], "exploit_chain": []},
        "golden_path": [],
        "flags": [],
        "evidence_spec": {},
        "npc_personas": [],
        "npc_traffic": {},
        "task": {},
    }
    # Later mappings win, so overrides shadow the skeleton defaults while
    # existing keys keep their position and new keys are appended.
    return json.dumps({**skeleton, **overrides})
| # --------------------------------------------------------------------------- | |
| # 1. Happy path with real LLM output | |
| # --------------------------------------------------------------------------- | |
class TestRealLLMOutput:
    """Parse the actual LLM-generated JSON from snapshots/llm_tier1_test.json."""

    @pytest.fixture
    def llm_json(self):
        """Provide the raw text of the real LLM snapshot to each test.

        Registered as a pytest fixture because every test in this class
        requests ``llm_json`` as an argument. The requesting test is
        skipped when the snapshot file does not exist under ``ROOT``.
        """
        path = ROOT / "snapshots" / "llm_tier1_test.json"
        if not path.exists():
            pytest.skip("llm_tier1_test.json fixture not present")
        # Decode explicitly so the read does not depend on the platform locale.
        return path.read_text(encoding="utf-8")

    def test_parses_to_snapshot_spec(self, llm_json):
        """The fixture parses into a SnapshotSpec instance."""
        spec = _parse_llm_response(llm_json)
        assert isinstance(spec, SnapshotSpec)

    def test_topology_hosts(self, llm_json):
        """Topology lists the expected hosts, eight in total."""
        spec = _parse_llm_response(llm_json)
        hosts = spec.topology["hosts"]
        assert "attacker" in hosts
        assert "web" in hosts
        assert "siem" in hosts
        assert len(hosts) == 8

    def test_topology_zones(self, llm_json):
        """The dmz zone exists and contains the web host."""
        spec = _parse_llm_response(llm_json)
        zones = spec.topology["zones"]
        assert "dmz" in zones
        assert "web" in zones["dmz"]

    def test_truth_graph_vulns(self, llm_json):
        """Both vulnerabilities from the fixture are parsed."""
        spec = _parse_llm_response(llm_json)
        assert len(spec.truth_graph.vulns) == 2
        vuln_ids = {v.id for v in spec.truth_graph.vulns}
        assert "VULN-SQLI-LOOKUP" in vuln_ids
        assert "VULN-MISSING-AUTHZ-REPORT" in vuln_ids

    def test_truth_graph_exploit_chain(self, llm_json):
        """The exploit chain has two steps, starting with the SQLi vuln."""
        spec = _parse_llm_response(llm_json)
        assert len(spec.truth_graph.exploit_chain) == 2
        assert spec.truth_graph.exploit_chain[0].vuln_id == "VULN-SQLI-LOOKUP"

    def test_flags(self, llm_json):
        """Both flag values from the fixture are parsed."""
        spec = _parse_llm_response(llm_json)
        assert len(spec.flags) == 2
        values = {f.value for f in spec.flags}
        assert "FLAG{9f3a2b4c5d6e7f80}" in values
        assert "FLAG{1a2b3c4d5e6f7788}" in values

    def test_golden_path(self, llm_json):
        """The golden path has eight steps and the first one is fully mapped."""
        spec = _parse_llm_response(llm_json)
        assert len(spec.golden_path) == 8
        # The real LLM output uses "cmd" field name
        assert spec.golden_path[0].command == "nmap -p 80 10.0.1.10"
        assert spec.golden_path[0].expect_in_stdout == "80/tcp open"
        assert spec.golden_path[0].host == "attacker"

    def test_task_briefings(self, llm_json):
        """Red briefing mentions Meridian and the blue briefing is non-empty."""
        spec = _parse_llm_response(llm_json)
        assert "Meridian" in spec.task.red_briefing
        assert spec.task.blue_briefing != ""

    def test_npc_personas(self, llm_json):
        """All eight personas are parsed, including two known names."""
        spec = _parse_llm_response(llm_json)
        assert len(spec.npc_personas) == 8
        names = {p.name for p in spec.npc_personas}
        assert "Derek Thompson" in names
        assert "Karen Williams" in names

    def test_npc_persona_security_awareness(self, llm_json):
        """security_awareness values are carried over per persona."""
        spec = _parse_llm_response(llm_json)
        by_name = {p.name: p for p in spec.npc_personas}
        assert by_name["Derek Thompson"].security_awareness == 0.85
        assert by_name["Karen Williams"].security_awareness == 0.25

    def test_files_dict(self, llm_json):
        """The files mapping contains the two portal PHP files."""
        spec = _parse_llm_response(llm_json)
        # Real LLM output has explicit files + vulnerable_code dicts
        assert len(spec.files) > 0
        assert "web:/var/www/portal/lookup.php" in spec.files
        assert "web:/var/www/portal/admin/compliance_report.php" in spec.files

    def test_vulnerable_code_as_dict_extracted_to_files(self, llm_json):
        """The lookup.php entry is present in files under its host-prefixed key."""
        spec = _parse_llm_response(llm_json)
        # The VULN-SQLI-LOOKUP has vulnerable_code as dict with key
        # /var/www/portal/lookup.php. It should be extracted to files
        # as "web:/var/www/portal/lookup.php".
        # But the explicit files dict already has this key, so the
        # explicit one takes precedence (container_key not in files check).
        assert "web:/var/www/portal/lookup.php" in spec.files
| # --------------------------------------------------------------------------- | |
| # 2. Field name mappings (ExploitStep aliases) | |
| # --------------------------------------------------------------------------- | |
class TestExploitStepFieldMappings:
    """LLM uses vuln/action/yields; Pydantic expects vuln_id/command/description."""

    @staticmethod
    def _first_parsed_step(step: dict):
        """Parse a payload whose exploit chain is just *step*; return the parsed step."""
        payload = _minimal_json(
            truth_graph={"vulns": [], "exploit_chain": [step]}
        )
        return _parse_llm_response(payload).truth_graph.exploit_chain[0]

    def test_vuln_maps_to_vuln_id(self):
        """The ``vuln`` alias populates ``vuln_id``."""
        parsed = self._first_parsed_step(
            {"vuln": "V1", "action": "run exploit", "yields": "root shell"}
        )
        assert parsed.vuln_id == "V1"

    def test_action_maps_to_command(self):
        """The ``action`` alias populates ``command``."""
        parsed = self._first_parsed_step(
            {"vuln": "V1", "action": "sqlmap -u http://...", "yields": "db dump"}
        )
        assert parsed.command == "sqlmap -u http://..."

    def test_yields_maps_to_description(self):
        """The ``yields`` alias populates ``description``."""
        parsed = self._first_parsed_step(
            {"vuln": "V1", "action": "cmd", "yields": "got credentials"}
        )
        assert parsed.description == "got credentials"

    def test_canonical_names_also_work(self):
        """vuln_id/command/description should pass through without aliasing."""
        canonical_step = {
            "vuln_id": "V2",
            "command": "nmap -sV ...",
            "description": "port scan",
        }
        parsed = self._first_parsed_step(canonical_step)
        assert parsed.vuln_id == "V2"
        assert parsed.command == "nmap -sV ..."
        assert parsed.description == "port scan"

    def test_canonical_names_take_precedence(self):
        """When both canonical and alias are present, canonical wins (via get order)."""
        conflicting_step = {
            "vuln_id": "canonical",
            "vuln": "alias",
            "command": "canonical_cmd",
            "action": "alias_cmd",
            "description": "canonical_desc",
            "yields": "alias_desc",
        }
        parsed = self._first_parsed_step(conflicting_step)
        assert parsed.vuln_id == "canonical"
        assert parsed.command == "canonical_cmd"
        assert parsed.description == "canonical_desc"
| # --------------------------------------------------------------------------- | |
| # 3. GoldenPathStep field mappings | |
| # --------------------------------------------------------------------------- | |
class TestGoldenPathFieldMappings:
    """LLM uses cmd/expect_stdout; Pydantic expects command/expect_in_stdout."""

    @staticmethod
    def _parsed_path(*steps: dict):
        """Parse a payload whose golden path is *steps*; return the parsed list."""
        payload = _minimal_json(golden_path=list(steps))
        return _parse_llm_response(payload).golden_path

    def test_cmd_maps_to_command(self):
        """The ``cmd`` alias populates ``command``."""
        path = self._parsed_path(
            {"step": 1, "cmd": "nmap -sV 10.0.1.0/24", "expect_stdout": "open"}
        )
        assert path[0].command == "nmap -sV 10.0.1.0/24"

    def test_expect_stdout_maps_to_expect_in_stdout(self):
        """The ``expect_stdout`` alias populates ``expect_in_stdout``."""
        path = self._parsed_path(
            {"step": 1, "cmd": "whoami", "expect_stdout": "root"}
        )
        assert path[0].expect_in_stdout == "root"

    def test_canonical_command_field(self):
        """Canonical ``command``/``expect_in_stdout`` keys are accepted as-is."""
        path = self._parsed_path(
            {"step": 1, "command": "ls -la", "expect_in_stdout": "total"}
        )
        first = path[0]
        assert first.command == "ls -la"
        assert first.expect_in_stdout == "total"

    def test_mixed_field_names_across_steps(self):
        """Some steps use cmd, others use command — both should parse."""
        path = self._parsed_path(
            {"step": 1, "cmd": "nmap scan", "expect_stdout": "80/tcp"},
            {"step": 2, "command": "curl http://web", "expect_in_stdout": "Welcome"},
            {"step": 3, "cmd": "sqlmap", "expect_in_stdout": "FLAG"},
        )
        expected = [
            ("nmap scan", "80/tcp"),
            ("curl http://web", "Welcome"),
            ("sqlmap", "FLAG"),
        ]
        assert len(path) == 3
        for parsed, (command, stdout) in zip(path, expected):
            assert parsed.command == command
            assert parsed.expect_in_stdout == stdout

    def test_step_number_preserved(self):
        """The ``step`` number is carried over unchanged."""
        path = self._parsed_path(
            {"step": 5, "cmd": "echo hi", "expect_stdout": "hi"}
        )
        assert path[0].step == 5

    def test_description_field_preserved(self):
        """A ``description`` on a step is carried over unchanged."""
        described_step = {
            "step": 1,
            "cmd": "nmap",
            "expect_stdout": "open",
            "description": "Port scan the DMZ",
        }
        path = self._parsed_path(described_step)
        assert path[0].description == "Port scan the DMZ"

    def test_cmd_takes_precedence_over_command(self):
        """When both cmd and command are present, cmd wins (it's checked first)."""
        conflicting_step = {
            "step": 1,
            "cmd": "cmd_value",
            "command": "command_value",
            "expect_stdout": "x",
        }
        path = self._parsed_path(conflicting_step)
        assert path[0].command == "cmd_value"
| # --------------------------------------------------------------------------- | |
| # 4. Evidence spec parsing | |
| # --------------------------------------------------------------------------- | |
class TestEvidenceSpecParsing:
    """LLM returns dict, protocol expects list[EvidenceItem]."""

    @staticmethod
    def _parsed_evidence(evidence_spec):
        """Parse a payload carrying *evidence_spec*; return the parsed evidence list."""
        payload = _minimal_json(evidence_spec=evidence_spec)
        return _parse_llm_response(payload).evidence_spec

    def test_dict_with_string_values(self):
        """Each string value becomes one item keyed by its location."""
        evidence = self._parsed_evidence(
            {
                "web_access_log": "SQL injection pattern",
                "siem_alerts": "Unauthorized access",
            }
        )
        assert len(evidence) == 2
        seen_locations = {item.location for item in evidence}
        assert "web_access_log" in seen_locations
        assert "siem_alerts" in seen_locations
        # String values become log_entry type
        for item in evidence:
            if item.location != "web_access_log":
                continue
            assert item.type == "log_entry"
            assert item.pattern == "SQL injection pattern"

    def test_dict_with_list_values(self):
        """Each list element becomes its own item at the shared location."""
        evidence = self._parsed_evidence(
            {
                "siem_alerts": ["UNION SELECT detected", "admin endpoint accessed"],
            }
        )
        assert len(evidence) == 2
        # List values become alert type
        for item in evidence:
            assert item.type == "alert"
            assert item.location == "siem_alerts"
        seen_patterns = {item.pattern for item in evidence}
        assert "UNION SELECT detected" in seen_patterns
        assert "admin endpoint accessed" in seen_patterns

    def test_dict_with_mixed_values(self):
        """String and list values can be mixed within one dict."""
        evidence = self._parsed_evidence(
            {
                "web_log": "GET /search?q=",
                "alerts": ["sqli_detected", "auth_bypass"],
            }
        )
        assert len(evidence) == 3  # 1 string + 2 list items

    def test_list_format_passthrough(self):
        """When evidence_spec is already a list of dicts, parse directly."""
        evidence = self._parsed_evidence(
            [
                {"type": "alert", "location": "siem", "pattern": "SQLi"},
                {"type": "log_entry", "location": "web_log", "pattern": "GET /admin"},
            ]
        )
        assert len(evidence) == 2
        first, second = evidence[0], evidence[1]
        assert first.type == "alert"
        assert second.location == "web_log"

    def test_empty_dict(self):
        """An empty dict parses to an empty list."""
        assert self._parsed_evidence({}) == []

    def test_empty_list(self):
        """An empty list parses to an empty list."""
        assert self._parsed_evidence([]) == []
| # --------------------------------------------------------------------------- | |
| # 5. NPC persona parsing | |
| # --------------------------------------------------------------------------- | |
class TestNPCPersonaParsing:
    """Checks on how ``npc_personas`` entries are parsed."""

    @staticmethod
    def _parsed_personas(*personas: dict):
        """Parse a payload carrying *personas*; return the parsed persona list."""
        payload = _minimal_json(npc_personas=list(personas))
        return _parse_llm_response(payload).npc_personas

    def test_basic_persona(self):
        """Name, role, department and security_awareness are carried over."""
        personas = self._parsed_personas(
            {
                "name": "Alice",
                "role": "Admin",
                "department": "IT",
                "security_awareness": 0.9,
            }
        )
        assert len(personas) == 1
        alice = personas[0]
        assert alice.name == "Alice"
        assert alice.role == "Admin"
        assert alice.department == "IT"
        assert alice.security_awareness == 0.9

    def test_accounts_with_string_values(self):
        """String-valued account entries are readable by key."""
        personas = self._parsed_personas(
            {
                "name": "Bob",
                "accounts": {
                    "email": "bob@corp.local",
                    "ldap_dn": "cn=bob,dc=corp,dc=local",
                },
            }
        )
        assert personas[0].accounts["email"] == "bob@corp.local"

    def test_default_security_awareness(self):
        """Missing security_awareness defaults to 0.5."""
        personas = self._parsed_personas({"name": "Charlie"})
        assert personas[0].security_awareness == 0.5

    def test_susceptibility_dict(self):
        """Susceptibility scores are readable by key."""
        personas = self._parsed_personas(
            {
                "name": "Diana",
                "susceptibility": {"phishing": 0.8, "pretexting": 0.6},
            }
        )
        assert personas[0].susceptibility["phishing"] == 0.8

    def test_routine_dict(self):
        """Routine entries are readable by key."""
        personas = self._parsed_personas(
            {
                "name": "Eve",
                "routine": {
                    "morning": "check email",
                    "afternoon": "process reports",
                },
            }
        )
        assert personas[0].routine["morning"] == "check email"

    def test_multiple_personas(self):
        """Several personas parse and keep their input order."""
        personas = self._parsed_personas(
            {"name": "P1", "security_awareness": 0.1},
            {"name": "P2", "security_awareness": 0.5},
            {"name": "P3", "security_awareness": 0.9},
        )
        assert len(personas) == 3
        ordered_names = [persona.name for persona in personas]
        assert ordered_names == ["P1", "P2", "P3"]

    def test_missing_optional_fields_default(self):
        """All optional fields should default gracefully."""
        minimal = self._parsed_personas({"name": "Minimal"})[0]
        assert minimal.name == "Minimal"
        # Text fields fall back to the empty string ...
        for attr in ("role", "department", "reports_to", "communication_style"):
            assert getattr(minimal, attr) == ""
        # ... and mapping fields fall back to an empty dict.
        for attr in ("susceptibility", "routine", "accounts"):
            assert getattr(minimal, attr) == {}
| # --------------------------------------------------------------------------- | |
| # 6. Files dict extraction | |
| # --------------------------------------------------------------------------- | |
class TestFilesDictExtraction:
    """Checks on how the ``files`` mapping of the parsed spec is populated."""

    @staticmethod
    def _files_for_vuln(vuln: dict, **extra_sections):
        """Parse a payload with a single *vuln*; return the resulting files mapping.

        ``extra_sections`` are forwarded to ``_minimal_json`` as additional
        top-level keys.
        """
        payload = _minimal_json(
            truth_graph={"vulns": [vuln], "exploit_chain": []},
            **extra_sections,
        )
        return _parse_llm_response(payload).files

    def test_explicit_files_field(self):
        """Entries of an explicit ``files`` dict are carried over."""
        payload = _minimal_json(
            files={
                "web:/var/www/index.php": "<?php echo 'hello'; ?>",
                "db:/opt/init.sql": "CREATE TABLE t(id INT);",
            }
        )
        files = _parse_llm_response(payload).files
        assert len(files) == 2
        assert files["web:/var/www/index.php"] == "<?php echo 'hello'; ?>"

    def test_vulnerable_code_dict_extracted(self):
        """vulnerable_code as {file_path: code} should be extracted to files."""
        files = self._files_for_vuln(
            {
                "id": "v1",
                "type": "sqli",
                "host": "web",
                "service": "php",
                "injection_point": "/search",
                "vulnerable_code": {
                    "/var/www/search.php": "<?php $q=$_GET['q']; ?>"
                },
            }
        )
        assert "web:/var/www/search.php" in files
        assert files["web:/var/www/search.php"] == "<?php $q=$_GET['q']; ?>"

    def test_vulnerable_code_string_on_web_host(self):
        """String vulnerable_code on web host with / injection_point goes to web:/var/www/portal{ip}."""
        files = self._files_for_vuln(
            {
                "id": "v1",
                "type": "sqli",
                "host": "web",
                "service": "php",
                "injection_point": "/search.php",
                "vulnerable_code": "<?php echo 'vuln'; ?>",
            }
        )
        assert "web:/var/www/portal/search.php" in files

    def test_vulnerable_code_string_non_web_host_skipped(self):
        """String vulnerable_code on non-web host without / prefix is not extracted."""
        files = self._files_for_vuln(
            {
                "id": "v1",
                "type": "weak_creds",
                "host": "db",
                "service": "mysql",
                "injection_point": "mysql -u root -proot",
                "vulnerable_code": "",
            }
        )
        assert len(files) == 0

    def test_explicit_files_not_overwritten_by_vulnerable_code(self):
        """If explicit files has a key, vulnerable_code should not overwrite it."""
        files = self._files_for_vuln(
            {
                "id": "v1",
                "type": "sqli",
                "host": "web",
                "service": "php",
                "injection_point": "/search",
                "vulnerable_code": {
                    "/var/www/search.php": "vulnerable content"
                },
            },
            files={"web:/var/www/search.php": "explicit content"},
        )
        assert files["web:/var/www/search.php"] == "explicit content"

    def test_no_files_field_produces_empty_dict(self):
        """Without any files source the mapping is empty."""
        files = _parse_llm_response(_minimal_json()).files
        assert files == {}

    def test_files_field_non_string_values_skipped(self):
        """Non-string values in files dict are silently skipped."""
        payload = _minimal_json(
            files={
                "web:/good.php": "<?php ?>",
                "web:/bad.php": 12345,
                "web:/also_bad.php": ["not", "a", "string"],
            }
        )
        files = _parse_llm_response(payload).files
        assert len(files) == 1
        assert "web:/good.php" in files
| # --------------------------------------------------------------------------- | |
| # 7. Missing optional fields | |
| # --------------------------------------------------------------------------- | |
class TestMissingOptionalFields:
    """Each top-level section may be absent; the parser fills in a default."""

    @staticmethod
    def _parse_without(missing_key: str, **overrides):
        """Parse the full skeleton payload with *missing_key* removed.

        ``overrides`` replace skeleton sections in place before the key
        is dropped, so the remaining keys keep their original order.
        """
        sections = {
            "topology": {},
            "truth_graph": {"vulns": [], "exploit_chain": []},
            "golden_path": [],
            "flags": [],
            "evidence_spec": {},
            "npc_personas": [],
            "npc_traffic": {},
            "task": {},
        }
        sections.update(overrides)
        del sections[missing_key]
        return _parse_llm_response(json.dumps(sections))

    def test_missing_evidence_spec(self):
        """No ``evidence_spec`` key yields an empty evidence list."""
        parsed = self._parse_without("evidence_spec")
        assert parsed.evidence_spec == []

    def test_missing_npc_personas(self):
        """No ``npc_personas`` key yields an empty persona list."""
        parsed = self._parse_without("npc_personas")
        assert parsed.npc_personas == []

    def test_missing_npc_traffic(self):
        """No ``npc_traffic`` key yields the default traffic spec."""
        parsed = self._parse_without("npc_traffic")
        # npc_traffic gets default NPCTrafficSpec values
        assert parsed.npc_traffic.level == 0

    def test_missing_task(self):
        """No ``task`` key yields empty briefings."""
        task = self._parse_without("task").task
        assert task.red_briefing == ""
        assert task.blue_briefing == ""

    def test_missing_truth_graph(self):
        """No ``truth_graph`` key yields empty vulns and exploit chain."""
        graph = self._parse_without(
            "truth_graph", topology={"hosts": ["web"]}
        ).truth_graph
        assert graph.vulns == []
        assert graph.exploit_chain == []

    def test_missing_golden_path(self):
        """No ``golden_path`` key yields an empty golden path."""
        parsed = self._parse_without("golden_path")
        assert parsed.golden_path == []

    def test_missing_flags(self):
        """No ``flags`` key yields an empty flag list."""
        parsed = self._parse_without("flags")
        assert parsed.flags == []

    def test_vuln_with_minimal_fields(self):
        """A vulnerability with only id, type, host should parse fine."""
        payload = _minimal_json(
            truth_graph={
                "vulns": [{"id": "v1", "type": "sqli", "host": "web"}],
                "exploit_chain": [],
            }
        )
        vuln = _parse_llm_response(payload).truth_graph.vulns[0]
        assert vuln.id == "v1"
        # Every omitted text field falls back to the empty string.
        for attr in ("service", "injection_point", "vulnerable_code", "root_cause"):
            assert getattr(vuln, attr) == ""
| # --------------------------------------------------------------------------- | |
| # 8. Empty/minimal input | |
| # --------------------------------------------------------------------------- | |
class TestMinimalInput:
    """Smallest payloads that must still parse."""

    def test_completely_empty_json_object(self):
        """An empty JSON object should produce a valid SnapshotSpec with defaults."""
        parsed = _parse_llm_response("{}")
        assert isinstance(parsed, SnapshotSpec)
        assert parsed.topology == {}
        assert parsed.truth_graph.vulns == []
        # The remaining list-valued sections all default to empty lists.
        for section in ("golden_path", "flags", "evidence_spec", "npc_personas"):
            assert getattr(parsed, section) == []

    def test_minimal_valid_json(self):
        """The skeleton payload parses into a SnapshotSpec."""
        parsed = _parse_llm_response(_minimal_json())
        assert isinstance(parsed, SnapshotSpec)

    def test_topology_only(self):
        """A payload with only ``topology`` keeps it and defaults the golden path."""
        payload = json.dumps({"topology": {"hosts": ["web", "db"]}})
        parsed = _parse_llm_response(payload)
        assert parsed.topology["hosts"] == ["web", "db"]
        assert parsed.golden_path == []
| # --------------------------------------------------------------------------- | |
| # 9. Malformed input | |
| # --------------------------------------------------------------------------- | |
class TestMalformedInput:
    """Inputs that must make the parser raise rather than return a spec."""

    def test_invalid_json_raises(self):
        """Text that is not JSON raises ``json.JSONDecodeError``."""
        garbage = "not valid json {{{"
        with pytest.raises(json.JSONDecodeError):
            _parse_llm_response(garbage)

    def test_json_array_not_object_raises(self):
        """Top-level must be an object, not an array."""
        top_level_array = "[1, 2, 3]"
        with pytest.raises((TypeError, AttributeError, SnapshotParseError)):
            _parse_llm_response(top_level_array)

    def test_json_string_not_object_raises(self):
        """A top-level JSON string is rejected as well."""
        top_level_string = '"just a string"'
        with pytest.raises((TypeError, AttributeError, SnapshotParseError)):
            _parse_llm_response(top_level_string)

    def test_truth_graph_not_dict_handled(self):
        """If truth_graph is a non-dict, parsing should fail gracefully."""
        # truth_graph as string
        payload = json.dumps({"truth_graph": "not a dict"})
        with pytest.raises((AttributeError, SnapshotParseError)):
            _parse_llm_response(payload)

    def test_golden_path_not_list_handled(self):
        """If golden_path is a non-list, parsing should fail gracefully."""
        payload = json.dumps({"golden_path": "not a list"})
        with pytest.raises((AttributeError, SnapshotParseError)):
            _parse_llm_response(payload)

    def test_empty_string_raises(self):
        """The empty string raises ``json.JSONDecodeError``."""
        nothing = ""
        with pytest.raises(json.JSONDecodeError):
            _parse_llm_response(nothing)

    def test_json_with_trailing_comma_raises(self):
        """A trailing comma inside an object raises ``json.JSONDecodeError``."""
        trailing_comma = '{"key": "value",}'
        with pytest.raises(json.JSONDecodeError):
            _parse_llm_response(trailing_comma)
| # --------------------------------------------------------------------------- | |
| # 10. Vulnerability parsing details | |
| # --------------------------------------------------------------------------- | |
class TestVulnerabilityParsing:
    """Checks on how ``truth_graph.vulns`` entries are parsed."""

    @staticmethod
    def _parsed_vulns(*vulns: dict):
        """Parse a payload carrying *vulns*; return the parsed vulnerability list."""
        payload = _minimal_json(
            truth_graph={"vulns": list(vulns), "exploit_chain": []}
        )
        return _parse_llm_response(payload).truth_graph.vulns

    def test_all_vuln_fields_parsed(self):
        """Every supplied vulnerability field is carried over unchanged."""
        supplied = {
            "id": "VULN-001",
            "type": "sqli",
            "host": "web",
            "service": "nginx+php",
            "injection_point": "/search?q=",
            "vulnerable_code": "<?php $q=$_GET['q']; ?>",
            "root_cause": "No input sanitization",
            "blast_radius": "Full DB read",
            "remediation": "Use prepared statements",
        }
        vuln = self._parsed_vulns(supplied)[0]
        # Each input key must read back as the attribute of the same name.
        for field, expected in supplied.items():
            assert getattr(vuln, field) == expected

    def test_vulnerable_code_as_dict(self):
        """A dict-valued ``vulnerable_code`` stays a dict on the parsed vuln."""
        vuln = self._parsed_vulns(
            {
                "id": "V1",
                "type": "sqli",
                "host": "web",
                "vulnerable_code": {
                    "/var/www/search.php": "<?php vuln code; ?>"
                },
            }
        )[0]
        code = vuln.vulnerable_code
        assert isinstance(code, dict)
        assert code["/var/www/search.php"] == "<?php vuln code; ?>"

    def test_multiple_vulns(self):
        """Several vulnerabilities parse, each keeping its type."""
        vulns = self._parsed_vulns(
            {"id": "V1", "type": "sqli", "host": "web"},
            {"id": "V2", "type": "xss", "host": "web"},
            {"id": "V3", "type": "idor", "host": "web"},
        )
        assert len(vulns) == 3
        seen_types = {vuln.type for vuln in vulns}
        assert seen_types == {"sqli", "xss", "idor"}
| # --------------------------------------------------------------------------- | |
| # 11. Flag parsing | |
| # --------------------------------------------------------------------------- | |
class TestFlagParsing:
    """Checks on how ``flags`` entries are parsed."""

    @staticmethod
    def _parsed_flags(*flags: dict):
        """Parse a payload carrying *flags*; return the parsed flag list."""
        payload = _minimal_json(flags=list(flags))
        return _parse_llm_response(payload).flags

    def test_single_flag(self):
        """All four flag fields are carried over unchanged."""
        supplied = {
            "id": "flag1",
            "value": "FLAG{abc123}",
            "path": "/var/flags/flag1.txt",
            "host": "db",
        }
        flags = self._parsed_flags(supplied)
        assert len(flags) == 1
        only_flag = flags[0]
        for field, expected in supplied.items():
            assert getattr(only_flag, field) == expected

    def test_multiple_flags(self):
        """Two flag entries yield two parsed flags."""
        flags = self._parsed_flags(
            {"id": "f1", "value": "FLAG{a}", "path": "/f1.txt", "host": "web"},
            {"id": "f2", "value": "FLAG{b}", "path": "/f2.txt", "host": "db"},
        )
        assert len(flags) == 2

    def test_missing_flag_fields_default_to_empty(self):
        """An empty flag object parses with every field set to the empty string."""
        blank_flag = self._parsed_flags({})[0]
        for field in ("id", "value", "path", "host"):
            assert getattr(blank_flag, field) == ""
| # --------------------------------------------------------------------------- | |
| # 12. NPC traffic parsing | |
| # --------------------------------------------------------------------------- | |
class TestNPCTrafficParsing:
    """Checks on how the ``npc_traffic`` section is parsed."""

    @staticmethod
    def _parsed_traffic(npc_traffic: dict):
        """Parse a payload carrying *npc_traffic*; return the parsed traffic spec."""
        payload = _minimal_json(npc_traffic=npc_traffic)
        return _parse_llm_response(payload).npc_traffic

    def test_http_rate_maps_to_rate_lambda(self):
        """``http_rate`` in the input populates ``rate_lambda``."""
        traffic = self._parsed_traffic({"http_rate": 25})
        assert traffic.rate_lambda == 25

    def test_default_scripts(self):
        """An empty section still lists ``http_traffic.sh`` among the scripts."""
        traffic = self._parsed_traffic({})
        assert "http_traffic.sh" in traffic.scripts

    def test_level_always_zero(self):
        """Current parser hardcodes level=0."""
        traffic = self._parsed_traffic({"http_rate": 50})
        assert traffic.level == 0

    def test_missing_http_rate_defaults_to_10(self):
        """Without ``http_rate`` the parsed ``rate_lambda`` is 10."""
        traffic = self._parsed_traffic({})
        assert traffic.rate_lambda == 10
| # --------------------------------------------------------------------------- | |
| # 13. Task parsing | |
| # --------------------------------------------------------------------------- | |
class TestTaskParsing:
    """Checks on how the ``task`` section is parsed."""

    @staticmethod
    def _parsed_task(task: dict):
        """Parse a payload carrying *task*; return the parsed task."""
        payload = _minimal_json(task=task)
        return _parse_llm_response(payload).task

    def test_both_briefings(self):
        """Both briefings are carried over unchanged."""
        task = self._parsed_task(
            {
                "red_briefing": "Attack the network.",
                "blue_briefing": "Defend the network.",
            }
        )
        assert task.red_briefing == "Attack the network."
        assert task.blue_briefing == "Defend the network."

    def test_missing_briefings_default_empty(self):
        """An empty task section yields empty briefings."""
        task = self._parsed_task({})
        assert task.red_briefing == ""
        assert task.blue_briefing == ""

    def test_extra_task_fields_ignored(self):
        """Extra fields in task should be silently ignored."""
        task = self._parsed_task(
            {
                "red_briefing": "Go",
                "blue_briefing": "Watch",
                "unknown_field": "whatever",
            }
        )
        assert task.red_briefing == "Go"
| # --------------------------------------------------------------------------- | |
| # 14. Roundtrip / integration | |
| # --------------------------------------------------------------------------- | |
class TestRoundtrip:
    """End-to-end checks with fully populated payloads."""

    def test_complex_snapshot_parses_completely(self):
        """A complex snapshot with all sections populated should parse."""
        topology = {
            "hosts": ["attacker", "web", "db", "siem"],
            "zones": {"dmz": ["web"], "internal": ["db"], "mgmt": ["siem"]},
            "users": [{"username": "admin", "password": "pass", "groups": ["admins"], "hosts": ["web"]}],
        }
        sqli_vuln = {
            "id": "V1",
            "type": "sqli",
            "host": "web",
            "service": "php",
            "injection_point": "/search?q=",
            "vulnerable_code": {"search.php": "vuln code"},
            "root_cause": "no sanitization",
            "blast_radius": "db read",
            "remediation": "prepared stmts",
        }
        truth_graph = {
            "vulns": [sqli_vuln],
            "exploit_chain": [
                {"vuln": "V1", "action": "sqlmap", "yields": "db dump"}
            ],
        }
        golden_path = [
            {"step": 1, "cmd": "nmap -sV 10.0.1.0/24", "expect_stdout": "80/tcp"},
            {"step": 2, "command": "curl http://web/search?q=test", "expect_in_stdout": "results"},
        ]
        alice = {
            "name": "Alice",
            "role": "SysAdmin",
            "department": "IT",
            "reports_to": "CTO",
            "communication_style": "technical",
            "security_awareness": 0.9,
            "susceptibility": {"phishing": 0.1},
            "routine": {"morning": "check logs"},
            "accounts": {"email": "alice@corp.local"},
        }
        # Sections are assembled in the same key order the payload is serialized in.
        payload = {
            "topology": topology,
            "truth_graph": truth_graph,
            "golden_path": golden_path,
            "flags": [
                {"id": "f1", "value": "FLAG{complex}", "path": "/flag.txt", "host": "db"}
            ],
            "evidence_spec": {
                "web_log": "sqli pattern",
                "alerts": ["sql_injection_detected"],
            },
            "npc_personas": [alice],
            "npc_traffic": {"http_rate": 20},
            "task": {
                "red_briefing": "Hack the network.",
                "blue_briefing": "Monitor and defend.",
            },
            "files": {"web:/var/www/index.php": "<?php echo 'hi'; ?>"},
        }
        parsed = _parse_llm_response(json.dumps(payload))
        # Verify all sections
        assert parsed.topology["hosts"] == ["attacker", "web", "db", "siem"]
        assert len(parsed.truth_graph.vulns) == 1
        first_exploit = parsed.truth_graph.exploit_chain[0]
        assert first_exploit.vuln_id == "V1"
        assert first_exploit.command == "sqlmap"
        assert len(parsed.golden_path) == 2
        assert parsed.golden_path[0].command == "nmap -sV 10.0.1.0/24"
        assert parsed.golden_path[1].expect_in_stdout == "results"
        assert parsed.flags[0].value == "FLAG{complex}"
        assert len(parsed.evidence_spec) == 2  # 1 string + 1 list item
        assert len(parsed.npc_personas) == 1
        assert parsed.npc_traffic.rate_lambda == 20
        assert parsed.task.red_briefing == "Hack the network."
        # files: only the explicit entry is checked here
        assert "web:/var/www/index.php" in parsed.files

    def test_golden_path_host_is_preserved(self):
        """A ``host`` given on a golden-path step is carried over unchanged."""
        jumpbox_step = {
            "step": 1,
            "cmd": "ssh db 'cat /var/flags/flag1.txt'",
            "expect_stdout": "FLAG{db}",
            "host": "jumpbox",
        }
        payload = _minimal_json(golden_path=[jumpbox_step])
        parsed = _parse_llm_response(payload)
        assert parsed.golden_path[0].host == "jumpbox"