blux-ca / doctrine /loader.py
~JADIS
Add runtime agent pipeline with safety routing
6364e69
from __future__ import annotations
import yaml
from pathlib import Path
from typing import Iterable
from doctrine.schema import Rule, RuleBundle
class RuleLoader:
    """Load rule definitions from YAML files into a ``RuleBundle``.

    Attributes:
        base_path: Directory globbed for ``*.yaml`` files by
            ``load_default_bundle``.
    """

    def __init__(self, base_path: Path | str | None = None) -> None:
        """Create a loader.

        Args:
            base_path: Directory holding the default rule files. Any falsy
                value (``None`` or ``""``) selects the ``rules`` directory
                located next to this module.
        """
        self.base_path = Path(base_path) if base_path else Path(__file__).parent / "rules"

    def load_files(self, paths: Iterable[Path | str]) -> RuleBundle:
        """Parse each YAML file in order and merge their rules into one bundle.

        Args:
            paths: Files to read. Rules are appended in the order the paths
                are given.

        Returns:
            A ``RuleBundle`` holding every rule found. Its version is the
            ``bundle_version`` of the last file that declares one, or
            ``"v1"`` when no file does.

        Raises:
            ValueError: If a file's top-level YAML value is not a mapping.
        """
        rules = []
        bundle_version = "v1"
        for path in paths:
            data = self._load_yaml(Path(path))
            # An explicit ``bundle_version: null`` must not wipe out the
            # version gathered so far; only a real value overrides it.
            declared_version = data.get("bundle_version")
            if declared_version is not None:
                bundle_version = declared_version
            # A bare ``rules:`` key parses to None; treat it as "no rules"
            # instead of failing when iterating over None.
            rules.extend(Rule(**item) for item in data.get("rules") or [])
        return RuleBundle(rules=rules, version=bundle_version)

    def load_default_bundle(self) -> RuleBundle:
        """Load every ``*.yaml`` file directly under ``base_path``.

        Files are processed in sorted path order so the result is
        deterministic.
        """
        files = sorted(self.base_path.glob("*.yaml"))
        return self.load_files(files)

    @staticmethod
    def _load_yaml(path: Path) -> dict:
        """Read one YAML file and return its top-level mapping.

        Args:
            path: File to read; decoded as UTF-8.

        Returns:
            The parsed mapping, or an empty dict when the document is empty
            (or otherwise parses to a falsy value).

        Raises:
            ValueError: If the document parses to a non-empty value that is
                not a mapping (for example a top-level list or scalar).
        """
        with path.open("r", encoding="utf-8") as handle:
            data = yaml.safe_load(handle) or {}
        if not isinstance(data, dict):
            # Fail here with the offending file named, rather than later
            # with an opaque AttributeError on ``.get``.
            raise ValueError(
                f"{path}: expected a YAML mapping at the top level, "
                f"got {type(data).__name__}"
            )
        return data
def load_rules_from_files(paths: Iterable[Path | str]) -> tuple[RuleBundle, str]:
    """Load the given rule files with a default ``RuleLoader``.

    Args:
        paths: YAML files to read, in the order their rules should appear.

    Returns:
        A ``(bundle, version)`` pair, where ``version`` is the bundle's own
        ``version`` attribute, returned alongside it for convenience.
    """
    loaded = RuleLoader().load_files(paths)
    return (loaded, loaded.version)