# obliteratus/tests/test_breakthrough_modules.py
# pliny-the-prompter's picture
# Upload 127 files
# 45113e6 verified
"""Tests for the 5 novel breakthrough analysis modules:
1. Riemannian Refusal Manifold Discovery
2. Anti-Ouroboros Adversarial Self-Repair Probing
3. Conditional Abliteration with Category-Selective Projection Fields
4. Wasserstein Refusal Transfer Across Architectures
5. Spectral Abliteration Completeness Certification
"""
from __future__ import annotations
import math
import torch
from obliteratus.analysis.riemannian_manifold import (
RiemannianManifoldAnalyzer,
RiemannianRefusalManifold,
GeodesicProjectionResult,
)
from obliteratus.analysis.anti_ouroboros import (
AntiOuroborosProber,
ASRGResult,
)
from obliteratus.analysis.conditional_abliteration import (
ConditionalAbliterator,
ConditionalAbliterationResult,
CategoryProjector,
)
from obliteratus.analysis.wasserstein_transfer import (
WassersteinRefusalTransfer,
WassersteinTransferResult,
TransferredDirection,
)
from obliteratus.analysis.spectral_certification import (
SpectralCertifier,
SpectralCertificate,
CertificationLevel,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_activations(hidden_dim=32, n_per_class=30, separation=2.0, seed=42):
    """Build a toy harmful/harmless activation pair around a planted direction.

    A random unit vector is drawn first.  Harmful samples are Gaussian noise
    scaled by 0.3 and shifted along that vector by ``separation``; harmless
    samples are the same kind of noise without the shift.

    Returns:
        ``(harmful, harmless, direction)`` where the first two tensors have
        shape ``(n_per_class, hidden_dim)`` and ``direction`` is the planted
        unit vector of shape ``(hidden_dim,)``.
    """
    # Re-seed the global torch RNG so identical arguments give identical data.
    torch.manual_seed(seed)

    raw = torch.randn(hidden_dim)
    unit_dir = raw / raw.norm()

    sample_shape = (n_per_class, hidden_dim)
    # Draw order (harmful noise, then harmless noise) is part of the
    # reproducible output and must not be swapped.
    harmful_acts = torch.randn(*sample_shape) * 0.3 + separation * unit_dir
    harmless_acts = torch.randn(*sample_shape) * 0.3
    return harmful_acts, harmless_acts, unit_dir
def _make_multilayer_activations(
    n_layers=6, hidden_dim=32, n_per_class=30, separation=2.0, seed=42,
):
    """Generate per-layer harmful/harmless activations with a drifting direction.

    One random unit "anchor" direction is drawn, then every layer perturbs it
    by a random offset whose weight grows linearly with the layer index
    (``layer / n_layers``) before re-normalising.  Harmful samples for a layer
    are 0.3-scaled Gaussian noise shifted by ``separation`` along that layer's
    direction; harmless samples are unshifted noise.

    Returns:
        ``(harmful, harmless, directions)`` -- three dicts keyed by the
        integer layer index ``0 .. n_layers - 1``.
    """
    torch.manual_seed(seed)

    def _unit(vec):
        # Scale a vector to unit L2 norm.
        return vec / vec.norm()

    def _noise():
        # One batch of class noise; each call advances the global RNG.
        return torch.randn(n_per_class, hidden_dim) * 0.3

    anchor = _unit(torch.randn(hidden_dim))

    harmful_by_layer, harmless_by_layer, dir_by_layer = {}, {}, {}
    for idx in range(n_layers):
        # Layer 0 receives zero perturbation weight; deeper layers drift more.
        jitter = torch.randn(hidden_dim) * 0.1
        drifted = _unit(anchor + jitter * (idx / n_layers))

        # Per-layer RNG draw order: jitter, harmful noise, harmless noise.
        harmful_by_layer[idx] = _noise() + separation * drifted
        harmless_by_layer[idx] = _noise()
        dir_by_layer[idx] = drifted
    return harmful_by_layer, harmless_by_layer, dir_by_layer
def _make_category_activations(
    categories=("weapons", "cyber", "fraud"),
    hidden_dim=32,
    n_per_category=15,
    seed=42,
):
    """Create per-category harmful activations with distinct directions.

    Category ``i`` is assigned the unit vector that is constant on coordinates
    ``[3*i, 3*i + 3)`` and zero elsewhere, so different categories occupy
    disjoint coordinate blocks.  Each category's samples are 0.3-scaled
    Gaussian noise shifted by ``2.0`` along its direction.  A single harmless
    batch of unshifted noise is drawn after all categories.

    Args:
        categories: Iterable of category names (used as dict keys).
        hidden_dim: Width of every activation vector.
        n_per_category: Rows per category, and rows in the harmless batch.
        seed: Value passed to ``torch.manual_seed`` before any sampling.

    Returns:
        ``(category_acts, harmless)`` where ``category_acts`` maps each name
        to a ``(n_per_category, hidden_dim)`` tensor and ``harmless`` has the
        same shape.

    Raises:
        ValueError: If ``hidden_dim`` is too small for the last category to
            receive at least one coordinate.  Without this check the direction
            would stay all-zero and normalising it would fill that category's
            activations with NaN.
    """
    categories = tuple(categories)
    # Validate before touching the global RNG: the last category starts at
    # coordinate 3 * (len - 1) and needs at least one in-range coordinate.
    if categories and 3 * (len(categories) - 1) >= hidden_dim:
        raise ValueError(
            f"hidden_dim={hidden_dim} is too small for {len(categories)} "
            "categories; each category needs its own coordinate block "
            "starting at index 3 * i"
        )

    torch.manual_seed(seed)
    category_acts = {}
    for i, cat in enumerate(categories):
        start = i * 3
        # Each category gets a distinct direction (the final block may be
        # shorter than 3 coordinates when hidden_dim cuts it off).
        direction = torch.zeros(hidden_dim)
        direction[start:start + 3] = 1.0
        direction = direction / direction.norm()
        category_acts[cat] = (
            torch.randn(n_per_category, hidden_dim) * 0.3
            + 2.0 * direction
        )
    harmless = torch.randn(n_per_category, hidden_dim) * 0.3
    return category_acts, harmless
# ===========================================================================
# 1. Riemannian Refusal Manifold Discovery
# ===========================================================================
class TestRiemannianManifold:
    """Exercise ``RiemannianManifoldAnalyzer`` on synthetic planted-signal data."""

    @staticmethod
    def _analyze_layers(n_layers=6, use_directions=False, **analyzer_kwargs):
        """Generate synthetic layers, then analyze them with a fresh analyzer.

        Activations are created before the analyzer is constructed.  When
        ``use_directions`` is true the planted per-layer directions are passed
        as the third positional argument to ``analyze``.
        """
        harmful_by_layer, harmless_by_layer, dirs_by_layer = (
            _make_multilayer_activations(n_layers=n_layers)
        )
        analyzer = RiemannianManifoldAnalyzer(**analyzer_kwargs)
        if use_directions:
            return analyzer.analyze(
                harmful_by_layer, harmless_by_layer, dirs_by_layer
            )
        return analyzer.analyze(harmful_by_layer, harmless_by_layer)

    def test_analyzer_creation(self):
        """Default construction exposes the asserted default settings."""
        default_analyzer = RiemannianManifoldAnalyzer()
        assert default_analyzer.n_sample_points == 50
        assert default_analyzer.curvature_flatness_threshold == 0.01

    def test_analyze_basic(self):
        """``analyze`` returns a manifold summary with sane top-level fields."""
        manifold = self._analyze_layers(n_sample_points=10)
        assert isinstance(manifold, RiemannianRefusalManifold)
        assert manifold.ambient_dimension == 32
        assert manifold.intrinsic_dimension >= 1
        assert manifold.dimension_ratio > 0
        assert manifold.recommendation in ("linear_sufficient", "geodesic_recommended")

    def test_curvature_estimation(self):
        """Curvature statistics are floats and the spread is non-negative."""
        manifold = self._analyze_layers(n_sample_points=10)
        assert isinstance(manifold.mean_sectional_curvature, float)
        assert isinstance(manifold.max_sectional_curvature, float)
        assert manifold.curvature_std >= 0

    def test_layer_curvatures(self):
        """Per-layer curvature and intrinsic-dimension tables are populated."""
        manifold = self._analyze_layers(n_layers=4, n_sample_points=5)
        assert len(manifold.layer_curvatures) > 0
        assert len(manifold.layer_intrinsic_dims) > 0

    def test_geodesic_diameter(self):
        """Geodesic diameter stays within ``[0, pi]`` (plus a small tolerance)."""
        manifold = self._analyze_layers(use_directions=True)
        # Geodesic diameter on the sphere is at most pi
        assert 0 <= manifold.geodesic_diameter <= math.pi + 0.01

    def test_geodesic_projection(self):
        """Projecting one harmful sample reports a positive refusal component."""
        harmful, _, direction = _make_activations()
        analyzer = RiemannianManifoldAnalyzer(n_sample_points=5)
        projection = analyzer.compute_geodesic_projection(
            harmful[0], direction, harmful, layer_idx=0
        )
        assert isinstance(projection, GeodesicProjectionResult)
        assert projection.original_refusal_component > 0
        assert projection.improvement_factor >= 1.0

    def test_empty_input(self):
        """Empty activation dicts give dimension 0 and the linear recommendation."""
        manifold = RiemannianManifoldAnalyzer().analyze({}, {})
        assert manifold.intrinsic_dimension == 0
        assert manifold.recommendation == "linear_sufficient"

    def test_with_precomputed_directions(self):
        """Supplying the planted directions still yields a usable summary."""
        manifold = self._analyze_layers(use_directions=True, n_sample_points=5)
        assert manifold.ambient_dimension == 32
        assert manifold.geodesic_vs_euclidean_ratio > 0

    def test_flat_manifold_detection(self):
        """Tightly clustered single-layer data produces a boolean flatness flag."""
        torch.manual_seed(99)
        dim, n_samples = 32, 20
        # Harmful samples sit on one unit direction with tiny (0.01) jitter.
        axis = torch.randn(dim)
        axis = axis / axis.norm()
        tiled_axis = axis.unsqueeze(0).repeat(n_samples, 1)
        harmful_by_layer = {0: tiled_axis + torch.randn(n_samples, dim) * 0.01}
        harmless_by_layer = {0: torch.randn(n_samples, dim) * 0.01}
        analyzer = RiemannianManifoldAnalyzer(
            n_sample_points=5, curvature_flatness_threshold=1.0
        )
        manifold = analyzer.analyze(harmful_by_layer, harmless_by_layer)
        # Only the flag's type is checked here, not its value.
        assert isinstance(manifold.is_approximately_flat, bool)
# ===========================================================================
# 2. Anti-Ouroboros Adversarial Self-Repair Probing
# ===========================================================================
class TestAntiOuroboros:
    """Exercise ``AntiOuroborosProber.build_asrg`` on hand-written strength maps."""

    def test_prober_creation(self):
        """Default construction exposes the asserted repair threshold."""
        default_prober = AntiOuroborosProber()
        assert default_prober.repair_threshold == 0.05

    def test_build_asrg_from_strengths(self):
        """A six-layer strength map yields a graph with nodes, edges and a risk label."""
        strengths = {0: 0.2, 1: 0.5, 2: 0.8, 3: 0.6, 4: 0.3, 5: 0.1}
        graph = AntiOuroborosProber().build_asrg(strengths)
        assert isinstance(graph, ASRGResult)
        assert graph.n_nodes == 6
        assert graph.n_edges > 0
        assert graph.spectral_gap >= 0
        assert graph.self_repair_risk in ("low", "medium", "high", "extreme")

    def test_repair_hubs_identified(self):
        """The layer with peak strength ranks in the top three vulnerabilities."""
        # Layer 3 has peak refusal, so it is expected near the front of the
        # vulnerability ordering.
        strengths = {0: 0.1, 1: 0.2, 2: 0.5, 3: 0.9, 4: 0.3, 5: 0.1}
        graph = AntiOuroborosProber(hub_percentile=0.8).build_asrg(strengths)
        ordering = graph.vulnerability_ordering
        assert len(ordering) == 6
        assert 3 in ordering[:3]

    def test_with_self_repair_data(self):
        """Explicit repair measurements appear as edges with the given weights."""
        strengths = {0: 0.3, 1: 0.6, 2: 0.4}
        repair_records = [
            {
                "ablated_layer": 1,
                "compensating_layers": [0, 2],
                "repair_ratios": [0.2, 0.5],
            },
        ]
        graph = AntiOuroborosProber().build_asrg(strengths, repair_records)
        assert graph.n_edges >= 2
        # Edge from layer 1 to layer 2 should have weight 0.5
        one_to_two = [
            edge
            for edge in graph.edges
            if edge.source_layer == 1 and edge.target_layer == 2
        ]
        assert len(one_to_two) == 1
        assert abs(one_to_two[0].repair_weight - 0.5) < 1e-6

    def test_spectral_gap(self):
        """Uniform strengths give non-negative spectral quantities."""
        uniform = dict.fromkeys(range(8), 0.5)
        graph = AntiOuroborosProber().build_asrg(uniform)
        assert graph.spectral_gap >= 0
        assert graph.algebraic_connectivity >= 0

    def test_min_ablations_bound(self):
        """The minimum ablation count is within [1, n] and matches the set size."""
        ramp = {layer: 0.3 + layer * 0.1 for layer in range(6)}
        graph = AntiOuroborosProber().build_asrg(ramp)
        assert 1 <= graph.min_simultaneous_ablations <= 6
        assert len(graph.recommended_ablation_set) == graph.min_simultaneous_ablations

    def test_empty_input(self):
        """A single-layer strength map is reported as one node with low risk."""
        graph = AntiOuroborosProber().build_asrg({0: 0.5})
        assert graph.n_nodes == 1
        assert graph.self_repair_risk == "low"

    def test_estimated_passes(self):
        """At least one pass is estimated for ten uniformly strong layers."""
        # High self-repair should require more passes
        strong = dict.fromkeys(range(10), 0.8)
        graph = AntiOuroborosProber().build_asrg(strong)
        assert graph.estimated_passes_needed >= 1

    def test_repair_locality(self):
        """Repair locality is reported inside the closed unit interval."""
        uniform = dict.fromkeys(range(6), 0.5)
        graph = AntiOuroborosProber().build_asrg(uniform)
        assert 0 <= graph.repair_locality <= 1
# ===========================================================================
# 3. Conditional Abliteration
# ===========================================================================
class TestConditionalAbliteration:
    """Exercise ``ConditionalAbliterator.analyze`` on synthetic category data."""

    @staticmethod
    def _analyze_categories(n_per_category=15, **abliterator_kwargs):
        """Analyze the default three-category fixture.

        The abliterator is always built with ``min_samples_per_category=5``;
        any further keyword arguments are forwarded to its constructor.
        """
        acts_by_category, harmless = _make_category_activations(
            n_per_category=n_per_category
        )
        abliterator = ConditionalAbliterator(
            min_samples_per_category=5, **abliterator_kwargs
        )
        return abliterator.analyze(acts_by_category, harmless)

    def test_abliterator_creation(self):
        """Default construction exposes the asserted selectivity threshold."""
        default_abliterator = ConditionalAbliterator()
        assert default_abliterator.selectivity_threshold == 0.7

    def test_analyze_basic(self):
        """Analysis returns a result with at least one category and projector."""
        outcome = self._analyze_categories()
        assert isinstance(outcome, ConditionalAbliterationResult)
        assert outcome.n_categories > 0
        assert len(outcome.projectors) > 0

    def test_category_projectors(self):
        """Every projector carries 32-dim vectors and a selectivity in [0, 1]."""
        outcome = self._analyze_categories()
        for projector in outcome.projectors:
            assert isinstance(projector, CategoryProjector)
            assert projector.condition_vector.shape == (32,)
            assert projector.projection_direction.shape == (32,)
            assert 0 <= projector.selectivity <= 1

    def test_selectivity(self):
        """Distinct category directions give a positive mean selectivity."""
        outcome = self._analyze_categories(
            n_per_category=20, selectivity_threshold=0.3
        )
        # With well-separated categories, selectivity should be reasonable
        assert outcome.mean_selectivity > 0

    def test_orthogonality(self):
        """The orthogonality score lies in the closed unit interval."""
        outcome = self._analyze_categories()
        assert 0 <= outcome.orthogonality_score <= 1

    def test_sheaf_consistency(self):
        """Consistency score is in [0, 1] and violations come back as a list."""
        outcome = self._analyze_categories()
        assert 0 <= outcome.sheaf_consistency_score <= 1
        assert isinstance(outcome.consistency_violations, list)

    def test_leakage_matrix(self):
        """The leakage matrix has one row per analyzed category."""
        outcome = self._analyze_categories()
        # Leakage matrix should be square with n_categories
        n_rows = outcome.cross_category_leakage.shape[0]
        assert n_rows == outcome.n_categories

    def test_empty_categories(self):
        """An empty category dict produces no categories and no projectors."""
        outcome = ConditionalAbliterator().analyze({}, torch.randn(10, 32))
        assert outcome.n_categories == 0
        assert len(outcome.projectors) == 0

    def test_too_few_samples(self):
        """A category below the minimum sample count is dropped."""
        sparse_acts = {"weapons": torch.randn(2, 32)}  # only 2 samples
        harmless = torch.randn(10, 32)
        abliterator = ConditionalAbliterator(min_samples_per_category=5)
        outcome = abliterator.analyze(sparse_acts, harmless)
        assert outcome.n_categories == 0

    def test_viable_vs_risky(self):
        """Viable and risky category lists together account for every category."""
        outcome = self._analyze_categories(selectivity_threshold=0.3)
        # All categories should be either viable or risky
        n_viable = len(outcome.viable_categories)
        n_risky = len(outcome.risky_categories)
        assert n_viable + n_risky == outcome.n_categories
# ===========================================================================
# 4. Wasserstein Refusal Transfer
# ===========================================================================
class TestWassersteinTransfer:
    """Exercise ``WassersteinRefusalTransfer.compute_transfer`` on synthetic models."""

    @staticmethod
    def _source_and_target(n_source_layers, n_target_layers):
        """Return ``(source_harmful, source_directions, target_harmful)``.

        The source uses the fixture's default seed and the target uses
        ``seed=99``; both are generated with ``hidden_dim=32``.
        """
        source_harmful, _, source_dirs = _make_multilayer_activations(
            n_layers=n_source_layers, hidden_dim=32
        )
        target_harmful, _, _ = _make_multilayer_activations(
            n_layers=n_target_layers, hidden_dim=32, seed=99
        )
        return source_harmful, source_dirs, target_harmful

    def test_transfer_creation(self):
        """Default construction exposes the asserted fidelity threshold."""
        default_transfer = WassersteinRefusalTransfer()
        assert default_transfer.fidelity_threshold == 0.5

    def test_compute_transfer_same_model(self):
        """Self-transfer (identical source/target) finishes with a finite distance."""
        harmful_by_layer, _, dirs_by_layer = _make_multilayer_activations(
            n_layers=4, hidden_dim=32
        )
        transfer = WassersteinRefusalTransfer()
        outcome = transfer.compute_transfer(
            source_activations=harmful_by_layer,
            target_activations=harmful_by_layer,  # same activations
            source_refusal_directions=dirs_by_layer,
            source_model_name="model_a",
            target_model_name="model_a",
        )
        assert isinstance(outcome, WassersteinTransferResult)
        assert outcome.n_layers_transferred > 0
        assert outcome.wasserstein_distance < float("inf")

    def test_compute_transfer_different_models(self):
        """Transfer between differently seeded models reports a viability label."""
        source_harmful, source_dirs, target_harmful = self._source_and_target(4, 4)
        transfer = WassersteinRefusalTransfer()
        outcome = transfer.compute_transfer(
            source_activations=source_harmful,
            target_activations=target_harmful,
            source_refusal_directions=source_dirs,
            source_model_name="llama",
            target_model_name="yi",
        )
        assert outcome.n_layers_transferred > 0
        assert outcome.transfer_viability in ("excellent", "good", "marginal", "poor")

    def test_layer_mapping(self):
        """A 6-layer source and 4-layer target still produce a layer mapping."""
        source_harmful, source_dirs, target_harmful = self._source_and_target(6, 4)
        transfer = WassersteinRefusalTransfer()
        outcome = transfer.compute_transfer(
            source_activations=source_harmful,
            target_activations=target_harmful,
            source_refusal_directions=source_dirs,
        )
        assert len(outcome.layer_mapping) > 0

    def test_explicit_layer_mapping(self):
        """An explicit identity mapping over four layers transfers all four."""
        source_harmful, source_dirs, target_harmful = self._source_and_target(4, 4)
        transfer = WassersteinRefusalTransfer()
        outcome = transfer.compute_transfer(
            source_activations=source_harmful,
            target_activations=target_harmful,
            source_refusal_directions=source_dirs,
            layer_mapping={0: 0, 1: 1, 2: 2, 3: 3},
        )
        assert outcome.n_layers_transferred == 4

    def test_transferred_directions(self):
        """Each transferred direction is 32-dim and either ~unit or ~zero norm."""
        source_harmful, source_dirs, target_harmful = self._source_and_target(3, 3)
        transfer = WassersteinRefusalTransfer()
        outcome = transfer.compute_transfer(
            source_activations=source_harmful,
            target_activations=target_harmful,
            source_refusal_directions=source_dirs,
        )
        for item in outcome.transferred_directions:
            assert isinstance(item, TransferredDirection)
            assert item.transferred_direction.shape == (32,)
            # Direction should be approximately unit norm
            length = item.transferred_direction.norm().item()
            assert abs(length - 1.0) < 0.1 or length < 0.1

    def test_empty_input(self):
        """Empty inputs transfer nothing and are rated ``"poor"``."""
        outcome = WassersteinRefusalTransfer().compute_transfer({}, {}, {})
        assert outcome.n_layers_transferred == 0
        assert outcome.transfer_viability == "poor"

    def test_recommendation_generated(self):
        """The result carries a recommendation string longer than 10 characters."""
        source_harmful, source_dirs, target_harmful = self._source_and_target(3, 3)
        transfer = WassersteinRefusalTransfer()
        outcome = transfer.compute_transfer(
            source_activations=source_harmful,
            target_activations=target_harmful,
            source_refusal_directions=source_dirs,
        )
        advice = outcome.recommendation
        assert isinstance(advice, str)
        assert len(advice) > 10
# ===========================================================================
# 5. Spectral Abliteration Completeness Certification
# ===========================================================================
class TestSpectralCertification:
    """Exercise ``SpectralCertifier`` on seeded noise and planted-signal tensors."""

    @staticmethod
    def _noise_pair(n_samples, dim=32):
        """Return seeded (seed 42) harmful/harmless tensors of pure 0.3-scaled noise."""
        torch.manual_seed(42)
        harmful = torch.randn(n_samples, dim) * 0.3
        harmless = torch.randn(n_samples, dim) * 0.3
        return harmful, harmless

    @staticmethod
    def _planted_pair(n_samples, shift, dim=32):
        """Return seeded (seed 42) tensors where harmful is shifted along a unit axis."""
        torch.manual_seed(42)
        axis = torch.randn(dim)
        axis = axis / axis.norm()
        harmful = torch.randn(n_samples, dim) * 0.3 + shift * axis
        harmless = torch.randn(n_samples, dim) * 0.3
        return harmful, harmless

    def test_certifier_creation(self):
        """Default construction exposes the asserted confidence level."""
        default_certifier = SpectralCertifier()
        assert default_certifier.confidence_level == 0.95

    def test_certify_complete_abliteration(self):
        """Indistinguishable harmful/harmless noise is certified GREEN."""
        # Post-abliteration: harmful and harmless should be indistinguishable
        harmful, harmless = self._noise_pair(50)
        certificate = SpectralCertifier().certify(harmful, harmless)
        assert isinstance(certificate, SpectralCertificate)
        # With no signal, should be GREEN
        assert certificate.level == CertificationLevel.GREEN

    def test_certify_incomplete_abliteration(self):
        """A strong planted shift (5.0) is certified RED with a positive margin."""
        # Strong residual signal
        harmful, harmless = self._planted_pair(50, 5.0)
        certificate = SpectralCertifier().certify(harmful, harmless)
        assert certificate.level == CertificationLevel.RED
        assert certificate.n_eigenvalues_above_threshold > 0
        assert certificate.eigenvalue_margin > 0

    def test_bbp_threshold(self):
        """Threshold, upper edge and noise variance are all strictly positive."""
        harmful, harmless = self._noise_pair(30)
        certificate = SpectralCertifier().certify(harmful, harmless)
        assert certificate.bbp_threshold > 0
        assert certificate.mp_upper_edge > 0
        assert certificate.noise_variance > 0

    def test_anisotropic_correction(self):
        """The anisotropic threshold is never below the isotropic one."""
        harmful, harmless = self._noise_pair(30)
        certificate = SpectralCertifier().certify(harmful, harmless)
        assert certificate.condition_number >= 1.0
        assert certificate.anisotropy_correction >= 1.0
        assert certificate.anisotropic_threshold >= certificate.isotropic_threshold

    def test_sample_sufficiency(self):
        """Sample counts are reported: 20 used, at least ``min_samples`` required."""
        harmful, harmless = self._noise_pair(10)
        certifier = SpectralCertifier(min_samples=50)
        certificate = certifier.certify(harmful, harmless)
        assert certificate.n_samples_used == 20
        assert certificate.n_samples_required >= 50

    def test_certify_all_layers(self):
        """Every one of four layers receives its own certificate."""
        harmful_by_layer, harmless_by_layer, _ = _make_multilayer_activations(
            n_layers=4
        )
        certifier = SpectralCertifier()
        per_layer = certifier.certify_all_layers(harmful_by_layer, harmless_by_layer)
        assert len(per_layer) == 4
        assert all(
            isinstance(certificate, SpectralCertificate)
            for _layer, certificate in per_layer.items()
        )

    def test_overall_certification(self):
        """Per-layer certificates aggregate into one certificate with a level."""
        harmful_by_layer, harmless_by_layer, _ = _make_multilayer_activations(
            n_layers=4
        )
        certifier = SpectralCertifier()
        per_layer = certifier.certify_all_layers(harmful_by_layer, harmless_by_layer)
        summary = certifier.overall_certification(per_layer)
        assert summary is not None
        assert isinstance(summary.level, CertificationLevel)

    def test_signal_analysis(self):
        """Signal/noise energies and their ratio are non-negative."""
        harmful, harmless = self._planted_pair(40, 3.0)
        certificate = SpectralCertifier().certify(harmful, harmless)
        assert certificate.signal_to_noise_ratio >= 0
        assert certificate.signal_energy >= 0
        assert certificate.noise_energy >= 0

    def test_recommendation_text(self):
        """A recommendation string and a known suggested action are returned."""
        harmful, harmless = self._noise_pair(30)
        certificate = SpectralCertifier().certify(harmful, harmless)
        advice = certificate.recommendation
        assert isinstance(advice, str)
        assert len(advice) > 10
        assert certificate.suggested_action in (
            "none", "more_directions", "grp_obliteration", "more_samples"
        )
# ===========================================================================
# Integration: All modules importable from analysis package
# ===========================================================================
class TestImports:
    """Check that each module's entry point is re-exported by ``obliteratus.analysis``."""

    def test_import_riemannian(self):
        """``RiemannianManifoldAnalyzer`` is importable from the package root."""
        from obliteratus.analysis import RiemannianManifoldAnalyzer as exported

        assert exported is not None

    def test_import_anti_ouroboros(self):
        """``AntiOuroborosProber`` is importable from the package root."""
        from obliteratus.analysis import AntiOuroborosProber as exported

        assert exported is not None

    def test_import_conditional(self):
        """``ConditionalAbliterator`` is importable from the package root."""
        from obliteratus.analysis import ConditionalAbliterator as exported

        assert exported is not None

    def test_import_wasserstein_transfer(self):
        """``WassersteinRefusalTransfer`` is importable from the package root."""
        from obliteratus.analysis import WassersteinRefusalTransfer as exported

        assert exported is not None

    def test_import_spectral_certifier(self):
        """The certifier and its level enum are importable; GREEN has its string value."""
        from obliteratus.analysis import CertificationLevel as exported_level
        from obliteratus.analysis import SpectralCertifier as exported_certifier

        assert exported_certifier is not None
        assert exported_level.GREEN.value == "certified_complete"