| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| """ |
| NOTE: All functions in this module are considered private and are |
| subject to abrupt breaking changes. Please do not use them directly. |
| |
| """ |
|
|
| import io |
| import logging |
| from gzip import GzipFile |
| from gzip import compress as gzip_compress |
|
|
| from botocore.compat import urlencode |
| from botocore.useragent import register_feature_id |
| from botocore.utils import determine_content_length |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| def maybe_compress_request(config, request_dict, operation_model): |
| """Attempt to compress the request body using the modeled encodings.""" |
| if _should_compress_request(config, request_dict, operation_model): |
| for encoding in operation_model.request_compression['encodings']: |
| encoder = COMPRESSION_MAPPING.get(encoding) |
| if encoder is not None: |
| logger.debug('Compressing request with %s encoding.', encoding) |
| request_dict['body'] = encoder(request_dict['body']) |
| _set_compression_header(request_dict['headers'], encoding) |
| return |
| else: |
| logger.debug('Unsupported compression encoding: %s', encoding) |
|
|
|
|
| def _should_compress_request(config, request_dict, operation_model): |
| if ( |
| config.disable_request_compression is not True |
| and config.signature_version != 'v2' |
| and operation_model.request_compression is not None |
| ): |
| if not _is_compressible_type(request_dict): |
| body_type = type(request_dict['body']) |
| log_msg = 'Body type %s does not support compression.' |
| logger.debug(log_msg, body_type) |
| return False |
|
|
| if operation_model.has_streaming_input: |
| streaming_input = operation_model.get_streaming_input() |
| streaming_metadata = streaming_input.metadata |
| return 'requiresLength' not in streaming_metadata |
|
|
| body_size = _get_body_size(request_dict['body']) |
| min_size = config.request_min_compression_size_bytes |
| return min_size <= body_size |
|
|
| return False |
|
|
|
|
| def _is_compressible_type(request_dict): |
| body = request_dict['body'] |
| |
| if isinstance(body, dict): |
| body = urlencode(body, doseq=True, encoding='utf-8').encode('utf-8') |
| request_dict['body'] = body |
| is_supported_type = isinstance(body, (str, bytes, bytearray)) |
| return is_supported_type or hasattr(body, 'read') |
|
|
|
|
| def _get_body_size(body): |
| size = determine_content_length(body) |
| if size is None: |
| logger.debug( |
| 'Unable to get length of the request body: %s. ' |
| 'Skipping compression.', |
| body, |
| ) |
| size = 0 |
| return size |
|
|
|
|
| def _gzip_compress_body(body): |
| register_feature_id('GZIP_REQUEST_COMPRESSION') |
| if isinstance(body, str): |
| return gzip_compress(body.encode('utf-8')) |
| elif isinstance(body, (bytes, bytearray)): |
| return gzip_compress(body) |
| elif hasattr(body, 'read'): |
| if hasattr(body, 'seek') and hasattr(body, 'tell'): |
| current_position = body.tell() |
| compressed_obj = _gzip_compress_fileobj(body) |
| body.seek(current_position) |
| return compressed_obj |
| return _gzip_compress_fileobj(body) |
|
|
|
|
| def _gzip_compress_fileobj(body): |
| compressed_obj = io.BytesIO() |
| with GzipFile(fileobj=compressed_obj, mode='wb') as gz: |
| while True: |
| chunk = body.read(8192) |
| if not chunk: |
| break |
| if isinstance(chunk, str): |
| chunk = chunk.encode('utf-8') |
| gz.write(chunk) |
| compressed_obj.seek(0) |
| return compressed_obj |
|
|
|
|
| def _set_compression_header(headers, encoding): |
| ce_header = headers.get('Content-Encoding') |
| if ce_header is None: |
| headers['Content-Encoding'] = encoding |
| else: |
| headers['Content-Encoding'] = f'{ce_header},{encoding}' |
|
|
|
|
| COMPRESSION_MAPPING = {'gzip': _gzip_compress_body} |
|
|