"""JWT bearer-token authentication helpers for FastAPI routes."""

import os
from datetime import datetime, timedelta
from typing import Any, Dict, Optional

from dotenv import load_dotenv
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext

from app.core.config import settings

# NOTE(review): this runs after `app.core.config` has already been imported;
# if `settings` reads environment variables at import time it will not see
# values loaded from .env here -- confirm the intended ordering.
load_dotenv()

# JWT configuration
SECRET_KEY = settings.JWT_SECRET_KEY
ALGORITHM = settings.JWT_ALGORITHM
ACCESS_TOKEN_EXPIRE_MINUTES = settings.JWT_ACCESS_TOKEN_EXPIRE_MINUTES

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# HTTP Bearer token scheme
security = HTTPBearer()


class AuthenticationError(HTTPException):
    """HTTP 401 error carrying the ``WWW-Authenticate: Bearer`` header."""

    def __init__(self, detail: str = "Could not validate credentials"):
        """Build the 401 response.

        Args:
            detail: Message placed in the response's ``detail`` field.
        """
        super().__init__(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=detail,
            headers={"WWW-Authenticate": "Bearer"},
        )


def verify_token(token: str) -> dict:
    """Decode a JWT and return its payload.

    The token is decoded with ``SECRET_KEY`` and restricted to ``ALGORITHM``.

    Args:
        token: The encoded JWT string.

    Returns:
        The decoded payload; it is guaranteed to contain a non-``None``
        ``"sub"`` claim.

    Raises:
        AuthenticationError: (an ``HTTPException`` with status 401) if the
            library rejects the token with ``JWTError`` or the payload has
            no ``"sub"`` claim.
    """
    # Keep the try body to the single call that can raise JWTError.
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError as exc:
        raise AuthenticationError() from exc

    # A token without a subject cannot be tied to a customer.
    if payload.get("sub") is None:
        raise AuthenticationError()
    return payload


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> Dict[str, Any]:
    """FastAPI dependency that resolves the caller from the bearer token.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.

    Returns:
        A dict with three keys: ``"sub"`` (the token's ``sub`` claim),
        ``"payload"`` (the full decoded payload) and ``"merchant_id"``
        (the token's ``merchant_id`` claim, or ``None`` when absent).

    Raises:
        AuthenticationError: propagated from ``verify_token`` when the token
            is rejected.
    """
    payload = verify_token(credentials.credentials)

    # Extract user information from the token.
    return {
        "sub": payload.get("sub"),
        "payload": payload,
        "merchant_id": payload.get("merchant_id"),
    }