| """PipSession and supporting code, containing all pip-specific |
| network request configuration and behavior. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import email.utils |
| import functools |
| import io |
| import ipaddress |
| import json |
| import logging |
| import mimetypes |
| import os |
| import platform |
| import shutil |
| import subprocess |
| import sys |
| import urllib.parse |
| import warnings |
| from collections.abc import Generator, Mapping, Sequence |
| from typing import ( |
| TYPE_CHECKING, |
| Any, |
| Optional, |
| Union, |
| ) |
|
|
| from pip._vendor import requests, urllib3 |
| from pip._vendor.cachecontrol import CacheControlAdapter as _BaseCacheControlAdapter |
| from pip._vendor.requests.adapters import DEFAULT_POOLBLOCK, BaseAdapter |
| from pip._vendor.requests.adapters import HTTPAdapter as _BaseHTTPAdapter |
| from pip._vendor.requests.models import PreparedRequest, Response |
| from pip._vendor.requests.structures import CaseInsensitiveDict |
| from pip._vendor.urllib3.connectionpool import ConnectionPool |
| from pip._vendor.urllib3.exceptions import InsecureRequestWarning |
|
|
| from pip import __version__ |
| from pip._internal.metadata import get_default_environment |
| from pip._internal.models.link import Link |
| from pip._internal.network.auth import MultiDomainBasicAuth |
| from pip._internal.network.cache import SafeFileCache |
|
|
| |
| from pip._internal.utils.compat import has_tls |
| from pip._internal.utils.glibc import libc_ver |
| from pip._internal.utils.misc import build_url_from_netloc, parse_netloc |
| from pip._internal.utils.urls import url_to_path |
|
|
| if TYPE_CHECKING: |
| from ssl import SSLContext |
|
|
| from pip._vendor.urllib3.poolmanager import PoolManager |
| from pip._vendor.urllib3.proxymanager import ProxyManager |
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Shape of one secure-origin pattern: (protocol, host, port).
# The port slot may hold an int, a string such as "*", or None.
SecureOrigin = tuple[str, str, Optional[Union[int, str]]]

# Install a process-wide filter that ignores InsecureRequestWarning.
# NOTE(review): presumably because pip reports untrusted origins through its
# own warning in PipSession.is_secure_origin -- confirm.
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
|
|
|
|
# Origins that are always treated as secure, as (protocol, host, port)
# patterns consumed by PipSession.is_secure_origin:
#   * "*" in the protocol, host or port slot matches anything;
#   * a host may be a CIDR network, in which case IP-literal origins are
#     matched by network membership;
#   * a port of None places no constraint on the port.
SECURE_ORIGINS: list[SecureOrigin] = [
    # Any HTTPS origin.
    ("https", "*", "*"),
    # Loopback, by name and by IPv4 / IPv6 network.
    ("*", "localhost", "*"),
    ("*", "127.0.0.0/8", "*"),
    ("*", "::1/128", "*"),
    # Local files.
    ("file", "*", None),
    # Any SSH origin.
    ("ssh", "*", "*"),
]
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
# Environment variable names whose mere presence makes looks_like_ci()
# report a CI environment.
# NOTE(review): presumably these are set by common CI providers, with
# PIP_IS_CI as an explicit opt-in -- confirm which provider sets which.
CI_ENVIRONMENT_VARIABLES = (
    "BUILD_BUILDID",
    "BUILD_ID",
    "CI",
    "PIP_IS_CI",
)


def looks_like_ci() -> bool:
    """
    Return whether it looks like pip is running under CI.

    The check is purely "is any name from CI_ENVIRONMENT_VARIABLES defined
    in the environment"; the variable's value (even an empty string) is not
    inspected.
    """
    for variable in CI_ENVIRONMENT_VARIABLES:
        if variable in os.environ:
            return True
    return False
|
|
|
|
@functools.lru_cache(maxsize=1)
def user_agent() -> str:
    """
    Return a string representing the user agent.

    The result has the form ``pip/<version> <json>``, where ``<json>`` is a
    compact, key-sorted JSON object describing the installer, the Python
    implementation, the operating system / distro, the CPU, and (when they
    can be detected) OpenSSL, setuptools and rustc versions.

    The function is wrapped in ``lru_cache(maxsize=1)``, so the environment
    is inspected on the first call only and the same string is returned
    afterwards.
    """
    implementation_name = platform.python_implementation()
    implementation: dict[str, Any] = {"name": implementation_name}
    data: dict[str, Any] = {
        "installer": {"name": "pip", "version": __version__},
        "python": platform.python_version(),
        "implementation": implementation,
    }

    # CPython, Jython and IronPython all report the language version as the
    # implementation version; PyPy has its own version tuple.  Any other
    # implementation gets no "version" entry.
    if implementation_name in ("CPython", "Jython", "IronPython"):
        implementation["version"] = platform.python_version()
    elif implementation_name == "PyPy":
        pypy_version = sys.pypy_version_info
        if pypy_version.releaselevel == "final":
            # Final releases are reported as major.minor.micro only.
            pypy_version = pypy_version[:3]
        implementation["version"] = ".".join(str(part) for part in pypy_version)

    if sys.platform.startswith("linux"):
        from pip._vendor import distro

        distro_fields = zip(
            ("name", "version", "id"),
            (distro.name(), distro.version(), distro.codename()),
        )
        # Keep only the fields that came back non-empty.
        distro_infos: dict[str, Any] = {
            key: value for key, value in distro_fields if value
        }
        libc = {
            key: value for key, value in zip(("lib", "version"), libc_ver()) if value
        }
        if libc:
            distro_infos["libc"] = libc
        if distro_infos:
            data["distro"] = distro_infos

    if sys.platform.startswith("darwin"):
        mac_release = platform.mac_ver()[0]
        if mac_release:
            data["distro"] = {"name": "macOS", "version": mac_release}

    # The "system" entry is only emitted when at least one field is known.
    system_info: dict[str, Any] = {}
    system_name = platform.system()
    if system_name:
        system_info["name"] = system_name
    system_release = platform.release()
    if system_release:
        system_info["release"] = system_release
    if system_info:
        data["system"] = system_info

    machine = platform.machine()
    if machine:
        data["cpu"] = machine

    if has_tls():
        import _ssl as ssl

        data["openssl_version"] = ssl.OPENSSL_VERSION

    setuptools_dist = get_default_environment().get_distribution("setuptools")
    if setuptools_dist is not None:
        data["setuptools_version"] = str(setuptools_dist.version)

    if shutil.which("rustc") is not None:
        # Best effort: any failure (including the 0.5s timeout) simply means
        # no rustc version is reported.
        try:
            rustc_output = subprocess.check_output(
                ["rustc", "--version"], stderr=subprocess.STDOUT, timeout=0.5
            )
        except Exception:
            rustc_output = b""
        if rustc_output.startswith(b"rustc "):
            # The version is the second space-separated token of the output.
            data["rustc_version"] = rustc_output.split(b" ")[1].decode()

    # True when CI markers are present, otherwise None (serialized as JSON
    # null) rather than False.
    # NOTE(review): presumably so that "no marker found" is not reported as
    # a definite "not CI" -- confirm.
    data["ci"] = looks_like_ci() or None

    user_data = os.environ.get("PIP_USER_AGENT_USER_DATA")
    if user_data is not None:
        data["user_data"] = user_data

    return "{data[installer][name]}/{data[installer][version]} {json}".format(
        data=data,
        json=json.dumps(data, separators=(",", ":"), sort_keys=True),
    )
|
|
|
|
class LocalFSAdapter(BaseAdapter):
    """Transport adapter that answers requests by reading the local filesystem.

    The request URL is converted to a path with ``url_to_path``; the file at
    that path becomes the response body.
    """

    def send(
        self,
        request: PreparedRequest,
        stream: bool = False,
        timeout: float | tuple[float, float] | None = None,
        verify: bool | str = True,
        cert: str | tuple[str, str] | None = None,
        proxies: Mapping[str, str] | None = None,
    ) -> Response:
        """Build a ``Response`` for the file named by ``request.url``.

        Only ``request`` is used; the remaining parameters are accepted to
        match the adapter interface and are ignored.

        :return: A 200 response whose ``raw`` is the file opened in binary
            mode, or a 404 response if ``os.stat`` on the path fails.
        """
        local_path = url_to_path(request.url)

        response = Response()
        response.status_code = 200
        response.url = request.url

        try:
            file_stat = os.stat(local_path)
        except OSError as error:
            # Any stat failure is reported as 404; the exception class name
            # is the reason and "<reason>: <message>" is the body.
            response.status_code = 404
            response.reason = type(error).__name__
            response.raw = io.BytesIO(f"{response.reason}: {error}".encode())
            return response

        last_modified = email.utils.formatdate(file_stat.st_mtime, usegmt=True)
        # Fall back to text/plain when the type cannot be guessed from the name.
        guessed_type = mimetypes.guess_type(local_path)[0] or "text/plain"
        response.headers = CaseInsensitiveDict(
            {
                "Content-Type": guessed_type,
                "Content-Length": file_stat.st_size,
                "Last-Modified": last_modified,
            }
        )

        # The file stays open for the caller to read; closing the response
        # closes the file.
        response.raw = open(local_path, "rb")
        response.close = response.raw.close

        return response

    def close(self) -> None:
        """Do nothing; the adapter itself holds no resources."""
        pass
|
|
|
|
class _SSLContextAdapterMixin:
    """Mixin to add the ``ssl_context`` constructor argument to HTTP adapters.

    The additional argument is forwarded directly to the pool manager. This allows us
    to dynamically decide what SSL store to use at runtime, which is used to implement
    the optional ``truststore`` backend.
    """

    def __init__(
        self,
        *,
        ssl_context: SSLContext | None = None,
        **kwargs: Any,
    ) -> None:
        """Remember ``ssl_context`` and pass every other keyword to the base class."""
        # Stored before delegating.
        # NOTE(review): presumably because the base adapter's __init__ calls
        # init_poolmanager(), which reads this attribute -- confirm.
        self._ssl_context = ssl_context
        super().__init__(**kwargs)

    def init_poolmanager(
        self,
        connections: int,
        maxsize: int,
        block: bool = DEFAULT_POOLBLOCK,
        **pool_kwargs: Any,
    ) -> PoolManager:
        """Create the pool manager, supplying the stored SSL context if any.

        An ``ssl_context`` already present in ``pool_kwargs`` takes precedence.
        """
        context = self._ssl_context
        if context is not None and "ssl_context" not in pool_kwargs:
            pool_kwargs["ssl_context"] = context
        return super().init_poolmanager(
            connections=connections,
            maxsize=maxsize,
            block=block,
            **pool_kwargs,
        )

    def proxy_manager_for(self, proxy: str, **proxy_kwargs: Any) -> ProxyManager:
        """Create the proxy manager, supplying the stored SSL context if any.

        An ``ssl_context`` already present in ``proxy_kwargs`` takes precedence.
        """
        context = self._ssl_context
        if context is not None and "ssl_context" not in proxy_kwargs:
            proxy_kwargs["ssl_context"] = context
        return super().proxy_manager_for(proxy, **proxy_kwargs)
|
|
|
|
class HTTPAdapter(_SSLContextAdapterMixin, _BaseHTTPAdapter):
    """The base HTTP adapter combined with ``_SSLContextAdapterMixin``.

    Adds nothing beyond what the two bases provide.
    """
|
|
|
|
class CacheControlAdapter(_SSLContextAdapterMixin, _BaseCacheControlAdapter):
    """The base CacheControl adapter combined with ``_SSLContextAdapterMixin``.

    Adds nothing beyond what the two bases provide.
    """
|
|
|
|
class InsecureHTTPAdapter(HTTPAdapter):
    """``HTTPAdapter`` variant that always disables certificate verification."""

    def cert_verify(
        self,
        conn: ConnectionPool,
        url: str,
        verify: bool | str,
        cert: str | tuple[str, str] | None,
    ) -> None:
        """Delegate to the base class with ``verify`` forced to ``False``.

        The caller's ``verify`` value is deliberately discarded; ``conn``,
        ``url`` and ``cert`` are passed through unchanged.
        """
        forced_verify = False
        super().cert_verify(conn=conn, url=url, verify=forced_verify, cert=cert)
|
|
|
|
class InsecureCacheControlAdapter(CacheControlAdapter):
    """``CacheControlAdapter`` variant that always disables certificate verification."""

    def cert_verify(
        self,
        conn: ConnectionPool,
        url: str,
        verify: bool | str,
        cert: str | tuple[str, str] | None,
    ) -> None:
        """Delegate to the base class with ``verify`` forced to ``False``.

        The caller's ``verify`` value is deliberately discarded; ``conn``,
        ``url`` and ``cert`` are passed through unchanged.
        """
        forced_verify = False
        super().cert_verify(conn=conn, url=url, verify=forced_verify, cert=cert)
|
|
|
|
class PipSession(requests.Session):
    """``requests.Session`` configured with pip's adapters, auth and headers."""

    # Default timeout applied by request() when the caller passes none.
    timeout: int | None = None

    def __init__(
        self,
        *args: Any,
        retries: int = 0,
        cache: str | None = None,
        trusted_hosts: Sequence[str] = (),
        index_urls: list[str] | None = None,
        ssl_context: SSLContext | None = None,
        **kwargs: Any,
    ) -> None:
        """
        :param retries: Total number of retries given to ``urllib3.Retry``.
        :param cache: Cache location handed to ``SafeFileCache``; when falsy,
            non-caching adapters are used.
        :param trusted_hosts: Domains not to emit warnings for when not using
            HTTPS.
        :param index_urls: Index URLs handed to the authentication handler.
        :param ssl_context: Optional SSL context for the adapter mounted on
            ``https://``.
        """
        super().__init__(*args, **kwargs)

        # (host, port) pairs registered through add_trusted_host(); a port of
        # None means "any port" (see iter_secure_origins()).
        self.pip_trusted_origins: list[tuple[str, int | None]] = []
        self.pip_proxy = None

        self.headers["User-Agent"] = user_agent()

        self.auth = MultiDomainBasicAuth(index_urls=index_urls)

        # Retry policy shared by every HTTP(S) adapter: the listed response
        # statuses are retried, with a 0.25 backoff factor between attempts.
        retry_policy = urllib3.Retry(
            total=retries,
            status_forcelist=[500, 502, 503, 520, 527],
            backoff_factor=0.25,
        )

        # Plain HTTP never goes through a caching adapter.
        plain_http_adapter = InsecureHTTPAdapter(max_retries=retry_policy)

        if cache:
            https_adapter = CacheControlAdapter(
                cache=SafeFileCache(cache),
                max_retries=retry_policy,
                ssl_context=ssl_context,
            )
            self._trusted_host_adapter = InsecureCacheControlAdapter(
                cache=SafeFileCache(cache),
                max_retries=retry_policy,
            )
        else:
            https_adapter = HTTPAdapter(
                max_retries=retry_policy, ssl_context=ssl_context
            )
            self._trusted_host_adapter = plain_http_adapter

        self.mount("https://", https_adapter)
        self.mount("http://", plain_http_adapter)

        # Serve file:// URLs from the local filesystem.
        self.mount("file://", LocalFSAdapter())

        for trusted in trusted_hosts:
            self.add_trusted_host(trusted, suppress_logging=True)

    def update_index_urls(self, new_index_urls: list[str]) -> None:
        """
        :param new_index_urls: New index urls to update the authentication
            handler with.
        """
        auth_handler = self.auth
        auth_handler.index_urls = new_index_urls

    def add_trusted_host(
        self, host: str, source: str | None = None, suppress_logging: bool = False
    ) -> None:
        """
        :param host: It is okay to provide a host that has previously been
            added.
        :param source: An optional source string, for logging where the host
            string came from.
        :param suppress_logging: When true, the "adding trusted host" message
            is not logged.
        :raises ValueError: If no host part can be parsed out of ``host``.
        """
        if not suppress_logging:
            origin_note = "" if source is None else f" (from {source})"
            logger.info(f"adding trusted host: {host!r}" + origin_note)

        parsed_host, parsed_port = parse_netloc(host)
        if parsed_host is None:
            raise ValueError(f"Trusted host URL must include a host part: {host!r}")

        origin = (parsed_host, parsed_port)
        if origin not in self.pip_trusted_origins:
            self.pip_trusted_origins.append(origin)

        http_base = build_url_from_netloc(host, scheme="http")
        default_base = build_url_from_netloc(host)
        adapter = self._trusted_host_adapter

        self.mount(http_base + "/", adapter)
        self.mount(default_base + "/", adapter)
        if not parsed_port:
            # No explicit port given: also mount the "<host>:" prefixes so
            # that URLs naming any port on this host use the same adapter.
            self.mount(http_base + ":", adapter)
            self.mount(default_base + ":", adapter)

    def iter_secure_origins(self) -> Generator[SecureOrigin, None, None]:
        """Yield the built-in secure origins, then one pattern per trusted host.

        Trusted hosts match any protocol; a trusted host registered without
        a port matches any port.
        """
        yield from SECURE_ORIGINS
        for trusted_host, trusted_port in self.pip_trusted_origins:
            port_pattern = "*" if trusted_port is None else trusted_port
            yield ("*", trusted_host, port_pattern)

    def is_secure_origin(self, location: Link) -> bool:
        """Return whether ``location`` matches any secure-origin pattern.

        Logs a warning naming the host and returns ``False`` when nothing
        matches.
        """
        url_parts = urllib.parse.urlparse(str(location))
        origin_protocol, origin_host, origin_port = (
            url_parts.scheme,
            url_parts.hostname,
            url_parts.port,
        )

        # Only the part after the last "+" of the scheme is compared (for a
        # compound scheme "a+b" that is "b"); a plain scheme is unchanged.
        origin_protocol = origin_protocol.rpartition("+")[2]

        for allowed_protocol, allowed_host, allowed_port in self.iter_secure_origins():
            if allowed_protocol not in ("*", origin_protocol):
                continue

            try:
                addr = ipaddress.ip_address(origin_host or "")
                network = ipaddress.ip_network(allowed_host)
            except ValueError:
                # Not an (IP address, IP network) pair: compare as hostnames,
                # case-insensitively.  A missing origin host or a "*" pattern
                # counts as a match.
                host_matches = (
                    not origin_host
                    or origin_host.lower() == allowed_host.lower()
                    or allowed_host == "*"
                )
            else:
                # Both parsed: match by network membership.
                host_matches = addr in network
            if not host_matches:
                continue

            # A pattern port of "*" or None places no constraint on the port.
            port_matches = (
                origin_port == allowed_port
                or allowed_port == "*"
                or allowed_port is None
            )
            if not port_matches:
                continue

            # Protocol, host and port all matched this pattern.
            return True

        logger.warning(
            "The repository located at %s is not a trusted or secure host and "
            "is being ignored. If this repository is available via HTTPS we "
            "recommend you use HTTPS instead, otherwise you may silence "
            "this warning and allow it anyway with '--trusted-host %s'.",
            origin_host,
            origin_host,
        )

        return False

    def request(self, method: str, url: str, *args: Any, **kwargs: Any) -> Response:
        """Send a request, defaulting ``timeout`` and ``proxies`` from the session.

        Values passed explicitly by the caller are left untouched.
        """
        if "timeout" not in kwargs:
            kwargs["timeout"] = self.timeout
        if "proxies" not in kwargs:
            kwargs["proxies"] = self.proxies

        return super().request(method, url, *args, **kwargs)
|
|