llm-api-proxy / src /rotator_library /providers /antigravity_auth_base.py
Mirrowel
refactor(auth): 🔨 consolidate OAuth credential management into base classes
f439788
# src/rotator_library/providers/antigravity_auth_base.py
import asyncio
import json
import logging
import os
from pathlib import Path
from typing import Any, Dict, Optional, List
import httpx
from .google_oauth_base import GoogleOAuthBase
lib_logger = logging.getLogger("rotator_library")
# Code Assist endpoint for project discovery
CODE_ASSIST_ENDPOINT = "https://cloudcode-pa.googleapis.com/v1internal"
class AntigravityAuthBase(GoogleOAuthBase):
"""
Antigravity OAuth2 authentication implementation.
Inherits all OAuth functionality from GoogleOAuthBase with Antigravity-specific configuration.
Uses Antigravity's OAuth credentials and includes additional scopes for cclog and experimentsandconfigs.
Also provides project/tier discovery functionality that runs during authentication,
ensuring credentials have their tier and project_id cached before any API requests.
"""
CLIENT_ID = (
"1071006060591-tmhssin2h21lcre235vtolojh4g403ep.apps.googleusercontent.com"
)
CLIENT_SECRET = "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"
OAUTH_SCOPES = [
"https://www.googleapis.com/auth/cloud-platform",
"https://www.googleapis.com/auth/userinfo.email",
"https://www.googleapis.com/auth/userinfo.profile",
"https://www.googleapis.com/auth/cclog", # Antigravity-specific
"https://www.googleapis.com/auth/experimentsandconfigs", # Antigravity-specific
]
ENV_PREFIX = "ANTIGRAVITY"
CALLBACK_PORT = 51121
CALLBACK_PATH = "/oauthcallback"
def __init__(self):
super().__init__()
# Project and tier caches - shared between auth base and provider
self.project_id_cache: Dict[str, str] = {}
self.project_tier_cache: Dict[str, str] = {}
# =========================================================================
# POST-AUTH DISCOVERY HOOK
# =========================================================================
async def _post_auth_discovery(
self, credential_path: str, access_token: str
) -> None:
"""
Discover and cache tier/project information immediately after OAuth authentication.
This is called by GoogleOAuthBase._perform_interactive_oauth() after successful auth,
ensuring tier and project_id are cached during the authentication flow rather than
waiting for the first API request.
Args:
credential_path: Path to the credential file
access_token: The newly obtained access token
"""
lib_logger.debug(
f"Starting post-auth discovery for Antigravity credential: {Path(credential_path).name}"
)
# Skip if already discovered (shouldn't happen during fresh auth, but be defensive)
if (
credential_path in self.project_id_cache
and credential_path in self.project_tier_cache
):
lib_logger.debug(
f"Tier and project already cached for {Path(credential_path).name}, skipping discovery"
)
return
# Call _discover_project_id which handles tier/project discovery and persistence
# Pass empty litellm_params since we're in auth context (no model-specific overrides)
project_id = await self._discover_project_id(
credential_path, access_token, litellm_params={}
)
tier = self.project_tier_cache.get(credential_path, "unknown")
lib_logger.info(
f"Post-auth discovery complete for {Path(credential_path).name}: "
f"tier={tier}, project={project_id}"
)
# =========================================================================
# PROJECT ID DISCOVERY
# =========================================================================
async def _discover_project_id(
self, credential_path: str, access_token: str, litellm_params: Dict[str, Any]
) -> str:
"""
Discovers the Google Cloud Project ID, with caching and onboarding for new accounts.
This follows the official Gemini CLI discovery flow adapted for Antigravity:
1. Check in-memory cache
2. Check configured project_id override (litellm_params or env var)
3. Check persisted project_id in credential file
4. Call loadCodeAssist to check if user is already known (has currentTier)
- If currentTier exists AND cloudaicompanionProject returned: use server's project
- If no currentTier: user needs onboarding
5. Onboard user (FREE tier: pass cloudaicompanionProject=None for server-managed)
6. Fallback to GCP Resource Manager project listing
Note: Unlike GeminiCli, Antigravity doesn't use tier-based credential prioritization,
but we still cache tier info for debugging and consistency.
"""
lib_logger.debug(
f"Starting Antigravity project discovery for credential: {credential_path}"
)
# Check in-memory cache first
if credential_path in self.project_id_cache:
cached_project = self.project_id_cache[credential_path]
lib_logger.debug(f"Using cached project ID: {cached_project}")
return cached_project
# Check for configured project ID override (from litellm_params or env var)
configured_project_id = (
litellm_params.get("project_id")
or os.getenv("ANTIGRAVITY_PROJECT_ID")
or os.getenv("GOOGLE_CLOUD_PROJECT")
)
if configured_project_id:
lib_logger.debug(
f"Found configured project_id override: {configured_project_id}"
)
# Load credentials from file to check for persisted project_id and tier
# Skip for env:// paths (environment-based credentials don't persist to files)
credential_index = self._parse_env_credential_path(credential_path)
if credential_index is None:
# Only try to load from file if it's not an env:// path
try:
with open(credential_path, "r") as f:
creds = json.load(f)
metadata = creds.get("_proxy_metadata", {})
persisted_project_id = metadata.get("project_id")
persisted_tier = metadata.get("tier")
if persisted_project_id:
lib_logger.info(
f"Loaded persisted project ID from credential file: {persisted_project_id}"
)
self.project_id_cache[credential_path] = persisted_project_id
# Also load tier if available (for debugging/logging purposes)
if persisted_tier:
self.project_tier_cache[credential_path] = persisted_tier
lib_logger.debug(f"Loaded persisted tier: {persisted_tier}")
return persisted_project_id
except (FileNotFoundError, json.JSONDecodeError, KeyError) as e:
lib_logger.debug(f"Could not load persisted project ID from file: {e}")
lib_logger.debug(
"No cached or configured project ID found, initiating discovery..."
)
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
}
discovered_project_id = None
discovered_tier = None
async with httpx.AsyncClient() as client:
# 1. Try discovery endpoint with loadCodeAssist
lib_logger.debug(
"Attempting project discovery via Code Assist loadCodeAssist endpoint..."
)
try:
# Build metadata - include duetProject only if we have a configured project
core_client_metadata = {
"ideType": "IDE_UNSPECIFIED",
"platform": "PLATFORM_UNSPECIFIED",
"pluginType": "GEMINI",
}
if configured_project_id:
core_client_metadata["duetProject"] = configured_project_id
# Build load request - pass configured_project_id if available, otherwise None
load_request = {
"cloudaicompanionProject": configured_project_id, # Can be None
"metadata": core_client_metadata,
}
lib_logger.debug(
f"Sending loadCodeAssist request with cloudaicompanionProject={configured_project_id}"
)
response = await client.post(
f"{CODE_ASSIST_ENDPOINT}:loadCodeAssist",
headers=headers,
json=load_request,
timeout=20,
)
response.raise_for_status()
data = response.json()
# Log full response for debugging
lib_logger.debug(
f"loadCodeAssist full response keys: {list(data.keys())}"
)
# Extract tier information
allowed_tiers = data.get("allowedTiers", [])
current_tier = data.get("currentTier")
lib_logger.debug(f"=== Tier Information ===")
lib_logger.debug(f"currentTier: {current_tier}")
lib_logger.debug(f"allowedTiers count: {len(allowed_tiers)}")
for i, tier in enumerate(allowed_tiers):
tier_id = tier.get("id", "unknown")
is_default = tier.get("isDefault", False)
user_defined = tier.get("userDefinedCloudaicompanionProject", False)
lib_logger.debug(
f" Tier {i + 1}: id={tier_id}, isDefault={is_default}, userDefinedProject={user_defined}"
)
lib_logger.debug(f"========================")
# Determine the current tier ID
current_tier_id = None
if current_tier:
current_tier_id = current_tier.get("id")
lib_logger.debug(f"User has currentTier: {current_tier_id}")
# Check if user is already known to server (has currentTier)
if current_tier_id:
# User is already onboarded - check for project from server
server_project = data.get("cloudaicompanionProject")
# Check if this tier requires user-defined project (paid tiers)
requires_user_project = any(
t.get("id") == current_tier_id
and t.get("userDefinedCloudaicompanionProject", False)
for t in allowed_tiers
)
is_free_tier = current_tier_id == "free-tier"
if server_project:
# Server returned a project - use it (server wins)
project_id = server_project
lib_logger.debug(f"Server returned project: {project_id}")
elif configured_project_id:
# No server project but we have configured one - use it
project_id = configured_project_id
lib_logger.debug(
f"No server project, using configured: {project_id}"
)
elif is_free_tier:
# Free tier user without server project - try onboarding
lib_logger.debug(
"Free tier user with currentTier but no project - will try onboarding"
)
project_id = None
elif requires_user_project:
# Paid tier requires a project ID to be set
raise ValueError(
f"Paid tier '{current_tier_id}' requires setting ANTIGRAVITY_PROJECT_ID environment variable."
)
else:
# Unknown tier without project - proceed to onboarding
lib_logger.warning(
f"Tier '{current_tier_id}' has no project and none configured - will try onboarding"
)
project_id = None
if project_id:
# Cache tier info
self.project_tier_cache[credential_path] = current_tier_id
discovered_tier = current_tier_id
# Log appropriately based on tier
is_paid = current_tier_id and current_tier_id not in [
"free-tier",
"legacy-tier",
"unknown",
]
if is_paid:
lib_logger.info(
f"Using Antigravity paid tier '{current_tier_id}' with project: {project_id}"
)
else:
lib_logger.info(
f"Discovered Antigravity project ID via loadCodeAssist: {project_id}"
)
self.project_id_cache[credential_path] = project_id
discovered_project_id = project_id
# Persist to credential file
await self._persist_project_metadata(
credential_path, project_id, discovered_tier
)
return project_id
# 2. User needs onboarding - no currentTier or no project found
lib_logger.info(
"No existing Antigravity session found (no currentTier), attempting to onboard user..."
)
# Determine which tier to onboard with
onboard_tier = None
for tier in allowed_tiers:
if tier.get("isDefault"):
onboard_tier = tier
break
# Fallback to legacy tier if no default
if not onboard_tier and allowed_tiers:
for tier in allowed_tiers:
if tier.get("id") == "legacy-tier":
onboard_tier = tier
break
if not onboard_tier:
onboard_tier = allowed_tiers[0]
if not onboard_tier:
raise ValueError("No onboarding tiers available from server")
tier_id = onboard_tier.get("id", "free-tier")
requires_user_project = onboard_tier.get(
"userDefinedCloudaicompanionProject", False
)
lib_logger.debug(
f"Onboarding with tier: {tier_id}, requiresUserProject: {requires_user_project}"
)
# Build onboard request based on tier type
# FREE tier: cloudaicompanionProject = None (server-managed)
# PAID tier: cloudaicompanionProject = configured_project_id
is_free_tier = tier_id == "free-tier"
if is_free_tier:
# Free tier uses server-managed project
onboard_request = {
"tierId": tier_id,
"cloudaicompanionProject": None, # Server will create/manage
"metadata": core_client_metadata,
}
lib_logger.debug(
"Free tier onboarding: using server-managed project"
)
else:
# Paid/legacy tier requires user-provided project
if not configured_project_id and requires_user_project:
raise ValueError(
f"Tier '{tier_id}' requires setting ANTIGRAVITY_PROJECT_ID environment variable."
)
onboard_request = {
"tierId": tier_id,
"cloudaicompanionProject": configured_project_id,
"metadata": {
**core_client_metadata,
"duetProject": configured_project_id,
}
if configured_project_id
else core_client_metadata,
}
lib_logger.debug(
f"Paid tier onboarding: using project {configured_project_id}"
)
lib_logger.debug("Initiating onboardUser request...")
lro_response = await client.post(
f"{CODE_ASSIST_ENDPOINT}:onboardUser",
headers=headers,
json=onboard_request,
timeout=30,
)
lro_response.raise_for_status()
lro_data = lro_response.json()
lib_logger.debug(
f"Initial onboarding response: done={lro_data.get('done')}"
)
# Poll for onboarding completion (up to 5 minutes)
for i in range(150): # 150 × 2s = 5 minutes
if lro_data.get("done"):
lib_logger.debug(
f"Onboarding completed after {i} polling attempts"
)
break
await asyncio.sleep(2)
if (i + 1) % 15 == 0: # Log every 30 seconds
lib_logger.info(
f"Still waiting for onboarding completion... ({(i + 1) * 2}s elapsed)"
)
lib_logger.debug(
f"Polling onboarding status... (Attempt {i + 1}/150)"
)
lro_response = await client.post(
f"{CODE_ASSIST_ENDPOINT}:onboardUser",
headers=headers,
json=onboard_request,
timeout=30,
)
lro_response.raise_for_status()
lro_data = lro_response.json()
if not lro_data.get("done"):
lib_logger.error("Onboarding process timed out after 5 minutes")
raise ValueError(
"Onboarding process timed out after 5 minutes. Please try again or contact support."
)
# Extract project ID from LRO response
# Note: onboardUser returns response.cloudaicompanionProject as an object with .id
lro_response_data = lro_data.get("response", {})
lro_project_obj = lro_response_data.get("cloudaicompanionProject", {})
project_id = (
lro_project_obj.get("id")
if isinstance(lro_project_obj, dict)
else None
)
# Fallback to configured project if LRO didn't return one
if not project_id and configured_project_id:
project_id = configured_project_id
lib_logger.debug(
f"LRO didn't return project, using configured: {project_id}"
)
if not project_id:
lib_logger.error(
"Onboarding completed but no project ID in response and none configured"
)
raise ValueError(
"Onboarding completed, but no project ID was returned. "
"For paid tiers, set ANTIGRAVITY_PROJECT_ID environment variable."
)
lib_logger.debug(
f"Successfully extracted project ID from onboarding response: {project_id}"
)
# Cache tier info
self.project_tier_cache[credential_path] = tier_id
discovered_tier = tier_id
lib_logger.debug(f"Cached tier information: {tier_id}")
# Log concise message based on tier
is_paid = tier_id and tier_id not in ["free-tier", "legacy-tier"]
if is_paid:
lib_logger.info(
f"Using Antigravity paid tier '{tier_id}' with project: {project_id}"
)
else:
lib_logger.info(
f"Successfully onboarded user and discovered project ID: {project_id}"
)
self.project_id_cache[credential_path] = project_id
discovered_project_id = project_id
# Persist to credential file
await self._persist_project_metadata(
credential_path, project_id, discovered_tier
)
return project_id
except httpx.HTTPStatusError as e:
error_body = ""
try:
error_body = e.response.text
except Exception:
pass
if e.response.status_code == 403:
lib_logger.error(
f"Antigravity Code Assist API access denied (403). Response: {error_body}"
)
lib_logger.error(
"Possible causes: 1) cloudaicompanion.googleapis.com API not enabled, 2) Wrong project ID for paid tier, 3) Account lacks permissions"
)
elif e.response.status_code == 404:
lib_logger.warning(
f"Antigravity Code Assist endpoint not found (404). Falling back to project listing."
)
elif e.response.status_code == 412:
# Precondition Failed - often means wrong project for free tier onboarding
lib_logger.error(
f"Precondition failed (412): {error_body}. This may mean the project ID is incompatible with the selected tier."
)
else:
lib_logger.warning(
f"Antigravity onboarding/discovery failed with status {e.response.status_code}: {error_body}. Falling back to project listing."
)
except httpx.RequestError as e:
lib_logger.warning(
f"Antigravity onboarding/discovery network error: {e}. Falling back to project listing."
)
# 3. Fallback to listing all available GCP projects (last resort)
lib_logger.debug(
"Attempting to discover project via GCP Resource Manager API..."
)
try:
async with httpx.AsyncClient() as client:
lib_logger.debug(
"Querying Cloud Resource Manager for available projects..."
)
response = await client.get(
"https://cloudresourcemanager.googleapis.com/v1/projects",
headers=headers,
timeout=20,
)
response.raise_for_status()
projects = response.json().get("projects", [])
lib_logger.debug(f"Found {len(projects)} total projects")
active_projects = [
p for p in projects if p.get("lifecycleState") == "ACTIVE"
]
lib_logger.debug(f"Found {len(active_projects)} active projects")
if not projects:
lib_logger.error(
"No GCP projects found for this account. Please create a project in Google Cloud Console."
)
elif not active_projects:
lib_logger.error(
"No active GCP projects found. Please activate a project in Google Cloud Console."
)
else:
project_id = active_projects[0]["projectId"]
lib_logger.info(
f"Discovered Antigravity project ID from active projects list: {project_id}"
)
lib_logger.debug(
f"Selected first active project: {project_id} (out of {len(active_projects)} active projects)"
)
self.project_id_cache[credential_path] = project_id
discovered_project_id = project_id
# Persist to credential file (no tier info from resource manager)
await self._persist_project_metadata(
credential_path, project_id, None
)
return project_id
except httpx.HTTPStatusError as e:
if e.response.status_code == 403:
lib_logger.error(
"Failed to list GCP projects due to a 403 Forbidden error. The Cloud Resource Manager API may not be enabled, or your account lacks the 'resourcemanager.projects.list' permission."
)
else:
lib_logger.error(
f"Failed to list GCP projects with status {e.response.status_code}: {e}"
)
except httpx.RequestError as e:
lib_logger.error(f"Network error while listing GCP projects: {e}")
raise ValueError(
"Could not auto-discover Antigravity project ID. Possible causes:\n"
" 1. The cloudaicompanion.googleapis.com API is not enabled (enable it in Google Cloud Console)\n"
" 2. No active GCP projects exist for this account (create one in Google Cloud Console)\n"
" 3. Account lacks necessary permissions\n"
"To manually specify a project, set ANTIGRAVITY_PROJECT_ID in your .env file."
)
async def _persist_project_metadata(
self, credential_path: str, project_id: str, tier: Optional[str]
):
"""Persists project ID and tier to the credential file for faster future startups."""
# Skip persistence for env:// paths (environment-based credentials)
credential_index = self._parse_env_credential_path(credential_path)
if credential_index is not None:
lib_logger.debug(
f"Skipping project metadata persistence for env:// credential path: {credential_path}"
)
return
try:
# Load current credentials
with open(credential_path, "r") as f:
creds = json.load(f)
# Update metadata
if "_proxy_metadata" not in creds:
creds["_proxy_metadata"] = {}
creds["_proxy_metadata"]["project_id"] = project_id
if tier:
creds["_proxy_metadata"]["tier"] = tier
# Save back using the existing save method (handles atomic writes and permissions)
await self._save_credentials(credential_path, creds)
lib_logger.debug(
f"Persisted project_id and tier to credential file: {credential_path}"
)
except Exception as e:
lib_logger.warning(
f"Failed to persist project metadata to credential file: {e}"
)
# Non-fatal - just means slower startup next time
# =========================================================================
# CREDENTIAL MANAGEMENT OVERRIDES
# =========================================================================
def _get_provider_file_prefix(self) -> str:
"""Return the file prefix for Antigravity credentials."""
return "antigravity"
def build_env_lines(self, creds: Dict[str, Any], cred_number: int) -> List[str]:
"""
Generate .env file lines for an Antigravity credential.
Includes tier and project_id from _proxy_metadata.
"""
# Get base lines from parent class
lines = super().build_env_lines(creds, cred_number)
# Add Antigravity-specific fields (tier and project_id)
metadata = creds.get("_proxy_metadata", {})
prefix = f"{self.ENV_PREFIX}_{cred_number}"
project_id = metadata.get("project_id", "")
tier = metadata.get("tier", "")
if project_id:
lines.append(f"{prefix}_PROJECT_ID={project_id}")
if tier:
lines.append(f"{prefix}_TIER={tier}")
return lines