File size: 3,194 Bytes
c0c4a30
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
import json
import re
from pathlib import Path
from typing import Any

from core.rule_engine import Rule

# Column names, in the same order in which rules_to_table_rows emits its cells.
TABLE_HEADERS = ["rule_id", "primary_keyword", "aliases", "reply"]

# Directory one level above the folder that contains this module.
ROOT_DIR = Path(__file__).resolve().parents[1]

# Default locations of the data files, all anchored at ROOT_DIR.
DEFAULT_RULES_PATH = ROOT_DIR.joinpath("config", "rules.json")
DEFAULT_EVALUATION_CONTRACT_PATH = ROOT_DIR.joinpath("config", "evaluation_contract.json")
DEFAULT_BENCHMARK_CSV_PATH = ROOT_DIR.joinpath("benchmarks_data", "live_commerce_eval_scripts.csv")


def _load_json(path: Path | str) -> Any:
    resolved = Path(path)
    with resolved.open("r", encoding="utf-8") as handle:
        return json.load(handle)


def load_rule_catalog(path: Path | str = DEFAULT_RULES_PATH) -> list[Rule]:
    """Build ``Rule`` objects from the entries of a JSON rules file.

    Every entry is read with ``entry["rule_id"]``, ``entry["primary_keyword"]``
    and ``entry["reply"]`` (all required) plus an optional ``aliases``
    iterable. Each value is converted with ``str`` and stripped. Aliases that
    are blank after stripping are dropped, and repeated aliases keep only
    their first occurrence.

    Args:
        path: Location of the rules file; defaults to ``DEFAULT_RULES_PATH``.

    Returns:
        One ``Rule`` per entry, in file order.

    Raises:
        KeyError: If an entry lacks one of the required keys.
    """
    catalog: list[Rule] = []
    for entry in _load_json(path):
        # Aliases are cleaned before the required keys are read.
        stripped_aliases = [str(raw).strip() for raw in entry.get("aliases", [])]
        unique_aliases = _deduplicate([text for text in stripped_aliases if text])
        rule = Rule(
            rule_id=str(entry["rule_id"]).strip(),
            primary_keyword=str(entry["primary_keyword"]).strip(),
            aliases=tuple(unique_aliases),
            reply=str(entry["reply"]).strip(),
        )
        catalog.append(rule)
    return catalog


def load_evaluation_contract(path: Path | str = DEFAULT_EVALUATION_CONTRACT_PATH) -> dict[str, Any]:
    """Return the parsed JSON content of the evaluation-contract file.

    The decoded value is handed back exactly as ``_load_json`` produced it;
    this function does not check that it is actually a dict.

    Args:
        path: Location of the contract file; defaults to
            ``DEFAULT_EVALUATION_CONTRACT_PATH``.
    """
    contract = _load_json(path)
    return contract


def rules_to_table_rows(rules: list[Rule]) -> list[list[str]]:
    """Turn rules into table rows of ``[rule_id, primary_keyword, aliases, reply]``.

    The aliases of each rule are collapsed into one cell, joined with ", ".

    Args:
        rules: Rules to convert.

    Returns:
        One four-cell row per rule, in the same order as *rules*.
    """

    def _as_row(entry: Rule) -> list[str]:
        return [entry.rule_id, entry.primary_keyword, ", ".join(entry.aliases), entry.reply]

    return list(map(_as_row, rules))


def _editor_cell_text(value: Any) -> str:
    """Return the stripped text of one editor cell, mapping missing values to ""."""
    if value is None:
        return ""
    # NaN is the only float unequal to itself; pandas reports empty cells as
    # NaN, and str() would otherwise turn that into the literal text "nan".
    if isinstance(value, float) and value != value:
        return ""
    return str(value).strip()


def rules_from_editor_rows(rows: Any) -> list[Rule]:
    """Rebuild ``Rule`` objects from the rows of a table editor.

    Accepted inputs:

    * ``None`` - yields an empty list.
    * Any object with a ``to_dict`` method (for example a pandas DataFrame),
      which is converted with ``to_dict("records")``.
    * An iterable of rows, where each row is either a dict keyed by
      ``rule_id`` / ``primary_keyword`` / ``aliases`` / ``reply`` or a
      sequence whose first four cells hold those values in that order.
      Shorter sequences are padded with empty cells.

    The row shape is decided per row, so the input does not need to support
    truth testing or indexing (NumPy arrays and generators are fine).

    Cells are converted to text and stripped. ``None`` and float NaN count as
    empty instead of becoming the strings "None" / "nan". Rows lacking a rule
    id, primary keyword or reply are skipped. Aliases are split on commas,
    aliases equal to the primary keyword are removed, and duplicates keep
    only their first occurrence.

    Args:
        rows: Editor content in one of the shapes described above.

    Returns:
        The rules that could be built, in row order.
    """
    if rows is None:
        return []
    if hasattr(rows, "to_dict"):
        rows = rows.to_dict("records")

    column_keys = ("rule_id", "primary_keyword", "aliases", "reply")
    rules: list[Rule] = []
    for row in rows:
        if isinstance(row, dict):
            cells = [row.get(key) for key in column_keys]
        else:
            cells = list(row)[: len(column_keys)]
            # Pad short rows so the unpacking below always sees four cells.
            cells.extend([None] * (len(column_keys) - len(cells)))

        rule_id, primary_keyword, aliases_raw, reply = map(_editor_cell_text, cells)
        if not (rule_id and primary_keyword and reply):
            continue

        aliases = [alias for alias in _split_aliases(aliases_raw) if alias != primary_keyword]
        rules.append(
            Rule(
                rule_id=rule_id,
                primary_keyword=primary_keyword,
                aliases=tuple(_deduplicate(aliases)),
                reply=reply,
            )
        )
    return rules


# Separators accepted between aliases: the ASCII comma and the fullwidth comma
# (U+FF0C). The fullwidth comma is spelled as an escape so that an encoding
# round-trip cannot silently collapse it into a second ASCII comma.
_ALIAS_SEPARATOR = re.compile(r"[,\uff0c]")


def _split_aliases(raw_value: str) -> list[str]:
    """Split a comma-separated alias cell into its individual aliases.

    Both "," and the fullwidth comma (U+FF0C) act as separators. Each piece
    is stripped of surrounding whitespace and empty pieces are discarded.

    Args:
        raw_value: Text of the aliases cell.

    Returns:
        The non-empty aliases in their original order.
    """
    pieces = (piece.strip() for piece in _ALIAS_SEPARATOR.split(raw_value))
    return [piece for piece in pieces if piece]


def _deduplicate(values: list[str]) -> list[str]:
    """Return *values* without repeats, keeping each first occurrence in order."""
    # Dict keys are unique and remember insertion order, so a round trip
    # through dict.fromkeys drops later duplicates while preserving order.
    return list(dict.fromkeys(values))