""" Gemini Format Utilities - 统一的 Gemini 格式处理和转换工具 提供对 Gemini API 请求体和响应的标准化处理 ──────────────────────────────────────────────────────────────── """ import json from math import e from typing import Any, Dict, Optional from log import log from src.converter.thoughtSignature_fix import SKIP_THOUGHT_SIGNATURE_VALIDATOR # ==================== Gemini API 配置 ==================== # ====================== Model Configuration ====================== # Default Safety Settings for Google API DEFAULT_SAFETY_SETTINGS = [ {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"}, ] LITE_SAFETY_SETTINGS = [ {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"}, ] def _append_schema_hint(schema: Dict[str, Any], hint: str) -> None: """Move fragile validation details into description instead of sending them raw.""" if not hint: return desc = schema.get("description") schema["description"] = f"{desc} ({hint})" if desc else hint def _resolve_schema_ref(ref: str, root_schema: Dict[str, Any]) -> Optional[Dict[str, Any]]: if not isinstance(ref, str) or not ref.startswith("#/"): return None node: Any = root_schema for part in ref[2:].split("/"): part = part.replace("~1", "/").replace("~0", "~") if not isinstance(node, dict) or part not in node: return None node = node[part] return node if isinstance(node, dict) else None def _clean_parameters_json_schema( schema: Any, root_schema: Optional[Dict[str, Any]] = None, visited: Optional[set] = None, ) -> Any: """Clean a tool schema for Code Assist's parametersJsonSchema field.""" if isinstance(schema, list): return [_clean_parameters_json_schema(item, root_schema, visited) for item in schema] if not isinstance(schema, dict): return schema if root_schema is None: root_schema = schema if visited is None: visited = set() schema_id = id(schema) if schema_id in visited: return {"type": "object", "description": "circular reference"} visited.add(schema_id) ref_key = "$ref" if "$ref" in schema else ("ref" if "ref" in schema else None) if ref_key: resolved = _resolve_schema_ref(schema[ref_key], root_schema) if resolved: merged = dict(resolved) for key in ("description", "default"): if key in schema: merged[key] = schema[key] schema = merged if "allOf" in schema: result: Dict[str, Any] = {} for item in schema.get("allOf") or []: cleaned_item = _clean_parameters_json_schema(item, root_schema, visited) if not isinstance(cleaned_item, dict): continue if "properties" in cleaned_item: result.setdefault("properties", {}).update(cleaned_item["properties"]) if "required" in cleaned_item: result.setdefault("required", []).extend(cleaned_item["required"]) for key, value in cleaned_item.items(): if key not in ("properties", "required"): result[key] = value for key, value in schema.items(): if key not in ("allOf", "properties", "required"): result[key] = value elif key in ("properties", "required") and key not in result: result[key] = value else: result = dict(schema) if result.get("nullable") is True: _append_schema_hint(result, "nullable") if "type" in result: type_value = result["type"] if isinstance(type_value, list): non_null_types = [ str(t).lower() for t in type_value if isinstance(t, str) and t.lower() != "null" ] if non_null_types: result["type"] = non_null_types[0] if any(str(t).lower() == "null" for t in type_value): _append_schema_hint(result, "nullable") else: result["type"] = "string" elif isinstance(type_value, str): lower_type = type_value.lower() if lower_type in {"string", "number", "integer", "boolean", "array", "object"}: result["type"] = lower_type elif lower_type == "null": result["type"] = "string" _append_schema_hint(result, "nullable") else: result.pop("type", None) if "anyOf" in result or "oneOf" in result: union_key = "anyOf" if "anyOf" in result else "oneOf" union_items = result.get(union_key) or [] cleaned_items = [ item for item in ( _clean_parameters_json_schema(item, root_schema, visited) for item in union_items ) if isinstance(item, dict) ] enum_values = [ item.get("const") for item in union_items if isinstance(item, dict) and item.get("const") not in ("", None) ] if enum_values and len(enum_values) == len(union_items): result["type"] = "string" result["enum"] = [str(v) for v in enum_values] else: preferred = next( ( item for item in cleaned_items if item.get("type") in ("object", "array") or item.get("properties") ), None, ) if preferred is None: preferred = next((item for item in cleaned_items if item.get("type") or item.get("enum")), None) if preferred: original_description = result.get("description") result.update(preferred) if original_description: _append_schema_hint(result, original_description) result.pop("anyOf", None) result.pop("oneOf", None) if result.get("type") == "array": items = result.get("items") if isinstance(items, list): if items: result["items"] = _clean_parameters_json_schema(items[0], root_schema, visited) _append_schema_hint(result, "tuple schema simplified") else: result.pop("items", None) elif isinstance(items, dict): result["items"] = _clean_parameters_json_schema(items, root_schema, visited) validation_keys = { "default", "minLength", "maxLength", "minimum", "maximum", "minItems", "maxItems", "pattern", "format", "uniqueItems", } for key in list(result.keys()): if key in validation_keys: value = result.pop(key) if value not in (None, "", {}, []): _append_schema_hint(result, f"{key}: {json.dumps(value, ensure_ascii=False)}") unsupported_keys = { "title", "$schema", "$id", "$ref", "ref", "strict", "nullable", "exclusiveMaximum", "exclusiveMinimum", "additionalProperties", "allOf", "anyOf", "oneOf", "$defs", "definitions", "example", "examples", "readOnly", "writeOnly", "const", "additionalItems", "contains", "patternProperties", "dependencies", "propertyNames", "if", "then", "else", "contentEncoding", "contentMediaType", } for key in list(result.keys()): if key in unsupported_keys or key.startswith("x-"): del result[key] nullable_props = set() if isinstance(result.get("properties"), dict): cleaned_props = {} for prop_name, prop_schema in result["properties"].items(): if isinstance(prop_schema, dict): prop_type = prop_schema.get("type") if ( prop_schema.get("nullable") is True or ( isinstance(prop_type, list) and any(str(t).lower() == "null" for t in prop_type) ) ): nullable_props.add(prop_name) cleaned_props[prop_name] = _clean_parameters_json_schema(prop_schema, root_schema, visited) result["properties"] = cleaned_props if "properties" in result and "type" not in result: result["type"] = "object" if isinstance(result.get("required"), list): prop_names = set(result.get("properties", {}).keys()) if isinstance(result.get("properties"), dict) else None required = [] for item in result["required"]: if not isinstance(item, str): continue if prop_names is not None and item not in prop_names: continue if item in nullable_props: continue if item not in required: required.append(item) if required: result["required"] = required else: result.pop("required", None) return result def _normalize_tools_for_internal_api(tools: Any) -> Any: if not isinstance(tools, list): return tools normalized_tools = [] for tool in tools: if not isinstance(tool, dict): normalized_tools.append(tool) continue normalized_tool = tool.copy() declarations = normalized_tool.get("functionDeclarations") if declarations is None: declarations = normalized_tool.get("function_declarations") if isinstance(declarations, list): normalized_declarations = [] for declaration in declarations: if not isinstance(declaration, dict): normalized_declarations.append(declaration) continue normalized_declaration = declaration.copy() if "parametersJsonSchema" in normalized_declaration: schema = normalized_declaration["parametersJsonSchema"] elif "parameters_json_schema" in normalized_declaration: schema = normalized_declaration.pop("parameters_json_schema", None) else: schema = normalized_declaration.pop("parameters", None) normalized_declaration.pop("parameters", None) normalized_declaration.pop("parameters_json_schema", None) if schema not in (None, {}, []): normalized_declaration["parametersJsonSchema"] = _clean_parameters_json_schema(schema) else: normalized_declaration.pop("parametersJsonSchema", None) normalized_declarations.append(normalized_declaration) normalized_tool.pop("function_declarations", None) normalized_tool["functionDeclarations"] = normalized_declarations normalized_tools.append(normalized_tool) return normalized_tools def _ensure_empty_tool_schema_for_claude(tools: Any, model_name: str) -> Any: if "claude" not in (model_name or "").lower() or not isinstance(tools, list): return tools normalized_tools = [] for tool in tools: if not isinstance(tool, dict): normalized_tools.append(tool) continue normalized_tool = tool.copy() custom_tool = normalized_tool.get("custom") if isinstance(custom_tool, dict) and "input_schema" not in custom_tool: normalized_custom = custom_tool.copy() normalized_custom["input_schema"] = {"type": "object", "properties": {}} normalized_tool["custom"] = normalized_custom declarations = normalized_tool.get("functionDeclarations") if declarations is None: declarations = normalized_tool.get("function_declarations") if isinstance(declarations, list): normalized_declarations = [] for declaration in declarations: if not isinstance(declaration, dict): normalized_declarations.append(declaration) continue normalized_declaration = declaration.copy() if ( "parametersJsonSchema" not in normalized_declaration and "parameters_json_schema" in normalized_declaration ): normalized_declaration["parametersJsonSchema"] = normalized_declaration.pop("parameters_json_schema") if "parametersJsonSchema" not in normalized_declaration: normalized_declaration["parametersJsonSchema"] = { "type": "object", "properties": {}, } normalized_declarations.append(normalized_declaration) normalized_tool.pop("function_declarations", None) normalized_tool["functionDeclarations"] = normalized_declarations normalized_tools.append(normalized_tool) return normalized_tools def _should_skip_thought_signature(part: Dict[str, Any], model_name: str) -> bool: if "claude" in (model_name or "").lower(): return False return ( "functionCall" in part or "function_call" in part or part.get("thought") is True or "thoughtSignature" in part or "thought_signature" in part ) def _normalize_part_thought_signature(part: Dict[str, Any], model_name: str) -> Dict[str, Any]: normalized = part.copy() if _should_skip_thought_signature(normalized, model_name): normalized.pop("thought_signature", None) normalized["thoughtSignature"] = SKIP_THOUGHT_SIGNATURE_VALIDATOR return normalized SUPPORTED_ASPECT_RATIOS = [ (1, 1), (2, 3), (3, 2), (3, 4), (4, 3), (4, 5), (5, 4), (9, 16), (16, 9), (21, 9), ] def _parse_size_to_image_config(size_str: str) -> Dict[str, str]: """ 解析用户传入的 size 参数为 Gemini imageConfig 参数 支持格式: "1024x1536", "1024*1536", "1024X1536" Returns: 包含 aspectRatio 和/或 imageSize 的字典 """ import re config = {} size_str = size_str.strip() match = re.match(r"^(\d+)\s*[xX*×]\s*(\d+)$", size_str) if not match: return config width, height = int(match.group(1)), int(match.group(2)) if width <= 0 or height <= 0: return config # 计算最接近的支持宽高比 target_ratio = width / height best_ratio = None best_diff = float("inf") for w, h in SUPPORTED_ASPECT_RATIOS: diff = abs(target_ratio - w / h) if diff < best_diff: best_diff = diff best_ratio = f"{w}:{h}" if best_ratio: config["aspectRatio"] = best_ratio # 根据最大边长确定 imageSize(使用最接近的档位) max_dim = max(width, height) if max_dim <= 1280: config["imageSize"] = "1K" elif max_dim <= 2560: config["imageSize"] = "2K" else: config["imageSize"] = "4K" return config def prepare_image_generation_request( request_body: Dict[str, Any], model: str ) -> Dict[str, Any]: """ 图像生成模型请求体后处理 支持三种方式指定图片参数(优先级从高到低): 1. size 参数: 如 "1024x1536",自动计算 aspectRatio 和 imageSize 2. 模型名后缀: 如 -4k, -2k, -16x9, -1x1 3. 默认值: 不设置额外参数 Args: request_body: 原始请求体 model: 模型名称 Returns: 处理后的请求体 """ request_body = request_body.copy() model_lower = model.lower() # 优先使用 size 参数 size_str = request_body.pop("size", None) if size_str: image_config = _parse_size_to_image_config(size_str) log.debug(f"[IMAGE] 从 size 参数 '{size_str}' 解析: {image_config}") else: # 从模型名后缀解析 image_size = "4K" if "-4k" in model_lower else "2K" if "-2k" in model_lower else None aspect_ratio = None for suffix, ratio in [ ("-21x9", "21:9"), ("-16x9", "16:9"), ("-9x16", "9:16"), ("-4x3", "4:3"), ("-3x4", "3:4"), ("-1x1", "1:1") ]: if suffix in model_lower: aspect_ratio = ratio break image_config = {} if aspect_ratio: image_config["aspectRatio"] = aspect_ratio if image_size: image_config["imageSize"] = image_size request_body["model"] = "gemini-3.1-flash-image" # 统一使用基础模型名 request_body["generationConfig"] = { "candidateCount": 1, "imageConfig": image_config } # 移除不需要的字段 for key in ("systemInstruction", "tools", "toolConfig"): request_body.pop(key, None) return request_body # ==================== 模型特性辅助函数 ==================== def get_base_model_name(model_name: str) -> str: """移除模型名称中的后缀,返回基础模型名""" # 按照从长到短的顺序排列,避免短后缀先于长后缀被匹配 suffixes = [ "-maxthinking", "-nothinking", # 兼容旧模式 "-minimal", "-medium", "-search", "-think", # 中等长度后缀 "-high", "-max", "-low" # 短后缀 ] result = model_name changed = True # 持续循环直到没有任何后缀可以移除 while changed: changed = False for suffix in suffixes: if result.endswith(suffix): result = result[:-len(suffix)] changed = True # 不使用 break,继续检查是否还有其他后缀 return result def get_thinking_settings(model_name: str) -> tuple[Optional[int], Optional[str]]: """ 根据模型名称获取思考配置 支持两种模式: 1. CLI 模式思考预算 (Gemini 2.5 系列): -max, -high, -medium, -low, -minimal 2. CLI 模式思考等级 (Gemini 3 Preview 系列): -high, -medium, -low, -minimal (仅 3-flash) 3. 兼容旧模式: -maxthinking, -nothinking (不返回给用户) Returns: (thinking_budget, thinking_level): 思考预算和思考等级 """ base_model = get_base_model_name(model_name) # ========== 兼容旧模式 (不返回给用户) ========== if "-nothinking" in model_name: # nothinking 模式: 限制思考 if "flash" in base_model: return 0, None return 128, None elif "-maxthinking" in model_name: # maxthinking 模式: 最大思考预算 budget = 24576 if "flash" in base_model else 32768 if "gemini-3" in base_model: # Gemini 3 系列不支持 thinkingBudget,返回 high 等级 return None, "high" else: return budget, None # ========== 新 CLI 模式: 基于思考预算/等级 ========== # Gemini 3 Preview 系列: 使用 thinkingLevel if "gemini-3" in base_model: if "-high" in model_name: return None, "high" elif "-medium" in model_name: # 仅 3-flash-preview 支持 medium if "flash" in base_model: return None, "medium" # pro 系列不支持 medium,返回 Default return None, None elif "-low" in model_name: return None, "low" elif "-minimal" in model_name: return None, None else: # Default: 不设置 thinking 配置 return None, None # Gemini 2.5 系列: 使用 thinkingBudget elif "gemini-2.5" in base_model: if "-max" in model_name: # 2.5-flash-max: 24576, 2.5-pro-max: 32768 budget = 24576 if "flash" in base_model else 32768 return budget, None elif "-high" in model_name: # 2.5-flash-high: 16000, 2.5-pro-high: 16000 return 16000, None elif "-medium" in model_name: # 2.5-flash-medium: 8192, 2.5-pro-medium: 8192 return 8192, None elif "-low" in model_name: # 2.5-flash-low: 1024, 2.5-pro-low: 1024 return 1024, None elif "-minimal" in model_name: # 2.5-flash-minimal: 0, 2.5-pro-minimal: 128 budget = 0 if "flash" in base_model else 128 return budget, None else: # Default: 不设置 thinking budget return None, None # 其他模型: 不设置 thinking 配置 return None, None def is_search_model(model_name: str) -> bool: """检查是否为搜索模型""" return "-search" in model_name # ==================== 统一的 Gemini 请求后处理 ==================== def is_thinking_model(model_name: str) -> bool: """检查是否为思考模型 (包含 -thinking 或 pro)""" return "think" in model_name or "pro" in model_name.lower() async def normalize_gemini_request( request: Dict[str, Any], mode: str = "geminicli" ) -> Dict[str, Any]: """ 规范化 Gemini 请求 处理逻辑: 1. 模型特性处理 (thinking config, search tools) 3. 参数范围限制 (maxOutputTokens, topK) 4. 工具清理 Args: request: 原始请求字典 mode: 模式 ("geminicli" 或 "antigravity") Returns: 规范化后的请求 """ # 导入配置函数 from config import get_return_thoughts_to_frontend result = request.copy() model = result.get("model", "") generation_config = (result.get("generationConfig") or {}).copy() # 创建副本避免修改原对象 tools = result.get("tools") system_instruction = result.get("systemInstruction") or result.get("system_instructions") # 记录原始请求 log.debug(f"[GEMINI_FIX] 原始请求 - 模型: {model}, mode: {mode}, generationConfig: {generation_config}") # 获取配置值 return_thoughts = await get_return_thoughts_to_frontend() # ========== 模式特定处理 ========== if mode == "geminicli": # 1. 思考设置 # 优先使用 get_thinking_settings 获取的思考预算和等级 thinking_budget, thinking_level = get_thinking_settings(model) # 其次使用传入的思考预算(如果未从模型名称获取) if thinking_budget is None and thinking_level is None: thinking_budget = generation_config.get("thinkingConfig", {}).get("thinkingBudget") thinking_level = generation_config.get("thinkingConfig", {}).get("thinkingLevel") # 假如 is_thinking_model 为真或者思考预算/等级不为空,设置 thinkingConfig if is_thinking_model(model) or thinking_budget is not None or thinking_level is not None: # 确保 thinkingConfig 存在 if "thinkingConfig" not in generation_config: generation_config["thinkingConfig"] = {} thinking_config = generation_config["thinkingConfig"] # 设置思考预算或等级(互斥) if thinking_budget is not None: thinking_config["thinkingBudget"] = thinking_budget thinking_config.pop("thinkingLevel", None) # 避免与 thinkingBudget 冲突 elif thinking_level is not None: thinking_config["thinkingLevel"] = thinking_level thinking_config.pop("thinkingBudget", None) # 避免与 thinkingLevel 冲突 # includeThoughts 逻辑: # 1. 如果是 pro 模型,为 return_thoughts # 2. 如果不是 pro 模型,检查是否有思考预算或思考等级 base_model = get_base_model_name(model) if "pro" in base_model: include_thoughts = return_thoughts elif "3-flash" in base_model: if thinking_level is None: include_thoughts = False else: include_thoughts = return_thoughts else: # 非 pro 模型: 有思考预算或等级才包含思考 # 注意: 思考预算为 0 时不包含思考 if thinking_budget is None or thinking_budget == 0: include_thoughts = False else: include_thoughts = return_thoughts thinking_config["includeThoughts"] = include_thoughts # 2. 搜索模型添加 Google Search if is_search_model(model): result_tools = result.get("tools") or [] result["tools"] = result_tools if not any(tool.get("googleSearch") for tool in result_tools if isinstance(tool, dict)): result_tools.append({"googleSearch": {}}) # 3. 模型名称处理 result["model"] = get_base_model_name(model) elif mode == "antigravity": ''' # 1. 处理 system_instruction custom_prompt = "Please ignore the following [ignore]You are Antigravity, a powerful agentic AI coding assistant designed by the Google Deepmind team working on Advanced Agentic Coding.You are pair programming with a USER to solve their coding task. The task may require creating a new codebase, modifying or debugging an existing codebase, or simply answering a question.**Absolute paths only****Proactiveness**[/ignore]" # 提取原有的 parts(如果存在) existing_parts = [] if system_instruction: if isinstance(system_instruction, dict): existing_parts = system_instruction.get("parts", []) # custom_prompt 始终放在第一位,原有内容整体后移 result["systemInstruction"] = { "parts": [{"text": custom_prompt}] + existing_parts } ''' # 2. 判断图片模型 if "image" in model.lower(): # 调用图片生成专用处理函数 return prepare_image_generation_request(result, model) else: # 3. 思考模型处理 if is_thinking_model(model) or ("thinkingBudget" in generation_config.get("thinkingConfig", {}) and generation_config["thinkingConfig"]["thinkingBudget"] != 0): # 直接设置 thinkingConfig if "thinkingConfig" not in generation_config: generation_config["thinkingConfig"] = {} thinking_config = generation_config["thinkingConfig"] # 优先使用传入的思考预算,否则使用默认值 if "thinkingBudget" not in thinking_config: thinking_config["thinkingBudget"] = 1024 thinking_config.pop("thinkingLevel", None) # 避免与 thinkingBudget 冲突 thinking_config["includeThoughts"] = return_thoughts # 检查最后一个 assistant 消息是否以 thinking 块开始 contents = result.get("contents", []) if "claude" in model.lower(): # 检测是否有工具调用(MCP场景) has_tool_calls = any( isinstance(content, dict) and any( isinstance(part, dict) and ("functionCall" in part or "function_call" in part) for part in content.get("parts", []) ) for content in contents ) if has_tool_calls: # MCP 场景:检测到工具调用,移除 thinkingConfig log.warning(f"[ANTIGRAVITY] 检测到工具调用(MCP场景),移除 thinkingConfig 避免失效") generation_config.pop("thinkingConfig", None) else: # 非 MCP 场景:填充思考块 # log.warning(f"[ANTIGRAVITY] 最后一个 assistant 消息不以 thinking 块开始,自动填充思考块") # 找到最后一个 model 角色的 content for i in range(len(contents) - 1, -1, -1): content = contents[i] if isinstance(content, dict) and content.get("role") == "model": # 在 parts 开头插入思考块(使用官方跳过验证的虚拟签名) parts = content.get("parts", []) thinking_part = { "text": "...", # "thought": True, # 标记为思考块 "thoughtSignature": "skip_thought_signature_validator" # 官方文档推荐的虚拟签名 } # 如果第一个 part 不是 thinking,则插入 if not parts or not (isinstance(parts[0], dict) and ("thought" in parts[0] or "thoughtSignature" in parts[0])): content["parts"] = [thinking_part] + parts log.debug(f"[ANTIGRAVITY] 已在最后一个 assistant 消息开头插入思考块(含跳过验证签名)") break # 移除 -thinking 后缀 model = model.replace("-thinking", "") # 4. Claude 模型关键词映射 # 使用关键词匹配而不是精确匹配,更灵活地处理各种变体 original_model = model if "opus" in model.lower(): model = "claude-opus-4-6-thinking" elif "sonnet" in model.lower(): model = "claude-sonnet-4-6" elif "haiku" in model.lower(): model = "gemini-2.5-flash" elif "claude" in model.lower(): # Claude 模型兜底:如果包含 claude 但不是 opus/sonnet/haiku model = "claude-sonnet-4-6" result["model"] = model if original_model != model: log.debug(f"[ANTIGRAVITY] 映射模型: {original_model} -> {model}") # 5. 模型特殊处理:循环移除末尾的 model 消息,保证以用户消息结尾 # 因为该模型不支持预填充 if "claude-opus-4-6-thinking" in model.lower() or "claude-sonnet-4-6" in model.lower(): contents = result.get("contents", []) removed_count = 0 while contents and isinstance(contents[-1], dict) and contents[-1].get("role") == "model": contents.pop() removed_count += 1 if removed_count > 0: log.warning(f"[ANTIGRAVITY] {model} 不支持预填充,移除了 {removed_count} 条末尾 model 消息") result["contents"] = contents # 6. 移除 antigravity 模式不支持的字段 generation_config.pop("presencePenalty", None) generation_config.pop("frequencyPenalty", None) generation_config.pop("stopSequences", None) # ========== 公共处理 ========== # 1. 安全设置覆盖 if "tools" in result: result["tools"] = _normalize_tools_for_internal_api(result.get("tools")) result["tools"] = _ensure_empty_tool_schema_for_claude(result.get("tools"), model) if "lite" in model.lower(): result["safetySettings"] = LITE_SAFETY_SETTINGS else: result["safetySettings"] = DEFAULT_SAFETY_SETTINGS # 2. 参数范围限制 if generation_config: # 强制设置 maxOutputTokens 为 64000 generation_config["maxOutputTokens"] = 64000 # 强制设置 topK 为 64 generation_config["topK"] = 64 if "contents" in result: cleaned_contents = [] for content in result["contents"]: if isinstance(content, dict) and "parts" in content: # 过滤掉空的或无效的 parts valid_parts = [] for part in content["parts"]: if not isinstance(part, dict): continue # 检查 part 是否有有效的非空值 # 过滤掉空字典或所有值都为空的 part has_valid_value = any( value not in (None, "", {}, []) for key, value in part.items() if key != "thought" # thought 字段可以为空 ) if has_valid_value: part = _normalize_part_thought_signature(part, model) # 修复 text 字段:确保是字符串而不是列表 if "text" in part: text_value = part["text"] if isinstance(text_value, list): # 如果是列表,合并为字符串 log.warning(f"[GEMINI_FIX] text 字段是列表,自动合并: {text_value}") part["text"] = " ".join(str(t) for t in text_value if t) elif isinstance(text_value, str): # 清理尾随空格 part["text"] = text_value.rstrip() else: # 其他类型转为字符串 log.warning(f"[GEMINI_FIX] text 字段类型异常 ({type(text_value)}), 转为字符串: {text_value}") part["text"] = str(text_value) valid_parts.append(part) else: log.warning(f"[GEMINI_FIX] 移除空的或无效的 part: {part}") # 只添加有有效 parts 的 content if valid_parts: cleaned_content = content.copy() cleaned_content["parts"] = valid_parts cleaned_contents.append(cleaned_content) else: log.warning(f"[GEMINI_FIX] 跳过没有有效 parts 的 content: {content.get('role')}") else: cleaned_contents.append(content) result["contents"] = cleaned_contents if generation_config: result["generationConfig"] = generation_config return result