""" Export module — writes final artifacts (JSON summary, per-stage outputs). Engine contract: run(EngineInput) -> EngineOutput The main report generation (HTML, CSV, JSONL) is handled by ``engine.reporting``. This module produces a supplementary JSON summary artifact in the run directory. """ from __future__ import annotations import json import logging from pathlib import Path from typing import Any, Dict from engine.io_contract import ( Artifact, EngineInput, EngineOutput, StageStatus, ) logger = logging.getLogger("modules.export") def run(engine_input: EngineInput) -> EngineOutput: """ Export a JSON summary of the pipeline run. """ try: run_dir = Path(engine_input.run_dir) exports_dir = run_dir / "exports" exports_dir.mkdir(parents=True, exist_ok=True) # Build summary summary_data = { "run_id": engine_input.run_id, "total_records": len(engine_input.records), "input_files": [ { "name": f.path.name, "type": f.file_type.value, "size": f.size_bytes, } for f in engine_input.input_spec.files ], "record_sample": [ { "row_id": r.row_id, "source": r.source_file, "entity_name": r.entity_name, "entity_email": r.entity_email, "entity_ip": r.entity_ip, } for r in engine_input.records[:20] ], } out_path = exports_dir / "summary.json" out_path.write_text( json.dumps(summary_data, indent=2, ensure_ascii=False, default=str), encoding="utf-8", ) logger.info("Exported summary to %s", out_path) return EngineOutput( stage="export", status=StageStatus.SUCCESS, records=engine_input.records, # pass through artifacts=[ Artifact( name="summary.json", path=out_path, mime_type="application/json", description="Pipeline run summary", ) ], summary=f"Exported summary.json ({len(engine_input.records)} records)", ) except Exception as exc: logger.error("Export failed: %s", exc, exc_info=True) return EngineOutput( stage="export", status=StageStatus.FAILED, error=str(exc), ) # --------------------------------------------------------------------------- # Legacy compatibility # 
# ---------------------------------------------------------------------------


def export(data: Any, outpath: str) -> str:
    """Legacy wrapper (deprecated). Use ``run()`` instead.

    Writes ``str(data)`` to *outpath*, replacing any existing file, and
    returns *outpath* unchanged.

    Args:
        data: Any object; its ``str()`` representation is what gets written.
        outpath: Destination file path.

    Returns:
        The same *outpath* that was passed in.

    Raises:
        OSError: If the file cannot be opened or written.
    """
    # Explicit UTF-8 so the output does not depend on the platform locale
    # and non-ASCII data cannot fail with UnicodeEncodeError; this matches
    # the encoding ``run()`` uses for summary.json.
    with open(outpath, "w", encoding="utf-8") as f:
        f.write(str(data))
    return outpath


if __name__ == "__main__":
    print(export({"result": 123}, "output.txt"))