Fake / Backend /app /auth.py
Ravi1212's picture
uploaded all the dependencies
bf5067d verified
import os
from datetime import datetime, timedelta, timezone
from typing import Optional

import bcrypt
from dotenv import load_dotenv
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import JWTError, jwt

from app.database import get_users_collection
from app.schemas.auth import TokenData

load_dotenv()
# JWT settings, each overridable through the process environment.
# NOTE(review): when JWT_SECRET_KEY is unset the signing key falls back to the
# placeholder literal below -- make sure the variable is set in deployment.
SECRET_KEY = os.environ.get(
    "JWT_SECRET_KEY",
    "your-super-secret-jwt-key-change-in-production",
)
# Algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Default token lifetime in minutes (1440 minutes = 24 hours).
ACCESS_TOKEN_EXPIRE_MINUTES = int(
    os.environ.get("ACCESS_TOKEN_EXPIRE_MINUTES", "1440")
)
# Security scheme: the HTTPBearer instance that get_current_user receives its
# credentials from via `Depends(security)`.
security = HTTPBearer()
def verify_password(plain_password: str, hashed_password: str) -> bool:
    """Check a plaintext password against a stored bcrypt hash.

    Args:
        plain_password: The candidate password as text.
        hashed_password: The stored hash as text, in the form returned by
            ``get_password_hash``.

    Returns:
        True when bcrypt reports a match. False when it does not, when the
        stored hash is missing or empty, or when bcrypt rejects the inputs
        with ``ValueError`` (for example a malformed hash).
    """
    # A record without a usable hash can never match; answer False rather
    # than letting the failure surface as an unhandled exception.
    if not hashed_password:
        return False

    # Encode outside the try so only bcrypt's own ValueError is handled here.
    candidate = plain_password.encode("utf-8")
    stored = hashed_password.encode("utf-8")
    try:
        return bcrypt.checkpw(candidate, stored)
    except ValueError:
        # bcrypt could not process the inputs (e.g. "Invalid salt").
        return False
def get_password_hash(password: str) -> str:
    """Return the bcrypt hash of ``password`` as text.

    The password is UTF-8 encoded, hashed with a salt from
    ``bcrypt.gensalt()``, and the resulting bytes are decoded as UTF-8.
    """
    raw_password = password.encode("utf-8")
    hashed_bytes = bcrypt.hashpw(raw_password, bcrypt.gensalt())
    return hashed_bytes.decode("utf-8")
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT access token.

    Args:
        data: Claims to embed in the token. The dict is copied, so the
            caller's object is not modified.
        expires_delta: Lifetime of the token. When ``None``, the lifetime is
            ``ACCESS_TOKEN_EXPIRE_MINUTES`` minutes. A zero or negative value
            is honoured as given and yields a token that is already expired.

    Returns:
        The token encoded with ``SECRET_KEY`` and ``ALGORITHM``.
    """
    # Compare against None explicitly: timedelta(0) is falsy, and a plain
    # truthiness test would silently replace it with the default lifetime.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)

    to_encode = data.copy()
    # Timezone-aware UTC "now"; datetime.utcnow() is deprecated and naive.
    to_encode["exp"] = datetime.now(timezone.utc) + expires_delta
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> Optional[TokenData]:
    """Decode a JWT and extract the identity claims it carries.

    Returns:
        A ``TokenData`` built from the ``sub`` and ``email`` claims, or
        ``None`` when decoding raises ``JWTError`` or the token has no
        ``sub`` claim.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None

    subject = claims.get("sub")
    if subject is None:
        return None
    return TokenData(user_id=subject, email=claims.get("email"))
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Resolve the request's bearer token to a user document.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.

    Returns:
        The document found in the users collection whose ``_id`` equals the
        token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, its subject is
            not a valid ObjectId, or no matching user exists; 403 when the
            user's ``is_active`` field is present and falsy.
    """
    from bson import ObjectId
    from bson.errors import InvalidId

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    token_data = decode_token(credentials.credentials)
    if token_data is None:
        raise credentials_exception

    # Get user from database
    users_collection = get_users_collection()

    # Only a malformed id is a credentials problem. Parsing it separately
    # keeps database failures and task cancellation from being reported as 401.
    try:
        user_oid = ObjectId(token_data.user_id)
    except (InvalidId, TypeError):
        raise credentials_exception from None

    user = await users_collection.find_one({"_id": user_oid})
    if user is None:
        raise credentials_exception

    # A document without an "is_active" field is treated as active.
    if not user.get("is_active", True):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="User account is disabled"
        )

    return user