Spaces:
Running
Running
File size: 2,971 Bytes
88d2f2a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 | """Concurrency stress test for the SQLite persistence layer.
Reproduces the transient ``"database disk image is malformed"`` race seen
under high concurrent triggers + concurrent reads (event 41 in
``/tmp/polyglot_backend_postdinner.log``) and asserts that the busy
timeout + retry loop added to ``polyglot_alpha.persistence.db`` keeps
the lifecycle clean.
"""
from __future__ import annotations
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
import pytest
from polyglot_alpha.persistence import session_scope
from polyglot_alpha.persistence.models import Event, EventStatus
_CONCURRENT_WRITERS: int = 10
def _write_one_event(idx: int) -> int:
"""Insert a single Event row inside a fresh ``session_scope``.
Returns the assigned primary key.
"""
with session_scope(event_id=idx) as session:
row = Event(
content_hash=f"stress-hash-{idx}-{threading.get_ident()}",
sources=[{"name": "stress", "url": f"https://example.com/{idx}"}],
language="en",
status=EventStatus.PENDING.value,
title=f"stress event {idx}",
)
session.add(row)
session.flush()
assert row.id is not None
return int(row.id)
@pytest.mark.usefixtures("isolated_db")
def test_concurrent_session_scope_writes_no_database_error() -> None:
"""Fire 10 concurrent ``session_scope()`` writers; all must succeed."""
errors: List[BaseException] = []
ids: List[int] = []
with ThreadPoolExecutor(max_workers=_CONCURRENT_WRITERS) as pool:
futures = [pool.submit(_write_one_event, i) for i in range(_CONCURRENT_WRITERS)]
for fut in as_completed(futures):
try:
ids.append(fut.result())
except BaseException as exc: # capture and assert below
errors.append(exc)
assert errors == [], f"Concurrent writers raised: {errors!r}"
assert len(ids) == _CONCURRENT_WRITERS
assert len(set(ids)) == _CONCURRENT_WRITERS, "Duplicate primary keys"
@pytest.mark.usefixtures("isolated_db")
def test_concurrent_mixed_read_write_no_database_error() -> None:
"""Interleave 10 writers + 10 readers; no DatabaseError expected."""
errors: List[BaseException] = []
def _read_all(_idx: int) -> int:
from sqlmodel import select
with session_scope() as session:
return len(session.exec(select(Event)).all())
with ThreadPoolExecutor(max_workers=_CONCURRENT_WRITERS * 2) as pool:
futures = []
for i in range(_CONCURRENT_WRITERS):
futures.append(pool.submit(_write_one_event, 100 + i))
futures.append(pool.submit(_read_all, i))
for fut in as_completed(futures):
try:
fut.result()
except BaseException as exc:
errors.append(exc)
assert errors == [], f"Mixed read/write workload raised: {errors!r}"
|