| """ |
| Security utilities for password hashing and JWT token management. |
| """ |
| from datetime import datetime, timedelta |
| from typing import Optional |
| from jose import JWTError, jwt |
| from fastapi import Depends, HTTPException, status |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
| from sqlalchemy.orm import Session |
|
|
| from .config import settings |
| from .database import get_db |
| from ..models.user import User |
|
|
| import bcrypt |
| from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials |
|
|
| |
# Module-level HTTP Bearer scheme instance; get_current_user receives its
# result through Depends(security).
security = HTTPBearer()
|
|
def hash_password(password: str) -> str:
    """Return the bcrypt hash of ``password`` as a UTF-8 string.

    The password is UTF-8 encoded and cut to its first 72 bytes before
    hashing. A new salt is generated on every call.
    """
    truncated = password.encode('utf-8')[:72]
    return bcrypt.hashpw(truncated, bcrypt.gensalt()).decode('utf-8')
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plain text password against a stored bcrypt hash.

    The candidate password is UTF-8 encoded and cut to its first 72 bytes,
    mirroring what hash_password does before hashing.

    Returns:
        The result of bcrypt.checkpw, or False if encoding or checking
        raises any exception (verification fails closed).
    """
    try:
        # Encoding stays inside the try so that bad inputs (e.g. None)
        # also result in False instead of propagating.
        candidate = plain_password.encode('utf-8')[:72]
        stored = hashed_password.encode('utf-8')
        matches = bcrypt.checkpw(candidate, stored)
    except Exception:
        return False
    return matches
|
|
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a JWT access token.

    Args:
        data: Dictionary of claims to encode in the token. It is copied
            before the expiry claim is added, so the caller's dict is
            left unchanged.
        expires_delta: Optional custom lifetime. When None, the lifetime is
            settings.ACCESS_TOKEN_EXPIRE_MINUTES minutes. An explicit
            timedelta(0) is honoured rather than replaced by the default.

    Returns:
        Encoded JWT token string, produced with settings.SECRET_KEY and
        settings.ALGORITHM.
    """
    # Local import keeps this function self-contained; the module's import
    # list only brings in datetime and timedelta.
    from datetime import timezone

    # Compare against None explicitly: timedelta(0) is falsy, so a plain
    # truthiness test would silently swap a zero lifetime for the default.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware "now" in UTC; datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive datetime.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
|
|
|
|
def decode_access_token(token: str) -> Optional[dict]:
    """
    Decode a JWT access token.

    The token is decoded with settings.SECRET_KEY, accepting only
    settings.ALGORITHM.

    Args:
        token: JWT token string

    Returns:
        The decoded payload, or None when decoding raises JWTError
    """
    try:
        claims = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    except JWTError:
        return None
    return claims
|
|
|
|
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db)
) -> User:
    """
    FastAPI dependency that resolves the authenticated user from a JWT.

    Args:
        credentials: HTTP Bearer credentials supplied by the security scheme
        db: Database session

    Returns:
        The User whose id equals the token's "sub" claim

    Raises:
        HTTPException: 401 if the token cannot be decoded, has no "sub"
            claim, or matches no user; 403 if the user is not active
    """
    unauthorized = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    claims = decode_access_token(credentials.credentials)

    # An undecodable token and a token without a subject are rejected the
    # same way.
    subject = None if claims is None else claims.get("sub")
    if subject is None:
        raise unauthorized

    account = db.query(User).filter(User.id == subject).first()
    if account is None:
        raise unauthorized

    if account.is_active:
        return account

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Inactive user"
    )
|
|
|
|
def require_role(required_role: str):
    """
    Build a dependency that only admits users with a given role.

    Args:
        required_role: Role the current user must have (e.g., "ADMIN")

    Returns:
        An async dependency function that returns the current user when
        their role equals required_role and raises a 403 HTTPException
        otherwise
    """
    # The message depends only on the factory argument, so build it once.
    denial_detail = f"Access denied. {required_role} role required."

    async def role_checker(current_user: User = Depends(get_current_user)) -> User:
        if current_user.role != required_role:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=denial_detail,
            )
        return current_user

    return role_checker
|
|