| """ |
| Behavior detectors: count trigger occurrences in generated CoT. |
| |
| Used for: |
| - RR (reduction rate) evaluation during steering sweep |
| - Sanity checks for labeling |
| |
| KEY UPDATE (Apr 2026): |
| Added "true reflection vs filler" distinction for monitoring triggers. |
| In Qwen3-Thinking CoT, "wait" is often used as a filler word |
| ("Wait, 5+3=8") rather than as a real reflection signal |
| ("Wait, that's wrong, let me check"). The new `count_real_monitoring` |
| excludes filler usage. |
| |
| Also includes a more robust collapse detector (n-gram based, not |
| word-based; relative to baseline length, not fixed thresholds). |
| """ |
import re
from typing import Dict, List, Optional, Tuple

from configs.patterns import MONITORING_PATTERNS, PLANNING_PATTERNS
|
|
|
|
class BehaviorDetector:
    """Locate and count trigger phrases for one CoT dimension.

    A detector is bound at construction time to either the "planning" or
    the "monitoring" pattern set (from ``configs.patterns``) and compiles
    every regex exactly once, so repeated ``detect`` calls stay cheap.
    """

    def __init__(self, dimension: str):
        assert dimension in ("planning", "monitoring")
        self.dimension = dimension
        # Pick the pattern table for this dimension; the assert above
        # guarantees exactly one of the two branches applies.
        self.patterns = MONITORING_PATTERNS if dimension == "monitoring" else PLANNING_PATTERNS
        # subtype -> list of pre-compiled regexes
        self.compiled = {
            subtype: [re.compile(pattern) for pattern in pattern_list]
            for subtype, pattern_list in self.patterns.items()
        }

    def detect(self, text: str) -> Dict:
        """Scan ``text`` for all patterns of this dimension.

        Returns a dict with:
            total:   overall number of matches across all subtypes
            by_type: per-subtype match counts
            spans:   one record per match (subtype, start, end,
                     matched text truncated to 50 chars)
        """
        spans: List[Dict] = []
        by_type: Dict[str, int] = {}
        for subtype, regexes in self.compiled.items():
            matches = [m for rx in regexes for m in rx.finditer(text)]
            for m in matches:
                spans.append({
                    "subtype": subtype,
                    "start": m.start(),
                    "end": m.end(),
                    "match": m.group(0)[:50],
                })
            by_type[subtype] = len(matches)
        return {
            "total": sum(by_type.values()),
            "by_type": by_type,
            "spans": spans,
        }
|
|
|
|
def compute_rr(base_count: int, steered_count: int) -> float:
    """Reduction rate: fraction of baseline triggers removed by steering.

    Returns 0.0 when the baseline has no triggers (the ratio is undefined
    there). A negative result means steering *increased* the count.
    """
    if base_count:
        return (base_count - steered_count) / base_count
    return 0.0
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
# Patterns applied to the (up to) 80 chars of context immediately AFTER a
# "wait"-style monitoring trigger. A match marks the trigger as genuine
# reflection (self-correction, verification, expressed doubt) rather than
# a filler word. Each alternative matches one reflective continuation,
# e.g. "let me check", "I made a mistake", "that's wrong", "actually",
# "hold on", "is that right?".
# NOTE(review): the leading `^[,.]?\s+` requires at least one whitespace
# char after the optional punctuation, so "Wait,that's wrong" (no space)
# would not match — confirm this is intended.
_REAL_REFLECTION_AFTER_WAIT = [
    re.compile(
        r"^[,.]?\s+.{0,80}?\b("
        r"let\s+me\s+(check|verify|re-?check|reconsider|re-?examine|see|think)|"
        r"i\s+(made|have)\s+(a|an)?\s*(mistake|error|typo|miscalc)|"
        r"that'?s\s+(not\s+right|wrong|incorrect|off|not\s+correct)|"
        r"actually|"
        r"no[,.]?\s+(that|this|i)|"
        r"hold\s+on|"
        r"hmm[,.]?|"
        r"i\s+(think|need\s+to|should|forgot|missed|skipped)|"
        r"(but|because|since)\s+(i|we|the)|"
        r"is\s+that\s+(right|correct)\?|"
        r"does\s+that\s+(make\s+sense|work)"
        r")",
        re.IGNORECASE | re.DOTALL,  # DOTALL: reflection may continue on the next line
    ),
]
|
|
| |
| |
| |
# Patterns that mark a "wait"-style trigger as pure filler: the trigger is
# immediately followed by a number / math token ("Wait, 5+3=8", "$", or a
# LaTeX backslash) or by a mundane narrative continuation ("wait, so the
# answer is ...") — i.e. no actual reflection takes place.
# Checked BEFORE _REAL_REFLECTION_AFTER_WAIT, so filler wins on overlap.
_WAIT_AS_FILLER = [
    re.compile(r"^[,.]?\s*(\d|\$|\\)"),
    re.compile(r"^[,.]?\s*\b(here|there|so|then|the\s+\w+\s+is)\b"),
]
|
|
|
|
def count_real_monitoring(text: str) -> Dict:
    """
    Count monitoring triggers, distinguishing real reflection from filler.

    Returns:
        total_triggers: all monitoring triggers (regex match)
        real_reflection: triggers backed by reflective content within 80 chars
        filler_only: triggers that are pure filler ("wait, 5+3=...")
        ambiguous: triggers neither clearly real nor clearly filler

    Use `real_reflection` (not `total_triggers`) as the primary metric
    when scoring monitoring suppression — it ignores cases where the model
    only kept the surface word.
    """
    detection = BehaviorDetector("monitoring").detect(text)

    real = filler = unclear = 0
    for span in detection["spans"]:
        # Only "error_detection" (wait-style) triggers can be filler;
        # every other monitoring subtype counts as real outright.
        if span["subtype"] != "error_detection":
            real += 1
            continue

        # Classify from the 80 chars following the trigger.
        # Filler patterns take precedence over reflection patterns.
        context = text[span["end"]:span["end"] + 80]
        if any(rx.search(context) for rx in _WAIT_AS_FILLER):
            filler += 1
        elif any(rx.search(context) for rx in _REAL_REFLECTION_AFTER_WAIT):
            real += 1
        else:
            unclear += 1

    return {
        "total_triggers": detection["total"],
        "real_reflection": real,
        "filler_only": filler,
        "ambiguous": unclear,
        "by_type": detection["by_type"],
    }
|
|
|
|
| |
| |
| |
def is_collapsed(text: str, base_text: Optional[str] = None,
                 ngram: int = 4, ngram_threshold: float = 0.5,
                 length_ratio_low: float = 0.3,
                 length_ratio_high: float = 1.8) -> Dict:
    """
    Detect generation collapse using multiple signals.

    Args:
        text: generated CoT
        base_text: optional baseline CoT for length comparison
        ngram: n-gram size for repetition (default 4)
        ngram_threshold: fraction of repeated n-grams above which collapse
        length_ratio_low / high: relative to base, outside this range = collapsed

    Returns:
        {
            "collapsed": bool,
            "ngram_repetition": float,
            "length_ratio": float or None,
            "reason": str  # one of: empty_or_too_short, repetition,
                           # length, repetition+length, none
        }
    """
    # Degenerate output: nothing (or almost nothing) was generated.
    if not text or len(text) < 50:
        return {
            "collapsed": True,
            "ngram_repetition": 0.0,
            "length_ratio": None,
            "reason": "empty_or_too_short",
        }

    # Repetition signal: fraction of duplicated whitespace-token n-grams.
    # Only computed once the text holds at least 4 full n-gram windows,
    # so the ratio is not dominated by noise on short outputs. The guard
    # also guarantees `ngrams` is non-empty, so no division-by-zero check
    # is needed.
    toks = text.split()
    rep = 0.0
    if len(toks) >= ngram * 4:
        ngrams = [tuple(toks[i:i + ngram]) for i in range(len(toks) - ngram + 1)]
        rep = 1.0 - (len(set(ngrams)) / len(ngrams))

    # Length signal: character-length ratio vs. baseline, flagged when it
    # falls outside [length_ratio_low, length_ratio_high]. Skipped when no
    # (non-empty) baseline is provided.
    length_ratio = None
    length_anomaly = False
    if base_text:
        base_len = max(len(base_text), 1)
        length_ratio = len(text) / base_len
        length_anomaly = (length_ratio < length_ratio_low or
                          length_ratio > length_ratio_high)

    rep_anomaly = rep > ngram_threshold

    if rep_anomaly and length_anomaly:
        reason = "repetition+length"
    elif rep_anomaly:
        reason = "repetition"
    elif length_anomaly:
        reason = "length"
    else:
        reason = "none"

    return {
        "collapsed": bool(rep_anomaly or length_anomaly),
        "ngram_repetition": float(rep),
        "length_ratio": length_ratio,
        "reason": reason,
    }
|
|
|
|
| |
def repetition_score(text: str, window: int = 100) -> float:
    """Legacy shim kept for old callers; prefer is_collapsed().

    `window` is accepted for interface compatibility only and is ignored —
    the score now comes from is_collapsed()'s n-gram repetition metric.
    """
    return is_collapsed(text)["ngram_repetition"]
|
|