Spaces:
Running
Running
| import os | |
| import secrets | |
| from datetime import datetime, timedelta | |
| from typing import Optional, Dict, Any | |
| from jose import JWTError, jwt | |
| from passlib.context import CryptContext | |
| from fastapi import HTTPException, Depends, Request | |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
| # 統一配置管理 | |
| from core.config import settings | |
| # JWT 配置 | |
| SECRET_KEY = settings.JWT_SECRET_KEY or secrets.token_urlsafe(32) | |
| ALGORITHM = "HS256" | |
| ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES | |
| # 密碼哈希配置 | |
| pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") | |
| # 安全方案 | |
| security = HTTPBearer(auto_error=False) | |
| class JWTAuth: | |
| def __init__(self): | |
| self.secret_key = SECRET_KEY | |
| self.algorithm = ALGORITHM | |
| self.access_token_expire_minutes = ACCESS_TOKEN_EXPIRE_MINUTES | |
| def create_access_token(self, data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str: | |
| """創建訪問令牌""" | |
| to_encode = data.copy() | |
| if expires_delta: | |
| expire = datetime.utcnow() + expires_delta | |
| else: | |
| expire = datetime.utcnow() + timedelta(minutes=self.access_token_expire_minutes) | |
| to_encode.update({"exp": expire, "iat": datetime.utcnow()}) | |
| encoded_jwt = jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm) | |
| return encoded_jwt | |
| def verify_token(self, token: str) -> Optional[Dict[str, Any]]: | |
| """驗證JWT令牌""" | |
| try: | |
| payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm]) | |
| # 檢查 token 是否過期 | |
| exp = payload.get("exp") | |
| if exp: | |
| from datetime import datetime | |
| import time | |
| current_time = time.time() | |
| if current_time > exp: | |
| import logging | |
| logger = logging.getLogger("core.auth.jwt") | |
| logger.warning(f"❌ Token 已過期: exp={exp}, current={current_time}, 差距={current_time - exp}秒") | |
| return None | |
| return payload | |
| except JWTError as e: | |
| import logging | |
| logger = logging.getLogger("core.auth.jwt") | |
| logger.warning(f"❌ JWT 驗證失敗: {e}") | |
| return None | |
| def get_current_user(self, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Optional[Dict[str, Any]]: | |
| """獲取當前用戶(從JWT令牌)""" | |
| if not credentials: | |
| return None | |
| payload = self.verify_token(credentials.credentials) | |
| if not payload: | |
| return None | |
| return payload | |
| def hash_password(self, password: str) -> str: | |
| """哈希密碼""" | |
| return pwd_context.hash(password) | |
| def verify_password(self, plain_password: str, hashed_password: str) -> bool: | |
| """驗證密碼""" | |
| return pwd_context.verify(plain_password, hashed_password) | |
| # JWT認證實例 | |
| jwt_auth = JWTAuth() | |
| def get_current_user_optional(request: Request) -> Optional[Dict[str, Any]]: | |
| """可選的用戶認證(不會拋出異常)""" | |
| auth_header = request.headers.get("Authorization") | |
| if not auth_header or not auth_header.startswith("Bearer "): | |
| return None | |
| token = auth_header.split(" ")[1] | |
| return jwt_auth.verify_token(token) | |
| def require_auth(user: Optional[Dict[str, Any]] = Depends(get_current_user_optional)): | |
| """需要認證的依賴項""" | |
| if not user: | |
| raise HTTPException(status_code=401, detail="認證失敗") | |
| return user |