|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import signal |
|
|
import tempfile |
|
|
import threading |
|
|
from unittest import SkipTest |
|
|
|
|
|
import numpy as np |
|
|
import time |
|
|
|
|
|
import h5py |
|
|
from h5py import _objects as o |
|
|
from .common import TestCase |
|
|
|
|
|
|
|
|
class TestObjects(TestCase):
    """Tests for ``h5py._objects``: ``ObjectID`` basics and fork/thread interplay."""

    def test_invalid(self):
        # Build and drop ObjectID wrappers around the raw ids 0 and 1.  There
        # is nothing to assert: the test passes if construction and deletion
        # complete without raising or crashing the interpreter.
        oid = o.ObjectID(0)
        del oid
        oid = o.ObjectID(1)
        del oid

    def test_equality(self):
        # ObjectIDs built from the same id compare equal; ObjectIDs built from
        # different ids compare unequal.
        oid1 = o.ObjectID(42)
        oid2 = o.ObjectID(42)
        oid3 = o.ObjectID(43)

        self.assertEqual(oid1, oid2)
        self.assertNotEqual(oid1, oid3)

    def test_hash(self):
        # hash() of a plain ObjectID is expected to raise TypeError.
        oid = o.ObjectID(42)
        with self.assertRaises(TypeError):
            hash(oid)

    def test_fork_with_threads(self):
        # Fork child processes while several threads of the parent are busy
        # reading HDF5 files, and require every child to finish reading its
        # own file and exit with status 0 within the timeout.  A child that
        # hangs (for example on a lock inherited across fork) shows up as a
        # timeout: it is killed and the test fails.
        if not hasattr(os, "fork"):
            raise SkipTest("os.fork not available")

        timeout = 60.0  # seconds allowed for all children to exit

        with tempfile.TemporaryDirectory() as tmpdir:
            # One file per reader, each holding a 1000x1000 random dataset.
            filenames = []
            for i in range(10):
                fn = os.path.join(tmpdir, f'test{i}.h5')
                with h5py.File(fn, 'w') as h5file:
                    h5file.create_dataset(
                        'values', data=np.random.rand(1000, 1000)
                    )
                filenames.append(fn)

            def read_repeatedly(fn):
                # Read the whole dataset 100 times to keep the reader busy.
                with h5py.File(fn, 'r') as h5file:
                    for _ in range(100):
                        _ = h5file['values'][:]

            threads = []
            pending = {}  # worker_id -> pid of children not yet reaped

            def kill_pending():
                # Kill and reap every child still tracked.  Safe to call
                # more than once because entries are removed as they are
                # handled.
                while pending:
                    _, child_pid = pending.popitem()
                    os.kill(child_pid, signal.SIGKILL)
                    os.waitpid(child_pid, 0)

            try:
                # One reader thread per file, all running concurrently.
                for fn in filenames:
                    thread = threading.Thread(
                        target=read_repeatedly, args=(fn,)
                    )
                    thread.start()
                    threads.append(thread)

                # While the threads are running, fork one reader process
                # per file.
                for worker_id, fn in enumerate(filenames):
                    pid = os.fork()
                    if pid == 0:
                        # Child process.  Always leave through os._exit so
                        # that an exception can never unwind into the
                        # forked copy of the test runner; a failure is
                        # reported to the parent as a non-zero exit status.
                        exit_code = 1
                        try:
                            read_repeatedly(fn)
                            exit_code = 0
                        finally:
                            os._exit(exit_code)
                    # Parent process.
                    pending[worker_id] = pid

                # Poll the children until all have exited or time runs out.
                # A monotonic clock keeps the timeout immune to wall-clock
                # adjustments.
                deadline = time.monotonic() + timeout
                while pending and time.monotonic() < deadline:
                    for worker_id, pid in list(pending.items()):
                        waited_pid, status = os.waitpid(pid, os.WNOHANG)
                        if waited_pid != pid:
                            continue  # still running
                        # The child is reaped: forget its pid *before*
                        # asserting, so cleanup never signals a pid that
                        # no longer belongs to us.
                        del pending[worker_id]
                        self.assertTrue(
                            os.WIFEXITED(status),
                            f"child {pid} did not exit normally",
                        )
                        self.assertEqual(
                            os.WEXITSTATUS(status), 0,
                            f"child {pid} exited with a non-zero status",
                        )
                    if pending:
                        time.sleep(0.1)

                if pending:
                    # Some children did not finish in time.
                    kill_pending()
                    self.fail(
                        "Some child processes did not finish and had to be killed"
                    )
            finally:
                # Whatever happened above, leave no child processes or
                # running threads behind before the temporary directory
                # (and the files they read) is removed.
                kill_pending()
                for thread in threads:
                    thread.join()

    def test_phil_fork_with_threads(self):
        # Fork the main thread while another thread holds ``o.phil`` and
        # require that the child can acquire ``o.phil`` without blocking.
        # The child reports the outcome through its exit status:
        # 0 = acquired, 1 = could not acquire (or raised).
        if not hasattr(os, "fork"):
            raise SkipTest("os.fork not available")

        thread_acquired_phil_event = threading.Event()

        def hold_phil():
            # Take the lock, signal the main thread, and keep holding the
            # lock long enough for the fork to happen.
            o.phil.acquire()
            try:
                thread_acquired_phil_event.set()
                time.sleep(1)
            finally:
                o.phil.release()

        thread = threading.Thread(target=hold_phil)
        thread.start()
        try:
            # Wait until the helper thread owns the lock.  The wait is
            # bounded so that a failure in the helper thread fails the test
            # instead of hanging it forever.
            self.assertTrue(
                thread_acquired_phil_event.wait(timeout=60.0),
                "helper thread did not acquire the phil lock in time",
            )

            # Fork while the other thread holds the lock.
            pid = os.fork()
            if pid == 0:
                # Child process.  Always leave through os._exit so that an
                # exception can never unwind into the forked test runner.
                exit_code = 1
                try:
                    if o.phil.acquire(blocking=False):
                        o.phil.release()
                        exit_code = 0
                finally:
                    os._exit(exit_code)

            # Parent process: wait for the child and check its verdict.
            _, status = os.waitpid(pid, 0)
            self.assertTrue(
                os.WIFEXITED(status), "child did not exit normally"
            )
            self.assertEqual(
                os.WEXITSTATUS(status), 0,
                "child could not acquire the phil lock after fork",
            )
        finally:
            thread.join()
|
|
|