1
File size: 5,520 Bytes
3925cbd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import time
import hashlib
import json
from typing import Dict, Any, Optional
import logging
from app.utils.logging import log
from app.config.settings import (
    api_call_stats
)
logger = logging.getLogger("my_logger")

class ResponseCacheManager:
    """管理API响应缓存的类"""
    
    def __init__(self, expiry_time: int, max_entries: int, remove_after_use: bool = True, 

                cache_dict: Dict[str, Dict[str, Any]] = None):
        self.cache = cache_dict if cache_dict is not None else {}  # 使用传入的缓存字典或创建新字典
        self.expiry_time = expiry_time
        self.max_entries = max_entries
        self.remove_after_use = remove_after_use
    
    def get(self, cache_key: str):
        """获取缓存项,如果存在且未过期"""
        now = time.time()
        if cache_key in self.cache and now < self.cache[cache_key].get('expiry_time', 0):
            cached_item = self.cache[cache_key]
            
            # 获取响应但先不删除
            response = cached_item['response']
            
            # 返回响应
            return response, True
        
        return None, False
    
    def store(self, cache_key: str, response, client_ip: str = None):
        """存储响应到缓存"""
        now = time.time()
        self.cache[cache_key] = {
            'response': response,
            'expiry_time': now + self.expiry_time,
            'created_at': now,
            'client_ip': client_ip
        }
        
        log('info', f"响应已缓存: {cache_key[:8]}...", 
            extra={'cache_operation': 'store', 'request_type': 'non-stream'})
        
        # 如果缓存超过限制,清理最旧的
        self.clean_if_needed()
    
    def clean_expired(self):
        """清理所有过期的缓存项"""
        now = time.time()
        expired_keys = [k for k, v in self.cache.items() if now > v.get('expiry_time', 0)]
        
        for key in expired_keys:
            del self.cache[key]
            log('info', f"清理过期缓存: {key[:8]}...", extra={'cache_operation': 'clean'})
    
    def clean_if_needed(self):
        """如果缓存数量超过限制,清理最旧的项目"""
        if len(self.cache) <= self.max_entries:
            return
        
        # 按创建时间排序
        sorted_keys = sorted(self.cache.keys(),
                           key=lambda k: self.cache[k].get('created_at', 0))
        
        # 计算需要删除的数量
        to_remove = len(self.cache) - self.max_entries
        
        # 删除最旧的项
        for key in sorted_keys[:to_remove]:
            del self.cache[key]
            log('info', f"缓存容量限制,删除旧缓存: {key[:8]}...", extra={'cache_operation': 'limit'})

def generate_cache_key(chat_request) -> str:
    """生成请求的唯一缓存键"""
    # 创建包含请求关键信息的字典
    request_data = {
        'model': chat_request.model, 
        'messages': []
    }
    
    # 添加消息内容
    for msg in chat_request.messages:
        if isinstance(msg.content, str):
            message_data = {'role': msg.role, 'content': msg.content}
            request_data['messages'].append(message_data)
        elif isinstance(msg.content, list):
            content_list = []
            for item in msg.content:
                if item.get('type') == 'text':
                    content_list.append({'type': 'text', 'text': item.get('text')})
                # 对于图像数据,我们只使用标识符而不是全部数据
                elif item.get('type') == 'image_url':
                    image_data = item.get('image_url', {}).get('url', '')
                    if image_data.startswith('data:image/'):
                        # 对于base64图像,使用前32字符作为标识符
                        content_list.append({'type': 'image_url', 'hash': hashlib.md5(image_data[:32].encode()).hexdigest()})
                    else:
                        content_list.append({'type': 'image_url', 'url': image_data})
            request_data['messages'].append({'role': msg.role, 'content': content_list})
    
    # 将字典转换为JSON字符串并计算哈希值
    json_data = json.dumps(request_data, sort_keys=True)
    return hashlib.md5(json_data.encode()).hexdigest()

def cache_response(response, cache_key, client_ip, response_cache_manager, update_api_call_stats, api_key=None):
    """

    将响应存入缓存

    

    参数:

    - response: 响应对象

    - cache_key: 缓存键

    - client_ip: 客户端IP

    - response_cache_manager: 缓存管理器

    - update_api_call_stats: 更新统计的函数

    - api_key: API密钥,用于更新API密钥使用统计

    """
    if not cache_key:
        return
        
    # 先检查缓存是否已存在
    existing_cache = cache_key in response_cache_manager.cache
    
    if existing_cache:
        log('info', f"缓存已存在,跳过存储: {cache_key[:8]}...",
            extra={'cache_operation': 'skip-existing', 'request_type': 'non-stream'})
    else:
        response_cache_manager.store(cache_key, response, client_ip)
        log('info', f"API响应已缓存: {cache_key[:8]}...",
            extra={'cache_operation': 'store-new', 'request_type': 'non-stream'})
    
    # 更新API调用统计
    update_api_call_stats(api_call_stats, api_key)