| """Phase 5 — PCA eigenstructure analysis. |
| |
| For each module (compliance, credit): |
| 1. Pull all dense_1024 vectors from the 3 strategy collections (aggregated) |
| 2. Fit full-rank PCA (sklearn) |
| 3. Detect elbow via three methods (Kneedle, second-derivative, 95%-variance) |
| 4. Persist: |
| evaluation/results/{module}/pca_eigenstructure.json (eigenvalues, cumvar, elbows) |
| evaluation/results/{module}/pca_model.joblib (fitted PCA — for query-time projection) |
| |
| Why aggregate across strategies? PCA is largely insensitive to redundant samples |
| (uniformly repeated points leave the principal directions unchanged) — the |
| eigenstructure reflects the corpus-level embedding geometry. Aggregating gives |
| a denser sample without materially distorting the principal directions. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import sys |
| import time |
| from pathlib import Path |
|
|
| import numpy as np |
| from tqdm import tqdm |
|
|
# Parent of the directory containing this file (presumably the repository
# root — confirm). It is prepended to sys.path so that the `pipelines.shared`
# imports that follow can be resolved.
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
|
|
| from pipelines.shared.pca_analyzer import fit_pca, save |
| from pipelines.shared.qdrant_client import ( |
| DENSE_DIMENSIONS, |
| _dense_name, |
| all_collection_specs, |
| get_client, |
| ) |
|
|
|
|
# Page size passed as `limit` to every `client.scroll` call in fetch_dense_1024.
SCROLL_BATCH = 256
|
|
|
|
def fetch_dense_1024(client, collection_name: str) -> np.ndarray:
    """Scroll through a collection and return its dense_1024 vectors as (n, 1024).

    Args:
        client: Object exposing ``get_collection(name)`` and ``scroll(...)``
            (elsewhere in this file it is the value returned by ``get_client()``).
        collection_name: Name of the collection to read.

    Returns:
        A float32 array with one row per point that carried the requested
        vector. An empty ``(0, 1024)`` array is returned when the collection
        reports no points, or when scrolling yields no vectors.
    """
    info = client.get_collection(collection_name)
    expected = info.points_count or 0  # points_count may be None
    if expected == 0:
        return np.zeros((0, 1024), dtype=np.float32)

    # Loop-invariant: resolve the named-vector key once instead of per point.
    vector_name = _dense_name(1024)

    # Progress label is the text after the first underscore. Fall back to the
    # full name so a collection without an underscore cannot raise IndexError
    # just for the sake of a cosmetic label.
    _, sep, suffix = collection_name.partition("_")
    label = suffix if sep else collection_name

    vectors: list[list[float]] = []
    offset = None
    # Context manager guarantees the bar is closed even if a scroll call raises.
    with tqdm(total=expected, desc=f" scroll {label}", leave=False) as pbar:
        while True:
            points, offset = client.scroll(
                collection_name=collection_name,
                limit=SCROLL_BATCH,
                with_payload=False,
                with_vectors=[vector_name],
                offset=offset,
            )
            if not points:
                break
            for p in points:
                # A point's vectors arrive either as a name -> vector mapping
                # or as a bare vector; handle both shapes.
                v = p.vector.get(vector_name) if isinstance(p.vector, dict) else p.vector
                if v is not None:
                    vectors.append(v)
            pbar.update(len(points))
            # A None offset from scroll means there is no further page.
            if offset is None:
                break

    if not vectors:
        return np.zeros((0, 1024), dtype=np.float32)
    return np.asarray(vectors, dtype=np.float32)
|
|
|
|
def pull_module_embeddings(client, module: str) -> np.ndarray:
    """Collect the dense_1024 vectors of every collection registered for `module`.

    The collection specs come from ``all_collection_specs()``; each one whose
    module field equals `module` is fetched with ``fetch_dense_1024`` and the
    per-collection arrays are stacked row-wise. Per-collection and total
    vector counts are printed along the way.

    Returns an empty ``(0, 1024)`` float32 array when no collection matches.
    """
    matching = [
        (strategy, name)
        for owner, strategy, name in all_collection_specs()
        if owner == module
    ]
    print(f"\n[{module}] aggregating {len(matching)} collections")

    # Nothing registered for this module: nothing to fetch or concatenate.
    if not matching:
        return np.zeros((0, 1024), dtype=np.float32)

    chunks: list[np.ndarray] = []
    for strategy, name in matching:
        vecs = fetch_dense_1024(client, name)
        print(f" {strategy:25s} {len(vecs):>6,d} vectors")
        chunks.append(vecs)

    stacked = np.concatenate(chunks, axis=0)
    print(f" total {len(stacked):>6,d} vectors")
    return stacked
|
|
|
|
# Modules that yield fewer aggregated vectors than this are skipped.
_MIN_EMBEDDINGS = 100


def _summarize(result, elapsed: float) -> dict:
    """Build the JSON summary entry for one module from its PCA result."""
    return {
        "n_embeddings": result.n_embeddings,
        "elbow_kneedle": result.elbow_kneedle,
        "elbow_kneedle_snapped_to_matryoshka": result.elbow_kneedle_snapped,
        "elbow_second_derivative": result.elbow_second_deriv,
        "elbow_95pct_variance": result.elbow_95pct,
        "cumvar_at_dims": {
            "128": round(result.cumulative_variance_at_128, 4),
            "256": round(result.cumulative_variance_at_256, 4),
            "512": round(result.cumulative_variance_at_512, 4),
            "768": round(result.cumulative_variance_at_768, 4),
            "1024": round(result.cumulative_variance_at_1024, 4),
        },
        "fit_seconds": round(elapsed, 1),
    }


def _print_findings(module: str, result) -> None:
    """Print the elbow estimates and per-dimension cumulative variance for one module."""
    print(f"\n [{module}] PCA findings:")
    print(f" Kneedle elbow: dim {result.elbow_kneedle} (snapped to Matryoshka: {result.elbow_kneedle_snapped})")
    print(f" Second-derivative elbow: dim {result.elbow_second_deriv}")
    print(f" 95%-variance elbow: dim {result.elbow_95pct}")
    print(" Cumulative variance at Matryoshka dims:")
    for d in DENSE_DIMENSIONS:
        cv = getattr(result, f"cumulative_variance_at_{d}")
        print(f" dim={d:>4d}: {cv * 100:>5.1f}%")


def _signed_delta(first, second) -> str:
    """Format ``second - first`` as a signed integer, or ``n/a`` if either value is None."""
    if first is None or second is None:
        return "n/a"
    return f"{second - first:+d}"


def _print_cross_module_comparison(c: dict, cr: dict) -> None:
    """Print the compliance-vs-credit comparison from their two summary entries."""
    c_knee, cr_knee = c["elbow_kneedle"], cr["elbow_kneedle"]
    c_95, cr_95 = c["elbow_95pct_variance"], cr["elbow_95pct_variance"]

    print("\n=== Cross-module comparison ===")
    print(f" Kneedle elbow: compliance={c_knee} vs credit={cr_knee} "
          f"(Δ = {_signed_delta(c_knee, cr_knee)})")
    print(f" 95%-variance dim: compliance={c_95} vs credit={cr_95} "
          f"(Δ = {_signed_delta(c_95, cr_95)})")
    print(f" Cumvar @ dim 256: compliance={c['cumvar_at_dims']['256']:.3f} vs credit={cr['cumvar_at_dims']['256']:.3f}")
    print(f" Cumvar @ dim 512: compliance={c['cumvar_at_dims']['512']:.3f} vs credit={cr['cumvar_at_dims']['512']:.3f}")

    # NOTE(review): whether fit_pca can report a missing elbow as None is not
    # visible from this file. The guard only keeps the comparison from raising
    # a TypeError if it does; integer elbows behave exactly as before.
    if c_knee is None or cr_knee is None:
        print("\n → Hypothesis INCONCLUSIVE: Kneedle elbow unavailable for at least one module.")
    elif c_knee < cr_knee:
        print("\n → Hypothesis CONFIRMED: regulatory text has lower intrinsic dimensionality.")
    elif c_knee > cr_knee:
        print("\n → Hypothesis REJECTED: credit-narrative text has lower intrinsic dimensionality.")
    else:
        print("\n → Hypothesis INCONCLUSIVE: both modules have the same Kneedle elbow.")


def main() -> int:
    """Fit and persist a PCA per module, write a summary, and compare the modules.

    For each of "compliance" and "credit": aggregate the module's embeddings,
    skip the module if fewer than ``_MIN_EMBEDDINGS`` vectors were found, fit
    PCA via ``fit_pca``, hand the model and result to ``save`` with paths under
    ``evaluation/results/{module}/``, and print the findings. The collected
    per-module summaries are then written to
    ``evaluation/results/_pca_summary.json``; when both modules were processed
    a cross-module comparison is printed.

    Returns:
        Always 0; the value is used as the process exit status.
    """
    client = get_client()
    out_root = ROOT / "evaluation" / "results"
    summaries: dict[str, dict] = {}

    for module in ("compliance", "credit"):
        embeddings = pull_module_embeddings(client, module)
        if len(embeddings) < _MIN_EMBEDDINGS:
            print(f" ! not enough embeddings for {module}; skipping")
            continue

        print(f" fitting PCA on ({embeddings.shape[0]} × {embeddings.shape[1]})...")
        t0 = time.perf_counter()
        pca, result = fit_pca(embeddings, module=module, source_strategy="aggregated")
        elapsed = time.perf_counter() - t0  # timing covers the fit call only
        print(f" fit done in {elapsed:.1f}s")

        out_dir = out_root / module
        save(
            pca, result,
            model_path=out_dir / "pca_model.joblib",
            json_path=out_dir / "pca_eigenstructure.json",
        )

        summaries[module] = _summarize(result, elapsed)
        _print_findings(module, result)

    # The summary is written even when modules were skipped (possibly as {}).
    summary_path = out_root / "_pca_summary.json"
    summary_path.parent.mkdir(parents=True, exist_ok=True)
    summary_path.write_text(json.dumps(summaries, indent=2), encoding="utf-8")
    print(f"\nWrote summary → {summary_path.relative_to(ROOT)}")

    # The comparison needs both modules; a skipped module suppresses it.
    if "compliance" in summaries and "credit" in summaries:
        _print_cross_module_comparison(summaries["compliance"], summaries["credit"])

    return 0
|
|
|
|
| if __name__ == "__main__": |
| sys.exit(main()) |
|
|