| | import time |
| | import hashlib |
| | import json |
| | from typing import Dict, Any, Optional |
| | import logging |
| | from app.utils.logging import log |
| | from app.config.settings import ( |
| | api_call_stats |
| | ) |
# Standard-library logger registered under the fixed name "my_logger".
# NOTE(review): the code visible in this module emits its messages through the
# project's `log` helper rather than through this object — confirm it is still
# needed before relying on it.
logger = logging.getLogger("my_logger")
| |
|
class ResponseCacheManager:
    """In-memory cache of API responses with expiry and a size cap.

    Each entry is a dict holding the cached ``response``, its absolute
    ``expiry_time``, its ``created_at`` timestamp and the ``client_ip`` that
    produced it. Entries live in ``self.cache``, keyed by cache key.
    """

    def __init__(self, expiry_time: int, max_entries: int, remove_after_use: bool = True,
                 cache_dict: Dict[str, Dict[str, Any]] = None):
        """Set up the manager.

        Args:
            expiry_time: Lifetime added to the current time when an entry is stored.
            max_entries: Entry count above which the oldest entries are evicted.
            remove_after_use: Stored on the instance; not read by any method of
                this class.
            cache_dict: Existing dict to use as backing storage (used as-is, not
                copied). A new empty dict is created when ``None``.
        """
        if cache_dict is None:
            cache_dict = {}
        self.cache = cache_dict
        self.expiry_time = expiry_time
        self.max_entries = max_entries
        self.remove_after_use = remove_after_use

    def get(self, cache_key: str):
        """Look up an unexpired entry.

        Returns:
            ``(response, True)`` when the key is present and has not expired,
            otherwise ``(None, False)``. The entry is left in the cache either way.
        """
        current = time.time()
        try:
            entry = self.cache[cache_key]
        except KeyError:
            return None, False

        if current < entry.get('expiry_time', 0):
            return entry['response'], True
        return None, False

    def store(self, cache_key: str, response, client_ip: str = None):
        """Save ``response`` under ``cache_key``, then enforce the size cap."""
        stamp = time.time()
        entry = {
            'response': response,
            'expiry_time': stamp + self.expiry_time,
            'created_at': stamp,
            'client_ip': client_ip,
        }
        self.cache[cache_key] = entry

        log('info', f"响应已缓存: {cache_key[:8]}...",
            extra={'cache_operation': 'store', 'request_type': 'non-stream'})

        # The insert may have pushed the cache over its limit.
        self.clean_if_needed()

    def clean_expired(self):
        """Delete every entry whose expiry time lies in the past."""
        current = time.time()
        # Collect first so the dict is not mutated while being iterated.
        stale = [name for name, entry in self.cache.items()
                 if entry.get('expiry_time', 0) < current]

        for name in stale:
            del self.cache[name]
            log('info', f"清理过期缓存: {name[:8]}...", extra={'cache_operation': 'clean'})

    def clean_if_needed(self):
        """Evict the oldest entries while the cache exceeds ``max_entries``."""
        overflow = len(self.cache) - self.max_entries
        if overflow <= 0:
            return

        # Oldest first; entries without 'created_at' sort as timestamp 0.
        by_age = sorted(self.cache.items(),
                        key=lambda pair: pair[1].get('created_at', 0))

        for name, _ in by_age[:overflow]:
            del self.cache[name]
            log('info', f"缓存容量限制,删除旧缓存: {name[:8]}...", extra={'cache_operation': 'limit'})
| |
|
def generate_cache_key(chat_request) -> str:
    """Build a deterministic cache key for a chat request.

    The key is the MD5 hex digest of a canonical JSON document (sorted keys)
    containing the request's model and a normalised copy of its messages.
    MD5 serves only as a compact fingerprint here, not as a security measure.

    Normalisation rules:
      * String content is kept verbatim together with the role.
      * List content keeps its ``text`` parts and ``image_url`` parts; parts
        of any other type are ignored.
      * Inline images (``data:image/...`` URLs) are replaced by the MD5 of the
        complete data URL so the JSON document stays small. The whole URL must
        be hashed: its leading characters are just the MIME header and the
        file-format signature, which are identical for every image of the
        same format, so hashing a prefix makes different images collide.
      * Messages whose content is neither a string nor a list are skipped.

    Args:
        chat_request: Object exposing ``model`` and ``messages``; every message
            exposes ``role`` and ``content``. List content is expected to hold
            dict-like parts supporting ``.get``.

    Returns:
        The 32-character hexadecimal cache key.
    """
    request_data = {
        'model': chat_request.model,
        'messages': [],
    }

    for msg in chat_request.messages:
        content = msg.content

        if isinstance(content, str):
            request_data['messages'].append({'role': msg.role, 'content': content})
            continue

        if not isinstance(content, list):
            continue

        parts = []
        for item in content:
            part_type = item.get('type')
            if part_type == 'text':
                parts.append({'type': 'text', 'text': item.get('text')})
            elif part_type == 'image_url':
                # Tolerate a missing/None 'image_url' or 'url' instead of
                # raising AttributeError on malformed parts.
                image_url = (item.get('image_url') or {}).get('url') or ''
                if image_url.startswith('data:image/'):
                    digest = hashlib.md5(image_url.encode()).hexdigest()
                    parts.append({'type': 'image_url', 'hash': digest})
                else:
                    parts.append({'type': 'image_url', 'url': image_url})
        request_data['messages'].append({'role': msg.role, 'content': parts})

    # sort_keys makes the serialisation independent of dict insertion order.
    json_data = json.dumps(request_data, sort_keys=True)
    return hashlib.md5(json_data.encode()).hexdigest()
| |
|
def cache_response(response, cache_key, client_ip, response_cache_manager, update_api_call_stats, api_key=None):
    """Store a response in the cache unless a live entry already exists.

    Does nothing at all when ``cache_key`` is falsy. Otherwise the response is
    stored through ``response_cache_manager.store`` unless the key already
    holds an unexpired entry, and the API call statistics are updated
    afterwards in either case.

    An entry counts as existing only while it is unexpired, using the same
    ``now < expiry_time`` rule as ``ResponseCacheManager.get``. Checking bare
    key membership would let an expired entry that has not been cleaned up yet
    block the fresh response from being stored.

    Args:
        response: Response object to cache.
        cache_key: Cache key; falsy values disable caching for this call.
        client_ip: Client IP recorded alongside the cached entry.
        response_cache_manager: Cache manager exposing ``cache`` and ``store``.
        update_api_call_stats: Callable invoked as
            ``update_api_call_stats(api_call_stats, api_key)``.
        api_key: API key forwarded to ``update_api_call_stats``.
    """
    if not cache_key:
        return

    entry = response_cache_manager.cache.get(cache_key)
    has_live_entry = entry is not None and time.time() < entry.get('expiry_time', 0)

    if has_live_entry:
        log('info', f"缓存已存在,跳过存储: {cache_key[:8]}...",
            extra={'cache_operation': 'skip-existing', 'request_type': 'non-stream'})
    else:
        # Either nothing is cached under this key or what is there has expired;
        # store() overwrites the old entry.
        response_cache_manager.store(cache_key, response, client_ip)
        log('info', f"API响应已缓存: {cache_key[:8]}...",
            extra={'cache_operation': 'store-new', 'request_type': 'non-stream'})

    update_api_call_stats(api_call_stats, api_key)