| | """Base Command class, and related routines""" |
| |
|
| | import functools |
| | import logging |
| | import logging.config |
| | import optparse |
| | import os |
| | import sys |
| | import traceback |
| | from optparse import Values |
| | from typing import Any, Callable, List, Optional, Tuple |
| |
|
| | from pip._internal.cli import cmdoptions |
| | from pip._internal.cli.command_context import CommandContextMixIn |
| | from pip._internal.cli.parser import ConfigOptionParser, UpdatingDefaultsHelpFormatter |
| | from pip._internal.cli.status_codes import ( |
| | ERROR, |
| | PREVIOUS_BUILD_DIR_ERROR, |
| | UNKNOWN_ERROR, |
| | VIRTUALENV_NOT_FOUND, |
| | ) |
| | from pip._internal.exceptions import ( |
| | BadCommand, |
| | CommandError, |
| | DiagnosticPipError, |
| | InstallationError, |
| | NetworkConnectionError, |
| | PreviousBuildDirError, |
| | UninstallationError, |
| | ) |
| | from pip._internal.utils.filesystem import check_path_owner |
| | from pip._internal.utils.logging import BrokenStdoutLoggingError, setup_logging |
| | from pip._internal.utils.misc import get_prog, normalize_path |
| | from pip._internal.utils.temp_dir import TempDirectoryTypeRegistry as TempDirRegistry |
| | from pip._internal.utils.temp_dir import global_tempdir_manager, tempdir_registry |
| | from pip._internal.utils.virtualenv import running_under_virtualenv |
| |
|
| | __all__ = ["Command"] |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
| | class Command(CommandContextMixIn): |
| | usage: str = "" |
| | ignore_require_venv: bool = False |
| |
|
| | def __init__(self, name: str, summary: str, isolated: bool = False) -> None: |
| | super().__init__() |
| |
|
| | self.name = name |
| | self.summary = summary |
| | self.parser = ConfigOptionParser( |
| | usage=self.usage, |
| | prog=f"{get_prog()} {name}", |
| | formatter=UpdatingDefaultsHelpFormatter(), |
| | add_help_option=False, |
| | name=name, |
| | description=self.__doc__, |
| | isolated=isolated, |
| | ) |
| |
|
| | self.tempdir_registry: Optional[TempDirRegistry] = None |
| |
|
| | |
| | optgroup_name = f"{self.name.capitalize()} Options" |
| | self.cmd_opts = optparse.OptionGroup(self.parser, optgroup_name) |
| |
|
| | |
| | gen_opts = cmdoptions.make_option_group( |
| | cmdoptions.general_group, |
| | self.parser, |
| | ) |
| | self.parser.add_option_group(gen_opts) |
| |
|
| | self.add_options() |
| |
|
| | def add_options(self) -> None: |
| | pass |
| |
|
| | def handle_pip_version_check(self, options: Values) -> None: |
| | """ |
| | This is a no-op so that commands by default do not do the pip version |
| | check. |
| | """ |
| | |
| | |
| | assert not hasattr(options, "no_index") |
| |
|
| | def run(self, options: Values, args: List[str]) -> int: |
| | raise NotImplementedError |
| |
|
| | def parse_args(self, args: List[str]) -> Tuple[Values, List[str]]: |
| | |
| | return self.parser.parse_args(args) |
| |
|
| | def main(self, args: List[str]) -> int: |
| | try: |
| | with self.main_context(): |
| | return self._main(args) |
| | finally: |
| | logging.shutdown() |
| |
|
| | def _main(self, args: List[str]) -> int: |
| | |
| | |
| | |
| | self.tempdir_registry = self.enter_context(tempdir_registry()) |
| | |
| | |
| | self.enter_context(global_tempdir_manager()) |
| |
|
| | options, args = self.parse_args(args) |
| |
|
| | |
| | self.verbosity = options.verbose - options.quiet |
| |
|
| | level_number = setup_logging( |
| | verbosity=self.verbosity, |
| | no_color=options.no_color, |
| | user_log_file=options.log, |
| | ) |
| |
|
| | |
| | |
| | |
| |
|
| | if options.no_input: |
| | os.environ["PIP_NO_INPUT"] = "1" |
| |
|
| | if options.exists_action: |
| | os.environ["PIP_EXISTS_ACTION"] = " ".join(options.exists_action) |
| |
|
| | if options.require_venv and not self.ignore_require_venv: |
| | |
| | if not running_under_virtualenv(): |
| | logger.critical("Could not find an activated virtualenv (required).") |
| | sys.exit(VIRTUALENV_NOT_FOUND) |
| |
|
| | if options.cache_dir: |
| | options.cache_dir = normalize_path(options.cache_dir) |
| | if not check_path_owner(options.cache_dir): |
| | logger.warning( |
| | "The directory '%s' or its parent directory is not owned " |
| | "or is not writable by the current user. The cache " |
| | "has been disabled. Check the permissions and owner of " |
| | "that directory. If executing pip with sudo, you should " |
| | "use sudo's -H flag.", |
| | options.cache_dir, |
| | ) |
| | options.cache_dir = None |
| |
|
| | if "2020-resolver" in options.features_enabled: |
| | logger.warning( |
| | "--use-feature=2020-resolver no longer has any effect, " |
| | "since it is now the default dependency resolver in pip. " |
| | "This will become an error in pip 21.0." |
| | ) |
| |
|
| | def intercepts_unhandled_exc( |
| | run_func: Callable[..., int] |
| | ) -> Callable[..., int]: |
| | @functools.wraps(run_func) |
| | def exc_logging_wrapper(*args: Any) -> int: |
| | try: |
| | status = run_func(*args) |
| | assert isinstance(status, int) |
| | return status |
| | except DiagnosticPipError as exc: |
| | logger.error("[present-diagnostic] %s", exc) |
| | logger.debug("Exception information:", exc_info=True) |
| |
|
| | return ERROR |
| | except PreviousBuildDirError as exc: |
| | logger.critical(str(exc)) |
| | logger.debug("Exception information:", exc_info=True) |
| |
|
| | return PREVIOUS_BUILD_DIR_ERROR |
| | except ( |
| | InstallationError, |
| | UninstallationError, |
| | BadCommand, |
| | NetworkConnectionError, |
| | ) as exc: |
| | logger.critical(str(exc)) |
| | logger.debug("Exception information:", exc_info=True) |
| |
|
| | return ERROR |
| | except CommandError as exc: |
| | logger.critical("%s", exc) |
| | logger.debug("Exception information:", exc_info=True) |
| |
|
| | return ERROR |
| | except BrokenStdoutLoggingError: |
| | |
| | |
| | print("ERROR: Pipe to stdout was broken", file=sys.stderr) |
| | if level_number <= logging.DEBUG: |
| | traceback.print_exc(file=sys.stderr) |
| |
|
| | return ERROR |
| | except KeyboardInterrupt: |
| | logger.critical("Operation cancelled by user") |
| | logger.debug("Exception information:", exc_info=True) |
| |
|
| | return ERROR |
| | except BaseException: |
| | logger.critical("Exception:", exc_info=True) |
| |
|
| | return UNKNOWN_ERROR |
| |
|
| | return exc_logging_wrapper |
| |
|
| | try: |
| | if not options.debug_mode: |
| | run = intercepts_unhandled_exc(self.run) |
| | else: |
| | run = self.run |
| | return run(options, args) |
| | finally: |
| | self.handle_pip_version_check(options) |
| |
|