Agentic_flow / google_auth_flow.py
disLodge's picture
spot fix for the missing code verifier
93f1280
import os
import json
from google_auth_oauthlib.flow import Flow
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request
from dotenv import load_dotenv
# OAuth scopes requested on the consent screen: read-only Drive access,
# the account's e-mail address, and OpenID Connect sign-in.
SCOPES = [
"https://www.googleapis.com/auth/drive.readonly",
"https://www.googleapis.com/auth/userinfo.email",
"openid",
]
# In-memory map of OAuth `state` -> PKCE code verifier. get_auth_url() writes
# an entry and exchange_code_for_token() reads it back on the callback.
# NOTE(review): this dict lives only in the current process — presumably it is
# lost on restart and not shared between workers; confirm the deployment model.
oauth_pkce_store: dict[str, str] = {}
# Load variables from a .env file before the os.getenv() reads below.
load_dotenv()
# Google OAuth client settings; each falls back to "" when the variable is unset.
CLIENT_ID = os.getenv("GOOGLE_CLIENT_ID", "")
CLIENT_SECRET = os.getenv("GOOGLE_CLIENT_SECRET", "")
REDIRECT_URI = os.getenv("GOOGLE_REDIRECT_URI", "")
def _client_config() -> dict:
    """Assemble the "web" client configuration that google_auth_oauthlib expects.

    The client id, client secret and redirect URI are taken from the
    module-level CLIENT_ID, CLIENT_SECRET and REDIRECT_URI values; the
    authorization and token endpoints are fixed Google URLs.
    """
    web_settings = {
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "redirect_uris": [REDIRECT_URI],
        "auth_uri": "https://accounts.google.com/o/oauth2/auth",
        "token_uri": "https://oauth2.googleapis.com/token",
    }
    return {"web": web_settings}
def get_auth_url(state: str | None = None) -> str:
    """
    Build the Google OAuth consent-screen URL the user should be redirected to.

    `state` is forwarded with the authorization request (an empty string is
    passed when it is None or empty); callers may use it to carry context such
    as a user e-mail back through the callback. The state returned by the flow
    becomes the key under which the flow's PKCE code verifier is kept in
    `oauth_pkce_store`.
    """
    oauth_flow = Flow.from_client_config(_client_config(), scopes=SCOPES)
    oauth_flow.redirect_uri = REDIRECT_URI

    request_options = {
        "access_type": "offline",  # get refresh_token
        "include_granted_scopes": "true",
        "prompt": "consent",  # force refresh_token every time during dev
        "state": state if state else "",
    }
    consent_url, flow_state = oauth_flow.authorization_url(**request_options)

    # Remember the verifier so the callback can complete the PKCE exchange.
    oauth_pkce_store[flow_state] = oauth_flow.code_verifier
    print(">>> Stored PKCE verifier for state:", flow_state)
    return consent_url
def exchange_code_for_token(code: str, state: str) -> dict:
    """
    Exchange an authorization code (from the OAuth callback) for credentials.

    Args:
        code: Authorization code received on the callback.
        state: The `state` value received on the callback; used to look up the
            PKCE code verifier that get_auth_url() stored for this flow.

    Returns:
        A JSON-serialisable token dict built by _creds_to_dict().

    Raises:
        Any exception raised by `Flow.fetch_token` when the exchange fails is
        propagated unchanged.
    """
    flow = Flow.from_client_config(_client_config(), scopes=SCOPES)
    flow.redirect_uri = REDIRECT_URI

    code_verifier = oauth_pkce_store.get(state)
    # The verifier is a secret: report only whether one was found, never its value.
    print(">>> Retrieved verifier:", "found" if code_verifier else "missing")
    flow.code_verifier = code_verifier

    flow.fetch_token(code=code)

    # The exchange succeeded, so this verifier will not be needed again. Drop it
    # so the store does not grow by one entry per completed login. It is kept
    # when fetch_token() raises, so a retry can still find it.
    oauth_pkce_store.pop(state, None)

    return _creds_to_dict(flow.credentials)
def _parse_stored_expiry(raw_expiry):
    """Turn a stored "expiry" value into the naive UTC datetime google-auth expects.

    Returns None when the value is missing or is not a valid ISO-8601 string,
    in which case the credentials behave as if no expiry were known.
    """
    from datetime import datetime, timezone

    if not raw_expiry:
        return None
    try:
        text = str(raw_expiry)
        # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
        if text.endswith("Z"):
            text = text[:-1] + "+00:00"
        parsed = datetime.fromisoformat(text)
    except (TypeError, ValueError):
        return None
    if parsed.tzinfo is not None:
        # google-auth compares expiry against a naive UTC "now".
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


def credentials_from_token_dict(token_dict: dict) -> Credentials:
    """
    Re-hydrate a Credentials object from a stored token dict,
    refreshing it if the access token is expired.

    Args:
        token_dict: Dict with optional "token", "refresh_token", "scopes" and
            "expiry" (ISO-8601 string) keys, e.g. as produced by
            _creds_to_dict(). Missing scopes default to SCOPES.

    Returns:
        The Credentials object, refreshed when it was expired and a refresh
        token is available.

    Raises:
        Any exception raised by `Credentials.refresh` is propagated unchanged.
    """
    creds = Credentials(
        token=token_dict.get("token"),
        refresh_token=token_dict.get("refresh_token"),
        token_uri="https://oauth2.googleapis.com/token",
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
        scopes=token_dict.get("scopes", SCOPES),
        # Without an expiry, `creds.expired` is always False and the refresh
        # below would never run.
        expiry=_parse_stored_expiry(token_dict.get("expiry")),
    )
    if creds.expired and creds.refresh_token:
        creds.refresh(Request())
    return creds
def _creds_to_dict(creds: Credentials) -> dict:
    """Flatten a Credentials object into a JSON-serialisable dict.

    Scopes fall back to the module-level SCOPES when the credentials carry
    none, and the expiry is written as an ISO-8601 string (None when unset).
    """
    scope_list = list(creds.scopes or SCOPES)

    expiry_text = None
    if creds.expiry:
        expiry_text = creds.expiry.isoformat()

    return dict(
        token=creds.token,
        refresh_token=creds.refresh_token,
        token_uri=creds.token_uri,
        client_id=creds.client_id,
        client_secret=creds.client_secret,
        scopes=scope_list,
        expiry=expiry_text,
    )