"""
chip.compressor
================
CHIP 主压缩器。设计原则:
1. 协议是文本,不是模型 — 不依赖 LLM 调用,纯规则可跑
2. 双轨 — Qwen 轨用中文方括号,cl100k 轨用 XML/Markdown
3. 可逆 — 保留命名实体、数字、代码、URL 不动
4. 可审计 — 每条改动可追溯到 rules.yaml 的某条规则
当前实现层级:
L1 (lex) — 词法替换:啰嗦套话 → 紧凑动宾,纯正则,~1.3-1.5x 压缩
L2 (syn) — 句法重排:虚词替换、列表化,需 jieba 分词,~2-3x
L3 (idiom) — 成语压缩(基于实测白名单),需 target 是国产 tokenizer
L4 (proto) — 协议层归一化,统一为 ### 标签
NP-aware 角色提取(可选):
L2-022 默认用正则,在含空格的复合 NP 上偶有截断。
设环境变量 CHIP_USE_JIEBA=1 启用 jieba 增强版。
"""
from __future__ import annotations
import os
import re
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable
import yaml
# ============ 数据类 ============
@dataclass
class Rule:
"""一条 CHIP 转换规则。"""
id: str
layer: str # "L1" | "L2" | "L3" | "L4"
pattern: str # 正则
replacement: str
description: str = ""
saves: int = 0 # 在参考 tokenizer 上预估省多少 token
risk: str = "low" # low | mid | high
flags: int = 0
_compiled: re.Pattern = field(default=None, repr=False)
def compile(self):
if self._compiled is None:
self._compiled = re.compile(self.pattern, self.flags)
return self._compiled
@dataclass
class CompressionResult:
    """Compression output together with an audit trail of applied rules."""

    original: str
    compressed: str
    applied_rules: list[str]   # ids of the rules that fired, in order
    target: str                # tokenizer name this run targeted
    layers: tuple              # layers that were enabled

    @property
    def char_ratio(self) -> float:
        """Compressed/original character ratio (0.0 for empty input)."""
        denominator = max(len(self.original), 1)
        return len(self.compressed) / denominator

    def diff(self) -> str:
        """One-shot side-by-side summary of the transformation."""
        rule_list = ", ".join(self.applied_rules) or "(none)"
        return f"原: {self.original}\n压: {self.compressed}\n规则: {rule_list}"
# ============ Rule loading ============
DEFAULT_RULES_PATH = Path(__file__).parent / "rules" / "rules.yaml"


def load_rules(path: Path | str = DEFAULT_RULES_PATH) -> list[Rule]:
    """Load transformation rules from a YAML file.

    Every entry under the top-level ``rules`` key becomes a Rule; missing
    optional keys fall back to the Rule defaults.  Flag names (e.g.
    "MULTILINE") are resolved against the ``re`` module — unknown names
    contribute 0 and are thus no-ops.
    """
    raw = yaml.safe_load(Path(path).read_text(encoding="utf-8"))
    loaded: list[Rule] = []
    for entry in raw.get("rules", []):
        re_flags = 0
        for flag_name in entry.get("flags", []):
            re_flags |= getattr(re, flag_name.upper(), 0)
        loaded.append(
            Rule(
                id=entry["id"],
                layer=entry["layer"],
                pattern=entry["pattern"],
                replacement=entry.get("replacement", ""),
                description=entry.get("description", ""),
                saves=entry.get("saves", 0),
                risk=entry.get("risk", "low"),
                flags=re_flags,
            )
        )
    return loaded
# ============ 保护性 mask ============
# 这些 pattern 命中的子串会先被替换成占位符,跑完规则后再还原。
# 防止规则误改专有名词、URL、代码、数字。
PROTECT_PATTERNS = [
("URL", re.compile(r"https?://\S+")),
("CODE", re.compile(r"```[\s\S]*?```|`[^`\n]+`")),
("NUM", re.compile(r"\d+(?:\.\d+)?(?:%|km|kg|m|s|°C)?")),
("EMAIL", re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")),
# 双引号包裹的引文(用户原话)
("QUOTE", re.compile(r"[\"\u201c][^\"\u201d]+[\"\u201d]")),
]
# 占位符前缀用一个不会出现在自然中文里、且不会被 PROTECT_PATTERNS 命中的 token
_PH_OPEN = "\u2983" # ⦃
_PH_CLOSE = "\u2984" # ⦄
_PH_RE = re.compile(rf"{_PH_OPEN}\d+{_PH_CLOSE}")
def _mask(text: str) -> tuple[str, list[tuple[str, str]]]:
"""把不可压缩片段替换成 ⦃i⦄ 占位符,返回 (masked, mappings)。
关键:每次 sub 时跳过已经 mask 过的占位符,避免嵌套替换。
"""
mappings = []
masked = text
def make_sub():
def _sub(m):
# 如果 match 整体落在已有占位符内,跳过
content = m.group(0)
if _PH_RE.fullmatch(content):
return content
i = len(mappings)
placeholder = f"{_PH_OPEN}{i}{_PH_CLOSE}"
mappings.append((placeholder, content))
return placeholder
return _sub
for tag, pat in PROTECT_PATTERNS:
masked = pat.sub(make_sub(), masked)
return masked, mappings
def _unmask(text: str, mappings: list[tuple[str, str]]) -> str:
# 反向替换避免 ⦃1⦄ 误替换 ⦃10⦄
for placeholder, original in reversed(mappings):
text = text.replace(placeholder, original)
return text
# ============ 主类 ============
class Compressor:
    """Reusable compressor instance."""

    def __init__(self,
                 rules_path: Path | str = DEFAULT_RULES_PATH,
                 target: str = "qwen2.5",
                 layers: Iterable[str] = ("L1", "L2", "L4")):
        """
        Args:
            rules_path: YAML rule table to load (see load_rules).
            target: target tokenizer; drives target-aware decisions such as
                idiom compression.
            layers: compression layers to enable
                - L1: lexical layer (boilerplate pruning), safe, default on
                - L2: syntactic layer (pattern rewriting), safe, default on
                - L3: idiom layer (semantic compression), only meaningful
                  when target is a domestic Chinese tokenizer, default off
                - L4: protocol-layer normalization (unified ### headings),
                  harmless, default on
        """
        self.rules = load_rules(rules_path)
        self.target = target
        self.layers = tuple(layers)
        # Pre-compile every rule so compress() can use rule._compiled directly.
        for r in self.rules:
            r.compile()

    def compress(self, text: str) -> CompressionResult:
        """Run the enabled layers over *text* and return an auditable result."""
        original = text
        # Optional pre-pass: jieba-enhanced role extraction (takes precedence
        # over the pure-regex L2-022 rule).
        applied_pre = []
        if os.getenv("CHIP_USE_JIEBA") == "1" and "L2" in self.layers:
            text, jieba_applied = _jieba_role_extract(text)
            if jieba_applied:
                applied_pre.append("L2-022J(jieba)")
        # Mask protected fragments (URLs, code, numbers, ...) so no rule
        # can touch them; restored at the end.
        masked, mappings = _mask(text)
        applied = list(applied_pre)
        for rule in self.rules:
            if rule.layer not in self.layers:
                continue
            new_text, n = rule._compiled.subn(rule.replacement, masked)
            if n > 0:
                applied.append(f"{rule.id}×{n}")
            masked = new_text
        # Cleanup: surplus whitespace and runs of blank lines.
        masked = re.sub(r"[ \t]+", " ", masked)
        masked = re.sub(r"\s*\n\s*\n\s*\n+", "\n\n", masked)
        # Orphan punctuation left behind by protocol-layer rules
        # (L2-022 and similar can leave "\n,xxx").
        masked = re.sub(r"\n[,,;;。.\s]+", "\n", masked)
        masked = re.sub(r"^[,,;;]+\s*", "", masked, flags=re.MULTILINE)
        masked = masked.strip()
        compressed = _unmask(masked, mappings)
        return CompressionResult(
            original=original,
            compressed=compressed,
            applied_rules=applied,
            target=self.target,
            layers=self.layers,
        )
# ============ 便捷函数 ============
_default_compressor = None  # (cache_key, Compressor) for the last config used


def compress(text: str,
             target: str = "qwen2.5",
             layers: Iterable[str] = ("L1", "L2", "L4"),
             return_result: bool = False) -> str | CompressionResult:
    """Convenience entry point backed by a single cached Compressor.

    >>> compress("请帮我总结一下这段文字")
    '总结一下这段文字'
    >>> compress("...", layers=["L1","L2","L3","L4"])  # all layers (incl. idioms)
    >>> r = compress("...", return_result=True)
    >>> print(r.diff())
    """
    global _default_compressor
    cache_key = (target, tuple(layers))
    cached = _default_compressor
    if cached is None or cached[0] != cache_key:
        # Config changed (or first call): build and memoize a new instance.
        cached = (cache_key, Compressor(target=target, layers=layers))
        _default_compressor = cached
    result = cached[1].compress(text)
    if return_result:
        return result
    return result.compressed
# ============ jieba NP 提取(可选增强) ============
_jieba_loaded = False
def _ensure_jieba():
"""懒加载 jieba。"""
global _jieba_loaded
if _jieba_loaded:
return True
try:
import jieba.posseg as pseg # noqa: F401
_jieba_loaded = True
return True
except ImportError:
return False
# Trigger phrase for role-play prompts — jieba uses it as an anchor.
_ROLE_PREFIX_RE = re.compile(
    r"请\s*(?:你)?\s*扮演\s*(?:一(?:个|位))?\s*"
)


def _jieba_role_extract(text: str) -> tuple[str, bool]:
    """Extract the longest noun phrase after the trigger phrase as the role
    description, using jieba part-of-speech tagging.

    Replaces the pure-regex lookahead implementation of rule L2-022, which
    fails when:
      - the role description is very long with no closing punctuation
      - the role description is cut short by a mid-sentence conjunction
        (the "...然后..." case)

    Strategy:
      1. Locate the "请你扮演[一位]" trigger phrase.
      2. Tokenize everything after it with jieba.posseg.
      3. Greedily collect NP tokens until a hard stop:
         - conjunction c (然后/接着/以及)
         - preposition p (对/把/为)
         - verb v (vn verbal nouns are allowed)
         - sentence punctuation w (。;, etc.)
      4. Particles uj/u/ul (的/地/得), spaces and English are all allowed
         inside the NP.

    Returns:
        (new_text, True) when a role NP was extracted and re-formatted as a
        "### 角色" block; (text, False) otherwise (jieba missing, no trigger,
        empty body, or no NP core found).
    """
    if not _ensure_jieba():
        return text, False
    import jieba.posseg as pseg
    m = _ROLE_PREFIX_RE.search(text)
    if not m:
        return text, False
    head = text[:m.start()]   # everything before the trigger phrase
    body = text[m.end():]     # candidate NP plus the remainder of the prompt
    if not body:
        return text, False
    words = list(pseg.cut(body))
    # NP definition: the longest prefix of `body` up to a hard stop.
    # HARD_STOP: verbs (non-vn), conjunctions, prepositions, punctuation.
    # ALLOWED IN NP: nouns, adjectives, English, digits, classifiers,
    # particles (的/地/得), whitespace.
    np_chars = []             # tokens collected into the NP so far
    cumlen = 0                # number of chars of `body` consumed so far
    rest_start = 0            # offset in `body` where the remainder begins
    found_np_core = False     # has a noun/adjective (NP head) been seen yet?
    for w, flag in words:
        # Hard-stop conditions.
        is_hard_stop = (
            flag == "w"  # punctuation
            or w in {",", ",", "。", ".", ";", ";", ":", ":", "、", "\n"}
            or flag == "c"  # conjunction
            or flag == "p"  # preposition
            or (flag.startswith("v") and flag != "vn")  # true verb (not vn)
        )
        if is_hard_stop and found_np_core:
            rest_start = cumlen
            break
        # NOTE(review): a hard-stop token seen BEFORE the NP core is
        # absorbed into np_chars rather than ending the scan — presumably
        # intentional, so a leading verb doesn't abort extraction; confirm.
        np_chars.append(w)
        cumlen += len(w)
        if flag.startswith("n") or flag.startswith("a") or flag == "eng":
            found_np_core = True
    else:
        # Token stream exhausted: the whole body is the NP.
        rest_start = cumlen
    np_str = "".join(np_chars).strip()
    if not np_str or len(np_str) < 2 or not found_np_core:
        return text, False
    rest = body[rest_start:]
    new_text = f"{head}\n### 角色\n{np_str}\n{rest}"
    # Strip orphan punctuation immediately following the role block.
    new_text = re.sub(r"\n[,,;;。.]+", "\n", new_text)
    new_text = new_text.strip()
    return new_text, True