import os import bcrypt from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from fastapi import Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from dotenv import load_dotenv from app.database import get_users_collection from app.schemas.auth import TokenData load_dotenv() # JWT Configuration SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your-super-secret-jwt-key-change-in-production") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = int(os.getenv("ACCESS_TOKEN_EXPIRE_MINUTES", "1440")) # 24 hours # Security scheme security = HTTPBearer() def verify_password(plain_password: str, hashed_password: str) -> bool: """Verify a password against its hash""" return bcrypt.checkpw(plain_password.encode('utf-8'), hashed_password.encode('utf-8')) def get_password_hash(password: str) -> str: """Hash a password""" return bcrypt.hashpw(password.encode('utf-8'), bcrypt.gensalt()).decode('utf-8') def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: """Create a JWT access token""" to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def decode_token(token: str) -> Optional[TokenData]: """Decode and validate a JWT token""" try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) user_id: str = payload.get("sub") email: str = payload.get("email") if user_id is None: return None return TokenData(user_id=user_id, email=email) except JWTError: return None async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): """Get current authenticated user from JWT token""" credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) token = credentials.credentials token_data = decode_token(token) if token_data is None: raise credentials_exception # Get user from database users_collection = get_users_collection() from bson import ObjectId try: user = await users_collection.find_one({"_id": ObjectId(token_data.user_id)}) except: raise credentials_exception if user is None: raise credentials_exception if not user.get("is_active", True): raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="User account is disabled" ) return user