subapi / gemini_webapi /utils /get_access_token.py
habulaj's picture
Upload 49 files
e816bb2 verified
import os
import re
from pathlib import Path
from curl_cffi.requests import AsyncSession, Cookies, Response
from .load_browser_cookies import HAS_BC3, load_browser_cookies
from .logger import logger
from .rotate_1psidts import _extract_cookie_value
from ..constants import Endpoint, Headers
from ..exceptions import AuthError
async def send_request(
    client: AsyncSession, cookies: dict | Cookies, verbose: bool = False
) -> Response:
    """
    Issue a GET to the init endpoint through a shared session using the given cookies.

    The session's existing cookies are cleared first, so only ``cookies`` are sent.
    A plain ``dict`` is installed entry by entry under the ``.google.com`` domain,
    while a ``Cookies`` object is merged into the session as-is.

    Raises whatever ``response.raise_for_status()`` raises for an error status.
    """
    # Start from a clean slate so cookies from a previous attempt do not leak in.
    client.cookies.clear()

    if not isinstance(cookies, Cookies):
        for name, value in cookies.items():
            client.cookies.set(name, value, domain=".google.com")
    else:
        client.cookies.update(cookies)

    response = await client.get(Endpoint.INIT, headers=Headers.GEMINI.value)

    # Log before the status check so failing requests are still visible in the log.
    if verbose:
        logger.debug(f"HTTP Request: GET {Endpoint.INIT} [{response.status_code}]")

    response.raise_for_status()
    return response
def _collect_cookie_jars(
    base_cookies: dict | Cookies, extra_cookies: Cookies, verbose: bool = False
) -> list[tuple[Cookies, str]]:
    """
    Build the ordered list of candidate cookie jars to try against the init endpoint.

    Candidates are queued in this order: cached ``__Secure-1PSIDTS`` values read
    from the cache directory, the caller-provided base cookies, then cookies
    loaded from local browsers. Every jar starts as a copy of ``extra_cookies``.
    A (``__Secure-1PSID``, ``__Secure-1PSIDTS``) pair that was already queued is
    not queued a second time.

    Errors raised while reading cache files propagate to the caller; errors
    raised while loading browser cookies are swallowed (logged when ``verbose``).

    Returns a list of ``(jar, source_label)`` tuples.
    """
    cache_prefix = ".cached_1psidts_"

    jars: list[tuple[Cookies, str]] = []
    tried_psid_ts: set[tuple[str, str]] = set()

    if isinstance(base_cookies, Cookies):
        base_psid = _extract_cookie_value(base_cookies, "__Secure-1PSID")
        base_psidts = _extract_cookie_value(base_cookies, "__Secure-1PSIDTS")
    else:
        base_psid = base_cookies.get("__Secure-1PSID")
        base_psidts = base_cookies.get("__Secure-1PSIDTS")

    # Cache location: GEMINI_COOKIE_PATH if set, else a "temp" dir next to this module.
    gemini_cookie_path = os.getenv("GEMINI_COOKIE_PATH")
    if gemini_cookie_path:
        cache_dir = Path(gemini_cookie_path)
    else:
        cache_dir = Path(__file__).parent / "temp"

    # Phase 1: Cache
    if base_psid:
        # A PSID was provided: only the cache file that belongs to it is relevant.
        cache_file = cache_dir / f"{cache_prefix}{base_psid}.txt"
        if cache_file.is_file():
            cached_1psidts = cache_file.read_text().strip()
            if cached_1psidts:
                jar = Cookies(extra_cookies)
                jar.update(base_cookies)
                # The cached timestamp cookie takes the place of the provided one.
                jar.set("__Secure-1PSIDTS", cached_1psidts, domain=".google.com")
                jars.append((jar, "Cache"))
                tried_psid_ts.add((base_psid, cached_1psidts))
            elif verbose:
                logger.debug("Skipping loading cached cookies. Cache file is empty.")
        elif verbose:
            logger.debug("Skipping loading cached cookies. Cache file not found.")
    else:
        # No PSID provided: try every cached pair; the PSID is encoded in the file name.
        for cache_file in cache_dir.glob(f"{cache_prefix}*.txt"):
            psid = cache_file.stem[len(cache_prefix):]
            cached_1psidts = cache_file.read_text().strip()
            if cached_1psidts:
                jar = Cookies(extra_cookies)
                jar.set("__Secure-1PSID", psid, domain=".google.com")
                jar.set("__Secure-1PSIDTS", cached_1psidts, domain=".google.com")
                jars.append((jar, "Cache"))
                tried_psid_ts.add((psid, cached_1psidts))

    # Phase 2: Base Cookies
    if base_psid and base_psidts:
        if (base_psid, base_psidts) not in tried_psid_ts:
            jar = Cookies(extra_cookies)
            jar.update(base_cookies)
            jars.append((jar, "Base Cookies"))
            tried_psid_ts.add((base_psid, base_psidts))
        elif verbose:
            logger.debug("Skipping base cookies as they match cached cookies.")
    elif verbose and not jars:
        logger.debug(
            "Skipping loading base cookies. Either __Secure-1PSID or __Secure-1PSIDTS is not provided."
        )

    # Phase 3: Browser Cookies (best effort: any failure here is logged and ignored)
    try:
        browser_cookies = load_browser_cookies(
            domain_name="google.com", verbose=verbose
        )
        if browser_cookies:
            for browser, cookies in browser_cookies.items():
                if secure_1psid := cookies.get("__Secure-1PSID"):
                    # When the caller pinned a PSID, ignore browser sessions of other accounts.
                    if base_psid and base_psid != secure_1psid:
                        if verbose:
                            logger.debug(
                                f"Skipping loading local browser cookies from {browser}. "
                                "__Secure-1PSID does not match the one provided."
                            )
                        continue

                    secure_1psidts = cookies.get("__Secure-1PSIDTS")
                    if (secure_1psid, secure_1psidts or "") in tried_psid_ts:
                        continue

                    local_cookies = {"__Secure-1PSID": secure_1psid}
                    if secure_1psidts:
                        local_cookies["__Secure-1PSIDTS"] = secure_1psidts
                    if nid := cookies.get("NID"):
                        local_cookies["NID"] = nid

                    jar = Cookies(extra_cookies)
                    for k, v in local_cookies.items():
                        jar.set(k, v, domain=".google.com")
                    jars.append((jar, f"Browser ({browser})"))
                    tried_psid_ts.add((secure_1psid, secure_1psidts or ""))
                    if verbose:
                        logger.debug(f"Prepared local browser cookies from {browser}")

        if (
            HAS_BC3
            and not any(group.startswith("Browser") for _, group in jars)
            and verbose
        ):
            logger.debug(
                "Skipping loading local browser cookies. Login to gemini.google.com in your browser first."
            )
    except Exception:
        if verbose:
            logger.debug(
                "Skipping loading local browser cookies (Not available or no permission)."
            )

    return jars


async def get_access_token(
    base_cookies: dict | Cookies,
    proxy: str | None = None,
    verbose: bool = False,
    verify: bool = True,
) -> tuple[str | None, str | None, str | None, AsyncSession]:
    """
    Send a get request to gemini.google.com for each group of available cookies and return
    the value of "SNlM0e" as access token on the first successful request.

    An attempt counts as successful when the response text contains at least one
    of the "SNlM0e", "cfb2h" or "FdrFJe" values.

    Returns a tuple ``(SNlM0e, cfb2h, FdrFJe, client)`` where each of the first
    three items is the matched string or ``None``, and ``client`` is the **live**
    AsyncSession that succeeded, so the caller can reuse the same TLS connection
    for subsequent requests. The caller becomes responsible for closing it.

    Raises:
        AuthError: if no group of cookies yields any of the values above.
        Exception: any error from the preflight request or from preparing the
            cookie groups is re-raised.

    On every failure path the session is closed before the exception propagates.
    """
    client = AsyncSession(
        impersonate="chrome", proxy=proxy, allow_redirects=True, verify=verify
    )

    # Preflight request and candidate preparation share a single cleanup handler so
    # that a failure in either one (e.g. an unreadable cache file) cannot leak the session.
    try:
        response = await client.get(Endpoint.GOOGLE)
        if verbose:
            logger.debug(
                f"HTTP Request: GET {Endpoint.GOOGLE} [{response.status_code}]"
            )
        preflight_cookies = Cookies(client.cookies)

        # Cookies picked up by the preflight request are only reused when it returned 200.
        extra_cookies = preflight_cookies if response.status_code == 200 else Cookies()

        cookie_jars_to_test = _collect_cookie_jars(
            base_cookies, extra_cookies, verbose=verbose
        )
    except Exception:
        await client.close()
        raise

    for current_attempt, (jar, group_name) in enumerate(cookie_jars_to_test, start=1):
        try:
            res = await send_request(client, jar, verbose=verbose)
            page = res.text
            snlm0e = re.search(r'"SNlM0e":\s*"(.*?)"', page)
            cfb2h = re.search(r'"cfb2h":\s*"(.*?)"', page)
            fdrfje = re.search(r'"FdrFJe":\s*"(.*?)"', page)
        except Exception:
            # A failing candidate is expected (e.g. expired cookies); move on to the next one.
            if verbose:
                logger.debug(
                    f"Init attempt ({current_attempt}) from {group_name} failed."
                )
            continue

        if not (snlm0e or cfb2h or fdrfje):
            # The request itself went through but the page carries none of the tokens.
            if verbose:
                logger.debug(
                    f"Init attempt ({current_attempt}) from {group_name} failed. "
                    "No access token found in response."
                )
            continue

        if verbose:
            logger.debug(
                f"Init attempt ({current_attempt}) from {group_name} succeeded."
            )
        return (
            snlm0e.group(1) if snlm0e else None,
            cfb2h.group(1) if cfb2h else None,
            fdrfje.group(1) if fdrfje else None,
            client,
        )

    # Every candidate was tried without success: release the session before reporting.
    await client.close()
    attempts = len(cookie_jars_to_test)
    raise AuthError(
        f"Failed to initialize client after {attempts} attempts. SECURE_1PSIDTS could get expired frequently, please make sure cookie values are up to date."
    )