Spaces:
Paused
Paused
| # Copyright (c) 2012-2013 Mitch Garnaat http://garnaat.org/ | |
| # Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved. | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"). You | |
| # may not use this file except in compliance with the License. A copy of | |
| # the License is located at | |
| # | |
| # http://aws.amazon.com/apache2.0/ | |
| # | |
| # or in the "license" file accompanying this file. This file is | |
| # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF | |
| # ANY KIND, either express or implied. See the License for the specific | |
| # language governing permissions and limitations under the License. | |
| import base64 | |
| import calendar | |
| import datetime | |
| import functools | |
| import hmac | |
| import json | |
| import logging | |
| import time | |
| from collections.abc import Mapping | |
| from email.utils import formatdate | |
| from hashlib import sha1, sha256 | |
| from operator import itemgetter | |
| from botocore.compat import ( | |
| HAS_CRT, | |
| HTTPHeaders, | |
| encodebytes, | |
| ensure_unicode, | |
| parse_qs, | |
| quote, | |
| unquote, | |
| urlsplit, | |
| urlunsplit, | |
| ) | |
| from botocore.exceptions import NoAuthTokenError, NoCredentialsError | |
| from botocore.utils import ( | |
| is_valid_ipv6_endpoint_url, | |
| normalize_url_path, | |
| percent_encode_sequence, | |
| ) | |
| # Imports for backwards compatibility | |
| from botocore.compat import MD5_AVAILABLE # noqa | |
# Module-level logger used by the signers for debug output.
logger = logging.getLogger(__name__)

# Hex SHA-256 digest used in place of a body hash when the request has
# no body.
EMPTY_SHA256_HASH = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'

# Chunk size (in bytes) used when streaming a body through sha256.
# Experimenting with various buffer sizes showed that this value generally
# gave the best result (in terms of performance).
PAYLOAD_BUFFER = 1024 * 1024

# strftime patterns for the SigV2 ``Timestamp`` parameter and the SigV4
# timestamp, respectively.
ISO8601 = '%Y-%m-%dT%H:%M:%SZ'
SIGV4_TIMESTAMP = '%Y%m%dT%H%M%SZ'

# Lowercased header names that are never included in a SigV4 signature.
SIGNED_HEADERS_BLACKLIST = ['expect', 'user-agent', 'x-amzn-trace-id']

# Static strings placed in the canonical request instead of a body hash.
UNSIGNED_PAYLOAD = 'UNSIGNED-PAYLOAD'
STREAMING_UNSIGNED_PAYLOAD_TRAILER = 'STREAMING-UNSIGNED-PAYLOAD-TRAILER'
def _host_from_url(url):
    """Derive the value of the ``Host`` header from ``url``.

    The returned value:
      1) is lowercase
      2) excludes the port when it is the default port for the scheme
      3) excludes userinfo
    """
    parsed = urlsplit(url)
    # urlsplit's hostname is always lowercase
    hostname = parsed.hostname
    if is_valid_ipv6_endpoint_url(url):
        # IPv6 literals are wrapped in brackets.
        hostname = f'[{hostname}]'

    if parsed.port is None:
        return hostname

    scheme_defaults = {'http': 80, 'https': 443}
    if parsed.port == scheme_defaults.get(parsed.scheme):
        return hostname
    return '%s:%d' % (hostname, parsed.port)
| def _get_body_as_dict(request): | |
| # For query services, request.data is form-encoded and is already a | |
| # dict, but for other services such as rest-json it could be a json | |
| # string or bytes. In those cases we attempt to load the data as a | |
| # dict. | |
| data = request.data | |
| if isinstance(data, bytes): | |
| data = json.loads(data.decode('utf-8')) | |
| elif isinstance(data, str): | |
| data = json.loads(data) | |
| return data | |
class BaseSigner:
    """Base class for request signers.

    Subclasses implement ``add_auth`` and may flip the ``REQUIRES_*``
    class flags to advertise what they need.
    """

    # Set to True by signers that need a region name.
    REQUIRES_REGION = False
    # Set to True by signers that need an authorization token.
    REQUIRES_TOKEN = False

    def add_auth(self, request):
        """Sign ``request``; must be provided by subclasses."""
        raise NotImplementedError("add_auth")
class TokenSigner(BaseSigner):
    """
    Signers that expect an authorization token to perform the authorization
    """

    # The docstring has to be the first statement in the class body;
    # placed after this assignment it would be a discarded string
    # expression and ``TokenSigner.__doc__`` would be ``None``.
    REQUIRES_TOKEN = True

    def __init__(self, auth_token):
        """Store ``auth_token`` on the instance as ``self.auth_token``."""
        self.auth_token = auth_token
class SigV2Auth(BaseSigner):
    """
    Sign a request with Signature V2.
    """

    def __init__(self, credentials):
        self.credentials = credentials

    def calc_signature(self, request, params):
        """Compute the V2 signature over ``params``.

        Returns a ``(query_string, signature)`` tuple where
        ``query_string`` is the sorted, percent-encoded parameter string
        that was signed and ``signature`` is the base64 encoded
        HMAC-SHA256 digest.
        """
        logger.debug("Calculating signature using v2 auth.")
        url_parts = urlsplit(request.url)
        path = url_parts.path or '/'
        header = f"{request.method}\n{url_parts.netloc}\n{path}\n"
        secret = self.credentials.secret_key.encode("utf-8")

        # Any previous signature should not be a part of this one, so the
        # 'Signature' key is skipped.  This prevents issues during retries.
        encoded_pairs = [
            '{}={}'.format(
                quote(name.encode('utf-8'), safe=''),
                quote(str(params[name]).encode('utf-8'), safe='-_~'),
            )
            for name in sorted(params)
            if name != 'Signature'
        ]
        query_string = '&'.join(encoded_pairs)
        string_to_sign = header + query_string
        logger.debug('String to sign: %s', string_to_sign)

        mac = hmac.new(secret, digestmod=sha256)
        mac.update(string_to_sign.encode('utf-8'))
        signature = base64.b64encode(mac.digest()).strip().decode('utf-8')
        return (query_string, signature)

    def add_auth(self, request):
        """Add the V2 auth parameters and signature to the request.

        The auth handler is the last thing called in the preparation
        phase of a prepared request, so the parameters are taken from
        the request body (POST) when it is non-empty and from
        ``request.params`` (GET) otherwise, and updated in place.

        Raises ``NoCredentialsError`` when no credentials were supplied.
        """
        creds = self.credentials
        if creds is None:
            raise NoCredentialsError()

        # POST carries the params in the body, GET in the query params.
        params = request.data if request.data else request.params
        params['AWSAccessKeyId'] = creds.access_key
        params['SignatureVersion'] = '2'
        params['SignatureMethod'] = 'HmacSHA256'
        params['Timestamp'] = time.strftime(ISO8601, time.gmtime())
        if creds.token:
            params['SecurityToken'] = creds.token

        _, signature = self.calc_signature(request, params)
        params['Signature'] = signature
        return request
class SigV3Auth(BaseSigner):
    """Sign a request with the ``AWS3-HTTPS`` scheme.

    The signature is an HMAC-SHA256 over the value of the ``Date`` header
    and is sent in the ``X-Amzn-Authorization`` header.
    """

    def __init__(self, credentials):
        self.credentials = credentials

    def add_auth(self, request):
        """Set the date, token and authorization headers on ``request``.

        Raises ``NoCredentialsError`` when no credentials were supplied.
        """
        creds = self.credentials
        if creds is None:
            raise NoCredentialsError()

        headers = request.headers
        # Existing headers are deleted before being set again so that a
        # re-signed request carries a single value for each of them.
        if 'Date' in headers:
            del headers['Date']
        headers['Date'] = formatdate(usegmt=True)

        if creds.token:
            if 'X-Amz-Security-Token' in headers:
                del headers['X-Amz-Security-Token']
            headers['X-Amz-Security-Token'] = creds.token

        mac = hmac.new(creds.secret_key.encode('utf-8'), digestmod=sha256)
        mac.update(headers['Date'].encode('utf-8'))
        b64_digest = encodebytes(mac.digest()).strip()
        authorization = (
            f"AWS3-HTTPS AWSAccessKeyId={creds.access_key},"
            f"Algorithm=HmacSHA256,Signature={b64_digest.decode('utf-8')}"
        )

        if 'X-Amzn-Authorization' in headers:
            del headers['X-Amzn-Authorization']
        headers['X-Amzn-Authorization'] = authorization
class SigV4Auth(BaseSigner):
    """
    Sign a request with Signature V4.
    """

    REQUIRES_REGION = True

    def __init__(self, credentials, service_name, region_name):
        self.credentials = credentials
        # Region and service are part of the credential scope and of the
        # signing-key derivation (see ``credential_scope``/``signature``).
        self._region_name = region_name
        self._service_name = service_name

    def _sign(self, key, msg, hex=False):
        """HMAC-SHA256 ``msg`` (a str, encoded as UTF-8) with ``key``.

        Returns the hex digest string when ``hex`` is true, otherwise the
        raw digest bytes.
        """
        mac = hmac.new(key, msg.encode('utf-8'), sha256)
        return mac.hexdigest() if hex else mac.digest()

    def headers_to_sign(self, request):
        """
        Select the headers from the request that need to be included
        in the StringToSign.

        Header names are lowercased, names listed in
        ``SIGNED_HEADERS_BLACKLIST`` are dropped, and a ``host`` entry
        derived from the request URL is added when none is present.
        """
        header_map = HTTPHeaders()
        for name, value in request.headers.items():
            lname = name.lower()
            if lname not in SIGNED_HEADERS_BLACKLIST:
                header_map[lname] = value
        if 'host' not in header_map:
            # TODO: We should set the host ourselves, instead of relying on our
            # HTTP client to set it for us.
            header_map['host'] = _host_from_url(request.url)
        return header_map

    def canonical_query_string(self, request):
        """Return the canonical query string for ``request``.

        The query string can come from two parts.  One is the params
        attribute of the request.  The other is from the request url (in
        which case we have to re-split the url into its components and
        parse out the query string component).  Non-empty params take
        precedence.
        """
        if request.params:
            return self._canonical_query_string_params(request.params)
        return self._canonical_query_string_url(urlsplit(request.url))

    def _canonical_query_string_params(self, params):
        """Percent-encode, sort and join ``params``.

        ``params`` is either a mapping or an iterable of ``(key, value)``
        pairs.
        """
        if isinstance(params, Mapping):
            params = params.items()
        # [(key, value), (key2, value2)]
        key_val_pairs = [
            (quote(key, safe='-_.~'), quote(str(value), safe='-_.~'))
            for key, value in params
        ]
        # Sort by the URI-encoded key names, and in the case of
        # repeated keys, then sort by the value.
        return '&'.join(
            f'{key}={value}' for key, value in sorted(key_val_pairs)
        )

    def _canonical_query_string_url(self, parts):
        """Sort and re-join the query component of an already split URL.

        Keys and values are used as they appear in the URL; no additional
        encoding is applied here.
        """
        if not parts.query:
            return ''
        # [(key, value), (key2, value2)]
        key_val_pairs = []
        for pair in parts.query.split('&'):
            key, _, value = pair.partition('=')
            key_val_pairs.append((key, value))
        # Sort by the key names, and in the case of repeated keys, then
        # sort by the value.
        return '&'.join(
            f'{key}={value}' for key, value in sorted(key_val_pairs)
        )

    def canonical_headers(self, headers_to_sign):
        """
        Return the headers that need to be included in the StringToSign
        in their canonical form: one ``name:value`` line per distinct
        header name, sorted in alphabetical order and separated by
        newlines.  Multiple values for one name are joined with commas.
        """
        headers = []
        for key in sorted(set(headers_to_sign)):
            value = ','.join(
                self._header_value(v) for v in headers_to_sign.get_all(key)
            )
            headers.append(f'{key}:{ensure_unicode(value)}')
        return '\n'.join(headers)

    def _header_value(self, value):
        # From the sigv4 docs:
        # Lowercase(HeaderName) + ':' + Trimall(HeaderValue)
        #
        # The Trimall function removes excess white space before and after
        # values, and converts sequential spaces to a single space.
        return ' '.join(value.split())

    def signed_headers(self, headers_to_sign):
        """Return the sorted, lowercased header names joined by ``;``."""
        headers = sorted(n.lower().strip() for n in set(headers_to_sign))
        return ';'.join(headers)

    def _is_streaming_checksum_payload(self, request):
        """True when the request's checksum context places it in a trailer."""
        checksum_context = request.context.get('checksum', {})
        algorithm = checksum_context.get('request_algorithm')
        return isinstance(algorithm, dict) and algorithm.get('in') == 'trailer'

    def payload(self, request):
        """Return the payload hash entry for the canonical request.

        This is a static marker string for trailer-checksum streaming
        requests and for requests whose payload is not signed; otherwise
        it is the hex SHA-256 of the body.
        """
        if self._is_streaming_checksum_payload(request):
            return STREAMING_UNSIGNED_PAYLOAD_TRAILER
        elif not self._should_sha256_sign_payload(request):
            # When payload signing is disabled, we use this static string in
            # place of the payload checksum.
            return UNSIGNED_PAYLOAD
        request_body = request.body
        if request_body and hasattr(request_body, 'seek'):
            # File-like body: hash it in chunks and put the read position
            # back where it was so the body can still be sent.
            position = request_body.tell()
            read_chunksize = functools.partial(
                request_body.read, PAYLOAD_BUFFER
            )
            checksum = sha256()
            for chunk in iter(read_chunksize, b''):
                checksum.update(chunk)
            hex_checksum = checksum.hexdigest()
            request_body.seek(position)
            return hex_checksum
        elif request_body:
            # The request serialization has ensured that
            # request.body is a bytes() type.
            return sha256(request_body).hexdigest()
        else:
            return EMPTY_SHA256_HASH

    def _should_sha256_sign_payload(self, request):
        """Decide whether the body hash is part of the signature."""
        # Payloads will always be signed over insecure connections.
        if not request.url.startswith('https'):
            return True

        # Certain operations may have payload signing disabled by default.
        # Since we don't have access to the operation model, we pass in this
        # bit of metadata through the request context.
        return request.context.get('payload_signing_enabled', True)

    def canonical_request(self, request):
        """Build the newline-joined canonical request string.

        The parts are, in order: method, path, query string, canonical
        headers, signed header names and the payload hash.  An existing
        ``X-Amz-Content-SHA256`` header is used as the payload hash when
        present; otherwise ``payload`` is called.
        """
        cr = [request.method.upper()]
        path = self._normalize_url_path(urlsplit(request.url).path)
        cr.append(path)
        cr.append(self.canonical_query_string(request))
        headers_to_sign = self.headers_to_sign(request)
        cr.append(self.canonical_headers(headers_to_sign) + '\n')
        cr.append(self.signed_headers(headers_to_sign))
        if 'X-Amz-Content-SHA256' in request.headers:
            body_checksum = request.headers['X-Amz-Content-SHA256']
        else:
            body_checksum = self.payload(request)
        cr.append(body_checksum)
        return '\n'.join(cr)

    def _normalize_url_path(self, path):
        """Normalize ``path`` and percent-encode it (keeping ``/`` and ``~``)."""
        normalized_path = quote(normalize_url_path(path), safe='/~')
        return normalized_path

    def scope(self, request):
        """Return ``<access key>/<date>/<region>/<service>/aws4_request``."""
        scope = [
            self.credentials.access_key,
            request.context['timestamp'][0:8],
            self._region_name,
            self._service_name,
            'aws4_request',
        ]
        return '/'.join(scope)

    def credential_scope(self, request):
        """Return ``<date>/<region>/<service>/aws4_request``."""
        scope = [
            request.context['timestamp'][0:8],
            self._region_name,
            self._service_name,
            'aws4_request',
        ]
        return '/'.join(scope)

    def string_to_sign(self, request, canonical_request):
        """
        Return the StringToSign: the algorithm name, the request
        timestamp, the credential scope and the hex SHA-256 of
        ``canonical_request``, joined by newlines.
        """
        sts = ['AWS4-HMAC-SHA256']
        sts.append(request.context['timestamp'])
        sts.append(self.credential_scope(request))
        sts.append(sha256(canonical_request.encode('utf-8')).hexdigest())
        return '\n'.join(sts)

    def signature(self, string_to_sign, request):
        """Derive the signing key and return the hex signature.

        The key is derived by chaining HMACs over the date, region,
        service and the literal ``aws4_request``, starting from
        ``"AWS4" + secret_key``.
        """
        key = self.credentials.secret_key
        k_date = self._sign(
            (f"AWS4{key}").encode(), request.context["timestamp"][0:8]
        )
        k_region = self._sign(k_date, self._region_name)
        k_service = self._sign(k_region, self._service_name)
        k_signing = self._sign(k_service, 'aws4_request')
        return self._sign(k_signing, string_to_sign, hex=True)

    def add_auth(self, request):
        """Sign ``request`` in place.

        Stores the signing timestamp in ``request.context['timestamp']``,
        prepares the headers, computes the signature and injects it.

        Raises ``NoCredentialsError`` when no credentials were supplied.
        """
        if self.credentials is None:
            raise NoCredentialsError()
        # ``datetime.utcnow()`` is deprecated since Python 3.12; an aware
        # UTC datetime formats to the same SIGV4_TIMESTAMP string.
        datetime_now = datetime.datetime.now(datetime.timezone.utc)
        request.context['timestamp'] = datetime_now.strftime(SIGV4_TIMESTAMP)
        # This could be a retry.  Make sure the previous
        # authorization header is removed first.
        self._modify_request_before_signing(request)
        canonical_request = self.canonical_request(request)
        logger.debug("Calculating signature using v4 auth.")
        logger.debug('CanonicalRequest:\n%s', canonical_request)
        string_to_sign = self.string_to_sign(request, canonical_request)
        logger.debug('StringToSign:\n%s', string_to_sign)
        signature = self.signature(string_to_sign, request)
        logger.debug('Signature:\n%s', signature)

        self._inject_signature_to_request(request, signature)

    def _inject_signature_to_request(self, request, signature):
        """Write the ``Authorization`` header and return the request."""
        headers_to_sign = self.headers_to_sign(request)
        auth_str = [
            f'AWS4-HMAC-SHA256 Credential={self.scope(request)}',
            f'SignedHeaders={self.signed_headers(headers_to_sign)}',
            f'Signature={signature}',
        ]
        request.headers['Authorization'] = ', '.join(auth_str)
        return request

    def _modify_request_before_signing(self, request):
        """Reset the headers that take part in, or result from, signing."""
        if 'Authorization' in request.headers:
            del request.headers['Authorization']
        self._set_necessary_date_headers(request)
        if self.credentials.token:
            if 'X-Amz-Security-Token' in request.headers:
                del request.headers['X-Amz-Security-Token']
            request.headers['X-Amz-Security-Token'] = self.credentials.token

        if not request.context.get('payload_signing_enabled', True):
            if 'X-Amz-Content-SHA256' in request.headers:
                del request.headers['X-Amz-Content-SHA256']
            request.headers['X-Amz-Content-SHA256'] = UNSIGNED_PAYLOAD

    def _set_necessary_date_headers(self, request):
        # The spec allows for either the Date _or_ the X-Amz-Date value to be
        # used so we check both.  If there's a Date header, we use the date
        # header.  Otherwise we use the X-Amz-Date header.
        if 'Date' in request.headers:
            del request.headers['Date']
            datetime_timestamp = datetime.datetime.strptime(
                request.context['timestamp'], SIGV4_TIMESTAMP
            )
            request.headers['Date'] = formatdate(
                int(calendar.timegm(datetime_timestamp.timetuple()))
            )
            if 'X-Amz-Date' in request.headers:
                del request.headers['X-Amz-Date']
        else:
            if 'X-Amz-Date' in request.headers:
                del request.headers['X-Amz-Date']
            request.headers['X-Amz-Date'] = request.context['timestamp']
class S3SigV4Auth(SigV4Auth):
    """SigV4 signer with the S3-specific payload and path handling."""

    def _modify_request_before_signing(self, request):
        """Additionally (re)set ``X-Amz-Content-SHA256`` from ``payload``."""
        super()._modify_request_before_signing(request)
        headers = request.headers
        if 'X-Amz-Content-SHA256' in headers:
            del headers['X-Amz-Content-SHA256']

        headers['X-Amz-Content-SHA256'] = self.payload(request)

    def _should_sha256_sign_payload(self, request):
        """Decide whether the body hash is part of the signature.

        S3 allows optional body signing, so to minimize the performance
        impact, we opt to not SHA256 sign the body on streaming uploads,
        provided that we're on https.
        """
        context = request.context
        client_config = context.get('client_config')
        # The config could be None if it isn't set, or if the customer
        # sets it to None.
        s3_config = getattr(client_config, 's3', None) or None
        if s3_config is None:
            s3_config = getattr(client_config, 's3', None)
        if s3_config is None:
            s3_config = {}

        # The explicit configuration takes precedence over any implicit
        # configuration.
        explicit_setting = s3_config.get('payload_signing_enabled', None)
        if explicit_setting is not None:
            return explicit_setting

        # We require that both a checksum be present and https be enabled
        # to implicitly disable body signing.  The combination of TLS and
        # a checksum is sufficiently secure and durable for us to be
        # confident in the request without body signing.
        algorithm = context.get('checksum', {}).get('request_algorithm')
        if isinstance(algorithm, dict) and algorithm.get('in') == 'header':
            checksum_header = algorithm['name']
        else:
            checksum_header = 'Content-MD5'

        uses_https = request.url.startswith("https")
        if not (uses_https and checksum_header in request.headers):
            return True

        # If the input is streaming we disable body signing by default.
        if context.get('has_streaming_input', False):
            return False

        # If the S3-specific checks had no results, delegate to the generic
        # checks.
        return super()._should_sha256_sign_payload(request)

    def _normalize_url_path(self, path):
        # For S3, we do not normalize the path.
        return path
class S3ExpressAuth(S3SigV4Auth):
    """S3 SigV4 signer that sends its token as ``x-amz-s3session-token``."""

    REQUIRES_IDENTITY_CACHE = True

    def __init__(
        self, credentials, service_name, region_name, *, identity_cache
    ):
        super().__init__(credentials, service_name, region_name)
        self._identity_cache = identity_cache

    def add_auth(self, request):
        """Sign ``request`` using the inherited signing flow."""
        super().add_auth(request)

    def _modify_request_before_signing(self, request):
        """Swap the STS token header for the S3 Express session header."""
        super()._modify_request_before_signing(request)
        headers = request.headers
        if 'x-amz-s3session-token' not in headers:
            headers['x-amz-s3session-token'] = self.credentials.token
        # S3Express does not support STS' X-Amz-Security-Token
        if 'X-Amz-Security-Token' in headers:
            del headers['X-Amz-Security-Token']
class S3ExpressPostAuth(S3ExpressAuth):
    """Presign an S3 Express POST.

    Rather than modifying the request, the result is written to
    ``request.context['s3-presign-post-fields']`` (form fields, including
    the policy and signature) and
    ``request.context['s3-presign-post-policy']`` (the policy document).
    """

    REQUIRES_IDENTITY_CACHE = True

    def add_auth(self, request):
        """Compute the POST form fields and policy for ``request``.

        Existing fields and policy found in the request context are
        reused and extended in place.
        """
        # ``datetime.utcnow()`` is deprecated since Python 3.12; an aware
        # UTC datetime formats to the same SIGV4_TIMESTAMP string.
        datetime_now = datetime.datetime.now(datetime.timezone.utc)
        timestamp = datetime_now.strftime(SIGV4_TIMESTAMP)
        request.context['timestamp'] = timestamp

        fields = request.context.get('s3-presign-post-fields')
        if fields is None:
            fields = {}

        policy = request.context.get('s3-presign-post-policy')
        if policy is None:
            policy = {}
        conditions = policy.get('conditions')
        if conditions is None:
            conditions = []
        policy['conditions'] = conditions

        credential = self.scope(request)
        fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256'
        fields['x-amz-credential'] = credential
        fields['x-amz-date'] = timestamp

        # Every signing-related form field also needs a matching policy
        # condition.
        conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'})
        conditions.append({'x-amz-credential': credential})
        conditions.append({'x-amz-date': timestamp})

        token = self.credentials.token
        if token is not None:
            fields['X-Amz-S3session-Token'] = token
            conditions.append({'X-Amz-S3session-Token': token})

        # Dump the base64 encoded policy into the fields dictionary.
        fields['policy'] = base64.b64encode(
            json.dumps(policy).encode('utf-8')
        ).decode('utf-8')

        # The signature is computed over the encoded policy document.
        fields['x-amz-signature'] = self.signature(fields['policy'], request)

        request.context['s3-presign-post-fields'] = fields
        request.context['s3-presign-post-policy'] = policy
class S3ExpressQueryAuth(S3ExpressAuth):
    """S3 Express signer that puts the signature in the query string."""

    DEFAULT_EXPIRES = 300
    REQUIRES_IDENTITY_CACHE = True

    def __init__(
        self,
        credentials,
        service_name,
        region_name,
        *,
        identity_cache,
        expires=DEFAULT_EXPIRES,
    ):
        super().__init__(
            credentials,
            service_name,
            region_name,
            identity_cache=identity_cache,
        )
        self._expires = expires

    def _modify_request_before_signing(self, request):
        """Move all parameters and the auth parameters into the URL query."""
        # We automatically set this header, so if it's the auto-set value we
        # want to get rid of it since it doesn't make sense for presigned urls.
        auto_content_type = 'application/x-www-form-urlencoded; charset=utf-8'
        if request.headers.get('content-type') == auto_content_type:
            del request.headers['content-type']

        # Note that we're not including X-Amz-Signature.
        # From the docs: "The Canonical Query String must include all the query
        # parameters from the preceding table except for X-Amz-Signature.
        signed_headers = self.signed_headers(self.headers_to_sign(request))

        auth_params = {
            'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
            'X-Amz-Credential': self.scope(request),
            'X-Amz-Date': request.context['timestamp'],
            'X-Amz-Expires': self._expires,
            'X-Amz-SignedHeaders': signed_headers,
        }
        if self.credentials.token is not None:
            auth_params['X-Amz-S3session-Token'] = self.credentials.token

        # Now parse the original query string to a dict, inject our new query
        # params, and serialize back to a query string.
        url_parts = urlsplit(request.url)
        # parse_qs makes each value a list, but in our case we know we won't
        # have repeated keys so we know we have single element lists which we
        # can convert back to scalar values.
        parsed_query = parse_qs(url_parts.query, keep_blank_values=True)
        query_dict = {name: values[0] for name, values in parsed_query.items()}

        if request.params:
            query_dict.update(request.params)
            request.params = {}

        if request.data:
            # We also need to move the body params into the query string. To
            # do this, we first have to convert it to a dict.
            query_dict.update(_get_body_as_dict(request))
            request.data = ''

        # The spec is particular about this.  It *has* to be:
        # https://<endpoint>?<operation params>&<auth params>
        # You can't mix the two types of params together, i.e just keep doing
        # new_query_params.update(op_params)
        # new_query_params.update(auth_params)
        # percent_encode_sequence(new_query_params)
        operation_prefix = ''
        if query_dict:
            operation_prefix = percent_encode_sequence(query_dict) + '&'
        new_query_string = operation_prefix + percent_encode_sequence(
            auth_params
        )

        # url_parts is a tuple (and therefore immutable) so a new 5-tuple is
        # built in which only the query component is replaced.
        scheme, netloc, path, _, fragment = url_parts
        request.url = urlunsplit(
            (scheme, netloc, path, new_query_string, fragment)
        )

    def _inject_signature_to_request(self, request, signature):
        # Rather than calculating an "Authorization" header, for the query
        # param auth, we just append an 'X-Amz-Signature' param to the end
        # of the query string.
        request.url = request.url + '&X-Amz-Signature=%s' % signature

    def _normalize_url_path(self, path):
        # For S3, we do not normalize the path.
        return path

    def payload(self, request):
        # A presigned URL is created without knowing anything about the
        # payload, so no payload hash goes into the canonical request; the
        # constant string "UNSIGNED-PAYLOAD" is used instead.
        return UNSIGNED_PAYLOAD
class SigV4QueryAuth(SigV4Auth):
    """SigV4 signer that puts the signature in the query string."""

    DEFAULT_EXPIRES = 3600

    def __init__(
        self, credentials, service_name, region_name, expires=DEFAULT_EXPIRES
    ):
        super().__init__(credentials, service_name, region_name)
        self._expires = expires

    def _modify_request_before_signing(self, request):
        """Move all parameters and the auth parameters into the URL query."""
        # We automatically set this header, so if it's the auto-set value we
        # want to get rid of it since it doesn't make sense for presigned urls.
        auto_content_type = 'application/x-www-form-urlencoded; charset=utf-8'
        if request.headers.get('content-type') == auto_content_type:
            del request.headers['content-type']

        # Note that we're not including X-Amz-Signature.
        # From the docs: "The Canonical Query String must include all the query
        # parameters from the preceding table except for X-Amz-Signature.
        signed_headers = self.signed_headers(self.headers_to_sign(request))

        auth_params = {
            'X-Amz-Algorithm': 'AWS4-HMAC-SHA256',
            'X-Amz-Credential': self.scope(request),
            'X-Amz-Date': request.context['timestamp'],
            'X-Amz-Expires': self._expires,
            'X-Amz-SignedHeaders': signed_headers,
        }
        if self.credentials.token is not None:
            auth_params['X-Amz-Security-Token'] = self.credentials.token

        # Now parse the original query string to a dict, inject our new query
        # params, and serialize back to a query string.
        url_parts = urlsplit(request.url)
        # parse_qs makes each value a list, but in our case we know we won't
        # have repeated keys so we know we have single element lists which we
        # can convert back to scalar values.
        parsed_query = parse_qs(url_parts.query, keep_blank_values=True)
        query_dict = {name: values[0] for name, values in parsed_query.items()}

        if request.params:
            query_dict.update(request.params)
            request.params = {}

        if request.data:
            # We also need to move the body params into the query string. To
            # do this, we first have to convert it to a dict.
            query_dict.update(_get_body_as_dict(request))
            request.data = ''

        # The spec is particular about this.  It *has* to be:
        # https://<endpoint>?<operation params>&<auth params>
        # You can't mix the two types of params together, i.e just keep doing
        # new_query_params.update(op_params)
        # new_query_params.update(auth_params)
        # percent_encode_sequence(new_query_params)
        operation_prefix = ''
        if query_dict:
            operation_prefix = percent_encode_sequence(query_dict) + '&'
        new_query_string = operation_prefix + percent_encode_sequence(
            auth_params
        )

        # url_parts is a tuple (and therefore immutable) so a new 5-tuple is
        # built in which only the query component is replaced.
        scheme, netloc, path, _, fragment = url_parts
        request.url = urlunsplit(
            (scheme, netloc, path, new_query_string, fragment)
        )

    def _inject_signature_to_request(self, request, signature):
        # Rather than calculating an "Authorization" header, for the query
        # param auth, we just append an 'X-Amz-Signature' param to the end
        # of the query string.
        request.url = request.url + '&X-Amz-Signature=%s' % signature
class S3SigV4QueryAuth(SigV4QueryAuth):
    """S3 SigV4 auth using query parameters.

    This signer will sign a request using query parameters and signature
    version 4, i.e a "presigned url" signer.

    Based off of:

    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-query-string-auth.html

    """

    def _normalize_url_path(self, path):
        """Return ``path`` untouched; S3 paths are not normalized."""
        return path

    def payload(self, request):
        """Return the constant ``UNSIGNED-PAYLOAD`` marker.

        From the doc link in the class docstring: "You don't include a
        payload hash in the Canonical Request, because when you create a
        presigned URL, you don't know anything about the payload.
        Instead, you use a constant string "UNSIGNED-PAYLOAD".
        """
        return UNSIGNED_PAYLOAD
class S3SigV4PostAuth(SigV4Auth):
    """
    Presigns a s3 post

    Implementation doc here:
    http://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-UsingHTTPPOST.html

    Rather than modifying the request, the result is written to
    ``request.context['s3-presign-post-fields']`` (form fields, including
    the policy and signature) and
    ``request.context['s3-presign-post-policy']`` (the policy document).
    """

    def add_auth(self, request):
        """Compute the POST form fields and policy for ``request``.

        Existing fields and policy found in the request context are
        reused and extended in place.
        """
        # ``datetime.utcnow()`` is deprecated since Python 3.12; an aware
        # UTC datetime formats to the same SIGV4_TIMESTAMP string.
        datetime_now = datetime.datetime.now(datetime.timezone.utc)
        timestamp = datetime_now.strftime(SIGV4_TIMESTAMP)
        request.context['timestamp'] = timestamp

        fields = request.context.get('s3-presign-post-fields')
        if fields is None:
            fields = {}

        policy = request.context.get('s3-presign-post-policy')
        if policy is None:
            policy = {}
        conditions = policy.get('conditions')
        if conditions is None:
            conditions = []
        policy['conditions'] = conditions

        credential = self.scope(request)
        fields['x-amz-algorithm'] = 'AWS4-HMAC-SHA256'
        fields['x-amz-credential'] = credential
        fields['x-amz-date'] = timestamp

        # Every signing-related form field also needs a matching policy
        # condition.
        conditions.append({'x-amz-algorithm': 'AWS4-HMAC-SHA256'})
        conditions.append({'x-amz-credential': credential})
        conditions.append({'x-amz-date': timestamp})

        token = self.credentials.token
        if token is not None:
            fields['x-amz-security-token'] = token
            conditions.append({'x-amz-security-token': token})

        # Dump the base64 encoded policy into the fields dictionary.
        fields['policy'] = base64.b64encode(
            json.dumps(policy).encode('utf-8')
        ).decode('utf-8')

        # The signature is computed over the encoded policy document.
        fields['x-amz-signature'] = self.signature(fields['policy'], request)

        request.context['s3-presign-post-fields'] = fields
        request.context['s3-presign-post-policy'] = policy
class HmacV1Auth(BaseSigner):
    """
    Signs a request with an HMAC-SHA1 signature and places it in an
    ``Authorization: AWS <access_key>:<signature>`` header.
    """

    # List of Query String Arguments of Interest
    QSAOfInterest = [
        'accelerate',
        'acl',
        'cors',
        'defaultObjectAcl',
        'location',
        'logging',
        'partNumber',
        'policy',
        'requestPayment',
        'torrent',
        'versioning',
        'versionId',
        'versions',
        'website',
        'uploads',
        'uploadId',
        'response-content-type',
        'response-content-language',
        'response-expires',
        'response-cache-control',
        'response-content-disposition',
        'response-content-encoding',
        'delete',
        'lifecycle',
        'tagging',
        'restore',
        'storageClass',
        'notification',
        'replication',
        'requestPayment',
        'analytics',
        'metrics',
        'inventory',
        'select',
        'select-type',
        'object-lock',
    ]

    def __init__(self, credentials, service_name=None, region_name=None):
        # ``service_name`` and ``region_name`` are accepted but not stored.
        self.credentials = credentials

    def sign_string(self, string_to_sign):
        """
        Return the base64 encoded HMAC-SHA1 of ``string_to_sign``, keyed
        with the credentials' secret key, as text.
        """
        secret = self.credentials.secret_key.encode('utf-8')
        mac = hmac.new(secret, string_to_sign.encode('utf-8'), digestmod=sha1)
        encoded_digest = encodebytes(mac.digest())
        return encoded_digest.strip().decode('utf-8')

    def canonical_standard_headers(self, headers):
        """
        Return the newline-joined values of Content-MD5, Content-Type and
        Date, in that order.

        Note that this replaces the ``Date`` header on ``headers`` with the
        value of ``self._get_date()`` before reading it. A header that is
        absent contributes an empty line.
        """
        # Drop any existing Date first so only the fresh value remains.
        if 'Date' in headers:
            del headers['Date']
        headers['Date'] = self._get_date()

        lines = []
        for wanted in ('content-md5', 'content-type', 'date'):
            matches = [
                headers[name].strip()
                for name in headers
                if name.lower() == wanted and headers[name] is not None
            ]
            # Every match is emitted; no match at all yields one blank entry.
            lines.extend(matches or [''])
        return '\n'.join(lines)

    def canonical_custom_headers(self, headers):
        """
        Return the ``x-amz-*`` headers as ``name:value`` lines sorted by
        lower-cased name, with repeated values joined by commas.
        """
        amz_headers = {}
        for name in headers:
            lowered = name.lower()
            if headers[name] is None or not lowered.startswith('x-amz-'):
                continue
            amz_headers[lowered] = ','.join(
                value.strip() for value in headers.get_all(name)
            )
        return '\n'.join(
            f"{name}:{amz_headers[name]}" for name in sorted(amz_headers)
        )

    def unquote_v(self, nv):
        """
        TODO: Do we need this?
        """
        if len(nv) != 1:
            return (nv[0], unquote(nv[1]))
        return nv

    def canonical_resource(self, split, auth_path=None):
        """
        Return the path, plus any query arguments that are listed in
        ``QSAOfInterest`` (sorted by name), for the string to sign.
        """
        # NOTE:
        # The path in the canonical resource should always be the
        # full path including the bucket name, even for virtual-hosting
        # style addressing.  The ``auth_path`` keeps track of the full
        # path for the canonical resource and would be passed in if
        # the client was using virtual-hosting style.
        resource = split.path if auth_path is None else auth_path
        if not split.query:
            return resource

        # Nothing after the first ? is included unless it is one of the
        # query string arguments of interest.
        pairs = (param.split('=', 1) for param in split.query.split('&'))
        signed = [
            self.unquote_v(pair)
            for pair in pairs
            if pair[0] in self.QSAOfInterest
        ]
        if not signed:
            return resource

        signed.sort(key=itemgetter(0))
        return resource + '?' + '&'.join('='.join(pair) for pair in signed)

    def canonical_string(
        self, method, split, headers, expires=None, auth_path=None
    ):
        """
        Build the string to sign: method, standard headers, optional
        ``x-amz-*`` headers and canonical resource, separated by newlines.

        ``expires`` is accepted but not used here.
        """
        pieces = [method.upper(), self.canonical_standard_headers(headers)]
        amz_lines = self.canonical_custom_headers(headers)
        if amz_lines:
            pieces.append(amz_lines)
        pieces.append(self.canonical_resource(split, auth_path=auth_path))
        return '\n'.join(pieces)

    def get_signature(
        self, method, split, headers, expires=None, auth_path=None
    ):
        """
        Compute the signature for the request described by the arguments.

        When the credentials carry a token, the ``x-amz-security-token``
        header on ``headers`` is replaced with it before signing.
        ``expires`` is accepted but not used here.
        """
        if self.credentials.token:
            # Delete before assigning so the header is not duplicated.
            del headers['x-amz-security-token']
            headers['x-amz-security-token'] = self.credentials.token
        to_sign = self.canonical_string(
            method, split, headers, auth_path=auth_path
        )
        logger.debug('StringToSign:\n%s', to_sign)
        return self.sign_string(to_sign)

    def add_auth(self, request):
        """
        Sign ``request`` and inject the signature into it.

        Raises ``NoCredentialsError`` when no credentials were provided.
        """
        if self.credentials is None:
            raise NoCredentialsError
        logger.debug("Calculating signature using hmacv1 auth.")
        url_parts = urlsplit(request.url)
        logger.debug('HTTP request method: %s', request.method)
        computed = self.get_signature(
            request.method,
            url_parts,
            request.headers,
            auth_path=request.auth_path,
        )
        self._inject_signature(request, computed)

    def _get_date(self):
        # Current time formatted with email.utils.formatdate(usegmt=True).
        return formatdate(usegmt=True)

    def _inject_signature(self, request, signature):
        """Set the ``Authorization`` header from the access key and signature."""
        if 'Authorization' in request.headers:
            # We have to do this because request.headers is not
            # normal dictionary.  It has the (unintuitive) behavior
            # of aggregating repeated setattr calls for the same
            # key value.  For example:
            # headers['foo'] = 'a'; headers['foo'] = 'b'
            # list(headers) will print ['foo', 'foo'].
            del request.headers['Authorization']

        request.headers['Authorization'] = (
            f"AWS {self.credentials.access_key}:{signature}"
        )
class HmacV1QueryAuth(HmacV1Auth):
    """
    Generates a presigned request for s3.

    Spec from this document:

    http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html
    #RESTAuthenticationQueryStringAuth
    """

    DEFAULT_EXPIRES = 3600

    def __init__(self, credentials, expires=DEFAULT_EXPIRES):
        self.credentials = credentials
        self._expires = expires

    def _get_date(self):
        # Epoch seconds at which the URL expires: now plus the configured
        # lifetime, truncated to a whole number and returned as text.
        expires_at = time.time() + int(self._expires)
        return str(int(expires_at))

    def _inject_signature(self, request, signature):
        """
        Rewrite ``request.url`` so its query string carries the access key,
        the signature, the expiry and the relevant headers.
        """
        query_dict = {
            'AWSAccessKeyId': self.credentials.access_key,
            'Signature': signature,
        }

        for header_key in request.headers:
            lowered = header_key.lower()
            if header_key == 'Date':
                # For query string requests, Expires is used instead of the
                # Date header.
                query_dict['Expires'] = request.headers['Date']
                continue
            # We only want to include relevant headers in the query string.
            # These can be anything that starts with x-amz, is Content-MD5,
            # or is Content-Type.
            is_relevant = lowered.startswith('x-amz-') or lowered in (
                'content-md5',
                'content-type',
            )
            if is_relevant:
                query_dict[lowered] = request.headers[lowered]

        # Combine all of the identified headers into an encoded
        # query string
        presigned_query = percent_encode_sequence(query_dict)

        # Create a new url with the presigned url.
        parts = urlsplit(request.url)
        if parts.query:
            # If there was a pre-existing query string, we should
            # add that back before injecting the new query string.
            presigned_query = f'{parts.query}&{presigned_query}'
        request.url = urlunsplit(
            (
                parts.scheme,
                parts.netloc,
                parts.path,
                presigned_query,
                parts.fragment,
            )
        )
class HmacV1PostAuth(HmacV1Auth):
    """
    Generates a presigned post for s3.

    Spec from this document:

    http://docs.aws.amazon.com/AmazonS3/latest/dev/UsingHTTPPOST.html
    """

    def add_auth(self, request):
        """
        Add the access key, optional security token, encoded policy and its
        signature to the presigned-post state held in ``request.context``.

        Existing ``s3-presign-post-fields`` / ``s3-presign-post-policy``
        objects found in the context are updated in place.
        """
        context = request.context

        # Start from whatever the caller already placed in the context.
        fields = context.get('s3-presign-post-fields', None)
        if fields is None:
            fields = {}

        policy = context.get('s3-presign-post-policy', None)
        if policy is None:
            policy = {}

        conditions = policy.get('conditions', None)
        if conditions is None:
            conditions = []
        policy['conditions'] = conditions

        fields['AWSAccessKeyId'] = self.credentials.access_key

        if self.credentials.token is not None:
            fields['x-amz-security-token'] = self.credentials.token
            conditions.append({'x-amz-security-token': self.credentials.token})

        # Dump the base64 encoded policy into the fields dictionary.
        policy_json = json.dumps(policy).encode('utf-8')
        fields['policy'] = base64.b64encode(policy_json).decode('utf-8')

        # The signature is computed over the encoded policy.
        fields['signature'] = self.sign_string(fields['policy'])

        context['s3-presign-post-fields'] = fields
        context['s3-presign-post-policy'] = policy
class BearerAuth(TokenSigner):
    """
    Performs bearer token authorization by placing the bearer token in the
    Authorization header as specified by Section 2.1 of RFC 6750.

    https://datatracker.ietf.org/doc/html/rfc6750#section-2.1
    """

    def add_auth(self, request):
        """
        Set ``Authorization: Bearer <token>`` on ``request``.

        Raises ``NoAuthTokenError`` when ``self.auth_token`` is ``None``.
        """
        if self.auth_token is None:
            raise NoAuthTokenError()

        bearer_value = f'Bearer {self.auth_token.token}'
        headers = request.headers
        # Remove any existing header first so only the new value remains.
        if 'Authorization' in headers:
            del headers['Authorization']
        headers['Authorization'] = bearer_value
# Maps an auth type name to the signer class that implements it.
AUTH_TYPE_MAPS = {
    'v2': SigV2Auth,
    'v3': SigV3Auth,
    'v3https': SigV3Auth,
    's3': HmacV1Auth,
    's3-query': HmacV1QueryAuth,
    's3-presign-post': HmacV1PostAuth,
    's3v4-presign-post': S3SigV4PostAuth,
    'v4-s3express': S3ExpressAuth,
    'v4-s3express-query': S3ExpressQueryAuth,
    'v4-s3express-presign-post': S3ExpressPostAuth,
    'bearer': BearerAuth,
}

# Define v4 signers depending on if CRT is present
if not HAS_CRT:
    # No CRT: register the v4 signers defined in this module.
    AUTH_TYPE_MAPS['v4'] = SigV4Auth
    AUTH_TYPE_MAPS['v4-query'] = SigV4QueryAuth
    AUTH_TYPE_MAPS['s3v4'] = S3SigV4Auth
    AUTH_TYPE_MAPS['s3v4-query'] = S3SigV4QueryAuth
else:
    # CRT present: merge in the mappings exported by botocore.crt.auth.
    from botocore.crt.auth import CRT_AUTH_TYPE_MAPS

    AUTH_TYPE_MAPS.update(CRT_AUTH_TYPE_MAPS)