|
|
import asyncio |
|
|
import pickle |
|
|
import threading |
|
|
import time |
|
|
from collections import OrderedDict |
|
|
from typing import Generic, Union |
|
|
|
|
|
from loguru import logger |
|
|
from typing_extensions import override |
|
|
|
|
|
from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType, CacheService, LockType |
|
|
from langflow.services.cache.utils import CACHE_MISS |
|
|
|
|
|
|
|
|
class ThreadingInMemoryCache(CacheService, Generic[LockType]):
    """A simple thread-safe, in-memory LRU cache backed by an OrderedDict.

    This cache supports setting a maximum size and expiration time for cached
    items. When the cache is full, it uses a Least Recently Used (LRU)
    eviction policy. All public operations are guarded by a reentrant lock
    (or a caller-supplied lock).

    Attributes:
        max_size (int, optional): Maximum number of items to store in the cache.
        expiration_time (int, optional): Time in seconds after which a cached
            item expires. ``None`` disables expiration. Default is 1 hour.

    Example:
        cache = ThreadingInMemoryCache(max_size=3, expiration_time=5)

        # setting cache values
        cache.set("a", 1)
        cache.set("b", 2)
        cache["c"] = 3

        # getting cache values
        a = cache.get("a")
        b = cache["b"]
    """

    def __init__(self, max_size=None, expiration_time=60 * 60) -> None:
        """Initialize a new ThreadingInMemoryCache instance.

        Args:
            max_size (int, optional): Maximum number of items to store in the cache.
            expiration_time (int, optional): Time in seconds after which a cached
                item expires. ``None`` disables expiration. Default is 1 hour.
        """
        self._cache: OrderedDict = OrderedDict()
        # RLock so public methods may safely nest (e.g. dunder helpers).
        self._lock = threading.RLock()
        self.max_size = max_size
        self.expiration_time = expiration_time

    def get(self, key, lock: Union[threading.Lock, None] = None):
        """Retrieve an item from the cache.

        Args:
            key: The key of the item to retrieve.
            lock: An optional lock to use instead of the internal one.

        Returns:
            The value associated with the key, or CACHE_MISS if the key is not
            found or the item has expired.
        """
        with lock or self._lock:
            return self._get_without_lock(key)

    def _get_without_lock(self, key):
        """Retrieve an item without acquiring a lock; the caller must hold one."""
        item = self._cache.get(key)
        if item is None:
            return CACHE_MISS
        if self.expiration_time is None or time.time() - item["time"] < self.expiration_time:
            # Mark as most recently used for the LRU eviction policy.
            self._cache.move_to_end(key)
            return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"]
        # Expired: drop the stale entry directly (no re-locking via delete()).
        self._cache.pop(key, None)
        return CACHE_MISS

    def set(self, key, value, lock: Union[threading.Lock, None] = None) -> None:
        """Add an item to the cache.

        If the cache is full, the least recently used item is evicted.

        Args:
            key: The key of the item.
            value: The value to cache.
            lock: An optional lock to use instead of the internal one.
        """
        with lock or self._lock:
            self._set_without_lock(key, value)

    def _set_without_lock(self, key, value) -> None:
        """Insert an item without acquiring a lock; the caller must hold one."""
        if key in self._cache:
            # Remove first so re-insertion lands at the MRU position.
            self._cache.pop(key, None)
        elif self.max_size and len(self._cache) >= self.max_size:
            # Evict the least recently used entry (front of the OrderedDict).
            self._cache.popitem(last=False)
        self._cache[key] = {"value": value, "time": time.time()}

    def upsert(self, key, value, lock: Union[threading.Lock, None] = None) -> None:
        """Inserts or updates a value in the cache.

        If the existing value and the new value are both dictionaries, they are merged.

        Args:
            key: The key of the item.
            value: The value to insert or update.
            lock: An optional lock to use instead of the internal one.
        """
        with lock or self._lock:
            existing_value = self._get_without_lock(key)
            if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict):
                existing_value.update(value)
                value = existing_value
            self._set_without_lock(key, value)

    def get_or_set(self, key, value, lock: Union[threading.Lock, None] = None):
        """Retrieve an item from the cache, setting it if absent or expired.

        Previously an expired entry made this return CACHE_MISS instead of
        storing and returning the provided value; expiration is now handled
        like a plain miss.

        Args:
            key: The key of the item.
            value: The value to cache if the item doesn't exist.
            lock: An optional lock to use instead of the internal one.

        Returns:
            The cached value associated with the key.
        """
        with lock or self._lock:
            existing_value = self._get_without_lock(key)
            if existing_value is not CACHE_MISS:
                return existing_value
            self._set_without_lock(key, value)
            return value

    def delete(self, key, lock: Union[threading.Lock, None] = None) -> None:
        """Remove an item from the cache; a missing key is a no-op."""
        with lock or self._lock:
            self._cache.pop(key, None)

    def clear(self, lock: Union[threading.Lock, None] = None) -> None:
        """Clear all items from the cache."""
        with lock or self._lock:
            self._cache.clear()

    def contains(self, key) -> bool:
        """Check if the key is in the cache (expiration is not checked)."""
        # Guarded like every other read for a consistent view of the dict.
        with self._lock:
            return key in self._cache

    def __contains__(self, key) -> bool:
        """Check if the key is in the cache."""
        return self.contains(key)

    def __getitem__(self, key):
        """Retrieve an item from the cache using the square bracket notation."""
        return self.get(key)

    def __setitem__(self, key, value) -> None:
        """Add an item to the cache using the square bracket notation."""
        self.set(key, value)

    def __delitem__(self, key) -> None:
        """Remove an item from the cache using the square bracket notation."""
        self.delete(key)

    def __len__(self) -> int:
        """Return the number of items in the cache."""
        return len(self._cache)

    def __repr__(self) -> str:
        """Return a string representation of the InMemoryCache instance."""
        return f"InMemoryCache(max_size={self.max_size}, expiration_time={self.expiration_time})"
|
|
|
|
|
|
|
|
class RedisCache(AsyncBaseCacheService, Generic[LockType]):
    """A Redis-based asynchronous cache implementation.

    Values are pickled before storage and every entry expires after
    ``expiration_time`` seconds. Keys are always serialized with ``str()``.

    Attributes:
        expiration_time (int, optional): Time in seconds after which a cached
            item expires. Default is 1 hour.

    Example:
        cache = RedisCache(expiration_time=5)

        # setting cache values
        await cache.set("a", 1)
        await cache.set("b", 2)

        # getting cache values
        a = await cache.get("a")
        b = await cache.get("b")
    """

    def __init__(self, host="localhost", port=6379, db=0, url=None, expiration_time=60 * 60) -> None:
        """Initialize a new RedisCache instance.

        Args:
            host (str, optional): Redis host.
            port (int, optional): Redis port.
            db (int, optional): Redis DB.
            url (str, optional): Redis URL; takes precedence over host/port/db.
            expiration_time (int, optional): Time in seconds after which a
                cached item expires. Default is 1 hour.

        Raises:
            ImportError: If the redis-py package is not installed.
        """
        try:
            from redis.asyncio import StrictRedis
        except ImportError as exc:
            msg = (
                "RedisCache requires the redis-py package."
                " Please install Langflow with the deploy extra: pip install langflow[deploy]"
            )
            raise ImportError(msg) from exc
        logger.warning(
            "RedisCache is an experimental feature and may not work as expected."
            " Please report any issues to our GitHub repository."
        )
        if url:
            self._client = StrictRedis.from_url(url)
        else:
            self._client = StrictRedis(host=host, port=port, db=db)
        self.expiration_time = expiration_time

    def is_connected(self) -> bool:
        """Check if the Redis client is connected.

        NOTE: uses ``asyncio.run`` and therefore must not be called from
        within a running event loop.
        """
        import redis

        try:
            asyncio.run(self._client.ping())
        except redis.exceptions.ConnectionError:
            logger.exception("RedisCache could not connect to the Redis server")
            return False
        return True

    @override
    async def get(self, key, lock=None):
        """Return the unpickled value for ``key``, or CACHE_MISS if absent."""
        if key is None:
            return CACHE_MISS
        value = await self._client.get(str(key))
        return pickle.loads(value) if value else CACHE_MISS

    @override
    async def set(self, key, value, lock=None) -> None:
        """Pickle ``value`` and store it under ``key`` with an expiration.

        Raises:
            TypeError: If the value cannot be pickled.
            ValueError: If Redis reports the SETEX operation failed.
        """
        # Keep the try narrow: only pickling can raise TypeError here.
        try:
            pickled = pickle.dumps(value)
        except TypeError as exc:
            msg = "RedisCache only accepts values that can be pickled. "
            raise TypeError(msg) from exc
        result = await self._client.setex(str(key), self.expiration_time, pickled)
        if not result:
            msg = "RedisCache could not set the value."
            raise ValueError(msg)

    @override
    async def upsert(self, key, value, lock=None) -> None:
        """Inserts or updates a value in the cache.

        If the existing value and the new value are both dictionaries, they are merged.

        Args:
            key: The key of the item.
            value: The value to insert or update.
            lock: Unused; accepted for interface compatibility.
        """
        if key is None:
            return
        existing_value = await self.get(key)
        # Compare against CACHE_MISS (not None): get() never returns None.
        if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict):
            existing_value.update(value)
            value = existing_value

        await self.set(key, value)

    @override
    async def delete(self, key, lock=None) -> None:
        """Remove ``key`` from the cache; a missing key is a no-op."""
        # str(key) keeps key serialization consistent with get/set/contains;
        # previously non-string keys stored via set() could never be deleted.
        await self._client.delete(str(key))

    @override
    async def clear(self, lock=None) -> None:
        """Clear all items from the cache."""
        await self._client.flushdb()

    async def contains(self, key) -> bool:
        """Check if the key is in the cache."""
        if key is None:
            return False
        return bool(await self._client.exists(str(key)))

    def __repr__(self) -> str:
        """Return a string representation of the RedisCache instance."""
        return f"RedisCache(expiration_time={self.expiration_time})"
|
|
|
|
|
|
|
|
class AsyncInMemoryCache(AsyncBaseCacheService, Generic[AsyncLockType]):
    """An asyncio-compatible in-memory LRU cache backed by an OrderedDict.

    Mirrors ThreadingInMemoryCache, but guards operations with an
    ``asyncio.Lock`` (or a caller-supplied lock).

    Attributes:
        max_size (int, optional): Maximum number of items to store in the cache.
        expiration_time (int, optional): Time in seconds after which a cached
            item expires. ``None`` disables expiration. Default is 1 hour.
    """

    def __init__(self, max_size=None, expiration_time=3600) -> None:
        """Initialize a new AsyncInMemoryCache instance.

        Args:
            max_size (int, optional): Maximum number of items to store in the cache.
            expiration_time (int, optional): Time in seconds after which a cached
                item expires. ``None`` disables expiration. Default is 1 hour.
        """
        self.cache: OrderedDict = OrderedDict()
        self.lock = asyncio.Lock()
        self.max_size = max_size
        self.expiration_time = expiration_time

    async def get(self, key, lock: asyncio.Lock | None = None):
        """Return the cached value for ``key``, or CACHE_MISS if absent/expired."""
        async with lock or self.lock:
            return await self._get(key)

    async def _get(self, key):
        """Look up ``key`` without locking; the caller must hold the lock."""
        item = self.cache.get(key, None)
        if item:
            # None disables expiration, matching ThreadingInMemoryCache;
            # previously expiration_time=None raised a TypeError here.
            if self.expiration_time is None or time.time() - item["time"] < self.expiration_time:
                # Mark as most recently used for the LRU eviction policy.
                self.cache.move_to_end(key)
                return pickle.loads(item["value"]) if isinstance(item["value"], bytes) else item["value"]
            logger.info(f"Cache item for key '{key}' has expired and will be deleted.")
            await self._delete(key)
        return CACHE_MISS

    async def set(self, key, value, lock: asyncio.Lock | None = None) -> None:
        """Store ``value`` under ``key``, evicting the LRU item if full."""
        async with lock or self.lock:
            await self._set(
                key,
                value,
            )

    async def _set(self, key, value) -> None:
        """Insert without locking; the caller must hold the lock."""
        # Only evict when inserting a NEW key at capacity; previously updating
        # an existing key at capacity evicted an unrelated LRU entry.
        if key not in self.cache and self.max_size and len(self.cache) >= self.max_size:
            self.cache.popitem(last=False)
        self.cache[key] = {"value": value, "time": time.time()}
        self.cache.move_to_end(key)

    async def delete(self, key, lock: asyncio.Lock | None = None) -> None:
        """Remove ``key`` from the cache; a missing key is a no-op."""
        async with lock or self.lock:
            await self._delete(key)

    async def _delete(self, key) -> None:
        """Delete without locking; the caller must hold the lock."""
        if key in self.cache:
            del self.cache[key]

    async def clear(self, lock: asyncio.Lock | None = None) -> None:
        """Clear all items from the cache."""
        async with lock or self.lock:
            await self._clear()

    async def _clear(self) -> None:
        """Clear without locking; the caller must hold the lock."""
        self.cache.clear()

    async def upsert(self, key, value, lock: asyncio.Lock | None = None) -> None:
        """Inserts or updates a value; merges when both old and new are dicts."""
        await self._upsert(key, value, lock)

    async def _upsert(self, key, value, lock: asyncio.Lock | None = None) -> None:
        """Get-then-set merge helper; each step acquires the lock separately."""
        existing_value = await self.get(key, lock)
        # Compare against CACHE_MISS (not None): get() never returns None.
        if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict):
            existing_value.update(value)
            value = existing_value
        await self.set(key, value, lock)

    async def contains(self, key) -> bool:
        """Check if the key is in the cache (expiration is not checked)."""
        return key in self.cache
|
|
|