"""
Module Registry — discovery, registration, and dependency ordering.

The engine discovers pipeline modules from ``modules/`` (lowercase only).
Each module that wants to participate in the pipeline must either:

1. Be listed in ``PIPELINE_STAGES`` (the built-in ordered list), or
2. Register itself via the ``@pipeline_stage`` decorator.

Every registered stage is expected to be a
``run(input: EngineInput) -> EngineOutput`` callable; modules listed in
``PIPELINE_STAGES`` that do not define ``run`` are skipped during discovery.
"""
|
|
| from __future__ import annotations |
|
|
| import importlib |
| import logging |
| from collections import OrderedDict |
| from typing import Callable, Dict, List, Optional, Protocol |
|
|
| from engine.io_contract import EngineInput, EngineOutput |
|
|
# Shared logger for registration and discovery messages in this module.
logger = logging.getLogger(name="engine.registry")
|
|
|
|
| |
| |
| |
|
|
class PipelineModule(Protocol):
    """Structural interface for a pipeline stage.

    Any object exposing a ``run`` method with this signature satisfies the
    protocol; no explicit inheritance is required.
    """

    def run(self, engine_input: EngineInput) -> EngineOutput:
        """Process *engine_input* and return the stage's ``EngineOutput``."""
|
|
|
|
| |
| |
| |
|
|
# Built-in pipeline stages, in the order discover_and_register() imports and
# registers them. Each entry pairs a stage name with the dotted import path of
# its module under the lowercase ``modules`` package.
PIPELINE_STAGES: List[Dict[str, str]] = [
    {"name": stage_name, "module_path": dotted_path}
    for stage_name, dotted_path in (
        ("ingestion", "modules.ingestion.ingest_data"),
        ("preprocessing", "modules.preprocessing.preprocess_data"),
        ("analysis", "modules.ml_analysis.ml_analysis"),
        ("correlation", "modules.correlation.correlate"),
        ("export", "modules.export.export_results"),
    )
]

# Stage name -> run callable. Insertion order is the order reported by
# get_ordered_stages().
_registry: OrderedDict[str, Callable[[EngineInput], EngineOutput]] = OrderedDict()
|
|
|
|
def register(name: str, run_fn: Callable[[EngineInput], EngineOutput]) -> None:
    """Register *run_fn* as the pipeline stage called *name*.

    A new name is appended to the end of the registry's order. Re-registering
    an existing name replaces its function in place (keeping its original
    position) and logs a warning.

    Args:
        name: Stage identifier used by ``get_stage`` / ``get_ordered_stages``.
        run_fn: Callable taking an ``EngineInput`` and returning an
            ``EngineOutput``.

    Raises:
        TypeError: If *run_fn* is not callable. Rejecting it here surfaces a
            broken stage at registration time instead of when the pipeline
            first tries to invoke it.
    """
    if not callable(run_fn):
        raise TypeError(
            f"Pipeline stage '{name}' must be callable, "
            f"got {type(run_fn).__name__}"
        )
    if name in _registry:
        logger.warning("Overwriting existing registration for stage '%s'", name)
    _registry[name] = run_fn
    logger.info("Registered pipeline stage: %s", name)
|
|
|
|
def get_stage(name: str) -> Optional[Callable[[EngineInput], EngineOutput]]:
    """Look up the run function registered under *name*.

    Returns ``None`` rather than raising when no such stage is registered.
    """
    try:
        return _registry[name]
    except KeyError:
        return None
|
|
|
|
def get_ordered_stages() -> List[str]:
    """List registered stage names in registry (pipeline execution) order.

    The order is that of first registration. A new list is returned on each
    call, so mutating it does not affect the registry.
    """
    return list(_registry)
|
|
|
|
def clear() -> None:
    """Remove every stage registration, leaving the registry empty.

    Handy for test isolation. The registry mapping is emptied in place rather
    than rebound, so any existing reference to it sees the cleared state.
    """
    _registry.clear()
|
|
|
|
| |
| |
| |
|
|
def pipeline_stage(
    name: str,
) -> Callable[
    [Callable[[EngineInput], EngineOutput]], Callable[[EngineInput], EngineOutput]
]:
    """Decorator factory that registers the decorated function as a stage::

        @pipeline_stage("my_stage")
        def run(engine_input: EngineInput) -> EngineOutput:
            ...

    Registration happens when the decorator is applied; the decorated
    function is returned unchanged so it can still be called directly.

    Args:
        name: Stage name to register the function under.

    Returns:
        The decorator that performs the registration.

    Raises:
        TypeError: If *name* is not a string. This catches the bare form
            ``@pipeline_stage`` (no parentheses), which would otherwise
            replace the function with the inner decorator and register
            nothing.
    """
    if not isinstance(name, str):
        raise TypeError(
            "pipeline_stage() requires a stage name: use @pipeline_stage('name')"
        )

    def decorator(
        fn: Callable[[EngineInput], EngineOutput],
    ) -> Callable[[EngineInput], EngineOutput]:
        register(name, fn)
        return fn

    return decorator
|
|
|
|
| |
| |
| |
|
|
def discover_and_register() -> List[str]:
    """Import every module listed in ``PIPELINE_STAGES`` and register its ``run``.

    Stages are processed in ``PIPELINE_STAGES`` order. A stage is skipped, with
    an error logged, in any of these cases:

    - its module cannot be imported;
    - it has no callable ``run`` attribute;
    - importing or registering it raises any other exception.

    One bad stage never aborts discovery of the rest.

    Only the dotted paths listed in ``PIPELINE_STAGES`` are imported. They are
    expected to live under the lowercase ``modules`` package, not the
    uppercase ``Modules/`` directory; this function does not itself validate
    the prefix.

    Returns:
        Names of the stages that were successfully registered, in order.
    """
    registered: List[str] = []
    for stage_def in PIPELINE_STAGES:
        name = stage_def["name"]
        module_path = stage_def["module_path"]
        try:
            mod = importlib.import_module(module_path)
            run_fn = getattr(mod, "run", None)
            # A non-callable ``run`` would register fine but blow up when the
            # pipeline invokes it, so treat it the same as a missing one.
            if not callable(run_fn):
                logger.error(
                    "Module '%s' (%s) has no run() function — skipping",
                    name, module_path,
                )
                continue
            register(name, run_fn)
            registered.append(name)
        except ImportError as exc:
            logger.error(
                "Failed to import module '%s' (%s): %s",
                name, module_path, exc,
            )
        except Exception as exc:
            # logger.exception logs at ERROR level like before, but also
            # attaches the traceback, which is needed to diagnose failures
            # raised while the stage module's body executes.
            logger.exception(
                "Unexpected error loading '%s' (%s): %s",
                name, module_path, exc,
            )
    return registered
|
|