"""Tests for multimodal NPC communication channels (issue #30). Tests ChatChannel, VoiceChannel, DocumentChannel, chat traffic generation, and SIEM log aggregation. No Docker dependency. """ from __future__ import annotations import pytest from open_range.builder.npc.channels import ( ChatChannel, ChatMessage, DocumentChannel, DocumentRecord, NPCChannel, VoiceChannel, VoiceTranscript, ) from open_range.builder.npc.chat_traffic import generate_chat_traffic from open_range.builder.npc.npc_manager import ( NPCManager, _derive_scripts_from_topology, _hosts_from_topology, _resolve_env_vars, ) from open_range.protocols import NPCPersona, NPCTrafficSpec, SnapshotSpec, TaskSpec # =================================================================== # Fixtures # =================================================================== @pytest.fixture def low_awareness_persona() -> NPCPersona: return NPCPersona( name="Janet Smith", role="Marketing Coordinator", department="Marketing", security_awareness=0.2, susceptibility={ "phishing_email": 0.8, "attachment_opening": 0.9, "vishing": 0.7, "credential_sharing": 0.6, }, ) @pytest.fixture def high_awareness_persona() -> NPCPersona: return NPCPersona( name="David Chen", role="CISO", department="Security", security_awareness=0.95, susceptibility={ "phishing_email": 0.05, "attachment_opening": 0.1, "vishing": 0.05, "credential_sharing": 0.01, }, ) @pytest.fixture def two_personas( low_awareness_persona: NPCPersona, high_awareness_persona: NPCPersona, ) -> list[NPCPersona]: return [low_awareness_persona, high_awareness_persona] # =================================================================== # NPCChannel enum # =================================================================== class TestNPCChannel: def test_enum_values(self): assert NPCChannel.EMAIL == "email" assert NPCChannel.CHAT == "chat" assert NPCChannel.VOICE == "voice" assert NPCChannel.DOCUMENT == "document" # =================================================================== # ChatChannel # =================================================================== class TestChatChannel: def test_send_and_receive(self): ch = ChatChannel() ch.send_message("Alice", "Bob", "Hello Bob!") msgs = ch.get_messages("Bob") assert len(msgs) == 1 assert msgs[0].sender == "Alice" assert msgs[0].content == "Hello Bob!" def test_get_messages_filters_by_recipient(self): ch = ChatChannel() ch.send_message("Alice", "Bob", "For Bob") ch.send_message("Alice", "Carol", "For Carol") assert len(ch.get_messages("Bob")) == 1 assert len(ch.get_messages("Carol")) == 1 assert len(ch.get_messages("Dave")) == 0 def test_channel_log_returns_all(self): ch = ChatChannel() ch.send_message("Alice", "Bob", "msg1") ch.send_message("Carol", "Dave", "msg2") log = ch.get_channel_log() assert len(log) == 2 assert all(entry["type"] == "chat" for entry in log) assert log[0]["sender"] == "Alice" assert log[1]["sender"] == "Carol" def test_clear(self): ch = ChatChannel() ch.send_message("A", "B", "test") ch.clear() assert ch.get_channel_log() == [] assert ch.get_messages("B") == [] def test_message_has_timestamp(self): ch = ChatChannel() msg = ch.send_message("A", "B", "ts test") assert isinstance(msg.timestamp, float) assert msg.timestamp > 0 def test_send_returns_chat_message(self): ch = ChatChannel() msg = ch.send_message("A", "B", "hello") assert isinstance(msg, ChatMessage) # =================================================================== # VoiceChannel # =================================================================== class TestVoiceChannel: def test_initiate_call(self): ch = VoiceChannel() call = ch.initiate_call( caller="Attacker", callee="Janet Smith", pretext="Hi, this is IT support. I need your password for a security check.", ) assert isinstance(call, VoiceTranscript) assert call.caller == "Attacker" assert call.callee == "Janet Smith" assert len(call.transcript) == 1 assert call.transcript[0]["speaker"] == "Attacker" def test_respond_low_awareness(self, low_awareness_persona: NPCPersona): ch = VoiceChannel() call = ch.initiate_call("Attacker", low_awareness_persona.name, "Need your creds") result = ch.respond(low_awareness_persona, call) assert len(result.transcript) == 2 assert result.response_action in ("comply", "deflect", "refuse") # Low awareness + high vishing susceptibility -> comply assert result.response_action == "comply" def test_respond_high_awareness(self, high_awareness_persona: NPCPersona): ch = VoiceChannel() call = ch.initiate_call("Attacker", high_awareness_persona.name, "Need your password") result = ch.respond(high_awareness_persona, call) assert result.response_action == "report_to_IT" assert len(result.transcript) == 2 def test_get_call_log(self): ch = VoiceChannel() ch.initiate_call("A", "B", "pretext1") ch.initiate_call("C", "D", "pretext2") log = ch.get_call_log() assert len(log) == 2 assert all(entry["type"] == "voice" for entry in log) def test_clear(self): ch = VoiceChannel() ch.initiate_call("A", "B", "test") ch.clear() assert ch.get_call_log() == [] def test_call_has_timestamp(self): ch = VoiceChannel() call = ch.initiate_call("A", "B", "test") assert isinstance(call.timestamp, float) assert call.timestamp > 0 def test_respond_sets_duration(self, low_awareness_persona: NPCPersona): ch = VoiceChannel() call = ch.initiate_call("A", low_awareness_persona.name, "hello") result = ch.respond(low_awareness_persona, call) assert result.duration_s > 0 # =================================================================== # DocumentChannel # =================================================================== class TestDocumentChannel: def test_share_document(self): ch = DocumentChannel() doc = ch.share_document("Alice", "Bob", "report.pdf", "Q4 financial report") assert isinstance(doc, DocumentRecord) assert doc.sender == "Alice" assert doc.recipient == "Bob" assert doc.filename == "report.pdf" assert doc.accessed is False def test_inspect_low_awareness(self, low_awareness_persona: NPCPersona): ch = DocumentChannel() doc = ch.share_document("Attacker", low_awareness_persona.name, "malware.docx", "Invoice") decision = ch.inspect_document(low_awareness_persona, doc) # Low awareness + high attachment_opening susceptibility -> opened assert decision == "opened" assert doc.accessed is True assert doc.access_decision == "opened" def test_inspect_high_awareness(self, high_awareness_persona: NPCPersona): ch = DocumentChannel() doc = ch.share_document("Attacker", high_awareness_persona.name, "malware.docx", "Invoice") decision = ch.inspect_document(high_awareness_persona, doc) assert decision == "reported" assert doc.accessed is False assert doc.access_decision == "reported" def test_get_document_log(self): ch = DocumentChannel() ch.share_document("A", "B", "f1.txt", "desc1") ch.share_document("C", "D", "f2.txt", "desc2") log = ch.get_document_log() assert len(log) == 2 assert all(entry["type"] == "document" for entry in log) def test_clear(self): ch = DocumentChannel() ch.share_document("A", "B", "f.txt", "d") ch.clear() assert ch.get_document_log() == [] def test_document_has_timestamp(self): ch = DocumentChannel() doc = ch.share_document("A", "B", "f.txt", "d") assert isinstance(doc.timestamp, float) assert doc.timestamp > 0 # =================================================================== # Chat traffic generator # =================================================================== class TestChatTraffic: def test_generate_messages(self, two_personas: list[NPCPersona]): ch = ChatChannel() msgs = generate_chat_traffic(two_personas, ch, num_messages=5, seed=42) assert len(msgs) == 5 assert all("sender" in m for m in msgs) assert all("recipient" in m for m in msgs) assert all("content" in m for m in msgs) def test_deterministic_with_seed(self, two_personas: list[NPCPersona]): ch1 = ChatChannel() msgs1 = generate_chat_traffic(two_personas, ch1, num_messages=5, seed=99) ch2 = ChatChannel() msgs2 = generate_chat_traffic(two_personas, ch2, num_messages=5, seed=99) # Same seed -> same messages assert [m["content"] for m in msgs1] == [m["content"] for m in msgs2] def test_no_self_messages(self, two_personas: list[NPCPersona]): ch = ChatChannel() msgs = generate_chat_traffic(two_personas, ch, num_messages=20, seed=1) for m in msgs: assert m["sender"] != m["recipient"] def test_empty_with_fewer_than_two_personas(self): ch = ChatChannel() single = [NPCPersona(name="Solo")] msgs = generate_chat_traffic(single, ch, num_messages=5) assert msgs == [] def test_messages_appear_in_channel(self, two_personas: list[NPCPersona]): ch = ChatChannel() generate_chat_traffic(two_personas, ch, num_messages=5, seed=42) log = ch.get_channel_log() assert len(log) == 5 # =================================================================== # SIEM log aggregation (NPCManager) # =================================================================== class TestChannelSIEM: def test_siem_log_aggregates_channels(self): mgr = NPCManager() chat_ch = mgr.channels["chat"] voice_ch = mgr.channels["voice"] doc_ch = mgr.channels["document"] assert isinstance(chat_ch, ChatChannel) assert isinstance(voice_ch, VoiceChannel) assert isinstance(doc_ch, DocumentChannel) chat_ch.send_message("A", "B", "hello") voice_ch.initiate_call("C", "D", "pretext") doc_ch.share_document("E", "F", "file.txt", "desc") log = mgr.get_siem_log() assert len(log) == 3 types = {entry["type"] for entry in log} assert types == {"chat", "voice", "document"} def test_siem_log_sorted_by_timestamp(self): mgr = NPCManager() chat_ch = mgr.channels["chat"] assert isinstance(chat_ch, ChatChannel) # Send multiple messages chat_ch.send_message("A", "B", "first") chat_ch.send_message("C", "D", "second") log = mgr.get_siem_log() timestamps = [e["timestamp"] for e in log] assert timestamps == sorted(timestamps) def test_channels_reinitialised_on_start(self): """Verify that channels dict exists on a fresh NPCManager.""" mgr = NPCManager() assert "chat" in mgr.channels assert "voice" in mgr.channels assert "document" in mgr.channels class TestTopologyNormalization: def test_hosts_from_topology_normalizes_compiled_host_names(self): topology = { "hosts": ["web", "db"], "host_catalog": { "web": {"zone": "dmz", "services": ["nginx", "php-fpm"]}, "db": {"zone": "internal", "services": ["mysql"]}, }, } hosts = _hosts_from_topology(topology) assert [host["name"] for host in hosts] == ["web", "db"] assert hosts[0]["services"] == ["nginx", "php-fpm"] assert hosts[1]["services"] == ["mysql"] def test_helpers_accept_compiled_topology_host_names(self): topology = { "hosts": ["web", "db", "mail", "siem"], "host_catalog": { "web": {"services": ["nginx", "php-fpm"]}, "db": {"services": ["mysql"]}, "mail": {"services": ["postfix"]}, "siem": {"services": ["rsyslog"]}, }, "users": [ { "username": "svc_db", "password": "SvcDb!401", "hosts": ["db"], "role": "service", }, { "username": "admin", "password": "Adm1n!2024", "hosts": ["web", "siem"], "role": "admin", }, ], } scripts = _derive_scripts_from_topology(topology) env = _resolve_env_vars(topology, rate_lambda=10.0) assert "http_traffic.sh" in scripts assert "db_traffic.sh" in scripts assert "smtp_traffic.sh" in scripts assert env["WEB_HOST"] == "web" assert env["DB_HOST"] == "db" assert env["MAIL_HOST"] == "mail" assert env["SIEM_HOST"] == "siem" assert env["DB_USER"] == "svc_db" assert env["SSH_USER"] == "admin"