| """ |
| Reporting — HTML report generation + JSONL/CSV exports. |
| |
| Produces: |
| runs/<run_id>/report/index.html – human-browsable report |
| runs/<run_id>/exports/normalized.jsonl |
| runs/<run_id>/exports/records.csv |
| """ |
|
|
| from __future__ import annotations |
|
|
| import csv |
| import json |
| import logging |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Any, Dict, List |
|
|
| from engine.io_contract import ( |
| Artifact, |
| EngineOutput, |
| InputSpec, |
| NormalizedRecord, |
| ) |
|
|
# Module logger; the explicit "engine.reporting" name places it under the
# "engine" logger in the logging hierarchy (rather than relying on __name__).
logger = logging.getLogger("engine.reporting")
|
|
| |
| |
| |
|
|
# Jinja2 templates are expected alongside this module, in engine/templates/.
_TEMPLATE_DIR = Path(__file__).parent / "templates"
|
|
|
|
def _render_html(template_name: str, context: Dict[str, Any]) -> str:
    """Render a Jinja2 template from the engine/templates/ directory.

    Falls back to a minimal plain-HTML summary when Jinja2 is not
    installed, so report generation never hard-fails on the dependency.
    """
    try:
        from jinja2 import Environment, FileSystemLoader
    except ImportError:
        logger.warning("jinja2 not installed — HTML report will be a plain summary")
        return _fallback_html(context)

    # autoescape=True so record values can't inject markup into the report.
    jinja_env = Environment(
        loader=FileSystemLoader(str(_TEMPLATE_DIR)),
        autoescape=True,
    )
    return jinja_env.get_template(template_name).render(**context)
|
|
|
|
| def _fallback_html(context: Dict[str, Any]) -> str: |
| """Minimal HTML when Jinja2 is unavailable.""" |
| return ( |
| f"<html><body><h1>MOD-OSINT Report — {context.get('run_id', '?')}</h1>" |
| f"<p>Records: {context.get('total_records', 0)}</p>" |
| f"<p>Generated: {context.get('generated_at', '')}</p>" |
| f"<p><em>Install jinja2 for the full HTML report.</em></p>" |
| f"</body></html>" |
| ) |
|
|
|
|
| |
| |
| |
|
|
def export_jsonl(records: List[NormalizedRecord], out_path: Path) -> Path:
    """Write records as newline-delimited JSON.

    Each record is serialized on its own line via ``model_dump_json``;
    parent directories are created as needed. Returns ``out_path``.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    with out_path.open("w", encoding="utf-8") as fh:
        fh.writelines(record.model_dump_json() + "\n" for record in records)
    logger.info("Exported %d records to %s", len(records), out_path)
    return out_path
|
|
|
|
def export_csv(records: List[NormalizedRecord], out_path: Path) -> Path:
    """Write records as CSV.

    Column order follows the field order of the first record. Values that
    CSV cannot represent natively are coerced: dicts/lists are JSON-encoded,
    ``Path`` objects stringified, and ``None`` becomes an empty cell.

    Args:
        records: Normalized records to export (may be empty).
        out_path: Destination CSV file; parent directories are created.

    Returns:
        ``out_path`` for convenience.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    if not records:
        # No records: emit an empty file (a header row is impossible since
        # field names come from the records themselves). Pin the encoding —
        # the original write_text("") used the platform default.
        out_path.write_text("", encoding="utf-8")
        return out_path

    fieldnames = list(records[0].model_dump().keys())
    with open(out_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for r in records:
            row = r.model_dump()
            # Flatten values the csv module can't serialize directly.
            for k, v in row.items():
                if isinstance(v, (dict, list)):
                    row[k] = json.dumps(v, ensure_ascii=False, default=str)
                elif isinstance(v, Path):
                    row[k] = str(v)
                elif v is None:
                    row[k] = ""
            writer.writerow(row)
    logger.info("Exported %d records to %s", len(records), out_path)
    return out_path
|
|
|
|
| |
| |
| |
|
|
def _stages_context(stage_outputs: Dict[str, EngineOutput]) -> List[Dict[str, Any]]:
    # One row per pipeline stage for the report's stage table.
    return [
        {
            "stage": name,
            "status": out.status.value,
            "summary": out.summary,
            "error": out.error,
        }
        for name, out in stage_outputs.items()
    ]


def _input_files_context(input_spec: InputSpec) -> List[Dict[str, Any]]:
    # Summarize each input file (name, type, size, hash) for the report.
    return [
        {
            "name": f.path.name,
            "file_type": f.file_type.value,
            "size_bytes": f.size_bytes,
            "sha256": f.sha256,
        }
        for f in input_spec.files
    ]


def _records_preview(records: List[NormalizedRecord], limit: int) -> List[Dict[str, Any]]:
    # First `limit` records as plain dicts, with source_type coerced to a string.
    preview = []
    for r in records[:limit]:
        d = r.model_dump()
        d["source_type"] = d.get("source_type", "")
        if hasattr(d["source_type"], "value"):
            # model_dump may leave enum members unconverted; use their .value.
            d["source_type"] = d["source_type"].value
        preview.append(d)
    return preview


def _artifacts_context(artifacts: List[Artifact], report_dir: Path) -> List[Dict[str, str]]:
    # Links relative to report_dir so index.html can reference the artifacts.
    rows = []
    for a in artifacts:
        try:
            rel = a.path.relative_to(report_dir)
        except ValueError:
            # Exports live in the sibling exports/ directory, not under
            # report/, so link via "..". NOTE(review): this hardcodes the
            # runs/<run_id>/{report,exports} layout — confirm if it changes.
            rel = Path("..") / "exports" / a.path.name
        rows.append({"name": a.name, "rel_path": str(rel)})
    return rows


def generate_report(
    run_id: str,
    run_dir: Path,
    input_spec: InputSpec,
    records: List[NormalizedRecord],
    stage_outputs: Dict[str, EngineOutput],
) -> List[Artifact]:
    """
    Generate the full report suite:
      - HTML report at ``run_dir/report/index.html``
      - JSONL export at ``run_dir/exports/normalized.jsonl``
      - CSV export at ``run_dir/exports/records.csv``

    Args:
        run_id: Identifier shown in the report header.
        run_dir: Root directory of this run; subdirectories are created.
        input_spec: Describes the input files that fed the pipeline.
        records: All normalized records produced by the pipeline.
        stage_outputs: Per-stage results keyed by stage name.

    Returns a list of ``Artifact`` objects (JSONL, CSV, then HTML).
    """
    report_dir = run_dir / "report"
    exports_dir = run_dir / "exports"
    report_dir.mkdir(parents=True, exist_ok=True)
    exports_dir.mkdir(parents=True, exist_ok=True)

    artifacts: List[Artifact] = []

    # Machine-readable exports first so the HTML report can link to them.
    jsonl_path = export_jsonl(records, exports_dir / "normalized.jsonl")
    artifacts.append(Artifact(
        name="normalized.jsonl",
        path=jsonl_path,
        mime_type="application/jsonl",
        description="All normalized records in JSONL format",
    ))

    csv_path = export_csv(records, exports_dir / "records.csv")
    artifacts.append(Artifact(
        name="records.csv",
        path=csv_path,
        mime_type="text/csv",
        description="All normalized records in CSV format",
    ))

    preview_limit = 50
    context = {
        "run_id": run_id,
        "generated_at": datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC"),
        "total_records": len(records),
        "input_file_count": len(input_spec.files),
        "stages": _stages_context(stage_outputs),
        "input_files": _input_files_context(input_spec),
        "records_preview": _records_preview(records, preview_limit),
        "preview_limit": preview_limit,
        "artifacts": _artifacts_context(artifacts, report_dir),
    }

    html_content = _render_html("report.html", context)
    html_path = report_dir / "index.html"
    html_path.write_text(html_content, encoding="utf-8")
    logger.info("HTML report written to %s", html_path)

    artifacts.append(Artifact(
        name="index.html",
        path=html_path,
        mime_type="text/html",
        description="Human-browsable pipeline report",
    ))

    return artifacts
|
|