SuperAI_Forecast / backend /test_cache_utils.py
Thang6822
Stabilize workspace UX and deployment flow
2eec8c3
from __future__ import annotations
import os
import tempfile
import unittest
from backend.cache_utils import PersistentCache, TTLCache
class _FakeLogger:
def error(self, msg: str, *args: object, **kwargs: object) -> None:
return None
def info(self, msg: str, *args: object, **kwargs: object) -> None:
return None
class CacheUtilsTests(unittest.TestCase):
def test_ttl_cache_returns_defensive_copy(self) -> None:
cache = TTLCache()
payload = {"forecast": [{"price": 100.0}], "meta": {"source": "memory"}}
cache.set("sample", payload, ttl_seconds=30)
payload["forecast"][0]["price"] = 999.0
payload["meta"]["source"] = "mutated"
cached_once = cache.get("sample")
self.assertEqual(cached_once["forecast"][0]["price"], 100.0)
self.assertEqual(cached_once["meta"]["source"], "memory")
cached_once["forecast"][0]["price"] = 555.0
cached_twice = cache.get("sample")
self.assertEqual(cached_twice["forecast"][0]["price"], 100.0)
def test_persistent_cache_queue_copies_payload_before_enqueue(self) -> None:
class FakeQueue:
def __init__(self) -> None:
self.item = None
def put_nowait(self, item: tuple[str, object, int]) -> None:
self.item = item
fd, temp_path = tempfile.mkstemp(suffix=".db")
os.close(fd)
try:
cache = PersistentCache(
db_path=temp_path,
cache_version_getter=lambda: "test-version",
logger=_FakeLogger(),
)
queue = FakeQueue()
cache._queue = queue
payload = {"nested": {"value": 1}}
cache.set("queued", payload, ttl=60)
payload["nested"]["value"] = 7
self.assertIsNotNone(queue.item)
_, queued_payload, queued_ttl = queue.item
self.assertEqual(queued_ttl, 60)
self.assertEqual(queued_payload["nested"]["value"], 1)
finally:
if os.path.exists(temp_path):
try:
os.remove(temp_path)
except PermissionError:
pass
def test_persistent_cache_persists_payload_across_instances(self) -> None:
fd, temp_path = tempfile.mkstemp(suffix=".db")
os.close(fd)
try:
cache = PersistentCache(
db_path=temp_path,
cache_version_getter=lambda: "test-version",
logger=_FakeLogger(),
)
payload = {"forecast": [{"price": 100.0}], "meta": {"source": "sqlite"}}
cache.set("persisted", payload, ttl=60)
payload["forecast"][0]["price"] = 999.0
cached_same_instance = cache.get("persisted")
self.assertIsNotNone(cached_same_instance)
self.assertEqual(cached_same_instance["forecast"][0]["price"], 100.0)
reopened_cache = PersistentCache(
db_path=temp_path,
cache_version_getter=lambda: "test-version",
logger=_FakeLogger(),
)
cached_reopened = reopened_cache.get("persisted")
self.assertIsNotNone(cached_reopened)
self.assertEqual(cached_reopened["forecast"][0]["price"], 100.0)
self.assertEqual(cached_reopened["meta"]["source"], "sqlite")
finally:
if os.path.exists(temp_path):
try:
os.remove(temp_path)
except PermissionError:
pass
def test_persistent_cache_treats_version_mismatch_as_cache_miss(self) -> None:
fd, temp_path = tempfile.mkstemp(suffix=".db")
os.close(fd)
try:
cache_v1 = PersistentCache(
db_path=temp_path,
cache_version_getter=lambda: "v1",
logger=_FakeLogger(),
)
cache_v1.set("versioned", {"value": 123}, ttl=60)
cache_v2 = PersistentCache(
db_path=temp_path,
cache_version_getter=lambda: "v2",
logger=_FakeLogger(),
)
self.assertIsNone(cache_v2.get("versioned"))
cache_v1_reopened = PersistentCache(
db_path=temp_path,
cache_version_getter=lambda: "v1",
logger=_FakeLogger(),
)
self.assertIsNone(cache_v1_reopened.get("versioned"))
finally:
if os.path.exists(temp_path):
try:
os.remove(temp_path)
except PermissionError:
pass
if __name__ == "__main__":
unittest.main()