Mirrowel commited on
Commit
97f1950
·
1 Parent(s): fc70523

feat(auth): extract GoogleOAuthBase and add antigravity provider

Browse files

- Add providers/google_oauth_base.py to centralize Google OAuth logic (auth flow, token refresh, env loading, atomic saves, backoff/retry, queueing, headless support, and validation).
- Migrate GeminiAuthBase and AntigravityAuthBase to inherit from GoogleOAuthBase and expose provider-specific constants (CLIENT_ID, CLIENT_SECRET, OAUTH_SCOPES, ENV_PREFIX, CALLBACK_PORT, CALLBACK_PATH).
- Register "antigravity" in DEFAULT_OAUTH_DIRS and mark it as OAuth-only in credential_tool; include a user-friendly display name for interactive flows.
- Remove large duplicated OAuth implementations from provider-specific files and consolidate behavior to reduce maintenance surface and ensure consistent token handling.

src/rotator_library/credential_manager.py CHANGED
@@ -14,6 +14,7 @@ DEFAULT_OAUTH_DIRS = {
14
  "gemini_cli": Path.home() / ".gemini",
15
  "qwen_code": Path.home() / ".qwen",
16
  "iflow": Path.home() / ".iflow",
 
17
  # Add other providers like 'claude' here if they have a standard CLI path
18
  }
19
 
 
14
  "gemini_cli": Path.home() / ".gemini",
15
  "qwen_code": Path.home() / ".qwen",
16
  "iflow": Path.home() / ".iflow",
17
+ "antigravity": Path.home() / ".antigravity",
18
  # Add other providers like 'claude' here if they have a standard CLI path
19
  }
20
 
src/rotator_library/credential_tool.py CHANGED
@@ -98,7 +98,7 @@ async def setup_api_key():
98
  # Discover custom providers and add them to the list
99
  # Note: gemini_cli is OAuth-only, but qwen_code and iflow support both OAuth and API keys
100
  _, PROVIDER_PLUGINS = _ensure_providers_loaded()
101
- oauth_only_providers = {'gemini_cli'}
102
  discovered_providers = {
103
  p.replace('_', ' ').title(): p.upper() + "_API_KEY"
104
  for p in PROVIDER_PLUGINS.keys()
@@ -195,7 +195,8 @@ async def setup_new_credential(provider_name: str):
195
  oauth_friendly_names = {
196
  "gemini_cli": "Gemini CLI (OAuth)",
197
  "qwen_code": "Qwen Code (OAuth - also supports API keys)",
198
- "iflow": "iFlow (OAuth - also supports API keys)"
 
199
  }
200
  display_name = oauth_friendly_names.get(provider_name, provider_name.replace('_', ' ').title())
201
 
@@ -578,7 +579,8 @@ async def main(clear_on_start=True):
578
  oauth_friendly_names = {
579
  "gemini_cli": "Gemini CLI (OAuth)",
580
  "qwen_code": "Qwen Code (OAuth - also supports API keys)",
581
- "iflow": "iFlow (OAuth - also supports API keys)"
 
582
  }
583
 
584
  provider_text = Text()
 
98
  # Discover custom providers and add them to the list
99
  # Note: gemini_cli is OAuth-only, but qwen_code and iflow support both OAuth and API keys
100
  _, PROVIDER_PLUGINS = _ensure_providers_loaded()
101
+ oauth_only_providers = {'gemini_cli', 'antigravity'}
102
  discovered_providers = {
103
  p.replace('_', ' ').title(): p.upper() + "_API_KEY"
104
  for p in PROVIDER_PLUGINS.keys()
 
195
  oauth_friendly_names = {
196
  "gemini_cli": "Gemini CLI (OAuth)",
197
  "qwen_code": "Qwen Code (OAuth - also supports API keys)",
198
+ "iflow": "iFlow (OAuth - also supports API keys)",
199
+ "antigravity": "Antigravity (OAuth)"
200
  }
201
  display_name = oauth_friendly_names.get(provider_name, provider_name.replace('_', ' ').title())
202
 
 
579
  oauth_friendly_names = {
580
  "gemini_cli": "Gemini CLI (OAuth)",
581
  "qwen_code": "Qwen Code (OAuth - also supports API keys)",
582
+ "iflow": "iFlow (OAuth - also supports API keys)",
583
+ "antigravity": "Antigravity (OAuth)",
584
  }
585
 
586
  provider_text = Text()
src/rotator_library/providers/antigravity_auth_base.py CHANGED
@@ -1,466 +1,24 @@
1
  # src/rotator_library/providers/antigravity_auth_base.py
2
 
3
- import os
4
- import webbrowser
5
- from typing import Union, Optional
6
- import json
7
- import time
8
- import asyncio
9
- import logging
10
- from pathlib import Path
11
- from typing import Dict, Any
12
- import tempfile
13
- import shutil
14
 
15
- import httpx
16
- from rich.console import Console
17
- from rich.panel import Panel
18
- from rich.text import Text
19
-
20
- from ..utils.headless_detection import is_headless_environment
21
-
22
- lib_logger = logging.getLogger('rotator_library')
23
-
24
- # Antigravity OAuth credentials from CLIProxyAPI
25
- CLIENT_ID = "1071006060591-tmhssin2h21lcre235vtolojh4g403ep.apps.googleusercontent.com"
26
- CLIENT_SECRET = "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"
27
- TOKEN_URI = "https://oauth2.googleapis.com/token"
28
- USER_INFO_URI = "https://www.googleapis.com/oauth2/v1/userinfo"
29
- REFRESH_EXPIRY_BUFFER_SECONDS = 30 * 60 # 30 minutes buffer before expiry
30
-
31
- # Antigravity requires additional scopes
32
- OAUTH_SCOPES = [
33
- "https://www.googleapis.com/auth/cloud-platform",
34
- "https://www.googleapis.com/auth/userinfo.email",
35
- "https://www.googleapis.com/auth/userinfo.profile",
36
- "https://www.googleapis.com/auth/cclog", # Antigravity-specific
37
- "https://www.googleapis.com/auth/experimentsandconfigs" # Antigravity-specific
38
- ]
39
-
40
- console = Console()
41
-
42
- class AntigravityAuthBase:
43
  """
44
- Base authentication class for Antigravity provider.
45
- Handles OAuth2 flow, token management, and refresh logic.
46
 
47
- Based on GeminiAuthBase but uses Antigravity-specific OAuth credentials and scopes.
 
48
  """
49
 
50
- def __init__(self):
51
- self._credentials_cache: Dict[str, Dict[str, Any]] = {}
52
- self._refresh_locks: Dict[str, asyncio.Lock] = {}
53
- self._locks_lock = asyncio.Lock() # Protects the locks dict from race conditions
54
- # [BACKOFF TRACKING] Track consecutive failures per credential
55
- self._refresh_failures: Dict[str, int] = {} # Track consecutive failures per credential
56
- self._next_refresh_after: Dict[str, float] = {} # Track backoff timers (Unix timestamp)
57
-
58
- # [QUEUE SYSTEM] Sequential refresh processing
59
- self._refresh_queue: asyncio.Queue = asyncio.Queue()
60
- self._queued_credentials: set = set() # Track credentials already in queue
61
- self._unavailable_credentials: set = set() # Mark credentials unavailable during re-auth
62
- self._queue_tracking_lock = asyncio.Lock() # Protects queue sets
63
- self._queue_processor_task: Optional[asyncio.Task] = None # Background worker task
64
-
65
- def _load_from_env(self) -> Optional[Dict[str, Any]]:
66
- """
67
- Load OAuth credentials from environment variables for stateless deployments.
68
-
69
- Expected environment variables:
70
- - ANTIGRAVITY_ACCESS_TOKEN (required)
71
- - ANTIGRAVITY_REFRESH_TOKEN (required)
72
- - ANTIGRAVITY_EXPIRY_DATE (optional, defaults to 0)
73
- - ANTIGRAVITY_CLIENT_ID (optional, uses default)
74
- - ANTIGRAVITY_CLIENT_SECRET (optional, uses default)
75
- - ANTIGRAVITY_TOKEN_URI (optional, uses default)
76
- - ANTIGRAVITY_UNIVERSE_DOMAIN (optional, defaults to googleapis.com)
77
- - ANTIGRAVITY_EMAIL (optional, defaults to "env-user")
78
-
79
- Returns:
80
- Dict with credential structure if env vars present, None otherwise
81
- """
82
- access_token = os.getenv("ANTIGRAVITY_ACCESS_TOKEN")
83
- refresh_token = os.getenv("ANTIGRAVITY_REFRESH_TOKEN")
84
-
85
- # Both access and refresh tokens are required
86
- if not (access_token and refresh_token):
87
- return None
88
-
89
- lib_logger.debug("Loading Antigravity credentials from environment variables")
90
-
91
- # Parse expiry_date as float, default to 0 if not present
92
- expiry_str = os.getenv("ANTIGRAVITY_EXPIRY_DATE", "0")
93
- try:
94
- expiry_date = float(expiry_str)
95
- except ValueError:
96
- lib_logger.warning(f"Invalid ANTIGRAVITY_EXPIRY_DATE value: {expiry_str}, using 0")
97
- expiry_date = 0
98
-
99
- creds = {
100
- "access_token": access_token,
101
- "refresh_token": refresh_token,
102
- "expiry_date": expiry_date,
103
- "client_id": os.getenv("ANTIGRAVITY_CLIENT_ID", CLIENT_ID),
104
- "client_secret": os.getenv("ANTIGRAVITY_CLIENT_SECRET", CLIENT_SECRET),
105
- "token_uri": os.getenv("ANTIGRAVITY_TOKEN_URI", TOKEN_URI),
106
- "universe_domain": os.getenv("ANTIGRAVITY_UNIVERSE_DOMAIN", "googleapis.com"),
107
- "_proxy_metadata": {
108
- "email": os.getenv("ANTIGRAVITY_EMAIL", "env-user"),
109
- "last_check_timestamp": time.time(),
110
- "loaded_from_env": True # Flag to indicate env-based credentials
111
- }
112
- }
113
-
114
- return creds
115
-
116
- async def _load_credentials(self, path: str) -> Dict[str, Any]:
117
- """
118
- Load credentials from a file. First attempts file-based load,
119
- then falls back to environment variables if file not found.
120
-
121
- Args:
122
- path: File path to load credentials from
123
-
124
- Returns:
125
- Dict containing the credentials
126
-
127
- Raises:
128
- ValueError: If credentials cannot be loaded from either source
129
- """
130
- # If path is special marker "env", load from environment
131
- if path == "env":
132
- env_creds = self._load_from_env()
133
- if env_creds:
134
- lib_logger.debug("Using Antigravity credentials from environment variables")
135
- return env_creds
136
- raise ValueError("ANTIGRAVITY_ACCESS_TOKEN and ANTIGRAVITY_REFRESH_TOKEN environment variables not set")
137
-
138
- # Try loading from cache first
139
- if path in self._credentials_cache:
140
- cached_creds = self._credentials_cache[path]
141
- lib_logger.debug(f"Using cached Antigravity credentials for: {Path(path).name}")
142
- return cached_creds
143
-
144
- # Try loading from file
145
- try:
146
- with open(path, 'r') as f:
147
- creds = json.load(f)
148
- self._credentials_cache[path] = creds
149
- lib_logger.debug(f"Loaded Antigravity credentials from file: {Path(path).name}")
150
- return creds
151
- except FileNotFoundError:
152
- # Fall back to environment variables
153
- lib_logger.debug(f"Credential file not found: {path}, attempting environment variables")
154
- env_creds = self._load_from_env()
155
- if env_creds:
156
- lib_logger.debug("Using Antigravity credentials from environment variables as fallback")
157
- # Cache with special path marker
158
- self._credentials_cache[path] = env_creds
159
- return env_creds
160
- raise ValueError(f"Credential file not found: {path} and environment variables not set")
161
- except json.JSONDecodeError as e:
162
- raise ValueError(f"Invalid JSON in credential file {path}: {e}")
163
-
164
- async def _save_credentials(self, path: str, creds: Dict[str, Any]) -> None:
165
- """
166
- Save credentials to a file. Skip if credentials were loaded from environment.
167
-
168
- Args:
169
- path: File path to save credentials to
170
- creds: Credentials dictionary to save
171
- """
172
- # Don't save environment-based credentials to file
173
- if creds.get("_proxy_metadata", {}).get("loaded_from_env"):
174
- lib_logger.debug("Skipping credential save (loaded from environment)")
175
- return
176
-
177
- # Don't save if path is special marker
178
- if path == "env":
179
- return
180
-
181
- try:
182
- # Ensure directory exists
183
- Path(path).parent.mkdir(parents=True, exist_ok=True)
184
-
185
- # Write atomically using temp file + rename
186
- temp_fd, temp_path = tempfile.mkstemp(
187
- dir=Path(path).parent,
188
- prefix='.tmp_',
189
- suffix='.json'
190
- )
191
- try:
192
- with os.fdopen(temp_fd, 'w') as f:
193
- json.dump(creds, f, indent=2)
194
- shutil.move(temp_path, path)
195
- lib_logger.debug(f"Saved Antigravity credentials to: {Path(path).name}")
196
- except Exception:
197
- # Clean up temp file on error
198
- try:
199
- os.unlink(temp_path)
200
- except Exception:
201
- pass
202
- raise
203
- except Exception as e:
204
- lib_logger.warning(f"Failed to save Antigravity credentials to {path}: {e}")
205
-
206
- def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
207
- """
208
- Check if the access token is expired or close to expiry.
209
-
210
- Args:
211
- creds: Credentials dict with expiry_date field (in milliseconds)
212
-
213
- Returns:
214
- True if token is expired or within buffer time of expiry
215
- """
216
- if 'expiry_date' not in creds:
217
- return True
218
-
219
- # expiry_date is in milliseconds
220
- expiry_timestamp = creds['expiry_date'] / 1000.0
221
- current_time = time.time()
222
-
223
- # Consider expired if within buffer time
224
- return (expiry_timestamp - current_time) <= REFRESH_EXPIRY_BUFFER_SECONDS
225
-
226
- async def _refresh_token(self, path: str, creds: Dict[str, Any]) -> Dict[str, Any]:
227
- """
228
- Refresh an expired OAuth token using the refresh token.
229
-
230
- Args:
231
- path: Credential file path (for saving updated credentials)
232
- creds: Current credentials dict with refresh_token
233
-
234
- Returns:
235
- Updated credentials dict with fresh access token
236
-
237
- Raises:
238
- ValueError: If refresh fails
239
- """
240
- if 'refresh_token' not in creds:
241
- raise ValueError("No refresh token available")
242
-
243
- lib_logger.debug(f"Refreshing Antigravity OAuth token for: {Path(path).name if path != 'env' else 'env'}")
244
-
245
- client_id = creds.get('client_id', CLIENT_ID)
246
- client_secret = creds.get('client_secret', CLIENT_SECRET)
247
- token_uri = creds.get('token_uri', TOKEN_URI)
248
-
249
- async with httpx.AsyncClient() as client:
250
- try:
251
- response = await client.post(
252
- token_uri,
253
- data={
254
- 'client_id': client_id,
255
- 'client_secret': client_secret,
256
- 'refresh_token': creds['refresh_token'],
257
- 'grant_type': 'refresh_token'
258
- },
259
- timeout=30.0
260
- )
261
- response.raise_for_status()
262
- token_data = response.json()
263
-
264
- # Update credentials with new token
265
- creds['access_token'] = token_data['access_token']
266
- creds['expiry_date'] = (time.time() + token_data['expires_in']) * 1000
267
-
268
- # Update metadata
269
- if '_proxy_metadata' not in creds:
270
- creds['_proxy_metadata'] = {}
271
- creds['_proxy_metadata']['last_check_timestamp'] = time.time()
272
-
273
- # Save updated credentials
274
- await self._save_credentials(path, creds)
275
-
276
- # Update cache
277
- self._credentials_cache[path] = creds
278
-
279
- # Reset failure count on success
280
- self._refresh_failures[path] = 0
281
-
282
- lib_logger.info(f"Successfully refreshed Antigravity OAuth token for: {Path(path).name if path != 'env' else 'env'}")
283
- return creds
284
-
285
- except httpx.HTTPStatusError as e:
286
- # Track failures for backoff
287
- self._refresh_failures[path] = self._refresh_failures.get(path, 0) + 1
288
- raise ValueError(f"Failed to refresh Antigravity token (HTTP {e.response.status_code}): {e.response.text}")
289
- except Exception as e:
290
- self._refresh_failures[path] = self._refresh_failures.get(path, 0) + 1
291
- raise ValueError(f"Failed to refresh Antigravity token: {e}")
292
-
293
- async def initialize_token(self, creds_or_path: Union[Dict[str, Any], str]) -> Dict[str, Any]:
294
- """
295
- Initialize or refresh an OAuth token. Handles the complete OAuth flow if needed.
296
-
297
- Args:
298
- creds_or_path: Either a credentials dict or a file path string
299
-
300
- Returns:
301
- Valid credentials dict with fresh access token
302
- """
303
- path = creds_or_path if isinstance(creds_or_path, str) else None
304
-
305
- if isinstance(creds_or_path, dict):
306
- display_name = creds_or_path.get("_proxy_metadata", {}).get("display_name", "in-memory object")
307
- else:
308
- display_name = Path(path).name if path and path != "env" else "env"
309
-
310
- lib_logger.debug(f"Initializing Antigravity token for '{display_name}'...")
311
-
312
- try:
313
- creds = await self._load_credentials(creds_or_path) if path else creds_or_path
314
- reason = ""
315
- if not creds.get("refresh_token"):
316
- reason = "refresh token is missing"
317
- elif self._is_token_expired(creds):
318
- reason = "token is expired"
319
-
320
- if reason:
321
- if reason == "token is expired" and creds.get("refresh_token"):
322
- try:
323
- return await self._refresh_token(path, creds)
324
- except Exception as e:
325
- lib_logger.warning(f"Automatic token refresh for '{display_name}' failed: {e}. Proceeding to interactive login.")
326
-
327
- lib_logger.warning(f"Antigravity OAuth token for '{display_name}' needs setup: {reason}.")
328
-
329
- is_headless = is_headless_environment()
330
-
331
- auth_code_future = asyncio.get_event_loop().create_future()
332
- server = None
333
-
334
- async def handle_callback(reader, writer):
335
- try:
336
- request_line_bytes = await reader.readline()
337
- if not request_line_bytes:
338
- return
339
- path_str = request_line_bytes.decode('utf-8').strip().split(' ')[1]
340
- # Consume headers
341
- while await reader.readline() != b'\r\n':
342
- pass
343
-
344
- from urllib.parse import urlparse, parse_qs
345
- query_params = parse_qs(urlparse(path_str).query)
346
-
347
- writer.write(b"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n")
348
- if 'code' in query_params:
349
- if not auth_code_future.done():
350
- auth_code_future.set_result(query_params['code'][0])
351
- writer.write(b"<html><body><h1>Authentication successful!</h1><p>You can close this window.</p></body></html>")
352
- else:
353
- error = query_params.get('error', ['Unknown error'])[0]
354
- if not auth_code_future.done():
355
- auth_code_future.set_exception(Exception(f"OAuth failed: {error}"))
356
- writer.write(f"<html><body><h1>Authentication Failed</h1><p>Error: {error}. Please try again.</p></body></html>".encode())
357
- await writer.drain()
358
- except Exception as e:
359
- lib_logger.error(f"Error in OAuth callback handler: {e}")
360
- finally:
361
- writer.close()
362
-
363
- try:
364
- server = await asyncio.start_server(handle_callback, '127.0.0.1', 8085)
365
-
366
- from urllib.parse import urlencode
367
- auth_url = "https://accounts.google.com/o/oauth2/v2/auth?" + urlencode({
368
- "client_id": CLIENT_ID,
369
- "redirect_uri": "http://localhost:8085/oauth2callback",
370
- "scope": " ".join(OAUTH_SCOPES),
371
- "access_type": "offline",
372
- "response_type": "code",
373
- "prompt": "consent"
374
- })
375
-
376
- if is_headless:
377
- auth_panel_text = Text.from_markup(
378
- "Running in headless environment (no GUI detected).\n"
379
- "Please open the URL below in a browser on another machine to authorize:\n"
380
- )
381
- else:
382
- auth_panel_text = Text.from_markup(
383
- "1. Your browser will now open to log in and authorize the application.\n"
384
- "2. If it doesn't open automatically, please open the URL below manually."
385
- )
386
-
387
- console.print(Panel(auth_panel_text, title=f"Antigravity OAuth Setup for [bold yellow]{display_name}[/bold yellow]", style="bold blue"))
388
- console.print(f"[bold]URL:[/bold] [link={auth_url}]{auth_url}[/link]\n")
389
-
390
- if not is_headless:
391
- try:
392
- webbrowser.open(auth_url)
393
- lib_logger.info("Browser opened successfully for OAuth flow")
394
- except Exception as e:
395
- lib_logger.warning(f"Failed to open browser automatically: {e}. Please open the URL manually.")
396
-
397
- with console.status("[bold green]Waiting for you to complete authentication in the browser...[/bold green]", spinner="dots"):
398
- auth_code = await asyncio.wait_for(auth_code_future, timeout=300)
399
- except asyncio.TimeoutError:
400
- raise Exception("OAuth flow timed out. Please try again.")
401
- finally:
402
- if server:
403
- server.close()
404
- await server.wait_closed()
405
-
406
- lib_logger.info(f"Attempting to exchange authorization code for tokens...")
407
- async with httpx.AsyncClient() as client:
408
- response = await client.post(TOKEN_URI, data={
409
- "code": auth_code.strip(),
410
- "client_id": CLIENT_ID,
411
- "client_secret": CLIENT_SECRET,
412
- "redirect_uri": "http://localhost:8085/oauth2callback",
413
- "grant_type": "authorization_code"
414
- })
415
- response.raise_for_status()
416
- token_data = response.json()
417
-
418
- creds = token_data.copy()
419
- creds["expiry_date"] = (time.time() + creds.pop("expires_in")) * 1000
420
- creds["client_id"] = CLIENT_ID
421
- creds["client_secret"] = CLIENT_SECRET
422
- creds["token_uri"] = TOKEN_URI
423
- creds["universe_domain"] = "googleapis.com"
424
-
425
- # Fetch user info
426
- user_info_response = await client.get(
427
- USER_INFO_URI,
428
- headers={"Authorization": f"Bearer {creds['access_token']}"}
429
- )
430
- user_info_response.raise_for_status()
431
- user_info = user_info_response.json()
432
-
433
- creds["_proxy_metadata"] = {
434
- "email": user_info.get("email"),
435
- "last_check_timestamp": time.time()
436
- }
437
-
438
- if path:
439
- await self._save_credentials(path, creds)
440
-
441
- lib_logger.info(f"Antigravity OAuth initialized successfully for '{display_name}'.")
442
- return creds
443
-
444
- lib_logger.info(f"Antigravity OAuth token at '{display_name}' is valid.")
445
- return creds
446
- except Exception as e:
447
- raise ValueError(f"Failed to initialize Antigravity OAuth for '{display_name}': {e}")
448
-
449
- async def get_valid_token(self, credential_path: str) -> str:
450
- """
451
- Get a valid access token, refreshing if necessary.
452
-
453
- Args:
454
- credential_path: Path to credential file or "env" for environment variables
455
-
456
- Returns:
457
- Valid access token string
458
-
459
- Raises:
460
- ValueError: If token cannot be obtained
461
- """
462
- try:
463
- creds = await self.initialize_token(credential_path)
464
- return creds['access_token']
465
- except Exception as e:
466
- raise ValueError(f"Failed to get valid Antigravity token: {e}")
 
1
  # src/rotator_library/providers/antigravity_auth_base.py
2
 
3
+ from .google_oauth_base import GoogleOAuthBase
 
 
 
 
 
 
 
 
 
 
4
 
5
+ class AntigravityAuthBase(GoogleOAuthBase):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6
  """
7
+ Antigravity OAuth2 authentication implementation.
 
8
 
9
+ Inherits all OAuth functionality from GoogleOAuthBase with Antigravity-specific configuration.
10
+ Uses Antigravity's OAuth credentials and includes additional scopes for cclog and experimentsandconfigs.
11
  """
12
 
13
+ CLIENT_ID = "1071006060591-tmhssin2h21lcre235vtolojh4g403ep.apps.googleusercontent.com"
14
+ CLIENT_SECRET = "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"
15
+ OAUTH_SCOPES = [
16
+ "https://www.googleapis.com/auth/cloud-platform",
17
+ "https://www.googleapis.com/auth/userinfo.email",
18
+ "https://www.googleapis.com/auth/userinfo.profile",
19
+ "https://www.googleapis.com/auth/cclog", # Antigravity-specific
20
+ "https://www.googleapis.com/auth/experimentsandconfigs", # Antigravity-specific
21
+ ]
22
+ ENV_PREFIX = "ANTIGRAVITY"
23
+ CALLBACK_PORT = 51121
24
+ CALLBACK_PATH = "/oauthcallback"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/rotator_library/providers/gemini_auth_base.py CHANGED
@@ -1,625 +1,21 @@
1
  # src/rotator_library/providers/gemini_auth_base.py
2
 
3
- import os
4
- import webbrowser
5
- from typing import Union, Optional
6
- import json
7
- import time
8
- import asyncio
9
- import logging
10
- from pathlib import Path
11
- from typing import Dict, Any
12
- import tempfile
13
- import shutil
14
-
15
- import httpx
16
- from rich.console import Console
17
- from rich.panel import Panel
18
- from rich.text import Text
19
-
20
- from ..utils.headless_detection import is_headless_environment
21
-
22
- lib_logger = logging.getLogger('rotator_library')
23
-
24
- CLIENT_ID = "681255809395-oo8ft2oprdrnp9e3aqf6av3hmdib135j.apps.googleusercontent.com" #https://api.kilocode.ai/extension-config.json
25
- CLIENT_SECRET = "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" #https://api.kilocode.ai/extension-config.json
26
- TOKEN_URI = "https://oauth2.googleapis.com/token"
27
- USER_INFO_URI = "https://www.googleapis.com/oauth2/v1/userinfo"
28
- REFRESH_EXPIRY_BUFFER_SECONDS = 30 * 60 # 30 minutes buffer before expiry
29
-
30
- console = Console()
31
-
32
- class GeminiAuthBase:
33
- def __init__(self):
34
- self._credentials_cache: Dict[str, Dict[str, Any]] = {}
35
- self._refresh_locks: Dict[str, asyncio.Lock] = {}
36
- self._locks_lock = asyncio.Lock() # Protects the locks dict from race conditions
37
- # [BACKOFF TRACKING] Track consecutive failures per credential
38
- self._refresh_failures: Dict[str, int] = {} # Track consecutive failures per credential
39
- self._next_refresh_after: Dict[str, float] = {} # Track backoff timers (Unix timestamp)
40
-
41
- # [QUEUE SYSTEM] Sequential refresh processing
42
- self._refresh_queue: asyncio.Queue = asyncio.Queue()
43
- self._queued_credentials: set = set() # Track credentials already in queue
44
- self._unavailable_credentials: set = set() # Mark credentials unavailable during re-auth
45
- self._queue_tracking_lock = asyncio.Lock() # Protects queue sets
46
- self._queue_processor_task: Optional[asyncio.Task] = None # Background worker task
47
-
48
- def _load_from_env(self) -> Optional[Dict[str, Any]]:
49
- """
50
- Load OAuth credentials from environment variables for stateless deployments.
51
-
52
- Expected environment variables:
53
- - GEMINI_CLI_ACCESS_TOKEN (required)
54
- - GEMINI_CLI_REFRESH_TOKEN (required)
55
- - GEMINI_CLI_EXPIRY_DATE (optional, defaults to 0)
56
- - GEMINI_CLI_CLIENT_ID (optional, uses default)
57
- - GEMINI_CLI_CLIENT_SECRET (optional, uses default)
58
- - GEMINI_CLI_TOKEN_URI (optional, uses default)
59
- - GEMINI_CLI_UNIVERSE_DOMAIN (optional, defaults to googleapis.com)
60
- - GEMINI_CLI_EMAIL (optional, defaults to "env-user")
61
- - GEMINI_CLI_PROJECT_ID (optional)
62
- - GEMINI_CLI_TIER (optional)
63
-
64
- Returns:
65
- Dict with credential structure if env vars present, None otherwise
66
- """
67
- access_token = os.getenv("GEMINI_CLI_ACCESS_TOKEN")
68
- refresh_token = os.getenv("GEMINI_CLI_REFRESH_TOKEN")
69
-
70
- # Both access and refresh tokens are required
71
- if not (access_token and refresh_token):
72
- return None
73
-
74
- lib_logger.debug("Loading Gemini CLI credentials from environment variables")
75
-
76
- # Parse expiry_date as float, default to 0 if not present
77
- expiry_str = os.getenv("GEMINI_CLI_EXPIRY_DATE", "0")
78
- try:
79
- expiry_date = float(expiry_str)
80
- except ValueError:
81
- lib_logger.warning(f"Invalid GEMINI_CLI_EXPIRY_DATE value: {expiry_str}, using 0")
82
- expiry_date = 0
83
-
84
- creds = {
85
- "access_token": access_token,
86
- "refresh_token": refresh_token,
87
- "expiry_date": expiry_date,
88
- "client_id": os.getenv("GEMINI_CLI_CLIENT_ID", CLIENT_ID),
89
- "client_secret": os.getenv("GEMINI_CLI_CLIENT_SECRET", CLIENT_SECRET),
90
- "token_uri": os.getenv("GEMINI_CLI_TOKEN_URI", TOKEN_URI),
91
- "universe_domain": os.getenv("GEMINI_CLI_UNIVERSE_DOMAIN", "googleapis.com"),
92
- "_proxy_metadata": {
93
- "email": os.getenv("GEMINI_CLI_EMAIL", "env-user"),
94
- "last_check_timestamp": time.time(),
95
- "loaded_from_env": True # Flag to indicate env-based credentials
96
- }
97
- }
98
-
99
- # Add project_id if provided
100
- project_id = os.getenv("GEMINI_CLI_PROJECT_ID")
101
- if project_id:
102
- creds["_proxy_metadata"]["project_id"] = project_id
103
-
104
- # Add tier if provided
105
- tier = os.getenv("GEMINI_CLI_TIER")
106
- if tier:
107
- creds["_proxy_metadata"]["tier"] = tier
108
-
109
- return creds
110
-
111
- async def _load_credentials(self, path: str) -> Dict[str, Any]:
112
- if path in self._credentials_cache:
113
- return self._credentials_cache[path]
114
-
115
- async with await self._get_lock(path):
116
- if path in self._credentials_cache:
117
- return self._credentials_cache[path]
118
-
119
- # First, try loading from environment variables
120
- env_creds = self._load_from_env()
121
- if env_creds:
122
- lib_logger.info("Using Gemini CLI credentials from environment variables")
123
- # Cache env-based credentials using the path as key
124
- self._credentials_cache[path] = env_creds
125
- return env_creds
126
-
127
- # Fall back to file-based loading
128
- try:
129
- lib_logger.debug(f"Loading Gemini credentials from file: {path}")
130
- with open(path, 'r') as f:
131
- creds = json.load(f)
132
- # Handle gcloud-style creds file which nest tokens under "credential"
133
- if "credential" in creds:
134
- creds = creds["credential"]
135
- self._credentials_cache[path] = creds
136
- return creds
137
- except FileNotFoundError:
138
- raise IOError(f"Gemini OAuth credential file not found at '{path}'")
139
- except Exception as e:
140
- raise IOError(f"Failed to load Gemini OAuth credentials from '{path}': {e}")
141
-
142
- async def _save_credentials(self, path: str, creds: Dict[str, Any]):
143
- # Don't save to file if credentials were loaded from environment
144
- if creds.get("_proxy_metadata", {}).get("loaded_from_env"):
145
- lib_logger.debug("Credentials loaded from env, skipping file save")
146
- # Still update cache for in-memory consistency
147
- self._credentials_cache[path] = creds
148
- return
149
-
150
- # [ATOMIC WRITE] Use tempfile + move pattern to ensure atomic writes
151
- # This prevents credential corruption if the process is interrupted during write
152
- parent_dir = os.path.dirname(os.path.abspath(path))
153
- os.makedirs(parent_dir, exist_ok=True)
154
-
155
- tmp_fd = None
156
- tmp_path = None
157
- try:
158
- # Create temp file in same directory as target (ensures same filesystem)
159
- tmp_fd, tmp_path = tempfile.mkstemp(dir=parent_dir, prefix='.tmp_', suffix='.json', text=True)
160
-
161
- # Write JSON to temp file
162
- with os.fdopen(tmp_fd, 'w') as f:
163
- json.dump(creds, f, indent=2)
164
- tmp_fd = None # fdopen closes the fd
165
-
166
- # Set secure permissions (0600 = owner read/write only)
167
- try:
168
- os.chmod(tmp_path, 0o600)
169
- except (OSError, AttributeError):
170
- # Windows may not support chmod, ignore
171
- pass
172
-
173
- # Atomic move (overwrites target if it exists)
174
- shutil.move(tmp_path, path)
175
- tmp_path = None # Successfully moved
176
-
177
- # Update cache AFTER successful file write (prevents cache/file inconsistency)
178
- self._credentials_cache[path] = creds
179
- lib_logger.debug(f"Saved updated Gemini OAuth credentials to '{path}' (atomic write).")
180
-
181
- except Exception as e:
182
- lib_logger.error(f"Failed to save updated Gemini OAuth credentials to '{path}': {e}")
183
- # Clean up temp file if it still exists
184
- if tmp_fd is not None:
185
- try:
186
- os.close(tmp_fd)
187
- except:
188
- pass
189
- if tmp_path and os.path.exists(tmp_path):
190
- try:
191
- os.unlink(tmp_path)
192
- except:
193
- pass
194
- raise
195
-
196
- def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
197
- expiry = creds.get("token_expiry") # gcloud format
198
- if not expiry: # gemini-cli format
199
- expiry_timestamp = creds.get("expiry_date", 0) / 1000
200
- else:
201
- expiry_timestamp = time.mktime(time.strptime(expiry, "%Y-%m-%dT%H:%M:%SZ"))
202
- return expiry_timestamp < time.time() + REFRESH_EXPIRY_BUFFER_SECONDS
203
-
204
- async def _refresh_token(self, path: str, creds: Dict[str, Any], force: bool = False) -> Dict[str, Any]:
205
- async with await self._get_lock(path):
206
- # Skip the expiry check if a refresh is being forced
207
- if not force and not self._is_token_expired(self._credentials_cache.get(path, creds)):
208
- return self._credentials_cache.get(path, creds)
209
-
210
- lib_logger.debug(f"Refreshing Gemini OAuth token for '{Path(path).name}' (forced: {force})...")
211
- refresh_token = creds.get("refresh_token")
212
- if not refresh_token:
213
- raise ValueError("No refresh_token found in credentials file.")
214
-
215
- # [RETRY LOGIC] Implement exponential backoff for transient errors
216
- max_retries = 3
217
- new_token_data = None
218
- last_error = None
219
- needs_reauth = False
220
-
221
- async with httpx.AsyncClient() as client:
222
- for attempt in range(max_retries):
223
- try:
224
- response = await client.post(TOKEN_URI, data={
225
- "client_id": creds.get("client_id", CLIENT_ID),
226
- "client_secret": creds.get("client_secret", CLIENT_SECRET),
227
- "refresh_token": refresh_token,
228
- "grant_type": "refresh_token",
229
- }, timeout=30.0)
230
- response.raise_for_status()
231
- new_token_data = response.json()
232
- break # Success, exit retry loop
233
-
234
- except httpx.HTTPStatusError as e:
235
- last_error = e
236
- status_code = e.response.status_code
237
-
238
- # [INVALID GRANT HANDLING] Handle 401/403 by triggering re-authentication
239
- if status_code == 401 or status_code == 403:
240
- lib_logger.warning(
241
- f"Refresh token invalid for '{Path(path).name}' (HTTP {status_code}). "
242
- f"Token may have been revoked or expired. Starting re-authentication..."
243
- )
244
- needs_reauth = True
245
- break # Exit retry loop to trigger re-auth
246
-
247
- elif status_code == 429:
248
- # Rate limit - honor Retry-After header if present
249
- retry_after = int(e.response.headers.get("Retry-After", 60))
250
- lib_logger.warning(f"Rate limited (HTTP 429), retry after {retry_after}s")
251
- if attempt < max_retries - 1:
252
- await asyncio.sleep(retry_after)
253
- continue
254
- raise
255
-
256
- elif status_code >= 500 and status_code < 600:
257
- # Server error - retry with exponential backoff
258
- if attempt < max_retries - 1:
259
- wait_time = 2 ** attempt # 1s, 2s, 4s
260
- lib_logger.warning(f"Server error (HTTP {status_code}), retry {attempt + 1}/{max_retries} in {wait_time}s")
261
- await asyncio.sleep(wait_time)
262
- continue
263
- raise # Final attempt failed
264
-
265
- else:
266
- # Other errors - don't retry
267
- raise
268
-
269
- except (httpx.RequestError, httpx.TimeoutException) as e:
270
- # Network errors - retry with backoff
271
- last_error = e
272
- if attempt < max_retries - 1:
273
- wait_time = 2 ** attempt
274
- lib_logger.warning(f"Network error during refresh: {e}, retry {attempt + 1}/{max_retries} in {wait_time}s")
275
- await asyncio.sleep(wait_time)
276
- continue
277
- raise
278
-
279
- # [INVALID GRANT RE-AUTH] Trigger OAuth flow if refresh token is invalid
280
- if needs_reauth:
281
- lib_logger.info(f"Starting re-authentication for '{Path(path).name}'...")
282
- try:
283
- # Call initialize_token to trigger OAuth flow
284
- new_creds = await self.initialize_token(path)
285
- return new_creds
286
- except Exception as reauth_error:
287
- lib_logger.error(f"Re-authentication failed for '{Path(path).name}': {reauth_error}")
288
- raise ValueError(f"Refresh token invalid and re-authentication failed: {reauth_error}")
289
-
290
- # If we exhausted retries without success
291
- if new_token_data is None:
292
- raise last_error or Exception("Token refresh failed after all retries")
293
-
294
- # [FIX 1] Update OAuth token fields from response
295
- creds["access_token"] = new_token_data["access_token"]
296
- expiry_timestamp = time.time() + new_token_data["expires_in"]
297
- creds["expiry_date"] = expiry_timestamp * 1000 # gemini-cli format
298
-
299
- # [FIX 2] Update refresh_token if server provided a new one (rare but possible with Google OAuth)
300
- if "refresh_token" in new_token_data:
301
- creds["refresh_token"] = new_token_data["refresh_token"]
302
-
303
- # [FIX 3] Ensure all required OAuth client fields are present (restore if missing)
304
- if "client_id" not in creds or not creds["client_id"]:
305
- creds["client_id"] = CLIENT_ID
306
- if "client_secret" not in creds or not creds["client_secret"]:
307
- creds["client_secret"] = CLIENT_SECRET
308
- if "token_uri" not in creds or not creds["token_uri"]:
309
- creds["token_uri"] = TOKEN_URI
310
- if "universe_domain" not in creds or not creds["universe_domain"]:
311
- creds["universe_domain"] = "googleapis.com"
312
-
313
- # [FIX 4] Add scopes array if missing
314
- if "scopes" not in creds:
315
- creds["scopes"] = [
316
- "https://www.googleapis.com/auth/cloud-platform",
317
- "https://www.googleapis.com/auth/userinfo.email",
318
- "https://www.googleapis.com/auth/userinfo.profile",
319
- ]
320
-
321
- # [FIX 5] Ensure _proxy_metadata exists and update timestamp
322
- if "_proxy_metadata" not in creds:
323
- creds["_proxy_metadata"] = {}
324
- creds["_proxy_metadata"]["last_check_timestamp"] = time.time()
325
-
326
- # [VALIDATION] Verify refreshed credentials have all required fields
327
- required_fields = ["access_token", "refresh_token", "client_id", "client_secret", "token_uri"]
328
- missing_fields = [field for field in required_fields if not creds.get(field)]
329
- if missing_fields:
330
- raise ValueError(f"Refreshed credentials missing required fields: {missing_fields}")
331
-
332
- # [VALIDATION] Optional: Test that the refreshed token is actually usable
333
- try:
334
- async with httpx.AsyncClient() as client:
335
- test_response = await client.get(
336
- USER_INFO_URI,
337
- headers={"Authorization": f"Bearer {creds['access_token']}"},
338
- timeout=5.0
339
- )
340
- test_response.raise_for_status()
341
- lib_logger.debug(f"Token validation successful for '{Path(path).name}'")
342
- except Exception as e:
343
- lib_logger.warning(f"Refreshed token validation failed for '{Path(path).name}': {e}")
344
- # Don't fail the refresh - the token might still work for other endpoints
345
- # But log it for debugging purposes
346
-
347
- await self._save_credentials(path, creds)
348
- lib_logger.debug(f"Successfully refreshed Gemini OAuth token for '{Path(path).name}'.")
349
- return creds
350
-
351
- async def proactively_refresh(self, credential_path: str):
352
- """Proactively refresh a credential by queueing it for refresh."""
353
- creds = await self._load_credentials(credential_path)
354
- if self._is_token_expired(creds):
355
- # Queue for refresh with needs_reauth=False (automated refresh)
356
- await self._queue_refresh(credential_path, force=False, needs_reauth=False)
357
-
358
- async def _get_lock(self, path: str) -> asyncio.Lock:
359
- # [FIX RACE CONDITION] Protect lock creation with a master lock
360
- # This prevents TOCTOU bug where multiple coroutines check and create simultaneously
361
- async with self._locks_lock:
362
- if path not in self._refresh_locks:
363
- self._refresh_locks[path] = asyncio.Lock()
364
- return self._refresh_locks[path]
365
-
366
- def is_credential_available(self, path: str) -> bool:
367
- """Check if a credential is available for rotation (not queued/refreshing)."""
368
- return path not in self._unavailable_credentials
369
-
370
- async def _ensure_queue_processor_running(self):
371
- """Lazily starts the queue processor if not already running."""
372
- if self._queue_processor_task is None or self._queue_processor_task.done():
373
- self._queue_processor_task = asyncio.create_task(self._process_refresh_queue())
374
-
375
- async def _queue_refresh(self, path: str, force: bool = False, needs_reauth: bool = False):
376
- """Add a credential to the refresh queue if not already queued.
377
-
378
- Args:
379
- path: Credential file path
380
- force: Force refresh even if not expired
381
- needs_reauth: True if full re-authentication needed (bypasses backoff)
382
- """
383
- # IMPORTANT: Only check backoff for simple automated refreshes
384
- # Re-authentication (interactive OAuth) should BYPASS backoff since it needs user input
385
- if not needs_reauth:
386
- now = time.time()
387
- if path in self._next_refresh_after:
388
- backoff_until = self._next_refresh_after[path]
389
- if now < backoff_until:
390
- # Credential is in backoff for automated refresh, do not queue
391
- remaining = int(backoff_until - now)
392
- lib_logger.debug(f"Skipping automated refresh for '{Path(path).name}' (in backoff for {remaining}s)")
393
- return
394
-
395
- async with self._queue_tracking_lock:
396
- if path not in self._queued_credentials:
397
- self._queued_credentials.add(path)
398
- self._unavailable_credentials.add(path) # Mark as unavailable
399
- await self._refresh_queue.put((path, force, needs_reauth))
400
- await self._ensure_queue_processor_running()
401
-
402
- async def _process_refresh_queue(self):
403
- """Background worker that processes refresh requests sequentially."""
404
- while True:
405
- path = None
406
- try:
407
- # Wait for an item with timeout to allow graceful shutdown
408
- try:
409
- path, force, needs_reauth = await asyncio.wait_for(
410
- self._refresh_queue.get(),
411
- timeout=60.0
412
- )
413
- except asyncio.TimeoutError:
414
- # No items for 60s, exit to save resources
415
- self._queue_processor_task = None
416
- return
417
-
418
- try:
419
- # Perform the actual refresh (still using per-credential lock)
420
- async with await self._get_lock(path):
421
- # Re-check if still expired (may have changed since queueing)
422
- creds = self._credentials_cache.get(path)
423
- if creds and not self._is_token_expired(creds):
424
- # No longer expired, mark as available
425
- async with self._queue_tracking_lock:
426
- self._unavailable_credentials.discard(path)
427
- continue
428
-
429
- # Perform refresh
430
- if not creds:
431
- creds = await self._load_credentials(path)
432
- await self._refresh_token(path, creds, force=force)
433
-
434
- # SUCCESS: Mark as available again
435
- async with self._queue_tracking_lock:
436
- self._unavailable_credentials.discard(path)
437
-
438
- finally:
439
- # Remove from queued set
440
- async with self._queue_tracking_lock:
441
- self._queued_credentials.discard(path)
442
- self._refresh_queue.task_done()
443
- except asyncio.CancelledError:
444
- break
445
- except Exception as e:
446
- lib_logger.error(f"Error in queue processor: {e}")
447
- # Even on error, mark as available (backoff will prevent immediate retry)
448
- if path:
449
- async with self._queue_tracking_lock:
450
- self._unavailable_credentials.discard(path)
451
-
452
- async def initialize_token(self, creds_or_path: Union[Dict[str, Any], str]) -> Dict[str, Any]:
453
- path = creds_or_path if isinstance(creds_or_path, str) else None
454
-
455
- # Get display name from metadata if available, otherwise derive from path
456
- if isinstance(creds_or_path, dict):
457
- display_name = creds_or_path.get("_proxy_metadata", {}).get("display_name", "in-memory object")
458
- else:
459
- display_name = Path(path).name if path else "in-memory object"
460
-
461
- lib_logger.debug(f"Initializing Gemini token for '{display_name}'...")
462
- try:
463
- creds = await self._load_credentials(creds_or_path) if path else creds_or_path
464
- reason = ""
465
- if not creds.get("refresh_token"):
466
- reason = "refresh token is missing"
467
- elif self._is_token_expired(creds):
468
- reason = "token is expired"
469
-
470
- if reason:
471
- if reason == "token is expired" and creds.get("refresh_token"):
472
- try:
473
- return await self._refresh_token(path, creds)
474
- except Exception as e:
475
- lib_logger.warning(f"Automatic token refresh for '{display_name}' failed: {e}. Proceeding to interactive login.")
476
-
477
- lib_logger.warning(f"Gemini OAuth token for '{display_name}' needs setup: {reason}.")
478
-
479
- # [HEADLESS DETECTION] Check if running in headless environment
480
- is_headless = is_headless_environment()
481
-
482
- auth_code_future = asyncio.get_event_loop().create_future()
483
- server = None
484
-
485
- async def handle_callback(reader, writer):
486
- try:
487
- request_line_bytes = await reader.readline()
488
- if not request_line_bytes: return
489
- path = request_line_bytes.decode('utf-8').strip().split(' ')[1]
490
- while await reader.readline() != b'\r\n': pass
491
- from urllib.parse import urlparse, parse_qs
492
- query_params = parse_qs(urlparse(path).query)
493
- writer.write(b"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n")
494
- if 'code' in query_params:
495
- if not auth_code_future.done():
496
- auth_code_future.set_result(query_params['code'][0])
497
- writer.write(b"<html><body><h1>Authentication successful!</h1><p>You can close this window.</p></body></html>")
498
- else:
499
- error = query_params.get('error', ['Unknown error'])[0]
500
- if not auth_code_future.done():
501
- auth_code_future.set_exception(Exception(f"OAuth failed: {error}"))
502
- writer.write(f"<html><body><h1>Authentication Failed</h1><p>Error: {error}. Please try again.</p></body></html>".encode())
503
- await writer.drain()
504
- except Exception as e:
505
- lib_logger.error(f"Error in OAuth callback handler: {e}")
506
- finally:
507
- writer.close()
508
-
509
- try:
510
- server = await asyncio.start_server(handle_callback, '127.0.0.1', 8085)
511
- from urllib.parse import urlencode
512
- auth_url = "https://accounts.google.com/o/oauth2/v2/auth?" + urlencode({
513
- "client_id": CLIENT_ID,
514
- "redirect_uri": "http://localhost:8085/oauth2callback",
515
- "scope": " ".join(["https://www.googleapis.com/auth/cloud-platform", "https://www.googleapis.com/auth/userinfo.email", "https://www.googleapis.com/auth/userinfo.profile"]),
516
- "access_type": "offline", "response_type": "code", "prompt": "consent"
517
- })
518
-
519
- # [HEADLESS SUPPORT] Display appropriate instructions
520
- if is_headless:
521
- auth_panel_text = Text.from_markup(
522
- "Running in headless environment (no GUI detected).\n"
523
- "Please open the URL below in a browser on another machine to authorize:\n"
524
- )
525
- else:
526
- auth_panel_text = Text.from_markup(
527
- "1. Your browser will now open to log in and authorize the application.\n"
528
- "2. If it doesn't open automatically, please open the URL below manually."
529
- )
530
-
531
- console.print(Panel(auth_panel_text, title=f"Gemini OAuth Setup for [bold yellow]{display_name}[/bold yellow]", style="bold blue"))
532
- console.print(f"[bold]URL:[/bold] [link={auth_url}]{auth_url}[/link]\n")
533
-
534
- # [HEADLESS SUPPORT] Only attempt browser open if NOT headless
535
- if not is_headless:
536
- try:
537
- webbrowser.open(auth_url)
538
- lib_logger.info("Browser opened successfully for OAuth flow")
539
- except Exception as e:
540
- lib_logger.warning(f"Failed to open browser automatically: {e}. Please open the URL manually.")
541
-
542
- with console.status("[bold green]Waiting for you to complete authentication in the browser...[/bold green]", spinner="dots"):
543
- auth_code = await asyncio.wait_for(auth_code_future, timeout=300)
544
- except asyncio.TimeoutError:
545
- raise Exception("OAuth flow timed out. Please try again.")
546
- finally:
547
- if server:
548
- server.close()
549
- await server.wait_closed()
550
-
551
- lib_logger.info(f"Attempting to exchange authorization code for tokens...")
552
- async with httpx.AsyncClient() as client:
553
- response = await client.post(TOKEN_URI, data={
554
- "code": auth_code.strip(), "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET,
555
- "redirect_uri": "http://localhost:8085/oauth2callback", "grant_type": "authorization_code"
556
- })
557
- response.raise_for_status()
558
- token_data = response.json()
559
- # Start with the full token data from the exchange
560
- creds = token_data.copy()
561
-
562
- # Convert 'expires_in' to 'expiry_date' in milliseconds
563
- creds["expiry_date"] = (time.time() + creds.pop("expires_in")) * 1000
564
-
565
- # Ensure client_id and client_secret are present
566
- creds["client_id"] = CLIENT_ID
567
- creds["client_secret"] = CLIENT_SECRET
568
-
569
- creds["token_uri"] = TOKEN_URI
570
- creds["universe_domain"] = "googleapis.com"
571
-
572
- # Fetch user info and add metadata
573
- user_info_response = await client.get(USER_INFO_URI, headers={"Authorization": f"Bearer {creds['access_token']}"})
574
- user_info_response.raise_for_status()
575
- user_info = user_info_response.json()
576
- creds["_proxy_metadata"] = {
577
- "email": user_info.get("email"),
578
- "last_check_timestamp": time.time()
579
- }
580
-
581
- if path:
582
- await self._save_credentials(path, creds)
583
- lib_logger.info(f"Gemini OAuth initialized successfully for '{display_name}'.")
584
- return creds
585
-
586
- lib_logger.info(f"Gemini OAuth token at '{display_name}' is valid.")
587
- return creds
588
- except Exception as e:
589
- raise ValueError(f"Failed to initialize Gemini OAuth for '{path}': {e}")
590
-
591
- async def get_auth_header(self, credential_path: str) -> Dict[str, str]:
592
- creds = await self._load_credentials(credential_path)
593
- if self._is_token_expired(creds):
594
- creds = await self._refresh_token(credential_path, creds)
595
- return {"Authorization": f"Bearer {creds['access_token']}"}
596
-
597
- async def get_user_info(self, creds_or_path: Union[Dict[str, Any], str]) -> Dict[str, Any]:
598
- path = creds_or_path if isinstance(creds_or_path, str) else None
599
- creds = await self._load_credentials(creds_or_path) if path else creds_or_path
600
-
601
- if path and self._is_token_expired(creds):
602
- creds = await self._refresh_token(path, creds)
603
-
604
- # Prefer locally stored metadata
605
- if creds.get("_proxy_metadata", {}).get("email"):
606
- if path:
607
- creds["_proxy_metadata"]["last_check_timestamp"] = time.time()
608
- await self._save_credentials(path, creds)
609
- return {"email": creds["_proxy_metadata"]["email"]}
610
-
611
- # Fallback to API call if metadata is missing
612
- headers = {"Authorization": f"Bearer {creds['access_token']}"}
613
- async with httpx.AsyncClient() as client:
614
- response = await client.get(USER_INFO_URI, headers=headers)
615
- response.raise_for_status()
616
- user_info = response.json()
617
-
618
- # Save the retrieved info for future use
619
- creds["_proxy_metadata"] = {
620
- "email": user_info.get("email"),
621
- "last_check_timestamp": time.time()
622
- }
623
- if path:
624
- await self._save_credentials(path, creds)
625
- return {"email": user_info.get("email")}
 
1
  # src/rotator_library/providers/gemini_auth_base.py
2
 
3
+ from .google_oauth_base import GoogleOAuthBase
4
+
5
+ class GeminiAuthBase(GoogleOAuthBase):
6
+ """
7
+ Gemini CLI OAuth2 authentication implementation.
8
+
9
+ Inherits all OAuth functionality from GoogleOAuthBase with Gemini-specific configuration.
10
+ """
11
+
12
+ CLIENT_ID = "681255809395-oo8ft2oprdrnp9e3aqf6av3hmdib135j.apps.googleusercontent.com"
13
+ CLIENT_SECRET = "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"
14
+ OAUTH_SCOPES = [
15
+ "https://www.googleapis.com/auth/cloud-platform",
16
+ "https://www.googleapis.com/auth/userinfo.email",
17
+ "https://www.googleapis.com/auth/userinfo.profile",
18
+ ]
19
+ ENV_PREFIX = "GEMINI_CLI"
20
+ CALLBACK_PORT = 8085
21
+ CALLBACK_PATH = "/oauth2callback"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/rotator_library/providers/google_oauth_base.py ADDED
@@ -0,0 +1,653 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # src/rotator_library/providers/google_oauth_base.py
2
+
3
+ import os
4
+ import webbrowser
5
+ from typing import Union, Optional
6
+ import json
7
+ import time
8
+ import asyncio
9
+ import logging
10
+ from pathlib import Path
11
+ from typing import Dict, Any
12
+ import tempfile
13
+ import shutil
14
+
15
+ import httpx
16
+ from rich.console import Console
17
+ from rich.panel import Panel
18
+ from rich.text import Text
19
+
20
+ from ..utils.headless_detection import is_headless_environment
21
+
22
+ lib_logger = logging.getLogger('rotator_library')
23
+
24
+ console = Console()
25
+
26
+ class GoogleOAuthBase:
27
+ """
28
+ Base class for Google OAuth2 authentication providers.
29
+
30
+ Subclasses must override:
31
+ - CLIENT_ID: OAuth client ID
32
+ - CLIENT_SECRET: OAuth client secret
33
+ - OAUTH_SCOPES: List of OAuth scopes
34
+ - ENV_PREFIX: Prefix for environment variables (e.g., "GEMINI_CLI", "ANTIGRAVITY")
35
+
36
+ Subclasses may optionally override:
37
+ - CALLBACK_PORT: Local OAuth callback server port (default: 8085)
38
+ - CALLBACK_PATH: OAuth callback path (default: "/oauth2callback")
39
+ - REFRESH_EXPIRY_BUFFER_SECONDS: Time buffer before token expiry (default: 30 minutes)
40
+ """
41
+
42
+ # Subclasses MUST override these
43
+ CLIENT_ID: str = None
44
+ CLIENT_SECRET: str = None
45
+ OAUTH_SCOPES: list = None
46
+ ENV_PREFIX: str = None
47
+
48
+ # Subclasses MAY override these
49
+ TOKEN_URI: str = "https://oauth2.googleapis.com/token"
50
+ USER_INFO_URI: str = "https://www.googleapis.com/oauth2/v1/userinfo"
51
+ CALLBACK_PORT: int = 8085
52
+ CALLBACK_PATH: str = "/oauth2callback"
53
+ REFRESH_EXPIRY_BUFFER_SECONDS: int = 30 * 60 # 30 minutes
54
+
55
+ def __init__(self):
56
+ # Validate that subclass has set required attributes
57
+ if self.CLIENT_ID is None:
58
+ raise NotImplementedError(f"{self.__class__.__name__} must set CLIENT_ID")
59
+ if self.CLIENT_SECRET is None:
60
+ raise NotImplementedError(f"{self.__class__.__name__} must set CLIENT_SECRET")
61
+ if self.OAUTH_SCOPES is None:
62
+ raise NotImplementedError(f"{self.__class__.__name__} must set OAUTH_SCOPES")
63
+ if self.ENV_PREFIX is None:
64
+ raise NotImplementedError(f"{self.__class__.__name__} must set ENV_PREFIX")
65
+
66
+ self._credentials_cache: Dict[str, Dict[str, Any]] = {}
67
+ self._refresh_locks: Dict[str, asyncio.Lock] = {}
68
+ self._locks_lock = asyncio.Lock() # Protects the locks dict from race conditions
69
+ # [BACKOFF TRACKING] Track consecutive failures per credential
70
+ self._refresh_failures: Dict[str, int] = {} # Track consecutive failures per credential
71
+ self._next_refresh_after: Dict[str, float] = {} # Track backoff timers (Unix timestamp)
72
+
73
+ # [QUEUE SYSTEM] Sequential refresh processing
74
+ self._refresh_queue: asyncio.Queue = asyncio.Queue()
75
+ self._queued_credentials: set = set() # Track credentials already in queue
76
+ self._unavailable_credentials: set = set() # Mark credentials unavailable during re-auth
77
+ self._queue_tracking_lock = asyncio.Lock() # Protects queue sets
78
+ self._queue_processor_task: Optional[asyncio.Task] = None # Background worker task
79
+
80
+ def _load_from_env(self) -> Optional[Dict[str, Any]]:
81
+ """
82
+ Load OAuth credentials from environment variables for stateless deployments.
83
+
84
+ Expected environment variables:
85
+ - {ENV_PREFIX}_ACCESS_TOKEN (required)
86
+ - {ENV_PREFIX}_REFRESH_TOKEN (required)
87
+ - {ENV_PREFIX}_EXPIRY_DATE (optional, defaults to 0)
88
+ - {ENV_PREFIX}_CLIENT_ID (optional, uses default)
89
+ - {ENV_PREFIX}_CLIENT_SECRET (optional, uses default)
90
+ - {ENV_PREFIX}_TOKEN_URI (optional, uses default)
91
+ - {ENV_PREFIX}_UNIVERSE_DOMAIN (optional, defaults to googleapis.com)
92
+ - {ENV_PREFIX}_EMAIL (optional, defaults to "env-user")
93
+ - {ENV_PREFIX}_PROJECT_ID (optional)
94
+ - {ENV_PREFIX}_TIER (optional)
95
+
96
+ Returns:
97
+ Dict with credential structure if env vars present, None otherwise
98
+ """
99
+ access_token = os.getenv(f"{self.ENV_PREFIX}_ACCESS_TOKEN")
100
+ refresh_token = os.getenv(f"{self.ENV_PREFIX}_REFRESH_TOKEN")
101
+
102
+ # Both access and refresh tokens are required
103
+ if not (access_token and refresh_token):
104
+ return None
105
+
106
+ lib_logger.debug(f"Loading {self.ENV_PREFIX} credentials from environment variables")
107
+
108
+ # Parse expiry_date as float, default to 0 if not present
109
+ expiry_str = os.getenv(f"{self.ENV_PREFIX}_EXPIRY_DATE", "0")
110
+ try:
111
+ expiry_date = float(expiry_str)
112
+ except ValueError:
113
+ lib_logger.warning(f"Invalid {self.ENV_PREFIX}_EXPIRY_DATE value: {expiry_str}, using 0")
114
+ expiry_date = 0
115
+
116
+ creds = {
117
+ "access_token": access_token,
118
+ "refresh_token": refresh_token,
119
+ "expiry_date": expiry_date,
120
+ "client_id": os.getenv(f"{self.ENV_PREFIX}_CLIENT_ID", self.CLIENT_ID),
121
+ "client_secret": os.getenv(f"{self.ENV_PREFIX}_CLIENT_SECRET", self.CLIENT_SECRET),
122
+ "token_uri": os.getenv(f"{self.ENV_PREFIX}_TOKEN_URI", self.TOKEN_URI),
123
+ "universe_domain": os.getenv(f"{self.ENV_PREFIX}_UNIVERSE_DOMAIN", "googleapis.com"),
124
+ "_proxy_metadata": {
125
+ "email": os.getenv(f"{self.ENV_PREFIX}_EMAIL", "env-user"),
126
+ "last_check_timestamp": time.time(),
127
+ "loaded_from_env": True # Flag to indicate env-based credentials
128
+ }
129
+ }
130
+
131
+ # Add project_id if provided
132
+ project_id = os.getenv(f"{self.ENV_PREFIX}_PROJECT_ID")
133
+ if project_id:
134
+ creds["_proxy_metadata"]["project_id"] = project_id
135
+
136
+ # Add tier if provided
137
+ tier = os.getenv(f"{self.ENV_PREFIX}_TIER")
138
+ if tier:
139
+ creds["_proxy_metadata"]["tier"] = tier
140
+
141
+ return creds
142
+
143
+ async def _load_credentials(self, path: str) -> Dict[str, Any]:
144
+ if path in self._credentials_cache:
145
+ return self._credentials_cache[path]
146
+
147
+ async with await self._get_lock(path):
148
+ if path in self._credentials_cache:
149
+ return self._credentials_cache[path]
150
+
151
+ # First, try loading from environment variables
152
+ env_creds = self._load_from_env()
153
+ if env_creds:
154
+ lib_logger.info(f"Using {self.ENV_PREFIX} credentials from environment variables")
155
+ # Cache env-based credentials using the path as key
156
+ self._credentials_cache[path] = env_creds
157
+ return env_creds
158
+
159
+ # Fall back to file-based loading
160
+ try:
161
+ lib_logger.debug(f"Loading {self.ENV_PREFIX} credentials from file: {path}")
162
+ with open(path, 'r') as f:
163
+ creds = json.load(f)
164
+ # Handle gcloud-style creds file which nest tokens under "credential"
165
+ if "credential" in creds:
166
+ creds = creds["credential"]
167
+ self._credentials_cache[path] = creds
168
+ return creds
169
+ except FileNotFoundError:
170
+ raise IOError(f"{self.ENV_PREFIX} OAuth credential file not found at '{path}'")
171
+ except Exception as e:
172
+ raise IOError(f"Failed to load {self.ENV_PREFIX} OAuth credentials from '{path}': {e}")
173
+
174
+ async def _save_credentials(self, path: str, creds: Dict[str, Any]):
175
+ # Don't save to file if credentials were loaded from environment
176
+ if creds.get("_proxy_metadata", {}).get("loaded_from_env"):
177
+ lib_logger.debug("Credentials loaded from env, skipping file save")
178
+ # Still update cache for in-memory consistency
179
+ self._credentials_cache[path] = creds
180
+ return
181
+
182
+ # [ATOMIC WRITE] Use tempfile + move pattern to ensure atomic writes
183
+ # This prevents credential corruption if the process is interrupted during write
184
+ parent_dir = os.path.dirname(os.path.abspath(path))
185
+ os.makedirs(parent_dir, exist_ok=True)
186
+
187
+ tmp_fd = None
188
+ tmp_path = None
189
+ try:
190
+ # Create temp file in same directory as target (ensures same filesystem)
191
+ tmp_fd, tmp_path = tempfile.mkstemp(dir=parent_dir, prefix='.tmp_', suffix='.json', text=True)
192
+
193
+ # Write JSON to temp file
194
+ with os.fdopen(tmp_fd, 'w') as f:
195
+ json.dump(creds, f, indent=2)
196
+ tmp_fd = None # fdopen closes the fd
197
+
198
+ # Set secure permissions (0600 = owner read/write only)
199
+ try:
200
+ os.chmod(tmp_path, 0o600)
201
+ except (OSError, AttributeError):
202
+ # Windows may not support chmod, ignore
203
+ pass
204
+
205
+ # Atomic move (overwrites target if it exists)
206
+ shutil.move(tmp_path, path)
207
+ tmp_path = None # Successfully moved
208
+
209
+ # Update cache AFTER successful file write (prevents cache/file inconsistency)
210
+ self._credentials_cache[path] = creds
211
+ lib_logger.debug(f"Saved updated {self.ENV_PREFIX} OAuth credentials to '{path}' (atomic write).")
212
+
213
+ except Exception as e:
214
+ lib_logger.error(f"Failed to save updated {self.ENV_PREFIX} OAuth credentials to '{path}': {e}")
215
+ # Clean up temp file if it still exists
216
+ if tmp_fd is not None:
217
+ try:
218
+ os.close(tmp_fd)
219
+ except:
220
+ pass
221
+ if tmp_path and os.path.exists(tmp_path):
222
+ try:
223
+ os.unlink(tmp_path)
224
+ except:
225
+ pass
226
+ raise
227
+
228
+ def _is_token_expired(self, creds: Dict[str, Any]) -> bool:
229
+ expiry = creds.get("token_expiry") # gcloud format
230
+ if not expiry: # gemini-cli format
231
+ expiry_timestamp = creds.get("expiry_date", 0) / 1000
232
+ else:
233
+ expiry_timestamp = time.mktime(time.strptime(expiry, "%Y-%m-%dT%H:%M:%SZ"))
234
+ return expiry_timestamp < time.time() + self.REFRESH_EXPIRY_BUFFER_SECONDS
235
+
236
+ async def _refresh_token(self, path: str, creds: Dict[str, Any], force: bool = False) -> Dict[str, Any]:
237
+ async with await self._get_lock(path):
238
+ # Skip the expiry check if a refresh is being forced
239
+ if not force and not self._is_token_expired(self._credentials_cache.get(path, creds)):
240
+ return self._credentials_cache.get(path, creds)
241
+
242
+ lib_logger.debug(f"Refreshing {self.ENV_PREFIX} OAuth token for '{Path(path).name}' (forced: {force})...")
243
+ refresh_token = creds.get("refresh_token")
244
+ if not refresh_token:
245
+ raise ValueError("No refresh_token found in credentials file.")
246
+
247
+ # [RETRY LOGIC] Implement exponential backoff for transient errors
248
+ max_retries = 3
249
+ new_token_data = None
250
+ last_error = None
251
+ needs_reauth = False
252
+
253
+ async with httpx.AsyncClient() as client:
254
+ for attempt in range(max_retries):
255
+ try:
256
+ response = await client.post(self.TOKEN_URI, data={
257
+ "client_id": creds.get("client_id", self.CLIENT_ID),
258
+ "client_secret": creds.get("client_secret", self.CLIENT_SECRET),
259
+ "refresh_token": refresh_token,
260
+ "grant_type": "refresh_token",
261
+ }, timeout=30.0)
262
+ response.raise_for_status()
263
+ new_token_data = response.json()
264
+ break # Success, exit retry loop
265
+
266
+ except httpx.HTTPStatusError as e:
267
+ last_error = e
268
+ status_code = e.response.status_code
269
+
270
+ # [INVALID GRANT HANDLING] Handle 401/403 by triggering re-authentication
271
+ if status_code == 401 or status_code == 403:
272
+ lib_logger.warning(
273
+ f"Refresh token invalid for '{Path(path).name}' (HTTP {status_code}). "
274
+ f"Token may have been revoked or expired. Starting re-authentication..."
275
+ )
276
+ needs_reauth = True
277
+ break # Exit retry loop to trigger re-auth
278
+
279
+ elif status_code == 429:
280
+ # Rate limit - honor Retry-After header if present
281
+ retry_after = int(e.response.headers.get("Retry-After", 60))
282
+ lib_logger.warning(f"Rate limited (HTTP 429), retry after {retry_after}s")
283
+ if attempt < max_retries - 1:
284
+ await asyncio.sleep(retry_after)
285
+ continue
286
+ raise
287
+
288
+ elif status_code >= 500 and status_code < 600:
289
+ # Server error - retry with exponential backoff
290
+ if attempt < max_retries - 1:
291
+ wait_time = 2 ** attempt # 1s, 2s, 4s
292
+ lib_logger.warning(f"Server error (HTTP {status_code}), retry {attempt + 1}/{max_retries} in {wait_time}s")
293
+ await asyncio.sleep(wait_time)
294
+ continue
295
+ raise # Final attempt failed
296
+
297
+ else:
298
+ # Other errors - don't retry
299
+ raise
300
+
301
+ except (httpx.RequestError, httpx.TimeoutException) as e:
302
+ # Network errors - retry with backoff
303
+ last_error = e
304
+ if attempt < max_retries - 1:
305
+ wait_time = 2 ** attempt
306
+ lib_logger.warning(f"Network error during refresh: {e}, retry {attempt + 1}/{max_retries} in {wait_time}s")
307
+ await asyncio.sleep(wait_time)
308
+ continue
309
+ raise
310
+
311
+ # [INVALID GRANT RE-AUTH] Trigger OAuth flow if refresh token is invalid
312
+ if needs_reauth:
313
+ lib_logger.info(f"Starting re-authentication for '{Path(path).name}'...")
314
+ try:
315
+ # Call initialize_token to trigger OAuth flow
316
+ new_creds = await self.initialize_token(path)
317
+ return new_creds
318
+ except Exception as reauth_error:
319
+ lib_logger.error(f"Re-authentication failed for '{Path(path).name}': {reauth_error}")
320
+ raise ValueError(f"Refresh token invalid and re-authentication failed: {reauth_error}")
321
+
322
+ # If we exhausted retries without success
323
+ if new_token_data is None:
324
+ raise last_error or Exception("Token refresh failed after all retries")
325
+
326
+ # [FIX 1] Update OAuth token fields from response
327
+ creds["access_token"] = new_token_data["access_token"]
328
+ expiry_timestamp = time.time() + new_token_data["expires_in"]
329
+ creds["expiry_date"] = expiry_timestamp * 1000 # gemini-cli format
330
+
331
+ # [FIX 2] Update refresh_token if server provided a new one (rare but possible with Google OAuth)
332
+ if "refresh_token" in new_token_data:
333
+ creds["refresh_token"] = new_token_data["refresh_token"]
334
+
335
+ # [FIX 3] Ensure all required OAuth client fields are present (restore if missing)
336
+ if "client_id" not in creds or not creds["client_id"]:
337
+ creds["client_id"] = self.CLIENT_ID
338
+ if "client_secret" not in creds or not creds["client_secret"]:
339
+ creds["client_secret"] = self.CLIENT_SECRET
340
+ if "token_uri" not in creds or not creds["token_uri"]:
341
+ creds["token_uri"] = self.TOKEN_URI
342
+ if "universe_domain" not in creds or not creds["universe_domain"]:
343
+ creds["universe_domain"] = "googleapis.com"
344
+
345
+ # [FIX 4] Add scopes array if missing
346
+ if "scopes" not in creds:
347
+ creds["scopes"] = self.OAUTH_SCOPES
348
+
349
+ # [FIX 5] Ensure _proxy_metadata exists and update timestamp
350
+ if "_proxy_metadata" not in creds:
351
+ creds["_proxy_metadata"] = {}
352
+ creds["_proxy_metadata"]["last_check_timestamp"] = time.time()
353
+
354
+ # [VALIDATION] Verify refreshed credentials have all required fields
355
+ required_fields = ["access_token", "refresh_token", "client_id", "client_secret", "token_uri"]
356
+ missing_fields = [field for field in required_fields if not creds.get(field)]
357
+ if missing_fields:
358
+ raise ValueError(f"Refreshed credentials missing required fields: {missing_fields}")
359
+
360
+ # [VALIDATION] Optional: Test that the refreshed token is actually usable
361
+ try:
362
+ async with httpx.AsyncClient() as client:
363
+ test_response = await client.get(
364
+ self.USER_INFO_URI,
365
+ headers={"Authorization": f"Bearer {creds['access_token']}"},
366
+ timeout=5.0
367
+ )
368
+ test_response.raise_for_status()
369
+ lib_logger.debug(f"Token validation successful for '{Path(path).name}'")
370
+ except Exception as e:
371
+ lib_logger.warning(f"Refreshed token validation failed for '{Path(path).name}': {e}")
372
+ # Don't fail the refresh - the token might still work for other endpoints
373
+ # But log it for debugging purposes
374
+
375
+ await self._save_credentials(path, creds)
376
+ lib_logger.debug(f"Successfully refreshed {self.ENV_PREFIX} OAuth token for '{Path(path).name}'.")
377
+ return creds
378
+
379
+ async def proactively_refresh(self, credential_path: str):
380
+ """Proactively refresh a credential by queueing it for refresh."""
381
+ creds = await self._load_credentials(credential_path)
382
+ if self._is_token_expired(creds):
383
+ # Queue for refresh with needs_reauth=False (automated refresh)
384
+ await self._queue_refresh(credential_path, force=False, needs_reauth=False)
385
+
386
+ async def _get_lock(self, path: str) -> asyncio.Lock:
387
+ # [FIX RACE CONDITION] Protect lock creation with a master lock
388
+ # This prevents TOCTOU bug where multiple coroutines check and create simultaneously
389
+ async with self._locks_lock:
390
+ if path not in self._refresh_locks:
391
+ self._refresh_locks[path] = asyncio.Lock()
392
+ return self._refresh_locks[path]
393
+
394
+ def is_credential_available(self, path: str) -> bool:
395
+ """Check if a credential is available for rotation (not queued/refreshing)."""
396
+ return path not in self._unavailable_credentials
397
+
398
+ async def _ensure_queue_processor_running(self):
399
+ """Lazily starts the queue processor if not already running."""
400
+ if self._queue_processor_task is None or self._queue_processor_task.done():
401
+ self._queue_processor_task = asyncio.create_task(self._process_refresh_queue())
402
+
403
+ async def _queue_refresh(self, path: str, force: bool = False, needs_reauth: bool = False):
404
+ """Add a credential to the refresh queue if not already queued.
405
+
406
+ Args:
407
+ path: Credential file path
408
+ force: Force refresh even if not expired
409
+ needs_reauth: True if full re-authentication needed (bypasses backoff)
410
+ """
411
+ # IMPORTANT: Only check backoff for simple automated refreshes
412
+ # Re-authentication (interactive OAuth) should BYPASS backoff since it needs user input
413
+ if not needs_reauth:
414
+ now = time.time()
415
+ if path in self._next_refresh_after:
416
+ backoff_until = self._next_refresh_after[path]
417
+ if now < backoff_until:
418
+ # Credential is in backoff for automated refresh, do not queue
419
+ remaining = int(backoff_until - now)
420
+ lib_logger.debug(f"Skipping automated refresh for '{Path(path).name}' (in backoff for {remaining}s)")
421
+ return
422
+
423
+ async with self._queue_tracking_lock:
424
+ if path not in self._queued_credentials:
425
+ self._queued_credentials.add(path)
426
+ self._unavailable_credentials.add(path) # Mark as unavailable
427
+ await self._refresh_queue.put((path, force, needs_reauth))
428
+ await self._ensure_queue_processor_running()
429
+
430
+ async def _process_refresh_queue(self):
431
+ """Background worker that processes refresh requests sequentially."""
432
+ while True:
433
+ path = None
434
+ try:
435
+ # Wait for an item with timeout to allow graceful shutdown
436
+ try:
437
+ path, force, needs_reauth = await asyncio.wait_for(
438
+ self._refresh_queue.get(),
439
+ timeout=60.0
440
+ )
441
+ except asyncio.TimeoutError:
442
+ # No items for 60s, exit to save resources
443
+ self._queue_processor_task = None
444
+ return
445
+
446
+ try:
447
+ # Perform the actual refresh (still using per-credential lock)
448
+ async with await self._get_lock(path):
449
+ # Re-check if still expired (may have changed since queueing)
450
+ creds = self._credentials_cache.get(path)
451
+ if creds and not self._is_token_expired(creds):
452
+ # No longer expired, mark as available
453
+ async with self._queue_tracking_lock:
454
+ self._unavailable_credentials.discard(path)
455
+ continue
456
+
457
+ # Perform refresh
458
+ if not creds:
459
+ creds = await self._load_credentials(path)
460
+ await self._refresh_token(path, creds, force=force)
461
+
462
+ # SUCCESS: Mark as available again
463
+ async with self._queue_tracking_lock:
464
+ self._unavailable_credentials.discard(path)
465
+
466
+ finally:
467
+ # Remove from queued set
468
+ async with self._queue_tracking_lock:
469
+ self._queued_credentials.discard(path)
470
+ self._refresh_queue.task_done()
471
+ except asyncio.CancelledError:
472
+ break
473
+ except Exception as e:
474
+ lib_logger.error(f"Error in queue processor: {e}")
475
+ # Even on error, mark as available (backoff will prevent immediate retry)
476
+ if path:
477
+ async with self._queue_tracking_lock:
478
+ self._unavailable_credentials.discard(path)
479
+
480
+ async def initialize_token(self, creds_or_path: Union[Dict[str, Any], str]) -> Dict[str, Any]:
481
+ path = creds_or_path if isinstance(creds_or_path, str) else None
482
+
483
+ # Get display name from metadata if available, otherwise derive from path
484
+ if isinstance(creds_or_path, dict):
485
+ display_name = creds_or_path.get("_proxy_metadata", {}).get("display_name", "in-memory object")
486
+ else:
487
+ display_name = Path(path).name if path else "in-memory object"
488
+
489
+ lib_logger.debug(f"Initializing {self.ENV_PREFIX} token for '{display_name}'...")
490
+ try:
491
+ creds = await self._load_credentials(creds_or_path) if path else creds_or_path
492
+ reason = ""
493
+ if not creds.get("refresh_token"):
494
+ reason = "refresh token is missing"
495
+ elif self._is_token_expired(creds):
496
+ reason = "token is expired"
497
+
498
+ if reason:
499
+ if reason == "token is expired" and creds.get("refresh_token"):
500
+ try:
501
+ return await self._refresh_token(path, creds)
502
+ except Exception as e:
503
+ lib_logger.warning(f"Automatic token refresh for '{display_name}' failed: {e}. Proceeding to interactive login.")
504
+
505
+ lib_logger.warning(f"{self.ENV_PREFIX} OAuth token for '{display_name}' needs setup: {reason}.")
506
+
507
+ # [HEADLESS DETECTION] Check if running in headless environment
508
+ is_headless = is_headless_environment()
509
+
510
+ auth_code_future = asyncio.get_event_loop().create_future()
511
+ server = None
512
+
513
+ async def handle_callback(reader, writer):
514
+ try:
515
+ request_line_bytes = await reader.readline()
516
+ if not request_line_bytes: return
517
+ path_str = request_line_bytes.decode('utf-8').strip().split(' ')[1]
518
+ while await reader.readline() != b'\r\n': pass
519
+ from urllib.parse import urlparse, parse_qs
520
+ query_params = parse_qs(urlparse(path_str).query)
521
+ writer.write(b"HTTP/1.1 200 OK\r\nContent-Type: text/html\r\n\r\n")
522
+ if 'code' in query_params:
523
+ if not auth_code_future.done():
524
+ auth_code_future.set_result(query_params['code'][0])
525
+ writer.write(b"<html><body><h1>Authentication successful!</h1><p>You can close this window.</p></body></html>")
526
+ else:
527
+ error = query_params.get('error', ['Unknown error'])[0]
528
+ if not auth_code_future.done():
529
+ auth_code_future.set_exception(Exception(f"OAuth failed: {error}"))
530
+ writer.write(f"<html><body><h1>Authentication Failed</h1><p>Error: {error}. Please try again.</p></body></html>".encode())
531
+ await writer.drain()
532
+ except Exception as e:
533
+ lib_logger.error(f"Error in OAuth callback handler: {e}")
534
+ finally:
535
+ writer.close()
536
+
537
+ try:
538
+ server = await asyncio.start_server(handle_callback, '127.0.0.1', self.CALLBACK_PORT)
539
+ from urllib.parse import urlencode
540
+ auth_url = "https://accounts.google.com/o/oauth2/v2/auth?" + urlencode({
541
+ "client_id": self.CLIENT_ID,
542
+ "redirect_uri": f"http://localhost:{self.CALLBACK_PORT}{self.CALLBACK_PATH}",
543
+ "scope": " ".join(self.OAUTH_SCOPES),
544
+ "access_type": "offline", "response_type": "code", "prompt": "consent"
545
+ })
546
+
547
+ # [HEADLESS SUPPORT] Display appropriate instructions
548
+ if is_headless:
549
+ auth_panel_text = Text.from_markup(
550
+ "Running in headless environment (no GUI detected).\n"
551
+ "Please open the URL below in a browser on another machine to authorize:\n"
552
+ )
553
+ else:
554
+ auth_panel_text = Text.from_markup(
555
+ "1. Your browser will now open to log in and authorize the application.\n"
556
+ "2. If it doesn't open automatically, please open the URL below manually."
557
+ )
558
+
559
+ console.print(Panel(auth_panel_text, title=f"{self.ENV_PREFIX} OAuth Setup for [bold yellow]{display_name}[/bold yellow]", style="bold blue"))
560
+ console.print(f"[bold]URL:[/bold] [link={auth_url}]{auth_url}[/link]\n")
561
+
562
+ # [HEADLESS SUPPORT] Only attempt browser open if NOT headless
563
+ if not is_headless:
564
+ try:
565
+ webbrowser.open(auth_url)
566
+ lib_logger.info("Browser opened successfully for OAuth flow")
567
+ except Exception as e:
568
+ lib_logger.warning(f"Failed to open browser automatically: {e}. Please open the URL manually.")
569
+
570
+ with console.status(f"[bold green]Waiting for you to complete authentication in the browser...[/bold green]", spinner="dots"):
571
+ auth_code = await asyncio.wait_for(auth_code_future, timeout=300)
572
+ except asyncio.TimeoutError:
573
+ raise Exception("OAuth flow timed out. Please try again.")
574
+ finally:
575
+ if server:
576
+ server.close()
577
+ await server.wait_closed()
578
+
579
+ lib_logger.info(f"Attempting to exchange authorization code for tokens...")
580
+ async with httpx.AsyncClient() as client:
581
+ response = await client.post(self.TOKEN_URI, data={
582
+ "code": auth_code.strip(), "client_id": self.CLIENT_ID, "client_secret": self.CLIENT_SECRET,
583
+ "redirect_uri": f"http://localhost:{self.CALLBACK_PORT}{self.CALLBACK_PATH}", "grant_type": "authorization_code"
584
+ })
585
+ response.raise_for_status()
586
+ token_data = response.json()
587
+ # Start with the full token data from the exchange
588
+ creds = token_data.copy()
589
+
590
+ # Convert 'expires_in' to 'expiry_date' in milliseconds
591
+ creds["expiry_date"] = (time.time() + creds.pop("expires_in")) * 1000
592
+
593
+ # Ensure client_id and client_secret are present
594
+ creds["client_id"] = self.CLIENT_ID
595
+ creds["client_secret"] = self.CLIENT_SECRET
596
+
597
+ creds["token_uri"] = self.TOKEN_URI
598
+ creds["universe_domain"] = "googleapis.com"
599
+
600
+ # Fetch user info and add metadata
601
+ user_info_response = await client.get(self.USER_INFO_URI, headers={"Authorization": f"Bearer {creds['access_token']}"})
602
+ user_info_response.raise_for_status()
603
+ user_info = user_info_response.json()
604
+ creds["_proxy_metadata"] = {
605
+ "email": user_info.get("email"),
606
+ "last_check_timestamp": time.time()
607
+ }
608
+
609
+ if path:
610
+ await self._save_credentials(path, creds)
611
+ lib_logger.info(f"{self.ENV_PREFIX} OAuth initialized successfully for '{display_name}'.")
612
+ return creds
613
+
614
+ lib_logger.info(f"{self.ENV_PREFIX} OAuth token at '{display_name}' is valid.")
615
+ return creds
616
+ except Exception as e:
617
+ raise ValueError(f"Failed to initialize {self.ENV_PREFIX} OAuth for '{path}': {e}")
618
+
619
+ async def get_auth_header(self, credential_path: str) -> Dict[str, str]:
620
+ creds = await self._load_credentials(credential_path)
621
+ if self._is_token_expired(creds):
622
+ creds = await self._refresh_token(credential_path, creds)
623
+ return {"Authorization": f"Bearer {creds['access_token']}"}
624
+
625
+ async def get_user_info(self, creds_or_path: Union[Dict[str, Any], str]) -> Dict[str, Any]:
626
+ path = creds_or_path if isinstance(creds_or_path, str) else None
627
+ creds = await self._load_credentials(creds_or_path) if path else creds_or_path
628
+
629
+ if path and self._is_token_expired(creds):
630
+ creds = await self._refresh_token(path, creds)
631
+
632
+ # Prefer locally stored metadata
633
+ if creds.get("_proxy_metadata", {}).get("email"):
634
+ if path:
635
+ creds["_proxy_metadata"]["last_check_timestamp"] = time.time()
636
+ await self._save_credentials(path, creds)
637
+ return {"email": creds["_proxy_metadata"]["email"]}
638
+
639
+ # Fallback to API call if metadata is missing
640
+ headers = {"Authorization": f"Bearer {creds['access_token']}"}
641
+ async with httpx.AsyncClient() as client:
642
+ response = await client.get(self.USER_INFO_URI, headers=headers)
643
+ response.raise_for_status()
644
+ user_info = response.json()
645
+
646
+ # Save the retrieved info for future use
647
+ creds["_proxy_metadata"] = {
648
+ "email": user_info.get("email"),
649
+ "last_check_timestamp": time.time()
650
+ }
651
+ if path:
652
+ await self._save_credentials(path, creds)
653
+ return {"email": user_info.get("email")}