File size: 5,171 Bytes
69be42f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a57a50a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""Password hashing and JWT token management.

[Task]: T011
[From]: specs/001-user-auth/plan.md, specs/001-user-auth/research.md
"""
import hashlib
import bcrypt
from datetime import datetime, timedelta
from typing import Optional

from jose import JWTError, jwt
from fastapi import HTTPException, status

from core.config import get_settings

settings = get_settings()


def _pre_hash_password(password: str) -> bytes:
    """Pre-hash password with SHA-256 to handle bcrypt's 72-byte limit.

    Bcrypt cannot hash passwords longer than 72 bytes. This function
    pre-hashes the password with SHA-256 first, then bcrypt hashes that.

    Args:
        password: Plaintext password (any length)

    Returns:
        SHA-256 hash of the password (always 32 bytes)
    """
    return hashlib.sha256(password.encode()).digest()


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Verify a password against a hash.

    Args:
        plain_password: Plaintext password to verify
        hashed_password: Hashed password to compare against

    Returns:
        True if password matches hash, False otherwise
    """
    try:
        # Pre-hash the plain password to match how it was stored
        pre_hashed = _pre_hash_password(plain_password)
        # Convert the stored hash to bytes
        hashed_bytes = hashed_password.encode('utf-8')
        # Verify using bcrypt directly
        return bcrypt.checkpw(pre_hashed, hashed_bytes)
    except Exception:
        return False


def get_password_hash(password: str) -> str:
    """Hash a password using bcrypt with SHA-256 pre-hashing.

    This two-step approach:
    1. Hash password with SHA-256 (handles any length)
    2. Hash the SHA-256 hash with bcrypt (adds salt and security)

    Args:
        password: Plaintext password to hash (any length)

    Returns:
        Hashed password (bcrypt hash with salt)

    Example:
        ```python
        hashed = get_password_hash("my_password")
        # Returns: $2b$12$... (bcrypt hash)
        ```
    """
    # Pre-hash with SHA-256 to handle long passwords
    pre_hashed = _pre_hash_password(password)

    # Generate salt and hash with bcrypt
    # Using 12 rounds (2^12 = 4096 iterations) for good security
    salt = bcrypt.gensalt(rounds=12)
    hashed = bcrypt.hashpw(pre_hashed, salt)

    # Return as string
    return hashed.decode('utf-8')


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create JWT access token.

    Args:
        data: Payload data to encode in token (typically {"sub": user_id})
        expires_delta: Optional custom expiration time

    Returns:
        Encoded JWT token string

    Example:
        ```python
        token = create_access_token(
            data={"sub": str(user.id)},
            expires_delta=timedelta(days=7)
        )
        ```
    """
    to_encode = data.copy()

    # Set expiration time
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(days=settings.jwt_expiration_days)

    to_encode.update({"exp": expire, "iat": datetime.utcnow()})

    # Encode JWT
    encoded_jwt = jwt.encode(
        to_encode,
        settings.jwt_secret,
        algorithm=settings.jwt_algorithm
    )
    return encoded_jwt


def decode_access_token(token: str) -> dict:
    """Decode and verify JWT access token.

    Args:
        token: JWT token string to decode

    Returns:
        Decoded token payload

    Raises:
        HTTPException: If token is invalid or expired
    """
    try:
        payload = jwt.decode(
            token,
            settings.jwt_secret,
            algorithms=[settings.jwt_algorithm]
        )
        return payload
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )


async def get_current_user_id_from_cookie(request) -> Optional[str]:
    """Extract and validate user ID from JWT token in httpOnly cookie.

    [Task]: T011
    [From]: specs/010-chatkit-migration/contracts/backend.md - Authentication Contracts

    This function extracts the JWT token from the auth_token httpOnly cookie,
    decodes it, and returns the user_id (sub claim).

    Args:
        request: FastAPI/Starlette request object

    Returns:
        User ID (UUID string) or None if authentication fails

    Raises:
        HTTPException: If token is invalid (only if raise_on_error=True)
    """
    # Try httpOnly cookie first
    # [From]: specs/010-chatkit-migration/contracts/backend.md
    auth_token = request.cookies.get("auth_token")
    if not auth_token:
        return None

    try:
        payload = decode_access_token(auth_token)
        user_id = payload.get("sub")
        return user_id
    except HTTPException:
        return None
    except Exception as e:
        # Log but don't raise for non-critical operations
        import logging
        logging.getLogger("api.auth").warning(f"Failed to decode token from cookie: {e}")
        return None