Spaces:
Paused
Paused
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of GitDB and is released under
# the New BSD License: https://opensource.org/license/bsd-3-clause/
"""Tests for the gitdb.util helpers (SHA conversion and LockedFD)"""
| import tempfile | |
| import os | |
| from gitdb.test.lib import TestBase | |
| from gitdb.util import ( | |
| to_hex_sha, | |
| to_bin_sha, | |
| NULL_HEX_SHA, | |
| LockedFD | |
| ) | |
class TestUtils(TestBase):
    """Tests for the SHA conversion helpers and ``LockedFD`` from ``gitdb.util``."""

    def test_basics(self):
        """Check hex/binary SHA conversions using the null SHA."""
        # a hex sha passed to to_hex_sha compares equal to the input
        assert to_hex_sha(NULL_HEX_SHA) == NULL_HEX_SHA
        # the binary form of a sha is 20 bytes long
        assert len(to_bin_sha(NULL_HEX_SHA)) == 20
        # converting binary -> hex yields the ascii-encoded (bytes) hex sha
        assert to_hex_sha(to_bin_sha(NULL_HEX_SHA)) == NULL_HEX_SHA.encode("ascii")

    def _cmp_contents(self, file_path, data):
        """Assert that the file at ``file_path`` holds exactly ``data``.

        :param file_path: path of the file to read (opened in binary mode)
        :param data: expected contents as a string; it is ascii-encoded
            before being compared to the bytes read from the file
        :raise AssertionError: if the file contents differ from ``data``
        """
        with open(file_path, "rb") as fp:
            assert fp.read() == data.encode("ascii")

    def test_lockedfd(self):
        """Exercise ``LockedFD``: rollback, read access, concurrent open and commit."""
        orig_data = "hello"
        new_data = "world"

        # mkstemp creates the file atomically and hands us an open descriptor.
        # This avoids the race of tempfile.mktemp(), where the returned name
        # may be claimed by someone else before we create the file.
        fd, my_file = tempfile.mkstemp()
        with os.fdopen(fd, "wb") as my_file_fp:
            my_file_fp.write(orig_data.encode("ascii"))

        try:
            lfd = LockedFD(my_file)
            lockfilepath = lfd._lockfilepath()

            # cannot end before it was started
            self.assertRaises(AssertionError, lfd.rollback)
            self.assertRaises(AssertionError, lfd.commit)

            # open for writing
            assert not os.path.isfile(lockfilepath)
            wfd = lfd.open(write=True)
            assert lfd._fd is wfd
            assert os.path.isfile(lockfilepath)

            # write data and fail - the original file must stay untouched
            os.write(wfd, new_data.encode("ascii"))
            lfd.rollback()
            assert lfd._fd is None
            self._cmp_contents(my_file, orig_data)
            assert not os.path.isfile(lockfilepath)

            # additional call doesn't fail
            lfd.commit()
            lfd.rollback()

            # test reading
            lfd = LockedFD(my_file)
            rfd = lfd.open(write=False)
            assert os.read(rfd, len(orig_data)) == orig_data.encode("ascii")

            assert os.path.isfile(lockfilepath)
            # deletion rolls back
            del lfd
            assert not os.path.isfile(lockfilepath)

            # write data - concurrently
            lfd = LockedFD(my_file)
            olfd = LockedFD(my_file)
            assert not os.path.isfile(lockfilepath)
            wfdstream = lfd.open(write=True, stream=True)  # this time as stream
            assert os.path.isfile(lockfilepath)
            # another one fails
            self.assertRaises(IOError, olfd.open)

            wfdstream.write(new_data.encode("ascii"))
            lfd.commit()
            assert not os.path.isfile(lockfilepath)
            self._cmp_contents(my_file, new_data)

            # could test automatic _end_writing on destruction
        finally:
            os.remove(my_file)
        # END final cleanup

        # try non-existing file for reading
        # A name inside a fresh private directory cannot exist yet, and the
        # directory (including any stray lock file) is removed afterwards.
        with tempfile.TemporaryDirectory() as tmp_dir:
            lfd = LockedFD(os.path.join(tmp_dir, "does_not_exist"))
            try:
                lfd.open(write=False)
            except OSError:
                # the failed open must not leave a lock file behind
                assert not os.path.exists(lfd._lockfilepath())
            else:
                self.fail("expected OSError")
            # END handle exceptions