| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | """ |
| | File object test module. |
| | |
| | Tests all aspects of File objects, including their creation. |
| | """ |
| |
|
| | import pytest |
| | import os |
| | import stat |
| | import pickle |
| | import tempfile |
| | import subprocess |
| | import sys |
| |
|
| | from .common import ut, TestCase, UNICODE_FILENAMES, closed_tempfile |
| | from h5py._hl.files import direct_vfd |
| | from h5py import File |
| | import h5py |
| | from .. import h5 |
| | import pathlib |
| |
|
| |
|
| | class TestFileOpen(TestCase): |
| |
|
| | """ |
| | Feature: Opening files with Python-style modes. |
| | """ |
| |
|
| | def test_default(self): |
| | """ Default semantics in the presence or absence of a file """ |
| | fname = self.mktemp() |
| |
|
| | |
| | with pytest.raises(FileNotFoundError): |
| | with File(fname): |
| | pass |
| |
|
| | |
| | with File(fname, 'w'): |
| | pass |
| | os.chmod(fname, stat.S_IREAD) |
| | try: |
| | with File(fname) as f: |
| | self.assertTrue(f) |
| | self.assertEqual(f.mode, 'r') |
| | finally: |
| | os.chmod(fname, stat.S_IWRITE) |
| |
|
| | |
| | with open(fname, 'wb') as f: |
| | f.write(b'\x00') |
| | with self.assertRaises(OSError): |
| | File(fname) |
| |
|
| | def test_create(self): |
| | """ Mode 'w' opens file in overwrite mode """ |
| | fname = self.mktemp() |
| | fid = File(fname, 'w') |
| | self.assertTrue(fid) |
| | fid.create_group('foo') |
| | fid.close() |
| | fid = File(fname, 'w') |
| | self.assertNotIn('foo', fid) |
| | fid.close() |
| |
|
| | def test_create_exclusive(self): |
| | """ Mode 'w-' opens file in exclusive mode """ |
| | fname = self.mktemp() |
| | fid = File(fname, 'w-') |
| | self.assertTrue(fid) |
| | fid.close() |
| | with self.assertRaises(FileExistsError): |
| | File(fname, 'w-') |
| |
|
| | def test_append(self): |
| | """ Mode 'a' opens file in append/readwrite mode, creating if necessary """ |
| | fname = self.mktemp() |
| | fid = File(fname, 'a') |
| | try: |
| | self.assertTrue(fid) |
| | fid.create_group('foo') |
| | assert 'foo' in fid |
| | finally: |
| | fid.close() |
| | fid = File(fname, 'a') |
| | try: |
| | assert 'foo' in fid |
| | fid.create_group('bar') |
| | assert 'bar' in fid |
| | finally: |
| | fid.close() |
| |
|
| | os.chmod(fname, stat.S_IREAD) |
| | try: |
| | with pytest.raises(PermissionError): |
| | File(fname, 'a') |
| | finally: |
| | |
| | os.chmod(fname, stat.S_IREAD | stat.S_IWRITE) |
| |
|
| | def test_readonly(self): |
| | """ Mode 'r' opens file in readonly mode """ |
| | fname = self.mktemp() |
| | fid = File(fname, 'w') |
| | fid.close() |
| | self.assertFalse(fid) |
| | fid = File(fname, 'r') |
| | self.assertTrue(fid) |
| | with self.assertRaises(ValueError): |
| | fid.create_group('foo') |
| | fid.close() |
| |
|
| | def test_readwrite(self): |
| | """ Mode 'r+' opens existing file in readwrite mode """ |
| | fname = self.mktemp() |
| | fid = File(fname, 'w') |
| | fid.create_group('foo') |
| | fid.close() |
| | fid = File(fname, 'r+') |
| | assert 'foo' in fid |
| | fid.create_group('bar') |
| | assert 'bar' in fid |
| | fid.close() |
| |
|
| | def test_nonexistent_file(self): |
| | """ Modes 'r' and 'r+' do not create files """ |
| | fname = self.mktemp() |
| | with self.assertRaises(FileNotFoundError): |
| | File(fname, 'r') |
| | with self.assertRaises(FileNotFoundError): |
| | File(fname, 'r+') |
| |
|
| | def test_invalid_mode(self): |
| | """ Invalid modes raise ValueError """ |
| | with self.assertRaises(ValueError): |
| | File(self.mktemp(), 'mongoose') |
| |
|
| |
|
| | @ut.skipIf(h5py.version.hdf5_version_tuple < (1, 10, 1), |
| | 'Requires HDF5 1.10.1 or later') |
| | class TestSpaceStrategy(TestCase): |
| |
|
| | """ |
| | Feature: Create file with specified file space strategy |
| | """ |
| |
|
| | def test_create_with_space_strategy(self): |
| | """ Create file with file space strategy """ |
| | fname = self.mktemp() |
| | fid = File(fname, 'w', fs_strategy="page", |
| | fs_persist=True, fs_threshold=100) |
| | self.assertTrue(fid) |
| | |
| | with self.assertRaises(ValueError): |
| | File(fname, 'a', fs_strategy="page") |
| | |
| | with self.assertRaises(ValueError): |
| | File(self.mktemp(), 'w', fs_strategy="invalid") |
| |
|
| | dset = fid.create_dataset('foo', (100,), dtype='uint8') |
| | dset[...] = 1 |
| | dset = fid.create_dataset('bar', (100,), dtype='uint8') |
| | dset[...] = 1 |
| | del fid['foo'] |
| | fid.close() |
| |
|
| | fid = File(fname, 'a') |
| | plist = fid.id.get_create_plist() |
| | fs_strat = plist.get_file_space_strategy() |
| | assert(fs_strat[0] == 1) |
| | assert(fs_strat[1] == True) |
| | assert(fs_strat[2] == 100) |
| |
|
| | dset = fid.create_dataset('foo2', (100,), dtype='uint8') |
| | dset[...] = 1 |
| | fid.close() |
| |
|
| |
|
| | @ut.skipIf(h5py.version.hdf5_version_tuple < (1, 10, 1), |
| | 'Requires HDF5 1.10.1 or later') |
| | @pytest.mark.mpi_skip |
| | class TestPageBuffering(TestCase): |
| | """ |
| | Feature: Use page buffering |
| | """ |
| |
|
| | def test_only_with_page_strategy(self): |
| | """Allow page buffering only with fs_strategy="page". |
| | """ |
| | fname = self.mktemp() |
| | with File(fname, mode='w', fs_strategy='page', page_buf_size=16*1024): |
| | pass |
| | with self.assertRaises(OSError): |
| | File(fname, mode='w', page_buf_size=16*1024) |
| | with self.assertRaises(OSError): |
| | File(fname, mode='w', fs_strategy='fsm', page_buf_size=16*1024) |
| | with self.assertRaises(OSError): |
| | File(fname, mode='w', fs_strategy='aggregate', page_buf_size=16*1024) |
| |
|
| | def test_check_page_buf_size(self): |
| | """Verify set page buffer size, and minimum meta and raw eviction criteria.""" |
| | fname = self.mktemp() |
| | pbs = 16 * 1024 |
| | mm = 19 |
| | mr = 67 |
| | with File(fname, mode='w', fs_strategy='page', |
| | page_buf_size=pbs, min_meta_keep=mm, min_raw_keep=mr) as f: |
| | fapl = f.id.get_access_plist() |
| | self.assertEqual(fapl.get_page_buffer_size(), (pbs, mm, mr)) |
| |
|
| | def test_too_small_pbs(self): |
| | """Page buffer size must be greater than file space page size.""" |
| | fname = self.mktemp() |
| | fsp = 16 * 1024 |
| | with File(fname, mode='w', fs_strategy='page', fs_page_size=fsp): |
| | pass |
| | with self.assertRaises(OSError): |
| | File(fname, mode="r", page_buf_size=fsp-1) |
| |
|
| | def test_actual_pbs(self): |
| | """Verify actual page buffer size.""" |
| | fname = self.mktemp() |
| | fsp = 16 * 1024 |
| | pbs = 2 * fsp |
| | with File(fname, mode='w', fs_strategy='page', fs_page_size=fsp): |
| | pass |
| | with File(fname, mode='r', page_buf_size=pbs-1) as f: |
| | fapl = f.id.get_access_plist() |
| | self.assertEqual(fapl.get_page_buffer_size()[0], fsp) |
| |
|
| |
|
| | class TestModes(TestCase): |
| |
|
| | """ |
| | Feature: File mode can be retrieved via file.mode |
| | """ |
| |
|
| | def test_mode_attr(self): |
| | """ Mode equivalent can be retrieved via property """ |
| | fname = self.mktemp() |
| | with File(fname, 'w') as f: |
| | self.assertEqual(f.mode, 'r+') |
| | with File(fname, 'r') as f: |
| | self.assertEqual(f.mode, 'r') |
| |
|
| | def test_mode_external(self): |
| | """ Mode property works for files opened via external links |
| | |
| | Issue 190. |
| | """ |
| | fname1 = self.mktemp() |
| | fname2 = self.mktemp() |
| |
|
| | f1 = File(fname1, 'w') |
| | f1.close() |
| |
|
| | f2 = File(fname2, 'w') |
| | try: |
| | f2['External'] = h5py.ExternalLink(fname1, '/') |
| | f3 = f2['External'].file |
| | self.assertEqual(f3.mode, 'r+') |
| | finally: |
| | f2.close() |
| | f3.close() |
| |
|
| | f2 = File(fname2, 'r') |
| | try: |
| | f3 = f2['External'].file |
| | self.assertEqual(f3.mode, 'r') |
| | finally: |
| | f2.close() |
| | f3.close() |
| |
|
| |
|
| | class TestDrivers(TestCase): |
| |
|
| | """ |
| | Feature: Files can be opened with low-level HDF5 drivers. Does not |
| | include MPI drivers (see bottom). |
| | """ |
| |
|
| | @ut.skipUnless(os.name == 'posix', "Stdio driver is supported on posix") |
| | def test_stdio(self): |
| | """ Stdio driver is supported on posix """ |
| | fid = File(self.mktemp(), 'w', driver='stdio') |
| | self.assertTrue(fid) |
| | self.assertEqual(fid.driver, 'stdio') |
| | fid.close() |
| |
|
| | |
| | fid = File(self.mktemp(), 'a', driver='stdio') |
| | self.assertTrue(fid) |
| | self.assertEqual(fid.driver, 'stdio') |
| | fid.close() |
| |
|
| | @ut.skipUnless(direct_vfd, |
| | "DIRECT driver is supported on Linux if hdf5 is " |
| | "built with the appriorate flags.") |
| | def test_direct(self): |
| | """ DIRECT driver is supported on Linux""" |
| | fid = File(self.mktemp(), 'w', driver='direct') |
| | self.assertTrue(fid) |
| | self.assertEqual(fid.driver, 'direct') |
| | default_fapl = fid.id.get_access_plist().get_fapl_direct() |
| | fid.close() |
| |
|
| | |
| | fid = File(self.mktemp(), 'a', driver='direct') |
| | self.assertTrue(fid) |
| | self.assertEqual(fid.driver, 'direct') |
| | fid.close() |
| |
|
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| | |
| |
|
| | |
| | for alignment, block_size, cbuf_size in [ |
| | default_fapl, |
| | (default_fapl[0], default_fapl[1], 3 * default_fapl[1]), |
| | (default_fapl[0] * 2, default_fapl[1], 3 * default_fapl[1]), |
| | (default_fapl[0], 2 * default_fapl[1], 6 * default_fapl[1]), |
| | ]: |
| | with File(self.mktemp(), 'w', driver='direct', |
| | alignment=alignment, |
| | block_size=block_size, |
| | cbuf_size=cbuf_size) as fid: |
| | actual_fapl = fid.id.get_access_plist().get_fapl_direct() |
| | actual_alignment = actual_fapl[0] |
| | actual_block_size = actual_fapl[1] |
| | actual_cbuf_size = actual_fapl[2] |
| | assert actual_alignment == alignment |
| | assert actual_block_size == block_size |
| | assert actual_cbuf_size == actual_cbuf_size |
| |
|
| | @ut.skipUnless(os.name == 'posix', "Sec2 driver is supported on posix") |
| | def test_sec2(self): |
| | """ Sec2 driver is supported on posix """ |
| | fid = File(self.mktemp(), 'w', driver='sec2') |
| | self.assertTrue(fid) |
| | self.assertEqual(fid.driver, 'sec2') |
| | fid.close() |
| |
|
| | |
| | fid = File(self.mktemp(), 'a', driver='sec2') |
| | self.assertTrue(fid) |
| | self.assertEqual(fid.driver, 'sec2') |
| | fid.close() |
| |
|
| | def test_core(self): |
| | """ Core driver is supported (no backing store) """ |
| | fname = self.mktemp() |
| | fid = File(fname, 'w', driver='core', backing_store=False) |
| | self.assertTrue(fid) |
| | self.assertEqual(fid.driver, 'core') |
| | fid.close() |
| | self.assertFalse(os.path.exists(fname)) |
| |
|
| | |
| | fid = File(self.mktemp(), 'a', driver='core') |
| | self.assertTrue(fid) |
| | self.assertEqual(fid.driver, 'core') |
| | fid.close() |
| |
|
| | def test_backing(self): |
| | """ Core driver saves to file when backing store used """ |
| | fname = self.mktemp() |
| | fid = File(fname, 'w', driver='core', backing_store=True) |
| | fid.create_group('foo') |
| | fid.close() |
| | fid = File(fname, 'r') |
| | assert 'foo' in fid |
| | fid.close() |
| | |
| | with self.assertRaises(TypeError): |
| | File(fname, 'w', backing_store=True) |
| |
|
| | def test_readonly(self): |
| | """ Core driver can be used to open existing files """ |
| | fname = self.mktemp() |
| | fid = File(fname, 'w') |
| | fid.create_group('foo') |
| | fid.close() |
| | fid = File(fname, 'r', driver='core') |
| | self.assertTrue(fid) |
| | assert 'foo' in fid |
| | with self.assertRaises(ValueError): |
| | fid.create_group('bar') |
| | fid.close() |
| |
|
| | def test_blocksize(self): |
| | """ Core driver supports variable block size """ |
| | fname = self.mktemp() |
| | fid = File(fname, 'w', driver='core', block_size=1024, |
| | backing_store=False) |
| | self.assertTrue(fid) |
| | fid.close() |
| |
|
| | def test_split(self): |
| | """ Split stores metadata in a separate file """ |
| | fname = self.mktemp() |
| | fid = File(fname, 'w', driver='split') |
| | fid.close() |
| | self.assertTrue(os.path.exists(fname + '-m.h5')) |
| | fid = File(fname, 'r', driver='split') |
| | self.assertTrue(fid) |
| | fid.close() |
| |
|
| | def test_fileobj(self): |
| | """ Python file object driver is supported """ |
| | tf = tempfile.TemporaryFile() |
| | fid = File(tf, 'w', driver='fileobj') |
| | self.assertTrue(fid) |
| | self.assertEqual(fid.driver, 'fileobj') |
| | fid.close() |
| | |
| | with self.assertRaises(ValueError): |
| | File(tf, 'w', driver='core') |
| |
|
| | |
| |
|
| |
|
| | @ut.skipUnless(h5py.version.hdf5_version_tuple < (1, 10, 2), |
| | 'Requires HDF5 before 1.10.2') |
| | class TestLibver(TestCase): |
| |
|
| | """ |
| | Feature: File format compatibility bounds can be specified when |
| | opening a file. |
| | """ |
| |
|
| | def test_default(self): |
| | """ Opening with no libver arg """ |
| | f = File(self.mktemp(), 'w') |
| | self.assertEqual(f.libver, ('earliest', 'latest')) |
| | f.close() |
| |
|
| | def test_single(self): |
| | """ Opening with single libver arg """ |
| | f = File(self.mktemp(), 'w', libver='latest') |
| | self.assertEqual(f.libver, ('latest', 'latest')) |
| | f.close() |
| |
|
| | def test_multiple(self): |
| | """ Opening with two libver args """ |
| | f = File(self.mktemp(), 'w', libver=('earliest', 'latest')) |
| | self.assertEqual(f.libver, ('earliest', 'latest')) |
| | f.close() |
| |
|
| | def test_none(self): |
| | """ Omitting libver arg results in maximum compatibility """ |
| | f = File(self.mktemp(), 'w') |
| | self.assertEqual(f.libver, ('earliest', 'latest')) |
| | f.close() |
| |
|
| |
|
| | @ut.skipIf(h5py.version.hdf5_version_tuple < (1, 10, 2), |
| | 'Requires HDF5 1.10.2 or later') |
| | class TestNewLibver(TestCase): |
| |
|
| | """ |
| | Feature: File format compatibility bounds can be specified when |
| | opening a file. |
| | |
| | Requirement: HDF5 1.10.2 or later |
| | """ |
| |
|
| | @classmethod |
| | def setUpClass(cls): |
| | super().setUpClass() |
| |
|
| | |
| | if h5py.version.hdf5_version_tuple < (1, 11, 4): |
| | cls.latest = 'v110' |
| | elif h5py.version.hdf5_version_tuple < (1, 13, 0): |
| | cls.latest = 'v112' |
| | else: |
| | cls.latest = 'v114' |
| |
|
| | def test_default(self): |
| | """ Opening with no libver arg """ |
| | f = File(self.mktemp(), 'w') |
| | self.assertEqual(f.libver, ('earliest', self.latest)) |
| | f.close() |
| |
|
| | def test_single(self): |
| | """ Opening with single libver arg """ |
| | f = File(self.mktemp(), 'w', libver='latest') |
| | self.assertEqual(f.libver, (self.latest, self.latest)) |
| | f.close() |
| |
|
| | def test_single_v108(self): |
| | """ Opening with "v108" libver arg """ |
| | f = File(self.mktemp(), 'w', libver='v108') |
| | self.assertEqual(f.libver, ('v108', self.latest)) |
| | f.close() |
| |
|
| | def test_single_v110(self): |
| | """ Opening with "v110" libver arg """ |
| | f = File(self.mktemp(), 'w', libver='v110') |
| | self.assertEqual(f.libver, ('v110', self.latest)) |
| | f.close() |
| |
|
| | @ut.skipIf(h5py.version.hdf5_version_tuple < (1, 11, 4), |
| | 'Requires HDF5 1.11.4 or later') |
| | def test_single_v112(self): |
| | """ Opening with "v112" libver arg """ |
| | f = File(self.mktemp(), 'w', libver='v112') |
| | self.assertEqual(f.libver, ('v112', self.latest)) |
| | f.close() |
| |
|
| | def test_multiple(self): |
| | """ Opening with two libver args """ |
| | f = File(self.mktemp(), 'w', libver=('earliest', 'v108')) |
| | self.assertEqual(f.libver, ('earliest', 'v108')) |
| | f.close() |
| |
|
| | def test_none(self): |
| | """ Omitting libver arg results in maximum compatibility """ |
| | f = File(self.mktemp(), 'w') |
| | self.assertEqual(f.libver, ('earliest', self.latest)) |
| | f.close() |
| |
|
| |
|
| | class TestUserblock(TestCase): |
| |
|
| | """ |
| | Feature: Files can be create with user blocks |
| | """ |
| |
|
| | def test_create_blocksize(self): |
| | """ User blocks created with w, w-, x and properties work correctly """ |
| | f = File(self.mktemp(), 'w-', userblock_size=512) |
| | try: |
| | self.assertEqual(f.userblock_size, 512) |
| | finally: |
| | f.close() |
| |
|
| | f = File(self.mktemp(), 'x', userblock_size=512) |
| | try: |
| | self.assertEqual(f.userblock_size, 512) |
| | finally: |
| | f.close() |
| |
|
| | f = File(self.mktemp(), 'w', userblock_size=512) |
| | try: |
| | self.assertEqual(f.userblock_size, 512) |
| | finally: |
| | f.close() |
| | |
| | with self.assertRaises(ValueError): |
| | File(self.mktemp(), 'w', userblock_size='non') |
| |
|
| | def test_write_only(self): |
| | """ User block only allowed for write """ |
| | name = self.mktemp() |
| | f = File(name, 'w') |
| | f.close() |
| |
|
| | with self.assertRaises(ValueError): |
| | f = h5py.File(name, 'r', userblock_size=512) |
| |
|
| | with self.assertRaises(ValueError): |
| | f = h5py.File(name, 'r+', userblock_size=512) |
| |
|
| | def test_match_existing(self): |
| | """ User block size must match that of file when opening for append """ |
| | name = self.mktemp() |
| | f = File(name, 'w', userblock_size=512) |
| | f.close() |
| |
|
| | with self.assertRaises(ValueError): |
| | f = File(name, 'a', userblock_size=1024) |
| |
|
| | f = File(name, 'a', userblock_size=512) |
| | try: |
| | self.assertEqual(f.userblock_size, 512) |
| | finally: |
| | f.close() |
| |
|
| | def test_power_of_two(self): |
| | """ User block size must be a power of 2 and at least 512 """ |
| | name = self.mktemp() |
| |
|
| | with self.assertRaises(ValueError): |
| | f = File(name, 'w', userblock_size=128) |
| |
|
| | with self.assertRaises(ValueError): |
| | f = File(name, 'w', userblock_size=513) |
| |
|
| | with self.assertRaises(ValueError): |
| | f = File(name, 'w', userblock_size=1023) |
| |
|
| | def test_write_block(self): |
| | """ Test that writing to a user block does not destroy the file """ |
| | name = self.mktemp() |
| |
|
| | f = File(name, 'w', userblock_size=512) |
| | f.create_group("Foobar") |
| | f.close() |
| |
|
| | pyfile = open(name, 'r+b') |
| | try: |
| | pyfile.write(b'X' * 512) |
| | finally: |
| | pyfile.close() |
| |
|
| | f = h5py.File(name, 'r') |
| | try: |
| | assert "Foobar" in f |
| | finally: |
| | f.close() |
| |
|
| | pyfile = open(name, 'rb') |
| | try: |
| | self.assertEqual(pyfile.read(512), b'X' * 512) |
| | finally: |
| | pyfile.close() |
| |
|
| |
|
| | class TestContextManager(TestCase): |
| |
|
| | """ |
| | Feature: File objects can be used as context managers |
| | """ |
| |
|
| | def test_context_manager(self): |
| | """ File objects can be used in with statements """ |
| | with File(self.mktemp(), 'w') as fid: |
| | self.assertTrue(fid) |
| | self.assertTrue(not fid) |
| |
|
| |
|
| | @ut.skipIf(not UNICODE_FILENAMES, "Filesystem unicode support required") |
| | class TestUnicode(TestCase): |
| |
|
| | """ |
| | Feature: Unicode filenames are supported |
| | """ |
| |
|
| | def test_unicode(self): |
| | """ Unicode filenames can be used, and retrieved properly via .filename |
| | """ |
| | fname = self.mktemp(prefix=chr(0x201a)) |
| | fid = File(fname, 'w') |
| | try: |
| | self.assertEqual(fid.filename, fname) |
| | self.assertIsInstance(fid.filename, str) |
| | finally: |
| | fid.close() |
| |
|
| | def test_unicode_hdf5_python_consistent(self): |
| | """ Unicode filenames can be used, and seen correctly from python |
| | """ |
| | fname = self.mktemp(prefix=chr(0x201a)) |
| | with File(fname, 'w') as f: |
| | self.assertTrue(os.path.exists(fname)) |
| |
|
| | def test_nonexistent_file_unicode(self): |
| | """ |
| | Modes 'r' and 'r+' do not create files even when given unicode names |
| | """ |
| | fname = self.mktemp(prefix=chr(0x201a)) |
| | with self.assertRaises(IOError): |
| | File(fname, 'r') |
| | with self.assertRaises(IOError): |
| | File(fname, 'r+') |
| |
|
| |
|
| | class TestFileProperty(TestCase): |
| |
|
| | """ |
| | Feature: A File object can be retrieved from any child object, |
| | via the .file property |
| | """ |
| |
|
| | def test_property(self): |
| | """ File object can be retrieved from subgroup """ |
| | fname = self.mktemp() |
| | hfile = File(fname, 'w') |
| | try: |
| | hfile2 = hfile['/'].file |
| | self.assertEqual(hfile, hfile2) |
| | finally: |
| | hfile.close() |
| |
|
| | def test_close(self): |
| | """ All retrieved File objects are closed at the same time """ |
| | fname = self.mktemp() |
| | hfile = File(fname, 'w') |
| | grp = hfile.create_group('foo') |
| | hfile2 = grp.file |
| | hfile3 = hfile['/'].file |
| | hfile2.close() |
| | self.assertFalse(hfile) |
| | self.assertFalse(hfile2) |
| | self.assertFalse(hfile3) |
| |
|
| | def test_mode(self): |
| | """ Retrieved File objects have a meaningful mode attribute """ |
| | hfile = File(self.mktemp(), 'w') |
| | try: |
| | grp = hfile.create_group('foo') |
| | self.assertEqual(grp.file.mode, hfile.mode) |
| | finally: |
| | hfile.close() |
| |
|
| |
|
| | class TestClose(TestCase): |
| |
|
| | """ |
| | Feature: Files can be closed |
| | """ |
| |
|
| | def test_close(self): |
| | """ Close file via .close method """ |
| | fid = File(self.mktemp(), 'w') |
| | self.assertTrue(fid) |
| | fid.close() |
| | self.assertFalse(fid) |
| |
|
| | def test_closed_file(self): |
| | """ Trying to modify closed file raises ValueError """ |
| | fid = File(self.mktemp(), 'w') |
| | fid.close() |
| | with self.assertRaises(ValueError): |
| | fid.create_group('foo') |
| |
|
| | def test_close_multiple_default_driver(self): |
| | fname = self.mktemp() |
| | f = h5py.File(fname, 'w') |
| | f.create_group("test") |
| | f.close() |
| | f.close() |
| |
|
| |
|
| | class TestFlush(TestCase): |
| |
|
| | """ |
| | Feature: Files can be flushed |
| | """ |
| |
|
| | def test_flush(self): |
| | """ Flush via .flush method """ |
| | fid = File(self.mktemp(), 'w') |
| | fid.flush() |
| | fid.close() |
| |
|
| |
|
| | class TestRepr(TestCase): |
| |
|
| | """ |
| | Feature: File objects provide a helpful __repr__ string |
| | """ |
| |
|
| | def test_repr(self): |
| | """ __repr__ behaves itself when files are open and closed """ |
| | fid = File(self.mktemp(), 'w') |
| | self.assertIsInstance(repr(fid), str) |
| | fid.close() |
| | self.assertIsInstance(repr(fid), str) |
| |
|
| |
|
| | class TestFilename(TestCase): |
| |
|
| | """ |
| | Feature: The name of a File object can be retrieved via .filename |
| | """ |
| |
|
| | def test_filename(self): |
| | """ .filename behaves properly for string data """ |
| | fname = self.mktemp() |
| | fid = File(fname, 'w') |
| | try: |
| | self.assertEqual(fid.filename, fname) |
| | self.assertIsInstance(fid.filename, str) |
| | finally: |
| | fid.close() |
| |
|
| |
|
| | class TestCloseInvalidatesOpenObjectIDs(TestCase): |
| |
|
| | """ |
| | Ensure that closing a file invalidates object IDs, as appropriate |
| | """ |
| |
|
| | def test_close(self): |
| | """ Closing a file invalidates any of the file's open objects """ |
| | with File(self.mktemp(), 'w') as f1: |
| | g1 = f1.create_group('foo') |
| | self.assertTrue(bool(f1.id)) |
| | self.assertTrue(bool(g1.id)) |
| | f1.close() |
| | self.assertFalse(bool(f1.id)) |
| | self.assertFalse(bool(g1.id)) |
| | with File(self.mktemp(), 'w') as f2: |
| | g2 = f2.create_group('foo') |
| | self.assertTrue(bool(f2.id)) |
| | self.assertTrue(bool(g2.id)) |
| | self.assertFalse(bool(f1.id)) |
| | self.assertFalse(bool(g1.id)) |
| |
|
| | def test_close_one_handle(self): |
| | fname = self.mktemp() |
| | with File(fname, 'w') as f: |
| | f.create_group('foo') |
| |
|
| | f1 = File(fname) |
| | f2 = File(fname) |
| | g1 = f1['foo'] |
| | g2 = f2['foo'] |
| | assert g1.id.valid |
| | assert g2.id.valid |
| | f1.close() |
| | assert not g1.id.valid |
| | |
| | assert f2.id.valid |
| | assert g2.id.valid |
| |
|
| | f2.close() |
| | assert not f2.id.valid |
| | assert not g2.id.valid |
| |
|
| |
|
| | class TestPathlibSupport(TestCase): |
| |
|
| | """ |
| | Check that h5py doesn't break on pathlib |
| | """ |
| | def test_pathlib_accepted_file(self): |
| | """ Check that pathlib is accepted by h5py.File """ |
| | with closed_tempfile() as f: |
| | path = pathlib.Path(f) |
| | with File(path, 'w') as f2: |
| | self.assertTrue(True) |
| |
|
| | def test_pathlib_name_match(self): |
| | """ Check that using pathlib does not affect naming """ |
| | with closed_tempfile() as f: |
| | path = pathlib.Path(f) |
| | with File(path, 'w') as h5f1: |
| | pathlib_name = h5f1.filename |
| | with File(f, 'w') as h5f2: |
| | normal_name = h5f2.filename |
| | self.assertEqual(pathlib_name, normal_name) |
| |
|
| |
|
| | class TestPickle(TestCase): |
| | """Check that h5py.File can't be pickled""" |
| | def test_dump_error(self): |
| | with File(self.mktemp(), 'w') as f1: |
| | with self.assertRaises(TypeError): |
| | pickle.dumps(f1) |
| |
|
| |
|
| | |
| | |
| | @pytest.mark.mpi |
| | class TestMPI: |
| | def test_mpio(self, mpi_file_name): |
| | """ MPIO driver and options """ |
| | from mpi4py import MPI |
| |
|
| | with File(mpi_file_name, 'w', driver='mpio', comm=MPI.COMM_WORLD) as f: |
| | assert f |
| | assert f.driver == 'mpio' |
| |
|
| | def test_mpio_append(self, mpi_file_name): |
| | """ Testing creation of file with append """ |
| | from mpi4py import MPI |
| |
|
| | with File(mpi_file_name, 'a', driver='mpio', comm=MPI.COMM_WORLD) as f: |
| | assert f |
| | assert f.driver == 'mpio' |
| |
|
| | @pytest.mark.skipif(h5py.version.hdf5_version_tuple < (1, 8, 9), |
| | reason="mpio atomic file operations were added in HDF5 1.8.9+") |
| | def test_mpi_atomic(self, mpi_file_name): |
| | """ Enable atomic mode for MPIO driver """ |
| | from mpi4py import MPI |
| |
|
| | with File(mpi_file_name, 'w', driver='mpio', comm=MPI.COMM_WORLD) as f: |
| | assert not f.atomic |
| | f.atomic = True |
| | assert f.atomic |
| |
|
| | def test_close_multiple_mpio_driver(self, mpi_file_name): |
| | """ MPIO driver and options """ |
| | from mpi4py import MPI |
| |
|
| | f = File(mpi_file_name, 'w', driver='mpio', comm=MPI.COMM_WORLD) |
| | f.create_group("test") |
| | f.close() |
| | f.close() |
| |
|
| |
|
| | @ut.skipIf(h5py.version.hdf5_version_tuple < (1, 10, 1), |
| | 'Requires HDF5 1.10.1 or later') |
| | class TestSWMRMode(TestCase): |
| |
|
| | """ |
| | Feature: Create file that switches on SWMR mode |
| | """ |
| |
|
| | def test_file_mode_generalizes(self): |
| | fname = self.mktemp() |
| | fid = File(fname, 'w', libver='latest') |
| | g = fid.create_group('foo') |
| | |
| | assert fid.mode == g.file.mode == 'r+' |
| | fid.swmr_mode = True |
| | |
| | |
| | assert fid.mode == g.file.mode == 'r+' |
| | fid.close() |
| |
|
| | def test_swmr_mode_consistency(self): |
| | fname = self.mktemp() |
| | fid = File(fname, 'w', libver='latest') |
| | g = fid.create_group('foo') |
| | assert fid.swmr_mode == g.file.swmr_mode == False |
| | fid.swmr_mode = True |
| | |
| | assert fid.swmr_mode == g.file.swmr_mode == True |
| | fid.close() |
| |
|
| |
|
| | @pytest.mark.skipif( |
| | h5py.version.hdf5_version_tuple < (1, 12, 1) and ( |
| | h5py.version.hdf5_version_tuple[:2] != (1, 10) or h5py.version.hdf5_version_tuple[2] < 7), |
| | reason="Requires HDF5 >= 1.12.1 or 1.10.x >= 1.10.7") |
| | @pytest.mark.skipif("HDF5_USE_FILE_LOCKING" in os.environ, |
| | reason="HDF5_USE_FILE_LOCKING env. var. is set") |
| | class TestFileLocking: |
| | """Test h5py.File file locking option""" |
| |
|
| | def test_reopen(self, tmp_path): |
| | """Test file locking when opening twice the same file""" |
| | fname = tmp_path / "test.h5" |
| |
|
| | with h5py.File(fname, mode="w", locking=True) as f: |
| | f.flush() |
| |
|
| | |
| | with pytest.raises(OSError): |
| | with h5py.File(fname, mode="r", locking=False) as h5f_read: |
| | pass |
| |
|
| | with h5py.File(fname, mode="r", locking=True) as h5f_read: |
| | pass |
| |
|
| | with h5py.File(fname, mode="r", locking='best-effort') as h5f_read: |
| | pass |
| |
|
| | def test_unsupported_locking(self, tmp_path): |
| | """Test with erroneous file locking value""" |
| | fname = tmp_path / "test.h5" |
| | with pytest.raises(ValueError): |
| | with h5py.File(fname, mode="r", locking='unsupported-value') as h5f_read: |
| | pass |
| |
|
| | def test_multiprocess(self, tmp_path): |
| | """Test file locking option from different concurrent processes""" |
| | fname = tmp_path / "test.h5" |
| |
|
| | def open_in_subprocess(filename, mode, locking): |
| | """Open HDF5 file in a subprocess and return True on success""" |
| | h5py_import_dir = str(pathlib.Path(h5py.__file__).parent.parent) |
| |
|
| | process = subprocess.run( |
| | [ |
| | sys.executable, |
| | "-c", |
| | f""" |
| | import sys |
| | sys.path.insert(0, {h5py_import_dir!r}) |
| | import h5py |
| | f = h5py.File({str(filename)!r}, mode={mode!r}, locking={locking}) |
| | """, |
| | ], |
| | capture_output=True) |
| | return process.returncode == 0 and not process.stderr |
| |
|
| | |
| | with h5py.File(fname, mode="w", locking=True) as f: |
| | f["data"] = 1 |
| |
|
| | with h5py.File(fname, mode="r", locking=False) as f: |
| | |
| | assert open_in_subprocess(fname, mode="w", locking=True) |
| |
|
| |
|
| | def test_close_gc(writable_file): |
| | |
| | for i in range(100): |
| | writable_file[str(i)] = [] |
| |
|
| | filename = writable_file.filename |
| | writable_file.close() |
| |
|
| | |
| | |
| | |
| | for i in range(10): |
| | with h5py.File(filename, 'r') as f: |
| | refs = [d.id for d in f.values()] |
| | refs.append(refs) |
| | del refs |
| |
|