Spaces:
Runtime error
Runtime error
| # app/app/cache.py | |
| from pathlib import Path | |
| import os, json, tempfile, os | |
| import asyncio | |
| class Cache: | |
| def __init__(self, filename: str = "cache.json"): | |
| # bevorzugt /data (persistenter Storage auf HF), sonst /tmp | |
| base = Path(os.getenv("APP_CACHE_DIR", "/data")) | |
| if not os.access(base, os.W_OK): | |
| base = Path("/tmp/app-cache") | |
| base.mkdir(parents=True, exist_ok=True) | |
| self.path = base / filename | |
| def _dump_to_disk(self, data: dict | list | None = None): | |
| # vorhandenen Zustand laden/ersetzen | |
| if data is None and self.path.exists(): | |
| try: | |
| data = json.loads(self.path.read_text(encoding="utf-8")) | |
| except Exception: | |
| data = None | |
| data = data if data is not None else {} | |
| # atomar schreiben | |
| with tempfile.NamedTemporaryFile( | |
| "w", delete=False, dir=self.path.parent, suffix=".tmp", encoding="utf-8" | |
| ) as tmp: | |
| json.dump(data, tmp, ensure_ascii=False, indent=2) | |
| tmp_name = tmp.name | |
| os.replace(tmp_name, self.path) # atomar | |
| async def add(self, measurement: dict): | |
| # ... füge measurement in deinen Speicher ein ... | |
| # Beispiel (anpassen, falls du eine Liste etc. führst): | |
| data = {"last": measurement} | |
| await asyncio.to_thread(self._dump_to_disk, data) | |