Buckets:
ktongue/docker_container / simsite /venv /lib /python3.14 /site-packages /rest_framework /authentication.py
| """ | |
| Provides various authentication policies. | |
| """ | |
| import base64 | |
| import binascii | |
| from django.contrib.auth import authenticate, get_user_model | |
| from django.middleware.csrf import CsrfViewMiddleware | |
| from django.utils.translation import gettext_lazy as _ | |
| from rest_framework import HTTP_HEADER_ENCODING, exceptions | |
def get_authorization_header(request):
    """
    Fetch the request's 'Authorization:' header from `request.META`.

    A missing header yields `b''`. A `str` value is encoded to bytes with
    `HTTP_HEADER_ENCODING`; any other value is handed back unchanged.
    """
    header = request.META.get('HTTP_AUTHORIZATION', b'')
    if not isinstance(header, str):
        return header
    # The Django test client can supply the header as text instead of bytes.
    return header.encode(HTTP_HEADER_ENCODING)
class CSRFCheck(CsrfViewMiddleware):
    """
    `CsrfViewMiddleware` subclass whose rejection hook reports the failure
    reason to the caller rather than producing a response.
    """

    def _reject(self, request, reason):
        """Return the failure `reason` instead of an HttpResponse."""
        return reason
class BaseAuthentication:
    """
    Base class that all authentication classes should extend.
    """

    def authenticate(self, request):
        """
        Authenticate the request and return a two-tuple of (user, token).

        This base implementation always raises `NotImplementedError`;
        subclasses must provide their own.
        """
        raise NotImplementedError(".authenticate() must be overridden.")

    def authenticate_header(self, request):
        """
        Return the string to use as the value of the `WWW-Authenticate`
        header in a `401 Unauthenticated` response, or `None` if the
        authentication scheme should return `403 Permission Denied`
        responses.

        The base implementation returns `None`.
        """
        return None
class BasicAuthentication(BaseAuthentication):
    """
    HTTP Basic authentication against username/password.
    """
    # Realm advertised in the `WWW-Authenticate` challenge header.
    www_authenticate_realm = 'api'

    def authenticate(self, request):
        """
        Authenticate the request using HTTP Basic credentials.

        Returns the `(user, None)` tuple produced by
        `authenticate_credentials()` when a correct username and password
        have been supplied, or `None` when the request carries no
        `Authorization` header or one whose scheme is not `basic`.

        Raises `AuthenticationFailed` when the header is malformed or the
        credentials cannot be decoded.
        """
        auth = get_authorization_header(request).split()

        if not auth or auth[0].lower() != b'basic':
            return None

        if len(auth) == 1:
            msg = _('Invalid basic header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = _('Invalid basic header. Credentials string should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            # Decode the base64 payload once, then pick a text encoding:
            # UTF-8 first, falling back to latin-1.
            raw_credentials = base64.b64decode(auth[1])
            try:
                auth_decoded = raw_credentials.decode('utf-8')
            except UnicodeDecodeError:
                auth_decoded = raw_credentials.decode('latin-1')

            # Split on the first colon only, so the password may itself
            # contain ':'. A payload without any colon raises ValueError.
            userid, password = auth_decoded.split(':', 1)
        except (TypeError, ValueError, UnicodeDecodeError, binascii.Error) as exc:
            msg = _('Invalid basic header. Credentials not correctly base64 encoded.')
            raise exceptions.AuthenticationFailed(msg) from exc

        return self.authenticate_credentials(userid, password, request)

    def authenticate_credentials(self, userid, password, request=None):
        """
        Authenticate the userid and password against username and password
        with optional request for context.

        Returns `(user, None)` on success. Raises `AuthenticationFailed`
        when no user matches the credentials or the matched user is not
        active.
        """
        credentials = {
            get_user_model().USERNAME_FIELD: userid,
            'password': password,
        }
        user = authenticate(request=request, **credentials)

        if user is None:
            raise exceptions.AuthenticationFailed(_('Invalid username/password.'))

        if not user.is_active:
            raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

        return (user, None)

    def authenticate_header(self, request):
        """Return the `Basic` challenge string carrying the configured realm."""
        return 'Basic realm="%s"' % self.www_authenticate_realm
class SessionAuthentication(BaseAuthentication):
    """
    Authentication backed by Django's session framework.
    """

    def authenticate(self, request):
        """
        Return `(user, None)` when the request session currently has a
        logged in, active user and the CSRF check passes. Otherwise return
        `None`.
        """
        # The session-based user lives on the underlying HttpRequest object.
        session_user = getattr(request._request, 'user', None)

        if session_user and session_user.is_active:
            # CSRF validation is only required for an authenticated user.
            self.enforce_csrf(request)
            return (session_user, None)

        # Unauthenticated, CSRF validation not required.
        return None

    def enforce_csrf(self, request):
        """
        Run CSRF validation for session based authentication.

        Raises `PermissionDenied` carrying the failure reason when the
        check does not pass.
        """
        def _unused_get_response(request):  # pragma: no cover
            return None

        csrf_check = CSRFCheck(_unused_get_response)
        # process_request() populates request.META['CSRF_COOKIE'], which
        # process_view() relies on.
        csrf_check.process_request(request)
        failure = csrf_check.process_view(request, None, (), {})
        if not failure:
            return
        # CSRF failed, bail with explicit error message.
        raise exceptions.PermissionDenied('CSRF Failed: %s' % failure)
class TokenAuthentication(BaseAuthentication):
    """
    Simple token based authentication.

    Clients should authenticate by passing the token key in the "Authorization"
    HTTP header, prepended with the string "Token ".  For example:

        Authorization: Token 401f7ac837da42b97f613d789819ff93537bee6a
    """

    # Scheme keyword expected at the start of the `Authorization` header
    # (compared case-insensitively).
    keyword = 'Token'

    # A custom token model may be used, but must have the following properties.
    #
    # * key -- The string identifying the token
    # * user -- The user to which the token belongs
    model = None

    def get_model(self):
        """Return the token model: `self.model` if set, else the default `Token`."""
        if self.model is not None:
            return self.model
        # Imported here rather than at module level; presumably so the
        # authtoken app is only needed when the default model is used.
        from rest_framework.authtoken.models import Token
        return Token

    def authenticate(self, request):
        """
        Authenticate the request using the token in the `Authorization` header.

        Returns the `(user, token)` tuple produced by
        `authenticate_credentials()`, or `None` when the header is absent or
        uses a different scheme keyword.

        Raises `AuthenticationFailed` when the header is malformed or the
        token bytes cannot be decoded to text.
        """
        auth = get_authorization_header(request).split()

        if not auth or auth[0].lower() != self.keyword.lower().encode():
            return None

        if len(auth) == 1:
            msg = _('Invalid token header. No credentials provided.')
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = _('Invalid token header. Token string should not contain spaces.')
            raise exceptions.AuthenticationFailed(msg)

        try:
            token = auth[1].decode()
        except UnicodeError as exc:
            msg = _('Invalid token header. Token string should not contain invalid characters.')
            raise exceptions.AuthenticationFailed(msg) from exc

        return self.authenticate_credentials(token)

    def authenticate_credentials(self, key):
        """
        Look up the token identified by `key` and return `(user, token)`.

        Raises `AuthenticationFailed` when no such token exists or the
        token's user is not active.
        """
        model = self.get_model()
        try:
            # select_related('user') fetches the user in the same query.
            token = model.objects.select_related('user').get(key=key)
        except model.DoesNotExist as exc:
            raise exceptions.AuthenticationFailed(_('Invalid token.')) from exc

        if not token.user.is_active:
            raise exceptions.AuthenticationFailed(_('User inactive or deleted.'))

        return (token.user, token)

    def authenticate_header(self, request):
        """Return the scheme keyword as the `WWW-Authenticate` challenge value."""
        return self.keyword
class RemoteUserAuthentication(BaseAuthentication):
    """
    REMOTE_USER authentication.

    To use this, set up your web server to perform authentication, which will
    set the REMOTE_USER environment variable. You will need to have
    'django.contrib.auth.backends.RemoteUserBackend' in your
    AUTHENTICATION_BACKENDS setting.
    """

    # Name of request header to grab username from.  This will be the key as
    # used in the request.META dictionary, i.e. the normalization of headers to
    # all uppercase and the addition of "HTTP_" prefix apply.
    header = "REMOTE_USER"

    def authenticate(self, request):
        """
        Return `(user, None)` when the value found under `self.header` in
        `request.META` authenticates to an active user; otherwise `None`.
        """
        remote_user = request.META.get(self.header)
        user = authenticate(request=request, remote_user=remote_user)
        if not user or not user.is_active:
            return None
        return (user, None)
Xet Storage Details
- Size: 7.7 kB
- Xet hash: f0fdc81c25546f4cad615964279f35bf8eceac19ecc05de0ee52ba0f64f7b15f

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.