File size: 8,005 Bytes
e816bb2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
import os
import re
from pathlib import Path

from curl_cffi.requests import AsyncSession, Cookies, Response

from .load_browser_cookies import HAS_BC3, load_browser_cookies
from .logger import logger
from .rotate_1psidts import _extract_cookie_value
from ..constants import Endpoint, Headers
from ..exceptions import AuthError


async def send_request(
    client: AsyncSession, cookies: dict | Cookies, verbose: bool = False
) -> Response:
    """
    Perform the init GET request on a shared session with exactly the given cookies.

    The session's cookie store is emptied first, so only `cookies` are sent.
    A `Cookies` object is merged in as-is; entries of a plain mapping are each
    set for the ".google.com" domain.

    Returns the response of `GET Endpoint.INIT`. Any exception raised by the
    request itself or by `response.raise_for_status()` propagates to the caller.
    """
    # Start from a clean slate: the session is reused across cookie candidates.
    client.cookies.clear()
    if not isinstance(cookies, Cookies):
        for name, value in cookies.items():
            client.cookies.set(name, value, domain=".google.com")
    else:
        client.cookies.update(cookies)

    response = await client.get(Endpoint.INIT, headers=Headers.GEMINI.value)
    if verbose:
        logger.debug(f"HTTP Request: GET {Endpoint.INIT} [{response.status_code}]")

    # Checked after logging so the status code is logged even for error responses.
    response.raise_for_status()
    return response


def _read_cached_1psidts(cache_file: Path, verbose: bool = False) -> str | None:
    """
    Read a cached __Secure-1PSIDTS value from `cache_file`.

    Returns the stripped file content (possibly an empty string), or None when
    the file cannot be read or decoded. The cache is only an optimisation, so a
    broken cache file must not abort authentication.
    """
    try:
        return cache_file.read_text().strip()
    except (OSError, UnicodeError) as exc:
        if verbose:
            # The file name embeds the __Secure-1PSID value, so log only the error type.
            logger.debug(
                f"Skipping loading cached cookies. Cache file is unreadable ({type(exc).__name__})."
            )
        return None


async def get_access_token(
    base_cookies: dict | Cookies,
    proxy: str | None = None,
    verbose: bool = False,
    verify: bool = True,
) -> tuple[str | None, str | None, str | None, AsyncSession]:
    """
    Try each available group of cookies against the init endpoint and return the
    tokens found in the first response that contains any of them.

    Candidate cookie groups are tried in this order:

    1. Cache: `__Secure-1PSIDTS` values stored in `.cached_1psidts_<PSID>.txt`
       files under `$GEMINI_COOKIE_PATH` (or the `temp` directory next to this
       module when the variable is unset).
    2. Base cookies: `base_cookies` as provided, when they contain both
       `__Secure-1PSID` and `__Secure-1PSIDTS`.
    3. Browser cookies: whatever `load_browser_cookies` returns, skipping
       browsers whose `__Secure-1PSID` differs from the provided one.

    Cookies collected by a preflight request to `Endpoint.GOOGLE` are merged
    into every candidate when that request returned status 200.

    Parameters:
        base_cookies: Cookies supplied by the caller (mapping or `Cookies`).
        proxy: Proxy passed to the `AsyncSession`.
        verbose: Emit debug logs describing each step.
        verify: TLS verification flag passed to the `AsyncSession`.

    Returns:
        `(SNlM0e, cfb2h, FdrFJe, client)`, where each token is None when it was
        not present in the response and `client` is the **live** AsyncSession
        that succeeded, so the caller can keep using it for later requests.
        The caller becomes responsible for closing it.

    Raises:
        AuthError: No cookie group produced a response containing a token.
        Exception: Any error raised by the preflight request propagates.

    On every failure path (including task cancellation) the session is closed
    before the exception leaves this function.
    """
    # Local import: only needed to name CancelledError in the cleanup handler.
    import asyncio

    client = AsyncSession(
        impersonate="chrome", proxy=proxy, allow_redirects=True, verify=verify
    )

    try:
        response = await client.get(Endpoint.GOOGLE)
        if verbose:
            logger.debug(
                f"HTTP Request: GET {Endpoint.GOOGLE} [{response.status_code}]"
            )
        preflight_cookies = Cookies(client.cookies)

        # Only reuse preflight cookies when the preflight itself succeeded.
        extra_cookies = Cookies()
        if response.status_code == 200:
            extra_cookies = preflight_cookies

        # Phase 1: Prepare Cache
        cookie_jars_to_test = []
        # (PSID, PSIDTS) pairs already queued, to avoid testing duplicates.
        tried_psid_ts = set()

        if isinstance(base_cookies, Cookies):
            base_psid = _extract_cookie_value(base_cookies, "__Secure-1PSID")
            base_psidts = _extract_cookie_value(base_cookies, "__Secure-1PSIDTS")
        else:
            base_psid = base_cookies.get("__Secure-1PSID")
            base_psidts = base_cookies.get("__Secure-1PSIDTS")

        gemini_cookie_path = os.getenv("GEMINI_COOKIE_PATH")
        if gemini_cookie_path:
            cache_dir = Path(gemini_cookie_path)
        else:
            cache_dir = Path(__file__).parent / "temp"

        if base_psid:
            # A PSID was provided: only its own cache file is relevant.
            cache_file = cache_dir / f".cached_1psidts_{base_psid}.txt"
            if cache_file.is_file():
                cached_1psidts = _read_cached_1psidts(cache_file, verbose)
                if cached_1psidts:
                    jar = Cookies(extra_cookies)
                    jar.update(base_cookies)
                    jar.set("__Secure-1PSIDTS", cached_1psidts, domain=".google.com")
                    cookie_jars_to_test.append((jar, "Cache"))
                    tried_psid_ts.add((base_psid, cached_1psidts))
                elif cached_1psidts is not None and verbose:
                    logger.debug(
                        "Skipping loading cached cookies. Cache file is empty."
                    )
            elif verbose:
                logger.debug("Skipping loading cached cookies. Cache file not found.")
        else:
            # No PSID provided: every cache file is a candidate, with the PSID
            # recovered from the file name.
            for cache_file in cache_dir.glob(".cached_1psidts_*.txt"):
                psid = cache_file.stem.removeprefix(".cached_1psidts_")
                cached_1psidts = _read_cached_1psidts(cache_file, verbose)
                if cached_1psidts:
                    jar = Cookies(extra_cookies)
                    jar.set("__Secure-1PSID", psid, domain=".google.com")
                    jar.set("__Secure-1PSIDTS", cached_1psidts, domain=".google.com")
                    cookie_jars_to_test.append((jar, "Cache"))
                    tried_psid_ts.add((psid, cached_1psidts))

        # Phase 2: Base Cookies
        if base_psid and base_psidts:
            if (base_psid, base_psidts) not in tried_psid_ts:
                jar = Cookies(extra_cookies)
                jar.update(base_cookies)
                cookie_jars_to_test.append((jar, "Base Cookies"))
                tried_psid_ts.add((base_psid, base_psidts))
            elif verbose:
                logger.debug("Skipping base cookies as they match cached cookies.")
        elif verbose and not cookie_jars_to_test:
            logger.debug(
                "Skipping loading base cookies. Either __Secure-1PSID or __Secure-1PSIDTS is not provided."
            )

        # Phase 3: Browser Cookies (best effort; any failure just skips this phase)
        try:
            browser_cookies = load_browser_cookies(
                domain_name="google.com", verbose=verbose
            )
            if browser_cookies:
                for browser, cookies in browser_cookies.items():
                    secure_1psid = cookies.get("__Secure-1PSID")
                    if not secure_1psid:
                        continue

                    if base_psid and base_psid != secure_1psid:
                        if verbose:
                            logger.debug(
                                f"Skipping loading local browser cookies from {browser}. "
                                "__Secure-1PSID does not match the one provided."
                            )
                        continue

                    secure_1psidts = cookies.get("__Secure-1PSIDTS")
                    pair = (secure_1psid, secure_1psidts or "")
                    if pair in tried_psid_ts:
                        continue

                    local_cookies = {"__Secure-1PSID": secure_1psid}
                    if secure_1psidts:
                        local_cookies["__Secure-1PSIDTS"] = secure_1psidts
                    if nid := cookies.get("NID"):
                        local_cookies["NID"] = nid

                    jar = Cookies(extra_cookies)
                    for k, v in local_cookies.items():
                        jar.set(k, v, domain=".google.com")

                    cookie_jars_to_test.append((jar, f"Browser ({browser})"))
                    tried_psid_ts.add(pair)
                    if verbose:
                        logger.debug(f"Prepared local browser cookies from {browser}")

            if (
                HAS_BC3
                and not any(
                    group.startswith("Browser") for _, group in cookie_jars_to_test
                )
                and verbose
            ):
                logger.debug(
                    "Skipping loading local browser cookies. Login to gemini.google.com in your browser first."
                )
        except Exception:
            if verbose:
                logger.debug(
                    "Skipping loading local browser cookies (Not available or no permission)."
                )

        # Phase 4: try every candidate in order; the first response with a token wins.
        for attempt, (jar, group_name) in enumerate(cookie_jars_to_test, start=1):
            try:
                res = await send_request(client, jar, verbose=verbose)
                snlm0e = re.search(r'"SNlM0e":\s*"(.*?)"', res.text)
                cfb2h = re.search(r'"cfb2h":\s*"(.*?)"', res.text)
                fdrfje = re.search(r'"FdrFJe":\s*"(.*?)"', res.text)
            except Exception:
                # A failing candidate must not prevent the remaining ones from being tried.
                if verbose:
                    logger.debug(f"Init attempt ({attempt}) from {group_name} failed.")
                continue

            if snlm0e or cfb2h or fdrfje:
                if verbose:
                    logger.debug(
                        f"Init attempt ({attempt}) from {group_name} succeeded."
                    )
                # Success: hand the still-open session over to the caller.
                return (
                    snlm0e.group(1) if snlm0e else None,
                    cfb2h.group(1) if cfb2h else None,
                    fdrfje.group(1) if fdrfje else None,
                    client,
                )

            if verbose:
                logger.debug(
                    f"Init attempt ({attempt}) from {group_name} failed. "
                    "No access token found in the response."
                )

        # Every candidate was tried, so the attempt count equals the candidate count.
        raise AuthError(
            f"Failed to initialize client after {len(cookie_jars_to_test)} attempts. SECURE_1PSIDTS could get expired frequently, please make sure cookie values are up to date."
        )
    except (Exception, asyncio.CancelledError):
        # Single cleanup point: the session must not outlive a failed or
        # cancelled initialisation.
        await client.close()
        raise