| |
| |
| |
| |
| |
|
|
| import logging |
| import os |
| from queue import Queue, Empty |
| import signal |
| import sys |
| import threading |
| import traceback |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| class DeadlockDetect: |
| def __init__(self, use: bool = False, timeout: float = 120.): |
| self.use = use |
| self.timeout = timeout |
| self._queue: Queue = Queue() |
|
|
| def update(self, stage: str): |
| if self.use: |
| self._queue.put(stage) |
|
|
| def __enter__(self): |
| if self.use: |
| self._thread = threading.Thread(target=self._detector_thread) |
| self._thread.start() |
|
|
| def __exit__(self, exc_type, exc_val, exc_tb): |
| if self.use: |
| self._queue.put(None) |
| self._thread.join() |
|
|
| def _detector_thread(self): |
| logger.debug("Deadlock detector started") |
| last_stage = "init" |
| while True: |
| try: |
| stage = self._queue.get(timeout=self.timeout) |
| except Empty: |
| break |
| if stage is None: |
| logger.debug("Exiting deadlock detector thread") |
| return |
| else: |
| last_stage = stage |
| logger.error("Deadlock detector timed out, last stage was %s", last_stage) |
| for th in threading.enumerate(): |
| print(th, file=sys.stderr) |
| traceback.print_stack(sys._current_frames()[th.ident]) |
| print(file=sys.stderr) |
| sys.stdout.flush() |
| sys.stderr.flush() |
| os.kill(os.getpid(), signal.SIGKILL) |
|
|