File size: 2,549 Bytes
15d848a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
from __future__ import annotations

import asyncio
import json
import logging
from collections import deque
from pathlib import Path
from typing import Deque

from .models import Measurement

logger = logging.getLogger(__name__)


class MeasurementCache:
    def __init__(self, *, maxlen: int = 48, storage_path: Path | str = "data/cache.json") -> None:
        self._buffer: Deque[Measurement] = deque(maxlen=maxlen)
        self._lock = asyncio.Lock()
        self._path = Path(storage_path)
        self._path.parent.mkdir(parents=True, exist_ok=True)
        self._load_from_disk()

    def _load_from_disk(self) -> None:
        if not self._path.exists():
            return
        try:
            raw = json.loads(self._path.read_text("utf-8"))
        except json.JSONDecodeError as exc:
            logger.warning("Failed to read cache file %s: %s", self._path, exc)
            return

        if not isinstance(raw, list):
            logger.warning("Invalid cache file format: expected list, got %s", type(raw))
            return

        for item in raw[-self._buffer.maxlen :]:
            try:
                measurement = Measurement.model_validate(item)
            except Exception as exc:  # noqa: BLE001
                logger.debug("Skipping invalid cache entry %s: %s", item, exc)
                continue
            self._buffer.append(measurement)

        logger.info("Loaded %s cached measurements", len(self._buffer))

    def _dump_to_disk(self) -> None:
        data = [m.model_dump(mode="json") for m in self._buffer]
        tmp_path = self._path.with_suffix(".tmp")
        tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
        tmp_path.replace(self._path)

    async def add(self, measurement: Measurement, *, persist: bool = True) -> None:
        async with self._lock:
            latest = self._buffer[-1] if self._buffer else None
            if latest and latest.timestamp == measurement.timestamp:
                self._buffer.pop()
            self._buffer.append(measurement)

        if persist:
            await asyncio.to_thread(self._dump_to_disk)

    async def get_latest(self) -> Measurement | None:
        async with self._lock:
            return self._buffer[-1] if self._buffer else None

    async def get_history(self) -> list[Measurement]:
        async with self._lock:
            return list(self._buffer)

    def __len__(self) -> int:
        return len(self._buffer)