Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import time | |
| from threading import RLock | |
| from typing import Any, Dict, Tuple, Optional | |
| class TTLCache: | |
| def __init__(self, ttl_seconds: int = 3600, max_items: int = 512) -> None: | |
| self.ttl_seconds = ttl_seconds | |
| self.max_items = max_items | |
| self._data: Dict[str, Tuple[float, Any]] = {} | |
| self._lock = RLock() | |
| def _evict_if_needed(self) -> None: | |
| if len(self._data) <= self.max_items: | |
| return | |
| # Evict oldest by expiry time | |
| items = sorted(self._data.items(), key=lambda kv: kv[1][0]) | |
| for k, _ in items[: max(1, len(items) - self.max_items)]: | |
| self._data.pop(k, None) | |
| def get(self, key: str) -> Optional[Any]: | |
| now = time.time() | |
| with self._lock: | |
| item = self._data.get(key) | |
| if not item: | |
| return None | |
| expires, value = item | |
| if expires < now: | |
| # expired | |
| self._data.pop(key, None) | |
| return None | |
| return value | |
| def set(self, key: str, value: Any) -> None: | |
| with self._lock: | |
| expires = time.time() + self.ttl_seconds | |
| self._data[key] = (expires, value) | |
| self._evict_if_needed() | |
| def clear(self) -> None: | |
| with self._lock: | |
| self._data.clear() |