| """ |
| Planning Quality Score (PQS) — multi-dimensional assessment. |
| |
| Motivation: |
| When we steer planning, a drop in trigger count could mean: |
| (a) The model truly lost planning capability, OR |
| (b) The model bypassed the trigger words while still planning internally. |
| |
| We need to distinguish these by measuring deeper quality signals. |
| |
| Four dimensions: |
| Q1. Structural Depth — sequential structure + forward-references |
| Q2. Strategy Diversity — explicit strategy naming + comparison |
| Q3. Long-Range Coherence — later text referring back to plan declaration |
| Q4. Premature Execution — how early the first computation appears (NEGATIVE signal) |
| |
| Combined: |
| PQS = 0.25 × Q1 + 0.25 × Q2 + 0.25 × Q3 + 0.25 × (1 - Q4) |
| """ |
| import re |
| from typing import Dict, List |
|
|
|
|
| |
| |
| |
|
|
| _ORDINAL_PATTERNS = [ |
| r"(?i)\bstep\s+\d+\b", |
| r"(?i)\bfirst(ly)?[,.]", |
| r"(?i)\bsecond(ly)?[,.]", |
| r"(?i)\bthird(ly)?[,.]", |
| r"(?i)\bnext[,.]", |
| r"(?i)\bthen[,.]", |
| r"(?i)\bfinally[,.]", |
| r"(?i)\blast(ly)?[,.]", |
| ] |
|
|
| _FORWARD_REF_PATTERNS = [ |
| r"(?i)\bwe\s+will\s+need\s+to\b", |
| r"(?i)\blater\s+(we'?ll|we\s+will|i'?ll|i\s+will)\b", |
| r"(?i)\bthis\s+will\s+help\s+(us|me)\s+later\b", |
| r"(?i)\bin\s+a\s+(later|subsequent)\s+step\b", |
| r"(?i)\b(we|i)'?ll\s+(come|get)\s+back\s+to\s+(this|that)\b", |
| r"(?i)\bfor\s+now[,.]", |
| r"(?i)\blater\s+on\b", |
| ] |
|
|
|
|
| def compute_q1_structural_depth(text: str) -> float: |
| """ |
| Structural depth score in [0, 1]. |
| |
| Combines: |
| - Ordinal phrase density (step 1, first, next, ...) |
| - Forward-reference density (we'll need to, later we'll, ...) |
| - Completeness of ordinal sequence (do we see first→next→finally?) |
| """ |
| n_tokens = max(len(text.split()), 1) |
|
|
| |
| n_ordinal = sum(len(re.findall(p, text)) for p in _ORDINAL_PATTERNS) |
| ordinal_density = min(n_ordinal / (n_tokens / 100), 1.0) |
|
|
| |
| n_forward = sum(len(re.findall(p, text)) for p in _FORWARD_REF_PATTERNS) |
| forward_density = min(n_forward / (n_tokens / 200), 1.0) |
|
|
| |
| distinct_ords = set() |
| for ord_label in ["first", "second", "third", "next", "then", "finally", "lastly"]: |
| if re.search(rf"(?i)\b{ord_label}\b", text): |
| distinct_ords.add(ord_label) |
| completeness = min(len(distinct_ords) / 3.0, 1.0) |
|
|
| return 0.4 * ordinal_density + 0.3 * forward_density + 0.3 * completeness |
|
|
|
|
| |
| |
| |
|
|
| _STRATEGY_TERMS = [ |
| |
| r"(?i)\binduction\b", r"(?i)\bcontradiction\b", r"(?i)\bcontrapositive\b", |
| r"(?i)\bconstructive\s+proof\b", r"(?i)\bpigeonhole\b", |
| |
| r"(?i)\bsubstitution\b", r"(?i)\belimination\b", r"(?i)\bfactoring\b", |
| r"(?i)\bcompleting\s+the\s+square\b", |
| |
| r"(?i)\bcase\s+analysis\b", r"(?i)\bby\s+cases\b", |
| |
| r"(?i)\bcoordinate\s+geometry\b", r"(?i)\bgraphing\b", |
| |
| r"(?i)\bgenerating\s+function\b", r"(?i)\bmodular\s+arithmetic\b", |
| r"(?i)\bchinese\s+remainder\s+theorem\b", |
| |
| r"(?i)\bwithout\s+loss\s+of\s+generality\b", r"(?i)\bwlog\b", |
| r"(?i)\bworking?\s+backward(s)?\b", |
| ] |
|
|
| _STRATEGY_COMPARISON_PATTERNS = [ |
| r"(?i)\banother\s+(way|approach|method)\b", |
| r"(?i)\balternatively[,.]", |
| r"(?i)\bwe\s+could\s+(also|instead)\b", |
| r"(?i)\bone\s+approach\s+(is|would\s+be)\b", |
| r"(?i)\ba\s+different\s+(way|approach|method)\b", |
| ] |
|
|
|
|
| def compute_q2_strategy_diversity(text: str) -> float: |
| """ |
| Strategy diversity score in [0, 1]. |
| |
| Combines: |
| - Unique strategy terms mentioned |
| - Strategy comparison (only in first 20% of CoT — else it's backtracking) |
| """ |
| if not text.strip(): |
| return 0.0 |
| length = len(text) |
| first_20pct = text[: int(length * 0.20)] |
|
|
| |
| strategies_found = set() |
| for pat in _STRATEGY_TERMS: |
| if re.search(pat, text): |
| strategies_found.add(pat) |
| strategy_unique = min(len(strategies_found) / 3.0, 1.0) |
|
|
| |
| n_compare_early = sum(len(re.findall(p, first_20pct)) for p in _STRATEGY_COMPARISON_PATTERNS) |
| compare_score = min(n_compare_early / 2.0, 1.0) |
|
|
| |
| n_strategy_total = sum(len(re.findall(p, text)) for p in _STRATEGY_TERMS) |
| n_tokens = max(len(text.split()), 1) |
| density = min(n_strategy_total / (n_tokens / 300), 1.0) |
|
|
| return 0.4 * strategy_unique + 0.3 * compare_score + 0.3 * density |
|
|
|
|
| |
| |
| |
|
|
| _BACK_REFERENCE_PATTERNS = [ |
| r"(?i)\bas\s+(planned|outlined|mentioned)\b", |
| r"(?i)\bfollowing\s+(the|my|our)\s+(plan|approach|strategy)\b", |
| r"(?i)\bas\s+i\s+(planned|said|outlined|mentioned)\s+(earlier|before|above)\b", |
| r"(?i)\bas\s+we\s+(planned|said|outlined)\s+(earlier|before|above)\b", |
| r"(?i)\bgoing\s+back\s+to\s+(step\s+\d+|my\s+plan)\b", |
| r"(?i)\brecall(ing)?\s+(that|my|the)\s+(plan|approach|strategy)\b", |
| r"(?i)\bper\s+(my|the)\s+(plan|strategy)\b", |
| ] |
|
|
|
|
| def compute_q3_long_range_coherence(text: str) -> float: |
| """ |
| Long-range coherence score in [0, 1]. |
| |
| Measure back-references to early planning, which is a signature of structured planning. |
| """ |
| if not text.strip(): |
| return 0.0 |
| length = len(text) |
| if length < 200: |
| return 0.0 |
|
|
| |
| second_half = text[length // 2:] |
| n_back = sum(len(re.findall(p, second_half)) for p in _BACK_REFERENCE_PATTERNS) |
| score = min(n_back / 2.0, 1.0) |
|
|
| return score |
|
|
|
|
| |
| |
| |
|
|
| def compute_q4_premature_execution(text: str) -> float: |
| """ |
| Premature execution score in [0, 1]. Higher = more premature (BAD for planning). |
| |
| Measure: position of the first equation/computation as a fraction of total length. |
| - If first computation appears in first 5% of text → strong premature (→ 1.0) |
| - If it appears in first 20% → still premature (→ ~0.6) |
| - If only after 30% → not premature (→ 0.0) |
| """ |
| if not text.strip(): |
| return 0.0 |
|
|
| |
| computation_patterns = [ |
| r"[-+]?\d+\s*[+\-*/]\s*\d+\s*=", |
| r"[a-zA-Z]\s*=\s*\d", |
| r"\d+\s*=\s*\d+", |
| r"\$.*?=.*?\$", |
| ] |
|
|
| first_pos = len(text) |
| for p in computation_patterns: |
| m = re.search(p, text) |
| if m and m.start() < first_pos: |
| first_pos = m.start() |
|
|
| if first_pos >= len(text): |
| return 0.0 |
|
|
| ratio = first_pos / len(text) |
|
|
| |
| |
| |
| if ratio < 0.05: |
| return 1.0 |
| elif ratio < 0.20: |
| return 1.0 - (ratio - 0.05) / 0.15 * 0.8 |
| elif ratio < 0.30: |
| return 0.2 - (ratio - 0.20) / 0.10 * 0.2 |
| else: |
| return 0.0 |
|
|
|
|
| |
| |
| |
|
|
| def compute_pqs(text: str) -> Dict: |
| """Compute all 4 dimensions and combined PQS.""" |
| q1 = compute_q1_structural_depth(text) |
| q2 = compute_q2_strategy_diversity(text) |
| q3 = compute_q3_long_range_coherence(text) |
| q4 = compute_q4_premature_execution(text) |
|
|
| pqs = 0.25 * q1 + 0.25 * q2 + 0.25 * q3 + 0.25 * (1.0 - q4) |
|
|
| return { |
| "q1_structural_depth": float(q1), |
| "q2_strategy_diversity": float(q2), |
| "q3_long_range_coherence": float(q3), |
| "q4_premature_execution": float(q4), |
| "pqs": float(pqs), |
| } |
|
|