"""Password hashing and JWT token management. [Task]: T011 [From]: specs/001-user-auth/plan.md, specs/001-user-auth/research.md """ import hashlib import bcrypt from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from fastapi import HTTPException, status from core.config import get_settings settings = get_settings() def _pre_hash_password(password: str) -> bytes: """Pre-hash password with SHA-256 to handle bcrypt's 72-byte limit. Bcrypt cannot hash passwords longer than 72 bytes. This function pre-hashes the password with SHA-256 first, then bcrypt hashes that. Args: password: Plaintext password (any length) Returns: SHA-256 hash of the password (always 32 bytes) """ return hashlib.sha256(password.encode()).digest() def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against a hash. Args: plain_password: Plaintext password to verify hashed_password: Hashed password to compare against Returns: True if password matches hash, False otherwise """ try: # Pre-hash the plain password to match how it was stored pre_hashed = _pre_hash_password(plain_password) # Convert the stored hash to bytes hashed_bytes = hashed_password.encode('utf-8') # Verify using bcrypt directly return bcrypt.checkpw(pre_hashed, hashed_bytes) except Exception: return False def get_password_hash(password: str) -> str: """Hash a password using bcrypt with SHA-256 pre-hashing. This two-step approach: 1. Hash password with SHA-256 (handles any length) 2. Hash the SHA-256 hash with bcrypt (adds salt and security) Args: password: Plaintext password to hash (any length) Returns: Hashed password (bcrypt hash with salt) Example: ```python hashed = get_password_hash("my_password") # Returns: $2b$12$... (bcrypt hash) ``` """ # Pre-hash with SHA-256 to handle long passwords pre_hashed = _pre_hash_password(password) # Generate salt and hash with bcrypt # Using 12 rounds (2^12 = 4096 iterations) for good security salt = bcrypt.gensalt(rounds=12) hashed = bcrypt.hashpw(pre_hashed, salt) # Return as string return hashed.decode('utf-8') def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """Create JWT access token. Args: data: Payload data to encode in token (typically {"sub": user_id}) expires_delta: Optional custom expiration time Returns: Encoded JWT token string Example: ```python token = create_access_token( data={"sub": str(user.id)}, expires_delta=timedelta(days=7) ) ``` """ to_encode = data.copy() # Set expiration time if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(days=settings.jwt_expiration_days) to_encode.update({"exp": expire, "iat": datetime.utcnow()}) # Encode JWT encoded_jwt = jwt.encode( to_encode, settings.jwt_secret, algorithm=settings.jwt_algorithm ) return encoded_jwt def decode_access_token(token: str) -> dict: """Decode and verify JWT access token. Args: token: JWT token string to decode Returns: Decoded token payload Raises: HTTPException: If token is invalid or expired """ try: payload = jwt.decode( token, settings.jwt_secret, algorithms=[settings.jwt_algorithm] ) return payload except JWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) async def get_current_user_id_from_cookie(request) -> Optional[str]: """Extract and validate user ID from JWT token in httpOnly cookie. [Task]: T011 [From]: specs/010-chatkit-migration/contracts/backend.md - Authentication Contracts This function extracts the JWT token from the auth_token httpOnly cookie, decodes it, and returns the user_id (sub claim). Args: request: FastAPI/Starlette request object Returns: User ID (UUID string) or None if authentication fails Raises: HTTPException: If token is invalid (only if raise_on_error=True) """ # Try httpOnly cookie first # [From]: specs/010-chatkit-migration/contracts/backend.md auth_token = request.cookies.get("auth_token") if not auth_token: return None try: payload = decode_access_token(auth_token) user_id = payload.get("sub") return user_id except HTTPException: return None except Exception as e: # Log but don't raise for non-critical operations import logging logging.getLogger("api.auth").warning(f"Failed to decode token from cookie: {e}") return None