Spaces:
Paused
Paused
| # Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors | |
| # | |
| # This module is part of GitDB and is released under | |
| # the New BSD License: https://opensource.org/license/bsd-3-clause/ | |
| """Utilities used in ODB testing""" | |
| from gitdb import OStream | |
| import sys | |
| import random | |
| from array import array | |
| from io import BytesIO | |
| import glob | |
| import unittest | |
| import tempfile | |
| import shutil | |
| import os | |
| import gc | |
| import logging | |
| from functools import wraps | |
| #{ Bases | |
| class TestBase(unittest.TestCase): | |
| """Base class for all tests | |
| TestCase providing access to readonly repositories using the following member variables. | |
| * gitrepopath | |
| * read-only base path of the git source repository, i.e. .../git/.git | |
| """ | |
| #{ Invvariants | |
| k_env_git_repo = "GITDB_TEST_GIT_REPO_BASE" | |
| #} END invariants | |
| def setUpClass(cls): | |
| try: | |
| super().setUpClass() | |
| except AttributeError: | |
| pass | |
| cls.gitrepopath = os.environ.get(cls.k_env_git_repo) | |
| if not cls.gitrepopath: | |
| logging.info( | |
| "You can set the %s environment variable to a .git repository of your choice - defaulting to the gitdb repository", cls.k_env_git_repo) | |
| ospd = os.path.dirname | |
| cls.gitrepopath = os.path.join(ospd(ospd(ospd(__file__))), '.git') | |
| # end assure gitrepo is set | |
| assert cls.gitrepopath.endswith('.git') | |
| #} END bases | |
| #{ Decorators | |
| def with_rw_directory(func): | |
| """Create a temporary directory which can be written to, remove it if the | |
| test succeeds, but leave it otherwise to aid additional debugging""" | |
| def wrapper(self): | |
| path = tempfile.mktemp(prefix=func.__name__) | |
| os.mkdir(path) | |
| keep = False | |
| try: | |
| try: | |
| return func(self, path) | |
| except Exception: | |
| sys.stderr.write(f"Test {type(self).__name__}.{func.__name__} failed, output is at {path!r}\n") | |
| keep = True | |
| raise | |
| finally: | |
| # Need to collect here to be sure all handles have been closed. It appears | |
| # a windows-only issue. In fact things should be deleted, as well as | |
| # memory maps closed, once objects go out of scope. For some reason | |
| # though this is not the case here unless we collect explicitly. | |
| if not keep: | |
| gc.collect() | |
| shutil.rmtree(path) | |
| # END handle exception | |
| # END wrapper | |
| wrapper.__name__ = func.__name__ | |
| return wrapper | |
| def with_packs_rw(func): | |
| """Function that provides a path into which the packs for testing should be | |
| copied. Will pass on the path to the actual function afterwards""" | |
| def wrapper(self, path): | |
| src_pack_glob = fixture_path('packs/*') | |
| copy_files_globbed(src_pack_glob, path, hard_link_ok=True) | |
| return func(self, path) | |
| # END wrapper | |
| wrapper.__name__ = func.__name__ | |
| return wrapper | |
| #} END decorators | |
| #{ Routines | |
| def fixture_path(relapath=''): | |
| """:return: absolute path into the fixture directory | |
| :param relapath: relative path into the fixtures directory, or '' | |
| to obtain the fixture directory itself""" | |
| return os.path.join(os.path.dirname(__file__), 'fixtures', relapath) | |
| def copy_files_globbed(source_glob, target_dir, hard_link_ok=False): | |
| """Copy all files found according to the given source glob into the target directory | |
| :param hard_link_ok: if True, hard links will be created if possible. Otherwise | |
| the files will be copied""" | |
| for src_file in glob.glob(source_glob): | |
| if hard_link_ok and hasattr(os, 'link'): | |
| target = os.path.join(target_dir, os.path.basename(src_file)) | |
| try: | |
| os.link(src_file, target) | |
| except OSError: | |
| shutil.copy(src_file, target_dir) | |
| # END handle cross device links ( and resulting failure ) | |
| else: | |
| shutil.copy(src_file, target_dir) | |
| # END try hard link | |
| # END for each file to copy | |
| def make_bytes(size_in_bytes, randomize=False): | |
| """:return: string with given size in bytes | |
| :param randomize: try to produce a very random stream""" | |
| actual_size = size_in_bytes // 4 | |
| producer = range(actual_size) | |
| if randomize: | |
| producer = list(producer) | |
| random.shuffle(producer) | |
| # END randomize | |
| a = array('i', producer) | |
| return a.tobytes() | |
| def make_object(type, data): | |
| """:return: bytes resembling an uncompressed object""" | |
| odata = "blob %i\0" % len(data) | |
| return odata.encode("ascii") + data | |
| def make_memory_file(size_in_bytes, randomize=False): | |
| """:return: tuple(size_of_stream, stream) | |
| :param randomize: try to produce a very random stream""" | |
| d = make_bytes(size_in_bytes, randomize) | |
| return len(d), BytesIO(d) | |
| #} END routines | |
| #{ Stream Utilities | |
| class DummyStream: | |
| def __init__(self): | |
| self.was_read = False | |
| self.bytes = 0 | |
| self.closed = False | |
| def read(self, size): | |
| self.was_read = True | |
| self.bytes = size | |
| def close(self): | |
| self.closed = True | |
| def _assert(self): | |
| assert self.was_read | |
| class DeriveTest(OStream): | |
| def __init__(self, sha, type, size, stream, *args, **kwargs): | |
| self.myarg = kwargs.pop('myarg') | |
| self.args = args | |
| def _assert(self): | |
| assert self.args | |
| assert self.myarg | |
| #} END stream utilitiess | |