File size: 4,734 Bytes
a721dfa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2eec8c3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a721dfa
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
from __future__ import annotations

import os
import tempfile
import unittest

from backend.cache_utils import PersistentCache, TTLCache


class _FakeLogger:
    def error(self, msg: str, *args: object, **kwargs: object) -> None:
        return None

    def info(self, msg: str, *args: object, **kwargs: object) -> None:
        return None


class CacheUtilsTests(unittest.TestCase):
    def test_ttl_cache_returns_defensive_copy(self) -> None:
        cache = TTLCache()
        payload = {"forecast": [{"price": 100.0}], "meta": {"source": "memory"}}
        cache.set("sample", payload, ttl_seconds=30)

        payload["forecast"][0]["price"] = 999.0
        payload["meta"]["source"] = "mutated"

        cached_once = cache.get("sample")
        self.assertEqual(cached_once["forecast"][0]["price"], 100.0)
        self.assertEqual(cached_once["meta"]["source"], "memory")

        cached_once["forecast"][0]["price"] = 555.0
        cached_twice = cache.get("sample")
        self.assertEqual(cached_twice["forecast"][0]["price"], 100.0)

    def test_persistent_cache_queue_copies_payload_before_enqueue(self) -> None:
        class FakeQueue:
            def __init__(self) -> None:
                self.item = None

            def put_nowait(self, item: tuple[str, object, int]) -> None:
                self.item = item

        fd, temp_path = tempfile.mkstemp(suffix=".db")
        os.close(fd)
        try:
            cache = PersistentCache(
                db_path=temp_path,
                cache_version_getter=lambda: "test-version",
                logger=_FakeLogger(),
            )
            queue = FakeQueue()
            cache._queue = queue

            payload = {"nested": {"value": 1}}
            cache.set("queued", payload, ttl=60)
            payload["nested"]["value"] = 7

            self.assertIsNotNone(queue.item)
            _, queued_payload, queued_ttl = queue.item
            self.assertEqual(queued_ttl, 60)
            self.assertEqual(queued_payload["nested"]["value"], 1)
        finally:
            if os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except PermissionError:
                    pass

    def test_persistent_cache_persists_payload_across_instances(self) -> None:
        fd, temp_path = tempfile.mkstemp(suffix=".db")
        os.close(fd)
        try:
            cache = PersistentCache(
                db_path=temp_path,
                cache_version_getter=lambda: "test-version",
                logger=_FakeLogger(),
            )
            payload = {"forecast": [{"price": 100.0}], "meta": {"source": "sqlite"}}
            cache.set("persisted", payload, ttl=60)
            payload["forecast"][0]["price"] = 999.0

            cached_same_instance = cache.get("persisted")
            self.assertIsNotNone(cached_same_instance)
            self.assertEqual(cached_same_instance["forecast"][0]["price"], 100.0)

            reopened_cache = PersistentCache(
                db_path=temp_path,
                cache_version_getter=lambda: "test-version",
                logger=_FakeLogger(),
            )
            cached_reopened = reopened_cache.get("persisted")
            self.assertIsNotNone(cached_reopened)
            self.assertEqual(cached_reopened["forecast"][0]["price"], 100.0)
            self.assertEqual(cached_reopened["meta"]["source"], "sqlite")
        finally:
            if os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except PermissionError:
                    pass

    def test_persistent_cache_treats_version_mismatch_as_cache_miss(self) -> None:
        fd, temp_path = tempfile.mkstemp(suffix=".db")
        os.close(fd)
        try:
            cache_v1 = PersistentCache(
                db_path=temp_path,
                cache_version_getter=lambda: "v1",
                logger=_FakeLogger(),
            )
            cache_v1.set("versioned", {"value": 123}, ttl=60)

            cache_v2 = PersistentCache(
                db_path=temp_path,
                cache_version_getter=lambda: "v2",
                logger=_FakeLogger(),
            )
            self.assertIsNone(cache_v2.get("versioned"))

            cache_v1_reopened = PersistentCache(
                db_path=temp_path,
                cache_version_getter=lambda: "v1",
                logger=_FakeLogger(),
            )
            self.assertIsNone(cache_v1_reopened.get("versioned"))
        finally:
            if os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except PermissionError:
                    pass


if __name__ == "__main__":
    unittest.main()