| |
|
| | from __future__ import annotations
|
| |
|
| | import json
|
| | import re
|
| | from pathlib import Path
|
| | from typing import Any, Dict, List, Optional, Tuple, Set
|
| |
|
| |
|
| | def _read_json(path: str) -> Dict[str, Any]:
|
| | return json.loads(Path(path).read_text(encoding="utf-8"))
|
| |
|
| |
|
class TargetRegistry:
    """Lookup tables used to canonicalize device, point and MIM names.

    The registry is built from a dict with these (all optional) keys:

    * ``"canonical"``: ``{group: {mg: iterable of device names}}``; the
      device names are merged per ``mg`` across all groups.
    * ``"points"``: stored as-is on ``self.points``.
    * ``"synonyms"``: ``{"device": {...}, "point": {...}, "mim": {...}}``
      alias -> replacement maps.
    * ``"mim_default_by_mg"``: ``{mg: default MIM}``.
    """

    def __init__(self, data: Dict[str, Any]):
        """Index *data*; a missing or ``None`` document/section counts as empty."""
        self.data = data or {}
        # ``or {}`` (rather than a ``.get`` default) so that an explicit JSON
        # ``null`` section does not break the lookups below.
        self.canonical = self.data.get("canonical") or {}
        self.points = self.data.get("points", {})
        self.synonyms = self.data.get("synonyms") or {}
        self.mim_default_by_mg = self.data.get("mim_default_by_mg") or {}

        # mg -> union of the device names listed for that mg in every group.
        self.allowed_by_mg: Dict[str, Set[str]] = {}
        for mg_map in self.canonical.values():
            for mg, devices in (mg_map or {}).items():
                self.allowed_by_mg.setdefault(mg, set()).update(devices or ())

        self.dev_syn = self.synonyms.get("device") or {}
        self.point_syn = self.synonyms.get("point") or {}
        self.mim_syn = self.synonyms.get("mim") or {}

    @classmethod
    def from_json(cls, path: str) -> "TargetRegistry":
        """Build a registry from the JSON file at *path*."""
        return cls(_read_json(path))

    def allowed_devices(self, mg: Optional[str]) -> Set[str]:
        """Return the device names allowed for *mg* (empty set if unknown/falsy).

        For a known ``mg`` the returned set is the registry's own object, so
        callers should treat it as read-only.
        """
        if not mg:
            return set()
        return self.allowed_by_mg.get(mg, set())

    def canonicalize_mim(self, mim: Optional[str]) -> Optional[str]:
        """Return *mim* as ``MIM1``..``MIM4`` or ``None`` if it is not one.

        The MIM synonym map is applied first (exact-key lookup), then the
        result is upper-cased and must match ``MIM[1-4]`` in full.
        Non-string input (``None``, numbers, lists, ...) yields ``None``
        instead of raising.
        """
        if not isinstance(mim, str):
            return None
        resolved = self.mim_syn.get(mim, mim)
        if not isinstance(resolved, str):
            return None
        resolved = resolved.upper()
        if re.fullmatch(r"MIM[1-4]", resolved):
            return resolved
        return None

    def canonicalize_point(self, point: str) -> str:
        """Return the point synonym for *point*, or *point* itself if none."""
        return self.point_syn.get(point, point)

    def canonicalize_device(self, dev: str, mg: Optional[str]) -> str:
        """Map *dev* to an allowed device name of *mg* when one can be derived.

        Resolution order (first hit wins):

        1. Apply the device synonym map to *dev*.
        2. If the result contains a dot, take the text after the last dot
           and try ``mg + tail``, then ``mg + synonym(tail)``.
        3. If the result does not already start with *mg*, try ``mg + result``.
        4. Otherwise return the synonym-resolved name unchanged (whether or
           not it is in the allow-list).

        Without an *mg* only step 1 is applied.
        """
        dev2 = self.dev_syn.get(dev, dev)
        if not mg:
            return dev2

        allowed = self.allowed_devices(mg)

        if "." in dev2:
            core = dev2.split(".")[-1]
            # Try the raw tail first, then its synonym.
            for tail in (core, self.dev_syn.get(core, core)):
                candidate = f"{mg}{tail}"
                if candidate in allowed:
                    return candidate

        if not dev2.startswith(mg):
            prefixed = f"{mg}{dev2}"
            if prefixed in allowed:
                return prefixed

        return dev2

    def pick_default_mim(self, mg: Optional[str]) -> Optional[str]:
        """Return the configured default MIM for *mg*, or ``None``."""
        if mg is None:
            return None
        return self.mim_default_by_mg.get(mg)
|
| |
|
| |
|
def augment_prompt_with_allowlist(instruction: str, reg: TargetRegistry) -> str:
    """Append a short preview of allowed device names to *instruction*.

    For each of the buckets ``mg1``, ``mg2``, ``mg3``, ``substation`` and
    ``unmapped`` the first eight device names (sorted) reported by
    ``reg.allowed_devices`` are listed on one line. Buckets with no devices
    are skipped; if every bucket is empty, *instruction* is returned
    unchanged.
    """
    preview_lines = []
    for bucket in ("mg1", "mg2", "mg3", "substation", "unmapped"):
        sample = sorted(reg.allowed_devices(bucket))[:8]
        if sample:
            preview_lines.append(f"{bucket}: {', '.join(sample)}")

    if not preview_lines:
        return instruction

    header = "\nAllowed device names (examples):\n"
    footer = "\nUse exactly one dot in `name`: [optional MIM].<device>.<point>\n"
    return instruction + header + "\n".join(preview_lines) + footer
|
| |
|
| |
|
| | def _parse_name(name: str) -> Tuple[Optional[str], Optional[str], Optional[str]]:
|
| |
|
| |
|
| |
|
| |
|
| | parts = name.split(".")
|
| | if len(parts) >= 3 and re.fullmatch(r"MIM[1-4]", parts[0]):
|
| | mim = parts[0]
|
| | point = parts[-1]
|
| | device = ".".join(parts[1:-1])
|
| | return mim, device, point
|
| | elif len(parts) >= 2:
|
| | point = parts[-1]
|
| | device = ".".join(parts[:-1])
|
| | return None, device, point
|
| | return None, None, None
|
| |
|
| |
|
| | def _build_name(mim: Optional[str], device: str, point: str) -> str:
|
| | if mim:
|
| | return f"{mim}.{device}.{point}"
|
| | return f"{device}.{point}"
|
| |
|
| |
|
| | def _is_switch_point(point: str) -> bool:
|
| | return point in {"status", "switchA", "switchB", "switchC"}
|
| |
|
| |
|
| | def _normalize_openclose(item: Dict[str, Any]) -> None:
|
| | op = item.get("op")
|
| | point = item.get("point")
|
| | if op in {"open", "close"} or _is_switch_point(point):
|
| |
|
| | av = item.get("attack_value")
|
| | rv = item.get("real_value")
|
| | if isinstance(av, str):
|
| | item["attack_value"] = av.upper()
|
| | if isinstance(rv, str):
|
| | item["real_value"] = rv.upper()
|
| |
|
| | if op == "open" and not item.get("attack_value"):
|
| | item["attack_value"] = "OPEN"
|
| | if op == "close" and not item.get("attack_value"):
|
| | item["attack_value"] = "CLOSED"
|
| |
|
| | if not item.get("real_value"):
|
| | item["real_value"] = "CLOSED" if item["attack_value"] == "OPEN" else "OPEN"
|
| |
|
| |
|
def validate_and_fix_attackplan(
    ap: Dict[str, Any],
    reg: TargetRegistry,
    strict: bool = False,
    autofix: bool = True,
    cutoff: float = 0.92,
) -> Tuple[Dict[str, Any], List[str]]:
    """Canonicalize the items of an attack plan against *reg*, in place.

    Args:
        ap: Attack-plan dict; its ``"plan"`` entry is a list of item dicts
            with (optional) ``"scope"`` (``{"mg": ..., "mim": ...}``) and
            ``"name"`` (``[MIM.]<device>.<point>``) entries.
        reg: Registry used to canonicalize MIM, device and point names.
        strict: When true, items whose scope has an ``mg`` and whose
            canonicalized device is not in that mg's allow-list are dropped
            and a note is recorded.
        autofix: Accepted for interface compatibility; not read here.
        cutoff: Accepted for interface compatibility; not read here.

    Returns:
        ``(ap, notes)`` where ``ap`` is the same object that was passed in
        (items mutated, ``ap["plan"]`` replaced by the list of kept items)
        and ``notes`` lists human-readable remarks. If *ap* is not a dict
        it is returned untouched with a single note.
    """
    notes: List[str] = []

    if not isinstance(ap, dict):
        notes.append("attack plan is not a dict")
        return ap, notes

    plan: List[Dict[str, Any]] = ap.get("plan") or []

    # The plan-level MIM selection is taken from the first item's scope.
    if plan:
        scope0 = plan[0].get("scope")
        if not isinstance(scope0, dict):
            # ``"scope": null`` must behave like a missing scope.
            scope0 = {}
        mim0 = reg.canonicalize_mim(scope0.get("mim")) or reg.pick_default_mim(
            scope0.get("mg")
        )
        if mim0:
            # setdefault() would hand back an existing None/non-dict value,
            # so make sure there really is a dict to write into.
            if not isinstance(ap.get("mim"), dict):
                ap["mim"] = {}
            ap["mim"]["active"] = True
            ap["mim"]["selected"] = [mim0]

    new_plan: List[Dict[str, Any]] = []

    for it in plan:
        scope = it.get("scope")
        if not isinstance(scope, dict):
            scope = {}
        mg = scope.get("mg")
        mim = reg.canonicalize_mim(scope.get("mim")) or reg.pick_default_mim(mg)

        mim_in, dev_raw, point_raw = _parse_name(it.get("name", ""))
        point = reg.canonicalize_point(point_raw) if point_raw else point_raw

        # Scope MIM (or the mg default) wins; the MIM prefix found in the
        # name is only used when neither is available.
        mim_final = mim or reg.canonicalize_mim(mim_in)

        dev_final = reg.canonicalize_device(dev_raw or "", mg)

        if strict and mg and dev_final not in reg.allowed_devices(mg):
            notes.append(f"dropped unknown device for mg={mg}: {dev_raw}")
            continue

        # Only rewrite the name when both halves could be determined.
        if dev_final and point:
            it["name"] = _build_name(mim_final, dev_final, point)

        if mim_final:
            if not isinstance(it.get("scope"), dict):
                it["scope"] = {}
            it["scope"]["mim"] = mim_final

        _normalize_openclose(it)

        new_plan.append(it)

    ap["plan"] = new_plan
    return ap, notes
|
| |
|