| | """Network Authentication Helpers |
| | |
| | Contains interface (MultiDomainBasicAuth) and associated glue code for |
| | providing credentials in the context of network requests. |
| | """ |
| |
|
| | import os |
| | import shutil |
| | import subprocess |
| | import urllib.parse |
| | from abc import ABC, abstractmethod |
| | from typing import Any, Dict, List, NamedTuple, Optional, Tuple |
| |
|
| | from pip._vendor.requests.auth import AuthBase, HTTPBasicAuth |
| | from pip._vendor.requests.models import Request, Response |
| | from pip._vendor.requests.utils import get_netrc_auth |
| |
|
| | from pip._internal.utils.logging import getLogger |
| | from pip._internal.utils.misc import ( |
| | ask, |
| | ask_input, |
| | ask_password, |
| | remove_auth_from_url, |
| | split_auth_netloc_from_url, |
| | ) |
| | from pip._internal.vcs.versioncontrol import AuthInfo |
| |
|
| | logger = getLogger(__name__) |
| |
|
| | KEYRING_DISABLED = False |
| |
|
| |
|
| | class Credentials(NamedTuple): |
| | url: str |
| | username: str |
| | password: str |
| |
|
| |
|
| | class KeyRingBaseProvider(ABC): |
| | """Keyring base provider interface""" |
| |
|
| | @abstractmethod |
| | def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: |
| | ... |
| |
|
| | @abstractmethod |
| | def save_auth_info(self, url: str, username: str, password: str) -> None: |
| | ... |
| |
|
| |
|
| | class KeyRingNullProvider(KeyRingBaseProvider): |
| | """Keyring null provider""" |
| |
|
| | def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: |
| | return None |
| |
|
| | def save_auth_info(self, url: str, username: str, password: str) -> None: |
| | return None |
| |
|
| |
|
| | class KeyRingPythonProvider(KeyRingBaseProvider): |
| | """Keyring interface which uses locally imported `keyring`""" |
| |
|
| | def __init__(self) -> None: |
| | import keyring |
| |
|
| | self.keyring = keyring |
| |
|
| | def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: |
| | |
| | |
| | |
| | if hasattr(self.keyring, "get_credential"): |
| | logger.debug("Getting credentials from keyring for %s", url) |
| | cred = self.keyring.get_credential(url, username) |
| | if cred is not None: |
| | return cred.username, cred.password |
| | return None |
| |
|
| | if username is not None: |
| | logger.debug("Getting password from keyring for %s", url) |
| | password = self.keyring.get_password(url, username) |
| | if password: |
| | return username, password |
| | return None |
| |
|
| | def save_auth_info(self, url: str, username: str, password: str) -> None: |
| | self.keyring.set_password(url, username, password) |
| |
|
| |
|
| | class KeyRingCliProvider(KeyRingBaseProvider): |
| | """Provider which uses `keyring` cli |
| | |
| | Instead of calling the keyring package installed alongside pip |
| | we call keyring on the command line which will enable pip to |
| | use which ever installation of keyring is available first in |
| | PATH. |
| | """ |
| |
|
| | def __init__(self, cmd: str) -> None: |
| | self.keyring = cmd |
| |
|
| | def get_auth_info(self, url: str, username: Optional[str]) -> Optional[AuthInfo]: |
| | |
| | |
| | if username is not None: |
| | password = self._get_password(url, username) |
| | if password is not None: |
| | return username, password |
| | return None |
| |
|
| | def save_auth_info(self, url: str, username: str, password: str) -> None: |
| | return self._set_password(url, username, password) |
| |
|
| | def _get_password(self, service_name: str, username: str) -> Optional[str]: |
| | """Mirror the implementation of keyring.get_password using cli""" |
| | if self.keyring is None: |
| | return None |
| |
|
| | cmd = [self.keyring, "get", service_name, username] |
| | env = os.environ.copy() |
| | env["PYTHONIOENCODING"] = "utf-8" |
| | res = subprocess.run( |
| | cmd, |
| | stdin=subprocess.DEVNULL, |
| | capture_output=True, |
| | env=env, |
| | ) |
| | if res.returncode: |
| | return None |
| | return res.stdout.decode("utf-8").strip(os.linesep) |
| |
|
| | def _set_password(self, service_name: str, username: str, password: str) -> None: |
| | """Mirror the implementation of keyring.set_password using cli""" |
| | if self.keyring is None: |
| | return None |
| |
|
| | cmd = [self.keyring, "set", service_name, username] |
| | input_ = (password + os.linesep).encode("utf-8") |
| | env = os.environ.copy() |
| | env["PYTHONIOENCODING"] = "utf-8" |
| | res = subprocess.run(cmd, input=input_, env=env) |
| | res.check_returncode() |
| | return None |
| |
|
| |
|
| | def get_keyring_provider() -> KeyRingBaseProvider: |
| | |
| | if not KEYRING_DISABLED: |
| | |
| | try: |
| | return KeyRingPythonProvider() |
| | except ImportError: |
| | pass |
| | except Exception as exc: |
| | |
| | |
| | logger.warning( |
| | "Installed copy of keyring fails with exception %s, " |
| | "trying to find a keyring executable as a fallback", |
| | str(exc), |
| | ) |
| |
|
| | |
| | cli = shutil.which("keyring") |
| | if cli: |
| | return KeyRingCliProvider(cli) |
| |
|
| | return KeyRingNullProvider() |
| |
|
| |
|
| | def get_keyring_auth(url: Optional[str], username: Optional[str]) -> Optional[AuthInfo]: |
| | """Return the tuple auth for a given url from keyring.""" |
| | |
| | if not url: |
| | return None |
| |
|
| | keyring = get_keyring_provider() |
| | try: |
| | return keyring.get_auth_info(url, username) |
| | except Exception as exc: |
| | logger.warning( |
| | "Keyring is skipped due to an exception: %s", |
| | str(exc), |
| | ) |
| | global KEYRING_DISABLED |
| | KEYRING_DISABLED = True |
| | return None |
| |
|
| |
|
| | class MultiDomainBasicAuth(AuthBase): |
| | def __init__( |
| | self, prompting: bool = True, index_urls: Optional[List[str]] = None |
| | ) -> None: |
| | self.prompting = prompting |
| | self.index_urls = index_urls |
| | self.passwords: Dict[str, AuthInfo] = {} |
| | |
| | |
| | |
| | |
| | |
| | self._credentials_to_save: Optional[Credentials] = None |
| |
|
| | def _get_index_url(self, url: str) -> Optional[str]: |
| | """Return the original index URL matching the requested URL. |
| | |
| | Cached or dynamically generated credentials may work against |
| | the original index URL rather than just the netloc. |
| | |
| | The provided url should have had its username and password |
| | removed already. If the original index url had credentials then |
| | they will be included in the return value. |
| | |
| | Returns None if no matching index was found, or if --no-index |
| | was specified by the user. |
| | """ |
| | if not url or not self.index_urls: |
| | return None |
| |
|
| | for u in self.index_urls: |
| | prefix = remove_auth_from_url(u).rstrip("/") + "/" |
| | if url.startswith(prefix): |
| | return u |
| | return None |
| |
|
| | def _get_new_credentials( |
| | self, |
| | original_url: str, |
| | allow_netrc: bool = True, |
| | allow_keyring: bool = False, |
| | ) -> AuthInfo: |
| | """Find and return credentials for the specified URL.""" |
| | |
| | url, netloc, url_user_password = split_auth_netloc_from_url( |
| | original_url, |
| | ) |
| |
|
| | |
| | username, password = url_user_password |
| | if username is not None and password is not None: |
| | logger.debug("Found credentials in url for %s", netloc) |
| | return url_user_password |
| |
|
| | |
| | index_url = self._get_index_url(url) |
| | if index_url: |
| | |
| | index_info = split_auth_netloc_from_url(index_url) |
| | if index_info: |
| | index_url, _, index_url_user_password = index_info |
| | logger.debug("Found index url %s", index_url) |
| |
|
| | |
| | if index_url and index_url_user_password[0] is not None: |
| | username, password = index_url_user_password |
| | if username is not None and password is not None: |
| | logger.debug("Found credentials in index url for %s", netloc) |
| | return index_url_user_password |
| |
|
| | |
| | if allow_netrc: |
| | netrc_auth = get_netrc_auth(original_url) |
| | if netrc_auth: |
| | logger.debug("Found credentials in netrc for %s", netloc) |
| | return netrc_auth |
| |
|
| | |
| | if allow_keyring: |
| | |
| | |
| | kr_auth = ( |
| | get_keyring_auth(index_url, username) or |
| | get_keyring_auth(netloc, username) |
| | ) |
| | |
| | if kr_auth: |
| | logger.debug("Found credentials in keyring for %s", netloc) |
| | return kr_auth |
| |
|
| | return username, password |
| |
|
| | def _get_url_and_credentials( |
| | self, original_url: str |
| | ) -> Tuple[str, Optional[str], Optional[str]]: |
| | """Return the credentials to use for the provided URL. |
| | |
| | If allowed, netrc and keyring may be used to obtain the |
| | correct credentials. |
| | |
| | Returns (url_without_credentials, username, password). Note |
| | that even if the original URL contains credentials, this |
| | function may return a different username and password. |
| | """ |
| | url, netloc, _ = split_auth_netloc_from_url(original_url) |
| |
|
| | |
| | username, password = self._get_new_credentials(original_url) |
| |
|
| | |
| | |
| | |
| | |
| | if (username is None or password is None) and netloc in self.passwords: |
| | un, pw = self.passwords[netloc] |
| | |
| | |
| | if username is None or username == un: |
| | username, password = un, pw |
| |
|
| | if username is not None or password is not None: |
| | |
| | |
| | |
| | |
| | username = username or "" |
| | password = password or "" |
| |
|
| | |
| | self.passwords[netloc] = (username, password) |
| |
|
| | assert ( |
| | |
| | (username is not None and password is not None) |
| | |
| | or (username is None and password is None) |
| | ), f"Could not load credentials from url: {original_url}" |
| |
|
| | return url, username, password |
| |
|
| | def __call__(self, req: Request) -> Request: |
| | |
| | url, username, password = self._get_url_and_credentials(req.url) |
| |
|
| | |
| | req.url = url |
| |
|
| | if username is not None and password is not None: |
| | |
| | req = HTTPBasicAuth(username, password)(req) |
| |
|
| | |
| | req.register_hook("response", self.handle_401) |
| |
|
| | return req |
| |
|
| | |
| | def _prompt_for_password( |
| | self, netloc: str |
| | ) -> Tuple[Optional[str], Optional[str], bool]: |
| | username = ask_input(f"User for {netloc}: ") |
| | if not username: |
| | return None, None, False |
| | auth = get_keyring_auth(netloc, username) |
| | if auth and auth[0] is not None and auth[1] is not None: |
| | return auth[0], auth[1], False |
| | password = ask_password("Password: ") |
| | return username, password, True |
| |
|
| | |
| | def _should_save_password_to_keyring(self) -> bool: |
| | if get_keyring_provider() is None: |
| | return False |
| | return ask("Save credentials to keyring [y/N]: ", ["y", "n"]) == "y" |
| |
|
| | def handle_401(self, resp: Response, **kwargs: Any) -> Response: |
| | |
| | |
| | if resp.status_code != 401: |
| | return resp |
| |
|
| | |
| | if not self.prompting: |
| | return resp |
| |
|
| | parsed = urllib.parse.urlparse(resp.url) |
| |
|
| | |
| | username, password = self._get_new_credentials( |
| | resp.url, |
| | allow_netrc=False, |
| | allow_keyring=True, |
| | ) |
| |
|
| | |
| | save = False |
| | if not username and not password: |
| | username, password, save = self._prompt_for_password(parsed.netloc) |
| |
|
| | |
| | self._credentials_to_save = None |
| | if username is not None and password is not None: |
| | self.passwords[parsed.netloc] = (username, password) |
| |
|
| | |
| | if save and self._should_save_password_to_keyring(): |
| | self._credentials_to_save = Credentials( |
| | url=parsed.netloc, |
| | username=username, |
| | password=password, |
| | ) |
| |
|
| | |
| | |
| | resp.content |
| | resp.raw.release_conn() |
| |
|
| | |
| | req = HTTPBasicAuth(username or "", password or "")(resp.request) |
| | req.register_hook("response", self.warn_on_401) |
| |
|
| | |
| | |
| | |
| | if self._credentials_to_save: |
| | req.register_hook("response", self.save_credentials) |
| |
|
| | |
| | new_resp = resp.connection.send(req, **kwargs) |
| | new_resp.history.append(resp) |
| |
|
| | return new_resp |
| |
|
| | def warn_on_401(self, resp: Response, **kwargs: Any) -> None: |
| | """Response callback to warn about incorrect credentials.""" |
| | if resp.status_code == 401: |
| | logger.warning( |
| | "401 Error, Credentials not correct for %s", |
| | resp.request.url, |
| | ) |
| |
|
| | def save_credentials(self, resp: Response, **kwargs: Any) -> None: |
| | """Response callback to save credentials on success.""" |
| | keyring = get_keyring_provider() |
| | assert not isinstance( |
| | keyring, KeyRingNullProvider |
| | ), "should never reach here without keyring" |
| |
|
| | creds = self._credentials_to_save |
| | self._credentials_to_save = None |
| | if creds and resp.status_code < 400: |
| | try: |
| | logger.info("Saving credentials to keyring") |
| | keyring.save_auth_info(creds.url, creds.username, creds.password) |
| | except Exception: |
| | logger.exception("Failed to save credentials") |
| |
|