| | import json |
| | from queue import Queue |
| |
|
| | import deltachat as dc |
| | from deltachat import capi, cutil, register_global_plugin |
| | from deltachat.capi import ffi, lib |
| | from deltachat.hookspec import global_hookimpl |
| | from deltachat.testplugin import ( |
| | ACSetup, |
| | create_dict_from_files_in_path, |
| | write_dict_to_dir, |
| | ) |
| | from deltachat.cutil import from_optional_dc_charpointer |
| |
|
| | |
| |
|
| |
|
| | class TestACSetup: |
| | def test_cache_writing(self, tmp_path): |
| | base = tmp_path.joinpath("hello") |
| | base.mkdir() |
| | d1 = base.joinpath("dir1") |
| | d1.mkdir() |
| | d1.joinpath("file1").write_bytes(b"content1") |
| | d2 = d1.joinpath("dir2") |
| | d2.mkdir() |
| | d2.joinpath("file2").write_bytes(b"123") |
| | d = create_dict_from_files_in_path(base) |
| | newbase = tmp_path.joinpath("other") |
| | write_dict_to_dir(d, newbase) |
| | assert newbase.joinpath("dir1", "dir2", "file2").exists() |
| | assert newbase.joinpath("dir1", "file1").exists() |
| |
|
| | def test_basic_states(self, acfactory, monkeypatch, testprocess): |
| | pc = ACSetup(init_time=0.0, testprocess=testprocess) |
| | acc = acfactory.get_unconfigured_account() |
| | monkeypatch.setattr(acc, "configure", lambda **kwargs: None) |
| | pc.start_configure(acc) |
| | assert pc._account2state[acc] == pc.CONFIGURING |
| | pc._configured_events.put((acc, True, None)) |
| | monkeypatch.setattr(pc, "init_imap", lambda *args, **kwargs: None) |
| | pc.wait_one_configured(acc) |
| | assert pc._account2state[acc] == pc.CONFIGURED |
| | monkeypatch.setattr(pc, "_onconfigure_start_io", lambda *args, **kwargs: None) |
| | pc.bring_online() |
| | assert pc._account2state[acc] == pc.IDLEREADY |
| |
|
| | def test_two_accounts_one_waited_all_started(self, monkeypatch, acfactory, testprocess): |
| | pc = ACSetup(init_time=0.0, testprocess=testprocess) |
| | monkeypatch.setattr(pc, "init_imap", lambda *args, **kwargs: None) |
| | monkeypatch.setattr(pc, "_onconfigure_start_io", lambda *args, **kwargs: None) |
| | ac1 = acfactory.get_unconfigured_account() |
| | monkeypatch.setattr(ac1, "configure", lambda **kwargs: None) |
| | pc.start_configure(ac1) |
| | ac2 = acfactory.get_unconfigured_account() |
| | monkeypatch.setattr(ac2, "configure", lambda **kwargs: None) |
| | pc.start_configure(ac2) |
| | assert pc._account2state[ac1] == pc.CONFIGURING |
| | assert pc._account2state[ac2] == pc.CONFIGURING |
| | pc._configured_events.put((ac1, True, None)) |
| | pc.wait_one_configured(ac1) |
| | assert pc._account2state[ac1] == pc.CONFIGURED |
| | assert pc._account2state[ac2] == pc.CONFIGURING |
| | pc._configured_events.put((ac2, True, None)) |
| | pc.bring_online() |
| | assert pc._account2state[ac1] == pc.IDLEREADY |
| | assert pc._account2state[ac2] == pc.IDLEREADY |
| |
|
| | def test_store_and_retrieve_configured_account_cache(self, acfactory, tmp_path): |
| | ac1 = acfactory.get_pseudo_configured_account() |
| | holder = acfactory._acsetup.testprocess |
| | assert holder.cache_maybe_store_configured_db_files(ac1) |
| | assert not holder.cache_maybe_store_configured_db_files(ac1) |
| | acdir = tmp_path / "newaccount" |
| | acdir.mkdir() |
| | addr = ac1.get_config("addr") |
| | target_db_path = acdir / "db" |
| | assert holder.cache_maybe_retrieve_configured_db_files(addr, str(target_db_path)) |
| | assert sum(1 for _ in acdir.iterdir()) >= 2 |
| |
|
| |
|
| | def test_liveconfig_caching(acfactory, monkeypatch): |
| | prod = [ |
| | {"addr": "1@example.org", "mail_pw": "123"}, |
| | ] |
| | acfactory._liveconfig_producer = iter(prod) |
| | d1 = acfactory.get_next_liveconfig() |
| | d1["hello"] = "world" |
| | acfactory._liveconfig_producer = iter(prod) |
| | d2 = acfactory.get_next_liveconfig() |
| | assert "hello" not in d2 |
| |
|
| |
|
| | def test_empty_context(): |
| | ctx = capi.lib.dc_context_new(capi.ffi.NULL, capi.ffi.NULL, capi.ffi.NULL) |
| | capi.lib.dc_context_unref(ctx) |
| |
|
| |
|
| | def test_dc_close_events(acfactory): |
| | ac1 = acfactory.get_unconfigured_account() |
| |
|
| | |
| | shutdowns = Queue() |
| |
|
| | class ShutdownPlugin: |
| | @global_hookimpl |
| | def dc_account_after_shutdown(self, account): |
| | assert account._dc_context is None |
| | shutdowns.put(account) |
| |
|
| | register_global_plugin(ShutdownPlugin()) |
| | assert hasattr(ac1, "_dc_context") |
| | ac1.shutdown() |
| | shutdowns.get(timeout=2) |
| |
|
| |
|
| | def test_wrong_db(tmp_path): |
| | p = tmp_path / "hello.db" |
| | |
| | p.write_bytes(b"x123" * 10) |
| |
|
| | context = lib.dc_context_new(ffi.NULL, str(p).encode("ascii"), ffi.NULL) |
| | assert not lib.dc_context_is_open(context) |
| |
|
| |
|
| | def test_empty_blobdir(tmp_path): |
| | db_fname = tmp_path / "hello.db" |
| | |
| | ctx = ffi.gc( |
| | lib.dc_context_new(ffi.NULL, str(db_fname).encode("ascii"), b""), |
| | lib.dc_context_unref, |
| | ) |
| | assert ctx != ffi.NULL |
| |
|
| |
|
| | def test_event_defines(): |
| | assert dc.const.DC_EVENT_INFO == 100 |
| | assert dc.const.DC_CONTACT_ID_SELF |
| |
|
| |
|
| | def test_sig(): |
| | sig = capi.lib.dc_event_has_string_data |
| | assert not sig(dc.const.DC_EVENT_MSGS_CHANGED) |
| | assert sig(dc.const.DC_EVENT_INFO) |
| | assert sig(dc.const.DC_EVENT_WARNING) |
| | assert sig(dc.const.DC_EVENT_ERROR) |
| | assert sig(dc.const.DC_EVENT_SMTP_CONNECTED) |
| | assert sig(dc.const.DC_EVENT_IMAP_CONNECTED) |
| | assert sig(dc.const.DC_EVENT_SMTP_MESSAGE_SENT) |
| | assert sig(dc.const.DC_EVENT_IMEX_FILE_WRITTEN) |
| |
|
| |
|
| | def test_markseen_invalid_message_ids(acfactory): |
| | ac1 = acfactory.get_pseudo_configured_account() |
| | contact1 = ac1.create_contact("some1@example.com", name="some1") |
| | chat = contact1.create_chat() |
| | chat.send_text("one message") |
| | ac1._evtracker.get_matching("DC_EVENT_MSGS_CHANGED") |
| | |
| | ac1._evtracker.ensure_event_not_queued("DC_EVENT_ERROR") |
| | msg_ids = [9] |
| | lib.dc_markseen_msgs(ac1._dc_context, msg_ids, len(msg_ids)) |
| | ac1._evtracker.ensure_event_not_queued("DC_EVENT_WARNING|DC_EVENT_ERROR") |
| |
|
| |
|
| | def test_get_special_message_id_returns_empty_message(acfactory): |
| | ac1 = acfactory.get_pseudo_configured_account() |
| | for i in range(1, 10): |
| | msg = ac1.get_message_by_id(i) |
| | assert msg.id == 0 |
| |
|
| |
|
| | def test_provider_info_none(): |
| | ctx = ffi.gc( |
| | lib.dc_context_new(ffi.NULL, ffi.NULL, ffi.NULL), |
| | lib.dc_context_unref, |
| | ) |
| | assert lib.dc_provider_new_from_email(ctx, cutil.as_dc_charpointer("email@unexistent.no")) == ffi.NULL |
| |
|
| |
|
| | def test_get_info_open(tmp_path): |
| | db_fname = tmp_path / "test.db" |
| | ctx = ffi.gc( |
| | lib.dc_context_new(ffi.NULL, str(db_fname).encode("ascii"), ffi.NULL), |
| | lib.dc_context_unref, |
| | ) |
| | info = cutil.from_dc_charpointer(lib.dc_get_info(ctx)) |
| | assert "deltachat_core_version" in info |
| | assert "database_dir" in info |
| |
|
| |
|
| | def test_logged_hook_failure(acfactory): |
| | ac1 = acfactory.get_pseudo_configured_account() |
| | cap = [] |
| | ac1.log = cap.append |
| | with ac1._event_thread.swallow_and_log_exception("some"): |
| | 0 / 0 |
| | assert cap |
| | assert "some" in str(cap) |
| | assert "ZeroDivisionError" in str(cap) |
| | assert "Traceback" in str(cap) |
| |
|
| |
|
| | def test_logged_ac_process_ffi_failure(acfactory): |
| | from deltachat import account_hookimpl |
| |
|
| | ac1 = acfactory.get_pseudo_configured_account() |
| |
|
| | class FailPlugin: |
| | @account_hookimpl |
| | def ac_process_ffi_event(ffi_event): |
| | 0 / 0 |
| |
|
| | cap = Queue() |
| |
|
| | |
| | ac1.add_account_plugin(FailPlugin()) |
| |
|
| | |
| | ac1.log = cap.put |
| |
|
| | |
| | ac1.create_contact("something@example.org") |
| | res = cap.get(timeout=10) |
| | assert "ac_process_ffi_event" in res |
| | assert "ZeroDivisionError" in res |
| | assert "Traceback" in res |
| |
|
| |
|
| | def test_jsonrpc_blocking_call(tmp_path): |
| | accounts_fname = tmp_path / "accounts" |
| | writable = True |
| | accounts = ffi.gc( |
| | lib.dc_accounts_new(str(accounts_fname).encode("ascii"), writable), |
| | lib.dc_accounts_unref, |
| | ) |
| | jsonrpc = ffi.gc(lib.dc_jsonrpc_init(accounts), lib.dc_jsonrpc_unref) |
| | res = json.loads( |
| | from_optional_dc_charpointer( |
| | lib.dc_jsonrpc_blocking_call( |
| | jsonrpc, |
| | json.dumps( |
| | {"jsonrpc": "2.0", "method": "check_email_validity", "params": ["alice@example.org"], "id": "123"}, |
| | ).encode("utf-8"), |
| | ), |
| | ), |
| | ) |
| | assert res == {"jsonrpc": "2.0", "id": "123", "result": True} |
| |
|
| | res = json.loads( |
| | from_optional_dc_charpointer( |
| | lib.dc_jsonrpc_blocking_call( |
| | jsonrpc, |
| | json.dumps( |
| | {"jsonrpc": "2.0", "method": "check_email_validity", "params": ["alice"], "id": "456"}, |
| | ).encode("utf-8"), |
| | ), |
| | ), |
| | ) |
| | assert res == {"jsonrpc": "2.0", "id": "456", "result": False} |
| |
|