File size: 35,204 Bytes
96d34f2 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784 785 786 787 788 789 790 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 808 809 810 811 812 813 814 815 816 817 818 819 820 821 822 823 824 825 826 827 828 829 830 831 832 833 834 835 836 837 838 839 840 841 842 843 844 845 846 847 848 849 850 851 852 853 854 855 856 857 858 859 860 | """
Gemini Format Utilities - 统一的 Gemini 格式处理和转换工具
提供对 Gemini API 请求体和响应的标准化处理
────────────────────────────────────────────────────────────────
"""
import json
from math import e
from typing import Any, Dict, Optional
from log import log
from src.converter.thoughtSignature_fix import SKIP_THOUGHT_SIGNATURE_VALIDATOR
# ==================== Gemini API 配置 ====================
# ====================== Model Configuration ======================
# Default Safety Settings for Google API
DEFAULT_SAFETY_SETTINGS = [
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"},
]
LITE_SAFETY_SETTINGS = [
{"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_HARASSMENT", "threshold": "BLOCK_NONE"},
{"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"},
]
def _append_schema_hint(schema: Dict[str, Any], hint: str) -> None:
"""Move fragile validation details into description instead of sending them raw."""
if not hint:
return
desc = schema.get("description")
schema["description"] = f"{desc} ({hint})" if desc else hint
def _resolve_schema_ref(ref: str, root_schema: Dict[str, Any]) -> Optional[Dict[str, Any]]:
if not isinstance(ref, str) or not ref.startswith("#/"):
return None
node: Any = root_schema
for part in ref[2:].split("/"):
part = part.replace("~1", "/").replace("~0", "~")
if not isinstance(node, dict) or part not in node:
return None
node = node[part]
return node if isinstance(node, dict) else None
def _clean_parameters_json_schema(
schema: Any,
root_schema: Optional[Dict[str, Any]] = None,
visited: Optional[set] = None,
) -> Any:
"""Clean a tool schema for Code Assist's parametersJsonSchema field."""
if isinstance(schema, list):
return [_clean_parameters_json_schema(item, root_schema, visited) for item in schema]
if not isinstance(schema, dict):
return schema
if root_schema is None:
root_schema = schema
if visited is None:
visited = set()
schema_id = id(schema)
if schema_id in visited:
return {"type": "object", "description": "circular reference"}
visited.add(schema_id)
ref_key = "$ref" if "$ref" in schema else ("ref" if "ref" in schema else None)
if ref_key:
resolved = _resolve_schema_ref(schema[ref_key], root_schema)
if resolved:
merged = dict(resolved)
for key in ("description", "default"):
if key in schema:
merged[key] = schema[key]
schema = merged
if "allOf" in schema:
result: Dict[str, Any] = {}
for item in schema.get("allOf") or []:
cleaned_item = _clean_parameters_json_schema(item, root_schema, visited)
if not isinstance(cleaned_item, dict):
continue
if "properties" in cleaned_item:
result.setdefault("properties", {}).update(cleaned_item["properties"])
if "required" in cleaned_item:
result.setdefault("required", []).extend(cleaned_item["required"])
for key, value in cleaned_item.items():
if key not in ("properties", "required"):
result[key] = value
for key, value in schema.items():
if key not in ("allOf", "properties", "required"):
result[key] = value
elif key in ("properties", "required") and key not in result:
result[key] = value
else:
result = dict(schema)
if result.get("nullable") is True:
_append_schema_hint(result, "nullable")
if "type" in result:
type_value = result["type"]
if isinstance(type_value, list):
non_null_types = [
str(t).lower()
for t in type_value
if isinstance(t, str) and t.lower() != "null"
]
if non_null_types:
result["type"] = non_null_types[0]
if any(str(t).lower() == "null" for t in type_value):
_append_schema_hint(result, "nullable")
else:
result["type"] = "string"
elif isinstance(type_value, str):
lower_type = type_value.lower()
if lower_type in {"string", "number", "integer", "boolean", "array", "object"}:
result["type"] = lower_type
elif lower_type == "null":
result["type"] = "string"
_append_schema_hint(result, "nullable")
else:
result.pop("type", None)
if "anyOf" in result or "oneOf" in result:
union_key = "anyOf" if "anyOf" in result else "oneOf"
union_items = result.get(union_key) or []
cleaned_items = [
item for item in (
_clean_parameters_json_schema(item, root_schema, visited)
for item in union_items
)
if isinstance(item, dict)
]
enum_values = [
item.get("const")
for item in union_items
if isinstance(item, dict) and item.get("const") not in ("", None)
]
if enum_values and len(enum_values) == len(union_items):
result["type"] = "string"
result["enum"] = [str(v) for v in enum_values]
else:
preferred = next(
(
item for item in cleaned_items
if item.get("type") in ("object", "array") or item.get("properties")
),
None,
)
if preferred is None:
preferred = next((item for item in cleaned_items if item.get("type") or item.get("enum")), None)
if preferred:
original_description = result.get("description")
result.update(preferred)
if original_description:
_append_schema_hint(result, original_description)
result.pop("anyOf", None)
result.pop("oneOf", None)
if result.get("type") == "array":
items = result.get("items")
if isinstance(items, list):
if items:
result["items"] = _clean_parameters_json_schema(items[0], root_schema, visited)
_append_schema_hint(result, "tuple schema simplified")
else:
result.pop("items", None)
elif isinstance(items, dict):
result["items"] = _clean_parameters_json_schema(items, root_schema, visited)
validation_keys = {
"default", "minLength", "maxLength", "minimum", "maximum",
"minItems", "maxItems", "pattern", "format", "uniqueItems",
}
for key in list(result.keys()):
if key in validation_keys:
value = result.pop(key)
if value not in (None, "", {}, []):
_append_schema_hint(result, f"{key}: {json.dumps(value, ensure_ascii=False)}")
unsupported_keys = {
"title", "$schema", "$id", "$ref", "ref", "strict", "nullable",
"exclusiveMaximum", "exclusiveMinimum", "additionalProperties",
"allOf", "anyOf", "oneOf", "$defs", "definitions", "example",
"examples", "readOnly", "writeOnly", "const", "additionalItems",
"contains", "patternProperties", "dependencies", "propertyNames",
"if", "then", "else", "contentEncoding", "contentMediaType",
}
for key in list(result.keys()):
if key in unsupported_keys or key.startswith("x-"):
del result[key]
nullable_props = set()
if isinstance(result.get("properties"), dict):
cleaned_props = {}
for prop_name, prop_schema in result["properties"].items():
if isinstance(prop_schema, dict):
prop_type = prop_schema.get("type")
if (
prop_schema.get("nullable") is True
or (
isinstance(prop_type, list)
and any(str(t).lower() == "null" for t in prop_type)
)
):
nullable_props.add(prop_name)
cleaned_props[prop_name] = _clean_parameters_json_schema(prop_schema, root_schema, visited)
result["properties"] = cleaned_props
if "properties" in result and "type" not in result:
result["type"] = "object"
if isinstance(result.get("required"), list):
prop_names = set(result.get("properties", {}).keys()) if isinstance(result.get("properties"), dict) else None
required = []
for item in result["required"]:
if not isinstance(item, str):
continue
if prop_names is not None and item not in prop_names:
continue
if item in nullable_props:
continue
if item not in required:
required.append(item)
if required:
result["required"] = required
else:
result.pop("required", None)
return result
def _normalize_tools_for_internal_api(tools: Any) -> Any:
if not isinstance(tools, list):
return tools
normalized_tools = []
for tool in tools:
if not isinstance(tool, dict):
normalized_tools.append(tool)
continue
normalized_tool = tool.copy()
declarations = normalized_tool.get("functionDeclarations")
if declarations is None:
declarations = normalized_tool.get("function_declarations")
if isinstance(declarations, list):
normalized_declarations = []
for declaration in declarations:
if not isinstance(declaration, dict):
normalized_declarations.append(declaration)
continue
normalized_declaration = declaration.copy()
if "parametersJsonSchema" in normalized_declaration:
schema = normalized_declaration["parametersJsonSchema"]
elif "parameters_json_schema" in normalized_declaration:
schema = normalized_declaration.pop("parameters_json_schema", None)
else:
schema = normalized_declaration.pop("parameters", None)
normalized_declaration.pop("parameters", None)
normalized_declaration.pop("parameters_json_schema", None)
if schema not in (None, {}, []):
normalized_declaration["parametersJsonSchema"] = _clean_parameters_json_schema(schema)
else:
normalized_declaration.pop("parametersJsonSchema", None)
normalized_declarations.append(normalized_declaration)
normalized_tool.pop("function_declarations", None)
normalized_tool["functionDeclarations"] = normalized_declarations
normalized_tools.append(normalized_tool)
return normalized_tools
def _ensure_empty_tool_schema_for_claude(tools: Any, model_name: str) -> Any:
if "claude" not in (model_name or "").lower() or not isinstance(tools, list):
return tools
normalized_tools = []
for tool in tools:
if not isinstance(tool, dict):
normalized_tools.append(tool)
continue
normalized_tool = tool.copy()
custom_tool = normalized_tool.get("custom")
if isinstance(custom_tool, dict) and "input_schema" not in custom_tool:
normalized_custom = custom_tool.copy()
normalized_custom["input_schema"] = {"type": "object", "properties": {}}
normalized_tool["custom"] = normalized_custom
declarations = normalized_tool.get("functionDeclarations")
if declarations is None:
declarations = normalized_tool.get("function_declarations")
if isinstance(declarations, list):
normalized_declarations = []
for declaration in declarations:
if not isinstance(declaration, dict):
normalized_declarations.append(declaration)
continue
normalized_declaration = declaration.copy()
if (
"parametersJsonSchema" not in normalized_declaration
and "parameters_json_schema" in normalized_declaration
):
normalized_declaration["parametersJsonSchema"] = normalized_declaration.pop("parameters_json_schema")
if "parametersJsonSchema" not in normalized_declaration:
normalized_declaration["parametersJsonSchema"] = {
"type": "object",
"properties": {},
}
normalized_declarations.append(normalized_declaration)
normalized_tool.pop("function_declarations", None)
normalized_tool["functionDeclarations"] = normalized_declarations
normalized_tools.append(normalized_tool)
return normalized_tools
def _should_skip_thought_signature(part: Dict[str, Any], model_name: str) -> bool:
if "claude" in (model_name or "").lower():
return False
return (
"functionCall" in part
or "function_call" in part
or part.get("thought") is True
or "thoughtSignature" in part
or "thought_signature" in part
)
def _normalize_part_thought_signature(part: Dict[str, Any], model_name: str) -> Dict[str, Any]:
normalized = part.copy()
if _should_skip_thought_signature(normalized, model_name):
normalized.pop("thought_signature", None)
normalized["thoughtSignature"] = SKIP_THOUGHT_SIGNATURE_VALIDATOR
return normalized
SUPPORTED_ASPECT_RATIOS = [
(1, 1), (2, 3), (3, 2), (3, 4), (4, 3),
(4, 5), (5, 4), (9, 16), (16, 9), (21, 9),
]
def _parse_size_to_image_config(size_str: str) -> Dict[str, str]:
"""
解析用户传入的 size 参数为 Gemini imageConfig 参数
支持格式: "1024x1536", "1024*1536", "1024X1536"
Returns:
包含 aspectRatio 和/或 imageSize 的字典
"""
import re
config = {}
size_str = size_str.strip()
match = re.match(r"^(\d+)\s*[xX*×]\s*(\d+)$", size_str)
if not match:
return config
width, height = int(match.group(1)), int(match.group(2))
if width <= 0 or height <= 0:
return config
# 计算最接近的支持宽高比
target_ratio = width / height
best_ratio = None
best_diff = float("inf")
for w, h in SUPPORTED_ASPECT_RATIOS:
diff = abs(target_ratio - w / h)
if diff < best_diff:
best_diff = diff
best_ratio = f"{w}:{h}"
if best_ratio:
config["aspectRatio"] = best_ratio
# 根据最大边长确定 imageSize(使用最接近的档位)
max_dim = max(width, height)
if max_dim <= 1280:
config["imageSize"] = "1K"
elif max_dim <= 2560:
config["imageSize"] = "2K"
else:
config["imageSize"] = "4K"
return config
def prepare_image_generation_request(
request_body: Dict[str, Any],
model: str
) -> Dict[str, Any]:
"""
图像生成模型请求体后处理
支持三种方式指定图片参数(优先级从高到低):
1. size 参数: 如 "1024x1536",自动计算 aspectRatio 和 imageSize
2. 模型名后缀: 如 -4k, -2k, -16x9, -1x1
3. 默认值: 不设置额外参数
Args:
request_body: 原始请求体
model: 模型名称
Returns:
处理后的请求体
"""
request_body = request_body.copy()
model_lower = model.lower()
# 优先使用 size 参数
size_str = request_body.pop("size", None)
if size_str:
image_config = _parse_size_to_image_config(size_str)
log.debug(f"[IMAGE] 从 size 参数 '{size_str}' 解析: {image_config}")
else:
# 从模型名后缀解析
image_size = "4K" if "-4k" in model_lower else "2K" if "-2k" in model_lower else None
aspect_ratio = None
for suffix, ratio in [
("-21x9", "21:9"), ("-16x9", "16:9"), ("-9x16", "9:16"),
("-4x3", "4:3"), ("-3x4", "3:4"), ("-1x1", "1:1")
]:
if suffix in model_lower:
aspect_ratio = ratio
break
image_config = {}
if aspect_ratio:
image_config["aspectRatio"] = aspect_ratio
if image_size:
image_config["imageSize"] = image_size
request_body["model"] = "gemini-3.1-flash-image" # 统一使用基础模型名
request_body["generationConfig"] = {
"candidateCount": 1,
"imageConfig": image_config
}
# 移除不需要的字段
for key in ("systemInstruction", "tools", "toolConfig"):
request_body.pop(key, None)
return request_body
# ==================== 模型特性辅助函数 ====================
def get_base_model_name(model_name: str) -> str:
"""移除模型名称中的后缀,返回基础模型名"""
# 按照从长到短的顺序排列,避免短后缀先于长后缀被匹配
suffixes = [
"-maxthinking", "-nothinking", # 兼容旧模式
"-minimal", "-medium", "-search", "-think", # 中等长度后缀
"-high", "-max", "-low" # 短后缀
]
result = model_name
changed = True
# 持续循环直到没有任何后缀可以移除
while changed:
changed = False
for suffix in suffixes:
if result.endswith(suffix):
result = result[:-len(suffix)]
changed = True
# 不使用 break,继续检查是否还有其他后缀
return result
def get_thinking_settings(model_name: str) -> tuple[Optional[int], Optional[str]]:
"""
根据模型名称获取思考配置
支持两种模式:
1. CLI 模式思考预算 (Gemini 2.5 系列): -max, -high, -medium, -low, -minimal
2. CLI 模式思考等级 (Gemini 3 Preview 系列): -high, -medium, -low, -minimal (仅 3-flash)
3. 兼容旧模式: -maxthinking, -nothinking (不返回给用户)
Returns:
(thinking_budget, thinking_level): 思考预算和思考等级
"""
base_model = get_base_model_name(model_name)
# ========== 兼容旧模式 (不返回给用户) ==========
if "-nothinking" in model_name:
# nothinking 模式: 限制思考
if "flash" in base_model:
return 0, None
return 128, None
elif "-maxthinking" in model_name:
# maxthinking 模式: 最大思考预算
budget = 24576 if "flash" in base_model else 32768
if "gemini-3" in base_model:
# Gemini 3 系列不支持 thinkingBudget,返回 high 等级
return None, "high"
else:
return budget, None
# ========== 新 CLI 模式: 基于思考预算/等级 ==========
# Gemini 3 Preview 系列: 使用 thinkingLevel
if "gemini-3" in base_model:
if "-high" in model_name:
return None, "high"
elif "-medium" in model_name:
# 仅 3-flash-preview 支持 medium
if "flash" in base_model:
return None, "medium"
# pro 系列不支持 medium,返回 Default
return None, None
elif "-low" in model_name:
return None, "low"
elif "-minimal" in model_name:
return None, None
else:
# Default: 不设置 thinking 配置
return None, None
# Gemini 2.5 系列: 使用 thinkingBudget
elif "gemini-2.5" in base_model:
if "-max" in model_name:
# 2.5-flash-max: 24576, 2.5-pro-max: 32768
budget = 24576 if "flash" in base_model else 32768
return budget, None
elif "-high" in model_name:
# 2.5-flash-high: 16000, 2.5-pro-high: 16000
return 16000, None
elif "-medium" in model_name:
# 2.5-flash-medium: 8192, 2.5-pro-medium: 8192
return 8192, None
elif "-low" in model_name:
# 2.5-flash-low: 1024, 2.5-pro-low: 1024
return 1024, None
elif "-minimal" in model_name:
# 2.5-flash-minimal: 0, 2.5-pro-minimal: 128
budget = 0 if "flash" in base_model else 128
return budget, None
else:
# Default: 不设置 thinking budget
return None, None
# 其他模型: 不设置 thinking 配置
return None, None
def is_search_model(model_name: str) -> bool:
"""检查是否为搜索模型"""
return "-search" in model_name
# ==================== 统一的 Gemini 请求后处理 ====================
def is_thinking_model(model_name: str) -> bool:
"""检查是否为思考模型 (包含 -thinking 或 pro)"""
return "think" in model_name or "pro" in model_name.lower()
async def normalize_gemini_request(
request: Dict[str, Any],
mode: str = "geminicli"
) -> Dict[str, Any]:
"""
规范化 Gemini 请求
处理逻辑:
1. 模型特性处理 (thinking config, search tools)
3. 参数范围限制 (maxOutputTokens, topK)
4. 工具清理
Args:
request: 原始请求字典
mode: 模式 ("geminicli" 或 "antigravity")
Returns:
规范化后的请求
"""
# 导入配置函数
from config import get_return_thoughts_to_frontend
result = request.copy()
model = result.get("model", "")
generation_config = (result.get("generationConfig") or {}).copy() # 创建副本避免修改原对象
tools = result.get("tools")
system_instruction = result.get("systemInstruction") or result.get("system_instructions")
# 记录原始请求
log.debug(f"[GEMINI_FIX] 原始请求 - 模型: {model}, mode: {mode}, generationConfig: {generation_config}")
# 获取配置值
return_thoughts = await get_return_thoughts_to_frontend()
# ========== 模式特定处理 ==========
if mode == "geminicli":
# 1. 思考设置
# 优先使用 get_thinking_settings 获取的思考预算和等级
thinking_budget, thinking_level = get_thinking_settings(model)
# 其次使用传入的思考预算(如果未从模型名称获取)
if thinking_budget is None and thinking_level is None:
thinking_budget = generation_config.get("thinkingConfig", {}).get("thinkingBudget")
thinking_level = generation_config.get("thinkingConfig", {}).get("thinkingLevel")
# 假如 is_thinking_model 为真或者思考预算/等级不为空,设置 thinkingConfig
if is_thinking_model(model) or thinking_budget is not None or thinking_level is not None:
# 确保 thinkingConfig 存在
if "thinkingConfig" not in generation_config:
generation_config["thinkingConfig"] = {}
thinking_config = generation_config["thinkingConfig"]
# 设置思考预算或等级(互斥)
if thinking_budget is not None:
thinking_config["thinkingBudget"] = thinking_budget
thinking_config.pop("thinkingLevel", None) # 避免与 thinkingBudget 冲突
elif thinking_level is not None:
thinking_config["thinkingLevel"] = thinking_level
thinking_config.pop("thinkingBudget", None) # 避免与 thinkingLevel 冲突
# includeThoughts 逻辑:
# 1. 如果是 pro 模型,为 return_thoughts
# 2. 如果不是 pro 模型,检查是否有思考预算或思考等级
base_model = get_base_model_name(model)
if "pro" in base_model:
include_thoughts = return_thoughts
elif "3-flash" in base_model:
if thinking_level is None:
include_thoughts = False
else:
include_thoughts = return_thoughts
else:
# 非 pro 模型: 有思考预算或等级才包含思考
# 注意: 思考预算为 0 时不包含思考
if thinking_budget is None or thinking_budget == 0:
include_thoughts = False
else:
include_thoughts = return_thoughts
thinking_config["includeThoughts"] = include_thoughts
# 2. 搜索模型添加 Google Search
if is_search_model(model):
result_tools = result.get("tools") or []
result["tools"] = result_tools
if not any(tool.get("googleSearch") for tool in result_tools if isinstance(tool, dict)):
result_tools.append({"googleSearch": {}})
# 3. 模型名称处理
result["model"] = get_base_model_name(model)
elif mode == "antigravity":
'''
# 1. 处理 system_instruction
custom_prompt = "Please ignore the following [ignore]You are Antigravity, a powerful agentic AI coding assistant designed by the Google Deepmind team working on Advanced Agentic Coding.You are pair programming with a USER to solve their coding task. The task may require creating a new codebase, modifying or debugging an existing codebase, or simply answering a question.**Absolute paths only****Proactiveness**[/ignore]"
# 提取原有的 parts(如果存在)
existing_parts = []
if system_instruction:
if isinstance(system_instruction, dict):
existing_parts = system_instruction.get("parts", [])
# custom_prompt 始终放在第一位,原有内容整体后移
result["systemInstruction"] = {
"parts": [{"text": custom_prompt}] + existing_parts
}
'''
# 2. 判断图片模型
if "image" in model.lower():
# 调用图片生成专用处理函数
return prepare_image_generation_request(result, model)
else:
# 3. 思考模型处理
if is_thinking_model(model) or ("thinkingBudget" in generation_config.get("thinkingConfig", {}) and generation_config["thinkingConfig"]["thinkingBudget"] != 0):
# 直接设置 thinkingConfig
if "thinkingConfig" not in generation_config:
generation_config["thinkingConfig"] = {}
thinking_config = generation_config["thinkingConfig"]
# 优先使用传入的思考预算,否则使用默认值
if "thinkingBudget" not in thinking_config:
thinking_config["thinkingBudget"] = 1024
thinking_config.pop("thinkingLevel", None) # 避免与 thinkingBudget 冲突
thinking_config["includeThoughts"] = return_thoughts
# 检查最后一个 assistant 消息是否以 thinking 块开始
contents = result.get("contents", [])
if "claude" in model.lower():
# 检测是否有工具调用(MCP场景)
has_tool_calls = any(
isinstance(content, dict) and
any(
isinstance(part, dict) and ("functionCall" in part or "function_call" in part)
for part in content.get("parts", [])
)
for content in contents
)
if has_tool_calls:
# MCP 场景:检测到工具调用,移除 thinkingConfig
log.warning(f"[ANTIGRAVITY] 检测到工具调用(MCP场景),移除 thinkingConfig 避免失效")
generation_config.pop("thinkingConfig", None)
else:
# 非 MCP 场景:填充思考块
# log.warning(f"[ANTIGRAVITY] 最后一个 assistant 消息不以 thinking 块开始,自动填充思考块")
# 找到最后一个 model 角色的 content
for i in range(len(contents) - 1, -1, -1):
content = contents[i]
if isinstance(content, dict) and content.get("role") == "model":
# 在 parts 开头插入思考块(使用官方跳过验证的虚拟签名)
parts = content.get("parts", [])
thinking_part = {
"text": "...",
# "thought": True, # 标记为思考块
"thoughtSignature": "skip_thought_signature_validator" # 官方文档推荐的虚拟签名
}
# 如果第一个 part 不是 thinking,则插入
if not parts or not (isinstance(parts[0], dict) and ("thought" in parts[0] or "thoughtSignature" in parts[0])):
content["parts"] = [thinking_part] + parts
log.debug(f"[ANTIGRAVITY] 已在最后一个 assistant 消息开头插入思考块(含跳过验证签名)")
break
# 移除 -thinking 后缀
model = model.replace("-thinking", "")
# 4. Claude 模型关键词映射
# 使用关键词匹配而不是精确匹配,更灵活地处理各种变体
original_model = model
if "opus" in model.lower():
model = "claude-opus-4-6-thinking"
elif "sonnet" in model.lower():
model = "claude-sonnet-4-6"
elif "haiku" in model.lower():
model = "gemini-2.5-flash"
elif "claude" in model.lower():
# Claude 模型兜底:如果包含 claude 但不是 opus/sonnet/haiku
model = "claude-sonnet-4-6"
result["model"] = model
if original_model != model:
log.debug(f"[ANTIGRAVITY] 映射模型: {original_model} -> {model}")
# 5. 模型特殊处理:循环移除末尾的 model 消息,保证以用户消息结尾
# 因为该模型不支持预填充
if "claude-opus-4-6-thinking" in model.lower() or "claude-sonnet-4-6" in model.lower():
contents = result.get("contents", [])
removed_count = 0
while contents and isinstance(contents[-1], dict) and contents[-1].get("role") == "model":
contents.pop()
removed_count += 1
if removed_count > 0:
log.warning(f"[ANTIGRAVITY] {model} 不支持预填充,移除了 {removed_count} 条末尾 model 消息")
result["contents"] = contents
# 6. 移除 antigravity 模式不支持的字段
generation_config.pop("presencePenalty", None)
generation_config.pop("frequencyPenalty", None)
generation_config.pop("stopSequences", None)
# ========== 公共处理 ==========
# 1. 安全设置覆盖
if "tools" in result:
result["tools"] = _normalize_tools_for_internal_api(result.get("tools"))
result["tools"] = _ensure_empty_tool_schema_for_claude(result.get("tools"), model)
if "lite" in model.lower():
result["safetySettings"] = LITE_SAFETY_SETTINGS
else:
result["safetySettings"] = DEFAULT_SAFETY_SETTINGS
# 2. 参数范围限制
if generation_config:
# 强制设置 maxOutputTokens 为 64000
generation_config["maxOutputTokens"] = 64000
# 强制设置 topK 为 64
generation_config["topK"] = 64
if "contents" in result:
cleaned_contents = []
for content in result["contents"]:
if isinstance(content, dict) and "parts" in content:
# 过滤掉空的或无效的 parts
valid_parts = []
for part in content["parts"]:
if not isinstance(part, dict):
continue
# 检查 part 是否有有效的非空值
# 过滤掉空字典或所有值都为空的 part
has_valid_value = any(
value not in (None, "", {}, [])
for key, value in part.items()
if key != "thought" # thought 字段可以为空
)
if has_valid_value:
part = _normalize_part_thought_signature(part, model)
# 修复 text 字段:确保是字符串而不是列表
if "text" in part:
text_value = part["text"]
if isinstance(text_value, list):
# 如果是列表,合并为字符串
log.warning(f"[GEMINI_FIX] text 字段是列表,自动合并: {text_value}")
part["text"] = " ".join(str(t) for t in text_value if t)
elif isinstance(text_value, str):
# 清理尾随空格
part["text"] = text_value.rstrip()
else:
# 其他类型转为字符串
log.warning(f"[GEMINI_FIX] text 字段类型异常 ({type(text_value)}), 转为字符串: {text_value}")
part["text"] = str(text_value)
valid_parts.append(part)
else:
log.warning(f"[GEMINI_FIX] 移除空的或无效的 part: {part}")
# 只添加有有效 parts 的 content
if valid_parts:
cleaned_content = content.copy()
cleaned_content["parts"] = valid_parts
cleaned_contents.append(cleaned_content)
else:
log.warning(f"[GEMINI_FIX] 跳过没有有效 parts 的 content: {content.get('role')}")
else:
cleaned_contents.append(content)
result["contents"] = cleaned_contents
if generation_config:
result["generationConfig"] = generation_config
return result
|