| """ |
| Export module — writes final artifacts (JSON summary, per-stage outputs). |
| |
| Engine contract: |
| run(EngineInput) -> EngineOutput |
| |
| The main report generation (HTML, CSV, JSONL) is handled by |
| ``engine.reporting``. This module produces a supplementary JSON |
| summary artifact in the run directory. |
| """ |
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| from pathlib import Path |
| from typing import Any, Dict |
|
|
| from engine.io_contract import ( |
| Artifact, |
| EngineInput, |
| EngineOutput, |
| StageStatus, |
| ) |
|
|
# Module logger. The name is given explicitly as "modules.export" (not
# __name__); presumably it matches how logging is configured for pipeline
# stages — confirm before renaming, since handlers may be keyed on it.
logger: logging.Logger = logging.getLogger("modules.export")
|
|
|
|
def run(engine_input: EngineInput, *, sample_size: int = 20) -> EngineOutput:
    """
    Export a JSON summary of the pipeline run.

    Writes ``<run_dir>/exports/summary.json``, creating the ``exports``
    directory if needed. The summary contains:

    - the run id;
    - the total record count;
    - name/type/size metadata for every input file;
    - the first *sample_size* records.

    The input records are passed through unchanged in the returned output.

    Args:
        engine_input: Stage input. Reads ``run_dir``, ``run_id``,
            ``records`` and ``input_spec.files``.
        sample_size: Keyword-only. Maximum number of records copied into
            ``record_sample``. Negative values are treated as 0. Defaults
            to 20, the value that was previously hard-coded.

    Returns:
        On success, an ``EngineOutput`` with ``StageStatus.SUCCESS`` and one
        ``summary.json`` artifact. If anything raises, the error is logged
        with its traceback and an ``EngineOutput`` with
        ``StageStatus.FAILED`` and ``error=str(exc)`` is returned. Nothing
        is raised to the caller.
    """
    try:
        run_dir = Path(engine_input.run_dir)
        exports_dir = run_dir / "exports"
        exports_dir.mkdir(parents=True, exist_ok=True)

        records = engine_input.records
        record_count = len(records)
        # Clamp so a negative size cannot turn the slice into
        # "all but the last N records".
        sample = records[: max(0, sample_size)]

        summary_data: Dict[str, Any] = {
            "run_id": engine_input.run_id,
            "total_records": record_count,
            "input_files": [
                {
                    # Path() accepts both str and Path, so .name works either way.
                    "name": Path(f.path).name,
                    "type": f.file_type.value,
                    "size": f.size_bytes,
                }
                for f in engine_input.input_spec.files
            ],
            # NOTE(review): the sample copies entity e-mail/IP values
            # verbatim into the artifact; confirm that is acceptable.
            "record_sample": [
                {
                    "row_id": r.row_id,
                    "source": r.source_file,
                    "entity_name": r.entity_name,
                    "entity_email": r.entity_email,
                    "entity_ip": r.entity_ip,
                }
                for r in sample
            ],
        }

        out_path = exports_dir / "summary.json"
        # default=str: any value json cannot encode natively is written as
        # its str() form instead of failing the export.
        out_path.write_text(
            json.dumps(summary_data, indent=2, ensure_ascii=False, default=str),
            encoding="utf-8",
        )
        logger.info("Exported summary to %s", out_path)

        return EngineOutput(
            stage="export",
            status=StageStatus.SUCCESS,
            records=records,
            artifacts=[
                Artifact(
                    name="summary.json",
                    path=out_path,
                    mime_type="application/json",
                    description="Pipeline run summary",
                )
            ],
            summary=f"Exported summary.json ({record_count} records)",
        )
    except Exception as exc:
        # Stage boundary: failures are reported through EngineOutput rather
        # than raised. logger.exception logs at ERROR level with the traceback.
        logger.exception("Export failed: %s", exc)
        return EngineOutput(
            stage="export",
            status=StageStatus.FAILED,
            error=str(exc),
        )
|
|
|
|
| |
| |
| |
|
|
def export(data: Any, outpath: str) -> str:
    """Write ``str(data)`` to *outpath* and return *outpath*.

    Legacy wrapper (deprecated). Use ``run()`` instead.

    The file is opened in text mode, so any existing content is truncated.
    It is written as UTF-8, which keeps the output independent of the
    platform's default locale encoding and consistent with ``run()``.

    Args:
        data: Any object; its ``str()`` representation is written.
        outpath: Destination file path.

    Returns:
        *outpath*, unchanged.
    """
    with open(outpath, "w", encoding="utf-8") as f:
        f.write(str(data))
    return outpath
|
|
|
|
if __name__ == "__main__":
    # Manual smoke test: write a tiny dict through the legacy wrapper
    # into ./output.txt and echo the path it reports back.
    written_path = export({"result": 123}, "output.txt")
    print(written_path)
|
|