"""Account class implementation."""
import os
from array import array
from contextlib import contextmanager
from email.utils import parseaddr
from threading import Event
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Union
from . import const, hookspec
from .capi import ffi, lib
from .chat import Chat
from .contact import Contact
from .cutil import (
DCLot,
as_dc_charpointer,
from_dc_charpointer,
from_optional_dc_charpointer,
iter_array,
)
from .message import Message
from .tracker import ConfigureTracker, ImexTracker
if TYPE_CHECKING:
from .events import FFIEventTracker
class MissingCredentials(ValueError):
    """Raised when the `addr` and `mail_pw` config values are not set on an account."""
def get_core_info():
    """Return the core's info dictionary, queried through a throw-away context.

    The context is created on a temporary file name and is released via
    ``dc_context_unref`` once it is garbage collected.
    """
    from tempfile import NamedTemporaryFile

    with NamedTemporaryFile() as tmp_file:
        # Close the handle immediately; only the file *name* is passed on.
        tmp_file.close()
        empty_c = as_dc_charpointer("")
        db_path_c = as_dc_charpointer(tmp_file.name)
        raw_context = lib.dc_context_new(empty_c, db_path_c, ffi.NULL)
        managed_context = ffi.gc(raw_context, lib.dc_context_unref)
        return get_dc_info_as_dict(managed_context)
def get_dc_info_as_dict(dc_context):
    """Parse the ``dc_get_info`` text of *dc_context* into a dictionary.

    Every non-blank ``key=value`` line becomes one entry. Keys are
    lower-cased; the value is everything after the first ``=``.
    """
    raw_info = from_dc_charpointer(lib.dc_get_info(dc_context))
    pairs = (entry.split("=", 1) for entry in raw_info.split("\n") if entry.strip())
    return {key.lower(): value for key, value in pairs}
class Account:
    """Each account is tied to a sqlite database file which is fully managed
    by the underlying deltachat core library. All public Account methods are
    meant to be memory-safe and return memory-safe objects.
    """

    # Re-exported on the class so callers can write ``Account.MissingCredentials``.
    MissingCredentials = MissingCredentials
    # Annotation-only declarations; no value is assigned to them in this class body.
    _logid: str
    _evtracker: "FFIEventTracker"
def __init__(self, db_path, os_name=None, logging=True, closed=False) -> None:
    """Initialize account object.

    :param db_path: a path to the account database. The database
        will be created if it doesn't exist.
    :param os_name: [Deprecated] only echoed in the error message.
    :param logging: enable logging for this account
    :param closed: set to True to avoid automatically opening the account
        after creation.
    :raises ValueError: if the core returns a NULL context.
    """
    # Local import; it must come after the docstring, otherwise the text
    # above is a plain string expression and ``__init__.__doc__`` stays empty.
    from .events import EventThread

    # initialize per-account plugin system
    self._pm = hookspec.PerAccount._make_plugin_manager()
    self._logging = logging
    self.add_account_plugin(self)

    # Keep the caller's value for display; the core gets an encoded copy.
    self.db_path = db_path
    if hasattr(db_path, "encode"):
        db_path = db_path.encode("utf8")

    if closed:
        ptr = lib.dc_context_new_closed(db_path)
    else:
        ptr = lib.dc_context_new(ffi.NULL, db_path, ffi.NULL)
    if ptr == ffi.NULL:
        raise ValueError(f"Could not dc_context_new: {os_name} {db_path}")
    # Tie the context's lifetime to this Python object.
    self._dc_context = ffi.gc(ptr, lib.dc_context_unref)

    self._shutdown_event = Event()
    self._event_thread = EventThread(self)
    self._configkeys = self.get_config("sys.config_keys").split()
    hook = hookspec.Global._get_plugin_manager().hook
    hook.dc_account_init(account=self)
def open(self, passphrase: Optional[str] = None) -> bool:
    """Open the account's database with the given passphrase.

    This can only be used on a closed account. For a new account the call
    sets the database passphrase; for an existing database the passphrase
    should be the one used when the database was first encrypted.

    :returns: True if the database is opened with this passphrase, False if the
        passphrase is incorrect or an error occurred.
    """
    passphrase_c = as_dc_charpointer(passphrase)
    opened = lib.dc_context_open(self._dc_context, passphrase_c)
    return bool(opened)
def disable_logging(self) -> None:
    """Turn logging off for this account (see :meth:`log`)."""
    self._logging = False
def enable_logging(self) -> None:
    """Turn logging back on for this account (see :meth:`log`)."""
    self._logging = True
def __repr__(self) -> str:
return f"<Account path={self.db_path}>"
# def __del__(self):
# self.shutdown()
def log(self, msg):
    """Pass *msg* to the ``ac_log_line`` plugin hook unless logging is disabled."""
    if not self._logging:
        return
    self._pm.hook.ac_log_line(message=msg)
def _check_config_key(self, name: str) -> None:
if name not in self._configkeys:
raise KeyError(f"{name!r} not a valid config key, existing keys: {self._configkeys!r}")
def get_info(self) -> Dict[str, str]:
    """Return the core's info for this account as a dictionary."""
    info = get_dc_info_as_dict(self._dc_context)
    return info
def dump_account_info(self, logfile):
    """Write a banner with the display name, then all info entries, to *logfile*.

    Entries are printed as ``NAME=value`` separated by spaces. A line break
    is inserted before an entry that would push the running line past 80
    characters.
    """
    banner = "=============== " + self.get_config("displayname") + " ==============="
    print(banner, file=logfile)
    column = 0
    for key, value in self.get_info().items():
        item = f"{key.upper()}={value}"
        if column + len(item) > 80:
            print("", file=logfile)
            column = 0
        print(item, end=" ", file=logfile)
        column += len(item) + 1
    print("", file=logfile)
def set_stock_translation(self, id: int, string: str) -> None:
    """Set a stock translation string.

    :param id: id of stock string (const.DC_STR_*)
    :param string: string to set as new translation
    :raises ValueError: if the core call returns 0.
    :returns: None
    """
    encoded = string.encode("utf8")
    if lib.dc_set_stock_translation(self._dc_context, id, encoded) == 0:
        raise ValueError("could not set translation string")
def set_config(self, name: str, value: Optional[str]) -> None:
    """Set a configuration value.

    :param name: config key name (unicode)
    :param value: value to set (unicode); ints/bools are stored as their
        integer string, and None is passed to the core as NULL.
    :raises KeyError: if *name* is not a known config key.
    :returns: None
    """
    self._check_config_key(name)
    name_c = name.encode("utf8")
    if isinstance(value, (int, bool)):
        value = str(int(value))
    if value is None:
        value_c = ffi.NULL
    else:
        value_c = value.encode("utf8")
    lib.dc_set_config(self._dc_context, name_c, value_c)
def get_config(self, name: str) -> str:
    """Return the unicode string value of a config key.

    :param name: configuration key to lookup (eg "addr" or "mail_pw")
    :returns: unicode value
    :raises: KeyError if *name* is not a known config key.
    """
    # "sys.config_keys" is what the key list itself is read from, so it
    # cannot be validated against that list.
    is_meta_key = name == "sys.config_keys"
    if not is_meta_key:
        self._check_config_key(name)
    name_c = name.encode("utf8")
    value_ptr = lib.dc_get_config(self._dc_context, name_c)
    assert value_ptr != ffi.NULL, f"config value not found for: {name!r}"
    return from_dc_charpointer(value_ptr)
def _preconfigure_keypair(self, secret: str) -> None:
    """See dc_preconfigure_keypair() in deltachat.h.

    In other words, you don't need this.

    :raises Exception: if the core call returns 0.
    """
    secret_c = as_dc_charpointer(secret)
    if lib.dc_preconfigure_keypair(self._dc_context, secret_c) == 0:
        raise Exception("Failed to set key")
def update_config(self, kwargs: Dict[str, Any]) -> None:
    """Apply several config values at once.

    :param kwargs: mapping of config key name to value; each pair is
        handed to :meth:`set_config` in iteration order.
    :returns: None
    """
    for entry in kwargs.items():
        name, setting = entry
        self.set_config(name, setting)
def is_configured(self) -> bool:
    """Tell whether the account is configured already, i.e. an initial
    connection to SMTP/IMAP has been verified.

    :returns: True if account is configured.
    """
    configured = lib.dc_is_configured(self._dc_context)
    return bool(configured)
def is_open(self) -> bool:
    """Tell whether the account is open.

    :returns: True if account is open.
    """
    opened = lib.dc_context_is_open(self._dc_context)
    return bool(opened)
def set_avatar(self, img_path: Optional[str]) -> None:
    """Set self avatar.

    :param img_path: path to the image file, or None to clear the
        ``selfavatar`` config value.
    :raises ValueError: if *img_path* is given but does not exist.
    :returns: None
    """
    # Validate with a real exception: an ``assert`` disappears under
    # ``python -O`` and would not match the documented ValueError.
    if img_path is not None and not os.path.exists(img_path):
        raise ValueError(f"avatar image does not exist: {img_path}")
    self.set_config("selfavatar", img_path)
def check_is_configured(self) -> None:
    """Raise ValueError unless this account is configured."""
    if self.is_configured():
        return
    raise ValueError("need to configure first")
def get_latest_backupfile(self, backupdir) -> Optional[str]:
    """Return the latest backup file found in *backupdir*, or None."""
    backupdir_c = as_dc_charpointer(backupdir)
    found = lib.dc_imex_has_backup(self._dc_context, backupdir_c)
    return from_optional_dc_charpointer(found)
def get_blobdir(self) -> str:
    """Return the directory used for files.

    All sent files are copied to this directory if necessary.
    Place files there directly to avoid copying.
    """
    blobdir_c = lib.dc_get_blobdir(self._dc_context)
    return from_dc_charpointer(blobdir_c)
def get_self_contact(self) -> Contact:
    """Return this account's own identity as a contact.

    :returns: :class:`deltachat.contact.Contact` for ``DC_CONTACT_ID_SELF``.
    """
    self_id = const.DC_CONTACT_ID_SELF
    return Contact(self, self_id)
def create_contact(self, obj, name: Optional[str] = None) -> Contact:
    """Create a (new) Contact or return an existing one.

    Calling this method will always result in the same
    underlying contact id. If there already is a Contact
    with that e-mail address, it is unblocked and its display
    `name` is updated if specified.

    :param obj: email-address, Account or Contact instance.
    :param name: (optional) display name for this contact; must be None
        when *obj* is an Account.
    :raises ValueError: if *obj* is an unconfigured Account.
    :returns: :class:`deltachat.contact.Contact` instance.
    """
    if not isinstance(obj, Account):
        (name, addr) = self.get_contact_addr_and_name(obj, name)
        name_c = as_dc_charpointer(name)
        addr_c = as_dc_charpointer(addr)
        new_id = lib.dc_create_contact(self._dc_context, name_c, addr_c)
        return Contact(self, new_id)

    # Another account is added by importing its self-contact vCard.
    if not obj.is_configured():
        raise ValueError("Can only add configured accounts as contacts")
    assert name is None
    self_contact = obj.get_self_contact()
    imported = self.import_vcard(self_contact.make_vcard())
    return imported[0]
def get_contact(self, obj) -> Optional[Contact]:
    """Resolve *obj* (Account, Contact or address string) to a Contact.

    An Account goes through :meth:`create_contact`, a Contact is returned
    as-is, and anything else is looked up by its e-mail address.
    """
    if isinstance(obj, Account):
        found = self.create_contact(obj)
    elif isinstance(obj, Contact):
        found = obj
    else:
        addr = self.get_contact_addr_and_name(obj)[1]
        found = self.get_contact_by_addr(addr)
    return found
def get_contact_addr_and_name(self, obj, name: Optional[str] = None):
    """Return a ``(name, addr)`` tuple derived from *obj*.

    :param obj: Account, Contact (of this account) or address string.
    :param name: explicit display name; when None, a non-empty display
        name found on *obj* is used instead.
    :raises ValueError: for an unconfigured Account or a Contact that
        belongs to another account.
    :raises TypeError: if *obj* is none of the supported types.
    """
    if isinstance(obj, Account):
        if not obj.is_configured():
            raise ValueError("can only add addresses from configured accounts")
        addr = obj.get_config("addr")
        displayname = obj.get_config("displayname")
    elif isinstance(obj, Contact):
        if obj.account != self:
            raise ValueError(f"account mismatch {obj}")
        addr = obj.addr
        displayname = obj.name
    elif isinstance(obj, str):
        displayname, addr = parseaddr(obj)
    else:
        raise TypeError(f"don't know how to create chat for {obj!r}")

    resolved_name = displayname if (name is None and displayname) else name
    return (resolved_name, addr)
def delete_contact(self, contact: Contact) -> bool:
    """Delete a Contact.

    :param contact: contact object belonging to this account.
    :returns: True if deletion succeeded (contact was deleted)
    """
    target_id = contact.id
    assert contact.account == self
    assert target_id > const.DC_CHAT_ID_LAST_SPECIAL
    deleted = lib.dc_delete_contact(self._dc_context, target_id)
    return bool(deleted)
def get_contact_by_addr(self, email: str) -> Optional[Contact]:
    """Look up a known and unblocked contact with a given e-mail address.

    To get a list of all known and unblocked contacts, use contacts_get_contacts().

    **POTENTIAL SECURITY ISSUE**: If there are multiple contacts with this address
    (e.g. an address-contact and a key-contact),
    this looks up the most recently seen contact,
    i.e. which contact is returned depends on which contact last sent a message.
    If the user just clicked on a mailto: link, then this is the best thing you can do.
    But **DO NOT** internally represent contacts by their email address
    and do not use this function to look them up;
    otherwise this function will sometimes look up the wrong contact.
    Instead, you should internally represent contacts by their ids.

    To validate an e-mail address independently of the contact database
    use check_email_validity().

    :returns: the matching Contact, or None if the lookup yields id 0.
    """
    parsed_addr = parseaddr(email)[1]
    addr_c = as_dc_charpointer(parsed_addr)
    found_id = lib.dc_lookup_contact_id_by_addr(self._dc_context, addr_c)
    return self.get_contact_by_id(found_id) if found_id else None
def get_contact_by_id(self, contact_id: int) -> Contact:
    """Wrap *contact_id* in a Contact object bound to this account.

    :param contact_id: integer id of this contact.
    :returns: :class:`deltachat.contact.Contact` instance.
    """
    contact = Contact(self, contact_id)
    return contact
def get_blocked_contacts(self) -> List[Contact]:
    """Return a list of all blocked contacts.

    :returns: list of :class:`deltachat.contact.Contact` objects.
    """

    def to_contact(contact_id):
        return Contact(self, contact_id)

    # The core array is released through dc_array_unref when collected.
    blocked = ffi.gc(lib.dc_get_blocked_contacts(self._dc_context), lib.dc_array_unref)
    return list(iter_array(blocked, to_contact))
def get_contacts(
    self,
    query: Optional[str] = None,
    with_self: bool = False,
) -> List[Contact]:
    """Get a (filtered) list of contacts.

    :param query: if a string is specified, only return contacts
        whose name or e-mail matches query.
    :param with_self: if true the self-contact is also returned.
    :returns: list of :class:`deltachat.contact.Contact` objects.
    """

    def to_contact(contact_id):
        return Contact(self, contact_id)

    query_c = as_dc_charpointer(query)
    flags = const.DC_GCL_ADD_SELF if with_self else 0
    matches = ffi.gc(lib.dc_get_contacts(self._dc_context, flags, query_c), lib.dc_array_unref)
    return list(iter_array(matches, to_contact))
def import_vcard(self, vcard):
    """Import a vCard and return the list of resulting contacts."""

    def to_contact(contact_id):
        return Contact(self, contact_id)

    vcard_c = as_dc_charpointer(vcard)
    imported = ffi.gc(lib.dc_import_vcard(self._dc_context, vcard_c), lib.dc_array_unref)
    return list(iter_array(imported, to_contact))
def get_fresh_messages(self) -> Generator[Message, None, None]:
    """Yield all fresh messages from all chats.

    Ids for which ``Message.from_db`` returns None are skipped.
    """
    fresh = ffi.gc(lib.dc_get_fresh_msgs(self._dc_context), lib.dc_array_unref)
    candidates = iter_array(fresh, lambda msg_id: Message.from_db(self, msg_id))
    return (message for message in candidates if message is not None)
def create_chat(self, obj) -> Chat:
    """Create a 1:1 chat with an Account, Contact or e-mail address."""
    contact = self.create_contact(obj)
    return contact.create_chat()
def create_group_chat(
    self,
    name: str,
    contacts: Optional[List[Contact]] = None,
) -> Chat:
    """Create a new group chat object.

    Chats are unpromoted until the first message is sent.

    :param name: name of the group chat.
    :param contacts: list of contacts to add
    :returns: a :class:`deltachat.chat.Chat` object.
    """
    name_c = name.encode("utf8")
    group_id = lib.dc_create_group_chat(self._dc_context, 0, name_c)
    group = Chat(self, group_id)
    members = () if contacts is None else contacts
    for member in members:
        group.add_contact(member)
    return group
def get_chats(self) -> List[Chat]:
    """Return the list of chats.

    :returns: a list of :class:`deltachat.chat.Chat` objects.
    """
    raw_list = lib.dc_get_chatlist(self._dc_context, 0, ffi.NULL, 0)
    chatlist_ptr = ffi.gc(raw_list, lib.dc_chatlist_unref)
    assert chatlist_ptr != ffi.NULL
    count = lib.dc_chatlist_get_cnt(chatlist_ptr)
    return [Chat(self, lib.dc_chatlist_get_chat_id(chatlist_ptr, index)) for index in range(count)]
def get_device_chat(self) -> Chat:
    """Return the chat created from the ``DC_CONTACT_ID_DEVICE`` contact."""
    device_contact = Contact(self, const.DC_CONTACT_ID_DEVICE)
    return device_contact.create_chat()
def get_message_by_id(self, msg_id: int) -> Optional[Message]:
    """Return the Message for *msg_id* as produced by ``Message.from_db``.

    :param msg_id: integer id of this message.
    :returns: :class:`deltachat.message.Message` instance (or None).
    """
    message = Message.from_db(self, msg_id)
    return message
def get_chat_by_id(self, chat_id: int) -> Chat:
    """Return a Chat instance after checking that the chat exists.

    :param chat_id: integer id of this chat.
    :returns: :class:`deltachat.chat.Chat` instance.
    :raises: ValueError if chat does not exist.
    """
    probe = lib.dc_get_chat(self._dc_context, chat_id)
    if probe != ffi.NULL:
        # The core object was only needed as an existence check.
        lib.dc_chat_unref(probe)
        return Chat(self, chat_id)
    raise ValueError(f"cannot get chat with id={chat_id}")
def mark_seen_messages(self, messages: List[Union[int, Message]]) -> None:
    """Mark the given set of messages as seen.

    :param messages: a list of message ids or Message instances.
    """
    id_buffer = array("i", [item.id if isinstance(item, Message) else item for item in messages])
    # Hand the array's buffer to the core as a uint32_t pointer.
    ids_ptr = ffi.cast("uint32_t*", ffi.from_buffer(id_buffer))
    lib.dc_markseen_msgs(self._dc_context, ids_ptr, len(messages))
def forward_messages(self, messages: List[Message], chat: Chat) -> None:
    """Forward a list of messages to a chat.

    :param messages: list of :class:`deltachat.message.Message` object.
    :param chat: :class:`deltachat.chat.Chat` object.
    :returns: None
    """
    msg_ids = []
    for message in messages:
        msg_ids.append(message.id)
    lib.dc_forward_msgs(self._dc_context, msg_ids, len(msg_ids), chat.id)
def resend_messages(self, messages: List[Message]) -> None:
    """Resend a list of messages.

    :param messages: list of :class:`deltachat.message.Message` object.
    :raises ValueError: if the core call does not return 1.
    :returns: None
    """
    msg_ids = []
    for message in messages:
        msg_ids.append(message.id)
    outcome = lib.dc_resend_msgs(self._dc_context, msg_ids, len(msg_ids))
    if outcome == 1:
        return
    raise ValueError(f"could not resend messages {msg_ids}")
def delete_messages(self, messages: List[Message]) -> None:
    """Delete messages (local and remote).

    :param messages: list of :class:`deltachat.message.Message` object.
    :returns: None
    """
    msg_ids = []
    for message in messages:
        msg_ids.append(message.id)
    lib.dc_delete_msgs(self._dc_context, msg_ids, len(msg_ids))
def export_self_keys(self, path):
    """Export public and private keys to the specified directory.

    Note that the account does not have to be started.

    :returns: whatever the export tracker reports on completion.
    """
    exported = self._export(path, imex_cmd=const.DC_IMEX_EXPORT_SELF_KEYS)
    return exported
def export_all(self, path: str, passphrase: Optional[str] = None) -> str:
    """Export a backup of all database state (chats, contacts, keys, media, ...).

    Note that the account has to be stopped; call stop_io() if necessary.

    :param path: the directory where the backup will be stored.
    :param passphrase: the backup will be encrypted with the passphrase, if it is
        None or empty string, the backup is not encrypted.
    :raises RuntimeError: if the export did not produce exactly one file.
    :returns: path to the created backup.
    """
    export_files = self._export(path, const.DC_IMEX_EXPORT_BACKUP, passphrase)
    if len(export_files) != 1:
        # Covers both "no file" and "several files"; the previous message
        # claimed "more than one" even when nothing was written.
        raise RuntimeError(f"expected exactly one new backup file, found {len(export_files)}")
    return export_files[0]
def _export(self, path: str, imex_cmd: int, passphrase: Optional[str] = None) -> list:
    """Run an export command and return what the ImexTracker reports when finished."""
    tracker = ImexTracker()
    with self.temp_plugin(tracker) as active_tracker:
        self.imex(path, imex_cmd, passphrase)
        written = active_tracker.wait_finish()
    return written
def import_self_keys(self, path):
    """Import private keys found in the `path` directory.

    The last imported key is made the default key unless its name
    contains the string legacy. Public keys are not imported.

    Note that the account does not have to be started.
    """
    command = const.DC_IMEX_IMPORT_SELF_KEYS
    self._import(path, imex_cmd=command)
def import_all(self, path: str, passphrase: Optional[str] = None) -> None:
    """Import Delta Chat state from the specified backup file.

    The account must be in unconfigured state for the import to be attempted.

    :param path: path to the backup file.
    :param passphrase: if not None or empty, the backup will be opened with the passphrase.
    """
    already_configured = self.is_configured()
    assert not already_configured, "cannot import into configured account"
    self._import(path, imex_cmd=const.DC_IMEX_IMPORT_BACKUP, passphrase=passphrase)
def _import(self, path: str, imex_cmd: int, passphrase: Optional[str] = None) -> None:
    """Run an import command and block until the ImexTracker reports completion."""
    tracker = ImexTracker()
    with self.temp_plugin(tracker) as active_tracker:
        self.imex(path, imex_cmd, passphrase)
        active_tracker.wait_finish()
def imex(self, path: str, imex_cmd: int, passphrase: Optional[str] = None) -> None:
    """Start the import/export command *imex_cmd* on *path* via ``dc_imex``."""
    path_c = as_dc_charpointer(path)
    passphrase_c = as_dc_charpointer(passphrase)
    lib.dc_imex(self._dc_context, imex_cmd, path_c, passphrase_c)
def initiate_key_transfer(self) -> str:
    """Return the setup code after an Autocrypt setup message
    has been successfully sent to our own e-mail address ("self-sent message").

    :raises ValueError: if the account is not configured.
    :raises RuntimeError: if sending out was unsuccessful.
    """
    self.check_is_configured()
    setup_code = lib.dc_initiate_key_transfer(self._dc_context)
    if setup_code != ffi.NULL:
        return from_dc_charpointer(setup_code)
    raise RuntimeError("could not send out autocrypt setup message")
def get_setup_contact_qr(self) -> str:
    """Get/create the Setup-Contact QR code as an ascii string.

    This string needs to be transferred to another DC account
    in a second channel (typically used by mobiles with QRcode-show + scan UX)
    where qr_setup_contact(qr) is called.
    """
    qr_c = lib.dc_get_securejoin_qr(self._dc_context, 0)
    return from_dc_charpointer(qr_c)
def check_qr(self, qr):
    """Check a QR code and return a :class:`ScannedQRCode` representing the result.

    :raises ValueError: if the core reports the ``DC_QR_ERROR`` state.
    """
    qr_c = as_dc_charpointer(qr)
    raw_lot = ffi.gc(lib.dc_check_qr(self._dc_context, qr_c), lib.dc_lot_unref)
    lot = DCLot(raw_lot)
    if lot.state() != const.DC_QR_ERROR:
        return ScannedQRCode(lot)
    raise ValueError(f"invalid or unknown QR code: {lot.text1()}")
def qr_setup_contact(self, qr):
    """Set up a contact and return a `Chat` instance after the contact is established.

    This function triggers a network protocol in the background between
    the emitter of the QR code and this account.

    :param qr: valid "setup contact" QR code (all other QR codes will result in an exception)
    :raises ValueError: if the core returns chat id 0.
    """
    scanned = self.check_qr(qr)
    assert scanned.is_ask_verifycontact()
    joined_id = lib.dc_join_securejoin(self._dc_context, as_dc_charpointer(qr))
    if joined_id != 0:
        return Chat(self, joined_id)
    raise ValueError("could not setup secure contact")
def qr_join_chat(self, qr):
    """Return a `Chat` instance for which the securejoin network
    protocol has been started.

    This function triggers a network protocol in the background between
    the emitter of the QR code and this account.

    :param qr: valid "join-group" QR code (all other QR codes will result in an exception)
    :raises ValueError: if the core returns chat id 0.
    """
    scanned = self.check_qr(qr)
    assert scanned.is_ask_verifygroup()
    joined_id = lib.dc_join_securejoin(self._dc_context, as_dc_charpointer(qr))
    if joined_id != 0:
        return Chat(self, joined_id)
    raise ValueError("could not join group")
def set_location(self, latitude: float = 0.0, longitude: float = 0.0, accuracy: float = 0.0) -> None:
    """Set a new location. It affects all chats where we currently
    have enabled location streaming.

    :param latitude: float (use 0.0 if not known)
    :param longitude: float (use 0.0 if not known)
    :param accuracy: float (use 0.0 if not known)
    :raises: ValueError if no chat is currently streaming locations
    :returns: None
    """
    accepted = lib.dc_set_location(self._dc_context, latitude, longitude, accuracy)
    if accepted != 0:
        return
    raise ValueError("no chat is streaming locations")
#
# meta API for start/stop and event based processing
#
def run_account(self, addr=None, password=None, account_plugins=None, show_ffi=False, displayname=None):
    """Get the account running, configure it if necessary. Add plugins if provided.

    :param addr: the email address of the account
    :param password: the password of the account
    :param account_plugins: a list of plugins to add
    :param show_ffi: show low level ffi events
    :param displayname: the display name of the account
    """
    from .events import FFIEventLogger

    if show_ffi:
        self.add_account_plugin(FFIEventLogger(self))

    extra_plugins = account_plugins or []
    for extra in extra_plugins:
        print("adding plugin", extra)
        self.add_account_plugin(extra)

    if not self.is_configured():
        assert addr and password, "you must specify email and password once to configure this database/account"
        initial_settings = (("addr", addr), ("mail_pw", password), ("bot", "1"))
        for key, setting in initial_settings:
            self.set_config(key, setting)
        # Block until the configuration run has finished.
        self.configure().wait_finish()

    if displayname:
        self.set_config("displayname", displayname)

    # start IO threads
    self.start_io()
def add_account_plugin(self, plugin, name=None):
    """Add an account plugin which implements one or more of
    the :class:`deltachat.hookspec.PerAccount` hooks.

    A plugin already registered under *name* is unregistered first.

    :returns: the *plugin* that was passed in.
    """
    manager = self._pm
    already_registered = name and manager.has_plugin(name=name)
    if already_registered:
        manager.unregister(name=name)
    manager.register(plugin, name=name)
    manager.check_pending()
    return plugin
def remove_account_plugin(self, plugin, name=None):
    """Unregister an account plugin from this account's plugin manager."""
    manager = self._pm
    manager.unregister(plugin, name=name)
@contextmanager
def temp_plugin(self, plugin):
    """Run a with-block with the given plugin temporarily registered.

    The plugin is unregistered again when the block exits, including when
    the block raises.

    :param plugin: plugin object to register on this account's plugin manager.
    :returns: context manager yielding *plugin*.
    """
    self._pm.register(plugin)
    try:
        yield plugin
    finally:
        # Without ``finally`` an exception raised inside the with-block
        # would leave the temporary plugin registered.
        self._pm.unregister(plugin)
def stop_ongoing(self):
    """Stop ongoing securejoin, configuration or other core jobs."""
    context = self._dc_context
    lib.dc_stop_ongoing_process(context)
def get_connectivity(self):
    """Return the value of ``dc_get_connectivity`` for this account."""
    connectivity = lib.dc_get_connectivity(self._dc_context)
    return connectivity
def get_connectivity_html(self) -> str:
    """Return the string produced by ``dc_get_connectivity_html`` for this account."""
    html_c = lib.dc_get_connectivity_html(self._dc_context)
    return from_dc_charpointer(html_c)
def start_io(self):
    """Start this account's IO scheduling (Rust-core async scheduler).

    The account has to be configured first: call :meth:`configure` and
    wait on the returned tracker before calling this method.

    You may call :meth:`stop_io`, :meth:`wait_shutdown` or :meth:`shutdown`
    after the account is started.

    :raises ValueError: if the account is not configured.
    :returns: None
    """
    if self.is_configured():
        lib.dc_start_io(self._dc_context)
        return
    raise ValueError("account not configured, cannot start io")
def maybe_network(self):
    """Tell the core that the network may be available again.

    This function should be called when there is a hint
    that the network is available again,
    e.g. as a response to a system event reporting network availability.
    The library will try to send pending messages out immediately.

    Moreover, to have a reliable state
    when the app comes to foreground with network available,
    it may be reasonable to call the function also at that moment.

    It is okay to call the function unconditionally when there is
    network available; however, calling the function
    _without_ having network may interfere with the backoff algorithm
    and will lead to jobs failing faster, with fewer retries,
    which may prevent messages from being sent out.

    Finally, if the context was created by the dc_accounts_t account manager
    (currently not implemented in the Python bindings),
    use dc_accounts_maybe_network() instead of this function.
    """
    context = self._dc_context
    lib.dc_maybe_network(context)
def configure(self) -> ConfigureTracker:
    """Start the configuration process and return a ConfigureTracker instance
    on which you can block with wait_finish() to get a True/False success
    value for the configuration process.

    :raises MissingCredentials: if `addr` or `mail_pw` is empty.
    """
    has_credentials = self.get_config("addr") and self.get_config("mail_pw")
    if not has_credentials:
        raise MissingCredentials("addr or mail_pwd not set in config")
    tracker = ConfigureTracker(self)
    # Register the tracker before kicking off the configuration.
    self.add_account_plugin(tracker)
    lib.dc_configure(self._dc_context)
    return tracker
def wait_shutdown(self) -> None:
    """Block until the shutdown event of this account has been set."""
    shutdown_event = self._shutdown_event
    shutdown_event.wait()
def stop_io(self) -> None:
    """Stop ongoing core jobs, then stop the core IO scheduler."""
    self.log("stop_ongoing")
    self.stop_ongoing()

    self.log("dc_stop_io (stop core IO scheduler)")
    context = self._dc_context
    lib.dc_stop_io(context)
def shutdown(self) -> None:
    """Shut down and destroy the account (stop callback thread, close and remove
    underlying dc_context).

    Calling it again after the context reference was dropped is a no-op.
    """
    if self._dc_context is None:
        return

    event_thread = self._event_thread

    # mark the event thread for shutdown (latest on next incoming event)
    event_thread.mark_shutdown()

    # stop_io also causes an info event which will wake up
    # the EventThread's inner loop and let it notice the shutdown marker.
    self.stop_io()

    self.log("wait for event thread to finish")
    try:
        event_thread.wait(timeout=5)
    except RuntimeError as err:
        self.log(f"Waiting for event thread failed: {err}")

    still_running = event_thread.is_alive()
    if still_running:
        self.log("WARN: event thread did not terminate yet, ignoring.")

    self.log("remove dc_context references, making the Account unusable")
    self._dc_context = None

    self._shutdown_event.set()

    global_hook = hookspec.Global._get_plugin_manager().hook
    global_hook.dc_account_after_shutdown(account=self)
    self.log("shutdown finished")
class ScannedQRCode:
    """Thin wrapper around the lot object produced by a QR code check."""

    def __init__(self, dc_lot) -> None:
        self._dc_lot = dc_lot

    def is_ask_verifycontact(self):
        """Return True if the lot state is ``DC_QR_ASK_VERIFYCONTACT``."""
        state = self._dc_lot.state()
        return state == const.DC_QR_ASK_VERIFYCONTACT

    def is_ask_verifygroup(self):
        """Return True if the lot state is ``DC_QR_ASK_VERIFYGROUP``."""
        state = self._dc_lot.state()
        return state == const.DC_QR_ASK_VERIFYGROUP

    @property
    def contact_id(self):
        """The id reported by the wrapped lot."""
        lot = self._dc_lot
        return lot.id()
|