Spaces:
Running
Running
from __future__ import annotations

import asyncio
import contextlib
import json
import os
import sqlite3
import time
from typing import Any, Callable, Dict, Optional, Protocol, Tuple

from backend.runtime_utils import clone_cache_payload

class LoggerLike(Protocol):
    """Structural type for the logger object handed to ``PersistentCache``.

    Any object exposing ``error`` and ``info`` with the signatures below
    satisfies this protocol; no inheritance is required.  ``PersistentCache``
    calls both with a message plus optional positional format arguments.
    """

    def error(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Accept an error message with optional positional/keyword extras."""
        ...

    def info(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Accept an informational message with optional extras."""
        ...
class PersistentCache:
    """SQLite-based persistent cache layer.

    Each entry is one row of the ``cache`` table: the key, the payload
    serialised with ``json.dumps``, an absolute expiry timestamp
    (``time.time() + ttl``) and the cache version that was current when the
    row was written.  A row is served only while it is unexpired and its
    version equals the value returned by ``cache_version_getter``.

    Every operation opens a short-lived connection and closes it explicitly:
    using a ``sqlite3`` connection as a context manager only commits or rolls
    back the transaction, it does not close the connection.

    Read, write and eviction failures are logged through ``logger`` and are
    not raised to the caller; failures while creating the schema in
    ``__init__`` do propagate.
    """

    def __init__(
        self,
        db_path: str,
        cache_version_getter: Callable[[], str],
        logger: LoggerLike,
    ) -> None:
        """Create the database directory and schema if they do not exist.

        Args:
            db_path: Path of the SQLite database file.
            cache_version_getter: Called on every read and write to obtain
                the current cache version string.
            logger: Object providing ``error`` and ``info`` methods.
        """
        self.db_path = db_path
        self._cache_version_getter = cache_version_getter
        self._logger = logger
        db_dir = os.path.dirname(os.path.abspath(self.db_path))
        os.makedirs(db_dir, exist_ok=True)
        self._init_db()

    def _connection(self) -> contextlib.AbstractContextManager[sqlite3.Connection]:
        """Return a context manager yielding a connection closed on exit.

        Intended use is ``with self._connection() as conn, conn:`` -- the
        inner ``conn`` context commits (or rolls back on error) and the outer
        one then closes the connection.
        """
        return contextlib.closing(sqlite3.connect(self.db_path))

    def _init_db(self) -> None:
        """Create the ``cache`` table and its expiry index when missing."""
        with self._connection() as conn, conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS cache (
                    key TEXT PRIMARY KEY,
                    payload BLOB,
                    expiry REAL,
                    version TEXT
                )
                """
            )
            conn.execute("CREATE INDEX IF NOT EXISTS idx_expiry ON cache(expiry)")

    def get(self, key: str) -> Optional[Any]:
        """Return the decoded payload stored under ``key``, or ``None``.

        ``None`` is returned when the key is absent, when the row is expired
        or was written under a different cache version (the row is deleted in
        that case), and when any error occurs (the error is logged).
        """
        try:
            cache_version = self._cache_version_getter()
            with self._connection() as conn, conn:
                row = conn.execute(
                    "SELECT payload, expiry, version FROM cache WHERE key = ?",
                    (key,),
                ).fetchone()
                if row is None:
                    return None
                payload, expiry, version = row
                if time.time() < expiry and version == cache_version:
                    return json.loads(payload)
                # Stale or written under another version: drop it now so it
                # is not re-examined on the next read.
                conn.execute("DELETE FROM cache WHERE key = ?", (key,))
        except Exception as ex:
            self._logger.error("[Persistence] Read error: %s", ex)
        return None

    def set(self, key: str, payload: Any, ttl: int) -> None:
        """Store ``payload`` under ``key`` for ``ttl`` seconds.

        The payload is first passed through ``clone_cache_payload``.  While
        ``start_writer`` is running the write is queued for the background
        writer; otherwise it is performed synchronously.

        NOTE(review): ``asyncio.Queue`` is not thread-safe, so while the
        writer is active this is presumably meant to be called from the
        event-loop thread -- confirm against callers.
        """
        cached_payload = clone_cache_payload(payload)
        queue = getattr(self, "_queue", None)
        if queue is not None:
            queue.put_nowait((key, cached_payload, ttl))
            return
        self._write_sync(key, cached_payload, ttl)

    def _write_sync(self, key: str, payload: Any, ttl: int) -> None:
        """Insert or replace the row for ``key``; log (never raise) on error."""
        try:
            with self._connection() as conn, conn:
                conn.execute(
                    "INSERT OR REPLACE INTO cache (key, payload, expiry, version) VALUES (?, ?, ?, ?)",
                    (key, json.dumps(payload), time.time() + ttl, self._cache_version_getter()),
                )
        except Exception as ex:
            self._logger.error("[Persistence] Sync write error: %s", ex)

    async def start_writer(self) -> None:
        """Run the background writer until the task is cancelled.

        Installs a queue that ``set`` feeds, then writes each queued item in
        a worker thread via ``asyncio.to_thread``.  When the coroutine exits
        (normally through cancellation) the queue is unregistered, so later
        ``set`` calls fall back to synchronous writes, and any items still
        queued are written synchronously instead of being dropped.
        """
        queue: asyncio.Queue[Tuple[str, Any, int]] = asyncio.Queue()
        self._queue = queue
        self._logger.info("[Persistence] Async writer started.")
        try:
            while True:
                key, payload, ttl = await queue.get()
                try:
                    await asyncio.to_thread(self._write_sync, key, payload, ttl)
                except Exception as ex:
                    self._logger.error("[Persistence] Writer loop error: %s", ex)
                finally:
                    queue.task_done()
        finally:
            # Unregister first so set() stops enqueuing into a queue that no
            # longer has a consumer; only remove it if it is still ours.
            if getattr(self, "_queue", None) is queue:
                del self._queue
            # Flush the backlog; _write_sync logs its own failures.
            while not queue.empty():
                key, payload, ttl = queue.get_nowait()
                self._write_sync(key, payload, ttl)
                queue.task_done()

    def evict(self) -> None:
        """Delete every row whose expiry lies in the past; log on error."""
        try:
            with self._connection() as conn, conn:
                conn.execute("DELETE FROM cache WHERE expiry < ?", (time.time(),))
        except Exception as ex:
            self._logger.error("[Persistence] Eviction error: %s", ex)
class TTLCache:
    """In-memory key/value store whose entries carry an absolute expiry time.

    Values pass through ``clone_cache_payload`` both when stored and when
    returned (presumably a defensive copy -- confirm in
    ``backend.runtime_utils``).  Expired entries are removed when they are
    read or when ``evict_expired`` is called.
    """

    def __init__(self) -> None:
        # key -> (expiry timestamp on the time.time() clock, stored payload)
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        """Return the payload for ``key``, or ``None`` if absent or expired.

        An expired entry is dropped from the store as a side effect.
        """
        record = self._store.get(key)
        if record is not None:
            deadline, value = record
            if not time.time() > deadline:
                return clone_cache_payload(value)
            self._store.pop(key, None)
        return None

    def set(self, key: str, payload: Any, ttl_seconds: int) -> None:
        """Store ``payload`` under ``key`` for ``ttl_seconds`` seconds."""
        deadline = time.time() + ttl_seconds
        self._store[key] = (deadline, clone_cache_payload(payload))

    def delete(self, key: str) -> bool:
        """Remove ``key``; return ``True`` only if an entry was present."""
        removed = self._store.pop(key, None)
        return removed is not None

    def delete_by_prefix(self, prefix: str) -> int:
        """Remove every key starting with ``prefix``; return how many matched."""
        snapshot = list(self._store)
        doomed = [name for name in snapshot if name.startswith(prefix)]
        for name in doomed:
            self._store.pop(name, None)
        return len(doomed)

    def clear(self) -> int:
        """Empty the store and return the number of entries it held."""
        held = len(self._store)
        self._store.clear()
        return held

    def evict_expired(self) -> int:
        """Drop all entries past their expiry; return how many were dropped."""
        cutoff = time.time()
        stale = [
            name
            for name, (deadline, _value) in self._store.items()
            if cutoff > deadline
        ]
        for name in stale:
            self._store.pop(name, None)
        return len(stale)

    def stats(self) -> Dict[str, Any]:
        """Return counts of total, still-valid and expired entries."""
        cutoff = time.time()
        total = len(self._store)
        live = sum(1 for deadline, _value in self._store.values() if cutoff <= deadline)
        return {
            "total_keys": total,
            "alive_keys": live,
            "expired_keys": total - live,
        }