from queue import Queue from threading import Event from typing import TYPE_CHECKING, List from .hookspec import Global, account_hookimpl if TYPE_CHECKING: from .events import FFIEvent class ImexFailed(RuntimeError): """Exception for signalling that import/export operations failed.""" class ImexTracker: _imex_events: Queue def __init__(self) -> None: self._imex_events = Queue() @account_hookimpl def ac_process_ffi_event(self, ffi_event: "FFIEvent") -> None: if ffi_event.name == "DC_EVENT_IMEX_PROGRESS": self._imex_events.put(ffi_event.data1) elif ffi_event.name == "DC_EVENT_IMEX_FILE_WRITTEN": self._imex_events.put(ffi_event.data2) def wait_progress(self, target_progress, progress_upper_limit=1000, progress_timeout=60): while True: ev = self._imex_events.get(timeout=progress_timeout) if isinstance(ev, int) and ev >= target_progress: assert ev <= progress_upper_limit, ( str(ev) + " exceeded upper progress limit " + str(progress_upper_limit) ) return ev if ev == 0: return None def wait_finish(self, progress_timeout=60): """Return list of written files, raise ValueError if ExportFailed.""" files_written = [] while True: ev = self._imex_events.get(timeout=progress_timeout) if isinstance(ev, str): files_written.append(ev) elif ev == 0: raise ImexFailed(f"export failed, exp-files: {files_written}") elif ev == 1000: return files_written class ConfigureFailed(RuntimeError): """Exception for signalling that configuration failed.""" class ConfigureTracker: ConfigureFailed = ConfigureFailed _configure_events: Queue _smtp_finished: Event _imap_finished: Event _ffi_events: List["FFIEvent"] _progress: Queue def __init__(self, account) -> None: self.account = account self._configure_events = Queue() self._smtp_finished = Event() self._imap_finished = Event() self._ffi_events = [] self._progress = Queue() self._gm = Global._get_plugin_manager() @account_hookimpl def ac_process_ffi_event(self, ffi_event: "FFIEvent") -> None: self._ffi_events.append(ffi_event) if ffi_event.name == "DC_EVENT_SMTP_CONNECTED": self._smtp_finished.set() elif ffi_event.name == "DC_EVENT_IMAP_CONNECTED": self._imap_finished.set() elif ffi_event.name == "DC_EVENT_CONFIGURE_PROGRESS": self._progress.put(ffi_event.data1) @account_hookimpl def ac_configure_completed(self, success): if success: self._gm.hook.dc_account_extra_configure(account=self.account) self._configure_events.put(success) self.account.remove_account_plugin(self) def wait_smtp_connected(self): """Wait until SMTP is configured.""" self._smtp_finished.wait() def wait_imap_connected(self): """Wait until IMAP is configured.""" self._imap_finished.wait() def wait_progress(self, data1=None): while True: evdata = self._progress.get() if data1 is None or evdata == data1: break def wait_finish(self, timeout=None): """ Wait until configure is completed. Raise Exception if Configure failed """ if not self._configure_events.get(timeout=timeout): content = "\n".join(map(str, self._ffi_events)) raise ConfigureFailed(content)