from django.utils.translation import gettext_lazy as _
from rest_framework.authentication import BaseAuthentication
from rest_framework_simplejwt.exceptions import AuthenticationFailed, InvalidToken
from rest_framework_simplejwt.tokens import AccessToken

from .mongo_auth import MongoBackend


class MongoJWTAuthentication(BaseAuthentication):
    """
    Custom JWT Authentication that retrieves users from MongoDB instead of Django ORM.

    The token is read from an ``Authorization: Bearer <token>`` header,
    validated with ``AccessToken`` and resolved to a user through
    ``MongoBackend.get_user``.
    """

    # Realm advertised in the ``WWW-Authenticate`` challenge header.
    www_authenticate_realm = 'api'

    def authenticate_header(self, request):
        """
        Return the value for the ``WWW-Authenticate`` response header.

        Without this override the inherited implementation yields no
        challenge, and DRF then reports authentication failures as
        403 Forbidden instead of 401 Unauthorized when this class is the
        first configured authenticator.
        """
        return f'Bearer realm="{self.www_authenticate_realm}"'

    def authenticate(self, request):
        """
        Authenticate the request from its bearer token.

        Returns:
            ``None`` when there is no bearer header, the header is not
            exactly ``Bearer <token>``, or no matching user is found;
            otherwise a ``(user, validated_token)`` tuple.

        Raises:
            InvalidToken: if the token cannot be validated or carries no
                ``user_id`` claim.
            AuthenticationFailed: if the matched user is inactive.
        """
        header = self.get_header(request)
        if header is None:
            return None

        raw_token = self.get_raw_token(header)
        if raw_token is None:
            return None

        try:
            validated_token = AccessToken(raw_token)
        except Exception as exc:
            # Any failure while building the token is reported as an invalid
            # token; the original error is chained so it is not lost.
            raise InvalidToken(_('Token is invalid or expired')) from exc

        user = self.get_user(validated_token)
        if not user:
            # Pass to next authentication class (e.g. standard JWT for SQLite)
            return None

        return user, validated_token

    def get_header(self, request):
        """
        Return the whitespace-split ``Authorization`` header as a list.

        Returns ``None`` when the header is missing, empty, or its first
        part is not ``bearer`` (compared case-insensitively).
        """
        auth = request.META.get('HTTP_AUTHORIZATION', '').split()
        if not auth or auth[0].lower() != 'bearer':
            return None
        return auth

    def get_raw_token(self, header):
        """
        Return the token part of a split header.

        Returns ``None`` unless the header has exactly two parts
        (scheme and token).
        """
        if len(header) != 2:
            return None
        return header[1]

    def get_user(self, validated_token):
        """
        Attempts to find and return a user using the given validated token.

        Returns:
            The user returned by ``MongoBackend.get_user``, or ``None`` when
            that lookup yields nothing.

        Raises:
            InvalidToken: if the token has no ``user_id`` claim.
            AuthenticationFailed: if the user's ``is_active`` is falsy.
        """
        try:
            user_id = validated_token['user_id']
        except KeyError as exc:
            raise InvalidToken(
                _('Token contained no recognizable user identification')
            ) from exc

        backend = MongoBackend()
        user = backend.get_user(user_id)

        if not user:
            # Deliberately return None instead of raising
            # AuthenticationFailed, so that authenticate() can defer to the
            # next authentication class.
            return None

        if not user.is_active:
            raise AuthenticationFailed(_('User is inactive'), code='user_inactive')

        return user