File size: 6,407 Bytes
563a0f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
de9192c
563a0f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
"""Tests Sprint A5 — robustesse SQLite face aux écritures concurrentes.

Item M-13 de l'audit institutional-readiness-2026-05.

``picarones.web.jobs.JobStore`` est l'unique point d'écriture sur la
BD ``jobs.sqlite`` (mode WAL, thread-safe par ``_conn`` qui ouvre une
nouvelle connection par appel). Cette suite valide qu'il survit à :

1. N threads créant des jobs simultanément (pas de doublon, pas de
   corruption).
2. M threads mettant à jour le progress du même job (pas de
   ``SQLITE_BUSY`` qui remonte au caller).
3. Set_status concurrent depuis plusieurs threads.

Les tests utilisent un fichier SQLite temporaire isolé pour ne pas
polluer ``jobs.sqlite`` du dev local.
"""

from __future__ import annotations

import threading
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import pytest

from picarones.interfaces.web.jobs import JobStore


@pytest.fixture
def fresh_store(tmp_path: Path) -> JobStore:
    db_path = tmp_path / "jobs_test.sqlite"
    store = JobStore(db_path=db_path)
    return store


# ---------------------------------------------------------------------------
# Création concurrente
# ---------------------------------------------------------------------------


def test_concurrent_create_no_duplicate(fresh_store: JobStore) -> None:
    """20 threads créent chacun un job → 20 jobs distincts en BD,
    aucun ID dupliqué."""
    n_threads = 20

    def _create_one(_) -> str:
        return fresh_store.create_job(payload={"thread": "x"})

    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        ids = list(pool.map(_create_one, range(n_threads)))

    assert len(ids) == n_threads
    assert len(set(ids)) == n_threads, (
        f"IDs dupliqués détectés : {[x for x in ids if ids.count(x) > 1]}"
    )

    listed = fresh_store.list_jobs(limit=n_threads + 5)
    assert len(listed) == n_threads


# ---------------------------------------------------------------------------
# Update concurrent sur le même job
# ---------------------------------------------------------------------------


def test_concurrent_progress_updates_no_busy_error(fresh_store: JobStore) -> None:
    """50 updates concurrents sur le même job → pas de SQLITE_BUSY,
    le dernier état persiste de manière cohérente."""
    job_id = fresh_store.create_job(payload={})

    n_updates = 50
    errors: list[BaseException] = []

    def _update_one(i: int) -> None:
        try:
            fresh_store.update_progress(
                job_id=job_id,
                progress=float(i) / n_updates,
                processed_docs=i,
            )
        except BaseException as exc:  # noqa: BLE001 — on capture pour assert
            errors.append(exc)

    with ThreadPoolExecutor(max_workers=10) as pool:
        list(pool.map(_update_one, range(n_updates)))

    assert not errors, f"Erreurs durant updates concurrentes : {errors[:3]}"

    final = fresh_store.get_job(job_id)
    assert final is not None
    # progress doit être un float ∈ [0, 1] cohérent (pas une valeur corrompue)
    assert 0.0 <= float(final.get("progress", 0)) <= 1.0


# ---------------------------------------------------------------------------
# Set status concurrent
# ---------------------------------------------------------------------------


def test_concurrent_set_status_serializable(fresh_store: JobStore) -> None:
    """Plusieurs ``set_status`` en parallèle sur le même job ne doivent
    pas corrompre la table ; le dernier statut écrit doit être l'un
    des statuts valides."""
    job_id = fresh_store.create_job(payload={})
    statuses = ["running", "succeeded", "failed", "cancelled"]
    barrier = threading.Barrier(len(statuses))

    def _set(status: str) -> None:
        barrier.wait(timeout=5)  # synchronise le départ pour maximiser la concurrence
        try:
            fresh_store.set_status(job_id, status)
        except Exception:
            pass  # un set_status peut échouer s'il y a transition invalide

    with ThreadPoolExecutor(max_workers=len(statuses)) as pool:
        list(pool.map(_set, statuses))

    final = fresh_store.get_job(job_id)
    assert final is not None
    assert final["status"] in statuses + ["pending"]


# ---------------------------------------------------------------------------
# Reads pendant writes
# ---------------------------------------------------------------------------


def test_reads_during_writes_no_locking_error(fresh_store: JobStore) -> None:
    """Lectures concurrentes pendant écritures → mode WAL doit permettre
    sans bloquer ni lever."""
    n_jobs = 10
    for _ in range(n_jobs):
        fresh_store.create_job(payload={})

    stop = threading.Event()
    read_errors: list[BaseException] = []
    write_errors: list[BaseException] = []

    def _writer() -> None:
        try:
            while not stop.is_set():
                fresh_store.create_job(payload={"writer": "x"})
        except BaseException as exc:  # noqa: BLE001
            write_errors.append(exc)

    def _reader() -> None:
        try:
            while not stop.is_set():
                fresh_store.list_jobs(limit=100)
        except BaseException as exc:  # noqa: BLE001
            read_errors.append(exc)

    threads = [
        threading.Thread(target=_writer),
        threading.Thread(target=_writer),
        threading.Thread(target=_reader),
        threading.Thread(target=_reader),
    ]
    for t in threads:
        t.start()
    threading.Event().wait(0.5)  # 500 ms de charge mixte
    stop.set()
    for t in threads:
        t.join(timeout=2)

    assert not read_errors, f"Reads ont levé : {read_errors[:2]}"
    assert not write_errors, f"Writes ont levé : {write_errors[:2]}"


# ---------------------------------------------------------------------------
# Garde-fous
# ---------------------------------------------------------------------------


def test_get_job_unknown_returns_none(fresh_store: JobStore) -> None:
    """Un job_id inconnu doit retourner ``None``, pas lever."""
    assert fresh_store.get_job("ghost-job-id") is None


def test_update_progress_unknown_job_does_not_crash(
    fresh_store: JobStore,
) -> None:
    """Update sur un job_id inconnu : pas d'effet, pas de crash."""
    fresh_store.update_progress(job_id="ghost", progress=0.5)
    # Aucun job créé en passant
    assert len(fresh_store.list_jobs()) == 0