File size: 5,750 Bytes
1f5470c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# This file is part of h5py, a Python interface to the HDF5 library.
#
# http://www.h5py.org
#
# Copyright 2008-2013 Andrew Collette and contributors
#
# License:  Standard 3-clause BSD; see "license.txt" for full license terms
#           and contributor agreement.
import os
import signal
import tempfile
import threading
from unittest import SkipTest

import numpy as np
import time

import h5py
from h5py import _objects as o
from .common import TestCase


class TestObjects(TestCase):
    """Tests for low-level ``ObjectID`` behaviour and for the ``phil`` lock
    across ``os.fork()`` in multi-threaded processes."""

    @staticmethod
    def _run_child_and_exit(func, *args):
        # Run ``func(*args)`` inside a freshly forked child and terminate the
        # child with os._exit(); this function never returns.
        #
        # Exit status is 0 when ``func`` returns normally and 1 when it raises.
        # Leaving through os._exit() on *every* path is essential: otherwise an
        # exception would propagate into the forked copy of the test runner,
        # which would run the parent's cleanup code (e.g. delete temporary
        # files still in use) and keep executing the remaining tests.
        exit_code = 1
        try:
            func(*args)
            exit_code = 0
        except BaseException:
            import sys
            import traceback

            # os._exit() does not flush stdio buffers, so flush explicitly to
            # keep the child's traceback visible.
            traceback.print_exc()
            sys.stderr.flush()
        finally:
            os._exit(exit_code)

    def test_invalid(self):
        # Check for segfault on close
        oid = o.ObjectID(0)
        del oid
        oid = o.ObjectID(1)
        del oid

    def test_equality(self):
        # Identifier-based equality
        oid1 = o.ObjectID(42)
        oid2 = o.ObjectID(42)
        oid3 = o.ObjectID(43)

        self.assertEqual(oid1, oid2)
        self.assertNotEqual(oid1, oid3)

    def test_hash(self):
        # Default objects are not hashable
        oid = o.ObjectID(42)
        with self.assertRaises(TypeError):
            hash(oid)

    def test_fork_with_threads(self):
        # Test that we do not deadlock after forking when the process
        # is using multiple threads to simultaneously perform h5py operations

        # On Windows forking (and the register_at_fork handler)
        # are not available, skip this test.
        if not hasattr(os, "fork"):
            raise SkipTest("os.fork not available")

        def read_values(fn):
            # Repeatedly read the whole dataset to keep h5py busy.
            with h5py.File(fn, 'r') as h5file:
                for _ in range(100):
                    _ = h5file['values'][:]

        with tempfile.TemporaryDirectory() as tmpdir:
            fns = []
            for i in range(10):
                fn = os.path.join(tmpdir, f'test{i}.h5')
                with h5py.File(fn, 'w') as h5file:
                    h5file.create_dataset('values', data=np.random.rand(1000, 1000))
                fns.append(fn)

            threads = []
            worker2pid = {}
            try:
                # create 10 threads, each reading from an HDF5 file
                for fn in fns:
                    thread = threading.Thread(target=read_values, args=(fn,))
                    thread.start()
                    threads.append(thread)

                # While the threads are running (and potentially holding the phil Lock)
                # create 10 processes, each also reading from an HDF5 file
                for worker_id, fn in enumerate(fns):
                    pid = os.fork()
                    if pid == 0:
                        # child process; never returns
                        self._run_child_and_exit(read_values, fn)
                    # parent process
                    worker2pid[worker_id] = pid

                # Wait for all child processes to finish. A monotonic clock is
                # used so that wall-clock adjustments cannot distort the timeout.
                deadline = time.monotonic() + 60.0
                while worker2pid and time.monotonic() < deadline:
                    for worker_id, pid in list(worker2pid.items()):
                        waited_pid, status = os.waitpid(pid, os.WNOHANG)
                        if waited_pid != pid:
                            continue
                        # The child has been reaped: forget it *before* asserting
                        # so the cleanup below never signals or waits on a pid
                        # that no longer belongs to us.
                        del worker2pid[worker_id]
                        assert os.WIFEXITED(status), \
                            f"child {pid} did not exit normally (status {status})"
                        assert os.WEXITSTATUS(status) == 0, \
                            f"child {pid} exited with status {os.WEXITSTATUS(status)}"
                    # If some children are still running, sleep and try again
                    if worker2pid:
                        time.sleep(0.1)

                # Any child still listed here did not finish in time, most likely
                # because it could not acquire the phil lock.
                assert not worker2pid, "Some child processes did not finish and had to be killed"
            finally:
                # Always clean up after us, on success and on failure alike:
                # kill and reap children that are still around ...
                for pid in worker2pid.values():
                    os.kill(pid, signal.SIGKILL)
                    os.waitpid(pid, 0)
                # ... and wait for all threads to finish before the temporary
                # directory (and the files they are reading) is removed.
                for thread in threads:
                    thread.join()

    def test_phil_fork_with_threads(self):
        # Test that handling of the phil Lock after fork is correct.
        # We simulate a deadlock in the forked process by explicitly
        # waiting for the phil Lock to be acquired in a different thread
        # before forking.

        # On Windows forking (and the register_at_fork handler)
        # are not available, skip this test.
        if not hasattr(os, "fork"):
            raise SkipTest("os.fork not available")

        thread_acquired_phil_event = threading.Event()

        def hold_phil():
            # Acquire the phil lock, announce it, and keep holding it for a while.
            o.phil.acquire()
            try:
                thread_acquired_phil_event.set()
                time.sleep(1)
            finally:
                o.phil.release()

        def check_phil_is_free():
            # Runs in the child: if we handle the phil lock correctly, this
            # should not deadlock, and we should be able to acquire the lock.
            if not o.phil.acquire(blocking=False):
                raise RuntimeError("phil lock could not be acquired after fork")
            o.phil.release()

        thread = threading.Thread(target=hold_phil)
        thread.start()
        try:
            # wait for the thread running "hold_phil" to have acquired the phil
            # lock; bounded so a failure in that thread cannot hang the test
            assert thread_acquired_phil_event.wait(timeout=30.0), \
                "helper thread did not acquire the phil lock"

            # now fork the current (main) thread while the other thread holds the lock
            pid = os.fork()
            if pid == 0:
                # child process; exits with 0 if the lock could be acquired,
                # 1 otherwise, and never returns
                self._run_child_and_exit(check_phil_is_free)

            # parent process
            # wait for the child process to finish
            _, status = os.waitpid(pid, 0)
            assert os.WIFEXITED(status), \
                f"child {pid} did not exit normally (status {status})"
            assert os.WEXITSTATUS(status) == 0, \
                f"child {pid} exited with status {os.WEXITSTATUS(status)}"
        finally:
            thread.join()