# obliteratus/tests/test_edge_cases.py
# (repository listing artifact: uploaded by pliny-the-prompter, "Upload 127 files", commit 45113e6)
"""Edge-case and robustness tests.
Tests for NaN/Inf handling, empty inputs, extreme dimensions,
and other boundary conditions that the main test suite doesn't cover.
"""
from __future__ import annotations
import math
import pytest
import torch
import torch.nn as nn
from obliteratus.analysis.whitened_svd import WhitenedSVDExtractor
from obliteratus.analysis.cross_layer import CrossLayerAlignmentAnalyzer
from obliteratus.analysis.concept_geometry import ConceptConeAnalyzer
from obliteratus.analysis.alignment_imprint import AlignmentImprintDetector
from obliteratus.analysis.multi_token_position import MultiTokenPositionAnalyzer
from obliteratus.analysis.sparse_surgery import SparseDirectionSurgeon
from obliteratus.analysis.causal_tracing import CausalRefusalTracer
from obliteratus.analysis.residual_stream import ResidualStreamDecomposer
from obliteratus.analysis.probing_classifiers import LinearRefusalProbe
from obliteratus.analysis.cross_model_transfer import TransferAnalyzer
from obliteratus.evaluation.advanced_metrics import (
refusal_rate,
effective_rank,
activation_cosine_similarity,
)
from obliteratus.analysis.steering_vectors import (
SteeringVectorFactory,
SteeringHookManager,
SteeringConfig,
SteeringResult,
compute_steering_effectiveness,
format_steering_report,
)
# ===========================================================================
# NaN / Inf handling
# ===========================================================================
class TestNaNInfHandling:
    """Test that modules handle degenerate inputs gracefully.

    Contract documented here: for NaN/Inf/zero inputs a module may either
    raise a clean error (RuntimeError/ValueError) or return a result
    (possibly NaN-contaminated) — it must never hang or hard-crash.
    """

    def test_whitened_svd_nan_activations(self):
        """WhitenedSVD with NaN — currently raises; documenting behavior."""
        harmful = [torch.tensor([float("nan"), 1.0, 2.0]) for _ in range(5)]
        harmless = [torch.randn(3) for _ in range(5)]
        extractor = WhitenedSVDExtractor()
        # NaN propagation through SVD is expected to produce NaN results.
        # This documents the current behavior — ideally would guard against it.
        raised = False
        result = None
        try:
            result = extractor.extract(harmful, harmless)
        except (RuntimeError, ValueError):
            raised = True
        # Either it raised an exception (acceptable) or returned a result with NaNs.
        assert raised or result is not None, (
            "Should either raise on NaN input or return a result"
        )

    def test_whitened_svd_zero_activations(self):
        """WhitenedSVD with all-zero activations returns a valid result."""
        harmful = [torch.zeros(8) for _ in range(5)]
        harmless = [torch.zeros(8) for _ in range(5)]
        extractor = WhitenedSVDExtractor()
        result = extractor.extract(harmful, harmless)
        # Should return a valid result without crashing.
        assert result is not None
        assert result.directions is not None
        assert result.singular_values is not None

    def test_concept_cone_nan_direction(self):
        """ConceptConeAnalyzer with NaN in activations — documenting behavior."""
        harmful = [torch.randn(16) for _ in range(10)]
        harmless = [torch.randn(16) for _ in range(10)]
        # Poison one activation with NaN in every component.
        harmful[3] = torch.full((16,), float("nan"))
        cat_map = {i: f"cat_{i % 3}" for i in range(10)}
        analyzer = ConceptConeAnalyzer(category_map=cat_map)
        raised = False
        result = None
        try:
            result = analyzer.analyze_layer(harmful, harmless)
        except (RuntimeError, ValueError):
            raised = True
        # Either it raised an exception (acceptable) or returned a result.
        assert raised or result is not None, (
            "Should either raise on NaN input or return a result"
        )

    def test_sparse_surgery_zero_direction(self):
        """Sparse surgery with a zero refusal direction yields zero projection."""
        W = torch.randn(32, 16)
        zero_dir = torch.zeros(16)
        surgeon = SparseDirectionSurgeon()
        result = surgeon.analyze_weight_matrix(W, zero_dir)
        assert result.mean_projection == 0.0

    def test_sparse_surgery_zero_weight(self):
        """Sparse surgery with a zero weight matrix yields ~zero projections."""
        W = torch.zeros(32, 16)
        ref_dir = torch.randn(16)
        surgeon = SparseDirectionSurgeon()
        result = surgeon.analyze_weight_matrix(W, ref_dir)
        assert result.max_projection < 1e-6

    def test_effective_rank_nan_matrix(self):
        """effective_rank with a NaN entry actually present in the matrix.

        BUGFIX: the original version sanitized the input with
        torch.nan_to_num() *before* the call, so the NaN never reached
        effective_rank and the test did not exercise the behavior its
        docstring described.  The NaN is now passed through: raising
        cleanly is acceptable, and a returned value may be finite or NaN
        but must be a real number.
        """
        W = torch.randn(10, 10)
        W[0, 0] = float("nan")
        try:
            result = effective_rank(W)
        except Exception:
            return  # Raising is acceptable for NaN input
        # If it returned, the value must at least be a real number
        # (finite or NaN) rather than an arbitrary object.
        assert math.isfinite(result) or math.isnan(result)

    def test_cosine_similarity_zero_vectors(self):
        """Cosine similarity between zero vectors must not crash."""
        a = torch.zeros(32)
        b = torch.zeros(32)
        result = activation_cosine_similarity(a, b)
        # Should be 0 or NaN, not crash.
        assert math.isfinite(result) or math.isnan(result)

    def test_transfer_analyzer_nan_directions(self):
        """Transfer analyzer with NaN directions must not crash."""
        dirs_a = {0: torch.randn(16), 1: torch.tensor([float("nan")] * 16)}
        dirs_b = {0: torch.randn(16), 1: torch.randn(16)}
        analyzer = TransferAnalyzer()
        # Should not crash.
        result = analyzer.analyze_cross_model(dirs_a, dirs_b)
        assert result is not None
        assert isinstance(result.mean_transfer_score, float)
        assert result.per_layer_transfer is not None
# ===========================================================================
# Empty inputs
# ===========================================================================
class TestEmptyInputs:
    """Graceful handling of empty or minimal inputs."""

    def test_cross_layer_empty_directions(self):
        """An empty direction map yields a zero persistence score."""
        outcome = CrossLayerAlignmentAnalyzer().analyze({})
        assert outcome.direction_persistence_score == 0.0

    def test_alignment_imprint_single_layer(self):
        """Single layer should still return a result."""
        single_layer = {0: torch.randn(32)}
        outcome = AlignmentImprintDetector().detect_imprint(single_layer)
        assert outcome.predicted_method in ("dpo", "rlhf", "cai", "sft", "unknown")

    def test_multi_token_single_position(self):
        """A length-1 sequence is handled."""
        direction = torch.randn(16)
        one_position = torch.randn(1, 16)
        outcome = MultiTokenPositionAnalyzer().analyze_prompt(one_position, direction)
        assert outcome.n_tokens == 1
        assert outcome.peak_position == 0

    def test_probing_minimal_data(self):
        """Probing with only three samples per class."""
        pos_acts = [torch.randn(8) for _ in range(3)]
        neg_acts = [torch.randn(8) for _ in range(3)]
        outcome = LinearRefusalProbe(n_epochs=10).probe_layer(pos_acts, neg_acts)
        assert 0 <= outcome.accuracy <= 1.0

    def test_residual_stream_single_layer(self):
        """Decomposition over a single layer of activations."""
        layer_acts = {0: torch.randn(32)}
        direction = torch.randn(32)
        outcome = ResidualStreamDecomposer().decompose(layer_acts, direction)
        assert outcome.n_layers == 1

    def test_causal_tracing_single_layer(self):
        """Tracing with one layer of activations and directions."""
        layer_acts = {0: torch.randn(32)}
        layer_dirs = {0: torch.randn(32)}
        outcome = CausalRefusalTracer().trace_from_activations(layer_acts, layer_dirs)
        assert outcome.n_layers == 1

    def test_transfer_no_common_layers(self):
        """Cross-model with no overlapping layer indices."""
        model_a_dirs = {0: torch.randn(16), 1: torch.randn(16)}
        model_b_dirs = {2: torch.randn(16), 3: torch.randn(16)}
        outcome = TransferAnalyzer().analyze_cross_model(model_a_dirs, model_b_dirs)
        assert outcome.mean_transfer_score == 0.0

    def test_refusal_rate_empty_list(self):
        """An empty response list yields a rate of zero."""
        assert refusal_rate([]) == 0.0

    def test_refusal_rate_single_response(self):
        """A lone refusal yields a rate of one."""
        assert refusal_rate(["I cannot help with that."]) == 1.0
# ===========================================================================
# Extreme dimensions
# ===========================================================================
class TestExtremeDimensions:
    """Behavior with unusually large or small dimensions."""

    def test_high_dimensional_directions(self):
        """Test with realistic hidden dimension (4096)."""
        hidden_dim = 4096
        torch.manual_seed(42)
        big_dirs = {layer: torch.randn(hidden_dim) for layer in range(8)}
        outcome = TransferAnalyzer().analyze_cross_layer(big_dirs)
        assert outcome.mean_adjacent_transfer >= 0

    def test_high_dim_sparse_surgery(self):
        """Sparse surgery on a large weight matrix."""
        weight = torch.randn(2048, 1024)
        direction = torch.randn(1024)
        surgeon = SparseDirectionSurgeon(sparsity=0.05)
        outcome = surgeon.analyze_weight_matrix(weight, direction)
        assert outcome.n_rows_modified == int(0.05 * 2048)

    def test_single_dimension(self):
        """1D hidden dimension edge case."""
        tiny_dirs = {layer: torch.randn(1) for layer in range(4)}
        outcome = TransferAnalyzer().analyze_cross_layer(tiny_dirs)
        # Every 1D direction is parallel or anti-parallel, so cosine is always 1.0.
        assert outcome.mean_adjacent_transfer >= 0.99

    def test_many_layers_imprint(self):
        """Alignment imprint with many layers (128)."""
        many_dirs = {layer: torch.randn(32) for layer in range(128)}
        outcome = AlignmentImprintDetector().detect_imprint(many_dirs)
        prob_sum = sum((
            outcome.dpo_probability,
            outcome.rlhf_probability,
            outcome.cai_probability,
            outcome.sft_probability,
        ))
        # Method probabilities should form a distribution.
        assert abs(prob_sum - 1.0) < 0.01

    @pytest.mark.parametrize("n_prompts", [1, 2, 5, 50, 100])
    def test_concept_cone_varying_prompt_counts(self, n_prompts):
        """Concept cone with varying numbers of prompts."""
        pos_acts = [torch.randn(16) for _ in range(n_prompts)]
        neg_acts = [torch.randn(16) for _ in range(n_prompts)]
        categories = {i: f"cat_{i % 3}" for i in range(n_prompts)}
        cone = ConceptConeAnalyzer(category_map=categories, min_category_size=1)
        outcome = cone.analyze_layer(pos_acts, neg_acts)
        # A valid result must come back regardless of prompt count.
        assert outcome is not None
        assert outcome.general_direction is not None
        assert outcome.category_count >= 1

    @pytest.mark.parametrize("seq_len", [1, 2, 10, 100, 500])
    def test_multi_token_varying_lengths(self, seq_len):
        """Multi-token analysis across sequence lengths."""
        direction = torch.randn(16)
        token_acts = torch.randn(seq_len, 16)
        outcome = MultiTokenPositionAnalyzer().analyze_prompt(token_acts, direction)
        assert outcome.n_tokens == seq_len

    @pytest.mark.parametrize("hidden_dim", [2, 4, 16, 64, 256, 1024])
    def test_probing_varying_dimensions(self, hidden_dim):
        """Probing classifier across different hidden dims (min 2 for SGD)."""
        pos_acts = [torch.randn(hidden_dim) * 2 + 1 for _ in range(10)]
        neg_acts = [torch.randn(hidden_dim) * 2 - 1 for _ in range(10)]
        outcome = LinearRefusalProbe(n_epochs=20).probe_layer(pos_acts, neg_acts)
        assert 0 <= outcome.accuracy <= 1.0
# ===========================================================================
# Refusal rate edge cases
# ===========================================================================
class TestRefusalRateEdgeCases:
    """Edge cases for refusal rate detection."""

    def test_novel_refusal_phrasing(self):
        """Unusual refusal phrasings may be missed (known limitation)."""
        unusual = [
            "That request falls outside my operational parameters.",
            "I'm designed to be helpful but this conflicts with my guidelines.",
        ]
        # Ideally detected, but string matching may miss novel phrasings,
        # so the rate may legitimately be 0; only validity is asserted.
        assert 0 <= refusal_rate(unusual) <= 1.0

    def test_false_positive_sorry(self):
        """'I'm sorry to hear that' followed by helpful content."""
        sympathetic = [
            "I'm sorry to hear that. Here's how you can solve the problem: first...",
        ]
        # Known limitation: prefix matching produces a false positive here.
        # Only the return value's validity is checked.
        assert 0 <= refusal_rate(sympathetic, mode="prefix") <= 1.0

    def test_case_insensitivity(self):
        """Detection is case-insensitive in substring mode."""
        shouted = ["I CANNOT ASSIST WITH THAT REQUEST."]
        assert refusal_rate(shouted, mode="substring") == 1.0

    def test_whitespace_handling(self):
        """Leading/trailing whitespace around a refusal is tolerated."""
        padded = [" I'm sorry, but I can't help with that. "]
        assert 0 <= refusal_rate(padded, mode="prefix") <= 1.0
# ===========================================================================
# Steering Vectors
# ===========================================================================
class TestSteeringVectors:
    """Tests for the steering vector inference-time intervention system."""

    def test_from_refusal_direction(self):
        """Factory builds a unit-norm 'refusal' vector with negative default alpha."""
        raw_direction = torch.randn(32)
        vec = SteeringVectorFactory.from_refusal_direction(raw_direction, source_layer=5)
        assert vec.label == "refusal"
        assert vec.source_layer == 5
        assert vec.default_alpha == -1.0
        assert abs(vec.direction.norm().item() - 1.0) < 0.01

    def test_from_contrastive_pairs(self):
        """Contrastive construction from two separated clusters."""
        cluster_hi = [torch.randn(16) + 2 for _ in range(10)]
        cluster_lo = [torch.randn(16) - 2 for _ in range(10)]
        vec = SteeringVectorFactory.from_contrastive_pairs(cluster_hi, cluster_lo, label="test")
        assert vec.label == "test"
        assert abs(vec.direction.norm().item() - 1.0) < 0.01
        assert "n_positive" in vec.metadata

    def test_combine_vectors(self):
        """Combining two vectors renormalizes the result."""
        first = SteeringVectorFactory.from_refusal_direction(torch.randn(32))
        second = SteeringVectorFactory.from_refusal_direction(torch.randn(32))
        merged = SteeringVectorFactory.combine([first, second], label="merged")
        assert merged.label == "merged"
        assert abs(merged.direction.norm().item() - 1.0) < 0.01

    def test_combine_single(self):
        """Combining a singleton list still yields a unit vector."""
        only = SteeringVectorFactory.from_refusal_direction(torch.randn(16))
        merged = SteeringVectorFactory.combine([only])
        assert abs(merged.direction.norm().item() - 1.0) < 0.01

    def test_combine_empty_raises(self):
        """An empty vector list is rejected."""
        with pytest.raises(ValueError):
            SteeringVectorFactory.combine([])

    def test_hook_manager_lifecycle(self):
        """Install/remove lifecycle without a real model."""
        mgr = SteeringHookManager()
        assert not mgr.is_active
        mgr.remove()  # must be a safe no-op when nothing is installed
        assert not mgr.is_active

    def test_hook_with_simple_model(self):
        """Steering hooks on a simple nn.Sequential model."""
        net = nn.Sequential(
            nn.Linear(16, 16),
            nn.ReLU(),
            nn.Linear(16, 16),
            nn.ReLU(),
            nn.Linear(16, 8),
        )
        steer_vec = SteeringVectorFactory.from_refusal_direction(torch.randn(16))
        cfg = SteeringConfig(
            vectors=[steer_vec],
            target_layers=[0, 2],  # steer at first and third linear layers
            alpha=1.0,
        )
        mgr = SteeringHookManager()
        # Install on explicit module objects.
        modules = list(net.children())
        install_result = mgr.install(net, cfg, layer_modules=modules)
        assert install_result.hooks_installed == 2
        assert mgr.is_active
        # A forward pass must succeed with the hooks in place.
        out = net(torch.randn(1, 16))
        assert out.shape == (1, 8)
        # Tear down and verify deactivation.
        mgr.remove()
        assert not mgr.is_active

    def test_steering_effectiveness_remove(self):
        """Partial reduction gives effectiveness strictly between 0 and 1."""
        score = compute_steering_effectiveness(2.0, 0.5, direction="remove")
        assert 0 < score < 1.0

    def test_steering_effectiveness_perfect_remove(self):
        """Full removal gives effectiveness of exactly 1."""
        assert compute_steering_effectiveness(2.0, 0.0, direction="remove") == 1.0

    def test_steering_effectiveness_no_change(self):
        """No change gives effectiveness of exactly 0."""
        assert compute_steering_effectiveness(2.0, 2.0, direction="remove") == 0.0

    def test_steering_effectiveness_add(self):
        """Addition effectiveness is capped at 1.0."""
        assert compute_steering_effectiveness(1.0, 3.0, direction="add") == 1.0

    def test_format_report(self):
        """The formatted report mentions steering and the vector label."""
        steer_vec = SteeringVectorFactory.from_refusal_direction(torch.randn(32))
        cfg = SteeringConfig(vectors=[steer_vec], target_layers=[3, 5], alpha=0.5)
        summary = SteeringResult(config=cfg, hooks_installed=2, total_steered_layers=2)
        text = format_steering_report(summary)
        assert "Steering" in text
        assert "refusal" in text

    def test_steering_config_position_modes(self):
        """Each supported position mode round-trips through the config."""
        for mode in ["all", "last", "first"]:
            cfg = SteeringConfig(
                vectors=[SteeringVectorFactory.from_refusal_direction(torch.randn(8))],
                target_layers=[0],
                position=mode,
            )
            assert cfg.position == mode

    def test_imports(self):
        """The analysis package re-exports the steering API."""
        from obliteratus.analysis import SteeringVectorFactory, SteeringHookManager
        assert SteeringVectorFactory is not None
        assert SteeringHookManager is not None
class TestParametrizedDimensions:
    """Parametrized tests across different hidden dimensions."""

    @pytest.mark.parametrize("hidden_dim", [2, 8, 64, 256, 768])
    def test_whitened_svd_various_dims(self, hidden_dim):
        """Extraction preserves the hidden dimension across sizes."""
        sample_count = max(4, hidden_dim // 4)
        pos_acts = [torch.randn(hidden_dim) for _ in range(sample_count)]
        neg_acts = [torch.randn(hidden_dim) for _ in range(sample_count)]
        outcome = WhitenedSVDExtractor().extract(pos_acts, neg_acts, n_directions=1)
        assert outcome.directions.shape[1] == hidden_dim

    @pytest.mark.parametrize("hidden_dim", [2, 8, 64, 256])
    def test_cross_layer_various_dims(self, hidden_dim):
        """Persistence score stays in [0, 1] for any width."""
        layer_dirs = {layer: torch.randn(hidden_dim) for layer in range(4)}
        outcome = CrossLayerAlignmentAnalyzer().analyze(layer_dirs)
        assert 0.0 <= outcome.direction_persistence_score <= 1.0

    @pytest.mark.parametrize("hidden_dim", [4, 32, 128])
    def test_sparse_surgery_various_dims(self, hidden_dim):
        """Energy removed stays in [0, 1] for square matrices of any width."""
        mat = torch.randn(hidden_dim, hidden_dim)
        unit_dir = torch.randn(hidden_dim)
        unit_dir = unit_dir / unit_dir.norm()
        outcome = SparseDirectionSurgeon().analyze_weight_matrix(mat, unit_dir, layer_idx=0)
        assert 0.0 <= outcome.energy_removed <= 1.0

    @pytest.mark.parametrize("n_layers", [1, 4, 12, 32])
    def test_imprint_various_layer_counts(self, n_layers):
        """Prediction is one of the known methods for any layer count."""
        layer_dirs = {layer: torch.randn(64) for layer in range(n_layers)}
        outcome = AlignmentImprintDetector().detect_imprint(layer_dirs)
        assert outcome.predicted_method in ("dpo", "rlhf", "cai", "sft", "unknown")
class TestExceptionPaths:
    """Tests for error handling and boundary conditions."""

    def test_whitened_svd_mismatched_dims(self):
        """Harmful and harmless with different hidden dims should fail or handle gracefully."""
        wide_acts = [torch.randn(64) for _ in range(10)]
        narrow_acts = [torch.randn(32) for _ in range(10)]
        with pytest.raises(Exception):
            WhitenedSVDExtractor().extract(wide_acts, narrow_acts, n_directions=1)

    def test_whitened_svd_single_sample(self):
        """A single sample per class must not crash.

        It may return 0 directions due to insufficient data, but the
        hidden dimension of the output must be preserved.
        """
        lone_pos = [torch.randn(32)]
        lone_neg = [torch.randn(32)]
        outcome = WhitenedSVDExtractor().extract(lone_pos, lone_neg, n_directions=1)
        assert outcome.directions.shape[1] == 32  # hidden dim preserved

    def test_sparse_surgery_zero_direction(self):
        """A zero direction vector must be handled gracefully."""
        mat = torch.randn(16, 16)
        null_dir = torch.zeros(16)
        # Should handle gracefully (possibly returning 0 energy).
        outcome = SparseDirectionSurgeon().analyze_weight_matrix(mat, null_dir, layer_idx=0)
        assert outcome is not None

    def test_cross_layer_single_layer(self):
        """A single layer of directions still produces a result."""
        outcome = CrossLayerAlignmentAnalyzer().analyze({0: torch.randn(32)})
        assert outcome is not None