Spaces:
Runtime error
Runtime error
| __all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty') | |
| import collections | |
| import heapq | |
| from types import GenericAlias | |
| from . import locks | |
| from . import mixins | |
| class QueueEmpty(Exception): | |
| """Raised when Queue.get_nowait() is called on an empty Queue.""" | |
| pass | |
| class QueueFull(Exception): | |
| """Raised when the Queue.put_nowait() method is called on a full Queue.""" | |
| pass | |
| class Queue(mixins._LoopBoundMixin): | |
| """A queue, useful for coordinating producer and consumer coroutines. | |
| If maxsize is less than or equal to zero, the queue size is infinite. If it | |
| is an integer greater than 0, then "await put()" will block when the | |
| queue reaches maxsize, until an item is removed by get(). | |
| Unlike the standard library Queue, you can reliably know this Queue's size | |
| with qsize(), since your single-threaded asyncio application won't be | |
| interrupted between calling qsize() and doing an operation on the Queue. | |
| """ | |
| def __init__(self, maxsize=0): | |
| self._maxsize = maxsize | |
| # Futures. | |
| self._getters = collections.deque() | |
| # Futures. | |
| self._putters = collections.deque() | |
| self._unfinished_tasks = 0 | |
| self._finished = locks.Event() | |
| self._finished.set() | |
| self._init(maxsize) | |
| # These three are overridable in subclasses. | |
| def _init(self, maxsize): | |
| self._queue = collections.deque() | |
| def _get(self): | |
| return self._queue.popleft() | |
| def _put(self, item): | |
| self._queue.append(item) | |
| # End of the overridable methods. | |
| def _wakeup_next(self, waiters): | |
| # Wake up the next waiter (if any) that isn't cancelled. | |
| while waiters: | |
| waiter = waiters.popleft() | |
| if not waiter.done(): | |
| waiter.set_result(None) | |
| break | |
| def __repr__(self): | |
| return f'<{type(self).__name__} at {id(self):#x} {self._format()}>' | |
| def __str__(self): | |
| return f'<{type(self).__name__} {self._format()}>' | |
| __class_getitem__ = classmethod(GenericAlias) | |
| def _format(self): | |
| result = f'maxsize={self._maxsize!r}' | |
| if getattr(self, '_queue', None): | |
| result += f' _queue={list(self._queue)!r}' | |
| if self._getters: | |
| result += f' _getters[{len(self._getters)}]' | |
| if self._putters: | |
| result += f' _putters[{len(self._putters)}]' | |
| if self._unfinished_tasks: | |
| result += f' tasks={self._unfinished_tasks}' | |
| return result | |
| def qsize(self): | |
| """Number of items in the queue.""" | |
| return len(self._queue) | |
| def maxsize(self): | |
| """Number of items allowed in the queue.""" | |
| return self._maxsize | |
| def empty(self): | |
| """Return True if the queue is empty, False otherwise.""" | |
| return not self._queue | |
| def full(self): | |
| """Return True if there are maxsize items in the queue. | |
| Note: if the Queue was initialized with maxsize=0 (the default), | |
| then full() is never True. | |
| """ | |
| if self._maxsize <= 0: | |
| return False | |
| else: | |
| return self.qsize() >= self._maxsize | |
| async def put(self, item): | |
| """Put an item into the queue. | |
| Put an item into the queue. If the queue is full, wait until a free | |
| slot is available before adding item. | |
| """ | |
| while self.full(): | |
| putter = self._get_loop().create_future() | |
| self._putters.append(putter) | |
| try: | |
| await putter | |
| except: | |
| putter.cancel() # Just in case putter is not done yet. | |
| try: | |
| # Clean self._putters from canceled putters. | |
| self._putters.remove(putter) | |
| except ValueError: | |
| # The putter could be removed from self._putters by a | |
| # previous get_nowait call. | |
| pass | |
| if not self.full() and not putter.cancelled(): | |
| # We were woken up by get_nowait(), but can't take | |
| # the call. Wake up the next in line. | |
| self._wakeup_next(self._putters) | |
| raise | |
| return self.put_nowait(item) | |
| def put_nowait(self, item): | |
| """Put an item into the queue without blocking. | |
| If no free slot is immediately available, raise QueueFull. | |
| """ | |
| if self.full(): | |
| raise QueueFull | |
| self._put(item) | |
| self._unfinished_tasks += 1 | |
| self._finished.clear() | |
| self._wakeup_next(self._getters) | |
| async def get(self): | |
| """Remove and return an item from the queue. | |
| If queue is empty, wait until an item is available. | |
| """ | |
| while self.empty(): | |
| getter = self._get_loop().create_future() | |
| self._getters.append(getter) | |
| try: | |
| await getter | |
| except: | |
| getter.cancel() # Just in case getter is not done yet. | |
| try: | |
| # Clean self._getters from canceled getters. | |
| self._getters.remove(getter) | |
| except ValueError: | |
| # The getter could be removed from self._getters by a | |
| # previous put_nowait call. | |
| pass | |
| if not self.empty() and not getter.cancelled(): | |
| # We were woken up by put_nowait(), but can't take | |
| # the call. Wake up the next in line. | |
| self._wakeup_next(self._getters) | |
| raise | |
| return self.get_nowait() | |
| def get_nowait(self): | |
| """Remove and return an item from the queue. | |
| Return an item if one is immediately available, else raise QueueEmpty. | |
| """ | |
| if self.empty(): | |
| raise QueueEmpty | |
| item = self._get() | |
| self._wakeup_next(self._putters) | |
| return item | |
| def task_done(self): | |
| """Indicate that a formerly enqueued task is complete. | |
| Used by queue consumers. For each get() used to fetch a task, | |
| a subsequent call to task_done() tells the queue that the processing | |
| on the task is complete. | |
| If a join() is currently blocking, it will resume when all items have | |
| been processed (meaning that a task_done() call was received for every | |
| item that had been put() into the queue). | |
| Raises ValueError if called more times than there were items placed in | |
| the queue. | |
| """ | |
| if self._unfinished_tasks <= 0: | |
| raise ValueError('task_done() called too many times') | |
| self._unfinished_tasks -= 1 | |
| if self._unfinished_tasks == 0: | |
| self._finished.set() | |
| async def join(self): | |
| """Block until all items in the queue have been gotten and processed. | |
| The count of unfinished tasks goes up whenever an item is added to the | |
| queue. The count goes down whenever a consumer calls task_done() to | |
| indicate that the item was retrieved and all work on it is complete. | |
| When the count of unfinished tasks drops to zero, join() unblocks. | |
| """ | |
| if self._unfinished_tasks > 0: | |
| await self._finished.wait() | |
| class PriorityQueue(Queue): | |
| """A subclass of Queue; retrieves entries in priority order (lowest first). | |
| Entries are typically tuples of the form: (priority number, data). | |
| """ | |
| def _init(self, maxsize): | |
| self._queue = [] | |
| def _put(self, item, heappush=heapq.heappush): | |
| heappush(self._queue, item) | |
| def _get(self, heappop=heapq.heappop): | |
| return heappop(self._queue) | |
| class LifoQueue(Queue): | |
| """A subclass of Queue that retrieves most recently added entries first.""" | |
| def _init(self, maxsize): | |
| self._queue = [] | |
| def _put(self, item): | |
| self._queue.append(item) | |
| def _get(self): | |
| return self._queue.pop() | |