| |
|
| | """
|
| | seed_kb_examples.py
|
| | Create prompt→AttackPlan examples for RAG from train_attackplan.jsonl
|
| |
|
| | Usage (from repo root):
|
| | %run scripts/seed_kb_examples.py
|
| | # or choose a different source / count
|
| | %run scripts/seed_kb_examples.py --src scripts/train_attackplan.jsonl --k 40
|
| | """
|
| |
|
| | from __future__ import annotations
|
| | import argparse, json, re, random
|
| | from pathlib import Path
|
| | from typing import Dict, Any, List, Tuple
|
| |
|
| |
|
| |
|
| |
|
| |
|
def load_plans(src: Path) -> List[Dict[str, Any]]:
    """Load AttackPlan records from a JSON Lines file.

    Each non-blank line is parsed as JSON. A record is kept only when it is
    a dict whose ``"plan"`` value is a list; blank lines, unparsable lines
    and records of any other shape are skipped (best effort).

    Args:
        src: Path to the JSONL file.

    Returns:
        The accepted records, in file order.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    plans: List[Dict[str, Any]] = []
    # "utf-8-sig" reads plain UTF-8 unchanged but drops a leading BOM, which
    # would otherwise make the first record unparsable.
    # Iterating the file splits on real newlines only; str.splitlines() also
    # breaks on U+2028/U+2029/U+0085, which may appear raw inside JSON strings.
    with src.open(encoding="utf-8-sig") as fh:
        for raw_line in fh:
            text = raw_line.strip()
            if not text:
                continue
            try:
                obj = json.loads(text)
            except (ValueError, RecursionError):
                # Malformed (or absurdly nested) line: skip it, keep loading.
                continue
            if isinstance(obj, dict) and isinstance(obj.get("plan"), list):
                plans.append(obj)
    return plans
|
| |
|
def infer_device_name(item_name: str) -> str:
    """Return the device segment of a dot-separated item name.

    When the first segment starts with ``"MIM"`` and at least two further
    segments follow, the second segment is returned; in every other case
    the first segment is returned.
    """
    head, *rest = item_name.split(".")
    # The leading "MIM…" segment is skipped only if two or more segments follow it.
    if len(rest) >= 2 and head.startswith("MIM"):
        return rest[0]
    return head
|
| |
|
def infer_device_type(dev: str) -> str:
    """Classify a device name into a coarse device type.

    Matching is case-insensitive and the first rule that hits wins, in this
    order: switch, inverter, generator, capacitor, regulator, load.

    Args:
        dev: Device name, e.g. the output of ``infer_device_name``.

    Returns:
        One of ``"switch"``, ``"inverter"``, ``"generator"``,
        ``"capacitor"``, ``"regulator"``, ``"load"`` or ``"other"``.
    """
    s = dev.lower()
    if "switch" in s:
        return "switch"
    if "inverter" in s:
        return "inverter"
    # The previous pattern r"\bgen|generator\b" parsed as (\bgen)|(generator\b)
    # and, because "_" is a word character, missed names such as
    # "mg1generator_3" or "mg1_gen_2". "generator" is now a plain substring
    # test like the other types, and "gen" counts when it begins a token,
    # i.e. is not preceded by a letter or digit. This matches everything the
    # old pattern matched, plus the underscore-delimited forms.
    if "diesel" in s or "generator" in s or re.search(r"(?<![a-z0-9])gen", s):
        return "generator"
    if "capacitor" in s or s.startswith("cap_"):
        return "capacitor"
    if "regulator" in s or s.startswith("reg_"):
        return "regulator"
    if "load" in s:
        return "load"
    return "other"
|
| |
|
def collect_tags(plan: Dict[str, Any]) -> Dict[str, List[str]]:
    """Summarise the items of an AttackPlan as sorted tag lists.

    For every item under ``plan["plan"]`` this records its ``op`` (``"set"``
    when the key is absent), its ``point``, the scope's ``apply`` mode
    (``"both"`` when the key is absent), the scope's ``mim`` when truthy,
    and the device type inferred from the item name.

    Returns:
        A dict with the keys ``ops``, ``points``, ``apply``, ``mims`` and
        ``device_types``, each a sorted list of the distinct values seen.
        Falsy values are dropped from ``ops``, ``points`` and ``apply``.
    """
    seen_ops = set()
    seen_points = set()
    seen_apply = set()
    seen_mims = set()
    seen_types = set()

    for entry in plan.get("plan", []):
        seen_ops.add(entry.get("op", "set"))
        seen_points.add(entry.get("point", ""))

        scope = entry.get("scope") or {}
        seen_apply.add(scope.get("apply", "both"))
        mim_id = scope.get("mim")
        if mim_id:
            seen_mims.add(mim_id)

        device = infer_device_name(entry.get("name", ""))
        seen_types.add(infer_device_type(device))

    def _sorted_truthy(values):
        # Drop empty/None placeholders before sorting.
        return sorted(v for v in values if v)

    return {
        "ops": _sorted_truthy(seen_ops),
        "points": _sorted_truthy(seen_points),
        "apply": _sorted_truthy(seen_apply),
        "mims": sorted(seen_mims),
        "device_types": sorted(seen_types),
    }
|
| |
|
def item_to_phrase(it: Dict[str, Any]) -> str:
    """Render one plan item as a short natural-language phrase.

    The wording depends on ``op``:
      * open / close / trip      -> "<op> <device type> <device>"
      * increase / decrease / scale -> "<op> <point> of <device> by <value>"
      * anything else            -> "set <point> of <device> to <value>"

    When the item's scope has a truthy ``mim``, " in <mim>" is appended.
    Float values whose text ends in ".0" are shown without that suffix.
    """
    verb = it.get("op", "set")
    target_point = it.get("point", "")
    raw_value = it.get("attack_value", "")
    device = infer_device_name(it.get("name", ""))
    mim_id = (it.get("scope") or {}).get("mim")

    # Show 5.0 as "5"; all other values use their plain str() form.
    value_text = str(raw_value)
    if isinstance(raw_value, float) and value_text.endswith(".0"):
        value_text = value_text[:-2]

    if verb in {"open", "close", "trip"}:
        phrase = f"{verb} {infer_device_type(device)} {device}"
    elif verb in {"increase", "decrease", "scale"}:
        phrase = f"{verb} {target_point} of {device} by {value_text}"
    else:
        phrase = f"set {target_point} of {device} to {value_text}"

    return f"{phrase} in {mim_id}" if mim_id else phrase
|
| |
|
def plan_to_prompt(plan: Dict[str, Any], max_items: int = 6) -> str:
    """Build a prompt string describing the first ``max_items`` plan items.

    Each item is rendered with ``item_to_phrase`` and the phrases are joined
    with "; ". A plan without items yields a fixed placeholder prompt.
    """
    leading_items = plan.get("plan", [])[:max_items]
    if leading_items:
        # Joining a single phrase returns that phrase unchanged.
        return "; ".join(item_to_phrase(entry) for entry in leading_items)
    return "Generate an AttackPlan JSON v1.1 (no items)."
|
| |
|
def score(plan: Dict[str, Any]) -> Tuple[int,int,int,int]:
    """Return a sort key that ranks more varied plans higher.

    The tuple is, in order: 1 if any item applies to "both" (else 0), the
    number of distinct MIMs, the number of distinct ops, and the number of
    distinct device types.
    """
    tags = collect_tags(plan)
    uses_both = int("both" in tags["apply"])
    return (
        uses_both,
        len(tags["mims"]),
        len(tags["ops"]),
        len(tags["device_types"]),
    )
|
| |
|
| | def pick_diverse(plans: List[Dict[str, Any]], k: int, seed: int = 7) -> List[Dict[str, Any]]:
|
| | rng = random.Random(seed)
|
| |
|
| | rng.shuffle(plans)
|
| | plans.sort(key=score, reverse=True)
|
| |
|
| | seen_keys = set()
|
| | picked = []
|
| | buckets = {}
|
| | caps = {
|
| | "apply:glm_only": max(1, k//6),
|
| | "apply:both": max(1, k//3),
|
| | }
|
| | for p in plans:
|
| | tags = collect_tags(p)
|
| | key_apply = f"apply:{'glm_only' if 'glm_only' in tags['apply'] else 'both'}"
|
| | buckets.setdefault(key_apply, 0)
|
| | if buckets[key_apply] >= caps[key_apply]:
|
| | continue
|
| |
|
| | sig = tuple((it.get("op"), it.get("point"), (it.get("scope") or {}).get("mim")) for it in p.get("plan", [])[:4])
|
| | if sig in seen_keys:
|
| | continue
|
| | seen_keys.add(sig)
|
| | picked.append(p)
|
| | buckets[key_apply] += 1
|
| | if len(picked) >= k:
|
| | break
|
| |
|
| | i = 0
|
| | while len(picked) < k and i < len(plans):
|
| | if plans[i] not in picked:
|
| | picked.append(plans[i])
|
| | i += 1
|
| | return picked[:k]
|
| |
|
def write_examples(plans: List[Dict[str, Any]], outdir: Path):
    """Write one RAG example file per plan into ``outdir``.

    The directory is created if needed. Files are named ``ex-0001.json``,
    ``ex-0002.json``, ... and each holds the generated prompt, the plan
    itself under ``attack_plan``, and the plan's tags.
    """
    outdir.mkdir(parents=True, exist_ok=True)
    for index, plan in enumerate(plans, start=1):
        record = {
            "prompt": plan_to_prompt(plan),
            "attack_plan": plan,
            "tags": collect_tags(plan),
        }
        payload = json.dumps(record, ensure_ascii=False, indent=2) + "\n"
        target = Path(outdir) / f"ex-{index:04d}.json"
        target.write_text(payload, encoding="utf-8")
|
| |
|
def write_canonical_snippets(outdir: Path):
    """Write three tiny single-item plans as structural references.

    Creates ``outdir`` if needed and writes ``<title>.json`` for each
    snippet: an inverter ``Pref`` set, a switch ``status`` set, and a
    ``glm_only`` load change whose scope has no MIM.
    """
    outdir.mkdir(parents=True, exist_ok=True)

    def _single_item_plan(selected_mim, name, mg, scope_mim, apply, point,
                          attack_value, real_value, start_s, stop_s):
        # All snippets share this shape; key order is kept stable because it
        # determines the layout of the written JSON.
        return {
            "version": "1.1",
            "time": {"start_s": 0, "end_s": 30},
            "mim": {"active": True, "selected": [selected_mim]},
            "plan": [{
                "name": name,
                "scope": {"mg": mg, "mim": scope_mim, "apply": apply},
                "op": "set", "point": point,
                "attack_value": attack_value, "real_value": real_value,
                "phase": None,
                "window": {"point_start_s": start_s, "point_stop_s": stop_s},
            }],
        }

    snippets = {
        "set_inverter_Pref": _single_item_plan(
            "MIM2", "MIM2.mg1inverter_XXX.Pref", "mg1", "MIM2", "both",
            "Pref", 10000, 0, 1, 20,
        ),
        "open_switch_status": _single_item_plan(
            "MIM1", "MIM1.mg2microgrid_switch_YYY.status", "mg2", "MIM1", "both",
            "status", "OPEN", "CLOSED", 2, 10,
        ),
        "glm_only_unmapped_load": _single_item_plan(
            "MIM3", "load_42.constant_power_A", "unmapped", None, "glm_only",
            "constant_power_A", 25000, 20000, 5, 25,
        ),
    }

    for title, plan in snippets.items():
        text = json.dumps(plan, ensure_ascii=False, indent=2) + "\n"
        (Path(outdir) / f"{title}.json").write_text(text, encoding="utf-8")
|
| |
|
def main():
    """Command-line entry point: seed the RAG example folder from a JSONL file.

    Parses ``--src``, ``--out``, ``--k``, ``--seed`` and ``--write_snippets``,
    loads the plans, picks a varied subset and writes one example file per
    picked plan. With ``--write_snippets`` the canonical mini-plans are also
    written to ``kb/snippets/json``.

    Raises:
        SystemExit: If no source file can be found, or if the source holds
            no valid plans.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--src", type=str, default="scripts/train_attackplan.jsonl",
                    help="Path to your AttackPlan JSONL")
    ap.add_argument("--out", type=str, default="kb/examples",
                    help="Output folder for RAG examples")
    ap.add_argument("--k", type=int, default=40,
                    help="How many examples to write")
    ap.add_argument("--seed", type=int, default=7)
    ap.add_argument("--write_snippets", action="store_true",
                    help="Also write a few canonical mini-plans to kb/snippets/json/")
    args = ap.parse_args()

    src = Path(args.src)
    if not src.exists():
        # Fall back to the usual locations when the given path is missing.
        candidates = [
            Path("..") / "EditGlm" / "scripts" / "train_attackplan.jsonl",
            Path("scripts") / "train_attackplan.jsonl",
        ]
        src = next((c for c in candidates if c.exists()), src)
    if not src.is_file():
        # Fail with a clear message instead of a FileNotFoundError traceback.
        raise SystemExit(f"Source JSONL not found: {src}")

    print("[seed] reading", src.resolve())
    plans = load_plans(src)
    if not plans:
        raise SystemExit("No valid plans found in JSONL.")

    out_dir = Path(args.out)
    picked = pick_diverse(plans, k=args.k, seed=args.seed)
    write_examples(picked, out_dir)

    snippets_dir = Path("kb/snippets/json")
    if args.write_snippets:
        write_canonical_snippets(snippets_dir)

    print(f"[seed] wrote {len(picked)} examples to {out_dir.resolve()}")
    if args.write_snippets:
        print(f"[seed] wrote canonical mini snippets to {snippets_dir.resolve()}")
|
| |
|
| |
|
| |
|
# Run the CLI only when this file is executed as a script (e.g. via %run),
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|
| |
|