File size: 3,895 Bytes
37f768f
bac337d
 
37f768f
 
 
 
 
bac337d
 
 
37f768f
 
1fdc3b2
37f768f
bac337d
37f768f
 
bac337d
93f1280
37f768f
bac337d
37f768f
 
 
 
cd8d49f
bac337d
 
 
 
37f768f
 
 
 
 
 
 
 
 
 
 
 
bac337d
37f768f
 
 
 
 
bac337d
37f768f
 
93f1280
bac337d
37f768f
bac337d
 
37f768f
bac337d
 
 
37f768f
 
bac337d
93f1280
37f768f
 
 
 
bac337d
 
 
93f1280
bac337d
 
 
 
 
93f1280
bac337d
 
93f1280
37f768f
 
 
 
bac337d
37f768f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bac337d
37f768f
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import os
from urllib.parse import unquote

from google_auth_oauthlib.flow import Flow
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from dotenv import load_dotenv

# Google may grant extra scopes (e.g. userinfo.profile with openid); relax strict checks.
# setdefault only applies when the variable is not already set, so an explicit
# value in the process environment takes precedence.
os.environ.setdefault("OAUTHLIB_RELAX_TOKEN_SCOPE", "1")

# Scopes requested when building the OAuth flow; also the fallback scope list
# when a stored token dict / credentials object carries none.
SCOPES = [
    "https://www.googleapis.com/auth/drive.readonly",
    "https://www.googleapis.com/auth/calendar.events",
    "https://www.googleapis.com/auth/userinfo.email",
    "https://www.googleapis.com/auth/userinfo.profile",
    "openid",
]

# Maps a normalized OAuth `state` value to the PKCE code verifier of the flow
# that produced the auth URL. Written by get_auth_url() and consumed (popped)
# by exchange_code_for_token(). This is a plain module-level dict, so entries
# exist only inside the current Python process.
oauth_pkce_store: dict[str, str] = {}
# Must run before the os.getenv() reads below.
# NOTE(review): presumably loads variables from a local .env file — confirm
# against the python-dotenv configuration used by this project.
load_dotenv()

# OAuth client settings read from the environment; each falls back to an
# empty string when the variable is unset.
CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID", "")
CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "")
REDIRECT_URI = os.getenv("GOOGLE_REDIRECT_URI", "")


def _normalize_state(state: str) -> str:
    return unquote(state or "").strip()


def _client_config() -> dict:
    """Return the ``{"web": {...}}`` client configuration passed to ``Flow.from_client_config``.

    The client id, client secret and redirect URI come from the module-level
    constants; the authorization and token endpoints are fixed Google URLs.
    """
    web_settings = dict(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        redirect_uris=[REDIRECT_URI],
        auth_uri="https://accounts.google.com/o/oauth2/auth",
        token_uri="https://oauth2.googleapis.com/token",
    )
    return {"web": web_settings}


def get_auth_url(state: str | None = None) -> str:
    """
    Build the Google OAuth consent-screen URL the user should be redirected to.

    `state` is optional caller context that comes back in the callback
    (e.g. user_email). It is normalized before being handed to the flow.

    Side effect: the flow's PKCE code verifier is saved in `oauth_pkce_store`
    under the normalized state reported by the flow (or, if that is empty,
    under the caller's normalized state) so that `exchange_code_for_token`
    can retrieve it later.
    """
    requested_state = _normalize_state(state or "")

    oauth_flow = Flow.from_client_config(_client_config(), scopes=SCOPES)
    oauth_flow.redirect_uri = REDIRECT_URI

    consent_params = {
        "access_type": "offline",
        "include_granted_scopes": "true",
        "prompt": "consent",
        "state": requested_state,
    }
    consent_url, issued_state = oauth_flow.authorization_url(**consent_params)

    # Key the verifier by the state that will actually appear in the callback.
    pkce_key = _normalize_state(issued_state or requested_state)
    oauth_pkce_store[pkce_key] = oauth_flow.code_verifier
    print(">>> Stored PKCE verifier for state:", pkce_key)
    return consent_url


def exchange_code_for_token(code: str, state: str) -> dict:
    """
    Trade the authorization code from the OAuth callback for credentials.

    The PKCE code verifier saved by `get_auth_url` is looked up by the
    normalized `state` and removed from `oauth_pkce_store` in the same step,
    so each verifier can be used at most once.

    Returns a JSON-serialisable token dict (see `_creds_to_dict`).
    Raises ValueError when no verifier is stored for `state`.
    """
    pkce_key = _normalize_state(state)
    verifier = oauth_pkce_store.pop(pkce_key, None)
    has_verifier = bool(verifier)
    print(">>> Retrieved PKCE verifier for state:", pkce_key, "found:", has_verifier)

    if not has_verifier:
        raise ValueError(
            "No PKCE code verifier found for this sign-in. "
            "Request a new auth link and complete it on the same server instance."
        )

    # Rebuild an equivalent flow and attach the verifier from the first leg.
    oauth_flow = Flow.from_client_config(_client_config(), scopes=SCOPES)
    oauth_flow.redirect_uri = REDIRECT_URI
    oauth_flow.code_verifier = verifier
    oauth_flow.fetch_token(code=code)
    return _creds_to_dict(oauth_flow.credentials)


def credentials_from_token_dict(token_dict: dict) -> Credentials:
    """
    Re-hydrate a Credentials object from a stored token dict, refreshing it
    when the access token has expired and a refresh token is available.

    Args:
        token_dict: Mapping in the shape produced by `_creds_to_dict`. The
            keys read here are "token", "refresh_token", "scopes" (falls back
            to SCOPES when absent) and "expiry" (ISO-8601 string or datetime;
            optional).

    Returns:
        The rebuilt Credentials. The token URI, client id and client secret
        are taken from this module's configuration, not from `token_dict`.

    Note:
        The stored expiry has to be passed to Credentials: without it the
        object has no expiry, `creds.expired` is always False and the refresh
        below would never run. When "expiry" is missing or unparseable the
        expiry stays unknown and no refresh is attempted here.
    """
    creds = Credentials(
        token=token_dict.get("token"),
        refresh_token=token_dict.get("refresh_token"),
        token_uri="https://oauth2.googleapis.com/token",
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scopes=token_dict.get("scopes", SCOPES),
        expiry=_parse_expiry(token_dict.get("expiry")),
    )
    if creds.expired and creds.refresh_token:
        creds.refresh(Request())
    return creds


def _parse_expiry(value):
    """Convert a stored expiry value into a naive UTC datetime, or None.

    Accepts a datetime or an ISO-8601 string (as written by
    `datetime.isoformat()`). Timezone-aware values are converted to UTC and
    stripped of tzinfo, because google-auth compares the expiry against a
    naive UTC "now". Empty or unparseable values give None (expiry unknown).
    """
    from datetime import datetime, timezone

    if not value:
        return None

    if isinstance(value, datetime):
        parsed = value
    else:
        text = str(value).strip()
        # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
        if text.endswith(("Z", "z")):
            text = text[:-1] + "+00:00"
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            # Malformed stored data: treat the expiry as unknown, not fatal.
            return None

    if parsed.tzinfo is not None:
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


def _creds_to_dict(creds: Credentials) -> dict:
    """Flatten *creds* into a JSON-serialisable dict.

    Copies the token, refresh token, token URI, client id and client secret
    as-is, stores the scopes as a list (SCOPES when the credentials carry
    none) and the expiry as an ISO-8601 string (None when there is no expiry).
    """
    copied_fields = ("token", "refresh_token", "token_uri", "client_id", "client_secret")
    serialised = {name: getattr(creds, name) for name in copied_fields}

    serialised["scopes"] = list(creds.scopes or SCOPES)

    expiry = creds.expiry
    serialised["expiry"] = None if not expiry else expiry.isoformat()
    return serialised