| """ |
| Ingestion module — reads input files and produces NormalizedRecords. |
| |
| Engine contract: |
| run(EngineInput) -> EngineOutput |
| |
| The heavy lifting (file parsing, entity extraction) is delegated to |
| ``engine.normalize``. This module acts as the pipeline-stage wrapper. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import logging |
| from typing import Any, Dict |
|
|
| from engine.io_contract import EngineInput, EngineOutput, NormalizedRecord, StageStatus |
| from engine.normalize import normalize_files |
|
|
# Named explicitly rather than via __name__; presumably so logging config can
# target this stage by a stable dotted name — confirm before changing.
logger = logging.getLogger(name="modules.ingestion")
|
|
|
|
def run(engine_input: EngineInput) -> EngineOutput:
    """
    Execute the ingestion stage for ``engine_input``.

    Parsing and entity extraction are delegated to ``normalize_files``;
    this function only packages the outcome as an ``EngineOutput`` tagged
    with stage ``"ingestion"``.

    Args:
        engine_input: Stage input. Only ``engine_input.input_spec`` is read:
            it is passed to ``normalize_files``, and its ``files`` attribute
            is counted for the summary.

    Returns:
        On success, an output with ``status=StageStatus.SUCCESS``, the
        normalized ``records``, and a summary of the form
        "Ingested <n> records from <m> files".
        If any ``Exception`` is raised while normalizing, building the
        summary, or constructing the success output, it is logged with its
        traceback and an output with ``status=StageStatus.FAILED`` and
        ``error=str(exc)`` is returned instead of propagating.
    """
    stage_name = "ingestion"
    try:
        spec = engine_input.input_spec
        normalized = normalize_files(spec)
        # NOTE(review): len() requires normalize_files to return a sized
        # collection (not a bare iterator) — confirm against engine.normalize.
        record_count = len(normalized)
        file_count = len(spec.files)
        return EngineOutput(
            stage=stage_name,
            status=StageStatus.SUCCESS,
            records=normalized,
            summary=f"Ingested {record_count} records from {file_count} files",
        )
    except Exception as exc:  # stage boundary: report failure, don't raise
        # logger.exception is logger.error(..., exc_info=True).
        logger.exception("Ingestion failed: %s", exc)
        return EngineOutput(
            stage=stage_name,
            status=StageStatus.FAILED,
            error=str(exc),
        )
|
|
|
|
| |
| |
| |
|
|
def ingest(file_path: str) -> Dict[str, Any]:
    """Return a static "ingested" record for *file_path* (deprecated).

    Legacy entry point kept only for backward compatibility. It does not
    open, read, or validate *file_path*, and it does not call ``run()``;
    it simply echoes the path back in a fixed dict. New code should call
    ``run()`` with an ``EngineInput`` instead.

    Args:
        file_path: Path string; copied into the result unchanged.

    Returns:
        ``{"file": file_path, "status": "ingested"}``.

    Warns:
        DeprecationWarning: On every call. ``stacklevel=2`` attributes the
        warning to the caller's line, not to this function.
    """
    import warnings  # local import: only this deprecated shim needs it

    warnings.warn(
        "ingest() is deprecated; use run(EngineInput) instead",
        DeprecationWarning,
        stacklevel=2,
    )
    return {"file": file_path, "status": "ingested"}
|
|
|
|
if __name__ == "__main__":
    # Manual smoke run: call the legacy entry point with a placeholder path
    # and echo its result to stdout.
    demo_result = ingest(file_path="input.txt")
    print(demo_result)
|
|