Spaces:
Running
Running
| #!/usr/bin/python3 | |
| # -*- coding: utf-8 -*- | |
| import asyncio | |
| import time | |
| from functools import wraps | |
| import time | |
| import hashlib | |
| import pickle | |
| import asyncio | |
| from functools import wraps | |
| from typing import Any, Callable, Dict, Optional, Tuple | |
| from dataclasses import dataclass | |
| from collections import OrderedDict | |
@dataclass
class CacheEntry:
    """One cached call result together with its bookkeeping timestamps.

    The class must be a dataclass: the cache decorator constructs entries
    with keyword arguments and later mutates ``access_time`` in place, so
    the instance has to stay mutable (not frozen).
    """

    # Absolute ``time.time()`` value from which the entry counts as expired.
    expire_time: float
    # Value returned by the wrapped coroutine function.
    result: Any
    # ``time.time()`` value of the entry's creation or most recent cache hit.
    access_time: float
def async_cache_decorator(
    max_age: int = 10,
    max_size: Optional[int] = 100,
    ignore_kwargs: Optional[list] = None
):
    """Build a decorator that caches an async function's results (TTL + LRU).

    Args:
        max_age: Lifetime of a cache entry in seconds. The lifetime is
            measured from the moment the underlying call was started.
        max_size: Maximum number of entries kept. ``None`` (or any other
            falsy value) disables the size limit.
        ignore_kwargs: Names of keyword arguments that are left out of the
            cache key.

    Returns:
        A decorator for coroutine functions. The wrapped function also
        exposes ``clear_cache``, ``remove_from_cache``, ``get_cache_info``
        and ``get_cached_result`` as attributes.
    """
    ignore_kwargs = ignore_kwargs or []

    def decorator(func: Callable):
        cache: Dict[str, CacheEntry] = {}
        # Keys ordered from least to most recently used (drives LRU eviction).
        access_order = OrderedDict()

        def _make_key(args: tuple, kwargs: dict) -> str:
            """Build the cache key for a call, skipping the ignored kwargs."""
            filtered_kwargs = {
                k: v for k, v in kwargs.items() if k not in ignore_kwargs
            }
            return _generate_cache_key(args, filtered_kwargs)

        def _remove_from_cache(cache_key: str):
            """Drop a key from both the cache and the LRU order, if present."""
            cache.pop(cache_key, None)
            access_order.pop(cache_key, None)

        def _clean_expired_cache():
            """Remove every entry whose expiry time has been reached."""
            current_time = time.time()
            expired_keys = [
                key for key, entry in cache.items()
                if current_time >= entry.expire_time
            ]
            for key in expired_keys:
                _remove_from_cache(key)

        # Preserve the wrapped function's name, docstring and __wrapped__.
        @wraps(func)
        async def wrapper(*args, **kwargs):
            _clean_expired_cache()
            cache_key = _make_key(args, kwargs)
            current_time = time.time()

            entry = cache.get(cache_key)
            if entry is not None:
                if current_time < entry.expire_time:
                    # Cache hit: refresh access time and LRU position.
                    entry.access_time = current_time
                    access_order.move_to_end(cache_key)
                    return entry.result
                # Entry expired between the sweep above and this check.
                _remove_from_cache(cache_key)

            result = await func(*args, **kwargs)

            # While awaiting, another task may already have stored this key.
            # Overwriting an existing entry does not grow the cache, so only
            # evict the least recently used entry when a new key is added.
            if max_size and cache_key not in cache and len(cache) >= max_size:
                oldest_key = next(iter(access_order))
                _remove_from_cache(oldest_key)

            cache[cache_key] = CacheEntry(
                expire_time=current_time + max_age,
                result=result,
                access_time=current_time
            )
            access_order[cache_key] = True
            # Assigning to an existing key keeps its old position, so make
            # sure the freshly stored key is the most recently used one.
            access_order.move_to_end(cache_key)
            return result

        # Cache management helpers exposed on the wrapper.
        def clear_cache():
            """Remove all cached entries."""
            cache.clear()
            access_order.clear()

        def remove_from_cache(*args, **kwargs):
            """Remove the entry for the given call arguments.

            Returns:
                True if an entry existed and was removed, otherwise False.
            """
            cache_key = _make_key(args, kwargs)
            if cache_key in cache:
                _remove_from_cache(cache_key)
                return True
            return False

        def get_cache_info() -> Dict[str, Any]:
            """Return counters, limits and the current keys of the cache."""
            current_time = time.time()
            valid_entries = sum(
                1 for entry in cache.values()
                if current_time < entry.expire_time
            )
            return {
                'total_entries': len(cache),
                'valid_entries': valid_entries,
                'max_size': max_size,
                'max_age': max_age,
                'cache_keys': list(cache.keys())
            }

        def get_cached_result(*args, **kwargs):
            """Return the cached result for the arguments without calling func.

            Returns None when there is no entry or the entry has expired; a
            cached value of None is indistinguishable from a miss.
            """
            cache_key = _make_key(args, kwargs)
            entry = cache.get(cache_key)
            if entry is not None and time.time() < entry.expire_time:
                return entry.result
            return None

        wrapper.clear_cache = clear_cache
        wrapper.remove_from_cache = remove_from_cache
        wrapper.get_cache_info = get_cache_info
        wrapper.get_cached_result = get_cached_result
        return wrapper

    return decorator
def _generate_cache_key(args: tuple, kwargs: dict) -> str:
    """Return a SHA-256 hex digest that identifies a call's arguments.

    The arguments are pickled when possible; keyword arguments are sorted by
    name first so that their order in the call does not affect the key. If
    pickling fails, the ``str()`` form of the arguments is hashed instead.

    Args:
        args: Positional arguments of the call.
        kwargs: Keyword arguments of the call (already filtered by the caller).

    Returns:
        A 64-character hexadecimal digest.
    """
    sorted_kwargs = sorted(kwargs.items())
    try:
        key_data = pickle.dumps((args, sorted_kwargs))
    except (TypeError, AttributeError, pickle.PickleError):
        # Depending on the Python version, unpicklable objects (for example
        # instances of locally defined classes) raise AttributeError rather
        # than PicklingError, so that has to trigger the fallback as well.
        args_str = str(args)
        kwargs_str = str(sorted_kwargs)
        key_data = f"{args_str}:{kwargs_str}".encode()
    return hashlib.sha256(key_data).hexdigest()
async def call_api():
    """Simulate a slow API request and return a timestamped payload.

    Sleeps for one second, then returns a dict whose "data" field embeds the
    current ``time.time()`` value.

    NOTE(review): the demo in ``main`` is written as if repeated calls were
    served from a cache, but no ``async_cache_decorator`` is applied to this
    function in the visible source - confirm whether a decorator line was lost.
    """
    await asyncio.sleep(1)
    timestamp = time.time()
    payload = {"data": f"API响应时间: {timestamp}", "status": "success"}
    return payload
async def main():
    """Call ``call_api`` four times with pauses in between and print each result.

    Sequence: two back-to-back calls, a third after a 5 second pause and a
    fourth after a further 6 second pause (11 seconds of pauses in total).
    The timing is chosen to show cache hits for calls two and three and a
    refresh for call four - that only applies if ``call_api`` is wrapped by a
    cache with a 10 second lifetime (TODO confirm; not visible in this file).
    """
    # (pause before the call in seconds, label printed with the result)
    schedule = (
        (0, "第一次结果:"),
        (0, "第二次结果:"),
        (5, "第三次结果:"),
        (6, "第四次结果:"),
    )
    for pause, label in schedule:
        if pause:
            await asyncio.sleep(pause)
        outcome = await call_api()
        print(label, outcome)
if __name__ == "__main__":
    # Script entry point: run the demo coroutine to completion on a new
    # event loop.
    demo = main()
    asyncio.run(demo)