Spaces:
Running
Running
| """Password hashing and JWT token management. | |
| [Task]: T011 | |
| [From]: specs/001-user-auth/plan.md, specs/001-user-auth/research.md | |
| """ | |
| import hashlib | |
| import bcrypt | |
| from datetime import datetime, timedelta | |
| from typing import Optional | |
| from jose import JWTError, jwt | |
| from fastapi import HTTPException, status | |
| from core.config import get_settings | |
| settings = get_settings() | |
| def _pre_hash_password(password: str) -> bytes: | |
| """Pre-hash password with SHA-256 to handle bcrypt's 72-byte limit. | |
| Bcrypt cannot hash passwords longer than 72 bytes. This function | |
| pre-hashes the password with SHA-256 first, then bcrypt hashes that. | |
| Args: | |
| password: Plaintext password (any length) | |
| Returns: | |
| SHA-256 hash of the password (always 32 bytes) | |
| """ | |
| return hashlib.sha256(password.encode()).digest() | |
| def verify_password(plain_password: str, hashed_password: str) -> bool: | |
| """Verify a password against a hash. | |
| Args: | |
| plain_password: Plaintext password to verify | |
| hashed_password: Hashed password to compare against | |
| Returns: | |
| True if password matches hash, False otherwise | |
| """ | |
| try: | |
| # Pre-hash the plain password to match how it was stored | |
| pre_hashed = _pre_hash_password(plain_password) | |
| # Convert the stored hash to bytes | |
| hashed_bytes = hashed_password.encode('utf-8') | |
| # Verify using bcrypt directly | |
| return bcrypt.checkpw(pre_hashed, hashed_bytes) | |
| except Exception: | |
| return False | |
| def get_password_hash(password: str) -> str: | |
| """Hash a password using bcrypt with SHA-256 pre-hashing. | |
| This two-step approach: | |
| 1. Hash password with SHA-256 (handles any length) | |
| 2. Hash the SHA-256 hash with bcrypt (adds salt and security) | |
| Args: | |
| password: Plaintext password to hash (any length) | |
| Returns: | |
| Hashed password (bcrypt hash with salt) | |
| Example: | |
| ```python | |
| hashed = get_password_hash("my_password") | |
| # Returns: $2b$12$... (bcrypt hash) | |
| ``` | |
| """ | |
| # Pre-hash with SHA-256 to handle long passwords | |
| pre_hashed = _pre_hash_password(password) | |
| # Generate salt and hash with bcrypt | |
| # Using 12 rounds (2^12 = 4096 iterations) for good security | |
| salt = bcrypt.gensalt(rounds=12) | |
| hashed = bcrypt.hashpw(pre_hashed, salt) | |
| # Return as string | |
| return hashed.decode('utf-8') | |
| def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: | |
| """Create JWT access token. | |
| Args: | |
| data: Payload data to encode in token (typically {"sub": user_id}) | |
| expires_delta: Optional custom expiration time | |
| Returns: | |
| Encoded JWT token string | |
| Example: | |
| ```python | |
| token = create_access_token( | |
| data={"sub": str(user.id)}, | |
| expires_delta=timedelta(days=7) | |
| ) | |
| ``` | |
| """ | |
| to_encode = data.copy() | |
| # Set expiration time | |
| if expires_delta: | |
| expire = datetime.utcnow() + expires_delta | |
| else: | |
| expire = datetime.utcnow() + timedelta(days=settings.jwt_expiration_days) | |
| to_encode.update({"exp": expire, "iat": datetime.utcnow()}) | |
| # Encode JWT | |
| encoded_jwt = jwt.encode( | |
| to_encode, | |
| settings.jwt_secret, | |
| algorithm=settings.jwt_algorithm | |
| ) | |
| return encoded_jwt | |
| def decode_access_token(token: str) -> dict: | |
| """Decode and verify JWT access token. | |
| Args: | |
| token: JWT token string to decode | |
| Returns: | |
| Decoded token payload | |
| Raises: | |
| HTTPException: If token is invalid or expired | |
| """ | |
| try: | |
| payload = jwt.decode( | |
| token, | |
| settings.jwt_secret, | |
| algorithms=[settings.jwt_algorithm] | |
| ) | |
| return payload | |
| except JWTError: | |
| raise HTTPException( | |
| status_code=status.HTTP_401_UNAUTHORIZED, | |
| detail="Could not validate credentials", | |
| headers={"WWW-Authenticate": "Bearer"}, | |
| ) | |
| async def get_current_user_id_from_cookie(request) -> Optional[str]: | |
| """Extract and validate user ID from JWT token in httpOnly cookie. | |
| [Task]: T011 | |
| [From]: specs/010-chatkit-migration/contracts/backend.md - Authentication Contracts | |
| This function extracts the JWT token from the auth_token httpOnly cookie, | |
| decodes it, and returns the user_id (sub claim). | |
| Args: | |
| request: FastAPI/Starlette request object | |
| Returns: | |
| User ID (UUID string) or None if authentication fails | |
| Raises: | |
| HTTPException: If token is invalid (only if raise_on_error=True) | |
| """ | |
| # Try httpOnly cookie first | |
| # [From]: specs/010-chatkit-migration/contracts/backend.md | |
| auth_token = request.cookies.get("auth_token") | |
| if not auth_token: | |
| return None | |
| try: | |
| payload = decode_access_token(auth_token) | |
| user_id = payload.get("sub") | |
| return user_id | |
| except HTTPException: | |
| return None | |
| except Exception as e: | |
| # Log but don't raise for non-critical operations | |
| import logging | |
| logging.getLogger("api.auth").warning(f"Failed to decode token from cookie: {e}") | |
| return None | |