yaml-bert / yaml_bert /linearizer.py
vimalk78's picture
probes: K8s-honest framing + probe-5 rename
c414c6c verified
Raw
History Blame Contribute Delete
4.18 kB
from __future__ import annotations
import yaml
from yaml_bert.types import NodeType, YamlNode
class YamlLinearizer:
# Universal depth cap (Lever 5). Below this depth, content is essentially
# exclusively CRD schema definition (depth 11+ is 99.997% CRD); capping
# rebalances training-token pressure toward real manifest content.
# See docs/historical/v6-plan.md "Lever 5" for the data justification.
DEFAULT_MAX_DEPTH: int = 9
def __init__(self, max_depth: int = DEFAULT_MAX_DEPTH) -> None:
self.max_depth: int = max_depth
def linearize(self, yaml_string: str) -> list[YamlNode]:
data = yaml.safe_load(yaml_string)
if data is None:
return []
nodes: list[YamlNode] = []
self._walk(data, depth=0, parent_path="", nodes=nodes, in_list=False)
return nodes
def _walk(
self,
data,
depth: int,
parent_path: str,
nodes: list[YamlNode],
in_list: bool,
) -> None:
if depth > self.max_depth:
return
if isinstance(data, dict):
for sibling_index, (key, value) in enumerate(data.items()):
key_str = str(key)
key_type = NodeType.LIST_KEY if in_list else NodeType.KEY
nodes.append(
YamlNode(
token=key_str,
node_type=key_type,
depth=depth,
sibling_index=sibling_index,
parent_path=parent_path,
)
)
if isinstance(value, dict):
child_path = f"{parent_path}.{key_str}" if parent_path else key_str
self._walk(value, depth + 1, child_path, nodes, in_list=False)
elif isinstance(value, list):
child_path = f"{parent_path}.{key_str}" if parent_path else key_str
self._walk_list(value, depth + 1, child_path, nodes)
else:
value_path = f"{parent_path}.{key_str}" if parent_path else key_str
value_type = NodeType.LIST_VALUE if in_list else NodeType.VALUE
nodes.append(
YamlNode(
token=str(value),
node_type=value_type,
depth=depth,
sibling_index=sibling_index,
parent_path=value_path,
)
)
def linearize_file(self, path: str) -> list[YamlNode]:
with open(path) as f:
content = f.read()
nodes: list[YamlNode] = []
for doc in yaml.safe_load_all(content):
if doc is None:
continue
self._walk(doc, depth=0, parent_path="", nodes=nodes, in_list=False)
return nodes
def linearize_multi_doc(self, yaml_string: str) -> list[list[YamlNode]]:
result = []
for doc in yaml.safe_load_all(yaml_string):
if doc is None:
continue
nodes: list[YamlNode] = []
self._walk(doc, depth=0, parent_path="", nodes=nodes, in_list=False)
result.append(nodes)
return result
def _walk_list(
self,
data: list,
depth: int,
parent_path: str,
nodes: list[YamlNode],
) -> None:
if depth > self.max_depth:
return
for item_index, item in enumerate(data):
item_path = f"{parent_path}.{item_index}"
if isinstance(item, dict):
self._walk(item, depth, item_path, nodes, in_list=True)
elif isinstance(item, list):
self._walk_list(item, depth, item_path, nodes)
else:
nodes.append(
YamlNode(
token=str(item),
node_type=NodeType.LIST_VALUE,
depth=depth,
sibling_index=item_index,
parent_path=item_path,
)
)