| | import configparser |
| | import logging |
| | import os |
| | from typing import List, Optional, Tuple |
| |
|
| | from pip._internal.exceptions import BadCommand, InstallationError |
| | from pip._internal.utils.misc import HiddenText, display_path |
| | from pip._internal.utils.subprocess import make_command |
| | from pip._internal.utils.urls import path_to_url |
| | from pip._internal.vcs.versioncontrol import ( |
| | RevOptions, |
| | VersionControl, |
| | find_path_to_project_root_from_repo_root, |
| | vcs, |
| | ) |
| |
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
| |
|
| |
|
class Mercurial(VersionControl):
    """Mercurial (``hg``) implementation of the ``VersionControl`` interface.

    Repository operations are delegated to hg subcommands issued through
    ``run_command``.
    """

    # Short name of this backend.
    name = "hg"
    # Metadata directory inside a checkout; ``switch`` edits ``<dirname>/hgrc``.
    dirname = ".hg"
    repo_name = "clone"
    # URL scheme prefixes associated with this backend.
    schemes = (
        "hg+file",
        "hg+http",
        "hg+https",
        "hg+ssh",
        "hg+static-http",
    )

    @staticmethod
    def get_base_rev_args(rev: str) -> List[str]:
        """Return ``rev`` wrapped in a single-element argument list."""
        return [rev]

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """Clone ``url`` into ``dest`` and update to the requested revision.

        The clone is created with ``--noupdate``; a separate ``update``
        command, run inside ``dest`` with the arguments from ``rev_options``,
        then populates the working directory.

        ``verbosity`` selects the flags passed to both commands:
        ``<= 0`` gives ``--quiet``, ``1`` gives no flag, ``2`` gives
        ``--verbose`` and anything else gives ``--verbose --debug``.
        """
        logger.info(
            "Cloning hg %s%s to %s",
            url,
            rev_options.to_display(),
            display_path(dest),
        )

        flags: Tuple[str, ...]
        if verbosity <= 0:
            flags = ("--quiet",)
        elif verbosity == 1:
            flags = ()
        elif verbosity == 2:
            flags = ("--verbose",)
        else:
            flags = ("--verbose", "--debug")

        clone_cmd = make_command("clone", "--noupdate", *flags, url, dest)
        self.run_command(clone_cmd)

        # Built only after the clone has run, then executed inside the clone.
        update_cmd = make_command("update", *flags, rev_options.to_args())
        self.run_command(update_cmd, cwd=dest)

    def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """Point the checkout at ``dest`` to ``url`` and update it.

        The ``default`` entry of the ``paths`` section in the checkout's
        ``hgrc`` is overwritten with the unredacted URL and the file is
        written back.  If that fails with ``OSError`` or
        ``configparser.NoSectionError`` a warning is logged and no update is
        attempted; otherwise a quiet ``update`` to ``rev_options`` is run.
        """
        hgrc_path = os.path.join(dest, self.dirname, "hgrc")
        parser = configparser.RawConfigParser()
        try:
            parser.read(hgrc_path)
            parser.set("paths", "default", url.secret)
            with open(hgrc_path, "w") as hgrc_file:
                parser.write(hgrc_file)
        except (OSError, configparser.NoSectionError) as err:
            logger.warning("Could not switch Mercurial repository to %s: %s", url, err)
            return

        self.run_command(
            make_command("update", "-q", rev_options.to_args()),
            cwd=dest,
        )

    def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """Quietly pull into ``dest`` and then update it to ``rev_options``.

        ``url`` is accepted but not used by this method.
        """
        self.run_command(["pull", "-q"], cwd=dest)
        self.run_command(
            make_command("update", "-q", rev_options.to_args()),
            cwd=dest,
        )

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """Return the ``paths.default`` setting of the repository at ``location``.

        When ``_is_local_repository`` reports the configured value as local,
        it is converted with ``path_to_url`` before being returned.
        """
        configured = cls.run_command(
            ["showconfig", "paths.default"],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        remote = configured.strip()
        if cls._is_local_repository(remote):
            remote = path_to_url(remote)
        return remote.strip()

    @classmethod
    def get_revision(cls, location: str) -> str:
        """
        Return the repository-local changeset revision number, as the string
        printed by ``parents --template={rev}`` with surrounding whitespace
        removed.
        """
        return cls.run_command(
            ["parents", "--template={rev}"],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        ).strip()

    @classmethod
    def get_requirement_revision(cls, location: str) -> str:
        """
        Return the changeset identification hash (the ``{node}`` template
        value), a 40-character hexadecimal string.
        """
        return cls.run_command(
            ["parents", "--template={node}"],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        ).strip()

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
        """Report a mismatch unconditionally; no comparison is performed."""
        return False

    @classmethod
    def get_subdirectory(cls, location: str) -> Optional[str]:
        """
        Return the path to the Python project root, relative to the repo root.
        Return None if the project root is in the repo root.
        """
        root = cls.run_command(
            ["root"], show_stdout=False, stdout_only=True, cwd=location
        ).strip()
        if os.path.isabs(root):
            absolute_root = root
        else:
            # A relative answer is resolved against ``location``.
            absolute_root = os.path.abspath(os.path.join(location, root))
        return find_path_to_project_root_from_repo_root(location, absolute_root)

    @classmethod
    def get_repository_root(cls, location: str) -> Optional[str]:
        """Return the repository root for ``location``, or None.

        A truthy result from the base-class lookup is returned as is.
        Otherwise the ``root`` command is run in ``location``.  ``BadCommand``
        is logged at debug level (hg not available) and yields None, as does
        ``InstallationError``.
        """
        inherited = super().get_repository_root(location)
        if inherited:
            return inherited

        try:
            raw_root = cls.run_command(
                ["root"],
                cwd=location,
                show_stdout=False,
                stdout_only=True,
                on_returncode="raise",
                log_failed_cmd=False,
            )
        except BadCommand:
            logger.debug(
                "could not determine if %s is under hg control "
                "because hg is not available",
                location,
            )
            return None
        except InstallationError:
            return None

        # Drop only the trailing line terminator before normalising the path.
        return os.path.normpath(raw_root.rstrip("\r\n"))
| |
|
| |
|
# Hand the Mercurial class to the shared ``vcs`` object imported from
# versioncontrol; presumably this is what makes the backend discoverable
# (confirm against the registry implementation).
vcs.register(Mercurial)
| |
|