|
|
import asyncio |
|
|
import pickle |
|
|
import time |
|
|
from typing import Generic |
|
|
|
|
|
from diskcache import Cache |
|
|
from loguru import logger |
|
|
|
|
|
from langflow.services.cache.base import AsyncBaseCacheService, AsyncLockType |
|
|
from langflow.services.cache.utils import CACHE_MISS |
|
|
|
|
|
|
|
|
class AsyncDiskCache(AsyncBaseCacheService, Generic[AsyncLockType]):
    """Asynchronous cache service that stores its entries in a ``diskcache.Cache``.

    Every entry is written as a dict holding the payload (``"value"``), the
    wall-clock time it was written (``"time"``) and whether the payload was
    pickled (``"pickled"``). Blocking diskcache calls made by the public
    coroutines are pushed to a worker thread with ``asyncio.to_thread``.

    Locking convention shared by ``get``/``set``/``delete``/``clear``/``upsert``:
    when the caller passes no ``lock`` the instance-wide ``self.lock`` is taken
    for the duration of the operation; when the caller passes a ``lock`` the
    operation runs without taking ``self.lock`` (the passed lock itself is not
    acquired here -- presumably the caller already holds it).
    """

    def __init__(self, cache_dir, max_size=None, expiration_time=3600) -> None:
        """Open the cache stored under ``cache_dir`` and start from an empty store.

        Args:
            cache_dir: Directory handed to ``diskcache.Cache``.
            max_size: Optional entry-count threshold. When the number of stored
                entries reaches it, ``_set`` calls ``Cache.cull`` before writing.
            expiration_time: Maximum age, in seconds, an entry may have and
                still be returned by ``get``.
        """
        self.cache = Cache(cache_dir)
        # Anything already present in the directory is dropped on start-up.
        if len(self.cache) > 0:
            self.cache.clear()
        self.lock = asyncio.Lock()
        self.max_size = max_size
        self.expiration_time = expiration_time

    async def get(self, key, lock: asyncio.Lock | None = None):
        """Return the value stored under ``key``, or ``CACHE_MISS``.

        Args:
            key: Cache key to look up.
            lock: If given, ``self.lock`` is not acquired for this call.

        Returns:
            The stored value, or ``CACHE_MISS`` when the key is absent or the
            entry is older than ``expiration_time``.
        """
        if not lock:
            async with self.lock:
                return await asyncio.to_thread(self._get, key)
        return await asyncio.to_thread(self._get, key)

    def _get(self, key):
        """Synchronous lookup used by ``get`` and ``_upsert`` (no locking).

        Expired entries are deleted as a side effect of being looked up.
        """
        item = self.cache.get(key, default=None)
        if not item:
            return CACHE_MISS

        if time.time() - item["time"] >= self.expiration_time:
            logger.info(f"Cache item for key '{key}' has expired and will be deleted.")
            self.cache.delete(key)
            return CACHE_MISS

        self.cache.touch(key)
        raw = item["value"]
        # Entries written by ``_set`` carry an explicit "pickled" flag so that a
        # plain ``bytes`` payload is handed back untouched instead of being fed
        # to ``pickle.loads``. Entries without the flag fall back to the legacy
        # heuristic (bytes means pickled).
        pickled = item.get("pickled", isinstance(raw, bytes))
        # NOTE(security): pickle.loads executes arbitrary code from its input;
        # the cache directory must only ever be writable by trusted code.
        return pickle.loads(raw) if pickled else raw

    async def set(self, key, value, lock: asyncio.Lock | None = None) -> None:
        """Store ``value`` under ``key``.

        Args:
            key: Cache key to write.
            value: Value to store; anything other than ``str``/``bytes`` is pickled.
            lock: If given, ``self.lock`` is not acquired for this call.
        """
        if not lock:
            async with self.lock:
                await self._set(key, value)
        else:
            await self._set(key, value)

    async def _set(self, key, value) -> None:
        """Write ``value`` under ``key`` without taking any lock."""
        if self.max_size and len(self.cache) >= self.max_size:
            await asyncio.to_thread(self.cache.cull)

        needs_pickle = not isinstance(value, str | bytes)
        item = {
            "value": pickle.dumps(value) if needs_pickle else value,
            "time": time.time(),
            # Recorded so ``_get`` can tell raw bytes from pickled payloads.
            "pickled": needs_pickle,
        }
        await asyncio.to_thread(self.cache.set, key, item)

    async def delete(self, key, lock: asyncio.Lock | None = None) -> None:
        """Remove ``key`` from the cache.

        Args:
            key: Cache key to remove.
            lock: If given, ``self.lock`` is not acquired for this call.
        """
        if not lock:
            async with self.lock:
                await self._delete(key)
        else:
            await self._delete(key)

    async def _delete(self, key) -> None:
        """Delete ``key`` in a worker thread without taking any lock."""
        await asyncio.to_thread(self.cache.delete, key)

    async def clear(self, lock: asyncio.Lock | None = None) -> None:
        """Remove every entry from the cache.

        Args:
            lock: If given, ``self.lock`` is not acquired for this call.
        """
        if not lock:
            async with self.lock:
                await self._clear()
        else:
            await self._clear()

    async def _clear(self) -> None:
        """Clear the underlying cache in a worker thread without taking any lock."""
        await asyncio.to_thread(self.cache.clear)

    async def upsert(self, key, value, lock: asyncio.Lock | None = None) -> None:
        """Insert ``value`` under ``key``, merging into an existing dict entry.

        When both the stored value and ``value`` are dicts, the stored dict is
        updated with ``value`` and written back; otherwise ``value`` replaces
        whatever was stored.

        Args:
            key: Cache key to insert or update.
            value: New value, or dict of updates.
            lock: If given, ``self.lock`` is not acquired for this call.
        """
        if not lock:
            async with self.lock:
                await self._upsert(key, value)
        else:
            await self._upsert(key, value)

    async def _upsert(self, key, value) -> None:
        """Read-merge-write for ``upsert``; takes no lock itself."""
        existing_value = await asyncio.to_thread(self._get, key)
        if existing_value is not CACHE_MISS and isinstance(existing_value, dict) and isinstance(value, dict):
            existing_value.update(value)
            value = existing_value
        # Write through the lock-free helper: ``upsert`` may already hold
        # ``self.lock`` and asyncio.Lock is not reentrant, so going through the
        # public ``set`` here would wait on that lock forever.
        await self._set(key, value)

    async def contains(self, key) -> bool:
        """Return whether ``key`` is present in the underlying cache.

        This only asks diskcache for membership; the age check performed by
        ``get`` is not applied here and ``self.lock`` is not taken.
        """
        return await asyncio.to_thread(self.cache.__contains__, key)

    async def teardown(self) -> None:
        """Empty the underlying cache when the service is torn down."""
        # Runs synchronously (not via to_thread); ``retry=True`` is passed
        # through to diskcache's ``Cache.clear``.
        self.cache.clear(retry=True)
|
|
|