Spaces:
Sleeping
Sleeping
File size: 8,563 Bytes
"""
tests/test_fault_math.py
------------------------
Tests for server/fault_math.py.
Verifies that:
- Each fault type degrades retrieval relative to a no-fault baseline.
- apply_faults() with an empty fault set returns the original matrix (up to float precision).
- make_noise() returns the expected keys.
- All output values stay in [0, 1] after clipping.
"""
import numpy as np
import pytest
from server.fault_math import apply_faults, make_noise
from models import FaultType
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
# Number of query rows in the synthetic score matrices built by the fixtures.
N_QUERIES = 8
# Number of chunk columns in the synthetic score matrices built by the fixtures.
N_CHUNKS = 40
# Seed handed to np.random.default_rng so the generated test data is reproducible.
SEED = 42
@pytest.fixture
def rng():
    """Provide a NumPy random generator seeded with the module-level SEED."""
    generator = np.random.default_rng(seed=SEED)
    return generator
@pytest.fixture
def S_true(rng):
    """Build a synthetic ground-truth score matrix of shape (N_QUERIES, N_CHUNKS).

    Background scores are drawn uniformly from [0.2, 0.5) and stored as float32.
    For every query row ``q`` two "relevant" chunks -- columns ``q`` and
    ``q + 1``, wrapped modulo N_CHUNKS -- are overwritten with high scores
    drawn from [0.75, 0.92) and [0.70, 0.88) respectively.
    """
    scores = rng.uniform(0.2, 0.5, (N_QUERIES, N_CHUNKS)).astype(np.float32)
    # The spikes are drawn one at a time (primary, then secondary, row by row)
    # so the sequence of values pulled from the generator stays fixed.
    for query in range(N_QUERIES):
        primary_col = query % N_CHUNKS
        secondary_col = (query + 1) % N_CHUNKS
        scores[query, primary_col] = rng.uniform(0.75, 0.92)
        scores[query, secondary_col] = rng.uniform(0.70, 0.88)
    return scores
@pytest.fixture
def noise(rng):
    """Provide the make_noise() output for an (N_QUERIES, N_CHUNKS) matrix."""
    matrix_shape = (N_QUERIES, N_CHUNKS)
    return make_noise(rng, matrix_shape)
@pytest.fixture
def dupe_ids(rng):
    """Pick distinct chunk indices to act as the "duplicate" columns.

    Roughly one in seven chunks is selected, and never fewer than one.
    """
    n_dupes = max(1, N_CHUNKS // 7)
    # replace=False guarantees the chosen column indices are unique.
    return rng.choice(N_CHUNKS, size=n_dupes, replace=False)
def _apply(S, fault_types, noise, dupe_ids, **kwargs):
    """Call apply_faults() with baseline config values that tests may override.

    Args:
        S: Score matrix forwarded as ``S``.
        fault_types: Iterable of faults; converted to a set before the call.
        noise: Noise object forwarded as ``noise``.
        dupe_ids: Duplicate column indices forwarded as ``dupe_ids``.
        **kwargs: Extra keyword arguments for apply_faults(); any that share a
            name with a baseline config value replace it.

    Returns:
        Whatever apply_faults() returns.
    """
    # Later entries win, so caller-supplied kwargs override the baseline.
    config = {
        "config_chunk_size": 512,
        "config_context_limit": 4096,
        "config_use_reranking": False,
        "config_chunk_overlap": 50,
        **kwargs,
    }
    return apply_faults(
        S=S,
        fault_types=set(fault_types),
        noise=noise,
        dupe_ids=dupe_ids,
        **config,
    )
# ---------------------------------------------------------------------------
# Basic contracts
# ---------------------------------------------------------------------------
def test_no_faults_returns_original(S_true, noise, dupe_ids):
    """An empty fault set must leave the scores equal to the input (to 5 decimals)."""
    no_faults = []
    untouched = _apply(S_true, no_faults, noise, dupe_ids)
    np.testing.assert_array_almost_equal(untouched, S_true, decimal=5)
def test_output_clipped_to_unit_interval(S_true, noise, dupe_ids):
    """Stacking every listed fault must still yield scores inside [0, 1]."""
    # _apply() converts this to a set, so a tuple works as well as a list.
    every_fault = (
        FaultType.CHUNK_TOO_LARGE,
        FaultType.CHUNK_TOO_SMALL,
        FaultType.THRESHOLD_TOO_LOW,
        FaultType.THRESHOLD_TOO_HIGH,
        FaultType.TOP_K_TOO_SMALL,
        FaultType.DUPLICATE_FLOODING,
        FaultType.CONTEXT_OVERFLOW,
        FaultType.NO_RERANKING,
    )
    result = _apply(S_true, every_fault, noise, dupe_ids)
    lowest = result.min()
    highest = result.max()
    assert lowest >= 0.0, f"Minimum value {result.min()} < 0"
    assert highest <= 1.0, f"Maximum value {result.max()} > 1"
def test_make_noise_returns_expected_keys(rng):
    """make_noise() must contain the noise-driven faults as float32 arrays of the requested shape."""
    expected_shape = (5, 20)
    noise_by_fault = make_noise(rng, expected_shape)
    required_faults = (
        FaultType.CHUNK_TOO_SMALL,
        FaultType.THRESHOLD_TOO_LOW,
        FaultType.NO_RERANKING,
    )
    for fault in required_faults:
        assert fault in noise_by_fault
    # Every entry, not only the required ones, must match shape and dtype.
    for field in noise_by_fault.values():
        assert field.shape == expected_shape
        assert field.dtype == np.float32
def test_apply_faults_does_not_mutate_input(S_true, noise, dupe_ids):
    """apply_faults() must leave the caller's score matrix untouched."""
    snapshot = S_true.copy()
    faults = [FaultType.CHUNK_TOO_LARGE, FaultType.THRESHOLD_TOO_HIGH]
    # The return value is irrelevant here; only side effects on S_true matter.
    _apply(S_true, faults, noise, dupe_ids)
    np.testing.assert_array_equal(S_true, snapshot)
# ---------------------------------------------------------------------------
# Each fault degrades in the expected direction
# ---------------------------------------------------------------------------
def _mean_top1_score(S: np.ndarray) -> float:
"""Mean of the max score per query — proxy for how retrieval-friendly the matrix is."""
return float(S.max(axis=1).mean())
def _score_std(S: np.ndarray) -> float:
return float(S.std())
def test_chunk_too_large_smears_scores(S_true, noise, dupe_ids):
    """CHUNK_TOO_LARGE with a 2048 chunk size must lower the mean per-query peak score."""
    smeared = _apply(
        S_true,
        [FaultType.CHUNK_TOO_LARGE],
        noise,
        dupe_ids,
        config_chunk_size=2048,
    )
    baseline_peak = _mean_top1_score(S_true)
    smeared_peak = _mean_top1_score(smeared)
    # The fault is expected to blur peaks, so the best score per query drops.
    assert smeared_peak < baseline_peak, (
        "CHUNK_TOO_LARGE should smear peak scores downward"
    )
def test_threshold_too_high_deflates_scores(S_true, noise, dupe_ids):
    """THRESHOLD_TOO_HIGH must pull the mean score below 65% of the baseline mean."""
    deflated = _apply(S_true, [FaultType.THRESHOLD_TOO_HIGH], noise, dupe_ids)
    # The 0.65 bound leaves headroom over the expected deflation factor.
    ceiling = S_true.mean() * 0.65
    assert deflated.mean() < ceiling, (
        "THRESHOLD_TOO_HIGH should significantly deflate scores"
    )
def test_top_k_too_small_compresses_score_range(S_true, noise, dupe_ids):
    """TOP_K_TOO_SMALL must shrink the overall standard deviation of the scores."""
    compressed = _apply(S_true, [FaultType.TOP_K_TOO_SMALL], noise, dupe_ids)
    baseline_std = _score_std(S_true)
    compressed_std = _score_std(compressed)
    assert compressed_std < baseline_std, (
        "TOP_K_TOO_SMALL should compress score variance"
    )
def test_top_k_too_small_less_severe_with_reranking(S_true, noise, dupe_ids):
    """With reranking enabled, TOP_K_TOO_SMALL must compress the score spread less."""
    faults = [FaultType.TOP_K_TOO_SMALL]
    # Same fault and inputs; only the reranking flag differs between the runs.
    std_without = _score_std(
        _apply(S_true, faults, noise, dupe_ids, config_use_reranking=False)
    )
    std_with = _score_std(
        _apply(S_true, faults, noise, dupe_ids, config_use_reranking=True)
    )
    # A larger std means the spread is closer to the uncompressed original.
    assert std_with > std_without, (
        "Reranking should partially restore score spread under TOP_K_TOO_SMALL"
    )
def test_duplicate_flooding_boosts_dupe_columns(S_true, noise, dupe_ids):
    """DUPLICATE_FLOODING must raise the mean score of the duplicate columns."""
    flooded = _apply(S_true, [FaultType.DUPLICATE_FLOODING], noise, dupe_ids)
    # Compare only the columns designated as duplicates, before and after.
    mean_before = S_true[:, dupe_ids].mean()
    mean_after = flooded[:, dupe_ids].mean()
    assert mean_after > mean_before, (
        "DUPLICATE_FLOODING should boost duplicate chunk scores"
    )
def test_duplicate_flooding_reduced_with_reranking(S_true, noise, dupe_ids):
    """With reranking enabled, the duplicate-column boost must be smaller."""
    faults = [FaultType.DUPLICATE_FLOODING]
    flooded_plain = _apply(S_true, faults, noise, dupe_ids,
                           config_use_reranking=False)
    flooded_reranked = _apply(S_true, faults, noise, dupe_ids,
                              config_use_reranking=True)
    dupe_mean_plain = flooded_plain[:, dupe_ids].mean()
    dupe_mean_reranked = flooded_reranked[:, dupe_ids].mean()
    assert dupe_mean_reranked < dupe_mean_plain, (
        "Reranking should reduce duplicate flooding boost"
    )
def test_context_overflow_zeroes_tail_columns(S_true, noise, dupe_ids):
    """CONTEXT_OVERFLOW must zero every chunk column past the context cutoff.

    A very small context limit is used so that most columns fall beyond the
    cutoff and the check covers a non-empty tail.
    """
    tight_limit = 512  # Very small -> cuts most columns
    result = _apply(S_true, [FaultType.CONTEXT_OVERFLOW], noise, dupe_ids,
                    config_context_limit=tight_limit)
    # NOTE(review): 16384 presumably mirrors the reference window size used
    # inside apply_faults -- confirm against server/fault_math.py.
    cutoff = max(1, int(N_CHUNKS * tight_limit / 16384))
    # Fail loudly instead of silently skipping the check if the constants ever
    # change so that no column lies beyond the cutoff.
    assert cutoff < N_CHUNKS, (
        f"Test precondition violated: cutoff {cutoff} leaves no tail columns "
        f"to check (N_CHUNKS={N_CHUNKS})"
    )
    tail = result[:, cutoff:]
    # Count non-zero entries rather than summing, so values of opposite sign
    # could never cancel out and mask a failure.
    assert np.count_nonzero(tail) == 0, (
        "Columns beyond context cutoff should be zeroed"
    )
def test_no_reranking_adds_noise(S_true, noise, dupe_ids):
    """NO_RERANKING must distort scores more with reranking off than with it on."""
    faults = [FaultType.NO_RERANKING]
    result_off = _apply(S_true, faults, noise, dupe_ids,
                        config_use_reranking=False)
    result_on = _apply(S_true, faults, noise, dupe_ids,
                       config_use_reranking=True)

    def mean_abs_deviation(scores):
        # Average absolute distance from the unfaulted ground-truth matrix.
        return float(np.abs(scores - S_true).mean())

    diff_off = mean_abs_deviation(result_off)
    diff_on = mean_abs_deviation(result_on)
    # With reranking on, the output should stay closer to S_true.
    assert diff_off > diff_on, (
        "NO_RERANKING should only add noise when reranking is disabled"
    )
def test_chunk_too_small_noise_reduced_by_overlap(S_true, noise, dupe_ids):
    """A larger chunk_overlap must shrink the distortion caused by CHUNK_TOO_SMALL."""
    faults = [FaultType.CHUNK_TOO_SMALL]
    # Identical chunk size in both runs; only the overlap differs (0 vs 450).
    low_overlap = _apply(S_true, faults, noise, dupe_ids,
                         config_chunk_size=128, config_chunk_overlap=0)
    high_overlap = _apply(S_true, faults, noise, dupe_ids,
                          config_chunk_size=128, config_chunk_overlap=450)
    deviation_low = np.abs(low_overlap - S_true)
    deviation_high = np.abs(high_overlap - S_true)
    diff_low = float(deviation_low.mean())
    diff_high = float(deviation_high.mean())
    assert diff_low > diff_high, (
        "Higher overlap should reduce CHUNK_TOO_SMALL noise impact"
    )
|