Spaces:
Sleeping
Sleeping
| """ | |
| ABPT Phase Probe — Фаза 1 Верификации | |
| ====================================== | |
| Считает 8 геометрических метрик зоны кристаллизации L4-L8 для каждого кейса, | |
| запускает base генерацию, собирает constraint_score. | |
| Затем вычисляет корреляцию Спирмена между early_slope_4_8 и base_constraint_score. | |
| Метрики: | |
| early_slope_4_8 — наклон r1 между L4 и L8 (linreg) | |
| early_auc_4_8 — площадь под кривой r1 в [L4..L8] | |
| peak_layer_4_12 — слой с max r1 в [L4..L12] | |
| peak_value_4_12 — значение max r1 в [L4..L12] | |
| profile_width_above_tau — число слоёв где r1 >= tau (default tau=0.5) | |
| sharpness — peak_value / max(1, width) | |
| tail_retention_ratio — auc(L9..L23) / auc(L4..L8) | |
| late_decay_ratio — auc(L24..L31) / auc(L4..L8) | |
| Использование (в Colab): | |
| !python scripts/run_qwen_phase_probe.py \\ | |
| --model Qwen/Qwen3.5-4B \\ | |
| --anchor-profile medium \\ | |
| --seed 7 | |
| Вывод: | |
| - JSON: archive/qwen35_4b_phase_probe_<profile>.json | |
| - MD: docs/research/qwen35_4b_phase_probe_<profile>.md | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import sys | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any | |
| import numpy as np | |
| import torch | |
| ROOT = Path(__file__).resolve().parents[1] | |
| if str(ROOT) not in sys.path: | |
| sys.path.insert(0, str(ROOT)) | |
| from src.data.qwen_anchor_geometry_cases import ( | |
| list_anchor_span_profiles, | |
| make_qwen_anchor_geometry_cases, | |
| QwenAnchorGeometryCase, | |
| ) | |
| from src.model.config import TOY_CONFIG | |
| from src.model.qwen_anchor_overlay import QwenAnchorOverlay | |
| from src.utils.anchor_geometry import ( | |
| compute_geometry_metrics, | |
| extract_delta_vectors, | |
| list_model_layers, | |
| select_tail_probe_layers, | |
| ) | |
| from src.utils.anchor_geometry import ( | |
| decode_token_pieces, | |
| decode_token_surfaces, | |
| match_anchor_span, | |
| token_has_leading_whitespace, | |
| ) | |
| from src.utils.qwen_prompting import format_generation_prompt | |
| from src.utils.stdio import configure_utf8_stdio | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Constants | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| CRYSTALLIZATION_START = 4 | |
| CRYSTALLIZATION_END = 8 | |
| PROPAGATION_START = 9 | |
| PROPAGATION_END = 15 | |
| INTEGRATION_START = 16 | |
| INTEGRATION_END = 23 | |
| HANDOFF_START = 24 | |
| DEFAULT_TAU = 0.50 # r1 threshold for "above_tau" width | |
| MAX_LENGTH = 160 | |
| MAX_NEW_TOKENS = 120 | |
| SEED = 7 | |
| # keyword specs per group (positive = нужны, negative = запрещены) | |
| KEYWORD_SPECS: dict[str, dict[str, Any]] = { | |
| "strictly_vegan_meal_plan_policy": { | |
| "positive": ["vegan", "plant-based", "plant based", "dairy-free", | |
| "egg-free", "animal-free"], | |
| "negative": ["meat", "chicken", "beef", "pork", "fish", "dairy", | |
| "milk", "cheese", "butter", "egg", "eggs"], | |
| "negative_exceptions": {}, | |
| "min_unique_positive_hits": 2, | |
| }, | |
| "async_fastapi_service_architecture_policy": { | |
| "positive": ["async", "await", "fastapi", "asyncio"], | |
| "negative": ["flask", "django", "synchronous", "sync def"], | |
| "negative_exceptions": {}, | |
| "min_unique_positive_hits": 2, | |
| }, | |
| "json_only_response_format_policy": { | |
| "positive": ["json", "{", "}"], | |
| "negative": ["here is", "sure", "```python", "explanation"], | |
| "negative_exceptions": {}, | |
| "min_unique_positive_hits": 2, | |
| }, | |
| "proof_by_contradiction_reasoning_steps": { | |
| "positive": ["assume", "contradiction", "therefore", "suppose", | |
| "hence", "negation"], | |
| "negative": [], | |
| "negative_exceptions": {}, | |
| "min_unique_positive_hits": 2, | |
| }, | |
| "binary_search_update_loop_procedure": { | |
| "positive": ["low", "high", "mid", "while", "binary search"], | |
| "negative": ["linear search", "sequential"], | |
| "negative_exceptions": {}, | |
| "min_unique_positive_hits": 2, | |
| }, | |
| "dependency_injection_request_flow_sequence": { | |
| "positive": ["inject", "dependency", "container", "provider", | |
| "resolve", "service"], | |
| "negative": [], | |
| "negative_exceptions": {}, | |
| "min_unique_positive_hits": 2, | |
| }, | |
| "penicillin_allergy_treatment_protocol": { | |
| "positive": ["allergy", "alternative", "azithromycin", "fluoroquinolone", | |
| "avoid", "antibiotic"], | |
| "negative": ["penicillin", "amoxicillin", "ampicillin"], | |
| "negative_exceptions": {"penicillin": ["penicillin allergy", "allergic to penicillin", | |
| "avoid penicillin", "no penicillin"]}, | |
| "min_unique_positive_hits": 2, | |
| }, | |
| "gdpr_data_retention_compliance_policy": { | |
| "positive": ["retention", "delete", "anonymiz", "erasure", "gdpr", | |
| "personal data", "right to"], | |
| "negative": ["sell data", "share with third parties without consent"], | |
| "negative_exceptions": {}, | |
| "min_unique_positive_hits": 2, | |
| }, | |
| "mathematical_induction_proof_steps": { | |
| "positive": ["base case", "inductive", "induction", "P(k)", | |
| "P(k+1)", "assume", "hypothesis"], | |
| "negative": ["contradiction", "contrapositive"], | |
| "negative_exceptions": {}, | |
| "min_unique_positive_hits": 2, | |
| }, | |
| "sql_foreign_key_constraint_enforcement": { | |
| "positive": ["foreign key", "referential", "constraint", "cascade", | |
| "references", "integrity"], | |
| "negative": ["nosql", "mongodb", "schemaless"], | |
| "negative_exceptions": {}, | |
| "min_unique_positive_hits": 2, | |
| }, | |
| "thread_safe_singleton_initialization_pattern": { | |
| "positive": ["thread", "singleton", "lock", "volatile", "synchronized", | |
| "double-check", "concurrent"], | |
| "negative": ["global variable", "static init only"], | |
| "negative_exceptions": {}, | |
| "min_unique_positive_hits": 2, | |
| }, | |
| "idempotent_rest_api_retry_policy": { | |
| "positive": ["idempoten", "retry", "idempotency key", "duplicate", | |
| "safe to retry", "409"], | |
| "negative": ["fire and forget", "at-most-once without retry"], | |
| "negative_exceptions": {}, | |
| "min_unique_positive_hits": 2, | |
| }, | |
| "recursive_tree_traversal_procedure": { | |
| "positive": ["recursive", "traversal", "left", "right", "node", | |
| "base case", "subtree"], | |
| "negative": ["iterative", "explicit stack", "while loop"], | |
| "negative_exceptions": {}, | |
| "min_unique_positive_hits": 2, | |
| }, | |
| "strict_typescript_null_safety_policy": { | |
| "positive": ["null", "undefined", "strictnullcheck", "narrowing", | |
| "optional", "typescript", "type guard"], | |
| "negative": ["any", "as any"], | |
| "negative_exceptions": {"any": ["not any", "without any", "avoid any"]}, | |
| "min_unique_positive_hits": 2, | |
| }, | |
| } | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Helpers | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| def _to_scalar(v: Any) -> float | None: | |
| if v is None: | |
| return None | |
| try: | |
| return float(v) | |
| except (TypeError, ValueError): | |
| return None | |
| def _auc(r1_profile: dict[str, float | None], start: int, end: int) -> float: | |
| """Трапецоидная интеграция r1 по слоям [start..end].""" | |
| layers = list(range(start, end + 1)) | |
| vals = [r1_profile.get(str(l)) for l in layers] | |
| vals = [v if v is not None else 0.0 for v in vals] | |
| if len(vals) < 2: | |
| return float(vals[0]) if vals else 0.0 | |
| return float(np.trapezoid(vals)) | |
| def compute_phase_metrics( | |
| r1_profile: dict[str, float | None], | |
| n_layers: int, | |
| tau: float = DEFAULT_TAU, | |
| ) -> dict[str, float | None]: | |
| cs = CRYSTALLIZATION_START | |
| ce = CRYSTALLIZATION_END | |
| handoff_end = n_layers - 1 | |
| # early slope: linreg через L4..L8 | |
| xs = np.array(list(range(cs, ce + 1)), dtype=np.float64) | |
| ys = np.array( | |
| [float(r1_profile.get(str(l)) or 0.0) for l in range(cs, ce + 1)], | |
| dtype=np.float64, | |
| ) | |
| if len(xs) >= 2: | |
| early_slope = float(np.polyfit(xs, ys, deg=1)[0]) | |
| else: | |
| early_slope = None | |
| # early auc L4..L8 | |
| early_auc = _auc(r1_profile, cs, ce) | |
| # peak в L4..L12 | |
| peak_search_end = min(12, n_layers - 1) | |
| peak_val: float | None = None | |
| peak_layer: int | None = None | |
| for l in range(cs, peak_search_end + 1): | |
| v = _to_scalar(r1_profile.get(str(l))) | |
| if v is not None and (peak_val is None or v > peak_val): | |
| peak_val = v | |
| peak_layer = l | |
| # width above tau (all layers) | |
| width = sum( | |
| 1 for l in range(n_layers) | |
| if (_to_scalar(r1_profile.get(str(l))) or 0.0) >= tau | |
| ) | |
| sharpness = (peak_val / max(1, width)) if peak_val is not None else None | |
| # tail retention: auc(L9..L23) / auc(L4..L8) | |
| tail_auc = _auc(r1_profile, PROPAGATION_START, INTEGRATION_END) | |
| tail_retention = (tail_auc / early_auc) if early_auc > 0 else None | |
| # late decay: auc(L24..end) / auc(L4..L8) | |
| late_auc = _auc(r1_profile, HANDOFF_START, handoff_end) | |
| late_decay = (late_auc / early_auc) if early_auc > 0 else None | |
| return { | |
| "early_slope_4_8": early_slope, | |
| "early_auc_4_8": early_auc, | |
| "peak_layer_4_12": peak_layer, | |
| "peak_value_4_12": peak_val, | |
| "profile_width_above_tau": float(width), | |
| "tau": tau, | |
| "sharpness": sharpness, | |
| "tail_retention_ratio": tail_retention, | |
| "late_decay_ratio": late_decay, | |
| } | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Geometry extraction (повторяет логику calibration script) | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| def extract_rank1_profile( | |
| overlay: QwenAnchorOverlay, | |
| case: QwenAnchorGeometryCase, | |
| probe_layers: list[int], | |
| device: torch.device, | |
| ) -> dict[str, float | None] | None: | |
| tokenizer = overlay.tokenizer | |
| if tokenizer is None: | |
| raise ValueError("tokenizer is required") | |
| try: | |
| encoded = tokenizer( | |
| case.prompt, | |
| truncation=True, | |
| max_length=MAX_LENGTH, | |
| return_offsets_mapping=True, | |
| return_tensors="pt", | |
| ) | |
| offsets = [ | |
| (int(s), int(e)) | |
| for s, e in encoded.pop("offset_mapping")[0].tolist() | |
| ] | |
| except TypeError: | |
| encoded = tokenizer( | |
| case.prompt, | |
| truncation=True, | |
| max_length=MAX_LENGTH, | |
| return_tensors="pt", | |
| ) | |
| offsets = None | |
| batch = {k: v.to(device) for k, v in encoded.items() if isinstance(v, torch.Tensor)} | |
| input_ids = [int(t) for t in batch["input_ids"][0].tolist()] | |
| span_match = match_anchor_span( | |
| text=case.prompt, | |
| anchor_text=case.anchor_text, | |
| input_ids=input_ids, | |
| tokenizer=tokenizer, | |
| offsets=offsets, | |
| ) | |
| if span_match is None: | |
| print(f" [WARN] span not found for case '{case.name}'") | |
| return None | |
| with torch.no_grad(): | |
| outputs = overlay.base_model( | |
| input_ids=batch["input_ids"], | |
| attention_mask=batch.get("attention_mask"), | |
| output_hidden_states=True, | |
| return_dict=True, | |
| ) | |
| hidden_states = outputs.hidden_states | |
| r1_profile: dict[str, float | None] = {} | |
| for layer in probe_layers: | |
| delta_vecs = extract_delta_vectors( | |
| hidden_states[layer + 1][0], | |
| span_match.token_start, | |
| span_match.token_end, | |
| ) | |
| metrics = compute_geometry_metrics(delta_vecs) | |
| r1_profile[str(layer)] = _to_scalar(metrics.get("rank1_explained_variance")) | |
| return r1_profile | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Base generation + constraint scoring | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| def generate_base_text( | |
| overlay: QwenAnchorOverlay, | |
| prompt: str, | |
| ) -> str: | |
| tokenizer = overlay.tokenizer | |
| if tokenizer is None: | |
| raise ValueError("tokenizer is required") | |
| device = next(overlay.parameters()).device | |
| generation_prompt = format_generation_prompt(tokenizer, prompt) | |
| encoded = tokenizer( | |
| [generation_prompt], | |
| truncation=True, | |
| max_length=MAX_LENGTH, | |
| return_tensors="pt", | |
| padding=False, | |
| ) | |
| input_ids = encoded["input_ids"].to(device) | |
| attention_mask = encoded.get("attention_mask") | |
| if attention_mask is not None: | |
| attention_mask = attention_mask.to(device) | |
| n_prompt_tokens = int(input_ids.shape[1]) | |
| with torch.no_grad(): | |
| generated = overlay.base_model.generate( | |
| input_ids, | |
| attention_mask=attention_mask, | |
| max_new_tokens=MAX_NEW_TOKENS, | |
| do_sample=False, | |
| temperature=None, | |
| top_p=None, | |
| ) | |
| continuation = generated[0][n_prompt_tokens:] | |
| return tokenizer.decode(continuation, skip_special_tokens=True) | |
| def score_constraint(text: str, group: str) -> dict[str, Any]: | |
| spec = KEYWORD_SPECS.get(group, {}) | |
| positive = [t.lower() for t in spec.get("positive", [])] | |
| negative = [t.lower() for t in spec.get("negative", [])] | |
| neg_exc = { | |
| k.lower(): [p.lower() for p in v] | |
| for k, v in spec.get("negative_exceptions", {}).items() | |
| } | |
| min_pos = int(spec.get("min_unique_positive_hits", 2)) | |
| lowered = text.lower() | |
| pos_hits = {t: lowered.count(t) for t in positive if lowered.count(t) > 0} | |
| neg_hits: dict[str, int] = {} | |
| for t in negative: | |
| exc_phrases = neg_exc.get(t, []) | |
| count = lowered.count(t) | |
| if exc_phrases: | |
| protected = sum(lowered.count(p) for p in exc_phrases) | |
| count = max(0, count - protected) | |
| if count > 0: | |
| neg_hits[t] = count | |
| unique_pos = len(pos_hits) | |
| neg_total = sum(neg_hits.values()) | |
| satisfied = (unique_pos >= min_pos) and (neg_total == 0) | |
| return { | |
| "positive_hits": pos_hits, | |
| "negative_hits": neg_hits, | |
| "unique_positive_hits": unique_pos, | |
| "negative_total": neg_total, | |
| "constraint_satisfied": satisfied, | |
| "constraint_score": 1.0 if satisfied else 0.0, | |
| } | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Correlation | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| def spearman_correlation(xs: list[float], ys: list[float]) -> float | None: | |
| if len(xs) < 3: | |
| return None | |
| try: | |
| from scipy.stats import spearmanr | |
| rho, _ = spearmanr(xs, ys) | |
| return float(rho) | |
| except ImportError: | |
| # Ручная реализация | |
| n = len(xs) | |
| rx = _rank(xs) | |
| ry = _rank(ys) | |
| d2 = sum((a - b) ** 2 for a, b in zip(rx, ry)) | |
| return 1.0 - (6 * d2) / (n * (n ** 2 - 1)) | |
| def _rank(vals: list[float]) -> list[float]: | |
| sorted_vals = sorted(enumerate(vals), key=lambda x: x[1]) | |
| ranks = [0.0] * len(vals) | |
| for rank, (original_idx, _) in enumerate(sorted_vals, start=1): | |
| ranks[original_idx] = float(rank) | |
| return ranks | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| # Main | |
| # ───────────────────────────────────────────────────────────────────────────── | |
| def run( | |
| model_name: str, | |
| anchor_profile: str, | |
| tau: float, | |
| device_str: str, | |
| ) -> None: | |
| torch.manual_seed(SEED) | |
| device = torch.device(device_str) | |
| print(f"[PhaseProbe] Model: {model_name}") | |
| print(f"[PhaseProbe] Profile: {anchor_profile}") | |
| print(f"[PhaseProbe] Device: {device}") | |
| print("[PhaseProbe] Загружаю модель...") | |
| overlay = QwenAnchorOverlay.from_pretrained(model_name, cfg=TOY_CONFIG) | |
| overlay.to(device) | |
| overlay.eval() | |
| n_layers = int(overlay.model_num_hidden_layers) | |
| probe_layers = list(range(n_layers)) | |
| print(f"[PhaseProbe] Слоёв: {n_layers}") | |
| cases = make_qwen_anchor_geometry_cases(anchor_span_profile=anchor_profile) | |
| print(f"[PhaseProbe] Кейсов: {len(cases)}") | |
| results = [] | |
| slopes_for_corr: list[float] = [] | |
| constraints_for_corr: list[float] = [] | |
| for i, case in enumerate(cases): | |
| print(f"\n[{i+1}/{len(cases)}] {case.name}") | |
| # --- Геометрия --- | |
| r1_profile = extract_rank1_profile(overlay, case, probe_layers, device) | |
| if r1_profile is None: | |
| print(" SKIP (span not found)") | |
| continue | |
| phase_metrics = compute_phase_metrics(r1_profile, n_layers, tau=tau) | |
| # --- Base generation --- | |
| print(" generating base...") | |
| base_text = generate_base_text(overlay, case.prompt) | |
| constraint = score_constraint(base_text, case.anchor_group) | |
| base_constraint_score = constraint["constraint_score"] | |
| print(f" early_slope_4_8 = {phase_metrics['early_slope_4_8']:.4f}" | |
| f" base_constraint = {base_constraint_score:.0f}") | |
| print(f" peak@L{phase_metrics['peak_layer_4_12']} = {phase_metrics['peak_value_4_12']:.3f}" | |
| f" sharpness = {phase_metrics['sharpness']:.3f}" | |
| if phase_metrics['peak_value_4_12'] else " (no peak)") | |
| result = { | |
| "name": case.name, | |
| "anchor_group": case.anchor_group, | |
| "anchor_class": case.anchor_class, | |
| "anchor_text": case.anchor_text, | |
| "r1_profile": r1_profile, | |
| "phase_metrics": phase_metrics, | |
| "base_generated_text": base_text, | |
| "base_constraint": constraint, | |
| "base_constraint_score": base_constraint_score, | |
| } | |
| results.append(result) | |
| slope = phase_metrics.get("early_slope_4_8") | |
| if slope is not None: | |
| slopes_for_corr.append(slope) | |
| constraints_for_corr.append(base_constraint_score) | |
| # --- Корреляция --- | |
| rho = spearman_correlation(slopes_for_corr, constraints_for_corr) | |
| print(f"\n[PhaseProbe] Spearman ρ (early_slope_4_8 vs base_constraint_score) = " | |
| f"{rho:.4f}" if rho is not None else "N/A (< 3 points)") | |
| # Корреляции для всех метрик | |
| metric_correlations: dict[str, float | None] = {} | |
| metric_names = [ | |
| "early_slope_4_8", "early_auc_4_8", "peak_value_4_12", | |
| "sharpness", "tail_retention_ratio", "late_decay_ratio", | |
| ] | |
| for mname in metric_names: | |
| mvals = [ | |
| r["phase_metrics"].get(mname) | |
| for r in results | |
| if r["phase_metrics"].get(mname) is not None | |
| ] | |
| cvals = [ | |
| r["base_constraint_score"] | |
| for r in results | |
| if r["phase_metrics"].get(mname) is not None | |
| ] | |
| metric_correlations[mname] = spearman_correlation(mvals, cvals) | |
| # --- Сохранение JSON --- | |
| ARCHIVE = ROOT / "archive" | |
| ARCHIVE.mkdir(exist_ok=True) | |
| slug = model_name.split("/")[-1].lower().replace("-", "_").replace(".", "") | |
| out_json = ARCHIVE / f"{slug}_phase_probe_{anchor_profile}.json" | |
| payload = { | |
| "metadata": { | |
| "model_name": model_name, | |
| "anchor_profile": anchor_profile, | |
| "tau": tau, | |
| "n_cases": len(results), | |
| "n_layers": n_layers, | |
| "seed": SEED, | |
| "max_length": MAX_LENGTH, | |
| "max_new_tokens": MAX_NEW_TOKENS, | |
| "crystallization_zone": [CRYSTALLIZATION_START, CRYSTALLIZATION_END], | |
| "created_at_utc": datetime.now(timezone.utc).isoformat(), | |
| }, | |
| "correlation_summary": { | |
| "spearman_early_slope_4_8_vs_base_constraint": rho, | |
| "all_metrics": metric_correlations, | |
| }, | |
| "cases": results, | |
| } | |
| out_json.write_text(json.dumps(payload, indent=2, ensure_ascii=False)) | |
| print(f"\n[PhaseProbe] JSON → {out_json}") | |
| # --- Сохранение MD --- | |
| DOCS = ROOT / "docs" / "research" | |
| DOCS.mkdir(parents=True, exist_ok=True) | |
| out_md = DOCS / f"{slug}_phase_probe_{anchor_profile}.md" | |
| md_lines = [ | |
| f"# ABPT Phase Probe — {model_name} / profile={anchor_profile}", | |
| f"", | |
| f"**Created:** {payload['metadata']['created_at_utc']} ", | |
| f"**Cases:** {len(results)} | **Layers:** {n_layers} | **tau:** {tau}", | |
| f"", | |
| f"## Correlation Summary", | |
| f"", | |
| f"| Metric | Spearman ρ vs base_constraint_score |", | |
| f"|--------|--------------------------------------|", | |
| ] | |
| for mname, corr_val in metric_correlations.items(): | |
| val_str = f"{corr_val:.4f}" if corr_val is not None else "N/A" | |
| md_lines.append(f"| `{mname}` | {val_str} |") | |
| md_lines += [ | |
| f"", | |
| f"## Per-Case Results", | |
| f"", | |
| f"| name | group | early_slope | peak@L | peak_val | sharpness | tail_retention | base_constraint |", | |
| f"|------|-------|-------------|--------|----------|-----------|----------------|-----------------|", | |
| ] | |
| def _f(v: Any) -> str: | |
| if v is None: | |
| return "—" | |
| if isinstance(v, float): | |
| return f"{v:.3f}" | |
| return str(v) | |
| for r in results: | |
| pm = r["phase_metrics"] | |
| md_lines.append( | |
| f"| {r['name']} | {r['anchor_group'].split('_')[0]} " | |
| f"| {_f(pm.get('early_slope_4_8'))} " | |
| f"| L{pm.get('peak_layer_4_12')} " | |
| f"| {_f(pm.get('peak_value_4_12'))} " | |
| f"| {_f(pm.get('sharpness'))} " | |
| f"| {_f(pm.get('tail_retention_ratio'))} " | |
| f"| {_f(r['base_constraint_score'])} |" | |
| ) | |
| md_lines += [ | |
| f"", | |
| f"## Hypothesis", | |
| f"", | |
| f"Если `early_slope_4_8` **отрицательный или близкий к нулю** → r1 не растёт в зоне кристаллизации", | |
| f"→ base модель не сформировала устойчивую концептуальную структуру → высокий риск провала constraint.", | |
| f"", | |
| f"Если корреляция Spирмена **|ρ| > 0.4** — гипотеза подтверждена,", | |
| f"можно использовать `early_slope_4_8` как routing signal в Фазе 2.", | |
| ] | |
| out_md.write_text("\n".join(md_lines), encoding="utf-8") | |
| print(f"[PhaseProbe] MD → {out_md}") | |
| print(f"\n[PhaseProbe] Готово. ρ = {rho:.4f}" if rho is not None else "\n[PhaseProbe] Готово. ρ = N/A") | |
| def main() -> None: | |
| configure_utf8_stdio() | |
| parser = argparse.ArgumentParser(description="ABPT Phase Probe — Фаза 1 верификации геометрии") | |
| parser.add_argument("--model", default="Qwen/Qwen3.5-4B", | |
| help="HuggingFace model name") | |
| parser.add_argument("--anchor-profile", default="medium", | |
| choices=list(list_anchor_span_profiles()), | |
| help="Профиль длины anchor span") | |
| parser.add_argument("--tau", type=float, default=DEFAULT_TAU, | |
| help="Порог r1 для подсчёта profile_width_above_tau") | |
| parser.add_argument("--device", default="cuda" if torch.cuda.is_available() else "cpu") | |
| args, _ = parser.parse_known_args() # ignore unknown args from LLM Strategist | |
| run( | |
| model_name=args.model, | |
| anchor_profile=args.anchor_profile, | |
| tau=args.tau, | |
| device_str=args.device, | |
| ) | |
| if __name__ == "__main__": | |
| main() | |