File size: 4,717 Bytes
b75c637
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
"""
Module Registry — discovery, registration, and dependency ordering.

The engine discovers pipeline modules from ``modules/`` (lowercase only).
Each module that wants to participate in the pipeline must either:

1. Be listed in ``PIPELINE_STAGES`` (the built-in ordered list), or
2. Register itself via the ``@pipeline_stage`` decorator.

The registry enforces that every registered module exposes a
``run(input: EngineInput) -> EngineOutput`` callable.
"""

from __future__ import annotations

import importlib
import logging
from collections import OrderedDict
from typing import Callable, Dict, List, Optional, Protocol

from engine.io_contract import EngineInput, EngineOutput

logger = logging.getLogger("engine.registry")


# ---------------------------------------------------------------------------
# Protocol that every pipeline module must satisfy
# ---------------------------------------------------------------------------

class PipelineModule(Protocol):
    """Structural interface for a pipeline stage: any object with a matching ``run`` qualifies."""

    def run(self, engine_input: EngineInput) -> EngineOutput:
        """Process *engine_input* and return this stage's output."""
        ...


# ---------------------------------------------------------------------------
# Built-in pipeline stage ordering
# ---------------------------------------------------------------------------

# Built-in stages, listed in intended execution order. Each entry holds:
#   "name"        -> key the stage is stored under in the registry
#   "module_path" -> dotted path passed to importlib.import_module(); the
#                    imported module is expected to expose a top-level ``run``.
# discover_and_register() walks this list front to back. Stages registered by
# other means (e.g. @pipeline_stage), or names already present in the registry,
# can make get_ordered_stages() differ from this order.
PIPELINE_STAGES: List[Dict[str, str]] = [
    {"name": "ingestion",      "module_path": "modules.ingestion.ingest_data"},
    {"name": "preprocessing",  "module_path": "modules.preprocessing.preprocess_data"},
    {"name": "analysis",       "module_path": "modules.ml_analysis.ml_analysis"},
    {"name": "correlation",    "module_path": "modules.correlation.correlate"},
    {"name": "export",         "module_path": "modules.export.export_results"},
]


# ---------------------------------------------------------------------------
# Registry singleton
# ---------------------------------------------------------------------------

# Module-level mapping of stage name -> run callable. Insertion order is what
# get_ordered_stages() reports. Re-assigning an existing key replaces the
# callable but keeps its original position (OrderedDict semantics).
# No locking is done around reads or writes.
_registry: OrderedDict[str, Callable[[EngineInput], EngineOutput]] = OrderedDict()


def register(name: str, run_fn: Callable[[EngineInput], EngineOutput]) -> None:
    """
    Register a module's run function under *name*.

    Re-registering an existing *name* logs a warning and replaces the stored
    callable. The stage keeps its original position in the execution order,
    because assigning to an existing ``OrderedDict`` key does not move it.

    Args:
        name: Stage name used as the registry key.
        run_fn: Callable that will be invoked with an ``EngineInput``.

    Raises:
        TypeError: If *run_fn* is not callable.
    """
    # Enforce the module's "every stage is a run callable" contract here, so a
    # bad registration fails immediately instead of when the pipeline runs.
    if not callable(run_fn):
        raise TypeError(
            f"Pipeline stage '{name}' must be registered with a callable, "
            f"got {type(run_fn).__name__}"
        )
    if name in _registry:
        logger.warning("Overwriting existing registration for stage '%s'", name)
    _registry[name] = run_fn
    logger.info("Registered pipeline stage: %s", name)


def get_stage(name: str) -> Optional[Callable[[EngineInput], EngineOutput]]:
    """
    Look up the callable registered for stage *name*.

    An unknown *name* yields ``None`` instead of raising.
    """
    try:
        return _registry[name]
    except KeyError:
        return None


def get_ordered_stages() -> List[str]:
    """List every registered stage name, in the order the pipeline runs them.

    A fresh list is returned, so callers may mutate it freely.
    """
    return [*_registry]


def clear() -> None:
    """
    Clear all registrations (useful for testing).

    Empties the module-level registry in place. Afterwards ``get_stage``
    returns ``None`` for every name and ``get_ordered_stages`` returns ``[]``
    until stages are registered again.
    """
    _registry.clear()


# ---------------------------------------------------------------------------
# Decorator for ad-hoc registration
# ---------------------------------------------------------------------------

def pipeline_stage(name: str):
    """
    Build a decorator that registers the decorated callable as stage *name*.

    Registration happens at decoration time, and the decorated function is
    handed back unchanged::

        @pipeline_stage("my_stage")
        def run(engine_input: EngineInput) -> EngineOutput:
            ...
    """
    def _register_and_return(stage_fn: Callable[[EngineInput], EngineOutput]):
        register(name, stage_fn)
        return stage_fn

    return _register_and_return


# ---------------------------------------------------------------------------
# Auto-discovery from PIPELINE_STAGES
# ---------------------------------------------------------------------------

def discover_and_register() -> List[str]:
    """
    Import each module listed in ``PIPELINE_STAGES`` and register its ``run``.

    Stages are processed in list order. A stage is skipped, with an error
    logged, when any of the following holds:

    * its module path is not inside the lowercase ``modules`` package
      (so the uppercase ``Modules/`` directory is explicitly excluded);
    * importing the module fails;
    * the module has no ``run`` attribute, or ``run`` is not callable.

    Returns:
        Names of the stages successfully registered, in ``PIPELINE_STAGES`` order.
    """
    registered: List[str] = []
    for stage_def in PIPELINE_STAGES:
        name = stage_def["name"]
        module_path = stage_def["module_path"]

        # Enforce the lowercase-only discovery rule: the top-level package must
        # be exactly "modules", which rejects "Modules.*" paths.
        if module_path.partition(".")[0] != "modules":
            logger.error(
                "Stage '%s' (%s) is outside the 'modules' package — skipping",
                name, module_path,
            )
            continue

        try:
            mod = importlib.import_module(module_path)
        except ImportError as exc:
            logger.error(
                "Failed to import module '%s' (%s): %s",
                name, module_path, exc,
            )
            continue
        except Exception as exc:
            # Any other failure while executing the module (e.g. a bug in its
            # top-level code). Keep discovery best-effort, but log the full
            # traceback so the failure can be diagnosed.
            logger.exception(
                "Unexpected error loading '%s' (%s): %s",
                name, module_path, exc,
            )
            continue

        run_fn = getattr(mod, "run", None)
        if run_fn is None:
            logger.error(
                "Module '%s' (%s) has no run() function — skipping",
                name, module_path,
            )
            continue
        if not callable(run_fn):
            # A non-callable `run` would only fail later, at pipeline run time.
            logger.error(
                "Module '%s' (%s) has a non-callable 'run' attribute — skipping",
                name, module_path,
            )
            continue

        register(name, run_fn)
        registered.append(name)
    return registered