"""Authentication utilities for JWT and password management. Uses BETTER_AUTH_SECRET for JWT verification (shared secret with Better Auth frontend). """ import os from datetime import datetime, timedelta import bcrypt from jose import JWTError, jwt from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from dotenv import load_dotenv load_dotenv() # JWT configuration - uses BETTER_AUTH_SECRET (shared with Better Auth frontend) # Falls back to JWT_SECRET for backwards compatibility BETTER_AUTH_SECRET = os.getenv("BETTER_AUTH_SECRET") or os.getenv("JWT_SECRET") if not BETTER_AUTH_SECRET: raise ValueError("BETTER_AUTH_SECRET (or JWT_SECRET) environment variable must be set") # Remove quotes if present if BETTER_AUTH_SECRET.startswith('"') and BETTER_AUTH_SECRET.endswith('"'): BETTER_AUTH_SECRET = BETTER_AUTH_SECRET[1:-1] JWT_ALGORITHM = os.getenv("JWT_ALGORITHM", "HS256") ACCESS_TOKEN_EXPIRE_HOURS = int(os.getenv("ACCESS_TOKEN_EXPIRE_HOURS", "24")) # HTTP Bearer token scheme security = HTTPBearer() def hash_password(password: str) -> str: """Hash a password using bcrypt.""" password_bytes = password.encode('utf-8') salt = bcrypt.gensalt() hashed = bcrypt.hashpw(password_bytes, salt) return hashed.decode('utf-8') def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash.""" password_bytes = plain_password.encode('utf-8') hashed_bytes = hashed_password.encode('utf-8') return bcrypt.checkpw(password_bytes, hashed_bytes) def create_access_token(user_id: int, email: str) -> str: """Create JWT access token with user_id and email.""" expire = datetime.utcnow() + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS) to_encode = { "sub": str(user_id), "email": email, "exp": expire } encoded_jwt = jwt.encode(to_encode, BETTER_AUTH_SECRET, algorithm=JWT_ALGORITHM) return encoded_jwt def verify_token(token: str) -> dict: """Verify JWT token and return payload. Supports Better Auth JWT plugin tokens.""" try: payload = jwt.decode(token, BETTER_AUTH_SECRET, algorithms=[JWT_ALGORITHM]) user_id: str = payload.get("sub") if user_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials" ) return payload except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid authentication credentials" ) def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> int: """Dependency to extract current user ID from JWT token.""" token = credentials.credentials payload = verify_token(token) user_id = int(payload.get("sub")) return user_id