Spaces:
Configuration error
Configuration error
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import logging | |
| from collections import deque | |
| from pathlib import Path | |
| from typing import Deque | |
| from .models import Measurement | |
| logger = logging.getLogger(__name__) | |
| class MeasurementCache: | |
| def __init__(self, *, maxlen: int = 48, storage_path: Path | str = "data/cache.json") -> None: | |
| self._buffer: Deque[Measurement] = deque(maxlen=maxlen) | |
| self._lock = asyncio.Lock() | |
| self._path = Path(storage_path) | |
| self._path.parent.mkdir(parents=True, exist_ok=True) | |
| self._load_from_disk() | |
| def _load_from_disk(self) -> None: | |
| if not self._path.exists(): | |
| return | |
| try: | |
| raw = json.loads(self._path.read_text("utf-8")) | |
| except json.JSONDecodeError as exc: | |
| logger.warning("Failed to read cache file %s: %s", self._path, exc) | |
| return | |
| if not isinstance(raw, list): | |
| logger.warning("Invalid cache file format: expected list, got %s", type(raw)) | |
| return | |
| for item in raw[-self._buffer.maxlen :]: | |
| try: | |
| measurement = Measurement.model_validate(item) | |
| except Exception as exc: # noqa: BLE001 | |
| logger.debug("Skipping invalid cache entry %s: %s", item, exc) | |
| continue | |
| self._buffer.append(measurement) | |
| logger.info("Loaded %s cached measurements", len(self._buffer)) | |
| def _dump_to_disk(self) -> None: | |
| data = [m.model_dump(mode="json") for m in self._buffer] | |
| tmp_path = self._path.with_suffix(".tmp") | |
| tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") | |
| tmp_path.replace(self._path) | |
| async def add(self, measurement: Measurement, *, persist: bool = True) -> None: | |
| async with self._lock: | |
| latest = self._buffer[-1] if self._buffer else None | |
| if latest and latest.timestamp == measurement.timestamp: | |
| self._buffer.pop() | |
| self._buffer.append(measurement) | |
| if persist: | |
| await asyncio.to_thread(self._dump_to_disk) | |
| async def get_latest(self) -> Measurement | None: | |
| async with self._lock: | |
| return self._buffer[-1] if self._buffer else None | |
| async def get_history(self) -> list[Measurement]: | |
| async with self._lock: | |
| return list(self._buffer) | |
| def __len__(self) -> int: | |
| return len(self._buffer) | |