core / python /src /deltachat /testplugin.py
AbdulElahGwaith's picture
Upload folder using huggingface_hub
0220cd3 verified
import fnmatch
import io
import os
import pathlib
import queue
import subprocess
import sys
import threading
import time
import weakref
import random
from queue import Queue
from typing import Callable, Dict, List, Optional, Set
import pytest
from _pytest._code import Source
import deltachat
from . import Account, account_hookimpl, const, get_core_info
from .events import FFIEventLogger, FFIEventTracker
E2EE_INFO_MSGS = 1
"""
The number of info messages added to new e2ee chats.
Currently this is "End-to-end encryption available".
"""
def pytest_addoption(parser):
group = parser.getgroup("deltachat testplugin options")
group.addoption(
"--chatmail",
action="store",
default=None,
help="chatmail server domain name",
)
group.addoption(
"--ignored",
action="store_true",
help="Also run tests marked with the ignored marker",
)
group.addoption(
"--strict-tls",
action="store_true",
help="Never accept invalid TLS certificates for test accounts",
)
group.addoption(
"--extra-info",
action="store_true",
help="show more info on failures (imap server state, config)",
)
group.addoption(
"--debug-setup",
action="store_true",
help="show events during configure and start io phases of online accounts",
)
def pytest_configure(config):
cfg = config.getoption("--chatmail")
if not cfg:
cfg = os.getenv("CHATMAIL_DOMAIN")
if cfg:
config.option.chatmail = cfg
# Make sure we don't get garbled output because threads keep running
# collect all ever created accounts in a weakref-set (so we don't
# keep objects unnecessarily alive) and enable/disable logging
# for each pytest test phase # (setup/call/teardown).
# Additionally make the acfactory use a logging/no-logging default.
class LoggingAspect:
def __init__(self) -> None:
self._accounts: weakref.WeakSet = weakref.WeakSet()
@deltachat.global_hookimpl
def dc_account_init(self, account):
self._accounts.add(account)
def disable_logging(self, item):
for acc in self._accounts:
acc.disable_logging()
acfactory = item.funcargs.get("acfactory")
if acfactory:
acfactory.set_logging_default(False)
def enable_logging(self, item):
for acc in self._accounts:
acc.enable_logging()
acfactory = item.funcargs.get("acfactory")
if acfactory:
acfactory.set_logging_default(True)
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_setup(self, item):
if item.get_closest_marker("ignored"):
if not item.config.getvalue("ignored"):
pytest.skip("use --ignored to run this test")
self.enable_logging(item)
yield
self.disable_logging(item)
@pytest.hookimpl(hookwrapper=True)
def pytest_pyfunc_call(self, pyfuncitem):
self.enable_logging(pyfuncitem)
yield
self.disable_logging(pyfuncitem)
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_teardown(self, item):
logging = item.config.getoption("--extra-info")
if logging:
self.enable_logging(item)
yield
if logging:
self.disable_logging(item)
la = LoggingAspect()
config.pluginmanager.register(la)
deltachat.register_global_plugin(la)
def pytest_report_header(config):
info = get_core_info()
summary = [
"Deltachat core={} sqlite={} journal_mode={}".format(
info["deltachat_core_version"],
info["sqlite_version"],
info["journal_mode"],
),
]
chatmail_opt = config.getoption("--chatmail")
if chatmail_opt:
summary.append(f"Chatmail account provider: {chatmail_opt}")
return summary
@pytest.fixture(scope="session")
def testprocess(request):
"""Return live account configuration manager.
The returned object is a :class:`TestProcess` object."""
return TestProcess(pytestconfig=request.config)
class TestProcess:
"""A pytest session-scoped instance to help with managing "live" account configurations."""
_addr2files: Dict[str, Dict[pathlib.Path, bytes]]
def __init__(self, pytestconfig) -> None:
self.pytestconfig = pytestconfig
self._addr2files = {}
self._configlist: List[Dict[str, str]] = []
def get_liveconfig_producer(self):
"""provide live account configs, cached on a per-test-process scope
so that test functions can reuse already known live configs.
"""
chatmail_opt = self.pytestconfig.getoption("--chatmail")
if chatmail_opt:
# Use a chatmail instance.
domain = chatmail_opt
MAX_LIVE_CREATED_ACCOUNTS = 10
for index in range(MAX_LIVE_CREATED_ACCOUNTS):
try:
yield self._configlist[index]
except IndexError:
part = "".join(random.choices("2345789acdefghjkmnpqrstuvwxyz", k=6))
username = f"ci-{part}"
password = f"{username}${username}"
addr = f"{username}@{domain}"
config = {"addr": addr, "mail_pw": password}
print("newtmpuser {}: addr={}".format(index, config["addr"]))
self._configlist.append(config)
yield config
pytest.fail(f"more than {MAX_LIVE_CREATED_ACCOUNTS} live accounts requested.")
else:
pytest.skip(
"specify CHATMAIL_DOMAIN or --chatmail to provide live accounts",
)
def cache_maybe_retrieve_configured_db_files(self, cache_addr, db_target_path):
db_target_path = pathlib.Path(db_target_path)
assert not db_target_path.exists()
try:
filescache = self._addr2files[cache_addr]
except KeyError:
print("CACHE FAIL for", cache_addr)
return False
else:
print("CACHE HIT for", cache_addr)
targetdir = db_target_path.parent
write_dict_to_dir(filescache, targetdir)
return True
def cache_maybe_store_configured_db_files(self, acc):
addr = acc.get_config("addr")
assert acc.is_configured()
# don't overwrite existing entries
if addr not in self._addr2files:
print("storing cache for", addr)
basedir = pathlib.Path(acc.get_blobdir()).parent
self._addr2files[addr] = create_dict_from_files_in_path(basedir)
return True
def create_dict_from_files_in_path(base):
cachedict = {}
for path in base.glob("**/*"):
if path.is_file():
cachedict[path.relative_to(base)] = path.read_bytes()
return cachedict
def write_dict_to_dir(dic, target_dir):
assert dic
for relpath, content in dic.items():
path = target_dir.joinpath(relpath)
if not path.parent.exists():
os.makedirs(path.parent)
path.write_bytes(content)
@pytest.fixture()
def data(request):
"""Test data."""
class Data:
def __init__(self) -> None:
# trying to find test data heuristically
# because we are run from a dev-setup with pytest direct,
# through tox, and then maybe also from deltachat-binding
# users like "deltabot".
self.paths = [
os.path.normpath(x)
for x in [
os.path.join(os.path.dirname(request.fspath.strpath), "data"),
os.path.join(os.path.dirname(request.fspath.strpath), "..", "..", "test-data"),
os.path.join(os.path.dirname(__file__), "..", "..", "..", "test-data"),
]
]
def get_path(self, bn):
"""return path of file or None if it doesn't exist."""
for path in self.paths:
fn = os.path.join(path, *bn.split("/"))
if os.path.exists(fn):
return fn
print(f"WARNING: path does not exist: {fn!r}")
return None
def read_path(self, bn, mode="r"):
fn = self.get_path(bn)
if fn is not None:
with open(fn, mode) as f:
return f.read()
return Data()
class ACSetup:
"""
Accounts setup helper to deal with multiple configure-process
and io & imap initialization phases.
From tests, use the higher level
public ACFactory methods instead of its private helper class.
"""
CONFIGURING = "CONFIGURING"
CONFIGURED = "CONFIGURED"
IDLEREADY = "IDLEREADY"
_configured_events: Queue
def __init__(self, testprocess, init_time) -> None:
self._configured_events = Queue()
self._account2state: Dict[Account, str] = {}
self._account2config: Dict[Account, Dict[str, str]] = {}
self._imap_cleaned: Set[str] = set()
self.testprocess = testprocess
self.init_time = init_time
def log(self, *args):
print("[acsetup]", f"{time.time() - self.init_time:.3f}", *args)
def add_configured(self, account):
"""add an already configured account."""
assert account.is_configured()
self._account2state[account] = self.CONFIGURED
self.log("added already configured account", account, account.get_config("addr"))
def start_configure(self, account):
"""add an account and start its configure process."""
class PendingTracker:
@account_hookimpl
def ac_configure_completed(this, success: bool, comment: Optional[str]) -> None:
self._configured_events.put((account, success, comment))
account.add_account_plugin(PendingTracker(), name="pending_tracker")
self._account2state[account] = self.CONFIGURING
account.configure()
self.log("started configure on", account)
def wait_one_configured(self, account):
"""wait until this account has successfully configured."""
if self._account2state[account] == self.CONFIGURING:
while True:
acc = self._pop_config_success()
if acc == account:
break
self.init_imap(acc)
self.init_logging(acc)
acc._evtracker.consume_events()
def bring_online(self):
"""Wait for all accounts to become ready to receive messages.
This will initialize logging, start IO and the direct_imap attribute
for each account which either is CONFIGURED already or which is CONFIGURING
and successfully completing the configuration process.
"""
print("wait_all_configured finds accounts=", self._account2state)
for acc, state in self._account2state.items():
if state == self.CONFIGURED:
self._onconfigure_start_io(acc)
self._account2state[acc] = self.IDLEREADY
while self.CONFIGURING in self._account2state.values():
acc = self._pop_config_success()
self._onconfigure_start_io(acc)
self._account2state[acc] = self.IDLEREADY
print("finished, account2state", self._account2state)
def _pop_config_success(self):
acc, success, comment = self._configured_events.get()
if not success:
pytest.fail(f"configuring online account {acc} failed: {comment}")
self._account2state[acc] = self.CONFIGURED
if acc in self._account2config:
acc.update_config(self._account2config[acc])
return acc
def _onconfigure_start_io(self, acc):
self.init_imap(acc)
self.init_logging(acc)
acc.start_io()
print(acc._logid, "waiting for inbox IDLE to become ready")
acc._evtracker.wait_idle_inbox_ready()
acc._evtracker.consume_events()
acc.log("inbox IDLE ready")
def init_logging(self, acc):
"""idempotent function for initializing logging (will replace existing logger)."""
logger = FFIEventLogger(acc, logid=acc._logid, init_time=self.init_time)
acc.add_account_plugin(logger, name="logger-" + acc._logid)
def init_imap(self, acc):
"""initialize direct_imap and cleanup server state."""
from deltachat.direct_imap import DirectImap
assert acc.is_configured()
if not hasattr(acc, "direct_imap"):
acc.direct_imap = DirectImap(acc)
addr = acc.get_config("addr")
if addr not in self._imap_cleaned:
imap = acc.direct_imap
for folder in imap.list_folders():
if folder.lower() == "inbox" or folder.lower() == "deltachat":
assert imap.select_folder(folder)
imap.delete("1:*", expunge=True)
else:
imap.conn.folder.delete(folder)
acc.log(f"imap cleaned for addr {addr}")
self._imap_cleaned.add(addr)
class ACFactory:
"""Account factory"""
init_time: float
_finalizers: List[Callable[[], None]]
_accounts: List[Account]
_acsetup: ACSetup
_preconfigured_keys: List[str]
def __init__(self, request, testprocess, tmpdir, data) -> None:
self.init_time = time.time()
self.tmpdir = tmpdir
self.pytestconfig = request.config
self.data = data
self.testprocess = testprocess
self._liveconfig_producer = testprocess.get_liveconfig_producer()
self._finalizers = []
self._accounts = []
self._acsetup = ACSetup(testprocess, self.init_time)
self._preconfigured_keys = ["alice", "bob", "charlie", "dom", "elena", "fiona"]
self.set_logging_default(False)
request.addfinalizer(self.finalize)
def log(self, *args):
print("[acfactory]", f"{time.time() - self.init_time:.3f}", *args)
def finalize(self):
while self._finalizers:
fin = self._finalizers.pop()
fin()
while self._accounts:
acc = self._accounts.pop()
if acc is not None:
imap = getattr(acc, "direct_imap", None)
if imap is not None:
imap.shutdown()
del acc.direct_imap
acc.shutdown()
acc.disable_logging()
def get_next_liveconfig(self):
"""
Base function to get functional online configurations
where we can make valid SMTP and IMAP connections with.
"""
configdict = next(self._liveconfig_producer).copy()
if self.pytestconfig.getoption("--strict-tls"):
# Enable strict certificate checks for online accounts
configdict["imap_certificate_checks"] = str(const.DC_CERTCK_STRICT)
configdict["smtp_certificate_checks"] = str(const.DC_CERTCK_STRICT)
assert "addr" in configdict and "mail_pw" in configdict
return configdict
def _get_cached_account(self, addr) -> Optional[Account]:
if addr in self.testprocess._addr2files:
return self._getaccount(addr)
return None
def get_unconfigured_account(self, closed=False) -> Account:
return self._getaccount(closed=closed)
def _getaccount(self, try_cache_addr=None, closed=False) -> Account:
logid = f"ac{len(self._accounts) + 1}"
# we need to use fixed database basename for maybe_cache_* functions to work
path = self.tmpdir.mkdir(logid).join("dc.db")
if try_cache_addr:
self.testprocess.cache_maybe_retrieve_configured_db_files(try_cache_addr, path)
ac = Account(path.strpath, logging=self._logging, closed=closed)
ac._logid = logid # later instantiated FFIEventLogger needs this
ac._evtracker = ac.add_account_plugin(FFIEventTracker(ac))
if self.pytestconfig.getoption("--debug-setup"):
self._acsetup.init_logging(ac)
self._accounts.append(ac)
return ac
def set_logging_default(self, logging) -> None:
self._logging = bool(logging)
def remove_preconfigured_keys(self) -> None:
self._preconfigured_keys = []
def _preconfigure_key(self, account):
# Only set a preconfigured key if we haven't used it yet for another account.
try:
keyname = self._preconfigured_keys.pop(0)
except IndexError:
pass
else:
fname_sec = self.data.read_path(f"key/{keyname}-secret.asc")
if fname_sec:
account._preconfigure_keypair(fname_sec)
return True
print("WARN: could not use preconfigured keys")
def get_pseudo_configured_account(self, passphrase: Optional[str] = None) -> Account:
# do a pseudo-configured account
ac = self.get_unconfigured_account(closed=bool(passphrase))
if passphrase:
ac.open(passphrase)
acname = ac._logid
addr = f"{acname}@offline.org"
ac.update_config(
{
"configured_addr": addr,
"displayname": acname,
},
)
self._preconfigure_key(ac)
self._acsetup.init_logging(ac)
return ac
def new_online_configuring_account(self, cloned_from=None, cache=False, **kwargs) -> Account:
if cloned_from is None:
configdict = self.get_next_liveconfig()
else:
# XXX we might want to transfer the key to the new account
configdict = {
"addr": cloned_from.get_config("addr"),
"mail_pw": cloned_from.get_config("mail_pw"),
"imap_certificate_checks": cloned_from.get_config("imap_certificate_checks"),
"smtp_certificate_checks": cloned_from.get_config("smtp_certificate_checks"),
}
configdict.update(kwargs)
ac = self._get_cached_account(addr=configdict["addr"]) if cache else None
if ac is not None:
# make sure we consume a preconfig key, as if we had created a fresh account
self._preconfigured_keys.pop(0)
self._acsetup.add_configured(ac)
return ac
ac = self.prepare_account_from_liveconfig(configdict)
self._acsetup.start_configure(ac)
return ac
def prepare_account_from_liveconfig(self, configdict) -> Account:
ac = self.get_unconfigured_account()
assert "addr" in configdict and "mail_pw" in configdict, configdict
configdict.setdefault("bcc_self", False)
configdict.setdefault("mvbox_move", False)
configdict.setdefault("sync_msgs", False)
configdict.setdefault("delete_server_after", 0)
ac.update_config(configdict)
self._acsetup._account2config[ac] = configdict
self._preconfigure_key(ac)
return ac
def wait_configured(self, account) -> None:
"""Wait until the specified account has successfully completed configure."""
self._acsetup.wait_one_configured(account)
def bring_accounts_online(self) -> None:
print("bringing accounts online")
self._acsetup.bring_online()
print("all accounts online")
def get_online_accounts(self, num):
accounts = [self.new_online_configuring_account(cache=True) for i in range(num)]
self.bring_accounts_online()
# we cache fully configured and started accounts
for acc in accounts:
self.testprocess.cache_maybe_store_configured_db_files(acc)
return accounts
def run_bot_process(self, module, ffi=True):
fn = module.__file__
bot_cfg = self.get_next_liveconfig()
bot_ac = self.prepare_account_from_liveconfig(bot_cfg)
self._acsetup.start_configure(bot_ac)
self.wait_configured(bot_ac)
bot_ac.start_io()
# Wait for DC_EVENT_IMAP_INBOX_IDLE so that all emails appeared in the bot's Inbox later are
# considered new and not existing ones, and thus processed by the bot.
print(bot_ac._logid, "waiting for inbox IDLE to become ready")
bot_ac._evtracker.wait_idle_inbox_ready()
bot_ac.stop_io()
self._acsetup._account2state[bot_ac] = self._acsetup.IDLEREADY
# Forget ac as it will be opened by the bot subprocess
# but keep something in the list to not confuse account generation
self._accounts[self._accounts.index(bot_ac)] = None
args = [
sys.executable,
"-u",
fn,
"--email",
bot_cfg["addr"],
"--password",
bot_cfg["mail_pw"],
bot_ac.db_path,
]
if ffi:
args.insert(-1, "--show-ffi")
print("$", " ".join(args))
popen = subprocess.Popen(
args=args,
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # combine stdout/stderr in one stream
bufsize=0, # line buffering
close_fds=True, # close all FDs other than 0/1/2
universal_newlines=True, # give back text
)
bot = BotProcess(popen, addr=bot_cfg["addr"])
self._finalizers.append(bot.kill)
return bot
def dump_imap_summary(self, logfile):
for ac in self._accounts:
ac.dump_account_info(logfile=logfile)
imap = getattr(ac, "direct_imap", None)
if imap is not None:
imap.dump_imap_structures(self.tmpdir, logfile=logfile)
def get_accepted_chat(self, ac1: Account, ac2: Account):
ac2.create_chat(ac1)
return ac1.create_chat(ac2)
def introduce_each_other(self, accounts, sending=True):
to_wait = []
for i, acc in enumerate(accounts):
for acc2 in accounts[i + 1 :]:
chat = self.get_accepted_chat(acc, acc2)
if sending:
chat.send_text("hi")
to_wait.append(acc2)
acc2.create_chat(acc).send_text("hi back")
to_wait.append(acc)
for acc in to_wait:
acc.log("waiting for incoming message")
acc._evtracker.wait_next_incoming_message()
@pytest.fixture()
def acfactory(request, tmpdir, testprocess, data):
"""Account factory."""
am = ACFactory(request=request, tmpdir=tmpdir, testprocess=testprocess, data=data)
yield am
if hasattr(request.node, "rep_call") and request.node.rep_call.failed:
if testprocess.pytestconfig.getoption("--extra-info"):
logfile = io.StringIO()
am.dump_imap_summary(logfile=logfile)
print(logfile.getvalue())
# request.node.add_report_section("call", "imap-server-state", s)
class BotProcess:
stdout_queue: queue.Queue
def __init__(self, popen, addr) -> None:
self.popen = popen
# The first thing the bot prints to stdout is an invite link.
self.qr = self.popen.stdout.readline()
self.addr = addr
# we read stdout as quickly as we can in a thread and make
# the (unicode) lines available for readers through a queue.
self.stdout_queue = queue.Queue()
self.stdout_thread = t = threading.Thread(target=self._run_stdout_thread, name="bot-stdout-thread")
t.daemon = True
t.start()
def _run_stdout_thread(self) -> None:
try:
while True:
line = self.popen.stdout.readline()
if not line:
break
line = line.strip()
self.stdout_queue.put(line)
print("bot-stdout: ", line)
finally:
self.stdout_queue.put(None)
def kill(self) -> None:
self.popen.kill()
def wait(self, timeout=None) -> None:
self.popen.wait(timeout=timeout)
def fnmatch_lines(self, pattern_lines):
patterns = [x.strip() for x in Source(pattern_lines.rstrip()).lines if x.strip()]
for next_pattern in patterns:
print("+++FNMATCH:", next_pattern)
ignored = []
while True:
line = self.stdout_queue.get()
if line is None:
if ignored:
print("BOT stdout terminated after these lines")
for line in ignored:
print(line)
raise IOError("BOT stdout-thread terminated")
if fnmatch.fnmatch(line, next_pattern):
print("+++MATCHED:", line)
break
else:
print("+++IGN:", line)
ignored.append(line)
@pytest.fixture()
def tmp_db_path(tmpdir):
"""Return a path inside the temporary directory where the database can be created."""
return tmpdir.join("test.db").strpath
@pytest.fixture()
def lp():
"""Log printer fixture."""
class Printer:
def sec(self, msg: str) -> None:
print()
print("=" * 10, msg, "=" * 10)
def step(self, msg: str) -> None:
print("-" * 5, "step " + msg, "-" * 5)
def indent(self, msg: str) -> None:
print(" " + msg)
return Printer()
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
# execute all other hooks to obtain the report object
outcome = yield
rep = outcome.get_result()
# set a report attribute for each phase of a call, which can
# be "setup", "call", "teardown"
setattr(item, "rep_" + rep.when, rep)