File size: 3,984 Bytes
72c0672 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 | use pyo3::prelude::*;
use std::collections::VecDeque;
/// A simple iterator wrapper that can be instantiated with a specified length.
/// We use this with iterators that don't have a useful `size_hint` but whose
/// size we might know anyway. This is useful with progress bars for example.
pub struct MaybeSizedIterator<I> {
    /// Number of items still expected from `iter`, if the caller provided a length.
    length: Option<usize>,
    iter: I,
}
impl<I> MaybeSizedIterator<I>
where
    I: Iterator,
{
    /// Wrap `iter`, advertising `length` (when `Some`) as its number of items.
    pub fn new(iter: I, length: Option<usize>) -> Self {
        Self { length, iter }
    }
}
impl<I> Iterator for MaybeSizedIterator<I>
where
    I: Iterator,
{
    type Item = I::Item;
    fn next(&mut self) -> Option<Self::Item> {
        let item = self.iter.next();
        if item.is_some() {
            // `size_hint` describes the *remaining* items, so keep the advertised
            // length in sync with consumption. Saturate in case the hint was too small.
            if let Some(remaining) = self.length.as_mut() {
                *remaining = remaining.saturating_sub(1);
            }
        }
        item
    }
    /// The lower bound is the remaining known length (0 when unknown). No upper bound
    /// is claimed because the length is only a caller-supplied hint.
    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.length.unwrap_or(0), None)
    }
}
/// A buffered iterator that takes care of locking the GIL only when needed.
/// The `PyIterator` provided by PyO3 keeps a Python GIL token all along
/// and thus doesn't allow us to release the GIL to allow having other threads.
///
/// This iterator serves two purposes:
/// - First, as opposed to the `pyo3::PyIterator`, it is Send (provided `T` and `F` are)
///   and can easily be parallelized
/// - Second, this lets us release the GIL between two refills of the buffer, allowing other
///   Python threads to work
pub struct PyBufferedIterator<T, F> {
    /// The underlying Python iterator, or `None` once it has been exhausted.
    iter: Option<Py<PyAny>>,
    /// Turns each Python item into zero or more GIL-independent results.
    converter: F,
    /// Already-converted results waiting to be yielded, in order.
    buffer: VecDeque<PyResult<T>>,
    /// Number of results a refill tries to buffer (always at least 1).
    size: usize,
}
impl<T, F, I> PyBufferedIterator<T, F>
where
    F: Fn(Bound<'_, PyAny>) -> I,
    I: IntoIterator<Item = PyResult<T>>,
{
    /// Create a new `PyBufferedIterator` using the provided Python object.
    /// The object must be iterable (anything Python's `iter()` accepts); otherwise
    /// the Python error raised by `iter()` is returned.
    ///
    /// The `converter` provides a way to convert each item in the iterator into
    /// something that doesn't embed a `'py` token and thus allows the GIL to be released.
    ///
    /// The `buffer_size` represents the number of items that we buffer before we
    /// need to acquire the GIL again. A `buffer_size` of `0` is treated as `1`, so that
    /// every refill makes progress.
    ///
    /// # Errors
    /// Returns the Python exception raised when obtaining an iterator from `iter`.
    pub fn new(iter: &Bound<'_, PyAny>, converter: F, buffer_size: usize) -> PyResult<Self> {
        let py = iter.py();
        // SAFETY: `PyObject_GetIter` returns a *new* reference, or NULL with a Python
        // exception set. `from_owned_ptr_or_err` takes ownership of exactly that reference
        // (or fetches the pending error). Using the borrowed variant here would add a
        // second reference that is never released.
        let iter: Py<PyAny> = unsafe {
            Bound::from_owned_ptr_or_err(py, pyo3::ffi::PyObject_GetIter(iter.as_ptr()))?
                .unbind()
        };
        // With a size of 0, `refill` would never pull anything, and `next` would spin forever.
        let size = buffer_size.max(1);
        Ok(Self {
            iter: Some(iter),
            converter,
            buffer: VecDeque::with_capacity(size),
            size,
        })
    }

    /// Refill the buffer, and set `self.iter` to `None` once the Python iterator is exhausted.
    ///
    /// Items are pulled (with the GIL held) until the buffer holds at least `self.size`
    /// results or the iterator ends. The GIL is not acquired at all if the iterator is
    /// already exhausted.
    ///
    /// # Errors
    /// Returns the Python exception raised while advancing the iterator, if any.
    fn refill(&mut self) -> PyResult<()> {
        if self.iter.is_none() {
            return Ok(());
        }
        Python::with_gil(|py| {
            while self.buffer.len() < self.size {
                let Some(iter) = self.iter.as_ref() else {
                    break;
                };
                // SAFETY: `PyIter_Next` returns a new reference, or NULL when the iterator
                // is exhausted or raised. `from_owned_ptr_or_opt` takes ownership of that
                // reference.
                let next = unsafe {
                    Bound::from_owned_ptr_or_opt(py, pyo3::ffi::PyIter_Next(iter.bind(py).as_ptr()))
                };
                match next {
                    Some(obj) => self.buffer.extend((self.converter)(obj)),
                    None => {
                        // NULL with an exception set means an error; without one it means exhaustion.
                        if let Some(err) = PyErr::take(py) {
                            return Err(err);
                        }
                        // Dropped while holding the GIL, so the reference is released right away.
                        self.iter = None;
                    }
                }
            }
            Ok(())
        })
    }
}
impl<T, F, I> Iterator for PyBufferedIterator<T, F>
where
    F: Fn(Bound<'_, PyAny>) -> I,
    I: IntoIterator<Item = PyResult<T>>,
{
    type Item = PyResult<T>;
    /// Yield the next buffered result, refilling from Python when the buffer runs dry.
    /// A Python error raised while refilling is yielded as `Some(Err(_))`.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(item) = self.buffer.pop_front() {
                return Some(item);
            }
            if self.iter.is_none() {
                return None;
            }
            // A successful refill either buffers something or exhausts the iterator,
            // so this loop always terminates.
            if let Err(e) = self.refill() {
                return Some(Err(e));
            }
        }
    }
}
|