File size: 6,809 Bytes
657d287 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 | """Phase 5 β PCA eigenstructure analysis.
For each module (compliance, credit):
1. Pull all dense_1024 vectors from the 3 strategy collections (aggregated)
2. Fit full-rank PCA (sklearn)
3. Detect elbow via three methods (Kneedle, second-derivative, 95%-variance)
4. Persist:
evaluation/results/{module}/pca_eigenstructure.json (eigenvalues, cumvar, elbows)
evaluation/results/{module}/pca_model.joblib (fitted PCA β for query-time projection)
Why aggregate across strategies? PCA is invariant to redundant samples β the
eigenstructure reflects the corpus-level embedding geometry. Aggregating gives
a denser sample without distorting the principal directions.
"""
from __future__ import annotations
import json
import sys
import time
from pathlib import Path
import numpy as np
from tqdm import tqdm
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from pipelines.shared.pca_analyzer import fit_pca, save
from pipelines.shared.qdrant_client import (
DENSE_DIMENSIONS,
_dense_name,
all_collection_specs,
get_client,
)
SCROLL_BATCH = 256
def fetch_dense_1024(client, collection_name: str) -> np.ndarray:
"""Scroll through a collection and return its dense_1024 vectors as (n, 1024)."""
info = client.get_collection(collection_name)
expected = info.points_count or 0
if expected == 0:
return np.zeros((0, 1024), dtype=np.float32)
vectors: list[list[float]] = []
offset = None
pbar = tqdm(total=expected, desc=f" scroll {collection_name.split('_', 1)[1]}", leave=False)
while True:
points, offset = client.scroll(
collection_name=collection_name,
limit=SCROLL_BATCH,
with_payload=False,
with_vectors=[_dense_name(1024)],
offset=offset,
)
if not points:
break
for p in points:
v = p.vector.get(_dense_name(1024)) if isinstance(p.vector, dict) else p.vector
if v is not None:
vectors.append(v)
pbar.update(len(points))
if offset is None:
break
pbar.close()
if not vectors:
return np.zeros((0, 1024), dtype=np.float32)
return np.asarray(vectors, dtype=np.float32)
def pull_module_embeddings(client, module: str) -> np.ndarray:
"""Aggregate dense_1024 vectors from all strategies belonging to `module`."""
specs = [(s, n) for m, s, n in all_collection_specs() if m == module]
print(f"\n[{module}] aggregating {len(specs)} collections")
parts: list[np.ndarray] = []
for strategy, name in specs:
v = fetch_dense_1024(client, name)
print(f" {strategy:25s} {len(v):>6,d} vectors")
parts.append(v)
if not parts:
return np.zeros((0, 1024), dtype=np.float32)
out = np.concatenate(parts, axis=0)
print(f" total {len(out):>6,d} vectors")
return out
def main() -> int:
client = get_client()
out_root = ROOT / "evaluation" / "results"
summaries: dict[str, dict] = {}
for module in ("compliance", "credit"):
embeddings = pull_module_embeddings(client, module)
if len(embeddings) < 100:
print(f" ! not enough embeddings for {module}; skipping")
continue
print(f" fitting PCA on ({embeddings.shape[0]} Γ {embeddings.shape[1]})...")
t0 = time.perf_counter()
# Use 'aggregated' as the source_strategy label since we mixed all 3
pca, result = fit_pca(embeddings, module=module, source_strategy="aggregated")
elapsed = time.perf_counter() - t0
print(f" fit done in {elapsed:.1f}s")
out_dir = out_root / module
save(
pca, result,
model_path=out_dir / "pca_model.joblib",
json_path=out_dir / "pca_eigenstructure.json",
)
summaries[module] = {
"n_embeddings": result.n_embeddings,
"elbow_kneedle": result.elbow_kneedle,
"elbow_kneedle_snapped_to_matryoshka": result.elbow_kneedle_snapped,
"elbow_second_derivative": result.elbow_second_deriv,
"elbow_95pct_variance": result.elbow_95pct,
"cumvar_at_dims": {
"128": round(result.cumulative_variance_at_128, 4),
"256": round(result.cumulative_variance_at_256, 4),
"512": round(result.cumulative_variance_at_512, 4),
"768": round(result.cumulative_variance_at_768, 4),
"1024": round(result.cumulative_variance_at_1024, 4),
},
"fit_seconds": round(elapsed, 1),
}
print(f"\n [{module}] PCA findings:")
print(f" Kneedle elbow: dim {result.elbow_kneedle} (snapped to Matryoshka: {result.elbow_kneedle_snapped})")
print(f" Second-derivative elbow: dim {result.elbow_second_deriv}")
print(f" 95%-variance elbow: dim {result.elbow_95pct}")
print(f" Cumulative variance at Matryoshka dims:")
for d in DENSE_DIMENSIONS:
cv = getattr(result, f"cumulative_variance_at_{d}")
print(f" dim={d:>4d}: {cv * 100:>5.1f}%")
# Cross-module summary
summary_path = out_root / "_pca_summary.json"
summary_path.parent.mkdir(parents=True, exist_ok=True)
summary_path.write_text(json.dumps(summaries, indent=2))
print(f"\nWrote summary β {summary_path.relative_to(ROOT)}")
if "compliance" in summaries and "credit" in summaries:
c = summaries["compliance"]
cr = summaries["credit"]
print(f"\n=== Cross-module comparison ===")
print(f" Kneedle elbow: compliance={c['elbow_kneedle']} vs credit={cr['elbow_kneedle']} "
f"(Ξ = {cr['elbow_kneedle'] - c['elbow_kneedle']:+d})")
print(f" 95%-variance dim: compliance={c['elbow_95pct_variance']} vs credit={cr['elbow_95pct_variance']} "
f"(Ξ = {cr['elbow_95pct_variance'] - c['elbow_95pct_variance']:+d})")
print(f" Cumvar @ dim 256: compliance={c['cumvar_at_dims']['256']:.3f} vs credit={cr['cumvar_at_dims']['256']:.3f}")
print(f" Cumvar @ dim 512: compliance={c['cumvar_at_dims']['512']:.3f} vs credit={cr['cumvar_at_dims']['512']:.3f}")
if c["elbow_kneedle"] < cr["elbow_kneedle"]:
print(f"\n β Hypothesis CONFIRMED: regulatory text has lower intrinsic dimensionality.")
elif c["elbow_kneedle"] > cr["elbow_kneedle"]:
print(f"\n β Hypothesis REJECTED: credit-narrative text has lower intrinsic dimensionality.")
else:
print(f"\n β Hypothesis INCONCLUSIVE: both modules have the same Kneedle elbow.")
return 0
if __name__ == "__main__":
sys.exit(main())
|