AbdulElahGwaith's picture
Upload folder using huggingface_hub
0220cd3 verified
import io
import os
import re
import sys
import threading
import time
import traceback
from contextlib import contextmanager
from queue import Empty, Queue
from . import const
from .account import Account
from .capi import ffi, lib
from .cutil import from_optional_dc_charpointer
from .hookspec import account_hookimpl
def get_dc_event_name(integer, _DC_EVENTNAME_MAP={}):
if not _DC_EVENTNAME_MAP:
for name in dir(const):
if name.startswith("DC_EVENT_"):
_DC_EVENTNAME_MAP[getattr(const, name)] = name
return _DC_EVENTNAME_MAP[integer]
class FFIEvent:
def __init__(self, name: str, data1, data2) -> None:
self.name = name
self.data1 = data1
self.data2 = data2
def __str__(self) -> str:
if self.name == "DC_EVENT_INFO":
return f"INFO {self.data2}"
if self.name == "DC_EVENT_WARNING":
return f"WARNING {self.data2}"
if self.name == "DC_EVENT_ERROR":
return f"ERROR {self.data2}"
return "{name} data1={data1} data2={data2}".format(**self.__dict__)
class FFIEventLogger:
"""If you register an instance of this logger with an Account
you'll get all ffi-events printed.
"""
# to prevent garbled logging
_loglock = threading.RLock()
def __init__(self, account, logid=None, init_time=None) -> None:
self.account = account
self.logid = logid or self.account.get_config("displayname")
if init_time is None:
init_time = time.time()
self.init_time = init_time
@account_hookimpl
def ac_process_ffi_event(self, ffi_event: FFIEvent) -> None:
self.account.log(str(ffi_event))
@account_hookimpl
def ac_log_line(self, message):
t = threading.current_thread()
tname = getattr(t, "name", t)
if tname == "MainThread":
tname = "MAIN"
elapsed = time.time() - self.init_time
locname = tname
if self.logid:
locname += "-" + self.logid
s = f"{elapsed:2.2f} [{locname}] {message}"
if os.name == "posix":
WARN = "\033[93m"
ERROR = "\033[91m"
ENDC = "\033[0m"
if message.startswith("DC_EVENT_WARNING"):
s = WARN + s + ENDC
if message.startswith("DC_EVENT_ERROR"):
s = ERROR + s + ENDC
with self._loglock:
print(s, flush=True)
class FFIEventTracker:
account: Account
_event_queue: Queue
def __init__(self, account: Account, timeout=None) -> None:
self.account = account
self._timeout = timeout
self._event_queue = Queue()
@account_hookimpl
def ac_process_ffi_event(self, ffi_event: FFIEvent):
self._event_queue.put(ffi_event)
def set_timeout(self, timeout):
self._timeout = timeout
def consume_events(self, check_error=True):
while not self._event_queue.empty():
self.get(check_error=check_error)
def get(self, timeout=None, check_error=True):
timeout = timeout if timeout is not None else self._timeout
ev = self._event_queue.get(timeout=timeout)
if check_error and ev.name == "DC_EVENT_ERROR":
raise ValueError(f"unexpected event: {ev}")
return ev
def iter_events(self, timeout=None, check_error=True):
while True:
yield self.get(timeout=timeout, check_error=check_error)
def get_matching(self, event_name_regex, check_error=True, timeout=None):
rex = re.compile(f"^(?:{event_name_regex})$")
for ev in self.iter_events(timeout=timeout, check_error=check_error):
if rex.match(ev.name):
return ev
def get_info_contains(self, regex: str) -> FFIEvent:
rex = re.compile(regex)
while True:
ev = self.get_matching("DC_EVENT_INFO")
if rex.search(ev.data2):
return ev
def get_info_regex_groups(self, regex, check_error=True):
rex = re.compile(regex)
while True:
ev = self.get_matching("DC_EVENT_INFO", check_error=check_error)
m = rex.match(ev.data2)
if m is not None:
return m.groups()
def wait_for_connectivity(self, connectivity):
"""Wait for the specified connectivity.
This only works reliably if the connectivity doesn't change
again too quickly, otherwise we might miss it.
"""
while True:
if self.account.get_connectivity() == connectivity:
return
self.get_matching("DC_EVENT_CONNECTIVITY_CHANGED")
def wait_for_connectivity_change(self, previous, expected_next):
"""Wait until the connectivity changes to `expected_next`.
Fails the test if it changes to something else.
"""
while True:
current = self.account.get_connectivity()
if current == expected_next:
return
if current != previous:
raise Exception("Expected connectivity " + str(expected_next) + " but got " + str(current))
self.get_matching("DC_EVENT_CONNECTIVITY_CHANGED")
def ensure_event_not_queued(self, event_name_regex):
__tracebackhide__ = True
rex = re.compile(f"(?:{event_name_regex}).*")
while True:
try:
ev = self._event_queue.get(False)
except Empty:
break
else:
assert not rex.match(ev.name), f"event found {ev}"
def wait_securejoin_inviter_progress(self, target):
while True:
event = self.get_matching("DC_EVENT_SECUREJOIN_INVITER_PROGRESS")
if event.data2 >= target:
print(f"** SECUREJOINT-INVITER PROGRESS {target}", self.account)
break
def wait_securejoin_joiner_progress(self, target):
while True:
event = self.get_matching("DC_EVENT_SECUREJOIN_JOINER_PROGRESS")
if event.data2 >= target:
break
def wait_idle_inbox_ready(self):
"""Has to be called after start_io() to wait for fetch_existing_msgs to run
so that new messages are not mistaken for old ones:
- ac1 and ac2 are created
- ac1 sends a message to ac2
- ac2 is still running FetchExsistingMsgs job and thinks it's an existing, old message
- therefore no DC_EVENT_INCOMING_MSG is sent
"""
self.get_matching("DC_EVENT_IMAP_INBOX_IDLE")
def wait_next_incoming_message(self):
"""wait for and return next incoming message."""
ev = self.get_matching("DC_EVENT_INCOMING_MSG")
return self.account.get_message_by_id(ev.data2)
def wait_next_messages_changed(self):
"""wait for and return next message-changed message or None
if the event contains no msgid
"""
ev = self.get_matching("DC_EVENT_MSGS_CHANGED")
if ev.data2 > 0:
return self.account.get_message_by_id(ev.data2)
return None
def wait_next_reactions_changed(self):
"""wait for and return next reactions-changed message."""
ev = self.get_matching("DC_EVENT_REACTIONS_CHANGED")
assert ev.data1 > 0
return self.account.get_message_by_id(ev.data2)
def wait_msg_delivered(self, msg):
ev = self.get_matching("DC_EVENT_MSG_DELIVERED")
assert ev.data1 == msg.chat.id
assert ev.data2 == msg.id
assert msg.is_out_delivered()
class EventThread(threading.Thread):
"""Event Thread for an account.
With each Account init this callback thread is started.
"""
def __init__(self, account: Account) -> None:
self.account = account
super(EventThread, self).__init__(name="events")
self.daemon = True
self._marked_for_shutdown = False
self.start()
@contextmanager
def log_execution(self, message):
self.account.log(message + " START")
yield
self.account.log(message + " FINISHED")
def mark_shutdown(self) -> None:
self._marked_for_shutdown = True
def wait(self, timeout=None) -> None:
if self == threading.current_thread():
# we are in the callback thread and thus cannot
# wait for the thread-loop to finish.
return
self.join(timeout=timeout)
def run(self) -> None:
"""get and run events until shutdown."""
with self.log_execution("EVENT THREAD"):
event_emitter = ffi.gc(
lib.dc_get_event_emitter(self.account._dc_context),
lib.dc_event_emitter_unref,
)
while not self._marked_for_shutdown:
with self.swallow_and_log_exception("Unexpected error in event thread"):
event = lib.dc_get_next_event(event_emitter)
if event == ffi.NULL or self._marked_for_shutdown:
break
self._process_event(event)
def _process_event(self, event) -> None:
evt = lib.dc_event_get_id(event)
data1 = lib.dc_event_get_data1_int(event)
# the following code relates to the deltachat/_build.py's helper
# function which provides us signature info of an event call
evt_name = get_dc_event_name(evt)
if lib.dc_event_has_string_data(evt):
data2 = from_optional_dc_charpointer(lib.dc_event_get_data2_str(event))
else:
data2 = lib.dc_event_get_data2_int(event)
lib.dc_event_unref(event)
ffi_event = FFIEvent(name=evt_name, data1=data1, data2=data2)
with self.swallow_and_log_exception(f"ac_process_ffi_event {ffi_event}"):
self.account._pm.hook.ac_process_ffi_event(account=self, ffi_event=ffi_event)
for name, kwargs in self._map_ffi_event(ffi_event):
hook = getattr(self.account._pm.hook, name)
info = f"call {name} kwargs={kwargs} failed"
with self.swallow_and_log_exception(info):
hook(**kwargs)
@contextmanager
def swallow_and_log_exception(self, info):
try:
yield
except Exception as ex:
logfile = io.StringIO()
traceback.print_exception(*sys.exc_info(), file=logfile)
self.account.log(f"{info}\nException {ex}\nTraceback:\n{logfile.getvalue()}")
def _map_ffi_event(self, ffi_event: FFIEvent):
name = ffi_event.name
account = self.account
if name == "DC_EVENT_CONFIGURE_PROGRESS":
data1 = ffi_event.data1
if data1 == 0 or data1 == 1000:
success = data1 == 1000
comment = ffi_event.data2
yield "ac_configure_completed", {"success": success, "comment": comment}
elif name == "DC_EVENT_INCOMING_MSG":
msg = account.get_message_by_id(ffi_event.data2)
if msg is not None:
yield ("ac_incoming_message", {"message": msg})
elif name == "DC_EVENT_MSGS_CHANGED":
if ffi_event.data2 != 0:
msg = account.get_message_by_id(ffi_event.data2)
if msg is not None:
if msg.is_outgoing():
yield "ac_outgoing_message", {"message": msg}
elif msg.is_in_fresh():
yield "ac_incoming_message", {"message": msg}
elif name == "DC_EVENT_REACTIONS_CHANGED":
assert ffi_event.data1 > 0
msg = account.get_message_by_id(ffi_event.data2)
if msg is not None:
yield "ac_reactions_changed", {"message": msg}
elif name == "DC_EVENT_MSG_DELIVERED":
msg = account.get_message_by_id(ffi_event.data2)
if msg is not None:
yield "ac_message_delivered", {"message": msg}
elif name == "DC_EVENT_CHAT_MODIFIED":
chat = account.get_chat_by_id(ffi_event.data1)
yield "ac_chat_modified", {"chat": chat}