File size: 3,984 Bytes
72c0672
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use pyo3::prelude::*;
use std::collections::VecDeque;

/// An simple iterator that can be instantiated with a specified length.
/// We use this with iterators that don't have a size_hint but we might
/// know its size. This is useful with progress bars for example.
pub struct MaybeSizedIterator<I> {
    length: Option<usize>,
    iter: I,
}

impl<I> MaybeSizedIterator<I>
where
    I: Iterator,
{
    pub fn new(iter: I, length: Option<usize>) -> Self {
        Self { length, iter }
    }
}

impl<I> Iterator for MaybeSizedIterator<I>
where
    I: Iterator,
{
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        self.iter.next()
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.length.unwrap_or(0), None)
    }
}

/// A buffered iterator that takes care of locking the GIL only when needed.
/// The `PyIterator` provided by PyO3 keeps a Python GIL token all along
/// and thus doesn't allow us to release the GIL to allow having other threads.
///
/// This iterator serves two purposes:
///   - First, as opposed to the `pyo3::PyIterator`, it is Send and can easily be parallelized
///   - Second, this let us release the GIL between two refills of the buffer, allowing other
///     Python threads to work
pub struct PyBufferedIterator<T, F> {
    iter: Option<Py<PyAny>>,
    converter: F,
    buffer: VecDeque<PyResult<T>>,
    size: usize,
}

impl<T, F, I> PyBufferedIterator<T, F>
where
    F: Fn(Bound<'_, PyAny>) -> I,
    I: IntoIterator<Item = PyResult<T>>,
{
    /// Create a new PyBufferedIterator using the provided Python object.
    /// This object must implement the Python Iterator Protocol, and an error will
    /// be return if the contract is not respected.
    ///
    /// The `converter` provides a way to convert each item in the iterator into
    /// something that doesn't embed a 'py token and thus allows the GIL to be released
    ///
    /// The `buffer_size` represents the number of items that we buffer before we
    /// need to acquire the GIL again.
    pub fn new(iter: &Bound<'_, PyAny>, converter: F, buffer_size: usize) -> PyResult<Self> {
        let py = iter.py();
        let iter: Py<PyAny> = unsafe {
            Bound::from_borrowed_ptr_or_err(py, pyo3::ffi::PyObject_GetIter(iter.as_ptr()))?
                .to_object(py)
        };

        Ok(Self {
            iter: Some(iter),
            converter,
            buffer: VecDeque::with_capacity(buffer_size),
            size: buffer_size,
        })
    }

    /// Refill the buffer, and set `self.iter` as `None` if nothing more to get
    fn refill(&mut self) -> PyResult<()> {
        if self.iter.is_none() {
            return Ok(());
        }

        Python::with_gil(|py| loop {
            if self.buffer.len() >= self.size {
                return Ok(());
            }

            match unsafe {
                Bound::from_owned_ptr_or_opt(
                    py,
                    pyo3::ffi::PyIter_Next(self.iter.as_ref().unwrap().bind(py).as_ptr()),
                )
            } {
                Some(obj) => self.buffer.extend((self.converter)(obj)),
                None => {
                    if PyErr::occurred(py) {
                        return Err(PyErr::fetch(py));
                    } else {
                        self.iter = None;
                    }
                }
            };

            if self.iter.is_none() {
                return Ok(());
            }
        })
    }
}

impl<T, F, I> Iterator for PyBufferedIterator<T, F>
where
    F: Fn(Bound<'_, PyAny>) -> I,
    I: IntoIterator<Item = PyResult<T>>,
{
    type Item = PyResult<T>;

    fn next(&mut self) -> Option<Self::Item> {
        if !self.buffer.is_empty() {
            self.buffer.pop_front()
        } else if self.iter.is_some() {
            if let Err(e) = self.refill() {
                return Some(Err(e));
            }
            self.next()
        } else {
            None
        }
    }
}