""" Gemini Format Utilities - 统一的 Gemini 格式处理和转换工具 提供对 Gemini API 请求体和响应的标准化处理 ──────────────────────────────────────────────────────────────── """ from math import e from typing import Any, Dict, Optional from log import log # ==================== Gemini API 配置 ==================== # ====================== Model Configuration ====================== # Default Safety Settings for Google API DEFAULT_SAFETY_SETTINGS = [ {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_IMAGE_HATE", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_IMAGE_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_IMAGE_HARASSMENT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_IMAGE_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_JAILBREAK", "threshold": "BLOCK_NONE"}, ] LITE_SAFETY_SETTINGS = [ {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"}, {"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"}, ] def prepare_image_generation_request( request_body: Dict[str, Any], model: str ) -> Dict[str, Any]: """ 图像生成模型请求体后处理 Args: request_body: 原始请求体 model: 模型名称 Returns: 处理后的请求体 """ request_body = request_body.copy() model_lower = model.lower() # 解析分辨率 image_size = "4K" if "-4k" in model_lower else "2K" if "-2k" in model_lower else None # 解析比例 aspect_ratio = None for suffix, ratio in [ ("-21x9", "21:9"), ("-16x9", "16:9"), ("-9x16", "9:16"), ("-4x3", "4:3"), ("-3x4", "3:4"), ("-1x1", "1:1") ]: if suffix in model_lower: aspect_ratio = ratio break # 构建 imageConfig image_config = {} if aspect_ratio: image_config["aspectRatio"] = aspect_ratio if image_size: image_config["imageSize"] = image_size request_body["model"] = "gemini-3.1-flash-image" # 统一使用基础模型名 request_body["generationConfig"] = { "candidateCount": 1, "imageConfig": image_config } # 移除不需要的字段 for key in ("systemInstruction", "tools", "toolConfig"): request_body.pop(key, None) return request_body # ==================== 模型特性辅助函数 ==================== def get_base_model_name(model_name: str) -> str: """移除模型名称中的后缀,返回基础模型名""" # 按照从长到短的顺序排列,避免短后缀先于长后缀被匹配 suffixes = [ "-maxthinking", "-nothinking", # 兼容旧模式 "-minimal", "-medium", "-search", "-think", # 中等长度后缀 "-high", "-max", "-low" # 短后缀 ] result = model_name changed = True # 持续循环直到没有任何后缀可以移除 while changed: changed = False for suffix in suffixes: if result.endswith(suffix): result = result[:-len(suffix)] changed = True # 不使用 break,继续检查是否还有其他后缀 return result def get_thinking_settings(model_name: str) -> tuple[Optional[int], Optional[str]]: """ 根据模型名称获取思考配置 支持两种模式: 1. CLI 模式思考预算 (Gemini 2.5 系列): -max, -high, -medium, -low, -minimal 2. CLI 模式思考等级 (Gemini 3 Preview 系列): -high, -medium, -low, -minimal (仅 3-flash) 3. 兼容旧模式: -maxthinking, -nothinking (不返回给用户) Returns: (thinking_budget, thinking_level): 思考预算和思考等级 """ base_model = get_base_model_name(model_name) # ========== 兼容旧模式 (不返回给用户) ========== if "-nothinking" in model_name: # nothinking 模式: 限制思考 if "flash" in base_model: return 0, None return 128, None elif "-maxthinking" in model_name: # maxthinking 模式: 最大思考预算 budget = 24576 if "flash" in base_model else 32768 if "gemini-3" in base_model: # Gemini 3 系列不支持 thinkingBudget,返回 high 等级 return None, "high" else: return budget, None # ========== 新 CLI 模式: 基于思考预算/等级 ========== # Gemini 3 Preview 系列: 使用 thinkingLevel if "gemini-3" in base_model: if "-high" in model_name: return None, "high" elif "-medium" in model_name: # 仅 3-flash-preview 支持 medium if "flash" in base_model: return None, "medium" # pro 系列不支持 medium,返回 Default return None, None elif "-low" in model_name: return None, "low" elif "-minimal" in model_name: return None, None else: # Default: 不设置 thinking 配置 return None, None # Gemini 2.5 系列: 使用 thinkingBudget elif "gemini-2.5" in base_model: if "-max" in model_name: # 2.5-flash-max: 24576, 2.5-pro-max: 32768 budget = 24576 if "flash" in base_model else 32768 return budget, None elif "-high" in model_name: # 2.5-flash-high: 16000, 2.5-pro-high: 16000 return 16000, None elif "-medium" in model_name: # 2.5-flash-medium: 8192, 2.5-pro-medium: 8192 return 8192, None elif "-low" in model_name: # 2.5-flash-low: 1024, 2.5-pro-low: 1024 return 1024, None elif "-minimal" in model_name: # 2.5-flash-minimal: 0, 2.5-pro-minimal: 128 budget = 0 if "flash" in base_model else 128 return budget, None else: # Default: 不设置 thinking budget return None, None # 其他模型: 不设置 thinking 配置 return None, None def is_search_model(model_name: str) -> bool: """检查是否为搜索模型""" return "-search" in model_name # ==================== 统一的 Gemini 请求后处理 ==================== def is_thinking_model(model_name: str) -> bool: """检查是否为思考模型 (包含 -thinking 或 pro)""" return "think" in model_name or "pro" in model_name.lower() async def normalize_gemini_request( request: Dict[str, Any], mode: str = "geminicli" ) -> Dict[str, Any]: """ 规范化 Gemini 请求 处理逻辑: 1. 模型特性处理 (thinking config, search tools) 3. 参数范围限制 (maxOutputTokens, topK) 4. 工具清理 Args: request: 原始请求字典 mode: 模式 ("geminicli" 或 "antigravity") Returns: 规范化后的请求 """ # 导入配置函数 from config import get_return_thoughts_to_frontend result = request.copy() model = result.get("model", "") generation_config = (result.get("generationConfig") or {}).copy() # 创建副本避免修改原对象 tools = result.get("tools") system_instruction = result.get("systemInstruction") or result.get("system_instructions") # 记录原始请求 log.debug(f"[GEMINI_FIX] 原始请求 - 模型: {model}, mode: {mode}, generationConfig: {generation_config}") # 获取配置值 return_thoughts = await get_return_thoughts_to_frontend() # ========== 模式特定处理 ========== if mode == "geminicli": # 1. 思考设置 # 优先使用 get_thinking_settings 获取的思考预算和等级 thinking_budget, thinking_level = get_thinking_settings(model) # 其次使用传入的思考预算(如果未从模型名称获取) if thinking_budget is None and thinking_level is None: thinking_budget = generation_config.get("thinkingConfig", {}).get("thinkingBudget") thinking_level = generation_config.get("thinkingConfig", {}).get("thinkingLevel") # 假如 is_thinking_model 为真或者思考预算/等级不为空,设置 thinkingConfig if is_thinking_model(model) or thinking_budget is not None or thinking_level is not None: # 确保 thinkingConfig 存在 if "thinkingConfig" not in generation_config: generation_config["thinkingConfig"] = {} thinking_config = generation_config["thinkingConfig"] # 设置思考预算或等级(互斥) if thinking_budget is not None: thinking_config["thinkingBudget"] = thinking_budget thinking_config.pop("thinkingLevel", None) # 避免与 thinkingBudget 冲突 elif thinking_level is not None: thinking_config["thinkingLevel"] = thinking_level thinking_config.pop("thinkingBudget", None) # 避免与 thinkingLevel 冲突 # includeThoughts 逻辑: # 1. 如果是 pro 模型,为 return_thoughts # 2. 如果不是 pro 模型,检查是否有思考预算或思考等级 base_model = get_base_model_name(model) if "pro" in base_model: include_thoughts = return_thoughts elif "3-flash" in base_model: if thinking_level is None: include_thoughts = False else: include_thoughts = return_thoughts else: # 非 pro 模型: 有思考预算或等级才包含思考 # 注意: 思考预算为 0 时不包含思考 if thinking_budget is None or thinking_budget == 0: include_thoughts = False else: include_thoughts = return_thoughts thinking_config["includeThoughts"] = include_thoughts # 2. 搜索模型添加 Google Search if is_search_model(model): result_tools = result.get("tools") or [] result["tools"] = result_tools if not any(tool.get("googleSearch") for tool in result_tools if isinstance(tool, dict)): result_tools.append({"googleSearch": {}}) # 3. 模型名称处理 result["model"] = get_base_model_name(model) elif mode == "antigravity": ''' # 1. 处理 system_instruction custom_prompt = "Please ignore the following [ignore]You are Antigravity, a powerful agentic AI coding assistant designed by the Google Deepmind team working on Advanced Agentic Coding.You are pair programming with a USER to solve their coding task. The task may require creating a new codebase, modifying or debugging an existing codebase, or simply answering a question.**Absolute paths only****Proactiveness**[/ignore]" # 提取原有的 parts(如果存在) existing_parts = [] if system_instruction: if isinstance(system_instruction, dict): existing_parts = system_instruction.get("parts", []) # custom_prompt 始终放在第一位,原有内容整体后移 result["systemInstruction"] = { "parts": [{"text": custom_prompt}] + existing_parts } ''' # 2. 判断图片模型 if "image" in model.lower(): # 调用图片生成专用处理函数 return prepare_image_generation_request(result, model) else: # 3. 思考模型处理 if is_thinking_model(model) or ("thinkingBudget" in generation_config.get("thinkingConfig", {}) and generation_config["thinkingConfig"]["thinkingBudget"] != 0): # 直接设置 thinkingConfig if "thinkingConfig" not in generation_config: generation_config["thinkingConfig"] = {} thinking_config = generation_config["thinkingConfig"] # 优先使用传入的思考预算,否则使用默认值 if "thinkingBudget" not in thinking_config: thinking_config["thinkingBudget"] = 1024 thinking_config.pop("thinkingLevel", None) # 避免与 thinkingBudget 冲突 thinking_config["includeThoughts"] = return_thoughts # 检查最后一个 assistant 消息是否以 thinking 块开始 contents = result.get("contents", []) if "claude" in model.lower(): # 检测是否有工具调用(MCP场景) has_tool_calls = any( isinstance(content, dict) and any( isinstance(part, dict) and ("functionCall" in part or "function_call" in part) for part in content.get("parts", []) ) for content in contents ) if has_tool_calls: # MCP 场景:检测到工具调用,移除 thinkingConfig log.warning(f"[ANTIGRAVITY] 检测到工具调用(MCP场景),移除 thinkingConfig 避免失效") generation_config.pop("thinkingConfig", None) else: # 非 MCP 场景:填充思考块 # log.warning(f"[ANTIGRAVITY] 最后一个 assistant 消息不以 thinking 块开始,自动填充思考块") # 找到最后一个 model 角色的 content for i in range(len(contents) - 1, -1, -1): content = contents[i] if isinstance(content, dict) and content.get("role") == "model": # 在 parts 开头插入思考块(使用官方跳过验证的虚拟签名) parts = content.get("parts", []) thinking_part = { "text": "...", # "thought": True, # 标记为思考块 "thoughtSignature": "skip_thought_signature_validator" # 官方文档推荐的虚拟签名 } # 如果第一个 part 不是 thinking,则插入 if not parts or not (isinstance(parts[0], dict) and ("thought" in parts[0] or "thoughtSignature" in parts[0])): content["parts"] = [thinking_part] + parts log.debug(f"[ANTIGRAVITY] 已在最后一个 assistant 消息开头插入思考块(含跳过验证签名)") break # 移除 -thinking 后缀 model = model.replace("-thinking", "") # 4. Claude 模型关键词映射 # 使用关键词匹配而不是精确匹配,更灵活地处理各种变体 original_model = model if "opus" in model.lower(): model = "claude-opus-4-6-thinking" elif "sonnet" in model.lower(): model = "claude-sonnet-4-6" elif "haiku" in model.lower(): model = "gemini-2.5-flash" elif "claude" in model.lower(): # Claude 模型兜底:如果包含 claude 但不是 opus/sonnet/haiku model = "claude-sonnet-4-6" result["model"] = model if original_model != model: log.debug(f"[ANTIGRAVITY] 映射模型: {original_model} -> {model}") # 5. 模型特殊处理:循环移除末尾的 model 消息,保证以用户消息结尾 # 因为该模型不支持预填充 if "claude-opus-4-6-thinking" in model.lower() or "claude-sonnet-4-6" in model.lower(): contents = result.get("contents", []) removed_count = 0 while contents and isinstance(contents[-1], dict) and contents[-1].get("role") == "model": contents.pop() removed_count += 1 if removed_count > 0: log.warning(f"[ANTIGRAVITY] {model} 不支持预填充,移除了 {removed_count} 条末尾 model 消息") result["contents"] = contents # 6. 移除 antigravity 模式不支持的字段 generation_config.pop("presencePenalty", None) generation_config.pop("frequencyPenalty", None) generation_config.pop("stopSequences", None) # ========== 公共处理 ========== # 1. 安全设置覆盖 if "lite" in model.lower(): result["safetySettings"] = LITE_SAFETY_SETTINGS else: result["safetySettings"] = DEFAULT_SAFETY_SETTINGS # 2. 参数范围限制 if generation_config: # 强制设置 maxOutputTokens 为 64000 generation_config["maxOutputTokens"] = 64000 # 强制设置 topK 为 64 generation_config["topK"] = 64 if "contents" in result: cleaned_contents = [] for content in result["contents"]: if isinstance(content, dict) and "parts" in content: # 过滤掉空的或无效的 parts valid_parts = [] for part in content["parts"]: if not isinstance(part, dict): continue # 检查 part 是否有有效的非空值 # 过滤掉空字典或所有值都为空的 part has_valid_value = any( value not in (None, "", {}, []) for key, value in part.items() if key != "thought" # thought 字段可以为空 ) if has_valid_value: part = part.copy() # 修复 text 字段:确保是字符串而不是列表 if "text" in part: text_value = part["text"] if isinstance(text_value, list): # 如果是列表,合并为字符串 log.warning(f"[GEMINI_FIX] text 字段是列表,自动合并: {text_value}") part["text"] = " ".join(str(t) for t in text_value if t) elif isinstance(text_value, str): # 清理尾随空格 part["text"] = text_value.rstrip() else: # 其他类型转为字符串 log.warning(f"[GEMINI_FIX] text 字段类型异常 ({type(text_value)}), 转为字符串: {text_value}") part["text"] = str(text_value) valid_parts.append(part) else: log.warning(f"[GEMINI_FIX] 移除空的或无效的 part: {part}") # 只添加有有效 parts 的 content if valid_parts: cleaned_content = content.copy() cleaned_content["parts"] = valid_parts cleaned_contents.append(cleaned_content) else: log.warning(f"[GEMINI_FIX] 跳过没有有效 parts 的 content: {content.get('role')}") else: cleaned_contents.append(content) result["contents"] = cleaned_contents if generation_config: result["generationConfig"] = generation_config return result