# b75c637 — file size: 4,717 bytes
"""
Module Registry — discovery, registration, and dependency ordering.
The engine discovers pipeline modules from ``modules/`` (lowercase only).
Each module that wants to participate in the pipeline must either:
1. Be listed in ``PIPELINE_STAGES`` (the built-in ordered list), or
2. Register itself via the ``@pipeline_stage`` decorator.
Every registered stage is expected to be a
``run(input: EngineInput) -> EngineOutput`` callable; ``discover_and_register``
skips any listed module that does not expose ``run``.
"""
from __future__ import annotations
import importlib
import logging
from collections import OrderedDict
from typing import Callable, Dict, List, Optional, Protocol
from engine.io_contract import EngineInput, EngineOutput
# Fixed logger name, independent of the import path this module is loaded under.
logger = logging.getLogger(name="engine.registry")
# ---------------------------------------------------------------------------
# Protocol that every pipeline module must satisfy
# ---------------------------------------------------------------------------
class PipelineModule(Protocol):
"""Structural type for a pipeline module."""
def run(self, engine_input: EngineInput) -> EngineOutput: ...
# ---------------------------------------------------------------------------
# Built-in pipeline stage ordering
# ---------------------------------------------------------------------------
# Built-in stages in execution order. Each entry pairs a stage name with the
# dotted import path of the module whose ``run`` callable implements it.
PIPELINE_STAGES: List[Dict[str, str]] = [
    dict(name="ingestion", module_path="modules.ingestion.ingest_data"),
    dict(name="preprocessing", module_path="modules.preprocessing.preprocess_data"),
    dict(name="analysis", module_path="modules.ml_analysis.ml_analysis"),
    dict(name="correlation", module_path="modules.correlation.correlate"),
    dict(name="export", module_path="modules.export.export_results"),
]
# ---------------------------------------------------------------------------
# Registry singleton
# ---------------------------------------------------------------------------
# Module-level stage table: stage name -> run callable, kept in registration order.
_registry: OrderedDict[str, Callable[[EngineInput], EngineOutput]]
_registry = OrderedDict()
def register(name: str, run_fn: Callable[[EngineInput], EngineOutput]) -> None:
    """Register *run_fn* as the pipeline stage called *name*.

    Re-registering an existing name replaces the previous callable and logs a
    warning. The stage keeps its original position in execution order,
    because assigning to an existing ``OrderedDict`` key does not move it.

    Raises:
        TypeError: if *run_fn* is not callable. Validating here covers both
            the ``@pipeline_stage`` decorator and auto-discovery, so a
            non-callable object is never stored as a stage.
    """
    # Validate before touching the registry so a bad call leaves it unchanged.
    if not callable(run_fn):
        raise TypeError(
            f"Pipeline stage '{name}' must be callable, "
            f"got {type(run_fn).__name__}"
        )
    if name in _registry:
        logger.warning("Overwriting existing registration for stage '%s'", name)
    _registry[name] = run_fn
    logger.info("Registered pipeline stage: %s", name)
def get_stage(name: str) -> Optional[Callable[[EngineInput], EngineOutput]]:
    """Look up the stage registered as *name*.

    Returns the stored run callable, or ``None`` when no stage of that name
    has been registered.
    """
    try:
        return _registry[name]
    except KeyError:
        return None
def get_ordered_stages() -> List[str]:
    """List registered stage names, earliest registration first.

    A fresh list is returned, so mutating it does not affect the registry.
    """
    return list(_registry)
def clear() -> None:
    """Drop every stage registration, leaving the registry empty.

    Mainly a test hook: stages must be registered again (for example via
    ``register`` or ``discover_and_register``) before they can be looked up.
    """
    _registry.clear()
# ---------------------------------------------------------------------------
# Decorator for ad-hoc registration
# ---------------------------------------------------------------------------
def pipeline_stage(name: str):
    """
    Build a decorator that registers the decorated function under *name*.

    Registration happens when the decorator is applied. The function itself
    is returned unchanged, so it stays directly callable::

        @pipeline_stage("my_stage")
        def run(engine_input: EngineInput) -> EngineOutput:
            ...
    """

    def _register_and_return(stage_fn: Callable[[EngineInput], EngineOutput]):
        register(name, stage_fn)
        return stage_fn

    return _register_and_return
# ---------------------------------------------------------------------------
# Auto-discovery from PIPELINE_STAGES
# ---------------------------------------------------------------------------
def discover_and_register() -> List[str]:
    """
    Import each module listed in ``PIPELINE_STAGES`` and register its ``run``.

    Discovery is best-effort. A stage is logged and skipped, while the
    remaining stages are still processed, if:

    * its module cannot be imported,
    * it has no ``run`` attribute,
    * its ``run`` is not callable, or
    * it fails for any other reason.

    Stages are registered in ``PIPELINE_STAGES`` order.

    Only the dotted paths listed in ``PIPELINE_STAGES`` are imported, and no
    filesystem scan is performed. Every built-in entry lives under the
    lowercase ``modules`` package, so the uppercase ``Modules/`` directory is
    not imported unless an entry explicitly points into it.

    Returns:
        Names of the stages that were successfully registered, in order.
    """
    registered: List[str] = []
    for stage_def in PIPELINE_STAGES:
        name = stage_def["name"]
        module_path = stage_def["module_path"]
        try:
            mod = importlib.import_module(module_path)
            run_fn = getattr(mod, "run", None)
            if run_fn is None:
                logger.error(
                    "Module '%s' (%s) has no run() function — skipping",
                    name, module_path,
                )
                continue
            if not callable(run_fn):
                # Without this check a non-callable ``run`` (e.g. a constant)
                # would be stored in the registry as if it were a stage function.
                logger.error(
                    "Module '%s' (%s) has a non-callable run attribute (%s) — skipping",
                    name, module_path, type(run_fn).__name__,
                )
                continue
            register(name, run_fn)
            registered.append(name)
        except ImportError as exc:
            # Also catches ImportErrors raised by the module's own imports;
            # ModuleNotFoundError is a subclass of ImportError.
            logger.error(
                "Failed to import module '%s' (%s): %s",
                name, module_path, exc,
            )
        except Exception as exc:
            # Anything else (an error while executing the module, a failing
            # register()) is unexpected. Keep the traceback for diagnosis,
            # but continue with the remaining stages.
            logger.exception(
                "Unexpected error loading '%s' (%s): %s",
                name, module_path, exc,
            )
    return registered
|