Picarones / tests /pipeline /test_timeout.py
Claude
refactor(adapters): retrait de execution_mode (mensonge structurel)
5e13c0d unverified
"""Sprint A14-S8 — timeout depuis le début d'exécution **réelle**.
Le bug critique de l'ancien runner : un document pouvait être marqué
``timeout`` parce qu'il avait passé N secondes en queue, pas N
secondes en train de tourner. Le nouveau ``CorpusRunner`` mesure
le timeout depuis ``time.monotonic()`` au moment où le worker
démarre réellement (cf. ``CorpusRunner._run_one`` qui écrit
``started_at[doc.id]`` en première instruction).
"""
from __future__ import annotations
import time
from picarones.domain import Artifact, ArtifactType, DocumentRef
from picarones.pipeline import (
CorpusRunner,
PipelineExecutor,
PipelineSpec,
PipelineStep,
RunContext,
)
class _SlowAdapter:
"""Adapter qui dort un certain temps avant de retourner."""
name = "slow"
input_types = frozenset({ArtifactType.IMAGE})
output_types = frozenset({ArtifactType.RAW_TEXT})
def __init__(self, sleep_seconds: float) -> None:
self._sleep = sleep_seconds
def execute(self, inputs, params, context, control):
time.sleep(self._sleep)
return {
ArtifactType.RAW_TEXT: Artifact(
id=f"{context.document_id}:raw_text",
document_id=context.document_id,
type=ArtifactType.RAW_TEXT,
),
}
def _build(adapter, *, timeout: float, max_in_flight: int = 2):
registry = {"slow": adapter}
exe = PipelineExecutor(adapter_resolver=lambda n: registry[n])
runner = CorpusRunner(
exe,
max_in_flight=max_in_flight,
timeout_seconds_per_doc=timeout,
poll_interval_seconds=0.01,
)
spec = PipelineSpec(
name="t", initial_inputs=(ArtifactType.IMAGE,),
steps=(PipelineStep(
id="s", kind="ocr", adapter_name="slow",
input_types=(ArtifactType.IMAGE,),
output_types=(ArtifactType.RAW_TEXT,),
),),
)
return runner, spec
def _factories():
def inputs(doc):
return {ArtifactType.IMAGE: Artifact(
id=f"{doc.id}:image",
document_id=doc.id,
type=ArtifactType.IMAGE,
)}
def ctx(doc):
return RunContext(
document_id=doc.id, code_version="1.0.0", pipeline_name="t",
)
return inputs, ctx
def test_doc_timed_out_when_exceeds_timeout() -> None:
"""Step qui dort 0.5s, timeout 0.1s → status timed_out."""
adapter = _SlowAdapter(sleep_seconds=0.5)
runner, spec = _build(adapter, timeout=0.1, max_in_flight=1)
inputs, ctx = _factories()
docs = [DocumentRef(id="slow_one", image_uri="/tmp/x.png")]
t0 = time.perf_counter()
result = runner.run(spec, docs, inputs, ctx)
elapsed = time.perf_counter() - t0
assert result.n_timed_out == 1
assert result.outcomes[0].status == "timed_out"
assert "timeout" in (result.outcomes[0].error or "")
# Le run principal a rendu la main rapidement (ne s'est pas bloqué
# sur le sleep complet — le thread continue mais on n'attend plus).
assert elapsed < 0.3, f"runner s'est bloqué : {elapsed:.2f}s"
def test_timeout_measured_from_real_start_not_submission() -> None:
"""Bug historique : avec un seul worker (max_in_flight=1) et 4
documents, les 3 derniers attendent en queue. L'ancien runner
aurait marqué ces 3 docs timeout dès que la queue dépassait le
timeout. Le nouveau runner ne marque timeout que les docs qui
ont **réellement** dépassé le délai en exécution.
Marges de robustesse cross-OS
------------------------------
- Adapter dort **0.05s** par doc.
- Timeout par doc : **2.0s** (40× la durée d'exécution).
- 4 docs en série = ~0.2s d'exécution totale + overhead du
``concurrent.futures`` worker.
L'ancienne version utilisait timeout=0.5s (10× la durée), ce
qui flaquait sur les runners GitHub Actions sous charge (le
scheduler prenait parfois >0.45s pour démarrer un doc en
queue). Avec un ratio de 40×, le test ne flaque plus.
"""
adapter = _SlowAdapter(sleep_seconds=0.05)
runner, spec = _build(adapter, timeout=2.0, max_in_flight=1)
inputs, ctx = _factories()
docs = [DocumentRef(id=f"d{i}") for i in range(4)]
result = runner.run(spec, docs, inputs, ctx)
# Les 4 docs auraient pris ~0.2s en série, ce qui dépasse le
# timeout de 2.0s **si** le runner mesurait depuis la submission
# du dernier doc. Mais comme on mesure depuis le début réel
# de chaque doc, aucun ne devrait timeout.
assert result.n_succeeded == 4
assert result.n_timed_out == 0
def test_some_docs_succeed_others_timeout() -> None:
"""Mix : la moitié des docs sont rapides, l'autre lente. Avec
un timeout intermédiaire, les rapides réussissent et les lents
timeout.
Marges de robustesse cross-OS
------------------------------
- Timeout : **0.5s**.
- Docs pairs dorment **0.05s** (10× sous le timeout) — ne ratent
pas même sur runners macOS lents avec scheduler imprécis.
- Docs impairs dorment **2.0s** (4× au-dessus) — timeout
garanti.
L'ancienne version utilisait timeout=0.1s / sleep pair=0.01s
qui était à 10 ms du timeout — le jitter du scheduler macOS sur
runners GitHub Actions le faisait basculer aléatoirement.
"""
class _ConditionalSlow:
name = "cond"
input_types = frozenset({ArtifactType.IMAGE})
output_types = frozenset({ArtifactType.RAW_TEXT})
def execute(self, inputs, params, context, control):
# Les docs avec id pair sont rapides.
if int(context.document_id.removeprefix("d")) % 2 == 0:
time.sleep(0.05) # 10× sous le timeout (0.5s)
else:
time.sleep(2.0) # 4× au-dessus du timeout
return {
ArtifactType.RAW_TEXT: Artifact(
id=f"{context.document_id}:raw_text",
document_id=context.document_id,
type=ArtifactType.RAW_TEXT,
),
}
adapter = _ConditionalSlow()
runner, spec = _build(adapter, timeout=0.5, max_in_flight=2)
inputs, ctx = _factories()
docs = [DocumentRef(id=f"d{i}") for i in range(6)]
result = runner.run(spec, docs, inputs, ctx)
assert result.n_succeeded == 3, (
f"pairs (d0/d2/d4) auraient dû réussir, "
f"obtenu n_succeeded={result.n_succeeded}, "
f"n_timed_out={result.n_timed_out}"
)
assert result.n_timed_out == 3
# ──────────────────────────────────────────────────────────────────────
# Régressions backpressure × timeout (zombies)
# ──────────────────────────────────────────────────────────────────────
#
# Bug historique : quand un doc dépassait son timeout, l'orchestrateur
# enregistrait le ``timed_out``, décrémentait ``in_flight`` et soumettait
# immédiatement le doc suivant — alors que le thread worker continuait
# à occuper son slot du pool (Python ne tue pas un thread). Le doc
# suivant queue derrière le zombie, sans pouvoir démarrer ni timeout
# (pas de ``started_at``), donc le runner pouvait hang dès que tous
# les slots étaient zombifiés.
#
# Le fix : ``zombie_futures`` compte la capacité réellement occupée,
# et la backpressure refuse de soumettre tant que le slot n'est pas
# libéré (par la fin naturelle du thread zombie).
def test_zombie_does_not_block_runner_when_no_more_docs() -> None:
"""Régression : avec 1 seul doc qui timeout, le runner doit
retourner sans attendre que le thread zombie finisse.
Pas de doc à soumettre derrière → on sort tout de suite après
le timeout enregistré. Le thread se détache via
``pool.shutdown(wait=False)``.
"""
adapter = _SlowAdapter(sleep_seconds=2.0)
runner, spec = _build(adapter, timeout=0.1, max_in_flight=1)
inputs, ctx = _factories()
docs = [DocumentRef(id="solo")]
t0 = time.perf_counter()
result = runner.run(spec, docs, inputs, ctx)
elapsed = time.perf_counter() - t0
assert result.n_timed_out == 1
assert elapsed < 0.5, (
f"runner a attendu le zombie : {elapsed:.2f}s — devrait "
f"retourner ~0.1s après le timeout"
)
def test_zombie_slot_blocks_new_submissions_until_thread_completes() -> None:
"""Régression critique : si tous les slots sont zombifiés, les
docs suivants ne doivent pas être soumis (ils queue sans pouvoir
démarrer ni timeout). La concurrence observée ne doit JAMAIS
dépasser ``max_in_flight``, même avec des timeouts en cours.
Construction : 2 workers, 2 premiers docs lents (timeout), puis
2 docs rapides. Si le bug revient, les rapides seraient
submittés alors que les zombies occupent encore le pool — la
concurrence réelle atteindrait 4 (2 zombies + 2 rapides).
"""
import threading as _threading
class _MixedAdapter:
name = "mixed"
input_types = frozenset({ArtifactType.IMAGE})
output_types = frozenset({ArtifactType.RAW_TEXT})
def __init__(self) -> None:
self._lock = _threading.Lock()
self._current = 0
self.max_observed = 0
def execute(self, inputs, params, context, control):
with self._lock:
self._current += 1
if self._current > self.max_observed:
self.max_observed = self._current
try:
# Les 2 premiers docs sont lents → vont timeout
# (slot zombifié). Les suivants sont rapides.
if context.document_id in {"slow0", "slow1"}:
time.sleep(1.5)
else:
time.sleep(0.02)
return {
ArtifactType.RAW_TEXT: Artifact(
id=f"{context.document_id}:raw_text",
document_id=context.document_id,
type=ArtifactType.RAW_TEXT,
),
}
finally:
with self._lock:
self._current -= 1
adapter = _MixedAdapter()
runner, spec = _build(adapter, timeout=0.2, max_in_flight=2)
# _build n'enregistre que "slow" ; on remplace le registry.
registry = {"mixed": adapter}
runner._executor = type(runner._executor)(
adapter_resolver=lambda n: registry[n]
)
spec = type(spec)(
name=spec.name,
initial_inputs=spec.initial_inputs,
steps=(type(spec.steps[0])(
id="s", kind="ocr", adapter_name="mixed",
input_types=(ArtifactType.IMAGE,),
output_types=(ArtifactType.RAW_TEXT,),
),),
)
inputs, ctx = _factories()
docs = [
DocumentRef(id="slow0"),
DocumentRef(id="slow1"),
DocumentRef(id="fast0"),
DocumentRef(id="fast1"),
]
result = runner.run(spec, docs, inputs, ctx)
# Invariant critique : la concurrence n'a JAMAIS dépassé max=2.
# Si le bug revient, on observerait 3 ou 4 (zombies + rapides
# soumis en parallèle).
assert adapter.max_observed <= 2, (
f"backpressure violée par les zombies : "
f"max_observed={adapter.max_observed}, attendu <= 2"
)
# Les 2 lents timeoutent, les 2 rapides finissent par réussir
# une fois que les threads zombies se libèrent.
assert result.n_timed_out == 2
assert result.n_succeeded == 2
def test_zombie_capacity_freed_when_thread_completes_naturally() -> None:
"""Quand un thread zombie finit naturellement (après son
``time.sleep`` complet), son slot redevient disponible et le doc
suivant peut alors démarrer. Vérifie que le runner ne reste pas
bloqué dans un état "zombies à vie".
"""
adapter = _SlowAdapter(sleep_seconds=0.3) # finit avant le 2e doc
runner, spec = _build(adapter, timeout=0.1, max_in_flight=1)
inputs, ctx = _factories()
docs = [DocumentRef(id="zombie"), DocumentRef(id="follower")]
result = runner.run(spec, docs, inputs, ctx)
# Le 1er doc timeout (0.1 < 0.3). Le 2e doc doit finir par
# démarrer après que le zombie se termine à t≈0.3s, et tourner
# 0.3s à son tour → également timeout (l'adapter est le même).
assert result.n_timed_out == 2, (
f"attendu 2 timeouts (zombie + follower), obtenu "
f"n_timed_out={result.n_timed_out}, "
f"n_succeeded={result.n_succeeded}"
)
# Outcomes ordonnés par achèvement : le zombie est enregistré
# immédiatement à t=0.1, le follower à t≈0.7 (0.1 + 0.3 + 0.3).
assert [o.document_id for o in result.outcomes] == [
"zombie", "follower",
]