"""Pipelines declarativos para encadenar pasos de procesamiento. Permite definir pipelines como secuencias de funciones decoradas con ``@pipeline_step``, con reintentos automáticos, timeouts y reportes de ejecución. Nota: los timeouts se implementan con ``ThreadPoolExecutor``. Los hilos que exceden el timeout siguen ejecutándose en segundo plano (limitación de Python) — el timeout solo controla cuánto espera el pipeline. """ from __future__ import annotations import contextvars import functools import logging import random import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field from typing import Any, Callable logger = logging.getLogger("orchestration.pipelines") _current_pipeline_name: contextvars.ContextVar[str] = contextvars.ContextVar( "_current_pipeline_name" ) class StepTimeoutError(Exception): """Se lanza cuando un paso excede su tiempo límite.""" def __init__(self, step_name: str, timeout: float) -> None: self.step_name = step_name self.timeout = timeout super().__init__(f"Step '{step_name}' timed out after {timeout}s") @dataclass class StepResult: """Resultado de la ejecución de un paso individual.""" output: Any duration_seconds: float tokens_used: int = 0 cost_usd: float = 0.0 error: str | None = None metadata: dict = field(default_factory=dict) @property def success(self) -> bool: return self.error is None def to_dict(self) -> dict: return { "output": self.output, "duration_seconds": self.duration_seconds, "tokens_used": self.tokens_used, "cost_usd": self.cost_usd, "error": self.error, "metadata": self.metadata, "success": self.success, } @dataclass class PipelineResult: """Resultado agregado de la ejecución de un pipeline completo.""" steps: list[StepResult] = field(default_factory=list) final_output: Any = None total_duration: float = 0.0 total_tokens: int = 0 total_cost: float = 0.0 @property def success(self) -> bool: return all(s.success for s in self.steps) def summary(self) -> str: status = "SUCCESS" if self.success else "FAILED" lines = [ f"Pipeline Status: {status}", f"Steps: {len(self.steps)}", f"Total Duration: {self.total_duration:.2f}s", f"Total Tokens: {self.total_tokens}", f"Total Cost: ${self.total_cost:.4f}", "", "Step Breakdown:", ] for i, step in enumerate(self.steps): mark = "OK" if step.success else "FAIL" lines.append( f" [{i + 1}] {mark} — {step.duration_seconds:.2f}s" f" | tokens={step.tokens_used} | cost=${step.cost_usd:.4f}" ) if step.error: lines.append(f" Error: {step.error}") return "\n".join(lines) def pipeline_step( name: str, max_retries: int = 2, timeout_seconds: float = 30.0 ) -> Callable: """Decorador que convierte una función en un paso de pipeline. Args: name: Nombre del paso para logs y reportes. max_retries: Número máximo de reintentos tras fallo. timeout_seconds: Tiempo máximo de ejecución por intento. Returns: Decorador que envuelve la función y retorna un ``StepResult``. """ def decorator(func: Callable) -> Callable: @functools.wraps(func) def wrapper(*args: Any, **kwargs: Any) -> StepResult: try: pipeline_name = _current_pipeline_name.get() prefix = f"[{pipeline_name}.{name}]" except LookupError: prefix = f"[{name}]" last_error: str | None = None last_duration: float = 0.0 for attempt in range(max_retries + 1): start = time.time() try: with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(func, *args, **kwargs) result = future.result(timeout=timeout_seconds) elapsed = time.time() - start logger.info( "%s completed in %.2fs (attempt %d)", prefix, elapsed, attempt + 1, ) return StepResult(output=result, duration_seconds=elapsed) except TimeoutError: elapsed = time.time() - start last_duration = elapsed last_error = ( f"Step '{name}' timed out after {timeout_seconds}s" ) logger.warning( "%s timeout on attempt %d: %s", prefix, attempt + 1, last_error, ) except Exception as exc: elapsed = time.time() - start last_duration = elapsed last_error = f"{type(exc).__name__}: {exc}" logger.warning( "%s error on attempt %d: %s", prefix, attempt + 1, last_error, ) if attempt < max_retries: delay = min(2**attempt + random.uniform(0, 1), 30) logger.info("%s retrying in %.1fs...", prefix, delay) time.sleep(delay) return StepResult( output=None, error=last_error, duration_seconds=last_duration ) wrapper.step_name = name # type: ignore[attr-defined] return wrapper return decorator class Pipeline: """Pipeline declarativo que encadena pasos secuencialmente. Cada paso recibe la salida del paso anterior. Si un paso falla, el pipeline se detiene inmediatamente. Args: name: Nombre del pipeline para logs. steps: Lista de funciones decoradas con ``@pipeline_step``. """ def __init__(self, name: str, steps: list[Callable]) -> None: self.name = name self.steps = steps @property def step_names(self) -> list[str]: return [ getattr(s, "step_name", s.__name__) for s in self.steps ] def run(self, initial_input: Any) -> PipelineResult: """Ejecuta todos los pasos del pipeline secuencialmente. Args: initial_input: Entrada para el primer paso. Returns: ``PipelineResult`` con resultados de cada paso ejecutado. """ return self._run_steps(self.steps, initial_input) def run_from(self, step_index: int, input_data: Any) -> PipelineResult: """Ejecuta el pipeline desde un paso específico. Args: step_index: Índice del paso desde el cual iniciar (0-based). input_data: Entrada para el paso inicial. Returns: ``PipelineResult`` con resultados de los pasos ejecutados. Raises: ValueError: Si el índice está fuera de rango. """ if step_index < 0 or step_index >= len(self.steps): raise ValueError( f"step_index {step_index} out of range " f"[0, {len(self.steps) - 1}]" ) return self._run_steps(self.steps[step_index:], input_data) def _run_steps( self, steps: list[Callable], initial_input: Any ) -> PipelineResult: token = _current_pipeline_name.set(self.name) try: result = PipelineResult() current_input = initial_input for step_fn in steps: step_result: StepResult = step_fn(current_input) result.steps.append(step_result) if not step_result.success: result.final_output = None break current_input = step_result.output else: result.final_output = current_input result.total_duration = sum( s.duration_seconds for s in result.steps ) result.total_tokens = sum(s.tokens_used for s in result.steps) result.total_cost = sum(s.cost_usd for s in result.steps) return result finally: _current_pipeline_name.reset(token)