File size: 6,110 Bytes
80bf15d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import asyncio
import time
from functools import wraps

import time
import hashlib
import pickle
import asyncio
from functools import wraps
from typing import Any, Callable, Dict, Optional, Tuple
from dataclasses import dataclass
from collections import OrderedDict


@dataclass
class CacheEntry:
    expire_time: float
    result: Any
    access_time: float


def async_cache_decorator(
        max_age: int = 10,
        max_size: Optional[int] = 100,
        ignore_kwargs: Optional[list] = None
):
    """
    高级缓存装饰器

    Args:
        max_age: 缓存最大年龄(秒)
        max_size: 缓存最大大小(None表示无限制)
        ignore_kwargs: 忽略的kwargs参数名列表
    """
    ignore_kwargs = ignore_kwargs or []

    def decorator(func: Callable):
        cache: Dict[str, CacheEntry] = {}
        access_order = OrderedDict()  # 用于LRU淘汰

        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 清理过期的缓存
            _clean_expired_cache()

            # 生成缓存键(忽略指定的kwargs)
            filtered_kwargs = {k: v for k, v in kwargs.items() if k not in ignore_kwargs}
            cache_key = _generate_cache_key(args, filtered_kwargs)
            current_time = time.time()

            # 检查缓存
            if cache_key in cache:
                entry = cache[cache_key]
                if current_time < entry.expire_time:
                    # 更新访问时间和LRU顺序
                    entry.access_time = current_time
                    access_order.move_to_end(cache_key)
                    return entry.result
                else:
                    # 缓存过期
                    _remove_from_cache(cache_key)

            # 调用原函数
            result = await func(*args, **kwargs)

            # 检查缓存大小并可能淘汰
            if max_size and len(cache) >= max_size:
                # 移除最久未使用的缓存
                oldest_key = next(iter(access_order))
                _remove_from_cache(oldest_key)

            # 添加新缓存
            cache[cache_key] = CacheEntry(
                expire_time=current_time + max_age,
                result=result,
                access_time=current_time
            )
            access_order[cache_key] = True

            return result

        def _clean_expired_cache():
            """清理过期缓存"""
            current_time = time.time()
            expired_keys = [
                key for key, entry in cache.items()
                if current_time >= entry.expire_time
            ]
            for key in expired_keys:
                _remove_from_cache(key)

        def _remove_from_cache(cache_key: str):
            """从缓存中移除指定键"""
            if cache_key in cache:
                del cache[cache_key]
            if cache_key in access_order:
                del access_order[cache_key]

        # 缓存管理方法
        def clear_cache():
            """清空所有缓存"""
            cache.clear()
            access_order.clear()

        def remove_from_cache(*args, **kwargs):
            """移除特定参数的缓存"""
            filtered_kwargs = {k: v for k, v in kwargs.items() if k not in ignore_kwargs}
            cache_key = _generate_cache_key(args, filtered_kwargs)
            if cache_key in cache:
                _remove_from_cache(cache_key)
                return True
            return False

        def get_cache_info() -> Dict[str, Any]:
            """获取缓存统计信息"""
            current_time = time.time()
            valid_entries = sum(
                1 for entry in cache.values()
                if current_time < entry.expire_time
            )
            return {
                'total_entries': len(cache),
                'valid_entries': valid_entries,
                'max_size': max_size,
                'max_age': max_age,
                'cache_keys': list(cache.keys())
            }

        def get_cached_result(*args, **kwargs):
            """获取缓存结果(如果存在且未过期)"""
            filtered_kwargs = {k: v for k, v in kwargs.items() if k not in ignore_kwargs}
            cache_key = _generate_cache_key(args, filtered_kwargs)
            current_time = time.time()

            if cache_key in cache:
                entry = cache[cache_key]
                if current_time < entry.expire_time:
                    return entry.result
            return None

        # 附加方法
        wrapper.clear_cache = clear_cache
        wrapper.remove_from_cache = remove_from_cache
        wrapper.get_cache_info = get_cache_info
        wrapper.get_cached_result = get_cached_result

        return wrapper

    return decorator


def _generate_cache_key(args: tuple, kwargs: dict) -> str:
    """生成缓存键"""
    try:
        # 尝试使用更高效的序列化
        key_data = pickle.dumps((args, sorted(kwargs.items())))
        return hashlib.sha256(key_data).hexdigest()
    except (TypeError, pickle.PickleError):
        # 如果无法序列化,使用字符串表示
        args_str = str(args)
        kwargs_str = str(sorted(kwargs.items()))
        return hashlib.sha256(f"{args_str}:{kwargs_str}".encode()).hexdigest()


@async_cache_decorator(10)
async def call_api():
    await asyncio.sleep(1)
    return {"data": f"API响应时间: {time.time()}", "status": "success"}


async def main():
    # 第一次调用 - 实际调用API
    result1 = await call_api()
    print("第一次结果:", result1)

    # 立即再次调用 - 返回缓存结果
    result2 = await call_api()
    print("第二次结果:", result2)

    # 等待5秒后调用 - 返回缓存结果
    await asyncio.sleep(5)
    result3 = await call_api()
    print("第三次结果:", result3)

    # 等待11秒后调用 - 重新调用API
    await asyncio.sleep(6)  # 总共等待11秒
    result4 = await call_api()
    print("第四次结果:", result4)


if __name__ == "__main__":
    asyncio.run(main())