"""Convert spectral analysis JSON results to CSV tables for analysis.""" import json import csv import sys import os import re from pathlib import Path def classify_layer(name, model_type): """Classify a weight matrix by layer index, component type, and phase.""" if model_type == "prisma": # mirror_blocks.N.component m = re.match(r'mirror_blocks\.(\d+)\.', name) if m: layer_idx = int(m.group(1)) phase = "mirror" if 'attn' in name: comp = 'Q' if 'q_proj' in name else 'K' if 'k_proj' in name else 'V' if 'v_proj' in name else 'O' if 'o_proj' in name else 'attn' elif 'ffn.w3' in name or 'gate_expand' in name: comp = 'W3' elif 'ffn.w4' in name or 'gate_compress' in name: comp = 'W4' elif 'ffn.w1' in name: comp = 'W1' elif 'w2' in name: comp = 'W2' else: comp = 'other' return layer_idx, comp, phase m = re.match(r'middle_blocks\.(\d+)\.', name) if m: layer_idx = int(m.group(1)) phase = "middle" if 'attn' in name: comp = 'Q' if 'q_proj' in name else 'K' if 'k_proj' in name else 'V' if 'v_proj' in name else 'O' if 'o_proj' in name else 'attn' elif 'gate' in name: comp = 'W3' elif 'ffn.w1' in name: comp = 'W1' elif 'ffn.w2' in name: comp = 'W2' else: comp = 'other' return layer_idx, comp, phase m = re.match(r'(first|last)_block\.', name) if m: phase = m.group(1) if 'attn' in name: comp = 'Q' if 'q_proj' in name else 'K' if 'k_proj' in name else 'V' if 'v_proj' in name else 'O' if 'o_proj' in name else 'attn' elif 'ffn.w3' in name or 'gate' in name: comp = 'W3' elif 'ffn.w4' in name: comp = 'W4' elif 'ffn.w1' in name: comp = 'W1' elif 'ffn.w2' in name: comp = 'W2' else: comp = 'other' return 0, comp, phase if 'embed' in name: return -1, 'embed', 'embed' if 'head' in name or 'lm_head' in name: return 99, 'head', 'head' return -1, 'other', 'other' else: # GPT-2 style m = re.match(r'transformer\.h\.(\d+)\.', name) if m: layer_idx = int(m.group(1)) if 'c_attn' in name: comp = 'QKV' elif 'c_proj' in name and 'mlp' not in name: comp = 'O' elif 'c_fc' in name: comp = 'W1' elif 'mlp.c_proj' in name: comp = 'W2' else: comp = 'other' return layer_idx, comp, "layer" if 'wte' in name: return -1, 'embed', 'embed' if 'wpe' in name: return -1, 'pos_embed', 'embed' return -1, 'other', 'other' def json_to_csvs(json_path, output_dir, model_type="prisma"): with open(json_path) as f: data = json.load(f) os.makedirs(output_dir, exist_ok=True) # 1. Full weight matrix summary rows = [] for name, info in data.items(): if 'activation' in name or name.startswith('_'): continue layer_idx, comp, phase = classify_layer(name, model_type) rows.append({ 'name': name, 'layer_idx': layer_idx, 'component': comp, 'phase': phase, 'shape': 'x'.join(str(s) for s in info['shape']), 'effective_rank': round(info['effective_rank'], 2), 'stable_rank': round(info['stable_rank'], 3), 'spectral_norm': round(info['spectral_norm'], 4), 'frobenius_norm': round(info['frobenius_norm'], 4), 'alpha': round(info['alpha'], 4), 'alpha_r2': round(info['alpha_r2'], 4), 'signal_ratio': round(info['signal_ratio'], 4), 'condition_number': round(info['condition_number'], 2), 'mp_bound': round(info['mp_bound'], 4), 'n_above_mp': info['n_above_mp'], 'n_total': info['n_total'], 'sv_1': round(info['top_10_sv'][0], 4) if info['top_10_sv'] else 0, 'sv_2': round(info['top_10_sv'][1], 4) if len(info['top_10_sv']) > 1 else 0, 'sv_10': round(info['top_10_sv'][9], 4) if len(info['top_10_sv']) > 9 else 0, 'sv1_sv2_ratio': round(info['top_10_sv'][0] / info['top_10_sv'][1], 4) if len(info['top_10_sv']) > 1 and info['top_10_sv'][1] > 0 else 0, }) with open(os.path.join(output_dir, 'weights_full.csv'), 'w', newline='') as f: w = csv.DictWriter(f, fieldnames=rows[0].keys()) w.writeheader() w.writerows(sorted(rows, key=lambda r: (r['phase'], r['layer_idx'], r['component']))) # 2. Layer-level FFN summary (W1 progression = the lens) ffn_rows = [r for r in rows if r['component'] == 'W1'] with open(os.path.join(output_dir, 'ffn_w1_progression.csv'), 'w', newline='') as f: w = csv.DictWriter(f, fieldnames=['layer_idx', 'phase', 'effective_rank', 'stable_rank', 'alpha', 'alpha_r2', 'signal_ratio', 'condition_number', 'sv1_sv2_ratio']) w.writeheader() for r in sorted(ffn_rows, key=lambda r: (r['phase'], r['layer_idx'])): w.writerow({k: r[k] for k in w.fieldnames}) # 3. Gate comparison (W3 vs W4) gate_rows = [r for r in rows if r['component'] in ('W3', 'W4') and r['phase'] == 'mirror'] with open(os.path.join(output_dir, 'gate_comparison.csv'), 'w', newline='') as f: w = csv.DictWriter(f, fieldnames=['layer_idx', 'component', 'effective_rank', 'stable_rank', 'alpha', 'alpha_r2', 'signal_ratio', 'sv1_sv2_ratio']) w.writeheader() for r in sorted(gate_rows, key=lambda r: (r['layer_idx'], r['component'])): w.writerow({k: r[k] for k in w.fieldnames}) # 4. Attention head comparison (Q, K, V, O per layer) attn_rows = [r for r in rows if r['component'] in ('Q', 'K', 'V', 'O', 'QKV')] with open(os.path.join(output_dir, 'attention_progression.csv'), 'w', newline='') as f: w = csv.DictWriter(f, fieldnames=['layer_idx', 'phase', 'component', 'effective_rank', 'stable_rank', 'alpha', 'signal_ratio', 'condition_number']) w.writeheader() for r in sorted(attn_rows, key=lambda r: (r['phase'], r['layer_idx'], r['component'])): w.writerow({k: r[k] for k in w.fieldnames}) # 5. Summary statistics alphas = [r['alpha'] for r in rows if r['alpha'] > 0] eff_ranks = [r['effective_rank'] for r in rows if r['layer_idx'] >= 0] signal_ratios = [r['signal_ratio'] for r in rows if r['layer_idx'] >= 0] summary = { 'n_matrices': len(rows), 'mean_alpha': round(sum(alphas) / len(alphas), 4) if alphas else 0, 'min_alpha': round(min(alphas), 4) if alphas else 0, 'max_alpha': round(max(alphas), 4) if alphas else 0, 'mean_effective_rank': round(sum(eff_ranks) / len(eff_ranks), 2) if eff_ranks else 0, 'mean_signal_ratio': round(sum(signal_ratios) / len(signal_ratios), 4) if signal_ratios else 0, 'n_well_trained (alpha<2)': sum(1 for a in alphas if a < 2.0), 'n_total_alpha': len(alphas), } with open(os.path.join(output_dir, 'summary.csv'), 'w', newline='') as f: w = csv.DictWriter(f, fieldnames=summary.keys()) w.writeheader() w.writerow(summary) print(f"Wrote CSVs to {output_dir}/") print(f" weights_full.csv ({len(rows)} matrices)") print(f" ffn_w1_progression.csv ({len(ffn_rows)} layers)") print(f" gate_comparison.csv ({len(gate_rows)} entries)") print(f" attention_progression.csv ({len(attn_rows)} entries)") print(f" summary.csv") if __name__ == '__main__': base = "circuits/scripts/spectral_output/mirrored_300M_mk4_cont" # Prisma json_to_csvs( f"{base}/results.json", f"{base}/csv_prisma", model_type="prisma" ) # GPT-2 medium if os.path.exists(f"{base}/results_b.json"): json_to_csvs( f"{base}/results_b.json", f"{base}/csv_gpt2", model_type="gpt2" )