File size: 2,971 Bytes
88d2f2a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
"""Concurrency stress test for the SQLite persistence layer.

Reproduces the transient ``"database disk image is malformed"`` race seen
under high concurrent triggers + concurrent reads (event 41 in
``/tmp/polyglot_backend_postdinner.log``) and asserts that the busy
timeout + retry loop added to ``polyglot_alpha.persistence.db`` keeps
the lifecycle clean.
"""

from __future__ import annotations

import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List

import pytest

from polyglot_alpha.persistence import session_scope
from polyglot_alpha.persistence.models import Event, EventStatus


_CONCURRENT_WRITERS: int = 10


def _write_one_event(idx: int) -> int:
    """Insert a single Event row inside a fresh ``session_scope``.

    Returns the assigned primary key.
    """

    with session_scope(event_id=idx) as session:
        row = Event(
            content_hash=f"stress-hash-{idx}-{threading.get_ident()}",
            sources=[{"name": "stress", "url": f"https://example.com/{idx}"}],
            language="en",
            status=EventStatus.PENDING.value,
            title=f"stress event {idx}",
        )
        session.add(row)
        session.flush()
        assert row.id is not None
        return int(row.id)


@pytest.mark.usefixtures("isolated_db")
def test_concurrent_session_scope_writes_no_database_error() -> None:
    """Fire 10 concurrent ``session_scope()`` writers; all must succeed."""

    errors: List[BaseException] = []
    ids: List[int] = []

    with ThreadPoolExecutor(max_workers=_CONCURRENT_WRITERS) as pool:
        futures = [pool.submit(_write_one_event, i) for i in range(_CONCURRENT_WRITERS)]
        for fut in as_completed(futures):
            try:
                ids.append(fut.result())
            except BaseException as exc:  # capture and assert below
                errors.append(exc)

    assert errors == [], f"Concurrent writers raised: {errors!r}"
    assert len(ids) == _CONCURRENT_WRITERS
    assert len(set(ids)) == _CONCURRENT_WRITERS, "Duplicate primary keys"


@pytest.mark.usefixtures("isolated_db")
def test_concurrent_mixed_read_write_no_database_error() -> None:
    """Interleave 10 writers + 10 readers; no DatabaseError expected."""

    errors: List[BaseException] = []

    def _read_all(_idx: int) -> int:
        from sqlmodel import select

        with session_scope() as session:
            return len(session.exec(select(Event)).all())

    with ThreadPoolExecutor(max_workers=_CONCURRENT_WRITERS * 2) as pool:
        futures = []
        for i in range(_CONCURRENT_WRITERS):
            futures.append(pool.submit(_write_one_event, 100 + i))
            futures.append(pool.submit(_read_all, i))
        for fut in as_completed(futures):
            try:
                fut.result()
            except BaseException as exc:
                errors.append(exc)

    assert errors == [], f"Mixed read/write workload raised: {errors!r}"