| | import io |
| | import os |
| | import re |
| | import sys |
| | import threading |
| | import time |
| | import traceback |
| | from contextlib import contextmanager |
| | from queue import Empty, Queue |
| |
|
| | from . import const |
| | from .account import Account |
| | from .capi import ffi, lib |
| | from .cutil import from_optional_dc_charpointer |
| | from .hookspec import account_hookimpl |
| |
|
| |
|
| | def get_dc_event_name(integer, _DC_EVENTNAME_MAP={}): |
| | if not _DC_EVENTNAME_MAP: |
| | for name in dir(const): |
| | if name.startswith("DC_EVENT_"): |
| | _DC_EVENTNAME_MAP[getattr(const, name)] = name |
| | return _DC_EVENTNAME_MAP[integer] |
| |
|
| |
|
| | class FFIEvent: |
| | def __init__(self, name: str, data1, data2) -> None: |
| | self.name = name |
| | self.data1 = data1 |
| | self.data2 = data2 |
| |
|
| | def __str__(self) -> str: |
| | if self.name == "DC_EVENT_INFO": |
| | return f"INFO {self.data2}" |
| | if self.name == "DC_EVENT_WARNING": |
| | return f"WARNING {self.data2}" |
| | if self.name == "DC_EVENT_ERROR": |
| | return f"ERROR {self.data2}" |
| | return "{name} data1={data1} data2={data2}".format(**self.__dict__) |
| |
|
| |
|
| | class FFIEventLogger: |
| | """If you register an instance of this logger with an Account |
| | you'll get all ffi-events printed. |
| | """ |
| |
|
| | |
| | _loglock = threading.RLock() |
| |
|
| | def __init__(self, account, logid=None, init_time=None) -> None: |
| | self.account = account |
| | self.logid = logid or self.account.get_config("displayname") |
| | if init_time is None: |
| | init_time = time.time() |
| | self.init_time = init_time |
| |
|
| | @account_hookimpl |
| | def ac_process_ffi_event(self, ffi_event: FFIEvent) -> None: |
| | self.account.log(str(ffi_event)) |
| |
|
| | @account_hookimpl |
| | def ac_log_line(self, message): |
| | t = threading.current_thread() |
| | tname = getattr(t, "name", t) |
| | if tname == "MainThread": |
| | tname = "MAIN" |
| | elapsed = time.time() - self.init_time |
| | locname = tname |
| | if self.logid: |
| | locname += "-" + self.logid |
| | s = f"{elapsed:2.2f} [{locname}] {message}" |
| |
|
| | if os.name == "posix": |
| | WARN = "\033[93m" |
| | ERROR = "\033[91m" |
| | ENDC = "\033[0m" |
| | if message.startswith("DC_EVENT_WARNING"): |
| | s = WARN + s + ENDC |
| | if message.startswith("DC_EVENT_ERROR"): |
| | s = ERROR + s + ENDC |
| | with self._loglock: |
| | print(s, flush=True) |
| |
|
| |
|
| | class FFIEventTracker: |
| | account: Account |
| | _event_queue: Queue |
| |
|
| | def __init__(self, account: Account, timeout=None) -> None: |
| | self.account = account |
| | self._timeout = timeout |
| | self._event_queue = Queue() |
| |
|
| | @account_hookimpl |
| | def ac_process_ffi_event(self, ffi_event: FFIEvent): |
| | self._event_queue.put(ffi_event) |
| |
|
| | def set_timeout(self, timeout): |
| | self._timeout = timeout |
| |
|
| | def consume_events(self, check_error=True): |
| | while not self._event_queue.empty(): |
| | self.get(check_error=check_error) |
| |
|
| | def get(self, timeout=None, check_error=True): |
| | timeout = timeout if timeout is not None else self._timeout |
| | ev = self._event_queue.get(timeout=timeout) |
| | if check_error and ev.name == "DC_EVENT_ERROR": |
| | raise ValueError(f"unexpected event: {ev}") |
| | return ev |
| |
|
| | def iter_events(self, timeout=None, check_error=True): |
| | while True: |
| | yield self.get(timeout=timeout, check_error=check_error) |
| |
|
| | def get_matching(self, event_name_regex, check_error=True, timeout=None): |
| | rex = re.compile(f"^(?:{event_name_regex})$") |
| | for ev in self.iter_events(timeout=timeout, check_error=check_error): |
| | if rex.match(ev.name): |
| | return ev |
| |
|
| | def get_info_contains(self, regex: str) -> FFIEvent: |
| | rex = re.compile(regex) |
| | while True: |
| | ev = self.get_matching("DC_EVENT_INFO") |
| | if rex.search(ev.data2): |
| | return ev |
| |
|
| | def get_info_regex_groups(self, regex, check_error=True): |
| | rex = re.compile(regex) |
| | while True: |
| | ev = self.get_matching("DC_EVENT_INFO", check_error=check_error) |
| | m = rex.match(ev.data2) |
| | if m is not None: |
| | return m.groups() |
| |
|
| | def wait_for_connectivity(self, connectivity): |
| | """Wait for the specified connectivity. |
| | This only works reliably if the connectivity doesn't change |
| | again too quickly, otherwise we might miss it. |
| | """ |
| | while True: |
| | if self.account.get_connectivity() == connectivity: |
| | return |
| | self.get_matching("DC_EVENT_CONNECTIVITY_CHANGED") |
| |
|
| | def wait_for_connectivity_change(self, previous, expected_next): |
| | """Wait until the connectivity changes to `expected_next`. |
| | Fails the test if it changes to something else. |
| | """ |
| | while True: |
| | current = self.account.get_connectivity() |
| | if current == expected_next: |
| | return |
| | if current != previous: |
| | raise Exception("Expected connectivity " + str(expected_next) + " but got " + str(current)) |
| |
|
| | self.get_matching("DC_EVENT_CONNECTIVITY_CHANGED") |
| |
|
| | def ensure_event_not_queued(self, event_name_regex): |
| | __tracebackhide__ = True |
| | rex = re.compile(f"(?:{event_name_regex}).*") |
| | while True: |
| | try: |
| | ev = self._event_queue.get(False) |
| | except Empty: |
| | break |
| | else: |
| | assert not rex.match(ev.name), f"event found {ev}" |
| |
|
| | def wait_securejoin_inviter_progress(self, target): |
| | while True: |
| | event = self.get_matching("DC_EVENT_SECUREJOIN_INVITER_PROGRESS") |
| | if event.data2 >= target: |
| | print(f"** SECUREJOINT-INVITER PROGRESS {target}", self.account) |
| | break |
| |
|
| | def wait_securejoin_joiner_progress(self, target): |
| | while True: |
| | event = self.get_matching("DC_EVENT_SECUREJOIN_JOINER_PROGRESS") |
| | if event.data2 >= target: |
| | break |
| |
|
| | def wait_idle_inbox_ready(self): |
| | """Has to be called after start_io() to wait for fetch_existing_msgs to run |
| | so that new messages are not mistaken for old ones: |
| | - ac1 and ac2 are created |
| | - ac1 sends a message to ac2 |
| | - ac2 is still running FetchExsistingMsgs job and thinks it's an existing, old message |
| | - therefore no DC_EVENT_INCOMING_MSG is sent |
| | """ |
| | self.get_matching("DC_EVENT_IMAP_INBOX_IDLE") |
| |
|
| | def wait_next_incoming_message(self): |
| | """wait for and return next incoming message.""" |
| | ev = self.get_matching("DC_EVENT_INCOMING_MSG") |
| | return self.account.get_message_by_id(ev.data2) |
| |
|
| | def wait_next_messages_changed(self): |
| | """wait for and return next message-changed message or None |
| | if the event contains no msgid |
| | """ |
| | ev = self.get_matching("DC_EVENT_MSGS_CHANGED") |
| | if ev.data2 > 0: |
| | return self.account.get_message_by_id(ev.data2) |
| | return None |
| |
|
| | def wait_next_reactions_changed(self): |
| | """wait for and return next reactions-changed message.""" |
| | ev = self.get_matching("DC_EVENT_REACTIONS_CHANGED") |
| | assert ev.data1 > 0 |
| | return self.account.get_message_by_id(ev.data2) |
| |
|
| | def wait_msg_delivered(self, msg): |
| | ev = self.get_matching("DC_EVENT_MSG_DELIVERED") |
| | assert ev.data1 == msg.chat.id |
| | assert ev.data2 == msg.id |
| | assert msg.is_out_delivered() |
| |
|
| |
|
| | class EventThread(threading.Thread): |
| | """Event Thread for an account. |
| | |
| | With each Account init this callback thread is started. |
| | """ |
| |
|
| | def __init__(self, account: Account) -> None: |
| | self.account = account |
| | super(EventThread, self).__init__(name="events") |
| | self.daemon = True |
| | self._marked_for_shutdown = False |
| | self.start() |
| |
|
| | @contextmanager |
| | def log_execution(self, message): |
| | self.account.log(message + " START") |
| | yield |
| | self.account.log(message + " FINISHED") |
| |
|
| | def mark_shutdown(self) -> None: |
| | self._marked_for_shutdown = True |
| |
|
| | def wait(self, timeout=None) -> None: |
| | if self == threading.current_thread(): |
| | |
| | |
| | return |
| | self.join(timeout=timeout) |
| |
|
| | def run(self) -> None: |
| | """get and run events until shutdown.""" |
| | with self.log_execution("EVENT THREAD"): |
| | event_emitter = ffi.gc( |
| | lib.dc_get_event_emitter(self.account._dc_context), |
| | lib.dc_event_emitter_unref, |
| | ) |
| | while not self._marked_for_shutdown: |
| | with self.swallow_and_log_exception("Unexpected error in event thread"): |
| | event = lib.dc_get_next_event(event_emitter) |
| | if event == ffi.NULL or self._marked_for_shutdown: |
| | break |
| | self._process_event(event) |
| |
|
| | def _process_event(self, event) -> None: |
| | evt = lib.dc_event_get_id(event) |
| | data1 = lib.dc_event_get_data1_int(event) |
| | |
| | |
| | evt_name = get_dc_event_name(evt) |
| | if lib.dc_event_has_string_data(evt): |
| | data2 = from_optional_dc_charpointer(lib.dc_event_get_data2_str(event)) |
| | else: |
| | data2 = lib.dc_event_get_data2_int(event) |
| |
|
| | lib.dc_event_unref(event) |
| | ffi_event = FFIEvent(name=evt_name, data1=data1, data2=data2) |
| | with self.swallow_and_log_exception(f"ac_process_ffi_event {ffi_event}"): |
| | self.account._pm.hook.ac_process_ffi_event(account=self, ffi_event=ffi_event) |
| | for name, kwargs in self._map_ffi_event(ffi_event): |
| | hook = getattr(self.account._pm.hook, name) |
| | info = f"call {name} kwargs={kwargs} failed" |
| | with self.swallow_and_log_exception(info): |
| | hook(**kwargs) |
| |
|
| | @contextmanager |
| | def swallow_and_log_exception(self, info): |
| | try: |
| | yield |
| | except Exception as ex: |
| | logfile = io.StringIO() |
| | traceback.print_exception(*sys.exc_info(), file=logfile) |
| | self.account.log(f"{info}\nException {ex}\nTraceback:\n{logfile.getvalue()}") |
| |
|
| | def _map_ffi_event(self, ffi_event: FFIEvent): |
| | name = ffi_event.name |
| | account = self.account |
| | if name == "DC_EVENT_CONFIGURE_PROGRESS": |
| | data1 = ffi_event.data1 |
| | if data1 == 0 or data1 == 1000: |
| | success = data1 == 1000 |
| | comment = ffi_event.data2 |
| | yield "ac_configure_completed", {"success": success, "comment": comment} |
| | elif name == "DC_EVENT_INCOMING_MSG": |
| | msg = account.get_message_by_id(ffi_event.data2) |
| | if msg is not None: |
| | yield ("ac_incoming_message", {"message": msg}) |
| | elif name == "DC_EVENT_MSGS_CHANGED": |
| | if ffi_event.data2 != 0: |
| | msg = account.get_message_by_id(ffi_event.data2) |
| | if msg is not None: |
| | if msg.is_outgoing(): |
| | yield "ac_outgoing_message", {"message": msg} |
| | elif msg.is_in_fresh(): |
| | yield "ac_incoming_message", {"message": msg} |
| | elif name == "DC_EVENT_REACTIONS_CHANGED": |
| | assert ffi_event.data1 > 0 |
| | msg = account.get_message_by_id(ffi_event.data2) |
| | if msg is not None: |
| | yield "ac_reactions_changed", {"message": msg} |
| | elif name == "DC_EVENT_MSG_DELIVERED": |
| | msg = account.get_message_by_id(ffi_event.data2) |
| | if msg is not None: |
| | yield "ac_message_delivered", {"message": msg} |
| | elif name == "DC_EVENT_CHAT_MODIFIED": |
| | chat = account.get_chat_by_id(ffi_event.data1) |
| | yield "ac_chat_modified", {"chat": chat} |
| |
|