| from rest_framework.authentication import BaseAuthentication |
| from rest_framework_simplejwt.tokens import AccessToken |
| from rest_framework_simplejwt.exceptions import InvalidToken, AuthenticationFailed |
| from .mongo_auth import MongoBackend |
| from django.utils.translation import gettext_lazy as _ |
|
|
class MongoJWTAuthentication(BaseAuthentication):
    """
    Custom JWT Authentication that retrieves users from MongoDB instead of Django ORM.

    The token is read from an ``Authorization: Bearer <token>`` header, wrapped
    in a SimpleJWT ``AccessToken`` and its ``user_id`` claim is resolved through
    ``MongoBackend.get_user``.
    """

    # Realm advertised in the ``WWW-Authenticate`` challenge header.
    www_authenticate_realm = 'api'

    def authenticate(self, request):
        """
        Authenticate ``request`` from its bearer token.

        Returns:
            ``None`` when the request carries no usable bearer header (so other
            authenticators may run) or when no user matches the token;
            otherwise a ``(user, validated_token)`` tuple.

        Raises:
            InvalidToken: if the token is rejected by ``AccessToken`` or lacks
                a ``user_id`` claim.
            AuthenticationFailed: if the matched user is not active.
        """
        # Local import: ``TokenError`` is not part of the module's import block,
        # and importing it here keeps this class self-contained.
        from rest_framework_simplejwt.exceptions import TokenError

        header = self.get_header(request)
        if header is None:
            return None

        raw_token = self.get_raw_token(header)
        if raw_token is None:
            return None

        try:
            validated_token = AccessToken(raw_token)
        except TokenError as exc:
            # Only token validation failures become a 401; unrelated errors
            # (bugs, misconfiguration) must surface instead of being masked.
            raise InvalidToken(_('Token is invalid or expired')) from exc

        user = self.get_user(validated_token)
        if not user:
            return None

        return user, validated_token

    def authenticate_header(self, request):
        """
        Return the value for the ``WWW-Authenticate`` response header.

        Django REST framework answers failed authentication with HTTP 401 only
        when the authenticator provides this value; without it the response is
        downgraded to HTTP 403.
        """
        return f'Bearer realm="{self.www_authenticate_realm}"'

    def get_header(self, request):
        """
        Return the whitespace-split ``Authorization`` header as a list.

        Returns ``None`` when the header is missing, empty, or its first part
        is not ``bearer`` (compared case-insensitively).
        """
        auth = request.META.get('HTTP_AUTHORIZATION', '').split()
        if not auth or auth[0].lower() != 'bearer':
            return None
        return auth

    def get_raw_token(self, header):
        """
        Return the token part of a split ``Authorization`` header.

        Returns ``None`` unless the header has exactly two parts (scheme and
        token); such malformed headers are ignored rather than rejected.
        """
        if len(header) != 2:
            return None
        return header[1]

    def get_user(self, validated_token):
        """
        Attempts to find and return a user using the given validated token.

        Returns:
            The user returned by ``MongoBackend.get_user`` for the token's
            ``user_id`` claim, or ``None`` when that lookup yields a falsy
            value.

        Raises:
            InvalidToken: if the token has no ``user_id`` claim.
            AuthenticationFailed: if the user's ``is_active`` is falsy.
        """
        try:
            user_id = validated_token['user_id']
        except KeyError as exc:
            raise InvalidToken(
                _('Token contained no recognizable user identification')
            ) from exc

        # NOTE(review): presumably MongoBackend.get_user returns None when no
        # document matches the id — confirm against .mongo_auth.
        user = MongoBackend().get_user(user_id)
        if not user:
            return None

        if not user.is_active:
            raise AuthenticationFailed(_('User is inactive'), code='user_inactive')

        return user
|
|