"""Strip MiniCPM thinking blocks and special tokens from model output.""" from __future__ import annotations import re _THINK_OPEN = "<" + "think>" _THINK_CLOSE = "" _REDACTED_OPEN = "" _REDACTED_CLOSE = "" _THINKING_PATTERNS = ( re.compile(re.escape(_THINK_OPEN) + r".*?" + re.escape(_THINK_CLOSE), re.DOTALL | re.IGNORECASE), re.compile( re.escape(_REDACTED_OPEN) + r".*?" + re.escape(_REDACTED_CLOSE), re.DOTALL | re.IGNORECASE, ), re.compile(re.escape(_THINK_OPEN) + r"\s*" + re.escape(_THINK_CLOSE), re.IGNORECASE), re.compile(re.escape(_REDACTED_OPEN) + r"\s*" + re.escape(_REDACTED_CLOSE), re.IGNORECASE), ) _SPECIAL_TOKENS = re.compile(r"<\|[^|>]*\|>") def clean_model_response(text: str) -> str: if not text: return "" cleaned = text for pattern in _THINKING_PATTERNS: cleaned = pattern.sub("", cleaned) cleaned = _SPECIAL_TOKENS.sub("", cleaned) cleaned = re.sub(r"\n{3,}", "\n\n", cleaned) return cleaned.strip() class StreamResponseCleaner: """Yield only user-visible text while streaming (drops in-progress think blocks).""" def __init__(self) -> None: self._raw = "" self._visible = "" def feed(self, token: str) -> str: if not token: return "" self._raw += token visible = clean_model_response(self._raw) delta = visible[len(self._visible) :] self._visible = visible return delta