Spaces:
Runtime error
Runtime error
| """Tests for multimodal NPC communication channels (issue #30). | |
| Tests ChatChannel, VoiceChannel, DocumentChannel, chat traffic | |
| generation, and SIEM log aggregation. No Docker dependency. | |
| """ | |
| from __future__ import annotations | |
| import pytest | |
| from open_range.builder.npc.channels import ( | |
| ChatChannel, | |
| ChatMessage, | |
| DocumentChannel, | |
| DocumentRecord, | |
| NPCChannel, | |
| VoiceChannel, | |
| VoiceTranscript, | |
| ) | |
| from open_range.builder.npc.chat_traffic import generate_chat_traffic | |
| from open_range.builder.npc.npc_manager import ( | |
| NPCManager, | |
| _derive_scripts_from_topology, | |
| _hosts_from_topology, | |
| _resolve_env_vars, | |
| ) | |
| from open_range.protocols import NPCPersona, NPCTrafficSpec, SnapshotSpec, TaskSpec | |
| # =================================================================== | |
| # Fixtures | |
| # =================================================================== | |
| def low_awareness_persona() -> NPCPersona: | |
| return NPCPersona( | |
| name="Janet Smith", | |
| role="Marketing Coordinator", | |
| department="Marketing", | |
| security_awareness=0.2, | |
| susceptibility={ | |
| "phishing_email": 0.8, | |
| "attachment_opening": 0.9, | |
| "vishing": 0.7, | |
| "credential_sharing": 0.6, | |
| }, | |
| ) | |
| def high_awareness_persona() -> NPCPersona: | |
| return NPCPersona( | |
| name="David Chen", | |
| role="CISO", | |
| department="Security", | |
| security_awareness=0.95, | |
| susceptibility={ | |
| "phishing_email": 0.05, | |
| "attachment_opening": 0.1, | |
| "vishing": 0.05, | |
| "credential_sharing": 0.01, | |
| }, | |
| ) | |
| def two_personas( | |
| low_awareness_persona: NPCPersona, | |
| high_awareness_persona: NPCPersona, | |
| ) -> list[NPCPersona]: | |
| return [low_awareness_persona, high_awareness_persona] | |
| # =================================================================== | |
| # NPCChannel enum | |
| # =================================================================== | |
| class TestNPCChannel: | |
| def test_enum_values(self): | |
| assert NPCChannel.EMAIL == "email" | |
| assert NPCChannel.CHAT == "chat" | |
| assert NPCChannel.VOICE == "voice" | |
| assert NPCChannel.DOCUMENT == "document" | |
| # =================================================================== | |
| # ChatChannel | |
| # =================================================================== | |
| class TestChatChannel: | |
| def test_send_and_receive(self): | |
| ch = ChatChannel() | |
| ch.send_message("Alice", "Bob", "Hello Bob!") | |
| msgs = ch.get_messages("Bob") | |
| assert len(msgs) == 1 | |
| assert msgs[0].sender == "Alice" | |
| assert msgs[0].content == "Hello Bob!" | |
| def test_get_messages_filters_by_recipient(self): | |
| ch = ChatChannel() | |
| ch.send_message("Alice", "Bob", "For Bob") | |
| ch.send_message("Alice", "Carol", "For Carol") | |
| assert len(ch.get_messages("Bob")) == 1 | |
| assert len(ch.get_messages("Carol")) == 1 | |
| assert len(ch.get_messages("Dave")) == 0 | |
| def test_channel_log_returns_all(self): | |
| ch = ChatChannel() | |
| ch.send_message("Alice", "Bob", "msg1") | |
| ch.send_message("Carol", "Dave", "msg2") | |
| log = ch.get_channel_log() | |
| assert len(log) == 2 | |
| assert all(entry["type"] == "chat" for entry in log) | |
| assert log[0]["sender"] == "Alice" | |
| assert log[1]["sender"] == "Carol" | |
| def test_clear(self): | |
| ch = ChatChannel() | |
| ch.send_message("A", "B", "test") | |
| ch.clear() | |
| assert ch.get_channel_log() == [] | |
| assert ch.get_messages("B") == [] | |
| def test_message_has_timestamp(self): | |
| ch = ChatChannel() | |
| msg = ch.send_message("A", "B", "ts test") | |
| assert isinstance(msg.timestamp, float) | |
| assert msg.timestamp > 0 | |
| def test_send_returns_chat_message(self): | |
| ch = ChatChannel() | |
| msg = ch.send_message("A", "B", "hello") | |
| assert isinstance(msg, ChatMessage) | |
| # =================================================================== | |
| # VoiceChannel | |
| # =================================================================== | |
| class TestVoiceChannel: | |
| def test_initiate_call(self): | |
| ch = VoiceChannel() | |
| call = ch.initiate_call( | |
| caller="Attacker", | |
| callee="Janet Smith", | |
| pretext="Hi, this is IT support. I need your password for a security check.", | |
| ) | |
| assert isinstance(call, VoiceTranscript) | |
| assert call.caller == "Attacker" | |
| assert call.callee == "Janet Smith" | |
| assert len(call.transcript) == 1 | |
| assert call.transcript[0]["speaker"] == "Attacker" | |
| def test_respond_low_awareness(self, low_awareness_persona: NPCPersona): | |
| ch = VoiceChannel() | |
| call = ch.initiate_call("Attacker", low_awareness_persona.name, "Need your creds") | |
| result = ch.respond(low_awareness_persona, call) | |
| assert len(result.transcript) == 2 | |
| assert result.response_action in ("comply", "deflect", "refuse") | |
| # Low awareness + high vishing susceptibility -> comply | |
| assert result.response_action == "comply" | |
| def test_respond_high_awareness(self, high_awareness_persona: NPCPersona): | |
| ch = VoiceChannel() | |
| call = ch.initiate_call("Attacker", high_awareness_persona.name, "Need your password") | |
| result = ch.respond(high_awareness_persona, call) | |
| assert result.response_action == "report_to_IT" | |
| assert len(result.transcript) == 2 | |
| def test_get_call_log(self): | |
| ch = VoiceChannel() | |
| ch.initiate_call("A", "B", "pretext1") | |
| ch.initiate_call("C", "D", "pretext2") | |
| log = ch.get_call_log() | |
| assert len(log) == 2 | |
| assert all(entry["type"] == "voice" for entry in log) | |
| def test_clear(self): | |
| ch = VoiceChannel() | |
| ch.initiate_call("A", "B", "test") | |
| ch.clear() | |
| assert ch.get_call_log() == [] | |
| def test_call_has_timestamp(self): | |
| ch = VoiceChannel() | |
| call = ch.initiate_call("A", "B", "test") | |
| assert isinstance(call.timestamp, float) | |
| assert call.timestamp > 0 | |
| def test_respond_sets_duration(self, low_awareness_persona: NPCPersona): | |
| ch = VoiceChannel() | |
| call = ch.initiate_call("A", low_awareness_persona.name, "hello") | |
| result = ch.respond(low_awareness_persona, call) | |
| assert result.duration_s > 0 | |
| # =================================================================== | |
| # DocumentChannel | |
| # =================================================================== | |
| class TestDocumentChannel: | |
| def test_share_document(self): | |
| ch = DocumentChannel() | |
| doc = ch.share_document("Alice", "Bob", "report.pdf", "Q4 financial report") | |
| assert isinstance(doc, DocumentRecord) | |
| assert doc.sender == "Alice" | |
| assert doc.recipient == "Bob" | |
| assert doc.filename == "report.pdf" | |
| assert doc.accessed is False | |
| def test_inspect_low_awareness(self, low_awareness_persona: NPCPersona): | |
| ch = DocumentChannel() | |
| doc = ch.share_document("Attacker", low_awareness_persona.name, "malware.docx", "Invoice") | |
| decision = ch.inspect_document(low_awareness_persona, doc) | |
| # Low awareness + high attachment_opening susceptibility -> opened | |
| assert decision == "opened" | |
| assert doc.accessed is True | |
| assert doc.access_decision == "opened" | |
| def test_inspect_high_awareness(self, high_awareness_persona: NPCPersona): | |
| ch = DocumentChannel() | |
| doc = ch.share_document("Attacker", high_awareness_persona.name, "malware.docx", "Invoice") | |
| decision = ch.inspect_document(high_awareness_persona, doc) | |
| assert decision == "reported" | |
| assert doc.accessed is False | |
| assert doc.access_decision == "reported" | |
| def test_get_document_log(self): | |
| ch = DocumentChannel() | |
| ch.share_document("A", "B", "f1.txt", "desc1") | |
| ch.share_document("C", "D", "f2.txt", "desc2") | |
| log = ch.get_document_log() | |
| assert len(log) == 2 | |
| assert all(entry["type"] == "document" for entry in log) | |
| def test_clear(self): | |
| ch = DocumentChannel() | |
| ch.share_document("A", "B", "f.txt", "d") | |
| ch.clear() | |
| assert ch.get_document_log() == [] | |
| def test_document_has_timestamp(self): | |
| ch = DocumentChannel() | |
| doc = ch.share_document("A", "B", "f.txt", "d") | |
| assert isinstance(doc.timestamp, float) | |
| assert doc.timestamp > 0 | |
| # =================================================================== | |
| # Chat traffic generator | |
| # =================================================================== | |
| class TestChatTraffic: | |
| def test_generate_messages(self, two_personas: list[NPCPersona]): | |
| ch = ChatChannel() | |
| msgs = generate_chat_traffic(two_personas, ch, num_messages=5, seed=42) | |
| assert len(msgs) == 5 | |
| assert all("sender" in m for m in msgs) | |
| assert all("recipient" in m for m in msgs) | |
| assert all("content" in m for m in msgs) | |
| def test_deterministic_with_seed(self, two_personas: list[NPCPersona]): | |
| ch1 = ChatChannel() | |
| msgs1 = generate_chat_traffic(two_personas, ch1, num_messages=5, seed=99) | |
| ch2 = ChatChannel() | |
| msgs2 = generate_chat_traffic(two_personas, ch2, num_messages=5, seed=99) | |
| # Same seed -> same messages | |
| assert [m["content"] for m in msgs1] == [m["content"] for m in msgs2] | |
| def test_no_self_messages(self, two_personas: list[NPCPersona]): | |
| ch = ChatChannel() | |
| msgs = generate_chat_traffic(two_personas, ch, num_messages=20, seed=1) | |
| for m in msgs: | |
| assert m["sender"] != m["recipient"] | |
| def test_empty_with_fewer_than_two_personas(self): | |
| ch = ChatChannel() | |
| single = [NPCPersona(name="Solo")] | |
| msgs = generate_chat_traffic(single, ch, num_messages=5) | |
| assert msgs == [] | |
| def test_messages_appear_in_channel(self, two_personas: list[NPCPersona]): | |
| ch = ChatChannel() | |
| generate_chat_traffic(two_personas, ch, num_messages=5, seed=42) | |
| log = ch.get_channel_log() | |
| assert len(log) == 5 | |
| # =================================================================== | |
| # SIEM log aggregation (NPCManager) | |
| # =================================================================== | |
| class TestChannelSIEM: | |
| def test_siem_log_aggregates_channels(self): | |
| mgr = NPCManager() | |
| chat_ch = mgr.channels["chat"] | |
| voice_ch = mgr.channels["voice"] | |
| doc_ch = mgr.channels["document"] | |
| assert isinstance(chat_ch, ChatChannel) | |
| assert isinstance(voice_ch, VoiceChannel) | |
| assert isinstance(doc_ch, DocumentChannel) | |
| chat_ch.send_message("A", "B", "hello") | |
| voice_ch.initiate_call("C", "D", "pretext") | |
| doc_ch.share_document("E", "F", "file.txt", "desc") | |
| log = mgr.get_siem_log() | |
| assert len(log) == 3 | |
| types = {entry["type"] for entry in log} | |
| assert types == {"chat", "voice", "document"} | |
| def test_siem_log_sorted_by_timestamp(self): | |
| mgr = NPCManager() | |
| chat_ch = mgr.channels["chat"] | |
| assert isinstance(chat_ch, ChatChannel) | |
| # Send multiple messages | |
| chat_ch.send_message("A", "B", "first") | |
| chat_ch.send_message("C", "D", "second") | |
| log = mgr.get_siem_log() | |
| timestamps = [e["timestamp"] for e in log] | |
| assert timestamps == sorted(timestamps) | |
| def test_channels_reinitialised_on_start(self): | |
| """Verify that channels dict exists on a fresh NPCManager.""" | |
| mgr = NPCManager() | |
| assert "chat" in mgr.channels | |
| assert "voice" in mgr.channels | |
| assert "document" in mgr.channels | |
| class TestTopologyNormalization: | |
| def test_hosts_from_topology_normalizes_compiled_host_names(self): | |
| topology = { | |
| "hosts": ["web", "db"], | |
| "host_catalog": { | |
| "web": {"zone": "dmz", "services": ["nginx", "php-fpm"]}, | |
| "db": {"zone": "internal", "services": ["mysql"]}, | |
| }, | |
| } | |
| hosts = _hosts_from_topology(topology) | |
| assert [host["name"] for host in hosts] == ["web", "db"] | |
| assert hosts[0]["services"] == ["nginx", "php-fpm"] | |
| assert hosts[1]["services"] == ["mysql"] | |
| def test_helpers_accept_compiled_topology_host_names(self): | |
| topology = { | |
| "hosts": ["web", "db", "mail", "siem"], | |
| "host_catalog": { | |
| "web": {"services": ["nginx", "php-fpm"]}, | |
| "db": {"services": ["mysql"]}, | |
| "mail": {"services": ["postfix"]}, | |
| "siem": {"services": ["rsyslog"]}, | |
| }, | |
| "users": [ | |
| { | |
| "username": "svc_db", | |
| "password": "SvcDb!401", | |
| "hosts": ["db"], | |
| "role": "service", | |
| }, | |
| { | |
| "username": "admin", | |
| "password": "Adm1n!2024", | |
| "hosts": ["web", "siem"], | |
| "role": "admin", | |
| }, | |
| ], | |
| } | |
| scripts = _derive_scripts_from_topology(topology) | |
| env = _resolve_env_vars(topology, rate_lambda=10.0) | |
| assert "http_traffic.sh" in scripts | |
| assert "db_traffic.sh" in scripts | |
| assert "smtp_traffic.sh" in scripts | |
| assert env["WEB_HOST"] == "web" | |
| assert env["DB_HOST"] == "db" | |
| assert env["MAIL_HOST"] == "mail" | |
| assert env["SIEM_HOST"] == "siem" | |
| assert env["DB_USER"] == "svc_db" | |
| assert env["SSH_USER"] == "admin" | |