""" Dataset builder: synthetic logs + AI-generated report targets. Produces train.csv and val.csv for model fine-tuning. """ import csv import json import random from pathlib import Path from typing import Optional from .mitre_templates import ATTACK_WRITEUPS, INCIDENT_TEMPLATES from .synthetic_logs import generate_single_log def logs_to_text(logs: list[str]) -> str: """Convert list of JSON log strings to single text block.""" return "\n".join(logs) def generate_report_target(logs_text: str, ttps: list[str], cves: list[str]) -> str: """Generate a target report summary from logs and extracted entities.""" parts: list[str] = [] if ttps: for t in ttps[:3]: if t in ATTACK_WRITEUPS: parts.append(ATTACK_WRITEUPS[t]) if not parts and INCIDENT_TEMPLATES: _, summary = random.choice(INCIDENT_TEMPLATES) parts.append(summary) if not parts: parts.append( f"Incident involving {len(ttps)} TTP(s), {len(cves)} CVE(s). " "Further analysis recommended." ) return " ".join(parts) def build_dataset( num_train: int = 500, num_val: int = 100, samples_per_example: int = 5, output_dir: str = "data", ) -> tuple[Path, Path]: """ Build train and validation CSV datasets. Format: input_text, target_report """ from .preprocessing import extract_iocs output_path = Path(output_dir) output_path.mkdir(parents=True, exist_ok=True) train_path = output_path / "train.csv" val_path = output_path / "val.csv" def _generate_split(n: int) -> list[tuple[str, str]]: rows: list[tuple[str, str]] = [] for _ in range(n): logs = [generate_single_log() for _ in range(samples_per_example)] text = logs_to_text(logs) iocs = extract_iocs(text) ttps = [x for x in iocs if x.startswith("T") and x[1:].replace(".", "").isdigit()] cves = [x for x in iocs if x.upper().startswith("CVE-")] target = generate_report_target(text, ttps, cves) rows.append((text, target)) return rows for path, n in [(train_path, num_train), (val_path, num_val)]: rows = _generate_split(n) with open(path, "w", newline="", encoding="utf-8") as f: w = csv.writer(f) w.writerow(["input_text", "target_report"]) w.writerows(rows) print(f"Wrote {len(rows)} rows -> {path}") return train_path, val_path if __name__ == "__main__": import argparse ap = argparse.ArgumentParser() ap.add_argument("--train", type=int, default=500) ap.add_argument("--val", type=int, default=100) ap.add_argument("--output", default="data") args = ap.parse_args() build_dataset(num_train=args.train, num_val=args.val, output_dir=args.output)