Spaces:
Sleeping
Sleeping
| """ | |
| 统一内存缓存管理器 | |
| 为所有存储后端提供一致的内存缓存机制,确保读写一致性和高性能。 | |
| """ | |
| import asyncio | |
| import time | |
| from typing import Dict, Any, Optional | |
| from collections import deque | |
| from abc import ABC, abstractmethod | |
| from log import log | |
| class CacheBackend(ABC): | |
| """缓存后端接口,定义底层存储的读写操作""" | |
| async def load_data(self) -> Dict[str, Any]: | |
| """从底层存储加载数据""" | |
| pass | |
| async def write_data(self, data: Dict[str, Any]) -> bool: | |
| """将数据写入底层存储""" | |
| pass | |
| class UnifiedCacheManager: | |
| """统一缓存管理器""" | |
| def __init__( | |
| self, | |
| cache_backend: CacheBackend, | |
| cache_ttl: float = 300.0, | |
| write_delay: float = 1.0, | |
| name: str = "cache" | |
| ): | |
| """ | |
| 初始化缓存管理器 | |
| Args: | |
| cache_backend: 缓存后端实现 | |
| cache_ttl: 缓存TTL(秒) | |
| write_delay: 写入延迟(秒) | |
| name: 缓存名称(用于日志) | |
| """ | |
| self._backend = cache_backend | |
| self._cache_ttl = cache_ttl | |
| self._write_delay = write_delay | |
| self._name = name | |
| # 缓存数据 | |
| self._cache: Dict[str, Any] = {} | |
| self._cache_dirty = False | |
| self._last_cache_time = 0 | |
| # 并发控制 | |
| self._cache_lock = asyncio.Lock() | |
| # 异步写回任务 | |
| self._write_task: Optional[asyncio.Task] = None | |
| self._shutdown_event = asyncio.Event() | |
| # 性能监控 | |
| self._operation_count = 0 | |
| self._operation_times = deque(maxlen=1000) | |
| async def start(self): | |
| """启动缓存管理器""" | |
| if self._write_task and not self._write_task.done(): | |
| return | |
| self._shutdown_event.clear() | |
| self._write_task = asyncio.create_task(self._write_loop()) | |
| log.debug(f"{self._name} cache manager started") | |
| async def stop(self): | |
| """停止缓存管理器并刷新数据""" | |
| self._shutdown_event.set() | |
| if self._write_task and not self._write_task.done(): | |
| try: | |
| await asyncio.wait_for(self._write_task, timeout=5.0) | |
| except asyncio.TimeoutError: | |
| self._write_task.cancel() | |
| log.warning(f"{self._name} cache writer forcibly cancelled") | |
| # 刷新缓存 | |
| await self._flush_cache() | |
| log.debug(f"{self._name} cache manager stopped") | |
| async def get(self, key: str, default: Any = None) -> Any: | |
| """获取缓存项""" | |
| async with self._cache_lock: | |
| start_time = time.time() | |
| try: | |
| # 确保缓存已加载 | |
| await self._ensure_cache_loaded() | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| result = self._cache.get(key, default) | |
| log.debug(f"{self._name} cache get: {key} in {operation_time:.3f}s") | |
| return result | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error getting {self._name} cache key {key} in {operation_time:.3f}s: {e}") | |
| return default | |
| async def set(self, key: str, value: Any) -> bool: | |
| """设置缓存项""" | |
| async with self._cache_lock: | |
| start_time = time.time() | |
| try: | |
| # 确保缓存已加载 | |
| await self._ensure_cache_loaded() | |
| # 更新缓存 | |
| self._cache[key] = value | |
| self._cache_dirty = True | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"{self._name} cache set: {key} in {operation_time:.3f}s") | |
| return True | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error setting {self._name} cache key {key} in {operation_time:.3f}s: {e}") | |
| return False | |
| async def delete(self, key: str) -> bool: | |
| """删除缓存项""" | |
| async with self._cache_lock: | |
| start_time = time.time() | |
| try: | |
| # 确保缓存已加载 | |
| await self._ensure_cache_loaded() | |
| if key in self._cache: | |
| del self._cache[key] | |
| self._cache_dirty = True | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"{self._name} cache delete: {key} in {operation_time:.3f}s") | |
| return True | |
| else: | |
| log.warning(f"{self._name} cache key not found for deletion: {key}") | |
| return False | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error deleting {self._name} cache key {key} in {operation_time:.3f}s: {e}") | |
| return False | |
| async def get_all(self) -> Dict[str, Any]: | |
| """获取所有缓存数据""" | |
| async with self._cache_lock: | |
| start_time = time.time() | |
| try: | |
| # 确保缓存已加载 | |
| await self._ensure_cache_loaded() | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"{self._name} cache get_all ({len(self._cache)}) in {operation_time:.3f}s") | |
| return self._cache.copy() | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error getting all {self._name} cache in {operation_time:.3f}s: {e}") | |
| return {} | |
| async def update_multi(self, updates: Dict[str, Any]) -> bool: | |
| """批量更新缓存项""" | |
| async with self._cache_lock: | |
| start_time = time.time() | |
| try: | |
| # 确保缓存已加载 | |
| await self._ensure_cache_loaded() | |
| # 批量更新 | |
| self._cache.update(updates) | |
| self._cache_dirty = True | |
| # 性能监控 | |
| self._operation_count += 1 | |
| operation_time = time.time() - start_time | |
| self._operation_times.append(operation_time) | |
| log.debug(f"{self._name} cache update_multi ({len(updates)}) in {operation_time:.3f}s") | |
| return True | |
| except Exception as e: | |
| operation_time = time.time() - start_time | |
| log.error(f"Error updating {self._name} cache multi in {operation_time:.3f}s: {e}") | |
| return False | |
| async def _ensure_cache_loaded(self): | |
| """确保缓存已从底层存储加载""" | |
| current_time = time.time() | |
| # 检查缓存是否需要加载(首次加载或过期) | |
| # 如果缓存脏了(有未写入的数据),不要重新加载以避免数据丢失 | |
| if (self._last_cache_time == 0 or | |
| (current_time - self._last_cache_time > self._cache_ttl and not self._cache_dirty)): | |
| await self._load_cache() | |
| self._last_cache_time = current_time | |
| async def _load_cache(self): | |
| """从底层存储加载缓存""" | |
| try: | |
| start_time = time.time() | |
| # 从后端加载数据 | |
| data = await self._backend.load_data() | |
| if data: | |
| self._cache = data | |
| log.debug(f"{self._name} cache loaded ({len(self._cache)}) from backend") | |
| else: | |
| # 如果后端没有数据,初始化空缓存 | |
| self._cache = {} | |
| log.debug(f"{self._name} cache initialized empty") | |
| operation_time = time.time() - start_time | |
| log.debug(f"{self._name} cache loaded in {operation_time:.3f}s") | |
| except Exception as e: | |
| log.error(f"Error loading {self._name} cache from backend: {e}") | |
| self._cache = {} | |
| async def _write_loop(self): | |
| """异步写回循环""" | |
| while not self._shutdown_event.is_set(): | |
| try: | |
| # 等待写入延迟或关闭信号 | |
| try: | |
| await asyncio.wait_for(self._shutdown_event.wait(), timeout=self._write_delay) | |
| break # 收到关闭信号 | |
| except asyncio.TimeoutError: | |
| pass # 超时,检查是否需要写回 | |
| # 如果缓存脏了,写回底层存储 | |
| async with self._cache_lock: | |
| if self._cache_dirty: | |
| await self._write_cache() | |
| except Exception as e: | |
| log.error(f"Error in {self._name} cache writer loop: {e}") | |
| await asyncio.sleep(1) | |
| async def _write_cache(self): | |
| """将缓存写回底层存储""" | |
| if not self._cache_dirty: | |
| return | |
| try: | |
| start_time = time.time() | |
| # 写入后端 | |
| success = await self._backend.write_data(self._cache.copy()) | |
| if success: | |
| self._cache_dirty = False | |
| operation_time = time.time() - start_time | |
| log.debug(f"{self._name} cache written to backend in {operation_time:.3f}s ({len(self._cache)} items)") | |
| else: | |
| log.error(f"Failed to write {self._name} cache to backend") | |
| except Exception as e: | |
| log.error(f"Error writing {self._name} cache to backend: {e}") | |
| async def _flush_cache(self): | |
| """立即刷新缓存到底层存储""" | |
| async with self._cache_lock: | |
| if self._cache_dirty: | |
| await self._write_cache() | |
| log.debug(f"{self._name} cache flushed to backend") | |
| def get_stats(self) -> Dict[str, Any]: | |
| """获取缓存统计信息""" | |
| avg_time = sum(self._operation_times) / len(self._operation_times) if self._operation_times else 0 | |
| return { | |
| "cache_name": self._name, | |
| "cache_size": len(self._cache), | |
| "cache_dirty": self._cache_dirty, | |
| "operation_count": self._operation_count, | |
| "avg_operation_time": avg_time, | |
| "last_cache_time": self._last_cache_time, | |
| } |