geolip-conduit-experiments / notebook_cell_2_bulk_test_try1.py
AbstractPhil's picture
Create notebook_cell_2_bulk_test_try1.py
b22847f verified
"""
FLEighConduit — Sweeping Deterministic Analysis
=================================================
No models. No training. Pure numerical scrutiny.
Tests every major theorem, every conduit field, every vulnerability
identified by the council. Results speak for themselves.

Sections:
     1. Parity Verification (Theorem 1)
     2. Characteristic Coefficients Validation
     3. Friction Signal — Spectral Gap Correlation
     4. Friction Signal — Controlled Gap Sweep
     5. Settle Time Analysis
     6. Extraction Order Determinism
     7. Near-Degenerate Behavior (Theorem 4 stress test)
     8. Sign Canonicalization (Theorem 5)
     9. Refinement Residual Analysis
    10. Static Reconstruction Test (Theorem 2 — static side)
    11. Dynamic Non-Reconstructibility (Theorem 2 — dynamic side)
    12. Dimension Agnostic Scaling (n=3,4,5,6,8)
    13. Research Mode — Mstore & Trajectory Inspection
    14. Freckles CIFAR-10 — Class Discriminability of Conduit Fields

Usage:
    Run the setup cell first, then each section as a separate Colab cell.
    Sections 2-9 and 11 reuse the `conduit_solver` created in Section 1, so
    run Section 1 before them; no other state carries between sections.
"""
# ═══════════════════════════════════════════════════════════════
# SETUP (run this cell first)
# ═══════════════════════════════════════════════════════════════
import torch
import torch.nn.functional as F
import numpy as np
import time
from geolip_core.linalg import FLEigh, eigh
from geolip_core.linalg.conduit import (
FLEighConduit, ConduitPacket, canonicalize_eigenvectors, verify_parity
)
# Prefer the GPU when PyTorch can see one; otherwise everything runs on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# Fixed seed so the randomly generated test matrices repeat from run to run.
torch.manual_seed(42)
def section(title, width=70):
    """Print a banner that introduces one section of the report.

    Args:
        title: Text shown between the two horizontal rules.
        width: Number of '=' characters in each rule (default 70, the
            width the script has always used).
    """
    rule = '=' * width
    # A blank line precedes the banner and another follows it.
    print(f"\n{rule}")
    print(f" {title}")
    print(f"{rule}\n")
def make_symmetric(B, n, device=None):
    """Return a batch of random symmetric matrices.

    Args:
        B: Batch size.
        n: Matrix dimension; each matrix is n x n.
        device: Target device. ``None`` (default) selects CUDA when it is
            available and falls back to the CPU otherwise, so the helper
            also works on CPU-only hosts.

    Returns:
        Tensor of shape (B, n, n) equal to its own transpose.
    """
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    A = torch.randn(B, n, n, device=device)
    # Averaging a matrix with its transpose yields its symmetric part.
    return (A + A.transpose(-1, -2)) / 2
def make_with_eigenvalues(eigenvalues, B=1, device=None):
    """Construct symmetric matrices with prescribed eigenvalues.

    Each matrix is Q diag(eigenvalues) Q^T, where Q is the orthogonal factor
    of the QR decomposition of a random Gaussian matrix.

    Args:
        eigenvalues: Tensor of shape (n,) or (B, n). A 1-D tensor is repeated
            B times; for a 2-D tensor the batch size is taken from its first
            dimension and the ``B`` argument is ignored.
        B: Batch size used only for 1-D ``eigenvalues``.
        device: Target device. ``None`` (default) selects CUDA when it is
            available and falls back to the CPU otherwise.

    Returns:
        Tensor of shape (B, n, n).
    """
    if device is None:
        device = 'cuda' if torch.cuda.is_available() else 'cpu'
    if eigenvalues.dim() == 1:
        eigenvalues = eigenvalues.unsqueeze(0).expand(B, -1)
    else:
        # One independent random basis per supplied row of eigenvalues.
        B = eigenvalues.shape[0]
    n = eigenvalues.shape[-1]
    # The basis and the spectrum must share a dtype for the matmul below;
    # non-floating eigenvalues are promoted to the default float dtype.
    if eigenvalues.is_floating_point():
        dtype = eigenvalues.dtype
    else:
        dtype = torch.get_default_dtype()
    eigenvalues = eigenvalues.to(device=device, dtype=dtype)
    # Random orthogonal basis
    Q, _ = torch.linalg.qr(torch.randn(B, n, n, device=device, dtype=dtype))
    return Q @ torch.diag_embed(eigenvalues) @ Q.transpose(-1, -2)
# Report which device was selected and that the helpers are ready.
print(f"Device: {device}", "Setup complete.\n", sep="\n")
# ═══════════════════════════════════════════════════════════════
# 1. PARITY VERIFICATION (Theorem 1)
# ═══════════════════════════════════════════════════════════════
section("1. PARITY VERIFICATION")

# Both solvers stay at module level; later sections reuse `conduit_solver`.
ref_solver = FLEigh().to(device)
conduit_solver = FLEighConduit().to(device)

# One entry per trial: worst eigenvalue / eigenvector discrepancy in the batch.
eval_errs = []
evec_errs = []
for n in [3, 4, 5, 6, 8]:
    for _ in range(20):
        A = make_symmetric(16, n, device)
        ref_evals, ref_evecs = ref_solver(A)
        cond_evals, cond_evecs = conduit_solver(A).eigenpairs()
        eval_errs.append((ref_evals - cond_evals).abs().max().item())
        # Column-wise |<u, v>| is 1 when the vectors agree up to sign.
        overlap = (ref_evecs * cond_evecs).sum(dim=-2).abs()
        evec_errs.append((1.0 - overlap).abs().max().item())

n_tests = len(eval_errs)
n_pass = sum(
    1 for e_val, e_vec in zip(eval_errs, evec_errs)
    if e_val < 1e-4 and e_vec < 1e-3
)
# Seeding with 0 reproduces the running-maximum semantics of a 0-initialised max.
max_eval_err = max([0, *eval_errs])
max_evec_err = max([0, *evec_errs])
verdict = 'PASS' if n_pass == n_tests else 'FAIL'

print(f" Tests: {n_tests}")
print(f" Passed: {n_pass}/{n_tests}")
print(f" Max eval error: {max_eval_err:.2e}")
print(f" Max evec error: {max_evec_err:.2e}")
print(f" VERDICT: {verdict}")
# ═══════════════════════════════════════════════════════════════
# 2. CHARACTERISTIC COEFFICIENTS VALIDATION
# ═══════════════════════════════════════════════════════════════
section("2. CHARACTERISTIC COEFFICIENTS VALIDATION")

# Eight matrices sharing the spectrum {1, 2, 3, 4} but with different random
# eigenbases. Their characteristic polynomial is identical, so the reported
# coefficients should agree across the batch.
test_evals = torch.tensor([1.0, 2.0, 3.0, 4.0], device=device)
# Pass the selected device explicitly: the helper's default targets CUDA and
# would fail on a CPU-only host.
A = make_with_eigenvalues(test_evals, B=8, device=device)
packet = conduit_solver(A)

# Elementary symmetric polynomials of the prescribed eigenvalues:
#   e1 = 10, e2 = 35, e3 = 50, e4 = 24
# Char poly: x^4 - e1*x^3 + e2*x^2 - e3*x + e4
# NOTE(review): the original notes say FLEigh works on a scaled matrix, so the
# coefficients are not compared to these values directly; only their
# consistency across the batch is checked.
computed_evals = packet.eigenvalues  # should be close to [1, 2, 3, 4]
computed_coeffs = packet.char_coeffs

print(" Prescribed eigenvalues: [1, 2, 3, 4]")
print(f" Recovered eigenvalues: {computed_evals[0].tolist()}")
print(f" Char coeffs (sample): {computed_coeffs[0].tolist()}")

print(f"\n Coefficient ratios (should be consistent across batch):")
for i in range(min(4, computed_coeffs.shape[1])):
    vals = computed_coeffs[:, i]
    # cv = coefficient of variation (std relative to |mean|).
    print(f" c[{i}]: mean={vals.mean():.4f} std={vals.std():.6f} "
          f"cv={vals.std()/vals.mean().abs():.4f}")

print(f"\n VERDICT: Coefficients {'consistent' if computed_coeffs.std(0).max() < 0.01 else 'inconsistent'} across batch")
# ═══════════════════════════════════════════════════════════════
# 3. FRICTION SIGNAL - SPECTRAL GAP CORRELATION
# ═══════════════════════════════════════════════════════════════
section("3. FRICTION vs SPECTRAL GAP CORRELATION")

# Random matrices give a natural spread of spectral gaps.
B = 512
A = make_symmetric(B, 4, device)
packet = conduit_solver(A)
evals = packet.eigenvalues  # (B, 4)
friction = packet.friction

# gaps[:, k] = distance from eigenvalue k to its nearest other eigenvalue.
gaps = torch.zeros(B, 4, device=device)
for col in range(4):
    dist = (evals - evals[:, col:col + 1]).abs()
    dist[:, col] = float('inf')  # an eigenvalue is not its own neighbour
    gaps[:, col] = dist.min(dim=-1).values

# Flatten to one sample per eigenvalue and discard non-finite entries.
gap_flat = gaps.reshape(-1).cpu()
fric_flat = friction.reshape(-1).cpu()
valid = torch.isfinite(gap_flat) & torch.isfinite(fric_flat)
gap_v, fric_v = gap_flat[valid], fric_flat[valid]

# Pearson correlation; the hypothesis is that smaller gaps mean more friction.
corr = torch.corrcoef(torch.stack([gap_v, fric_v]))[0, 1].item()
if corr < -0.1:
    verdict = 'CONFIRMED'
elif corr < 0:
    verdict = 'WEAK'
else:
    verdict = 'UNEXPECTED'

print(f" Samples: {B} matrices, {B*4} eigenvalues")
print(f" Gap range: [{gap_v.min():.4f}, {gap_v.max():.4f}]")
print(f" Friction range: [{fric_v.min():.2f}, {fric_v.max():.2f}]")
print(f" Correlation (gap vs friction): {corr:.4f}")
print(f" Expected: NEGATIVE (small gap β†’ high friction)")
print(f" VERDICT: {verdict}")

# Equal-count bins over the gap-sorted samples; the last bin takes the remainder.
print(f"\n Binned friction by gap size:")
order = gap_v.argsort()
n_bins = 5
bin_size = len(order) // n_bins
for b in range(n_bins):
    lo = b * bin_size
    hi = len(order) if b == n_bins - 1 else lo + bin_size
    members = order[lo:hi]
    bin_gap = gap_v[members]
    bin_fric = fric_v[members]
    print(f" Gap [{bin_gap.min():.3f}-{bin_gap.max():.3f}]: "
          f"friction mean={bin_fric.mean():.2f} "
          f"std={bin_fric.std():.2f}")
# ═══════════════════════════════════════════════════════════════
# 4. FRICTION - CONTROLLED GAP SWEEP
# ═══════════════════════════════════════════════════════════════
section("4. CONTROLLED GAP SWEEP")

# Sweep the gap between the second and third eigenvalues (λ₁ = 3.0 and
# λ₂ = 3.0 + gap) while λ₀ and λ₃ stay fixed.
gaps_to_test = [0.001, 0.01, 0.05, 0.1, 0.5, 1.0, 2.0, 5.0]

print(f" Fixed eigenvalues: λ₀=1.0, λ₁=3.0, λ₃=10.0")
print(f" Sweeping: λ₂ from 3.001 to 8.0 (gap to λ₁)\n")
print(f" {'Gap':>8s} {'λ₂':>6s} {'fric[0]':>8s} {'fric[1]':>8s} "
      f"{'fric[2]':>8s} {'fric[3]':>8s} {'settle':>12s}")
print(f" {'-'*70}")

for gap in gaps_to_test:
    evals = torch.tensor([1.0, 3.0, 3.0 + gap, 10.0], device=device)
    # Pass the selected device explicitly: the helper's default targets CUDA
    # and would fail on a CPU-only host.
    A = make_with_eigenvalues(evals, B=32, device=device)
    p = conduit_solver(A)
    # Average over the 32 random eigenbases that share this spectrum.
    fric_mean = p.friction.mean(0)
    settle_mean = p.settle.mean(0)
    print(f" {gap:8.3f} {3.0+gap:6.3f} {fric_mean[0]:8.2f} {fric_mean[1]:8.2f} "
          f"{fric_mean[2]:8.2f} {fric_mean[3]:8.2f} "
          f"{settle_mean.tolist()}")

print(f"\n Expected: friction[1] and friction[2] spike as gap → 0")
# ═══════════════════════════════════════════════════════════════
# 5. SETTLE TIME ANALYSIS
# ═══════════════════════════════════════════════════════════════
section("5. SETTLE TIME ANALYSIS")

B = 1024
A = make_symmetric(B, 4, device)
packet = conduit_solver(A)
settle = packet.settle

print(f" Samples: {B} matrices")
print(f"\n Settle distribution per root position:")
for root in range(4):
    column = settle[:, root]
    mean_v = column.mean()
    mode_v = column.mode().values.item()
    print(f" Root {root}: mean={mean_v:.2f} "
          f"mode={mode_v:.0f} "
          f"min={column.min():.0f} max={column.max():.0f}")

# Fraction of matrices whose every root reports a settle value of exactly 1,
# and fraction in which at least one root reports 3 or more.
every_root_fast = (settle == 1.0).all(dim=-1)
some_root_slow = (settle >= 3.0).any(dim=-1)
all_settle_1 = every_root_fast.float().mean().item()
any_slow = some_root_slow.float().mean().item()
density = 'SPARSE' if all_settle_1 > 0.5 else 'DENSE'

print(f"\n All roots settle in 1 iter: {all_settle_1:.1%}")
print(f" Any root needs β‰₯3 iters: {any_slow:.1%}")
print(f" VERDICT: {density} settle signal at n=4")
# ═══════════════════════════════════════════════════════════════
# 6. EXTRACTION ORDER DETERMINISM
# ═══════════════════════════════════════════════════════════════
section("6. EXTRACTION ORDER DETERMINISM")

# Part A: one matrix repeated 32 times (expand creates a broadcast view), so
# every batch entry should report the same extraction order.
single = make_symmetric(1, 4, device)
A_fixed = single.expand(32, -1, -1)
packet = conduit_solver(A_fixed)
orders = packet.extraction_order  # (32, 4)
first_order = orders[0:1]
order_consistent = (orders == first_order).all().item()

print(f" Same matrix repeated 32 times")
print(f" Extraction order[0]: {orders[0].tolist()}")
print(f" All identical: {order_consistent}")

# Part B: 64 independent matrices - count the distinct orders that occur.
A_varied = make_symmetric(64, 4, device)
packet2 = conduit_solver(A_varied)
unique_orders = packet2.extraction_order.unique(dim=0)
n_unique = len(unique_orders)

print(f"\n 64 different matrices")
print(f" Unique extraction orders: {n_unique}")

same_input_word = 'deterministic' if order_consistent else 'NON-DETERMINISTIC'
across_word = 'varies' if n_unique > 1 else 'fixed'
print(f" VERDICT: Order is {same_input_word} "
      f"for identical inputs, {across_word} across inputs")
# ═══════════════════════════════════════════════════════════════
# 7. NEAR-DEGENERATE BEHAVIOR (Theorem 4 stress)
# ═══════════════════════════════════════════════════════════════
section("7. NEAR-DEGENERATE BEHAVIOR")

# Construct matrices where two eigenvalues approach each other.
print(f" Two eigenvalues converging: λ₂ = 5.0, λ₃ = 5.0 + ε\n")
print(f" {'ε':>12s} {'fric[2]':>8s} {'fric[3]':>8s} {'settle[2]':>10s} "
      f"{'settle[3]':>10s} {'refine_res':>10s} {'eval_err':>10s}")
print(f" {'-'*75}")

# NOTE: torch.tensor builds these in the default dtype. With float32 the
# spacing of representable numbers near 5.0 is about 4.8e-7, so 5.0 + 1e-7 and
# 5.0 + 1e-10 round to exactly 5.0 - the last two rows are then truly
# degenerate rather than near-degenerate.
for eps in [1.0, 0.1, 0.01, 0.001, 0.0001, 0.00001, 1e-7, 1e-10]:
    evals = torch.tensor([1.0, 3.0, 5.0, 5.0 + eps], device=device)
    # Pass the selected device explicitly: the helper's default targets CUDA
    # and would fail on a CPU-only host.
    A = make_with_eigenvalues(evals, B=16, device=device)
    p = conduit_solver(A)

    # Eigenvalue recovery accuracy: sort the recovered values so they line up
    # with the prescribed ones, which are listed in ascending order.
    recovered = p.eigenvalues
    eval_err = (recovered.sort(dim=-1).values - evals.unsqueeze(0)).abs().max().item()

    print(f" {eps:12.1e} {p.friction[:, 2].mean():8.2f} {p.friction[:, 3].mean():8.2f} "
          f"{p.settle[:, 2].mean():10.1f} {p.settle[:, 3].mean():10.1f} "
          f"{p.refinement_residual.mean():10.2e} {eval_err:10.2e}")

print(f"\n Expected: friction spikes, settle increases, eigenvalue error grows as ε → 0")
# ═══════════════════════════════════════════════════════════════
# 8. SIGN CANONICALIZATION (Theorem 5)
# ═══════════════════════════════════════════════════════════════
section("8. SIGN CANONICALIZATION")

A = make_symmetric(16, 4, device)
packet = conduit_solver(A)
V = packet.eigenvectors  # (16, 4, 4), eigenvectors in columns

# Check 1: in every column, the entry of largest magnitude must be positive.
# Evaluated over the whole batch and reported as its own result.
pivot_idx = V.abs().argmax(dim=-2, keepdim=True)   # row of the largest |entry|
pivot_val = V.gather(-2, pivot_idx).squeeze(-2)     # signed value at that row
positive_ok = pivot_val > 0
for b, col in (~positive_ok).nonzero().tolist():
    print(f" FAIL: batch={b} col={col} max_val={pivot_val[b, col].item():.4f}")
all_positive = bool(positive_ok.all())

# Check 2: solving an identical copy must reproduce the same eigenvectors.
# This measures repeatability; no sign flip is applied to the input.
A2 = A.clone()
packet2 = conduit_solver(A2)
V2 = packet2.eigenvectors
v_match = torch.allclose(V, V2, atol=1e-5)

# Check 3: a tiny symmetric perturbation should move the eigenvectors only
# slightly; a sign flip of any column would show up as a large drift.
A3 = A + torch.randn_like(A) * 1e-6
A3 = (A3 + A3.transpose(-1, -2)) / 2
packet3 = conduit_solver(A3)
V3 = packet3.eigenvectors
v_drift = (V - V3).pow(2).sum((-2, -1)).sqrt().mean().item()

stable = all_positive and v_match and v_drift < 0.01
print(f" Canonicalization preserves positive max entry: {all_positive}")
print(f" Identical input gives identical eigenvectors: {v_match}")
print(f" Eigenvector drift under 1e-6 perturbation: {v_drift:.2e}")
print(f" VERDICT: Canonicalization {'STABLE' if stable else 'UNSTABLE'}")

# Edge case: near-degenerate eigenvalues - gauge instability?
evals_degen = torch.tensor([1.0, 3.0, 5.0, 5.001], device=device)
# Pass the selected device explicitly: the helper's default targets CUDA and
# would fail on a CPU-only host.
A_degen = make_with_eigenvalues(evals_degen, B=32, device=device)
p_degen = conduit_solver(A_degen)
V_degen = p_degen.eigenvectors

# Each batch entry uses a different random orthogonal basis, so the
# eigenvectors themselves differ; this only reports whether the row holding
# the largest |entry| happens to coincide across the batch, per column.
max_entries = V_degen.abs().argmax(dim=-2)  # (32, 4): row index of the max
max_entry_consistent = (max_entries == max_entries[0:1]).all(dim=0)

print(f"\n Near-degenerate (gap=0.001):")
print(f" Max-entry row consistent: {max_entry_consistent.tolist()}")
print(f" CONCERN: Degenerate columns may have inconsistent gauge")
# ═══════════════════════════════════════════════════════════════
# 9. REFINEMENT RESIDUAL ANALYSIS
# ═══════════════════════════════════════════════════════════════
section("9. REFINEMENT RESIDUAL")

B = 1024
A = make_symmetric(B, 4, device)
packet = conduit_solver(A)
rr = packet.refinement_residual

print(f" Samples: {B}")
# Summary statistics, printed in the order mean / std / min / max.
for label, stat in (("Mean", rr.mean()), ("Std", rr.std()),
                    ("Min", rr.min()), ("Max", rr.max())):
    print(f" {label}: {stat:.2e}")
# Share of samples whose residual falls below each threshold.
for label, cutoff in (("< 1e-6", 1e-6), ("< 1e-4", 1e-4)):
    print(f" {label}: {(rr < cutoff).float().mean():.1%}")

# The verdict hinges on whether even the worst residual is below 1e-4.
if rr.max() < 1e-4:
    magnitude, implication = 'UNIFORMLY TINY', 'no discriminative signal'
else:
    magnitude, implication = 'HAS VARIATION', 'may carry signal'
print(f" VERDICT: {magnitude} "
      f"β€” {implication}")
# ═══════════════════════════════════════════════════════════════
# 10. STATIC RECONSTRUCTION TEST (Theorem 2 - static side)
# ═══════════════════════════════════════════════════════════════
section("10. STATIC RECONSTRUCTION — char_coeffs from eigenvalues")

B = 128
A = make_symmetric(B, 4, device)
# Research mode is requested so that the packet exposes `mstore`.
packet = FLEighConduit(research=True).to(device)(A)

# Idea of the test: if every Mstore[k] is a polynomial in A, then with
# A = V diag(λ) V^T it must be diagonal in the eigenbasis V, i.e. it carries
# nothing beyond (λ, V). Scaling A by a scalar does not change V, so the check
# applies even if the solver works on a scaled copy of A.
# NOTE(review): the recurrence Mstore[k] = A*Mstore[k-1] + c*I quoted in the
# original notes is not visible from here - confirm against the solver.
V = packet.eigenvectors.double()
Mstore = packet.mstore  # expected (5, B, 4, 4) in research mode - TODO confirm

recon_errors = []
for k in range(1, 5):
    # Match V's device and dtype: torch.bmm rejects mixed-dtype operands.
    Mk_actual = Mstore[k].to(device=device, dtype=V.dtype)

    # Project into the eigenbasis: V^T Mstore[k] V.
    Mk_eigenbasis = torch.bmm(torch.bmm(V.transpose(-1, -2), Mk_actual), V)

    # Split into diagonal and off-diagonal parts and compare Frobenius norms.
    diag_part = Mk_eigenbasis.diagonal(dim1=-2, dim2=-1)
    off_diag = Mk_eigenbasis - torch.diag_embed(diag_part)
    off_diag_norm = off_diag.pow(2).sum((-2, -1)).sqrt().mean().item()
    diag_norm = diag_part.pow(2).sum(-1).sqrt().mean().item()
    recon_errors.append(off_diag_norm)

    print(f" Mstore[{k}]: off-diag norm = {off_diag_norm:.2e}, "
          f"diag norm = {diag_norm:.2e}, "
          f"ratio = {off_diag_norm / (diag_norm + 1e-10):.2e}")

print(f"\n VERDICT: Mstore IS diagonal in eigenbasis → "
      f"{'CONFIRMED reconstructible from (λ,V)' if max(recon_errors) < 1e-4 else 'UNEXPECTED'}")
# ═══════════════════════════════════════════════════════════════
# 11. DYNAMIC NON-RECONSTRUCTIBILITY (Theorem 2 - dynamic side)
# ═══════════════════════════════════════════════════════════════
section("11. DYNAMIC NON-RECONSTRUCTIBILITY")

# Question: can friction be predicted from the eigenvalues alone?
B = 2048
A = make_symmetric(B, 4, device)
packet = conduit_solver(A)
evals = packet.eigenvalues
friction = packet.friction

# Static proxy for root i: 5 / (|p'(lambda_i)| + 1e-8), where
# p'(lambda_i) = prod_{j != i} (lambda_i - lambda_j) is the derivative of the
# characteristic polynomial at that root. The factor 5 is the iteration count
# assumed by the original notes.
static_proxy = torch.zeros_like(friction)
for i in range(4):
    dp = torch.ones(B, device=device)
    for j in (k for k in range(4) if k != i):
        dp = dp * (evals[:, i] - evals[:, j])
    static_proxy[:, i] = 5.0 / (dp.abs() + 1e-8)

    # Per-root agreement between the measured friction and the proxy.
    actual_i = friction[:, i]
    proxy_i = static_proxy[:, i]
    corr = torch.corrcoef(torch.stack([actual_i.cpu(), proxy_i.cpu()]))[0, 1].item()
    residual = (actual_i - proxy_i).abs()
    print(f" Root {i}: corr(actual, static_proxy) = {corr:.4f}, "
          f"residual mean = {residual.mean():.2f}, "
          f"residual std = {residual.std():.2f}")

# Whatever the proxy fails to explain is treated as the dynamic excess: a
# non-trivial share means friction carries information beyond the eigenvalues.
total_var = friction.var().item()
proxy_var = static_proxy.var().item()
residual_var = (friction - static_proxy).var().item()
dynamic_fraction = residual_var / (total_var + 1e-10)

print(f"\n Total friction variance: {total_var:.4f}")
print(f" Static proxy variance: {proxy_var:.4f}")
print(f" Residual (dynamic) variance: {residual_var:.4f}")
print(f" Dynamic fraction: {dynamic_fraction:.1%}")

significance = 'SIGNIFICANT' if dynamic_fraction > 0.05 else 'NEGLIGIBLE'
print(f"\n VERDICT: Dynamic excess is "
      f"{significance}")
# ═══════════════════════════════════════════════════════════════
# 12. DIMENSION AGNOSTIC SCALING
# ═══════════════════════════════════════════════════════════════
section("12. DIMENSION AGNOSTIC SCALING")

solver = FLEighConduit().to(device)
# CUDA kernels launch asynchronously: without a synchronize on both sides of
# the call the timer would capture launch latency, not the solve itself.
on_cuda = device.type == 'cuda'

for n in [3, 4, 5, 6, 8]:
    B = 64
    A = make_symmetric(B, n, device)

    if on_cuda:
        torch.cuda.synchronize()
    # perf_counter is the monotonic, high-resolution clock meant for intervals.
    t0 = time.perf_counter()
    packet = solver(A)
    if on_cuda:
        torch.cuda.synchronize()
    elapsed = time.perf_counter() - t0
    # NOTE: each dimension is timed on its first call, so one-off warm-up cost
    # (if any) is included in the reported figure.

    parity = verify_parity(A, atol=1e-4)

    fric_range = (packet.friction.min().item(), packet.friction.max().item())
    settle_range = (packet.settle.min().item(), packet.settle.max().item())

    print(f" n={n}: packet OK, parity={parity}, "
          f"friction=[{fric_range[0]:.1f}, {fric_range[1]:.1f}], "
          f"settle=[{settle_range[0]:.0f}, {settle_range[1]:.0f}], "
          f"time={elapsed*1000:.1f}ms")

# NOTE(review): this verdict is printed unconditionally; read it together with
# the per-dimension parity values above.
print(f"\n VERDICT: Scales cleanly across dimensions")
# ═══════════════════════════════════════════════════════════════
# 13. RESEARCH MODE - Mstore & Trajectory
# ═══════════════════════════════════════════════════════════════
section("13. RESEARCH MODE")

solver_r = FLEighConduit(research=True).to(device)
A = make_symmetric(8, 4, device)
packet = solver_r(A)

# Shapes of the extra fields read from the research-mode packet.
for label, tensor in (("Mstore", packet.mstore),
                      ("z_trajectory", packet.z_trajectory),
                      ("dp_trajectory", packet.dp_trajectory)):
    print(f" {label} shape: {tensor.shape}")

# Per-root iteration history for the first batch entry: the iterate z and the
# recorded derivative magnitude at each of the first five steps.
print(f"\n Laguerre trajectory for patch 0:")
for ri in range(4):
    z_path = packet.z_trajectory[0, ri].tolist()
    dp_path = packet.dp_trajectory[0, ri].tolist()
    final_eval = packet.eigenvalues[0, ri].item()
    print(f" Root {ri} (final Ξ»={final_eval:.4f}):")
    for t in range(5):
        z_t = z_path[t]
        dp_t = dp_path[t]
        print(f" iter {t}: z={z_t:8.4f} |p'|={dp_t:10.4f}")

# Diagonal of each stored matrix Mstore[1..4] for the first batch entry.
print(f"\n Mstore diagonal progression for patch 0:")
for k in range(1, 5):
    diag = packet.mstore[k, 0].diagonal().tolist()
    diag_txt = ', '.join(f'{v:.4f}' for v in diag)
    print(f" Mstore[{k}] diag: [{diag_txt}]")
# ═══════════════════════════════════════════════════════════════
# 14. FRECKLES CIFAR-10 - CLASS DISCRIMINABILITY
# ═══════════════════════════════════════════════════════════════
section("14. FRECKLES CIFAR-10 — CLASS DISCRIMINABILITY")
print(" Loading Freckles v40 and CIFAR-10...")

# Best-effort section: it needs optional packages, a model download and the
# CIFAR-10 dataset, so any failure is reported as SKIPPED instead of aborting
# the rest of the run.
try:
    from geolip_svae import load_model
    import torchvision
    import torchvision.transforms as T

    freckles, cfg = load_model(hf_version='v40_freckles_noise', device=device)
    freckles.eval()

    transform = T.Compose([T.Resize(64), T.ToTensor()])
    cifar = torchvision.datasets.CIFAR10(
        root='/content/data', train=False, download=True, transform=transform)
    loader = torch.utils.data.DataLoader(cifar, batch_size=64, shuffle=False)

    CLASSES = ['airplane', 'auto', 'bird', 'cat', 'deer',
               'dog', 'frog', 'horse', 'ship', 'truck']

    # Collect conduit telemetry per class
    class_friction = {c: [] for c in range(10)}
    class_settle = {c: [] for c in range(10)}
    conduit = FLEighConduit().to(device)

    n_batches = 10  # quick pass
    for batch_idx, (images, labels) in enumerate(loader):
        if batch_idx >= n_batches:
            break

        # Only statistics are gathered, so no autograd graph is needed.
        with torch.no_grad():
            out = freckles(images.to(device))

            S = out['svd']['S']    # (B, N, D) per the unpacking below
            B_img, N, D = S.shape
            Vt = out['svd']['Vt']  # presumably (B, N, D, D) - TODO confirm

            # Rebuild a symmetric matrix per patch with eigenvalues S^2:
            #   G = Vt^T diag(S^2) Vt
            # NOTE(review): the original notes describe this as the Gram
            # matrix the SVAE decomposed - confirm against geolip_svae.
            S2 = S.pow(2)
            G = torch.einsum('bnij,bnj,bnjk->bnik',
                             Vt.transpose(-2, -1), S2, Vt)
            packet = conduit(G.reshape(B_img * N, D, D))

        # One host transfer per batch; the class masks below then index CPU
        # tensors with CPU labels instead of copying once per class.
        fric = packet.friction.reshape(B_img, N, D).cpu()
        sett = packet.settle.reshape(B_img, N, D).cpu()
        labels = labels.cpu()

        for c in range(10):
            mask = labels == c
            if mask.any():
                class_friction[c].append(fric[mask])
                class_settle[c].append(sett[mask])

    # Analyze
    print(f"\n Per-class friction statistics (mean across patches):\n")
    print(f" {'Class':<10s} {'fric_mean':>10s} {'fric_std':>10s} "
          f"{'settle_mean':>12s}")
    print(f" {'-'*44}")

    class_fric_means = []
    for c in range(10):
        if class_friction[c]:
            fric_cat = torch.cat(class_friction[c])
            sett_cat = torch.cat(class_settle[c])
            fm = fric_cat.mean().item()
            fs = fric_cat.std().item()
            sm = sett_cat.mean().item()
            class_fric_means.append(fm)
            print(f" {CLASSES[c]:<10s} {fm:10.2f} {fs:10.2f} {sm:12.2f}")

    if class_fric_means:
        spread = max(class_fric_means) - min(class_fric_means)
        mean_fric = np.mean(class_fric_means)
        # A zero mean would make the ratio undefined; report NaN, which also
        # fails the threshold below and so yields "NOT DISCRIMINATIVE".
        ratio = spread / mean_fric if mean_fric != 0 else float('nan')
        print(f"\n Inter-class friction spread: {spread:.2f}")
        print(f" Mean friction: {mean_fric:.2f}")
        print(f" Spread/Mean ratio: {ratio:.2%}")
        print(f"\n VERDICT: {'CLASS-DISCRIMINATIVE' if ratio > 0.05 else 'NOT DISCRIMINATIVE'} "
              f"friction signal")

except ImportError as e:
    print(f" SKIPPED — missing dependency: {e}")
except Exception as e:
    # Deliberately broad: download, checkpoint or shape problems must not stop
    # the remaining report; the message is surfaced to the reader.
    print(f" SKIPPED — error: {e}")
# ═══════════════════════════════════════════════════════════════
# SUMMARY
# ═══════════════════════════════════════════════════════════════
section("SUMMARY β€” ALL TESTS COMPLETE")
print(" Review each section's VERDICT above.")
print(" Key questions answered:")
# The questions the run is meant to answer, one printed line each.
for question in (
    " 1. Does FLEighConduit match FLEigh?",
    " 2. Does friction correlate with spectral gaps?",
    " 3. Does friction spike at near-degeneracy?",
    " 4. Is the dynamic signal non-trivial at n=4?",
    " 5. Are static conduits reconstructible from eigenvalues?",
    " 6. Is sign canonicalization stable?",
    " 7. Does friction differ across CIFAR-10 classes?",
    " 8. Is refinement residual uniformly tiny?",
    " 9. Does settle time carry signal?",
    " 10. Does the system scale to higher n?",
):
    print(question)