Spaces:
Running
Running
| """ | |
| Google OAuth 2.0 Authorization Code Flow with PKCE | |
| Implements a secure Google sign-in flow, including PKCE protection. | |
| """ | |
| import os | |
| import secrets | |
| import hashlib | |
| import base64 | |
| import json | |
| import logging | |
| from typing import Optional, Dict, Any | |
| from urllib.parse import urlencode, urlparse, parse_qs | |
| import httpx | |
| from fastapi import HTTPException | |
| from google.oauth2.credentials import Credentials | |
| from google.auth.transport.requests import Request as GoogleRequest | |
| # Centralized configuration management | |
| from core.config import settings | |
| # Logger for this module, registered under the fixed name "GoogleOAuth". | |
| logger = logging.getLogger("GoogleOAuth") | |
class GoogleOAuthManager:
    """Google OAuth 2.0 manager implementing the Authorization Code Flow with PKCE.

    The manager builds the authorization URL, exchanges the authorization
    code for tokens, fetches the user profile, refreshes and revokes tokens,
    and validates the ``state`` parameter used for CSRF protection.
    """

    def __init__(self) -> None:
        """Load the client credentials from ``settings`` and set the Google endpoints.

        Raises:
            ValueError: If the client ID or the client secret is missing or empty.
        """
        self.client_id = settings.GOOGLE_CLIENT_ID
        self.client_secret = settings.GOOGLE_CLIENT_SECRET
        # Defaults to the configured value; callers may overwrite this
        # attribute at runtime when a different redirect URI is required.
        self.redirect_uri = settings.GOOGLE_REDIRECT_URI

        if not self.client_id or not self.client_secret:
            raise ValueError("GOOGLE_CLIENT_ID 和 GOOGLE_CLIENT_SECRET 環境變數必須設置")

        # Google OAuth 2.0 endpoints
        self.auth_url = "https://accounts.google.com/o/oauth2/v2/auth"
        self.token_url = "https://oauth2.googleapis.com/token"
        self.userinfo_url = "https://www.googleapis.com/oauth2/v2/userinfo"
        self.revoke_url = "https://oauth2.googleapis.com/revoke"

    def generate_pkce_pair(self) -> Dict[str, str]:
        """Generate a PKCE ``code_verifier`` and its S256 ``code_challenge``.

        Returns:
            A dict with the keys ``"code_verifier"`` and ``"code_challenge"``.
            The challenge is the unpadded base64url encoding of the SHA-256
            digest of the verifier.
        """
        # 64 random bytes encode to 86 URL-safe characters, which is inside
        # the 43-128 character range allowed for a code_verifier.
        code_verifier = secrets.token_urlsafe(64)

        digest = hashlib.sha256(code_verifier.encode()).digest()
        code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=")

        return {
            "code_verifier": code_verifier,
            "code_challenge": code_challenge,
        }

    def get_authorization_url(
        self,
        state: Optional[str] = None,
        code_challenge: Optional[str] = None,
    ) -> str:
        """Build the Google authorization URL.

        Args:
            state: Opaque anti-CSRF value; added to the URL only when truthy.
            code_challenge: PKCE challenge matching a ``code_verifier`` that
                the caller has stored. Callers should always supply it.

        Returns:
            The authorization URL including the encoded query string.

        Note:
            When ``code_challenge`` is omitted, a fresh PKCE pair is generated
            and only the challenge is used. The matching verifier is not
            returned or stored, so the later token exchange is expected to
            fail PKCE verification. This fallback is kept for backward
            compatibility and logs a warning.
        """
        if not code_challenge:
            logger.warning(
                "get_authorization_url called without code_challenge; the "
                "generated code_verifier is discarded, so the token exchange "
                "for this authorization request is expected to fail"
            )
            code_challenge = self.generate_pkce_pair()["code_challenge"]

        params = {
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "scope": "openid email profile",
            "response_type": "code",
            "access_type": "offline",  # request a refresh token
            "prompt": "consent",  # always show the consent screen
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
        }

        if state:
            params["state"] = state

        return f"{self.auth_url}?{urlencode(params)}"

    async def exchange_code_for_tokens(self, code: str, code_verifier: str) -> Dict[str, Any]:
        """Exchange an authorization code for tokens.

        Args:
            code: Authorization code received on the redirect URI.
            code_verifier: PKCE verifier matching the challenge that was sent
                in the authorization request.

        Returns:
            The decoded JSON body of the token endpoint response.

        Raises:
            HTTPException: With status 400 when the token endpoint returns an
                error status or the HTTP request itself fails.
        """
        data = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "code": code,
            "grant_type": "authorization_code",
            "redirect_uri": self.redirect_uri,
            "code_verifier": code_verifier,
        }

        logger.info("🔑 Token 交換請求: redirect_uri=%s", self.redirect_uri)

        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    self.token_url,
                    data=data,
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                )
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                # Log the detailed error body returned by Google.
                error_detail = e.response.text
                logger.error(
                    "❌ Google Token 交換失敗: status=%s, detail=%s",
                    e.response.status_code,
                    error_detail,
                )
                raise HTTPException(
                    status_code=400,
                    detail=f"Token exchange failed: {error_detail}",
                ) from e
            except httpx.HTTPError as e:
                logger.error("❌ HTTP 錯誤: %s", str(e))
                raise HTTPException(
                    status_code=400,
                    detail=f"Token exchange failed: {str(e)}",
                ) from e

    async def get_user_info(self, access_token: str) -> Dict[str, Any]:
        """Fetch the profile of the user that owns ``access_token``.

        Args:
            access_token: OAuth access token sent as a Bearer credential.

        Returns:
            The decoded JSON body of the userinfo endpoint response.

        Raises:
            HTTPException: With status 400 when the request fails or the
                endpoint returns an error status.
        """
        headers = {"Authorization": f"Bearer {access_token}"}

        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(self.userinfo_url, headers=headers)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPError as e:
                logger.error("Failed to get user info: %s", e)
                raise HTTPException(
                    status_code=400,
                    detail=f"Failed to get user info: {str(e)}",
                ) from e

    async def refresh_access_token(self, refresh_token: str) -> Dict[str, Any]:
        """Obtain a new access token from a refresh token.

        Args:
            refresh_token: Refresh token previously issued for this client.

        Returns:
            The decoded JSON body of the token endpoint response.

        Raises:
            HTTPException: With status 400 when the request fails or the
                endpoint returns an error status.
        """
        data = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        }

        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    self.token_url,
                    data=data,
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                )
                response.raise_for_status()
                return response.json()
            except httpx.HTTPError as e:
                logger.error("Token refresh failed: %s", e)
                raise HTTPException(
                    status_code=400,
                    detail=f"Token refresh failed: {str(e)}",
                ) from e

    async def revoke_token(self, token: str) -> bool:
        """Revoke an access or refresh token (best effort).

        Args:
            token: The token to revoke.

        Returns:
            True when the revoke endpoint answers with HTTP 200, False for
            any other status or when the HTTP request fails.
        """
        data = {
            "token": token,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }

        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    self.revoke_url,
                    data=data,
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                )
                return response.status_code == 200
            except httpx.HTTPError as e:
                # Deliberately best effort: report failure instead of raising.
                logger.warning("Token revocation request failed: %s", e)
                return False

    def validate_state(self, received_state: Optional[str], expected_state: Optional[str]) -> bool:
        """Validate the ``state`` parameter to protect against CSRF.

        Args:
            received_state: Value received on the OAuth callback.
            expected_state: Value issued when the flow was started.

        Returns:
            True only when both values are non-empty strings and are equal.
            A missing or empty value on either side is rejected, so a
            callback cannot pass the check when no state was ever issued.
        """
        if not isinstance(received_state, str) or not isinstance(expected_state, str):
            return False
        if not received_state or not expected_state:
            return False
        # Constant-time comparison; bytes are used because compare_digest
        # only accepts ASCII-only str arguments.
        return secrets.compare_digest(
            received_state.encode("utf-8"),
            expected_state.encode("utf-8"),
        )

    def create_credentials(self, token_data: Dict[str, Any]) -> "Credentials":
        """Build a Google ``Credentials`` object from a token response.

        Args:
            token_data: Mapping read with ``.get`` for ``"access_token"`` and
                ``"refresh_token"``; absent keys are passed on as ``None``.

        Returns:
            A ``Credentials`` instance bound to this client's ID, secret and
            token endpoint, with the ``openid``, ``email`` and ``profile``
            scopes.
        """
        return Credentials(
            token=token_data.get("access_token"),
            refresh_token=token_data.get("refresh_token"),
            token_uri=self.token_url,
            client_id=self.client_id,
            client_secret=self.client_secret,
            scopes=["openid", "email", "profile"],
        )
| # Global OAuth manager instance, created when this module is imported. | |
| # NOTE: GoogleOAuthManager() raises ValueError if the client ID or secret is unset. | |
| oauth_manager = GoogleOAuthManager() | |