Claude commited on
Commit
2720506
·
unverified ·
1 Parent(s): 2e9e564

feat(pipeline): Sprint A14-S28 — PipelinePlanner + ExecutionPlan

Browse files

Le S6 livrait validate_spec (validation statique) et le S7 résolvait les
bindings au runtime via un bag versionné. S28 introduit une couche de
planification qui transforme une PipelineSpec en ExecutionPlan immuable :

1. Validation statique (délègue à validate_spec)
2. Résolution explicite de chaque binding d'entrée (fini la résolution
implicite « dernier producteur » au runtime)
3. Détection des jonctions de métriques : pour chaque output_type T d'un
step, interroge MetricRegistry.select(T, T) → liste les métriques
applicables à la comparaison GT[T] vs step.outputs[T]
4. Plan immuable consommable par PipelineExecutor.run_plan

Nouveau module picarones/pipeline/planner.py (403 lignes)
---------------------------------------------------------
- StepInputBinding(input_type, source_step_id) — frozen
- ResolvedStep(step, input_bindings) — frozen
- MetricJunction(step_id, artifact_type, candidate_metrics) — frozen,
candidates triées alphabétiquement pour déterminisme
- ExecutionPlan(spec, resolved_steps, metric_junctions) — frozen
+ step_by_id() et junctions_for_step() helpers
- PlanningError(PicaronesError) avec liste structurée d'erreurs
- PipelinePlanner(metric_registry=None, available_adapters=None)
· Ne short-circuit pas — récolte toutes les erreurs de validation
· MetricRegistry optionnel — sans, junctions=()
· available_adapters optionnel — sans, validation des noms sautée

Refactor de PipelineExecutor (S7 → S28)
----------------------------------------
- Nouveau run_plan(plan, document, initial_inputs, context) — signature
canonique, contrat explicite. Toute la logique d'exécution vit ici.
- run(spec, ...) reste exposé comme sucre — appelle plan(spec) puis
run_plan. Aucune logique nouvelle.
- plan(spec) → ExecutionPlan exposé pour callers qui veulent planifier
une fois (typiquement CorpusRunner sur N documents).
- planner injectable au constructeur (par défaut PipelinePlanner sans
registry). Type-checked.
- Bindings résolus consommés via _inputs_from_bindings — fini la
résolution implicite via latest_producer au runtime.

Optimisation CorpusRunner
-------------------------
- run() planifie une fois la spec en début de méthode (lève PipelineSpec-
Invalid si invalide, AVANT de soumettre des futures inutiles)
- _run_one accepte plan, pas spec → executor.run_plan() N fois (N-1
validations économisées)

Migration tests
---------------
- Tous les tests S7/S8/S12 existants passent sans modification
(87 pipeline tests + 624 evaluation/integration/CLI).

Tests S28 dédiés (28 nouveaux)
------------------------------
- PlannerConstructor : args, MetricRegistry, available_adapters,
rejets de mauvais types.
- PlannerErrors : empty spec, unknown adapter (set fourni / None),
multi-erreurs récoltées (duplicate_id + unknown_adapter).
- PlannerBindings : chaîne simple → INITIAL_STEP_ID, deux steps →
source = step précédent, inputs_from explicite override latest,
ordre des inputs préservé.
- PlannerJunctions : sans registry → (), avec registry 1/output,
output sans métrique → candidate_metrics=().
- ExecutionPlan API : step_by_id, junctions_for_step, frozen-ness
des 4 dataclasses.
- ExecutorWithPlanner : executor.plan(), run_plan() consume plan,
rejette non-plan, run(spec) sucre, planner injection, type-check.

Tests : 4557 passed, 11 skipped (vs 4527 avant : +28 S28 + 2 ajustements).
Lint : ruff check picarones/ tests/ → All checks passed.
File budgets : pipeline/{executor.py,planner.py} ajoutés
(actuel 413/403, budget 475/465 = +15 %).

https://claude.ai/code/session_011XQZNitg1rCgia8ZD1a2hP

README.md CHANGED
@@ -396,7 +396,7 @@ ruff check picarones/ tests/
396
  python -m mypy picarones/core/
397
  ```
398
 
399
- **Test suite**: ~4540 tests, ~3 min on a modern laptop. Coverage
400
  floor at 85% (currently ~87%). The `network` marker excludes tests
401
  requiring live HTTP. A handful of tests depend on optional engines
402
  (`pero-ocr`, `pytesseract`) and are skipped/fail gracefully when
 
396
  python -m mypy picarones/core/
397
  ```
398
 
399
+ **Test suite**: ~4570 tests, ~3 min on a modern laptop. Coverage
400
  floor at 85% (currently ~87%). The `network` marker excludes tests
401
  requiring live HTTP. A handful of tests depend on optional engines
402
  (`pero-ocr`, `pytesseract`) and are skipped/fail gracefully when
picarones/pipeline/__init__.py CHANGED
@@ -56,6 +56,14 @@ from picarones.pipeline.executor import (
56
  PipelineExecutor,
57
  PipelineSpecInvalid,
58
  )
 
 
 
 
 
 
 
 
59
  from picarones.pipeline.protocols import ExecutionMode, StepExecutor
60
  from picarones.pipeline.runner import (
61
  ContextFactory,
@@ -91,6 +99,13 @@ __all__ = [
91
  "PipelineExecutor",
92
  "PipelineSpecInvalid",
93
  "AdapterResolver",
 
 
 
 
 
 
 
94
  # Cache (S7)
95
  "ArtifactCache",
96
  # CorpusRunner (S8)
 
56
  PipelineExecutor,
57
  PipelineSpecInvalid,
58
  )
59
+ from picarones.pipeline.planner import (
60
+ ExecutionPlan,
61
+ MetricJunction,
62
+ PipelinePlanner,
63
+ PlanningError,
64
+ ResolvedStep,
65
+ StepInputBinding,
66
+ )
67
  from picarones.pipeline.protocols import ExecutionMode, StepExecutor
68
  from picarones.pipeline.runner import (
69
  ContextFactory,
 
99
  "PipelineExecutor",
100
  "PipelineSpecInvalid",
101
  "AdapterResolver",
102
+ # Planner (S28)
103
+ "PipelinePlanner",
104
+ "PlanningError",
105
+ "ExecutionPlan",
106
+ "ResolvedStep",
107
+ "StepInputBinding",
108
+ "MetricJunction",
109
  # Cache (S7)
110
  "ArtifactCache",
111
  # CorpusRunner (S8)
picarones/pipeline/executor.py CHANGED
@@ -1,57 +1,62 @@
1
- """``PipelineExecutor`` mono-document — Sprint A14-S7.
2
 
3
- Première version réelle de l'exécuteur du nouveau pipeline.
4
- Mono-document, séquentiel, capture gracieuse des erreurs par
5
- étape. L'orchestration corpus-wide (backpressure, timeout réel,
6
- annulation propre) arrive au Sprint S8.
 
 
 
 
 
 
 
 
 
 
 
 
7
 
8
  Contrat
9
  -------
10
- Le caller (typiquement un service applicatif au S19) fournit :
 
11
 
12
- - une ``PipelineSpec`` validée (le caller doit avoir appelé
13
- ``validate_spec`` en amont — l'executor re-valide quand même
14
- pour défendre en profondeur),
15
  - un ``DocumentRef`` du document à traiter,
16
  - un dict ``{ArtifactType: Artifact}`` des entrées initiales
17
  (typiquement ``{IMAGE: Artifact(...)}``),
18
- - un ``RunContext`` qui porte ``document_id``, ``code_version``,
19
- ``pipeline_name`` et un éventuel ``workspace_uri``,
20
- - un ``adapter_resolver: Callable[[str], StepExecutor]`` qui
21
- résout ``adapter_name`` → instance d'adapter. Au S19, ce
22
- resolver sera fourni par ``app/services/adapter_registry``.
23
 
24
  L'executor garantit :
25
 
26
- - Les étapes sont exécutées dans l'ordre de ``spec.steps``.
27
- - Chaque entrée d'une étape est résolue depuis le **bag versionné** :
28
- si ``inputs_from[type] = "step_x"``, on prend la version
29
- produite par ``step_x`` ; sinon, on prend la dernière version
30
- disponible (comportement Sprint 66 historique).
31
  - Toute exception levée par un adapter est capturée — le step
32
  est marqué ``succeeded=False`` avec ``error=str(exc)``, et le
33
- pipeline continue (les étapes en aval pourront échouer si
34
- elles dépendaient des outputs de ce step, ce qui est explicite).
35
  - Les ``output_types`` déclarés par l'adapter sont validés au
36
- retour : si un type promis est manquant, le step est marqué
37
- en échec avec ``error="missing_output: <type>"``.
38
 
39
  L'executor ne garantit PAS (reportés à des sprints suivants) :
40
 
41
- - Mesure du temps depuis le début d'exécution réelle (S8 pour
42
- l'instant, ``time.perf_counter()`` autour de ``execute()``).
43
- - Annulation propre par signal aux workers en cours (S8).
44
- - Cache d'artefacts inter-runs (S7 livre ``ArtifactCache`` mais
45
- l'executor ne s'y branche pas encore — ça vient quand on aura
46
- un cas d'usage concret de réutilisation).
47
- - Parallélisation inter-documents ou inter-étapes (S8).
48
-
49
- Définition de done du S7
50
- ------------------------
51
- ``PipelineExecutor.run(spec, document, initial_inputs, context)``
52
- exécute une pipeline mock en moins de 100 ms et produit un
53
- ``PipelineResult`` complet (durées par étape, artefacts produits,
54
- ``succeeded`` agrégé).
55
  """
56
 
57
  from __future__ import annotations
@@ -63,16 +68,26 @@ from typing import Callable
63
  from picarones.domain.artifacts import Artifact, ArtifactType
64
  from picarones.domain.documents import DocumentRef
65
  from picarones.domain.errors import PicaronesError
 
 
 
 
 
 
66
  from picarones.pipeline.protocols import StepExecutor
67
- from picarones.pipeline.spec import INITIAL_STEP_ID, PipelineSpec, PipelineStep
68
  from picarones.pipeline.types import PipelineResult, RunContext, StepResult
69
- from picarones.pipeline.validation import validate_spec
70
 
71
  logger = logging.getLogger(__name__)
72
 
73
 
74
  class PipelineSpecInvalid(PicaronesError):
75
- """``PipelineSpec`` mal formée — l'executor refuse de démarrer."""
 
 
 
 
 
76
 
77
 
78
  #: Type alias pour le resolver d'adapters. Une fonction qui
@@ -100,12 +115,47 @@ class PipelineExecutor:
100
  applicatif qui injecte les bonnes dépendances en prod.
101
  """
102
 
103
- def __init__(self, adapter_resolver: AdapterResolver) -> None:
 
 
 
 
104
  if not callable(adapter_resolver):
105
  raise PicaronesError(
106
  "PipelineExecutor : adapter_resolver doit être callable."
107
  )
 
 
 
 
108
  self._resolver = adapter_resolver
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
109
 
110
  def run(
111
  self,
@@ -114,7 +164,13 @@ class PipelineExecutor:
114
  initial_inputs: dict[ArtifactType, Artifact],
115
  context: RunContext,
116
  ) -> PipelineResult:
117
- """Exécute une pipeline complète sur un document.
 
 
 
 
 
 
118
 
119
  Returns
120
  -------
@@ -127,53 +183,63 @@ class PipelineExecutor:
127
  Raises
128
  ------
129
  PipelineSpecInvalid
130
- Si ``validate_spec`` détecte des erreurs de
131
- cohérence. L'executor ne masque pas ce type d'erreur :
132
- c'est un bug de programmation, pas un problème runtime.
133
  """
134
- # 1. Validation défensive.
135
- errors = validate_spec(spec)
136
- if errors:
137
- messages = "; ".join(
138
- f"{e.step_id or '<global>'}: {e.message}" for e in errors
139
- )
140
- raise PipelineSpecInvalid(
141
- f"Spec '{spec.name}' invalide : {messages}"
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
142
  )
143
 
144
- # 2. Bag versionné : map (type, step_id) → Artifact.
145
- # Plus une map type → step_id "le plus récent" pour le
146
- # fallback quand inputs_from ne précise pas la source.
147
  versioned: dict[tuple[ArtifactType, str], Artifact] = {}
148
- latest_producer: dict[ArtifactType, str] = {}
149
-
150
  for art_type, art in initial_inputs.items():
151
  versioned[(art_type, INITIAL_STEP_ID)] = art
152
- latest_producer[art_type] = INITIAL_STEP_ID
153
 
154
- # 3. Exécution séquentielle.
155
  step_results: list[StepResult] = []
156
  all_artifacts: list[Artifact] = list(initial_inputs.values())
157
  run_started = time.perf_counter()
158
 
159
- for step in spec.steps:
160
  result, produced = self._run_step(
161
- step=step,
162
  versioned=versioned,
163
- latest_producer=latest_producer,
164
  context=context,
165
  )
166
  step_results.append(result)
167
  for art_type, art in produced.items():
168
- versioned[(art_type, step.id)] = art
169
- latest_producer[art_type] = step.id
170
  all_artifacts.append(art)
171
 
172
  run_duration = time.perf_counter() - run_started
173
  succeeded = all(r.succeeded for r in step_results)
174
 
175
  return PipelineResult(
176
- pipeline_name=spec.name,
177
  document_id=document.id,
178
  step_results=tuple(step_results),
179
  succeeded=succeeded,
@@ -188,25 +254,25 @@ class PipelineExecutor:
188
  def _run_step(
189
  self,
190
  *,
191
- step: PipelineStep,
192
  versioned: dict[tuple[ArtifactType, str], Artifact],
193
- latest_producer: dict[ArtifactType, str],
194
  context: RunContext,
195
  ) -> tuple[StepResult, dict[ArtifactType, Artifact]]:
196
- """Exécute une étape, retourne (result, artefacts produits).
197
 
198
  Le tuple est important : si le step échoue, on retourne quand
199
  même un dict vide pour les artefacts → le caller peut
200
  continuer la boucle proprement.
201
  """
 
202
  step_started = time.perf_counter()
203
 
204
- # 1. Résoudre les inputs depuis le bag.
 
205
  try:
206
- inputs = self._resolve_inputs(
207
- step=step,
208
  versioned=versioned,
209
- latest_producer=latest_producer,
210
  )
211
  except _InputResolutionError as exc:
212
  duration = time.perf_counter() - step_started
@@ -302,41 +368,33 @@ class PipelineExecutor:
302
  outputs,
303
  )
304
 
305
- def _resolve_inputs(
306
  self,
307
  *,
308
- step: PipelineStep,
309
  versioned: dict[tuple[ArtifactType, str], Artifact],
310
- latest_producer: dict[ArtifactType, str],
311
  ) -> dict[ArtifactType, Artifact]:
312
  """Construit le dict ``{ArtifactType: Artifact}`` à passer
313
- à l'adapter, en respectant ``step.inputs_from``.
314
 
315
- Algorithme :
 
 
 
316
 
317
- - Pour chaque type dans ``step.input_types`` :
318
- - si ``step.inputs_from[type]`` est défini : exiger la
319
- version produite par cette étape, lever sinon ;
320
- - sinon : prendre la dernière version disponible
321
- (``latest_producer[type]``), lever si aucune.
322
  """
323
  inputs: dict[ArtifactType, Artifact] = {}
324
- for input_type in step.input_types:
325
- source_step = step.inputs_from.get(input_type)
326
- if source_step is None:
327
- source_step = latest_producer.get(input_type)
328
- if source_step is None:
329
- raise _InputResolutionError(
330
- f"missing_input: {input_type.value} "
331
- "non disponible dans le bag d'artefacts"
332
- )
333
- key = (input_type, source_step)
334
  if key not in versioned:
335
  raise _InputResolutionError(
336
- f"missing_input: {input_type.value}"
337
- f"@{source_step}"
338
  )
339
- inputs[input_type] = versioned[key]
340
  return inputs
341
 
342
 
 
1
+ """``PipelineExecutor`` mono-document — Sprints A14-S7 / S28.
2
 
3
+ Exécuteur séquentiel d'une pipeline composée sur un document.
4
+
5
+ Sprint S7 livrait ``run(spec, document, initial_inputs, context)``
6
+ qui validait la spec en interne et résolvait les bindings au
7
+ runtime via un bag versionné.
8
+
9
+ Sprint S28 introduit le ``PipelinePlanner`` qui transforme une
10
+ ``PipelineSpec`` en ``ExecutionPlan`` immuable (validations +
11
+ bindings résolus + jonctions de métriques détectées). L'executor
12
+ consomme désormais soit :
13
+
14
+ - Un ``ExecutionPlan`` pré-calculé via ``run_plan(plan, ...)`` —
15
+ signature canonique, contrat explicite.
16
+ - Une ``PipelineSpec`` brute via ``run(spec, ...)`` — sucre
17
+ ergonomique qui appelle le planner en interne (planification
18
+ systématique, pas de cache implicite).
19
 
20
  Contrat
21
  -------
22
+ Le caller (typiquement ``BenchmarkService`` ou ``CorpusRunner``)
23
+ fournit :
24
 
25
+ - un ``ExecutionPlan`` (canonique) ou ``PipelineSpec`` (sucre),
 
 
26
  - un ``DocumentRef`` du document à traiter,
27
  - un dict ``{ArtifactType: Artifact}`` des entrées initiales
28
  (typiquement ``{IMAGE: Artifact(...)}``),
29
+ - un ``RunContext`` (``document_id``, ``code_version``,
30
+ ``pipeline_name``, éventuel ``workspace_uri``),
31
+ - un ``adapter_resolver: Callable[[str], StepExecutor]`` injecté
32
+ au constructeur.
 
33
 
34
  L'executor garantit :
35
 
36
+ - Les étapes sont exécutées dans l'ordre du plan
37
+ (``resolved_steps``).
38
+ - Chaque entrée d'une étape est résolue depuis les
39
+ ``StepInputBinding`` du plan fini la résolution implicite
40
+ « dernier producteur » au runtime.
41
  - Toute exception levée par un adapter est capturée — le step
42
  est marqué ``succeeded=False`` avec ``error=str(exc)``, et le
43
+ pipeline continue (les étapes en aval pourront échouer si elles
44
+ dépendaient des outputs de ce step, ce qui est explicite).
45
  - Les ``output_types`` déclarés par l'adapter sont validés au
46
+ retour : un type promis manquant marque le step en échec avec
47
+ ``error="missing_output: <type>"``.
48
 
49
  L'executor ne garantit PAS (reportés à des sprints suivants) :
50
 
51
+ - Cache d'artefacts inter-runs (S29 livre ``ArtifactStore``).
52
+ - Parallélisation inter-documents ou inter-étapes (cf. S8 pour
53
+ inter-doc via ``CorpusRunner``).
54
+
55
+ Compat S7
56
+ ---------
57
+ La signature historique ``run(spec, document, ...)`` reste
58
+ exposée — elle planifie la spec systématiquement à chaque appel
59
+ et délègue à ``run_plan``. Aucune logique nouvelle n'y vit.
 
 
 
 
 
60
  """
61
 
62
  from __future__ import annotations
 
68
  from picarones.domain.artifacts import Artifact, ArtifactType
69
  from picarones.domain.documents import DocumentRef
70
  from picarones.domain.errors import PicaronesError
71
+ from picarones.pipeline.planner import (
72
+ ExecutionPlan,
73
+ PipelinePlanner,
74
+ PlanningError,
75
+ ResolvedStep,
76
+ )
77
  from picarones.pipeline.protocols import StepExecutor
78
+ from picarones.pipeline.spec import INITIAL_STEP_ID, PipelineSpec
79
  from picarones.pipeline.types import PipelineResult, RunContext, StepResult
 
80
 
81
  logger = logging.getLogger(__name__)
82
 
83
 
84
  class PipelineSpecInvalid(PicaronesError):
85
+ """``PipelineSpec`` mal formée — l'executor refuse de démarrer.
86
+
87
+ Wrappe le ``PlanningError`` produit par ``PipelinePlanner`` pour
88
+ préserver la sémantique historique : un caller qui catchait
89
+ ``PipelineSpecInvalid`` continue de fonctionner.
90
+ """
91
 
92
 
93
  #: Type alias pour le resolver d'adapters. Une fonction qui
 
115
  applicatif qui injecte les bonnes dépendances en prod.
116
  """
117
 
118
+ def __init__(
119
+ self,
120
+ adapter_resolver: AdapterResolver,
121
+ planner: PipelinePlanner | None = None,
122
+ ) -> None:
123
  if not callable(adapter_resolver):
124
  raise PicaronesError(
125
  "PipelineExecutor : adapter_resolver doit être callable."
126
  )
127
+ if planner is not None and not isinstance(planner, PipelinePlanner):
128
+ raise PicaronesError(
129
+ "PipelineExecutor : planner doit être un PipelinePlanner ou None."
130
+ )
131
  self._resolver = adapter_resolver
132
+ # Si pas de planner injecté, on en fabrique un sans MetricRegistry —
133
+ # les jonctions seront vides mais la planification reste correcte.
134
+ self._planner = planner if planner is not None else PipelinePlanner()
135
+
136
+ def plan(self, spec: PipelineSpec) -> ExecutionPlan:
137
+ """Planifie une ``PipelineSpec`` en ``ExecutionPlan``.
138
+
139
+ Sucre exposant le planner injecté. Permet aux callers
140
+ (typiquement ``CorpusRunner`` qui exécute la même spec sur
141
+ N documents) de planifier **une fois** puis appeler
142
+ ``run_plan`` N fois — économisant N-1 validations.
143
+
144
+ Raises
145
+ ------
146
+ PipelineSpecInvalid
147
+ Si la planification échoue (validations statiques).
148
+ """
149
+ try:
150
+ return self._planner.plan(spec)
151
+ except PlanningError as exc:
152
+ messages = "; ".join(
153
+ f"{e.step_id or '<global>'}: {e.message}"
154
+ for e in exc.errors
155
+ )
156
+ raise PipelineSpecInvalid(
157
+ f"Spec {spec.name!r} invalide : {messages}"
158
+ ) from exc
159
 
160
  def run(
161
  self,
 
164
  initial_inputs: dict[ArtifactType, Artifact],
165
  context: RunContext,
166
  ) -> PipelineResult:
167
+ """Exécute une pipeline complète sur un document (sucre).
168
+
169
+ Sucre ergonomique sur ``run_plan`` : appelle
170
+ ``self._planner.plan(spec)`` puis ``run_plan(plan, ...)``.
171
+ Aucune logique nouvelle n'y vit — l'API canonique est
172
+ ``run_plan(plan, document, initial_inputs, context)`` qui
173
+ accepte un ``ExecutionPlan`` pré-calculé.
174
 
175
  Returns
176
  -------
 
183
  Raises
184
  ------
185
  PipelineSpecInvalid
186
+ Si la planification échoue (validations statiques).
187
+ L'executor ne masque pas ce type d'erreur : c'est un
188
+ bug de programmation, pas un problème runtime.
189
  """
190
+ plan = self.plan(spec)
191
+ return self.run_plan(plan, document, initial_inputs, context)
192
+
193
+ def run_plan(
194
+ self,
195
+ plan: ExecutionPlan,
196
+ document: DocumentRef,
197
+ initial_inputs: dict[ArtifactType, Artifact],
198
+ context: RunContext,
199
+ ) -> PipelineResult:
200
+ """Exécute un ``ExecutionPlan`` pré-calculé sur un document.
201
+
202
+ Signature canonique du S28. Le caller a déjà appelé
203
+ ``planner.plan(spec)`` (typiquement ``CorpusRunner`` qui
204
+ planifie une fois pour N documents). L'executor consomme
205
+ directement ``plan.resolved_steps`` sans re-valider la
206
+ spec ni re-résoudre les bindings.
207
+
208
+ Toute la logique d'exécution vit ici ; ``run`` n'est qu'un
209
+ sucre.
210
+ """
211
+ if not isinstance(plan, ExecutionPlan):
212
+ raise PicaronesError(
213
+ f"run_plan : plan doit être un ExecutionPlan, "
214
+ f"reçu {type(plan).__name__}"
215
  )
216
 
217
+ # 1. Bag versionné : map (type, step_id) → Artifact.
 
 
218
  versioned: dict[tuple[ArtifactType, str], Artifact] = {}
 
 
219
  for art_type, art in initial_inputs.items():
220
  versioned[(art_type, INITIAL_STEP_ID)] = art
 
221
 
222
+ # 2. Exécution séquentielle des steps résolus.
223
  step_results: list[StepResult] = []
224
  all_artifacts: list[Artifact] = list(initial_inputs.values())
225
  run_started = time.perf_counter()
226
 
227
+ for resolved_step in plan.resolved_steps:
228
  result, produced = self._run_step(
229
+ resolved_step=resolved_step,
230
  versioned=versioned,
 
231
  context=context,
232
  )
233
  step_results.append(result)
234
  for art_type, art in produced.items():
235
+ versioned[(art_type, resolved_step.id)] = art
 
236
  all_artifacts.append(art)
237
 
238
  run_duration = time.perf_counter() - run_started
239
  succeeded = all(r.succeeded for r in step_results)
240
 
241
  return PipelineResult(
242
+ pipeline_name=plan.spec.name,
243
  document_id=document.id,
244
  step_results=tuple(step_results),
245
  succeeded=succeeded,
 
254
  def _run_step(
255
  self,
256
  *,
257
+ resolved_step: ResolvedStep,
258
  versioned: dict[tuple[ArtifactType, str], Artifact],
 
259
  context: RunContext,
260
  ) -> tuple[StepResult, dict[ArtifactType, Artifact]]:
261
+ """Exécute une étape résolue, retourne (result, artefacts produits).
262
 
263
  Le tuple est important : si le step échoue, on retourne quand
264
  même un dict vide pour les artefacts → le caller peut
265
  continuer la boucle proprement.
266
  """
267
+ step = resolved_step.step
268
  step_started = time.perf_counter()
269
 
270
+ # 1. Résoudre les inputs depuis le bag en suivant les bindings
271
+ # explicites du plan.
272
  try:
273
+ inputs = self._inputs_from_bindings(
274
+ resolved_step=resolved_step,
275
  versioned=versioned,
 
276
  )
277
  except _InputResolutionError as exc:
278
  duration = time.perf_counter() - step_started
 
368
  outputs,
369
  )
370
 
371
+ def _inputs_from_bindings(
372
  self,
373
  *,
374
+ resolved_step: ResolvedStep,
375
  versioned: dict[tuple[ArtifactType, str], Artifact],
 
376
  ) -> dict[ArtifactType, Artifact]:
377
  """Construit le dict ``{ArtifactType: Artifact}`` à passer
378
+ à l'adapter à partir des bindings explicites du plan.
379
 
380
+ Le plan a déjà résolu chaque ``input_type`` à une
381
+ ``source_step_id`` (soit ``INITIAL_STEP_ID``, soit l'ID
382
+ d'une étape antérieure). L'executor n'a plus qu'à indexer
383
+ le bag par ``(input_type, source_step_id)``.
384
 
385
+ Lève ``_InputResolutionError`` si l'artefact attendu
386
+ n'est pas dans le bag typiquement parce qu'une étape
387
+ antérieure a échoué et n'a pas produit son output.
 
 
388
  """
389
  inputs: dict[ArtifactType, Artifact] = {}
390
+ for binding in resolved_step.input_bindings:
391
+ key = (binding.input_type, binding.source_step_id)
 
 
 
 
 
 
 
 
392
  if key not in versioned:
393
  raise _InputResolutionError(
394
+ f"missing_input: {binding.input_type.value}"
395
+ f"@{binding.source_step_id}"
396
  )
397
+ inputs[binding.input_type] = versioned[key]
398
  return inputs
399
 
400
 
picarones/pipeline/planner.py ADDED
@@ -0,0 +1,403 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """``PipelinePlanner`` — Sprint A14-S28.
2
+
3
+ Le S6 livrait ``validate_spec`` (validation statique : types
4
+ cohérents, IDs uniques, ``inputs_from`` valides, adapters connus).
5
+ Le S7 livrait ``PipelineExecutor`` qui résolvait les bindings
6
+ **au runtime** (bag versionné consulté à chaque step).
7
+
8
+ S28 introduit une couche de **planification** qui transforme une
9
+ ``PipelineSpec`` en ``ExecutionPlan`` immuable :
10
+
11
+ 1. Validation statique (délègue à ``validate_spec``).
12
+ 2. Résolution explicite de chaque binding d'entrée — fini la
13
+ résolution implicite « dernier producteur » au runtime.
14
+ 3. Détection des **jonctions de métriques** : pour chaque sortie
15
+ de step, le planner interroge le ``MetricRegistry`` pour les
16
+ métriques applicables sur la signature ``(T, T)`` — base
17
+ pour l'auto-évaluation contre la GT du même niveau.
18
+ 4. Calcul d'un ordre topologique déterministe (les steps
19
+ ``inputs_from`` peuvent référencer n'importe quelle étape
20
+ antérieure ; le planner s'assure que la séquence est cohérente).
21
+
22
+ Pourquoi cette séparation
23
+ -------------------------
24
+ - **Contrat explicite** : l'executor consomme un ``ExecutionPlan``
25
+ immuable plutôt que de dériver les bindings au runtime — moins
26
+ de surprises, debug plus simple.
27
+ - **Réutilisabilité** : le ``CorpusRunner`` planifie **une fois**
28
+ pour la spec, exécute N fois (un par document) — économie marginale
29
+ mais clarté garantie.
30
+ - **Diagnostic** : un ``PlanningError`` capture toutes les erreurs
31
+ d'un coup (pas de short-circuit à la première erreur).
32
+ - **Métriques de jonction** : le planner liste les métriques
33
+ applicables à chaque sortie ; un service applicatif (S29+) peut
34
+ pré-calculer où l'évaluation est possible.
35
+
36
+ Anti-sur-ingénierie
37
+ -------------------
38
+ - Pas de cache de plan inter-spec (le coût de planification est
39
+ O(steps) et négligeable face à l'OCR).
40
+ - Pas d'optimisation de DAG (parallélisation, fusion, etc.) — le
41
+ plan reste séquentiel et correspond exactement à l'ordre des
42
+ steps.
43
+ - Pas de validation runtime additionnelle (artefacts effectivement
44
+ produits, etc.) — c'est la responsabilité de l'executor.
45
+ """
46
+
47
+ from __future__ import annotations
48
+
49
+ from dataclasses import dataclass, field
50
+
51
+ from picarones.domain.artifacts import ArtifactType
52
+ from picarones.domain.errors import PicaronesError
53
+ from picarones.evaluation.registry import MetricRegistry
54
+ from picarones.pipeline.spec import (
55
+ INITIAL_STEP_ID,
56
+ PipelineSpec,
57
+ PipelineStep,
58
+ )
59
+ from picarones.pipeline.validation import ValidationError, validate_spec
60
+
61
+
62
+ # ──────────────────────────────────────────────────────────────────────
63
+ # Erreur dédiée
64
+ # ──────────────────────────────────────────────────────────────────────
65
+
66
+
67
+ class PlanningError(PicaronesError):
68
+ """La spec n'a pas pu être planifiée — typiquement parce qu'elle
69
+ contient des erreurs de validation détectées par
70
+ ``validate_spec``.
71
+
72
+ Attributes
73
+ ----------
74
+ errors:
75
+ Liste des ``ValidationError`` produites par ``validate_spec``.
76
+ Le caller peut les rendre dans son rapport (CLI, JSON, HTML)
77
+ sans avoir à parser le message.
78
+ """
79
+
80
+ def __init__(
81
+ self, message: str, errors: list[ValidationError] | None = None,
82
+ ) -> None:
83
+ super().__init__(message)
84
+ self.errors: tuple[ValidationError, ...] = tuple(errors or ())
85
+
86
+
87
+ # ──────────────────────────────────────────────────────────────────────
88
+ # Modèles immuables du plan
89
+ # ──────────────────────────────────────────────────────────────────────
90
+
91
+
92
+ @dataclass(frozen=True)
93
+ class StepInputBinding:
94
+ """Binding explicite d'une entrée de step à sa source.
95
+
96
+ Attributes
97
+ ----------
98
+ input_type:
99
+ Type d'artefact consommé.
100
+ source_step_id:
101
+ ID de l'étape source, ou ``INITIAL_STEP_ID`` pour les
102
+ entrées initiales fournies au runner.
103
+
104
+ Notes
105
+ -----
106
+ Frozen — le caller doit considérer le binding comme un fait
107
+ figé du plan. Toute mutation invaliderait l'``ExecutionPlan``.
108
+ """
109
+
110
+ input_type: ArtifactType
111
+ source_step_id: str
112
+
113
+
114
+ @dataclass(frozen=True)
115
+ class ResolvedStep:
116
+ """Étape avec tous ses bindings d'entrée résolus.
117
+
118
+ Attributes
119
+ ----------
120
+ step:
121
+ Le ``PipelineStep`` original (frozen pydantic).
122
+ input_bindings:
123
+ Bindings explicites — un par ``input_type``. Préserve
124
+ l'ordre de ``step.input_types``.
125
+
126
+ Notes
127
+ -----
128
+ Le runner peut directement consommer ``input_bindings`` sans
129
+ refaire la résolution : pour chaque binding, il sait quelle
130
+ version de quel artefact aller chercher dans son bag.
131
+ """
132
+
133
+ step: PipelineStep
134
+ input_bindings: tuple[StepInputBinding, ...] = field(default_factory=tuple)
135
+
136
+ @property
137
+ def id(self) -> str:
138
+ return self.step.id
139
+
140
+ @property
141
+ def adapter_name(self) -> str:
142
+ return self.step.adapter_name
143
+
144
+
145
+ @dataclass(frozen=True)
146
+ class MetricJunction:
147
+ """Jonction de métriques détectée à la sortie d'un step.
148
+
149
+ Pour chaque sortie ``T`` d'un step, le planner interroge le
150
+ ``MetricRegistry`` pour les métriques de signature ``(T, T)``
151
+ — celles qui peuvent comparer la sortie du step à une GT
152
+ du même niveau. Un service applicatif (S29+) consomme cette
153
+ liste pour décider où auto-évaluer.
154
+
155
+ Attributes
156
+ ----------
157
+ step_id:
158
+ Step qui produit l'artefact évaluable.
159
+ artifact_type:
160
+ Type de l'artefact produit.
161
+ candidate_metrics:
162
+ Noms des métriques applicables, triés alphabétiquement
163
+ pour déterminisme.
164
+
165
+ Notes
166
+ -----
167
+ « Candidate » : la jonction est *applicable*, pas *exigée*. Le
168
+ caller décide selon la GT disponible et la stratégie d'évaluation.
169
+ """
170
+
171
+ step_id: str
172
+ artifact_type: ArtifactType
173
+ candidate_metrics: tuple[str, ...] = field(default_factory=tuple)
174
+
175
+
176
+ @dataclass(frozen=True)
177
+ class ExecutionPlan:
178
+ """Plan d'exécution immuable consommable par le ``PipelineExecutor``.
179
+
180
+ Construit par ``PipelinePlanner.plan(spec)``. Garantit que :
181
+
182
+ - La spec est statiquement valide (toutes les ``ValidationError``
183
+ sont nulles).
184
+ - Chaque step a ses bindings résolus (``input_bindings`` non vide
185
+ pour chaque ``input_type`` déclaré).
186
+ - L'ordre topologique est respecté (``resolved_steps`` suit
187
+ l'ordre de ``spec.steps``, qui doit déjà être topologique).
188
+ - Les jonctions de métriques sont indexées par step.
189
+
190
+ Attributes
191
+ ----------
192
+ spec:
193
+ La ``PipelineSpec`` source (référence, pas copie).
194
+ resolved_steps:
195
+ Steps avec bindings résolus, dans l'ordre topologique
196
+ d'exécution.
197
+ metric_junctions:
198
+ Jonctions auto-détectées si un ``MetricRegistry`` était
199
+ fourni au planner ; tuple vide sinon.
200
+ """
201
+
202
+ spec: PipelineSpec
203
+ resolved_steps: tuple[ResolvedStep, ...] = field(default_factory=tuple)
204
+ metric_junctions: tuple[MetricJunction, ...] = field(default_factory=tuple)
205
+
206
+ def step_by_id(self, step_id: str) -> ResolvedStep | None:
207
+ """Retourne le step résolu par son id, ou ``None``."""
208
+ for rs in self.resolved_steps:
209
+ if rs.id == step_id:
210
+ return rs
211
+ return None
212
+
213
+ def junctions_for_step(self, step_id: str) -> tuple[MetricJunction, ...]:
214
+ """Retourne les jonctions de métriques associées à un step."""
215
+ return tuple(
216
+ j for j in self.metric_junctions if j.step_id == step_id
217
+ )
218
+
219
+
220
+ # ──────────────────────────────────────────────────────────────────────
221
+ # Planificateur
222
+ # ──────────────────────────────────────────────────────────────────────
223
+
224
+
225
+ class PipelinePlanner:
226
+ """Planificateur d'une ``PipelineSpec`` en ``ExecutionPlan``.
227
+
228
+ Parameters
229
+ ----------
230
+ metric_registry:
231
+ Optionnel — si fourni, les jonctions de métriques sont
232
+ détectées pour chaque sortie de step. Sinon, le plan a
233
+ ``metric_junctions=()``.
234
+ available_adapters:
235
+ Optionnel — set des noms d'adapters connus. Si fourni, la
236
+ validation rejette les ``adapter_name`` inconnus. Sinon,
237
+ cette validation est sautée (utile pour les YAML qui
238
+ peuvent référencer des adapters tiers absents en CI).
239
+
240
+ Notes
241
+ -----
242
+ Stateless : le planner ne mémorise aucun état entre appels.
243
+ Thread-safe en lecture/écriture.
244
+ """
245
+
246
+ def __init__(
247
+ self,
248
+ metric_registry: MetricRegistry | None = None,
249
+ available_adapters: set[str] | None = None,
250
+ ) -> None:
251
+ if metric_registry is not None and not isinstance(
252
+ metric_registry, MetricRegistry,
253
+ ):
254
+ raise TypeError(
255
+ "metric_registry doit être un MetricRegistry ou None."
256
+ )
257
+ self._metrics = metric_registry
258
+ self._adapters = (
259
+ frozenset(available_adapters)
260
+ if available_adapters is not None
261
+ else None
262
+ )
263
+
264
+ def plan(self, spec: PipelineSpec) -> ExecutionPlan:
265
+ """Construit un ``ExecutionPlan`` à partir d'une ``PipelineSpec``.
266
+
267
+ Étapes :
268
+
269
+ 1. ``validate_spec(spec, available_adapters)`` — récolte
270
+ toutes les erreurs structurelles.
271
+ 2. Si erreurs → ``PlanningError`` avec la liste complète.
272
+ 3. Sinon, résout les bindings step par step en simulant le
273
+ bag versionné.
274
+ 4. Si un registre de métriques est disponible, détecte les
275
+ jonctions pour chaque sortie de step.
276
+
277
+ Raises
278
+ ------
279
+ PlanningError
280
+ Si la validation statique échoue. Le caller peut
281
+ inspecter ``error.errors`` pour rendre un rapport.
282
+ """
283
+ # 1. Validation statique.
284
+ errors = validate_spec(
285
+ spec,
286
+ available_adapters=set(self._adapters) if self._adapters else None,
287
+ )
288
+ if errors:
289
+ n = len(errors)
290
+ preview = "; ".join(
291
+ f"{e.step_id or '<global>'}:{e.code}"
292
+ for e in errors[:3]
293
+ )
294
+ suffix = f" (+{n - 3} de plus)" if n > 3 else ""
295
+ raise PlanningError(
296
+ f"PipelineSpec {spec.name!r} a {n} erreur(s) de "
297
+ f"validation : {preview}{suffix}",
298
+ errors=errors,
299
+ )
300
+
301
+ # 2. Résolution des bindings.
302
+ resolved_steps = self._resolve_steps(spec)
303
+
304
+ # 3. Détection des jonctions de métriques.
305
+ metric_junctions = (
306
+ self._detect_junctions(spec)
307
+ if self._metrics is not None
308
+ else ()
309
+ )
310
+
311
+ return ExecutionPlan(
312
+ spec=spec,
313
+ resolved_steps=resolved_steps,
314
+ metric_junctions=metric_junctions,
315
+ )
316
+
317
+ # ──────────────────────────────────────────────────────────────────
318
+ # Helpers internes
319
+ # ──────────────────────────────────────────────────────────────────
320
+
321
+ def _resolve_steps(
322
+ self, spec: PipelineSpec,
323
+ ) -> tuple[ResolvedStep, ...]:
324
+ """Résout les bindings de chaque step en simulant le bag.
325
+
326
+ Pour chaque ``input_type`` d'un step :
327
+
328
+ - Si ``inputs_from[input_type]`` est défini → ce step est la
329
+ source explicite.
330
+ - Sinon → la source est le **dernier producteur** du type
331
+ dans l'ordre topologique (équivalent au comportement
332
+ historique de l'executor S7).
333
+
334
+ ``validate_spec`` garantit que ces résolutions sont valides
335
+ (pas de référence pendante, type produit par la source).
336
+ """
337
+ latest_producer: dict[ArtifactType, str] = {
338
+ t: INITIAL_STEP_ID for t in spec.initial_inputs
339
+ }
340
+ resolved: list[ResolvedStep] = []
341
+
342
+ for step in spec.steps:
343
+ bindings: list[StepInputBinding] = []
344
+ for input_type in step.input_types:
345
+ source = step.inputs_from.get(input_type)
346
+ if source is None:
347
+ # validate_spec a vérifié que latest_producer[t]
348
+ # existe → on peut indexer sans garde.
349
+ source = latest_producer[input_type]
350
+ bindings.append(StepInputBinding(
351
+ input_type=input_type,
352
+ source_step_id=source,
353
+ ))
354
+ resolved.append(ResolvedStep(
355
+ step=step,
356
+ input_bindings=tuple(bindings),
357
+ ))
358
+ # Mise à jour de l'état pour les steps suivants.
359
+ for output_type in step.output_types:
360
+ latest_producer[output_type] = step.id
361
+
362
+ return tuple(resolved)
363
+
364
+ def _detect_junctions(
365
+ self, spec: PipelineSpec,
366
+ ) -> tuple[MetricJunction, ...]:
367
+ """Détecte les jonctions de métriques pour chaque sortie.
368
+
369
+ Pour chaque ``output_type`` ``T`` d'un step, interroge le
370
+ ``MetricRegistry`` pour les métriques de signature ``(T, T)``
371
+ — métriques applicables à la comparaison ``GT[T]`` vs
372
+ ``step.outputs[T]``.
373
+
374
+ Si aucune métrique n'est applicable, la jonction est tout
375
+ de même listée avec ``candidate_metrics=()`` — un caller
376
+ peut ainsi détecter qu'un step produit un type non
377
+ évaluable et décider de la suite (warning, registre étendu,
378
+ omission).
379
+ """
380
+ # Garde-fou : devrait être garanti par le check dans plan().
381
+ if self._metrics is None: # pragma: no cover
382
+ return ()
383
+ junctions: list[MetricJunction] = []
384
+ for step in spec.steps:
385
+ for output_type in step.output_types:
386
+ specs = self._metrics.select(output_type, output_type)
387
+ names = tuple(sorted(s.name for s in specs))
388
+ junctions.append(MetricJunction(
389
+ step_id=step.id,
390
+ artifact_type=output_type,
391
+ candidate_metrics=names,
392
+ ))
393
+ return tuple(junctions)
394
+
395
+
396
+ __all__ = [
397
+ "ExecutionPlan",
398
+ "MetricJunction",
399
+ "PipelinePlanner",
400
+ "PlanningError",
401
+ "ResolvedStep",
402
+ "StepInputBinding",
403
+ ]
picarones/pipeline/runner.py CHANGED
@@ -206,6 +206,12 @@ class CorpusRunner:
206
  outcomes=(),
207
  )
208
 
 
 
 
 
 
 
209
  # Pool instancié explicitement avec ``shutdown(wait=False,
210
  # cancel_futures=True)`` à la sortie : les futures en queue
211
  # sont annulées, les threads en cours continuent en
@@ -240,7 +246,7 @@ class CorpusRunner:
240
  return False
241
  fut = pool.submit(
242
  self._run_one,
243
- spec=spec,
244
  document=doc,
245
  initial_inputs_factory=initial_inputs_factory,
246
  context_factory=context_factory,
@@ -358,15 +364,15 @@ class CorpusRunner:
358
  def _run_one(
359
  self,
360
  *,
361
- spec: PipelineSpec,
362
  document: DocumentRef,
363
  initial_inputs_factory: InitialInputsFactory,
364
  context_factory: ContextFactory,
365
  started_at: dict[str, float],
366
  started_at_lock: threading.Lock,
367
  ) -> PipelineResult:
368
- """Exécute la pipeline sur un document. Appelé dans un thread
369
- du pool.
370
 
371
  Enregistre ``started_at[doc.id]`` au tout début pour que
372
  l'orchestrateur puisse mesurer le timeout depuis le début
@@ -381,9 +387,11 @@ class CorpusRunner:
381
  initial_inputs = initial_inputs_factory(document)
382
  context = context_factory(document)
383
 
384
- # 3. Déléguer au PipelineExecutor mono-doc (S7).
385
- return self._executor.run(
386
- spec=spec,
 
 
387
  document=document,
388
  initial_inputs=initial_inputs,
389
  context=context,
 
206
  outcomes=(),
207
  )
208
 
209
+ # S28 : on planifie une seule fois pour la spec. Si la spec
210
+ # est invalide, on lève maintenant — pas dans chaque worker.
211
+ # Les workers consomment ensuite ``executor.run_plan(plan, ...)``
212
+ # → N-1 validations économisées.
213
+ plan = self._executor.plan(spec)
214
+
215
  # Pool instancié explicitement avec ``shutdown(wait=False,
216
  # cancel_futures=True)`` à la sortie : les futures en queue
217
  # sont annulées, les threads en cours continuent en
 
246
  return False
247
  fut = pool.submit(
248
  self._run_one,
249
+ plan=plan,
250
  document=doc,
251
  initial_inputs_factory=initial_inputs_factory,
252
  context_factory=context_factory,
 
364
  def _run_one(
365
  self,
366
  *,
367
+ plan, # ExecutionPlan ; type omis pour éviter l'import top-level
368
  document: DocumentRef,
369
  initial_inputs_factory: InitialInputsFactory,
370
  context_factory: ContextFactory,
371
  started_at: dict[str, float],
372
  started_at_lock: threading.Lock,
373
  ) -> PipelineResult:
374
+ """Exécute le plan pré-calculé sur un document. Appelé dans
375
+ un thread du pool.
376
 
377
  Enregistre ``started_at[doc.id]`` au tout début pour que
378
  l'orchestrateur puisse mesurer le timeout depuis le début
 
387
  initial_inputs = initial_inputs_factory(document)
388
  context = context_factory(document)
389
 
390
+ # 3. Déléguer au PipelineExecutor.run_plan (S28). Le plan a
391
+ # déjà été validé une fois par le runner ; pas de re-validation
392
+ # par doc.
393
+ return self._executor.run_plan(
394
+ plan=plan,
395
  document=document,
396
  initial_inputs=initial_inputs,
397
  context=context,
tests/architecture/test_file_budgets.py CHANGED
@@ -78,6 +78,12 @@ FILE_BUDGETS: dict[str, int] = {
78
  # réel / annulation propre. Budget stable, l'extension
79
  # ProcessPoolExecutor (S11) restera dans cette enveloppe.
80
  "picarones/pipeline/runner.py": 550, # actuel 462
 
 
 
 
 
 
81
  "picarones/core/corpus.py": 600, # actuel 511
82
  "picarones/fixtures.py": 600, # actuel 510
83
  "picarones/measurements/inter_engine.py": 575, # actuel 484
 
78
  # réel / annulation propre. Budget stable, l'extension
79
  # ProcessPoolExecutor (S11) restera dans cette enveloppe.
80
  "picarones/pipeline/runner.py": 550, # actuel 462
81
+ # Sprint A14-S28 — PipelineExecutor refondu pour consommer un
82
+ # ExecutionPlan (run_plan) tout en gardant run(spec) comme sucre.
83
+ # PipelinePlanner introduit pour transformer une PipelineSpec en
84
+ # plan immuable (validation + bindings + jonctions de métriques).
85
+ "picarones/pipeline/executor.py": 475, # actuel 413
86
+ "picarones/pipeline/planner.py": 465, # actuel 403
87
  "picarones/core/corpus.py": 600, # actuel 511
88
  "picarones/fixtures.py": 600, # actuel 510
89
  "picarones/measurements/inter_engine.py": 575, # actuel 484
tests/pipeline/test_sprint_a14_s28_planner.py ADDED
@@ -0,0 +1,628 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Sprint A14-S28 — ``PipelinePlanner`` + ``ExecutionPlan``.
2
+
3
+ Tests du planner introduit par S28 pour transformer une
4
+ ``PipelineSpec`` en plan d'exécution immuable consommé par
5
+ le ``PipelineExecutor.run_plan``.
6
+
7
+ Couvre :
8
+
9
+ 1. ``PipelinePlanner.plan`` :
10
+ - spec valide → ExecutionPlan avec resolved_steps + bindings ;
11
+ - spec invalide → PlanningError avec liste d'erreurs ;
12
+ - DAG branchant (inputs_from explicite) → bindings non implicites ;
13
+ - validation d'adapters (set fourni) ;
14
+ - validation d'adapters (None → skip).
15
+
16
+ 2. Détection des jonctions de métriques :
17
+ - sans MetricRegistry → metric_junctions = () ;
18
+ - avec MetricRegistry → 1 junction par sortie de step ;
19
+ - sortie sans métrique applicable → candidate_metrics = () ;
20
+ - tri alphabétique déterministe des noms.
21
+
22
+ 3. ``ExecutionPlan`` API :
23
+ - frozen dataclass ;
24
+ - step_by_id() ;
25
+ - junctions_for_step().
26
+
27
+ 4. Intégration avec ``PipelineExecutor`` :
28
+ - run_plan(plan) consume un plan pré-calculé ;
29
+ - run(spec) plan internement et exécute ;
30
+ - executor.plan(spec) sucre.
31
+ """
32
+
33
+ from __future__ import annotations
34
+
35
+ import pytest
36
+
37
+ from picarones.domain.artifacts import Artifact, ArtifactType
38
+ from picarones.domain.documents import DocumentRef
39
+ from picarones.domain.evaluation_spec import MetricSpec
40
+ from picarones.evaluation.registry import MetricRegistry
41
+ from picarones.pipeline.executor import PipelineExecutor, PipelineSpecInvalid
42
+ from picarones.pipeline.planner import (
43
+ ExecutionPlan,
44
+ MetricJunction,
45
+ PipelinePlanner,
46
+ PlanningError,
47
+ StepInputBinding,
48
+ )
49
+ from picarones.pipeline.spec import (
50
+ INITIAL_STEP_ID,
51
+ PipelineSpec,
52
+ PipelineStep,
53
+ )
54
+ from picarones.pipeline.types import RunContext
55
+
56
+
57
+ # ──────────────────────────────────────────────────────────────────────
58
+ # Stub adapter
59
+ # ──────────────────────────────────────────────────────────────────────
60
+
61
+
62
+ class _IdentityAdapter:
63
+ """Adapter qui retourne directement ses inputs comme outputs."""
64
+
65
+ name = "identity"
66
+ input_types = frozenset() # ne sert pas — l'executor lit step.input_types
67
+ output_types = frozenset()
68
+ execution_mode = "io"
69
+
70
+ def execute(self, inputs, params, context):
71
+ return {
72
+ t: Artifact(
73
+ id=f"{context.document_id}:{t.value}",
74
+ document_id=context.document_id,
75
+ type=t,
76
+ )
77
+ for t in inputs
78
+ }
79
+
80
+
81
+ class _OCRStub:
82
+ name = "ocr_stub"
83
+ input_types = frozenset({ArtifactType.IMAGE})
84
+ output_types = frozenset({ArtifactType.RAW_TEXT})
85
+ execution_mode = "io"
86
+
87
+ def execute(self, inputs, params, context):
88
+ return {
89
+ ArtifactType.RAW_TEXT: Artifact(
90
+ id=f"{context.document_id}:raw",
91
+ document_id=context.document_id,
92
+ type=ArtifactType.RAW_TEXT,
93
+ ),
94
+ }
95
+
96
+
97
+ # ──────────────────────────────────────────────────────────────────────
98
+ # PipelinePlanner — validation
99
+ # ──────────────────────────────────────────────────────────────────────
100
+
101
+
102
+ class TestPipelinePlannerConstructor:
103
+ def test_no_args(self) -> None:
104
+ planner = PipelinePlanner()
105
+ assert planner is not None
106
+
107
+ def test_with_metric_registry(self) -> None:
108
+ planner = PipelinePlanner(metric_registry=MetricRegistry())
109
+ assert planner is not None
110
+
111
+ def test_rejects_non_metric_registry(self) -> None:
112
+ with pytest.raises(TypeError, match="metric_registry"):
113
+ PipelinePlanner(metric_registry="nope") # type: ignore[arg-type]
114
+
115
+ def test_with_available_adapters(self) -> None:
116
+ planner = PipelinePlanner(available_adapters={"adapter_a", "adapter_b"})
117
+ assert planner is not None
118
+
119
+
120
+ class TestPipelinePlannerErrors:
121
+ def test_empty_spec_raises_planning_error(self) -> None:
122
+ spec = PipelineSpec(name="empty", steps=())
123
+ planner = PipelinePlanner()
124
+ with pytest.raises(PlanningError) as exc_info:
125
+ planner.plan(spec)
126
+ assert exc_info.value.errors
127
+ assert exc_info.value.errors[0].code == "empty_pipeline"
128
+
129
+ def test_unknown_adapter_raises_when_set_provided(self) -> None:
130
+ spec = PipelineSpec(
131
+ name="unknown_adapter",
132
+ initial_inputs=(ArtifactType.IMAGE,),
133
+ steps=(PipelineStep(
134
+ id="s1",
135
+ kind="ocr",
136
+ adapter_name="not_in_registry",
137
+ input_types=(ArtifactType.IMAGE,),
138
+ output_types=(ArtifactType.RAW_TEXT,),
139
+ ),),
140
+ )
141
+ planner = PipelinePlanner(available_adapters={"foo", "bar"})
142
+ with pytest.raises(PlanningError) as exc_info:
143
+ planner.plan(spec)
144
+ assert any(
145
+ e.code == "unknown_adapter" for e in exc_info.value.errors
146
+ )
147
+
148
+ def test_unknown_adapter_skipped_when_set_none(self) -> None:
149
+ """Sans set d'adapters fourni, la validation est sautée."""
150
+ spec = PipelineSpec(
151
+ name="unknown_adapter",
152
+ initial_inputs=(ArtifactType.IMAGE,),
153
+ steps=(PipelineStep(
154
+ id="s1",
155
+ kind="ocr",
156
+ adapter_name="any_name",
157
+ input_types=(ArtifactType.IMAGE,),
158
+ output_types=(ArtifactType.RAW_TEXT,),
159
+ ),),
160
+ )
161
+ planner = PipelinePlanner()
162
+ plan = planner.plan(spec) # ne lève pas
163
+ assert isinstance(plan, ExecutionPlan)
164
+
165
+ def test_planning_error_carries_all_errors(self) -> None:
166
+ """Le planner ne short-circuit pas — il récolte toutes les erreurs."""
167
+ spec = PipelineSpec(
168
+ name="multi_err",
169
+ initial_inputs=(ArtifactType.IMAGE,),
170
+ steps=(
171
+ PipelineStep(
172
+ id="s1",
173
+ kind="ocr",
174
+ adapter_name="bad_a",
175
+ input_types=(ArtifactType.IMAGE,),
176
+ output_types=(ArtifactType.RAW_TEXT,),
177
+ ),
178
+ PipelineStep(
179
+ id="s1", # duplicated id !
180
+ kind="other",
181
+ adapter_name="bad_b",
182
+ input_types=(ArtifactType.RAW_TEXT,),
183
+ output_types=(ArtifactType.CORRECTED_TEXT,),
184
+ ),
185
+ ),
186
+ )
187
+ planner = PipelinePlanner(available_adapters={"only_one"})
188
+ with pytest.raises(PlanningError) as exc_info:
189
+ planner.plan(spec)
190
+ codes = {e.code for e in exc_info.value.errors}
191
+ assert "duplicate_id" in codes
192
+ assert "unknown_adapter" in codes
193
+
194
+
195
+ # ──────────────────────────────────────────────────────────────────────
196
+ # PipelinePlanner — résolution des bindings
197
+ # ──────────────────────────────────────────────────────────────────────
198
+
199
+
200
+ class TestPipelinePlannerBindings:
201
+ def test_simple_chain_resolves_to_initial(self) -> None:
202
+ spec = PipelineSpec(
203
+ name="simple",
204
+ initial_inputs=(ArtifactType.IMAGE,),
205
+ steps=(PipelineStep(
206
+ id="ocr",
207
+ kind="ocr",
208
+ adapter_name="ocr_stub",
209
+ input_types=(ArtifactType.IMAGE,),
210
+ output_types=(ArtifactType.RAW_TEXT,),
211
+ ),),
212
+ )
213
+ plan = PipelinePlanner().plan(spec)
214
+ assert len(plan.resolved_steps) == 1
215
+ rs = plan.resolved_steps[0]
216
+ assert rs.id == "ocr"
217
+ assert len(rs.input_bindings) == 1
218
+ binding = rs.input_bindings[0]
219
+ assert binding.input_type == ArtifactType.IMAGE
220
+ assert binding.source_step_id == INITIAL_STEP_ID
221
+
222
+ def test_two_step_chain_resolves_to_previous(self) -> None:
223
+ spec = PipelineSpec(
224
+ name="two_step",
225
+ initial_inputs=(ArtifactType.IMAGE,),
226
+ steps=(
227
+ PipelineStep(
228
+ id="ocr",
229
+ kind="ocr",
230
+ adapter_name="ocr_stub",
231
+ input_types=(ArtifactType.IMAGE,),
232
+ output_types=(ArtifactType.RAW_TEXT,),
233
+ ),
234
+ PipelineStep(
235
+ id="post",
236
+ kind="post_correction",
237
+ adapter_name="llm_corrector",
238
+ input_types=(ArtifactType.RAW_TEXT,),
239
+ output_types=(ArtifactType.CORRECTED_TEXT,),
240
+ ),
241
+ ),
242
+ )
243
+ plan = PipelinePlanner().plan(spec)
244
+ assert len(plan.resolved_steps) == 2
245
+ # 1er step : IMAGE depuis __initial__
246
+ assert plan.resolved_steps[0].input_bindings[0].source_step_id == INITIAL_STEP_ID
247
+ # 2e step : RAW_TEXT depuis le step "ocr"
248
+ assert plan.resolved_steps[1].input_bindings[0].source_step_id == "ocr"
249
+
250
+ def test_inputs_from_explicit_overrides_latest(self) -> None:
251
+ """Si inputs_from désigne une étape antérieure non-récente,
252
+ le binding doit pointer vers cette étape, pas vers le
253
+ dernier producteur."""
254
+ spec = PipelineSpec(
255
+ name="explicit_dag",
256
+ initial_inputs=(ArtifactType.IMAGE,),
257
+ steps=(
258
+ PipelineStep(
259
+ id="ocr_a",
260
+ kind="ocr",
261
+ adapter_name="ocr_a",
262
+ input_types=(ArtifactType.IMAGE,),
263
+ output_types=(ArtifactType.RAW_TEXT,),
264
+ ),
265
+ PipelineStep(
266
+ id="ocr_b",
267
+ kind="ocr",
268
+ adapter_name="ocr_b",
269
+ input_types=(ArtifactType.IMAGE,),
270
+ output_types=(ArtifactType.RAW_TEXT,),
271
+ ),
272
+ PipelineStep(
273
+ id="post_from_a",
274
+ kind="post_correction",
275
+ adapter_name="llm",
276
+ input_types=(ArtifactType.RAW_TEXT,),
277
+ output_types=(ArtifactType.CORRECTED_TEXT,),
278
+ # On veut explicitement le RAW_TEXT de ocr_a, pas ocr_b
279
+ # qui serait le « dernier producteur ».
280
+ inputs_from={ArtifactType.RAW_TEXT: "ocr_a"},
281
+ ),
282
+ ),
283
+ )
284
+ plan = PipelinePlanner().plan(spec)
285
+ assert plan.resolved_steps[2].input_bindings[0].source_step_id == "ocr_a"
286
+
287
+ def test_resolved_step_preserves_input_order(self) -> None:
288
+ spec = PipelineSpec(
289
+ name="multi_input",
290
+ initial_inputs=(ArtifactType.IMAGE, ArtifactType.RAW_TEXT),
291
+ steps=(PipelineStep(
292
+ id="merge",
293
+ kind="merge",
294
+ adapter_name="m",
295
+ input_types=(ArtifactType.IMAGE, ArtifactType.RAW_TEXT),
296
+ output_types=(ArtifactType.CORRECTED_TEXT,),
297
+ ),),
298
+ )
299
+ plan = PipelinePlanner().plan(spec)
300
+ types = [b.input_type for b in plan.resolved_steps[0].input_bindings]
301
+ assert types == [ArtifactType.IMAGE, ArtifactType.RAW_TEXT]
302
+
303
+
304
+ # ──────────────────────────────────────────────────────────────────────
305
+ # PipelinePlanner — détection des jonctions de métriques
306
+ # ──────────────────────────────────────────────────────────────────────
307
+
308
+
309
+ def _registry_with_text_metric() -> MetricRegistry:
310
+ reg = MetricRegistry()
311
+ reg.register(
312
+ MetricSpec(
313
+ name="cer",
314
+ input_types=(ArtifactType.RAW_TEXT, ArtifactType.RAW_TEXT),
315
+ ),
316
+ lambda r, h: 0.0,
317
+ )
318
+ reg.register(
319
+ MetricSpec(
320
+ name="wer",
321
+ input_types=(ArtifactType.RAW_TEXT, ArtifactType.RAW_TEXT),
322
+ ),
323
+ lambda r, h: 0.0,
324
+ )
325
+ return reg
326
+
327
+
328
+ class TestPipelinePlannerJunctions:
329
+ def test_no_registry_means_empty_junctions(self) -> None:
330
+ spec = PipelineSpec(
331
+ name="x",
332
+ initial_inputs=(ArtifactType.IMAGE,),
333
+ steps=(PipelineStep(
334
+ id="ocr", kind="ocr", adapter_name="ocr_stub",
335
+ input_types=(ArtifactType.IMAGE,),
336
+ output_types=(ArtifactType.RAW_TEXT,),
337
+ ),),
338
+ )
339
+ plan = PipelinePlanner().plan(spec)
340
+ assert plan.metric_junctions == ()
341
+
342
+ def test_registry_yields_junctions_per_output(self) -> None:
343
+ spec = PipelineSpec(
344
+ name="x",
345
+ initial_inputs=(ArtifactType.IMAGE,),
346
+ steps=(PipelineStep(
347
+ id="ocr", kind="ocr", adapter_name="ocr_stub",
348
+ input_types=(ArtifactType.IMAGE,),
349
+ output_types=(ArtifactType.RAW_TEXT,),
350
+ ),),
351
+ )
352
+ plan = PipelinePlanner(
353
+ metric_registry=_registry_with_text_metric(),
354
+ ).plan(spec)
355
+ assert len(plan.metric_junctions) == 1
356
+ j = plan.metric_junctions[0]
357
+ assert j.step_id == "ocr"
358
+ assert j.artifact_type == ArtifactType.RAW_TEXT
359
+ # Tri alphabétique déterministe
360
+ assert j.candidate_metrics == ("cer", "wer")
361
+
362
+ def test_output_without_metric_yields_empty_candidates(self) -> None:
363
+ """Un type d'output sans métrique enregistrée donne tout de
364
+ même une jonction (utile pour le diagnostic) avec
365
+ candidate_metrics vide."""
366
+ spec = PipelineSpec(
367
+ name="x",
368
+ initial_inputs=(ArtifactType.IMAGE,),
369
+ steps=(PipelineStep(
370
+ id="alto",
371
+ kind="alto",
372
+ adapter_name="alto_stub",
373
+ input_types=(ArtifactType.IMAGE,),
374
+ output_types=(ArtifactType.ALTO_XML,),
375
+ ),),
376
+ )
377
+ plan = PipelinePlanner(
378
+ metric_registry=_registry_with_text_metric(),
379
+ ).plan(spec)
380
+ assert len(plan.metric_junctions) == 1
381
+ j = plan.metric_junctions[0]
382
+ assert j.step_id == "alto"
383
+ assert j.artifact_type == ArtifactType.ALTO_XML
384
+ assert j.candidate_metrics == ()
385
+
386
+
387
+ # ──────────────────────────────────────────────────────────────────────
388
+ # ExecutionPlan API
389
+ # ──────────────────────────────────────────────────────────────────────
390
+
391
+
392
+ class TestExecutionPlanAPI:
393
+ def test_step_by_id(self) -> None:
394
+ spec = PipelineSpec(
395
+ name="x",
396
+ initial_inputs=(ArtifactType.IMAGE,),
397
+ steps=(
398
+ PipelineStep(
399
+ id="a", kind="ocr", adapter_name="x",
400
+ input_types=(ArtifactType.IMAGE,),
401
+ output_types=(ArtifactType.RAW_TEXT,),
402
+ ),
403
+ PipelineStep(
404
+ id="b", kind="post", adapter_name="y",
405
+ input_types=(ArtifactType.RAW_TEXT,),
406
+ output_types=(ArtifactType.CORRECTED_TEXT,),
407
+ ),
408
+ ),
409
+ )
410
+ plan = PipelinePlanner().plan(spec)
411
+ a = plan.step_by_id("a")
412
+ assert a is not None
413
+ assert a.id == "a"
414
+ assert plan.step_by_id("missing") is None
415
+
416
+ def test_junctions_for_step(self) -> None:
417
+ spec = PipelineSpec(
418
+ name="x",
419
+ initial_inputs=(ArtifactType.IMAGE,),
420
+ steps=(
421
+ PipelineStep(
422
+ id="ocr", kind="ocr", adapter_name="o",
423
+ input_types=(ArtifactType.IMAGE,),
424
+ output_types=(ArtifactType.RAW_TEXT,),
425
+ ),
426
+ PipelineStep(
427
+ id="post", kind="post", adapter_name="p",
428
+ input_types=(ArtifactType.RAW_TEXT,),
429
+ output_types=(ArtifactType.CORRECTED_TEXT,),
430
+ ),
431
+ ),
432
+ )
433
+ plan = PipelinePlanner(
434
+ metric_registry=_registry_with_text_metric(),
435
+ ).plan(spec)
436
+ ocr_junctions = plan.junctions_for_step("ocr")
437
+ assert len(ocr_junctions) == 1
438
+ assert ocr_junctions[0].artifact_type == ArtifactType.RAW_TEXT
439
+ assert plan.junctions_for_step("missing") == ()
440
+
441
+ def test_dataclass_frozen(self) -> None:
442
+ spec = PipelineSpec(
443
+ name="x",
444
+ initial_inputs=(ArtifactType.IMAGE,),
445
+ steps=(PipelineStep(
446
+ id="ocr", kind="ocr", adapter_name="o",
447
+ input_types=(ArtifactType.IMAGE,),
448
+ output_types=(ArtifactType.RAW_TEXT,),
449
+ ),),
450
+ )
451
+ plan = PipelinePlanner().plan(spec)
452
+ with pytest.raises(Exception): # FrozenInstanceError
453
+ plan.spec = None # type: ignore[misc]
454
+
455
+ def test_step_input_binding_frozen(self) -> None:
456
+ b = StepInputBinding(
457
+ input_type=ArtifactType.IMAGE,
458
+ source_step_id="x",
459
+ )
460
+ with pytest.raises(Exception): # FrozenInstanceError
461
+ b.source_step_id = "y" # type: ignore[misc]
462
+
463
+ def test_resolved_step_frozen(self) -> None:
464
+ spec = PipelineSpec(
465
+ name="x",
466
+ initial_inputs=(ArtifactType.IMAGE,),
467
+ steps=(PipelineStep(
468
+ id="s", kind="k", adapter_name="a",
469
+ input_types=(ArtifactType.IMAGE,),
470
+ output_types=(ArtifactType.RAW_TEXT,),
471
+ ),),
472
+ )
473
+ plan = PipelinePlanner().plan(spec)
474
+ rs = plan.resolved_steps[0]
475
+ with pytest.raises(Exception): # FrozenInstanceError
476
+ rs.step = None # type: ignore[misc]
477
+
478
+ def test_metric_junction_frozen(self) -> None:
479
+ j = MetricJunction(
480
+ step_id="x",
481
+ artifact_type=ArtifactType.RAW_TEXT,
482
+ candidate_metrics=("cer",),
483
+ )
484
+ with pytest.raises(Exception): # FrozenInstanceError
485
+ j.candidate_metrics = () # type: ignore[misc]
486
+
487
+
488
+ # ──────────────────────────────────────────────────────────────────────
489
+ # Intégration Planner + Executor
490
+ # ──────────────────────────────────────────────────────────────────────
491
+
492
+
493
+ class TestPipelineExecutorWithPlanner:
494
+ def test_executor_plan_returns_execution_plan(self) -> None:
495
+ spec = PipelineSpec(
496
+ name="x",
497
+ initial_inputs=(ArtifactType.IMAGE,),
498
+ steps=(PipelineStep(
499
+ id="ocr", kind="ocr", adapter_name="ocr_stub",
500
+ input_types=(ArtifactType.IMAGE,),
501
+ output_types=(ArtifactType.RAW_TEXT,),
502
+ ),),
503
+ )
504
+ executor = PipelineExecutor(
505
+ adapter_resolver=lambda n: _OCRStub(),
506
+ )
507
+ plan = executor.plan(spec)
508
+ assert isinstance(plan, ExecutionPlan)
509
+ assert len(plan.resolved_steps) == 1
510
+
511
+ def test_executor_plan_raises_pipeline_spec_invalid_on_bad_spec(self) -> None:
512
+ spec = PipelineSpec(name="bad", steps=())
513
+ executor = PipelineExecutor(
514
+ adapter_resolver=lambda n: _OCRStub(),
515
+ )
516
+ with pytest.raises(PipelineSpecInvalid, match="invalide"):
517
+ executor.plan(spec)
518
+
519
+ def test_run_plan_executes_pre_planned(self) -> None:
520
+ spec = PipelineSpec(
521
+ name="x",
522
+ initial_inputs=(ArtifactType.IMAGE,),
523
+ steps=(PipelineStep(
524
+ id="ocr", kind="ocr", adapter_name="ocr_stub",
525
+ input_types=(ArtifactType.IMAGE,),
526
+ output_types=(ArtifactType.RAW_TEXT,),
527
+ ),),
528
+ )
529
+ executor = PipelineExecutor(
530
+ adapter_resolver=lambda n: _OCRStub(),
531
+ )
532
+ plan = executor.plan(spec)
533
+
534
+ doc = DocumentRef(id="d1", image_uri="/tmp/img.png")
535
+ ctx = RunContext(
536
+ document_id="d1",
537
+ code_version="1.0.0",
538
+ pipeline_name="x",
539
+ )
540
+ result = executor.run_plan(
541
+ plan=plan,
542
+ document=doc,
543
+ initial_inputs={
544
+ ArtifactType.IMAGE: Artifact(
545
+ id="d1:img", document_id="d1", type=ArtifactType.IMAGE,
546
+ ),
547
+ },
548
+ context=ctx,
549
+ )
550
+ assert result.succeeded
551
+ assert len(result.step_results) == 1
552
+ assert result.step_results[0].step_id == "ocr"
553
+
554
+ def test_run_plan_rejects_non_plan(self) -> None:
555
+ executor = PipelineExecutor(
556
+ adapter_resolver=lambda n: _OCRStub(),
557
+ )
558
+ with pytest.raises(Exception, match="ExecutionPlan"):
559
+ executor.run_plan(
560
+ plan="not a plan", # type: ignore[arg-type]
561
+ document=DocumentRef(id="d1"),
562
+ initial_inputs={},
563
+ context=RunContext(
564
+ document_id="d1", code_version="1.0",
565
+ pipeline_name="x",
566
+ ),
567
+ )
568
+
569
+ def test_run_spec_still_works_via_planning(self) -> None:
570
+ """Sucre run(spec) — plan internement et exécute."""
571
+ spec = PipelineSpec(
572
+ name="x",
573
+ initial_inputs=(ArtifactType.IMAGE,),
574
+ steps=(PipelineStep(
575
+ id="ocr", kind="ocr", adapter_name="ocr_stub",
576
+ input_types=(ArtifactType.IMAGE,),
577
+ output_types=(ArtifactType.RAW_TEXT,),
578
+ ),),
579
+ )
580
+ executor = PipelineExecutor(
581
+ adapter_resolver=lambda n: _OCRStub(),
582
+ )
583
+ doc = DocumentRef(id="d1", image_uri="/tmp/img.png")
584
+ ctx = RunContext(
585
+ document_id="d1",
586
+ code_version="1.0.0",
587
+ pipeline_name="x",
588
+ )
589
+ result = executor.run(
590
+ spec=spec,
591
+ document=doc,
592
+ initial_inputs={
593
+ ArtifactType.IMAGE: Artifact(
594
+ id="d1:img", document_id="d1", type=ArtifactType.IMAGE,
595
+ ),
596
+ },
597
+ context=ctx,
598
+ )
599
+ assert result.succeeded
600
+
601
+ def test_planner_injection(self) -> None:
602
+ """Le caller peut injecter son propre planner (ex: avec
603
+ MetricRegistry pour avoir les jonctions)."""
604
+ custom_planner = PipelinePlanner(
605
+ metric_registry=_registry_with_text_metric(),
606
+ )
607
+ executor = PipelineExecutor(
608
+ adapter_resolver=lambda n: _OCRStub(),
609
+ planner=custom_planner,
610
+ )
611
+ spec = PipelineSpec(
612
+ name="x",
613
+ initial_inputs=(ArtifactType.IMAGE,),
614
+ steps=(PipelineStep(
615
+ id="ocr", kind="ocr", adapter_name="ocr_stub",
616
+ input_types=(ArtifactType.IMAGE,),
617
+ output_types=(ArtifactType.RAW_TEXT,),
618
+ ),),
619
+ )
620
+ plan = executor.plan(spec)
621
+ assert plan.metric_junctions # non vide grâce au registry injecté
622
+
623
+ def test_planner_must_be_pipeline_planner(self) -> None:
624
+ with pytest.raises(Exception, match="PipelinePlanner"):
625
+ PipelineExecutor(
626
+ adapter_resolver=lambda n: _OCRStub(),
627
+ planner="not a planner", # type: ignore[arg-type]
628
+ )