Hamza4100's picture
Update app/services/auth.py
411903d verified
"""Authentication service with JWT tokens and password hashing."""
from datetime import datetime, timedelta
from typing import Optional
from passlib.context import CryptContext
import jwt
from app.config import settings
pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__truncate_error=False # Don't raise error on long passwords
)
def hash_password(password: str) -> str:
"""Hash a password using bcrypt."""
# Bcrypt has a 72 byte limit - encode and truncate at byte level
password_bytes = password.encode('utf-8')
if len(password_bytes) > 72:
# Truncate to 72 bytes and decode back, ignoring partial chars
password = password_bytes[:72].decode('utf-8', errors='ignore')
return pwd_context.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a password against its hash."""
# Bcrypt has a 72 byte limit - encode and truncate at byte level
password_bytes = plain_password.encode('utf-8')
if len(password_bytes) > 72:
# Truncate to 72 bytes and decode back, ignoring partial chars
plain_password = password_bytes[:72].decode('utf-8', errors='ignore')
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(
data: dict,
expires_delta: Optional[timedelta] = None
) -> str:
"""Create a JWT access token."""
to_encode = data.copy()
expire = datetime.utcnow() + (
expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
)
to_encode.update({"exp": expire, "type": "access"})
return jwt.encode(
to_encode,
settings.JWT_SECRET_KEY,
algorithm=settings.JWT_ALGORITHM
)
def create_refresh_token(
data: dict,
expires_delta: Optional[timedelta] = None
) -> str:
"""Create a JWT refresh token."""
to_encode = data.copy()
expire = datetime.utcnow() + (
expires_delta or timedelta(days=settings.REFRESH_TOKEN_EXPIRE_DAYS)
)
to_encode.update({"exp": expire, "type": "refresh"})
return jwt.encode(
to_encode,
settings.JWT_SECRET_KEY,
algorithm=settings.JWT_ALGORITHM
)
def decode_token(token: str) -> Optional[dict]:
"""Decode and verify a JWT token."""
try:
payload = jwt.decode(
token,
settings.JWT_SECRET_KEY,
algorithms=[settings.JWT_ALGORITHM]
)
return payload
except jwt.ExpiredSignatureError:
return None
except jwt.InvalidTokenError:
return None
def create_tokens(user_id: str, email: str, role: str) -> dict:
"""Create both access and refresh tokens."""
token_data = {"sub": user_id, "email": email, "role": role}
return {
"access_token": create_access_token(token_data),
"refresh_token": create_refresh_token(token_data),
"token_type": "bearer",
}