Spaces:
Paused
Paused
| # Copyright (C) 2008, 2009 Michael Trier (mtrier@gmail.com) and contributors | |
| # | |
| # This module is part of GitPython and is released under the | |
| # 3-Clause BSD License: https://opensource.org/license/bsd-3-clause/ | |
| from git.util import IterableList, join_path | |
| import git.diff as git_diff | |
| from git.util import to_bin_sha | |
| from . import util | |
| from .base import IndexObject, IndexObjUnion | |
| from .blob import Blob | |
| from .submodule.base import Submodule | |
| from .fun import tree_entries_from_data, tree_to_stream | |
| # typing ------------------------------------------------- | |
| from typing import ( | |
| Any, | |
| Callable, | |
| Dict, | |
| Iterable, | |
| Iterator, | |
| List, | |
| Tuple, | |
| Type, | |
| Union, | |
| cast, | |
| TYPE_CHECKING, | |
| ) | |
| from git.types import PathLike, Literal | |
| if TYPE_CHECKING: | |
| from git.repo import Repo | |
| from io import BytesIO | |
| TreeCacheTup = Tuple[bytes, int, str] | |
| TraversedTreeTup = Union[Tuple[Union["Tree", None], IndexObjUnion, Tuple["Submodule", "Submodule"]]] | |
| # def is_tree_cache(inp: Tuple[bytes, int, str]) -> TypeGuard[TreeCacheTup]: | |
| # return isinstance(inp[0], bytes) and isinstance(inp[1], int) and isinstance([inp], str) | |
| # -------------------------------------------------------- | |
| cmp: Callable[[str, str], int] = lambda a, b: (a > b) - (a < b) | |
| __all__ = ("TreeModifier", "Tree") | |
| def git_cmp(t1: TreeCacheTup, t2: TreeCacheTup) -> int: | |
| a, b = t1[2], t2[2] | |
| # assert isinstance(a, str) and isinstance(b, str) | |
| len_a, len_b = len(a), len(b) | |
| min_len = min(len_a, len_b) | |
| min_cmp = cmp(a[:min_len], b[:min_len]) | |
| if min_cmp: | |
| return min_cmp | |
| return len_a - len_b | |
| def merge_sort(a: List[TreeCacheTup], cmp: Callable[[TreeCacheTup, TreeCacheTup], int]) -> None: | |
| if len(a) < 2: | |
| return | |
| mid = len(a) // 2 | |
| lefthalf = a[:mid] | |
| righthalf = a[mid:] | |
| merge_sort(lefthalf, cmp) | |
| merge_sort(righthalf, cmp) | |
| i = 0 | |
| j = 0 | |
| k = 0 | |
| while i < len(lefthalf) and j < len(righthalf): | |
| if cmp(lefthalf[i], righthalf[j]) <= 0: | |
| a[k] = lefthalf[i] | |
| i = i + 1 | |
| else: | |
| a[k] = righthalf[j] | |
| j = j + 1 | |
| k = k + 1 | |
| while i < len(lefthalf): | |
| a[k] = lefthalf[i] | |
| i = i + 1 | |
| k = k + 1 | |
| while j < len(righthalf): | |
| a[k] = righthalf[j] | |
| j = j + 1 | |
| k = k + 1 | |
| class TreeModifier: | |
| """A utility class providing methods to alter the underlying cache in a list-like fashion. | |
| Once all adjustments are complete, the _cache, which really is a reference to | |
| the cache of a tree, will be sorted. This ensures it will be in a serializable state. | |
| """ | |
| __slots__ = ("_cache",) | |
| def __init__(self, cache: List[TreeCacheTup]) -> None: | |
| self._cache = cache | |
| def _index_by_name(self, name: str) -> int: | |
| """:return: index of an item with name, or -1 if not found""" | |
| for i, t in enumerate(self._cache): | |
| if t[2] == name: | |
| return i | |
| # END found item | |
| # END for each item in cache | |
| return -1 | |
| # { Interface | |
| def set_done(self) -> "TreeModifier": | |
| """Call this method once you are done modifying the tree information. | |
| This may be called several times, but be aware that each call will cause | |
| a sort operation. | |
| :return self: | |
| """ | |
| merge_sort(self._cache, git_cmp) | |
| return self | |
| # } END interface | |
| # { Mutators | |
| def add(self, sha: bytes, mode: int, name: str, force: bool = False) -> "TreeModifier": | |
| """Add the given item to the tree. | |
| If an item with the given name already exists, nothing will be done, but a | |
| ValueError will be raised if the sha and mode of the existing item do not match | |
| the one you add, unless force is True | |
| :param sha: The 20 or 40 byte sha of the item to add | |
| :param mode: int representing the stat compatible mode of the item | |
| :param force: If True, an item with your name and information will overwrite any | |
| existing item with the same name, no matter which information it has | |
| :return: self | |
| """ | |
| if "/" in name: | |
| raise ValueError("Name must not contain '/' characters") | |
| if (mode >> 12) not in Tree._map_id_to_type: | |
| raise ValueError("Invalid object type according to mode %o" % mode) | |
| sha = to_bin_sha(sha) | |
| index = self._index_by_name(name) | |
| item = (sha, mode, name) | |
| # assert is_tree_cache(item) | |
| if index == -1: | |
| self._cache.append(item) | |
| else: | |
| if force: | |
| self._cache[index] = item | |
| else: | |
| ex_item = self._cache[index] | |
| if ex_item[0] != sha or ex_item[1] != mode: | |
| raise ValueError("Item %r existed with different properties" % name) | |
| # END handle mismatch | |
| # END handle force | |
| # END handle name exists | |
| return self | |
| def add_unchecked(self, binsha: bytes, mode: int, name: str) -> None: | |
| """Add the given item to the tree. Its correctness is assumed, so it is the | |
| caller's responsibility to ensure that the input is correct. | |
| For more information on the parameters, see :meth:`add`. | |
| :param binsha: 20 byte binary sha | |
| """ | |
| assert isinstance(binsha, bytes) and isinstance(mode, int) and isinstance(name, str) | |
| tree_cache = (binsha, mode, name) | |
| self._cache.append(tree_cache) | |
| def __delitem__(self, name: str) -> None: | |
| """Delete an item with the given name if it exists.""" | |
| index = self._index_by_name(name) | |
| if index > -1: | |
| del self._cache[index] | |
| # } END mutators | |
| class Tree(IndexObject, git_diff.Diffable, util.Traversable, util.Serializable): | |
| """Tree objects represent an ordered list of Blobs and other Trees. | |
| ``Tree as a list``:: | |
| Access a specific blob using the | |
| tree['filename'] notation. | |
| You may as well access by index | |
| blob = tree[0] | |
| """ | |
| type: Literal["tree"] = "tree" | |
| __slots__ = ("_cache",) | |
| # Actual integer IDs for comparison. | |
| commit_id = 0o16 # Equals stat.S_IFDIR | stat.S_IFLNK - a directory link. | |
| blob_id = 0o10 | |
| symlink_id = 0o12 | |
| tree_id = 0o04 | |
| _map_id_to_type: Dict[int, Type[IndexObjUnion]] = { | |
| commit_id: Submodule, | |
| blob_id: Blob, | |
| symlink_id: Blob | |
| # Tree ID added once Tree is defined. | |
| } | |
| def __init__( | |
| self, | |
| repo: "Repo", | |
| binsha: bytes, | |
| mode: int = tree_id << 12, | |
| path: Union[PathLike, None] = None, | |
| ): | |
| super().__init__(repo, binsha, mode, path) | |
| def _get_intermediate_items( | |
| cls, | |
| index_object: IndexObjUnion, | |
| ) -> Union[Tuple["Tree", ...], Tuple[()]]: | |
| if index_object.type == "tree": | |
| return tuple(index_object._iter_convert_to_object(index_object._cache)) | |
| return () | |
| def _set_cache_(self, attr: str) -> None: | |
| if attr == "_cache": | |
| # Set the data when we need it. | |
| ostream = self.repo.odb.stream(self.binsha) | |
| self._cache: List[TreeCacheTup] = tree_entries_from_data(ostream.read()) | |
| else: | |
| super()._set_cache_(attr) | |
| # END handle attribute | |
| def _iter_convert_to_object(self, iterable: Iterable[TreeCacheTup]) -> Iterator[IndexObjUnion]: | |
| """Iterable yields tuples of (binsha, mode, name), which will be converted | |
| to the respective object representation. | |
| """ | |
| for binsha, mode, name in iterable: | |
| path = join_path(self.path, name) | |
| try: | |
| yield self._map_id_to_type[mode >> 12](self.repo, binsha, mode, path) | |
| except KeyError as e: | |
| raise TypeError("Unknown mode %o found in tree data for path '%s'" % (mode, path)) from e | |
| # END for each item | |
| def join(self, file: str) -> IndexObjUnion: | |
| """Find the named object in this tree's contents. | |
| :return: ``git.Blob`` or ``git.Tree`` or ``git.Submodule`` | |
| :raise KeyError: if given file or tree does not exist in tree | |
| """ | |
| msg = "Blob or Tree named %r not found" | |
| if "/" in file: | |
| tree = self | |
| item = self | |
| tokens = file.split("/") | |
| for i, token in enumerate(tokens): | |
| item = tree[token] | |
| if item.type == "tree": | |
| tree = item | |
| else: | |
| # Safety assertion - blobs are at the end of the path. | |
| if i != len(tokens) - 1: | |
| raise KeyError(msg % file) | |
| return item | |
| # END handle item type | |
| # END for each token of split path | |
| if item == self: | |
| raise KeyError(msg % file) | |
| return item | |
| else: | |
| for info in self._cache: | |
| if info[2] == file: # [2] == name | |
| return self._map_id_to_type[info[1] >> 12]( | |
| self.repo, info[0], info[1], join_path(self.path, info[2]) | |
| ) | |
| # END for each obj | |
| raise KeyError(msg % file) | |
| # END handle long paths | |
| def __truediv__(self, file: str) -> IndexObjUnion: | |
| """The ``/`` operator is another syntax for joining. | |
| See :meth:`join` for details. | |
| """ | |
| return self.join(file) | |
| def trees(self) -> List["Tree"]: | |
| """:return: list(Tree, ...) list of trees directly below this tree""" | |
| return [i for i in self if i.type == "tree"] | |
| def blobs(self) -> List[Blob]: | |
| """:return: list(Blob, ...) list of blobs directly below this tree""" | |
| return [i for i in self if i.type == "blob"] | |
| def cache(self) -> TreeModifier: | |
| """ | |
| :return: An object allowing to modify the internal cache. This can be used | |
| to change the tree's contents. When done, make sure you call ``set_done`` | |
| on the tree modifier, or serialization behaviour will be incorrect. | |
| See :class:`TreeModifier` for more information on how to alter the cache. | |
| """ | |
| return TreeModifier(self._cache) | |
| def traverse( | |
| self, # type: ignore[override] | |
| predicate: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: True, | |
| prune: Callable[[Union[IndexObjUnion, TraversedTreeTup], int], bool] = lambda i, d: False, | |
| depth: int = -1, | |
| branch_first: bool = True, | |
| visit_once: bool = False, | |
| ignore_self: int = 1, | |
| as_edge: bool = False, | |
| ) -> Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]]: | |
| """For documentation, see util.Traversable._traverse(). | |
| Trees are set to ``visit_once = False`` to gain more performance in the traversal. | |
| """ | |
| # """ | |
| # # To typecheck instead of using cast. | |
| # import itertools | |
| # def is_tree_traversed(inp: Tuple) -> TypeGuard[Tuple[Iterator[Union['Tree', 'Blob', 'Submodule']]]]: | |
| # return all(isinstance(x, (Blob, Tree, Submodule)) for x in inp[1]) | |
| # ret = super().traverse(predicate, prune, depth, branch_first, visit_once, ignore_self) | |
| # ret_tup = itertools.tee(ret, 2) | |
| # assert is_tree_traversed(ret_tup), f"Type is {[type(x) for x in list(ret_tup[0])]}" | |
| # return ret_tup[0]""" | |
| return cast( | |
| Union[Iterator[IndexObjUnion], Iterator[TraversedTreeTup]], | |
| super()._traverse( | |
| predicate, | |
| prune, | |
| depth, # type: ignore | |
| branch_first, | |
| visit_once, | |
| ignore_self, | |
| ), | |
| ) | |
| def list_traverse(self, *args: Any, **kwargs: Any) -> IterableList[IndexObjUnion]: | |
| """ | |
| :return: IterableList with the results of the traversal as produced by | |
| traverse() | |
| Tree -> IterableList[Union['Submodule', 'Tree', 'Blob']] | |
| """ | |
| return super()._list_traverse(*args, **kwargs) | |
| # List protocol | |
| def __getslice__(self, i: int, j: int) -> List[IndexObjUnion]: | |
| return list(self._iter_convert_to_object(self._cache[i:j])) | |
| def __iter__(self) -> Iterator[IndexObjUnion]: | |
| return self._iter_convert_to_object(self._cache) | |
| def __len__(self) -> int: | |
| return len(self._cache) | |
| def __getitem__(self, item: Union[str, int, slice]) -> IndexObjUnion: | |
| if isinstance(item, int): | |
| info = self._cache[item] | |
| return self._map_id_to_type[info[1] >> 12](self.repo, info[0], info[1], join_path(self.path, info[2])) | |
| if isinstance(item, str): | |
| # compatibility | |
| return self.join(item) | |
| # END index is basestring | |
| raise TypeError("Invalid index type: %r" % item) | |
| def __contains__(self, item: Union[IndexObjUnion, PathLike]) -> bool: | |
| if isinstance(item, IndexObject): | |
| for info in self._cache: | |
| if item.binsha == info[0]: | |
| return True | |
| # END compare sha | |
| # END for each entry | |
| # END handle item is index object | |
| # compatibility | |
| # Treat item as repo-relative path. | |
| else: | |
| path = self.path | |
| for info in self._cache: | |
| if item == join_path(path, info[2]): | |
| return True | |
| # END for each item | |
| return False | |
| def __reversed__(self) -> Iterator[IndexObjUnion]: | |
| return reversed(self._iter_convert_to_object(self._cache)) # type: ignore | |
| def _serialize(self, stream: "BytesIO") -> "Tree": | |
| """Serialize this tree into the stream. Assumes sorted tree data. | |
| .. note:: We will assume our tree data to be in a sorted state. If this is not | |
| the case, serialization will not generate a correct tree representation as | |
| these are assumed to be sorted by algorithms. | |
| """ | |
| tree_to_stream(self._cache, stream.write) | |
| return self | |
| def _deserialize(self, stream: "BytesIO") -> "Tree": | |
| self._cache = tree_entries_from_data(stream.read()) | |
| return self | |
| # END tree | |
| # Finalize map definition. | |
| Tree._map_id_to_type[Tree.tree_id] = Tree | |