| """Tests for messaging/ module.""" |
|
|
| import json |
| from unittest.mock import patch |
|
|
| import pytest |
|
|
| |
|
|
|
|
| class TestMessagingModels: |
| """Test messaging models.""" |
|
|
| def test_incoming_message_creation(self): |
| """Test IncomingMessage dataclass.""" |
| from messaging.models import IncomingMessage |
|
|
| msg = IncomingMessage( |
| text="Hello", |
| chat_id="123", |
| user_id="456", |
| message_id="789", |
| platform="telegram", |
| ) |
| assert msg.text == "Hello" |
| assert msg.chat_id == "123" |
| assert msg.platform == "telegram" |
| assert msg.is_reply() is False |
|
|
| def test_incoming_message_with_reply(self): |
| """Test IncomingMessage as a reply.""" |
| from messaging.models import IncomingMessage |
|
|
| msg = IncomingMessage( |
| text="Reply text", |
| chat_id="123", |
| user_id="456", |
| message_id="789", |
| platform="discord", |
| reply_to_message_id="100", |
| ) |
| assert msg.is_reply() is True |
| assert msg.reply_to_message_id == "100" |
|
|
|
|
| class TestMessagingBase: |
| """Test MessagingPlatform ABC.""" |
|
|
| def test_platform_is_abstract(self): |
| """Verify MessagingPlatform cannot be instantiated.""" |
| from messaging.platforms.base import MessagingPlatform |
|
|
| with pytest.raises(TypeError): |
| MessagingPlatform() |
|
|
|
|
| class TestSessionStore: |
| """Test SessionStore.""" |
|
|
| def test_session_store_init(self, tmp_path): |
| """Test SessionStore initialization.""" |
| from messaging.session import SessionStore |
|
|
| store = SessionStore(storage_path=str(tmp_path / "sessions.json")) |
| assert store._trees == {} |
|
|
| |
|
|
| def test_save_and_get_tree(self, tmp_path): |
| """Test saving and retrieving trees.""" |
| from messaging.session import SessionStore |
|
|
| store = SessionStore(storage_path=str(tmp_path / "sessions.json")) |
|
|
| tree_data = { |
| "root": "r1", |
| "nodes": {"r1": {"content": "root"}, "n1": {"content": "child"}}, |
| } |
| store.save_tree("r1", tree_data) |
|
|
| loaded = store.get_tree("r1") |
| assert loaded == tree_data |
|
|
| |
| node_map = store.get_node_mapping() |
| assert node_map["r1"] == "r1" |
| assert node_map["n1"] == "r1" |
|
|
| def test_register_node(self, tmp_path): |
| """Test manual node registration.""" |
| from messaging.session import SessionStore |
|
|
| store = SessionStore(storage_path=str(tmp_path / "sessions.json")) |
| store.register_node("n_manual", "r_manual") |
| assert store.get_node_mapping()["n_manual"] == "r_manual" |
|
|
| |
|
|
| def test_load_existing_file_with_trees(self, tmp_path): |
| """Test loading file with trees (legacy sessions ignored).""" |
| from messaging.session import SessionStore |
|
|
| data = { |
| "sessions": {}, |
| "trees": {"r1": {"root_id": "r1", "nodes": {"r1": {}}}}, |
| "node_to_tree": {"r1": "r1"}, |
| "message_log": {}, |
| } |
|
|
| p = tmp_path / "sessions.json" |
| with open(p, "w") as f: |
| json.dump(data, f) |
|
|
| store = SessionStore(storage_path=str(p)) |
| assert store.get_tree("r1") is not None |
|
|
| def test_load_corrupt_file(self, tmp_path): |
| """Test loading corrupt/invalid json file.""" |
| p = tmp_path / "sessions.json" |
| with open(p, "w") as f: |
| f.write("{invalid json") |
|
|
| from messaging.session import SessionStore |
|
|
| |
| store = SessionStore(storage_path=str(p)) |
| assert store._trees == {} |
|
|
| def test_save_error_handling(self, tmp_path): |
| """Test error during save.""" |
| from messaging.session import SessionStore |
|
|
| store = SessionStore(storage_path=str(tmp_path / "sessions.json")) |
| store.save_tree("r1", {"root_id": "r1", "nodes": {"r1": {}}}) |
|
|
| |
| with patch("builtins.open", side_effect=OSError("Disk full")): |
| store.save_tree("r2", {"root_id": "r2", "nodes": {"r2": {}}}) |
|
|
| |
| assert "r2" in store._trees |
|
|
|
|
| class TestTreeQueueManager: |
| """Test TreeQueueManager.""" |
|
|
| def test_tree_queue_manager_init(self): |
| """Test TreeQueueManager initialization.""" |
| from messaging.trees.queue_manager import TreeQueueManager |
|
|
| mgr = TreeQueueManager() |
| assert mgr.get_tree_count() == 0 |
|
|
| def test_tree_not_busy_initially(self): |
| """Test tree is not busy when no messages.""" |
| from messaging.trees.queue_manager import TreeQueueManager |
|
|
| mgr = TreeQueueManager() |
| assert mgr.is_tree_busy("nonexistent") is False |
|
|
| def test_get_queue_size_empty(self): |
| """Test queue size is 0 for non-existent node.""" |
| from messaging.trees.queue_manager import TreeQueueManager |
|
|
| mgr = TreeQueueManager() |
| assert mgr.get_queue_size("nonexistent") == 0 |
|
|
| @pytest.mark.asyncio |
| async def test_create_tree_and_enqueue(self): |
| """Test creating a tree and enqueueing.""" |
| from messaging.models import IncomingMessage |
| from messaging.trees.queue_manager import TreeQueueManager |
|
|
| mgr = TreeQueueManager() |
| processed = [] |
|
|
| async def processor(node_id, node): |
| processed.append(node_id) |
|
|
| incoming = IncomingMessage( |
| text="test", chat_id="1", user_id="1", message_id="1", platform="test" |
| ) |
|
|
| await mgr.create_tree("1", incoming, "status_1") |
| was_queued = await mgr.enqueue("1", processor) |
|
|
| |
| assert was_queued is False |
|
|
| @pytest.mark.asyncio |
| async def test_cancel_tree_empty(self): |
| """Test cancelling non-existent tree.""" |
| from messaging.trees.queue_manager import TreeQueueManager |
|
|
| mgr = TreeQueueManager() |
| cancelled = await mgr.cancel_tree("nonexistent") |
| assert cancelled == [] |
|
|