"""
Ingestion module — reads input files and produces NormalizedRecords.
Engine contract:
run(EngineInput) -> EngineOutput
The heavy lifting (file parsing, entity extraction) is delegated to
``engine.normalize``. This module acts as the pipeline-stage wrapper.
"""
from __future__ import annotations
import logging
from typing import Any, Dict
from engine.io_contract import EngineInput, EngineOutput, NormalizedRecord, StageStatus
from engine.normalize import normalize_files
# Module-level logger, registered under the fixed name "modules.ingestion".
logger = logging.getLogger("modules.ingestion")
def run(engine_input: EngineInput) -> EngineOutput:
    """
    Run the ingestion stage for one engine input.

    ``engine_input.input_spec`` is handed to ``normalize_files``; the records
    it returns are wrapped in an ``EngineOutput`` tagged with the
    ``"ingestion"`` stage.

    Returns:
        An ``EngineOutput`` with ``StageStatus.SUCCESS``, the records and a
        one-line summary when normalization completes, or an ``EngineOutput``
        with ``StageStatus.FAILED`` and ``error`` set to the exception text
        when anything inside the ``try`` raises an ``Exception``.
    """
    try:
        normalized = normalize_files(engine_input.input_spec)
        record_total = len(normalized)
        file_total = len(engine_input.input_spec.files)
        outcome = EngineOutput(
            stage="ingestion",
            status=StageStatus.SUCCESS,
            records=normalized,
            summary=f"Ingested {record_total} records from {file_total} files",
        )
    except Exception as failure:
        # Stage boundary: log the failure with its traceback and report it in
        # the output object instead of letting the exception propagate.
        logger.error("Ingestion failed: %s", failure, exc_info=True)
        outcome = EngineOutput(
            stage="ingestion",
            status=StageStatus.FAILED,
            error=str(failure),
        )
    return outcome
# ---------------------------------------------------------------------------
# Legacy compatibility — keep old function signature working
# ---------------------------------------------------------------------------
def ingest(file_path: str) -> Dict[str, Any]:
    """
    Legacy wrapper (deprecated). Use ``run()`` instead.

    The file is not opened or read here: the given path is echoed back
    under ``"file"`` together with the fixed status ``"ingested"``.
    """
    legacy_result: Dict[str, Any] = dict(file=file_path, status="ingested")
    return legacy_result
if __name__ == "__main__":
    # Script entry point: print the legacy wrapper's result for a sample path.
    sample_result = ingest("input.txt")
    print(sample_result)