import os import re from pathlib import Path from curl_cffi.requests import AsyncSession, Cookies, Response from .load_browser_cookies import HAS_BC3, load_browser_cookies from .logger import logger from .rotate_1psidts import _extract_cookie_value from ..constants import Endpoint, Headers from ..exceptions import AuthError async def send_request( client: AsyncSession, cookies: dict | Cookies, verbose: bool = False ) -> Response: """ Send http request with provided cookies using a shared session. """ client.cookies.clear() if isinstance(cookies, Cookies): client.cookies.update(cookies) else: for k, v in cookies.items(): client.cookies.set(k, v, domain=".google.com") response = await client.get(Endpoint.INIT, headers=Headers.GEMINI.value) if verbose: logger.debug(f"HTTP Request: GET {Endpoint.INIT} [{response.status_code}]") response.raise_for_status() return response async def get_access_token( base_cookies: dict | Cookies, proxy: str | None = None, verbose: bool = False, verify: bool = True, ) -> tuple[str | None, str | None, str | None, AsyncSession]: """ Send a get request to gemini.google.com for each group of available cookies and return the value of "SNlM0e" as access token on the first successful request. Returns the **live** AsyncSession that succeeded so the caller can reuse the same TLS connection for subsequent requests. """ client = AsyncSession( impersonate="chrome", proxy=proxy, allow_redirects=True, verify=verify ) try: response = await client.get(Endpoint.GOOGLE) if verbose: logger.debug( f"HTTP Request: GET {Endpoint.GOOGLE} [{response.status_code}]" ) preflight_cookies = Cookies(client.cookies) except Exception: await client.close() raise extra_cookies = Cookies() if response.status_code == 200: extra_cookies = preflight_cookies # Phase 1: Prepare Cache cookie_jars_to_test = [] tried_psid_ts = set() if isinstance(base_cookies, Cookies): base_psid = _extract_cookie_value(base_cookies, "__Secure-1PSID") base_psidts = _extract_cookie_value(base_cookies, "__Secure-1PSIDTS") else: base_psid = base_cookies.get("__Secure-1PSID") base_psidts = base_cookies.get("__Secure-1PSIDTS") gemini_cookie_path = os.getenv("GEMINI_COOKIE_PATH") if gemini_cookie_path: cache_dir = Path(gemini_cookie_path) else: cache_dir = Path(__file__).parent / "temp" if base_psid: filename = f".cached_1psidts_{base_psid}.txt" cache_file = cache_dir / filename if cache_file.is_file(): cached_1psidts = cache_file.read_text().strip() if cached_1psidts: jar = Cookies(extra_cookies) jar.update(base_cookies) jar.set("__Secure-1PSIDTS", cached_1psidts, domain=".google.com") cookie_jars_to_test.append((jar, "Cache")) tried_psid_ts.add((base_psid, cached_1psidts)) elif verbose: logger.debug("Skipping loading cached cookies. Cache file is empty.") elif verbose: logger.debug("Skipping loading cached cookies. Cache file not found.") if not base_psid: for cache_file in cache_dir.glob(".cached_1psidts_*.txt"): psid = cache_file.stem[16:] cached_1psidts = cache_file.read_text().strip() if cached_1psidts: jar = Cookies(extra_cookies) jar.set("__Secure-1PSID", psid, domain=".google.com") jar.set("__Secure-1PSIDTS", cached_1psidts, domain=".google.com") cookie_jars_to_test.append((jar, "Cache")) tried_psid_ts.add((psid, cached_1psidts)) # Phase 2: Base Cookies if base_psid and base_psidts: if (base_psid, base_psidts) not in tried_psid_ts: jar = Cookies(extra_cookies) jar.update(base_cookies) cookie_jars_to_test.append((jar, "Base Cookies")) tried_psid_ts.add((base_psid, base_psidts)) elif verbose: logger.debug("Skipping base cookies as they match cached cookies.") elif verbose and not cookie_jars_to_test: logger.debug( "Skipping loading base cookies. Either __Secure-1PSID or __Secure-1PSIDTS is not provided." ) # Phase 3: Browser Cookies try: browser_cookies = load_browser_cookies( domain_name="google.com", verbose=verbose ) if browser_cookies: for browser, cookies in browser_cookies.items(): if secure_1psid := cookies.get("__Secure-1PSID"): if base_psid and base_psid != secure_1psid: if verbose: logger.debug( f"Skipping loading local browser cookies from {browser}. " "__Secure-1PSID does not match the one provided." ) continue secure_1psidts = cookies.get("__Secure-1PSIDTS") if (secure_1psid, secure_1psidts or "") in tried_psid_ts: continue local_cookies = {"__Secure-1PSID": secure_1psid} if secure_1psidts: local_cookies["__Secure-1PSIDTS"] = secure_1psidts if nid := cookies.get("NID"): local_cookies["NID"] = nid jar = Cookies(extra_cookies) for k, v in local_cookies.items(): jar.set(k, v, domain=".google.com") cookie_jars_to_test.append((jar, f"Browser ({browser})")) tried_psid_ts.add((secure_1psid, secure_1psidts or "")) if verbose: logger.debug(f"Prepared local browser cookies from {browser}") if ( HAS_BC3 and not any(group.startswith("Browser") for _, group in cookie_jars_to_test) and verbose ): logger.debug( "Skipping loading local browser cookies. Login to gemini.google.com in your browser first." ) except Exception: if verbose: logger.debug( "Skipping loading local browser cookies (Not available or no permission)." ) current_attempt = 0 for jar, group_name in cookie_jars_to_test: current_attempt += 1 try: res = await send_request(client, jar, verbose=verbose) snlm0e = re.search(r'"SNlM0e":\s*"(.*?)"', res.text) cfb2h = re.search(r'"cfb2h":\s*"(.*?)"', res.text) fdrfje = re.search(r'"FdrFJe":\s*"(.*?)"', res.text) if snlm0e or cfb2h or fdrfje: if verbose: logger.debug( f"Init attempt ({current_attempt}) from {group_name} succeeded." ) return ( snlm0e.group(1) if snlm0e else None, cfb2h.group(1) if cfb2h else None, fdrfje.group(1) if fdrfje else None, client, ) except Exception: if verbose: logger.debug( f"Init attempt ({current_attempt}) from {group_name} failed." ) await client.close() raise AuthError( f"Failed to initialize client after {current_attempt} attempts. SECURE_1PSIDTS could get expired frequently, please make sure cookie values are up to date." )