Spaces:
Running
Running
File size: 7,062 Bytes
3f0377e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 |
"""
Google OAuth 2.0 Authorization Code Flow with PKCE
實現安全的Google登入流程,包含PKCE保護機制
"""
import os
import secrets
import hashlib
import base64
import json
import logging
from typing import Optional, Dict, Any
from urllib.parse import urlencode, urlparse, parse_qs
import httpx
from fastapi import HTTPException
from google.oauth2.credentials import Credentials
from google.auth.transport.requests import Request as GoogleRequest
# 統一配置管理
from core.config import settings
# Module-level logger shared by everything in this file. It is registered
# under the fixed name "GoogleOAuth" rather than the module's __name__.
logger = logging.getLogger("GoogleOAuth")
class GoogleOAuthManager:
    """Google OAuth 2.0 manager implementing the Authorization Code Flow with PKCE.

    The manager builds the authorization URL, exchanges authorization codes
    for tokens, fetches the user profile, refreshes and revokes tokens, and
    validates the ``state`` parameter used for CSRF protection.
    """

    def __init__(self):
        """Load client credentials from ``settings`` and set Google endpoints.

        Raises:
            ValueError: If the client ID or client secret is not configured.
        """
        self.client_id = settings.GOOGLE_CLIENT_ID
        self.client_secret = settings.GOOGLE_CLIENT_SECRET
        # Defaults to the configured value; the attribute may be overridden
        # by callers that need a different redirect URI.
        self.redirect_uri = settings.GOOGLE_REDIRECT_URI

        if not self.client_id or not self.client_secret:
            raise ValueError("GOOGLE_CLIENT_ID 和 GOOGLE_CLIENT_SECRET 環境變數必須設置")

        # Google OAuth 2.0 endpoints.
        self.auth_url = "https://accounts.google.com/o/oauth2/v2/auth"
        self.token_url = "https://oauth2.googleapis.com/token"
        self.userinfo_url = "https://www.googleapis.com/oauth2/v2/userinfo"
        self.revoke_url = "https://oauth2.googleapis.com/revoke"

    def generate_pkce_pair(self) -> Dict[str, str]:
        """Generate a PKCE ``code_verifier`` and its S256 ``code_challenge``.

        Returns:
            A dict with the keys ``"code_verifier"`` and ``"code_challenge"``.
            The challenge is the base64url-encoded SHA-256 digest of the
            verifier with the ``=`` padding stripped.
        """
        # token_urlsafe(64) yields 86 URL-safe characters, inside the
        # 43-128 character range allowed for a code_verifier.
        code_verifier = secrets.token_urlsafe(64)
        digest = hashlib.sha256(code_verifier.encode()).digest()
        code_challenge = base64.urlsafe_b64encode(digest).decode().rstrip("=")
        return {
            "code_verifier": code_verifier,
            "code_challenge": code_challenge,
        }

    def get_authorization_url(
        self,
        state: Optional[str] = None,
        code_challenge: Optional[str] = None,
    ) -> str:
        """Build the Google authorization URL.

        Args:
            state: Opaque CSRF token to round-trip through Google. Omitted
                from the URL when empty or ``None``.
            code_challenge: PKCE challenge matching a ``code_verifier`` the
                caller has stored. Callers should always pass this.

        Returns:
            The full authorization URL including the query string.
        """
        if not code_challenge:
            # The verifier of a pair generated here cannot be returned through
            # this method's ``str`` result, so the subsequent token exchange
            # cannot supply it. Keep the historical behaviour but make the
            # problem visible instead of failing silently later.
            logger.warning(
                "get_authorization_url generated a PKCE pair internally; its "
                "code_verifier is discarded, so the returned authorization "
                "code cannot be exchanged. Pass code_challenge explicitly."
            )
            code_challenge = self.generate_pkce_pair()["code_challenge"]

        params = {
            "client_id": self.client_id,
            "redirect_uri": self.redirect_uri,
            "scope": "openid email profile",
            "response_type": "code",
            "access_type": "offline",  # request a refresh token
            "prompt": "consent",  # always show the consent screen
            "code_challenge": code_challenge,
            "code_challenge_method": "S256",
        }
        if state:
            params["state"] = state

        return f"{self.auth_url}?{urlencode(params)}"

    async def exchange_code_for_tokens(self, code: str, code_verifier: str) -> Dict[str, Any]:
        """Exchange an authorization code for tokens.

        Args:
            code: Authorization code received on the redirect URI.
            code_verifier: PKCE verifier matching the challenge sent in the
                authorization request.

        Returns:
            The decoded JSON body of the token endpoint response.

        Raises:
            HTTPException: With status 400 when the token endpoint returns an
                error status or the HTTP request itself fails.
        """
        data = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "code": code,
            "grant_type": "authorization_code",
            "redirect_uri": self.redirect_uri,
            "code_verifier": code_verifier,
        }
        logger.info("🔑 Token 交換請求: redirect_uri=%s", self.redirect_uri)

        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    self.token_url,
                    data=data,
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                )
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                # Log the detailed error body returned by Google.
                error_detail = e.response.text
                logger.error(
                    "❌ Google Token 交換失敗: status=%s, detail=%s",
                    e.response.status_code,
                    error_detail,
                )
                raise HTTPException(
                    status_code=400,
                    detail=f"Token exchange failed: {error_detail}",
                ) from e
            except httpx.HTTPError as e:
                logger.error("❌ HTTP 錯誤: %s", str(e))
                raise HTTPException(
                    status_code=400,
                    detail=f"Token exchange failed: {str(e)}",
                ) from e

    async def get_user_info(self, access_token: str) -> Dict[str, Any]:
        """Fetch the user's profile from the userinfo endpoint.

        Args:
            access_token: Access token sent as a Bearer credential.

        Returns:
            The decoded JSON body of the userinfo response.

        Raises:
            HTTPException: With status 400 when the request fails or the
                endpoint returns an error status.
        """
        headers = {"Authorization": f"Bearer {access_token}"}

        async with httpx.AsyncClient() as client:
            try:
                response = await client.get(self.userinfo_url, headers=headers)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPError as e:
                raise HTTPException(
                    status_code=400,
                    detail=f"Failed to get user info: {str(e)}",
                ) from e

    async def refresh_access_token(self, refresh_token: str) -> Dict[str, Any]:
        """Obtain a new access token using a refresh token.

        Args:
            refresh_token: Refresh token previously issued by Google.

        Returns:
            The decoded JSON body of the token endpoint response.

        Raises:
            HTTPException: With status 400 when the request fails or the
                endpoint returns an error status.
        """
        data = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        }

        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    self.token_url,
                    data=data,
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                )
                response.raise_for_status()
                return response.json()
            except httpx.HTTPError as e:
                raise HTTPException(
                    status_code=400,
                    detail=f"Token refresh failed: {str(e)}",
                ) from e

    async def revoke_token(self, token: str) -> bool:
        """Revoke a token at the revocation endpoint (best effort).

        Args:
            token: The token to revoke.

        Returns:
            ``True`` when the endpoint answers with HTTP 200, ``False`` for
            any other status or when the request itself fails.
        """
        data = {
            "token": token,
            "client_id": self.client_id,
            "client_secret": self.client_secret,
        }

        async with httpx.AsyncClient() as client:
            try:
                response = await client.post(
                    self.revoke_url,
                    data=data,
                    headers={"Content-Type": "application/x-www-form-urlencoded"},
                )
                return response.status_code == 200
            except httpx.HTTPError as e:
                # Revocation stays best-effort, but leave a trace of the failure.
                logger.warning("Token revocation request failed: %s", e)
                return False

    def validate_state(self, received_state: Optional[str], expected_state: Optional[str]) -> bool:
        """Validate the ``state`` parameter to prevent CSRF attacks.

        Args:
            received_state: Value returned on the OAuth callback.
            expected_state: Value stored when the flow was started.

        Returns:
            ``True`` only when both values are non-empty and equal.
        """
        # A missing or empty state must never validate; with a plain ``==``
        # a callback without state would match a session without state.
        if not received_state or not expected_state:
            return False
        # Constant-time comparison. Encode first because compare_digest only
        # accepts ASCII-only ``str`` arguments.
        return secrets.compare_digest(
            received_state.encode("utf-8"),
            expected_state.encode("utf-8"),
        )

    def create_credentials(self, token_data: Dict[str, Any]) -> "Credentials":
        """Create a Google ``Credentials`` object from a token response.

        Args:
            token_data: Mapping read for the ``"access_token"`` and
                ``"refresh_token"`` keys; missing keys become ``None``.

        Returns:
            A ``Credentials`` instance configured with this manager's token
            URI, client ID, client secret and the openid/email/profile scopes.
        """
        return Credentials(
            token=token_data.get("access_token"),
            refresh_token=token_data.get("refresh_token"),
            token_uri=self.token_url,
            client_id=self.client_id,
            client_secret=self.client_secret,
            scopes=["openid", "email", "profile"],
        )
# Global OAuth manager instance, created when this module is imported.
# NOTE: the constructor raises ValueError when the Google client ID or
# client secret is not configured, so the import fails in that case.
oauth_manager = GoogleOAuthManager()