bankmind / scripts /run_pca_analysis.py
arjun10g's picture
Deploy BankMind
657d287 verified
"""Phase 5 — PCA eigenstructure analysis.
For each module (compliance, credit):
1. Pull all dense_1024 vectors from the 3 strategy collections (aggregated)
2. Fit full-rank PCA (sklearn)
3. Detect elbow via three methods (Kneedle, second-derivative, 95%-variance)
4. Persist:
evaluation/results/{module}/pca_eigenstructure.json (eigenvalues, cumvar, elbows)
evaluation/results/{module}/pca_model.joblib (fitted PCA — for query-time projection)
Why aggregate across strategies? PCA is invariant to redundant samples — the
eigenstructure reflects the corpus-level embedding geometry. Aggregating gives
a denser sample without distorting the principal directions.
"""
from __future__ import annotations
import json
import sys
import time
from pathlib import Path
import numpy as np
from tqdm import tqdm
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT))
from pipelines.shared.pca_analyzer import fit_pca, save
from pipelines.shared.qdrant_client import (
DENSE_DIMENSIONS,
_dense_name,
all_collection_specs,
get_client,
)
SCROLL_BATCH = 256
def fetch_dense_1024(client, collection_name: str) -> np.ndarray:
"""Scroll through a collection and return its dense_1024 vectors as (n, 1024)."""
info = client.get_collection(collection_name)
expected = info.points_count or 0
if expected == 0:
return np.zeros((0, 1024), dtype=np.float32)
vectors: list[list[float]] = []
offset = None
pbar = tqdm(total=expected, desc=f" scroll {collection_name.split('_', 1)[1]}", leave=False)
while True:
points, offset = client.scroll(
collection_name=collection_name,
limit=SCROLL_BATCH,
with_payload=False,
with_vectors=[_dense_name(1024)],
offset=offset,
)
if not points:
break
for p in points:
v = p.vector.get(_dense_name(1024)) if isinstance(p.vector, dict) else p.vector
if v is not None:
vectors.append(v)
pbar.update(len(points))
if offset is None:
break
pbar.close()
if not vectors:
return np.zeros((0, 1024), dtype=np.float32)
return np.asarray(vectors, dtype=np.float32)
def pull_module_embeddings(client, module: str) -> np.ndarray:
"""Aggregate dense_1024 vectors from all strategies belonging to `module`."""
specs = [(s, n) for m, s, n in all_collection_specs() if m == module]
print(f"\n[{module}] aggregating {len(specs)} collections")
parts: list[np.ndarray] = []
for strategy, name in specs:
v = fetch_dense_1024(client, name)
print(f" {strategy:25s} {len(v):>6,d} vectors")
parts.append(v)
if not parts:
return np.zeros((0, 1024), dtype=np.float32)
out = np.concatenate(parts, axis=0)
print(f" total {len(out):>6,d} vectors")
return out
def main() -> int:
client = get_client()
out_root = ROOT / "evaluation" / "results"
summaries: dict[str, dict] = {}
for module in ("compliance", "credit"):
embeddings = pull_module_embeddings(client, module)
if len(embeddings) < 100:
print(f" ! not enough embeddings for {module}; skipping")
continue
print(f" fitting PCA on ({embeddings.shape[0]} × {embeddings.shape[1]})...")
t0 = time.perf_counter()
# Use 'aggregated' as the source_strategy label since we mixed all 3
pca, result = fit_pca(embeddings, module=module, source_strategy="aggregated")
elapsed = time.perf_counter() - t0
print(f" fit done in {elapsed:.1f}s")
out_dir = out_root / module
save(
pca, result,
model_path=out_dir / "pca_model.joblib",
json_path=out_dir / "pca_eigenstructure.json",
)
summaries[module] = {
"n_embeddings": result.n_embeddings,
"elbow_kneedle": result.elbow_kneedle,
"elbow_kneedle_snapped_to_matryoshka": result.elbow_kneedle_snapped,
"elbow_second_derivative": result.elbow_second_deriv,
"elbow_95pct_variance": result.elbow_95pct,
"cumvar_at_dims": {
"128": round(result.cumulative_variance_at_128, 4),
"256": round(result.cumulative_variance_at_256, 4),
"512": round(result.cumulative_variance_at_512, 4),
"768": round(result.cumulative_variance_at_768, 4),
"1024": round(result.cumulative_variance_at_1024, 4),
},
"fit_seconds": round(elapsed, 1),
}
print(f"\n [{module}] PCA findings:")
print(f" Kneedle elbow: dim {result.elbow_kneedle} (snapped to Matryoshka: {result.elbow_kneedle_snapped})")
print(f" Second-derivative elbow: dim {result.elbow_second_deriv}")
print(f" 95%-variance elbow: dim {result.elbow_95pct}")
print(f" Cumulative variance at Matryoshka dims:")
for d in DENSE_DIMENSIONS:
cv = getattr(result, f"cumulative_variance_at_{d}")
print(f" dim={d:>4d}: {cv * 100:>5.1f}%")
# Cross-module summary
summary_path = out_root / "_pca_summary.json"
summary_path.parent.mkdir(parents=True, exist_ok=True)
summary_path.write_text(json.dumps(summaries, indent=2))
print(f"\nWrote summary → {summary_path.relative_to(ROOT)}")
if "compliance" in summaries and "credit" in summaries:
c = summaries["compliance"]
cr = summaries["credit"]
print(f"\n=== Cross-module comparison ===")
print(f" Kneedle elbow: compliance={c['elbow_kneedle']} vs credit={cr['elbow_kneedle']} "
f"(Δ = {cr['elbow_kneedle'] - c['elbow_kneedle']:+d})")
print(f" 95%-variance dim: compliance={c['elbow_95pct_variance']} vs credit={cr['elbow_95pct_variance']} "
f"(Δ = {cr['elbow_95pct_variance'] - c['elbow_95pct_variance']:+d})")
print(f" Cumvar @ dim 256: compliance={c['cumvar_at_dims']['256']:.3f} vs credit={cr['cumvar_at_dims']['256']:.3f}")
print(f" Cumvar @ dim 512: compliance={c['cumvar_at_dims']['512']:.3f} vs credit={cr['cumvar_at_dims']['512']:.3f}")
if c["elbow_kneedle"] < cr["elbow_kneedle"]:
print(f"\n → Hypothesis CONFIRMED: regulatory text has lower intrinsic dimensionality.")
elif c["elbow_kneedle"] > cr["elbow_kneedle"]:
print(f"\n → Hypothesis REJECTED: credit-narrative text has lower intrinsic dimensionality.")
else:
print(f"\n → Hypothesis INCONCLUSIVE: both modules have the same Kneedle elbow.")
return 0
if __name__ == "__main__":
sys.exit(main())