# mod-osint / engine / registry.py
# deploy: HF sanitized GUI snapshot (b75c637)
"""
Module Registry — discovery, registration, and dependency ordering.
The engine discovers pipeline modules from ``modules/`` (lowercase only).
Each module that wants to participate in the pipeline must either:
1. Be listed in ``PIPELINE_STAGES`` (the built-in ordered list), or
2. Register itself via the ``@pipeline_stage`` decorator.
Every stage loaded through discovery must expose a module-level
``run(engine_input: EngineInput) -> EngineOutput`` function; modules
without one are logged and skipped.
"""
from __future__ import annotations
import importlib
import logging
from collections import OrderedDict
from typing import Callable, Dict, List, Optional, Protocol
from engine.io_contract import EngineInput, EngineOutput
logger = logging.getLogger("engine.registry")
# ---------------------------------------------------------------------------
# Protocol that every pipeline module must satisfy
# ---------------------------------------------------------------------------
class PipelineModule(Protocol):
    """Structural (duck-typed) interface for a pipeline module.

    Anything that exposes a compatible ``run`` satisfies this protocol;
    no inheritance from this class is required.
    """

    def run(self, engine_input: EngineInput) -> EngineOutput:
        """Take an ``EngineInput`` and return an ``EngineOutput``."""
        ...
# ---------------------------------------------------------------------------
# Built-in pipeline stage ordering
# ---------------------------------------------------------------------------
# Built-in stages as (stage name, dotted module path) pairs. The order of
# the pairs is the order in which discovery imports and registers them.
PIPELINE_STAGES: List[Dict[str, str]] = [
    {"name": stage_name, "module_path": dotted_path}
    for stage_name, dotted_path in (
        ("ingestion", "modules.ingestion.ingest_data"),
        ("preprocessing", "modules.preprocessing.preprocess_data"),
        ("analysis", "modules.ml_analysis.ml_analysis"),
        ("correlation", "modules.correlation.correlate"),
        ("export", "modules.export.export_results"),
    )
]
# ---------------------------------------------------------------------------
# Registry singleton
# ---------------------------------------------------------------------------
# Maps stage name -> run callable. The key order of this mapping is what
# get_ordered_stages() reports as the pipeline execution order.
_registry: OrderedDict[str, Callable[[EngineInput], EngineOutput]] = OrderedDict()
def register(name: str, run_fn: Callable[[EngineInput], EngineOutput]) -> None:
    """Bind *run_fn* as the pipeline stage called *name*.

    If *name* is already registered, a warning is logged and the previous
    run function is replaced.
    """
    already_registered = name in _registry
    if already_registered:
        logger.warning("Overwriting existing registration for stage '%s'", name)

    # Assigning to an existing OrderedDict key keeps its original position,
    # so overwriting a stage does not move it within the execution order.
    _registry[name] = run_fn
    logger.info("Registered pipeline stage: %s", name)
def get_stage(name: str) -> Optional[Callable[[EngineInput], EngineOutput]]:
    """Look up the run function registered under *name*.

    Returns ``None`` when no stage with that name has been registered.
    """
    try:
        return _registry[name]
    except KeyError:
        return None
def get_ordered_stages() -> List[str]:
    """List the registered stage names in pipeline execution order.

    The result is a new list; mutating it does not affect the registry.
    """
    # Iterating a mapping yields its keys, in the registry's key order.
    return list(_registry)
def clear() -> None:
    """Drop every registered stage, leaving the registry empty.

    Mainly useful for resetting state between tests.
    """
    _registry.clear()
# ---------------------------------------------------------------------------
# Decorator for ad-hoc registration
# ---------------------------------------------------------------------------
def pipeline_stage(name: str):
    """
    Build a decorator that registers the decorated function as stage *name*.

    Registration happens once, when the decorator is applied; the function
    itself is returned unchanged, so it can still be called directly::

        @pipeline_stage("my_stage")
        def run(engine_input: EngineInput) -> EngineOutput:
            ...
    """

    def _register_and_return(fn: Callable[[EngineInput], EngineOutput]):
        register(name, fn)
        return fn

    return _register_and_return
# ---------------------------------------------------------------------------
# Auto-discovery from PIPELINE_STAGES
# ---------------------------------------------------------------------------
def discover_and_register() -> List[str]:
    """
    Import each module listed in ``PIPELINE_STAGES`` and register its ``run``.

    Stages are processed in list order. A stage is skipped, with an error
    logged, when its module cannot be imported, when loading it raises any
    other exception, or when the module has no callable ``run`` attribute.
    A failure in one stage never prevents the remaining stages from loading.

    No directory scanning or path filtering happens here: only the dotted
    paths found in ``PIPELINE_STAGES`` are imported. As shipped, those all
    point into the lowercase ``modules`` package, so the uppercase
    ``Modules/`` directory is never imported by this function.

    Returns:
        The names of the stages that were registered, in registration order.
    """
    registered: List[str] = []
    for stage_def in PIPELINE_STAGES:
        name = stage_def["name"]
        module_path = stage_def["module_path"]
        try:
            mod = importlib.import_module(module_path)
            run_fn = getattr(mod, "run", None)
            # A missing attribute and a non-callable one (e.g. a constant or
            # submodule named ``run``) are equally unusable as a stage.
            if not callable(run_fn):
                logger.error(
                    "Module '%s' (%s) has no run() function — skipping",
                    name, module_path,
                )
                continue
            register(name, run_fn)
            registered.append(name)
        except ImportError as exc:
            logger.error(
                "Failed to import module '%s' (%s): %s",
                name, module_path, exc,
            )
        except Exception as exc:
            # Deliberately broad: discovery is best-effort, so one broken
            # module must not abort the others. Log with the traceback so
            # the underlying bug can still be diagnosed.
            logger.exception(
                "Unexpected error loading '%s' (%s): %s",
                name, module_path, exc,
            )
    return registered