File size: 8,563 Bytes
ac224ce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
"""
tests/test_fault_math.py
------------------------
Tests for server/fault_math.py.

Verifies that:
- Each fault type degrades retrieval relative to a no-fault baseline.
- apply_faults() with an empty fault set returns the original matrix (up to float precision).
- make_noise() returns the expected keys.
- All output values stay in [0, 1] after clipping.
"""

import numpy as np
import pytest

from server.fault_math import apply_faults, make_noise
from models import FaultType


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------

# Dimensions of the synthetic score matrix used throughout this module:
# the S_true fixture builds an array of shape (N_QUERIES, N_CHUNKS).
N_QUERIES = 8
N_CHUNKS = 40
# Seed passed to np.random.default_rng by the rng fixture.
SEED = 42


@pytest.fixture
def rng() -> np.random.Generator:
    """Provide a NumPy random Generator seeded with the module-level SEED."""
    generator = np.random.default_rng(seed=SEED)
    return generator


@pytest.fixture
def S_true(rng):
    """Build a synthetic (N_QUERIES, N_CHUNKS) float32 score matrix.

    Background entries are drawn uniformly from [0.2, 0.5). For query row
    ``i``, columns ``i`` and ``i + 1`` (both modulo N_CHUNKS) are then
    overwritten with higher draws so each row has two clear peaks.
    """
    background = rng.uniform(0.2, 0.5, (N_QUERIES, N_CHUNKS))
    scores = background.astype(np.float32)
    for query in range(N_QUERIES):
        primary = query % N_CHUNKS
        secondary = (query + 1) % N_CHUNKS
        # Two separate draws per row, primary first: the order of calls on
        # the shared generator determines the resulting values.
        scores[query, primary] = rng.uniform(0.75, 0.92)
        scores[query, secondary] = rng.uniform(0.70, 0.88)
    return scores


@pytest.fixture
def noise(rng):
    """Return make_noise() output for a (N_QUERIES, N_CHUNKS) shape."""
    matrix_shape = (N_QUERIES, N_CHUNKS)
    return make_noise(rng, matrix_shape)


@pytest.fixture
def dupe_ids(rng):
    """Pick distinct chunk indices to act as the duplicate-chunk columns.

    The sample size is N_CHUNKS // 7, with a floor of one index.
    """
    n_dupes = max(1, N_CHUNKS // 7)
    return rng.choice(N_CHUNKS, size=n_dupes, replace=False)


def _apply(S, fault_types, noise, dupe_ids, **kwargs):
    """Call apply_faults() with default config values, overridable via kwargs.

    Args:
        S: Score matrix forwarded as ``S``.
        fault_types: Iterable of fault types; converted to a ``set``.
        noise: Forwarded unchanged as ``noise``.
        dupe_ids: Forwarded unchanged as ``dupe_ids``.
        **kwargs: Extra or overriding keyword arguments for apply_faults().

    Returns:
        Whatever apply_faults() returns.
    """
    # Later keys win, so anything in kwargs replaces the default listed above it.
    config = {
        "config_chunk_size": 512,
        "config_context_limit": 4096,
        "config_use_reranking": False,
        "config_chunk_overlap": 50,
        **kwargs,
    }
    active_faults = set(fault_types)
    return apply_faults(
        S=S,
        fault_types=active_faults,
        noise=noise,
        dupe_ids=dupe_ids,
        **config,
    )


# ---------------------------------------------------------------------------
# Basic contracts
# ---------------------------------------------------------------------------

def test_no_faults_returns_original(S_true, noise, dupe_ids) -> None:
    """An empty fault set must leave the scores equal to S_true to 5 decimals."""
    no_faults = ()
    untouched = _apply(S_true, no_faults, noise, dupe_ids)
    np.testing.assert_array_almost_equal(untouched, S_true, decimal=5)


def test_output_clipped_to_unit_interval(S_true, noise, dupe_ids) -> None:
    """Applying the eight fault types below together must keep values in [0, 1]."""
    every_fault = (
        FaultType.CHUNK_TOO_LARGE,
        FaultType.CHUNK_TOO_SMALL,
        FaultType.THRESHOLD_TOO_LOW,
        FaultType.THRESHOLD_TOO_HIGH,
        FaultType.TOP_K_TOO_SMALL,
        FaultType.DUPLICATE_FLOODING,
        FaultType.CONTEXT_OVERFLOW,
        FaultType.NO_RERANKING,
    )
    result = _apply(S_true, every_fault, noise, dupe_ids)

    within_lower_bound = result.min() >= 0.0
    assert within_lower_bound, f"Minimum value {result.min()} < 0"

    within_upper_bound = result.max() <= 1.0
    assert within_upper_bound, f"Maximum value {result.max()} > 1"


def test_make_noise_returns_expected_keys(rng) -> None:
    """make_noise() must contain three fault keys and float32 arrays of the requested shape."""
    expected_shape = (5, 20)
    noise_by_fault = make_noise(rng, expected_shape)

    required_keys = (
        FaultType.CHUNK_TOO_SMALL,
        FaultType.THRESHOLD_TOO_LOW,
        FaultType.NO_RERANKING,
    )
    for key in required_keys:
        assert key in noise_by_fault

    # Every entry, not just the required ones, must match shape and dtype.
    for field in noise_by_fault.values():
        assert field.shape == expected_shape
        assert field.dtype == np.float32


def test_apply_faults_does_not_mutate_input(S_true, noise, dupe_ids) -> None:
    """The input matrix must be bit-for-bit unchanged after applying faults."""
    snapshot = S_true.copy()
    faults = [FaultType.CHUNK_TOO_LARGE, FaultType.THRESHOLD_TOO_HIGH]
    # Return value is deliberately ignored; only the side effect on S_true matters.
    _apply(S_true, faults, noise, dupe_ids)
    np.testing.assert_array_equal(S_true, snapshot)


# ---------------------------------------------------------------------------
# Each fault degrades in the expected direction
# ---------------------------------------------------------------------------

def _mean_top1_score(S: np.ndarray) -> float:
    """Return the average, over rows, of each row's maximum value.

    Used by the tests as a proxy for how strongly the best chunk stands out
    for each query.
    """
    best_per_query = S.max(axis=1)
    average_best = best_per_query.mean()
    return float(average_best)


def _score_std(S: np.ndarray) -> float:
    """Return the standard deviation over all entries of S as a Python float."""
    spread = S.std()
    return float(spread)


def test_chunk_too_large_smears_scores(S_true, noise, dupe_ids) -> None:
    """CHUNK_TOO_LARGE with config_chunk_size=2048 must lower the mean per-query peak.

    The original note attributes this to a box filter inside apply_faults();
    that implementation is not visible from this module.
    """
    smeared = _apply(
        S_true,
        [FaultType.CHUNK_TOO_LARGE],
        noise,
        dupe_ids,
        config_chunk_size=2048,
    )
    peak_after = _mean_top1_score(smeared)
    peak_before = _mean_top1_score(S_true)
    assert peak_after < peak_before, (
        "CHUNK_TOO_LARGE should smear peak scores downward"
    )


def test_threshold_too_high_deflates_scores(S_true, noise, dupe_ids) -> None:
    """THRESHOLD_TOO_HIGH must pull the mean score below 65% of the baseline mean.

    The original note says apply_faults() multiplies scores by 0.55 for this
    fault; that factor is not visible from this module.
    """
    deflated = _apply(S_true, [FaultType.THRESHOLD_TOO_HIGH], noise, dupe_ids)
    ceiling = S_true.mean() * 0.65
    assert deflated.mean() < ceiling, (
        "THRESHOLD_TOO_HIGH should significantly deflate scores"
    )


def test_top_k_too_small_compresses_score_range(S_true, noise, dupe_ids) -> None:
    """TOP_K_TOO_SMALL must yield a smaller overall standard deviation than S_true."""
    compressed = _apply(S_true, [FaultType.TOP_K_TOO_SMALL], noise, dupe_ids)
    spread_after = _score_std(compressed)
    spread_before = _score_std(S_true)
    assert spread_after < spread_before, (
        "TOP_K_TOO_SMALL should compress score variance"
    )


def test_top_k_too_small_less_severe_with_reranking(S_true, noise, dupe_ids) -> None:
    """With reranking on, TOP_K_TOO_SMALL must leave a larger score spread than with it off."""
    # Map each reranking setting to the resulting score std (off first, then on).
    spread_by_rerank = {
        use_rerank: _score_std(
            _apply(
                S_true,
                [FaultType.TOP_K_TOO_SMALL],
                noise,
                dupe_ids,
                config_use_reranking=use_rerank,
            )
        )
        for use_rerank in (False, True)
    }
    assert spread_by_rerank[True] > spread_by_rerank[False], (
        "Reranking should partially restore score spread under TOP_K_TOO_SMALL"
    )


def test_duplicate_flooding_boosts_dupe_columns(S_true, noise, dupe_ids) -> None:
    """DUPLICATE_FLOODING must raise the mean score of the dupe_ids columns."""
    flooded = _apply(S_true, [FaultType.DUPLICATE_FLOODING], noise, dupe_ids)
    dupe_mean_after = flooded[:, dupe_ids].mean()
    dupe_mean_before = S_true[:, dupe_ids].mean()
    assert dupe_mean_after > dupe_mean_before, (
        "DUPLICATE_FLOODING should boost duplicate chunk scores"
    )


def test_duplicate_flooding_reduced_with_reranking(S_true, noise, dupe_ids) -> None:
    """With reranking on, the dupe_ids columns must have a lower mean than with it off."""

    def dupe_mean(use_rerank: bool):
        # Mean score over the duplicate columns for one reranking setting.
        flooded = _apply(
            S_true,
            [FaultType.DUPLICATE_FLOODING],
            noise,
            dupe_ids,
            config_use_reranking=use_rerank,
        )
        return flooded[:, dupe_ids].mean()

    mean_without = dupe_mean(False)
    mean_with = dupe_mean(True)
    assert mean_with < mean_without, (
        "Reranking should reduce duplicate flooding boost"
    )


def test_context_overflow_zeroes_tail_columns(S_true, noise, dupe_ids):
    """CONTEXT_OVERFLOW must zero every column at or beyond the context cutoff.

    The expected cutoff is computed here as
    ``max(1, int(N_CHUNKS * limit / 16384))``.
    NOTE(review): 16384 presumably mirrors a reference context size inside
    apply_faults(); confirm against server/fault_math.py.
    """
    tight_limit = 512  # Very small → cuts most columns
    result = _apply(S_true, [FaultType.CONTEXT_OVERFLOW], noise, dupe_ids,
                    config_context_limit=tight_limit)
    cutoff = max(1, int(N_CHUNKS * tight_limit / 16384))

    # Fail loudly instead of silently skipping the check: if the constants
    # change so that no tail remains, this test would otherwise pass vacuously.
    assert cutoff < N_CHUNKS, (
        f"Test setup leaves no tail columns to check (cutoff={cutoff}, "
        f"N_CHUNKS={N_CHUNKS})"
    )

    # Check element-wise rather than via a sum, so no combination of
    # non-zero values can slip through.
    tail = result[:, cutoff:]
    assert np.count_nonzero(tail) == 0, (
        "Columns beyond context cutoff should be zeroed"
    )


def test_no_reranking_adds_noise(S_true, noise, dupe_ids) -> None:
    """NO_RERANKING must move scores further from S_true when reranking is off than when on."""

    def mean_abs_shift(use_rerank: bool) -> float:
        # Mean absolute deviation from the baseline for one reranking setting.
        perturbed = _apply(
            S_true,
            [FaultType.NO_RERANKING],
            noise,
            dupe_ids,
            config_use_reranking=use_rerank,
        )
        return float(np.abs(perturbed - S_true).mean())

    shift_rerank_off = mean_abs_shift(False)
    shift_rerank_on = mean_abs_shift(True)
    assert shift_rerank_off > shift_rerank_on, (
        "NO_RERANKING should only add noise when reranking is disabled"
    )


def test_chunk_too_small_noise_reduced_by_overlap(S_true, noise, dupe_ids) -> None:
    """CHUNK_TOO_SMALL must perturb scores less at overlap 450 than at overlap 0.

    Both runs use config_chunk_size=128; only config_chunk_overlap differs.
    """
    # Mean absolute deviation from S_true per overlap setting (0 first, then 450).
    shift_by_overlap = {
        overlap: float(
            np.abs(
                _apply(
                    S_true,
                    [FaultType.CHUNK_TOO_SMALL],
                    noise,
                    dupe_ids,
                    config_chunk_size=128,
                    config_chunk_overlap=overlap,
                )
                - S_true
            ).mean()
        )
        for overlap in (0, 450)
    }
    assert shift_by_overlap[0] > shift_by_overlap[450], (
        "Higher overlap should reduce CHUNK_TOO_SMALL noise impact"
    )