Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| import json | |
| import sys | |
| from pathlib import Path | |
# Table names a log_template entry is allowed to target.
LOG_TABLES = {
    "alerts",
    "auth_logs",
    "email_logs",
    "netflow",
    "process_events",
}
def _err(msg):
    """Print *msg* on standard output, prefixed with ``ERROR: ``."""
    line = f"ERROR: {msg}"
    print(line)
def _load_json(path):
    """Parse and return the JSON document stored at *path*.

    Args:
        path: A ``pathlib.Path`` (or any object with a compatible ``open()``).

    Returns:
        The decoded JSON value.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the content is not valid JSON.
        UnicodeDecodeError: If the content is not valid UTF-8.
    """
    # JSON text is UTF-8; do not depend on the platform's locale encoding.
    with path.open(encoding="utf-8") as f:
        return json.load(f)
def validate_seed(seed):
    """Check that a scenario seed is internally consistent.

    Checks run in a fixed order; every failure is reported through ``_err``
    and counted:

    1. top-level references (patient zero host, compromised user, attacker
       domain, data target) resolve to declared entities;
    2. ``attack_plan`` references resolve to declared entities and emails;
    3. artifacts in ``attack_plan.timeline`` and
       ``attack_graph.initial_artifacts`` resolve to seed emails or log
       templates;
    4. each prompt-injection payload is attached to a matching email or log
       template and lists the expected evidence id;
    5. every log template targets a table listed in ``LOG_TABLES``.

    Args:
        seed: Mapping decoded from a seed JSON file.

    Returns:
        The number of validation errors found; 0 means the seed passed.

    Raises:
        KeyError: If a required key is missing from the seed.
    """
    entities = seed["entities"]
    users = {u["user_id"] for u in entities["users"]}
    hosts = {h["host_id"] for h in entities["hosts"]}
    domains = {d["domain"] for d in entities["domains"]}
    targets = {t["target_id"] for t in entities["data_targets"]}
    artifacts = seed["seed_artifacts"]
    emails = {e["email_id"] for e in artifacts["emails"]}
    log_templates = {t["template_id"]: t for t in artifacts["log_templates"]}

    errors = _check_top_level_refs(seed, users, hosts, domains, targets)
    errors += _check_attack_plan_refs(
        seed["attack_plan"], users, hosts, domains, targets, emails
    )
    artifact_events = _collect_artifact_events(seed)
    errors += _check_timeline_artifacts(artifact_events, emails, log_templates)
    errors += _check_prompt_injections(seed, artifact_events)
    errors += _check_log_template_tables(artifacts["log_templates"])
    return errors


def _check_ref(value, known, message):
    """Return 0 if *value* is in *known*; otherwise report *message* and return 1."""
    if value in known:
        return 0
    _err(message)
    return 1


def _check_top_level_refs(seed, users, hosts, domains, targets):
    """Count top-level seed fields that do not name a declared entity."""
    errors = 0
    errors += _check_ref(
        seed["patient_zero_host"], hosts, "patient_zero_host not in entities.hosts"
    )
    errors += _check_ref(
        seed["compromised_user"], users, "compromised_user not in entities.users"
    )
    errors += _check_ref(
        seed["attacker_domain"], domains, "attacker_domain not in entities.domains"
    )
    errors += _check_ref(
        seed["data_target"], targets, "data_target not in entities.data_targets"
    )
    return errors


def _check_attack_plan_refs(ap, users, hosts, domains, targets, emails):
    """Count ``attack_plan`` references that do not resolve to declared ids."""
    errors = 0
    errors += _check_ref(
        ap["phishing_email_id"],
        emails,
        "attack_plan.phishing_email_id not in seed_artifacts.emails",
    )
    errors += _check_ref(
        ap["credentials_used"]["user_id"],
        users,
        "attack_plan.credentials_used.user_id not in entities.users",
    )
    for hop in ap["lateral_path"]:
        errors += _check_ref(
            hop["src_host"],
            hosts,
            "attack_plan.lateral_path.src_host not in entities.hosts",
        )
        errors += _check_ref(
            hop["dst_host"],
            hosts,
            "attack_plan.lateral_path.dst_host not in entities.hosts",
        )
    errors += _check_ref(
        ap["data_access"]["target_id"],
        targets,
        "attack_plan.data_access.target_id not in entities.data_targets",
    )
    errors += _check_ref(
        ap["exfiltration"]["destination_domain"],
        domains,
        "attack_plan.exfiltration.destination_domain not in entities.domains",
    )
    return errors


def _collect_artifact_events(seed):
    """Return timeline items plus one synthetic item per attack-graph initial artifact."""
    events = list(seed["attack_plan"].get("timeline", []))
    attack_graph = seed.get("attack_graph")
    if attack_graph:
        for art in attack_graph.get("initial_artifacts", []):
            # Give initial artifacts the same {"step", "artifacts"} shape as
            # timeline items so both can be walked by the same loops.
            events.append({"step": art.get("step", 0), "artifacts": [art]})
    return events


def _check_timeline_artifacts(artifact_events, emails, log_templates):
    """Count artifacts with unknown ids, wrong alert tables or bad variant actions."""
    allowed_variants = {
        "lateral_move",
        "lateral_move_alt",
        "exfiltrate",
        "exfiltrate_alt",
    }
    errors = 0
    for item in artifact_events:
        for art in item["artifacts"]:
            art_type = art["artifact_type"]
            art_id = art["artifact_id"]
            if art_type == "email":
                errors += _check_ref(
                    art_id,
                    emails,
                    "timeline artifact email not in seed_artifacts.emails",
                )
            elif art_type == "log_template":
                errors += _check_ref(
                    art_id,
                    log_templates,
                    "timeline artifact log_template not in seed_artifacts.log_templates",
                )
            elif art_type == "alert":
                if art_id not in log_templates:
                    _err("timeline artifact alert not in seed_artifacts.log_templates")
                    errors += 1
                elif log_templates[art_id]["table"] != "alerts":
                    _err(
                        "timeline artifact alert must reference log_template with table=alerts"
                    )
                    errors += 1
            # The variant check applies to every artifact, whatever its type.
            variant_action = art.get("variant_action_type")
            if variant_action and variant_action not in allowed_variants:
                _err("timeline artifact variant_action_type not allowed")
                errors += 1
    return errors


def _check_prompt_injections(seed, artifact_events):
    """Count prompt-injection payloads that are duplicated, unattached or lack evidence."""
    injections = seed["prompt_injection_payloads"]
    templates = seed["seed_artifacts"]["log_templates"]
    errors = 0

    injection_ids = {p["injection_id"] for p in injections}
    if len(injection_ids) != len(injections):
        _err("prompt_injection_payloads injection_id must be unique")
        errors += 1

    # Templates that carry an injection, keyed by injection id; when several
    # templates share an id the last one listed wins.
    template_by_injection = {
        t.get("injection_id"): t for t in templates if t.get("injection_id")
    }
    # Step of the first event in which each log/alert template is used.
    timeline_steps = {}
    for item in artifact_events:
        for art in item["artifacts"]:
            if art["artifact_type"] in ("log_template", "alert"):
                timeline_steps.setdefault(art["artifact_id"], item["step"])

    for payload in injections:
        inj_id = payload["injection_id"]
        surface = payload["surface"]
        if surface == "email":
            if not any(
                e.get("injection_id") == inj_id
                for e in seed["seed_artifacts"]["emails"]
            ):
                _err("email injection_id not referenced by any seed_artifacts.emails")
                errors += 1
        elif surface == "alert":
            errors += _check_alert_injection(
                payload, seed, templates, template_by_injection, timeline_steps
            )
        elif surface == "log":
            errors += _check_log_injection(
                payload, seed, template_by_injection, timeline_steps
            )
    return errors


def _check_alert_injection(payload, seed, templates, template_by_injection, timeline_steps):
    """Count problems with one payload whose surface is ``alert``."""
    inj_id = payload["injection_id"]
    if not any(
        t.get("injection_id") == inj_id and t.get("table") == "alerts"
        for t in templates
    ):
        _err("alert injection_id not referenced by any alerts log_template")
        return 1
    template = template_by_injection.get(inj_id)
    if not template:
        return 0
    step = timeline_steps.get(template.get("template_id"))
    if step is None:
        # Template never appears in the timeline: no expected alert id to check.
        return 0
    expected = f"alert-{seed['scenario_id']}-{step}"
    evidence_ids = set(payload.get("evidence_ids", []))
    if expected not in evidence_ids:
        _err("alert injection evidence_ids missing expected alert id")
        return 1
    return 0


def _check_log_injection(payload, seed, template_by_injection, timeline_steps):
    """Count problems with one payload whose surface is ``log``."""
    # Row-id prefix used for each table a log injection may live in.
    row_prefixes = {"auth_logs": "auth", "netflow": "flow", "process_events": "proc"}

    template = template_by_injection.get(payload["injection_id"])
    if not template:
        _err("log injection_id not referenced by any log_template")
        return 1

    errors = 0
    table = template.get("table")
    prefix = row_prefixes.get(table)
    if prefix is None:
        _err("log injection_id must reference auth_logs/netflow/process_events template")
        errors += 1

    step = timeline_steps.get(template.get("template_id"))
    if step is None:
        _err("log injection template_id not present in timeline")
        errors += 1
    else:
        expected = None
        if prefix is not None:
            expected = f"{prefix}-{seed['scenario_id']}-{step}"
        evidence_ids = set(payload.get("evidence_ids", []))
        if expected and expected not in evidence_ids:
            _err("log injection evidence_ids missing expected row id")
            errors += 1
    return errors


def _check_log_template_tables(templates):
    """Count log templates whose ``table`` is not listed in ``LOG_TABLES``."""
    errors = 0
    for template in templates:
        if template["table"] not in LOG_TABLES:
            _err(f"log_template table not recognized: {template['table']}")
            errors += 1
    return errors
def main(argv):
    """Validate seed files and exit with status 1 if any error is found.

    Args:
        argv: Argument list with the program name first. Each further entry
            is a path to a seed JSON file. With no paths, every ``*.json``
            file directly under ``data/seeds`` whose name does not contain
            ``ground_truth`` is validated.

    Returns:
        None when no error was counted.

    Raises:
        SystemExit: With status 1 when at least one error was counted.
    """
    if len(argv) < 2:
        # glob() yields entries in filesystem order; sort so that the report
        # is reproducible from run to run.
        candidates = sorted(
            p for p in Path("data/seeds").glob("*.json") if "ground_truth" not in p.name
        )
    else:
        candidates = [Path(p) for p in argv[1:]]

    total_errors = 0
    for path in candidates:
        # A file that cannot be read or parsed is a validation failure for
        # that file only; keep going so the other seeds are still checked.
        try:
            seed = _load_json(path)
        except (OSError, ValueError) as exc:  # ValueError covers bad JSON/encoding
            _err(f"{path}: cannot load seed: {exc}")
            print(f"FAIL: {path}")
            total_errors += 1
            continue

        try:
            errs = validate_seed(seed)
        except KeyError as exc:
            # validate_seed indexes required keys directly; report the missing
            # one instead of aborting the whole run with a traceback.
            _err(f"{path}: seed is missing required key {exc}")
            errs = 1

        if errs == 0:
            print(f"OK: {path}")
        else:
            # Name the file so the ERROR lines above can be attributed to it.
            print(f"FAIL: {path}")
        total_errors += errs

    if total_errors > 0:
        print(f"Validation failed with {total_errors} error(s)")
        sys.exit(1)
if __name__ == "__main__":
    # main() returns None when validation passes (exit status 0) and calls
    # sys.exit(1) itself when errors were counted.
    sys.exit(main(sys.argv))