File size: 26,656 Bytes
e82bac2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
# -*- coding: utf-8 -*-
"""
请求处理相关的工具函数,包括 Token 估算、上下文截断、速率限制检查和计数更新、上下文保存等。
"""
import json # 导入 JSON 处理模块
import logging # 导入日志模块
import time # 导入时间模块
from typing import List, Dict, Any, Optional, Tuple # 导入类型提示
from collections import Counter, defaultdict # 导入集合类型

# 导入核心模块
from app.core.database import utils as db_utils # 导入数据库工具模块
from app.core.context import store as context_store # 导入上下文存储模块
# 导入配置
from app import config as app_config # 导入应用配置
# 导入跟踪相关的数据结构和常量
from app.core.tracking import (
    ip_daily_input_token_counts, ip_input_token_counts_lock, # IP 每日输入 Token 计数及锁
    usage_data, usage_lock, RPM_WINDOW_SECONDS, TPM_WINDOW_SECONDS # 使用数据、锁及时间窗口常量
)

# 导入日志记录器
logger = logging.getLogger('my_logger') # 获取日志记录器实例

# --- Token 估算与上下文截断 (来自 token_utils.py) ---

def estimate_token_count(contents: List[Dict[str, Any]]) -> int:
    """
    估算 Gemini contents 列表的 Token 数量。
    使用简单的字符数估算方法 (1 个 token 大约等于 4 个字符)。
    注意:这是一个非常粗略的估算,实际 Token 数可能因模型和内容而异。

    Args:
        contents (List[Dict[str, Any]]): Gemini 格式的内容列表。

    Returns:
        int: 估算的 Token 数量。
    """
    if not contents: # 检查列表是否为空
        return 0 # 如果为空,返回 0
    try:
        # 计算 JSON 序列化后的字符数
        # ensure_ascii=False 确保中文字符等非 ASCII 字符按实际字符数计算,而不是转义序列
        char_count = len(json.dumps(contents, ensure_ascii=False)) # 序列化为 JSON 字符串并获取长度
        # 使用 1 token ≈ 4 chars 的简化规则进行估算
        return char_count // 4 # 返回估算的 Token 数
    except TypeError as e:
        # 捕获并记录序列化过程中可能发生的类型错误
        logger.error(f"序列化 contents 进行 Token 估算时出错: {e}", exc_info=True) # 记录错误日志
        return 0 # 如果序列化失败,返回 0

async def truncate_context( # 改为 async 函数,因为内部可能调用 async 函数 (如 estimate_token_count 未来可能改为调用 API)
    contents: List[Dict[str, Any]],
    model_name: str,
    dynamic_max_tokens_limit: Optional[int] = None # 新增可选参数,表示基于 Key 实时容量的动态限制
) -> Tuple[List[Dict[str, Any]], bool]:
    """
    根据模型限制和可选的动态限制截断对话历史 (contents)。
    采用从开头成对移除消息(通常是 user/model 对)的策略,
    直到估算的 Token 数量满足限制要求。

    Args:
        contents (List[Dict[str, Any]]): 完整的对话历史列表 (Gemini 格式)。
        model_name (str): 当前请求使用的模型名称,用于查找其 Token 限制。
        dynamic_max_tokens_limit (Optional[int]): 可选的动态 Token 限制,
            通常基于 API Key 的实时可用容量。如果提供,将使用此限制与模型静态限制中的较小值。

    Returns:
        Tuple[List[Dict[str, Any]], bool]:
            - 第一个元素是截断后的对话历史列表。
            - 第二个元素是一个布尔值,指示截断后是否仍然超限
              (True 表示超限,False 表示未超限或无需截断)。
              如果返回 True,调用者通常不应保存此上下文,因为它可能仍然过长。
    """
    if not contents: # 检查输入列表是否为空
        return [], False # 如果为空,直接返回空列表和 False (未超限)

    # --- 确定最大 Token 限制 ---
    # 1. 获取配置中的默认值和安全边际
    # 使用 getattr 提供默认值,增加配置的灵活性
    default_max_tokens = getattr(app_config, 'DEFAULT_MAX_CONTEXT_TOKENS', 30000) # 获取默认最大上下文 Token 数,默认为 30000
    safety_margin = getattr(app_config, 'CONTEXT_TOKEN_SAFETY_MARGIN', 200) # 获取 Token 安全边际,默认为 200

    # 2. 获取模型的静态输入 Token 限制
    model_limits = getattr(app_config, 'MODEL_LIMITS', {}) # 从配置加载模型限制字典
    limit_info = model_limits.get(model_name) # 查找当前模型的限制信息
    static_max_tokens = default_max_tokens # 默认使用全局默认值
    if limit_info and isinstance(limit_info, dict) and limit_info.get("input_token_limit"): # 检查是否存在有效的模型特定限制
        try:
            limit_value = limit_info["input_token_limit"] # 获取模型限制值
            if limit_value is not None: # 确保值不是 JSON null
                 static_max_tokens = int(limit_value) # 转换为整数
            else:
                 # 如果模型限制值为 null,记录警告并使用默认值
                 logger.warning(f"模型 '{model_name}' 的 input_token_limit 值为 null,使用默认值 {default_max_tokens}") # 记录警告:模型限制值为 null
        except (ValueError, TypeError):
             # 如果模型限制值无效(无法转换为整数),记录警告并使用默认值
             logger.warning(f"模型 '{model_name}' 的 input_token_limit 值无效 ('{limit_info.get('input_token_limit')}'),使用默认值 {default_max_tokens}") # 记录警告:模型限制值无效
    else:
        # 如果模型或其限制未定义,记录警告并使用默认值
        logger.warning(f"模型 '{model_name}' 或其 input_token_limit 未在 model_limits.json 中定义,使用默认值 {default_max_tokens}") # 记录警告:模型限制未定义

    # 3. 结合动态限制确定最终使用的最大 Token 限制
    actual_max_tokens = static_max_tokens # 默认使用模型的静态限制
    if dynamic_max_tokens_limit is not None and dynamic_max_tokens_limit >= 0: # 如果提供了有效的动态限制(非 None 且非负)
        # 取静态限制和动态限制中的较小者作为实际限制
        actual_max_tokens = min(static_max_tokens, dynamic_max_tokens_limit)
        # 记录日志,说明使用了哪个限制
        logger.debug(f"使用动态限制 {dynamic_max_tokens_limit} 和静态限制 {static_max_tokens},最终最大 Token 限制为 {actual_max_tokens}") # 记录使用的限制值

    # 4. 计算截断阈值(实际限制减去安全边际)
    # 确保阈值不小于 0
    truncation_threshold = max(0, actual_max_tokens - safety_margin) # 计算最终的截断目标 Token 数

    # --- 执行截断 ---
    # 估算当前内容的 Token 数量
    estimated_tokens = estimate_token_count(contents) # 调用 Token 估算函数

    # 判断是否需要截断
    if estimated_tokens > truncation_threshold: # 如果估算 Token 数超过了阈值
        logger.info(f"上下文估算 Token ({estimated_tokens}) 超出阈值 ({truncation_threshold} for model {model_name}, actual max tokens {actual_max_tokens}),开始截断...") # 记录开始截断的日志
        # 创建内容的副本进行操作,避免修改原始列表
        truncated_contents = list(contents) # 复制列表
        # 循环移除消息对,直到满足 Token 限制或无法再移除
        while estimate_token_count(truncated_contents) > truncation_threshold and len(truncated_contents) >= 2:
            # 从列表开头移除两个元素(假设是 user/model 对)
            removed_first = truncated_contents.pop(0) # 移除第一个元素 (通常是 user)
            removed_second = truncated_contents.pop(0) # 移除第二个元素 (通常是 model)
            # 记录被移除的消息的角色,用于调试
            logger.debug(f"移除旧消息对: roles={removed_first.get('role')}, {removed_second.get('role')}") # 记录移除的消息角色

        # 重新估算截断后的 Token 数量
        final_estimated_tokens = estimate_token_count(truncated_contents) # 估算最终 Token 数

        # 检查截断后是否仍然超限
        if final_estimated_tokens > truncation_threshold: # 如果截断后仍然超过阈值
             # 这种情况可能发生在即使只剩下一条消息,其 Token 数也超过阈值
             logger.error(f"截断后上下文估算 Token ({final_estimated_tokens}) 仍然超过阈值 ({truncation_threshold})。本次交互的上下文不应被保存。") # 记录错误:截断后仍超限
             # 返回截断后的内容,并标记为超限 (True)
             return truncated_contents, True
        else:
            # 截断成功,且最终 Token 数在阈值内
            logger.info(f"上下文截断完成,剩余消息数: {len(truncated_contents)}, 最终估算 Token: {final_estimated_tokens}") # 记录截断成功信息
            # 返回截断后的内容,并标记为未超限 (False)
            return truncated_contents, False
    else:
        # 如果原始 Token 数未超过阈值,无需截断
        return contents, False # 返回原始内容,并标记为未超限 (False)

# --- 速率限制检查与计数更新 (来自 rate_limit_utils.py) ---

def check_rate_limits_and_update_counts(
    api_key: str,
    model_name: str,
    limits: Optional[Dict[str, Any]]
) -> bool:
    """
    检查给定 API Key 和模型的速率限制 (RPD, TPD_Input, RPM, TPM_Input)。
    此函数在选择 Key *之前* 调用,用于预检查 Key 是否已达到已知限制。
    如果未达到限制,则更新 RPM 和 RPD 计数(假设本次请求会发生),并返回 True。
    如果达到任何限制,则记录警告并返回 False,表示不应选择此 Key。

    Args:
        api_key (str): 当前尝试使用的 API Key。
        model_name (str): 请求的模型名称。
        limits (Optional[Dict[str, Any]]): 从配置中获取的该模型的限制字典。

    Returns:
        bool: 如果根据已知计数判断可以继续进行 API 调用则返回 True,否则返回 False。
    """
    if not limits: # 检查是否有该模型的限制配置
        logger.warning(f"模型 '{model_name}' 不在 model_limits.json 中,跳过本地速率限制检查。") # 记录警告:模型不在限制配置中
        return True # 没有限制信息,默认允许调用

    now = time.time() # 获取当前时间戳,用于 RPM 和 TPM 检查
    perform_api_call = True # 初始化标志:假设可以执行 API 调用

    with usage_lock: # 获取使用数据锁,保证对共享数据 usage_data 的访问是线程安全的
        # 使用 setdefault 确保 key 和 model 的条目存在于 usage_data 中,避免 KeyError
        # 如果键不存在,会使用 defaultdict 的默认工厂(这里是另一个 defaultdict)创建新条目
        key_usage = usage_data.setdefault(api_key, defaultdict(lambda: defaultdict(int)))[model_name] # 获取或创建 Key 和模型的用法数据字典

        # --- 检查并更新 RPM (每分钟请求数) ---
        rpm_limit = limits.get("rpm") # 从模型限制中获取 RPM 限制值
        if rpm_limit is not None: # 如果配置了 RPM 限制
            current_rpm_count = key_usage.get("rpm_count", 0) # 获取当前 RPM 计数,默认为 0
            rpm_timestamp = key_usage.get("rpm_timestamp", 0) # 获取上次 RPM 窗口开始时间戳,默认为 0

            if now - rpm_timestamp >= RPM_WINDOW_SECONDS: # 检查当前时间是否已经超过了 RPM 窗口时长
                # RPM 窗口已过期,重置计数并将当前请求计为 1
                key_usage["rpm_count"] = 1 # 新窗口的第一个请求
                key_usage["rpm_timestamp"] = now # 更新窗口开始时间戳为当前时间
                logger.debug(f"RPM 窗口过期,重置计数并增加 (Key: {api_key[:8]}, Model: {model_name}): 新 RPM=1") # 记录 RPM 窗口过期和重置
            else:
                # RPM 窗口未过期,检查加上当前这个预期的请求是否会超限
                if current_rpm_count + 1 > rpm_limit: # 如果当前计数加 1 超过限制
                     logger.warning(f"速率限制预检查失败 (Key: {api_key[:8]}, Model: {model_name}): RPM 达到限制 ({current_rpm_count}/{rpm_limit})。跳过此 Key。") # 记录 RPM 超限警告
                     perform_api_call = False # 设置标志为 False,表示不能选择此 Key
                else:
                    # 未达到限制,预先增加计数(假设此 Key 会被选中并使用)
                    key_usage["rpm_count"] = current_rpm_count + 1 # RPM 计数加 1
                    # 时间戳保持不变,因为仍在当前窗口内
                    logger.debug(f"RPM 计数增加 (Key: {api_key[:8]}, Model: {model_name}): 新 RPM={key_usage['rpm_count']}") # 记录 RPM 计数增加

        # --- 检查并更新 RPD (每日请求数) ---
        # 仅在之前的检查(RPM)通过时才进行 RPD 检查
        if perform_api_call:
            rpd_limit = limits.get("rpd") # 获取 RPD 限制值
            if rpd_limit is not None: # 如果配置了 RPD 限制
                current_rpd_count = key_usage.get("rpd_count", 0) # 获取当前 RPD 计数,默认为 0
                # RPD 是每日计数,不需要时间窗口检查,直接判断是否超限
                if current_rpd_count + 1 > rpd_limit: # 如果当前计数加 1 超过限制
                    logger.warning(f"速率限制预检查失败 (Key: {api_key[:8]}, Model: {model_name}): RPD 达到限制 ({current_rpd_count}/{rpd_limit})。跳过此 Key。") # 记录 RPD 超限警告
                    perform_api_call = False # 设置标志为 False
                else:
                    # 未达到限制,预先增加计数
                    key_usage["rpd_count"] = current_rpd_count + 1 # RPD 计数加 1
                    logger.debug(f"RPD 计数增加 (Key: {api_key[:8]}, Model: {model_name}): 新 RPD={key_usage['rpd_count']}") # 记录 RPD 计数增加

        # --- 检查 TPD_Input (每日输入 Token 数) ---
        # 仅检查,不在此处增加计数,因为此时还不知道实际的输入 Token 数。
        # 计数更新在 API 调用成功后的 update_token_counts 函数中进行。
        if perform_api_call:
             tpd_input_limit = limits.get("tpd_input") # 获取 TPD_Input 限制值
             if tpd_input_limit is not None and key_usage.get("tpd_input_count", 0) >= tpd_input_limit: # 如果设置了限制且当前计数已达到或超过限制
                 logger.warning(f"速率限制预检查失败 (Key: {api_key[:8]}, Model: {model_name}): TPD_Input 达到限制 ({key_usage.get('tpd_input_count', 0)}/{tpd_input_limit})。跳过此 Key。") # 记录 TPD_Input 超限警告
                 perform_api_call = False # 设置标志为 False

        # --- 检查 TPM_Input (每分钟输入 Token 数) ---
        # 同样仅检查,不在此处增加计数。
        if perform_api_call:
             tpm_input_limit = limits.get("tpm_input") # 获取 TPM_Input 限制值
             if tpm_input_limit is not None: # 如果设置了 TPM_Input 限制
                 # 检查是否仍在当前 TPM 窗口内
                 if now - key_usage.get("tpm_input_timestamp", 0) < TPM_WINDOW_SECONDS:
                      # 如果在窗口内,检查当前 Token 计数是否已达到或超过限制
                      if key_usage.get("tpm_input_count", 0) >= tpm_input_limit:
                          logger.warning(f"速率限制预检查失败 (Key: {api_key[:8]}, Model: {model_name}): TPM_Input 达到限制 ({key_usage.get('tpm_input_count', 0)}/{tpm_input_limit})。跳过此 Key。") # 记录 TPM_Input 超限警告
                          perform_api_call = False # 设置标志为 False
                 # 注意:这里没有 else 块来在窗口过期时重置 TPM_Input 计数/时间戳,
                 # 因为 update_token_counts 函数会处理 TPM_Input 的重置和增加。

        # 如果所有检查都通过,更新此 Key 的最后请求时间戳(用于 Key 选择策略)
        if perform_api_call:
            key_usage["last_request_timestamp"] = now # 更新最后请求时间戳

    return perform_api_call # 返回最终的检查结果

def update_token_counts(
    api_key: str,
    model_name: str,
    limits: Optional[Dict[str, Any]],
    prompt_tokens: Optional[int],
    client_ip: str,
    today_date_str_pt: str
) -> None:
    """
    在 API 调用成功 *之后* 更新给定 API Key 和模型的 TPD_Input 和 TPM_Input 计数。
    同时记录基于 IP 的每日输入 Token 消耗。

    Args:
        api_key (str): 当前成功使用的 API Key。
        model_name (str): 请求的模型名称。
        limits (Optional[Dict[str, Any]]): 从配置中获取的该模型的限制字典。
        prompt_tokens (Optional[int]): 从 API 响应中获取的实际输入 Token 数量。
        client_ip (str): 客户端 IP 地址。
        today_date_str_pt (str): 当前的太平洋时区日期字符串 (YYYY-MM-DD),用于 IP 每日计数。
    """
    # 检查输入有效性:需要有效的限制信息和大于 0 的 prompt_tokens
    if not limits or not prompt_tokens or prompt_tokens <= 0:
        if limits and (not prompt_tokens or prompt_tokens <= 0): # 如果有限制但 prompt_tokens 无效
             logger.warning(f"Token 计数更新跳过 (Key: {api_key[:8]}, Model: {model_name}): 无效的 prompt_tokens ({prompt_tokens})。") # 记录警告:无效的 prompt_tokens
        # 如果没有限制信息或 prompt_tokens 无效,则不执行更新
        return # 直接返回

    with usage_lock: # 获取使用数据锁,保证线程安全
        # 确保 key 和 model 的条目存在
        key_usage = usage_data.setdefault(api_key, defaultdict(lambda: defaultdict(int)))[model_name] # 获取或创建 Key 和模型的用法数据字典

        # --- 更新 TPD_Input (每日输入 Token 数) ---
        # 直接累加本次请求的 prompt_tokens
        key_usage["tpd_input_count"] = key_usage.get("tpd_input_count", 0) + prompt_tokens # 累加 TPD_Input 计数

        # --- 更新 TPM_Input (每分钟输入 Token 数) ---
        tpm_input_limit = limits.get("tpm_input") # 获取 TPM_Input 限制值
        if tpm_input_limit is not None: # 只有在配置了 TPM 限制时才更新
            now_tpm = time.time() # 获取当前时间戳
            # 检查 TPM 窗口是否已过期
            if now_tpm - key_usage.get("tpm_input_timestamp", 0) >= TPM_WINDOW_SECONDS:
                # 窗口已过期,重置计数为当前请求的 Token 数,并更新时间戳
                key_usage["tpm_input_count"] = prompt_tokens # 新窗口的第一个请求的 Token 数
                key_usage["tpm_input_timestamp"] = now_tpm # 更新窗口开始时间戳
            else:
                # 窗口未过期,累加 Token 数
                key_usage["tpm_input_count"] = key_usage.get("tpm_input_count", 0) + prompt_tokens # 累加 TPM_Input 计数
            # 记录详细的 Token 更新日志
            logger.debug(f"输入 Token 计数更新 (Key: {api_key[:8]}, Model: {model_name}): Added TPD_Input={prompt_tokens}, TPM_Input={key_usage['tpm_input_count']}") # 记录 Token 计数更新详情

    # --- 记录 IP 输入 Token 消耗 (独立于 Key 的限制) ---
    # 使用单独的锁来保护 IP 计数数据
    with ip_input_token_counts_lock: # 获取 IP 输入 token 计数锁
        # 使用 setdefault 确保日期条目存在,并使用 Counter 方便地增加 IP 计数
        # 结构: { 'YYYY-MM-DD': Counter({'ip1': count1, 'ip2': count2}) }
        ip_daily_input_token_counts.setdefault(today_date_str_pt, Counter())[client_ip] += prompt_tokens # 增加指定 IP 在当天的输入 Token 计数

# --- 上下文保存逻辑 (来自 utils.py 原始版本) ---
async def save_context_after_success(
    proxy_key: str,
    contents_to_send: List[Dict[str, Any]],
    model_reply_content: str,
    model_name: str,
    enable_context: bool,
    final_tool_calls: Optional[List[Dict[str, Any]]] = None
):
    """
    在 API 调用成功后保存上下文(如果启用)。

    Args:
        proxy_key (str): 用于存储上下文的键 (通常是 user_id)。
        contents_to_send (List[Dict[str, Any]]): 发送给模型的最终内容列表 (包含历史)。
        model_reply_content (str): 模型返回的文本回复。
        model_name (str): 使用的模型名称。
        enable_context (bool): 是否启用上下文保存功能。
        final_tool_calls (Optional[List[Dict[str, Any]]]): 模型返回的工具调用信息(目前暂未处理)。
    """
    if not enable_context: # 如果未启用上下文保存
        logger.debug(f"Key {proxy_key[:8]}... 的上下文补全已禁用,跳过上下文保存。") # 记录跳过信息
        return # 直接返回

    # 记录准备保存上下文的日志,并指明当前的数据库模式
    logger.debug(f"准备为 Key '{proxy_key[:8]}...' 保存上下文 (内存模式: {db_utils.IS_MEMORY_DB})") # 记录准备保存日志

    # 构造模型的回复部分,格式应符合 Gemini contents 结构
    model_reply_part = {"role": "model", "parts": [{"text": model_reply_content}]}
    if final_tool_calls: # 如果存在工具调用信息
        # TODO: 处理工具调用的上下文保存。
        # Gemini API 的工具调用响应格式与 OpenAI 不同,通常包含 functionCall 和 functionResponse。
        # 需要确定如何将这些信息整合到对话历史中以便后续使用。
        # 目前仅记录警告,表示暂未处理。
        logger.warning("上下文保存:暂未处理工具调用 (tool_calls) 的保存。") # 记录警告:未处理工具调用
        pass # 暂时忽略工具调用


    # 将模型的回复追加到发送给模型的内容之后,形成完整的对话历史用于保存
    final_contents_to_save = contents_to_send + [model_reply_part] # 组合最终要保存的内容

    # --- 对最终要保存的内容进行截断 ---
    # 保存上下文时,通常只使用模型的静态限制进行截断,
    # 因为保存的目的是维护历史记录,而不是适配某个 Key 的实时容量。
    # 注意:这里调用了 truncate_context 函数,它会根据 model_name 查找静态限制。
    # 第二个返回参数 still_over_limit_final 指示即使截断后是否仍然超限。
    truncated_contents_to_save, still_over_limit_final = await truncate_context(final_contents_to_save, model_name) # 对最终内容进行截断

    if not still_over_limit_final: # 如果截断后内容没有超限
        try:
            # 调用 context_store 模块的 save_context 函数保存截断后的上下文
            await context_store.save_context(proxy_key, truncated_contents_to_save) # 保存上下文
            logger.info(f"上下文保存成功 for Key {proxy_key[:8]}...") # 记录保存成功日志
        except Exception as e:
            # 捕获并记录保存过程中可能发生的任何异常
            logger.error(f"保存上下文失败 (Key: {proxy_key[:8]}...): {str(e)}", exc_info=True) # 记录保存失败错误
    else:
        # 如果截断后仍然超限,记录错误,不进行保存
        logger.error(f"上下文在添加回复并再次截断后仍然超限 (Key: {proxy_key[:8]}...). 上下文未保存。") # 记录错误:截断后仍超限

# --- 工具调用处理 (来自 tool_call_utils.py) ---
def process_tool_calls(gemini_tool_calls: Any) -> Optional[List[Dict[str, Any]]]:
    """
    将 Gemini 返回的 functionCall 列表转换为 OpenAI 兼容的 tool_calls 格式。
    Gemini: [{'functionCall': {'name': 'func_name', 'args': {...}}}]
    OpenAI: [{'id': 'call_...', 'type': 'function', 'function': {'name': 'func_name', 'arguments': '{...}'}}]
    """
    if not isinstance(gemini_tool_calls, list): # 检查输入是否为列表
        logger.warning(f"期望 gemini_tool_calls 是列表,但得到 {type(gemini_tool_calls)}") # 记录警告
        return None # 返回 None

    openai_tool_calls = [] # 初始化 OpenAI 格式工具调用列表
    # 遍历 Gemini 工具调用列表
    for i, call in enumerate(gemini_tool_calls):
        # 简化条件判断,逐步检查元素的格式和必要字段
        if not isinstance(call, dict): # 检查元素是否为字典
            logger.warning(f"工具调用列表中的元素不是字典: {call}") # 记录警告
            continue # 跳过格式不正确的元素

        # 检查 'functionCall' 键是否存在且其值是字典
        function_call_data = call.get('functionCall')
        if not isinstance(function_call_data, dict):
            logger.warning(f"工具调用元素缺少有效的 'functionCall' 字典: {call}") # 记录警告
            continue # 跳过格式不正确的元素

        # 检查 'name' 字段是否存在且有效
        func_name = function_call_data.get('name')
        if not isinstance(func_name, str) or not func_name:
            logger.warning(f"工具调用元素缺少有效的 'name' 字段: {call}") # 记录警告
            continue # 跳过格式不正确的元素

        # 检查 'args' 字段是否存在且是字典
        func_args = function_call_data.get('args')
        if not isinstance(func_args, dict):
            logger.warning(f"工具调用元素缺少有效的 'args' 字典: {call}") # 记录警告
            continue # 跳过格式不正确的元素

        try:
            # OpenAI 需要 arguments 是 JSON 字符串
            arguments_str = json.dumps(func_args, ensure_ascii=False) # 将参数序列化为 JSON 字符串
        except TypeError as e:
            logger.error(f"序列化工具调用参数失败 (Name: {func_name}): {e}", exc_info=True) # 记录序列化失败错误
            continue # 跳过这个调用

        # 添加到 OpenAI 格式列表
        openai_tool_calls.append({
            "id": f"call_{int(time.time()*1000)}_{i}", # 生成唯一 ID (基于时间戳和索引)
            "type": "function", # 类型固定为 function
            "function": {
                "name": func_name, # 函数名称
                "arguments": arguments_str, # 参数 JSON 字符串
            }
        })

    return openai_tool_calls if openai_tool_calls else None # 返回 OpenAI 格式列表或 None