Spaces:
Sleeping
Sleeping
| """Sprint A14-S8 — timeout depuis le début d'exécution **réelle**. | |
| Le bug critique de l'ancien runner : un document pouvait être marqué | |
| ``timeout`` parce qu'il avait passé N secondes en queue, pas N | |
| secondes en train de tourner. Le nouveau ``CorpusRunner`` mesure | |
| le timeout depuis ``time.monotonic()`` au moment où le worker | |
| démarre réellement (cf. ``CorpusRunner._run_one`` qui écrit | |
| ``started_at[doc.id]`` en première instruction). | |
| """ | |
| from __future__ import annotations | |
| import time | |
| from picarones.domain import Artifact, ArtifactType, DocumentRef | |
| from picarones.pipeline import ( | |
| CorpusRunner, | |
| PipelineExecutor, | |
| PipelineSpec, | |
| PipelineStep, | |
| RunContext, | |
| ) | |
| class _SlowAdapter: | |
| """Adapter qui dort un certain temps avant de retourner.""" | |
| name = "slow" | |
| input_types = frozenset({ArtifactType.IMAGE}) | |
| output_types = frozenset({ArtifactType.RAW_TEXT}) | |
| def __init__(self, sleep_seconds: float) -> None: | |
| self._sleep = sleep_seconds | |
| def execute(self, inputs, params, context, control): | |
| time.sleep(self._sleep) | |
| return { | |
| ArtifactType.RAW_TEXT: Artifact( | |
| id=f"{context.document_id}:raw_text", | |
| document_id=context.document_id, | |
| type=ArtifactType.RAW_TEXT, | |
| ), | |
| } | |
| def _build(adapter, *, timeout: float, max_in_flight: int = 2): | |
| registry = {"slow": adapter} | |
| exe = PipelineExecutor(adapter_resolver=lambda n: registry[n]) | |
| runner = CorpusRunner( | |
| exe, | |
| max_in_flight=max_in_flight, | |
| timeout_seconds_per_doc=timeout, | |
| poll_interval_seconds=0.01, | |
| ) | |
| spec = PipelineSpec( | |
| name="t", initial_inputs=(ArtifactType.IMAGE,), | |
| steps=(PipelineStep( | |
| id="s", kind="ocr", adapter_name="slow", | |
| input_types=(ArtifactType.IMAGE,), | |
| output_types=(ArtifactType.RAW_TEXT,), | |
| ),), | |
| ) | |
| return runner, spec | |
| def _factories(): | |
| def inputs(doc): | |
| return {ArtifactType.IMAGE: Artifact( | |
| id=f"{doc.id}:image", | |
| document_id=doc.id, | |
| type=ArtifactType.IMAGE, | |
| )} | |
| def ctx(doc): | |
| return RunContext( | |
| document_id=doc.id, code_version="1.0.0", pipeline_name="t", | |
| ) | |
| return inputs, ctx | |
| def test_doc_timed_out_when_exceeds_timeout() -> None: | |
| """Step qui dort 0.5s, timeout 0.1s → status timed_out.""" | |
| adapter = _SlowAdapter(sleep_seconds=0.5) | |
| runner, spec = _build(adapter, timeout=0.1, max_in_flight=1) | |
| inputs, ctx = _factories() | |
| docs = [DocumentRef(id="slow_one", image_uri="/tmp/x.png")] | |
| t0 = time.perf_counter() | |
| result = runner.run(spec, docs, inputs, ctx) | |
| elapsed = time.perf_counter() - t0 | |
| assert result.n_timed_out == 1 | |
| assert result.outcomes[0].status == "timed_out" | |
| assert "timeout" in (result.outcomes[0].error or "") | |
| # Le run principal a rendu la main rapidement (ne s'est pas bloqué | |
| # sur le sleep complet — le thread continue mais on n'attend plus). | |
| assert elapsed < 0.3, f"runner s'est bloqué : {elapsed:.2f}s" | |
| def test_timeout_measured_from_real_start_not_submission() -> None: | |
| """Bug historique : avec un seul worker (max_in_flight=1) et 4 | |
| documents, les 3 derniers attendent en queue. L'ancien runner | |
| aurait marqué ces 3 docs timeout dès que la queue dépassait le | |
| timeout. Le nouveau runner ne marque timeout que les docs qui | |
| ont **réellement** dépassé le délai en exécution. | |
| Marges de robustesse cross-OS | |
| ------------------------------ | |
| - Adapter dort **0.05s** par doc. | |
| - Timeout par doc : **2.0s** (40× la durée d'exécution). | |
| - 4 docs en série = ~0.2s d'exécution totale + overhead du | |
| ``concurrent.futures`` worker. | |
| L'ancienne version utilisait timeout=0.5s (10× la durée), ce | |
| qui flaquait sur les runners GitHub Actions sous charge (le | |
| scheduler prenait parfois >0.45s pour démarrer un doc en | |
| queue). Avec un ratio de 40×, le test ne flaque plus. | |
| """ | |
| adapter = _SlowAdapter(sleep_seconds=0.05) | |
| runner, spec = _build(adapter, timeout=2.0, max_in_flight=1) | |
| inputs, ctx = _factories() | |
| docs = [DocumentRef(id=f"d{i}") for i in range(4)] | |
| result = runner.run(spec, docs, inputs, ctx) | |
| # Les 4 docs auraient pris ~0.2s en série, ce qui dépasse le | |
| # timeout de 2.0s **si** le runner mesurait depuis la submission | |
| # du dernier doc. Mais comme on mesure depuis le début réel | |
| # de chaque doc, aucun ne devrait timeout. | |
| assert result.n_succeeded == 4 | |
| assert result.n_timed_out == 0 | |
| def test_some_docs_succeed_others_timeout() -> None: | |
| """Mix : la moitié des docs sont rapides, l'autre lente. Avec | |
| un timeout intermédiaire, les rapides réussissent et les lents | |
| timeout. | |
| Marges de robustesse cross-OS | |
| ------------------------------ | |
| - Timeout : **0.5s**. | |
| - Docs pairs dorment **0.05s** (10× sous le timeout) — ne ratent | |
| pas même sur runners macOS lents avec scheduler imprécis. | |
| - Docs impairs dorment **2.0s** (4× au-dessus) — timeout | |
| garanti. | |
| L'ancienne version utilisait timeout=0.1s / sleep pair=0.01s | |
| qui était à 10 ms du timeout — le jitter du scheduler macOS sur | |
| runners GitHub Actions le faisait basculer aléatoirement. | |
| """ | |
| class _ConditionalSlow: | |
| name = "cond" | |
| input_types = frozenset({ArtifactType.IMAGE}) | |
| output_types = frozenset({ArtifactType.RAW_TEXT}) | |
| def execute(self, inputs, params, context, control): | |
| # Les docs avec id pair sont rapides. | |
| if int(context.document_id.removeprefix("d")) % 2 == 0: | |
| time.sleep(0.05) # 10× sous le timeout (0.5s) | |
| else: | |
| time.sleep(2.0) # 4× au-dessus du timeout | |
| return { | |
| ArtifactType.RAW_TEXT: Artifact( | |
| id=f"{context.document_id}:raw_text", | |
| document_id=context.document_id, | |
| type=ArtifactType.RAW_TEXT, | |
| ), | |
| } | |
| adapter = _ConditionalSlow() | |
| runner, spec = _build(adapter, timeout=0.5, max_in_flight=2) | |
| inputs, ctx = _factories() | |
| docs = [DocumentRef(id=f"d{i}") for i in range(6)] | |
| result = runner.run(spec, docs, inputs, ctx) | |
| assert result.n_succeeded == 3, ( | |
| f"pairs (d0/d2/d4) auraient dû réussir, " | |
| f"obtenu n_succeeded={result.n_succeeded}, " | |
| f"n_timed_out={result.n_timed_out}" | |
| ) | |
| assert result.n_timed_out == 3 | |
| # ────────────────────────────────────────────────────────────────────── | |
| # Régressions backpressure × timeout (zombies) | |
| # ────────────────────────────────────────────────────────────────────── | |
| # | |
| # Bug historique : quand un doc dépassait son timeout, l'orchestrateur | |
| # enregistrait le ``timed_out``, décrémentait ``in_flight`` et soumettait | |
| # immédiatement le doc suivant — alors que le thread worker continuait | |
| # à occuper son slot du pool (Python ne tue pas un thread). Le doc | |
| # suivant queue derrière le zombie, sans pouvoir démarrer ni timeout | |
| # (pas de ``started_at``), donc le runner pouvait hang dès que tous | |
| # les slots étaient zombifiés. | |
| # | |
| # Le fix : ``zombie_futures`` compte la capacité réellement occupée, | |
| # et la backpressure refuse de soumettre tant que le slot n'est pas | |
| # libéré (par la fin naturelle du thread zombie). | |
| def test_zombie_does_not_block_runner_when_no_more_docs() -> None: | |
| """Régression : avec 1 seul doc qui timeout, le runner doit | |
| retourner sans attendre que le thread zombie finisse. | |
| Pas de doc à soumettre derrière → on sort tout de suite après | |
| le timeout enregistré. Le thread se détache via | |
| ``pool.shutdown(wait=False)``. | |
| """ | |
| adapter = _SlowAdapter(sleep_seconds=2.0) | |
| runner, spec = _build(adapter, timeout=0.1, max_in_flight=1) | |
| inputs, ctx = _factories() | |
| docs = [DocumentRef(id="solo")] | |
| t0 = time.perf_counter() | |
| result = runner.run(spec, docs, inputs, ctx) | |
| elapsed = time.perf_counter() - t0 | |
| assert result.n_timed_out == 1 | |
| assert elapsed < 0.5, ( | |
| f"runner a attendu le zombie : {elapsed:.2f}s — devrait " | |
| f"retourner ~0.1s après le timeout" | |
| ) | |
| def test_zombie_slot_blocks_new_submissions_until_thread_completes() -> None: | |
| """Régression critique : si tous les slots sont zombifiés, les | |
| docs suivants ne doivent pas être soumis (ils queue sans pouvoir | |
| démarrer ni timeout). La concurrence observée ne doit JAMAIS | |
| dépasser ``max_in_flight``, même avec des timeouts en cours. | |
| Construction : 2 workers, 2 premiers docs lents (timeout), puis | |
| 2 docs rapides. Si le bug revient, les rapides seraient | |
| submittés alors que les zombies occupent encore le pool — la | |
| concurrence réelle atteindrait 4 (2 zombies + 2 rapides). | |
| """ | |
| import threading as _threading | |
| class _MixedAdapter: | |
| name = "mixed" | |
| input_types = frozenset({ArtifactType.IMAGE}) | |
| output_types = frozenset({ArtifactType.RAW_TEXT}) | |
| def __init__(self) -> None: | |
| self._lock = _threading.Lock() | |
| self._current = 0 | |
| self.max_observed = 0 | |
| def execute(self, inputs, params, context, control): | |
| with self._lock: | |
| self._current += 1 | |
| if self._current > self.max_observed: | |
| self.max_observed = self._current | |
| try: | |
| # Les 2 premiers docs sont lents → vont timeout | |
| # (slot zombifié). Les suivants sont rapides. | |
| if context.document_id in {"slow0", "slow1"}: | |
| time.sleep(1.5) | |
| else: | |
| time.sleep(0.02) | |
| return { | |
| ArtifactType.RAW_TEXT: Artifact( | |
| id=f"{context.document_id}:raw_text", | |
| document_id=context.document_id, | |
| type=ArtifactType.RAW_TEXT, | |
| ), | |
| } | |
| finally: | |
| with self._lock: | |
| self._current -= 1 | |
| adapter = _MixedAdapter() | |
| runner, spec = _build(adapter, timeout=0.2, max_in_flight=2) | |
| # _build n'enregistre que "slow" ; on remplace le registry. | |
| registry = {"mixed": adapter} | |
| runner._executor = type(runner._executor)( | |
| adapter_resolver=lambda n: registry[n] | |
| ) | |
| spec = type(spec)( | |
| name=spec.name, | |
| initial_inputs=spec.initial_inputs, | |
| steps=(type(spec.steps[0])( | |
| id="s", kind="ocr", adapter_name="mixed", | |
| input_types=(ArtifactType.IMAGE,), | |
| output_types=(ArtifactType.RAW_TEXT,), | |
| ),), | |
| ) | |
| inputs, ctx = _factories() | |
| docs = [ | |
| DocumentRef(id="slow0"), | |
| DocumentRef(id="slow1"), | |
| DocumentRef(id="fast0"), | |
| DocumentRef(id="fast1"), | |
| ] | |
| result = runner.run(spec, docs, inputs, ctx) | |
| # Invariant critique : la concurrence n'a JAMAIS dépassé max=2. | |
| # Si le bug revient, on observerait 3 ou 4 (zombies + rapides | |
| # soumis en parallèle). | |
| assert adapter.max_observed <= 2, ( | |
| f"backpressure violée par les zombies : " | |
| f"max_observed={adapter.max_observed}, attendu <= 2" | |
| ) | |
| # Les 2 lents timeoutent, les 2 rapides finissent par réussir | |
| # une fois que les threads zombies se libèrent. | |
| assert result.n_timed_out == 2 | |
| assert result.n_succeeded == 2 | |
| def test_zombie_capacity_freed_when_thread_completes_naturally() -> None: | |
| """Quand un thread zombie finit naturellement (après son | |
| ``time.sleep`` complet), son slot redevient disponible et le doc | |
| suivant peut alors démarrer. Vérifie que le runner ne reste pas | |
| bloqué dans un état "zombies à vie". | |
| """ | |
| adapter = _SlowAdapter(sleep_seconds=0.3) # finit avant le 2e doc | |
| runner, spec = _build(adapter, timeout=0.1, max_in_flight=1) | |
| inputs, ctx = _factories() | |
| docs = [DocumentRef(id="zombie"), DocumentRef(id="follower")] | |
| result = runner.run(spec, docs, inputs, ctx) | |
| # Le 1er doc timeout (0.1 < 0.3). Le 2e doc doit finir par | |
| # démarrer après que le zombie se termine à t≈0.3s, et tourner | |
| # 0.3s à son tour → également timeout (l'adapter est le même). | |
| assert result.n_timed_out == 2, ( | |
| f"attendu 2 timeouts (zombie + follower), obtenu " | |
| f"n_timed_out={result.n_timed_out}, " | |
| f"n_succeeded={result.n_succeeded}" | |
| ) | |
| # Outcomes ordonnés par achèvement : le zombie est enregistré | |
| # immédiatement à t=0.1, le follower à t≈0.7 (0.1 + 0.3 + 0.3). | |
| assert [o.document_id for o in result.outcomes] == [ | |
| "zombie", "follower", | |
| ] | |