""" Tests for utils/model_patterns.py Tests pure logic functions that don't require model loading: - merge_token_probabilities - safe_to_serializable - execute_forward_pass_with_multi_layer_head_ablation (import/signature tests) """ import pytest import torch import numpy as np from utils.model_patterns import merge_token_probabilities, safe_to_serializable from utils import execute_forward_pass_with_multi_layer_head_ablation class TestMergeTokenProbabilities: """Tests for merge_token_probabilities function.""" def test_merges_tokens_with_leading_space(self): """Tokens with and without leading space should be merged.""" token_probs = [ (" cat", 0.15), ("cat", 0.05), (" dog", 0.10), ] result = merge_token_probabilities(token_probs) # Convert to dict for easier checking result_dict = dict(result) assert "cat" in result_dict assert abs(result_dict["cat"] - 0.20) < 1e-6 # 0.15 + 0.05 assert "dog" in result_dict assert abs(result_dict["dog"] - 0.10) < 1e-6 def test_sorts_by_probability_descending(self): """Results should be sorted by probability (highest first).""" token_probs = [ ("low", 0.01), ("high", 0.50), ("medium", 0.20), ] result = merge_token_probabilities(token_probs) # Check order: high, medium, low assert result[0][0] == "high" assert result[1][0] == "medium" assert result[2][0] == "low" def test_handles_empty_input(self): """Empty input should return empty list.""" result = merge_token_probabilities([]) assert result == [] def test_handles_single_token(self): """Single token should be returned as-is (stripped).""" result = merge_token_probabilities([(" hello", 0.5)]) assert len(result) == 1 assert result[0][0] == "hello" assert result[0][1] == 0.5 def test_strips_multiple_spaces(self): """Multiple leading spaces should all be stripped.""" token_probs = [ (" word", 0.3), # Two spaces (" word", 0.2), # One space ("word", 0.1), # No space ] result = merge_token_probabilities(token_probs) result_dict = dict(result) assert "word" in result_dict assert abs(result_dict["word"] - 0.6) < 1e-6 # All merged class TestSafeToSerializable: """Tests for safe_to_serializable function.""" def test_converts_tensor_to_list(self): """PyTorch tensor should be converted to Python list.""" tensor = torch.tensor([1.0, 2.0, 3.0]) result = safe_to_serializable(tensor) assert isinstance(result, list) assert result == [1.0, 2.0, 3.0] def test_converts_nested_tensor(self): """2D tensor should become nested list.""" tensor = torch.tensor([[1, 2], [3, 4]]) result = safe_to_serializable(tensor) assert isinstance(result, list) assert result == [[1, 2], [3, 4]] def test_converts_list_of_tensors(self): """List containing tensors should have tensors converted.""" data = [torch.tensor([1, 2]), torch.tensor([3, 4])] result = safe_to_serializable(data) assert result == [[1, 2], [3, 4]] def test_converts_dict_with_tensor_values(self): """Dict with tensor values should have values converted.""" data = { "a": torch.tensor([1.0, 2.0]), "b": "string_value", "c": 42 } result = safe_to_serializable(data) assert result["a"] == [1.0, 2.0] assert result["b"] == "string_value" assert result["c"] == 42 def test_handles_tuple_input(self): """Tuple with tensors should be converted to list.""" data = (torch.tensor([1]), torch.tensor([2])) result = safe_to_serializable(data) assert isinstance(result, list) assert result == [[1], [2]] def test_passes_through_primitives(self): """Primitive types should pass through unchanged.""" assert safe_to_serializable(42) == 42 assert safe_to_serializable(3.14) == 3.14 assert 
safe_to_serializable("hello") == "hello" assert safe_to_serializable(None) is None assert safe_to_serializable(True) is True def test_handles_deeply_nested_structure(self): """Should handle deeply nested structures with tensors.""" data = { "level1": { "level2": { "tensor": torch.tensor([1, 2, 3]) } }, "list": [torch.tensor([4, 5])] } result = safe_to_serializable(data) assert result["level1"]["level2"]["tensor"] == [1, 2, 3] assert result["list"] == [[4, 5]] def test_handles_empty_containers(self): """Empty lists, dicts, tuples should remain empty.""" assert safe_to_serializable([]) == [] assert safe_to_serializable({}) == {} assert safe_to_serializable(()) == [] # Tuple becomes list class TestSafeToSerializableEdgeCases: """Edge case tests for safe_to_serializable.""" def test_handles_scalar_tensor(self): """Scalar tensor should become a Python scalar.""" scalar = torch.tensor(42.0) result = safe_to_serializable(scalar) # Scalar tensor.tolist() returns a Python number assert result == 42.0 def test_handles_integer_tensor(self): """Integer tensor should be converted correctly.""" tensor = torch.tensor([1, 2, 3], dtype=torch.int64) result = safe_to_serializable(tensor) assert result == [1, 2, 3] assert all(isinstance(x, int) for x in result) def test_handles_mixed_list(self): """List with mixed tensor and non-tensor items should work.""" data = [torch.tensor([1]), "string", 42, {"key": torch.tensor([2])}] result = safe_to_serializable(data) assert result[0] == [1] assert result[1] == "string" assert result[2] == 42 assert result[3] == {"key": [2]} class TestMultiLayerHeadAblation: """Tests for execute_forward_pass_with_multi_layer_head_ablation function. These tests verify the function exists, is importable, and has the expected signature. Full integration tests would require loading a model. """ def test_function_is_importable(self): """Function should be importable from utils.""" from utils import execute_forward_pass_with_multi_layer_head_ablation assert callable(execute_forward_pass_with_multi_layer_head_ablation) def test_function_has_expected_signature(self): """Function should accept model, tokenizer, prompt, config, heads_by_layer.""" import inspect sig = inspect.signature(execute_forward_pass_with_multi_layer_head_ablation) params = list(sig.parameters.keys()) assert 'model' in params assert 'tokenizer' in params assert 'prompt' in params assert 'config' in params assert 'heads_by_layer' in params def test_heads_by_layer_type_annotation(self): """heads_by_layer parameter should accept Dict[int, List[int]].""" import inspect from typing import Dict, List, get_type_hints # Get annotations (may not be available at runtime if not using from __future__) sig = inspect.signature(execute_forward_pass_with_multi_layer_head_ablation) heads_param = sig.parameters.get('heads_by_layer') # The parameter should exist assert heads_param is not None # Annotation may be a string or the actual type if heads_param.annotation != inspect.Parameter.empty: annotation_str = str(heads_param.annotation) assert 'Dict' in annotation_str or 'dict' in annotation_str.lower() def test_returns_error_for_no_modules(self): """Should return error dict when config has no modules. Note: This test uses a mock model that won't actually run forward pass. The function should return early with an error before trying to run. 
""" from unittest.mock import MagicMock mock_model = MagicMock() mock_tokenizer = MagicMock() empty_config = {} # No modules specified heads_by_layer = {0: [1]} # Non-empty to avoid early return result = execute_forward_pass_with_multi_layer_head_ablation( mock_model, mock_tokenizer, "test prompt", empty_config, heads_by_layer ) assert 'error' in result assert 'No modules specified' in result['error'] def test_returns_error_for_invalid_layer(self): """Should return error when layer number doesn't match any module.""" from unittest.mock import MagicMock mock_model = MagicMock() mock_tokenizer = MagicMock() # Config has layer 0 and 1, but we'll request layer 99 config = { 'attention_modules': ['model.layers.0.self_attn', 'model.layers.1.self_attn'], 'block_modules': ['model.layers.0', 'model.layers.1'] } heads_by_layer = {99: [0, 1]} # Layer 99 doesn't exist result = execute_forward_pass_with_multi_layer_head_ablation( mock_model, mock_tokenizer, "test prompt", config, heads_by_layer ) assert 'error' in result assert '99' in result['error'] # Should mention the invalid layer class TestComputePerPositionTop5: """Tests for compute_per_position_top5 function.""" def _make_mock_output(self, seq_len, vocab_size=10): """Create a mock model output with predictable logits. At each position i, logit[i] = 10.0 (highest), so the top-1 token is always token index == position index. Other logits are 1.0. """ logits = torch.ones(1, seq_len, vocab_size) for i in range(seq_len): # Make token (i % vocab_size) the top prediction at position i logits[0, i, i % vocab_size] = 10.0 class MockOutput: pass out = MockOutput() out.logits = logits return out def _make_mock_tokenizer(self, vocab_size=10): """Create a mock tokenizer that decodes token IDs to 'tok_N'.""" from unittest.mock import MagicMock tok = MagicMock() def decode_fn(ids, skip_special_tokens=False): if isinstance(ids, list) and len(ids) == 1: return f"tok_{ids[0]}" return "".join(f"tok_{i}" for i in ids) tok.decode = decode_fn return tok def test_returns_correct_number_of_positions(self): """With prompt_token_count=3 and seq_len=7, should return 4 positions (7-3).""" from utils.model_patterns import compute_per_position_top5 model_output = self._make_mock_output(seq_len=7, vocab_size=10) tokenizer = self._make_mock_tokenizer(vocab_size=10) # Full sequence has 7 tokens, prompt has 3, so 4 generated tokens result = compute_per_position_top5(model_output, tokenizer, prompt_token_count=3, top_k=5) assert len(result) == 4 # positions 0, 1, 2, 3 def test_single_generated_token(self): """With 1 generated token, should return exactly 1 position.""" from utils.model_patterns import compute_per_position_top5 model_output = self._make_mock_output(seq_len=4, vocab_size=10) tokenizer = self._make_mock_tokenizer(vocab_size=10) result = compute_per_position_top5(model_output, tokenizer, prompt_token_count=3, top_k=5) assert len(result) == 1 assert result[0]['position'] == 0 def test_each_position_has_top_k_entries(self): """Each position should have exactly top_k entries in top5 list.""" from utils.model_patterns import compute_per_position_top5 model_output = self._make_mock_output(seq_len=8, vocab_size=10) tokenizer = self._make_mock_tokenizer(vocab_size=10) result = compute_per_position_top5(model_output, tokenizer, prompt_token_count=3, top_k=5) for pos_data in result: assert len(pos_data['top5']) == 5 def test_top_k_3(self): """Should respect custom top_k parameter.""" from utils.model_patterns import compute_per_position_top5 model_output = 
        tokenizer = self._make_mock_tokenizer(vocab_size=10)

        result = compute_per_position_top5(model_output, tokenizer, prompt_token_count=3, top_k=3)
        for pos_data in result:
            assert len(pos_data['top5']) == 3

    def test_probabilities_sorted_descending(self):
        """Top-5 probabilities should be in descending order."""
        from utils.model_patterns import compute_per_position_top5

        model_output = self._make_mock_output(seq_len=6, vocab_size=10)
        tokenizer = self._make_mock_tokenizer(vocab_size=10)

        result = compute_per_position_top5(model_output, tokenizer, prompt_token_count=3, top_k=5)
        for pos_data in result:
            probs = [entry['probability'] for entry in pos_data['top5']]
            assert probs == sorted(probs, reverse=True)

    def test_probabilities_are_valid(self):
        """All probabilities should be between 0 and 1."""
        from utils.model_patterns import compute_per_position_top5

        model_output = self._make_mock_output(seq_len=6, vocab_size=10)
        tokenizer = self._make_mock_tokenizer(vocab_size=10)

        result = compute_per_position_top5(model_output, tokenizer, prompt_token_count=3, top_k=5)
        for pos_data in result:
            for entry in pos_data['top5']:
                assert 0.0 <= entry['probability'] <= 1.0
            assert 0.0 <= pos_data['actual_prob'] <= 1.0

    def test_actual_token_field_present(self):
        """Each position should have actual_token and actual_prob fields."""
        from utils.model_patterns import compute_per_position_top5

        model_output = self._make_mock_output(seq_len=6, vocab_size=10)
        tokenizer = self._make_mock_tokenizer(vocab_size=10)

        result = compute_per_position_top5(model_output, tokenizer, prompt_token_count=3, top_k=5)
        for pos_data in result:
            assert 'actual_token' in pos_data
            assert 'actual_prob' in pos_data
            assert isinstance(pos_data['actual_token'], str)
            assert isinstance(pos_data['actual_prob'], float)

    def test_position_indices_sequential(self):
        """Position indices should be 0, 1, 2, ... N-1."""
N-1.""" from utils.model_patterns import compute_per_position_top5 model_output = self._make_mock_output(seq_len=8, vocab_size=10) tokenizer = self._make_mock_tokenizer(vocab_size=10) result = compute_per_position_top5(model_output, tokenizer, prompt_token_count=3, top_k=5) positions = [r['position'] for r in result] assert positions == list(range(5)) # 8 - 3 = 5 positions def test_does_not_include_position_beyond_sequence(self): """Should NOT produce a position that predicts beyond the last token.""" from utils.model_patterns import compute_per_position_top5 model_output = self._make_mock_output(seq_len=5, vocab_size=10) tokenizer = self._make_mock_tokenizer(vocab_size=10) # prompt=3, seq=5, so 2 generated tokens -> positions 0 and 1 result = compute_per_position_top5(model_output, tokenizer, prompt_token_count=3, top_k=5) assert len(result) == 2 # Position 0: logits at index 2 (prompt_len-1), predicts token at index 3 # Position 1: logits at index 3, predicts token at index 4 # NO position for logits at index 4 (would predict beyond sequence) def test_prompt_equals_sequence_returns_empty(self): """When prompt_token_count == seq_len (no generated tokens), return empty.""" from utils.model_patterns import compute_per_position_top5 model_output = self._make_mock_output(seq_len=3, vocab_size=10) tokenizer = self._make_mock_tokenizer(vocab_size=10) result = compute_per_position_top5(model_output, tokenizer, prompt_token_count=3, top_k=5) assert result == [] class TestFullSequenceAttentionData: """ Tests verifying that activation data for full sequences (prompt + generated output) has correctly-sized attention matrices matching the full token count. """ def _make_activation_data(self, num_tokens, num_layers=2, num_heads=1): """Helper to build mock activation data for a given sequence length.""" input_ids = list(range(1, num_tokens + 1)) # Build uniform attention weights: [batch=1, heads, seq, seq] uniform_val = 1.0 / num_tokens attn_row = [uniform_val] * num_tokens head_matrix = [attn_row] * num_tokens head_block = [head_matrix] * num_heads attn_weights = [head_block] # batch dim attention_outputs = {} for layer in range(num_layers): module_name = f'model.layers.{layer}.self_attn' attention_outputs[module_name] = { 'output': [ [[0.1] * num_tokens], # hidden states placeholder attn_weights ] } return { 'model': 'mock-model', 'prompt': 'x ' * num_tokens, 'input_ids': [input_ids], 'attention_modules': list(attention_outputs.keys()), 'attention_outputs': attention_outputs, 'block_modules': [f'model.layers.{i}' for i in range(num_layers)], 'block_outputs': {f'model.layers.{i}': {'output': [[[0.1] * num_tokens]]} for i in range(num_layers)}, 'norm_parameters': [], 'norm_data': [], 'actual_output': {'token': 'tok', 'probability': 0.5}, 'global_top5_tokens': [] } def test_short_prompt_attention_dimensions(self): """Attention matrix for a 4-token prompt should be 4x4.""" data = self._make_activation_data(num_tokens=4) attn = data['attention_outputs']['model.layers.0.self_attn']['output'][1] seq_len = len(attn[0][0]) # batch -> head -> rows assert seq_len == 4 assert len(attn[0][0][0]) == 4 # columns def test_full_sequence_attention_dimensions(self): """Attention matrix for a 10-token full sequence (prompt+output) should be 10x10.""" data = self._make_activation_data(num_tokens=10) attn = data['attention_outputs']['model.layers.0.self_attn']['output'][1] seq_len = len(attn[0][0]) assert seq_len == 10 assert len(attn[0][0][0]) == 10 def test_full_sequence_has_more_tokens_than_prompt(self): 
"""Full-sequence activation data should have larger dimensions than prompt-only.""" prompt_data = self._make_activation_data(num_tokens=4) full_data = self._make_activation_data(num_tokens=10) prompt_ids = prompt_data['input_ids'][0] full_ids = full_data['input_ids'][0] assert len(full_ids) > len(prompt_ids) prompt_attn = prompt_data['attention_outputs']['model.layers.0.self_attn']['output'][1] full_attn = full_data['attention_outputs']['model.layers.0.self_attn']['output'][1] assert len(full_attn[0][0]) > len(prompt_attn[0][0]) def test_input_ids_match_attention_seq_len(self): """input_ids length should match attention matrix dimensions.""" for n in [3, 7, 15]: data = self._make_activation_data(num_tokens=n) num_ids = len(data['input_ids'][0]) attn = data['attention_outputs']['model.layers.0.self_attn']['output'][1] assert num_ids == len(attn[0][0]) == n def test_all_layers_have_same_dimensions(self): """All layers should have the same attention matrix size for a given sequence.""" data = self._make_activation_data(num_tokens=8, num_layers=3) for layer in range(3): module = f'model.layers.{layer}.self_attn' attn = data['attention_outputs'][module]['output'][1] assert len(attn[0][0]) == 8 assert len(attn[0][0][0]) == 8