File size: 11,084 Bytes
67d959b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
"""
chip.compressor
================
CHIP 主压缩器。设计原则:
    1. 协议是文本,不是模型 — 不依赖 LLM 调用,纯规则可跑
    2. 双轨 — Qwen 轨用中文方括号,cl100k 轨用 XML/Markdown
    3. 可逆 — 保留命名实体、数字、代码、URL 不动
    4. 可审计 — 每条改动可追溯到 rules.yaml 的某条规则

当前实现层级:
    L1 (lex)   — 词法替换:啰嗦套话 → 紧凑动宾,纯正则,~1.3-1.5x 压缩
    L2 (syn)   — 句法重排:虚词替换、列表化,需 jieba 分词,~2-3x
    L3 (idiom) — 成语压缩(基于实测白名单),需 target 是国产 tokenizer
    L4 (proto) — 协议层归一化,统一为 ### 标签

NP-aware 角色提取(可选):
    L2-022 默认用正则,在含空格的复合 NP 上偶有截断。
    设环境变量 CHIP_USE_JIEBA=1 启用 jieba 增强版。
"""
from __future__ import annotations

import os
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable

import yaml


# ============ 数据类 ============
@dataclass
class Rule:
    """一条 CHIP 转换规则。"""
    id: str
    layer: str           # "L1" | "L2" | "L3" | "L4"
    pattern: str         # 正则
    replacement: str
    description: str = ""
    saves: int = 0       # 在参考 tokenizer 上预估省多少 token
    risk: str = "low"    # low | mid | high
    flags: int = 0
    _compiled: re.Pattern = field(default=None, repr=False)

    def compile(self):
        if self._compiled is None:
            self._compiled = re.compile(self.pattern, self.flags)
        return self._compiled


@dataclass
class CompressionResult:
    """Outcome of one compression run, together with its audit trail."""
    original: str
    compressed: str
    applied_rules: list[str]   # ids of the rules that fired
    target: str                # tokenizer name
    layers: tuple

    @property
    def char_ratio(self) -> float:
        """Compressed/original character ratio (denominator floored at 1)."""
        denominator = max(len(self.original), 1)
        return len(self.compressed) / denominator

    def diff(self) -> str:
        """Render a simple original-vs-compressed side-by-side view."""
        hits = ", ".join(self.applied_rules) or "(none)"
        parts = [
            f"原:  {self.original}",
            f"压:  {self.compressed}",
            f"规则: {hits}",
        ]
        return "\n".join(parts)


# ============ Rule loading ============
DEFAULT_RULES_PATH = Path(__file__).parent / "rules" / "rules.yaml"


def load_rules(path: Path | str = DEFAULT_RULES_PATH) -> list[Rule]:
    """Load Rule objects from a YAML rules file.

    Args:
        path: path to a YAML document carrying a top-level ``rules:`` list.

    Returns:
        Rule instances (uncompiled) in file order; empty list when the
        file has no ``rules`` entry.
    """
    path = Path(path)
    with open(path, encoding="utf-8") as f:
        data = yaml.safe_load(f)

    # An empty YAML document parses to None; treat that as "no rules"
    # instead of crashing on None.get() below.
    if not data:
        data = {}

    rules: list[Rule] = []
    for item in data.get("rules", []):
        flags = 0
        for flag_name in item.get("flags", []):
            # Unknown flag names map to 0 (no-op) rather than raising.
            flags |= getattr(re, flag_name.upper(), 0)
        rules.append(Rule(
            id=item["id"],
            layer=item["layer"],
            pattern=item["pattern"],
            replacement=item.get("replacement", ""),
            description=item.get("description", ""),
            saves=item.get("saves", 0),
            risk=item.get("risk", "low"),
            flags=flags,
        ))
    return rules


# ============ 保护性 mask ============
# 这些 pattern 命中的子串会先被替换成占位符,跑完规则后再还原。
# 防止规则误改专有名词、URL、代码、数字。
PROTECT_PATTERNS = [
    ("URL",   re.compile(r"https?://\S+")),
    ("CODE",  re.compile(r"```[\s\S]*?```|`[^`\n]+`")),
    ("NUM",   re.compile(r"\d+(?:\.\d+)?(?:%|km|kg|m|s|°C)?")),
    ("EMAIL", re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")),
    # 双引号包裹的引文(用户原话)
    ("QUOTE", re.compile(r"[\"\u201c][^\"\u201d]+[\"\u201d]")),
]


# 占位符前缀用一个不会出现在自然中文里、且不会被 PROTECT_PATTERNS 命中的 token
_PH_OPEN = "\u2983"  # ⦃
_PH_CLOSE = "\u2984"  # ⦄
_PH_RE = re.compile(rf"{_PH_OPEN}\d+{_PH_CLOSE}")


def _mask(text: str) -> tuple[str, list[tuple[str, str]]]:
    """把不可压缩片段替换成 ⦃i⦄ 占位符,返回 (masked, mappings)。

    关键:每次 sub 时跳过已经 mask 过的占位符,避免嵌套替换。
    """
    mappings = []
    masked = text

    def make_sub():
        def _sub(m):
            # 如果 match 整体落在已有占位符内,跳过
            content = m.group(0)
            if _PH_RE.fullmatch(content):
                return content
            i = len(mappings)
            placeholder = f"{_PH_OPEN}{i}{_PH_CLOSE}"
            mappings.append((placeholder, content))
            return placeholder
        return _sub

    for tag, pat in PROTECT_PATTERNS:
        masked = pat.sub(make_sub(), masked)
    return masked, mappings


def _unmask(text: str, mappings: list[tuple[str, str]]) -> str:
    # 反向替换避免 ⦃1⦄ 误替换 ⦃10⦄
    for placeholder, original in reversed(mappings):
        text = text.replace(placeholder, original)
    return text


# ============ Main class ============
class Compressor:
    """Reusable compressor instance.

    Loads and pre-compiles the rule set once at construction, then applies
    the enabled layers to every text passed to :meth:`compress`.
    """

    def __init__(self,
                 rules_path: Path | str = DEFAULT_RULES_PATH,
                 target: str = "qwen2.5",
                 layers: Iterable[str] = ("L1", "L2", "L4")):
        """
        Args:
            rules_path: YAML rules file loaded via ``load_rules``.
            target: target tokenizer; drives target-aware decisions such as
                the idiom layer.
            layers: enabled compression layers
                - L1: lexical (boilerplate pruning), safe, on by default
                - L2: syntactic (pattern rewrites), safe, on by default
                - L3: idioms (semantic compression), only meaningful when the
                  target is a Chinese-oriented tokenizer, off by default
                - L4: protocol normalization (unified ### headings), harmless,
                  on by default
        """
        self.rules = load_rules(rules_path)
        self.target = target
        self.layers = tuple(layers)
        # Pre-compile every rule regex up front so compress() never pays
        # compilation cost per call (it uses rule._compiled directly).
        for r in self.rules:
            r.compile()

    def compress(self, text: str) -> CompressionResult:
        """Apply the enabled layers to *text* and return an auditable result."""
        original = text

        # Optional jieba-enhanced role extraction (pre-processing step that
        # takes precedence over rule L2-022's pure-regex implementation).
        applied_pre = []
        if os.getenv("CHIP_USE_JIEBA") == "1" and "L2" in self.layers:
            text, jieba_applied = _jieba_role_extract(text)
            if jieba_applied:
                applied_pre.append("L2-022J(jieba)")

        masked, mappings = _mask(text)
        applied = list(applied_pre)

        for rule in self.rules:
            if rule.layer not in self.layers:
                continue
            # subn so the hit count can be recorded in the audit trail.
            new_text, n = rule._compiled.subn(rule.replacement, masked)
            if n > 0:
                applied.append(f"{rule.id}×{n}")
                masked = new_text

        # Cleanup pass: collapse runs of spaces/tabs and 3+ blank lines.
        masked = re.sub(r"[ \t]+", " ", masked)
        masked = re.sub(r"\s*\n\s*\n\s*\n+", "\n\n", masked)

        # Strip orphan punctuation left behind by protocol-layer rules
        # (e.g. L2-022 can leave "\n,xxx").
        masked = re.sub(r"\n[,,;;。.\s]+", "\n", masked)
        masked = re.sub(r"^[,,;;]+\s*", "", masked, flags=re.MULTILINE)

        masked = masked.strip()
        compressed = _unmask(masked, mappings)

        return CompressionResult(
            original=original,
            compressed=compressed,
            applied_rules=applied,
            target=self.target,
            layers=self.layers,
        )


# ============ Convenience functions ============
# Single-slot cache: (key, Compressor) of the most recently used config.
_default_compressor = None


def compress(text: str,
             target: str = "qwen2.5",
             layers: Iterable[str] = ("L1", "L2", "L4"),
             return_result: bool = False) -> str | CompressionResult:
    """Convenience entry point backed by a cached Compressor.

    >>> compress("请帮我总结一下这段文字")
    '总结一下这段文字'

    >>> compress("...", layers=["L1","L2","L3","L4"])  # all layers (incl. idioms)

    >>> r = compress("...", return_result=True)
    >>> print(r.diff())
    """
    global _default_compressor
    # Materialize `layers` exactly once, up front. Previously tuple(layers)
    # was taken only for the cache key: a caller passing a one-shot iterator
    # had it exhausted by the key build, so Compressor received an empty
    # layer set and silently compressed nothing.
    layers = tuple(layers)
    key = (target, layers)
    if _default_compressor is None or _default_compressor[0] != key:
        # Rebuild the cached compressor whenever target/layers change.
        _default_compressor = (key, Compressor(target=target, layers=layers))
    result = _default_compressor[1].compress(text)
    return result if return_result else result.compressed


# ============ jieba NP 提取(可选增强) ============
_jieba_loaded = False


def _ensure_jieba():
    """懒加载 jieba。"""
    global _jieba_loaded
    if _jieba_loaded:
        return True
    try:
        import jieba.posseg as pseg  # noqa: F401
        _jieba_loaded = True
        return True
    except ImportError:
        return False


# Role-play trigger phrase — jieba uses it to locate the role description.
_ROLE_PREFIX_RE = re.compile(
    r"请\s*(?:你)?\s*扮演\s*(?:一(?:个|位))?\s*"
)


def _jieba_role_extract(text: str) -> tuple[str, bool]:
    """Extract the longest noun phrase after the role trigger as the role
    description, using jieba part-of-speech tagging.

    Replaces rule L2-022's pure-regex lookahead implementation, which fails
    when:
        - the role description is very long with no trailing punctuation
        - the description is cut short by a mid-sentence conjunction
          (patterns like "...然后...")

    Strategy:
        1. Find the "请你扮演[一位]" trigger phrase.
        2. Segment everything after the trigger with jieba.posseg.
        3. Greedily collect NP tokens until a hard stop:
           - conjunction c (然后/接着/以及)
           - preposition p (对/把/为)
           - verb v (the vn verbal-noun tag is allowed)
           - sentence punctuation w (。;, etc.)
        4. Particles 'uj/u/ul' (的/地/得), spaces and English tokens are
           all allowed inside the NP.

    Returns:
        (new_text, True) when a role block was extracted and inserted as a
        "### 角色" section; otherwise (text, False) unchanged.
    """
    if not _ensure_jieba():
        return text, False

    import jieba.posseg as pseg

    m = _ROLE_PREFIX_RE.search(text)
    if not m:
        return text, False

    head = text[:m.start()]
    body = text[m.end():]
    if not body:
        return text, False

    words = list(pseg.cut(body))

    # NP definition: longest prefix of `body` up to a hard stop.
    # HARD_STOP: verb (non-vn), conjunction, preposition, punctuation.
    # ALLOW_IN_NP: nouns, adjectives, English, digits, classifiers,
    # particles (的/地/得), whitespace.
    np_chars = []
    cumlen = 0        # chars of `body` consumed so far (jieba preserves all chars)
    rest_start = 0    # index into `body` where the non-NP remainder begins
    found_np_core = False  # have we seen a noun/adjective (the NP core) yet?

    for w, flag in words:
        # Hard-stop conditions. NOTE(review): a hard-stop token seen BEFORE
        # the first NP core is absorbed into the NP instead of ending it
        # (see the append below) — presumably deliberate; confirm.
        is_hard_stop = (
            flag == "w"                                 # punctuation
            or w in {",", ",", "。", ".", ";", ";", ":", ":", "、", "\n"}
            or flag == "c"                              # conjunction
            or flag == "p"                              # preposition
            or (flag.startswith("v") and flag != "vn")  # real verb (not verbal noun)
        )
        if is_hard_stop and found_np_core:
            # `cumlen` excludes the stop token, so `rest` keeps it; the
            # orphan-punctuation cleanup below removes it if needed.
            rest_start = cumlen
            break

        # Still inside the NP.
        np_chars.append(w)
        cumlen += len(w)
        if flag.startswith("n") or flag.startswith("a") or flag == "eng":
            found_np_core = True
    else:
        # Exhausted all words: the whole body is the NP.
        rest_start = cumlen

    np_str = "".join(np_chars).strip()
    if not np_str or len(np_str) < 2 or not found_np_core:
        return text, False

    rest = body[rest_start:]
    new_text = f"{head}\n### 角色\n{np_str}\n{rest}"
    # Strip orphan punctuation immediately following the role block.
    new_text = re.sub(r"\n[,,;;。.]+", "\n", new_text)
    new_text = new_text.strip()
    return new_text, True