"""Edge-case and robustness tests. Tests for NaN/Inf handling, empty inputs, extreme dimensions, and other boundary conditions that the main test suite doesn't cover. """ from __future__ import annotations import math import pytest import torch import torch.nn as nn from obliteratus.analysis.whitened_svd import WhitenedSVDExtractor from obliteratus.analysis.cross_layer import CrossLayerAlignmentAnalyzer from obliteratus.analysis.concept_geometry import ConceptConeAnalyzer from obliteratus.analysis.alignment_imprint import AlignmentImprintDetector from obliteratus.analysis.multi_token_position import MultiTokenPositionAnalyzer from obliteratus.analysis.sparse_surgery import SparseDirectionSurgeon from obliteratus.analysis.causal_tracing import CausalRefusalTracer from obliteratus.analysis.residual_stream import ResidualStreamDecomposer from obliteratus.analysis.probing_classifiers import LinearRefusalProbe from obliteratus.analysis.cross_model_transfer import TransferAnalyzer from obliteratus.evaluation.advanced_metrics import ( refusal_rate, effective_rank, activation_cosine_similarity, ) from obliteratus.analysis.steering_vectors import ( SteeringVectorFactory, SteeringHookManager, SteeringConfig, SteeringResult, compute_steering_effectiveness, format_steering_report, ) # =========================================================================== # NaN / Inf handling # =========================================================================== class TestNaNInfHandling: """Test that modules handle degenerate inputs gracefully.""" def test_whitened_svd_nan_activations(self): """WhitenedSVD with NaN — currently raises; documenting behavior.""" harmful = [torch.tensor([float("nan"), 1.0, 2.0]) for _ in range(5)] harmless = [torch.randn(3) for _ in range(5)] extractor = WhitenedSVDExtractor() # NaN propagation through SVD is expected to produce NaN results # This documents the current behavior — ideally would guard against it raised = False result = None try: result = extractor.extract(harmful, harmless) except (RuntimeError, ValueError): raised = True # Either it raised an exception (acceptable) or returned a result with NaNs assert raised or result is not None, ( "Should either raise on NaN input or return a result" ) def test_whitened_svd_zero_activations(self): """WhitenedSVD with all-zero activations.""" harmful = [torch.zeros(8) for _ in range(5)] harmless = [torch.zeros(8) for _ in range(5)] extractor = WhitenedSVDExtractor() result = extractor.extract(harmful, harmless) # Should return a valid result without crashing assert result is not None assert result.directions is not None assert result.singular_values is not None def test_concept_cone_nan_direction(self): """ConceptConeAnalyzer with NaN in activations — documenting behavior.""" harmful = [torch.randn(16) for _ in range(10)] harmless = [torch.randn(16) for _ in range(10)] # Poison one activation harmful[3] = torch.full((16,), float("nan")) cat_map = {i: f"cat_{i % 3}" for i in range(10)} analyzer = ConceptConeAnalyzer(category_map=cat_map) raised = False result = None try: result = analyzer.analyze_layer(harmful, harmless) except (RuntimeError, ValueError): raised = True # Either it raised an exception (acceptable) or returned a result assert raised or result is not None, ( "Should either raise on NaN input or return a result" ) def test_sparse_surgery_zero_direction(self): """Sparse surgery with zero refusal direction.""" W = torch.randn(32, 16) zero_dir = torch.zeros(16) surgeon = SparseDirectionSurgeon() result = surgeon.analyze_weight_matrix(W, zero_dir) assert result.mean_projection == 0.0 def test_sparse_surgery_zero_weight(self): """Sparse surgery with zero weight matrix.""" W = torch.zeros(32, 16) ref_dir = torch.randn(16) surgeon = SparseDirectionSurgeon() result = surgeon.analyze_weight_matrix(W, ref_dir) assert result.max_projection < 1e-6 def test_effective_rank_nan_matrix(self): """effective_rank should handle matrix with NaN.""" W = torch.randn(10, 10) W[0, 0] = float("nan") # Should either return a value or raise cleanly try: result = effective_rank(torch.nan_to_num(W)) assert math.isfinite(result) except Exception: pass # Raising is acceptable for NaN input def test_cosine_similarity_zero_vectors(self): """Cosine similarity between zero vectors.""" a = torch.zeros(32) b = torch.zeros(32) result = activation_cosine_similarity(a, b) # Should be 0 or NaN, not crash assert math.isfinite(result) or math.isnan(result) def test_transfer_analyzer_nan_directions(self): """Transfer analyzer with NaN directions.""" dirs_a = {0: torch.randn(16), 1: torch.tensor([float("nan")] * 16)} dirs_b = {0: torch.randn(16), 1: torch.randn(16)} analyzer = TransferAnalyzer() # Should not crash result = analyzer.analyze_cross_model(dirs_a, dirs_b) assert result is not None assert isinstance(result.mean_transfer_score, float) assert result.per_layer_transfer is not None # =========================================================================== # Empty inputs # =========================================================================== class TestEmptyInputs: """Test graceful handling of empty or minimal inputs.""" def test_cross_layer_empty_directions(self): analyzer = CrossLayerAlignmentAnalyzer() result = analyzer.analyze({}) assert result.direction_persistence_score == 0.0 def test_alignment_imprint_single_layer(self): """Single layer should still return a result.""" detector = AlignmentImprintDetector() dirs = {0: torch.randn(32)} result = detector.detect_imprint(dirs) assert result.predicted_method in ("dpo", "rlhf", "cai", "sft", "unknown") def test_multi_token_single_position(self): """Single-position sequence.""" ref_dir = torch.randn(16) acts = torch.randn(1, 16) analyzer = MultiTokenPositionAnalyzer() result = analyzer.analyze_prompt(acts, ref_dir) assert result.n_tokens == 1 assert result.peak_position == 0 def test_probing_minimal_data(self): """Probing with very few samples.""" harmful = [torch.randn(8) for _ in range(3)] harmless = [torch.randn(8) for _ in range(3)] probe = LinearRefusalProbe(n_epochs=10) result = probe.probe_layer(harmful, harmless) assert 0 <= result.accuracy <= 1.0 def test_residual_stream_single_layer(self): acts = {0: torch.randn(32)} ref_dir = torch.randn(32) decomposer = ResidualStreamDecomposer() result = decomposer.decompose(acts, ref_dir) assert result.n_layers == 1 def test_causal_tracing_single_layer(self): acts = {0: torch.randn(32)} ref_dirs = {0: torch.randn(32)} tracer = CausalRefusalTracer() result = tracer.trace_from_activations(acts, ref_dirs) assert result.n_layers == 1 def test_transfer_no_common_layers(self): """Cross-model with no overlapping layer indices.""" dirs_a = {0: torch.randn(16), 1: torch.randn(16)} dirs_b = {2: torch.randn(16), 3: torch.randn(16)} analyzer = TransferAnalyzer() result = analyzer.analyze_cross_model(dirs_a, dirs_b) assert result.mean_transfer_score == 0.0 def test_refusal_rate_empty_list(self): result = refusal_rate([]) assert result == 0.0 def test_refusal_rate_single_response(self): result = refusal_rate(["I cannot help with that."]) assert result == 1.0 # =========================================================================== # Extreme dimensions # =========================================================================== class TestExtremeDimensions: """Test with unusually large or small dimensions.""" def test_high_dimensional_directions(self): """Test with realistic hidden dimension (4096).""" hidden_dim = 4096 torch.manual_seed(42) dirs = {i: torch.randn(hidden_dim) for i in range(8)} analyzer = TransferAnalyzer() result = analyzer.analyze_cross_layer(dirs) assert result.mean_adjacent_transfer >= 0 def test_high_dim_sparse_surgery(self): """Sparse surgery with large weight matrix.""" W = torch.randn(2048, 1024) ref_dir = torch.randn(1024) surgeon = SparseDirectionSurgeon(sparsity=0.05) result = surgeon.analyze_weight_matrix(W, ref_dir) assert result.n_rows_modified == int(0.05 * 2048) def test_single_dimension(self): """1D hidden dimension edge case.""" dirs = {i: torch.randn(1) for i in range(4)} analyzer = TransferAnalyzer() result = analyzer.analyze_cross_layer(dirs) # All 1D directions are parallel or anti-parallel, so cosine is always 1.0 assert result.mean_adjacent_transfer >= 0.99 def test_many_layers_imprint(self): """Alignment imprint with many layers (128).""" dirs = {i: torch.randn(32) for i in range(128)} detector = AlignmentImprintDetector() result = detector.detect_imprint(dirs) total = (result.dpo_probability + result.rlhf_probability + result.cai_probability + result.sft_probability) assert abs(total - 1.0) < 0.01 @pytest.mark.parametrize("n_prompts", [1, 2, 5, 50, 100]) def test_concept_cone_varying_prompt_counts(self, n_prompts): """Concept cone with varying numbers of prompts.""" harmful = [torch.randn(16) for _ in range(n_prompts)] harmless = [torch.randn(16) for _ in range(n_prompts)] cat_map = {i: f"cat_{i % 3}" for i in range(n_prompts)} analyzer = ConceptConeAnalyzer(category_map=cat_map, min_category_size=1) result = analyzer.analyze_layer(harmful, harmless) # Should produce a valid result regardless of prompt count assert result is not None assert result.general_direction is not None assert result.category_count >= 1 @pytest.mark.parametrize("seq_len", [1, 2, 10, 100, 500]) def test_multi_token_varying_lengths(self, seq_len): """Multi-token analysis with varying sequence lengths.""" ref_dir = torch.randn(16) acts = torch.randn(seq_len, 16) analyzer = MultiTokenPositionAnalyzer() result = analyzer.analyze_prompt(acts, ref_dir) assert result.n_tokens == seq_len @pytest.mark.parametrize("hidden_dim", [2, 4, 16, 64, 256, 1024]) def test_probing_varying_dimensions(self, hidden_dim): """Probing classifier across different hidden dims (min 2 for SGD).""" harmful = [torch.randn(hidden_dim) * 2 + 1 for _ in range(10)] harmless = [torch.randn(hidden_dim) * 2 - 1 for _ in range(10)] probe = LinearRefusalProbe(n_epochs=20) result = probe.probe_layer(harmful, harmless) assert 0 <= result.accuracy <= 1.0 # =========================================================================== # Refusal rate edge cases # =========================================================================== class TestRefusalRateEdgeCases: """Edge cases for refusal rate detection.""" def test_novel_refusal_phrasing(self): """Test that unusual refusal phrasings might be missed (known limitation).""" novel_refusals = [ "That request falls outside my operational parameters.", "I'm designed to be helpful but this conflicts with my guidelines.", ] # These should ideally be detected but string matching may miss them rate = refusal_rate(novel_refusals) # This tests the known limitation — rate may be 0 (string matching misses novel phrasings) assert 0 <= rate <= 1.0 def test_false_positive_sorry(self): """'I'm sorry to hear that' followed by helpful content.""" responses = [ "I'm sorry to hear that. Here's how you can solve the problem: first...", ] rate = refusal_rate(responses, mode="prefix") # Known limitation: prefix matching produces false positives here # Just verify it returns a valid float assert 0 <= rate <= 1.0 def test_case_insensitivity(self): responses = ["I CANNOT ASSIST WITH THAT REQUEST."] rate = refusal_rate(responses, mode="substring") assert rate == 1.0 def test_whitespace_handling(self): responses = [" I'm sorry, but I can't help with that. "] rate = refusal_rate(responses, mode="prefix") # Should handle leading whitespace assert 0 <= rate <= 1.0 # =========================================================================== # Steering Vectors # =========================================================================== class TestSteeringVectors: """Tests for the steering vector inference-time intervention system.""" def test_from_refusal_direction(self): d = torch.randn(32) vec = SteeringVectorFactory.from_refusal_direction(d, source_layer=5) assert vec.label == "refusal" assert vec.source_layer == 5 assert vec.default_alpha == -1.0 assert abs(vec.direction.norm().item() - 1.0) < 0.01 def test_from_contrastive_pairs(self): pos = [torch.randn(16) + 2 for _ in range(10)] neg = [torch.randn(16) - 2 for _ in range(10)] vec = SteeringVectorFactory.from_contrastive_pairs(pos, neg, label="test") assert vec.label == "test" assert abs(vec.direction.norm().item() - 1.0) < 0.01 assert "n_positive" in vec.metadata def test_combine_vectors(self): v1 = SteeringVectorFactory.from_refusal_direction(torch.randn(32)) v2 = SteeringVectorFactory.from_refusal_direction(torch.randn(32)) combined = SteeringVectorFactory.combine([v1, v2], label="merged") assert combined.label == "merged" assert abs(combined.direction.norm().item() - 1.0) < 0.01 def test_combine_single(self): v = SteeringVectorFactory.from_refusal_direction(torch.randn(16)) combined = SteeringVectorFactory.combine([v]) assert abs(combined.direction.norm().item() - 1.0) < 0.01 def test_combine_empty_raises(self): with pytest.raises(ValueError): SteeringVectorFactory.combine([]) def test_hook_manager_lifecycle(self): """Test install/remove lifecycle without a real model.""" manager = SteeringHookManager() assert not manager.is_active manager.remove() # Should not crash even with no hooks assert not manager.is_active def test_hook_with_simple_model(self): """Test steering on a simple nn.Sequential model.""" model = nn.Sequential( nn.Linear(16, 16), nn.ReLU(), nn.Linear(16, 16), nn.ReLU(), nn.Linear(16, 8), ) vec = SteeringVectorFactory.from_refusal_direction(torch.randn(16)) config = SteeringConfig( vectors=[vec], target_layers=[0, 2], # steer at first and third linear layers alpha=1.0, ) manager = SteeringHookManager() # Install on specific modules layers = list(model.children()) result = manager.install(model, config, layer_modules=layers) assert result.hooks_installed == 2 assert manager.is_active # Run a forward pass (should not crash) x = torch.randn(1, 16) output = model(x) assert output.shape == (1, 8) # Remove hooks manager.remove() assert not manager.is_active def test_steering_effectiveness_remove(self): eff = compute_steering_effectiveness(2.0, 0.5, direction="remove") assert 0 < eff < 1.0 # Reduced but not eliminated def test_steering_effectiveness_perfect_remove(self): eff = compute_steering_effectiveness(2.0, 0.0, direction="remove") assert eff == 1.0 def test_steering_effectiveness_no_change(self): eff = compute_steering_effectiveness(2.0, 2.0, direction="remove") assert eff == 0.0 def test_steering_effectiveness_add(self): eff = compute_steering_effectiveness(1.0, 3.0, direction="add") assert eff == 1.0 # Capped at 1.0 def test_format_report(self): vec = SteeringVectorFactory.from_refusal_direction(torch.randn(32)) config = SteeringConfig(vectors=[vec], target_layers=[3, 5], alpha=0.5) result = SteeringResult(config=config, hooks_installed=2, total_steered_layers=2) report = format_steering_report(result) assert "Steering" in report assert "refusal" in report def test_steering_config_position_modes(self): """Test different position modes in config.""" for pos in ["all", "last", "first"]: config = SteeringConfig( vectors=[SteeringVectorFactory.from_refusal_direction(torch.randn(8))], target_layers=[0], position=pos, ) assert config.position == pos def test_imports(self): from obliteratus.analysis import SteeringVectorFactory, SteeringHookManager assert SteeringVectorFactory is not None assert SteeringHookManager is not None class TestParametrizedDimensions: """Parametrized tests across different hidden dimensions.""" @pytest.mark.parametrize("hidden_dim", [2, 8, 64, 256, 768]) def test_whitened_svd_various_dims(self, hidden_dim): n_samples = max(4, hidden_dim // 4) harmful = [torch.randn(hidden_dim) for _ in range(n_samples)] harmless = [torch.randn(hidden_dim) for _ in range(n_samples)] extractor = WhitenedSVDExtractor() result = extractor.extract(harmful, harmless, n_directions=1) assert result.directions.shape[1] == hidden_dim @pytest.mark.parametrize("hidden_dim", [2, 8, 64, 256]) def test_cross_layer_various_dims(self, hidden_dim): directions = {i: torch.randn(hidden_dim) for i in range(4)} analyzer = CrossLayerAlignmentAnalyzer() result = analyzer.analyze(directions) assert 0.0 <= result.direction_persistence_score <= 1.0 @pytest.mark.parametrize("hidden_dim", [4, 32, 128]) def test_sparse_surgery_various_dims(self, hidden_dim): weight = torch.randn(hidden_dim, hidden_dim) direction = torch.randn(hidden_dim) direction = direction / direction.norm() surgeon = SparseDirectionSurgeon() result = surgeon.analyze_weight_matrix(weight, direction, layer_idx=0) assert 0.0 <= result.energy_removed <= 1.0 @pytest.mark.parametrize("n_layers", [1, 4, 12, 32]) def test_imprint_various_layer_counts(self, n_layers): directions = {i: torch.randn(64) for i in range(n_layers)} detector = AlignmentImprintDetector() result = detector.detect_imprint(directions) assert result.predicted_method in ("dpo", "rlhf", "cai", "sft", "unknown") class TestExceptionPaths: """Tests for error handling and boundary conditions.""" def test_whitened_svd_mismatched_dims(self): """Harmful and harmless with different hidden dims should fail or handle gracefully.""" harmful = [torch.randn(64) for _ in range(10)] harmless = [torch.randn(32) for _ in range(10)] extractor = WhitenedSVDExtractor() with pytest.raises(Exception): extractor.extract(harmful, harmless, n_directions=1) def test_whitened_svd_single_sample(self): """Single sample should not crash (may return 0 directions due to insufficient data).""" harmful = [torch.randn(32)] harmless = [torch.randn(32)] extractor = WhitenedSVDExtractor() result = extractor.extract(harmful, harmless, n_directions=1) assert result.directions.shape[1] == 32 # hidden dim preserved def test_sparse_surgery_zero_direction(self): """Zero direction vector should not crash.""" weight = torch.randn(16, 16) direction = torch.zeros(16) surgeon = SparseDirectionSurgeon() # Should handle gracefully (possibly returning 0 energy) result = surgeon.analyze_weight_matrix(weight, direction, layer_idx=0) assert result is not None def test_cross_layer_single_layer(self): """Single layer directions should still produce a result.""" directions = {0: torch.randn(32)} analyzer = CrossLayerAlignmentAnalyzer() result = analyzer.analyze(directions) assert result is not None