Spaces:
Running on Zero
Running on Zero
"""Edge-case and robustness tests.

Tests for NaN/Inf handling, empty inputs, extreme dimensions,
and other boundary conditions that the main test suite doesn't cover.
"""
| from __future__ import annotations | |
| import math | |
| import pytest | |
| import torch | |
| import torch.nn as nn | |
| from obliteratus.analysis.whitened_svd import WhitenedSVDExtractor | |
| from obliteratus.analysis.cross_layer import CrossLayerAlignmentAnalyzer | |
| from obliteratus.analysis.concept_geometry import ConceptConeAnalyzer | |
| from obliteratus.analysis.alignment_imprint import AlignmentImprintDetector | |
| from obliteratus.analysis.multi_token_position import MultiTokenPositionAnalyzer | |
| from obliteratus.analysis.sparse_surgery import SparseDirectionSurgeon | |
| from obliteratus.analysis.causal_tracing import CausalRefusalTracer | |
| from obliteratus.analysis.residual_stream import ResidualStreamDecomposer | |
| from obliteratus.analysis.probing_classifiers import LinearRefusalProbe | |
| from obliteratus.analysis.cross_model_transfer import TransferAnalyzer | |
| from obliteratus.evaluation.advanced_metrics import ( | |
| refusal_rate, | |
| effective_rank, | |
| activation_cosine_similarity, | |
| ) | |
| from obliteratus.analysis.steering_vectors import ( | |
| SteeringVectorFactory, | |
| SteeringHookManager, | |
| SteeringConfig, | |
| SteeringResult, | |
| compute_steering_effectiveness, | |
| format_steering_report, | |
| ) | |
# ===========================================================================
# NaN / Inf handling
# ===========================================================================
class TestNaNInfHandling:
    """Check that analysis modules cope with degenerate numeric inputs.

    Covers NaN-poisoned activations, all-zero tensors and zero vectors.
    Where the desired behaviour is not pinned down yet, a test accepts
    either a clean ``RuntimeError``/``ValueError`` or a returned result.
    """

    def test_whitened_svd_nan_activations(self):
        """WhitenedSVD with NaN — documenting behavior (raise or return)."""
        harmful = [torch.tensor([float("nan"), 1.0, 2.0]) for _ in range(5)]
        harmless = [torch.randn(3) for _ in range(5)]
        extractor = WhitenedSVDExtractor()
        # NaN propagation through SVD is expected to produce NaN results.
        # This documents the current behavior — ideally the extractor would
        # guard against it.
        raised = False
        result = None
        try:
            result = extractor.extract(harmful, harmless)
        except (RuntimeError, ValueError):
            raised = True
        # Either it raised an exception (acceptable) or returned a result
        # (possibly containing NaNs).
        assert raised or result is not None, (
            "Should either raise on NaN input or return a result"
        )

    def test_whitened_svd_zero_activations(self):
        """WhitenedSVD with all-zero activations must not crash."""
        harmful = [torch.zeros(8) for _ in range(5)]
        harmless = [torch.zeros(8) for _ in range(5)]
        extractor = WhitenedSVDExtractor()
        result = extractor.extract(harmful, harmless)
        # Only the presence of the result fields is checked, not their values.
        assert result is not None
        assert result.directions is not None
        assert result.singular_values is not None

    def test_concept_cone_nan_direction(self):
        """ConceptConeAnalyzer with one NaN activation — documenting behavior."""
        harmful = [torch.randn(16) for _ in range(10)]
        harmless = [torch.randn(16) for _ in range(10)]
        # Poison a single activation so the NaN sits among valid samples.
        harmful[3] = torch.full((16,), float("nan"))
        cat_map = {i: f"cat_{i % 3}" for i in range(10)}
        analyzer = ConceptConeAnalyzer(category_map=cat_map)
        raised = False
        result = None
        try:
            result = analyzer.analyze_layer(harmful, harmless)
        except (RuntimeError, ValueError):
            raised = True
        # Either it raised an exception (acceptable) or returned a result.
        assert raised or result is not None, (
            "Should either raise on NaN input or return a result"
        )

    def test_sparse_surgery_zero_direction(self):
        """Sparse surgery with a zero refusal direction gives zero mean projection."""
        W = torch.randn(32, 16)
        zero_dir = torch.zeros(16)
        surgeon = SparseDirectionSurgeon()
        result = surgeon.analyze_weight_matrix(W, zero_dir)
        assert result.mean_projection == 0.0

    def test_sparse_surgery_zero_weight(self):
        """Sparse surgery with a zero weight matrix gives ~zero max projection."""
        W = torch.zeros(32, 16)
        ref_dir = torch.randn(16)
        surgeon = SparseDirectionSurgeon()
        result = surgeon.analyze_weight_matrix(W, ref_dir)
        assert result.max_projection < 1e-6

    def test_effective_rank_nan_matrix(self):
        """effective_rank on a NaN-poisoned matrix after sanitising it.

        The NaN entry is replaced via ``torch.nan_to_num`` before the call,
        so this exercises the sanitised path: the function must either raise
        cleanly or return a finite value.
        """
        W = torch.randn(10, 10)
        W[0, 0] = float("nan")
        try:
            result = effective_rank(torch.nan_to_num(W))
        except (RuntimeError, ValueError):
            # Raising cleanly is acceptable; same exception set as the other
            # tests in this class.
            return
        # Asserted outside the ``try`` so that a failing check is reported:
        # a broad ``except Exception`` around it would swallow the
        # AssertionError and make the test unable to fail.
        assert math.isfinite(result)

    def test_cosine_similarity_zero_vectors(self):
        """Cosine similarity between two zero vectors must not crash."""
        a = torch.zeros(32)
        b = torch.zeros(32)
        result = activation_cosine_similarity(a, b)
        # A finite value or NaN are both accepted; only a crash is a failure.
        assert math.isfinite(result) or math.isnan(result)

    def test_transfer_analyzer_nan_directions(self):
        """Transfer analyzer with an all-NaN direction in one layer."""
        dirs_a = {0: torch.randn(16), 1: torch.tensor([float("nan")] * 16)}
        dirs_b = {0: torch.randn(16), 1: torch.randn(16)}
        analyzer = TransferAnalyzer()
        # Should not crash.
        result = analyzer.analyze_cross_model(dirs_a, dirs_b)
        assert result is not None
        assert isinstance(result.mean_transfer_score, float)
        assert result.per_layer_transfer is not None
# ===========================================================================
# Empty inputs
# ===========================================================================
class TestEmptyInputs:
    """Empty and minimal inputs must be handled without errors."""

    def test_cross_layer_empty_directions(self):
        """An empty direction mapping yields a persistence score of 0.0."""
        outcome = CrossLayerAlignmentAnalyzer().analyze({})
        assert outcome.direction_persistence_score == 0.0

    def test_alignment_imprint_single_layer(self):
        """A single layer still produces one of the known method labels."""
        single_layer = {0: torch.randn(32)}
        detector = AlignmentImprintDetector()
        outcome = detector.detect_imprint(single_layer)
        known_methods = ("dpo", "rlhf", "cai", "sft", "unknown")
        assert outcome.predicted_method in known_methods

    def test_multi_token_single_position(self):
        """A one-token sequence reports one token with its peak at index 0."""
        direction = torch.randn(16)
        activations = torch.randn(1, 16)
        outcome = MultiTokenPositionAnalyzer().analyze_prompt(
            activations, direction
        )
        assert outcome.n_tokens == 1
        assert outcome.peak_position == 0

    def test_probing_minimal_data(self):
        """Probing with three samples per class returns a valid accuracy."""
        harmful = [torch.randn(8) for _ in range(3)]
        harmless = [torch.randn(8) for _ in range(3)]
        probe = LinearRefusalProbe(n_epochs=10)
        accuracy = probe.probe_layer(harmful, harmless).accuracy
        assert 0 <= accuracy <= 1.0

    def test_residual_stream_single_layer(self):
        """Decomposing a single layer reports exactly one layer."""
        activations = {0: torch.randn(32)}
        direction = torch.randn(32)
        outcome = ResidualStreamDecomposer().decompose(activations, direction)
        assert outcome.n_layers == 1

    def test_causal_tracing_single_layer(self):
        """Tracing a single layer reports exactly one layer."""
        activations = {0: torch.randn(32)}
        directions = {0: torch.randn(32)}
        outcome = CausalRefusalTracer().trace_from_activations(
            activations, directions
        )
        assert outcome.n_layers == 1

    def test_transfer_no_common_layers(self):
        """Cross-model with no overlapping layer indices scores 0.0."""
        model_a = {0: torch.randn(16), 1: torch.randn(16)}
        model_b = {2: torch.randn(16), 3: torch.randn(16)}
        outcome = TransferAnalyzer().analyze_cross_model(model_a, model_b)
        assert outcome.mean_transfer_score == 0.0

    def test_refusal_rate_empty_list(self):
        """No responses at all gives a refusal rate of 0.0."""
        assert refusal_rate([]) == 0.0

    def test_refusal_rate_single_response(self):
        """A lone refusal gives a refusal rate of 1.0."""
        only_response = ["I cannot help with that."]
        assert refusal_rate(only_response) == 1.0
# ===========================================================================
# Extreme dimensions
# ===========================================================================
class TestExtremeDimensions:
    """Test with unusually large or small dimensions.

    The last three tests take their size argument from
    ``pytest.mark.parametrize``; without it pytest would look for fixtures
    named ``n_prompts`` / ``seq_len`` / ``hidden_dim`` and error out.
    """

    def test_high_dimensional_directions(self):
        """Cross-layer transfer with a realistic hidden dimension (4096)."""
        hidden_dim = 4096
        torch.manual_seed(42)
        dirs = {i: torch.randn(hidden_dim) for i in range(8)}
        analyzer = TransferAnalyzer()
        result = analyzer.analyze_cross_layer(dirs)
        assert result.mean_adjacent_transfer >= 0

    def test_high_dim_sparse_surgery(self):
        """Sparse surgery on a large weight matrix modifies 5% of the rows."""
        W = torch.randn(2048, 1024)
        ref_dir = torch.randn(1024)
        surgeon = SparseDirectionSurgeon(sparsity=0.05)
        result = surgeon.analyze_weight_matrix(W, ref_dir)
        # int() truncates 102.4 to 102.
        assert result.n_rows_modified == int(0.05 * 2048)

    def test_single_dimension(self):
        """1D hidden dimension edge case."""
        dirs = {i: torch.randn(1) for i in range(4)}
        analyzer = TransferAnalyzer()
        result = analyzer.analyze_cross_layer(dirs)
        # All 1D directions are parallel or anti-parallel, so the cosine is
        # +1 or -1. The threshold below therefore relies on the transfer
        # score using the cosine's magnitude — NOTE(review): confirm against
        # TransferAnalyzer.
        assert result.mean_adjacent_transfer >= 0.99

    def test_many_layers_imprint(self):
        """Alignment imprint with many layers (128): probabilities sum to ~1."""
        dirs = {i: torch.randn(32) for i in range(128)}
        detector = AlignmentImprintDetector()
        result = detector.detect_imprint(dirs)
        total = (
            result.dpo_probability
            + result.rlhf_probability
            + result.cai_probability
            + result.sft_probability
        )
        assert abs(total - 1.0) < 0.01

    # With three categories (i % 3) and min_category_size=1, three prompts
    # is the smallest count that still puts one prompt in every category.
    @pytest.mark.parametrize("n_prompts", [3, 10, 30])
    def test_concept_cone_varying_prompt_counts(self, n_prompts):
        """Concept cone with varying numbers of prompts."""
        harmful = [torch.randn(16) for _ in range(n_prompts)]
        harmless = [torch.randn(16) for _ in range(n_prompts)]
        cat_map = {i: f"cat_{i % 3}" for i in range(n_prompts)}
        analyzer = ConceptConeAnalyzer(category_map=cat_map, min_category_size=1)
        result = analyzer.analyze_layer(harmful, harmless)
        # Should produce a valid result regardless of prompt count.
        assert result is not None
        assert result.general_direction is not None
        assert result.category_count >= 1

    @pytest.mark.parametrize("seq_len", [1, 2, 10, 100])
    def test_multi_token_varying_lengths(self, seq_len):
        """Multi-token analysis with varying sequence lengths."""
        ref_dir = torch.randn(16)
        acts = torch.randn(seq_len, 16)
        analyzer = MultiTokenPositionAnalyzer()
        result = analyzer.analyze_prompt(acts, ref_dir)
        assert result.n_tokens == seq_len

    # Starts at 2: the probe needs at least two input features for SGD.
    @pytest.mark.parametrize("hidden_dim", [2, 8, 64, 256])
    def test_probing_varying_dimensions(self, hidden_dim):
        """Probing classifier across different hidden dims (min 2 for SGD)."""
        # The two classes are shifted to +1 / -1 so they are separable.
        harmful = [torch.randn(hidden_dim) * 2 + 1 for _ in range(10)]
        harmless = [torch.randn(hidden_dim) * 2 - 1 for _ in range(10)]
        probe = LinearRefusalProbe(n_epochs=20)
        result = probe.probe_layer(harmful, harmless)
        assert 0 <= result.accuracy <= 1.0
# ===========================================================================
# Refusal rate edge cases
# ===========================================================================
class TestRefusalRateEdgeCases:
    """Boundary behaviour of the string-matching refusal-rate metric."""

    def test_novel_refusal_phrasing(self):
        """Unusual refusal wordings may go undetected (known limitation)."""
        unusual_wordings = [
            "That request falls outside my operational parameters.",
            "I'm designed to be helpful but this conflicts with my guidelines.",
        ]
        # Ideally these count as refusals, but string matching can miss
        # them and report 0. Only the valid range is enforced here.
        observed = refusal_rate(unusual_wordings)
        assert 0 <= observed <= 1.0

    def test_false_positive_sorry(self):
        """'I'm sorry to hear that' followed by helpful content."""
        sympathetic_reply = [
            "I'm sorry to hear that. Here's how you can solve the problem: first...",
        ]
        observed = refusal_rate(sympathetic_reply, mode="prefix")
        # Known limitation: prefix matching yields a false positive for this
        # reply, so the test only verifies that the value is in range.
        assert 0 <= observed <= 1.0

    def test_case_insensitivity(self):
        """An all-caps refusal is still detected in substring mode."""
        shouted = ["I CANNOT ASSIST WITH THAT REQUEST."]
        assert refusal_rate(shouted, mode="substring") == 1.0

    def test_whitespace_handling(self):
        """Surrounding whitespace must not break prefix mode."""
        padded = [" I'm sorry, but I can't help with that. "]
        observed = refusal_rate(padded, mode="prefix")
        # Leading whitespace should be tolerated; only the range is checked.
        assert 0 <= observed <= 1.0
# ===========================================================================
# Steering Vectors
# ===========================================================================
class TestSteeringVectors:
    """Exercise the inference-time steering vector utilities."""

    def test_from_refusal_direction(self):
        """A refusal direction becomes a unit-norm vector with alpha -1.0."""
        raw_direction = torch.randn(32)
        steering = SteeringVectorFactory.from_refusal_direction(
            raw_direction, source_layer=5
        )
        assert steering.label == "refusal"
        assert steering.source_layer == 5
        assert steering.default_alpha == -1.0
        norm = steering.direction.norm().item()
        assert abs(norm - 1.0) < 0.01

    def test_from_contrastive_pairs(self):
        """Contrastive pairs give a labelled unit vector with sample metadata."""
        positives = [torch.randn(16) + 2 for _ in range(10)]
        negatives = [torch.randn(16) - 2 for _ in range(10)]
        steering = SteeringVectorFactory.from_contrastive_pairs(
            positives, negatives, label="test"
        )
        assert steering.label == "test"
        norm = steering.direction.norm().item()
        assert abs(norm - 1.0) < 0.01
        assert "n_positive" in steering.metadata

    def test_combine_vectors(self):
        """Combining two vectors keeps the label and renormalises."""
        first = SteeringVectorFactory.from_refusal_direction(torch.randn(32))
        second = SteeringVectorFactory.from_refusal_direction(torch.randn(32))
        merged = SteeringVectorFactory.combine([first, second], label="merged")
        assert merged.label == "merged"
        norm = merged.direction.norm().item()
        assert abs(norm - 1.0) < 0.01

    def test_combine_single(self):
        """Combining a single vector still yields unit norm."""
        only = SteeringVectorFactory.from_refusal_direction(torch.randn(16))
        merged = SteeringVectorFactory.combine([only])
        norm = merged.direction.norm().item()
        assert abs(norm - 1.0) < 0.01

    def test_combine_empty_raises(self):
        """Combining an empty list raises ValueError."""
        with pytest.raises(ValueError):
            SteeringVectorFactory.combine([])

    def test_hook_manager_lifecycle(self):
        """Test install/remove lifecycle without a real model."""
        hooks = SteeringHookManager()
        assert not hooks.is_active
        # Removing with nothing installed must be a harmless no-op.
        hooks.remove()
        assert not hooks.is_active

    def test_hook_with_simple_model(self):
        """Test steering on a simple nn.Sequential model."""
        net = nn.Sequential(
            nn.Linear(16, 16),
            nn.ReLU(),
            nn.Linear(16, 16),
            nn.ReLU(),
            nn.Linear(16, 8),
        )
        steering = SteeringVectorFactory.from_refusal_direction(torch.randn(16))
        # Indices 0 and 2 are the first two Linear modules of the Sequential.
        config = SteeringConfig(
            vectors=[steering],
            target_layers=[0, 2],
            alpha=1.0,
        )
        hooks = SteeringHookManager()

        # Hand the manager the child modules explicitly.
        modules = [*net.children()]
        install_result = hooks.install(net, config, layer_modules=modules)
        assert install_result.hooks_installed == 2
        assert hooks.is_active

        # A forward pass with hooks in place must run and keep the output shape.
        batch = torch.randn(1, 16)
        out = net(batch)
        assert out.shape == (1, 8)

        # Tear the hooks down again.
        hooks.remove()
        assert not hooks.is_active

    def test_steering_effectiveness_remove(self):
        """Partial removal scores strictly between 0 and 1."""
        score = compute_steering_effectiveness(2.0, 0.5, direction="remove")
        assert 0 < score < 1.0  # Reduced but not eliminated

    def test_steering_effectiveness_perfect_remove(self):
        """Dropping the projection to 0.0 scores exactly 1.0."""
        score = compute_steering_effectiveness(2.0, 0.0, direction="remove")
        assert score == 1.0

    def test_steering_effectiveness_no_change(self):
        """An unchanged projection scores exactly 0.0."""
        score = compute_steering_effectiveness(2.0, 2.0, direction="remove")
        assert score == 0.0

    def test_steering_effectiveness_add(self):
        """Tripling the projection in 'add' mode scores 1.0."""
        score = compute_steering_effectiveness(1.0, 3.0, direction="add")
        assert score == 1.0  # Capped at 1.0

    def test_format_report(self):
        """The report mentions both 'Steering' and the vector label."""
        steering = SteeringVectorFactory.from_refusal_direction(torch.randn(32))
        config = SteeringConfig(
            vectors=[steering], target_layers=[3, 5], alpha=0.5
        )
        summary = SteeringResult(
            config=config, hooks_installed=2, total_steered_layers=2
        )
        text = format_steering_report(summary)
        assert "Steering" in text
        assert "refusal" in text

    def test_steering_config_position_modes(self):
        """Test different position modes in config."""
        for mode in ("all", "last", "first"):
            steering = SteeringVectorFactory.from_refusal_direction(
                torch.randn(8)
            )
            config = SteeringConfig(
                vectors=[steering],
                target_layers=[0],
                position=mode,
            )
            assert config.position == mode

    def test_imports(self):
        """Both steering classes are re-exported from obliteratus.analysis."""
        from obliteratus.analysis import SteeringVectorFactory, SteeringHookManager

        assert SteeringVectorFactory is not None
        assert SteeringHookManager is not None
class TestParametrizedDimensions:
    """Parametrized tests across different hidden dimensions.

    Each test receives its size argument through ``pytest.mark.parametrize``;
    without the decorator pytest would try to resolve ``hidden_dim`` /
    ``n_layers`` as fixtures and error out.
    """

    @pytest.mark.parametrize("hidden_dim", [8, 32, 64, 256])
    def test_whitened_svd_various_dims(self, hidden_dim):
        """Extracted directions keep the input hidden dimension."""
        # At least four samples per class, growing with the dimension.
        n_samples = max(4, hidden_dim // 4)
        harmful = [torch.randn(hidden_dim) for _ in range(n_samples)]
        harmless = [torch.randn(hidden_dim) for _ in range(n_samples)]
        extractor = WhitenedSVDExtractor()
        result = extractor.extract(harmful, harmless, n_directions=1)
        assert result.directions.shape[1] == hidden_dim

    @pytest.mark.parametrize("hidden_dim", [8, 32, 64, 256])
    def test_cross_layer_various_dims(self, hidden_dim):
        """Persistence score stays within [0, 1] for four random layers."""
        directions = {i: torch.randn(hidden_dim) for i in range(4)}
        analyzer = CrossLayerAlignmentAnalyzer()
        result = analyzer.analyze(directions)
        assert 0.0 <= result.direction_persistence_score <= 1.0

    @pytest.mark.parametrize("hidden_dim", [8, 32, 64, 256])
    def test_sparse_surgery_various_dims(self, hidden_dim):
        """Removed energy stays within [0, 1] for a square weight matrix."""
        weight = torch.randn(hidden_dim, hidden_dim)
        direction = torch.randn(hidden_dim)
        # The direction is normalised to unit length before the call.
        direction = direction / direction.norm()
        surgeon = SparseDirectionSurgeon()
        result = surgeon.analyze_weight_matrix(weight, direction, layer_idx=0)
        assert 0.0 <= result.energy_removed <= 1.0

    @pytest.mark.parametrize("n_layers", [1, 4, 16, 64])
    def test_imprint_various_layer_counts(self, n_layers):
        """Imprint detection returns a known method label for any layer count."""
        directions = {i: torch.randn(64) for i in range(n_layers)}
        detector = AlignmentImprintDetector()
        result = detector.detect_imprint(directions)
        assert result.predicted_method in ("dpo", "rlhf", "cai", "sft", "unknown")
class TestExceptionPaths:
    """Error handling and boundary conditions of the analysis modules."""

    def test_whitened_svd_mismatched_dims(self):
        """Harmful (64-d) and harmless (32-d) activations must raise."""
        wide = [torch.randn(64) for _ in range(10)]
        narrow = [torch.randn(32) for _ in range(10)]
        extractor = WhitenedSVDExtractor()
        with pytest.raises(Exception):
            extractor.extract(wide, narrow, n_directions=1)

    def test_whitened_svd_single_sample(self):
        """Single sample should not crash (may return 0 directions due to insufficient data)."""
        lone_harmful = [torch.randn(32)]
        lone_harmless = [torch.randn(32)]
        outcome = WhitenedSVDExtractor().extract(
            lone_harmful, lone_harmless, n_directions=1
        )
        # Only the hidden dimension is checked, not the number of directions.
        assert outcome.directions.shape[1] == 32

    def test_sparse_surgery_zero_direction(self):
        """Zero direction vector should not crash."""
        null_direction = torch.zeros(16)
        matrix = torch.randn(16, 16)
        surgeon = SparseDirectionSurgeon()
        # Handled gracefully means a result object comes back (possibly with
        # zero energy); its contents are not inspected.
        outcome = surgeon.analyze_weight_matrix(
            matrix, null_direction, layer_idx=0
        )
        assert outcome is not None

    def test_cross_layer_single_layer(self):
        """Single layer directions should still produce a result."""
        one_layer = {0: torch.randn(32)}
        outcome = CrossLayerAlignmentAnalyzer().analyze(one_layer)
        assert outcome is not None