# Repository path: ds6b-attackplan-qlora / scripts / seed_kb_examples.py
# Last commit: "Upload folder using huggingface_hub" by adetuire1 (fba140f, verified)
# -*- coding: utf-8 -*-
"""
seed_kb_examples.py
Create prompt→AttackPlan examples for RAG from train_attackplan.jsonl
Usage (from repo root):
%run scripts/seed_kb_examples.py
# or choose a different source / count
%run scripts/seed_kb_examples.py --src scripts/train_attackplan.jsonl --k 40
"""
from __future__ import annotations
import argparse, json, re, random
from pathlib import Path
from typing import Dict, Any, List, Tuple
# ----------------------
# Helpers
# ----------------------
def load_plans(src: Path) -> List[Dict[str, Any]]:
    """Read a JSONL file and return every row that looks like an AttackPlan.

    A row is kept only when it parses as a JSON object whose ``"plan"`` value
    is a list. Blank lines, lines that fail to parse, and any other row shape
    (e.g. chat rows that ended up in the file by accident) are skipped
    silently.

    Args:
        src: Path of the UTF-8 encoded JSONL file.

    Returns:
        The accepted rows, in file order.
    """
    plans = []
    for raw in src.read_text(encoding="utf-8").splitlines():
        text = raw.strip()
        if not text:
            continue
        try:
            record = json.loads(text)
        except Exception:
            # Best-effort loader: a bad line must not abort the whole run.
            continue
        if isinstance(record, dict) and isinstance(record.get("plan"), list):
            plans.append(record)
    return plans
def infer_device_name(item_name: str) -> str:
    """Extract the device name from a dotted item name.

    Two layouts are handled:
      * ``"MIM2.mg1microgrid_switch2.status"`` - first chunk starts with
        ``"MIM"`` and there are at least three chunks: the second chunk is
        the device.
      * ``"mg1load_41.constant_power_A"`` - anything else: the first chunk
        (everything before the first ``'.'``) is the device.
    """
    chunks = item_name.split(".")
    has_mim_prefix = len(chunks) >= 3 and chunks[0].startswith("MIM")
    return chunks[1] if has_mim_prefix else chunks[0]
def infer_device_type(dev: str) -> str:
    """Classify a device name into a coarse device type.

    The name is lower-cased and tested against an ordered list of rules; the
    first rule that matches wins. Returns one of ``"switch"``, ``"inverter"``,
    ``"generator"``, ``"capacitor"``, ``"regulator"``, ``"load"`` or
    ``"other"`` when nothing matches.
    """
    name = dev.lower()

    # Order matters: earlier rules take precedence over later ones.
    # NOTE(review): the generator regex parses as `\bgen` OR `generator\b`
    # (alternation binds loosest), and digits/underscores are word characters,
    # so e.g. "mg1generator_1" does not match it. Confirm this is intended.
    rules = (
        ("switch", lambda: "switch" in name),
        ("inverter", lambda: "inverter" in name),
        ("generator", lambda: "diesel" in name
            or bool(re.search(r"\bgen|generator\b", name))),
        ("capacitor", lambda: "capacitor" in name or name.startswith("cap_")),
        ("regulator", lambda: "regulator" in name or name.startswith("reg_")),
        ("load", lambda: "load" in name),
    )
    for label, matches in rules:
        if matches():
            return label
    return "other"
def collect_tags(plan: Dict[str, Any]) -> Dict[str, List[str]]:
    """Summarise the distinct attributes used by the items of a plan.

    For every item in ``plan["plan"]`` this gathers the op (default
    ``"set"``), the point, the scope's ``apply`` mode (default ``"both"``),
    the scope's ``mim`` (when truthy) and the device type inferred from the
    item name.

    Returns:
        A dict with keys ``"ops"``, ``"points"``, ``"apply"``, ``"mims"`` and
        ``"device_types"``, each a sorted list of the distinct values seen.
        Falsy ops/points/apply values are dropped.
    """
    seen = {
        "ops": set(),
        "points": set(),
        "apply": set(),
        "mims": set(),
        "device_types": set(),
    }
    for entry in plan.get("plan", []):
        seen["ops"].add(entry.get("op", "set"))
        seen["points"].add(entry.get("point", ""))
        # "scope" may be missing or explicitly null; treat both as empty.
        scope = entry.get("scope") or {}
        seen["apply"].add(scope.get("apply", "both"))
        mim = scope.get("mim")
        if mim:
            seen["mims"].add(mim)
        device = infer_device_name(entry.get("name", ""))
        seen["device_types"].add(infer_device_type(device))

    tags = {key: sorted(v for v in seen[key] if v) for key in ("ops", "points", "apply")}
    tags["mims"] = sorted(seen["mims"])
    tags["device_types"] = sorted(seen["device_types"])
    return tags
def item_to_phrase(it: Dict[str, Any]) -> str:
    """Render one plan item as a short, human-style prompt fragment for RAG.

    The wording depends on the item's op:
      * open / close / trip      -> "<op> <device type> <device>"
      * increase/decrease/scale  -> "<op> <point> of <device> by <value>"
      * anything else (set)      -> "set <point> of <device> to <value>"
    When the item's scope names a MIM, " in <mim>" is appended.
    """
    action = it.get("op", "set")
    target_point = it.get("point", "")
    raw_value = it.get("attack_value", "")
    device = infer_device_name(it.get("name", ""))
    mim = (it.get("scope") or {}).get("mim")

    # Show whole-number floats without the trailing ".0" (10.0 -> "10").
    value_text = str(raw_value)
    if isinstance(raw_value, float) and value_text.endswith(".0"):
        value_text = value_text[:-2]

    if action in {"open", "close", "trip"}:
        phrase = f"{action} {infer_device_type(device)} {device}"
    elif action in {"increase", "decrease", "scale"}:
        phrase = f"{action} {target_point} of {device} by {value_text}"
    else:
        # "set" and any unrecognised op share the same wording.
        phrase = f"set {target_point} of {device} to {value_text}"

    return f"{phrase} in {mim}" if mim else phrase
def plan_to_prompt(plan: Dict[str, Any], max_items: int = 6) -> str:
    """Build a natural-language prompt describing a plan.

    Only the first ``max_items`` items are described; their phrases are
    joined with ``"; "``. A plan with no items yields a fixed placeholder
    prompt.
    """
    selected = plan.get("plan", [])[:max_items]
    if selected:
        # Joining a single phrase returns that phrase unchanged.
        return "; ".join(item_to_phrase(entry) for entry in selected)
    return "Generate an AttackPlan JSON v1.1 (no items)."
def score(plan: Dict[str, Any]) -> Tuple[int,int,int,int]:
    """Return a sort key that ranks plans by how varied their tags are.

    The tuple compares, in order: whether any item uses ``apply == "both"``
    (1 or 0), the number of distinct MIMs, the number of distinct ops, and
    the number of distinct device types. Larger tuples mean "more diverse".
    """
    tags = collect_tags(plan)
    uses_both = int("both" in tags["apply"])
    mim_count = len(tags["mims"])
    op_count = len(tags["ops"])
    device_type_count = len(tags["device_types"])
    return (uses_both, mim_count, op_count, device_type_count)
def pick_diverse(plans: List[Dict[str, Any]], k: int, seed: int = 7) -> List[Dict[str, Any]]:
rng = random.Random(seed)
# Shuffle then sort by our diversity score (descending)
rng.shuffle(plans)
plans.sort(key=score, reverse=True)
# Simple greedy: walk and enforce bucketing caps so we cover ops/apply/points
seen_keys = set()
picked = []
buckets = {}
caps = {
"apply:glm_only": max(1, k//6),
"apply:both": max(1, k//3),
}
for p in plans:
tags = collect_tags(p)
key_apply = f"apply:{'glm_only' if 'glm_only' in tags['apply'] else 'both'}"
buckets.setdefault(key_apply, 0)
if buckets[key_apply] >= caps[key_apply]:
continue
# de-dup by items signature
sig = tuple((it.get("op"), it.get("point"), (it.get("scope") or {}).get("mim")) for it in p.get("plan", [])[:4])
if sig in seen_keys:
continue
seen_keys.add(sig)
picked.append(p)
buckets[key_apply] += 1
if len(picked) >= k:
break
# If still short, top up ignoring caps
i = 0
while len(picked) < k and i < len(plans):
if plans[i] not in picked:
picked.append(plans[i])
i += 1
return picked[:k]
def write_examples(plans: List[Dict[str, Any]], outdir: Path):
    """Write one RAG example file per plan into ``outdir``.

    The directory is created if needed. Files are named ``ex-0001.json``,
    ``ex-0002.json``, ... and each holds a JSON object with the generated
    ``"prompt"``, the plan itself under ``"attack_plan"`` and its ``"tags"``.
    """
    outdir.mkdir(parents=True, exist_ok=True)
    for index, plan in enumerate(plans, start=1):
        example = {
            "prompt": plan_to_prompt(plan),
            "attack_plan": plan,
            "tags": collect_tags(plan)
        }
        payload = json.dumps(example, ensure_ascii=False, indent=2) + "\n"
        target = Path(outdir, f"ex-{index:04d}.json")
        target.write_text(payload, encoding="utf-8")
def write_canonical_snippets(outdir: Path):
    """Write a few tiny single-item plans to ``outdir`` as structural references.

    Three files are produced, one per snippet title:
    ``set_inverter_Pref.json``, ``open_switch_status.json`` and
    ``glm_only_unmapped_load.json``. The directory is created if needed.
    """

    def single_item_plan(selected_mim, name, scope, point,
                         attack_value, real_value, start_s, stop_s):
        # All snippets share the same envelope and differ only in their item.
        return {
            "version": "1.1",
            "time": {"start_s": 0, "end_s": 30},
            "mim": {"active": True, "selected": [selected_mim]},
            "plan": [{
                "name": name,
                "scope": scope,
                "op": "set", "point": point,
                "attack_value": attack_value, "real_value": real_value,
                "phase": None,
                "window": {"point_start_s": start_s, "point_stop_s": stop_s},
            }],
        }

    outdir.mkdir(parents=True, exist_ok=True)

    snippets = (
        ("set_inverter_Pref", single_item_plan(
            "MIM2", "MIM2.mg1inverter_XXX.Pref",
            {"mg": "mg1", "mim": "MIM2", "apply": "both"},
            "Pref", 10000, 0, 1, 20)),
        ("open_switch_status", single_item_plan(
            "MIM1", "MIM1.mg2microgrid_switch_YYY.status",
            {"mg": "mg2", "mim": "MIM1", "apply": "both"},
            "status", "OPEN", "CLOSED", 2, 10)),
        ("glm_only_unmapped_load", single_item_plan(
            "MIM3", "load_42.constant_power_A",
            {"mg": "unmapped", "mim": None, "apply": "glm_only"},
            "constant_power_A", 25000, 20000, 5, 25)),
    )
    for title, plan in snippets:
        payload = json.dumps(plan, ensure_ascii=False, indent=2) + "\n"
        Path(outdir, f"{title}.json").write_text(payload, encoding="utf-8")
def main(argv=None):
    """Command-line entry point: seed the RAG knowledge base with examples.

    Reads AttackPlan rows from ``--src``, picks ``--k`` diverse plans (using
    ``--seed`` for tie-breaking) and writes them as example files under
    ``--out``. With ``--write_snippets`` it also writes the canonical
    mini-plans to ``kb/snippets/json``.

    Args:
        argv: Optional list of command-line arguments (without the program
            name). ``None`` makes argparse read ``sys.argv[1:]``, which is
            the behaviour when run as a script.

    Raises:
        SystemExit: If no source file can be found, or if the source contains
            no valid plans.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--src", type=str, default="scripts/train_attackplan.jsonl",
                    help="Path to your AttackPlan JSONL")
    ap.add_argument("--out", type=str, default="kb/examples",
                    help="Output folder for RAG examples")
    ap.add_argument("--k", type=int, default=40,
                    help="How many examples to write")
    ap.add_argument("--seed", type=int, default=7)
    ap.add_argument("--write_snippets", action="store_true",
                    help="Also write a few canonical mini-plans to kb/snippets/json/")
    args = ap.parse_args(argv)

    src = Path(args.src)
    if not src.exists():
        # Try a couple of common alternate locations
        candidates = [
            Path("..") / "EditGlm" / "scripts" / "train_attackplan.jsonl",
            Path("scripts") / "train_attackplan.jsonl",
        ]
        src = next((c for c in candidates if c.exists()), src)
    if not src.is_file():
        # Fail with a clear message instead of an unhandled traceback from
        # the file read further down.
        raise SystemExit(f"Source JSONL not found: {src.resolve()}")

    print("[seed] reading", src.resolve())
    plans = load_plans(src)
    if not plans:
        raise SystemExit("No valid plans found in JSONL.")

    out_dir = Path(args.out)
    snippets_dir = Path("kb/snippets/json")

    picked = pick_diverse(plans, k=args.k, seed=args.seed)
    write_examples(picked, out_dir)
    if args.write_snippets:
        write_canonical_snippets(snippets_dir)

    print(f"[seed] wrote {len(picked)} examples to {out_dir.resolve()}")
    if args.write_snippets:
        print(f"[seed] wrote canonical mini snippets to {snippets_dir.resolve()}")


# Run the CLI only when executed as a script (e.g. via %run), not on import.
if __name__ == "__main__":
    main()