File size: 3,640 Bytes
3f0377e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
import logging
import os
import secrets
import time
from datetime import datetime, timedelta, timezone
from typing import Optional, Dict, Any

from fastapi import HTTPException, Depends, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from passlib.context import CryptContext

# Centralized configuration management
from core.config import settings

# JWT configuration
# NOTE(review): when settings.JWT_SECRET_KEY is falsy, a random key is generated
# at import time. Presumably tokens then stop validating after a restart or
# across separate worker processes — confirm JWT_SECRET_KEY is always set in
# deployed environments.
SECRET_KEY = settings.JWT_SECRET_KEY or secrets.token_urlsafe(32)
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = settings.ACCESS_TOKEN_EXPIRE_MINUTES

# Password hashing configuration (passlib CryptContext restricted to bcrypt)
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Bearer security scheme. auto_error=False is presumably chosen so that a
# missing Authorization header yields None instead of an error response;
# JWTAuth.get_current_user handles the None case. Verify against FastAPI docs.
security = HTTPBearer(auto_error=False)

class JWTAuth:
    """Issue and verify JWT access tokens, and hash/verify passwords.

    The signing key, algorithm and default token lifetime are copied from the
    module-level ``SECRET_KEY``, ``ALGORITHM`` and
    ``ACCESS_TOKEN_EXPIRE_MINUTES`` constants when the instance is created.
    """

    # Named logger used for token verification warnings.
    _logger = logging.getLogger("core.auth.jwt")

    def __init__(self):
        self.secret_key = SECRET_KEY
        self.algorithm = ALGORITHM
        self.access_token_expire_minutes = ACCESS_TOKEN_EXPIRE_MINUTES

    def create_access_token(self, data: Dict[str, Any], expires_delta: Optional[timedelta] = None) -> str:
        """Create a signed access token.

        Args:
            data: Claims to embed in the token. The mapping is copied, so the
                caller's object is not modified. Any ``exp``/``iat`` entries
                are overwritten.
            expires_delta: Token lifetime. When ``None``, the instance's
                ``access_token_expire_minutes`` is used.

        Returns:
            The encoded JWT string.
        """
        # Compare against None explicitly: timedelta(0) is falsy but is still
        # an explicit lifetime requested by the caller.
        if expires_delta is None:
            expires_delta = timedelta(minutes=self.access_token_expire_minutes)

        # Take a single timezone-aware timestamp so "iat" and "exp" are derived
        # from the same instant (datetime.utcnow() is deprecated).
        issued_at = datetime.now(timezone.utc)

        to_encode = data.copy()
        to_encode.update({"exp": issued_at + expires_delta, "iat": issued_at})
        return jwt.encode(to_encode, self.secret_key, algorithm=self.algorithm)

    def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Decode and validate a JWT.

        Args:
            token: The encoded JWT string.

        Returns:
            The decoded claims, or ``None`` when decoding raises ``JWTError``
            or the token's numeric ``exp`` claim lies in the past. Failures
            are logged at warning level.
        """
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
        except JWTError as e:
            self._logger.warning("❌ JWT 驗證失敗: %s", e)
            return None

        # Explicit expiry check against the wall clock. NOTE(review): the JWT
        # library is expected to validate "exp" during decode as well; this is
        # kept as a second line of defense. Only numeric values are compared so
        # an unexpected claim type cannot raise TypeError here; tokens without
        # a numeric "exp" skip this check.
        exp = payload.get("exp")
        if isinstance(exp, (int, float)):
            current_time = time.time()
            if current_time > exp:
                self._logger.warning(
                    "❌ Token 已過期: exp=%s, current=%s, 差距=%s秒",
                    exp,
                    current_time,
                    current_time - exp,
                )
                return None

        return payload

    def get_current_user(self, credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Optional[Dict[str, Any]]:
        """Return the current user's token claims, if any.

        Args:
            credentials: Bearer credentials resolved through the module-level
                ``security`` dependency; may be ``None``.

        Returns:
            The decoded claims, or ``None`` when no credentials were supplied,
            verification failed, or the decoded payload is empty.
        """
        if not credentials:
            return None

        # "or None" keeps an empty payload mapped to None.
        return self.verify_token(credentials.credentials) or None

    def hash_password(self, password: str) -> str:
        """Return the hash of ``password`` produced by the module-level ``pwd_context``."""
        return pwd_context.hash(password)

    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """Check ``plain_password`` against ``hashed_password`` using ``pwd_context``."""
        return pwd_context.verify(plain_password, hashed_password)

# Module-level JWTAuth instance; get_current_user_optional uses it to verify tokens
jwt_auth = JWTAuth()

def get_current_user_optional(request: Request) -> Optional[Dict[str, Any]]:
    """Optionally authenticate the request from its ``Authorization`` header.

    Authentication problems are reported as ``None`` rather than an exception.

    Args:
        request: The incoming request whose headers are inspected.

    Returns:
        The claims returned by ``jwt_auth.verify_token``, or ``None`` when the
        header is missing, does not use the Bearer scheme, carries no token,
        or the token fails verification.
    """
    auth_header = request.headers.get("Authorization")
    if not auth_header:
        return None

    # HTTP authentication scheme names are case-insensitive, so "bearer" and
    # "BEARER" are accepted as well. Surrounding whitespace around the token
    # is tolerated instead of producing an empty token.
    scheme, _, credentials = auth_header.partition(" ")
    token = credentials.strip()
    if scheme.lower() != "bearer" or not token:
        return None

    return jwt_auth.verify_token(token)

def require_auth(user: Optional[Dict[str, Any]] = Depends(get_current_user_optional)) -> Dict[str, Any]:
    """Dependency that enforces authentication.

    Args:
        user: Result of ``get_current_user_optional``; ``None`` (or an empty
            mapping) means the request is not authenticated.

    Returns:
        The authenticated user's token claims.

    Raises:
        HTTPException: 401 when no authenticated user is available.
    """
    if not user:
        # A 401 response should carry a WWW-Authenticate challenge telling the
        # client which scheme to use.
        raise HTTPException(
            status_code=401,
            detail="認證失敗",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user