"""
Module Registry — discovery, registration, and stage ordering.

Pipeline stages are looked up in the lowercase ``modules`` package; the
uppercase ``Modules/`` directory is not part of the pipeline. A module takes
part in the pipeline in one of two ways:

1. it is listed in ``PIPELINE_STAGES`` (the built-in, ordered stage list), or
2. it registers a function itself via the ``@pipeline_stage`` decorator.

Every stage is a callable shaped like
``run(input: EngineInput) -> EngineOutput``; modules discovered from
``PIPELINE_STAGES`` must expose it as a module-level ``run`` attribute.
"""

from __future__ import annotations

import importlib
import logging
from collections import OrderedDict
from typing import Callable, Dict, List, Optional, Protocol

from engine.io_contract import EngineInput, EngineOutput

# Shared logger for every registry operation (registration, discovery errors).
logger = logging.getLogger("engine.registry")


# ---------------------------------------------------------------------------
# Protocol that every pipeline module must satisfy
# ---------------------------------------------------------------------------

class PipelineModule(Protocol):
    """Structural interface for a pipeline stage.

    Anything exposing a ``run`` attribute with the signature below (for
    example, a stage module with a module-level ``run`` function) matches
    this protocol for static type checkers.
    """

    def run(self, engine_input: EngineInput) -> EngineOutput:
        """Process *engine_input* and return the stage's ``EngineOutput``."""
# ---------------------------------------------------------------------------
# Built-in pipeline stage ordering
# ---------------------------------------------------------------------------

# Stages are imported and registered in this order by discover_and_register().
PIPELINE_STAGES: List[Dict[str, str]] = [
    {"name": "ingestion", "module_path": "modules.ingestion.ingest_data"},
    {"name": "preprocessing", "module_path": "modules.preprocessing.preprocess_data"},
    {"name": "analysis", "module_path": "modules.ml_analysis.ml_analysis"},
    {"name": "correlation", "module_path": "modules.correlation.correlate"},
    {"name": "export", "module_path": "modules.export.export_results"},
]

# Discovery only accepts dotted paths inside the lowercase ``modules`` package.
# The check is case-sensitive, so ``Modules.`` paths are rejected explicitly.
_ALLOWED_PACKAGE_PREFIX = "modules."


# ---------------------------------------------------------------------------
# Registry singleton
# ---------------------------------------------------------------------------

# Maps stage name -> run function; iteration order is insertion order.
_registry: OrderedDict[str, Callable[[EngineInput], EngineOutput]] = OrderedDict()


def register(name: str, run_fn: Callable[[EngineInput], EngineOutput]) -> None:
    """Register *run_fn* as the pipeline stage called *name*.

    Re-registering an existing name replaces its function and logs a
    warning. The stage keeps its original position in the order, because
    ``OrderedDict`` preserves the insertion slot on overwrite.

    Raises:
        TypeError: if *run_fn* is not callable. The registry only stores
            callables, so every registered stage can actually be run.
    """
    if not callable(run_fn):
        raise TypeError(
            f"Pipeline stage '{name}' must be callable, "
            f"got {type(run_fn).__name__}"
        )
    if name in _registry:
        logger.warning("Overwriting existing registration for stage '%s'", name)
    _registry[name] = run_fn
    logger.info("Registered pipeline stage: %s", name)


def get_stage(name: str) -> Optional[Callable[[EngineInput], EngineOutput]]:
    """Return the run function registered under *name*, or ``None``."""
    return _registry.get(name)


def get_ordered_stages() -> List[str]:
    """Return registered stage names in the order they were first registered.

    For stages loaded by ``discover_and_register`` this follows
    ``PIPELINE_STAGES``. Stages registered earlier (e.g. via
    ``@pipeline_stage`` at import time) appear before them.
    """
    return list(_registry.keys())


def clear() -> None:
    """Remove all registrations (useful for testing)."""
    _registry.clear()


# ---------------------------------------------------------------------------
# Decorator for ad-hoc registration
# ---------------------------------------------------------------------------

def pipeline_stage(
    name: str,
) -> Callable[[Callable[[EngineInput], EngineOutput]], Callable[[EngineInput], EngineOutput]]:
    """
    Decorator to register a function as a pipeline stage::

        @pipeline_stage("my_stage")
        def run(engine_input: EngineInput) -> EngineOutput:
            ...

    Registration happens when the decorator is applied (i.e. at import time
    of the defining module). The decorated function is returned unchanged.
    """

    def decorator(
        fn: Callable[[EngineInput], EngineOutput],
    ) -> Callable[[EngineInput], EngineOutput]:
        register(name, fn)
        return fn

    return decorator


# ---------------------------------------------------------------------------
# Auto-discovery from PIPELINE_STAGES
# ---------------------------------------------------------------------------

def discover_and_register() -> List[str]:
    """
    Import each module listed in ``PIPELINE_STAGES`` and register its
    ``run`` function.

    Stages are processed in ``PIPELINE_STAGES`` order. A stage is skipped,
    with an error logged, when any of these holds:

    * its module path lies outside the lowercase ``modules`` package (the
      uppercase ``Modules/`` directory is explicitly excluded);
    * the module fails to import;
    * the module has no ``run`` attribute, or ``run`` is not callable.

    One failing stage never aborts discovery of the remaining stages.

    Returns:
        Names of the stages that were successfully registered, in order.
    """
    registered: List[str] = []
    for stage_def in PIPELINE_STAGES:
        name = stage_def["name"]
        module_path = stage_def["module_path"]

        # Enforce the lowercase-only rule in code instead of relying on the
        # current contents of PIPELINE_STAGES.
        if not module_path.startswith(_ALLOWED_PACKAGE_PREFIX):
            logger.error(
                "Module '%s' (%s) is outside the '%s' package — skipping",
                name, module_path, _ALLOWED_PACKAGE_PREFIX.rstrip("."),
            )
            continue

        try:
            mod = importlib.import_module(module_path)
            run_fn = getattr(mod, "run", None)
        except ImportError as exc:
            logger.error(
                "Failed to import module '%s' (%s): %s",
                name, module_path, exc,
            )
            continue
        except Exception as exc:
            # Errors such as SyntaxError or failures in the module's top-level
            # code: log with the traceback, because the message alone is
            # rarely enough to diagnose them.
            logger.exception(
                "Unexpected error loading '%s' (%s): %s",
                name, module_path, exc,
            )
            continue

        if run_fn is None:
            logger.error(
                "Module '%s' (%s) has no run() function — skipping",
                name, module_path,
            )
            continue
        if not callable(run_fn):
            logger.error(
                "Module '%s' (%s) has a non-callable 'run' attribute — skipping",
                name, module_path,
            )
            continue

        register(name, run_fn)
        registered.append(name)

    return registered