File size: 4,387 Bytes
0220cd3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import os
import threading
import time
from queue import Empty, Queue

import pytest

import deltachat


def test_db_busy_error(acfactory):
    starttime = time.time()
    log_lock = threading.RLock()

    def log(string):
        with log_lock:
            print(f"{time.time() - starttime:3.2f} {string}")

    # make a number of accounts
    accounts = acfactory.get_many_online_accounts(3)
    log(f"created {len(accounts)} accounts")

    # put a bigfile into each account
    for acc in accounts:
        acc.bigfile = os.path.join(acc.get_blobdir(), "bigfile")
        with open(acc.bigfile, "wb") as f:
            f.write(b"01234567890" * 1000_000)
    log(f"created {len(accounts)} bigfiles")

    contact_addrs = [acc.get_self_contact().addr for acc in accounts]
    chat = accounts[0].create_group_chat("stress-group")
    for addr in contact_addrs[1:]:
        chat.add_contact(chat.account.create_contact(addr))

    # setup auto-responder bots which report back failures/actions
    report_queue = Queue()

    def report_func(replier, report_type, *report_args):
        report_queue.put((replier, report_type, report_args))

    # each replier receives all events and sends report events to receive_queue
    repliers = []
    for acc in accounts:
        replier = AutoReplier(acc, log=log, num_send=500, num_bigfiles=5, report_func=report_func)
        acc.add_account_plugin(replier)
        repliers.append(replier)

    # kick off message sending
    # after which repliers will reply to each other
    chat.send_text("hello")

    alive_count = len(accounts)
    while alive_count > 0:
        try:
            replier, report_type, report_args = report_queue.get(timeout=10)
        except Empty:
            log("timeout waiting for next event")
            pytest.fail("timeout exceeded")
        if report_type == ReportType.exit:
            replier.log("EXIT")
        elif report_type == ReportType.ffi_error:
            replier.log(f"ERROR: {report_args[0]}")
        elif report_type == ReportType.message_echo:
            continue
        else:
            raise ValueError(f"{addr} unknown report type {report_type}, args={report_args}")
        alive_count -= 1
        replier.log("shutting down")
        replier.account.shutdown()
        replier.log(f"shut down complete, remaining={alive_count}")


class ReportType:
    exit = "exit"
    ffi_error = "ffi-error"
    message_echo = "message-echo"


class AutoReplier:
    def __init__(self, account, log, num_send, num_bigfiles, report_func) -> None:
        self.account = account
        self._log = log
        self.report_func = report_func
        self.num_send = num_send
        self.num_bigfiles = num_bigfiles
        self.current_sent = 0
        self.addr = self.account.get_self_contact().addr

        self._thread = threading.Thread(name=f"Stats{self.account}", target=self.thread_stats)
        self._thread.setDaemon(True)
        self._thread.start()

    def log(self, message) -> None:
        self._log(f"{self.addr} {message}")

    def thread_stats(self):
        # XXX later use, for now we just quit
        return
        while 1:
            time.sleep(1.0)
            break

    @deltachat.account_hookimpl
    def ac_incoming_message(self, message):
        if self.current_sent >= self.num_send:
            self.report_func(self, ReportType.exit)
            return
        message.create_chat()
        message.mark_seen()
        self.log(f"incoming message: {message}")

        self.current_sent += 1
        # we are still alive, let's send a reply
        if self.num_bigfiles and self.current_sent % (self.num_send / self.num_bigfiles) == 0:
            message.chat.send_text(f"send big file as reply to: {message.text}")
            msg = message.chat.send_file(self.account.bigfile)
        else:
            msg = message.chat.send_text(f"got message id {message.id}, small text reply")
            assert msg.text
        self.log(f"message-sent: {msg}")
        self.report_func(self, ReportType.message_echo)
        if self.current_sent >= self.num_send:
            self.report_func(self, ReportType.exit)
            return

    @deltachat.account_hookimpl
    def ac_process_ffi_event(self, ffi_event):
        self.log(ffi_event)
        if ffi_event.name == "DC_EVENT_ERROR":
            self.report_func(self, ReportType.ffi_error, ffi_event)