# WrinkleBrane / performance_benchmark.py
# Source listing metadata (WCNegentropy): "📚 Updated with scientifically
# rigorous documentation" — commit dc2b9f3 (verified).
# NOTE(review): these lines were web-page residue pasted above the shebang;
# converted to comments so the module parses. The shebang below is now not
# on line 1, so direct `./performance_benchmark.py` execution relies on the
# caller invoking `python performance_benchmark.py` instead.
#!/usr/bin/env python3
"""
WrinkleBrane Performance Benchmark Suite
Comprehensive analysis of scaling laws and optimization opportunities.
"""
import sys
from pathlib import Path
sys.path.append(str(Path(__file__).resolve().parent / "src"))
import torch
import numpy as np
import time
import matplotlib.pyplot as plt
from wrinklebrane.membrane_bank import MembraneBank
from wrinklebrane.codes import hadamard_codes, dct_codes, gaussian_codes
from wrinklebrane.slicer import make_slicer
from wrinklebrane.write_ops import store_pairs
from wrinklebrane.metrics import psnr, spectral_entropy_2d, gzip_ratio
def benchmark_memory_scaling():
    """Benchmark memory usage and performance across different scales.

    For each configuration, stores K random patterns into a membrane bank,
    times write and read operations, and measures retrieval fidelity (PSNR).

    Returns:
        list[dict]: per-configuration results with memory footprint (MB),
        per-operation timings (ms), throughputs (ops/sec), and average
        PSNR (dB) over the K retrieved patterns.
    """
    print("📊 Memory Scaling Benchmark")
    print("="*40)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    def _sync():
        # CUDA kernels launch asynchronously; synchronize so wall-clock
        # timings measure completed work, not just kernel-launch overhead.
        if device.type == "cuda":
            torch.cuda.synchronize()

    # Test different membrane dimensions.
    configs = [
        {"L": 32, "H": 16, "W": 16, "K": 16, "B": 1},
        {"L": 64, "H": 32, "W": 32, "K": 32, "B": 1},
        {"L": 128, "H": 64, "W": 64, "K": 64, "B": 1},
        {"L": 256, "H": 128, "W": 128, "K": 128, "B": 1},
    ]

    results = []
    for config in configs:
        L, H, W, K, B = config["L"], config["H"], config["W"], config["K"], config["B"]
        print(f"Testing L={L}, H={H}, W={W}, K={K}, B={B}")

        # Estimated memory footprint (float32 = 4 bytes per element).
        membrane_memory = B * L * H * W * 4
        code_memory = L * K * 4
        total_memory = membrane_memory + code_memory

        # Setup
        bank = MembraneBank(L=L, H=H, W=W, device=device)
        bank.allocate(B)
        C = hadamard_codes(L, K).to(device)
        slicer = make_slicer(C)
        patterns = torch.rand(K, H, W, device=device)
        keys = torch.arange(K, device=device)
        alphas = torch.ones(K, device=device)

        # Benchmark write speed.  perf_counter is monotonic, unlike time.time,
        # and the explicit syncs make GPU timings meaningful.
        iterations = max(1, 100 // (L // 32))  # Scale iterations based on size
        _sync()
        start_time = time.perf_counter()
        for _ in range(iterations):
            M = store_pairs(bank.read(), C, keys, patterns, alphas)
            bank.write(M - bank.read())
        _sync()
        write_time = (time.perf_counter() - start_time) / iterations

        # Benchmark read speed
        read_iterations = iterations * 10
        _sync()
        start_time = time.perf_counter()
        for _ in range(read_iterations):
            readouts = slicer(bank.read())
        _sync()
        read_time = (time.perf_counter() - start_time) / read_iterations

        # Calculate fidelity: mean PSNR over all K retrieved patterns.
        readouts = slicer(bank.read()).squeeze(0)
        avg_psnr = 0.0
        for i in range(K):
            avg_psnr += psnr(patterns[i].cpu().numpy(), readouts[i].cpu().numpy())
        avg_psnr /= K

        result = {
            "config": config,
            "memory_mb": total_memory / 1e6,
            "write_time_ms": write_time * 1000,
            "read_time_ms": read_time * 1000,
            "write_throughput": K / write_time,
            "read_throughput": K * B / read_time,
            "fidelity_psnr": avg_psnr
        }
        results.append(result)

        print(f" Memory: {result['memory_mb']:.2f}MB")
        print(f" Write: {result['write_time_ms']:.2f}ms ({result['write_throughput']:.0f} patterns/sec)")
        print(f" Read: {result['read_time_ms']:.2f}ms ({result['read_throughput']:.0f} readouts/sec)")
        print(f" PSNR: {result['fidelity_psnr']:.1f}dB")
        print()

    return results
def benchmark_capacity_limits():
    """Test WrinkleBrane capacity limits and interference scaling."""
    print("🧮 Capacity Limits Benchmark")
    print("="*40)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    L, H, W, B = 64, 32, 32, 1

    # Sweep the number of stored patterns upward.
    # NOTE(review): the counts above L=64 exceed the orthogonal-code budget;
    # presumably intentional to probe over-capacity interference — confirm
    # that hadamard_codes accepts K > L.
    pattern_counts = [4, 8, 16, 32, 64, 128, 256]

    results = []
    for K in pattern_counts:
        print(f"Testing {K} patterns...")

        bank = MembraneBank(L=L, H=H, W=W, device=device)
        bank.allocate(B)
        C = hadamard_codes(L, K).to(device)
        slicer = make_slicer(C)

        # One random payload per key.
        patterns = torch.rand(K, H, W, device=device)
        keys = torch.arange(K, device=device)
        alphas = torch.ones(K, device=device)

        # Write every pattern, then read the whole bank back at once.
        M = store_pairs(bank.read(), C, keys, patterns, alphas)
        bank.write(M - bank.read())
        readouts = slicer(bank.read()).squeeze(0)

        # Per-pattern fidelity and information metrics.
        psnr_values, entropy_values, compression_values = [], [], []
        for i in range(K):
            psnr_values.append(psnr(patterns[i].cpu().numpy(), readouts[i].cpu().numpy()))
            entropy_values.append(spectral_entropy_2d(readouts[i]))
            compression_values.append(gzip_ratio(readouts[i]))

        # With perfect orthogonal codes the bank holds at most L patterns.
        theoretical_capacity = L
        capacity_utilization = K / theoretical_capacity

        result = {
            "K": K,
            "avg_psnr": np.mean(psnr_values),
            "min_psnr": np.min(psnr_values),
            "std_psnr": np.std(psnr_values),
            "avg_entropy": np.mean(entropy_values),
            "avg_compression": np.mean(compression_values),
            "capacity_utilization": capacity_utilization
        }
        results.append(result)

        print(f" PSNR: {result['avg_psnr']:.1f}±{result['std_psnr']:.1f}dB (min: {result['min_psnr']:.1f}dB)")
        print(f" Entropy: {result['avg_entropy']:.3f}")
        print(f" Compression: {result['avg_compression']:.3f}")
        print(f" Capacity utilization: {result['capacity_utilization']:.1%}")
        print()

    return results
def benchmark_code_types():
    """Compare performance of different orthogonal code types.

    Evaluates Hadamard, DCT, and Gaussian code books on orthogonality
    error (||C^T C - I||), retrieval PSNR, and write/read timings.

    Returns:
        dict[str, dict]: per-code-type metrics keyed by code name.
    """
    print("🧬 Code Types Benchmark")
    print("="*40)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    L, H, W, K, B = 64, 32, 32, 32, 1

    def _sync():
        # CUDA kernels launch asynchronously; synchronize so wall-clock
        # timings measure completed work, not just kernel-launch overhead.
        if device.type == "cuda":
            torch.cuda.synchronize()

    code_generators = {
        "Hadamard": lambda: hadamard_codes(L, K).to(device),
        "DCT": lambda: dct_codes(L, K).to(device),
        "Gaussian": lambda: gaussian_codes(L, K).to(device)
    }

    results = {}
    patterns = torch.rand(K, H, W, device=device)
    keys = torch.arange(K, device=device)
    alphas = torch.ones(K, device=device)

    for name, code_gen in code_generators.items():
        print(f"Testing {name} codes...")

        # Setup
        bank = MembraneBank(L=L, H=H, W=W, device=device)
        bank.allocate(B)
        C = code_gen()
        slicer = make_slicer(C)

        # Orthogonality: the Gram matrix of a perfectly orthonormal code
        # book is the identity, so this norm is 0 in the ideal case.
        G = C.T @ C
        I = torch.eye(K, device=device, dtype=C.dtype)
        orthogonality_error = torch.norm(G - I).item()

        # Store and retrieve patterns
        M = store_pairs(bank.read(), C, keys, patterns, alphas)
        bank.write(M - bank.read())
        readouts = slicer(bank.read()).squeeze(0)

        # Calculate fidelity metrics
        psnr_values = []
        for i in range(K):
            psnr_values.append(psnr(patterns[i].cpu().numpy(), readouts[i].cpu().numpy()))

        # Benchmark speed with a monotonic clock (time.time can jump) and
        # explicit GPU synchronization around each timed region.
        _sync()
        start_time = time.perf_counter()
        for _ in range(100):
            M = store_pairs(bank.read(), C, keys, patterns, alphas)
        _sync()
        write_time = (time.perf_counter() - start_time) / 100

        _sync()
        start_time = time.perf_counter()
        for _ in range(1000):
            readouts = slicer(bank.read())
        _sync()
        read_time = (time.perf_counter() - start_time) / 1000

        result = {
            "orthogonality_error": orthogonality_error,
            "avg_psnr": np.mean(psnr_values),
            "std_psnr": np.std(psnr_values),
            "write_time_ms": write_time * 1000,
            "read_time_ms": read_time * 1000
        }
        results[name] = result

        print(f" Orthogonality error: {result['orthogonality_error']:.6f}")
        print(f" PSNR: {result['avg_psnr']:.1f}±{result['std_psnr']:.1f}dB")
        print(f" Write time: {result['write_time_ms']:.3f}ms")
        print(f" Read time: {result['read_time_ms']:.3f}ms")
        print()

    return results
def benchmark_gpu_acceleration():
    """Compare CPU vs GPU performance if available.

    Runs the same write/read workload on CPU and CUDA and reports the
    speedup. Skips (returns None) when CUDA is not available.

    Returns:
        dict[str, dict] | None: per-device timings/throughputs keyed by
        device string ("cpu", "cuda"), or None when CUDA is absent.
    """
    print("⚡ GPU Acceleration Benchmark")
    print("="*40)

    if not torch.cuda.is_available():
        print("CUDA not available, skipping GPU benchmark")
        return None

    L, H, W, K, B = 128, 64, 64, 64, 4
    patterns = torch.rand(K, H, W)
    keys = torch.arange(K)
    alphas = torch.ones(K)

    devices = [torch.device("cpu"), torch.device("cuda")]
    results = {}

    for device in devices:
        print(f"Testing on {device}...")

        # Setup
        bank = MembraneBank(L=L, H=H, W=W, device=device)
        bank.allocate(B)
        C = hadamard_codes(L, K).to(device)
        slicer = make_slicer(C)
        patterns_dev = patterns.to(device)
        keys_dev = keys.to(device)
        alphas_dev = alphas.to(device)

        # Warmup: amortize kernel compilation / allocator effects, then
        # drain any outstanding async work before timing starts.
        for _ in range(10):
            M = store_pairs(bank.read(), C, keys_dev, patterns_dev, alphas_dev)
            bank.write(M - bank.read())
            readouts = slicer(bank.read())
        if device.type == "cuda":
            torch.cuda.synchronize()

        # Benchmark write.  perf_counter is monotonic, unlike time.time.
        start_time = time.perf_counter()
        for _ in range(100):
            M = store_pairs(bank.read(), C, keys_dev, patterns_dev, alphas_dev)
            bank.write(M - bank.read())
        if device.type == "cuda":
            torch.cuda.synchronize()
        write_time = (time.perf_counter() - start_time) / 100

        # Benchmark read
        start_time = time.perf_counter()
        for _ in range(1000):
            readouts = slicer(bank.read())
        if device.type == "cuda":
            torch.cuda.synchronize()
        read_time = (time.perf_counter() - start_time) / 1000

        result = {
            "write_time_ms": write_time * 1000,
            "read_time_ms": read_time * 1000,
            "write_throughput": K * B / write_time,
            "read_throughput": K * B / read_time
        }
        results[str(device)] = result

        print(f" Write: {result['write_time_ms']:.2f}ms ({result['write_throughput']:.0f} patterns/sec)")
        print(f" Read: {result['read_time_ms']:.2f}ms ({result['read_throughput']:.0f} readouts/sec)")
        print()

    # Calculate speedup (only when both devices produced results).
    if len(results) == 2:
        cpu_result = results["cpu"]
        gpu_result = results["cuda"]
        write_speedup = cpu_result["write_time_ms"] / gpu_result["write_time_ms"]
        read_speedup = cpu_result["read_time_ms"] / gpu_result["read_time_ms"]
        print(f"GPU Speedup - Write: {write_speedup:.1f}x, Read: {read_speedup:.1f}x")

    return results
def main():
    """Run comprehensive WrinkleBrane performance benchmark suite.

    Returns:
        bool: True when every benchmark completes, False on any failure.
    """
    print("⚡ WrinkleBrane Performance Benchmark Suite")
    print("="*50)

    # Fixed seeds keep the random patterns reproducible across runs.
    torch.manual_seed(42)
    np.random.seed(42)

    try:
        memory_results = benchmark_memory_scaling()
        capacity_results = benchmark_capacity_limits()
        code_results = benchmark_code_types()
        gpu_results = benchmark_gpu_acceleration()

        print("="*50)
        print("📈 Performance Summary:")
        print("="*50)

        # Memory scaling summary — report the biggest configuration tested.
        if memory_results:
            largest = memory_results[-1]
            print("Largest tested configuration:")
            print(f" L={largest['config']['L']}, Memory: {largest['memory_mb']:.1f}MB")
            print(f" Write throughput: {largest['write_throughput']:.0f} patterns/sec")
            print(f" Read throughput: {largest['read_throughput']:.0f} readouts/sec")
            print(f" Fidelity: {largest['fidelity_psnr']:.1f}dB")

        # Capacity summary — report the largest pattern count tested.
        if capacity_results:
            max_capacity = capacity_results[-1]
            print(f"\nMaximum tested capacity: {max_capacity['K']} patterns")
            print(f" Average PSNR: {max_capacity['avg_psnr']:.1f}dB")
            print(f" Capacity utilization: {max_capacity['capacity_utilization']:.1%}")

        # Code comparison summary — lowest orthogonality error wins.
        if code_results:
            best_code = min(code_results.items(), key=lambda item: item[1]['orthogonality_error'])
            best_name, best_metrics = best_code
            print(f"\nBest orthogonal codes: {best_name}")
            print(f" Orthogonality error: {best_metrics['orthogonality_error']:.6f}")
            print(f" Average PSNR: {best_metrics['avg_psnr']:.1f}dB")

        print("\n✅ WrinkleBrane Performance Analysis Complete!")

    except Exception as e:
        # Top-level boundary: report and fall through with a failure flag
        # rather than crashing, so callers get a clean boolean.
        print(f"\n❌ Benchmark failed with error: {e}")
        import traceback
        traceback.print_exc()
        return False

    return True
if __name__ == "__main__":
    # main() returns False on failure; propagate that to the shell so CI
    # and scripts can detect a failed benchmark run via the exit code.
    sys.exit(0 if main() else 1)