Spaces:
Running
Running
File size: 5,171 Bytes
69be42f a57a50a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
"""Password hashing and JWT token management.
[Task]: T011
[From]: specs/001-user-auth/plan.md, specs/001-user-auth/research.md
"""
import hashlib
from datetime import datetime, timedelta, timezone
from typing import Optional

import bcrypt
from fastapi import HTTPException, status
from jose import JWTError, jwt

from core.config import get_settings
# Settings are loaded once at import time; the JWT helpers below read
# jwt_secret, jwt_algorithm and jwt_expiration_days from this object.
settings = get_settings()
def _pre_hash_password(password: str) -> bytes:
    """Reduce a password of any length to a fixed-size SHA-256 digest.

    The digest, rather than the password itself, is what gets handed to
    bcrypt, which works around bcrypt's 72-byte input limit.

    NOTE(review): the raw digest can contain NUL bytes; the bcrypt project's
    README suggests base64-encoding the digest before bcrypt for that reason.
    Changing the encoding here would invalidate already-stored hashes, so
    confirm how the installed bcrypt version treats NUL bytes first.

    Args:
        password: Plaintext password (any length)

    Returns:
        Raw SHA-256 digest of the encoded password (always 32 bytes)
    """
    encoded = password.encode()
    hasher = hashlib.sha256()
    hasher.update(encoded)
    return hasher.digest()
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check whether a plaintext password corresponds to a stored bcrypt hash.

    The plaintext is run through the same SHA-256 pre-hash that is applied
    when hashes are created, then compared with ``bcrypt.checkpw``.

    Args:
        plain_password: Plaintext password to verify
        hashed_password: Stored bcrypt hash (as a string) to compare against

    Returns:
        True if the password matches the hash. False if it does not match or
        if any error occurs while checking (errors are swallowed on purpose
        so that a bad input simply fails verification).
    """
    try:
        candidate = _pre_hash_password(plain_password)
        stored = hashed_password.encode('utf-8')
        matches = bcrypt.checkpw(candidate, stored)
    except Exception:
        # Best-effort: any failure is reported as a non-match.
        return False
    return matches
def get_password_hash(password: str) -> str:
    """Produce a salted bcrypt hash of a password of any length.

    Two stages are applied:
    1. The password is reduced to a SHA-256 digest (removes the length limit).
    2. That digest is hashed with bcrypt using a freshly generated salt.

    Args:
        password: Plaintext password to hash (any length)

    Returns:
        The bcrypt hash, decoded to a UTF-8 string

    Example:
        ```python
        hashed = get_password_hash("my_password")
        # Returns: $2b$12$... (bcrypt hash)
        ```
    """
    digest = _pre_hash_password(password)
    # A new salt is generated per call, with a bcrypt cost of 12 rounds.
    bcrypt_hash = bcrypt.hashpw(digest, bcrypt.gensalt(rounds=12))
    return bcrypt_hash.decode('utf-8')
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    The payload is a copy of ``data`` with ``exp`` (expiry) and ``iat``
    (issued-at) claims added; ``data`` itself is not modified.

    Args:
        data: Payload data to encode in token (typically {"sub": user_id})
        expires_delta: Optional custom lifetime. When None, the lifetime is
            ``settings.jwt_expiration_days`` days. An explicit zero
            timedelta is honoured rather than replaced by the default.

    Returns:
        Encoded JWT token string

    Example:
        ```python
        token = create_access_token(
            data={"sub": str(user.id)},
            expires_delta=timedelta(days=7)
        )
        ```
    """
    # Read the clock once so "iat" and "exp" are derived from the same instant.
    # A timezone-aware UTC value replaces the deprecated datetime.utcnow().
    now = datetime.now(timezone.utc)

    # Compare with None explicitly: timedelta(0) is falsy but still a
    # deliberate caller-supplied value.
    if expires_delta is not None:
        lifetime = expires_delta
    else:
        lifetime = timedelta(days=settings.jwt_expiration_days)

    claims = data.copy()
    claims.update({"exp": now + lifetime, "iat": now})

    return jwt.encode(
        claims,
        settings.jwt_secret,
        algorithm=settings.jwt_algorithm,
    )
def decode_access_token(token: str) -> dict:
    """Decode and verify a JWT access token.

    The token is verified with ``settings.jwt_secret`` and only the
    algorithm named by ``settings.jwt_algorithm`` is accepted.

    Args:
        token: JWT token string to decode

    Returns:
        Decoded token payload

    Raises:
        HTTPException: 401 Unauthorized with a ``WWW-Authenticate: Bearer``
            header when the JWT library reports a ``JWTError``.
    """
    try:
        return jwt.decode(
            token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm],
        )
    except JWTError as exc:
        # Chain the original error so the underlying cause stays visible in
        # tracebacks; the client still only sees the generic 401 detail.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc
async def get_current_user_id_from_cookie(request) -> Optional[str]:
    """Return the user ID stored in the JWT carried by the auth cookie.

    [Task]: T011
    [From]: specs/010-chatkit-migration/contracts/backend.md - Authentication Contracts

    Reads the token from the ``auth_token`` cookie, decodes it with
    ``decode_access_token`` and returns its ``sub`` claim. This is a
    best-effort lookup: every failure is reported as ``None`` and nothing
    is raised to the caller.

    Args:
        request: FastAPI/Starlette request object (only ``request.cookies``
            is read)

    Returns:
        The ``sub`` claim of the token, or None when the cookie is missing
        or empty, the token cannot be decoded, or the payload has no
        ``sub`` claim.
    """
    import logging

    auth_token = request.cookies.get("auth_token")
    if not auth_token:
        return None

    try:
        payload = decode_access_token(auth_token)
        return payload.get("sub")
    except HTTPException:
        # Invalid or expired token: treated as "not authenticated".
        return None
    except Exception as exc:
        # Unexpected failure: log it, but do not raise for this
        # non-critical lookup.
        logging.getLogger("api.auth").warning(
            "Failed to decode token from cookie: %s", exc
        )
        return None
|