"""Tests Sprint A5 — robustesse SQLite face aux écritures concurrentes.
Item M-13 de l'audit institutional-readiness-2026-05.
``picarones.web.jobs.JobStore`` est l'unique point d'écriture sur la
BD ``jobs.sqlite`` (mode WAL, thread-safe par ``_conn`` qui ouvre une
nouvelle connection par appel). Cette suite valide qu'il survit à :
1. N threads créant des jobs simultanément (pas de doublon, pas de
corruption).
2. M threads mettant à jour le progress du même job (pas de
``SQLITE_BUSY`` qui remonte au caller).
3. Set_status concurrent depuis plusieurs threads.
Les tests utilisent un fichier SQLite temporaire isolé pour ne pas
polluer ``jobs.sqlite`` du dev local.
"""
from __future__ import annotations
import threading
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
import pytest
from picarones.interfaces.web.jobs import JobStore
@pytest.fixture
def fresh_store(tmp_path: Path) -> JobStore:
    """Return a JobStore backed by an isolated temporary SQLite file.

    Keeps the suite from touching the local dev ``jobs.sqlite``.
    """
    return JobStore(db_path=tmp_path / "jobs_test.sqlite")
# ---------------------------------------------------------------------------
# Création concurrente
# ---------------------------------------------------------------------------
def test_concurrent_create_no_duplicate(fresh_store: JobStore) -> None:
    """20 threads each create one job -> 20 distinct jobs persisted, no
    duplicated job ID."""
    worker_count = 20

    def _spawn_job(_: int) -> str:
        # Every worker inserts one job; the returned ID must be unique.
        return fresh_store.create_job(payload={"thread": "x"})

    with ThreadPoolExecutor(max_workers=worker_count) as executor:
        ids = list(executor.map(_spawn_job, range(worker_count)))

    assert len(ids) == worker_count
    assert len(set(ids)) == worker_count, (
        f"IDs dupliqués détectés : {[x for x in ids if ids.count(x) > 1]}"
    )
    # The store itself must also report exactly worker_count rows.
    stored = fresh_store.list_jobs(limit=worker_count + 5)
    assert len(stored) == worker_count
# ---------------------------------------------------------------------------
# Update concurrent sur le même job
# ---------------------------------------------------------------------------
def test_concurrent_progress_updates_no_busy_error(fresh_store: JobStore) -> None:
    """50 concurrent updates on the same job -> no SQLITE_BUSY leaks to the
    caller and the final persisted state stays coherent."""
    job_id = fresh_store.create_job(payload={})
    total_updates = 50
    errors: list[BaseException] = []

    def _apply_update(step: int) -> None:
        try:
            fresh_store.update_progress(
                job_id=job_id,
                progress=step / total_updates,
                processed_docs=step,
            )
        except BaseException as exc:  # noqa: BLE001 — captured for the assert below
            errors.append(exc)

    with ThreadPoolExecutor(max_workers=10) as executor:
        # Drain the iterator so every update actually runs.
        for _ in executor.map(_apply_update, range(total_updates)):
            pass

    assert not errors, f"Erreurs durant updates concurrentes : {errors[:3]}"
    final = fresh_store.get_job(job_id)
    assert final is not None
    # progress must be a coherent float in [0, 1] (no corrupted value).
    assert 0.0 <= float(final.get("progress", 0)) <= 1.0
# ---------------------------------------------------------------------------
# Set status concurrent
# ---------------------------------------------------------------------------
def test_concurrent_set_status_serializable(fresh_store: JobStore) -> None:
    """Parallel ``set_status`` calls on one job must not corrupt the table;
    whatever status wins must be one of the valid ones."""
    job_id = fresh_store.create_job(payload={})
    candidate_statuses = ["running", "succeeded", "failed", "cancelled"]
    start_line = threading.Barrier(len(candidate_statuses))

    def _race_set_status(status: str) -> None:
        # All workers block here, then fire together to maximise contention.
        start_line.wait(timeout=5)
        try:
            fresh_store.set_status(job_id, status)
        except Exception:
            # A set_status may legitimately fail on an invalid transition.
            pass

    with ThreadPoolExecutor(max_workers=len(candidate_statuses)) as executor:
        list(executor.map(_race_set_status, candidate_statuses))

    final = fresh_store.get_job(job_id)
    assert final is not None
    assert final["status"] in [*candidate_statuses, "pending"]
# ---------------------------------------------------------------------------
# Reads pendant writes
# ---------------------------------------------------------------------------
def test_reads_during_writes_no_locking_error(fresh_store: JobStore) -> None:
    """Concurrent reads while writers insert rows -> WAL mode must let both
    proceed without blocking or raising."""
    for _ in range(10):
        fresh_store.create_job(payload={})

    stop = threading.Event()
    read_errors: list[BaseException] = []
    write_errors: list[BaseException] = []

    def _writer() -> None:
        try:
            while not stop.is_set():
                fresh_store.create_job(payload={"writer": "x"})
        except BaseException as exc:  # noqa: BLE001
            write_errors.append(exc)

    def _reader() -> None:
        try:
            while not stop.is_set():
                fresh_store.list_jobs(limit=100)
        except BaseException as exc:  # noqa: BLE001
            read_errors.append(exc)

    workers = [
        threading.Thread(target=fn)
        for fn in (_writer, _writer, _reader, _reader)
    ]
    for worker in workers:
        worker.start()
    # 500 ms of mixed read/write load; `stop` is still unset here, so this
    # simply sleeps for the full timeout.
    stop.wait(0.5)
    stop.set()
    for worker in workers:
        worker.join(timeout=2)

    assert not read_errors, f"Reads ont levé : {read_errors[:2]}"
    assert not write_errors, f"Writes ont levé : {write_errors[:2]}"
# ---------------------------------------------------------------------------
# Garde-fous
# ---------------------------------------------------------------------------
def test_get_job_unknown_returns_none(fresh_store: JobStore) -> None:
    """An unknown job_id must yield ``None``, never raise."""
    assert fresh_store.get_job("ghost-job-id") is None
def test_update_progress_unknown_job_does_not_crash(
    fresh_store: JobStore,
) -> None:
    """Updating an unknown job_id is a no-op: no crash, no row created."""
    fresh_store.update_progress(job_id="ghost", progress=0.5)
    # No job must have been created as a side effect.
    assert not fresh_store.list_jobs()