| | |
| | |
| | import asyncio |
| | import time |
| | from functools import wraps |
| |
|
| | import time |
| | import hashlib |
| | import pickle |
| | import asyncio |
| | from functools import wraps |
| | from typing import Any, Callable, Dict, Optional, Tuple |
| | from dataclasses import dataclass |
| | from collections import OrderedDict |
| |
|
| |
|
| | @dataclass |
| | class CacheEntry: |
| | expire_time: float |
| | result: Any |
| | access_time: float |
| |
|
| |
|
| | def async_cache_decorator( |
| | max_age: int = 10, |
| | max_size: Optional[int] = 100, |
| | ignore_kwargs: Optional[list] = None |
| | ): |
| | """ |
| | 高级缓存装饰器 |
| | |
| | Args: |
| | max_age: 缓存最大年龄(秒) |
| | max_size: 缓存最大大小(None表示无限制) |
| | ignore_kwargs: 忽略的kwargs参数名列表 |
| | """ |
| | ignore_kwargs = ignore_kwargs or [] |
| |
|
| | def decorator(func: Callable): |
| | cache: Dict[str, CacheEntry] = {} |
| | access_order = OrderedDict() |
| |
|
| | @wraps(func) |
| | async def wrapper(*args, **kwargs): |
| | |
| | _clean_expired_cache() |
| |
|
| | |
| | filtered_kwargs = {k: v for k, v in kwargs.items() if k not in ignore_kwargs} |
| | cache_key = _generate_cache_key(args, filtered_kwargs) |
| | current_time = time.time() |
| |
|
| | |
| | if cache_key in cache: |
| | entry = cache[cache_key] |
| | if current_time < entry.expire_time: |
| | |
| | entry.access_time = current_time |
| | access_order.move_to_end(cache_key) |
| | return entry.result |
| | else: |
| | |
| | _remove_from_cache(cache_key) |
| |
|
| | |
| | result = await func(*args, **kwargs) |
| |
|
| | |
| | if max_size and len(cache) >= max_size: |
| | |
| | oldest_key = next(iter(access_order)) |
| | _remove_from_cache(oldest_key) |
| |
|
| | |
| | cache[cache_key] = CacheEntry( |
| | expire_time=current_time + max_age, |
| | result=result, |
| | access_time=current_time |
| | ) |
| | access_order[cache_key] = True |
| |
|
| | return result |
| |
|
| | def _clean_expired_cache(): |
| | """清理过期缓存""" |
| | current_time = time.time() |
| | expired_keys = [ |
| | key for key, entry in cache.items() |
| | if current_time >= entry.expire_time |
| | ] |
| | for key in expired_keys: |
| | _remove_from_cache(key) |
| |
|
| | def _remove_from_cache(cache_key: str): |
| | """从缓存中移除指定键""" |
| | if cache_key in cache: |
| | del cache[cache_key] |
| | if cache_key in access_order: |
| | del access_order[cache_key] |
| |
|
| | |
| | def clear_cache(): |
| | """清空所有缓存""" |
| | cache.clear() |
| | access_order.clear() |
| |
|
| | def remove_from_cache(*args, **kwargs): |
| | """移除特定参数的缓存""" |
| | filtered_kwargs = {k: v for k, v in kwargs.items() if k not in ignore_kwargs} |
| | cache_key = _generate_cache_key(args, filtered_kwargs) |
| | if cache_key in cache: |
| | _remove_from_cache(cache_key) |
| | return True |
| | return False |
| |
|
| | def get_cache_info() -> Dict[str, Any]: |
| | """获取缓存统计信息""" |
| | current_time = time.time() |
| | valid_entries = sum( |
| | 1 for entry in cache.values() |
| | if current_time < entry.expire_time |
| | ) |
| | return { |
| | 'total_entries': len(cache), |
| | 'valid_entries': valid_entries, |
| | 'max_size': max_size, |
| | 'max_age': max_age, |
| | 'cache_keys': list(cache.keys()) |
| | } |
| |
|
| | def get_cached_result(*args, **kwargs): |
| | """获取缓存结果(如果存在且未过期)""" |
| | filtered_kwargs = {k: v for k, v in kwargs.items() if k not in ignore_kwargs} |
| | cache_key = _generate_cache_key(args, filtered_kwargs) |
| | current_time = time.time() |
| |
|
| | if cache_key in cache: |
| | entry = cache[cache_key] |
| | if current_time < entry.expire_time: |
| | return entry.result |
| | return None |
| |
|
| | |
| | wrapper.clear_cache = clear_cache |
| | wrapper.remove_from_cache = remove_from_cache |
| | wrapper.get_cache_info = get_cache_info |
| | wrapper.get_cached_result = get_cached_result |
| |
|
| | return wrapper |
| |
|
| | return decorator |
| |
|
| |
|
| | def _generate_cache_key(args: tuple, kwargs: dict) -> str: |
| | """生成缓存键""" |
| | try: |
| | |
| | key_data = pickle.dumps((args, sorted(kwargs.items()))) |
| | return hashlib.sha256(key_data).hexdigest() |
| | except (TypeError, pickle.PickleError): |
| | |
| | args_str = str(args) |
| | kwargs_str = str(sorted(kwargs.items())) |
| | return hashlib.sha256(f"{args_str}:{kwargs_str}".encode()).hexdigest() |
| |
|
| |
|
| | @async_cache_decorator(10) |
| | async def call_api(): |
| | await asyncio.sleep(1) |
| | return {"data": f"API响应时间: {time.time()}", "status": "success"} |
| |
|
| |
|
| | async def main(): |
| | |
| | result1 = await call_api() |
| | print("第一次结果:", result1) |
| |
|
| | |
| | result2 = await call_api() |
| | print("第二次结果:", result2) |
| |
|
| | |
| | await asyncio.sleep(5) |
| | result3 = await call_api() |
| | print("第三次结果:", result3) |
| |
|
| | |
| | await asyncio.sleep(6) |
| | result4 = await call_api() |
| | print("第四次结果:", result4) |
| |
|
| |
|
| | if __name__ == "__main__": |
| | asyncio.run(main()) |
| |
|