File size: 9,345 Bytes
a06facb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 |
import asyncio
import io
import os
import socket
from typing import Dict, Optional
import aiohttp # lgtm [py/import-and-import-from]
from aiohttp import (
ClientConnectionError,
ClientConnectorError,
ClientHttpProxyError,
ClientProxyConnectionError,
ClientSSLError,
ServerDisconnectedError,
ServerTimeoutError,
)
from aiohttp.client import URL
from botocore.httpsession import (
MAX_POOL_CONNECTIONS,
ConnectionClosedError,
ConnectTimeoutError,
EndpointConnectionError,
HTTPClientError,
InvalidProxiesConfigError,
LocationParseError,
ProxyConfiguration,
ProxyConnectionError,
ReadTimeoutError,
SSLError,
_is_ipaddress,
create_urllib3_context,
ensure_boolean,
get_cert_path,
logger,
mask_proxy_url,
parse_url,
urlparse,
)
from multidict import CIMultiDict
import aiobotocore.awsrequest
from aiobotocore._endpoint_helpers import _IOBaseWrapper, _text
class AIOHTTPSession:
    """An aiohttp-based HTTP session, mirroring botocore's URLLib3Session.

    aiohttp handles 100-continue internally and pools connections per host,
    so no AWSHTTP[S]ConnectionPool / pool manager / proxy manager is needed;
    the proxy is passed per request instead.
    """

    def __init__(
        self,
        verify: bool = True,
        proxies: Optional[Dict[str, str]] = None,  # {scheme: url}
        timeout: Optional[float] = None,
        max_pool_connections: int = MAX_POOL_CONNECTIONS,
        socket_options=None,
        client_cert=None,
        proxies_config=None,
        connector_args=None,
    ):
        """
        :param verify: True/False to enable/disable TLS cert verification,
            or a CA bundle path (anything ``get_cert_path`` accepts).
        :param proxies: mapping of URL scheme to proxy URL.
        :param timeout: a single timeout value, or a ``(connect, read)``
            tuple/list of timeouts in seconds.
        :param max_pool_connections: connection limit for the connector.
        :param socket_options: currently unhandled (see TODO below).
        :param client_cert: client certificate path, or ``(cert, key)`` tuple.
        :param proxies_config: extra proxy settings (``proxy_ca_bundle``,
            ``proxy_client_cert``, ...) as understood by ProxyConfiguration.
        :param connector_args: extra kwargs for ``aiohttp.TCPConnector``.
        """
        # TODO: handle socket_options
        self._session: Optional[aiohttp.ClientSession] = None
        self._verify = verify
        self._proxy_config = ProxyConfiguration(
            proxies=proxies, proxies_settings=proxies_config
        )

        # botocore allows (connect, read) timeout pairs; aiohttp takes them
        # as separate ClientTimeout fields.
        if isinstance(timeout, (list, tuple)):
            conn_timeout, read_timeout = timeout
        else:
            conn_timeout = read_timeout = timeout

        timeout = aiohttp.ClientTimeout(
            sock_connect=conn_timeout, sock_read=read_timeout
        )

        self._cert_file = None
        self._key_file = None
        if isinstance(client_cert, str):
            self._cert_file = client_cert
        elif isinstance(client_cert, tuple):
            self._cert_file, self._key_file = client_cert

        self._timeout = timeout
        self._connector_args = connector_args
        if self._connector_args is None:
            # AWS has a 20 second idle timeout:
            # https://web.archive.org/web/20150926192339/https://forums.aws.amazon.com/message.jspa?messageID=215367
            # aiohttp default timeout is 30s so set something reasonable here
            self._connector_args = dict(keepalive_timeout=12)

        self._max_pool_connections = max_pool_connections
        self._socket_options = socket_options
        if socket_options is None:
            self._socket_options = []

        # aiohttp handles 100 continue so we shouldn't need AWSHTTP[S]ConnectionPool
        # it also pools by host so we don't need a manager, and can pass proxy via
        # request so don't need proxy manager
        ssl_context = None
        if bool(verify):
            if proxies:
                # BUG FIX: _setup_proxy_ssl_context() runs its argument
                # through parse_url(), so it must receive a proxy *URL*,
                # not the proxy settings dict (which previously caused an
                # uncaught TypeError whenever proxy_ca_bundle or
                # proxy_client_cert was configured).  Prefer the https
                # proxy, falling back to any configured one.
                proxy_url = proxies.get('https', next(iter(proxies.values())))
                ssl_context = self._setup_proxy_ssl_context(proxy_url)
                # TODO: add support for
                # proxies_settings.get('proxy_use_forwarding_for_https')
            else:
                ssl_context = self._get_ssl_context()

                # inline self._setup_ssl_cert
                ca_certs = get_cert_path(verify)
                if ca_certs:
                    ssl_context.load_verify_locations(ca_certs, None, None)

        self._create_connector = lambda: aiohttp.TCPConnector(
            limit=max_pool_connections,
            # FIX: `verify_ssl` is deprecated in aiohttp 3.x and conflicts
            # with `ssl`; the `ssl` parameter alone covers every case:
            # an SSLContext, True (default verification) or False (off).
            ssl=ssl_context if ssl_context is not None else bool(verify),
            **self._connector_args,
        )
        self._connector = None

    async def __aenter__(self):
        # A session must not be entered twice without an intervening close.
        assert not self._session and not self._connector

        self._connector = self._create_connector()

        self._session = aiohttp.ClientSession(
            connector=self._connector,
            timeout=self._timeout,
            # botocore manages Content-Type itself; don't let aiohttp add it
            skip_auto_headers={'CONTENT-TYPE'},
            auto_decompress=False,
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self._session:
            await self._session.__aexit__(exc_type, exc_val, exc_tb)
            self._session = None
            self._connector = None

    def _get_ssl_context(self):
        """Build the base SSL context, loading the client cert if given."""
        ssl_context = create_urllib3_context()
        if self._cert_file:
            ssl_context.load_cert_chain(self._cert_file, self._key_file)
        return ssl_context

    def _setup_proxy_ssl_context(self, proxy_url):
        """Build the SSL context used to talk to the proxy itself.

        :param proxy_url: URL of the proxy (parsed with ``parse_url``).
        :returns: an ``SSLContext``, or None when neither a proxy CA bundle
            nor a proxy client cert is configured.
        :raises InvalidProxiesConfigError: on unreadable cert/CA files or
            an unparseable proxy URL.
        """
        proxies_settings = self._proxy_config.settings
        proxy_ca_bundle = proxies_settings.get('proxy_ca_bundle')
        proxy_cert = proxies_settings.get('proxy_client_cert')
        if proxy_ca_bundle is None and proxy_cert is None:
            return None

        context = self._get_ssl_context()
        try:
            url = parse_url(proxy_url)
            # urllib3 disables this by default but we need it for proper
            # proxy tls negotiation when proxy_url is not an IP Address
            if not _is_ipaddress(url.host):
                context.check_hostname = True
            if proxy_ca_bundle is not None:
                context.load_verify_locations(cafile=proxy_ca_bundle)

            if isinstance(proxy_cert, tuple):
                context.load_cert_chain(proxy_cert[0], keyfile=proxy_cert[1])
            elif isinstance(proxy_cert, str):
                context.load_cert_chain(proxy_cert)

            return context
        except (OSError, LocationParseError) as e:
            raise InvalidProxiesConfigError(error=e)

    async def close(self):
        await self.__aexit__(None, None, None)

    async def send(self, request):
        """Send an AWSRequest and return an AioAWSResponse.

        Translates aiohttp/socket-level failures into the botocore
        exception types callers expect.
        """
        try:
            proxy_url = self._proxy_config.proxy_url_for(request.url)
            proxy_headers = self._proxy_config.proxy_headers_for(request.url)
            url = request.url
            headers = request.headers
            data = request.body

            if ensure_boolean(
                os.environ.get('BOTO_EXPERIMENTAL__ADD_PROXY_HOST_HEADER', '')
            ):
                # This is currently an "experimental" feature which provides
                # no guarantees of backwards compatibility. It may be subject
                # to change or removal in any patch version. Anyone opting in
                # to this feature should strictly pin botocore.
                host = urlparse(request.url).hostname
                proxy_headers['host'] = host

            # aiohttp wants headers as a CIMultiDict of str values
            headers_ = CIMultiDict(
                (z[0], _text(z[1], encoding='utf-8')) for z in headers.items()
            )

            # https://github.com/boto/botocore/issues/1255
            headers_['Accept-Encoding'] = 'identity'

            chunked = None
            if headers_.get('Transfer-Encoding', '').lower() == 'chunked':
                # aiohttp wants chunking as a param, and not a header
                headers_.pop('Transfer-Encoding', '')
                chunked = True

            if isinstance(data, io.IOBase):
                data = _IOBaseWrapper(data)

            # encoded=True: the URL is already percent-encoded by botocore
            url = URL(url, encoded=True)
            response = await self._session.request(
                request.method,
                url=url,
                chunked=chunked,
                headers=headers_,
                data=data,
                proxy=proxy_url,
                proxy_headers=proxy_headers,
            )

            http_response = aiobotocore.awsrequest.AioAWSResponse(
                str(response.url), response.status, response.headers, response
            )

            if not request.stream_output:
                # Cause the raw stream to be exhausted immediately. We do it
                # this way instead of using preload_content because
                # preload_content will never buffer chunked responses
                await http_response.content

            return http_response
        except ClientSSLError as e:
            raise SSLError(endpoint_url=request.url, error=e)
        except (ClientProxyConnectionError, ClientHttpProxyError) as e:
            raise ProxyConnectionError(
                proxy_url=mask_proxy_url(proxy_url), error=e
            )
        except (
            ServerDisconnectedError,
            aiohttp.ClientPayloadError,
            aiohttp.http_exceptions.BadStatusLine,
        ) as e:
            raise ConnectionClosedError(
                error=e, request=request, endpoint_url=request.url
            )
        except ServerTimeoutError as e:
            # aiohttp uses one exception type for both phases; distinguish
            # connect vs read by the message prefix
            if str(e).lower().startswith('connect'):
                raise ConnectTimeoutError(endpoint_url=request.url, error=e)
            else:
                raise ReadTimeoutError(endpoint_url=request.url, error=e)
        except (
            ClientConnectorError,
            ClientConnectionError,
            socket.gaierror,
        ) as e:
            raise EndpointConnectionError(endpoint_url=request.url, error=e)
        except asyncio.TimeoutError as e:
            raise ReadTimeoutError(endpoint_url=request.url, error=e)
        except Exception as e:
            message = 'Exception received when sending urllib3 HTTP request'
            logger.debug(message, exc_info=True)
            raise HTTPClientError(error=e)
|