|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""Security utilities for authentication and authorization.""" |
|
|
import jwt |
|
|
from datetime import datetime, timedelta |
|
|
from passlib.context import CryptContext |
|
|
from fastapi import HTTPException, status, Depends |
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
|
|
from typing import Dict, Any |
|
|
from src.core.config import settings |
|
|
|
|
|
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") |
|
|
security = HTTPBearer() |
|
|
import hashlib |
|
|
MAX_BCRYPT_BYTES = 72 |
|
|
|
|
|
|
|
|
def _normalize_password(password: str) -> bytes: |
|
|
""" |
|
|
bcrypt only supports 72 bytes. |
|
|
This guarantees no runtime crash in any environment. |
|
|
""" |
|
|
password_bytes = password.encode("utf-8") |
|
|
if len(password_bytes) > MAX_BCRYPT_BYTES: |
|
|
password_bytes = password_bytes[:MAX_BCRYPT_BYTES] |
|
|
return password_bytes |
|
|
|
|
|
|
|
|
def hash_password(password: str) -> str: |
|
|
|
|
|
pre_hashed = hashlib.sha256(password.encode("utf-8")).hexdigest() |
|
|
return pwd_context.hash(pre_hashed) |
|
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool: |
|
|
pre_hashed = hashlib.sha256(plain_password.encode("utf-8")).hexdigest() |
|
|
return pwd_context.verify(pre_hashed, hashed_password) |
|
|
|
|
|
def create_jwt_token(user_id: int, email: str, secret: str, expiration_days: int = 7) -> str: |
|
|
now = datetime.utcnow() |
|
|
payload = { |
|
|
"sub": str(user_id), |
|
|
"email": email, |
|
|
"iat": now, |
|
|
"exp": now + timedelta(days=expiration_days), |
|
|
"iss": "better-auth", |
|
|
} |
|
|
return jwt.encode(payload, secret, algorithm="HS256") |
|
|
|
|
|
|
|
|
def verify_jwt_token(token: str, secret: str) -> dict: |
|
|
try: |
|
|
payload = jwt.decode( |
|
|
token, |
|
|
secret, |
|
|
algorithms=["HS256"], |
|
|
options={"verify_exp": True}, |
|
|
) |
|
|
if payload.get("iss") != "better-auth": |
|
|
raise HTTPException(status_code=401, detail="Invalid token issuer") |
|
|
return payload |
|
|
except jwt.ExpiredSignatureError: |
|
|
raise HTTPException(status_code=401, detail="Token expired") |
|
|
except jwt.InvalidTokenError: |
|
|
raise HTTPException(status_code=401, detail="Invalid token") |
|
|
|
|
|
|
|
|
def get_current_user( |
|
|
credentials: HTTPAuthorizationCredentials = Depends(security) |
|
|
) -> Dict[str, Any]: |
|
|
""" |
|
|
FastAPI dependency to extract and validate JWT token from Authorization header. |
|
|
|
|
|
Args: |
|
|
credentials: HTTP Bearer token credentials from request header |
|
|
|
|
|
Returns: |
|
|
Dictionary containing user information from token payload: |
|
|
- id: User ID (parsed from 'sub' claim) |
|
|
- email: User email |
|
|
- iat: Token issued at timestamp |
|
|
- exp: Token expiration timestamp |
|
|
|
|
|
Raises: |
|
|
HTTPException 401: If token is missing, invalid, or expired |
|
|
""" |
|
|
token = credentials.credentials |
|
|
|
|
|
try: |
|
|
payload = verify_jwt_token(token, settings.BETTER_AUTH_SECRET) |
|
|
|
|
|
|
|
|
user_id = int(payload.get("sub")) |
|
|
|
|
|
return { |
|
|
"id": user_id, |
|
|
"email": payload.get("email"), |
|
|
"iat": payload.get("iat"), |
|
|
"exp": payload.get("exp") |
|
|
} |
|
|
except ValueError: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail="Invalid user ID in token", |
|
|
headers={"WWW-Authenticate": "Bearer"} |
|
|
) |
|
|
except Exception as e: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_401_UNAUTHORIZED, |
|
|
detail=f"Authentication failed: {str(e)}", |
|
|
headers={"WWW-Authenticate": "Bearer"} |
|
|
) |
|
|
|