FinMK / backend /users /authentication.py
Kumar
Refactor: Exclude PDF and CSV files from Git to fix HF push error
24e6f5b
from rest_framework.authentication import BaseAuthentication
from rest_framework_simplejwt.tokens import AccessToken
from rest_framework_simplejwt.exceptions import InvalidToken, AuthenticationFailed
from .mongo_auth import MongoBackend
from django.utils.translation import gettext_lazy as _
class MongoJWTAuthentication(BaseAuthentication):
"""
Custom JWT Authentication that retrieves users from MongoDB instead of Django ORM.
"""
def authenticate(self, request):
header = self.get_header(request)
if header is None:
return None
raw_token = self.get_raw_token(header)
if raw_token is None:
return None
try:
validated_token = AccessToken(raw_token)
except Exception:
raise InvalidToken(_('Token is invalid or expired'))
user = self.get_user(validated_token)
if not user:
return None # Pass to next authentication class (e.g. standard JWT for SQLite)
return user, validated_token
def get_header(self, request):
auth = request.META.get('HTTP_AUTHORIZATION', '').split()
if not auth or auth[0].lower() != 'bearer':
return None
return auth
def get_raw_token(self, header):
if len(header) != 2:
return None
return header[1]
def get_user(self, validated_token):
"""
Attempts to find and return a user using the given validated token.
"""
try:
user_id = validated_token['user_id']
except KeyError:
raise InvalidToken(_('Token contained no recognizable user identification'))
backend = MongoBackend()
user = backend.get_user(user_id)
if not user:
return None
# raise AuthenticationFailed(_('User not found'), code='user_not_found')
if not user.is_active:
raise AuthenticationFailed(_('User is inactive'), code='user_inactive')
return user