File size: 5,520 Bytes
3925cbd |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 |
import time
import hashlib
import json
from typing import Dict, Any, Optional
import logging
from app.utils.logging import log
from app.config.settings import (
api_call_stats
)
logger = logging.getLogger("my_logger")
class ResponseCacheManager:
"""管理API响应缓存的类"""
def __init__(self, expiry_time: int, max_entries: int, remove_after_use: bool = True,
cache_dict: Dict[str, Dict[str, Any]] = None):
self.cache = cache_dict if cache_dict is not None else {} # 使用传入的缓存字典或创建新字典
self.expiry_time = expiry_time
self.max_entries = max_entries
self.remove_after_use = remove_after_use
def get(self, cache_key: str):
"""获取缓存项,如果存在且未过期"""
now = time.time()
if cache_key in self.cache and now < self.cache[cache_key].get('expiry_time', 0):
cached_item = self.cache[cache_key]
# 获取响应但先不删除
response = cached_item['response']
# 返回响应
return response, True
return None, False
def store(self, cache_key: str, response, client_ip: str = None):
"""存储响应到缓存"""
now = time.time()
self.cache[cache_key] = {
'response': response,
'expiry_time': now + self.expiry_time,
'created_at': now,
'client_ip': client_ip
}
log('info', f"响应已缓存: {cache_key[:8]}...",
extra={'cache_operation': 'store', 'request_type': 'non-stream'})
# 如果缓存超过限制,清理最旧的
self.clean_if_needed()
def clean_expired(self):
"""清理所有过期的缓存项"""
now = time.time()
expired_keys = [k for k, v in self.cache.items() if now > v.get('expiry_time', 0)]
for key in expired_keys:
del self.cache[key]
log('info', f"清理过期缓存: {key[:8]}...", extra={'cache_operation': 'clean'})
def clean_if_needed(self):
"""如果缓存数量超过限制,清理最旧的项目"""
if len(self.cache) <= self.max_entries:
return
# 按创建时间排序
sorted_keys = sorted(self.cache.keys(),
key=lambda k: self.cache[k].get('created_at', 0))
# 计算需要删除的数量
to_remove = len(self.cache) - self.max_entries
# 删除最旧的项
for key in sorted_keys[:to_remove]:
del self.cache[key]
log('info', f"缓存容量限制,删除旧缓存: {key[:8]}...", extra={'cache_operation': 'limit'})
def generate_cache_key(chat_request) -> str:
"""生成请求的唯一缓存键"""
# 创建包含请求关键信息的字典
request_data = {
'model': chat_request.model,
'messages': []
}
# 添加消息内容
for msg in chat_request.messages:
if isinstance(msg.content, str):
message_data = {'role': msg.role, 'content': msg.content}
request_data['messages'].append(message_data)
elif isinstance(msg.content, list):
content_list = []
for item in msg.content:
if item.get('type') == 'text':
content_list.append({'type': 'text', 'text': item.get('text')})
# 对于图像数据,我们只使用标识符而不是全部数据
elif item.get('type') == 'image_url':
image_data = item.get('image_url', {}).get('url', '')
if image_data.startswith('data:image/'):
# 对于base64图像,使用前32字符作为标识符
content_list.append({'type': 'image_url', 'hash': hashlib.md5(image_data[:32].encode()).hexdigest()})
else:
content_list.append({'type': 'image_url', 'url': image_data})
request_data['messages'].append({'role': msg.role, 'content': content_list})
# 将字典转换为JSON字符串并计算哈希值
json_data = json.dumps(request_data, sort_keys=True)
return hashlib.md5(json_data.encode()).hexdigest()
def cache_response(response, cache_key, client_ip, response_cache_manager, update_api_call_stats, api_key=None):
"""
将响应存入缓存
参数:
- response: 响应对象
- cache_key: 缓存键
- client_ip: 客户端IP
- response_cache_manager: 缓存管理器
- update_api_call_stats: 更新统计的函数
- api_key: API密钥,用于更新API密钥使用统计
"""
if not cache_key:
return
# 先检查缓存是否已存在
existing_cache = cache_key in response_cache_manager.cache
if existing_cache:
log('info', f"缓存已存在,跳过存储: {cache_key[:8]}...",
extra={'cache_operation': 'skip-existing', 'request_type': 'non-stream'})
else:
response_cache_manager.store(cache_key, response, client_ip)
log('info', f"API响应已缓存: {cache_key[:8]}...",
extra={'cache_operation': 'store-new', 'request_type': 'non-stream'})
# 更新API调用统计
update_api_call_stats(api_call_stats, api_key) |