| | """ |
| | Custom transports, with nicely configured defaults. |
| | |
| | The following additional keyword arguments are currently supported by httpcore... |
| | |
| | * uds: str |
| | * local_address: str |
| | * retries: int |
| | |
| | Example usages... |
| | |
| | # Disable HTTP/2 on a single specific domain. |
| | mounts = { |
| | "all://": httpx.HTTPTransport(http2=True), |
| | "all://*example.org": httpx.HTTPTransport() |
| | } |
| | |
| | # Using advanced httpcore configuration, with connection retries. |
| | transport = httpx.HTTPTransport(retries=1) |
| | client = httpx.Client(transport=transport) |
| | |
| | # Using advanced httpcore configuration, with unix domain sockets. |
| | transport = httpx.HTTPTransport(uds="socket.uds") |
| | client = httpx.Client(transport=transport) |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import contextlib |
| | import typing |
| | from types import TracebackType |
| |
|
| | if typing.TYPE_CHECKING: |
| | import ssl |
| |
|
| | import httpx |
| |
|
| | from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context |
| | from .._exceptions import ( |
| | ConnectError, |
| | ConnectTimeout, |
| | LocalProtocolError, |
| | NetworkError, |
| | PoolTimeout, |
| | ProtocolError, |
| | ProxyError, |
| | ReadError, |
| | ReadTimeout, |
| | RemoteProtocolError, |
| | TimeoutException, |
| | UnsupportedProtocol, |
| | WriteError, |
| | WriteTimeout, |
| | ) |
| | from .._models import Request, Response |
| | from .._types import AsyncByteStream, CertTypes, ProxyTypes, SyncByteStream |
| | from .._urls import URL |
| | from .base import AsyncBaseTransport, BaseTransport |
| |
|
| | T = typing.TypeVar("T", bound="HTTPTransport") |
| | A = typing.TypeVar("A", bound="AsyncHTTPTransport") |
| |
|
| | SOCKET_OPTION = typing.Union[ |
| | typing.Tuple[int, int, int], |
| | typing.Tuple[int, int, typing.Union[bytes, bytearray]], |
| | typing.Tuple[int, int, None, int], |
| | ] |
| |
|
| | __all__ = ["AsyncHTTPTransport", "HTTPTransport"] |
| |
|
| | HTTPCORE_EXC_MAP: dict[type[Exception], type[httpx.HTTPError]] = {} |
| |
|
| |
|
| | def _load_httpcore_exceptions() -> dict[type[Exception], type[httpx.HTTPError]]: |
| | import httpcore |
| |
|
| | return { |
| | httpcore.TimeoutException: TimeoutException, |
| | httpcore.ConnectTimeout: ConnectTimeout, |
| | httpcore.ReadTimeout: ReadTimeout, |
| | httpcore.WriteTimeout: WriteTimeout, |
| | httpcore.PoolTimeout: PoolTimeout, |
| | httpcore.NetworkError: NetworkError, |
| | httpcore.ConnectError: ConnectError, |
| | httpcore.ReadError: ReadError, |
| | httpcore.WriteError: WriteError, |
| | httpcore.ProxyError: ProxyError, |
| | httpcore.UnsupportedProtocol: UnsupportedProtocol, |
| | httpcore.ProtocolError: ProtocolError, |
| | httpcore.LocalProtocolError: LocalProtocolError, |
| | httpcore.RemoteProtocolError: RemoteProtocolError, |
| | } |
| |
|
| |
|
| | @contextlib.contextmanager |
| | def map_httpcore_exceptions() -> typing.Iterator[None]: |
| | global HTTPCORE_EXC_MAP |
| | if len(HTTPCORE_EXC_MAP) == 0: |
| | HTTPCORE_EXC_MAP = _load_httpcore_exceptions() |
| | try: |
| | yield |
| | except Exception as exc: |
| | mapped_exc = None |
| |
|
| | for from_exc, to_exc in HTTPCORE_EXC_MAP.items(): |
| | if not isinstance(exc, from_exc): |
| | continue |
| | |
| | |
| | |
| | if mapped_exc is None or issubclass(to_exc, mapped_exc): |
| | mapped_exc = to_exc |
| |
|
| | if mapped_exc is None: |
| | raise |
| |
|
| | message = str(exc) |
| | raise mapped_exc(message) from exc |
| |
|
| |
|
| | class ResponseStream(SyncByteStream): |
| | def __init__(self, httpcore_stream: typing.Iterable[bytes]) -> None: |
| | self._httpcore_stream = httpcore_stream |
| |
|
| | def __iter__(self) -> typing.Iterator[bytes]: |
| | with map_httpcore_exceptions(): |
| | for part in self._httpcore_stream: |
| | yield part |
| |
|
| | def close(self) -> None: |
| | if hasattr(self._httpcore_stream, "close"): |
| | self._httpcore_stream.close() |
| |
|
| |
|
| | class HTTPTransport(BaseTransport): |
| | def __init__( |
| | self, |
| | verify: ssl.SSLContext | str | bool = True, |
| | cert: CertTypes | None = None, |
| | trust_env: bool = True, |
| | http1: bool = True, |
| | http2: bool = False, |
| | limits: Limits = DEFAULT_LIMITS, |
| | proxy: ProxyTypes | None = None, |
| | uds: str | None = None, |
| | local_address: str | None = None, |
| | retries: int = 0, |
| | socket_options: typing.Iterable[SOCKET_OPTION] | None = None, |
| | ) -> None: |
| | import httpcore |
| |
|
| | proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy |
| | ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env) |
| |
|
| | if proxy is None: |
| | self._pool = httpcore.ConnectionPool( |
| | ssl_context=ssl_context, |
| | max_connections=limits.max_connections, |
| | max_keepalive_connections=limits.max_keepalive_connections, |
| | keepalive_expiry=limits.keepalive_expiry, |
| | http1=http1, |
| | http2=http2, |
| | uds=uds, |
| | local_address=local_address, |
| | retries=retries, |
| | socket_options=socket_options, |
| | ) |
| | elif proxy.url.scheme in ("http", "https"): |
| | self._pool = httpcore.HTTPProxy( |
| | proxy_url=httpcore.URL( |
| | scheme=proxy.url.raw_scheme, |
| | host=proxy.url.raw_host, |
| | port=proxy.url.port, |
| | target=proxy.url.raw_path, |
| | ), |
| | proxy_auth=proxy.raw_auth, |
| | proxy_headers=proxy.headers.raw, |
| | ssl_context=ssl_context, |
| | proxy_ssl_context=proxy.ssl_context, |
| | max_connections=limits.max_connections, |
| | max_keepalive_connections=limits.max_keepalive_connections, |
| | keepalive_expiry=limits.keepalive_expiry, |
| | http1=http1, |
| | http2=http2, |
| | socket_options=socket_options, |
| | ) |
| | elif proxy.url.scheme in ("socks5", "socks5h"): |
| | try: |
| | import socksio |
| | except ImportError: |
| | raise ImportError( |
| | "Using SOCKS proxy, but the 'socksio' package is not installed. " |
| | "Make sure to install httpx using `pip install httpx[socks]`." |
| | ) from None |
| |
|
| | self._pool = httpcore.SOCKSProxy( |
| | proxy_url=httpcore.URL( |
| | scheme=proxy.url.raw_scheme, |
| | host=proxy.url.raw_host, |
| | port=proxy.url.port, |
| | target=proxy.url.raw_path, |
| | ), |
| | proxy_auth=proxy.raw_auth, |
| | ssl_context=ssl_context, |
| | max_connections=limits.max_connections, |
| | max_keepalive_connections=limits.max_keepalive_connections, |
| | keepalive_expiry=limits.keepalive_expiry, |
| | http1=http1, |
| | http2=http2, |
| | ) |
| | else: |
| | raise ValueError( |
| | "Proxy protocol must be either 'http', 'https', 'socks5', or 'socks5h'," |
| | f" but got {proxy.url.scheme!r}." |
| | ) |
| |
|
| | def __enter__(self: T) -> T: |
| | self._pool.__enter__() |
| | return self |
| |
|
| | def __exit__( |
| | self, |
| | exc_type: type[BaseException] | None = None, |
| | exc_value: BaseException | None = None, |
| | traceback: TracebackType | None = None, |
| | ) -> None: |
| | with map_httpcore_exceptions(): |
| | self._pool.__exit__(exc_type, exc_value, traceback) |
| |
|
| | def handle_request( |
| | self, |
| | request: Request, |
| | ) -> Response: |
| | assert isinstance(request.stream, SyncByteStream) |
| | import httpcore |
| |
|
| | req = httpcore.Request( |
| | method=request.method, |
| | url=httpcore.URL( |
| | scheme=request.url.raw_scheme, |
| | host=request.url.raw_host, |
| | port=request.url.port, |
| | target=request.url.raw_path, |
| | ), |
| | headers=request.headers.raw, |
| | content=request.stream, |
| | extensions=request.extensions, |
| | ) |
| | with map_httpcore_exceptions(): |
| | resp = self._pool.handle_request(req) |
| |
|
| | assert isinstance(resp.stream, typing.Iterable) |
| |
|
| | return Response( |
| | status_code=resp.status, |
| | headers=resp.headers, |
| | stream=ResponseStream(resp.stream), |
| | extensions=resp.extensions, |
| | ) |
| |
|
| | def close(self) -> None: |
| | self._pool.close() |
| |
|
| |
|
| | class AsyncResponseStream(AsyncByteStream): |
| | def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]) -> None: |
| | self._httpcore_stream = httpcore_stream |
| |
|
| | async def __aiter__(self) -> typing.AsyncIterator[bytes]: |
| | with map_httpcore_exceptions(): |
| | async for part in self._httpcore_stream: |
| | yield part |
| |
|
| | async def aclose(self) -> None: |
| | if hasattr(self._httpcore_stream, "aclose"): |
| | await self._httpcore_stream.aclose() |
| |
|
| |
|
| | class AsyncHTTPTransport(AsyncBaseTransport): |
| | def __init__( |
| | self, |
| | verify: ssl.SSLContext | str | bool = True, |
| | cert: CertTypes | None = None, |
| | trust_env: bool = True, |
| | http1: bool = True, |
| | http2: bool = False, |
| | limits: Limits = DEFAULT_LIMITS, |
| | proxy: ProxyTypes | None = None, |
| | uds: str | None = None, |
| | local_address: str | None = None, |
| | retries: int = 0, |
| | socket_options: typing.Iterable[SOCKET_OPTION] | None = None, |
| | ) -> None: |
| | import httpcore |
| |
|
| | proxy = Proxy(url=proxy) if isinstance(proxy, (str, URL)) else proxy |
| | ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env) |
| |
|
| | if proxy is None: |
| | self._pool = httpcore.AsyncConnectionPool( |
| | ssl_context=ssl_context, |
| | max_connections=limits.max_connections, |
| | max_keepalive_connections=limits.max_keepalive_connections, |
| | keepalive_expiry=limits.keepalive_expiry, |
| | http1=http1, |
| | http2=http2, |
| | uds=uds, |
| | local_address=local_address, |
| | retries=retries, |
| | socket_options=socket_options, |
| | ) |
| | elif proxy.url.scheme in ("http", "https"): |
| | self._pool = httpcore.AsyncHTTPProxy( |
| | proxy_url=httpcore.URL( |
| | scheme=proxy.url.raw_scheme, |
| | host=proxy.url.raw_host, |
| | port=proxy.url.port, |
| | target=proxy.url.raw_path, |
| | ), |
| | proxy_auth=proxy.raw_auth, |
| | proxy_headers=proxy.headers.raw, |
| | proxy_ssl_context=proxy.ssl_context, |
| | ssl_context=ssl_context, |
| | max_connections=limits.max_connections, |
| | max_keepalive_connections=limits.max_keepalive_connections, |
| | keepalive_expiry=limits.keepalive_expiry, |
| | http1=http1, |
| | http2=http2, |
| | socket_options=socket_options, |
| | ) |
| | elif proxy.url.scheme in ("socks5", "socks5h"): |
| | try: |
| | import socksio |
| | except ImportError: |
| | raise ImportError( |
| | "Using SOCKS proxy, but the 'socksio' package is not installed. " |
| | "Make sure to install httpx using `pip install httpx[socks]`." |
| | ) from None |
| |
|
| | self._pool = httpcore.AsyncSOCKSProxy( |
| | proxy_url=httpcore.URL( |
| | scheme=proxy.url.raw_scheme, |
| | host=proxy.url.raw_host, |
| | port=proxy.url.port, |
| | target=proxy.url.raw_path, |
| | ), |
| | proxy_auth=proxy.raw_auth, |
| | ssl_context=ssl_context, |
| | max_connections=limits.max_connections, |
| | max_keepalive_connections=limits.max_keepalive_connections, |
| | keepalive_expiry=limits.keepalive_expiry, |
| | http1=http1, |
| | http2=http2, |
| | ) |
| | else: |
| | raise ValueError( |
| | "Proxy protocol must be either 'http', 'https', 'socks5', or 'socks5h'," |
| | " but got {proxy.url.scheme!r}." |
| | ) |
| |
|
| | async def __aenter__(self: A) -> A: |
| | await self._pool.__aenter__() |
| | return self |
| |
|
| | async def __aexit__( |
| | self, |
| | exc_type: type[BaseException] | None = None, |
| | exc_value: BaseException | None = None, |
| | traceback: TracebackType | None = None, |
| | ) -> None: |
| | with map_httpcore_exceptions(): |
| | await self._pool.__aexit__(exc_type, exc_value, traceback) |
| |
|
| | async def handle_async_request( |
| | self, |
| | request: Request, |
| | ) -> Response: |
| | assert isinstance(request.stream, AsyncByteStream) |
| | import httpcore |
| |
|
| | req = httpcore.Request( |
| | method=request.method, |
| | url=httpcore.URL( |
| | scheme=request.url.raw_scheme, |
| | host=request.url.raw_host, |
| | port=request.url.port, |
| | target=request.url.raw_path, |
| | ), |
| | headers=request.headers.raw, |
| | content=request.stream, |
| | extensions=request.extensions, |
| | ) |
| | with map_httpcore_exceptions(): |
| | resp = await self._pool.handle_async_request(req) |
| |
|
| | assert isinstance(resp.stream, typing.AsyncIterable) |
| |
|
| | return Response( |
| | status_code=resp.status, |
| | headers=resp.headers, |
| | stream=AsyncResponseStream(resp.stream), |
| | extensions=resp.extensions, |
| | ) |
| |
|
| | async def aclose(self) -> None: |
| | await self._pool.aclose() |
| |
|