rag_debug_env / tests /test_fault_math.py
vankap-grover's picture
Upload folder using huggingface_hub
ac224ce verified
"""
tests/test_fault_math.py
------------------------
Tests for server/fault_math.py.
Verifies that:
- Each fault type degrades retrieval relative to a no-fault baseline.
- apply_faults() with an empty fault set returns the original matrix (up to float precision).
- make_noise() returns the expected keys.
- All output values stay in [0, 1] after clipping.
"""
import numpy as np
import pytest
from server.fault_math import apply_faults, make_noise
from models import FaultType
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
# Number of query rows in the synthetic score matrices built by the fixtures.
N_QUERIES = 8
# Number of chunk columns in the synthetic score matrices built by the fixtures.
N_CHUNKS = 40
# Seed handed to the `rng` fixture so the random test inputs are reproducible.
SEED = 42
@pytest.fixture
def rng():
    """Provide a NumPy random Generator seeded with the module-level SEED."""
    generator = np.random.default_rng(SEED)
    return generator
@pytest.fixture
def S_true(rng):
    """Build a synthetic (N_QUERIES, N_CHUNKS) float32 score matrix.

    Background scores are drawn uniformly from [0.2, 0.5). For query row
    ``q`` the columns ``q`` and ``q + 1`` (wrapping modulo N_CHUNKS) are then
    overwritten with high "relevant" scores.
    """
    scores = rng.uniform(0.2, 0.5, (N_QUERIES, N_CHUNKS)).astype(np.float32)
    # The two draws per row must stay in this order: the generator is shared
    # with other fixtures, so reordering would change every downstream value.
    for query in range(N_QUERIES):
        primary = query % N_CHUNKS
        secondary = (query + 1) % N_CHUNKS
        scores[query, primary] = rng.uniform(0.75, 0.92)
        scores[query, secondary] = rng.uniform(0.70, 0.88)
    return scores
@pytest.fixture
def noise(rng):
    """Return the make_noise() output for an (N_QUERIES, N_CHUNKS) matrix shape."""
    matrix_shape = (N_QUERIES, N_CHUNKS)
    return make_noise(rng, matrix_shape)
@pytest.fixture
def dupe_ids(rng):
    """Pick distinct chunk column indices to act as the "duplicate" chunks.

    The count is N_CHUNKS // 7, but never fewer than one.
    """
    n_dupes = max(1, N_CHUNKS // 7)
    return rng.choice(N_CHUNKS, size=n_dupes, replace=False)
def _apply(S, fault_types, noise, dupe_ids, **kwargs):
    """Call apply_faults() with default config values, overridable via kwargs.

    ``fault_types`` may be any iterable; it is converted to a set before the
    call. Any keyword in ``kwargs`` replaces the default of the same name or
    is forwarded as an extra keyword argument.
    """
    config = {
        "config_chunk_size": 512,
        "config_context_limit": 4096,
        "config_use_reranking": False,
        "config_chunk_overlap": 50,
        **kwargs,  # caller-supplied values win over the defaults above
    }
    active_faults = set(fault_types)
    return apply_faults(
        S=S,
        fault_types=active_faults,
        noise=noise,
        dupe_ids=dupe_ids,
        **config,
    )
# ---------------------------------------------------------------------------
# Basic contracts
# ---------------------------------------------------------------------------
def test_no_faults_returns_original(S_true, noise, dupe_ids):
    """An empty fault set must leave the scores equal to the input to 5 decimals."""
    untouched = _apply(S_true, [], noise, dupe_ids)
    np.testing.assert_array_almost_equal(untouched, S_true, decimal=5)
def test_output_clipped_to_unit_interval(S_true, noise, dupe_ids):
    """Applying all listed faults together must keep every value within [0, 1]."""
    every_fault = (
        FaultType.CHUNK_TOO_LARGE,
        FaultType.CHUNK_TOO_SMALL,
        FaultType.THRESHOLD_TOO_LOW,
        FaultType.THRESHOLD_TOO_HIGH,
        FaultType.TOP_K_TOO_SMALL,
        FaultType.DUPLICATE_FLOODING,
        FaultType.CONTEXT_OVERFLOW,
        FaultType.NO_RERANKING,
    )
    result = _apply(S_true, every_fault, noise, dupe_ids)
    # Check the lower and upper bound separately so a failure names the side.
    assert result.min() >= 0.0, f"Minimum value {result.min()} < 0"
    assert result.max() <= 1.0, f"Maximum value {result.max()} > 1"
def test_make_noise_returns_expected_keys(rng):
    """make_noise() must contain the three noise-based faults as keys.

    Every value in the returned mapping must be a float32 array of the
    requested shape.
    """
    shape = (5, 20)
    noise_by_fault = make_noise(rng, shape)

    required = (
        FaultType.CHUNK_TOO_SMALL,
        FaultType.THRESHOLD_TOO_LOW,
        FaultType.NO_RERANKING,
    )
    for fault in required:
        assert fault in noise_by_fault

    for arr in noise_by_fault.values():
        assert arr.shape == shape
        assert arr.dtype == np.float32
def test_apply_faults_does_not_mutate_input(S_true, noise, dupe_ids):
    """The input matrix must be bit-identical before and after applying faults."""
    snapshot = S_true.copy()
    faults = [FaultType.CHUNK_TOO_LARGE, FaultType.THRESHOLD_TOO_HIGH]
    # The return value is deliberately ignored; only the input is inspected.
    _apply(S_true, faults, noise, dupe_ids)
    np.testing.assert_array_equal(S_true, snapshot)
# ---------------------------------------------------------------------------
# Each fault degrades in the expected direction
# ---------------------------------------------------------------------------
def _mean_top1_score(S: np.ndarray) -> float:
    """Return the average, over rows, of each row's maximum score.

    Used by the tests as a proxy for how retrieval-friendly a matrix is.
    """
    best_per_query = np.max(S, axis=1)
    return float(np.mean(best_per_query))
def _score_std(S: np.ndarray) -> float:
    """Return the standard deviation over all entries of S as a Python float."""
    spread = np.std(S)
    return float(spread)
def test_chunk_too_large_smears_scores(S_true, noise, dupe_ids):
    """CHUNK_TOO_LARGE with a 2048 chunk size must lower the mean per-query peak.

    The fault is expected to blur score peaks (box-filter style), so the
    mean top-1 score should drop below the baseline.
    """
    smeared = _apply(
        S_true,
        [FaultType.CHUNK_TOO_LARGE],
        noise,
        dupe_ids,
        config_chunk_size=2048,
    )
    peak_after = _mean_top1_score(smeared)
    peak_before = _mean_top1_score(S_true)
    assert peak_after < peak_before, (
        "CHUNK_TOO_LARGE should smear peak scores downward"
    )
def test_threshold_too_high_deflates_scores(S_true, noise, dupe_ids):
    """THRESHOLD_TOO_HIGH must push the mean score below 65% of the baseline.

    The fault is expected to scale all scores by 0.55; the 0.65 bound leaves
    some slack around that factor.
    """
    deflated = _apply(S_true, [FaultType.THRESHOLD_TOO_HIGH], noise, dupe_ids)
    ceiling = S_true.mean() * 0.65
    assert deflated.mean() < ceiling, (
        "THRESHOLD_TOO_HIGH should significantly deflate scores"
    )
def test_top_k_too_small_compresses_score_range(S_true, noise, dupe_ids):
    """TOP_K_TOO_SMALL must reduce the overall standard deviation of the scores.

    The fault is expected to pull scores toward 0.5, which narrows the spread.
    """
    compressed = _apply(S_true, [FaultType.TOP_K_TOO_SMALL], noise, dupe_ids)
    spread_after = _score_std(compressed)
    spread_before = _score_std(S_true)
    assert spread_after < spread_before, (
        "TOP_K_TOO_SMALL should compress score variance"
    )
def test_top_k_too_small_less_severe_with_reranking(S_true, noise, dupe_ids):
    """With reranking enabled, TOP_K_TOO_SMALL must compress the scores less."""

    def spread_for(use_reranking):
        # Standard deviation of the faulted matrix for one reranking setting.
        faulted = _apply(
            S_true,
            [FaultType.TOP_K_TOO_SMALL],
            noise,
            dupe_ids,
            config_use_reranking=use_reranking,
        )
        return _score_std(faulted)

    std_without = spread_for(False)
    std_with = spread_for(True)
    # A larger std means the spread is closer to the uncompressed original.
    assert std_with > std_without, (
        "Reranking should partially restore score spread under TOP_K_TOO_SMALL"
    )
def test_duplicate_flooding_boosts_dupe_columns(S_true, noise, dupe_ids):
    """DUPLICATE_FLOODING must raise the mean score of the duplicate columns."""
    flooded = _apply(S_true, [FaultType.DUPLICATE_FLOODING], noise, dupe_ids)
    dupe_mean_after = flooded[:, dupe_ids].mean()
    dupe_mean_before = S_true[:, dupe_ids].mean()
    assert dupe_mean_after > dupe_mean_before, (
        "DUPLICATE_FLOODING should boost duplicate chunk scores"
    )
def test_duplicate_flooding_reduced_with_reranking(S_true, noise, dupe_ids):
    """With reranking enabled, the duplicate columns must be boosted less."""

    def dupe_mean_for(use_reranking):
        # Mean score over the duplicate columns for one reranking setting.
        faulted = _apply(
            S_true,
            [FaultType.DUPLICATE_FLOODING],
            noise,
            dupe_ids,
            config_use_reranking=use_reranking,
        )
        return faulted[:, dupe_ids].mean()

    mean_without = dupe_mean_for(False)
    mean_with = dupe_mean_for(True)
    assert mean_with < mean_without, (
        "Reranking should reduce duplicate flooding boost"
    )
def test_context_overflow_zeroes_tail_columns(S_true, noise, dupe_ids):
    """CONTEXT_OVERFLOW must zero every column at or beyond the cutoff index.

    The expected cutoff is ``max(1, int(N_CHUNKS * limit / 16384))``, the
    formula this test assumes the fault model uses.
    """
    tight_limit = 512  # Very small → cuts most columns
    result = _apply(S_true, [FaultType.CONTEXT_OVERFLOW], noise, dupe_ids,
                    config_context_limit=tight_limit)
    cutoff = max(1, int(N_CHUNKS * tight_limit / 16384))
    # Fail loudly instead of silently skipping the check: if the constants
    # ever change so that no column lies beyond the cutoff, the test would
    # otherwise pass without asserting anything.
    assert cutoff < N_CHUNKS, (
        "Test setup must leave at least one column beyond the context cutoff"
    )
    assert result[:, cutoff:].sum() == 0.0, (
        "Columns beyond context cutoff should be zeroed"
    )
def test_no_reranking_adds_noise(S_true, noise, dupe_ids):
    """NO_RERANKING must perturb the scores more when reranking is disabled.

    With reranking enabled the fault is expected to be suppressed, so the
    output should stay closer to the input matrix.
    """

    def mean_abs_drift(use_reranking):
        # Mean absolute deviation from S_true for one reranking setting.
        faulted = _apply(
            S_true,
            [FaultType.NO_RERANKING],
            noise,
            dupe_ids,
            config_use_reranking=use_reranking,
        )
        return float(np.abs(faulted - S_true).mean())

    diff_off = mean_abs_drift(False)
    diff_on = mean_abs_drift(True)
    assert diff_off > diff_on, (
        "NO_RERANKING should only add noise when reranking is disabled"
    )
def test_chunk_too_small_noise_reduced_by_overlap(S_true, noise, dupe_ids):
    """A larger chunk_overlap must shrink the deviation caused by CHUNK_TOO_SMALL.

    Compares overlap 0 against overlap 450, both at chunk size 128.
    """

    def mean_abs_drift(overlap):
        # Mean absolute deviation from S_true for one overlap setting.
        faulted = _apply(
            S_true,
            [FaultType.CHUNK_TOO_SMALL],
            noise,
            dupe_ids,
            config_chunk_size=128,
            config_chunk_overlap=overlap,
        )
        return float(np.abs(faulted - S_true).mean())

    diff_low = mean_abs_drift(0)
    # NOTE(review): 450 exceeds the chunk size of 128 — confirm this is intended.
    diff_high = mean_abs_drift(450)
    assert diff_low > diff_high, (
        "Higher overlap should reduce CHUNK_TOO_SMALL noise impact"
    )