File size: 5,431 Bytes
6172a47 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 | import asyncio
import contextlib
from unittest.mock import AsyncMock, MagicMock
import pytest
from messaging.models import IncomingMessage
from messaging.trees.data import MessageNode, MessageState, MessageTree
from messaging.trees.processor import TreeQueueProcessor
@pytest.fixture
def tree_processor():
    """Provide a ``TreeQueueProcessor`` built with no constructor arguments."""
    queue_processor = TreeQueueProcessor()
    return queue_processor
@pytest.fixture
def sample_incoming():
    """Provide an ``IncomingMessage`` filled with fixed placeholder values."""
    message_fields = {
        "text": "test message",
        "chat_id": "chat123",
        "user_id": "user456",
        "message_id": "msg789",
        "platform": "telegram",
    }
    return IncomingMessage(**message_fields)
@pytest.fixture
def sample_node(sample_incoming):
    """Provide a ``MessageNode`` with id ``"msg789"`` wrapping ``sample_incoming``."""
    node = MessageNode(
        node_id="msg789",
        incoming=sample_incoming,
        status_message_id="status123",
    )
    return node
@pytest.fixture
def sample_tree(sample_node):
    """Provide a ``MessageTree`` constructed from ``sample_node``."""
    tree = MessageTree(sample_node)
    return tree
@pytest.mark.asyncio
async def test_process_node_success(tree_processor, sample_tree, sample_node):
    """``process_node`` calls the handler once with ``(node_id, node)``.

    Afterwards the tree's ``_current_node_id`` must be ``None``.
    """
    handler = AsyncMock()

    await tree_processor.process_node(sample_tree, sample_node, handler)

    handler.assert_called_once_with(sample_node.node_id, sample_node)
    assert sample_tree._current_node_id is None
@pytest.mark.asyncio
async def test_process_node_cancelled(tree_processor, sample_tree, sample_node):
    """A ``CancelledError`` raised by the handler propagates out of ``process_node``.

    Afterwards the tree's ``_current_node_id`` must be ``None``.
    """
    cancelling_handler = AsyncMock(side_effect=asyncio.CancelledError)

    with pytest.raises(asyncio.CancelledError):
        await tree_processor.process_node(
            sample_tree, sample_node, cancelling_handler
        )

    # Checked after the ``raises`` block: inside it, the statement would be
    # skipped as soon as the awaited call raised.
    assert sample_tree._current_node_id is None
@pytest.mark.asyncio
async def test_process_node_exception(tree_processor, sample_tree, sample_node):
    """A handler exception is reported through ``update_state`` instead of raising.

    The test expects a single ``update_state`` call with ``MessageState.ERROR``
    and the exception text as ``error_message``, and ``_current_node_id`` to be
    ``None`` afterwards.
    """
    failing_handler = AsyncMock(side_effect=Exception("Test error"))
    # Swap in a mock for update_state so the call can be inspected.
    update_state_spy = AsyncMock()
    sample_tree.update_state = update_state_spy

    await tree_processor.process_node(sample_tree, sample_node, failing_handler)

    update_state_spy.assert_called_once_with(
        sample_node.node_id,
        MessageState.ERROR,
        error_message="Test error",
    )
    assert sample_tree._current_node_id is None
@pytest.mark.asyncio
async def test_enqueue_and_start_when_free(tree_processor, sample_tree):
    """``enqueue_and_start`` returns ``False`` and starts a task on an idle tree.

    The tree is expected to be marked as processing, to record the node id as
    current, and to hold a non-``None`` ``_current_task``.
    """
    handler = AsyncMock()
    target_id = "node1"
    # Stub get_node so the lookup yields a node-shaped mock.
    stub_node = MagicMock(spec=MessageNode)
    sample_tree.get_node = MagicMock(return_value=stub_node)

    queued = await tree_processor.enqueue_and_start(sample_tree, target_id, handler)

    assert queued is False
    assert sample_tree._is_processing is True
    assert sample_tree._current_node_id == target_id
    running_task = sample_tree._current_task
    assert running_task is not None

    # Cancel and drain the started task so it does not outlive the test.
    running_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await running_task
@pytest.mark.asyncio
async def test_enqueue_and_start_when_busy(tree_processor, sample_tree):
    """``enqueue_and_start`` returns ``True`` and queues the id on a busy tree."""
    handler = AsyncMock()
    target_id = "node1"
    # Mark the tree as already processing before enqueueing.
    sample_tree._is_processing = True

    queued = await tree_processor.enqueue_and_start(sample_tree, target_id, handler)

    assert queued is True
    pending = sample_tree._queue
    assert pending.qsize() == 1
    assert pending.get_nowait() == target_id
def test_cancel_current_task(tree_processor, sample_tree):
    """``cancel_current`` returns ``True`` and cancels a task that is not done."""
    # Task-shaped mock whose done() reports False.
    pending_task = MagicMock(spec=asyncio.Task, **{"done.return_value": False})
    sample_tree._current_task = pending_task

    was_cancelled = tree_processor.cancel_current(sample_tree)

    assert was_cancelled is True
    pending_task.cancel.assert_called_once()
def test_cancel_current_task_already_done(tree_processor, sample_tree):
    """``cancel_current`` returns ``False`` and skips ``cancel`` for a done task."""
    # Task-shaped mock whose done() reports True.
    finished_task = MagicMock(spec=asyncio.Task)
    finished_task.done.return_value = True
    sample_tree._current_task = finished_task

    was_cancelled = tree_processor.cancel_current(sample_tree)

    assert was_cancelled is False
    finished_task.cancel.assert_not_called()
@pytest.mark.asyncio
async def test_process_next_queue_empty(tree_processor, sample_tree):
    """``_process_next`` resets ``_is_processing`` to ``False`` when nothing is queued."""
    handler = AsyncMock()
    # Start from the "busy" state; nothing has been put on the queue.
    sample_tree._is_processing = True

    await tree_processor._process_next(sample_tree, handler)

    assert sample_tree._is_processing is False
@pytest.mark.asyncio
async def test_process_next_with_item(tree_processor, sample_tree):
    """``_process_next`` picks up a queued id and starts a task for it.

    The queued id is expected to become ``_current_node_id`` and
    ``_current_task`` to be non-``None``.
    """
    handler = AsyncMock()
    await sample_tree._queue.put("next_node")
    # Stub get_node so the lookup yields a node-shaped mock.
    stub_node = MagicMock(spec=MessageNode)
    sample_tree.get_node = MagicMock(return_value=stub_node)

    await tree_processor._process_next(sample_tree, handler)

    assert sample_tree._current_node_id == "next_node"
    running_task = sample_tree._current_task
    assert running_task is not None

    # Cancel and drain the started task so it does not outlive the test.
    running_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await running_task
@pytest.mark.asyncio
async def test_process_next_triggers_queue_update(sample_tree):
    """``_process_next`` awaits ``queue_update_callback`` once with the tree."""
    on_queue_update = AsyncMock()
    queue_processor = TreeQueueProcessor(queue_update_callback=on_queue_update)
    await sample_tree._queue.put("next_node")
    # The node lookup is stubbed to return None for this scenario.
    sample_tree.get_node = MagicMock(return_value=None)

    await queue_processor._process_next(sample_tree, AsyncMock())

    on_queue_update.assert_awaited_once_with(sample_tree)
@pytest.mark.asyncio
async def test_process_next_triggers_node_started(sample_tree):
    """``_process_next`` awaits ``node_started_callback`` once with tree and node id."""
    on_node_started = AsyncMock()
    queue_processor = TreeQueueProcessor(node_started_callback=on_node_started)
    await sample_tree._queue.put("next_node")
    # The node lookup is stubbed to return None for this scenario.
    sample_tree.get_node = MagicMock(return_value=None)

    await queue_processor._process_next(sample_tree, AsyncMock())

    on_node_started.assert_awaited_once_with(sample_tree, "next_node")
|