# src/rotator_library/providers/antigravity_provider_v2.py
| """ | |
| Antigravity Provider - Refactored Implementation | |
| A clean, well-structured provider for Google's Antigravity API, supporting: | |
| - Gemini 2.5 (Pro/Flash) with thinkingBudget | |
| - Gemini 3 (Pro/Flash/Image) with thinkingLevel | |
| - Claude (Sonnet 4.5) via Antigravity proxy | |
| - Claude (Opus 4.5) via Antigravity proxy | |
| Key Features: | |
| - Unified streaming/non-streaming handling | |
| - Server-side thought signature caching | |
| - Automatic base URL fallback | |
| - Gemini 3 tool hallucination prevention | |
| """ | |
from __future__ import annotations

import asyncio
import copy
import hashlib
import json
import logging
import os
import random
import time
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import (
    Any,
    AsyncGenerator,
    Dict,
    List,
    Optional,
    Tuple,
    Union,
    TYPE_CHECKING,
)

import httpx
import litellm

from .provider_interface import ProviderInterface, UsageResetConfigDef, QuotaGroupMap
from .antigravity_auth_base import AntigravityAuthBase
from .provider_cache import ProviderCache
from .utilities.antigravity_quota_tracker import AntigravityQuotaTracker
from ..model_definitions import ModelDefinitions
from ..timeout_config import TimeoutConfig
from ..error_handler import EmptyResponseError, TransientQuotaError
from ..utils.paths import get_logs_dir, get_cache_dir

if TYPE_CHECKING:
    from ..usage_manager import UsageManager
# =============================================================================
# INTERNAL EXCEPTIONS
# =============================================================================
class _MalformedFunctionCallDetected(Exception):
    """
    Internal exception raised when MALFORMED_FUNCTION_CALL is detected.

    Signals the retry logic to inject corrective messages and retry.
    Not intended to be raised to callers.
    """

    def __init__(self, finish_message: str, raw_response: Dict[str, Any]):
        self.finish_message = finish_message
        self.raw_response = raw_response
        super().__init__(finish_message)
# =============================================================================
# CONFIGURATION CONSTANTS
# =============================================================================
def _env_bool(key: str, default: bool = False) -> bool:
    """Get boolean from environment variable."""
    return os.getenv(key, str(default).lower()).lower() in ("true", "1", "yes")


def _env_int(key: str, default: int) -> int:
    """Get integer from environment variable."""
    return int(os.getenv(key, str(default)))


lib_logger = logging.getLogger("rotator_library")

# Antigravity base URLs with fallback order
# Priority: daily (sandbox) → autopush (sandbox) → production
BASE_URLS = [
    "https://daily-cloudcode-pa.sandbox.googleapis.com/v1internal",
    # "https://autopush-cloudcode-pa.sandbox.googleapis.com/v1internal",
    "https://cloudcode-pa.googleapis.com/v1internal",  # Production fallback
]

# Required headers for Antigravity API calls
# These headers are CRITICAL for gemini-3-pro-high/low to work
# Without X-Goog-Api-Client and Client-Metadata, only gemini-3-pro-preview works
ANTIGRAVITY_HEADERS = {
    "User-Agent": "antigravity/1.12.4 windows/amd64",
    "X-Goog-Api-Client": "google-cloud-sdk vscode_cloudshelleditor/0.1",
    "Client-Metadata": '{"ideType":"IDE_UNSPECIFIED","platform":"PLATFORM_UNSPECIFIED","pluginType":"GEMINI"}',
}

# Available models via Antigravity
AVAILABLE_MODELS = [
    # Gemini models
    # "gemini-2.5-pro",
    "gemini-2.5-flash",  # Uses -thinking variant when reasoning_effort provided
    "gemini-2.5-flash-lite",  # Thinking budget configurable, no name change
    "gemini-3-pro-preview",  # Internally mapped to -low/-high variant based on thinkingLevel
    "gemini-3-flash",  # New Gemini 3 Flash model (supports thinking with minBudget=32)
    # "gemini-3-pro-image",  # Image generation model
    # "gemini-2.5-computer-use-preview-10-2025",
    # Claude models
    "claude-sonnet-4-5",  # Uses -thinking variant when reasoning_effort provided
    "claude-opus-4-5",  # ALWAYS uses -thinking variant (non-thinking doesn't exist)
    # Other models
    "gpt-oss-120b-medium",  # GPT-OSS model, shares quota with Claude
]

# Default max output tokens (including thinking) - can be overridden per request
DEFAULT_MAX_OUTPUT_TOKENS = 64000

# Empty response retry configuration
# When Antigravity returns an empty response (no content, no tool calls),
# automatically retry up to this many attempts before giving up (minimum 1)
EMPTY_RESPONSE_MAX_ATTEMPTS = max(1, _env_int("ANTIGRAVITY_EMPTY_RESPONSE_ATTEMPTS", 6))
EMPTY_RESPONSE_RETRY_DELAY = _env_int("ANTIGRAVITY_EMPTY_RESPONSE_RETRY_DELAY", 3)

# Malformed function call retry configuration
# When Gemini 3 returns MALFORMED_FUNCTION_CALL (invalid JSON syntax in tool args),
# inject corrective messages and retry up to this many times
MALFORMED_CALL_MAX_RETRIES = max(1, _env_int("ANTIGRAVITY_MALFORMED_CALL_RETRIES", 2))
MALFORMED_CALL_RETRY_DELAY = _env_int("ANTIGRAVITY_MALFORMED_CALL_DELAY", 1)
# Model alias mappings (internal ↔ public)
MODEL_ALIAS_MAP = {
    "rev19-uic3-1p": "gemini-2.5-computer-use-preview-10-2025",
    "gemini-3-pro-image": "gemini-3-pro-image-preview",
    "gemini-3-pro-low": "gemini-3-pro-preview",
    "gemini-3-pro-high": "gemini-3-pro-preview",
}
MODEL_ALIAS_REVERSE = {v: k for k, v in MODEL_ALIAS_MAP.items()}

# Models to exclude from dynamic discovery
EXCLUDED_MODELS = {
    "chat_20706",
    "chat_23310",
    "gemini-2.5-flash-thinking",
    "gemini-2.5-pro",
}

# Gemini finish reason mapping
FINISH_REASON_MAP = {
    "STOP": "stop",
    "MAX_TOKENS": "length",
    "SAFETY": "content_filter",
    "RECITATION": "content_filter",
    "OTHER": "stop",
}
# Gemini 3 tool name remapping
# Turned out not to be useful - kept here so it can be re-enabled later if needed
GEMINI3_TOOL_RENAMES = {
    # "batch": "multi_tool",  # "batch" triggers internal format: call:default_api:...
}
GEMINI3_TOOL_RENAMES_REVERSE = {v: k for k, v in GEMINI3_TOOL_RENAMES.items()}
# Default safety settings - disable content filtering for all categories
# Per CLIProxyAPI: these are attached to prevent safety blocks during API calls
DEFAULT_SAFETY_SETTINGS = [
    {"category": "HARM_CATEGORY_HARASSMENT", "threshold": "OFF"},
    {"category": "HARM_CATEGORY_HATE_SPEECH", "threshold": "OFF"},
    {"category": "HARM_CATEGORY_SEXUALLY_EXPLICIT", "threshold": "OFF"},
    {"category": "HARM_CATEGORY_DANGEROUS_CONTENT", "threshold": "OFF"},
    {"category": "HARM_CATEGORY_CIVIC_INTEGRITY", "threshold": "BLOCK_NONE"},
]


# Directory paths - use centralized path management
def _get_antigravity_logs_dir():
    return get_logs_dir() / "antigravity_logs"


def _get_antigravity_cache_dir():
    return get_cache_dir(subdir="antigravity")


def _get_gemini3_signature_cache_file():
    return _get_antigravity_cache_dir() / "gemini3_signatures.json"


def _get_claude_thinking_cache_file():
    return _get_antigravity_cache_dir() / "claude_thinking.json"
# Gemini 3 tool fix system instruction (prevents hallucination)
DEFAULT_GEMINI3_SYSTEM_INSTRUCTION = """<CRITICAL_TOOL_USAGE_INSTRUCTIONS>
You are operating in a CUSTOM ENVIRONMENT where tool definitions COMPLETELY DIFFER from your training data.
VIOLATION OF THESE RULES WILL CAUSE IMMEDIATE SYSTEM FAILURE.
## ABSOLUTE RULES - NO EXCEPTIONS
1. **SCHEMA IS LAW**: The JSON schema in each tool definition is the ONLY source of truth.
- Your pre-trained knowledge about tools like 'read_file', 'apply_diff', 'write_to_file', 'bash', etc. is INVALID here.
- Every tool has been REDEFINED with different parameters than what you learned during training.
2. **PARAMETER NAMES ARE EXACT**: Use ONLY the parameter names from the schema.
- WRONG: 'suggested_answers', 'file_path', 'files_to_read', 'command_to_run'
- RIGHT: Check the 'properties' field in the schema for the exact names
- The schema's 'required' array tells you which parameters are mandatory
3. **ARRAY PARAMETERS**: When a parameter has "type": "array", check the 'items' field:
- If items.type is "object", you MUST provide an array of objects with the EXACT properties listed
- If items.type is "string", you MUST provide an array of strings
- NEVER provide a single object when an array is expected
- NEVER provide an array when a single value is expected
4. **NESTED OBJECTS**: When items.type is "object":
- Check items.properties for the EXACT field names required
- Check items.required for which nested fields are mandatory
- Include ALL required nested fields in EVERY array element
5. **STRICT PARAMETERS HINT**: Tool descriptions contain "STRICT PARAMETERS: ..." which lists:
- Parameter name, type, and whether REQUIRED
- For arrays of objects: the nested structure in brackets like [field: type REQUIRED, ...]
- USE THIS as your quick reference, but the JSON schema is authoritative
6. **BEFORE EVERY TOOL CALL**:
a. Read the tool's 'parametersJsonSchema' or 'parameters' field completely
b. Identify ALL required parameters
c. Verify your parameter names match EXACTLY (case-sensitive)
d. For arrays, verify you're providing the correct item structure
e. Do NOT add parameters that don't exist in the schema
7. **JSON SYNTAX**: Function call arguments must be valid JSON.
- All keys MUST be double-quoted: {"key":"value"} not {key:"value"}
- Use double quotes for strings, not single quotes
## COMMON FAILURE PATTERNS TO AVOID
- Using 'path' when schema says 'filePath' (or vice versa)
- Using 'content' when schema says 'text' (or vice versa)
- Providing {"file": "..."} when schema wants [{"path": "...", "line_ranges": [...]}]
- Omitting required nested fields in array items
- Adding 'additionalProperties' that the schema doesn't define
- Guessing parameter names from similar tools you know from training
- Using unquoted keys: {key:"value"} instead of {"key":"value"}
- Writing JSON as text in your response instead of making an actual function call
- Using single quotes instead of double quotes for strings
## REMEMBER
Your training data about function calling is OUTDATED for this environment.
The tool names may look familiar, but the schemas are DIFFERENT.
When in doubt, RE-READ THE SCHEMA before making the call.
</CRITICAL_TOOL_USAGE_INSTRUCTIONS>
"""

# Claude tool fix system instruction (prevents hallucination)
DEFAULT_CLAUDE_SYSTEM_INSTRUCTION = """CRITICAL TOOL USAGE INSTRUCTIONS:
You are operating in a custom environment where tool definitions differ from your training data.
You MUST follow these rules strictly:
1. DO NOT use your internal training data to guess tool parameters
2. ONLY use the exact parameter structure defined in the tool schema
3. Parameter names in schemas are EXACT - do not substitute with similar names from your training (e.g., use 'follow_up' not 'suggested_answers')
4. Array parameters have specific item types - check the schema's 'items' field for the exact structure
5. When you see "STRICT PARAMETERS" in a tool description, those type definitions override any assumptions
6. Tool use in agentic workflows is REQUIRED - you must call tools with the exact parameters specified in the schema
If you are unsure about a tool's parameters, YOU MUST read the schema definition carefully.
"""

# Parallel tool usage encouragement instruction
DEFAULT_PARALLEL_TOOL_INSTRUCTION = """When multiple independent operations are needed, prefer making parallel tool calls in a single response rather than sequential calls across multiple responses. This reduces round-trips and improves efficiency. Only use sequential calls when one tool's output is required as input for another."""
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def _generate_request_id() -> str:
    """Generate Antigravity request ID: agent-{uuid}"""
    return f"agent-{uuid.uuid4()}"


def _generate_session_id() -> str:
    """Generate Antigravity session ID: -{random_number}"""
    n = random.randint(1_000_000_000_000_000_000, 9_999_999_999_999_999_999)
    return f"-{n}"


def _generate_project_id() -> str:
    """Generate fake project ID: {adj}-{noun}-{random}"""
    adjectives = ["useful", "bright", "swift", "calm", "bold"]
    nouns = ["fuze", "wave", "spark", "flow", "core"]
    return f"{random.choice(adjectives)}-{random.choice(nouns)}-{uuid.uuid4().hex[:5]}"
def _normalize_type_arrays(schema: Any) -> Any:
    """
    Normalize type arrays in JSON Schema for Proto-based Antigravity API.

    Converts `"type": ["string", "null"]` → `"type": "string", "nullable": true`.
    """
    if isinstance(schema, dict):
        normalized = {}
        for key, value in schema.items():
            if key == "type" and isinstance(value, list):
                types = value
                if "null" in types:
                    normalized["nullable"] = True
                    remaining_types = [t for t in types if t != "null"]
                    if len(remaining_types) == 1:
                        normalized[key] = remaining_types[0]
                    elif len(remaining_types) > 1:
                        normalized[key] = remaining_types
                    # If no types remain, don't add "type" key
                else:
                    normalized[key] = value[0] if len(value) == 1 else value
            else:
                normalized[key] = _normalize_type_arrays(value)
        return normalized
    elif isinstance(schema, list):
        return [_normalize_type_arrays(item) for item in schema]
    return schema
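

# Illustrative example (not part of the original module): given
#   {"type": ["string", "null"], "description": "optional name"}
# _normalize_type_arrays() returns
#   {"nullable": True, "type": "string", "description": "optional name"}
# while a multi-type union without "null" is kept as a list of types.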
def _recursively_parse_json_strings(
    obj: Any,
    schema: Optional[Dict[str, Any]] = None,
    parse_json_objects: bool = False,
) -> Any:
    """
    Recursively parse JSON strings in nested data structures.

    Antigravity sometimes returns tool arguments with JSON-stringified values:
    {"files": "[{...}]"} instead of {"files": [{...}]}.

    Args:
        obj: The object to process
        schema: Optional JSON schema for the current level (used for schema-aware parsing)
        parse_json_objects: If False (default), don't parse JSON-looking strings into objects.
            This prevents corrupting string content like write tool's "content" field.
            If True, parse strings that look like JSON objects/arrays.

    Additionally handles:
    - Malformed double-encoded JSON (extra trailing '}' or ']') - only when parse_json_objects=True
    - Escaped string content (\n, \t, etc.) - always processed
    """
    if isinstance(obj, dict):
        # Get properties schema for looking up field types
        properties_schema = schema.get("properties", {}) if schema else {}
        return {
            k: _recursively_parse_json_strings(
                v,
                properties_schema.get(k),
                parse_json_objects,
            )
            for k, v in obj.items()
        }
    elif isinstance(obj, list):
        # Get items schema for array elements
        items_schema = schema.get("items") if schema else None
        return [
            _recursively_parse_json_strings(item, items_schema, parse_json_objects)
            for item in obj
        ]
    elif isinstance(obj, str):
        stripped = obj.strip()
        # Check if string contains control character escape sequences that need unescaping
        # This handles cases where diff content has literal \n or \t instead of actual newlines/tabs
        #
        # IMPORTANT: We intentionally do NOT unescape strings containing \" or \\
        # because these are typically intentional escapes in code/config content
        # (e.g., JSON embedded in YAML: BOT_NAMES_JSON: '["mirrobot", ...]')
        # Unescaping these would corrupt the content and cause issues like
        # oldString and newString becoming identical when they should differ.
        has_control_char_escapes = "\\n" in obj or "\\t" in obj
        has_intentional_escapes = '\\"' in obj or "\\\\" in obj
        if has_control_char_escapes and not has_intentional_escapes:
            try:
                # Use json.loads with quotes to properly unescape the string
                # This converts \n -> newline, \t -> tab
                unescaped = json.loads(f'"{obj}"')
                # Log the fix with a snippet for debugging
                snippet = obj[:80] + "..." if len(obj) > 80 else obj
                lib_logger.debug(
                    f"[Antigravity] Unescaped control chars in string: "
                    f"{len(obj) - len(unescaped)} chars changed. Snippet: {snippet!r}"
                )
                return unescaped
            except (json.JSONDecodeError, ValueError):
                # If unescaping fails, continue with original processing
                pass
        # Only parse JSON strings if explicitly enabled
        if not parse_json_objects:
            return obj
        # Schema-aware parsing: only parse if schema expects object/array, not string
        if schema:
            schema_type = schema.get("type")
            if schema_type == "string":
                # Schema says this should be a string - don't parse it
                return obj
            # Only parse if schema expects object or array
            if schema_type not in ("object", "array", None):
                return obj
        # Check if it looks like JSON (starts with { or [)
        if stripped and stripped[0] in ("{", "["):
            # Try standard parsing first
            if (stripped.startswith("{") and stripped.endswith("}")) or (
                stripped.startswith("[") and stripped.endswith("]")
            ):
                try:
                    parsed = json.loads(obj)
                    return _recursively_parse_json_strings(
                        parsed, schema, parse_json_objects
                    )
                except (json.JSONDecodeError, ValueError):
                    pass
            # Handle malformed JSON: array that doesn't end with ]
            # e.g., '[{"path": "..."}]}' instead of '[{"path": "..."}]'
            if stripped.startswith("[") and not stripped.endswith("]"):
                try:
                    # Find the last ] and truncate there
                    last_bracket = stripped.rfind("]")
                    if last_bracket > 0:
                        cleaned = stripped[: last_bracket + 1]
                        parsed = json.loads(cleaned)
                        lib_logger.warning(
                            f"[Antigravity] Auto-corrected malformed JSON string: "
                            f"truncated {len(stripped) - len(cleaned)} extra chars"
                        )
                        return _recursively_parse_json_strings(
                            parsed, schema, parse_json_objects
                        )
                except (json.JSONDecodeError, ValueError):
                    pass
            # Handle malformed JSON: object that doesn't end with }
            if stripped.startswith("{") and not stripped.endswith("}"):
                try:
                    # Find the last } and truncate there
                    last_brace = stripped.rfind("}")
                    if last_brace > 0:
                        cleaned = stripped[: last_brace + 1]
                        parsed = json.loads(cleaned)
                        lib_logger.warning(
                            f"[Antigravity] Auto-corrected malformed JSON string: "
                            f"truncated {len(stripped) - len(cleaned)} extra chars"
                        )
                        return _recursively_parse_json_strings(
                            parsed, schema, parse_json_objects
                        )
                except (json.JSONDecodeError, ValueError):
                    pass
    return obj
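

# Illustrative example (not part of the original module): a tool argument that
# arrives JSON-stringified, e.g. {"files": "[{\"path\": \"a.py\"}]"}, is only
# expanded to {"files": [{"path": "a.py"}]} when parse_json_objects=True and the
# schema for "files" expects an array; plain strings containing literal "\n"/"\t"
# escapes are unescaped unless they also contain \" or \\ sequences.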
def _inline_schema_refs(schema: Dict[str, Any]) -> Dict[str, Any]:
    """Inline local $ref definitions before sanitization."""
    if not isinstance(schema, dict):
        return schema
    defs = schema.get("$defs", schema.get("definitions", {}))
    if not defs:
        return schema

    def resolve(node, seen=()):
        if not isinstance(node, dict):
            return [resolve(x, seen) for x in node] if isinstance(node, list) else node
        if "$ref" in node:
            ref = node["$ref"]
            if ref in seen:  # Circular - drop it
                return {k: resolve(v, seen) for k, v in node.items() if k != "$ref"}
            for prefix in ("#/$defs/", "#/definitions/"):
                if isinstance(ref, str) and ref.startswith(prefix):
                    name = ref[len(prefix) :]
                    if name in defs:
                        return resolve(copy.deepcopy(defs[name]), seen + (ref,))
            return {k: resolve(v, seen) for k, v in node.items() if k != "$ref"}
        return {k: resolve(v, seen) for k, v in node.items()}

    return resolve(schema)
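

# Illustrative example (not part of the original module): a schema such as
#   {"$defs": {"Item": {"type": "string"}},
#    "type": "array", "items": {"$ref": "#/$defs/Item"}}
# comes back with the reference expanded in place:
#   {"$defs": {...}, "type": "array", "items": {"type": "string"}}
# Circular references are dropped rather than expanded indefinitely; the leftover
# "$defs" block is removed later by _clean_claude_schema().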
def _clean_claude_schema(schema: Any, for_gemini: bool = False) -> Any:
    """
    Recursively clean JSON Schema for Antigravity/Google's Proto-based API.

    Context-aware cleaning:
    - Removes unsupported validation keywords at schema-definition level
    - Preserves property NAMES even if they match validation keyword names
      (e.g., a tool parameter named "pattern" is preserved)
    - For Gemini: passes through most keywords including $schema, anyOf, oneOf, const
    - For Claude: strips validation keywords, converts anyOf/oneOf to first option, const to enum
    - For Gemini: passes through additionalProperties as-is
    - For Claude: normalizes permissive additionalProperties to true
    """
    if not isinstance(schema, dict):
        return schema

    # Meta/structural keywords - always remove regardless of context
    # These are JSON Schema infrastructure, never valid property names
    meta_keywords = {
        "$id",
        "$ref",
        "$defs",
        "definitions",
    }

    # Validation keywords - only remove at schema-definition level,
    # NOT when they appear as property names under "properties"
    # Note: These are common property names that could be used by tools:
    # - "pattern" (glob, grep, regex tools)
    # - "format" (export, date/time tools)
    # - "default" (config tools)
    # - "title" (document tools)
    # - "minimum"/"maximum" (range tools)
    #
    # Keywords to strip for Claude only (Gemini accepts these):
    # Claude rejects most JSON Schema validation keywords
    validation_keywords_claude_only = {
        "$schema",
        "minItems",
        "maxItems",
        "uniqueItems",
        "pattern",
        "minLength",
        "maxLength",
        "minimum",
        "maximum",
        "exclusiveMinimum",
        "exclusiveMaximum",
        "multipleOf",
        "format",
        "minProperties",
        "maxProperties",
        "propertyNames",
        "contentEncoding",
        "contentMediaType",
        "contentSchema",
        "deprecated",
        "readOnly",
        "writeOnly",
        "examples",
        "title",
        "default",
    }

    # Handle 'anyOf' by taking the first option (Claude doesn't support anyOf)
    # Gemini supports anyOf/oneOf, so pass through for Gemini
    if not for_gemini:
        if "anyOf" in schema and isinstance(schema["anyOf"], list) and schema["anyOf"]:
            first_option = _clean_claude_schema(schema["anyOf"][0], for_gemini)
            if isinstance(first_option, dict):
                return first_option
        # Handle 'oneOf' similarly
        if "oneOf" in schema and isinstance(schema["oneOf"], list) and schema["oneOf"]:
            first_option = _clean_claude_schema(schema["oneOf"][0], for_gemini)
            if isinstance(first_option, dict):
                return first_option

    cleaned = {}

    # Handle 'const' by converting to 'enum' with single value (Claude only)
    # Gemini supports const, so pass through for Gemini
    if "const" in schema and not for_gemini:
        const_value = schema["const"]
        cleaned["enum"] = [const_value]

    for key, value in schema.items():
        # Always skip meta keywords
        if key in meta_keywords:
            continue
        # Skip "const" for Claude (already converted to enum above)
        if key == "const" and not for_gemini:
            continue
        # Strip Claude-only keywords when not targeting Gemini
        if key in validation_keywords_claude_only:
            if for_gemini:
                # Gemini accepts these - preserve them
                cleaned[key] = value
            # For Claude: skip - not supported
            continue
        # Special handling for additionalProperties:
        # For Gemini: pass through as-is (Gemini accepts {}, true, false, typed schemas)
        # For Claude: normalize permissive values ({} or true) to true
        if key == "additionalProperties":
            if for_gemini:
                # Pass through additionalProperties as-is for Gemini
                # Gemini accepts: true, false, {}, {"type": "string"}, etc.
                cleaned["additionalProperties"] = value
            else:
                # Claude handling: normalize permissive values to true
                if (
                    value is True
                    or value == {}
                    or (isinstance(value, dict) and not value)
                ):
                    cleaned["additionalProperties"] = True  # Normalize {} to true
                elif value is False:
                    cleaned["additionalProperties"] = False
                # Skip complex schema values for Claude (e.g., {"type": "string"})
            continue
        # Special handling for "properties" - preserve property NAMES
        # The keys inside "properties" are user-defined property names, not schema keywords
        # We must preserve them even if they match validation keyword names
        if key == "properties" and isinstance(value, dict):
            cleaned_props = {}
            for prop_name, prop_schema in value.items():
                # Log warning if property name matches a validation keyword
                # This helps debug potential issues where the old code would have dropped it
                if prop_name in validation_keywords_claude_only:
                    lib_logger.debug(
                        f"[Schema] Preserving property '{prop_name}' (matches validation keyword name)"
                    )
                cleaned_props[prop_name] = _clean_claude_schema(prop_schema, for_gemini)
            cleaned[key] = cleaned_props
        elif isinstance(value, dict):
            cleaned[key] = _clean_claude_schema(value, for_gemini)
        elif isinstance(value, list):
            cleaned[key] = [
                _clean_claude_schema(item, for_gemini)
                if isinstance(item, dict)
                else item
                for item in value
            ]
        else:
            cleaned[key] = value

    return cleaned
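

# Illustrative example (not part of the original module): with for_gemini=False a
# schema like {"anyOf": [{"type": "string", "minLength": 1}, {"type": "null"}]}
# collapses to its first option with the unsupported keyword stripped, i.e.
# {"type": "string"}; with for_gemini=True the same schema is passed through
# largely untouched, keeping anyOf and minLength.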
# =============================================================================
# FILE LOGGER
# =============================================================================
class AntigravityFileLogger:
    """Transaction file logger for debugging Antigravity requests/responses."""

    __slots__ = ("enabled", "log_dir")

    def __init__(self, model_name: str, enabled: bool = True):
        self.enabled = enabled
        self.log_dir: Optional[Path] = None
        if not enabled:
            return
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
        safe_model = model_name.replace("/", "_").replace(":", "_")
        self.log_dir = (
            _get_antigravity_logs_dir() / f"{timestamp}_{safe_model}_{uuid.uuid4()}"
        )
        try:
            self.log_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            lib_logger.error(f"Failed to create log directory: {e}")
            self.enabled = False

    def log_request(self, payload: Dict[str, Any]) -> None:
        """Log the request payload."""
        self._write_json("request_payload.json", payload)

    def log_response_chunk(self, chunk: str) -> None:
        """Append a raw chunk to the response stream log."""
        self._append_text("response_stream.log", chunk)

    def log_error(self, error_message: str) -> None:
        """Log an error message."""
        self._append_text(
            "error.log", f"[{datetime.utcnow().isoformat()}] {error_message}"
        )

    def log_malformed_retry_request(
        self, retry_num: int, payload: Dict[str, Any]
    ) -> None:
        """Log a malformed call retry request payload in the same folder."""
        self._write_json(f"malformed_retry_{retry_num}_request.json", payload)

    def log_malformed_retry_response(self, retry_num: int, chunk: str) -> None:
        """Append a chunk to the malformed retry response log."""
        self._append_text(f"malformed_retry_{retry_num}_response.log", chunk)

    def log_final_response(self, response: Dict[str, Any]) -> None:
        """Log the final response."""
        self._write_json("final_response.json", response)

    def log_malformed_autofix(
        self, tool_name: str, raw_args: str, fixed_json: str
    ) -> None:
        """Log details of an auto-fixed malformed function call."""
        self._write_json(
            "malformed_autofix.json",
            {
                "tool_name": tool_name,
                "raw_args": raw_args,
                "fixed_json": fixed_json,
                "timestamp": datetime.utcnow().isoformat(),
            },
        )

    def _write_json(self, filename: str, data: Dict[str, Any]) -> None:
        if not self.enabled or not self.log_dir:
            return
        try:
            with open(self.log_dir / filename, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2, ensure_ascii=False)
        except Exception as e:
            lib_logger.error(f"Failed to write {filename}: {e}")

    def _append_text(self, filename: str, text: str) -> None:
        if not self.enabled or not self.log_dir:
            return
        try:
            with open(self.log_dir / filename, "a", encoding="utf-8") as f:
                f.write(text + "\n")
        except Exception as e:
            lib_logger.error(f"Failed to append to {filename}: {e}")
# =============================================================================
# MAIN PROVIDER CLASS
# =============================================================================
class AntigravityProvider(
    AntigravityAuthBase, ProviderInterface, AntigravityQuotaTracker
):
    """
    Antigravity provider for Gemini and Claude models via Google's internal API.

    Supports:
    - Gemini 2.5 (Pro/Flash) with thinkingBudget
    - Gemini 3 (Pro/Flash/Image) with thinkingLevel
    - Claude Sonnet 4.5 via Antigravity proxy
    - Claude Opus 4.5 via Antigravity proxy

    Features:
    - Unified streaming/non-streaming handling
    - ThoughtSignature caching for multi-turn conversations
    - Automatic base URL fallback
    - Gemini 3 tool hallucination prevention
    """

    skip_cost_calculation = True

    # Sequential mode by default - preserves thinking signature caches between requests
    default_rotation_mode: str = "sequential"

    # =========================================================================
    # TIER & USAGE CONFIGURATION
    # =========================================================================
    # Provider name for env var lookups (QUOTA_GROUPS_ANTIGRAVITY_*)
    provider_env_name: str = "antigravity"

    # Tier name -> priority mapping (Single Source of Truth)
    # Lower numbers = higher priority
    tier_priorities = {
        # Priority 1: Highest paid tier (Google AI Ultra - name unconfirmed)
        # "google-ai-ultra": 1,  # Uncomment when tier name is confirmed
        # Priority 2: Standard paid tier
        "standard-tier": 2,
        # Priority 3: Free tier
        "free-tier": 3,
        # Priority 10: Legacy/Unknown (lowest)
        "legacy-tier": 10,
        "unknown": 10,
    }

    # Default priority for tiers not in the mapping
    default_tier_priority: int = 10

    # Usage reset configs keyed by priority sets
    # Priorities 1-2 (paid tiers) get 5h window, others get 7d window
    usage_reset_configs = {
        frozenset({1, 2}): UsageResetConfigDef(
            window_seconds=5 * 60 * 60,  # 5 hours
            mode="per_model",
            description="5-hour per-model window (paid tier)",
            field_name="models",
        ),
        "default": UsageResetConfigDef(
            window_seconds=7 * 24 * 60 * 60,  # 7 days
            mode="per_model",
            description="7-day per-model window (free/unknown tier)",
            field_name="models",
        ),
    }
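
    # Illustrative example (not part of the original module): a credential whose tier is
    # "standard-tier" resolves to priority 2 via tier_priorities, so it falls under the
    # frozenset({1, 2}) entry above and gets the 5-hour per-model window; a "free-tier"
    # credential (priority 3) falls back to the "default" 7-day window.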
    # Model quota groups (can be overridden via QUOTA_GROUPS_ANTIGRAVITY_CLAUDE)
    # Models in the same group share quota - when one is exhausted, all are
    # Based on empirical testing - see docs/ANTIGRAVITY_QUOTA_REPORT.md
    # Note: -thinking variants are included since they share the same quota pool
    # (users call non-thinking names, proxy maps to -thinking internally)
    model_quota_groups: QuotaGroupMap = {
        # Claude and GPT-OSS share the same quota pool
        "claude": [
            "claude-sonnet-4-5",
            "claude-sonnet-4-5-thinking",
            "claude-opus-4-5",
            "claude-opus-4-5-thinking",
            "gpt-oss-120b-medium",
        ],
        # Gemini 3 Pro variants share quota
        "gemini-3-pro": [
            "gemini-3-pro-high",
            "gemini-3-pro-low",
            "gemini-3-pro-preview",
        ],
        # Gemini 3 Flash (standalone, may share with 2.5 Flash - needs verification)
        "gemini-3-flash": [
            "gemini-3-flash",
        ],
        # Gemini 2.5 Flash variants share quota
        "gemini-2.5-flash": [
            "gemini-2.5-flash",
            "gemini-2.5-flash-thinking",
            "gemini-2.5-flash-lite",
        ],
    }

    # Model usage weights for grouped usage calculation
    # Opus consumes more quota per request, so its usage counts 2x when
    # comparing credentials for selection
    model_usage_weights = {}

    # Priority-based concurrency multipliers
    # Higher priority credentials (lower number) get higher multipliers
    # Priority 1 (paid ultra): 5x concurrent requests
    # Priority 2 (standard paid): 3x concurrent requests
    # Others: Use sequential fallback (2x) or balanced default (1x)
    default_priority_multipliers = {1: 5, 2: 3}

    # For sequential mode, lower priority tiers still get 2x to maintain stickiness
    # For balanced mode, this doesn't apply (falls back to 1x)
    default_sequential_fallback_multiplier = 2
    @staticmethod
    def parse_quota_error(
        error: Exception, error_body: Optional[str] = None
    ) -> Optional[Dict[str, Any]]:
| """ | |
| Parse Antigravity/Google RPC quota errors. | |
| Handles the Google Cloud API error format with ErrorInfo and RetryInfo details. | |
| Example error format: | |
| { | |
| "error": { | |
| "code": 429, | |
| "details": [ | |
| { | |
| "@type": "type.googleapis.com/google.rpc.ErrorInfo", | |
| "reason": "QUOTA_EXHAUSTED", | |
| "metadata": { | |
| "quotaResetDelay": "143h4m52.730699158s", | |
| "quotaResetTimeStamp": "2025-12-11T22:53:16Z" | |
| } | |
| }, | |
| { | |
| "@type": "type.googleapis.com/google.rpc.RetryInfo", | |
| "retryDelay": "515092.730699158s" | |
| } | |
| ] | |
| } | |
| } | |
| Args: | |
| error: The caught exception | |
| error_body: Optional raw response body string | |
| Returns: | |
| None if not a parseable quota error, otherwise: | |
| { | |
| "retry_after": int, | |
| "reason": str, | |
| "reset_timestamp": str | None, | |
| } | |
| """ | |
| import re as regex_module | |
| def parse_duration(duration_str: str) -> Optional[int]: | |
| """Parse duration strings like '143h4m52.73s' or '515092.73s' to seconds. | |
| Also handles millisecond format: '290.979975ms' -> 0 seconds (rounded). | |
| Returns 0 for sub-second durations (not None), as 0 is a valid value. | |
| """ | |
            if not duration_str:
                return None
            # Handle pure milliseconds format: "290.979975ms"
            # MUST check this BEFORE checking 'm' for minutes to avoid misinterpreting 'ms'
            ms_match = regex_module.match(r"^([\d.]+)ms$", duration_str)
            if ms_match:
                ms_value = float(ms_match.group(1))
                # Convert milliseconds to seconds, round up to at least 1 if > 0
                seconds = ms_value / 1000.0
                return max(1, int(seconds)) if seconds > 0 else 0
            # Handle pure seconds format: "515092.730699158s" or "0.290979975s"
            pure_seconds_match = regex_module.match(r"^([\d.]+)s$", duration_str)
            if pure_seconds_match:
                seconds = float(pure_seconds_match.group(1))
                # For sub-second values, round up to 1 to avoid immediate retry floods
                return max(1, int(seconds)) if seconds > 0 else 0
            # Handle compound format: "143h4m52.730699158s"
            # Note: 'm' here means minutes, not milliseconds (ms is handled above)
            total_seconds = 0.0
            patterns = [
                (r"(\d+)h", 3600),  # hours
                (
                    r"(\d+)m(?!s)",
                    60,
                ),  # minutes - negative lookahead to avoid matching 'ms'
                (
                    r"([\d.]+)s$",
                    1,
                ),  # seconds - anchor to end to avoid matching 's' in 'ms'
            ]
            for pattern, multiplier in patterns:
                match = regex_module.search(pattern, duration_str)
                if match:
                    total_seconds += float(match.group(1)) * multiplier
            # Return 0 explicitly for very small values (it's valid, not "no value")
            if total_seconds > 0:
                return max(1, int(total_seconds))
            return None
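
        # Illustrative examples (not part of the original module):
        #   parse_duration("515092.730699158s") -> 515092
        #   parse_duration("143h4m52.730699158s") -> 515092  (143*3600 + 4*60 + 52)
        #   parse_duration("290.979975ms") -> 1   (sub-second values round up to 1)
        #   parse_duration("0s") -> 0, parse_duration("") -> None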
        # Get error body from exception if not provided
        body = error_body
        if not body:
            # Try to extract from various exception attributes
            if hasattr(error, "response") and hasattr(error.response, "text"):
                body = error.response.text
            elif hasattr(error, "body"):
                body = str(error.body)
            elif hasattr(error, "message"):
                body = str(error.message)
            else:
                body = str(error)

        # Try to find JSON in the body
        try:
            # Handle cases where JSON is embedded in a larger string
            json_match = regex_module.search(r"\{[\s\S]*\}", body)
            if not json_match:
                return None
            data = json.loads(json_match.group(0))
        except (json.JSONDecodeError, AttributeError, TypeError):
            return None

        # Navigate to error.details
        error_obj = data.get("error", data)
        details = error_obj.get("details", [])
        result = {
            "retry_after": None,
            "reason": None,
            "reset_timestamp": None,
            "quota_reset_timestamp": None,  # Unix timestamp for quota reset
        }
        for detail in details:
            detail_type = detail.get("@type", "")
            # Parse RetryInfo - most authoritative source for retry delay
            if "RetryInfo" in detail_type:
                retry_delay = detail.get("retryDelay")
                if retry_delay:
                    parsed = parse_duration(retry_delay)
                    if parsed is not None:  # 0 is valid, only None means "no value"
                        result["retry_after"] = parsed
            # Parse ErrorInfo - contains reason and quota reset metadata
            elif "ErrorInfo" in detail_type:
                result["reason"] = detail.get("reason")
                metadata = detail.get("metadata", {})
                # Get quotaResetDelay as fallback if RetryInfo not present
                if result["retry_after"] is None:
                    quota_delay = metadata.get("quotaResetDelay")
                    if quota_delay:
                        parsed = parse_duration(quota_delay)
                        if parsed is not None:  # 0 is valid, only None means "no value"
                            result["retry_after"] = parsed
                # Capture reset timestamp for logging and authoritative reset time
                reset_ts_str = metadata.get("quotaResetTimeStamp")
                result["reset_timestamp"] = reset_ts_str
                # Parse ISO timestamp to Unix timestamp for usage tracking
                if reset_ts_str:
                    try:
                        # Handle ISO format: "2025-12-11T22:53:16Z"
                        reset_dt = datetime.fromisoformat(
                            reset_ts_str.replace("Z", "+00:00")
                        )
                        result["quota_reset_timestamp"] = reset_dt.timestamp()
                    except (ValueError, AttributeError) as e:
                        lib_logger.warning(
                            f"Failed to parse quota reset timestamp '{reset_ts_str}': {e}"
                        )

        # Return None if we couldn't extract retry_after
        if result["retry_after"] is None:
            # Bare RESOURCE_EXHAUSTED without timing details
            # Return None to signal transient error (caller will retry internally)
            return None
        return result
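
    # Illustrative example (not part of the original module): for the 429 payload shown
    # in the docstring above, parse_quota_error() would return roughly
    #   {"retry_after": 515092, "reason": "QUOTA_EXHAUSTED",
    #    "reset_timestamp": "2025-12-11T22:53:16Z",
    #    "quota_reset_timestamp": <Unix time for that instant>}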
    def __init__(self):
        super().__init__()
        self.model_definitions = ModelDefinitions()
        # NOTE: project_id_cache and project_tier_cache are inherited from AntigravityAuthBase

        # Base URL management
        self._base_url_index = 0
        self._current_base_url = BASE_URLS[0]

        # Configuration from environment
        memory_ttl = _env_int("ANTIGRAVITY_SIGNATURE_CACHE_TTL", 3600)
        disk_ttl = _env_int("ANTIGRAVITY_SIGNATURE_DISK_TTL", 86400)

        # Initialize caches using shared ProviderCache
        self._signature_cache = ProviderCache(
            _get_gemini3_signature_cache_file(),
            memory_ttl,
            disk_ttl,
            env_prefix="ANTIGRAVITY_SIGNATURE",
        )
        self._thinking_cache = ProviderCache(
            _get_claude_thinking_cache_file(),
            memory_ttl,
            disk_ttl,
            env_prefix="ANTIGRAVITY_THINKING",
        )

        # Quota tracking state
        self._learned_costs: Dict[str, Dict[str, float]] = {}  # tier -> model -> cost
        self._learned_costs_loaded: bool = False
        self._quota_refresh_interval = _env_int(
            "ANTIGRAVITY_QUOTA_REFRESH_INTERVAL", 300
        )  # 5 min
        self._initial_quota_fetch_done: bool = (
            False  # Track if initial full fetch completed
        )

        # Feature flags
        self._preserve_signatures_in_client = _env_bool(
            "ANTIGRAVITY_PRESERVE_THOUGHT_SIGNATURES", True
        )
        self._enable_signature_cache = _env_bool(
            "ANTIGRAVITY_ENABLE_SIGNATURE_CACHE", True
        )
        self._enable_dynamic_models = _env_bool(
            "ANTIGRAVITY_ENABLE_DYNAMIC_MODELS", False
        )
        self._enable_gemini3_tool_fix = _env_bool("ANTIGRAVITY_GEMINI3_TOOL_FIX", True)
        self._enable_claude_tool_fix = _env_bool("ANTIGRAVITY_CLAUDE_TOOL_FIX", False)
        self._enable_thinking_sanitization = _env_bool(
            "ANTIGRAVITY_CLAUDE_THINKING_SANITIZATION", True
        )

        # Gemini 3 tool fix configuration
        self._gemini3_tool_prefix = os.getenv(
            "ANTIGRAVITY_GEMINI3_TOOL_PREFIX", "gemini3_"
        )
        self._gemini3_description_prompt = os.getenv(
            "ANTIGRAVITY_GEMINI3_DESCRIPTION_PROMPT",
            "\n\n⚠️ STRICT PARAMETERS (use EXACTLY as shown): {params}. Do NOT use parameters from your training data - use ONLY these parameter names.",
        )
        self._gemini3_enforce_strict_schema = _env_bool(
            "ANTIGRAVITY_GEMINI3_STRICT_SCHEMA", True
        )
        # Toggle for JSON string parsing in tool call arguments
        # NOTE: This is possibly redundant - modern Gemini models may not need this fix.
        # Enabled by default (the env default below is True); disable it if JSON string
        # parsing ever corrupts legitimate string arguments.
        self._enable_json_string_parsing = _env_bool(
            "ANTIGRAVITY_ENABLE_JSON_STRING_PARSING", True
        )
        self._gemini3_system_instruction = os.getenv(
            "ANTIGRAVITY_GEMINI3_SYSTEM_INSTRUCTION", DEFAULT_GEMINI3_SYSTEM_INSTRUCTION
        )

        # Claude tool fix configuration (separate from Gemini 3)
        self._claude_description_prompt = os.getenv(
            "ANTIGRAVITY_CLAUDE_DESCRIPTION_PROMPT", "\n\nSTRICT PARAMETERS: {params}."
        )
        self._claude_system_instruction = os.getenv(
            "ANTIGRAVITY_CLAUDE_SYSTEM_INSTRUCTION", DEFAULT_CLAUDE_SYSTEM_INSTRUCTION
        )

        # Parallel tool usage instruction configuration
        self._enable_parallel_tool_instruction_claude = _env_bool(
            "ANTIGRAVITY_PARALLEL_TOOL_INSTRUCTION_CLAUDE",
            True,  # ON for Claude
        )
        self._enable_parallel_tool_instruction_gemini3 = _env_bool(
            "ANTIGRAVITY_PARALLEL_TOOL_INSTRUCTION_GEMINI3",
            False,  # OFF for Gemini 3
        )
        self._parallel_tool_instruction = os.getenv(
            "ANTIGRAVITY_PARALLEL_TOOL_INSTRUCTION", DEFAULT_PARALLEL_TOOL_INSTRUCTION
        )

        # Log configuration
        self._log_config()

    def _log_config(self) -> None:
        """Log provider configuration."""
        lib_logger.debug(
            f"Antigravity config: signatures_in_client={self._preserve_signatures_in_client}, "
            f"cache={self._enable_signature_cache}, dynamic_models={self._enable_dynamic_models}, "
            f"gemini3_fix={self._enable_gemini3_tool_fix}, gemini3_strict_schema={self._gemini3_enforce_strict_schema}, "
            f"claude_fix={self._enable_claude_tool_fix}, thinking_sanitization={self._enable_thinking_sanitization}, "
            f"parallel_tool_claude={self._enable_parallel_tool_instruction_claude}, "
            f"parallel_tool_gemini3={self._enable_parallel_tool_instruction_gemini3}"
        )
    def _get_antigravity_headers(self) -> Dict[str, str]:
        """Return the Antigravity API headers. Used by quota tracker mixin."""
        return ANTIGRAVITY_HEADERS

    def _load_tier_from_file(self, credential_path: str) -> Optional[str]:
        """
        Load tier from credential file's _proxy_metadata and cache it.

        This is used as a fallback when the tier isn't in the memory cache,
        typically on first access before initialize_credentials() has run.

        Args:
            credential_path: Path to the credential file

        Returns:
            Tier string if found, None otherwise
        """
        # Skip env:// paths (environment-based credentials)
        if self._parse_env_credential_path(credential_path) is not None:
            return None
        try:
            with open(credential_path, "r") as f:
                creds = json.load(f)
            metadata = creds.get("_proxy_metadata", {})
            tier = metadata.get("tier")
            project_id = metadata.get("project_id")
            if tier:
                self.project_tier_cache[credential_path] = tier
                lib_logger.debug(
                    f"Lazy-loaded tier '{tier}' for credential: {Path(credential_path).name}"
                )
            if project_id and credential_path not in self.project_id_cache:
                self.project_id_cache[credential_path] = project_id
            return tier
        except (FileNotFoundError, json.JSONDecodeError, KeyError) as e:
            lib_logger.debug(f"Could not lazy-load tier from {credential_path}: {e}")
        return None

    def get_credential_tier_name(self, credential: str) -> Optional[str]:
        """
        Returns the human-readable tier name for a credential.

        Args:
            credential: The credential path

        Returns:
            Tier name string (e.g., "free-tier") or None if unknown
        """
        tier = self.project_tier_cache.get(credential)
        if not tier:
            tier = self._load_tier_from_file(credential)
        return tier

    def get_model_tier_requirement(self, model: str) -> Optional[int]:
        """
        Returns the minimum priority tier required for a model.

        Antigravity has no model-tier restrictions - all models work on all tiers.

        Args:
            model: The model name (with or without provider prefix)

        Returns:
            None - no restrictions for any model
        """
        return None

    async def initialize_credentials(self, credential_paths: List[str]) -> None:
        """
        Load persisted tier information from credential files at startup.

        This ensures all credential priorities are known before any API calls,
        preventing unknown credentials from getting priority 999.

        For credentials without persisted tier info (new or corrupted), performs
        full discovery to ensure proper prioritization in sequential rotation mode.
        """
        # Step 1: Load persisted tiers from files
        await self._load_persisted_tiers(credential_paths)

        # Step 2: Identify credentials still missing tier info
        credentials_needing_discovery = [
            path
            for path in credential_paths
            if path not in self.project_tier_cache
            and self._parse_env_credential_path(path) is None  # Skip env:// paths
        ]
        if not credentials_needing_discovery:
            return  # All credentials have tier info

        lib_logger.info(
            f"Antigravity: Discovering tier info for {len(credentials_needing_discovery)} credential(s)..."
        )

        # Step 3: Perform discovery for each missing credential (sequential to avoid rate limits)
        for credential_path in credentials_needing_discovery:
            try:
                auth_header = await self.get_auth_header(credential_path)
                access_token = auth_header["Authorization"].split(" ")[1]
                await self._discover_project_id(
                    credential_path, access_token, litellm_params={}
                )
                discovered_tier = self.project_tier_cache.get(
                    credential_path, "unknown"
                )
                lib_logger.debug(
                    f"Discovered tier '{discovered_tier}' for {Path(credential_path).name}"
                )
            except Exception as e:
                lib_logger.warning(
                    f"Failed to discover tier for {Path(credential_path).name}: {e}. "
                    f"Credential will use default priority."
                )
    # =========================================================================
    # BACKGROUND JOB INTERFACE
    # =========================================================================
    def get_background_job_config(self) -> Optional[Dict[str, Any]]:
        """
        Return background job configuration for quota baseline refresh.

        The quota baseline refresh fetches current quota status from the API
        and stores it in UsageManager for accurate quota estimation.
        """
        return {
            "interval": self._quota_refresh_interval,  # default 300s (5 min)
            "name": "quota_baseline_refresh",
            "run_on_start": True,  # fetch baselines immediately at startup
        }

    async def run_background_job(
        self,
        usage_manager: "UsageManager",
        credentials: List[str],
    ) -> None:
        """
        Refresh quota baselines for credentials.

        On first run (startup): Fetches quota for ALL credentials to establish baselines.
        On subsequent runs: Only fetches for credentials used since last refresh.

        Fetches current quota status from the Antigravity API and stores
        the baselines in UsageManager for accurate quota estimation.
        """
        if not self._initial_quota_fetch_done:
            # First run: fetch ALL credentials to establish baselines
            lib_logger.info(
                f"Antigravity: Fetching initial quota baselines for {len(credentials)} credentials..."
            )
            quota_results = await self.fetch_initial_baselines(credentials)
            self._initial_quota_fetch_done = True
        else:
            # Subsequent runs: only recently used credentials (incremental updates)
            usage_data = await usage_manager._get_usage_data_snapshot()
            quota_results = await self.refresh_active_quota_baselines(
                credentials, usage_data
            )
        if not quota_results:
            return

        # Store new baselines in UsageManager
        stored = await self._store_baselines_to_usage_manager(
            quota_results, usage_manager
        )
        if stored > 0:
            lib_logger.debug(
                f"Antigravity quota refresh: updated {stored} model baselines"
            )
    async def _load_persisted_tiers(
        self, credential_paths: List[str]
    ) -> Dict[str, str]:
        """
        Load persisted tier information from credential files into memory cache.

        Args:
            credential_paths: List of credential file paths

        Returns:
            Dict mapping credential path to tier name for logging purposes
        """
        loaded = {}
        for path in credential_paths:
            # Skip env:// paths (environment-based credentials)
            if self._parse_env_credential_path(path) is not None:
                continue
            # Skip if already in cache
            if path in self.project_tier_cache:
                continue
            try:
                with open(path, "r") as f:
                    creds = json.load(f)
                metadata = creds.get("_proxy_metadata", {})
                tier = metadata.get("tier")
                project_id = metadata.get("project_id")
                if tier:
                    self.project_tier_cache[path] = tier
                    loaded[path] = tier
                    lib_logger.debug(
                        f"Loaded persisted tier '{tier}' for credential: {Path(path).name}"
                    )
                if project_id:
                    self.project_id_cache[path] = project_id
            except (FileNotFoundError, json.JSONDecodeError, KeyError) as e:
                lib_logger.debug(f"Could not load persisted tier from {path}: {e}")
        if loaded:
            # Log summary at debug level
            tier_counts: Dict[str, int] = {}
            for tier in loaded.values():
                tier_counts[tier] = tier_counts.get(tier, 0) + 1
            lib_logger.debug(
                f"Antigravity: Loaded {len(loaded)} credential tiers from disk: "
                + ", ".join(
                    f"{tier}={count}" for tier, count in sorted(tier_counts.items())
                )
            )
        return loaded

    # NOTE: _post_auth_discovery() is inherited from AntigravityAuthBase
    # =========================================================================
    # MODEL UTILITIES
    # =========================================================================
    def _alias_to_internal(self, alias: str) -> str:
        """Convert public alias to internal model name."""
        return MODEL_ALIAS_REVERSE.get(alias, alias)

    def _internal_to_alias(self, internal: str) -> str:
        """Convert internal model name to public alias."""
        if internal in EXCLUDED_MODELS:
            return ""
        return MODEL_ALIAS_MAP.get(internal, internal)

    def _is_gemini_3(self, model: str) -> bool:
        """Check if model is Gemini 3 (requires special handling)."""
        internal = self._alias_to_internal(model)
        return internal.startswith("gemini-3-") or model.startswith("gemini-3-")

    def _is_claude(self, model: str) -> bool:
        """Check if model is Claude."""
        return "claude" in model.lower()

    def _strip_provider_prefix(self, model: str) -> str:
        """Strip provider prefix from model name."""
        return model.split("/")[-1] if "/" in model else model

    # =========================================================================
    # BASE URL MANAGEMENT
    # =========================================================================
    def _get_base_url(self) -> str:
        """Get current base URL."""
        return self._current_base_url

    def _get_available_models(self) -> List[str]:
        """
        Get list of user-facing model names available via this provider.

        Used by quota tracker to filter which models to store baselines for.
        Only models in this list will have quota baselines tracked.

        Returns:
            List of user-facing model names (e.g., ["claude-sonnet-4-5", "claude-opus-4-5"])
        """
        return AVAILABLE_MODELS

    def _try_next_base_url(self) -> bool:
        """Switch to next base URL in fallback list. Returns True if successful."""
        if self._base_url_index < len(BASE_URLS) - 1:
            self._base_url_index += 1
            self._current_base_url = BASE_URLS[self._base_url_index]
            lib_logger.info(f"Switching to fallback URL: {self._current_base_url}")
            return True
        return False

    def _reset_base_url(self) -> None:
        """Reset to primary base URL."""
        self._base_url_index = 0
        self._current_base_url = BASE_URLS[0]

    # =========================================================================
    # THINKING CACHE KEY GENERATION
    # =========================================================================
    def _generate_thinking_cache_key(
        self, text_content: str, tool_calls: List[Dict]
    ) -> Optional[str]:
        """
        Generate stable cache key from response content for Claude thinking preservation.

        Uses composite key:
        - Tool call IDs (most stable)
        - Text hash (for text-only responses)
        """
        key_parts = []
        if tool_calls:
            first_id = tool_calls[0].get("id", "")
            if first_id:
                key_parts.append(f"tool_{first_id.replace('call_', '')}")
        if text_content:
            text_hash = hashlib.md5(text_content[:200].encode()).hexdigest()[:16]
            key_parts.append(f"text_{text_hash}")
        return "thinking_" + "_".join(key_parts) if key_parts else None
| # NOTE: _discover_project_id() and _persist_project_metadata() are inherited from AntigravityAuthBase | |
| # ========================================================================= | |
| # THINKING MODE SANITIZATION | |
| # ========================================================================= | |
| def _analyze_conversation_state( | |
| self, messages: List[Dict[str, Any]] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Analyze conversation state to detect tool use loops and thinking mode issues. | |
| Key insight: A "turn" can span multiple assistant messages in a tool-use loop. | |
| We need to find the TURN START (first assistant message after last real user message) | |
| and check if THAT message had thinking, not just the last assistant message. | |
| Returns: | |
| { | |
| "in_tool_loop": bool - True if we're in an incomplete tool use loop | |
| "turn_start_idx": int - Index of first model message in current turn | |
| "turn_has_thinking": bool - Whether the TURN started with thinking | |
| "last_model_idx": int - Index of last model message | |
| "last_model_has_thinking": bool - Whether last model msg has thinking | |
| "last_model_has_tool_calls": bool - Whether last model msg has tool calls | |
| "pending_tool_results": bool - Whether there are tool results after last model | |
| "thinking_block_indices": List[int] - Indices of messages with thinking/reasoning | |
| } | |
| NOTE: This now operates on Gemini-format messages (after transformation): | |
| - Role "model" instead of "assistant" | |
| - Role "user" for both user messages AND tool results (with functionResponse) | |
| - "parts" array with "thought": true for thinking | |
| - "parts" array with "functionCall" for tool calls | |
| - "parts" array with "functionResponse" for tool results | |
| """ | |
| state = { | |
| "in_tool_loop": False, | |
| "turn_start_idx": -1, | |
| "turn_has_thinking": False, | |
| "last_assistant_idx": -1, # Keep name for compatibility | |
| "last_assistant_has_thinking": False, | |
| "last_assistant_has_tool_calls": False, | |
| "pending_tool_results": False, | |
| "thinking_block_indices": [], | |
| } | |
| # First pass: Find the last "real" user message (not a tool result) | |
| # In Gemini format, tool results are "user" role with functionResponse parts | |
| last_real_user_idx = -1 | |
| for i, msg in enumerate(messages): | |
| role = msg.get("role") | |
| if role == "user": | |
| # Check if this is a real user message or a tool result container | |
| parts = msg.get("parts", []) | |
| is_tool_result_msg = any( | |
| isinstance(p, dict) and "functionResponse" in p for p in parts | |
| ) | |
| if not is_tool_result_msg: | |
| last_real_user_idx = i | |
| # Second pass: Analyze conversation and find turn boundaries | |
| for i, msg in enumerate(messages): | |
| role = msg.get("role") | |
| if role == "model": | |
| # Check for thinking/reasoning content (Gemini format) | |
| has_thinking = self._message_has_thinking(msg) | |
| # Check for tool calls (functionCall in parts) | |
| parts = msg.get("parts", []) | |
| has_tool_calls = any( | |
| isinstance(p, dict) and "functionCall" in p for p in parts | |
| ) | |
| # Track if this is the turn start | |
| if i > last_real_user_idx and state["turn_start_idx"] == -1: | |
| state["turn_start_idx"] = i | |
| state["turn_has_thinking"] = has_thinking | |
| state["last_assistant_idx"] = i | |
| state["last_assistant_has_tool_calls"] = has_tool_calls | |
| state["last_assistant_has_thinking"] = has_thinking | |
| if has_thinking: | |
| state["thinking_block_indices"].append(i) | |
| elif role == "user": | |
| # Check if this is a tool result (functionResponse in parts) | |
| parts = msg.get("parts", []) | |
| is_tool_result = any( | |
| isinstance(p, dict) and "functionResponse" in p for p in parts | |
| ) | |
| if is_tool_result and state["last_assistant_has_tool_calls"]: | |
| state["pending_tool_results"] = True | |
| # We're in a tool loop if: | |
| # 1. There are pending tool results | |
| # 2. The conversation ends with tool results (last message is user with functionResponse) | |
| if state["pending_tool_results"] and messages: | |
| last_msg = messages[-1] | |
| if last_msg.get("role") == "user": | |
| parts = last_msg.get("parts", []) | |
| ends_with_tool_result = any( | |
| isinstance(p, dict) and "functionResponse" in p for p in parts | |
| ) | |
| if ends_with_tool_result: | |
| state["in_tool_loop"] = True | |
| return state | |
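| # Illustrative example: for the (Gemini-format) history | |
| #   user: "run the tests" | |
| #   model: [thought part, functionCall] | |
| #   user: [functionResponse] | |
| # the returned state has in_tool_loop=True, turn_has_thinking=True, | |
| # pending_tool_results=True, and turn_start_idx pointing at the model message. | |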
| def _message_has_thinking(self, msg: Dict[str, Any]) -> bool: | |
| """ | |
| Check if a message contains thinking/reasoning content. | |
| Handles GEMINI format (after transformation): | |
| - "parts" array with items having "thought": true | |
| """ | |
| parts = msg.get("parts", []) | |
| for part in parts: | |
| if isinstance(part, dict) and part.get("thought") is True: | |
| return True | |
| return False | |
| def _message_has_tool_calls(self, msg: Dict[str, Any]) -> bool: | |
| """Check if a message contains tool calls (Gemini format).""" | |
| parts = msg.get("parts", []) | |
| return any(isinstance(p, dict) and "functionCall" in p for p in parts) | |
| def _sanitize_thinking_for_claude( | |
| self, messages: List[Dict[str, Any]], thinking_enabled: bool | |
| ) -> Tuple[List[Dict[str, Any]], bool]: | |
| """ | |
| Sanitize thinking blocks in conversation history for Claude compatibility. | |
| Handles the following scenarios per Claude docs: | |
| 1. If thinking is disabled, remove all thinking blocks from the conversation | |
| 2. If thinking is enabled: | |
| a. In a tool use loop WITH thinking at the turn start: preserve it (same mode continues) | |
| b. In a tool use loop WITHOUT thinking: toggling thinking on mid-turn is invalid - | |
| recover cached thinking or close the loop with synthetic messages | |
| c. Not in tool loop: strip old thinking, new response adds thinking naturally | |
| Per Claude docs: | |
| - "If thinking is enabled, the final assistant turn must start with a thinking block" | |
| - "If thinking is disabled, the final assistant turn must not contain any thinking blocks" | |
| - Tool use loops are part of a single assistant turn | |
| - You CANNOT toggle thinking mid-turn | |
| The key insight: only TOGGLING thinking ON mid-turn is invalid. | |
| If the turn already started with thinking, we preserve it. | |
| If it did not, we repair the history (cache recovery or synthetic loop closure) | |
| so that a fresh turn with thinking can begin. | |
| Returns: | |
| Tuple of (sanitized_messages, force_disable_thinking) | |
| - sanitized_messages: The cleaned message list | |
| - force_disable_thinking: reserved for callers; currently always False, because | |
| invalid toggles are repaired (loop closure / cache recovery) rather than force-disabled | |
| """ | |
| messages = copy.deepcopy(messages) | |
| state = self._analyze_conversation_state(messages) | |
| lib_logger.debug( | |
| f"[Thinking Sanitization] thinking_enabled={thinking_enabled}, " | |
| f"in_tool_loop={state['in_tool_loop']}, " | |
| f"turn_has_thinking={state['turn_has_thinking']}, " | |
| f"turn_start_idx={state['turn_start_idx']}, " | |
| f"last_assistant_has_thinking={state['last_assistant_has_thinking']}, " | |
| f"last_assistant_has_tool_calls={state['last_assistant_has_tool_calls']}" | |
| ) | |
| if not thinking_enabled: | |
| # CASE 1: Thinking is disabled - strip ALL thinking blocks | |
| return self._strip_all_thinking_blocks(messages), False | |
| # CASE 2: Thinking is enabled | |
| if state["in_tool_loop"]: | |
| # We're in a tool use loop (conversation ends with tool_result) | |
| # Per Claude docs: entire assistant turn must operate in single thinking mode | |
| # | |
| # KEY FIX: Check turn_has_thinking (thinking at turn START), not last_assistant_has_thinking. | |
| # In multi-message tool loops, thinking is at the FIRST assistant message of the turn, | |
| # not necessarily the last one (which might just have tool_calls). | |
| if state["turn_has_thinking"]: | |
| # The TURN started with thinking - this is valid! | |
| # Thinking was enabled when tool was called, continue with thinking enabled. | |
| # Preserve thinking for the turn start message. | |
| lib_logger.debug( | |
| "[Thinking Sanitization] Tool loop with thinking at turn start - preserving. " | |
| f"turn_start_idx={state['turn_start_idx']}, last_assistant_idx={state['last_assistant_idx']}" | |
| ) | |
| return self._preserve_turn_start_thinking( | |
| messages, state["turn_start_idx"] | |
| ), False | |
| else: | |
| # The TURN did NOT start with thinking, but thinking is NOW enabled | |
| # This is the INVALID case: toggling thinking ON mid-turn | |
| # | |
| # Per Claude docs, this causes: | |
| # "Expected `thinking` or `redacted_thinking`, but found `tool_use`." | |
| # | |
| # There are TWO possible scenarios: | |
| # 1. Original turn was made WITHOUT thinking (e.g., by Gemini or non-thinking Claude) | |
| # → Solution: Close the tool loop with synthetic message | |
| # 2. Original turn HAD thinking but compaction stripped it | |
| # → Solution: Try to inject cached thinking, fallback to synthetic closure | |
| turn_start_msg = ( | |
| messages[state["turn_start_idx"]] | |
| if state["turn_start_idx"] >= 0 | |
| else None | |
| ) | |
| # Check if this looks like a compacted thinking turn | |
| if turn_start_msg and self._looks_like_compacted_thinking_turn( | |
| turn_start_msg | |
| ): | |
| # Try to recover cached thinking block | |
| recovered = self._try_recover_thinking_from_cache( | |
| messages, state["turn_start_idx"] | |
| ) | |
| if recovered: | |
| lib_logger.info( | |
| "[Thinking Sanitization] Recovered thinking from cache for compacted turn." | |
| ) | |
| return self._preserve_turn_start_thinking( | |
| messages, state["turn_start_idx"] | |
| ), False | |
| else: | |
| # Can't recover from cache - close the loop with synthetic messages | |
| # This allows Claude to start a fresh turn with thinking | |
| lib_logger.info( | |
| "[Thinking Sanitization] Compacted thinking turn detected in tool loop. " | |
| "Cache miss - closing loop with synthetic messages to enable fresh thinking turn." | |
| ) | |
| return self._close_tool_loop_for_thinking(messages), False | |
| else: | |
| # Not a compacted turn - genuinely no thinking. Close the loop. | |
| lib_logger.info( | |
| "[Thinking Sanitization] Closing tool loop with synthetic response. " | |
| "Turn did not start with thinking (turn_has_thinking=False). " | |
| "This allows thinking to be enabled on the new turn." | |
| ) | |
| return self._close_tool_loop_for_thinking(messages), False | |
| else: | |
| # Not in a tool loop - this is the simple case | |
| # The conversation doesn't end with tool_result, so we're starting fresh. | |
| # | |
| # HOWEVER, there's a special case: compaction might have removed the thinking | |
| # block from the turn start, but Claude still expects it. | |
| # We detect this by checking if there's an assistant message with tool_calls | |
| # but no thinking, and the conversation structure suggests thinking was expected. | |
| # Check if we need to inject a fake thinking block for compaction recovery | |
| if state["last_assistant_idx"] >= 0: | |
| last_assistant = messages[state["last_assistant_idx"]] | |
| if ( | |
| state["last_assistant_has_tool_calls"] | |
| and not state["turn_has_thinking"] | |
| ): | |
| # The turn has functionCall but no thinking at turn start. | |
| # This could be: | |
| # 1. Compaction removed the thinking block | |
| # 2. The original call was made without thinking | |
| # | |
| # For case 1, we need to close the turn and start fresh. | |
| # For case 2, we let the model respond naturally. | |
| # | |
| # We can detect case 1 if there's evidence thinking was expected: | |
| # - The turn_start message has functionCall (typical thinking-enabled flow) | |
| # - The content structure suggests a thinking block was stripped | |
| # Check if turn_start has the hallmarks of a compacted thinking response | |
| turn_start_msg = ( | |
| messages[state["turn_start_idx"]] | |
| if state["turn_start_idx"] >= 0 | |
| else None | |
| ) | |
| if turn_start_msg and self._looks_like_compacted_thinking_turn( | |
| turn_start_msg | |
| ): | |
| # Try cache recovery first | |
| recovered = self._try_recover_thinking_from_cache( | |
| messages, state["turn_start_idx"] | |
| ) | |
| if recovered: | |
| lib_logger.info( | |
| "[Thinking Sanitization] Recovered thinking from cache for compacted turn (not in tool loop)." | |
| ) | |
| return self._strip_old_turn_thinking( | |
| messages, state["turn_start_idx"] | |
| ), False | |
| else: | |
| # Can't recover - add synthetic user to start fresh turn (Gemini format) | |
| lib_logger.info( | |
| "[Thinking Sanitization] Detected compacted turn missing thinking block. " | |
| "Adding synthetic user message to start fresh thinking turn." | |
| ) | |
| # Add synthetic user message to trigger new turn with thinking | |
| synthetic_user = { | |
| "role": "user", | |
| "parts": [{"text": "[Continue]"}], | |
| } | |
| messages.append(synthetic_user) | |
| return self._strip_all_thinking_blocks(messages), False | |
| else: | |
| lib_logger.debug( | |
| "[Thinking Sanitization] Last model has functionCall but no thinking. " | |
| "This is likely from context compression or non-thinking model. " | |
| "New response will include thinking naturally." | |
| ) | |
| # Strip thinking from old turns, let new response add thinking naturally | |
| return self._strip_old_turn_thinking( | |
| messages, state["last_assistant_idx"] | |
| ), False | |
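| # Illustrative example: if thinking is enabled and the history ends with a | |
| # functionResponse whose turn started WITHOUT a thinking part (and nothing is | |
| # cached), the method appends a synthetic model message (e.g. "[Tool execution | |
| # completed.]") and a synthetic user "[Continue]" so Claude can open a fresh | |
| # thinking turn; force_disable_thinking stays False. | |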
| def _strip_all_thinking_blocks( | |
| self, messages: List[Dict[str, Any]] | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Remove all thinking/reasoning content from messages. | |
| Handles GEMINI format (after transformation): | |
| - Role "model" instead of "assistant" | |
| - "parts" array with "thought": true for thinking | |
| """ | |
| for msg in messages: | |
| if msg.get("role") == "model": | |
| parts = msg.get("parts", []) | |
| if parts: | |
| # Filter out thinking parts (those with "thought": true) | |
| filtered = [ | |
| p | |
| for p in parts | |
| if not (isinstance(p, dict) and p.get("thought") is True) | |
| ] | |
| if not filtered: | |
| # All parts were thinking - insert an empty text part so the message | |
| # keeps a valid structure (an empty filtered list cannot hold functionCalls) | |
| msg["parts"] = [{"text": ""}] | |
| else: | |
| msg["parts"] = filtered | |
| return messages | |
| def _strip_old_turn_thinking( | |
| self, messages: List[Dict[str, Any]], last_model_idx: int | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Strip thinking from old turns but preserve for the last model turn. | |
| Per Claude docs: "thinking blocks from previous turns are removed from context" | |
| This mimics the API behavior and prevents issues. | |
| Handles GEMINI format: role "model", "parts" with "thought": true | |
| """ | |
| for i, msg in enumerate(messages): | |
| if msg.get("role") == "model" and i < last_model_idx: | |
| # Old turn - strip thinking parts | |
| parts = msg.get("parts", []) | |
| if parts: | |
| filtered = [ | |
| p | |
| for p in parts | |
| if not (isinstance(p, dict) and p.get("thought") is True) | |
| ] | |
| # An empty filtered list cannot contain functionCall parts, so an empty | |
| # text part is always a safe placeholder here | |
| msg["parts"] = filtered if filtered else [{"text": ""}] | |
| return messages | |
| def _preserve_current_turn_thinking( | |
| self, messages: List[Dict[str, Any]], last_model_idx: int | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Preserve thinking only for the current (last) model turn. | |
| Strip from all previous turns. | |
| """ | |
| # Same as strip_old_turn_thinking - we keep the last turn intact | |
| return self._strip_old_turn_thinking(messages, last_model_idx) | |
| def _preserve_turn_start_thinking( | |
| self, messages: List[Dict[str, Any]], turn_start_idx: int | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Preserve thinking at the turn start message. | |
| In multi-message tool loops, the thinking block is at the FIRST model | |
| message of the turn (turn_start_idx), not the last one. We need to preserve | |
| thinking from the turn start, and strip it from all older turns. | |
| Handles GEMINI format: role "model", "parts" with "thought": true | |
| """ | |
| for i, msg in enumerate(messages): | |
| if msg.get("role") == "model" and i < turn_start_idx: | |
| # Old turn - strip thinking parts | |
| parts = msg.get("parts", []) | |
| if parts: | |
| filtered = [ | |
| p | |
| for p in parts | |
| if not (isinstance(p, dict) and p.get("thought") is True) | |
| ] | |
| # An empty filtered list cannot contain functionCall parts, so an empty | |
| # text part is always a safe placeholder here | |
| msg["parts"] = filtered if filtered else [{"text": ""}] | |
| return messages | |
| def _looks_like_compacted_thinking_turn(self, msg: Dict[str, Any]) -> bool: | |
| """ | |
| Detect if a message looks like it was compacted from a thinking-enabled turn. | |
| Heuristics (GEMINI format): | |
| 1. Has functionCall parts (typical thinking flow produces tool calls) | |
| 2. No thinking parts (thought: true) | |
| 3. No text content before functionCall (thinking responses usually have text) | |
| This is imperfect but helps catch common compaction scenarios. | |
| """ | |
| parts = msg.get("parts", []) | |
| if not parts: | |
| return False | |
| has_function_call = any( | |
| isinstance(p, dict) and "functionCall" in p for p in parts | |
| ) | |
| if not has_function_call: | |
| return False | |
| # Check for text content (not thinking) | |
| has_text = any( | |
| isinstance(p, dict) | |
| and "text" in p | |
| and p.get("text", "").strip() | |
| and not p.get("thought") # Exclude thinking text | |
| for p in parts | |
| ) | |
| # If we have functionCall but no non-thinking text, likely compacted | |
| if not has_text: | |
| return True | |
| return False | |
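| # Illustrative example: a message whose parts are only | |
| #   [{"functionCall": {"name": "read_file", "args": {...}}}] | |
| # (no thought part, no non-thinking text) is flagged as a likely compacted | |
| # thinking turn, whereas one that also carries a visible text part is not. | |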
| def _try_recover_thinking_from_cache( | |
| self, messages: List[Dict[str, Any]], turn_start_idx: int | |
| ) -> bool: | |
| """ | |
| Try to recover thinking content from cache for a compacted turn. | |
| Handles GEMINI format: extracts functionCall for cache key lookup, | |
| injects thinking as a part with thought: true. | |
| Returns True if thinking was successfully recovered and injected, False otherwise. | |
| """ | |
| if turn_start_idx < 0 or turn_start_idx >= len(messages): | |
| return False | |
| msg = messages[turn_start_idx] | |
| parts = msg.get("parts", []) | |
| # Extract text content and build tool_calls structure for cache key lookup | |
| text_content = "" | |
| tool_calls = [] | |
| for part in parts: | |
| if isinstance(part, dict): | |
| if "text" in part and not part.get("thought"): | |
| text_content = part["text"] | |
| elif "functionCall" in part: | |
| fc = part["functionCall"] | |
| # Convert to OpenAI tool_calls format for cache key compatibility | |
| tool_calls.append( | |
| { | |
| "id": fc.get("id", ""), | |
| "type": "function", | |
| "function": { | |
| "name": fc.get("name", ""), | |
| "arguments": json.dumps(fc.get("args", {})), | |
| }, | |
| } | |
| ) | |
| # Generate cache key and try to retrieve | |
| cache_key = self._generate_thinking_cache_key(text_content, tool_calls) | |
| if not cache_key: | |
| return False | |
| cached_json = self._thinking_cache.retrieve(cache_key) | |
| if not cached_json: | |
| lib_logger.debug( | |
| f"[Thinking Sanitization] No cached thinking found for key: {cache_key}" | |
| ) | |
| return False | |
| try: | |
| thinking_data = json.loads(cached_json) | |
| thinking_text = thinking_data.get("thinking_text", "") | |
| signature = thinking_data.get("thought_signature", "") | |
| if not thinking_text or not signature: | |
| lib_logger.debug( | |
| "[Thinking Sanitization] Cached thinking missing text or signature" | |
| ) | |
| return False | |
| # Inject the recovered thinking part at the beginning (Gemini format) | |
| thinking_part = { | |
| "text": thinking_text, | |
| "thought": True, | |
| "thoughtSignature": signature, | |
| } | |
| msg["parts"] = [thinking_part] + parts | |
| lib_logger.debug( | |
| f"[Thinking Sanitization] Recovered thinking from cache: {len(thinking_text)} chars" | |
| ) | |
| return True | |
| except json.JSONDecodeError: | |
| lib_logger.warning( | |
| f"[Thinking Sanitization] Failed to parse cached thinking" | |
| ) | |
| return False | |
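| # Illustrative sketch of the cached payload this method expects (the cache | |
| # stores a JSON string; key names match the lookups above): | |
| #   {"thinking_text": "<original thinking text>", | |
| #    "thought_signature": "<opaque signature returned by the API>"} | |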
| def _close_tool_loop_for_thinking( | |
| self, messages: List[Dict[str, Any]] | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Close an incomplete tool loop by injecting synthetic messages to start a new turn. | |
| This is used when: | |
| - We're in a tool loop (conversation ends with functionResponse) | |
| - The tool call was made WITHOUT thinking (e.g., by Gemini, non-thinking Claude, or compaction stripped it) | |
| - We NOW want to enable thinking | |
| Per Claude docs on toggling thinking modes: | |
| - "If thinking is enabled, the final assistant turn must start with a thinking block" | |
| - "To toggle thinking, you must complete the assistant turn first" | |
| - A non-tool-result user message ends the turn and allows a fresh start | |
| Solution (GEMINI format): | |
| 1. Add synthetic MODEL message to complete the non-thinking turn | |
| 2. Add synthetic USER message to start a NEW turn | |
| 3. Claude will generate thinking for its response to the new turn | |
| The synthetic messages are minimal and unobtrusive - they just satisfy the | |
| turn structure requirements without influencing model behavior. | |
| """ | |
| # Strip any old thinking first | |
| messages = self._strip_all_thinking_blocks(messages) | |
| # Count tool results from the end of the conversation (Gemini format) | |
| tool_result_count = 0 | |
| for msg in reversed(messages): | |
| if msg.get("role") == "user": | |
| parts = msg.get("parts", []) | |
| has_function_response = any( | |
| isinstance(p, dict) and "functionResponse" in p for p in parts | |
| ) | |
| if has_function_response: | |
| tool_result_count += len( | |
| [ | |
| p | |
| for p in parts | |
| if isinstance(p, dict) and "functionResponse" in p | |
| ] | |
| ) | |
| else: | |
| break # Real user message, stop counting | |
| elif msg.get("role") == "model": | |
| break # Stop at the model that made the tool calls | |
| # Safety check: if no tool results found, this shouldn't have been called | |
| # But handle gracefully with a generic message | |
| if tool_result_count == 0: | |
| lib_logger.warning( | |
| "[Thinking Sanitization] _close_tool_loop_for_thinking called but no tool results found. " | |
| "This may indicate malformed conversation history." | |
| ) | |
| synthetic_model_content = "[Processing previous context.]" | |
| elif tool_result_count == 1: | |
| synthetic_model_content = "[Tool execution completed.]" | |
| else: | |
| synthetic_model_content = ( | |
| f"[{tool_result_count} tool executions completed.]" | |
| ) | |
| # Step 1: Inject synthetic MODEL message to complete the non-thinking turn (Gemini format) | |
| synthetic_model = { | |
| "role": "model", | |
| "parts": [{"text": synthetic_model_content}], | |
| } | |
| messages.append(synthetic_model) | |
| # Step 2: Inject synthetic USER message to start a NEW turn (Gemini format) | |
| # This allows Claude to generate thinking for its response | |
| # The message is minimal and unobtrusive - just triggers a new turn | |
| synthetic_user = { | |
| "role": "user", | |
| "parts": [{"text": "[Continue]"}], | |
| } | |
| messages.append(synthetic_user) | |
| lib_logger.info( | |
| f"[Thinking Sanitization] Closed tool loop with synthetic messages. " | |
| f"Model: '{synthetic_model_content}', User: '[Continue]'. " | |
| f"Claude will now start a fresh turn with thinking enabled." | |
| ) | |
| return messages | |
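| # Illustrative before/after (Gemini format), assuming two pending tool results: | |
| #   ... model: [functionCall x2] -> user: [functionResponse x2] | |
| # becomes | |
| #   ... -> user: [functionResponse x2] -> model: "[2 tool executions completed.]" | |
| #   -> user: "[Continue]" | |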
| # ========================================================================= | |
| # REASONING CONFIGURATION | |
| # ========================================================================= | |
| def _get_thinking_config( | |
| self, reasoning_effort: Optional[str], model: str, custom_budget: bool = False | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Map reasoning_effort to thinking configuration. | |
| - Gemini 2.5 & Claude: thinkingBudget (integer tokens) | |
| - Gemini 3 Pro: thinkingLevel (string: "low"/"high") | |
| - Gemini 3 Flash: thinkingLevel (string: "minimal"/"low"/"medium"/"high") | |
| """ | |
| internal = self._alias_to_internal(model) | |
| is_gemini_25 = "gemini-2.5" in model | |
| is_gemini_3 = internal.startswith("gemini-3-") | |
| is_gemini_3_flash = "gemini-3-flash" in model or "gemini-3-flash" in internal | |
| is_claude = self._is_claude(model) | |
| if not (is_gemini_25 or is_gemini_3 or is_claude): | |
| return None | |
| # Gemini 3 Flash: Supports minimal/low/medium/high thinkingLevel | |
| if is_gemini_3_flash: | |
| if reasoning_effort == "disable": | |
| # "minimal" matches "no thinking" for most queries | |
| return {"thinkingLevel": "minimal", "include_thoughts": True} | |
| elif reasoning_effort == "low": | |
| return {"thinkingLevel": "low", "include_thoughts": True} | |
| elif reasoning_effort == "medium": | |
| return {"thinkingLevel": "medium", "include_thoughts": True} | |
| # Default to high for Flash | |
| return {"thinkingLevel": "high", "include_thoughts": True} | |
| # Gemini 3 Pro: Only supports low/high thinkingLevel | |
| if is_gemini_3: | |
| if reasoning_effort == "low": | |
| return {"thinkingLevel": "low", "include_thoughts": True} | |
| # Pro only supports low/high; "medium" (and any other value) maps to high | |
| return {"thinkingLevel": "high", "include_thoughts": True} | |
| # Gemini 2.5 & Claude: Integer thinkingBudget | |
| if not reasoning_effort: | |
| return {"thinkingBudget": -1, "include_thoughts": True} # Auto | |
| if reasoning_effort == "disable": | |
| return {"thinkingBudget": 0, "include_thoughts": False} | |
| # Model-specific budgets | |
| if "gemini-2.5-pro" in model or is_claude: | |
| budgets = {"low": 8192, "medium": 16384, "high": 32768} | |
| elif "gemini-2.5-flash" in model: | |
| budgets = {"low": 6144, "medium": 12288, "high": 24576} | |
| else: | |
| budgets = {"low": 1024, "medium": 2048, "high": 4096} | |
| budget = budgets.get(reasoning_effort, -1) | |
| if not custom_budget: | |
| budget = budget // 4  # No custom budget requested: default to 25% of the preset | |
| return {"thinkingBudget": budget, "include_thoughts": True} | |
| # ========================================================================= | |
| # MESSAGE TRANSFORMATION (OpenAI → Gemini) | |
| # ========================================================================= | |
| def _transform_messages( | |
| self, messages: List[Dict[str, Any]], model: str | |
| ) -> Tuple[Optional[Dict[str, Any]], List[Dict[str, Any]]]: | |
| """ | |
| Transform OpenAI messages to Gemini CLI format. | |
| Handles: | |
| - System instruction extraction | |
| - Multi-part content (text, images) | |
| - Tool calls and responses | |
| - Claude thinking injection from cache | |
| - Gemini 3 thoughtSignature preservation | |
| """ | |
| messages = copy.deepcopy(messages) | |
| system_instruction = None | |
| gemini_contents = [] | |
| # Extract system prompts (handle multiple consecutive system messages) | |
| system_parts = [] | |
| while messages and messages[0].get("role") == "system": | |
| system_content = messages.pop(0).get("content", "") | |
| if system_content: | |
| new_parts = self._parse_content_parts( | |
| system_content, _strip_cache_control=True | |
| ) | |
| system_parts.extend(new_parts) | |
| if system_parts: | |
| system_instruction = {"role": "user", "parts": system_parts} | |
| # Build tool_call_id → name mapping | |
| tool_id_to_name = {} | |
| for msg in messages: | |
| if msg.get("role") == "assistant" and msg.get("tool_calls"): | |
| for tc in msg["tool_calls"]: | |
| if tc.get("type") == "function": | |
| tc_id = tc["id"] | |
| tc_name = tc["function"]["name"] | |
| tool_id_to_name[tc_id] = tc_name | |
| # lib_logger.debug(f"[ID Mapping] Registered tool_call: id={tc_id}, name={tc_name}") | |
| # Convert each message, consolidating consecutive tool responses | |
| # Per Gemini docs: parallel function responses must be in a single user message | |
| pending_tool_parts = [] | |
| for msg in messages: | |
| role = msg.get("role") | |
| content = msg.get("content") | |
| parts = [] | |
| # Flush pending tool parts before non-tool message | |
| if pending_tool_parts and role != "tool": | |
| gemini_contents.append({"role": "user", "parts": pending_tool_parts}) | |
| pending_tool_parts = [] | |
| if role == "user": | |
| parts = self._transform_user_message(content) | |
| elif role == "assistant": | |
| parts = self._transform_assistant_message(msg, model, tool_id_to_name) | |
| elif role == "tool": | |
| tool_parts = self._transform_tool_message(msg, model, tool_id_to_name) | |
| # Accumulate tool responses instead of adding individually | |
| pending_tool_parts.extend(tool_parts) | |
| continue | |
| if parts: | |
| gemini_role = "model" if role == "assistant" else "user" | |
| gemini_contents.append({"role": gemini_role, "parts": parts}) | |
| # Flush any remaining tool parts | |
| if pending_tool_parts: | |
| gemini_contents.append({"role": "user", "parts": pending_tool_parts}) | |
| return system_instruction, gemini_contents | |
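| # Illustrative transformation, roughly: | |
| #   [{"role": "system", "content": "Be brief."}, | |
| #    {"role": "user", "content": "hi"}, | |
| #    {"role": "assistant", "tool_calls": [...]}, | |
| #    {"role": "tool", "tool_call_id": "...", "content": "{...}"}] | |
| # becomes system_instruction={"role": "user", "parts": [{"text": "Be brief."}]} plus | |
| # contents [user parts] -> [model parts with functionCall] -> [user functionResponse], | |
| # with consecutive tool messages merged into a single user message. | |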
| def _parse_content_parts( | |
| self, content: Any, _strip_cache_control: bool = False | |
| ) -> List[Dict[str, Any]]: | |
| """Parse content into Gemini parts format.""" | |
| parts = [] | |
| if isinstance(content, str): | |
| if content: | |
| parts.append({"text": content}) | |
| elif isinstance(content, list): | |
| for item in content: | |
| if item.get("type") == "text": | |
| text = item.get("text", "") | |
| if text: | |
| parts.append({"text": text}) | |
| elif item.get("type") == "image_url": | |
| image_part = self._parse_image_url(item.get("image_url", {})) | |
| if image_part: | |
| parts.append(image_part) | |
| return parts | |
| def _parse_image_url(self, image_url: Dict[str, Any]) -> Optional[Dict[str, Any]]: | |
| """Parse image URL into Gemini inlineData format.""" | |
| url = image_url.get("url", "") | |
| if not url.startswith("data:"): | |
| return None | |
| try: | |
| header, data = url.split(",", 1) | |
| mime_type = header.split(":")[1].split(";")[0] | |
| return {"inlineData": {"mimeType": mime_type, "data": data}} | |
| except Exception as e: | |
| lib_logger.warning(f"Failed to parse image URL: {e}") | |
| return None | |
| def _transform_user_message(self, content: Any) -> List[Dict[str, Any]]: | |
| """Transform user message content to Gemini parts.""" | |
| return self._parse_content_parts(content) | |
| def _transform_assistant_message( | |
| self, msg: Dict[str, Any], model: str, _tool_id_to_name: Dict[str, str] | |
| ) -> List[Dict[str, Any]]: | |
| """Transform assistant message including tool calls and thinking injection.""" | |
| parts = [] | |
| content = msg.get("content") | |
| tool_calls = msg.get("tool_calls", []) | |
| reasoning_content = msg.get("reasoning_content") | |
| # Handle reasoning_content if present (from original Claude response with thinking) | |
| if reasoning_content and self._is_claude(model): | |
| # Add thinking part with cached signature | |
| thinking_part = { | |
| "text": reasoning_content, | |
| "thought": True, | |
| } | |
| # Try to get signature from cache | |
| cache_key = self._generate_thinking_cache_key( | |
| content if isinstance(content, str) else "", tool_calls | |
| ) | |
| cached_sig = None | |
| if cache_key: | |
| cached_json = self._thinking_cache.retrieve(cache_key) | |
| if cached_json: | |
| try: | |
| cached_data = json.loads(cached_json) | |
| cached_sig = cached_data.get("thought_signature", "") | |
| except json.JSONDecodeError: | |
| pass | |
| if cached_sig: | |
| thinking_part["thoughtSignature"] = cached_sig | |
| parts.append(thinking_part) | |
| lib_logger.debug( | |
| f"Added reasoning_content with cached signature ({len(reasoning_content)} chars)" | |
| ) | |
| else: | |
| # No cached signature - skip the thinking block | |
| # This can happen if context was compressed and signature was lost | |
| lib_logger.warning( | |
| f"Skipping reasoning_content - no valid signature found. " | |
| f"This may cause issues if thinking is enabled." | |
| ) | |
| elif ( | |
| self._is_claude(model) | |
| and self._enable_signature_cache | |
| and not reasoning_content | |
| ): | |
| # Fallback: Try to inject cached thinking for Claude (original behavior) | |
| thinking_parts = self._get_cached_thinking(content, tool_calls) | |
| parts.extend(thinking_parts) | |
| # Add regular content | |
| if isinstance(content, str) and content: | |
| parts.append({"text": content}) | |
| # Add tool calls | |
| # Track if we've seen the first function call in this message | |
| # Per Gemini docs: Only the FIRST parallel function call gets a signature | |
| first_func_in_msg = True | |
| for tc in tool_calls: | |
| if tc.get("type") != "function": | |
| continue | |
| try: | |
| args = json.loads(tc["function"]["arguments"]) | |
| except (json.JSONDecodeError, TypeError): | |
| args = {} | |
| tool_id = tc.get("id", "") | |
| func_name = tc["function"]["name"] | |
| # lib_logger.debug( | |
| # f"[ID Transform] Converting assistant tool_call to functionCall: " | |
| # f"id={tool_id}, name={func_name}" | |
| # ) | |
| # Add prefix for Gemini 3 (and rename problematic tools) | |
| if self._is_gemini_3(model) and self._enable_gemini3_tool_fix: | |
| func_name = GEMINI3_TOOL_RENAMES.get(func_name, func_name) | |
| func_name = f"{self._gemini3_tool_prefix}{func_name}" | |
| func_part = { | |
| "functionCall": {"name": func_name, "args": args, "id": tool_id} | |
| } | |
| # Add thoughtSignature for Gemini 3 | |
| # Per Gemini docs: Only the FIRST parallel function call gets a signature. | |
| # Subsequent parallel calls should NOT have a thoughtSignature field. | |
| if self._is_gemini_3(model): | |
| sig = tc.get("thought_signature") | |
| if not sig and tool_id and self._enable_signature_cache: | |
| sig = self._signature_cache.retrieve(tool_id) | |
| if sig: | |
| func_part["thoughtSignature"] = sig | |
| elif first_func_in_msg: | |
| # Only add bypass to the first function call if no sig available | |
| func_part["thoughtSignature"] = "skip_thought_signature_validator" | |
| lib_logger.debug( | |
| f"Missing thoughtSignature for first func call {tool_id}, using bypass" | |
| ) | |
| # Subsequent parallel calls: no signature field at all | |
| first_func_in_msg = False | |
| parts.append(func_part) | |
| # Safety: ensure we return at least one part to maintain role alternation | |
| # This handles edge cases like assistant messages that had only thinking content | |
| # which got stripped, leaving the message otherwise empty | |
| if not parts: | |
| # Use a minimal text part - can happen after thinking is stripped | |
| parts.append({"text": ""}) | |
| lib_logger.debug( | |
| "[Transform] Added empty text part to maintain role alternation" | |
| ) | |
| return parts | |
| def _get_cached_thinking( | |
| self, content: Any, tool_calls: List[Dict] | |
| ) -> List[Dict[str, Any]]: | |
| """Retrieve and format cached thinking content for Claude.""" | |
| parts = [] | |
| msg_text = content if isinstance(content, str) else "" | |
| cache_key = self._generate_thinking_cache_key(msg_text, tool_calls) | |
| if not cache_key: | |
| return parts | |
| cached_json = self._thinking_cache.retrieve(cache_key) | |
| if not cached_json: | |
| return parts | |
| try: | |
| thinking_data = json.loads(cached_json) | |
| thinking_text = thinking_data.get("thinking_text", "") | |
| sig = thinking_data.get("thought_signature", "") | |
| if thinking_text: | |
| thinking_part = { | |
| "text": thinking_text, | |
| "thought": True, | |
| "thoughtSignature": sig or "skip_thought_signature_validator", | |
| } | |
| parts.append(thinking_part) | |
| lib_logger.debug(f"Injected {len(thinking_text)} chars of thinking") | |
| except json.JSONDecodeError: | |
| lib_logger.warning(f"Failed to parse cached thinking: {cache_key}") | |
| return parts | |
| def _transform_tool_message( | |
| self, msg: Dict[str, Any], model: str, tool_id_to_name: Dict[str, str] | |
| ) -> List[Dict[str, Any]]: | |
| """Transform tool response message.""" | |
| tool_id = msg.get("tool_call_id", "") | |
| func_name = tool_id_to_name.get(tool_id, "unknown_function") | |
| content = msg.get("content", "{}") | |
| # Log ID lookup | |
| if tool_id not in tool_id_to_name: | |
| lib_logger.warning( | |
| f"[ID Mismatch] Tool response has ID '{tool_id}' which was not found in tool_id_to_name map. " | |
| f"Available IDs: {list(tool_id_to_name.keys())}" | |
| ) | |
| # else: | |
| # lib_logger.debug(f"[ID Mapping] Tool response matched: id={tool_id}, name={func_name}") | |
| # Add prefix for Gemini 3 (and rename problematic tools) | |
| if self._is_gemini_3(model) and self._enable_gemini3_tool_fix: | |
| func_name = GEMINI3_TOOL_RENAMES.get(func_name, func_name) | |
| func_name = f"{self._gemini3_tool_prefix}{func_name}" | |
| try: | |
| parsed_content = json.loads(content) | |
| except (json.JSONDecodeError, TypeError): | |
| parsed_content = content | |
| return [ | |
| { | |
| "functionResponse": { | |
| "name": func_name, | |
| "response": {"result": parsed_content}, | |
| "id": tool_id, | |
| } | |
| } | |
| ] | |
| # ========================================================================= | |
| # TOOL RESPONSE GROUPING | |
| # ========================================================================= | |
| def _fix_tool_response_grouping( | |
| self, contents: List[Dict[str, Any]] | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Group function calls with their responses for Antigravity compatibility. | |
| Converts linear format (call, response, call, response) | |
| to grouped format (model with calls, user with all responses). | |
| IMPORTANT: Preserves ID-based pairing to prevent mismatches. | |
| When IDs don't match, attempts recovery by: | |
| 1. Matching by function name first | |
| 2. Matching by order if names don't match | |
| 3. Inserting placeholder responses if responses are missing | |
| 4. Inserting responses at the CORRECT position (after their corresponding call) | |
| """ | |
| new_contents = [] | |
| # Each pending group tracks: | |
| # - ids: expected response IDs | |
| # - func_names: expected function names (for orphan matching) | |
| # - insert_after_idx: position in new_contents where model message was added | |
| pending_groups = [] | |
| collected_responses = {} # Dict mapping ID -> response_part | |
| for content in contents: | |
| role = content.get("role") | |
| parts = content.get("parts", []) | |
| response_parts = [p for p in parts if "functionResponse" in p] | |
| if response_parts: | |
| # Collect responses by ID (ignore duplicates - keep first occurrence) | |
| for resp in response_parts: | |
| resp_id = resp.get("functionResponse", {}).get("id", "") | |
| if resp_id: | |
| if resp_id in collected_responses: | |
| lib_logger.warning( | |
| f"[Grouping] Duplicate response ID detected: {resp_id}. " | |
| f"Ignoring duplicate - this may indicate malformed conversation history." | |
| ) | |
| continue | |
| # lib_logger.debug( | |
| # f"[Grouping] Collected response for ID: {resp_id}" | |
| # ) | |
| collected_responses[resp_id] = resp | |
| # Try to satisfy pending groups (newest first) | |
| for i in range(len(pending_groups) - 1, -1, -1): | |
| group = pending_groups[i] | |
| group_ids = group["ids"] | |
| # Check if we have ALL responses for this group | |
| if all(gid in collected_responses for gid in group_ids): | |
| # Extract responses in the same order as the function calls | |
| group_responses = [ | |
| collected_responses.pop(gid) for gid in group_ids | |
| ] | |
| new_contents.append({"parts": group_responses, "role": "user"}) | |
| # lib_logger.debug( | |
| # f"[Grouping] Satisfied group with {len(group_responses)} responses: " | |
| # f"ids={group_ids}" | |
| # ) | |
| pending_groups.pop(i) | |
| break | |
| continue | |
| if role == "model": | |
| func_calls = [p for p in parts if "functionCall" in p] | |
| new_contents.append(content) | |
| if func_calls: | |
| call_ids = [ | |
| fc.get("functionCall", {}).get("id", "") for fc in func_calls | |
| ] | |
| call_ids = [cid for cid in call_ids if cid] # Filter empty IDs | |
| # Also extract function names for orphan matching | |
| func_names = [ | |
| fc.get("functionCall", {}).get("name", "") for fc in func_calls | |
| ] | |
| if call_ids: | |
| # lib_logger.debug( | |
| # f"[Grouping] Created pending group expecting {len(call_ids)} responses: " | |
| # f"ids={call_ids}, names={func_names}" | |
| # ) | |
| pending_groups.append( | |
| { | |
| "ids": call_ids, | |
| "func_names": func_names, | |
| "insert_after_idx": len(new_contents) - 1, | |
| } | |
| ) | |
| else: | |
| new_contents.append(content) | |
| # Handle remaining groups (shouldn't happen in well-formed conversations) | |
| # Attempt recovery by matching orphans to unsatisfied calls | |
| # Process in REVERSE order of insert_after_idx so insertions don't shift indices | |
| pending_groups.sort(key=lambda g: g["insert_after_idx"], reverse=True) | |
| for group in pending_groups: | |
| group_ids = group["ids"] | |
| group_func_names = group.get("func_names", []) | |
| insert_idx = group["insert_after_idx"] + 1 | |
| group_responses = [] | |
| lib_logger.debug( | |
| f"[Grouping Recovery] Processing unsatisfied group: " | |
| f"ids={group_ids}, names={group_func_names}, insert_at={insert_idx}" | |
| ) | |
| for i, expected_id in enumerate(group_ids): | |
| expected_name = group_func_names[i] if i < len(group_func_names) else "" | |
| if expected_id in collected_responses: | |
| # Direct ID match | |
| group_responses.append(collected_responses.pop(expected_id)) | |
| lib_logger.debug( | |
| f"[Grouping Recovery] Direct ID match for '{expected_id}'" | |
| ) | |
| elif collected_responses: | |
| # Try to find orphan with matching function name first | |
| matched_orphan_id = None | |
| # First pass: match by function name | |
| for orphan_id, orphan_resp in collected_responses.items(): | |
| orphan_name = orphan_resp.get("functionResponse", {}).get( | |
| "name", "" | |
| ) | |
| # Match if names are equal, or if orphan has "unknown_function" (can be fixed) | |
| if orphan_name == expected_name: | |
| matched_orphan_id = orphan_id | |
| lib_logger.debug( | |
| f"[Grouping Recovery] Matched orphan '{orphan_id}' by name '{orphan_name}'" | |
| ) | |
| break | |
| # Second pass: if no name match, try "unknown_function" orphans | |
| if not matched_orphan_id: | |
| for orphan_id, orphan_resp in collected_responses.items(): | |
| orphan_name = orphan_resp.get("functionResponse", {}).get( | |
| "name", "" | |
| ) | |
| if orphan_name == "unknown_function": | |
| matched_orphan_id = orphan_id | |
| lib_logger.debug( | |
| f"[Grouping Recovery] Matched unknown_function orphan '{orphan_id}' " | |
| f"to expected '{expected_name}'" | |
| ) | |
| break | |
| # Third pass: if still no match, take first available (order-based) | |
| if not matched_orphan_id: | |
| matched_orphan_id = next(iter(collected_responses)) | |
| lib_logger.debug( | |
| f"[Grouping Recovery] No name match, using first available orphan '{matched_orphan_id}'" | |
| ) | |
| if matched_orphan_id: | |
| orphan_resp = collected_responses.pop(matched_orphan_id) | |
| # Fix the ID in the response to match the call | |
| old_id = orphan_resp["functionResponse"].get("id", "") | |
| orphan_resp["functionResponse"]["id"] = expected_id | |
| # Fix the name if it was "unknown_function" | |
| if ( | |
| orphan_resp["functionResponse"].get("name") | |
| == "unknown_function" | |
| and expected_name | |
| ): | |
| orphan_resp["functionResponse"]["name"] = expected_name | |
| lib_logger.info( | |
| f"[Grouping Recovery] Fixed function name from 'unknown_function' to '{expected_name}'" | |
| ) | |
| lib_logger.warning( | |
| f"[Grouping] Auto-repaired ID mismatch: mapped response '{old_id}' " | |
| f"to call '{expected_id}' (function: {expected_name})" | |
| ) | |
| group_responses.append(orphan_resp) | |
| else: | |
| # No responses available - create placeholder | |
| placeholder_resp = { | |
| "functionResponse": { | |
| "name": expected_name or "unknown_function", | |
| "response": { | |
| "result": { | |
| "error": "Tool response was lost during context processing. " | |
| "This is a recovered placeholder.", | |
| "recovered": True, | |
| } | |
| }, | |
| "id": expected_id, | |
| } | |
| } | |
| lib_logger.warning( | |
| f"[Grouping Recovery] Created placeholder response for missing tool: " | |
| f"id='{expected_id}', name='{expected_name}'" | |
| ) | |
| group_responses.append(placeholder_resp) | |
| if group_responses: | |
| # Insert at the correct position (right after the model message with the calls) | |
| new_contents.insert( | |
| insert_idx, {"parts": group_responses, "role": "user"} | |
| ) | |
| lib_logger.info( | |
| f"[Grouping Recovery] Inserted {len(group_responses)} responses at position {insert_idx} " | |
| f"(expected {len(group_ids)})" | |
| ) | |
| # Warn about unmatched responses | |
| if collected_responses: | |
| lib_logger.warning( | |
| f"[Grouping] {len(collected_responses)} unmatched responses remaining: " | |
| f"ids={list(collected_responses.keys())}" | |
| ) | |
| return new_contents | |
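| # Illustrative example: given model(call A), user(resp A), model(call B, call C), | |
| # user(resp B), user(resp C), the two trailing responses are regrouped into one | |
| # user message [resp B, resp C] immediately after the model message that issued | |
| # B and C; orphaned or missing responses are repaired or replaced with placeholders. | |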
| # ========================================================================= | |
| # GEMINI 3 TOOL TRANSFORMATIONS | |
| # ========================================================================= | |
| def _apply_gemini3_namespace( | |
| self, tools: List[Dict[str, Any]] | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Add namespace prefix to tool names for Gemini 3. | |
| Also renames certain tools that conflict with Gemini's internal behavior | |
| (e.g., "batch" triggers MALFORMED_FUNCTION_CALL errors). | |
| """ | |
| if not tools: | |
| return tools | |
| modified = copy.deepcopy(tools) | |
| for tool in modified: | |
| for func_decl in tool.get("functionDeclarations", []): | |
| name = func_decl.get("name", "") | |
| if name: | |
| # Rename problematic tools first | |
| name = GEMINI3_TOOL_RENAMES.get(name, name) | |
| # Then add prefix | |
| func_decl["name"] = f"{self._gemini3_tool_prefix}{name}" | |
| return modified | |
| def _enforce_strict_schema( | |
| self, tools: List[Dict[str, Any]] | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Enforce strict JSON schema for Gemini 3 to prevent hallucinated parameters. | |
| Adds 'additionalProperties: false' to object schemas with 'properties', | |
| which tells the model it CANNOT add properties not in the schema. | |
| IMPORTANT: Preserves 'additionalProperties: true' (or {}) when explicitly | |
| set in the original schema. This is critical for "freeform" parameter objects | |
| like batch/multi_tool's nested parameters which need to accept arbitrary | |
| tool parameters that aren't pre-defined in the schema. | |
| """ | |
| if not tools: | |
| return tools | |
| def enforce_strict(schema: Any) -> Any: | |
| if not isinstance(schema, dict): | |
| return schema | |
| result = {} | |
| preserved_additional_props = None | |
| for key, value in schema.items(): | |
| # Preserve additionalProperties as-is if it's truthy | |
| # This is critical for "freeform" parameter objects like batch's | |
| # nested parameters which need to accept arbitrary tool parameters | |
| if key == "additionalProperties": | |
| if value is not False: | |
| # Preserve the original value (true, {}, {"type": "string"}, etc.) | |
| preserved_additional_props = value | |
| continue | |
| if isinstance(value, dict): | |
| result[key] = enforce_strict(value) | |
| elif isinstance(value, list): | |
| result[key] = [ | |
| enforce_strict(item) if isinstance(item, dict) else item | |
| for item in value | |
| ] | |
| else: | |
| result[key] = value | |
| # Add additionalProperties: false to object schemas with properties, | |
| # BUT only if we didn't preserve a value from the original schema | |
| if result.get("type") == "object" and "properties" in result: | |
| if preserved_additional_props is not None: | |
| result["additionalProperties"] = preserved_additional_props | |
| else: | |
| result["additionalProperties"] = False | |
| return result | |
| modified = copy.deepcopy(tools) | |
| for tool in modified: | |
| for func_decl in tool.get("functionDeclarations", []): | |
| if "parametersJsonSchema" in func_decl: | |
| func_decl["parametersJsonSchema"] = enforce_strict( | |
| func_decl["parametersJsonSchema"] | |
| ) | |
| return modified | |
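| # Illustrative example: {"type": "object", "properties": {"path": {"type": "string"}}} | |
| # gains "additionalProperties": false, while a schema that already declares | |
| # "additionalProperties": true (or a sub-schema object) keeps its original value. | |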
| def _inject_signature_into_descriptions( | |
| self, tools: List[Dict[str, Any]], description_prompt: Optional[str] = None | |
| ) -> List[Dict[str, Any]]: | |
| """Inject parameter signatures into tool descriptions for Gemini 3 & Claude.""" | |
| if not tools: | |
| return tools | |
| # Use provided prompt or default to Gemini 3 prompt | |
| prompt_template = description_prompt or self._gemini3_description_prompt | |
| modified = copy.deepcopy(tools) | |
| for tool in modified: | |
| for func_decl in tool.get("functionDeclarations", []): | |
| schema = func_decl.get("parametersJsonSchema", {}) | |
| if not schema: | |
| continue | |
| required = schema.get("required", []) | |
| properties = schema.get("properties", {}) | |
| if not properties: | |
| continue | |
| param_list = [] | |
| for prop_name, prop_data in properties.items(): | |
| if not isinstance(prop_data, dict): | |
| continue | |
| type_hint = self._format_type_hint(prop_data) | |
| is_required = prop_name in required | |
| param_list.append( | |
| f"{prop_name} ({type_hint}{', REQUIRED' if is_required else ''})" | |
| ) | |
| if param_list: | |
| sig_str = prompt_template.replace("{params}", ", ".join(param_list)) | |
| func_decl["description"] = ( | |
| func_decl.get("description", "") + sig_str | |
| ) | |
| return modified | |
| def _format_type_hint(self, prop_data: Dict[str, Any], depth: int = 0) -> str: | |
| """Format a detailed type hint for a property schema.""" | |
| type_hint = prop_data.get("type", "unknown") | |
| # Handle enum values - show allowed options | |
| if "enum" in prop_data: | |
| enum_vals = prop_data["enum"] | |
| if len(enum_vals) <= 5: | |
| return f"string ENUM[{', '.join(repr(v) for v in enum_vals)}]" | |
| return f"string ENUM[{len(enum_vals)} options]" | |
| # Handle const values | |
| if "const" in prop_data: | |
| return f"string CONST={repr(prop_data['const'])}" | |
| if type_hint == "array": | |
| items = prop_data.get("items", {}) | |
| if isinstance(items, dict): | |
| item_type = items.get("type", "unknown") | |
| if item_type == "object": | |
| nested_props = items.get("properties", {}) | |
| nested_req = items.get("required", []) | |
| if nested_props: | |
| nested_list = [] | |
| for n, d in nested_props.items(): | |
| if isinstance(d, dict): | |
| # Recursively format nested types (limit depth) | |
| if depth < 1: | |
| t = self._format_type_hint(d, depth + 1) | |
| else: | |
| t = d.get("type", "unknown") | |
| req = " REQUIRED" if n in nested_req else "" | |
| nested_list.append(f"{n}: {t}{req}") | |
| return f"ARRAY_OF_OBJECTS[{', '.join(nested_list)}]" | |
| return "ARRAY_OF_OBJECTS" | |
| return f"ARRAY_OF_{item_type.upper()}" | |
| return "ARRAY" | |
| if type_hint == "object": | |
| nested_props = prop_data.get("properties", {}) | |
| nested_req = prop_data.get("required", []) | |
| if nested_props and depth < 1: | |
| nested_list = [] | |
| for n, d in nested_props.items(): | |
| if isinstance(d, dict): | |
| t = d.get("type", "unknown") | |
| req = " REQUIRED" if n in nested_req else "" | |
| nested_list.append(f"{n}: {t}{req}") | |
| return f"object{{{', '.join(nested_list)}}}" | |
| return type_hint | |
| def _strip_gemini3_prefix(self, name: str) -> str: | |
| """ | |
| Strip the Gemini 3 namespace prefix from a tool name. | |
| Also reverses any tool renames that were applied to avoid Gemini conflicts. | |
| """ | |
| if name and name.startswith(self._gemini3_tool_prefix): | |
| stripped = name[len(self._gemini3_tool_prefix) :] | |
| # Reverse any renames | |
| return GEMINI3_TOOL_RENAMES_REVERSE.get(stripped, stripped) | |
| return name | |
| # ========================================================================= | |
| # MALFORMED FUNCTION CALL HANDLING | |
| # ========================================================================= | |
| def _check_for_malformed_call(self, response: Dict[str, Any]) -> Optional[str]: | |
| """ | |
| Check if response contains MALFORMED_FUNCTION_CALL. | |
| Returns finishMessage if malformed, None otherwise. | |
| """ | |
| candidates = response.get("candidates", []) | |
| if not candidates: | |
| return None | |
| candidate = candidates[0] | |
| if candidate.get("finishReason") == "MALFORMED_FUNCTION_CALL": | |
| return candidate.get("finishMessage", "Unknown malformed call error") | |
| return None | |
| def _parse_malformed_call_message( | |
| self, finish_message: str, model: str | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Parse MALFORMED_FUNCTION_CALL finishMessage to extract tool info. | |
| Input format: "Malformed function call: call:namespace:tool_name{raw_args}" | |
| Returns: | |
| {"tool_name": "read", "prefixed_name": "gemini3_read", | |
| "raw_args": "{filePath: \"...\"}"} | |
| or None if unparseable | |
| """ | |
| import re | |
| # Pattern: "Malformed function call: call:namespace:tool_name{args}" | |
| pattern = r"Malformed function call:\s*call:[^:]+:([^{]+)(\{.+\})$" | |
| match = re.match(pattern, finish_message, re.DOTALL) | |
| if not match: | |
| lib_logger.warning( | |
| f"[Antigravity] Could not parse MALFORMED_FUNCTION_CALL: {finish_message[:100]}" | |
| ) | |
| return None | |
| prefixed_name = match.group(1).strip() # "gemini3_read" | |
| raw_args = match.group(2) # "{filePath: \"...\"}" | |
| # Strip our prefix to get original tool name | |
| tool_name = self._strip_gemini3_prefix(prefixed_name) | |
| return { | |
| "tool_name": tool_name, | |
| "prefixed_name": prefixed_name, | |
| "raw_args": raw_args, | |
| } | |
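| # Illustrative example: a finishMessage of | |
| #   'Malformed function call: call:tools:gemini3_read{filePath: "a.txt"}' | |
| # parses to tool_name="read" (prefix stripped), prefixed_name="gemini3_read", | |
| # raw_args='{filePath: "a.txt"}' - assuming "gemini3_" is the configured prefix. | |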
| def _analyze_json_error(self, raw_args: str) -> Dict[str, Any]: | |
| """ | |
| Analyze malformed JSON to detect specific errors and attempt to fix it. | |
| Combines json.JSONDecodeError with heuristic pattern detection | |
| to provide actionable error information. | |
| Returns: | |
| { | |
| "json_error": str or None, # Python's JSON error message | |
| "json_position": int or None, # Position of error | |
| "issues": List[str], # Human-readable issues detected | |
| "unquoted_keys": List[str], # Specific unquoted key names | |
| "fixed_json": str or None, # Corrected JSON if we could fix it | |
| } | |
| """ | |
| import re as re_module | |
| result = { | |
| "json_error": None, | |
| "json_position": None, | |
| "issues": [], | |
| "unquoted_keys": [], | |
| "fixed_json": None, | |
| } | |
| # Option 1: Try json.loads to get exact error | |
| try: | |
| json.loads(raw_args) | |
| return result # Valid JSON, no errors | |
| except json.JSONDecodeError as e: | |
| result["json_error"] = e.msg | |
| result["json_position"] = e.pos | |
| # Option 2: Heuristic pattern detection for specific issues | |
| # Detect unquoted keys: {word: or ,word: | |
| unquoted_key_pattern = r"[{,]\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*:" | |
| unquoted_keys = re_module.findall(unquoted_key_pattern, raw_args) | |
| if unquoted_keys: | |
| result["unquoted_keys"] = unquoted_keys | |
| if len(unquoted_keys) == 1: | |
| result["issues"].append(f"Unquoted key: '{unquoted_keys[0]}'") | |
| else: | |
| result["issues"].append( | |
| f"Unquoted keys: {', '.join(repr(k) for k in unquoted_keys)}" | |
| ) | |
| # Detect single quotes | |
| if "'" in raw_args: | |
| result["issues"].append("Single quotes used instead of double quotes") | |
| # Detect trailing comma | |
| if re_module.search(r",\s*[}\]]", raw_args): | |
| result["issues"].append("Trailing comma before closing bracket") | |
| # Option 3: Try to fix the JSON and validate | |
| fixed = raw_args | |
| # Add quotes around unquoted keys | |
| fixed = re_module.sub( | |
| r"([{,])\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*:", | |
| r'\1"\2":', | |
| fixed, | |
| ) | |
| # Replace single quotes with double quotes | |
| fixed = fixed.replace("'", '"') | |
| # Remove trailing commas | |
| fixed = re_module.sub(r",(\s*[}\]])", r"\1", fixed) | |
| try: | |
| # Validate the fix works | |
| parsed = json.loads(fixed) | |
| # Use compact JSON format (matches what model should produce) | |
| result["fixed_json"] = json.dumps(parsed, separators=(",", ":")) | |
| except json.JSONDecodeError: | |
| # First fix didn't work - try more aggressive cleanup | |
| pass | |
| # Option 4: If first attempt failed, try more aggressive fixes | |
| if result["fixed_json"] is None: | |
| try: | |
| # Normalize all whitespace (collapse newlines/multiple spaces) | |
| aggressive_fix = re_module.sub(r"\s+", " ", fixed) | |
| # Try parsing again | |
| parsed = json.loads(aggressive_fix) | |
| result["fixed_json"] = json.dumps(parsed, separators=(",", ":")) | |
| lib_logger.debug( | |
| "[Antigravity] Fixed malformed JSON with aggressive whitespace normalization" | |
| ) | |
| except json.JSONDecodeError: | |
| pass | |
| # Option 5: If still failing, try fixing unquoted string values | |
| if result["fixed_json"] is None: | |
| try: | |
| # Some models produce unquoted string values like {key: value} | |
| # Try to quote values that look like unquoted strings | |
| # Match : followed by an unquoted word, but leave the JSON literals | |
| # true/false/null alone (numbers, objects and arrays never match this pattern) | |
| aggressive_fix = re_module.sub( | |
| r":\s*(?!(?:true|false|null)\b)([a-zA-Z_][a-zA-Z0-9_]*)\s*([,}\]])", | |
| r': "\1"\2', | |
| fixed, | |
| ) | |
| parsed = json.loads(aggressive_fix) | |
| result["fixed_json"] = json.dumps(parsed, separators=(",", ":")) | |
| lib_logger.debug( | |
| "[Antigravity] Fixed malformed JSON by quoting unquoted string values" | |
| ) | |
| except json.JSONDecodeError: | |
| # All fixes failed, leave as None | |
| pass | |
| return result | |
| def _build_malformed_call_retry_messages( | |
| self, | |
| parsed_call: Dict[str, Any], | |
| tool_schema: Optional[Dict[str, Any]], | |
| ) -> Tuple[Dict[str, Any], Dict[str, Any]]: | |
| """ | |
| Build synthetic Gemini-format messages for malformed call retry. | |
| Returns: (assistant_message, user_message) in Gemini format | |
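| Shape of the returned pair (illustrative; 'write_file' is a made-up tool name): | |
| assistant: {"role": "model", "parts": [{"text": "I'll call the 'write_file' function."}]} | |
| user: {"role": "user", "parts": [{"text": "[FUNCTION CALL ERROR - INVALID JSON] ..."}]} | |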
| """ | |
| tool_name = parsed_call["tool_name"] | |
| raw_args = parsed_call["raw_args"] | |
| # Analyze the JSON error and try to fix it | |
| error_info = self._analyze_json_error(raw_args) | |
| # Assistant message: Show what it tried to do | |
| assistant_msg = { | |
| "role": "model", | |
| "parts": [{"text": f"I'll call the '{tool_name}' function."}], | |
| } | |
| # Build a concise error message | |
| if error_info["fixed_json"]: | |
| # We successfully fixed the JSON - show the corrected version | |
| error_text = f"""[FUNCTION CALL ERROR - INVALID JSON] | |
| Your call to '{tool_name}' failed. All JSON keys must be double-quoted. | |
| INVALID: {raw_args} | |
| CORRECTED: {error_info["fixed_json"]} | |
| Retry the function call now using the corrected JSON above. Output ONLY the tool call, no text.""" | |
| else: | |
| # Couldn't auto-fix - give hints | |
| error_text = f"""[FUNCTION CALL ERROR - INVALID JSON] | |
| Your call to '{tool_name}' failed due to malformed JSON. | |
| You provided: {raw_args} | |
| Fix: All JSON keys must be double-quoted. Example: {{"key":"value"}} not {{key:"value"}} | |
| Analyze what you did wrong, correct it, and retry the function call. Output ONLY the tool call, no text.""" | |
| # Add schema if available (strip $schema reference) | |
| if tool_schema: | |
| clean_schema = {k: v for k, v in tool_schema.items() if k != "$schema"} | |
| schema_str = json.dumps(clean_schema, separators=(",", ":")) | |
| error_text += f"\n\nSchema: {schema_str}" | |
| user_msg = {"role": "user", "parts": [{"text": error_text}]} | |
| return assistant_msg, user_msg | |
| def _build_malformed_fallback_response( | |
| self, model: str, error_details: str | |
| ) -> litellm.ModelResponse: | |
| """ | |
| Build error response when malformed call retries are exhausted. | |
| Uses finish_reason=None to indicate the response didn't complete normally, | |
| allowing clients to detect the incomplete state and potentially retry. | |
| """ | |
| return litellm.ModelResponse( | |
| **{ | |
| "id": f"chatcmpl-{uuid.uuid4().hex[:24]}", | |
| "object": "chat.completion", | |
| "created": int(time.time()), | |
| "model": model, | |
| "choices": [ | |
| { | |
| "index": 0, | |
| "message": { | |
| "role": "assistant", | |
| "content": ( | |
| "[TOOL CALL ERROR] I attempted to call a function but " | |
| "repeatedly produced malformed syntax. This may be a model issue.\n\n" | |
| f"Last error: {error_details}\n\n" | |
| "Please try rephrasing your request or try a different approach." | |
| ), | |
| }, | |
| "finish_reason": None, | |
| } | |
| ], | |
| } | |
| ) | |
| def _build_malformed_fallback_chunk( | |
| self, | |
| model: str, | |
| error_details: str, | |
| response_id: Optional[str] = None, | |
| usage: Optional[Dict[str, Any]] = None, | |
| ) -> litellm.ModelResponse: | |
| """ | |
| Build streaming chunk error response when malformed call retries are exhausted. | |
| Uses streaming format (delta instead of message) for consistency with streaming responses. | |
| Includes usage with completion_tokens > 0 so client.py recognizes it as a final chunk. | |
| """ | |
| chunk_id = response_id or f"chatcmpl-{uuid.uuid4().hex[:24]}" | |
| # Ensure usage has completion_tokens > 0 for client to recognize as final chunk | |
| if not usage or usage.get("completion_tokens", 0) <= 0: | |
| prompt_tokens = usage.get("prompt_tokens", 0) if usage else 0 | |
| usage = { | |
| "prompt_tokens": prompt_tokens, | |
| "completion_tokens": 1, | |
| "total_tokens": prompt_tokens + 1, | |
| } | |
| return litellm.ModelResponse( | |
| **{ | |
| "id": chunk_id, | |
| "object": "chat.completion.chunk", | |
| "created": int(time.time()), | |
| "model": model, | |
| "choices": [ | |
| { | |
| "index": 0, | |
| "delta": { | |
| "role": "assistant", | |
| "content": ( | |
| "[TOOL CALL ERROR] I attempted to call a function but " | |
| "repeatedly produced malformed syntax. This may be a model issue.\n\n" | |
| f"Last error: {error_details}\n\n" | |
| "Please try rephrasing your request or try a different approach." | |
| ), | |
| }, | |
| "finish_reason": "stop", | |
| } | |
| ], | |
| "usage": usage, | |
| } | |
| ) | |
| def _build_fixed_tool_call_response( | |
| self, | |
| model: str, | |
| parsed_call: Dict[str, Any], | |
| error_info: Dict[str, Any], | |
| ) -> Optional[litellm.ModelResponse]: | |
| """ | |
| Build a synthetic valid tool call response from auto-fixed malformed JSON. | |
| When Gemini 3 produces malformed JSON (e.g., unquoted keys), this method | |
| takes the auto-corrected JSON from _analyze_json_error() and builds a | |
| proper OpenAI-format tool call response. | |
| Returns None if the JSON couldn't be fixed. | |
| """ | |
| fixed_json = error_info.get("fixed_json") | |
| if not fixed_json: | |
| return None | |
| # Validate the fixed JSON is actually valid | |
| try: | |
| json.loads(fixed_json) | |
| except json.JSONDecodeError: | |
| return None | |
| tool_name = parsed_call["tool_name"] | |
| tool_id = f"call_{uuid.uuid4().hex[:24]}" | |
| return litellm.ModelResponse( | |
| **{ | |
| "id": f"chatcmpl-{uuid.uuid4().hex[:24]}", | |
| "object": "chat.completion", | |
| "created": int(time.time()), | |
| "model": model, | |
| "choices": [ | |
| { | |
| "index": 0, | |
| "message": { | |
| "role": "assistant", | |
| "content": None, | |
| "tool_calls": [ | |
| { | |
| "id": tool_id, | |
| "type": "function", | |
| "function": { | |
| "name": tool_name, | |
| "arguments": fixed_json, | |
| }, | |
| } | |
| ], | |
| }, | |
| "finish_reason": "tool_calls", | |
| } | |
| ], | |
| } | |
| ) | |
| def _build_fixed_tool_call_chunk( | |
| self, | |
| model: str, | |
| parsed_call: Dict[str, Any], | |
| error_info: Dict[str, Any], | |
| response_id: Optional[str] = None, | |
| usage: Optional[Dict[str, Any]] = None, | |
| ) -> Optional[litellm.ModelResponse]: | |
| """ | |
| Build a streaming chunk with the auto-fixed tool call. | |
| Similar to _build_fixed_tool_call_response but uses streaming format: | |
| - object: "chat.completion.chunk" instead of "chat.completion" | |
| - delta: {...} instead of message: {...} | |
| - tool_calls items include "index" field | |
| Args: | |
| response_id: Optional original response ID to maintain stream continuity | |
| usage: Optional usage from previous chunks. completion_tokens is bumped | |
| to at least 1 so the client recognizes this as the final chunk. | |
| Returns None if the JSON couldn't be fixed. | |
| """ | |
| fixed_json = error_info.get("fixed_json") | |
| if not fixed_json: | |
| return None | |
| # Validate the fixed JSON is actually valid | |
| try: | |
| json.loads(fixed_json) | |
| except json.JSONDecodeError: | |
| return None | |
| tool_name = parsed_call["tool_name"] | |
| tool_id = f"call_{uuid.uuid4().hex[:24]}" | |
| # Use original response ID if provided, otherwise generate new one | |
| chunk_id = response_id or f"chatcmpl-{uuid.uuid4().hex[:24]}" | |
| # Ensure usage has completion_tokens > 0 for client to recognize as final chunk | |
| # Client.py's _safe_streaming_wrapper uses completion_tokens > 0 to detect final chunks | |
| if not usage or usage.get("completion_tokens", 0) <= 0: | |
| prompt_tokens = usage.get("prompt_tokens", 0) if usage else 0 | |
| usage = { | |
| "prompt_tokens": prompt_tokens, | |
| "completion_tokens": 1, # Minimum to signal final chunk | |
| "total_tokens": prompt_tokens + 1, | |
| } | |
| return litellm.ModelResponse( | |
| **{ | |
| "id": chunk_id, | |
| "object": "chat.completion.chunk", | |
| "created": int(time.time()), | |
| "model": model, | |
| "choices": [ | |
| { | |
| "index": 0, | |
| "delta": { | |
| "role": "assistant", | |
| "content": None, | |
| "tool_calls": [ | |
| { | |
| "index": 0, | |
| "id": tool_id, | |
| "type": "function", | |
| "function": { | |
| "name": tool_name, | |
| "arguments": fixed_json, | |
| }, | |
| } | |
| ], | |
| }, | |
| "finish_reason": "tool_calls", | |
| } | |
| ], | |
| "usage": usage, | |
| } | |
| ) | |
| def _translate_tool_choice( | |
| self, tool_choice: Union[str, Dict[str, Any]], model: str = "" | |
| ) -> Optional[Dict[str, Any]]: | |
| """ | |
| Translates OpenAI's `tool_choice` to Gemini's `toolConfig`. | |
| Handles Gemini 3 namespace prefixes for specific tool selection. | |
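| Examples (illustrative; 'write_file' is a made-up tool name): | |
| "required" -> {"functionCallingConfig": {"mode": "ANY"}} | |
| "none" -> {"functionCallingConfig": {"mode": "NONE"}} | |
| {"type": "function", "function": {"name": "write_file"}} -> | |
| {"functionCallingConfig": {"mode": "ANY", "allowedFunctionNames": ["write_file"]}} | |
| (with the Gemini 3 namespace prefix applied when the tool fix is enabled) | |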
| """ | |
| if not tool_choice: | |
| return None | |
| config = {} | |
| mode = "AUTO" # Default to auto | |
| is_gemini_3 = self._is_gemini_3(model) | |
| if isinstance(tool_choice, str): | |
| if tool_choice == "auto": | |
| mode = "AUTO" | |
| elif tool_choice == "none": | |
| mode = "NONE" | |
| elif tool_choice == "required": | |
| mode = "ANY" | |
| elif isinstance(tool_choice, dict) and tool_choice.get("type") == "function": | |
| function_name = tool_choice.get("function", {}).get("name") | |
| if function_name: | |
| # Add Gemini 3 prefix if needed (and rename problematic tools) | |
| if is_gemini_3 and self._enable_gemini3_tool_fix: | |
| function_name = GEMINI3_TOOL_RENAMES.get( | |
| function_name, function_name | |
| ) | |
| function_name = f"{self._gemini3_tool_prefix}{function_name}" | |
| mode = "ANY" # Force a call, but only to this function | |
| config["functionCallingConfig"] = { | |
| "mode": mode, | |
| "allowedFunctionNames": [function_name], | |
| } | |
| return config | |
| config["functionCallingConfig"] = {"mode": mode} | |
| return config | |
| # ========================================================================= | |
| # REQUEST TRANSFORMATION | |
| # ========================================================================= | |
| def _build_tools_payload( | |
| self, tools: Optional[List[Dict[str, Any]]], model: str | |
| ) -> Optional[List[Dict[str, Any]]]: | |
| """Build Gemini-format tools from OpenAI tools. | |
| For Gemini models, all tools are placed in a SINGLE functionDeclarations array. | |
| This matches the format expected by Gemini CLI and prevents MALFORMED_FUNCTION_CALL errors. | |
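| Illustrative mapping (tool name is hypothetical): | |
| [{"type": "function", "function": {"name": "read_file", "description": "...", | |
| "parameters": {...}}}] | |
| -> [{"functionDeclarations": [{"name": "read_file", "description": "...", | |
| "parametersJsonSchema": {...cleaned schema...}}]}] | |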
| """ | |
| if not tools: | |
| return None | |
| function_declarations = [] | |
| for tool in tools: | |
| if tool.get("type") != "function": | |
| continue | |
| func = tool.get("function", {}) | |
| params = func.get("parameters") | |
| func_decl = { | |
| "name": func.get("name", ""), | |
| "description": func.get("description", ""), | |
| } | |
| if params and isinstance(params, dict): | |
| schema = dict(params) | |
| schema.pop("strict", None) | |
| # Inline $ref definitions, then strip unsupported keywords | |
| schema = _inline_schema_refs(schema) | |
| # For Gemini models, use for_gemini=True to: | |
| # - Preserve truthy additionalProperties (for freeform param objects) | |
| # - Strip false values (let _enforce_strict_schema add them) | |
| is_gemini = not self._is_claude(model) | |
| schema = _clean_claude_schema(schema, for_gemini=is_gemini) | |
| schema = _normalize_type_arrays(schema) | |
| # Workaround: Antigravity/Gemini fails to emit functionCall | |
| # when a tool has empty properties {}. Inject a dummy required | |
| # confirmation parameter so the tool call is actually emitted: | |
| # requiring it forces the model to commit to the call rather | |
| # than just thinking about it. | |
| props = schema.get("properties", {}) | |
| if not props: | |
| schema["properties"] = { | |
| "_confirm": { | |
| "type": "string", | |
| "description": "Enter 'yes' to proceed", | |
| } | |
| } | |
| schema["required"] = ["_confirm"] | |
| func_decl["parametersJsonSchema"] = schema | |
| else: | |
| # No parameters provided - use default with required confirm param | |
| # to ensure the tool call is emitted properly | |
| func_decl["parametersJsonSchema"] = { | |
| "type": "object", | |
| "properties": { | |
| "_confirm": { | |
| "type": "string", | |
| "description": "Enter 'yes' to proceed", | |
| } | |
| }, | |
| "required": ["_confirm"], | |
| } | |
| function_declarations.append(func_decl) | |
| if not function_declarations: | |
| return None | |
| # Return all tools in a SINGLE functionDeclarations array | |
| # This is the format Gemini CLI uses and prevents MALFORMED_FUNCTION_CALL errors | |
| return [{"functionDeclarations": function_declarations}] | |
| def _transform_to_antigravity_format( | |
| self, | |
| gemini_payload: Dict[str, Any], | |
| model: str, | |
| project_id: str, | |
| max_tokens: Optional[int] = None, | |
| reasoning_effort: Optional[str] = None, | |
| tool_choice: Optional[Union[str, Dict[str, Any]]] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Transform Gemini CLI payload to complete Antigravity format. | |
| Args: | |
| gemini_payload: Request in Gemini CLI format | |
| model: Model name (public alias) | |
| max_tokens: Max output tokens (including thinking) | |
| reasoning_effort: Reasoning effort level (determines -thinking variant for Claude) | |
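| project_id: Discovered project ID placed in the envelope's "project" field | |
| tool_choice: OpenAI tool_choice, translated to Gemini toolConfig | |
| Resulting envelope (illustrative, values abbreviated): | |
| {"project": project_id, "userAgent": "antigravity", "requestId": "...", | |
| "model": "<internal model name>", | |
| "request": <gemini_payload> + sessionId, safetySettings, generationConfig, toolConfig} | |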
| """ | |
| internal_model = self._alias_to_internal(model) | |
| # Map Claude models to their -thinking variant | |
| # claude-opus-4-5: ALWAYS use -thinking (non-thinking variant doesn't exist) | |
| # claude-sonnet-4-5: only use -thinking when reasoning_effort is provided | |
| if self._is_claude(internal_model) and not internal_model.endswith("-thinking"): | |
| if internal_model == "claude-opus-4-5": | |
| # Opus 4.5 ALWAYS requires -thinking variant | |
| internal_model = "claude-opus-4-5-thinking" | |
| elif internal_model == "claude-sonnet-4-5" and reasoning_effort: | |
| # Sonnet 4.5 uses -thinking only when reasoning_effort is provided | |
| internal_model = "claude-sonnet-4-5-thinking" | |
| # Map gemini-2.5-flash to -thinking variant when reasoning_effort is provided | |
| if internal_model == "gemini-2.5-flash" and reasoning_effort: | |
| internal_model = "gemini-2.5-flash-thinking" | |
| # Map gemini-3-pro-preview to -low/-high variant based on thinking config | |
| if model == "gemini-3-pro-preview" or internal_model == "gemini-3-pro-preview": | |
| # Check thinking config to determine variant | |
| thinking_config = gemini_payload.get("generationConfig", {}).get( | |
| "thinkingConfig", {} | |
| ) | |
| thinking_level = thinking_config.get("thinkingLevel", "high") | |
| if thinking_level == "low": | |
| internal_model = "gemini-3-pro-low" | |
| else: | |
| internal_model = "gemini-3-pro-high" | |
| # Wrap in Antigravity envelope | |
| antigravity_payload = { | |
| "project": project_id, # Will be passed as parameter | |
| "userAgent": "antigravity", | |
| "requestId": _generate_request_id(), | |
| "model": internal_model, | |
| "request": copy.deepcopy(gemini_payload), | |
| } | |
| # Add session ID | |
| antigravity_payload["request"]["sessionId"] = _generate_session_id() | |
| # Add default safety settings to prevent content filtering | |
| # Only add if not already present in the payload | |
| if "safetySettings" not in antigravity_payload["request"]: | |
| antigravity_payload["request"]["safetySettings"] = copy.deepcopy( | |
| DEFAULT_SAFETY_SETTINGS | |
| ) | |
| # Handle max_tokens - only apply to Claude, or if explicitly set for others | |
| gen_config = antigravity_payload["request"].get("generationConfig", {}) | |
| is_claude = self._is_claude(model) | |
| if max_tokens is not None: | |
| # Explicitly set in request - apply to all models | |
| gen_config["maxOutputTokens"] = max_tokens | |
| elif is_claude: | |
| # Claude model without explicit max_tokens - use default | |
| gen_config["maxOutputTokens"] = DEFAULT_MAX_OUTPUT_TOKENS | |
| # For non-Claude models without explicit max_tokens, don't set it | |
| antigravity_payload["request"]["generationConfig"] = gen_config | |
| # Set toolConfig based on tool_choice parameter | |
| tool_config_result = self._translate_tool_choice(tool_choice, model) | |
| if tool_config_result: | |
| antigravity_payload["request"]["toolConfig"] = tool_config_result | |
| else: | |
| # Default to AUTO if no tool_choice specified | |
| tool_config = antigravity_payload["request"].setdefault("toolConfig", {}) | |
| func_config = tool_config.setdefault("functionCallingConfig", {}) | |
| func_config["mode"] = "AUTO" | |
| # Thinking config for non-Gemini-3 models: replace thinkingLevel with thinkingBudget | |
| if not internal_model.startswith("gemini-3-"): | |
| thinking_config = gen_config.get("thinkingConfig", {}) | |
| if "thinkingLevel" in thinking_config: | |
| del thinking_config["thinkingLevel"] | |
| thinking_config["thinkingBudget"] = -1 | |
| # Ensure first function call in each model message has a thoughtSignature for Gemini 3 | |
| # Per Gemini docs: Only the FIRST parallel function call gets a signature | |
| if internal_model.startswith("gemini-3-"): | |
| for content in antigravity_payload["request"].get("contents", []): | |
| if content.get("role") == "model": | |
| first_func_seen = False | |
| for part in content.get("parts", []): | |
| if "functionCall" in part: | |
| if not first_func_seen: | |
| # First function call in this message - needs a signature | |
| if "thoughtSignature" not in part: | |
| part["thoughtSignature"] = ( | |
| "skip_thought_signature_validator" | |
| ) | |
| first_func_seen = True | |
| # Subsequent parallel calls: leave as-is (no signature) | |
| # Claude-specific tool schema transformation | |
| if internal_model.startswith("claude-sonnet-") or internal_model.startswith( | |
| "claude-opus-" | |
| ): | |
| self._apply_claude_tool_transform(antigravity_payload) | |
| return antigravity_payload | |
| def _apply_claude_tool_transform(self, payload: Dict[str, Any]) -> None: | |
| """Apply Claude-specific tool schema transformations. | |
| Converts parametersJsonSchema to parameters and applies Claude-specific | |
| schema sanitization (inlines $ref, removes unsupported JSON Schema fields). | |
| """ | |
| tools = payload["request"].get("tools", []) | |
| for tool in tools: | |
| for func_decl in tool.get("functionDeclarations", []): | |
| if "parametersJsonSchema" in func_decl: | |
| params = func_decl["parametersJsonSchema"] | |
| if isinstance(params, dict): | |
| params = _inline_schema_refs(params) | |
| params = _clean_claude_schema(params) | |
| func_decl["parameters"] = params | |
| del func_decl["parametersJsonSchema"] | |
| # ========================================================================= | |
| # RESPONSE TRANSFORMATION | |
| # ========================================================================= | |
| def _unwrap_response(self, response: Dict[str, Any]) -> Dict[str, Any]: | |
| """Extract Gemini response from Antigravity envelope.""" | |
| return response.get("response", response) | |
| def _gemini_to_openai_chunk( | |
| self, | |
| chunk: Dict[str, Any], | |
| model: str, | |
| accumulator: Optional[Dict[str, Any]] = None, | |
| ) -> Dict[str, Any]: | |
| """ | |
| Convert Gemini response chunk to OpenAI streaming format. | |
| Args: | |
| chunk: Gemini API response chunk | |
| model: Model name | |
| accumulator: Optional dict to accumulate data for post-processing | |
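| Illustrative translation of a text-only chunk: | |
| {"candidates": [{"content": {"parts": [{"text": "Hi"}]}}]} becomes | |
| {"object": "chat.completion.chunk", "choices": [{"index": 0, | |
| "delta": {"content": "Hi", "role": "assistant"}}], ...} | |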
| """ | |
| candidates = chunk.get("candidates", []) | |
| if not candidates: | |
| return {} | |
| candidate = candidates[0] | |
| content_parts = candidate.get("content", {}).get("parts", []) | |
| text_content = "" | |
| reasoning_content = "" | |
| tool_calls = [] | |
| # Use accumulator's tool_idx if available, otherwise use local counter | |
| tool_idx = accumulator.get("tool_idx", 0) if accumulator else 0 | |
| for part in content_parts: | |
| has_func = "functionCall" in part | |
| has_text = "text" in part | |
| has_sig = bool(part.get("thoughtSignature")) | |
| is_thought = ( | |
| part.get("thought") is True | |
| or str(part.get("thought")).lower() == "true" | |
| ) | |
| # Accumulate signature for Claude caching | |
| if has_sig and is_thought and accumulator is not None: | |
| accumulator["thought_signature"] = part["thoughtSignature"] | |
| # Skip standalone signature parts | |
| if has_sig and not has_func and (not has_text or not part.get("text")): | |
| continue | |
| if has_text: | |
| text = part["text"] | |
| if is_thought: | |
| reasoning_content += text | |
| if accumulator is not None: | |
| accumulator["reasoning_content"] += text | |
| else: | |
| text_content += text | |
| if accumulator is not None: | |
| accumulator["text_content"] += text | |
| if has_func: | |
| # Get tool_schemas from accumulator for schema-aware parsing | |
| tool_schemas = accumulator.get("tool_schemas") if accumulator else None | |
| tool_call = self._extract_tool_call( | |
| part, model, tool_idx, accumulator, tool_schemas | |
| ) | |
| # Store signature for each tool call (needed for parallel tool calls) | |
| if has_sig: | |
| self._handle_tool_signature(tool_call, part["thoughtSignature"]) | |
| tool_calls.append(tool_call) | |
| tool_idx += 1 | |
| # Build delta | |
| delta = {} | |
| if text_content: | |
| delta["content"] = text_content | |
| if reasoning_content: | |
| delta["reasoning_content"] = reasoning_content | |
| if tool_calls: | |
| delta["tool_calls"] = tool_calls | |
| delta["role"] = "assistant" | |
| # Update tool_idx for next chunk | |
| if accumulator is not None: | |
| accumulator["tool_idx"] = tool_idx | |
| elif text_content or reasoning_content: | |
| delta["role"] = "assistant" | |
| # Build usage if present | |
| usage = self._build_usage(chunk.get("usageMetadata", {})) | |
| # Store last received usage for final chunk | |
| if usage and accumulator is not None: | |
| accumulator["last_usage"] = usage | |
| # Mark completion when we see usageMetadata | |
| if chunk.get("usageMetadata") and accumulator is not None: | |
| accumulator["is_complete"] = True | |
| # Build choice - just translate, don't include finish_reason | |
| # Client will handle finish_reason logic | |
| choice = {"index": 0, "delta": delta} | |
| response = { | |
| "id": chunk.get("responseId", f"chatcmpl-{uuid.uuid4().hex[:24]}"), | |
| "object": "chat.completion.chunk", | |
| "created": int(time.time()), | |
| "model": model, | |
| "choices": [choice], | |
| } | |
| if usage: | |
| response["usage"] = usage | |
| return response | |
| def _gemini_to_openai_non_streaming( | |
| self, | |
| response: Dict[str, Any], | |
| model: str, | |
| tool_schemas: Optional[Dict[str, Dict[str, Any]]] = None, | |
| ) -> Dict[str, Any]: | |
| """Convert Gemini response to OpenAI non-streaming format.""" | |
| candidates = response.get("candidates", []) | |
| if not candidates: | |
| return {} | |
| candidate = candidates[0] | |
| content_parts = candidate.get("content", {}).get("parts", []) | |
| text_content = "" | |
| reasoning_content = "" | |
| tool_calls = [] | |
| thought_sig = "" | |
| for part in content_parts: | |
| has_func = "functionCall" in part | |
| has_text = "text" in part | |
| has_sig = bool(part.get("thoughtSignature")) | |
| is_thought = ( | |
| part.get("thought") is True | |
| or str(part.get("thought")).lower() == "true" | |
| ) | |
| if has_sig and is_thought: | |
| thought_sig = part["thoughtSignature"] | |
| if has_sig and not has_func and (not has_text or not part.get("text")): | |
| continue | |
| if has_text: | |
| if is_thought: | |
| reasoning_content += part["text"] | |
| else: | |
| text_content += part["text"] | |
| if has_func: | |
| tool_call = self._extract_tool_call( | |
| part, model, len(tool_calls), tool_schemas=tool_schemas | |
| ) | |
| # Store signature for each tool call (needed for parallel tool calls) | |
| if has_sig: | |
| self._handle_tool_signature(tool_call, part["thoughtSignature"]) | |
| tool_calls.append(tool_call) | |
| # Cache Claude thinking | |
| if ( | |
| reasoning_content | |
| and self._is_claude(model) | |
| and self._enable_signature_cache | |
| ): | |
| self._cache_thinking( | |
| reasoning_content, thought_sig, text_content, tool_calls | |
| ) | |
| # Build message | |
| message = {"role": "assistant"} | |
| if text_content: | |
| message["content"] = text_content | |
| elif not tool_calls: | |
| message["content"] = "" | |
| if reasoning_content: | |
| message["reasoning_content"] = reasoning_content | |
| if tool_calls: | |
| message["tool_calls"] = tool_calls | |
| message.pop("content", None) | |
| finish_reason = self._map_finish_reason( | |
| candidate.get("finishReason"), bool(tool_calls) | |
| ) | |
| usage = self._build_usage(response.get("usageMetadata", {})) | |
| # For non-streaming, always include finish_reason (should always be present) | |
| result = { | |
| "id": response.get("responseId", f"chatcmpl-{uuid.uuid4().hex[:24]}"), | |
| "object": "chat.completion", | |
| "created": int(time.time()), | |
| "model": model, | |
| "choices": [ | |
| { | |
| "index": 0, | |
| "message": message, | |
| "finish_reason": finish_reason or "stop", | |
| } | |
| ], | |
| } | |
| if usage: | |
| result["usage"] = usage | |
| return result | |
| def _build_tool_schema_map( | |
| self, tools: Optional[List[Dict[str, Any]]], model: str | |
| ) -> Dict[str, Dict[str, Any]]: | |
| """ | |
| Build a mapping of tool name -> parameter schema from tools payload. | |
| Used for schema-aware JSON string parsing to avoid corrupting | |
| string content that looks like JSON (e.g., write tool's content field). | |
| """ | |
| if not tools: | |
| return {} | |
| schema_map = {} | |
| for tool in tools: | |
| for func_decl in tool.get("functionDeclarations", []): | |
| name = func_decl.get("name", "") | |
| # Strip gemini3 prefix if applicable | |
| if self._is_gemini_3(model) and self._enable_gemini3_tool_fix: | |
| name = self._strip_gemini3_prefix(name) | |
| schema = func_decl.get("parametersJsonSchema", {}) | |
| if name and schema: | |
| schema_map[name] = schema | |
| return schema_map | |
| def _extract_tool_call( | |
| self, | |
| part: Dict[str, Any], | |
| model: str, | |
| index: int, | |
| accumulator: Optional[Dict[str, Any]] = None, | |
| tool_schemas: Optional[Dict[str, Dict[str, Any]]] = None, | |
| ) -> Dict[str, Any]: | |
| """Extract and format a tool call from a response part.""" | |
| func_call = part["functionCall"] | |
| tool_id = func_call.get("id") or f"call_{uuid.uuid4().hex[:24]}" | |
| # lib_logger.debug(f"[ID Extraction] Extracting tool call: id={tool_id}, raw_id={func_call.get('id')}") | |
| tool_name = func_call.get("name", "") | |
| if self._is_gemini_3(model) and self._enable_gemini3_tool_fix: | |
| tool_name = self._strip_gemini3_prefix(tool_name) | |
| raw_args = func_call.get("args", {}) | |
| # Optionally parse JSON strings (handles escaped control chars, malformed JSON) | |
| # NOTE: Gemini 3 sometimes returns stringified arrays for array parameters | |
| # (e.g., batch, todowrite). Schema-aware parsing prevents corrupting string | |
| # content that looks like JSON (e.g., write tool's content field). | |
| if self._enable_json_string_parsing: | |
| # Get schema for this tool if available | |
| tool_schema = tool_schemas.get(tool_name) if tool_schemas else None | |
| parsed_args = _recursively_parse_json_strings( | |
| raw_args, schema=tool_schema, parse_json_objects=True | |
| ) | |
| else: | |
| parsed_args = raw_args | |
| # Strip the injected _confirm parameter ONLY if it's the sole parameter | |
| # This ensures we only strip our injection, not legitimate user params | |
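| # Illustrative: {"_confirm": "yes"} -> {}, but {"_confirm": "yes", "path": "x"} | |
| # is left untouched because it contains more than the injected param. | |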
| if isinstance(parsed_args, dict) and "_confirm" in parsed_args: | |
| if len(parsed_args) == 1: | |
| # _confirm is the only param - this was our injection | |
| parsed_args.pop("_confirm") | |
| tool_call = { | |
| "id": tool_id, | |
| "type": "function", | |
| "index": index, | |
| "function": {"name": tool_name, "arguments": json.dumps(parsed_args)}, | |
| } | |
| if accumulator is not None: | |
| accumulator["tool_calls"].append(tool_call) | |
| return tool_call | |
| def _handle_tool_signature(self, tool_call: Dict, signature: str) -> None: | |
| """Handle thoughtSignature for a tool call.""" | |
| tool_id = tool_call["id"] | |
| if self._enable_signature_cache: | |
| self._signature_cache.store(tool_id, signature) | |
| lib_logger.debug(f"Stored signature for {tool_id}") | |
| if self._preserve_signatures_in_client: | |
| tool_call["thought_signature"] = signature | |
| def _map_finish_reason( | |
| self, gemini_reason: Optional[str], has_tool_calls: bool | |
| ) -> Optional[str]: | |
| """Map Gemini finish reason to OpenAI format.""" | |
| if not gemini_reason: | |
| return None | |
| reason = FINISH_REASON_MAP.get(gemini_reason, "stop") | |
| return "tool_calls" if has_tool_calls else reason | |
| def _build_usage(self, metadata: Dict[str, Any]) -> Optional[Dict[str, Any]]: | |
| """Build usage dict from Gemini usage metadata.""" | |
| if not metadata: | |
| return None | |
| prompt = metadata.get("promptTokenCount", 0) | |
| thoughts = metadata.get("thoughtsTokenCount", 0) | |
| completion = metadata.get("candidatesTokenCount", 0) | |
| usage = { | |
| "prompt_tokens": prompt + thoughts, | |
| "completion_tokens": completion, | |
| "total_tokens": metadata.get("totalTokenCount", 0), | |
| } | |
| if thoughts > 0: | |
| usage["completion_tokens_details"] = {"reasoning_tokens": thoughts} | |
| return usage | |
| def _cache_thinking( | |
| self, reasoning: str, signature: str, text: str, tool_calls: List[Dict] | |
| ) -> None: | |
| """Cache Claude thinking content.""" | |
| cache_key = self._generate_thinking_cache_key(text, tool_calls) | |
| if not cache_key: | |
| return | |
| data = { | |
| "thinking_text": reasoning, | |
| "thought_signature": signature, | |
| "text_preview": text[:100] if text else "", | |
| "tool_ids": [tc.get("id", "") for tc in tool_calls], | |
| "timestamp": time.time(), | |
| } | |
| self._thinking_cache.store(cache_key, json.dumps(data)) | |
| lib_logger.debug(f"Cached thinking: {cache_key[:50]}...") | |
| # ========================================================================= | |
| # PROVIDER INTERFACE IMPLEMENTATION | |
| # ========================================================================= | |
| async def get_valid_token(self, credential_identifier: str) -> str: | |
| """Get a valid access token for the credential.""" | |
| creds = await self._load_credentials(credential_identifier) | |
| if self._is_token_expired(creds): | |
| creds = await self._refresh_token(credential_identifier, creds) | |
| return creds["access_token"] | |
| def has_custom_logic(self) -> bool: | |
| """Antigravity uses custom translation logic.""" | |
| return True | |
| async def get_auth_header(self, credential_identifier: str) -> Dict[str, str]: | |
| """Get OAuth authorization header.""" | |
| token = await self.get_valid_token(credential_identifier) | |
| return {"Authorization": f"Bearer {token}"} | |
| async def get_models(self, api_key: str, client: httpx.AsyncClient) -> List[str]: | |
| """Fetch available models from Antigravity.""" | |
| if not self._enable_dynamic_models: | |
| lib_logger.debug("Using hardcoded model list") | |
| return [f"antigravity/{m}" for m in AVAILABLE_MODELS] | |
| try: | |
| token = await self.get_valid_token(api_key) | |
| url = f"{self._get_base_url()}/fetchAvailableModels" | |
| headers = { | |
| "Authorization": f"Bearer {token}", | |
| "Content-Type": "application/json", | |
| **ANTIGRAVITY_HEADERS, | |
| } | |
| payload = { | |
| "project": _generate_project_id(), | |
| "requestId": _generate_request_id(), | |
| "userAgent": "antigravity", | |
| } | |
| response = await client.post( | |
| url, json=payload, headers=headers, timeout=30.0 | |
| ) | |
| response.raise_for_status() | |
| data = response.json() | |
| models = [] | |
| for model_info in data.get("models", []): | |
| internal = model_info.get("name", "").replace("models/", "") | |
| if internal: | |
| public = self._internal_to_alias(internal) | |
| if public: | |
| models.append(f"antigravity/{public}") | |
| if models: | |
| lib_logger.info(f"Discovered {len(models)} models") | |
| return models | |
| except Exception as e: | |
| lib_logger.warning(f"Dynamic model discovery failed: {e}") | |
| return [f"antigravity/{m}" for m in AVAILABLE_MODELS] | |
| async def acompletion( | |
| self, client: httpx.AsyncClient, **kwargs | |
| ) -> Union[litellm.ModelResponse, AsyncGenerator[litellm.ModelResponse, None]]: | |
| """ | |
| Handle completion requests for Antigravity. | |
| Main entry point that: | |
| 1. Extracts parameters and transforms messages | |
| 2. Builds Antigravity request payload | |
| 3. Makes API call with fallback logic | |
| 4. Transforms response to OpenAI format | |
| """ | |
| # Extract parameters | |
| model = self._strip_provider_prefix(kwargs.get("model", "gemini-2.5-pro")) | |
| messages = kwargs.get("messages", []) | |
| stream = kwargs.get("stream", False) | |
| credential_path = kwargs.pop("credential_identifier", kwargs.get("api_key", "")) | |
| tools = kwargs.get("tools") | |
| tool_choice = kwargs.get("tool_choice") | |
| reasoning_effort = kwargs.get("reasoning_effort") | |
| top_p = kwargs.get("top_p") | |
| temperature = kwargs.get("temperature") | |
| max_tokens = kwargs.get("max_tokens") | |
| custom_budget = kwargs.get("custom_reasoning_budget", False) | |
| enable_logging = kwargs.pop("enable_request_logging", False) | |
| # Create logger | |
| file_logger = AntigravityFileLogger(model, enable_logging) | |
| # Determine if thinking is enabled for this request | |
| # Thinking is enabled if reasoning_effort is set (and not "disable") for Claude | |
| thinking_enabled = False | |
| if self._is_claude(model): | |
| # For Claude, thinking is enabled when reasoning_effort is provided and not "disable" | |
| thinking_enabled = ( | |
| reasoning_effort is not None and reasoning_effort != "disable" | |
| ) | |
| # Transform messages to Gemini format FIRST | |
| # This restores thinking from cache if reasoning_content was stripped by client | |
| system_instruction, gemini_contents = self._transform_messages(messages, model) | |
| gemini_contents = self._fix_tool_response_grouping(gemini_contents) | |
| # Sanitize thinking blocks for Claude AFTER transformation | |
| # Now we can see the full picture including cached thinking that was restored | |
| # This handles: context compression, model switching, mid-turn thinking toggle | |
| force_disable_thinking = False | |
| if self._is_claude(model) and self._enable_thinking_sanitization: | |
| gemini_contents, force_disable_thinking = ( | |
| self._sanitize_thinking_for_claude(gemini_contents, thinking_enabled) | |
| ) | |
| # If we're in a mid-turn thinking toggle situation, we MUST disable thinking | |
| # for this request. Thinking will naturally resume on the next turn. | |
| if force_disable_thinking: | |
| thinking_enabled = False | |
| reasoning_effort = "disable" # Force disable for this request | |
| # Build payload | |
| gemini_payload = {"contents": gemini_contents} | |
| if system_instruction: | |
| gemini_payload["system_instruction"] = system_instruction | |
| # Inject tool usage hardening system instructions | |
| if tools: | |
| if self._is_gemini_3(model) and self._enable_gemini3_tool_fix: | |
| self._inject_tool_hardening_instruction( | |
| gemini_payload, self._gemini3_system_instruction | |
| ) | |
| elif self._is_claude(model) and self._enable_claude_tool_fix: | |
| self._inject_tool_hardening_instruction( | |
| gemini_payload, self._claude_system_instruction | |
| ) | |
| # Inject parallel tool usage encouragement (independent of tool hardening) | |
| if self._is_claude(model) and self._enable_parallel_tool_instruction_claude: | |
| self._inject_tool_hardening_instruction( | |
| gemini_payload, self._parallel_tool_instruction | |
| ) | |
| elif ( | |
| self._is_gemini_3(model) | |
| and self._enable_parallel_tool_instruction_gemini3 | |
| ): | |
| self._inject_tool_hardening_instruction( | |
| gemini_payload, self._parallel_tool_instruction | |
| ) | |
| # Add generation config | |
| gen_config = {} | |
| if top_p is not None: | |
| gen_config["topP"] = top_p | |
| # Handle temperature - Gemini 3 defaults to 1 if not explicitly set | |
| if temperature is not None: | |
| gen_config["temperature"] = temperature | |
| elif self._is_gemini_3(model): | |
| # Gemini 3 performs better with temperature=1 for tool use | |
| gen_config["temperature"] = 1.0 | |
| thinking_config = self._get_thinking_config( | |
| reasoning_effort, model, custom_budget | |
| ) | |
| if thinking_config: | |
| gen_config.setdefault("thinkingConfig", {}).update(thinking_config) | |
| if gen_config: | |
| gemini_payload["generationConfig"] = gen_config | |
| # Add tools | |
| gemini_tools = self._build_tools_payload(tools, model) | |
| if gemini_tools: | |
| gemini_payload["tools"] = gemini_tools | |
| # Apply tool transformations | |
| if self._is_gemini_3(model) and self._enable_gemini3_tool_fix: | |
| # Gemini 3: namespace prefix + strict schema + parameter signatures | |
| gemini_payload["tools"] = self._apply_gemini3_namespace( | |
| gemini_payload["tools"] | |
| ) | |
| if self._gemini3_enforce_strict_schema: | |
| gemini_payload["tools"] = self._enforce_strict_schema( | |
| gemini_payload["tools"] | |
| ) | |
| gemini_payload["tools"] = self._inject_signature_into_descriptions( | |
| gemini_payload["tools"], self._gemini3_description_prompt | |
| ) | |
| elif self._is_claude(model) and self._enable_claude_tool_fix: | |
| # Claude: parameter signatures only (no namespace prefix) | |
| gemini_payload["tools"] = self._inject_signature_into_descriptions( | |
| gemini_payload["tools"], self._claude_description_prompt | |
| ) | |
| # Get access token first (needed for project discovery) | |
| token = await self.get_valid_token(credential_path) | |
| # Discover real project ID | |
| litellm_params = kwargs.get("litellm_params", {}) or {} | |
| project_id = await self._discover_project_id( | |
| credential_path, token, litellm_params | |
| ) | |
| # Transform to Antigravity format with real project ID | |
| payload = self._transform_to_antigravity_format( | |
| gemini_payload, model, project_id, max_tokens, reasoning_effort, tool_choice | |
| ) | |
| file_logger.log_request(payload) | |
| # Pre-build tool schema map for malformed call handling | |
| # This maps original tool names (without prefix) to their schemas | |
| tool_schemas = self._build_tool_schema_map(gemini_payload.get("tools"), model) | |
| # Make API call | |
| base_url = self._get_base_url() | |
| endpoint = ":streamGenerateContent" if stream else ":generateContent" | |
| url = f"{base_url}{endpoint}" | |
| if stream: | |
| url = f"{url}?alt=sse" | |
| # These headers are REQUIRED for gemini-3-pro-high/low to work | |
| # Without X-Goog-Api-Client and Client-Metadata, only gemini-3-pro-preview works | |
| headers = { | |
| "Authorization": f"Bearer {token}", | |
| "Content-Type": "application/json", | |
| "Accept": "text/event-stream" if stream else "application/json", | |
| **ANTIGRAVITY_HEADERS, | |
| } | |
| # Track malformed call retries (separate from empty response retries) | |
| malformed_retry_count = 0 | |
| # Keep a mutable reference to gemini_contents for retry injection | |
| current_gemini_contents = gemini_contents | |
| # URL fallback loop - handles HTTP errors (except 429) and network errors | |
| # by switching to fallback URLs. Empty response retry is handled separately | |
| # inside _streaming_with_retry (streaming) or the inner loop (non-streaming). | |
| while True: | |
| try: | |
| if stream: | |
| # Streaming: _streaming_with_retry handles empty response retries internally | |
| return self._streaming_with_retry( | |
| client, | |
| url, | |
| headers, | |
| payload, | |
| model, | |
| file_logger, | |
| tool_schemas, | |
| current_gemini_contents, | |
| gemini_payload, | |
| project_id, | |
| max_tokens, | |
| reasoning_effort, | |
| tool_choice, | |
| ) | |
| else: | |
| # Non-streaming: empty response, bare 429, and malformed call retry | |
| empty_error_msg = ( | |
| "The model returned an empty response after multiple attempts. " | |
| "This may indicate a temporary service issue. Please try again." | |
| ) | |
| transient_429_msg = ( | |
| "The model returned transient 429 errors after multiple attempts. " | |
| "This may indicate a temporary service issue. Please try again." | |
| ) | |
| for attempt in range(EMPTY_RESPONSE_MAX_ATTEMPTS): | |
| try: | |
| result = await self._handle_non_streaming( | |
| client, | |
| url, | |
| headers, | |
| payload, | |
| model, | |
| file_logger, | |
| ) | |
| # Check if we got anything - empty dict means no candidates | |
| result_dict = ( | |
| result.model_dump() | |
| if hasattr(result, "model_dump") | |
| else dict(result) | |
| ) | |
| got_response = bool(result_dict.get("choices")) | |
| if not got_response: | |
| if attempt < EMPTY_RESPONSE_MAX_ATTEMPTS - 1: | |
| lib_logger.warning( | |
| f"[Antigravity] Empty response from {model}, " | |
| f"attempt {attempt + 1}/{EMPTY_RESPONSE_MAX_ATTEMPTS}. Retrying..." | |
| ) | |
| await asyncio.sleep(EMPTY_RESPONSE_RETRY_DELAY) | |
| continue | |
| else: | |
| # Last attempt failed - raise without extra logging | |
| # (caller will log the error) | |
| raise EmptyResponseError( | |
| provider="antigravity", | |
| model=model, | |
| message=empty_error_msg, | |
| ) | |
| return result | |
| except _MalformedFunctionCallDetected as e: | |
| # Handle MALFORMED_FUNCTION_CALL - try auto-fix first | |
| parsed = self._parse_malformed_call_message( | |
| e.finish_message, model | |
| ) | |
| if parsed: | |
| # Try to auto-fix the malformed JSON | |
| error_info = self._analyze_json_error( | |
| parsed["raw_args"] | |
| ) | |
| if error_info.get("fixed_json"): | |
| # Auto-fix successful - build synthetic response | |
| lib_logger.info( | |
| f"[Antigravity] Auto-fixed malformed function call for " | |
| f"'{parsed['tool_name']}' from {model}" | |
| ) | |
| # Log the auto-fix details | |
| if file_logger: | |
| file_logger.log_malformed_autofix( | |
| parsed["tool_name"], | |
| parsed["raw_args"], | |
| error_info["fixed_json"], | |
| ) | |
| fixed_response = ( | |
| self._build_fixed_tool_call_response( | |
| model, parsed, error_info | |
| ) | |
| ) | |
| if fixed_response: | |
| return fixed_response | |
| # Auto-fix failed - retry by asking model to fix its JSON | |
| # Each retry response will also attempt auto-fix first | |
| if malformed_retry_count < MALFORMED_CALL_MAX_RETRIES: | |
| malformed_retry_count += 1 | |
| lib_logger.warning( | |
| f"[Antigravity] MALFORMED_FUNCTION_CALL from {model}, " | |
| f"retry {malformed_retry_count}/{MALFORMED_CALL_MAX_RETRIES}: " | |
| f"{e.finish_message[:100]}..." | |
| ) | |
| if parsed: | |
| # Get schema for the failed tool | |
| tool_schema = tool_schemas.get(parsed["tool_name"]) | |
| # Build corrective messages | |
| assistant_msg, user_msg = ( | |
| self._build_malformed_call_retry_messages( | |
| parsed, tool_schema | |
| ) | |
| ) | |
| # Inject into conversation | |
| current_gemini_contents = list( | |
| current_gemini_contents | |
| ) | |
| current_gemini_contents.append(assistant_msg) | |
| current_gemini_contents.append(user_msg) | |
| # Rebuild payload with modified contents | |
| gemini_payload_copy = copy.deepcopy(gemini_payload) | |
| gemini_payload_copy["contents"] = ( | |
| current_gemini_contents | |
| ) | |
| payload = self._transform_to_antigravity_format( | |
| gemini_payload_copy, | |
| model, | |
| project_id, | |
| max_tokens, | |
| reasoning_effort, | |
| tool_choice, | |
| ) | |
| # Log the retry request in the same folder | |
| if file_logger: | |
| file_logger.log_malformed_retry_request( | |
| malformed_retry_count, payload | |
| ) | |
| await asyncio.sleep(MALFORMED_CALL_RETRY_DELAY) | |
| break # Break inner loop to retry with modified payload | |
| else: | |
| # Auto-fix failed and retries disabled/exceeded - return fallback | |
| lib_logger.warning( | |
| f"[Antigravity] MALFORMED_FUNCTION_CALL could not be auto-fixed " | |
| f"for {model}: {e.finish_message[:100]}..." | |
| ) | |
| return self._build_malformed_fallback_response( | |
| model, e.finish_message | |
| ) | |
| except httpx.HTTPStatusError as e: | |
| if e.response.status_code == 429: | |
| # Check if this is a bare 429 (no retry info) vs real quota exhaustion | |
| quota_info = self.parse_quota_error(e) | |
| if quota_info is None: | |
| # Bare 429 - retry like empty response | |
| if attempt < EMPTY_RESPONSE_MAX_ATTEMPTS - 1: | |
| lib_logger.warning( | |
| f"[Antigravity] Bare 429 from {model}, " | |
| f"attempt {attempt + 1}/{EMPTY_RESPONSE_MAX_ATTEMPTS}. Retrying..." | |
| ) | |
| await asyncio.sleep(EMPTY_RESPONSE_RETRY_DELAY) | |
| continue | |
| else: | |
| # Last attempt failed - raise TransientQuotaError to rotate | |
| raise TransientQuotaError( | |
| provider="antigravity", | |
| model=model, | |
| message=transient_429_msg, | |
| ) | |
| # Has retry info - real quota exhaustion, propagate for cooldown | |
| lib_logger.debug( | |
| f"429 with retry info - propagating for cooldown: {e}" | |
| ) | |
| # Re-raise all HTTP errors (429 with retry info, or other errors) | |
| raise | |
| else: | |
| # For loop completed normally (no break) - should not happen | |
| # This means we exhausted EMPTY_RESPONSE_MAX_ATTEMPTS without success | |
| lib_logger.error( | |
| f"[Antigravity] Unexpected exit from retry loop for {model}" | |
| ) | |
| raise EmptyResponseError( | |
| provider="antigravity", | |
| model=model, | |
| message=empty_error_msg, | |
| ) | |
| # If we broke out of the for loop (malformed retry), continue while loop | |
| continue | |
| except httpx.HTTPStatusError as e: | |
| # 429 = Rate limit/quota exhausted - tied to credential, not URL | |
| # Do NOT retry on different URL, just raise immediately | |
| if e.response.status_code == 429: | |
| lib_logger.debug( | |
| f"429 quota error - not retrying on fallback URL: {e}" | |
| ) | |
| raise | |
| # Other HTTP errors (403, 500, etc.) - try fallback URL | |
| if self._try_next_base_url(): | |
| lib_logger.warning(f"Retrying with fallback URL: {e}") | |
| url = f"{self._get_base_url()}{endpoint}" | |
| if stream: | |
| url = f"{url}?alt=sse" | |
| continue # Retry with new URL | |
| raise # No more fallback URLs | |
| except (EmptyResponseError, TransientQuotaError): | |
| # Already retried internally - don't catch, propagate for credential rotation | |
| raise | |
| except Exception as e: | |
| # Non-HTTP errors (network issues, timeouts, etc.) - try fallback URL | |
| if self._try_next_base_url(): | |
| lib_logger.warning(f"Retrying with fallback URL: {e}") | |
| url = f"{self._get_base_url()}{endpoint}" | |
| if stream: | |
| url = f"{url}?alt=sse" | |
| continue # Retry with new URL | |
| raise # No more fallback URLs | |
| def _inject_tool_hardening_instruction( | |
| self, payload: Dict[str, Any], instruction_text: str | |
| ) -> None: | |
| """Inject tool usage hardening system instruction for Gemini 3 & Claude.""" | |
| if not instruction_text: | |
| return | |
| instruction_part = {"text": instruction_text} | |
| if "system_instruction" in payload: | |
| existing = payload["system_instruction"] | |
| if isinstance(existing, dict) and "parts" in existing: | |
| existing["parts"].insert(0, instruction_part) | |
| else: | |
| payload["system_instruction"] = { | |
| "role": "user", | |
| "parts": [instruction_part, {"text": str(existing)}], | |
| } | |
| else: | |
| payload["system_instruction"] = { | |
| "role": "user", | |
| "parts": [instruction_part], | |
| } | |
| async def _handle_non_streaming( | |
| self, | |
| client: httpx.AsyncClient, | |
| url: str, | |
| headers: Dict[str, str], | |
| payload: Dict[str, Any], | |
| model: str, | |
| file_logger: Optional[AntigravityFileLogger] = None, | |
| ) -> litellm.ModelResponse: | |
| """Handle non-streaming completion.""" | |
| response = await client.post( | |
| url, | |
| headers=headers, | |
| json=payload, | |
| timeout=TimeoutConfig.non_streaming(), | |
| ) | |
| response.raise_for_status() | |
| data = response.json() | |
| if file_logger: | |
| file_logger.log_final_response(data) | |
| gemini_response = self._unwrap_response(data) | |
| # Check for MALFORMED_FUNCTION_CALL before conversion | |
| malformed_msg = self._check_for_malformed_call(gemini_response) | |
| if malformed_msg: | |
| raise _MalformedFunctionCallDetected(malformed_msg, gemini_response) | |
| # Build tool schema map for schema-aware JSON parsing | |
| tool_schemas = self._build_tool_schema_map(payload.get("tools"), model) | |
| openai_response = self._gemini_to_openai_non_streaming( | |
| gemini_response, model, tool_schemas | |
| ) | |
| return litellm.ModelResponse(**openai_response) | |
| async def _handle_streaming( | |
| self, | |
| client: httpx.AsyncClient, | |
| url: str, | |
| headers: Dict[str, str], | |
| payload: Dict[str, Any], | |
| model: str, | |
| file_logger: Optional[AntigravityFileLogger] = None, | |
| malformed_retry_num: Optional[int] = None, | |
| ) -> AsyncGenerator[litellm.ModelResponse, None]: | |
| """Handle streaming completion. | |
| Args: | |
| malformed_retry_num: If set, log response chunks to malformed_retry_N_response.log | |
| instead of the main response_stream.log | |
| """ | |
| # Build tool schema map for schema-aware JSON parsing | |
| tool_schemas = self._build_tool_schema_map(payload.get("tools"), model) | |
| # Accumulator tracks state across chunks for caching and tool indexing | |
| accumulator = { | |
| "reasoning_content": "", | |
| "thought_signature": "", | |
| "text_content": "", | |
| "tool_calls": [], | |
| "tool_idx": 0, # Track tool call index across chunks | |
| "is_complete": False, # Track if we received usageMetadata | |
| "last_usage": None, # Track last received usage for final chunk | |
| "yielded_any": False, # Track if we yielded any real chunks | |
| "tool_schemas": tool_schemas, # For schema-aware JSON string parsing | |
| "malformed_call": None, # Track MALFORMED_FUNCTION_CALL if detected | |
| "response_id": None, # Track original response ID for synthetic chunks | |
| } | |
| async with client.stream( | |
| "POST", | |
| url, | |
| headers=headers, | |
| json=payload, | |
| timeout=TimeoutConfig.streaming(), | |
| ) as response: | |
| if response.status_code >= 400: | |
| # Read error body so it's available in response.text for logging | |
| # The actual logging happens in failure_logger via _extract_response_body | |
| try: | |
| await response.aread() | |
| # lib_logger.error( | |
| # f"API error {response.status_code}: {error_body.decode()}" | |
| # ) | |
| except Exception: | |
| pass | |
| response.raise_for_status() | |
| async for line in response.aiter_lines(): | |
| if file_logger: | |
| if malformed_retry_num is not None: | |
| file_logger.log_malformed_retry_response( | |
| malformed_retry_num, line | |
| ) | |
| else: | |
| file_logger.log_response_chunk(line) | |
| if line.startswith("data: "): | |
| data_str = line[6:] | |
| if data_str == "[DONE]": | |
| break | |
| try: | |
| chunk = json.loads(data_str) | |
| gemini_chunk = self._unwrap_response(chunk) | |
| # Capture response ID from first chunk for synthetic responses | |
| if not accumulator.get("response_id"): | |
| accumulator["response_id"] = gemini_chunk.get("responseId") | |
| # Check for MALFORMED_FUNCTION_CALL | |
| malformed_msg = self._check_for_malformed_call(gemini_chunk) | |
| if malformed_msg: | |
| # Store for retry handler, don't yield anything more | |
| accumulator["malformed_call"] = malformed_msg | |
| break | |
| openai_chunk = self._gemini_to_openai_chunk( | |
| gemini_chunk, model, accumulator | |
| ) | |
| yield litellm.ModelResponse(**openai_chunk) | |
| accumulator["yielded_any"] = True | |
| except json.JSONDecodeError: | |
| if file_logger: | |
| file_logger.log_error(f"Parse error: {data_str[:100]}") | |
| continue | |
| # Check if we detected a malformed call - raise exception for retry handler | |
| if accumulator.get("malformed_call"): | |
| raise _MalformedFunctionCallDetected( | |
| accumulator["malformed_call"], | |
| {"accumulator": accumulator}, | |
| ) | |
| # Only emit synthetic final chunk if we actually received real data | |
| # If no data was received, the caller will detect zero chunks and retry | |
| if accumulator.get("yielded_any"): | |
| # If stream ended without usageMetadata chunk, emit a final chunk | |
| if not accumulator.get("is_complete"): | |
| final_chunk = { | |
| "id": f"chatcmpl-{uuid.uuid4().hex[:24]}", | |
| "object": "chat.completion.chunk", | |
| "created": int(time.time()), | |
| "model": model, | |
| "choices": [{"index": 0, "delta": {}, "finish_reason": None}], | |
| } | |
| # Only include usage if we received real data during streaming | |
| if accumulator.get("last_usage"): | |
| final_chunk["usage"] = accumulator["last_usage"] | |
| yield litellm.ModelResponse(**final_chunk) | |
| # Cache Claude thinking after stream completes | |
| if ( | |
| self._is_claude(model) | |
| and self._enable_signature_cache | |
| and accumulator.get("reasoning_content") | |
| ): | |
| self._cache_thinking( | |
| accumulator["reasoning_content"], | |
| accumulator["thought_signature"], | |
| accumulator["text_content"], | |
| accumulator["tool_calls"], | |
| ) | |
| async def _streaming_with_retry( | |
| self, | |
| client: httpx.AsyncClient, | |
| url: str, | |
| headers: Dict[str, str], | |
| payload: Dict[str, Any], | |
| model: str, | |
| file_logger: Optional[AntigravityFileLogger] = None, | |
| tool_schemas: Optional[Dict[str, Dict[str, Any]]] = None, | |
| gemini_contents: Optional[List[Dict[str, Any]]] = None, | |
| gemini_payload: Optional[Dict[str, Any]] = None, | |
| project_id: Optional[str] = None, | |
| max_tokens: Optional[int] = None, | |
| reasoning_effort: Optional[str] = None, | |
| tool_choice: Optional[Union[str, Dict[str, Any]]] = None, | |
| ) -> AsyncGenerator[litellm.ModelResponse, None]: | |
| """ | |
| Wrapper around _handle_streaming that retries on empty responses, bare 429s, | |
| and MALFORMED_FUNCTION_CALL errors. | |
| If the stream yields zero chunks (Antigravity returned nothing) or encounters | |
| a bare 429 (no retry info), retry up to EMPTY_RESPONSE_MAX_ATTEMPTS times | |
| before giving up. | |
| If MALFORMED_FUNCTION_CALL is detected, inject corrective messages and retry | |
| up to MALFORMED_CALL_MAX_RETRIES times. | |
| """ | |
| empty_error_msg = ( | |
| "The model returned an empty response after multiple attempts. " | |
| "This may indicate a temporary service issue. Please try again." | |
| ) | |
| transient_429_msg = ( | |
| "The model returned transient 429 errors after multiple attempts. " | |
| "This may indicate a temporary service issue. Please try again." | |
| ) | |
| # Track malformed call retries (separate from empty response retries) | |
| malformed_retry_count = 0 | |
| current_gemini_contents = gemini_contents | |
| current_payload = payload | |
| for attempt in range(EMPTY_RESPONSE_MAX_ATTEMPTS): | |
| chunk_count = 0 | |
| try: | |
| # Pass malformed_retry_count to log response to separate file | |
| retry_num = malformed_retry_count if malformed_retry_count > 0 else None | |
| async for chunk in self._handle_streaming( | |
| client, | |
| url, | |
| headers, | |
| current_payload, | |
| model, | |
| file_logger, | |
| malformed_retry_num=retry_num, | |
| ): | |
| chunk_count += 1 | |
| yield chunk # Stream immediately - true streaming preserved | |
| if chunk_count > 0: | |
| return # Success - we got data | |
| # Zero chunks - empty response | |
| if attempt < EMPTY_RESPONSE_MAX_ATTEMPTS - 1: | |
| lib_logger.warning( | |
| f"[Antigravity] Empty stream from {model}, " | |
| f"attempt {attempt + 1}/{EMPTY_RESPONSE_MAX_ATTEMPTS}. Retrying..." | |
| ) | |
| await asyncio.sleep(EMPTY_RESPONSE_RETRY_DELAY) | |
| continue | |
| else: | |
| # Last attempt failed - raise without extra logging | |
| # (caller will log the error) | |
| raise EmptyResponseError( | |
| provider="antigravity", | |
| model=model, | |
| message=empty_error_msg, | |
| ) | |
            except _MalformedFunctionCallDetected as e:
                # Handle MALFORMED_FUNCTION_CALL - try auto-fix first
                parsed = self._parse_malformed_call_message(e.finish_message, model)

                # Extract response_id and last_usage from accumulator for all paths
                response_id = None
                last_usage = None
                if e.raw_response and isinstance(e.raw_response, dict):
                    acc = e.raw_response.get("accumulator", {})
                    response_id = acc.get("response_id")
                    last_usage = acc.get("last_usage")

                if parsed:
                    # Try to auto-fix the malformed JSON
                    error_info = self._analyze_json_error(parsed["raw_args"])
                    if error_info.get("fixed_json"):
                        # Auto-fix successful - build synthetic response
                        lib_logger.info(
                            f"[Antigravity] Auto-fixed malformed function call for "
                            f"'{parsed['tool_name']}' from {model} (streaming)"
                        )
                        # Log the auto-fix details
                        if file_logger:
                            file_logger.log_malformed_autofix(
                                parsed["tool_name"],
                                parsed["raw_args"],
                                error_info["fixed_json"],
                            )
                        # Use chunk format for streaming with original response ID and usage
                        fixed_chunk = self._build_fixed_tool_call_chunk(
                            model,
                            parsed,
                            error_info,
                            response_id=response_id,
                            usage=last_usage,
                        )
                        if fixed_chunk:
                            yield fixed_chunk
                            return

                # Auto-fix failed - retry by asking model to fix its JSON
                # Each retry response will also attempt auto-fix first
                if malformed_retry_count < MALFORMED_CALL_MAX_RETRIES:
                    malformed_retry_count += 1
                    lib_logger.warning(
                        f"[Antigravity] MALFORMED_FUNCTION_CALL from {model} (streaming), "
                        f"retry {malformed_retry_count}/{MALFORMED_CALL_MAX_RETRIES}: "
                        f"{e.finish_message[:100]}..."
                    )
                    if parsed and gemini_payload is not None:
                        # Get schema for the failed tool
                        tool_schema = (
                            tool_schemas.get(parsed["tool_name"])
                            if tool_schemas
                            else None
                        )
                        # Build corrective messages
                        assistant_msg, user_msg = (
                            self._build_malformed_call_retry_messages(
                                parsed, tool_schema
                            )
                        )
                        # Inject into conversation
                        current_gemini_contents = list(current_gemini_contents or [])
                        current_gemini_contents.append(assistant_msg)
                        current_gemini_contents.append(user_msg)

                        # Rebuild payload with modified contents
                        gemini_payload_copy = copy.deepcopy(gemini_payload)
                        gemini_payload_copy["contents"] = current_gemini_contents
                        current_payload = self._transform_to_antigravity_format(
                            gemini_payload_copy,
                            model,
                            project_id or "",
                            max_tokens,
                            reasoning_effort,
                            tool_choice,
                        )
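                        # Note: only the request payload is rebuilt here; the same
                        # url and headers are reused for the retried stream.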
                        # Log the retry request in the same folder
                        if file_logger:
                            file_logger.log_malformed_retry_request(
                                malformed_retry_count, current_payload
                            )
                    await asyncio.sleep(MALFORMED_CALL_RETRY_DELAY)
                    continue  # Retry with modified payload
                else:
                    # Auto-fix failed and retries disabled/exceeded - yield fallback response
                    lib_logger.warning(
                        f"[Antigravity] MALFORMED_FUNCTION_CALL could not be auto-fixed "
                        f"for {model} (streaming): {e.finish_message[:100]}..."
                    )
                    fallback = self._build_malformed_fallback_chunk(
                        model,
                        e.finish_message,
                        response_id=response_id,
                        usage=last_usage,
                    )
                    yield fallback
                    return
            except httpx.HTTPStatusError as e:
                if e.response.status_code == 429:
                    # Check if this is a bare 429 (no retry info) vs real quota exhaustion
                    quota_info = self.parse_quota_error(e)
                    if quota_info is None:
                        # Bare 429 - retry like empty response
                        if attempt < EMPTY_RESPONSE_MAX_ATTEMPTS - 1:
                            lib_logger.warning(
                                f"[Antigravity] Bare 429 from {model}, "
                                f"attempt {attempt + 1}/{EMPTY_RESPONSE_MAX_ATTEMPTS}. Retrying..."
                            )
                            await asyncio.sleep(EMPTY_RESPONSE_RETRY_DELAY)
                            continue
                        else:
                            # Last attempt failed - raise TransientQuotaError to rotate
                            raise TransientQuotaError(
                                provider="antigravity",
                                model=model,
                                message=transient_429_msg,
                            )
                    # Has retry info - real quota exhaustion, propagate for cooldown
                    lib_logger.debug(
                        f"429 with retry info - propagating for cooldown: {e}"
                    )
                    raise
                # Other HTTP errors - raise immediately (let caller handle)
                raise
            except Exception:
                # Non-HTTP errors - raise immediately
                raise

        # Should not reach here, but just in case
        lib_logger.error(
            f"[Antigravity] Unexpected exit from streaming retry loop for {model}"
        )
        raise EmptyResponseError(
            provider="antigravity",
            model=model,
            message=empty_error_msg,
        )
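
    # Illustrative only: how a caller would typically consume the wrapper above.
    # The names `provider`, `http_client`, and `forward_chunk` are hypothetical
    # placeholders, not part of this module.
    #
    #     async for chunk in provider._streaming_with_retry(
    #         http_client, url, headers, payload, model, file_logger=logger
    #     ):
    #         forward_chunk(chunk)  # each chunk is a litellm.ModelResponse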

    async def count_tokens(
        self,
        client: httpx.AsyncClient,
        credential_path: str,
        model: str,
        messages: List[Dict[str, Any]],
        tools: Optional[List[Dict[str, Any]]] = None,
        litellm_params: Optional[Dict[str, Any]] = None,
    ) -> Dict[str, int]:
        """Count tokens for the given prompt using the Antigravity :countTokens endpoint."""
        try:
            token = await self.get_valid_token(credential_path)
            internal_model = self._alias_to_internal(model)

            # Discover project ID
            project_id = await self._discover_project_id(
                credential_path, token, litellm_params or {}
            )

            system_instruction, contents = self._transform_messages(
                messages, internal_model
            )
            contents = self._fix_tool_response_grouping(contents)

            gemini_payload = {"contents": contents}
            if system_instruction:
                gemini_payload["systemInstruction"] = system_instruction

            gemini_tools = self._build_tools_payload(tools, model)
            if gemini_tools:
                gemini_payload["tools"] = gemini_tools

            antigravity_payload = {
                "project": project_id,
                "userAgent": "antigravity",
                "requestId": _generate_request_id(),
                "model": internal_model,
                "request": gemini_payload,
            }

            url = f"{self._get_base_url()}:countTokens"
            headers = {
                "Authorization": f"Bearer {token}",
                "Content-Type": "application/json",
            }

            response = await client.post(
                url, headers=headers, json=antigravity_payload, timeout=30
            )
            response.raise_for_status()

            data = response.json()
            unwrapped = self._unwrap_response(data)
            total = unwrapped.get("totalTokens", 0)
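            # countTokens reports a single total for the prompt, so both fields
            # below carry the same value.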
| return {"prompt_tokens": total, "total_tokens": total} | |
| except Exception as e: | |
| lib_logger.error(f"Token counting failed: {e}") | |
| return {"prompt_tokens": 0, "total_tokens": 0} | |