File size: 3,338 Bytes
2fb233c
e994c16
 
 
2fb233c
 
e994c16
 
 
 
 
2fb233c
e994c16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2fb233c
 
 
 
 
 
 
 
 
 
 
e994c16
 
 
 
 
 
 
 
 
 
 
 
 
2fb233c
e994c16
 
2fb233c
 
e994c16
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
"""Caching. In-process LRU cache for diagnosis results keyed by image and metadata.

For privacy, we hash the image bytes; the image itself is never persisted
in the cache. Identical scans produce identical hashes, giving us a simple
content-addressed cache. Metadata is hashed into the key so the same scan can
be diagnosed again when the user corrects film stock, storage, or confidence.
"""

from __future__ import annotations

import hashlib
import json
import logging
import time
from collections import OrderedDict
from threading import Lock
from typing import Any

from config import get_app_config

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class DiagnosisCache:
    """Thread-safe LRU cache for diagnosis results.

    Entries are keyed by a SHA-256 digest of the image bytes, optionally
    combined with a digest of the metadata, so the image itself is never
    stored. Every entry records when it was inserted and is dropped on lookup
    once it is older than the configured TTL. When the cache grows past its
    maximum size, the least recently used entries are evicted first.
    """

    def __init__(self, max_size: int = 64, ttl_seconds: int = 3600) -> None:
        """Create an empty cache.

        Args:
            max_size: Maximum number of entries to retain. Zero or a negative
                value means nothing is retained.
            ttl_seconds: Age, in seconds, beyond which an entry is treated as
                expired by ``get``.
        """
        self._max_size = max_size
        self._ttl = ttl_seconds
        # Maps cache key -> (insertion timestamp, cached result), ordered from
        # least to most recently used.
        self._store: OrderedDict[str, tuple[float, dict]] = OrderedDict()
        self._lock = Lock()
        self._hits = 0
        self._misses = 0

    @staticmethod
    def hash_image(image_bytes: bytes) -> str:
        """Return the hex SHA-256 digest of ``image_bytes``."""
        return hashlib.sha256(image_bytes).hexdigest()

    @classmethod
    def hash_key(cls, image_bytes: bytes, metadata: dict | None = None) -> str:
        """Build the cache key for an image and its optional metadata.

        Args:
            image_bytes: Raw image content.
            metadata: Optional JSON-serializable mapping. ``None`` and an
                empty mapping both yield the bare image digest.

        Returns:
            The image digest alone, or ``"<image digest>:<metadata digest>"``
            when metadata is present. Key order in ``metadata`` does not
            affect the result.

        Raises:
            TypeError: If ``metadata`` contains values ``json.dumps`` cannot
                serialize.
        """
        image_hash = cls.hash_image(image_bytes)
        if not metadata:
            return image_hash
        # Sorted keys and fixed separators give a canonical encoding, so equal
        # mappings always hash to the same value.
        metadata_json = json.dumps(metadata, sort_keys=True, separators=(",", ":"))
        metadata_hash = hashlib.sha256(metadata_json.encode("utf-8")).hexdigest()
        return f"{image_hash}:{metadata_hash}"

    def get(self, image_bytes: bytes, metadata: dict | None = None) -> dict | None:
        """Look up a cached result.

        A hit marks the entry as most recently used. An entry older than the
        TTL is removed and counted as a miss.

        Returns:
            The stored dict itself (not a copy), or ``None`` on a miss.
        """
        key = self.hash_key(image_bytes, metadata)
        # Monotonic time keeps entry ages immune to system clock adjustments.
        now = time.monotonic()
        with self._lock:
            entry = self._store.get(key)
            if entry is None:
                self._misses += 1
                return None
            ts, value = entry
            if now - ts > self._ttl:
                del self._store[key]
                self._misses += 1
                return None
            self._store.move_to_end(key)
            self._hits += 1
            logger.info("Cache hit for %s", key[:25])
            return value

    def put(self, image_bytes: bytes, value: dict, metadata: dict | None = None) -> None:
        """Store ``value`` under the key derived from the image and metadata.

        An existing entry for the same key is replaced and its timestamp is
        refreshed. Least recently used entries are then evicted until the
        cache fits within ``max_size``.
        """
        key = self.hash_key(image_bytes, metadata)
        now = time.monotonic()
        with self._lock:
            self._store[key] = (now, value)
            self._store.move_to_end(key)
            # The emptiness check stops a negative max_size from calling
            # popitem() on an empty store, which would raise KeyError.
            while self._store and len(self._store) > self._max_size:
                self._store.popitem(last=False)

    def stats(self) -> dict:
        """Return the current size, the size limit, and the hit/miss counters."""
        with self._lock:
            return {
                "size": len(self._store),
                "max_size": self._max_size,
                "hits": self._hits,
                "misses": self._misses,
            }

    def clear(self) -> None:
        """Remove every entry and reset the hit/miss counters to zero."""
        with self._lock:
            self._store.clear()
            self._hits = 0
            self._misses = 0


# Lazily created shared cache instance; populated by get_cache().
_default_cache: DiagnosisCache | None = None


def get_cache() -> DiagnosisCache:
    """Return the shared cache, building it from the app config on first use."""
    global _default_cache
    if _default_cache is not None:
        return _default_cache

    settings = get_app_config()
    size = settings.cache_size
    ttl = settings.cache_ttl_seconds
    _default_cache = DiagnosisCache(max_size=size, ttl_seconds=ttl)
    return _default_cache


# Names exported by a star-import of this module.
__all__ = ["DiagnosisCache", "get_cache"]