"""Tests for the 5 novel breakthrough analysis modules: 1. Riemannian Refusal Manifold Discovery 2. Anti-Ouroboros Adversarial Self-Repair Probing 3. Conditional Abliteration with Category-Selective Projection Fields 4. Wasserstein Refusal Transfer Across Architectures 5. Spectral Abliteration Completeness Certification """ from __future__ import annotations import math import torch from obliteratus.analysis.riemannian_manifold import ( RiemannianManifoldAnalyzer, RiemannianRefusalManifold, GeodesicProjectionResult, ) from obliteratus.analysis.anti_ouroboros import ( AntiOuroborosProber, ASRGResult, ) from obliteratus.analysis.conditional_abliteration import ( ConditionalAbliterator, ConditionalAbliterationResult, CategoryProjector, ) from obliteratus.analysis.wasserstein_transfer import ( WassersteinRefusalTransfer, WassersteinTransferResult, TransferredDirection, ) from obliteratus.analysis.spectral_certification import ( SpectralCertifier, SpectralCertificate, CertificationLevel, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_activations(hidden_dim=32, n_per_class=30, separation=2.0, seed=42): """Create harmful/harmless activations with planted refusal signal.""" torch.manual_seed(seed) direction = torch.randn(hidden_dim) direction = direction / direction.norm() harmful = torch.randn(n_per_class, hidden_dim) * 0.3 + separation * direction harmless = torch.randn(n_per_class, hidden_dim) * 0.3 return harmful, harmless, direction def _make_multilayer_activations( n_layers=6, hidden_dim=32, n_per_class=30, separation=2.0, seed=42, ): """Create per-layer activations with planted refusal signals.""" torch.manual_seed(seed) base_dir = torch.randn(hidden_dim) base_dir = base_dir / base_dir.norm() harmful_dict = {} harmless_dict = {} direction_dict = {} for layer in range(n_layers): # Rotate direction slightly per layer to simulate 
non-trivial geometry rotation = torch.randn(hidden_dim) * 0.1 layer_dir = base_dir + rotation * (layer / n_layers) layer_dir = layer_dir / layer_dir.norm() harmful_dict[layer] = ( torch.randn(n_per_class, hidden_dim) * 0.3 + separation * layer_dir ) harmless_dict[layer] = torch.randn(n_per_class, hidden_dim) * 0.3 direction_dict[layer] = layer_dir return harmful_dict, harmless_dict, direction_dict def _make_category_activations( categories=("weapons", "cyber", "fraud"), hidden_dim=32, n_per_category=15, seed=42, ): """Create per-category harmful activations with distinct directions.""" torch.manual_seed(seed) category_acts = {} for i, cat in enumerate(categories): # Each category gets a distinct direction direction = torch.zeros(hidden_dim) direction[i * 3: i * 3 + 3] = 1.0 direction = direction / direction.norm() category_acts[cat] = ( torch.randn(n_per_category, hidden_dim) * 0.3 + 2.0 * direction ) harmless = torch.randn(n_per_category, hidden_dim) * 0.3 return category_acts, harmless # =========================================================================== # 1. 
# Riemannian Refusal Manifold Discovery
# ===========================================================================

class TestRiemannianManifold:
    """Exercise RiemannianManifoldAnalyzer on synthetic activations."""

    def test_analyzer_creation(self):
        """A default-constructed analyzer carries the expected settings."""
        default_analyzer = RiemannianManifoldAnalyzer()
        assert default_analyzer.n_sample_points == 50
        assert default_analyzer.curvature_flatness_threshold == 0.01

    def test_analyze_basic(self):
        """analyze() returns a populated RiemannianRefusalManifold."""
        harmful_by_layer, harmless_by_layer, _ = _make_multilayer_activations()
        manifold = RiemannianManifoldAnalyzer(n_sample_points=10).analyze(
            harmful_by_layer, harmless_by_layer
        )
        assert isinstance(manifold, RiemannianRefusalManifold)
        assert manifold.ambient_dimension == 32
        assert manifold.intrinsic_dimension >= 1
        assert manifold.dimension_ratio > 0
        assert manifold.recommendation in (
            "linear_sufficient", "geodesic_recommended"
        )

    def test_curvature_estimation(self):
        """Curvature statistics are floats and the spread is non-negative."""
        harmful_by_layer, harmless_by_layer, _ = _make_multilayer_activations()
        manifold = RiemannianManifoldAnalyzer(n_sample_points=10).analyze(
            harmful_by_layer, harmless_by_layer
        )
        assert isinstance(manifold.mean_sectional_curvature, float)
        assert isinstance(manifold.max_sectional_curvature, float)
        assert manifold.curvature_std >= 0

    def test_layer_curvatures(self):
        """Per-layer curvature and dimension collections are non-empty."""
        harmful_by_layer, harmless_by_layer, _ = _make_multilayer_activations(
            n_layers=4
        )
        manifold = RiemannianManifoldAnalyzer(n_sample_points=5).analyze(
            harmful_by_layer, harmless_by_layer
        )
        assert len(manifold.layer_curvatures) > 0
        assert len(manifold.layer_intrinsic_dims) > 0

    def test_geodesic_diameter(self):
        """The geodesic diameter lies in [0, pi] (with a small tolerance)."""
        harmful_by_layer, harmless_by_layer, directions = (
            _make_multilayer_activations()
        )
        manifold = RiemannianManifoldAnalyzer().analyze(
            harmful_by_layer, harmless_by_layer, directions
        )
        assert manifold.geodesic_diameter >= 0
        # Geodesic diameter on the sphere is at most pi
        assert manifold.geodesic_diameter <= math.pi + 0.01

    def test_geodesic_projection(self):
        """Projecting one harmful sample yields a GeodesicProjectionResult."""
        harmful, _, direction = _make_activations()
        projector = RiemannianManifoldAnalyzer(n_sample_points=5)
        first_sample = harmful[0]
        projection = projector.compute_geodesic_projection(
            first_sample, direction, harmful, layer_idx=0
        )
        assert isinstance(projection, GeodesicProjectionResult)
        assert projection.original_refusal_component > 0
        assert projection.improvement_factor >= 1.0

    def test_empty_input(self):
        """Empty activation dicts give dimension 0 and the linear verdict."""
        manifold = RiemannianManifoldAnalyzer().analyze({}, {})
        assert manifold.intrinsic_dimension == 0
        assert manifold.recommendation == "linear_sufficient"

    def test_with_precomputed_directions(self):
        """analyze() accepts refusal directions as a third argument."""
        harmful_by_layer, harmless_by_layer, directions = (
            _make_multilayer_activations()
        )
        manifold = RiemannianManifoldAnalyzer(n_sample_points=5).analyze(
            harmful_by_layer, harmless_by_layer, directions
        )
        assert manifold.ambient_dimension == 32
        assert manifold.geodesic_vs_euclidean_ratio > 0

    def test_flat_manifold_detection(self):
        """When activations are purely linear, curvature should be near zero.

        Only the type of ``is_approximately_flat`` is asserted; the flag's
        value is not pinned.
        """
        torch.manual_seed(99)
        dim = 32
        # Create activations along a perfectly linear direction
        axis = torch.randn(dim)
        axis = axis / axis.norm()
        tiled_axis = axis.unsqueeze(0).repeat(20, 1)
        harmful_by_layer = {0: tiled_axis + torch.randn(20, dim) * 0.01}
        harmless_by_layer = {0: torch.randn(20, dim) * 0.01}

        lenient_analyzer = RiemannianManifoldAnalyzer(
            n_sample_points=5, curvature_flatness_threshold=1.0
        )
        manifold = lenient_analyzer.analyze(harmful_by_layer, harmless_by_layer)
        # With very concentrated activations, curvature should be manageable
        assert isinstance(manifold.is_approximately_flat, bool)


# ===========================================================================
# 2.
# Anti-Ouroboros Adversarial Self-Repair Probing
# ===========================================================================

class TestAntiOuroboros:
    """Exercise AntiOuroborosProber.build_asrg on small layer-strength maps."""

    def test_prober_creation(self):
        """A default-constructed prober has repair_threshold 0.05."""
        default_prober = AntiOuroborosProber()
        assert default_prober.repair_threshold == 0.05

    def test_build_asrg_from_strengths(self):
        """Six layer strengths produce a six-node graph with edges."""
        strengths = {0: 0.2, 1: 0.5, 2: 0.8, 3: 0.6, 4: 0.3, 5: 0.1}
        graph = AntiOuroborosProber().build_asrg(strengths)
        assert isinstance(graph, ASRGResult)
        assert graph.n_nodes == 6
        assert graph.n_edges > 0
        assert graph.spectral_gap >= 0
        assert graph.self_repair_risk in ("low", "medium", "high", "extreme")

    def test_repair_hubs_identified(self):
        """The peak-strength layer ranks in the top three vulnerabilities."""
        # Layer 3 has peak refusal — it should be a repair hub or
        # be first in vulnerability ordering
        strengths = {0: 0.1, 1: 0.2, 2: 0.5, 3: 0.9, 4: 0.3, 5: 0.1}
        graph = AntiOuroborosProber(hub_percentile=0.8).build_asrg(strengths)
        ordering = graph.vulnerability_ordering
        assert len(ordering) == 6
        # Layer 3 should be near the top of vulnerability ordering
        assert 3 in ordering[:3]

    def test_with_self_repair_data(self):
        """Explicit repair measurements become weighted edges."""
        strengths = {0: 0.3, 1: 0.6, 2: 0.4}
        measured_repairs = [
            {
                "ablated_layer": 1,
                "compensating_layers": [0, 2],
                "repair_ratios": [0.2, 0.5],
            },
        ]
        graph = AntiOuroborosProber().build_asrg(strengths, measured_repairs)
        assert graph.n_edges >= 2
        # Edge from layer 1 to layer 2 should have weight 0.5
        one_to_two = [
            edge for edge in graph.edges
            if (edge.source_layer, edge.target_layer) == (1, 2)
        ]
        assert len(one_to_two) == 1
        assert abs(one_to_two[0].repair_weight - 0.5) < 1e-6

    def test_spectral_gap(self):
        """Spectral gap and algebraic connectivity are non-negative."""
        uniform_strengths = dict.fromkeys(range(8), 0.5)
        graph = AntiOuroborosProber().build_asrg(uniform_strengths)
        assert graph.spectral_gap >= 0
        assert graph.algebraic_connectivity >= 0

    def test_min_ablations_bound(self):
        """The minimum ablation count is in [1, 6] and matches the set size."""
        rising_strengths = {layer: 0.3 + layer * 0.1 for layer in range(6)}
        graph = AntiOuroborosProber().build_asrg(rising_strengths)
        assert graph.min_simultaneous_ablations >= 1
        assert graph.min_simultaneous_ablations <= 6
        assert (
            len(graph.recommended_ablation_set)
            == graph.min_simultaneous_ablations
        )

    def test_empty_input(self):
        """A single-layer map yields one node and "low" risk.

        NOTE(review): despite the name, the input is one layer, not an empty
        dict.
        """
        graph = AntiOuroborosProber().build_asrg({0: 0.5})
        assert graph.n_nodes == 1
        assert graph.self_repair_risk == "low"

    def test_estimated_passes(self):
        """At least one pass is estimated for uniformly strong layers."""
        # High self-repair should require more passes
        strong_everywhere = dict.fromkeys(range(10), 0.8)
        graph = AntiOuroborosProber().build_asrg(strong_everywhere)
        assert graph.estimated_passes_needed >= 1

    def test_repair_locality(self):
        """repair_locality is reported within [0, 1]."""
        uniform_strengths = dict.fromkeys(range(6), 0.5)
        graph = AntiOuroborosProber().build_asrg(uniform_strengths)
        assert 0 <= graph.repair_locality <= 1


# ===========================================================================
# 3. Conditional Abliteration
# ===========================================================================

class TestConditionalAbliteration:
    """Exercise ConditionalAbliterator.analyze on synthetic categories."""

    @staticmethod
    def _analyze_default_categories():
        """Analyze the default synthetic categories with a 5-sample minimum."""
        acts_by_category, benign = _make_category_activations()
        return ConditionalAbliterator(min_samples_per_category=5).analyze(
            acts_by_category, benign
        )

    def test_abliterator_creation(self):
        """A default-constructed abliterator has selectivity_threshold 0.7."""
        default_abliterator = ConditionalAbliterator()
        assert default_abliterator.selectivity_threshold == 0.7

    def test_analyze_basic(self):
        """analyze() returns a result with categories and projectors."""
        outcome = self._analyze_default_categories()
        assert isinstance(outcome, ConditionalAbliterationResult)
        assert outcome.n_categories > 0
        assert len(outcome.projectors) > 0

    def test_category_projectors(self):
        """Each projector has 32-dim vectors and selectivity in [0, 1]."""
        outcome = self._analyze_default_categories()
        expected_shape = (32,)
        for projector in outcome.projectors:
            assert isinstance(projector, CategoryProjector)
            assert projector.condition_vector.shape == expected_shape
            assert projector.projection_direction.shape == expected_shape
            assert 0 <= projector.selectivity <= 1

    def test_selectivity(self):
        """Categories with distinct directions should have high selectivity."""
        acts_by_category, benign = _make_category_activations(
            categories=("weapons", "cyber", "fraud"),
            hidden_dim=32,
            n_per_category=20,
        )
        lenient = ConditionalAbliterator(
            selectivity_threshold=0.3,
            min_samples_per_category=5,
        )
        outcome = lenient.analyze(acts_by_category, benign)
        # With well-separated categories, selectivity should be reasonable
        assert outcome.mean_selectivity > 0

    def test_orthogonality(self):
        """orthogonality_score is reported within [0, 1]."""
        outcome = self._analyze_default_categories()
        assert 0 <= outcome.orthogonality_score <= 1

    def test_sheaf_consistency(self):
        """The consistency score is in [0, 1] and violations form a list."""
        outcome = self._analyze_default_categories()
        assert 0 <= outcome.sheaf_consistency_score <= 1
        assert isinstance(outcome.consistency_violations, list)

    def test_leakage_matrix(self):
        """The leakage matrix has one row per category."""
        outcome = self._analyze_default_categories()
        # Leakage matrix should be square with n_categories
        # NOTE(review): only the first dimension is asserted here.
        n_rows = outcome.cross_category_leakage.shape[0]
        assert n_rows == outcome.n_categories

    def test_empty_categories(self):
        """An empty category dict yields zero categories and no projectors."""
        default_abliterator = ConditionalAbliterator()
        outcome = default_abliterator.analyze({}, torch.randn(10, 32))
        assert outcome.n_categories == 0
        assert len(outcome.projectors) == 0

    def test_too_few_samples(self):
        """Categories with too few samples should be skipped."""
        sparse_category = {"weapons": torch.randn(2, 32)}  # only 2 samples
        benign = torch.randn(10, 32)
        outcome = ConditionalAbliterator(min_samples_per_category=5).analyze(
            sparse_category, benign
        )
        assert outcome.n_categories == 0

    def test_viable_vs_risky(self):
        """Viable and risky category counts add up to n_categories."""
        acts_by_category, benign = _make_category_activations()
        lenient = ConditionalAbliterator(
            selectivity_threshold=0.3,
            min_samples_per_category=5,
        )
        outcome = lenient.analyze(acts_by_category, benign)
        # All categories should be either viable or risky
        n_viable = len(outcome.viable_categories)
        n_risky = len(outcome.risky_categories)
        assert n_viable + n_risky == outcome.n_categories


# ===========================================================================
# 4. Wasserstein Refusal Transfer
# ===========================================================================

class TestWassersteinTransfer:
    """Exercise WassersteinRefusalTransfer.compute_transfer."""

    @staticmethod
    def _source_and_target(n_source_layers, n_target_layers):
        """Build source (seed 42) and target (seed 99) 32-dim activations.

        Returns ``(source_harmful, target_harmful, source_directions)``.
        """
        source_harmful, _, source_directions = _make_multilayer_activations(
            n_layers=n_source_layers, hidden_dim=32, seed=42
        )
        target_harmful, _, _ = _make_multilayer_activations(
            n_layers=n_target_layers, hidden_dim=32, seed=99
        )
        return source_harmful, target_harmful, source_directions

    def test_transfer_creation(self):
        """A default-constructed transfer has fidelity_threshold 0.5."""
        default_transfer = WassersteinRefusalTransfer()
        assert default_transfer.fidelity_threshold == 0.5

    def test_compute_transfer_same_model(self):
        """Transfer from a model to itself should have high fidelity.

        NOTE(review): fidelity itself is not asserted, only that layers were
        transferred and the distance is finite.
        """
        harmful_by_layer, _, directions = _make_multilayer_activations(
            n_layers=4, hidden_dim=32
        )
        outcome = WassersteinRefusalTransfer().compute_transfer(
            source_activations=harmful_by_layer,
            target_activations=harmful_by_layer,  # same activations
            source_refusal_directions=directions,
            source_model_name="model_a",
            target_model_name="model_a",
        )
        assert isinstance(outcome, WassersteinTransferResult)
        assert outcome.n_layers_transferred > 0
        assert outcome.wasserstein_distance < float("inf")

    def test_compute_transfer_different_models(self):
        """Transfer between different models."""
        source_acts, target_acts, source_dirs = self._source_and_target(4, 4)
        outcome = WassersteinRefusalTransfer().compute_transfer(
            source_activations=source_acts,
            target_activations=target_acts,
            source_refusal_directions=source_dirs,
            source_model_name="llama",
            target_model_name="yi",
        )
        assert outcome.n_layers_transferred > 0
        assert outcome.transfer_viability in (
            "excellent", "good", "marginal", "poor"
        )

    def test_layer_mapping(self):
        """Layer mapping with different layer counts."""
        source_acts, target_acts, source_dirs = self._source_and_target(6, 4)
        outcome = WassersteinRefusalTransfer().compute_transfer(
            source_activations=source_acts,
            target_activations=target_acts,
            source_refusal_directions=source_dirs,
        )
        assert len(outcome.layer_mapping) > 0

    def test_explicit_layer_mapping(self):
        """An explicit identity mapping over four layers transfers all four."""
        source_acts, target_acts, source_dirs = self._source_and_target(4, 4)
        identity_mapping = {layer: layer for layer in range(4)}
        outcome = WassersteinRefusalTransfer().compute_transfer(
            source_activations=source_acts,
            target_activations=target_acts,
            source_refusal_directions=source_dirs,
            layer_mapping=identity_mapping,
        )
        assert outcome.n_layers_transferred == 4

    def test_transferred_directions(self):
        """Transferred directions are 32-dim and near unit or near zero norm."""
        source_acts, target_acts, source_dirs = self._source_and_target(3, 3)
        outcome = WassersteinRefusalTransfer().compute_transfer(
            source_activations=source_acts,
            target_activations=target_acts,
            source_refusal_directions=source_dirs,
        )
        for transferred in outcome.transferred_directions:
            assert isinstance(transferred, TransferredDirection)
            assert transferred.transferred_direction.shape == (32,)
            # Direction should be approximately unit norm
            length = transferred.transferred_direction.norm().item()
            assert abs(length - 1.0) < 0.1 or length < 0.1

    def test_empty_input(self):
        """Empty inputs transfer no layers and are rated "poor"."""
        outcome = WassersteinRefusalTransfer().compute_transfer({}, {}, {})
        assert outcome.n_layers_transferred == 0
        assert outcome.transfer_viability == "poor"

    def test_recommendation_generated(self):
        """The result carries a recommendation string over 10 characters."""
        source_acts, target_acts, source_dirs = self._source_and_target(3, 3)
        outcome = WassersteinRefusalTransfer().compute_transfer(
            source_activations=source_acts,
            target_activations=target_acts,
            source_refusal_directions=source_dirs,
        )
        advice = outcome.recommendation
        assert isinstance(advice, str)
        assert len(advice) > 10


# ===========================================================================
# 5.
# Spectral Abliteration Completeness Certification
# ===========================================================================

class TestSpectralCertification:
    """Exercise SpectralCertifier on synthetic harmful/harmless activations."""

    def test_certifier_creation(self):
        """A default-constructed certifier has confidence_level 0.95."""
        certifier = SpectralCertifier()
        assert certifier.confidence_level == 0.95

    def test_certify_complete_abliteration(self):
        """After successful abliteration, should certify GREEN."""
        torch.manual_seed(42)
        d = 32
        n = 50
        # Post-abliteration: harmful and harmless should be indistinguishable
        harmful = torch.randn(n, d) * 0.3
        harmless = torch.randn(n, d) * 0.3

        certifier = SpectralCertifier()
        result = certifier.certify(harmful, harmless)

        assert isinstance(result, SpectralCertificate)
        # With no signal, should be GREEN
        assert result.level == CertificationLevel.GREEN

    def test_certify_incomplete_abliteration(self):
        """With clear residual refusal signal, should certify RED."""
        torch.manual_seed(42)
        d = 32
        n = 50
        direction = torch.randn(d)
        direction = direction / direction.norm()
        # Strong residual signal
        harmful = torch.randn(n, d) * 0.3 + 5.0 * direction
        harmless = torch.randn(n, d) * 0.3

        certifier = SpectralCertifier()
        result = certifier.certify(harmful, harmless)

        assert result.level == CertificationLevel.RED
        assert result.n_eigenvalues_above_threshold > 0
        assert result.eigenvalue_margin > 0

    def test_bbp_threshold(self):
        """Threshold, upper edge and noise variance are all positive."""
        torch.manual_seed(42)
        harmful = torch.randn(30, 32) * 0.3
        harmless = torch.randn(30, 32) * 0.3

        certifier = SpectralCertifier()
        result = certifier.certify(harmful, harmless)

        assert result.bbp_threshold > 0
        assert result.mp_upper_edge > 0
        assert result.noise_variance > 0

    def test_anisotropic_correction(self):
        """Non-isotropic BBP extension should increase the threshold."""
        torch.manual_seed(42)
        harmful = torch.randn(30, 32) * 0.3
        harmless = torch.randn(30, 32) * 0.3

        certifier = SpectralCertifier()
        result = certifier.certify(harmful, harmless)

        assert result.condition_number >= 1.0
        assert result.anisotropy_correction >= 1.0
        assert result.anisotropic_threshold >= result.isotropic_threshold

    def test_sample_sufficiency(self):
        """Sample counts are reported: 10 + 10 used, at least 50 required."""
        torch.manual_seed(42)
        harmful = torch.randn(10, 32) * 0.3
        harmless = torch.randn(10, 32) * 0.3

        certifier = SpectralCertifier(min_samples=50)
        result = certifier.certify(harmful, harmless)

        assert result.n_samples_used == 20
        assert result.n_samples_required >= 50

    def test_certify_all_layers(self):
        """certify_all_layers returns one SpectralCertificate per layer."""
        harmful_dict, harmless_dict, _ = _make_multilayer_activations(n_layers=4)

        certifier = SpectralCertifier()
        results = certifier.certify_all_layers(harmful_dict, harmless_dict)

        assert len(results) == 4
        # Only the certificates are inspected, so iterate the values directly.
        for cert in results.values():
            assert isinstance(cert, SpectralCertificate)

    def test_overall_certification(self):
        """overall_certification aggregates layer certificates into a level."""
        harmful_dict, harmless_dict, _ = _make_multilayer_activations(n_layers=4)

        certifier = SpectralCertifier()
        layer_certs = certifier.certify_all_layers(harmful_dict, harmless_dict)
        overall = certifier.overall_certification(layer_certs)

        assert overall is not None
        assert isinstance(overall.level, CertificationLevel)

    def test_signal_analysis(self):
        """Signal-to-noise ratio and both energies are non-negative."""
        torch.manual_seed(42)
        d = 32
        n = 40
        direction = torch.randn(d)
        direction = direction / direction.norm()
        harmful = torch.randn(n, d) * 0.3 + 3.0 * direction
        harmless = torch.randn(n, d) * 0.3

        certifier = SpectralCertifier()
        result = certifier.certify(harmful, harmless)

        assert result.signal_to_noise_ratio >= 0
        assert result.signal_energy >= 0
        assert result.noise_energy >= 0

    def test_recommendation_text(self):
        """The certificate has a recommendation and a known suggested action."""
        torch.manual_seed(42)
        harmful = torch.randn(30, 32) * 0.3
        harmless = torch.randn(30, 32) * 0.3

        certifier = SpectralCertifier()
        result = certifier.certify(harmful, harmless)

        assert isinstance(result.recommendation, str)
        assert len(result.recommendation) > 10
        assert result.suggested_action in (
            "none", "more_directions", "grp_obliteration", "more_samples"
        )


# ===========================================================================
# Integration: All modules importable from analysis package
# ===========================================================================

class TestImports:
    """Check that each entry point is importable from obliteratus.analysis."""

    def test_import_riemannian(self):
        """RiemannianManifoldAnalyzer is exposed by the analysis package."""
        from obliteratus.analysis import RiemannianManifoldAnalyzer as exported
        assert exported is not None

    def test_import_anti_ouroboros(self):
        """AntiOuroborosProber is exposed by the analysis package."""
        from obliteratus.analysis import AntiOuroborosProber as exported
        assert exported is not None

    def test_import_conditional(self):
        """ConditionalAbliterator is exposed by the analysis package."""
        from obliteratus.analysis import ConditionalAbliterator as exported
        assert exported is not None

    def test_import_wasserstein_transfer(self):
        """WassersteinRefusalTransfer is exposed by the analysis package."""
        from obliteratus.analysis import WassersteinRefusalTransfer as exported
        assert exported is not None

    def test_import_spectral_certifier(self):
        """SpectralCertifier and CertificationLevel are both exposed."""
        from obliteratus.analysis import (
            SpectralCertifier as certifier_cls,
            CertificationLevel as level_enum,
        )
        assert certifier_cls is not None
        green = level_enum.GREEN
        assert green.value == "certified_complete"