File size: 26,656 Bytes
e82bac2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 | # -*- coding: utf-8 -*-
"""
请求处理相关的工具函数,包括 Token 估算、上下文截断、速率限制检查和计数更新、上下文保存等。
"""
import json # 导入 JSON 处理模块
import logging # 导入日志模块
import time # 导入时间模块
from typing import List, Dict, Any, Optional, Tuple # 导入类型提示
from collections import Counter, defaultdict # 导入集合类型
# 导入核心模块
from app.core.database import utils as db_utils # 导入数据库工具模块
from app.core.context import store as context_store # 导入上下文存储模块
# 导入配置
from app import config as app_config # 导入应用配置
# 导入跟踪相关的数据结构和常量
from app.core.tracking import (
ip_daily_input_token_counts, ip_input_token_counts_lock, # IP 每日输入 Token 计数及锁
usage_data, usage_lock, RPM_WINDOW_SECONDS, TPM_WINDOW_SECONDS # 使用数据、锁及时间窗口常量
)
# 导入日志记录器
logger = logging.getLogger('my_logger') # 获取日志记录器实例
# --- Token 估算与上下文截断 (来自 token_utils.py) ---
def estimate_token_count(contents: List[Dict[str, Any]]) -> int:
"""
估算 Gemini contents 列表的 Token 数量。
使用简单的字符数估算方法 (1 个 token 大约等于 4 个字符)。
注意:这是一个非常粗略的估算,实际 Token 数可能因模型和内容而异。
Args:
contents (List[Dict[str, Any]]): Gemini 格式的内容列表。
Returns:
int: 估算的 Token 数量。
"""
if not contents: # 检查列表是否为空
return 0 # 如果为空,返回 0
try:
# 计算 JSON 序列化后的字符数
# ensure_ascii=False 确保中文字符等非 ASCII 字符按实际字符数计算,而不是转义序列
char_count = len(json.dumps(contents, ensure_ascii=False)) # 序列化为 JSON 字符串并获取长度
# 使用 1 token ≈ 4 chars 的简化规则进行估算
return char_count // 4 # 返回估算的 Token 数
except TypeError as e:
# 捕获并记录序列化过程中可能发生的类型错误
logger.error(f"序列化 contents 进行 Token 估算时出错: {e}", exc_info=True) # 记录错误日志
return 0 # 如果序列化失败,返回 0
async def truncate_context( # 改为 async 函数,因为内部可能调用 async 函数 (如 estimate_token_count 未来可能改为调用 API)
contents: List[Dict[str, Any]],
model_name: str,
dynamic_max_tokens_limit: Optional[int] = None # 新增可选参数,表示基于 Key 实时容量的动态限制
) -> Tuple[List[Dict[str, Any]], bool]:
"""
根据模型限制和可选的动态限制截断对话历史 (contents)。
采用从开头成对移除消息(通常是 user/model 对)的策略,
直到估算的 Token 数量满足限制要求。
Args:
contents (List[Dict[str, Any]]): 完整的对话历史列表 (Gemini 格式)。
model_name (str): 当前请求使用的模型名称,用于查找其 Token 限制。
dynamic_max_tokens_limit (Optional[int]): 可选的动态 Token 限制,
通常基于 API Key 的实时可用容量。如果提供,将使用此限制与模型静态限制中的较小值。
Returns:
Tuple[List[Dict[str, Any]], bool]:
- 第一个元素是截断后的对话历史列表。
- 第二个元素是一个布尔值,指示截断后是否仍然超限
(True 表示超限,False 表示未超限或无需截断)。
如果返回 True,调用者通常不应保存此上下文,因为它可能仍然过长。
"""
if not contents: # 检查输入列表是否为空
return [], False # 如果为空,直接返回空列表和 False (未超限)
# --- 确定最大 Token 限制 ---
# 1. 获取配置中的默认值和安全边际
# 使用 getattr 提供默认值,增加配置的灵活性
default_max_tokens = getattr(app_config, 'DEFAULT_MAX_CONTEXT_TOKENS', 30000) # 获取默认最大上下文 Token 数,默认为 30000
safety_margin = getattr(app_config, 'CONTEXT_TOKEN_SAFETY_MARGIN', 200) # 获取 Token 安全边际,默认为 200
# 2. 获取模型的静态输入 Token 限制
model_limits = getattr(app_config, 'MODEL_LIMITS', {}) # 从配置加载模型限制字典
limit_info = model_limits.get(model_name) # 查找当前模型的限制信息
static_max_tokens = default_max_tokens # 默认使用全局默认值
if limit_info and isinstance(limit_info, dict) and limit_info.get("input_token_limit"): # 检查是否存在有效的模型特定限制
try:
limit_value = limit_info["input_token_limit"] # 获取模型限制值
if limit_value is not None: # 确保值不是 JSON null
static_max_tokens = int(limit_value) # 转换为整数
else:
# 如果模型限制值为 null,记录警告并使用默认值
logger.warning(f"模型 '{model_name}' 的 input_token_limit 值为 null,使用默认值 {default_max_tokens}") # 记录警告:模型限制值为 null
except (ValueError, TypeError):
# 如果模型限制值无效(无法转换为整数),记录警告并使用默认值
logger.warning(f"模型 '{model_name}' 的 input_token_limit 值无效 ('{limit_info.get('input_token_limit')}'),使用默认值 {default_max_tokens}") # 记录警告:模型限制值无效
else:
# 如果模型或其限制未定义,记录警告并使用默认值
logger.warning(f"模型 '{model_name}' 或其 input_token_limit 未在 model_limits.json 中定义,使用默认值 {default_max_tokens}") # 记录警告:模型限制未定义
# 3. 结合动态限制确定最终使用的最大 Token 限制
actual_max_tokens = static_max_tokens # 默认使用模型的静态限制
if dynamic_max_tokens_limit is not None and dynamic_max_tokens_limit >= 0: # 如果提供了有效的动态限制(非 None 且非负)
# 取静态限制和动态限制中的较小者作为实际限制
actual_max_tokens = min(static_max_tokens, dynamic_max_tokens_limit)
# 记录日志,说明使用了哪个限制
logger.debug(f"使用动态限制 {dynamic_max_tokens_limit} 和静态限制 {static_max_tokens},最终最大 Token 限制为 {actual_max_tokens}") # 记录使用的限制值
# 4. 计算截断阈值(实际限制减去安全边际)
# 确保阈值不小于 0
truncation_threshold = max(0, actual_max_tokens - safety_margin) # 计算最终的截断目标 Token 数
# --- 执行截断 ---
# 估算当前内容的 Token 数量
estimated_tokens = estimate_token_count(contents) # 调用 Token 估算函数
# 判断是否需要截断
if estimated_tokens > truncation_threshold: # 如果估算 Token 数超过了阈值
logger.info(f"上下文估算 Token ({estimated_tokens}) 超出阈值 ({truncation_threshold} for model {model_name}, actual max tokens {actual_max_tokens}),开始截断...") # 记录开始截断的日志
# 创建内容的副本进行操作,避免修改原始列表
truncated_contents = list(contents) # 复制列表
# 循环移除消息对,直到满足 Token 限制或无法再移除
while estimate_token_count(truncated_contents) > truncation_threshold and len(truncated_contents) >= 2:
# 从列表开头移除两个元素(假设是 user/model 对)
removed_first = truncated_contents.pop(0) # 移除第一个元素 (通常是 user)
removed_second = truncated_contents.pop(0) # 移除第二个元素 (通常是 model)
# 记录被移除的消息的角色,用于调试
logger.debug(f"移除旧消息对: roles={removed_first.get('role')}, {removed_second.get('role')}") # 记录移除的消息角色
# 重新估算截断后的 Token 数量
final_estimated_tokens = estimate_token_count(truncated_contents) # 估算最终 Token 数
# 检查截断后是否仍然超限
if final_estimated_tokens > truncation_threshold: # 如果截断后仍然超过阈值
# 这种情况可能发生在即使只剩下一条消息,其 Token 数也超过阈值
logger.error(f"截断后上下文估算 Token ({final_estimated_tokens}) 仍然超过阈值 ({truncation_threshold})。本次交互的上下文不应被保存。") # 记录错误:截断后仍超限
# 返回截断后的内容,并标记为超限 (True)
return truncated_contents, True
else:
# 截断成功,且最终 Token 数在阈值内
logger.info(f"上下文截断完成,剩余消息数: {len(truncated_contents)}, 最终估算 Token: {final_estimated_tokens}") # 记录截断成功信息
# 返回截断后的内容,并标记为未超限 (False)
return truncated_contents, False
else:
# 如果原始 Token 数未超过阈值,无需截断
return contents, False # 返回原始内容,并标记为未超限 (False)
# --- 速率限制检查与计数更新 (来自 rate_limit_utils.py) ---
def check_rate_limits_and_update_counts(
api_key: str,
model_name: str,
limits: Optional[Dict[str, Any]]
) -> bool:
"""
检查给定 API Key 和模型的速率限制 (RPD, TPD_Input, RPM, TPM_Input)。
此函数在选择 Key *之前* 调用,用于预检查 Key 是否已达到已知限制。
如果未达到限制,则更新 RPM 和 RPD 计数(假设本次请求会发生),并返回 True。
如果达到任何限制,则记录警告并返回 False,表示不应选择此 Key。
Args:
api_key (str): 当前尝试使用的 API Key。
model_name (str): 请求的模型名称。
limits (Optional[Dict[str, Any]]): 从配置中获取的该模型的限制字典。
Returns:
bool: 如果根据已知计数判断可以继续进行 API 调用则返回 True,否则返回 False。
"""
if not limits: # 检查是否有该模型的限制配置
logger.warning(f"模型 '{model_name}' 不在 model_limits.json 中,跳过本地速率限制检查。") # 记录警告:模型不在限制配置中
return True # 没有限制信息,默认允许调用
now = time.time() # 获取当前时间戳,用于 RPM 和 TPM 检查
perform_api_call = True # 初始化标志:假设可以执行 API 调用
with usage_lock: # 获取使用数据锁,保证对共享数据 usage_data 的访问是线程安全的
# 使用 setdefault 确保 key 和 model 的条目存在于 usage_data 中,避免 KeyError
# 如果键不存在,会使用 defaultdict 的默认工厂(这里是另一个 defaultdict)创建新条目
key_usage = usage_data.setdefault(api_key, defaultdict(lambda: defaultdict(int)))[model_name] # 获取或创建 Key 和模型的用法数据字典
# --- 检查并更新 RPM (每分钟请求数) ---
rpm_limit = limits.get("rpm") # 从模型限制中获取 RPM 限制值
if rpm_limit is not None: # 如果配置了 RPM 限制
current_rpm_count = key_usage.get("rpm_count", 0) # 获取当前 RPM 计数,默认为 0
rpm_timestamp = key_usage.get("rpm_timestamp", 0) # 获取上次 RPM 窗口开始时间戳,默认为 0
if now - rpm_timestamp >= RPM_WINDOW_SECONDS: # 检查当前时间是否已经超过了 RPM 窗口时长
# RPM 窗口已过期,重置计数并将当前请求计为 1
key_usage["rpm_count"] = 1 # 新窗口的第一个请求
key_usage["rpm_timestamp"] = now # 更新窗口开始时间戳为当前时间
logger.debug(f"RPM 窗口过期,重置计数并增加 (Key: {api_key[:8]}, Model: {model_name}): 新 RPM=1") # 记录 RPM 窗口过期和重置
else:
# RPM 窗口未过期,检查加上当前这个预期的请求是否会超限
if current_rpm_count + 1 > rpm_limit: # 如果当前计数加 1 超过限制
logger.warning(f"速率限制预检查失败 (Key: {api_key[:8]}, Model: {model_name}): RPM 达到限制 ({current_rpm_count}/{rpm_limit})。跳过此 Key。") # 记录 RPM 超限警告
perform_api_call = False # 设置标志为 False,表示不能选择此 Key
else:
# 未达到限制,预先增加计数(假设此 Key 会被选中并使用)
key_usage["rpm_count"] = current_rpm_count + 1 # RPM 计数加 1
# 时间戳保持不变,因为仍在当前窗口内
logger.debug(f"RPM 计数增加 (Key: {api_key[:8]}, Model: {model_name}): 新 RPM={key_usage['rpm_count']}") # 记录 RPM 计数增加
# --- 检查并更新 RPD (每日请求数) ---
# 仅在之前的检查(RPM)通过时才进行 RPD 检查
if perform_api_call:
rpd_limit = limits.get("rpd") # 获取 RPD 限制值
if rpd_limit is not None: # 如果配置了 RPD 限制
current_rpd_count = key_usage.get("rpd_count", 0) # 获取当前 RPD 计数,默认为 0
# RPD 是每日计数,不需要时间窗口检查,直接判断是否超限
if current_rpd_count + 1 > rpd_limit: # 如果当前计数加 1 超过限制
logger.warning(f"速率限制预检查失败 (Key: {api_key[:8]}, Model: {model_name}): RPD 达到限制 ({current_rpd_count}/{rpd_limit})。跳过此 Key。") # 记录 RPD 超限警告
perform_api_call = False # 设置标志为 False
else:
# 未达到限制,预先增加计数
key_usage["rpd_count"] = current_rpd_count + 1 # RPD 计数加 1
logger.debug(f"RPD 计数增加 (Key: {api_key[:8]}, Model: {model_name}): 新 RPD={key_usage['rpd_count']}") # 记录 RPD 计数增加
# --- 检查 TPD_Input (每日输入 Token 数) ---
# 仅检查,不在此处增加计数,因为此时还不知道实际的输入 Token 数。
# 计数更新在 API 调用成功后的 update_token_counts 函数中进行。
if perform_api_call:
tpd_input_limit = limits.get("tpd_input") # 获取 TPD_Input 限制值
if tpd_input_limit is not None and key_usage.get("tpd_input_count", 0) >= tpd_input_limit: # 如果设置了限制且当前计数已达到或超过限制
logger.warning(f"速率限制预检查失败 (Key: {api_key[:8]}, Model: {model_name}): TPD_Input 达到限制 ({key_usage.get('tpd_input_count', 0)}/{tpd_input_limit})。跳过此 Key。") # 记录 TPD_Input 超限警告
perform_api_call = False # 设置标志为 False
# --- 检查 TPM_Input (每分钟输入 Token 数) ---
# 同样仅检查,不在此处增加计数。
if perform_api_call:
tpm_input_limit = limits.get("tpm_input") # 获取 TPM_Input 限制值
if tpm_input_limit is not None: # 如果设置了 TPM_Input 限制
# 检查是否仍在当前 TPM 窗口内
if now - key_usage.get("tpm_input_timestamp", 0) < TPM_WINDOW_SECONDS:
# 如果在窗口内,检查当前 Token 计数是否已达到或超过限制
if key_usage.get("tpm_input_count", 0) >= tpm_input_limit:
logger.warning(f"速率限制预检查失败 (Key: {api_key[:8]}, Model: {model_name}): TPM_Input 达到限制 ({key_usage.get('tpm_input_count', 0)}/{tpm_input_limit})。跳过此 Key。") # 记录 TPM_Input 超限警告
perform_api_call = False # 设置标志为 False
# 注意:这里没有 else 块来在窗口过期时重置 TPM_Input 计数/时间戳,
# 因为 update_token_counts 函数会处理 TPM_Input 的重置和增加。
# 如果所有检查都通过,更新此 Key 的最后请求时间戳(用于 Key 选择策略)
if perform_api_call:
key_usage["last_request_timestamp"] = now # 更新最后请求时间戳
return perform_api_call # 返回最终的检查结果
def update_token_counts(
api_key: str,
model_name: str,
limits: Optional[Dict[str, Any]],
prompt_tokens: Optional[int],
client_ip: str,
today_date_str_pt: str
) -> None:
"""
在 API 调用成功 *之后* 更新给定 API Key 和模型的 TPD_Input 和 TPM_Input 计数。
同时记录基于 IP 的每日输入 Token 消耗。
Args:
api_key (str): 当前成功使用的 API Key。
model_name (str): 请求的模型名称。
limits (Optional[Dict[str, Any]]): 从配置中获取的该模型的限制字典。
prompt_tokens (Optional[int]): 从 API 响应中获取的实际输入 Token 数量。
client_ip (str): 客户端 IP 地址。
today_date_str_pt (str): 当前的太平洋时区日期字符串 (YYYY-MM-DD),用于 IP 每日计数。
"""
# 检查输入有效性:需要有效的限制信息和大于 0 的 prompt_tokens
if not limits or not prompt_tokens or prompt_tokens <= 0:
if limits and (not prompt_tokens or prompt_tokens <= 0): # 如果有限制但 prompt_tokens 无效
logger.warning(f"Token 计数更新跳过 (Key: {api_key[:8]}, Model: {model_name}): 无效的 prompt_tokens ({prompt_tokens})。") # 记录警告:无效的 prompt_tokens
# 如果没有限制信息或 prompt_tokens 无效,则不执行更新
return # 直接返回
with usage_lock: # 获取使用数据锁,保证线程安全
# 确保 key 和 model 的条目存在
key_usage = usage_data.setdefault(api_key, defaultdict(lambda: defaultdict(int)))[model_name] # 获取或创建 Key 和模型的用法数据字典
# --- 更新 TPD_Input (每日输入 Token 数) ---
# 直接累加本次请求的 prompt_tokens
key_usage["tpd_input_count"] = key_usage.get("tpd_input_count", 0) + prompt_tokens # 累加 TPD_Input 计数
# --- 更新 TPM_Input (每分钟输入 Token 数) ---
tpm_input_limit = limits.get("tpm_input") # 获取 TPM_Input 限制值
if tpm_input_limit is not None: # 只有在配置了 TPM 限制时才更新
now_tpm = time.time() # 获取当前时间戳
# 检查 TPM 窗口是否已过期
if now_tpm - key_usage.get("tpm_input_timestamp", 0) >= TPM_WINDOW_SECONDS:
# 窗口已过期,重置计数为当前请求的 Token 数,并更新时间戳
key_usage["tpm_input_count"] = prompt_tokens # 新窗口的第一个请求的 Token 数
key_usage["tpm_input_timestamp"] = now_tpm # 更新窗口开始时间戳
else:
# 窗口未过期,累加 Token 数
key_usage["tpm_input_count"] = key_usage.get("tpm_input_count", 0) + prompt_tokens # 累加 TPM_Input 计数
# 记录详细的 Token 更新日志
logger.debug(f"输入 Token 计数更新 (Key: {api_key[:8]}, Model: {model_name}): Added TPD_Input={prompt_tokens}, TPM_Input={key_usage['tpm_input_count']}") # 记录 Token 计数更新详情
# --- 记录 IP 输入 Token 消耗 (独立于 Key 的限制) ---
# 使用单独的锁来保护 IP 计数数据
with ip_input_token_counts_lock: # 获取 IP 输入 token 计数锁
# 使用 setdefault 确保日期条目存在,并使用 Counter 方便地增加 IP 计数
# 结构: { 'YYYY-MM-DD': Counter({'ip1': count1, 'ip2': count2}) }
ip_daily_input_token_counts.setdefault(today_date_str_pt, Counter())[client_ip] += prompt_tokens # 增加指定 IP 在当天的输入 Token 计数
# --- 上下文保存逻辑 (来自 utils.py 原始版本) ---
async def save_context_after_success(
proxy_key: str,
contents_to_send: List[Dict[str, Any]],
model_reply_content: str,
model_name: str,
enable_context: bool,
final_tool_calls: Optional[List[Dict[str, Any]]] = None
):
"""
在 API 调用成功后保存上下文(如果启用)。
Args:
proxy_key (str): 用于存储上下文的键 (通常是 user_id)。
contents_to_send (List[Dict[str, Any]]): 发送给模型的最终内容列表 (包含历史)。
model_reply_content (str): 模型返回的文本回复。
model_name (str): 使用的模型名称。
enable_context (bool): 是否启用上下文保存功能。
final_tool_calls (Optional[List[Dict[str, Any]]]): 模型返回的工具调用信息(目前暂未处理)。
"""
if not enable_context: # 如果未启用上下文保存
logger.debug(f"Key {proxy_key[:8]}... 的上下文补全已禁用,跳过上下文保存。") # 记录跳过信息
return # 直接返回
# 记录准备保存上下文的日志,并指明当前的数据库模式
logger.debug(f"准备为 Key '{proxy_key[:8]}...' 保存上下文 (内存模式: {db_utils.IS_MEMORY_DB})") # 记录准备保存日志
# 构造模型的回复部分,格式应符合 Gemini contents 结构
model_reply_part = {"role": "model", "parts": [{"text": model_reply_content}]}
if final_tool_calls: # 如果存在工具调用信息
# TODO: 处理工具调用的上下文保存。
# Gemini API 的工具调用响应格式与 OpenAI 不同,通常包含 functionCall 和 functionResponse。
# 需要确定如何将这些信息整合到对话历史中以便后续使用。
# 目前仅记录警告,表示暂未处理。
logger.warning("上下文保存:暂未处理工具调用 (tool_calls) 的保存。") # 记录警告:未处理工具调用
pass # 暂时忽略工具调用
# 将模型的回复追加到发送给模型的内容之后,形成完整的对话历史用于保存
final_contents_to_save = contents_to_send + [model_reply_part] # 组合最终要保存的内容
# --- 对最终要保存的内容进行截断 ---
# 保存上下文时,通常只使用模型的静态限制进行截断,
# 因为保存的目的是维护历史记录,而不是适配某个 Key 的实时容量。
# 注意:这里调用了 truncate_context 函数,它会根据 model_name 查找静态限制。
# 第二个返回参数 still_over_limit_final 指示即使截断后是否仍然超限。
truncated_contents_to_save, still_over_limit_final = await truncate_context(final_contents_to_save, model_name) # 对最终内容进行截断
if not still_over_limit_final: # 如果截断后内容没有超限
try:
# 调用 context_store 模块的 save_context 函数保存截断后的上下文
await context_store.save_context(proxy_key, truncated_contents_to_save) # 保存上下文
logger.info(f"上下文保存成功 for Key {proxy_key[:8]}...") # 记录保存成功日志
except Exception as e:
# 捕获并记录保存过程中可能发生的任何异常
logger.error(f"保存上下文失败 (Key: {proxy_key[:8]}...): {str(e)}", exc_info=True) # 记录保存失败错误
else:
# 如果截断后仍然超限,记录错误,不进行保存
logger.error(f"上下文在添加回复并再次截断后仍然超限 (Key: {proxy_key[:8]}...). 上下文未保存。") # 记录错误:截断后仍超限
# --- 工具调用处理 (来自 tool_call_utils.py) ---
def process_tool_calls(gemini_tool_calls: Any) -> Optional[List[Dict[str, Any]]]:
"""
将 Gemini 返回的 functionCall 列表转换为 OpenAI 兼容的 tool_calls 格式。
Gemini: [{'functionCall': {'name': 'func_name', 'args': {...}}}]
OpenAI: [{'id': 'call_...', 'type': 'function', 'function': {'name': 'func_name', 'arguments': '{...}'}}]
"""
if not isinstance(gemini_tool_calls, list): # 检查输入是否为列表
logger.warning(f"期望 gemini_tool_calls 是列表,但得到 {type(gemini_tool_calls)}") # 记录警告
return None # 返回 None
openai_tool_calls = [] # 初始化 OpenAI 格式工具调用列表
# 遍历 Gemini 工具调用列表
for i, call in enumerate(gemini_tool_calls):
# 简化条件判断,逐步检查元素的格式和必要字段
if not isinstance(call, dict): # 检查元素是否为字典
logger.warning(f"工具调用列表中的元素不是字典: {call}") # 记录警告
continue # 跳过格式不正确的元素
# 检查 'functionCall' 键是否存在且其值是字典
function_call_data = call.get('functionCall')
if not isinstance(function_call_data, dict):
logger.warning(f"工具调用元素缺少有效的 'functionCall' 字典: {call}") # 记录警告
continue # 跳过格式不正确的元素
# 检查 'name' 字段是否存在且有效
func_name = function_call_data.get('name')
if not isinstance(func_name, str) or not func_name:
logger.warning(f"工具调用元素缺少有效的 'name' 字段: {call}") # 记录警告
continue # 跳过格式不正确的元素
# 检查 'args' 字段是否存在且是字典
func_args = function_call_data.get('args')
if not isinstance(func_args, dict):
logger.warning(f"工具调用元素缺少有效的 'args' 字典: {call}") # 记录警告
continue # 跳过格式不正确的元素
try:
# OpenAI 需要 arguments 是 JSON 字符串
arguments_str = json.dumps(func_args, ensure_ascii=False) # 将参数序列化为 JSON 字符串
except TypeError as e:
logger.error(f"序列化工具调用参数失败 (Name: {func_name}): {e}", exc_info=True) # 记录序列化失败错误
continue # 跳过这个调用
# 添加到 OpenAI 格式列表
openai_tool_calls.append({
"id": f"call_{int(time.time()*1000)}_{i}", # 生成唯一 ID (基于时间戳和索引)
"type": "function", # 类型固定为 function
"function": {
"name": func_name, # 函数名称
"arguments": arguments_str, # 参数 JSON 字符串
}
})
return openai_tool_calls if openai_tool_calls else None # 返回 OpenAI 格式列表或 None
|