| | """Network Authentication Helpers |
| | |
| | Contains interface (MultiDomainBasicAuth) and associated glue code for |
| | providing credentials in the context of network requests. |
| | """ |
| |
|
| | import logging |
| | import os |
| | import shutil |
| | import subprocess |
| | import sysconfig |
| | import typing |
| | import urllib.parse |
| | from abc import ABC, abstractmethod |
| | from functools import lru_cache |
| | from os.path import commonprefix |
| | from pathlib import Path |
| | from typing import Any, Dict, List, NamedTuple, Optional, Tuple |
| |
|
| | from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth |
| | from pip._vendor.requests.models import Request, Response |
| | from pip._vendor.requests.utils import get_netrc_auth |
| |
|
| | from pip._internal.utils.logging import getLogger |
| | from pip._internal.utils.misc import ( |
| | ask, |
| | ask_input, |
| | ask_password, |
| | remove_auth_from_url, |
| | split_auth_netloc_from_url, |
| | ) |
| | from pip._internal.vcs.versioncontrol import AuthInfo |
| |
|
| | logger = getLogger(__name__) |
| |
|
| | KEYRING_DISABLED = False |
| |
|
| |
|
| | class Credentials(NamedTuple): |
| | url: str |
| | username: str |
| | password: str |
| |
|
| |
|
| | class KeyRingBaseProvider(ABC): |
| | """Keyring base provider interface""" |
| |
|
| | has_keyring: bool |
| |
|
| | @abstractmethod |
| | def get_auth_info( |
| | self, url: str, username: Optional[str] |
| | ) -> Optional[AuthInfo]: ... |
| |
|
| | @abstractmethod |
| | def save_auth_info(self, url: str, username: str, password: str) -> None: ... |
| |
|
| |
|
| | class KeyRingNullProvider(KeyRingBaseProvider): |
| | """Keyring null provider""" |
| |
|
| | has_keyring = False |
| |
|
| | def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: |
| | return None |
| |
|
| | def save_auth_info(self, url: str, username: str, password: str) -> None: |
| | return None |
| |
|
| |
|
| | class KeyRingPythonProvider(KeyRingBaseProvider): |
| | """Keyring interface which uses locally imported `keyring`""" |
| |
|
| | has_keyring = True |
| |
|
| | def __init__(self) -> None: |
| | import keyring |
| |
|
| | self.keyring = keyring |
| |
|
| | def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: |
| | |
| | |
| | |
| | if hasattr(self.keyring, "get_credential"): |
| | logger.debug("Getting credentials from keyring for %s", url) |
| | cred = self.keyring.get_credential(url, username) |
| | if cred is not None: |
| | return cred.username, cred.password |
| | return None |
| |
|
| | if username is not None: |
| | logger.debug("Getting password from keyring for %s", url) |
| | password = self.keyring.get_password(url, username) |
| | if password: |
| | return username, password |
| | return None |
| |
|
| | def save_auth_info(self, url: str, username: str, password: str) -> None: |
| | self.keyring.set_password(url, username, password) |
| |
|
| |
|
| | class KeyRingCliProvider(KeyRingBaseProvider): |
| | """Provider which uses `keyring` cli |
| | |
| | Instead of calling the keyring package installed alongside pip |
| | we call keyring on the command line which will enable pip to |
| | use which ever installation of keyring is available first in |
| | PATH. |
| | """ |
| |
|
| | has_keyring = True |
| |
|
| | def __init__(self, cmd: str) -> None: |
| | self.keyring = cmd |
| |
|
| | def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: |
| | |
| | |
| | if username is not None: |
| | password = self._get_password(url, username) |
| | if password is not None: |
| | return username, password |
| | return None |
| |
|
| | def save_auth_info(self, url: str, username: str, password: str) -> None: |
| | return self._set_password(url, username, password) |
| |
|
| | def _get_password(self, service_name: str, username: str) -> Optional[str]: |
| | """Mirror the implementation of keyring.get_password using cli""" |
| | if self.keyring is None: |
| | return None |
| |
|
| | cmd = [self.keyring, "get", service_name, username] |
| | env = os.environ.copy() |
| | env["PYTHONIOENCODING"] = "utf-8" |
| | res = subprocess.run( |
| | cmd, |
| | stdin=subprocess.DEVNULL, |
| | stdout=subprocess.PIPE, |
| | env=env, |
| | ) |
| | if res.returncode: |
| | return None |
| | return res.stdout.decode("utf-8").strip(os.linesep) |
| |
|
| | def _set_password(self, service_name: str, username: str, password: str) -> None: |
| | """Mirror the implementation of keyring.set_password using cli""" |
| | if self.keyring is None: |
| | return None |
| | env = os.environ.copy() |
| | env["PYTHONIOENCODING"] = "utf-8" |
| | subprocess.run( |
| | [self.keyring, "set", service_name, username], |
| | input=f"{password}{os.linesep}".encode(), |
| | env=env, |
| | check=True, |
| | ) |
| | return None |
| |
|
| |
|
| | @lru_cache(maxsize=None) |
| | def get_keyring_provider(provider: str) -> KeyRingBaseProvider: |
| | logger.verbose("Keyring provider requested: %s", provider) |
| |
|
| | |
| | if KEYRING_DISABLED: |
| | provider = "disabled" |
| | if provider in ["import", "auto"]: |
| | try: |
| | impl = KeyRingPythonProvider() |
| | logger.verbose("Keyring provider set: import") |
| | return impl |
| | except ImportError: |
| | pass |
| | except Exception as exc: |
| | |
| | |
| | msg = "Installed copy of keyring fails with exception %s" |
| | if provider == "auto": |
| | msg = msg + ", trying to find a keyring executable as a fallback" |
| | logger.warning(msg, exc, exc_info=logger.isEnabledFor(logging.DEBUG)) |
| | if provider in ["subprocess", "auto"]: |
| | cli = shutil.which("keyring") |
| | if cli and cli.startswith(sysconfig.get_path("scripts")): |
| | |
| | @typing.no_type_check |
| | def PATH_as_shutil_which_determines_it() -> str: |
| | path = os.environ.get("PATH", None) |
| | if path is None: |
| | try: |
| | path = os.confstr("CS_PATH") |
| | except (AttributeError, ValueError): |
| | |
| | path = os.defpath |
| | |
| | |
| |
|
| | return path |
| |
|
| | scripts = Path(sysconfig.get_path("scripts")) |
| |
|
| | paths = [] |
| | for path in PATH_as_shutil_which_determines_it().split(os.pathsep): |
| | p = Path(path) |
| | try: |
| | if not p.samefile(scripts): |
| | paths.append(path) |
| | except FileNotFoundError: |
| | pass |
| |
|
| | path = os.pathsep.join(paths) |
| |
|
| | cli = shutil.which("keyring", path=path) |
| |
|
| | if cli: |
| | logger.verbose("Keyring provider set: subprocess with executable %s", cli) |
| | return KeyRingCliProvider(cli) |
| |
|
| | logger.verbose("Keyring provider set: disabled") |
| | return KeyRingNullProvider() |
| |
|
| |
|
| | class MultiDomainBasicAuth(AuthBase): |
| | def __init__( |
| | self, |
| | prompting: bool = True, |
| | index_urls: Optional[List[str]] = None, |
| | keyring_provider: str = "auto", |
| | ) -> None: |
| | self.prompting = prompting |
| | self.index_urls = index_urls |
| | self.keyring_provider = keyring_provider |
| | self.passwords: Dict[str, AuthInfo] = {} |
| | |
| | |
| | |
| | |
| | |
| | self._credentials_to_save: Optional[Credentials] = None |
| |
|
| | @property |
| | def keyring_provider(self) -> KeyRingBaseProvider: |
| | return get_keyring_provider(self._keyring_provider) |
| |
|
| | @keyring_provider.setter |
| | def keyring_provider(self, provider: str) -> None: |
| | |
| | |
| | |
| | |
| | self._keyring_provider = provider |
| |
|
| | @property |
| | def use_keyring(self) -> bool: |
| | |
| | |
| | |
| | return self.prompting or self._keyring_provider not in ["auto", "disabled"] |
| |
|
| | def _get_keyring_auth( |
| | self, |
| | url: Optional[str], |
| | username: Optional[str], |
| | ) -> Optional[AuthInfo]: |
| | """Return the tuple auth for a given url from keyring.""" |
| | |
| | if not url: |
| | return None |
| |
|
| | try: |
| | return self.keyring_provider.get_auth_info(url, username) |
| | except Exception as exc: |
| | |
| | |
| | logger.debug("Keyring is skipped due to an exception", exc_info=True) |
| | |
| | logger.warning( |
| | "Keyring is skipped due to an exception: %s", |
| | str(exc), |
| | ) |
| | global KEYRING_DISABLED |
| | KEYRING_DISABLED = True |
| | get_keyring_provider.cache_clear() |
| | return None |
| |
|
| | def _get_index_url(self, url: str) -> Optional[str]: |
| | """Return the original index URL matching the requested URL. |
| | |
| | Cached or dynamically generated credentials may work against |
| | the original index URL rather than just the netloc. |
| | |
| | The provided url should have had its username and password |
| | removed already. If the original index url had credentials then |
| | they will be included in the return value. |
| | |
| | Returns None if no matching index was found, or if --no-index |
| | was specified by the user. |
| | """ |
| | if not url or not self.index_urls: |
| | return None |
| |
|
| | url = remove_auth_from_url(url).rstrip("/") + "/" |
| | parsed_url = urllib.parse.urlsplit(url) |
| |
|
| | candidates = [] |
| |
|
| | for index in self.index_urls: |
| | index = index.rstrip("/") + "/" |
| | parsed_index = urllib.parse.urlsplit(remove_auth_from_url(index)) |
| | if parsed_url == parsed_index: |
| | return index |
| |
|
| | if parsed_url.netloc != parsed_index.netloc: |
| | continue |
| |
|
| | candidate = urllib.parse.urlsplit(index) |
| | candidates.append(candidate) |
| |
|
| | if not candidates: |
| | return None |
| |
|
| | candidates.sort( |
| | reverse=True, |
| | key=lambda candidate: commonprefix( |
| | [ |
| | parsed_url.path, |
| | candidate.path, |
| | ] |
| | ).rfind("/"), |
| | ) |
| |
|
| | return urllib.parse.urlunsplit(candidates[0]) |
| |
|
| | def _get_new_credentials( |
| | self, |
| | original_url: str, |
| | *, |
| | allow_netrc: bool = True, |
| | allow_keyring: bool = False, |
| | ) -> AuthInfo: |
| | """Find and return credentials for the specified URL.""" |
| | |
| | url, netloc, url_user_password = split_auth_netloc_from_url( |
| | original_url, |
| | ) |
| |
|
| | |
| | username, password = url_user_password |
| | if username is not None and password is not None: |
| | logger.debug("Found credentials in url for %s", netloc) |
| | return url_user_password |
| |
|
| | |
| | index_url = self._get_index_url(url) |
| | if index_url: |
| | |
| | index_info = split_auth_netloc_from_url(index_url) |
| | if index_info: |
| | index_url, _, index_url_user_password = index_info |
| | logger.debug("Found index url %s", index_url) |
| |
|
| | |
| | if index_url and index_url_user_password[0] is not None: |
| | username, password = index_url_user_password |
| | if username is not None and password is not None: |
| | logger.debug("Found credentials in index url for %s", netloc) |
| | return index_url_user_password |
| |
|
| | |
| | if allow_netrc: |
| | netrc_auth = get_netrc_auth(original_url) |
| | if netrc_auth: |
| | logger.debug("Found credentials in netrc for %s", netloc) |
| | return netrc_auth |
| |
|
| | |
| | if allow_keyring: |
| | |
| | |
| | kr_auth = ( |
| | self._get_keyring_auth(index_url, username) or |
| | self._get_keyring_auth(netloc, username) |
| | ) |
| | |
| | if kr_auth: |
| | logger.debug("Found credentials in keyring for %s", netloc) |
| | return kr_auth |
| |
|
| | return username, password |
| |
|
| | def _get_url_and_credentials( |
| | self, original_url: str |
| | ) -> Tuple[str, Optional[str], Optional[str]]: |
| | """Return the credentials to use for the provided URL. |
| | |
| | If allowed, netrc and keyring may be used to obtain the |
| | correct credentials. |
| | |
| | Returns (url_without_credentials, username, password). Note |
| | that even if the original URL contains credentials, this |
| | function may return a different username and password. |
| | """ |
| | url, netloc, _ = split_auth_netloc_from_url(original_url) |
| |
|
| | |
| | username, password = self._get_new_credentials(original_url) |
| |
|
| | |
| | |
| | |
| | |
| | if (username is None or password is None) and netloc in self.passwords: |
| | un, pw = self.passwords[netloc] |
| | |
| | |
| | if username is None or username == un: |
| | username, password = un, pw |
| |
|
| | if username is not None or password is not None: |
| | |
| | |
| | |
| | |
| | username = username or "" |
| | password = password or "" |
| |
|
| | |
| | self.passwords[netloc] = (username, password) |
| |
|
| | assert ( |
| | |
| | (username is not None and password is not None) |
| | |
| | or (username is None and password is None) |
| | ), f"Could not load credentials from url: {original_url}" |
| |
|
| | return url, username, password |
| |
|
| | def __call__(self, req: Request) -> Request: |
| | |
| | url, username, password = self._get_url_and_credentials(req.url) |
| |
|
| | |
| | req.url = url |
| |
|
| | if username is not None and password is not None: |
| | |
| | req = HTTPBasicAuth(username, password)(req) |
| |
|
| | |
| | req.register_hook("response", self.handle_401) |
| |
|
| | return req |
| |
|
| | |
| | def _prompt_for_password( |
| | self, netloc: str |
| | ) -> Tuple[Optional[str], Optional[str], bool]: |
| | username = ask_input(f"User for {netloc}: ") if self.prompting else None |
| | if not username: |
| | return None, None, False |
| | if self.use_keyring: |
| | auth = self._get_keyring_auth(netloc, username) |
| | if auth and auth[0] is not None and auth[1] is not None: |
| | return auth[0], auth[1], False |
| | password = ask_password("Password: ") |
| | return username, password, True |
| |
|
| | |
| | def _should_save_password_to_keyring(self) -> bool: |
| | if ( |
| | not self.prompting |
| | or not self.use_keyring |
| | or not self.keyring_provider.has_keyring |
| | ): |
| | return False |
| | return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y" |
| |
|
| | def handle_401(self, resp: Response, **kwargs: Any) -> Response: |
| | |
| | |
| | if resp.status_code != 401: |
| | return resp |
| |
|
| | username, password = None, None |
| |
|
| | |
| | if self.use_keyring: |
| | username, password = self._get_new_credentials( |
| | resp.url, |
| | allow_netrc=False, |
| | allow_keyring=True, |
| | ) |
| |
|
| | |
| | if not self.prompting and not username and not password: |
| | return resp |
| |
|
| | parsed = urllib.parse.urlparse(resp.url) |
| |
|
| | |
| | save = False |
| | if not username and not password: |
| | username, password, save = self._prompt_for_password(parsed.netloc) |
| |
|
| | |
| | self._credentials_to_save = None |
| | if username is not None and password is not None: |
| | self.passwords[parsed.netloc] = (username, password) |
| |
|
| | |
| | if save and self._should_save_password_to_keyring(): |
| | self._credentials_to_save = Credentials( |
| | url=parsed.netloc, |
| | username=username, |
| | password=password, |
| | ) |
| |
|
| | |
| | |
| | |
| | |
| | _ = resp.content |
| | resp.raw.release_conn() |
| |
|
| | |
| | req = HTTPBasicAuth(username or "", password or "")(resp.request) |
| | req.register_hook("response", self.warn_on_401) |
| |
|
| | |
| | |
| | |
| | if self._credentials_to_save: |
| | req.register_hook("response", self.save_credentials) |
| |
|
| | |
| | new_resp = resp.connection.send(req, **kwargs) |
| | new_resp.history.append(resp) |
| |
|
| | return new_resp |
| |
|
| | def warn_on_401(self, resp: Response, **kwargs: Any) -> None: |
| | """Response callback to warn about incorrect credentials.""" |
| | if resp.status_code == 401: |
| | logger.warning( |
| | "401 Error, Credentials not correct for %s", |
| | resp.request.url, |
| | ) |
| |
|
| | def save_credentials(self, resp: Response, **kwargs: Any) -> None: |
| | """Response callback to save credentials on success.""" |
| | assert ( |
| | self.keyring_provider.has_keyring |
| | ), "should never reach here without keyring" |
| |
|
| | creds = self._credentials_to_save |
| | self._credentials_to_save = None |
| | if creds and resp.status_code < 400: |
| | try: |
| | logger.info("Saving credentials to keyring") |
| | self.keyring_provider.save_auth_info( |
| | creds.url, creds.username, creds.password |
| | ) |
| | except Exception: |
| | logger.exception("Failed to save credentials") |
| |
|