VoiceDirector / utils /config_loader.py
dsa2dsads's picture
demo: package VoiceDirector stage-1 integration app
c0c4a30 verified
import json
import re
from pathlib import Path
from typing import Any
from core.rule_engine import Rule
# Column names for the rule table, in the same order as the cells produced by
# rules_to_table_rows and consumed by rules_from_editor_rows.
TABLE_HEADERS = ["rule_id", "primary_keyword", "aliases", "reply"]

# parents[0] is the directory holding this file; parents[1] is the one above it.
ROOT_DIR = Path(__file__).resolve().parents[1]

# Default locations of the bundled data files, all relative to ROOT_DIR.
DEFAULT_RULES_PATH = ROOT_DIR.joinpath("config", "rules.json")
DEFAULT_EVALUATION_CONTRACT_PATH = ROOT_DIR.joinpath("config", "evaluation_contract.json")
DEFAULT_BENCHMARK_CSV_PATH = ROOT_DIR.joinpath("benchmarks_data", "live_commerce_eval_scripts.csv")
def _load_json(path: Path | str) -> Any:
resolved = Path(path)
with resolved.open("r", encoding="utf-8") as handle:
return json.load(handle)
def load_rule_catalog(path: Path | str = DEFAULT_RULES_PATH) -> list[Rule]:
    """Read the rule catalog JSON at *path* and build one ``Rule`` per entry.

    Each entry is read like a mapping. ``rule_id``, ``primary_keyword`` and
    ``reply`` are required; ``aliases`` is optional and defaults to no
    aliases. Every value is converted with ``str`` and stripped. Aliases that
    are blank after stripping are dropped, and repeated aliases keep only
    their first occurrence.

    Args:
        path: Location of the catalog file.

    Returns:
        The rules in the order they appear in the file.

    Raises:
        KeyError: If an entry lacks one of the required keys.
    """

    def build_rule(entry: Any) -> Rule:
        # Aliases are cleaned first, then the required keys are read.
        cleaned_aliases = [
            text
            for raw_alias in entry.get("aliases", [])
            if (text := str(raw_alias).strip())
        ]
        unique_aliases = _deduplicate(cleaned_aliases)
        return Rule(
            rule_id=str(entry["rule_id"]).strip(),
            primary_keyword=str(entry["primary_keyword"]).strip(),
            aliases=tuple(unique_aliases),
            reply=str(entry["reply"]).strip(),
        )

    return [build_rule(entry) for entry in _load_json(path)]
def load_evaluation_contract(path: Path | str = DEFAULT_EVALUATION_CONTRACT_PATH) -> dict[str, Any]:
    """Parse the evaluation contract JSON at *path* and return it unchanged.

    No validation or normalisation happens here; the caller receives exactly
    what the file decodes to.

    Args:
        path: Location of the contract file.
    """
    contract: dict[str, Any] = _load_json(path)
    return contract
def rules_to_table_rows(rules: list[Rule]) -> list[list[str]]:
    """Flatten *rules* into table rows of four cells each.

    The cells are, in order: rule id, primary keyword, the aliases joined
    with ``", "``, and the reply.

    Args:
        rules: Rules to convert.

    Returns:
        One four-cell row per rule, in input order.
    """
    table: list[list[str]] = []
    for entry in rules:
        table.append(
            [
                entry.rule_id,
                entry.primary_keyword,
                ", ".join(entry.aliases),
                entry.reply,
            ]
        )
    return table
def _cell_to_text(value: Any) -> str:
    """Convert one editor cell to stripped text, mapping missing values to ``""``."""
    # None and float NaN are how empty cells commonly arrive from table
    # widgets and DataFrames; str() would turn them into the literal text
    # "None" / "nan", which then passes the "required field is empty" check.
    if value is None or (isinstance(value, float) and value != value):
        return ""
    return str(value).strip()


def rules_from_editor_rows(rows: Any) -> list[Rule]:
    """Build rules from rows coming back from a table editor.

    Accepted inputs:

    * ``None``, which yields an empty list;
    * any object with a ``to_dict`` method (it is called as
      ``to_dict("records")`` and the result is used as the rows);
    * an iterable of rows, where each row is either a dict keyed by
      ``rule_id`` / ``primary_keyword`` / ``aliases`` / ``reply`` or a
      sequence whose first four cells are those values in that order.
      Short sequences are padded with empty cells and extra cells are
      ignored.

    Each cell is converted to text and stripped; missing cells (``None`` or
    float NaN) count as empty. Rows whose rule id, primary keyword or reply
    is empty are skipped. The aliases cell is split into individual aliases;
    aliases equal to the primary keyword and repeated aliases are removed.

    Args:
        rows: The editor content, in one of the shapes described above.

    Returns:
        The rules for all complete rows, in row order.
    """
    if rows is None:
        return []
    if hasattr(rows, "to_dict"):
        rows = rows.to_dict("records")

    rules: list[Rule] = []
    # The shape is decided per row, so the input only needs to be iterable
    # (no indexing or truth test on the container itself).
    for row in rows:
        if isinstance(row, dict):
            cells = [
                row.get("rule_id"),
                row.get("primary_keyword"),
                row.get("aliases"),
                row.get("reply"),
            ]
        else:
            cells = list(row)[:4]
            cells.extend([None] * (4 - len(cells)))

        rule_id, primary_keyword, aliases_raw, reply = (_cell_to_text(cell) for cell in cells)
        if not (rule_id and primary_keyword and reply):
            continue

        aliases = [alias for alias in _split_aliases(aliases_raw) if alias != primary_keyword]
        rules.append(
            Rule(
                rule_id=rule_id,
                primary_keyword=primary_keyword,
                aliases=tuple(_deduplicate(aliases)),
                reply=reply,
            )
        )
    return rules
def _split_aliases(raw_value: str) -> list[str]:
    """Split a delimited alias string into individual, stripped aliases.

    Both the ASCII comma and the full-width comma (U+FF0C) act as
    separators. Pieces that are empty after stripping are dropped.

    Args:
        raw_value: The text of an aliases cell, e.g. ``"cost, how much"``.

    Returns:
        The non-empty aliases in their original order.
    """
    # The full-width comma is written as an escape so it stays visibly
    # distinct from the ASCII comma in the character class.
    pieces = re.split(r"[,\uff0c]", raw_value)
    return [piece.strip() for piece in pieces if piece.strip()]
def _deduplicate(values: list[str]) -> list[str]:
    """Return *values* without repeats, keeping each first occurrence in order.

    The input list is not modified.
    """
    # dict keys are unique and keep insertion order, so a round trip through
    # dict.fromkeys drops later duplicates while preserving the sequence.
    return list(dict.fromkeys(values))