# Finance/backend/app/core/security.py
# BOLO-KESARI
# Verifying bcrypt fix and versioning
# 52c9e6d
"""
Security utilities for password hashing and JWT token management.
"""
from datetime import datetime, timedelta, timezone
from typing import Optional

import bcrypt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt
from sqlalchemy.orm import Session

from .config import settings
from .database import get_db
from ..models.user import User
# HTTP Bearer token scheme. Injected with Depends() in get_current_user to
# obtain the bearer credentials sent with the request.
security = HTTPBearer()
def hash_password(password: str) -> str:
    """
    Return the bcrypt hash of a plain text password.

    Args:
        password: Plain text password to hash

    Returns:
        The bcrypt hash decoded as a UTF-8 string
    """
    # bcrypt has a 72-byte input limit, so only the first 72 bytes of the
    # UTF-8 encoding are hashed. verify_password applies the same cut.
    secret = password.encode('utf-8')[:72]
    # A new salt is generated on every call.
    return bcrypt.hashpw(secret, bcrypt.gensalt()).decode('utf-8')
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Check a plain text password against a stored bcrypt hash.

    Args:
        plain_password: Candidate password supplied by the caller
        hashed_password: Previously stored bcrypt hash string

    Returns:
        True if the password matches the hash, False if it does not or if
        any error is raised while encoding or checking
    """
    try:
        # Same 72-byte cut as hash_password, so long passwords still match.
        candidate = plain_password.encode('utf-8')[:72]
        stored = hashed_password.encode('utf-8')
        matches = bcrypt.checkpw(candidate, stored)
    except Exception:
        # Any failure is reported as a failed verification, never raised.
        return False
    return matches
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """
    Create a JWT access token.

    Args:
        data: Dictionary of claims to encode in the token. It is copied,
            so the caller's dict is not modified.
        expires_delta: Optional custom lifetime. When None, the token
            expires after settings.ACCESS_TOKEN_EXPIRE_MINUTES minutes.

    Returns:
        Encoded JWT token string
    """
    # Compare against None rather than relying on truthiness: timedelta(0)
    # is falsy, and an explicitly requested zero lifetime must not silently
    # fall back to the default expiry.
    if expires_delta is None:
        expires_delta = timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

    # Timezone-aware UTC timestamp; the naive datetime.utcnow() is
    # deprecated since Python 3.12.
    expire = datetime.now(timezone.utc) + expires_delta

    to_encode = data.copy()
    to_encode["exp"] = expire
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
def decode_access_token(token: str) -> Optional[dict]:
    """
    Validate a JWT access token and return its payload.

    Args:
        token: JWT token string

    Returns:
        The decoded payload, or None when decoding raises JWTError
    """
    try:
        claims = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
    except JWTError:
        return None
    else:
        return claims
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db)
) -> User:
    """
    FastAPI dependency that resolves the authenticated user from a JWT.

    Args:
        credentials: HTTP Bearer credentials from the request header
        db: Database session

    Returns:
        The User whose id equals the token's "sub" claim

    Raises:
        HTTPException: 401 if the token cannot be decoded, has no "sub"
            claim, or matches no user; 403 if the user is not active
    """
    def unauthorized() -> HTTPException:
        # Every authentication failure produces the same 401 response.
        return HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    claims = decode_access_token(credentials.credentials)
    if claims is None:
        raise unauthorized()

    subject = claims.get("sub")
    if subject is None:
        raise unauthorized()

    # Look the user up by the id carried in the token.
    account = db.query(User).filter(User.id == subject).first()
    if account is None:
        raise unauthorized()

    if account.is_active:
        return account

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Inactive user"
    )
def require_role(required_role: str):
    """
    Build a dependency that admits only users with a specific role.

    Args:
        required_role: Role the current user must have (e.g., "ADMIN")

    Returns:
        An async dependency function that returns the current user when the
        role matches and raises HTTPException (403) otherwise
    """
    async def role_checker(current_user: User = Depends(get_current_user)) -> User:
        # Exact equality only; any other role value is rejected.
        if current_user.role == required_role:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Access denied. {required_role} role required."
        )

    return role_checker