File size: 5,506 Bytes
abd4352 612eafc abd4352 612eafc abd4352 612eafc abd4352 612eafc abd4352 612eafc abd4352 612eafc abd4352 612eafc abd4352 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 | """
tests/unit/test_executor_cache.py
Tests for the executor node — cache hit/miss behavior and result capping.
"""
import json
import uuid
import pytest
from unittest.mock import MagicMock, patch
@pytest.fixture
def executor_state(sample_state):
return {
**sample_state,
"generated_code": "SELECT product, SUM(amount) AS total FROM orders GROUP BY product",
"code_type": "sql",
"execution_error": None,
}
@pytest.mark.unit
class TestExecutorCaching:
def test_cache_hit_returns_cached_result(self, executor_state, mocker):
cached = [{"product": "Widget", "total": 100.0}]
mock_redis = MagicMock()
mock_redis.get.return_value = json.dumps(cached)
mocker.patch("agent.nodes.executor._get_redis", return_value=mock_redis)
from agent.nodes.executor import executor
result = executor(executor_state)
assert result["from_cache"] is True
assert result["execution_result"] == cached
def test_cache_miss_calls_connector(self, executor_state, mocker):
mock_redis = MagicMock()
mock_redis.get.side_effect = Exception("miss")
mocker.patch("agent.nodes.executor._get_redis", return_value=mock_redis)
rows = [{"product": "Widget", "total": 99.0}]
mock_connector = MagicMock()
mock_connector.execute_sql.return_value = rows
mocker.patch("agent.nodes.executor.get_connector", return_value=mock_connector)
from agent.nodes.executor import executor
result = executor(executor_state)
assert result["from_cache"] is False
assert result["execution_result"] == rows
mock_connector.execute_sql.assert_called_once()
def test_cache_write_called_on_miss(self, executor_state, mocker):
mock_redis = MagicMock()
mock_redis.get.side_effect = Exception("miss")
mocker.patch("agent.nodes.executor._get_redis", return_value=mock_redis)
mock_connector = MagicMock()
mock_connector.execute_sql.return_value = [{"x": 1}]
mocker.patch("agent.nodes.executor.get_connector", return_value=mock_connector)
from agent.nodes.executor import executor
executor(executor_state)
mock_redis.setex.assert_called_once()
# TTL should be 3600
args = mock_redis.setex.call_args[0]
assert args[1] == 3600
def test_safety_block_skips_execution(self, executor_state, mocker):
mock_redis = MagicMock()
mocker.patch("agent.nodes.executor._get_redis", return_value=mock_redis)
mock_connector = MagicMock()
mocker.patch("agent.nodes.executor.get_connector", return_value=mock_connector)
state = {**executor_state, "execution_error": "SAFETY_BLOCK: Drop operation"}
from agent.nodes.executor import executor
result = executor(state)
mock_connector.execute_sql.assert_not_called()
assert result["execution_error"].startswith("SAFETY_BLOCK")
def test_result_capped_at_500_rows(self, executor_state, mocker):
mock_redis = MagicMock()
mock_redis.get.side_effect = Exception("miss")
mocker.patch("agent.nodes.executor._get_redis", return_value=mock_redis)
big_result = [{"id": i} for i in range(600)]
mock_connector = MagicMock()
mock_connector.execute_sql.return_value = big_result
mocker.patch("agent.nodes.executor.get_connector", return_value=mock_connector)
from agent.nodes.executor import executor
result = executor(executor_state)
assert len(result["execution_result"]) == 500
def test_connector_exception_sets_error(self, executor_state, mocker):
mock_redis = MagicMock()
mock_redis.get.side_effect = Exception("miss")
mocker.patch("agent.nodes.executor._get_redis", return_value=mock_redis)
mock_connector = MagicMock()
mock_connector.execute_sql.side_effect = Exception("relation does not exist")
mocker.patch("agent.nodes.executor.get_connector", return_value=mock_connector)
from agent.nodes.executor import executor
result = executor(executor_state)
assert result["execution_result"] is None
assert "relation does not exist" in result["execution_error"]
assert result["from_cache"] is False
def test_cache_key_differs_by_query(self, mocker):
"""Two different queries must produce different cache keys."""
from agent.nodes.executor import _cache_key
k1 = _cache_key("neon:public", "SELECT 1", "sql")
k2 = _cache_key("neon:public", "SELECT 2", "sql")
assert k1 != k2
def test_cache_key_differs_by_connector(self, mocker):
from agent.nodes.executor import _cache_key
k1 = _cache_key("neon:public", "SELECT 1", "sql")
k2 = _cache_key("csv:http://foo", "SELECT 1", "sql")
assert k1 != k2
def test_latency_ms_recorded(self, executor_state, mocker):
mock_redis = MagicMock()
mock_redis.get.side_effect = Exception("miss")
mocker.patch("agent.nodes.executor._get_redis", return_value=mock_redis)
mock_connector = MagicMock()
mock_connector.execute_sql.return_value = [{"x": 1}]
mocker.patch("agent.nodes.executor.get_connector", return_value=mock_connector)
from agent.nodes.executor import executor
result = executor(executor_state)
assert result["latency_ms"] is not None
assert result["latency_ms"] >= 0
|