# XiaoBai1221's picture
# Bloom Ware 最終修復版本 (Spaces 兼容) -- "Bloom Ware final fixed version (Spaces compatible)"
# 3f0377e
import logging
import os
import secrets
import time
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any

from fastapi import HTTPException, Depends, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext

# Centralized configuration management
from core.config import settings
# JWT configuration.
# Signing key: taken from settings.JWT_SECRET_KEY; when that value is falsy, a
# random key is generated at import time with secrets.token_urlsafe(32).
# NOTE(review): a generated key is different on every import, so tokens signed
# with it presumably cannot be verified by another process or after a restart.
# Confirm that JWT_SECRET_KEY is always set in deployment.
SECRET_KEY = settings.JWT_SECRET_KEY or secrets.token_urlsafe(32)
# Algorithm name that JWTAuth passes to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Default access-token lifetime in minutes, read from settings.
ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES
# Password hashing configuration (passlib context using the bcrypt scheme).
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# Security scheme for bearer credentials.
# NOTE(review): auto_error=False presumably makes a missing/invalid header
# resolve to None instead of an automatic error response; verify against the
# FastAPI HTTPBearer documentation.
security = HTTPBearer(auto_error=False)
class JWTAuth:
    """Issue and verify JWT access tokens, and hash/verify passwords.

    The signing key, algorithm and default token lifetime are copied from the
    module-level ``SECRET_KEY``, ``ALGORITHM`` and
    ``ACCESS_TOKEN_EXPIRE_MINUTES`` values when an instance is constructed.
    """

    # Single logger shared by all methods instead of being re-created (and
    # ``logging`` re-imported) inside every failure branch.
    _logger = logging.getLogger("core.auth.jwt")

    def __init__(self):
        """Capture the module-level JWT settings on the instance."""
        self.secret_key = SECRET_KEY
        self.algorithm = ALGORITHM
        self.access_token_expire_minutes = ACCESS_TOKEN_EXPIRE_MINUTES

    def create_access_token(self, data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
        """Create a signed access token.

        Args:
            data: Claims to embed in the token. The dict is shallow-copied, so
                the caller's dict is not modified.
            expires_delta: Token lifetime. ``None`` selects the default of
                ``self.access_token_expire_minutes`` minutes. An explicit
                ``timedelta(0)`` is honoured; it used to fall back to the
                default silently because a zero timedelta is falsy.

        Returns:
            The encoded JWT produced by ``jwt.encode``.
        """
        to_encode = data.copy()
        # Take one timezone-aware timestamp so "iat" and "exp" are derived from
        # the same instant (and to avoid the deprecated datetime.utcnow()).
        issued_at = datetime.now(timezone.utc)
        if expires_delta is None:
            expires_delta = timedelta(minutes=self.access_token_expire_minutes)
        to_encode.update({"exp": issued_at + expires_delta, "iat": issued_at})
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Decode and validate a JWT.

        Args:
            token: The encoded JWT string.

        Returns:
            The decoded payload, or ``None`` when ``jwt.decode`` raises
            ``JWTError`` or when the payload's ``exp`` claim lies in the past.
            Both failure cases are logged at warning level.
        """
        # Only the decode call can raise JWTError, so keep the try body minimal.
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except JWTError as e:
            self._logger.warning(f"❌ JWT 驗證失敗: {e}")
            return None

        # Explicit expiry check against the wall clock. NOTE(review): the JWT
        # library is expected to reject expired tokens during decode already;
        # this is kept as a second guard, as in the original implementation.
        exp = payload.get("exp")
        if exp:
            current_time = time.time()
            if current_time > exp:
                self._logger.warning(f"❌ Token 已過期: exp={exp}, current={current_time}, 差距={current_time - exp}秒")
                return None
        return payload

    def get_current_user(self, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Optional[Dict[str, Any]]:
        """Return the current user's JWT payload from bearer credentials.

        Args:
            credentials: Credentials resolved through the ``security`` scheme;
                may be ``None``.

        Returns:
            The verified token payload, or ``None`` when no credentials were
            supplied, verification failed, or the payload is empty.
        """
        if not credentials:
            return None
        # An empty payload is reported as None, exactly like a failed check.
        return self.verify_token(credentials.credentials) or None

    def hash_password(self, password: str) -> str:
        """Hash a plain-text password with the module-level ``pwd_context``."""
        return pwd_context.hash(password)

    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """Check a plain-text password against a stored hash via ``pwd_context``."""
        return pwd_context.verify(plain_password, hashed_password)
# Module-level JWTAuth instance; get_current_user_optional verifies tokens through it.
jwt_auth = JWTAuth()
def get_current_user_optional(request: Request) -> Optional[Dict[str, Any]]:
    """Optional user authentication that never raises an HTTP error itself.

    Reads the ``Authorization`` header, expects ``Bearer <token>`` and
    verifies the token with ``jwt_auth.verify_token``.

    Args:
        request: The incoming request whose headers are inspected.

    Returns:
        The verified token payload, or ``None`` when the header is missing,
        does not use the bearer scheme, carries no token, or the token fails
        verification.
    """
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        return None

    # Split on runs of whitespace so "Bearer   <token>" still yields the token
    # rather than an empty string.
    parts = auth_header.split()
    # HTTP auth scheme names are case-insensitive, so accept "bearer"/"BEARER"
    # as well; this matches how the HTTPBearer scheme treats the scheme name.
    if len(parts) < 2 or parts[0].lower() != "bearer":
        return None
    return jwt_auth.verify_token(parts[1])
def require_auth(user: Optional[Dict[str, Any]] = Depends(get_current_user_optional)):
    """Dependency that requires an authenticated user.

    Args:
        user: Payload resolved by ``get_current_user_optional``; ``None`` (or
            an empty payload) means the request is not authenticated.

    Returns:
        The authenticated user's token payload.

    Raises:
        HTTPException: Status 401 when ``user`` is falsy. The response carries
            a ``WWW-Authenticate: Bearer`` challenge, which HTTP requires on
            401 responses so clients know which scheme to use.
    """
    if not user:
        raise HTTPException(
            status_code=401,
            detail="認證失敗",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user