Spaces:
Running
Running
| """Concurrency stress test for the SQLite persistence layer. | |
| Reproduces the transient ``"database disk image is malformed"`` race seen | |
| under high concurrent triggers + concurrent reads (event 41 in | |
| ``/tmp/polyglot_backend_postdinner.log``) and asserts that the busy | |
| timeout + retry loop added to ``polyglot_alpha.persistence.db`` keeps | |
| the lifecycle clean. | |
| """ | |
| from __future__ import annotations | |
| import threading | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from typing import List | |
| import pytest | |
| from polyglot_alpha.persistence import session_scope | |
| from polyglot_alpha.persistence.models import Event, EventStatus | |
| _CONCURRENT_WRITERS: int = 10 | |
| def _write_one_event(idx: int) -> int: | |
| """Insert a single Event row inside a fresh ``session_scope``. | |
| Returns the assigned primary key. | |
| """ | |
| with session_scope(event_id=idx) as session: | |
| row = Event( | |
| content_hash=f"stress-hash-{idx}-{threading.get_ident()}", | |
| sources=[{"name": "stress", "url": f"https://example.com/{idx}"}], | |
| language="en", | |
| status=EventStatus.PENDING.value, | |
| title=f"stress event {idx}", | |
| ) | |
| session.add(row) | |
| session.flush() | |
| assert row.id is not None | |
| return int(row.id) | |
| def test_concurrent_session_scope_writes_no_database_error() -> None: | |
| """Fire 10 concurrent ``session_scope()`` writers; all must succeed.""" | |
| errors: List[BaseException] = [] | |
| ids: List[int] = [] | |
| with ThreadPoolExecutor(max_workers=_CONCURRENT_WRITERS) as pool: | |
| futures = [pool.submit(_write_one_event, i) for i in range(_CONCURRENT_WRITERS)] | |
| for fut in as_completed(futures): | |
| try: | |
| ids.append(fut.result()) | |
| except BaseException as exc: # capture and assert below | |
| errors.append(exc) | |
| assert errors == [], f"Concurrent writers raised: {errors!r}" | |
| assert len(ids) == _CONCURRENT_WRITERS | |
| assert len(set(ids)) == _CONCURRENT_WRITERS, "Duplicate primary keys" | |
| def test_concurrent_mixed_read_write_no_database_error() -> None: | |
| """Interleave 10 writers + 10 readers; no DatabaseError expected.""" | |
| errors: List[BaseException] = [] | |
| def _read_all(_idx: int) -> int: | |
| from sqlmodel import select | |
| with session_scope() as session: | |
| return len(session.exec(select(Event)).all()) | |
| with ThreadPoolExecutor(max_workers=_CONCURRENT_WRITERS * 2) as pool: | |
| futures = [] | |
| for i in range(_CONCURRENT_WRITERS): | |
| futures.append(pool.submit(_write_one_event, 100 + i)) | |
| futures.append(pool.submit(_read_all, i)) | |
| for fut in as_completed(futures): | |
| try: | |
| fut.result() | |
| except BaseException as exc: | |
| errors.append(exc) | |
| assert errors == [], f"Mixed read/write workload raised: {errors!r}" | |