"""Load rule and evaluation-contract JSON files and convert rules to/from table rows."""

import json
import re
from pathlib import Path
from typing import Any

from core.rule_engine import Rule

# Column order used by rules_to_table_rows; the same names are the keys read
# from dict-shaped rows in rules_from_editor_rows.
TABLE_HEADERS = ["rule_id", "primary_keyword", "aliases", "reply"]

# Directory one level above the directory that contains this module.
ROOT_DIR = Path(__file__).resolve().parents[1]
DEFAULT_RULES_PATH = ROOT_DIR / "config" / "rules.json"
DEFAULT_EVALUATION_CONTRACT_PATH = ROOT_DIR / "config" / "evaluation_contract.json"
DEFAULT_BENCHMARK_CSV_PATH = ROOT_DIR / "benchmarks_data" / "live_commerce_eval_scripts.csv"

# Keys looked up, in order, when an editor row is a dict.
_EDITOR_COLUMNS = ("rule_id", "primary_keyword", "aliases", "reply")

# Alias separators: ASCII comma and full-width comma (U+FF0C). The character
# class previously listed the ASCII comma twice, which was redundant; the
# full-width form is written as an escape so it survives text normalization.
_ALIAS_SEPARATOR = re.compile(r"[,\uFF0C]")


def _load_json(path: Path | str) -> Any:
    """Decode the UTF-8 JSON file at *path* and return the parsed value."""
    with Path(path).open("r", encoding="utf-8") as handle:
        return json.load(handle)


def load_rule_catalog(path: Path | str = DEFAULT_RULES_PATH) -> list[Rule]:
    """Build ``Rule`` objects from the JSON rule file at *path*.

    Each entry must provide ``rule_id``, ``primary_keyword`` and ``reply``;
    ``aliases`` is optional. All values are converted to stripped strings,
    blank aliases are dropped, and duplicate aliases are removed while keeping
    first-seen order.

    Raises:
        KeyError: If an entry lacks one of the required keys.
    """
    rules: list[Rule] = []
    for item in _load_json(path):
        # ``or []`` also covers an explicit ``"aliases": null`` in the file.
        stripped = (str(alias).strip() for alias in item.get("aliases") or [])
        aliases = _deduplicate([alias for alias in stripped if alias])
        rules.append(
            Rule(
                rule_id=str(item["rule_id"]).strip(),
                primary_keyword=str(item["primary_keyword"]).strip(),
                aliases=tuple(aliases),
                reply=str(item["reply"]).strip(),
            )
        )
    return rules


def load_evaluation_contract(path: Path | str = DEFAULT_EVALUATION_CONTRACT_PATH) -> dict[str, Any]:
    """Return the parsed JSON content of the evaluation contract file at *path*."""
    return _load_json(path)


def rules_to_table_rows(rules: list[Rule]) -> list[list[str]]:
    """Convert rules into ``[rule_id, primary_keyword, aliases, reply]`` rows.

    Aliases are joined into a single cell with ``", "``.
    """
    return [
        [rule.rule_id, rule.primary_keyword, ", ".join(rule.aliases), rule.reply]
        for rule in rules
    ]


def rules_from_editor_rows(rows: Any) -> list[Rule]:
    """Convert edited table rows back into ``Rule`` objects.

    Args:
        rows: ``None``, an object exposing ``to_dict`` (it is called as
            ``to_dict("records")`` — presumably a pandas DataFrame; confirm
            against the caller), or an iterable whose items are dicts keyed by
            column name or sequences of up to four cells in
            ``rule_id, primary_keyword, aliases, reply`` order.

    Returns:
        One rule per row that has a non-empty rule id, primary keyword and
        reply. Rows missing any of those are skipped. Aliases are split on
        commas, stripped, de-duplicated, and any alias equal to the primary
        keyword is dropped.
    """
    if rows is None:
        return []
    if hasattr(rows, "to_dict"):
        rows = rows.to_dict("records")

    rules: list[Rule] = []
    for row in rows:
        rule_id, primary_keyword, aliases_raw, reply = _normalize_editor_row(row)
        if not (rule_id and primary_keyword and reply):
            continue
        aliases = [alias for alias in _split_aliases(aliases_raw) if alias != primary_keyword]
        rules.append(
            Rule(
                rule_id=rule_id,
                primary_keyword=primary_keyword,
                aliases=tuple(_deduplicate(aliases)),
                reply=reply,
            )
        )
    return rules


def _normalize_editor_row(row: Any) -> list[str]:
    """Return exactly four stripped text cells for one editor row.

    The shape is decided per row rather than from the first row only, so the
    container is never indexed or truth-tested and mixed input is handled.
    """
    if isinstance(row, dict):
        cells = [row.get(column) for column in _EDITOR_COLUMNS]
    else:
        cells = list(row)[:4]
        cells.extend([""] * (4 - len(cells)))  # pad short rows with blanks
    return [_cell_text(cell) for cell in cells]


def _cell_text(value: Any) -> str:
    """Return *value* as stripped text, treating ``None`` and float NaN as empty.

    Plain ``str()`` would turn a missing cell into the literal text ``"None"``
    or ``"nan"``, which would then pass the required-field check or be stored
    as an alias.
    """
    if value is None or (isinstance(value, float) and value != value):  # NaN != NaN
        return ""
    return str(value).strip()


def _split_aliases(raw_value: str) -> list[str]:
    """Split *raw_value* on ASCII or full-width commas into non-empty, stripped parts."""
    pieces = (piece.strip() for piece in _ALIAS_SEPARATOR.split(raw_value))
    return [piece for piece in pieces if piece]


def _deduplicate(values: list[str]) -> list[str]:
    """Return *values* without repeats, keeping the first occurrence of each."""
    # dict keys are unique and preserve insertion order.
    return list(dict.fromkeys(values))