todoappapi / core /security.py
GrowWithTalha's picture
feat: add ChatKit migration with SSE streaming
a57a50a
"""Password hashing and JWT token management.
[Task]: T011
[From]: specs/001-user-auth/plan.md, specs/001-user-auth/research.md
"""
import hashlib
import bcrypt
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from fastapi import HTTPException, status
from core.config import get_settings
# Application settings, resolved once when this module is imported. The JWT
# helpers below read jwt_secret, jwt_algorithm and jwt_expiration_days from it.
settings = get_settings()
def _pre_hash_password(password: str) -> bytes:
    """Reduce a password of any length to a fixed-size SHA-256 digest.

    Bcrypt has a 72-byte input limit, so the password is digested with
    SHA-256 first and that digest is what gets handed to bcrypt.

    NOTE(review): the raw digest can contain NUL bytes, which some bcrypt
    implementations reject or truncate at -- confirm against the installed
    bcrypt version. Changing the output format here would invalidate every
    hash that has already been stored.

    Args:
        password: Plaintext password (any length)

    Returns:
        Raw SHA-256 digest of the encoded password (always 32 bytes)
    """
    hasher = hashlib.sha256()
    hasher.update(password.encode())
    return hasher.digest()
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    The plaintext goes through the SHA-256 pre-hash before the bcrypt
    comparison, mirroring how stored hashes are produced.

    Args:
        plain_password: Plaintext password to verify
        hashed_password: Stored bcrypt hash, as a string

    Returns:
        True if the password matches the hash. False on a mismatch, and
        also whenever the check itself fails (errors are never raised).
    """
    try:
        candidate = _pre_hash_password(plain_password)
        stored = hashed_password.encode('utf-8')
        is_match = bcrypt.checkpw(candidate, stored)
    except Exception:
        # Deliberately broad: a malformed stored hash or unusable input is
        # reported as a failed verification instead of an error.
        return False
    return is_match
def get_password_hash(password: str) -> str:
    """Produce a salted bcrypt hash for a password of any length.

    Hashing happens in two stages:
    1. SHA-256 digest of the password (so length does not matter)
    2. bcrypt over that digest, with a freshly generated salt

    Args:
        password: Plaintext password to hash (any length)

    Returns:
        The bcrypt hash (salt included) decoded to a string

    Example:
        ```python
        hashed = get_password_hash("my_password")
        # Returns: $2b$12$... (bcrypt hash)
        ```
    """
    digest = _pre_hash_password(password)

    # Cost factor 12, i.e. 2^12 = 4096 iterations
    bcrypt_hash = bcrypt.hashpw(digest, bcrypt.gensalt(rounds=12))

    return bcrypt_hash.decode('utf-8')
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create JWT access token.

    The payload is a shallow copy of ``data`` with two claims added (or
    overwritten): ``exp`` (expiration) and ``iat`` (issued-at). Both are
    derived from a single timezone-aware UTC timestamp.

    Args:
        data: Payload data to encode in token (typically {"sub": user_id});
            the caller's dict is not modified
        expires_delta: Optional custom lifetime. Any timedelta is honoured,
            including ``timedelta(0)``. When None, the lifetime is
            ``settings.jwt_expiration_days`` days.

    Returns:
        Encoded JWT token string

    Example:
        ```python
        token = create_access_token(
            data={"sub": str(user.id)},
            expires_delta=timedelta(days=7)
        )
        ```
    """
    # Function-scope import: the module header only imports datetime/timedelta.
    from datetime import timezone

    # Read the clock once so "iat" and "exp" are measured from the same
    # instant. datetime.utcnow() is deprecated and returns a naive value.
    issued_at = datetime.now(timezone.utc)

    # Compare with None rather than truthiness: timedelta(0) is falsy and
    # would otherwise be silently replaced by the default lifetime.
    if expires_delta is not None:
        lifetime = expires_delta
    else:
        lifetime = timedelta(days=settings.jwt_expiration_days)

    to_encode = data.copy()
    to_encode.update({"exp": issued_at + lifetime, "iat": issued_at})

    return jwt.encode(
        to_encode,
        settings.jwt_secret,
        algorithm=settings.jwt_algorithm,
    )
def decode_access_token(token: str) -> dict:
    """Verify a JWT access token and return its payload.

    The token is decoded with ``settings.jwt_secret`` and only the algorithm
    named by ``settings.jwt_algorithm`` is accepted.

    Args:
        token: JWT token string to decode

    Returns:
        Decoded token payload

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when
            decoding raises ``JWTError``
    """
    credentials_error = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    accepted_algorithms = [settings.jwt_algorithm]
    try:
        claims = jwt.decode(token, settings.jwt_secret, algorithms=accepted_algorithms)
    except JWTError:
        raise credentials_error
    else:
        return claims
async def get_current_user_id_from_cookie(request) -> Optional[str]:
    """Extract and validate user ID from JWT token in httpOnly cookie.

    [Task]: T011
    [From]: specs/010-chatkit-migration/contracts/backend.md - Authentication Contracts

    Reads the JWT from the ``auth_token`` cookie, decodes it with
    ``decode_access_token`` and returns its ``sub`` claim. The cookie is the
    only token source consulted. Token problems are never raised to the
    caller; they all result in None.

    Args:
        request: FastAPI/Starlette request object (must expose ``cookies``)

    Returns:
        The token's ``sub`` claim, or None when the cookie is missing or
        empty, the token is rejected, the claim is absent, or decoding
        fails unexpectedly
    """
    auth_token = request.cookies.get("auth_token")
    if not auth_token:
        return None

    try:
        payload = decode_access_token(auth_token)
        return payload.get("sub")
    except HTTPException:
        # Invalid or expired token: treat the request as unauthenticated.
        return None
    except Exception as exc:
        # Best-effort lookup: log the unexpected failure but don't raise.
        import logging

        # Lazy %-style arguments: the message is only formatted if emitted.
        logging.getLogger("api.auth").warning(
            "Failed to decode token from cookie: %s", exc
        )
        return None