File size: 12,073 Bytes
0220cd3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
import io
import os
import re
import sys
import threading
import time
import traceback
from contextlib import contextmanager
from queue import Empty, Queue

from . import const
from .account import Account
from .capi import ffi, lib
from .cutil import from_optional_dc_charpointer
from .hookspec import account_hookimpl


def get_dc_event_name(integer, _DC_EVENTNAME_MAP={}):
    if not _DC_EVENTNAME_MAP:
        for name in dir(const):
            if name.startswith("DC_EVENT_"):
                _DC_EVENTNAME_MAP[getattr(const, name)] = name
    return _DC_EVENTNAME_MAP[integer]


class FFIEvent:
    def __init__(self, name: str, data1, data2) -> None:
        self.name = name
        self.data1 = data1
        self.data2 = data2

    def __str__(self) -> str:
        if self.name == "DC_EVENT_INFO":
            return f"INFO {self.data2}"
        if self.name == "DC_EVENT_WARNING":
            return f"WARNING {self.data2}"
        if self.name == "DC_EVENT_ERROR":
            return f"ERROR {self.data2}"
        return "{name} data1={data1} data2={data2}".format(**self.__dict__)


class FFIEventLogger:
    """If you register an instance of this logger with an Account
    you'll get all ffi-events printed.
    """

    # to prevent garbled logging
    _loglock = threading.RLock()

    def __init__(self, account, logid=None, init_time=None) -> None:
        self.account = account
        self.logid = logid or self.account.get_config("displayname")
        if init_time is None:
            init_time = time.time()
        self.init_time = init_time

    @account_hookimpl
    def ac_process_ffi_event(self, ffi_event: FFIEvent) -> None:
        self.account.log(str(ffi_event))

    @account_hookimpl
    def ac_log_line(self, message):
        t = threading.current_thread()
        tname = getattr(t, "name", t)
        if tname == "MainThread":
            tname = "MAIN"
        elapsed = time.time() - self.init_time
        locname = tname
        if self.logid:
            locname += "-" + self.logid
        s = f"{elapsed:2.2f} [{locname}] {message}"

        if os.name == "posix":
            WARN = "\033[93m"
            ERROR = "\033[91m"
            ENDC = "\033[0m"
            if message.startswith("DC_EVENT_WARNING"):
                s = WARN + s + ENDC
            if message.startswith("DC_EVENT_ERROR"):
                s = ERROR + s + ENDC
        with self._loglock:
            print(s, flush=True)


class FFIEventTracker:
    account: Account
    _event_queue: Queue

    def __init__(self, account: Account, timeout=None) -> None:
        self.account = account
        self._timeout = timeout
        self._event_queue = Queue()

    @account_hookimpl
    def ac_process_ffi_event(self, ffi_event: FFIEvent):
        self._event_queue.put(ffi_event)

    def set_timeout(self, timeout):
        self._timeout = timeout

    def consume_events(self, check_error=True):
        while not self._event_queue.empty():
            self.get(check_error=check_error)

    def get(self, timeout=None, check_error=True):
        timeout = timeout if timeout is not None else self._timeout
        ev = self._event_queue.get(timeout=timeout)
        if check_error and ev.name == "DC_EVENT_ERROR":
            raise ValueError(f"unexpected event: {ev}")
        return ev

    def iter_events(self, timeout=None, check_error=True):
        while True:
            yield self.get(timeout=timeout, check_error=check_error)

    def get_matching(self, event_name_regex, check_error=True, timeout=None):
        rex = re.compile(f"^(?:{event_name_regex})$")
        for ev in self.iter_events(timeout=timeout, check_error=check_error):
            if rex.match(ev.name):
                return ev

    def get_info_contains(self, regex: str) -> FFIEvent:
        rex = re.compile(regex)
        while True:
            ev = self.get_matching("DC_EVENT_INFO")
            if rex.search(ev.data2):
                return ev

    def get_info_regex_groups(self, regex, check_error=True):
        rex = re.compile(regex)
        while True:
            ev = self.get_matching("DC_EVENT_INFO", check_error=check_error)
            m = rex.match(ev.data2)
            if m is not None:
                return m.groups()

    def wait_for_connectivity(self, connectivity):
        """Wait for the specified connectivity.
        This only works reliably if the connectivity doesn't change
        again too quickly, otherwise we might miss it.
        """
        while True:
            if self.account.get_connectivity() == connectivity:
                return
            self.get_matching("DC_EVENT_CONNECTIVITY_CHANGED")

    def wait_for_connectivity_change(self, previous, expected_next):
        """Wait until the connectivity changes to `expected_next`.
        Fails the test if it changes to something else.
        """
        while True:
            current = self.account.get_connectivity()
            if current == expected_next:
                return
            if current != previous:
                raise Exception("Expected connectivity " + str(expected_next) + " but got " + str(current))

            self.get_matching("DC_EVENT_CONNECTIVITY_CHANGED")

    def ensure_event_not_queued(self, event_name_regex):
        __tracebackhide__ = True
        rex = re.compile(f"(?:{event_name_regex}).*")
        while True:
            try:
                ev = self._event_queue.get(False)
            except Empty:
                break
            else:
                assert not rex.match(ev.name), f"event found {ev}"

    def wait_securejoin_inviter_progress(self, target):
        while True:
            event = self.get_matching("DC_EVENT_SECUREJOIN_INVITER_PROGRESS")
            if event.data2 >= target:
                print(f"** SECUREJOINT-INVITER PROGRESS {target}", self.account)
                break

    def wait_securejoin_joiner_progress(self, target):
        while True:
            event = self.get_matching("DC_EVENT_SECUREJOIN_JOINER_PROGRESS")
            if event.data2 >= target:
                break

    def wait_idle_inbox_ready(self):
        """Has to be called after start_io() to wait for fetch_existing_msgs to run
        so that new messages are not mistaken for old ones:
        - ac1 and ac2 are created
        - ac1 sends a message to ac2
        - ac2 is still running FetchExsistingMsgs job and thinks it's an existing, old message
        - therefore no DC_EVENT_INCOMING_MSG is sent
        """
        self.get_matching("DC_EVENT_IMAP_INBOX_IDLE")

    def wait_next_incoming_message(self):
        """wait for and return next incoming message."""
        ev = self.get_matching("DC_EVENT_INCOMING_MSG")
        return self.account.get_message_by_id(ev.data2)

    def wait_next_messages_changed(self):
        """wait for and return next message-changed message or None
        if the event contains no msgid
        """
        ev = self.get_matching("DC_EVENT_MSGS_CHANGED")
        if ev.data2 > 0:
            return self.account.get_message_by_id(ev.data2)
        return None

    def wait_next_reactions_changed(self):
        """wait for and return next reactions-changed message."""
        ev = self.get_matching("DC_EVENT_REACTIONS_CHANGED")
        assert ev.data1 > 0
        return self.account.get_message_by_id(ev.data2)

    def wait_msg_delivered(self, msg):
        ev = self.get_matching("DC_EVENT_MSG_DELIVERED")
        assert ev.data1 == msg.chat.id
        assert ev.data2 == msg.id
        assert msg.is_out_delivered()


class EventThread(threading.Thread):
    """Event Thread for an account.

    With each Account init this callback thread is started.
    """

    def __init__(self, account: Account) -> None:
        self.account = account
        super(EventThread, self).__init__(name="events")
        self.daemon = True
        self._marked_for_shutdown = False
        self.start()

    @contextmanager
    def log_execution(self, message):
        self.account.log(message + " START")
        yield
        self.account.log(message + " FINISHED")

    def mark_shutdown(self) -> None:
        self._marked_for_shutdown = True

    def wait(self, timeout=None) -> None:
        if self == threading.current_thread():
            # we are in the callback thread and thus cannot
            # wait for the thread-loop to finish.
            return
        self.join(timeout=timeout)

    def run(self) -> None:
        """get and run events until shutdown."""
        with self.log_execution("EVENT THREAD"):
            event_emitter = ffi.gc(
                lib.dc_get_event_emitter(self.account._dc_context),
                lib.dc_event_emitter_unref,
            )
            while not self._marked_for_shutdown:
                with self.swallow_and_log_exception("Unexpected error in event thread"):
                    event = lib.dc_get_next_event(event_emitter)
                    if event == ffi.NULL or self._marked_for_shutdown:
                        break
                    self._process_event(event)

    def _process_event(self, event) -> None:
        evt = lib.dc_event_get_id(event)
        data1 = lib.dc_event_get_data1_int(event)
        # the following code relates to the deltachat/_build.py's helper
        # function which provides us signature info of an event call
        evt_name = get_dc_event_name(evt)
        if lib.dc_event_has_string_data(evt):
            data2 = from_optional_dc_charpointer(lib.dc_event_get_data2_str(event))
        else:
            data2 = lib.dc_event_get_data2_int(event)

        lib.dc_event_unref(event)
        ffi_event = FFIEvent(name=evt_name, data1=data1, data2=data2)
        with self.swallow_and_log_exception(f"ac_process_ffi_event {ffi_event}"):
            self.account._pm.hook.ac_process_ffi_event(account=self, ffi_event=ffi_event)
        for name, kwargs in self._map_ffi_event(ffi_event):
            hook = getattr(self.account._pm.hook, name)
            info = f"call {name} kwargs={kwargs} failed"
            with self.swallow_and_log_exception(info):
                hook(**kwargs)

    @contextmanager
    def swallow_and_log_exception(self, info):
        try:
            yield
        except Exception as ex:
            logfile = io.StringIO()
            traceback.print_exception(*sys.exc_info(), file=logfile)
            self.account.log(f"{info}\nException {ex}\nTraceback:\n{logfile.getvalue()}")

    def _map_ffi_event(self, ffi_event: FFIEvent):
        name = ffi_event.name
        account = self.account
        if name == "DC_EVENT_CONFIGURE_PROGRESS":
            data1 = ffi_event.data1
            if data1 == 0 or data1 == 1000:
                success = data1 == 1000
                comment = ffi_event.data2
                yield "ac_configure_completed", {"success": success, "comment": comment}
        elif name == "DC_EVENT_INCOMING_MSG":
            msg = account.get_message_by_id(ffi_event.data2)
            if msg is not None:
                yield ("ac_incoming_message", {"message": msg})
        elif name == "DC_EVENT_MSGS_CHANGED":
            if ffi_event.data2 != 0:
                msg = account.get_message_by_id(ffi_event.data2)
                if msg is not None:
                    if msg.is_outgoing():
                        yield "ac_outgoing_message", {"message": msg}
                    elif msg.is_in_fresh():
                        yield "ac_incoming_message", {"message": msg}
        elif name == "DC_EVENT_REACTIONS_CHANGED":
            assert ffi_event.data1 > 0
            msg = account.get_message_by_id(ffi_event.data2)
            if msg is not None:
                yield "ac_reactions_changed", {"message": msg}
        elif name == "DC_EVENT_MSG_DELIVERED":
            msg = account.get_message_by_id(ffi_event.data2)
            if msg is not None:
                yield "ac_message_delivered", {"message": msg}
        elif name == "DC_EVENT_CHAT_MODIFIED":
            chat = account.get_chat_by_id(ffi_event.data1)
            yield "ac_chat_modified", {"chat": chat}