|
|
""" |
|
|
Gemini Format Utilities - 统一的 Gemini 格式处理和转换工具 |
|
|
提供对 Gemini API 请求体和响应的标准化处理 |
|
|
──────────────────────────────────────────────────────────────── |
|
|
""" |
|
|
|
|
|
from typing import Any, Dict, List, Optional |
|
|
|
|
|
from log import log |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# JSON Schema / OpenAPI keywords that the Gemini API's function-declaration
# schema does not accept. clean_tools_for_gemini() strips these recursively
# from every tool's "parameters" schema before the request is forwarded.
UNSUPPORTED_SCHEMA_KEYS = {
    '$schema', '$id', '$ref', '$defs', 'definitions',
    'example', 'examples', 'readOnly', 'writeOnly', 'default',
    'exclusiveMaximum', 'exclusiveMinimum',
    'oneOf', 'anyOf', 'allOf', 'const',
    'additionalItems', 'contains', 'patternProperties', 'dependencies',
    'propertyNames', 'if', 'then', 'else',
    'contentEncoding', 'contentMediaType',
    'additionalProperties', 'minLength', 'maxLength',
    'minItems', 'maxItems', 'uniqueItems'
}
|
|
|
|
|
|
|
|
|
|
|
def clean_tools_for_gemini(tools: Optional[List[Dict[str, Any]]]) -> Optional[List[Dict[str, Any]]]:
    """
    Strip JSON Schema fields that the Gemini API rejects from tool definitions.

    Gemini only accepts a limited subset of OpenAPI 3.0 Schema properties
    (type, description, enum, items, properties, required, nullable, format);
    everything listed in UNSUPPORTED_SCHEMA_KEYS is removed recursively from
    each function declaration's "parameters" schema. Input structures are
    never mutated — copies are returned.

    Args:
        tools: List of tool definitions (may be None or empty).

    Returns:
        A new list with sanitized tool definitions, or the input unchanged
        when it is falsy.
    """
    if not tools:
        return tools

    def _scrub(node: Any) -> Any:
        """Recursively drop unsupported keys from a schema fragment."""
        if isinstance(node, dict):
            sanitized = {
                key: _scrub(val)
                for key, val in node.items()
                if key not in UNSUPPORTED_SCHEMA_KEYS
            }
            # An object schema with properties must still declare its type.
            if "properties" in sanitized and "type" not in sanitized:
                sanitized["type"] = "object"
            return sanitized
        if isinstance(node, list):
            return [_scrub(entry) for entry in node]
        return node

    sanitized_tools: List[Dict[str, Any]] = []
    for entry in tools:
        # Non-dict entries are passed through untouched.
        if not isinstance(entry, dict):
            sanitized_tools.append(entry)
            continue

        new_entry = entry.copy()
        if "functionDeclarations" in new_entry:
            new_decls = []
            for decl in new_entry["functionDeclarations"]:
                if not isinstance(decl, dict):
                    new_decls.append(decl)
                    continue
                new_decl = decl.copy()
                if "parameters" in new_decl:
                    new_decl["parameters"] = _scrub(new_decl["parameters"])
                new_decls.append(new_decl)
            new_entry["functionDeclarations"] = new_decls

        sanitized_tools.append(new_entry)

    return sanitized_tools
|
|
|
|
|
def prepare_image_generation_request(
    request_body: Dict[str, Any],
    model: str
) -> Dict[str, Any]:
    """
    Post-process a request body for image-generation models.

    Resolution ("-4k"/"-2k") and aspect-ratio ("-16x9", ...) suffixes in the
    model name are translated into an imageConfig; the model is pinned to
    "gemini-3-pro-image" and fields unsupported by image generation
    (systemInstruction, tools, toolConfig) are dropped.

    Args:
        request_body: Original request body (not mutated; a copy is edited).
        model: Requested model name, possibly carrying size/ratio suffixes.

    Returns:
        The processed request body.
    """
    body = request_body.copy()
    lowered = model.lower()

    # Resolution suffix -> imageSize value (None when no suffix present).
    if "-4k" in lowered:
        size = "4K"
    elif "-2k" in lowered:
        size = "2K"
    else:
        size = None

    # First matching aspect-ratio suffix wins (same precedence order as the
    # suffix list: widest ratios checked first).
    ratio_for_suffix = {
        "-21x9": "21:9", "-16x9": "16:9", "-9x16": "9:16",
        "-4x3": "4:3", "-3x4": "3:4", "-1x1": "1:1",
    }
    ratio = next(
        (r for suffix, r in ratio_for_suffix.items() if suffix in lowered),
        None,
    )

    image_config: Dict[str, Any] = {}
    if ratio:
        image_config["aspectRatio"] = ratio
    if size:
        image_config["imageSize"] = size

    body["model"] = "gemini-3-pro-image"
    body["generationConfig"] = {
        "candidateCount": 1,
        "imageConfig": image_config
    }

    # These fields are not supported by the image-generation endpoint.
    for unsupported in ("systemInstruction", "tools", "toolConfig"):
        body.pop(unsupported, None)

    return body
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_base_model_name(model_name: str) -> str:
    """
    Return the base model name with all known feature suffixes removed.

    Repeatedly strips "-maxthinking", "-nothinking", "-search" and "-think"
    from the end of the name until no further suffix matches, so stacked
    suffixes (e.g. "model-search-nothinking") are fully removed.
    """
    base = model_name
    while True:
        trimmed = base
        for suffix in ("-maxthinking", "-nothinking", "-search", "-think"):
            trimmed = trimmed.removesuffix(suffix)
        if trimmed == base:
            # Fixpoint reached: nothing left to strip.
            return base
        base = trimmed
|
|
|
|
|
|
|
|
def get_thinking_settings(model_name: str) -> tuple[Optional[int], bool]:
    """
    Resolve the thinking configuration implied by a model-name suffix.

    Returns:
        (thinking_budget, include_thoughts): budget in tokens (None means
        "let upstream decide") and whether thought content is returned.
    """
    base_model = get_base_model_name(model_name)

    if "-nothinking" in model_name:
        # Minimal budget; Pro models still surface thoughts even then.
        return 128, "pro" in base_model
    if "-maxthinking" in model_name:
        # Flash models cap at a lower maximum budget than Pro.
        return (24576 if "flash" in base_model else 32768), True
    # No suffix: default budget, thoughts included.
    return None, True
|
|
|
|
|
|
|
|
def is_search_model(model_name: str) -> bool:
    """Return True when the model name requests Google Search grounding."""
    return model_name.find("-search") != -1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_thinking_model(model_name: str) -> bool:
    """Return True for thinking-capable models ("-thinking" suffix or any "pro" model)."""
    if "-thinking" in model_name:
        return True
    return "pro" in model_name.lower()
|
|
|
|
|
|
|
|
def check_last_assistant_has_thinking(contents: List[Dict[str, Any]]) -> bool:
    """
    Check whether the last assistant (role "model") message starts with a
    thinking block.

    Per the Claude API contract: when thinking is enabled, the final
    assistant message must begin with a thinking part. When there is no
    assistant message at all, thinking may be enabled freely.

    Args:
        contents: Gemini-format contents array.

    Returns:
        True when the last "model" message's first part has thought=True,
        or when no "model" message exists; False otherwise.
    """
    if not contents:
        return True

    # Scan backwards for the most recent assistant turn.
    last_model_turn = next(
        (c for c in reversed(contents) if isinstance(c, dict) and c.get("role") == "model"),
        None,
    )
    if last_model_turn is None:
        # No assistant message yet — nothing to violate.
        return True

    parts = last_model_turn.get("parts", [])
    if not parts:
        return False

    head = parts[0]
    if not isinstance(head, dict):
        return False
    # Require the literal boolean True, not merely a truthy value.
    return head.get("thought") is True
|
|
|
|
|
|
|
|
async def normalize_gemini_request(
    request: Dict[str, Any],
    mode: str = "geminicli"
) -> Dict[str, Any]:
    """
    Normalize a Gemini request before it is forwarded upstream.

    Processing steps:
    1. Model-feature handling (thinking config, search tools, model mapping)
    2. Field-name normalization (system_instructions -> systemInstruction)
    3. Parameter clamping (maxOutputTokens, topK)
    4. Tool cleaning and content-part sanitization

    Args:
        request: Raw request dict (not mutated; a shallow copy is edited).
        mode: "geminicli" or "antigravity".

    Returns:
        The normalized request dict. Image models in antigravity mode return
        early through prepare_image_generation_request().
    """
    # Local import — presumably avoids a circular import with config (TODO confirm).
    from config import get_return_thoughts_to_frontend

    result = request.copy()
    model = result.get("model", "")
    # Copy so edits below never mutate the caller's generationConfig dict.
    generation_config = (result.get("generationConfig") or {}).copy()
    tools = result.get("tools")
    system_instruction = result.get("systemInstruction") or result.get("system_instructions")

    log.debug(f"[GEMINI_FIX] 原始请求 - 模型: {model}, mode: {mode}, generationConfig: {generation_config}")

    return_thoughts = await get_return_thoughts_to_frontend()

    if mode == "geminicli":
        # Derive a thinking budget from the model-name suffix, unless the
        # caller already supplied an explicit thinkingConfig.
        thinking_budget, include_thoughts = get_thinking_settings(model)
        if thinking_budget is not None and "thinkingConfig" not in generation_config:
            final_include_thoughts = include_thoughts if return_thoughts else False
            generation_config["thinkingConfig"] = {
                "thinkingBudget": thinking_budget,
                "includeThoughts": final_include_thoughts
            }

        if tools:
            result["tools"] = clean_tools_for_gemini(tools)

        # "-search" models get a googleSearch tool appended exactly once.
        if is_search_model(model):
            result_tools = result.get("tools") or []
            result["tools"] = result_tools
            if not any(tool.get("googleSearch") for tool in result_tools if isinstance(tool, dict)):
                result_tools.append({"googleSearch": {}})

        # Upstream expects the base model name without feature suffixes.
        result["model"] = get_base_model_name(model)

    elif mode == "antigravity":
        custom_prompt = "Please ignore the following [ignore]You are Antigravity, a powerful agentic AI coding assistant designed by the Google Deepmind team working on Advanced Agentic Coding.You are pair programming with a USER to solve their coding task. The task may require creating a new codebase, modifying or debugging an existing codebase, or simply answering a question.**Absolute paths only****Proactiveness**[/ignore]"

        # Prepend the Antigravity prompt in front of any caller-provided parts.
        existing_parts = []
        if system_instruction:
            if isinstance(system_instruction, dict):
                existing_parts = system_instruction.get("parts", [])

        result["systemInstruction"] = {
            "parts": [{"text": custom_prompt}] + existing_parts
        }

        if "image" in model.lower():
            # Image models use a dedicated request shape; none of the
            # remaining normalization applies.
            return prepare_image_generation_request(result, model)
        else:
            if is_thinking_model(model):
                # Claude requires the last assistant message to START with a
                # thinking block before thinking may be enabled.
                contents = result.get("contents", [])
                can_enable_thinking = check_last_assistant_has_thinking(contents)

                if can_enable_thinking:
                    if "thinkingConfig" not in generation_config:
                        generation_config["thinkingConfig"] = {}

                    thinking_config = generation_config["thinkingConfig"]

                    # Fill in defaults without overriding caller values.
                    if "thinkingBudget" not in thinking_config:
                        thinking_config["thinkingBudget"] = 1024
                    if "includeThoughts" not in thinking_config:
                        thinking_config["includeThoughts"] = return_thoughts
                else:
                    log.warning(f"[ANTIGRAVITY] 最后一个 assistant 消息不以 thinking 块开始,禁用 thinkingConfig")
                    generation_config.pop("thinkingConfig", None)

                model = model.replace("-thinking", "")

            # Map Claude aliases onto the concrete upstream model names.
            original_model = model
            if "opus" in model.lower():
                model = "claude-opus-4-5-thinking"
            elif "sonnet" in model.lower() or "haiku" in model.lower():
                model = "claude-sonnet-4-5-thinking"
            elif "claude" in model.lower():
                # Unknown Claude variant: fall back to sonnet.
                model = "claude-sonnet-4-5-thinking"

            result["model"] = model
            if original_model != model:
                log.debug(f"[ANTIGRAVITY] 映射模型: {original_model} -> {model}")

    # Normalize the legacy snake_case field name.
    # FIX: this used to overwrite systemInstruction unconditionally, which
    # clobbered the Antigravity prompt assembled above whenever the request
    # used "system_instructions". Only fall back to the legacy field when no
    # camelCase value is present.
    if "system_instructions" in result:
        legacy_system = result.pop("system_instructions")
        if "systemInstruction" not in result:
            result["systemInstruction"] = legacy_system

    if generation_config:
        # NOTE(review): these are forced to fixed values rather than clamped
        # with min()/max() — presumably hard upstream limits; confirm before
        # changing.
        if generation_config.get("maxOutputTokens") is not None:
            generation_config["maxOutputTokens"] = 64000
        if generation_config.get("topK") is not None:
            generation_config["topK"] = 64

    # FIX: this step used to re-clean the ORIGINAL `tools` value and assign it
    # over result["tools"], discarding the googleSearch tool appended for
    # "-search" models above. Clean whatever is currently on the result
    # instead — idempotent for tools that were already cleaned.
    current_tools = result.get("tools")
    if current_tools:
        result["tools"] = clean_tools_for_gemini(current_tools)

    if "contents" in result:
        # Only these keys are legal on a Gemini content part.
        ALLOWED_PART_KEYS = {
            "text", "inlineData", "fileData", "functionCall", "functionResponse",
            "thought", "thoughtSignature"
        }

        cleaned_contents = []
        for content in result["contents"]:
            if isinstance(content, dict) and "parts" in content:
                valid_parts = []
                for part in content["parts"]:
                    if not isinstance(part, dict):
                        continue

                    cleaned_part = {k: v for k, v in part.items() if k in ALLOWED_PART_KEYS}

                    # A part must carry at least one non-empty payload field;
                    # thought/thoughtSignature alone do not qualify.
                    has_valid_data = any(
                        key in cleaned_part and cleaned_part[key]
                        for key in ["text", "inlineData", "fileData", "functionCall", "functionResponse"]
                    )
                    if has_valid_data:
                        valid_parts.append(cleaned_part)
                    else:
                        log.warning(f"[GEMINI_FIX] 移除空的或无效的 part: {part}")

                if valid_parts:
                    cleaned_content = content.copy()
                    cleaned_content["parts"] = valid_parts
                    cleaned_contents.append(cleaned_content)
                else:
                    log.warning(f"[GEMINI_FIX] 跳过没有有效 parts 的 content: {content.get('role')}")
            else:
                # Contents without a parts list pass through unchanged.
                cleaned_contents.append(content)

        result["contents"] = cleaned_contents

    if generation_config:
        result["generationConfig"] = generation_config

    return result