Spaces:
Paused
Paused
| # /// zerodep | |
| # version = "0.2.4" | |
| # deps = [] | |
| # tier = "subsystem" | |
| # category = "storage" | |
| # note = "Install/update via `zerodep add cache`" | |
| # /// | |
| """Zero-dependency caching with TTL, eviction policies, and async support. | |
| stdlib only, Python 3.10+. | |
| Part of zerodep: https://github.com/Oaklight/zerodep | |
| Copyright (c) 2026 Peng Ding. MIT License. | |
| Provides: LRUCache, FIFOCache, LFUCache, TTLCache cache classes (MutableMapping), | |
| ``cached`` decorator with sync/async auto-detection, and convenience decorators | |
| ``lru_cache``, ``ttl_cache``, ``lfu_cache``, ``fifo_cache``. | |
| Does NOT implement: TLRUCache (per-item TTL function), MRUCache, RRCache, | |
| ``cachedmethod``, ``condition`` parameter, multiple backends, serialization. | |
| Example:: | |
| from cache import lru_cache, ttl_cache, LRUCache, cached | |
| # Convenience decorator (like functools.lru_cache but with more policies) | |
| @lru_cache(maxsize=256) | |
| def fibonacci(n): | |
| return n if n < 2 else fibonacci(n - 1) + fibonacci(n - 2) | |
| # TTL decorator | |
| @ttl_cache(maxsize=100, ttl=60) | |
| def fetch_config(key): | |
| return load_from_db(key) | |
| # Async support (auto-detected) | |
| @lru_cache(maxsize=128) | |
| async def fetch_data(url): | |
| return await async_get(url) | |
| # Direct cache class usage | |
| cache = LRUCache(maxsize=1024) | |
| cache["key"] = "value" | |
| # Advanced: cached() with explicit lock | |
| import threading | |
| @cached(LRUCache(128), lock=threading.Lock(), info=True) | |
| def thread_safe_compute(x): | |
| return expensive(x) | |
| """ | |
| from __future__ import annotations | |
| import collections | |
| import collections.abc | |
| import functools | |
| import inspect | |
| import time | |
| from typing import Any, Callable | |
| # ββ Key Functions ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class _HashedTuple(tuple): | |
| """Tuple subclass that caches its hash value for fast repeated lookups.""" | |
| __hashvalue: int | None = None | |
| def __hash__(self, hash=tuple.__hash__): # type: ignore[override] | |
| hashvalue = self.__hashvalue | |
| if hashvalue is None: | |
| self.__hashvalue = hashvalue = hash(self) | |
| return hashvalue | |
| def __add__(self, other, add=tuple.__add__): | |
| return _HashedTuple(add(self, other)) | |
| def __radd__(self, other, add=tuple.__add__): | |
| return _HashedTuple(add(other, self)) | |
| _KWMARK = (_HashedTuple,) | |
| def hashkey(*args: Any, **kwargs: Any) -> _HashedTuple: | |
| """Return a cache key for the given positional and keyword arguments. | |
| This is the default key function for all cache decorators. | |
| """ | |
| if kwargs: | |
| return _HashedTuple(args + _KWMARK + tuple(sorted(kwargs.items()))) | |
| return _HashedTuple(args) | |
| def methodkey(_self: Any, *args: Any, **kwargs: Any) -> _HashedTuple: | |
| """Like ``hashkey`` but ignores the first positional argument (``self``). | |
| Use as the key function when caching instance methods. | |
| """ | |
| return hashkey(*args, **kwargs) | |
| def typedkey(*args: Any, **kwargs: Any) -> _HashedTuple: | |
| """Like ``hashkey`` but includes type information, so ``f(3)`` and | |
| ``f(3.0)`` are cached separately. | |
| """ | |
| if kwargs: | |
| sorted_items = tuple(sorted(kwargs.items())) | |
| return _HashedTuple( | |
| args | |
| + _KWMARK | |
| + sorted_items | |
| + tuple(type(v) for v in args) | |
| + tuple(type(v) for _, v in sorted_items) | |
| ) | |
| return _HashedTuple(args + tuple(type(v) for v in args)) | |
| # ββ Cache Info βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| CacheInfo = collections.namedtuple( | |
| "CacheInfo", ["hits", "misses", "maxsize", "currsize"] | |
| ) | |
| # ββ Cache Base Class βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class _DefaultSize: | |
| """Lightweight size tracker that always returns 1 for any key.""" | |
| __slots__ = () | |
| def __getitem__(self, _: Any) -> int: | |
| return 1 | |
| def __setitem__(self, _: Any, __: Any) -> None: | |
| pass | |
| def pop(self, _: Any, *args: Any) -> int: | |
| return 1 | |
| def clear(self) -> None: | |
| pass | |
| class Cache(collections.abc.MutableMapping): | |
| """Mutable mapping to serve as a simple cache or cache base class. | |
| Subclasses must override ``popitem`` to implement a specific eviction | |
| policy. The base class evicts arbitrary items (dict order). | |
| Args: | |
| maxsize: Maximum capacity (item count or weighted size). | |
| getsizeof: Optional callable returning the size of a value. | |
| Defaults to 1 per item. | |
| """ | |
| __marker = object() | |
| def __init__( | |
| self, | |
| maxsize: int | float, | |
| getsizeof: Callable[[Any], int] | None = None, | |
| ): | |
| if maxsize < 0: | |
| raise ValueError("maxsize must be non-negative") | |
| self.__data: dict[Any, Any] = {} | |
| self.__currsize: int | float = 0 | |
| self.__maxsize = maxsize | |
| if getsizeof is not None: | |
| self.__size: dict | _DefaultSize = {} | |
| self.getsizeof = getsizeof # type: ignore[assignment] | |
| else: | |
| self.__size = _DefaultSize() | |
| def __repr__(self) -> str: | |
| cls = self.__class__.__name__ | |
| return ( | |
| f"{cls}({self.__data!r}, " | |
| f"maxsize={self.__maxsize}, currsize={self.__currsize})" | |
| ) | |
| def __getitem__(self, key: Any) -> Any: | |
| try: | |
| return self.__data[key] | |
| except KeyError: | |
| return self.__missing__(key) | |
| def __setitem__(self, key: Any, value: Any) -> None: | |
| maxsize = self.__maxsize | |
| size = self.getsizeof(value) | |
| if size > maxsize: | |
| raise ValueError("value too large") | |
| if key in self.__data: | |
| diffsize = size - self.__size[key] | |
| else: | |
| diffsize = size | |
| while self.__currsize + diffsize > maxsize: | |
| self.popitem() | |
| if key in self.__data: | |
| # Key might have been evicted during popitem loop | |
| self.__currsize -= self.__size[key] | |
| self.__data[key] = value | |
| self.__size[key] = size | |
| self.__currsize += size | |
| def __delitem__(self, key: Any) -> None: | |
| size = self.__size.pop(key) | |
| del self.__data[key] | |
| self.__currsize -= size | |
| def __contains__(self, key: object) -> bool: | |
| return key in self.__data | |
| def __missing__(self, key: Any) -> Any: | |
| raise KeyError(key) | |
| def __iter__(self): | |
| return iter(self.__data) | |
| def __len__(self) -> int: | |
| return len(self.__data) | |
| def getsizeof(value: Any) -> int: | |
| """Return the size of *value*. Defaults to 1.""" | |
| return 1 | |
| def maxsize(self) -> int | float: | |
| """Maximum cache capacity.""" | |
| return self.__maxsize | |
| def currsize(self) -> int | float: | |
| """Current cache size (sum of item sizes).""" | |
| return self.__currsize | |
| def get(self, key: Any, default: Any = None) -> Any: | |
| if key in self: | |
| return self[key] | |
| return default | |
| def pop(self, key: Any, default: Any = __marker) -> Any: | |
| if key in self: | |
| value = self[key] | |
| del self[key] | |
| return value | |
| if default is self.__marker: | |
| raise KeyError(key) | |
| return default | |
| def setdefault(self, key: Any, default: Any = None) -> Any: | |
| if key in self: | |
| return self[key] | |
| try: | |
| self[key] = default | |
| except ValueError: | |
| pass | |
| return default | |
| def popitem(self) -> tuple[Any, Any]: | |
| """Remove and return an arbitrary ``(key, value)`` pair. | |
| Subclasses override this to implement eviction policies. | |
| """ | |
| try: | |
| key = next(iter(self.__data)) | |
| except StopIteration: | |
| raise KeyError(f"{type(self).__name__} is empty") from None | |
| return key, self.pop(key) | |
| def clear(self) -> None: | |
| self.__data.clear() | |
| self.__size.clear() | |
| self.__currsize = 0 | |
| # ββ FIFO Cache βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class FIFOCache(Cache): | |
| """First In First Out (FIFO) cache implementation. | |
| Evicts the oldest inserted item when the cache is full. Accessing an | |
| item does **not** change its eviction priority. | |
| """ | |
| def __init__( | |
| self, | |
| maxsize: int | float, | |
| getsizeof: Callable[[Any], int] | None = None, | |
| ): | |
| super().__init__(maxsize, getsizeof) | |
| self.__order: collections.OrderedDict[Any, None] = collections.OrderedDict() | |
| def __setitem__(self, key: Any, value: Any) -> None: | |
| if key in self: | |
| del self.__order[key] | |
| super().__setitem__(key, value) | |
| self.__order[key] = None | |
| def __delitem__(self, key: Any) -> None: | |
| super().__delitem__(key) | |
| del self.__order[key] | |
| def popitem(self) -> tuple[Any, Any]: | |
| """Remove and return the oldest inserted ``(key, value)`` pair.""" | |
| try: | |
| key = next(iter(self.__order)) | |
| except StopIteration: | |
| raise KeyError(f"{type(self).__name__} is empty") from None | |
| return key, self.pop(key) | |
| def clear(self) -> None: | |
| super().clear() | |
| self.__order.clear() | |
| # ββ LRU Cache ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class LRUCache(Cache): | |
| """Least Recently Used (LRU) cache implementation. | |
| Evicts the least recently accessed item when the cache is full. | |
| Both reads and writes count as accesses. | |
| """ | |
| def __init__( | |
| self, | |
| maxsize: int | float, | |
| getsizeof: Callable[[Any], int] | None = None, | |
| ): | |
| super().__init__(maxsize, getsizeof) | |
| self.__order: collections.OrderedDict[Any, None] = collections.OrderedDict() | |
| def __getitem__(self, key: Any) -> Any: | |
| # Bypass super().__getitem__ to avoid MRO dispatch overhead; | |
| # call Cache.__getitem__ directly + inline move_to_end. | |
| value = Cache.__getitem__(self, key) | |
| self.__order.move_to_end(key) | |
| return value | |
| def __setitem__(self, key: Any, value: Any) -> None: | |
| Cache.__setitem__(self, key, value) | |
| try: | |
| self.__order.move_to_end(key) | |
| except KeyError: | |
| self.__order[key] = None | |
| def __delitem__(self, key: Any) -> None: | |
| Cache.__delitem__(self, key) | |
| del self.__order[key] | |
| def popitem(self) -> tuple[Any, Any]: | |
| """Remove and return the least recently used ``(key, value)`` pair.""" | |
| try: | |
| key = next(iter(self.__order)) | |
| except StopIteration: | |
| raise KeyError(f"{type(self).__name__} is empty") from None | |
| return key, self.pop(key) | |
| def clear(self) -> None: | |
| Cache.clear(self) | |
| self.__order.clear() | |
| # ββ LFU Cache ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class _LFUNode: | |
| """Doubly-linked node for the LFU frequency list.""" | |
| __slots__ = ("count", "keys", "prev", "next") | |
| def __init__(self, count: int = 0): | |
| self.count = count | |
| self.keys: set[Any] = set() | |
| self.prev: _LFUNode = self | |
| self.next: _LFUNode = self | |
| def insert_after(self, node: _LFUNode) -> None: | |
| """Insert *node* after this node.""" | |
| node.prev = self | |
| node.next = self.next | |
| self.next.prev = node | |
| self.next = node | |
| def unlink(self) -> None: | |
| """Remove this node from the linked list.""" | |
| self.prev.next = self.next | |
| self.next.prev = self.prev | |
| class LFUCache(Cache): | |
| """Least Frequently Used (LFU) cache implementation. | |
| Evicts the least frequently accessed item when the cache is full. | |
| All operations are O(1). | |
| """ | |
| def __init__( | |
| self, | |
| maxsize: int | float, | |
| getsizeof: Callable[[Any], int] | None = None, | |
| ): | |
| super().__init__(maxsize, getsizeof) | |
| self.__root = _LFUNode() # sentinel | |
| self.__links: dict[Any, _LFUNode] = {} | |
| def __getitem__(self, key: Any) -> Any: | |
| value = super().__getitem__(key) | |
| self.__touch(key) | |
| return value | |
| def __setitem__(self, key: Any, value: Any) -> None: | |
| existed = key in self | |
| super().__setitem__(key, value) | |
| if existed: | |
| self.__touch(key) | |
| else: | |
| # New key starts at count=1 | |
| root = self.__root | |
| first = root.next | |
| if first is root or first.count != 1: | |
| node = _LFUNode(count=1) | |
| root.insert_after(node) | |
| else: | |
| node = first | |
| node.keys.add(key) | |
| self.__links[key] = node | |
| def __delitem__(self, key: Any) -> None: | |
| super().__delitem__(key) | |
| node = self.__links.pop(key) | |
| node.keys.discard(key) | |
| if not node.keys: | |
| node.unlink() | |
| def __touch(self, key: Any) -> None: | |
| """Increment frequency count for *key*.""" | |
| node = self.__links[key] | |
| new_count = node.count + 1 | |
| nxt = node.next | |
| if nxt is self.__root or nxt.count != new_count: | |
| new_node = _LFUNode(count=new_count) | |
| node.insert_after(new_node) | |
| else: | |
| new_node = nxt | |
| new_node.keys.add(key) | |
| node.keys.discard(key) | |
| self.__links[key] = new_node | |
| if not node.keys: | |
| node.unlink() | |
| def popitem(self) -> tuple[Any, Any]: | |
| """Remove and return the least frequently used ``(key, value)`` pair.""" | |
| root = self.__root | |
| node = root.next | |
| if node is root: | |
| raise KeyError(f"{type(self).__name__} is empty") | |
| key = next(iter(node.keys)) | |
| # Bypass self.pop β self[key] β __touch to avoid wasteful | |
| # frequency increment on a key that is about to be deleted. | |
| value = Cache.__getitem__(self, key) | |
| del self[key] | |
| return key, value | |
| def clear(self) -> None: | |
| super().clear() | |
| self.__links.clear() | |
| self.__root.prev = self.__root | |
| self.__root.next = self.__root | |
| # ββ TTL Cache ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| class _Timer: | |
| """Wraps a timer function with nestable time-freezing. | |
| Used by TTLCache to ensure time consistency within a single operation. | |
| """ | |
| __slots__ = ("_timer", "_nesting", "_time") | |
| def __init__(self, timer: Callable[[], float] = time.monotonic): | |
| self._timer = timer | |
| self._nesting = 0 | |
| self._time: float = 0.0 | |
| def __call__(self) -> float: | |
| if self._nesting == 0: | |
| return self._timer() | |
| return self._time | |
| def __enter__(self) -> float: | |
| if self._nesting == 0: | |
| self._time = self._timer() | |
| self._nesting += 1 | |
| return self._time | |
| def __exit__(self, *exc: Any) -> None: | |
| self._nesting -= 1 | |
| class _TTLLink: | |
| """Doubly-linked node for TTLCache expiry ordering.""" | |
| __slots__ = ("key", "expires", "prev", "next") | |
| def __init__( | |
| self, | |
| key: Any = None, | |
| expires: float = 0.0, | |
| ): | |
| self.key = key | |
| self.expires = expires | |
| self.prev: _TTLLink = self | |
| self.next: _TTLLink = self | |
| def unlink(self) -> None: | |
| self.prev.next = self.next | |
| self.next.prev = self.prev | |
| class TTLCache(LRUCache): | |
| """LRU cache with per-item time-to-live (TTL). | |
| Items are evicted when they expire (checked lazily on access) or when | |
| the cache is full (LRU order). | |
| Args: | |
| maxsize: Maximum cache capacity. | |
| ttl: Time-to-live in seconds for each item. | |
| timer: Callable returning the current time (default: ``time.monotonic``). | |
| getsizeof: Optional callable returning the size of a value. | |
| """ | |
| def __init__( | |
| self, | |
| maxsize: int | float, | |
| ttl: float, | |
| *, | |
| timer: Callable[[], float] = time.monotonic, | |
| getsizeof: Callable[[Any], int] | None = None, | |
| ): | |
| LRUCache.__init__(self, maxsize, getsizeof) | |
| self.__ttl = ttl | |
| self.__timer = _Timer(timer) | |
| self.__root = _TTLLink() # sentinel for expiry list | |
| self.__links: dict[Any, _TTLLink] = {} | |
| def ttl(self) -> float: | |
| """Time-to-live in seconds.""" | |
| return self.__ttl | |
| def timer(self) -> _Timer: | |
| """The timer function.""" | |
| return self.__timer | |
| def __contains__(self, key: object) -> bool: | |
| link = self.__links.get(key) | |
| if link is None: | |
| return False | |
| return self.__timer() < link.expires | |
| def __getitem__(self, key: Any) -> Any: | |
| link = self.__links.get(key) | |
| if link is not None and self.__timer() >= link.expires: | |
| del self[key] | |
| return self.__missing__(key) | |
| # Bypass LRUCache.__getitem__ β Cache.__getitem__ call chain; | |
| # inline both to save one method-call and one dict lookup. | |
| # Same direct-call pattern used by LFUCache.popitem. | |
| value = Cache.__getitem__(self, key) | |
| self._LRUCache__order.move_to_end(key) # type: ignore[attr-defined] | |
| return value | |
| def __setitem__(self, key: Any, value: Any) -> None: | |
| with self.__timer as now: | |
| self.expire(now) | |
| LRUCache.__setitem__(self, key, value) | |
| # Update or create expiry link | |
| link = self.__links.get(key) | |
| if link is not None: | |
| link.unlink() | |
| else: | |
| link = _TTLLink() | |
| self.__links[key] = link | |
| link.key = key | |
| link.expires = now + self.__ttl | |
| # Append to end of expiry list (newest) | |
| tail = self.__root.prev | |
| link.prev = tail | |
| link.next = self.__root | |
| tail.next = link | |
| self.__root.prev = link | |
| def __delitem__(self, key: Any) -> None: | |
| LRUCache.__delitem__(self, key) | |
| link = self.__links.pop(key) | |
| link.unlink() | |
| def __iter__(self): | |
| now = self.__timer() | |
| for key in super().__iter__(): | |
| link = self.__links.get(key) | |
| if link is not None and now < link.expires: | |
| yield key | |
| def __len__(self) -> int: | |
| now = self.__timer() | |
| # Single dict lookup via .get() instead of ``key in`` + ``[key]``. | |
| links_get = self.__links.get | |
| return sum( | |
| 1 | |
| for key in super().__iter__() | |
| if (link := links_get(key)) is not None and now < link.expires | |
| ) | |
| def currsize(self) -> int | float: | |
| """Current size, excluding expired items.""" | |
| self.expire() | |
| return super().currsize | |
| def expire(self, time: float | None = None) -> list[tuple[Any, Any]]: | |
| """Remove expired items and return them as a list of ``(key, value)`` pairs.""" | |
| if time is None: | |
| time = self.__timer() | |
| root = self.__root | |
| expired: list[tuple[Any, Any]] = [] | |
| link = root.next | |
| while link is not root and link.expires <= time: | |
| nxt = link.next | |
| key = link.key | |
| try: | |
| value = Cache.__getitem__(self, key) | |
| expired.append((key, value)) | |
| del self[key] | |
| except KeyError: | |
| pass | |
| link = nxt | |
| return expired | |
| def clear(self) -> None: | |
| LRUCache.clear(self) | |
| self.__links.clear() | |
| self.__root.prev = self.__root | |
| self.__root.next = self.__root | |
| # ββ Decorators βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| _MISS = object() # sentinel for cache lookup miss | |
| class _Stats: | |
| """Mutable hit/miss counters shared by cache wrapper closures.""" | |
| __slots__ = ("hits", "misses") | |
| def __init__(self) -> None: | |
| self.hits = 0 | |
| self.misses = 0 | |
| def _get_cache_maxsize( | |
| cache: collections.abc.MutableMapping, | |
| ) -> int | float | None: | |
| """Return maxsize if *cache* is a ``Cache`` instance, else ``None``.""" | |
| return cache.maxsize if isinstance(cache, Cache) else None | |
| def _get_cache_currsize(cache: collections.abc.MutableMapping) -> int | float: | |
| """Return current size of *cache*.""" | |
| return cache.currsize if isinstance(cache, Cache) else len(cache) | |
| def _attach_cache_info( | |
| wrapper: Callable, | |
| cache: collections.abc.MutableMapping, | |
| lock: Any | None, | |
| stats: _Stats, | |
| maxsize: int | float | None, | |
| ) -> None: | |
| """Attach ``cache_info()`` and ``cache_clear()`` methods to *wrapper*.""" | |
| def cache_info() -> CacheInfo: | |
| return CacheInfo(stats.hits, stats.misses, maxsize, _get_cache_currsize(cache)) | |
| def cache_clear() -> None: | |
| with lock if lock else _no_lock(): | |
| cache.clear() | |
| stats.hits = stats.misses = 0 | |
| wrapper.cache_info = cache_info # type: ignore[attr-defined] | |
| wrapper.cache_clear = cache_clear # type: ignore[attr-defined] | |
| def _attach_noop_info( | |
| wrapper: Callable, | |
| stats: _Stats, | |
| ) -> None: | |
| """Attach ``cache_info()`` and ``cache_clear()`` for a no-op wrapper.""" | |
| def cache_info() -> CacheInfo: | |
| return CacheInfo(stats.hits, stats.misses, None, 0) | |
| def cache_clear() -> None: | |
| stats.hits = stats.misses = 0 | |
| wrapper.cache_info = cache_info # type: ignore[attr-defined] | |
| wrapper.cache_clear = cache_clear # type: ignore[attr-defined] | |
| def _make_sync_wrapper( | |
| fn: Callable, | |
| cache: collections.abc.MutableMapping | None, | |
| key: Callable, | |
| lock: Any | None, | |
| info: bool, | |
| ) -> Callable: | |
| """Build a synchronous caching wrapper.""" | |
| stats = _Stats() | |
| if cache is None: | |
| def wrapper(*args: Any, **kwargs: Any) -> Any: | |
| stats.misses += 1 | |
| return fn(*args, **kwargs) | |
| if info: | |
| _attach_noop_info(wrapper, stats) | |
| return wrapper | |
| # Pre-cache method references to avoid attribute lookup per call. | |
| cache_getitem = cache.__getitem__ | |
| cache_setitem = cache.__setitem__ | |
| cache_setdefault = cache.setdefault | |
| if lock is not None: | |
| def wrapper(*args: Any, **kwargs: Any) -> Any: | |
| k = key(*args, **kwargs) | |
| with lock: | |
| try: | |
| result = cache_getitem(k) | |
| except KeyError: | |
| result = _MISS | |
| if result is not _MISS: | |
| stats.hits += 1 | |
| return result | |
| stats.misses += 1 | |
| v = fn(*args, **kwargs) | |
| # Inline thread-safe store (was _cache_store_default). | |
| with lock: | |
| try: | |
| return cache_setdefault(k, v) | |
| except ValueError: | |
| return v | |
| else: | |
| def wrapper(*args: Any, **kwargs: Any) -> Any: | |
| k = key(*args, **kwargs) | |
| try: | |
| result = cache_getitem(k) | |
| except KeyError: | |
| stats.misses += 1 | |
| v = fn(*args, **kwargs) | |
| try: | |
| cache_setitem(k, v) | |
| except ValueError: | |
| pass | |
| return v | |
| stats.hits += 1 | |
| return result | |
| if info: | |
| _attach_cache_info(wrapper, cache, lock, stats, _get_cache_maxsize(cache)) | |
| return wrapper | |
| def _make_async_wrapper( | |
| fn: Callable, | |
| cache: collections.abc.MutableMapping | None, | |
| key: Callable, | |
| lock: Any | None, | |
| info: bool, | |
| ) -> Callable: | |
| """Build an asynchronous caching wrapper.""" | |
| stats = _Stats() | |
| if cache is None: | |
| async def wrapper(*args: Any, **kwargs: Any) -> Any: | |
| stats.misses += 1 | |
| return await fn(*args, **kwargs) | |
| if info: | |
| _attach_noop_info(wrapper, stats) | |
| return wrapper | |
| # Pre-cache method references to avoid attribute lookup per call. | |
| cache_getitem = cache.__getitem__ | |
| cache_setitem = cache.__setitem__ | |
| cache_setdefault = cache.setdefault | |
| if lock is not None: | |
| async def wrapper(*args: Any, **kwargs: Any) -> Any: | |
| k = key(*args, **kwargs) | |
| async with lock: | |
| try: | |
| result = cache_getitem(k) | |
| except KeyError: | |
| result = _MISS | |
| if result is not _MISS: | |
| stats.hits += 1 | |
| return result | |
| stats.misses += 1 | |
| v = await fn(*args, **kwargs) | |
| # Inline thread-safe store (was _cache_store_default). | |
| async with lock: | |
| try: | |
| return cache_setdefault(k, v) | |
| except ValueError: | |
| return v | |
| else: | |
| async def wrapper(*args: Any, **kwargs: Any) -> Any: | |
| k = key(*args, **kwargs) | |
| try: | |
| result = cache_getitem(k) | |
| except KeyError: | |
| stats.misses += 1 | |
| v = await fn(*args, **kwargs) | |
| try: | |
| cache_setitem(k, v) | |
| except ValueError: | |
| pass | |
| return v | |
| stats.hits += 1 | |
| return result | |
| if info: | |
| _attach_cache_info(wrapper, cache, lock, stats, _get_cache_maxsize(cache)) | |
| return wrapper | |
| class _no_lock: | |
| """Dummy context manager when no lock is provided.""" | |
| def __enter__(self) -> None: | |
| pass | |
| def __exit__(self, *exc: Any) -> None: | |
| pass | |
| def cached( | |
| cache: Cache | collections.abc.MutableMapping | None, | |
| *, | |
| key: Callable[..., Any] = hashkey, | |
| lock: Any | None = None, | |
| info: bool = False, | |
| ) -> Callable: | |
| """Decorator to wrap a function with a memoizing callable. | |
| Args: | |
| cache: A cache instance (any MutableMapping) or ``None`` to disable. | |
| key: Key function (default ``hashkey``). | |
| lock: A lock object (``threading.Lock`` for sync, ``asyncio.Lock`` | |
| for async). The lock is released while the wrapped function | |
| executes. | |
| info: If ``True``, attach ``cache_info()`` and ``cache_clear()`` | |
| methods to the wrapper. | |
| """ | |
| def decorator(fn: Callable) -> Callable: | |
| if inspect.iscoroutinefunction(fn): | |
| wrapper = _make_async_wrapper(fn, cache, key, lock, info) | |
| else: | |
| wrapper = _make_sync_wrapper(fn, cache, key, lock, info) | |
| wrapper.cache = cache # type: ignore[attr-defined] | |
| wrapper.cache_key = key # type: ignore[attr-defined] | |
| wrapper.cache_lock = lock # type: ignore[attr-defined] | |
| return functools.update_wrapper(wrapper, fn) | |
| return decorator | |
| # ββ Convenience Decorators βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def lru_cache( | |
| fn: Callable[..., Any] | None = None, | |
| *, | |
| maxsize: int = 128, | |
| key: Callable[..., Any] = hashkey, | |
| lock: Any | None = None, | |
| info: bool = True, | |
| ) -> Callable[..., Any]: | |
| """LRU cache decorator. Auto-detects sync/async. | |
| Unlike ``functools.lru_cache``, supports TTL (via ``ttl_cache``), | |
| async functions, and explicit lock objects. | |
| Args: | |
| maxsize: Maximum number of cached results. | |
| key: Key function (default ``hashkey``). | |
| lock: Optional lock for thread/async safety. | |
| info: Attach ``cache_info()``/``cache_clear()`` (default ``True``). | |
| """ | |
| def decorator(fn: Callable) -> Callable: | |
| c = LRUCache(maxsize) | |
| return cached(c, key=key, lock=lock, info=info)(fn) | |
| if fn is not None: | |
| return decorator(fn) | |
| return decorator | |
| def fifo_cache( | |
| fn: Callable[..., Any] | None = None, | |
| *, | |
| maxsize: int = 128, | |
| key: Callable[..., Any] = hashkey, | |
| lock: Any | None = None, | |
| info: bool = True, | |
| ) -> Callable[..., Any]: | |
| """FIFO cache decorator. Auto-detects sync/async. | |
| Evicts the oldest inserted item regardless of access pattern. | |
| """ | |
| def decorator(fn: Callable) -> Callable: | |
| c = FIFOCache(maxsize) | |
| return cached(c, key=key, lock=lock, info=info)(fn) | |
| if fn is not None: | |
| return decorator(fn) | |
| return decorator | |
| def lfu_cache( | |
| fn: Callable[..., Any] | None = None, | |
| *, | |
| maxsize: int = 128, | |
| key: Callable[..., Any] = hashkey, | |
| lock: Any | None = None, | |
| info: bool = True, | |
| ) -> Callable[..., Any]: | |
| """LFU cache decorator. Auto-detects sync/async. | |
| Evicts the least frequently accessed item. | |
| """ | |
| def decorator(fn: Callable) -> Callable: | |
| c = LFUCache(maxsize) | |
| return cached(c, key=key, lock=lock, info=info)(fn) | |
| if fn is not None: | |
| return decorator(fn) | |
| return decorator | |
| def ttl_cache( | |
| fn: Callable[..., Any] | None = None, | |
| *, | |
| maxsize: int = 128, | |
| ttl: float = 600, | |
| timer: Callable[[], float] = time.monotonic, | |
| key: Callable[..., Any] = hashkey, | |
| lock: Any | None = None, | |
| info: bool = True, | |
| ) -> Callable[..., Any]: | |
| """TTL cache decorator. Auto-detects sync/async. | |
| Items expire after *ttl* seconds (default 600 = 10 minutes). | |
| Eviction follows LRU order when the cache is full. | |
| """ | |
| def decorator(fn: Callable) -> Callable: | |
| c = TTLCache(maxsize, ttl, timer=timer) | |
| return cached(c, key=key, lock=lock, info=info)(fn) | |
| if fn is not None: | |
| return decorator(fn) | |
| return decorator | |
| # ββ Public API βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| __all__ = [ | |
| # Cache classes | |
| "Cache", | |
| "FIFOCache", | |
| "LFUCache", | |
| "LRUCache", | |
| "TTLCache", | |
| # Decorators | |
| "cached", | |
| "fifo_cache", | |
| "lfu_cache", | |
| "lru_cache", | |
| "ttl_cache", | |
| # Key functions | |
| "hashkey", | |
| "methodkey", | |
| "typedkey", | |
| # Stats | |
| "CacheInfo", | |
| ] | |