Spaces:
Paused
Paused
| import json | |
| import re | |
| from pathlib import Path | |
| from typing import Any | |
| from core.rule_engine import Rule | |
| TABLE_HEADERS = ["rule_id", "primary_keyword", "aliases", "reply"] | |
| ROOT_DIR = Path(__file__).resolve().parents[1] | |
| DEFAULT_RULES_PATH = ROOT_DIR / "config" / "rules.json" | |
| DEFAULT_EVALUATION_CONTRACT_PATH = ROOT_DIR / "config" / "evaluation_contract.json" | |
| DEFAULT_BENCHMARK_CSV_PATH = ROOT_DIR / "benchmarks_data" / "live_commerce_eval_scripts.csv" | |
| def _load_json(path: Path | str) -> Any: | |
| resolved = Path(path) | |
| with resolved.open("r", encoding="utf-8") as handle: | |
| return json.load(handle) | |
| def load_rule_catalog(path: Path | str = DEFAULT_RULES_PATH) -> list[Rule]: | |
| payload = _load_json(path) | |
| rules: list[Rule] = [] | |
| for item in payload: | |
| aliases = _deduplicate([str(alias).strip() for alias in item.get("aliases", []) if str(alias).strip()]) | |
| rules.append( | |
| Rule( | |
| rule_id=str(item["rule_id"]).strip(), | |
| primary_keyword=str(item["primary_keyword"]).strip(), | |
| aliases=tuple(aliases), | |
| reply=str(item["reply"]).strip(), | |
| ) | |
| ) | |
| return rules | |
| def load_evaluation_contract(path: Path | str = DEFAULT_EVALUATION_CONTRACT_PATH) -> dict[str, Any]: | |
| return _load_json(path) | |
| def rules_to_table_rows(rules: list[Rule]) -> list[list[str]]: | |
| return [[rule.rule_id, rule.primary_keyword, ", ".join(rule.aliases), rule.reply] for rule in rules] | |
| def rules_from_editor_rows(rows: Any) -> list[Rule]: | |
| if rows is None: | |
| return [] | |
| if hasattr(rows, "to_dict"): | |
| rows = rows.to_dict("records") | |
| normalized_rows: list[list[str]] = [] | |
| if rows and isinstance(rows[0], dict): | |
| for row in rows: | |
| normalized_rows.append( | |
| [ | |
| str(row.get("rule_id", "")), | |
| str(row.get("primary_keyword", "")), | |
| str(row.get("aliases", "")), | |
| str(row.get("reply", "")), | |
| ] | |
| ) | |
| else: | |
| for row in rows: | |
| cells = list(row)[:4] | |
| while len(cells) < 4: | |
| cells.append("") | |
| normalized_rows.append([str(cell) for cell in cells]) | |
| rules: list[Rule] = [] | |
| for row in normalized_rows: | |
| rule_id, primary_keyword, aliases_raw, reply = [cell.strip() for cell in row] | |
| if not rule_id or not primary_keyword or not reply: | |
| continue | |
| aliases = [alias for alias in _split_aliases(aliases_raw) if alias != primary_keyword] | |
| rules.append( | |
| Rule( | |
| rule_id=rule_id, | |
| primary_keyword=primary_keyword, | |
| aliases=tuple(_deduplicate(aliases)), | |
| reply=reply, | |
| ) | |
| ) | |
| return rules | |
| def _split_aliases(raw_value: str) -> list[str]: | |
| return [part.strip() for part in re.split(r"[,,]", raw_value) if part.strip()] | |
| def _deduplicate(values: list[str]) -> list[str]: | |
| seen: set[str] = set() | |
| ordered: list[str] = [] | |
| for value in values: | |
| if value not in seen: | |
| seen.add(value) | |
| ordered.append(value) | |
| return ordered |