# thanks to: # https://github.com/oleglpts/PriorityThreadPoolExecutor/blob/master/PriorityThreadPoolExecutor/__init__.py # https://github.com/oleglpts/PriorityThreadPoolExecutor/issues/4 import atexit import itertools import logging import queue import random import sys import threading import weakref from concurrent.futures import _base from concurrent.futures.thread import BrokenThreadPool from concurrent.futures.thread import ThreadPoolExecutor from concurrent.futures.thread import _python_exit from concurrent.futures.thread import _threads_queues from concurrent.futures.thread import _WorkItem from heapq import heappop from heapq import heappush logger = logging.getLogger(__name__) ######################################################################################################################## # Global variables # ######################################################################################################################## NULL_ENTRY = (sys.maxsize, _WorkItem(None, None, (), {})) _shutdown = False ######################################################################################################################## # Before system exit procedure # ######################################################################################################################## def python_exit(): """ Cleanup before system exit """ global _shutdown _shutdown = True items = list(_threads_queues.items()) for _t, q in items: q.put(NULL_ENTRY) for t, _q in items: t.join() # change default cleanup atexit.unregister(_python_exit) atexit.register(python_exit) class PriorityQueue(queue.Queue): """Variant of Queue that retrieves open entries in priority order (lowest first). Entries are typically tuples of the form: (priority number, data). """ REMOVED = "" DEFAULT_PRIORITY = 100 def _init(self, maxsize): self.queue = [] self.entry_finder = {} self.counter = itertools.count() def _qsize(self): return len(self.queue) def _put(self, item): # heappush(self.queue, item) try: if item[1] in self.entry_finder: self.remove(item[1]) count = next(self.counter) entry = [item[0], count, item[1]] self.entry_finder[item[1]] = entry heappush(self.queue, entry) except TypeError: # handle item==None self._put((self.DEFAULT_PRIORITY, None)) def remove(self, task): """ This simply replaces the data with the REMOVED value, which will get cleared out once _get reaches it. """ entry = self.entry_finder.pop(task) entry[-1] = self.REMOVED def _get(self): while self.queue: entry = heappop(self.queue) if entry[2] is not self.REMOVED: del self.entry_finder[entry[2]] return entry return None def _worker(executor_reference, work_queue, initializer, initargs): if initializer is not None: try: initializer(*initargs) except BaseException: _base.LOGGER.critical("Exception in initializer:", exc_info=True) executor = executor_reference() if executor is not None: executor._initializer_failed() return try: while True: work_item = work_queue.get(block=True) try: if work_item[2] is not None: work_item[2].run() # Delete references to object. See issue16284 del work_item # attempt to increment idle count executor = executor_reference() if executor is not None: executor._idle_semaphore.release() del executor continue executor = executor_reference() # Exit if: # - The interpreter is shutting down OR # - The executor that owns the worker has been collected OR # - The executor that owns the worker has been shutdown. if _shutdown or executor is None or executor._shutdown: # Flag the executor as shutting down as early as possible if it # is not gc-ed yet. if executor is not None: executor._shutdown = True # Notice other workers work_queue.put(None) return del executor finally: work_queue.task_done() except BaseException: _base.LOGGER.critical("Exception in worker", exc_info=True) class PriorityThreadPoolExecutor(ThreadPoolExecutor): """ Thread pool executor with priority queue (priorities must be different, lowest first) """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # change work queue type to queue.PriorityQueue self._work_queue: PriorityQueue = PriorityQueue() self._all_future = [] def submit(self, fn, *args, **kwargs): """ Sending the function to the execution queue :param fn: function being executed :type fn: callable :param args: function's positional arguments :param kwargs: function's keywords arguments :return: future instance :rtype: _base.Future Added keyword: - priority (integer later sys.maxsize) """ with self._shutdown_lock: if self._broken: raise BrokenThreadPool(self._broken) if self._shutdown: raise RuntimeError("cannot schedule new futures after shutdown") if _shutdown: raise RuntimeError( "cannot schedule new futures after interpreter shutdown" ) priority = kwargs.get("priority", random.randint(0, sys.maxsize - 1)) # noqa: S311 if "priority" in kwargs: del kwargs["priority"] f = _base.Future() w = _WorkItem(f, fn, args, kwargs) self._work_queue.put((priority, w)) self._adjust_thread_count() self._all_future.append(f) return f def _adjust_thread_count(self): # if idle threads are available, don't spin new threads if self._idle_semaphore.acquire(timeout=0): return # When the executor gets lost, the weakref callback will wake up # the worker threads. def weakref_cb(_, q=self._work_queue): q.put(None) num_threads = len(self._threads) if num_threads < self._max_workers: thread_name = f"{self._thread_name_prefix or self}_{num_threads:d}" t = threading.Thread( name=thread_name, target=_worker, args=( weakref.ref(self, weakref_cb), self._work_queue, self._initializer, self._initargs, ), ) t.start() self._threads.add(t) _threads_queues[t] = self._work_queue def shutdown(self, wait=True, *, cancel_futures=False): logger.debug("Shutting down executor %s", self._thread_name_prefix or self) if wait: logger.debug( "Waiting for all tasks done %s", self._thread_name_prefix or self ) self._work_queue.join() logger.debug("All tasks done %s", self._thread_name_prefix or self) with self._shutdown_lock: self._shutdown = True if cancel_futures: # Drain all work items from the queue, and then cancel their # associated futures. while True: try: work_item = self._work_queue.get_nowait() except queue.Empty: break if work_item is not None: work_item.future.cancel() # Send a wake-up to prevent threads calling # _work_queue.get(block=True) from permanently blocking. self._work_queue.put(None) if wait: logger.debug( "Waiting for all thread done %s", self._thread_name_prefix or self ) for t in self._threads: self._work_queue.put(None) t.join() logger.debug("shutdown finish %s", self._thread_name_prefix or self) def __del__(self): for f in self._all_future: if f.done() and not f.cancelled(): try: f.result() except Exception as e: logger.warning("Exception in future %s: %s", f, e, exc_info=True)