File size: 1,613 Bytes
6aecb2e | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 | import pytest
@pytest.mark.asyncio
async def test_get_cache_service_disables_valkey_after_connection_failure(monkeypatch):
import landppt.services.cache_service as mod
from landppt.core.config import app_config
await mod.close_cache_service()
monkeypatch.setattr(app_config, "cache_backend", "valkey")
monkeypatch.setattr(app_config, "valkey_url", "valkey://valkey:6379")
attempts = {"count": 0}
async def fake_connect(self):
attempts["count"] += 1
self.enabled = False
self._connected = False
self._client = None
return False
monkeypatch.setattr(mod.CacheService, "connect", fake_connect)
cache1 = await mod.get_cache_service()
cache2 = await mod.get_cache_service()
assert attempts["count"] == 1
assert cache1 is cache2
assert cache1.enabled is False
assert cache1.is_connected is False
await mod.close_cache_service()
@pytest.mark.asyncio
async def test_get_cache_service_keeps_memory_backend_without_connect_attempt(monkeypatch):
import landppt.services.cache_service as mod
from landppt.core.config import app_config
await mod.close_cache_service()
monkeypatch.setattr(app_config, "cache_backend", "memory")
attempts = {"count": 0}
async def fake_connect(self):
attempts["count"] += 1
return False
monkeypatch.setattr(mod.CacheService, "connect", fake_connect)
cache = await mod.get_cache_service()
assert attempts["count"] == 0
assert cache.enabled is False
assert cache.is_connected is False
await mod.close_cache_service()
|