|
|
import time
|
|
|
import hashlib
|
|
|
import json
|
|
|
from typing import Dict, Any, Optional
|
|
|
import logging
|
|
|
from app.utils.logging import log
|
|
|
from app.config.settings import (
|
|
|
api_call_stats
|
|
|
)
|
|
|
# Module-level logger registered under the name "my_logger".
# NOTE(review): not referenced anywhere else in the visible part of this file,
# which emits its messages through the imported `log` helper instead.
logger = logging.getLogger("my_logger")
|
|
|
|
|
|
class ResponseCacheManager:
    """In-memory cache of API responses with a time-to-live and a size cap.

    Entries live in ``self.cache``, keyed by cache key. Each entry is a dict
    with the keys ``response``, ``expiry_time``, ``created_at`` and
    ``client_ip``.
    """

    def __init__(self, expiry_time: int, max_entries: int, remove_after_use: bool = True,
                 cache_dict: Optional[Dict[str, Dict[str, Any]]] = None):
        """Create a cache manager.

        Args:
            expiry_time: Lifetime of an entry in seconds; added to
                ``time.time()`` when the entry is stored.
            max_entries: Maximum number of entries kept after a ``store``.
            remove_after_use: Stored on the instance; no method of this class
                consults it.
            cache_dict: Existing dict to use as the backing store. It is used
                as-is (not copied); a new empty dict is created when ``None``.
        """
        self.cache = cache_dict if cache_dict is not None else {}
        self.expiry_time = expiry_time
        self.max_entries = max_entries
        self.remove_after_use = remove_after_use

    def get(self, cache_key: str):
        """Look up an unexpired entry.

        Returns:
            ``(response, True)`` on a hit, ``(None, False)`` when the key is
            absent or its entry has expired. A hit leaves the entry in place.
        """
        entry = self.cache.get(cache_key)
        if entry is None:
            return None, False

        # An entry without 'expiry_time' is treated as already expired.
        if time.time() < entry.get('expiry_time', 0):
            return entry['response'], True

        # Expired: evict it right away so it neither counts toward
        # max_entries nor satisfies a raw `key in manager.cache` test.
        self.cache.pop(cache_key, None)
        return None, False

    def store(self, cache_key: str, response, client_ip: str = None):
        """Insert or overwrite the entry for ``cache_key``, then enforce the size cap."""
        now = time.time()
        self.cache[cache_key] = {
            'response': response,
            'expiry_time': now + self.expiry_time,
            'created_at': now,
            'client_ip': client_ip
        }

        log('info', f"响应已缓存: {cache_key[:8]}...",
            extra={'cache_operation': 'store', 'request_type': 'non-stream'})

        self.clean_if_needed()

    def clean_expired(self):
        """Remove every entry whose expiry time has been reached."""
        now = time.time()
        # `>=` mirrors get(), which stops serving an entry once now == expiry_time.
        expired_keys = [k for k, v in self.cache.items() if now >= v.get('expiry_time', 0)]

        for key in expired_keys:
            del self.cache[key]
            log('info', f"清理过期缓存: {key[:8]}...", extra={'cache_operation': 'clean'})

    def clean_if_needed(self):
        """Evict the oldest entries (by ``created_at``) while over ``max_entries``."""
        if len(self.cache) <= self.max_entries:
            return

        # Oldest first; entries lacking 'created_at' sort as the oldest.
        sorted_keys = sorted(self.cache.keys(),
                             key=lambda k: self.cache[k].get('created_at', 0))

        to_remove = len(self.cache) - self.max_entries

        for key in sorted_keys[:to_remove]:
            del self.cache[key]
            log('info', f"缓存容量限制,删除旧缓存: {key[:8]}...", extra={'cache_operation': 'limit'})
|
|
|
|
|
|
def generate_cache_key(chat_request) -> str:
    """Build a deterministic cache key for a chat request.

    The key is the MD5 hex digest of a canonical JSON document (sorted keys)
    holding the request's model and a normalized copy of its messages:

    - string content is kept verbatim;
    - list content keeps only its ``text`` and ``image_url`` parts, other
      part types are ignored;
    - an inline ``data:image/`` URL is replaced by the MD5 digest of the
      whole URL, any other image URL is kept verbatim;
    - messages whose content is neither ``str`` nor ``list`` are skipped.

    Args:
        chat_request: Object exposing ``model`` and ``messages``. Each message
            exposes ``role`` and ``content``; items of list content are
            accessed with ``.get`` (dict-like).

    Returns:
        The 32-character hexadecimal MD5 digest.
    """
    normalized_messages = []

    for msg in chat_request.messages:
        content = msg.content
        if isinstance(content, str):
            normalized_messages.append({'role': msg.role, 'content': content})
        elif isinstance(content, list):
            parts = []
            for item in content:
                item_type = item.get('type')
                if item_type == 'text':
                    parts.append({'type': 'text', 'text': item.get('text')})
                elif item_type == 'image_url':
                    image_data = item.get('image_url', {}).get('url', '')
                    if image_data.startswith('data:image/'):
                        # Hash the complete data URL. Its first 32 characters
                        # are only the MIME prefix plus the format's magic
                        # bytes, so hashing that prefix alone would give
                        # different images of one format the same key.
                        digest = hashlib.md5(image_data.encode()).hexdigest()
                        parts.append({'type': 'image_url', 'hash': digest})
                    else:
                        parts.append({'type': 'image_url', 'url': image_data})
            normalized_messages.append({'role': msg.role, 'content': parts})

    request_data = {'model': chat_request.model, 'messages': normalized_messages}

    # sort_keys makes the serialization independent of dict insertion order.
    json_data = json.dumps(request_data, sort_keys=True)
    return hashlib.md5(json_data.encode()).hexdigest()
|
|
|
|
|
|
def cache_response(response, cache_key, client_ip, response_cache_manager, update_api_call_stats, api_key=None):
    """Store a response in the cache unless a live entry already exists.

    Args:
        response: Response object to cache.
        cache_key: Cache key. A falsy key makes the call a no-op (nothing is
            stored and the statistics callback is not invoked).
        client_ip: Client IP recorded alongside the cached response.
        response_cache_manager: Cache manager exposing ``get`` and ``store``.
        update_api_call_stats: Callable invoked as
            ``update_api_call_stats(api_call_stats, api_key)``.
        api_key: API key forwarded to ``update_api_call_stats``.
    """
    if not cache_key:
        return

    # Ask the manager instead of testing `cache_key in manager.cache`: a raw
    # membership test also matches entries that have expired but were not
    # purged yet, which would keep the fresh response from being stored.
    _, existing_cache = response_cache_manager.get(cache_key)

    if existing_cache:
        log('info', f"缓存已存在,跳过存储: {cache_key[:8]}...",
            extra={'cache_operation': 'skip-existing', 'request_type': 'non-stream'})
    else:
        response_cache_manager.store(cache_key, response, client_ip)
        log('info', f"API响应已缓存: {cache_key[:8]}...",
            extra={'cache_operation': 'store-new', 'request_type': 'non-stream'})

    # NOTE(review): the original indentation of this call was lost, so it is
    # unclear whether it belonged to the `else` branch. It is placed at
    # function level (runs whether or not the response was stored) — confirm
    # against the callers' expectations.
    update_api_call_stats(api_call_stats, api_key)