Spaces:
Runtime error
Runtime error
| import logging | |
| import os.path | |
| import pathlib | |
| import re | |
| import urllib.parse | |
| import urllib.request | |
| from typing import List, Optional, Tuple | |
| from pip._internal.exceptions import BadCommand, InstallationError | |
| from pip._internal.utils.misc import HiddenText, display_path, hide_url | |
| from pip._internal.utils.subprocess import make_command | |
| from pip._internal.vcs.versioncontrol import ( | |
| AuthInfo, | |
| RemoteNotFoundError, | |
| RemoteNotValidError, | |
| RevOptions, | |
| VersionControl, | |
| find_path_to_project_root_from_repo_root, | |
| vcs, | |
| ) | |
# Short aliases for the URL helpers used when rewriting git URLs.
urlsplit = urllib.parse.urlsplit
urlunsplit = urllib.parse.urlunsplit

logger = logging.getLogger(__name__)

# Parses the output of ``git version``. The patch component is optional, so
# its group is ``None`` when git reports only "major.minor".
GIT_VERSION_REGEX = re.compile(
    r"^git version "  # Prefix.
    r"(\d+)"  # Major.
    r"\.(\d+)"  # Dot, minor.
    r"(?:\.(\d+))?"  # Optional dot, patch.
    r".*$"  # Suffix, including any pre- and post-release segments we don't care about.
)

# A full (unabbreviated) 40-character hexadecimal commit hash.
HASH_REGEX = re.compile("^[a-fA-F0-9]{40}$")

# SCP (Secure copy protocol) shorthand. e.g. 'git@example.com:foo/bar.git'
SCP_REGEX = re.compile(
    r"""^
    # Optional user, e.g. 'git@'
    (\w+@)?
    # Server, e.g. 'github.com'.
    ([^/:]+):
    # The server-side path. e.g. 'user/project.git'. Must start with an
    # alphanumeric character so as not to be confusable with a Windows paths
    # like 'C:/foo/bar' or 'C:\foo\bar'.
    (\w[^:]*)
    $""",
    re.VERBOSE,
)
def looks_like_hash(sha: str) -> bool:
    """Return whether *sha* has the form of a full 40-character hex hash.

    Only the shape of the string is checked (via ``HASH_REGEX``); no
    repository is consulted.
    """
    return bool(HASH_REGEX.match(sha))
class Git(VersionControl):
    """Git implementation of the ``VersionControl`` interface."""

    name = "git"
    dirname = ".git"
    repo_name = "clone"
    schemes = (
        "git+http",
        "git+https",
        "git+ssh",
        "git+git",
        "git+file",
    )
    # Prevent the user's environment variables from interfering with pip:
    # https://github.com/pypa/pip/issues/1130
    unset_environ = ("GIT_DIR", "GIT_WORK_TREE")
    default_arg_rev = "HEAD"

    @staticmethod
    def get_base_rev_args(rev: str) -> List[str]:
        """Return the argument list that selects *rev*: just ``[rev]``."""
        return [rev]

    def is_immutable_rev_checkout(self, url: str, dest: str) -> bool:
        """
        Return whether the checkout in *dest* is pinned to an immutable rev.

        True only when the revision requested in *url* equals the commit
        currently checked out in *dest* and does not also name a branch
        or tag.
        """
        _, rev_options = self.get_url_rev_options(hide_url(url))
        if not rev_options.rev:
            return False
        if not self.is_commit_id_equal(dest, rev_options.rev):
            # the current commit is different from rev,
            # which means rev was something else than a commit hash
            return False
        # return False in the rare case rev is both a commit hash
        # and a tag or a branch; we don't want to cache in that case
        # because that branch/tag could point to something else in the future
        is_tag_or_branch = bool(self.get_revision_sha(dest, rev_options.rev)[0])
        return not is_tag_or_branch

    def get_git_version(self) -> Tuple[int, ...]:
        """
        Return the installed git version as a tuple of ints.

        The tuple is ``(major, minor, patch)``, or ``(major, minor)`` when
        git reports no patch component. An empty tuple is returned (and a
        warning logged) if the output cannot be parsed.
        """
        version = self.run_command(
            ["version"],
            command_desc="git version",
            show_stdout=False,
            stdout_only=True,
        )
        match = GIT_VERSION_REGEX.match(version)
        if not match:
            logger.warning("Can't parse git version: %s", version)
            return ()
        # The patch group is optional in GIT_VERSION_REGEX and is None when
        # absent; skip it rather than crash on int(None).
        return tuple(int(c) for c in match.groups() if c is not None)

    @classmethod
    def get_current_branch(cls, location: str) -> Optional[str]:
        """
        Return the current branch, or None if HEAD isn't at a branch
        (e.g. detached HEAD).
        """
        # git-symbolic-ref exits with empty stdout if "HEAD" is a detached
        # HEAD rather than a symbolic ref.  In addition, the -q causes the
        # command to exit with status code 1 instead of 128 in this case
        # and to suppress the message to stderr.
        args = ["symbolic-ref", "-q", "HEAD"]
        output = cls.run_command(
            args,
            extra_ok_returncodes=(1,),
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        ref = output.strip()

        if ref.startswith("refs/heads/"):
            return ref[len("refs/heads/") :]

        return None

    @classmethod
    def get_revision_sha(cls, dest: str, rev: str) -> Tuple[Optional[str], bool]:
        """
        Return (sha_or_none, is_branch), where sha_or_none is a commit hash
        if the revision names a remote branch or tag, otherwise None.

        Args:
          dest: the repository directory.
          rev: the revision name.

        Raises:
          ValueError: if a non-empty ``show-ref`` output line does not split
            into exactly a hash and a ref name.
        """
        # Pass rev to pre-filter the list.
        output = cls.run_command(
            ["show-ref", rev],
            cwd=dest,
            show_stdout=False,
            stdout_only=True,
            on_returncode="ignore",
        )
        refs = {}
        # NOTE: We do not use splitlines here since that would split on other
        #       unicode separators, which can be maliciously used to install a
        #       different revision.
        for line in output.strip().split("\n"):
            line = line.rstrip("\r")
            if not line:
                continue
            try:
                ref_sha, ref_name = line.split(" ", maxsplit=2)
            except ValueError:
                # Include the offending line to simplify troubleshooting if
                # this error ever occurs.
                raise ValueError(f"unexpected show-ref line: {line!r}")

            refs[ref_name] = ref_sha

        branch_ref = f"refs/remotes/origin/{rev}"
        tag_ref = f"refs/tags/{rev}"

        # A remote branch takes precedence over a tag of the same name.
        sha = refs.get(branch_ref)
        if sha is not None:
            return (sha, True)

        sha = refs.get(tag_ref)

        return (sha, False)

    @classmethod
    def _should_fetch(cls, dest: str, rev: str) -> bool:
        """
        Return true if rev is a ref or is a commit that we don't have locally.

        Branches and tags are not considered in this method because they are
        assumed to be always available locally (which is a normal outcome of
        ``git clone`` and ``git fetch --tags``).
        """
        if rev.startswith("refs/"):
            # Always fetch remote refs.
            return True

        if not looks_like_hash(rev):
            # Git fetch would fail with abbreviated commits.
            return False

        if cls.has_commit(dest, rev):
            # Don't fetch if we have the commit locally.
            return False

        return True

    @classmethod
    def resolve_revision(
        cls, dest: str, url: HiddenText, rev_options: RevOptions
    ) -> RevOptions:
        """
        Resolve a revision to a new RevOptions object with the SHA1 of the
        branch, tag, or ref if found.

        Args:
          dest: the repository directory.
          url: the remote to fetch from when the revision is not available
            locally.
          rev_options: a RevOptions object.
        """
        rev = rev_options.arg_rev
        # The arg_rev property's implementation for Git ensures that the
        # rev return value is always non-None.
        assert rev is not None

        sha, is_branch = cls.get_revision_sha(dest, rev)

        if sha is not None:
            rev_options = rev_options.make_new(sha)
            # Record the branch name (or None) so fetch_new() can decide
            # whether to create a tracking branch.
            rev_options.branch_name = rev if is_branch else None

            return rev_options

        # Do not show a warning for the common case of something that has
        # the form of a Git commit hash.
        if not looks_like_hash(rev):
            logger.warning(
                "Did not find branch or tag '%s', assuming revision or ref.",
                rev,
            )

        if not cls._should_fetch(dest, rev):
            return rev_options

        # fetch the requested revision
        cls.run_command(
            make_command("fetch", "-q", url, rev_options.to_args()),
            cwd=dest,
        )
        # Change the revision to the SHA of the ref we fetched
        sha = cls.get_revision(dest, rev="FETCH_HEAD")
        rev_options = rev_options.make_new(sha)

        return rev_options

    @classmethod
    def is_commit_id_equal(cls, dest: str, name: Optional[str]) -> bool:
        """
        Return whether the current commit hash equals the given name.

        Args:
          dest: the repository directory.
          name: a string name.
        """
        if not name:
            # Then avoid an unnecessary subprocess call.
            return False

        return cls.get_revision(dest) == name

    def fetch_new(
        self, dest: str, url: HiddenText, rev_options: RevOptions, verbosity: int
    ) -> None:
        """
        Clone *url* into *dest* and check out the requested revision.

        Args:
          dest: the directory to clone into.
          url: the repository URL.
          rev_options: the revision to check out; when it names no revision
            the clone is left as-is.
          verbosity: <= 0 clones quietly, 1 uses git's default output, and
            anything higher passes ``--verbose --progress``.
        """
        rev_display = rev_options.to_display()
        logger.info("Cloning %s%s to %s", url, rev_display, display_path(dest))
        if verbosity <= 0:
            flags: Tuple[str, ...] = ("--quiet",)
        elif verbosity == 1:
            flags = ()
        else:
            flags = ("--verbose", "--progress")
        if self.get_git_version() >= (2, 17):
            # Git added support for partial clone in 2.17
            # https://git-scm.com/docs/partial-clone
            # Speeds up cloning by functioning without a complete copy of repository
            self.run_command(
                make_command(
                    "clone",
                    "--filter=blob:none",
                    *flags,
                    url,
                    dest,
                )
            )
        else:
            self.run_command(make_command("clone", *flags, url, dest))

        if rev_options.rev:
            # Then a specific revision was requested.
            rev_options = self.resolve_revision(dest, url, rev_options)
            branch_name = getattr(rev_options, "branch_name", None)
            logger.debug("Rev options %s, branch_name %s", rev_options, branch_name)
            if branch_name is None:
                # Only do a checkout if the current commit id doesn't match
                # the requested revision.
                if not self.is_commit_id_equal(dest, rev_options.rev):
                    cmd_args = make_command(
                        "checkout",
                        "-q",
                        rev_options.to_args(),
                    )
                    self.run_command(cmd_args, cwd=dest)
            elif self.get_current_branch(dest) != branch_name:
                # Then a specific branch was requested, and that branch
                # is not yet checked out.
                track_branch = f"origin/{branch_name}"
                cmd_args = [
                    "checkout",
                    "-b",
                    branch_name,
                    "--track",
                    track_branch,
                ]
                self.run_command(cmd_args, cwd=dest)
        else:
            # No revision requested: report the commit the clone landed on.
            sha = self.get_revision(dest)
            rev_options = rev_options.make_new(sha)

        logger.info("Resolved %s to commit %s", url, rev_options.rev)

        #: repo may contain submodules
        self.update_submodules(dest)

    def switch(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Point ``remote.origin.url`` at *url*, check out the requested
        revision in *dest*, and update submodules.
        """
        self.run_command(
            make_command("config", "remote.origin.url", url),
            cwd=dest,
        )
        cmd_args = make_command("checkout", "-q", rev_options.to_args())
        self.run_command(cmd_args, cwd=dest)

        self.update_submodules(dest)

    def update(self, dest: str, url: HiddenText, rev_options: RevOptions) -> None:
        """
        Fetch from the default remote, hard-reset *dest* to the resolved
        revision, and update submodules.
        """
        # First fetch changes from the default remote
        if self.get_git_version() >= (1, 9):
            # fetch tags in addition to everything else
            self.run_command(["fetch", "-q", "--tags"], cwd=dest)
        else:
            self.run_command(["fetch", "-q"], cwd=dest)
        # Then reset to wanted revision (maybe even origin/master)
        rev_options = self.resolve_revision(dest, url, rev_options)
        cmd_args = make_command("reset", "--hard", "-q", rev_options.to_args())
        self.run_command(cmd_args, cwd=dest)
        #: update submodules
        self.update_submodules(dest)

    @classmethod
    def get_remote_url(cls, location: str) -> str:
        """
        Return URL of the first remote encountered.

        The ``origin`` remote is preferred when it is configured.

        Raises RemoteNotFoundError if the repository does not have a remote
        url configured.
        """
        # We need to pass 1 for extra_ok_returncodes since the command
        # exits with return code 1 if there are no matching lines.
        stdout = cls.run_command(
            ["config", "--get-regexp", r"remote\..*\.url"],
            extra_ok_returncodes=(1,),
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        remotes = stdout.splitlines()
        try:
            found_remote = remotes[0]
        except IndexError:
            raise RemoteNotFoundError

        for remote in remotes:
            if remote.startswith("remote.origin.url "):
                found_remote = remote
                break
        # Each line is "<config key> <url>"; keep the url part.
        url = found_remote.split(" ")[1]
        return cls._git_remote_to_pip_url(url.strip())

    @staticmethod
    def _git_remote_to_pip_url(url: str) -> str:
        """
        Convert a remote url from what git uses to what pip accepts.

        There are 3 legal forms **url** may take:

            1. A fully qualified url: ssh://git@example.com/foo/bar.git
            2. A local project.git folder: /path/to/bare/repository.git
            3. SCP shorthand for form 1: git@example.com:foo/bar.git

        Form 1 is output as-is. Form 2 must be converted to URI and form 3 must
        be converted to form 1.

        See the corresponding test test_git_remote_url_to_pip() for examples of
        sample inputs/outputs.

        Raises RemoteNotValidError if **url** matches none of the forms.
        """
        if re.match(r"\w+://", url):
            # This is already valid. Pass it though as-is.
            return url
        if os.path.exists(url):
            # A local bare remote (git clone --mirror).
            # Needs a file:// prefix.
            return pathlib.PurePath(url).as_uri()
        scp_match = SCP_REGEX.match(url)
        if scp_match:
            # Add an ssh:// prefix and replace the ':' with a '/'.
            return scp_match.expand(r"ssh://\1\2/\3")
        # Otherwise, bail out.
        raise RemoteNotValidError(url)

    @classmethod
    def has_commit(cls, location: str, rev: str) -> bool:
        """
        Check if rev is a commit that is available in the local repository.

        Returns False when the ``git rev-parse --verify`` call fails with an
        InstallationError.
        """
        try:
            cls.run_command(
                ["rev-parse", "-q", "--verify", "sha^" + rev],
                cwd=location,
                log_failed_cmd=False,
            )
        except InstallationError:
            return False
        else:
            return True

    @classmethod
    def get_revision(cls, location: str, rev: Optional[str] = None) -> str:
        """
        Return the stripped output of ``git rev-parse`` for *rev* in
        *location*; *rev* defaults to ``HEAD``.
        """
        if rev is None:
            rev = "HEAD"
        current_rev = cls.run_command(
            ["rev-parse", rev],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        )
        return current_rev.strip()

    @classmethod
    def get_subdirectory(cls, location: str) -> Optional[str]:
        """
        Return the path to Python project root, relative to the repo root.
        Return None if the project root is in the repo root.
        """
        # find the repo root
        git_dir = cls.run_command(
            ["rev-parse", "--git-dir"],
            show_stdout=False,
            stdout_only=True,
            cwd=location,
        ).strip()
        if not os.path.isabs(git_dir):
            # git may report the directory relative to the cwd it ran in.
            git_dir = os.path.join(location, git_dir)
        repo_root = os.path.abspath(os.path.join(git_dir, ".."))
        return find_path_to_project_root_from_repo_root(location, repo_root)

    @classmethod
    def get_url_rev_and_auth(cls, url: str) -> Tuple[str, Optional[str], AuthInfo]:
        """
        Prefixes stub URLs like 'user@hostname:user/repo.git' with 'ssh://'.
        That's required because although they use SSH they sometimes don't
        work with a ssh:// scheme (e.g. GitHub). But we need a scheme for
        parsing. Hence we remove it again afterwards and return it as a stub.
        """
        # Works around an apparent Git bug
        # (see https://article.gmane.org/gmane.comp.version-control.git/146500)
        scheme, netloc, path, query, fragment = urlsplit(url)
        if scheme.endswith("file"):
            initial_slashes = path[: -len(path.lstrip("/"))]
            newpath = initial_slashes + urllib.request.url2pathname(path).replace(
                "\\", "/"
            ).lstrip("/")
            after_plus = scheme.find("+") + 1
            url = scheme[:after_plus] + urlunsplit(
                (scheme[after_plus:], netloc, newpath, query, fragment),
            )

        if "://" not in url:
            assert "file:" not in url
            # Temporarily add a scheme so the base class can parse the URL,
            # then strip it again.
            url = url.replace("git+", "git+ssh://")
            url, rev, user_pass = super().get_url_rev_and_auth(url)
            url = url.replace("ssh://", "")
        else:
            url, rev, user_pass = super().get_url_rev_and_auth(url)

        return url, rev, user_pass

    @classmethod
    def update_submodules(cls, location: str) -> None:
        """
        Initialise and recursively update submodules in *location*.

        Does nothing when *location* has no ``.gitmodules`` file.
        """
        if not os.path.exists(os.path.join(location, ".gitmodules")):
            return
        cls.run_command(
            ["submodule", "update", "--init", "--recursive", "-q"],
            cwd=location,
        )

    @classmethod
    def get_repository_root(cls, location: str) -> Optional[str]:
        """
        Return the root of the git repository containing *location*.

        The base-class lookup is tried first; otherwise git is asked via
        ``rev-parse --show-toplevel``. Returns None when git is unavailable
        or the command fails.
        """
        loc = super().get_repository_root(location)
        if loc:
            return loc
        try:
            r = cls.run_command(
                ["rev-parse", "--show-toplevel"],
                cwd=location,
                show_stdout=False,
                stdout_only=True,
                on_returncode="raise",
                log_failed_cmd=False,
            )
        except BadCommand:
            logger.debug(
                "could not determine if %s is under git control "
                "because git is not available",
                location,
            )
            return None
        except InstallationError:
            return None
        return os.path.normpath(r.rstrip("\r\n"))

    @staticmethod
    def should_add_vcs_url_prefix(repo_url: str) -> bool:
        """In either https or ssh form, requirements must be prefixed with git+."""
        return True
# Register the Git backend with the module-level ``vcs`` registry.
vcs.register(Git)