Spaces:
Sleeping
Sleeping
"""
tests/test_fault_math.py
------------------------
Tests for server/fault_math.py.

Verifies that:
- Each fault type degrades retrieval relative to a no-fault baseline.
- apply_faults() with an empty fault set returns the original matrix (up to float precision).
- make_noise() returns the expected keys.
- All output values stay in [0, 1] after clipping.
"""
import numpy as np
import pytest

from models import FaultType
from server.fault_math import apply_faults, make_noise
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
# Shape of the synthetic score matrix used throughout this module:
# one row per query, one column per candidate chunk.
N_QUERIES = 8
N_CHUNKS = 40

# Seed for the module's random generator, fixed so test data is reproducible.
SEED = 42
@pytest.fixture
def rng():
    """Provide a NumPy ``Generator`` seeded with ``SEED``.

    Registered as a pytest fixture so that tests and the other fixtures in
    this module can request it by parameter name. Each test gets a freshly
    seeded generator, which keeps the generated data reproducible.
    """
    return np.random.default_rng(SEED)
@pytest.fixture
def S_true(rng):
    """Build a synthetic ground-truth score matrix of shape (N_QUERIES, N_CHUNKS).

    Background scores are drawn uniformly from [0.2, 0.5) and stored as
    float32; every query row then receives two high-scoring "relevant" chunks
    (roughly 0.7-0.9).
    """
    S = rng.uniform(0.2, 0.5, (N_QUERIES, N_CHUNKS)).astype(np.float32)
    # Mark two consecutive chunks per query as "relevant": columns i and i + 1
    # (wrapping modulo N_CHUNKS). The peaks therefore run along the diagonal
    # instead of occupying the same leading columns for every query.
    for i in range(N_QUERIES):
        S[i, i % N_CHUNKS] = rng.uniform(0.75, 0.92)
        S[i, (i + 1) % N_CHUNKS] = rng.uniform(0.70, 0.88)
    return S
@pytest.fixture
def noise(rng):
    """Return the ``make_noise`` output for an (N_QUERIES, N_CHUNKS) matrix.

    Draws from the shared ``rng`` fixture, so the samples are reproducible.
    """
    return make_noise(rng, (N_QUERIES, N_CHUNKS))
@pytest.fixture
def dupe_ids(rng):
    """Pick the chunk column indices that the tests treat as duplicates.

    Samples ``N_CHUNKS // 7`` distinct indices (at least one) from
    ``range(N_CHUNKS)`` without replacement.
    """
    return rng.choice(N_CHUNKS, size=max(1, N_CHUNKS // 7), replace=False)
def _apply(S, fault_types, noise, dupe_ids, **kwargs):
    """Invoke ``apply_faults`` with this module's baseline configuration.

    ``fault_types`` may be any iterable; it is converted to a set before the
    call. Extra keyword arguments override the baseline config values.
    """
    config = {
        "config_chunk_size": 512,
        "config_context_limit": 4096,
        "config_use_reranking": False,
        "config_chunk_overlap": 50,
        **kwargs,  # caller-supplied overrides win over the baseline
    }
    return apply_faults(
        S=S,
        fault_types=set(fault_types),
        noise=noise,
        dupe_ids=dupe_ids,
        **config,
    )
# ---------------------------------------------------------------------------
# Basic contracts
# ---------------------------------------------------------------------------
def test_no_faults_returns_original(S_true, noise, dupe_ids):
    """An empty fault list must leave the scores unchanged (to 5 decimals)."""
    untouched = _apply(S_true, [], noise, dupe_ids)
    np.testing.assert_array_almost_equal(untouched, S_true, decimal=5)
def test_output_clipped_to_unit_interval(S_true, noise, dupe_ids):
    """Stacking every listed fault must still yield scores inside [0, 1]."""
    every_fault = (
        FaultType.CHUNK_TOO_LARGE,
        FaultType.CHUNK_TOO_SMALL,
        FaultType.THRESHOLD_TOO_LOW,
        FaultType.THRESHOLD_TOO_HIGH,
        FaultType.TOP_K_TOO_SMALL,
        FaultType.DUPLICATE_FLOODING,
        FaultType.CONTEXT_OVERFLOW,
        FaultType.NO_RERANKING,
    )
    result = _apply(S_true, every_fault, noise, dupe_ids)

    # Check the lower and upper bound separately so a failure names the side.
    assert result.min() >= 0.0, f"Minimum value {result.min()} < 0"
    assert result.max() <= 1.0, f"Maximum value {result.max()} > 1"
def test_make_noise_returns_expected_keys(rng):
    """``make_noise`` must key its output by fault type with float32 arrays.

    Checks that three fault types are present and that every returned array
    has the requested shape and dtype.
    """
    shape = (5, 20)
    samples = make_noise(rng, shape)

    required = (
        FaultType.CHUNK_TOO_SMALL,
        FaultType.THRESHOLD_TOO_LOW,
        FaultType.NO_RERANKING,
    )
    for fault in required:
        assert fault in samples

    for arr in samples.values():
        assert arr.shape == shape
        assert arr.dtype == np.float32
def test_apply_faults_does_not_mutate_input(S_true, noise, dupe_ids):
    """The caller's matrix must be exactly intact after faults are applied."""
    snapshot = S_true.copy()
    faults = [FaultType.CHUNK_TOO_LARGE, FaultType.THRESHOLD_TOO_HIGH]
    _apply(S_true, faults, noise, dupe_ids)  # return value deliberately ignored
    np.testing.assert_array_equal(S_true, snapshot)
# ---------------------------------------------------------------------------
# Each fault degrades in the expected direction
# ---------------------------------------------------------------------------
def _mean_top1_score(S: np.ndarray) -> float:
    """Return the mean, over rows, of each row's maximum score.

    Used by the tests as a proxy for how retrieval-friendly a score matrix is:
    the higher the best score per query, the easier the top hit is to find.
    """
    # Reduce along axis 1 (per row) first, then average the row maxima.
    return float(S.max(axis=1).mean())
def _score_std(S: np.ndarray) -> float:
    """Return the standard deviation over all entries of ``S`` as a Python float.

    The tests use it as a measure of how spread out the scores are.
    """
    return float(S.std())
def test_chunk_too_large_smears_scores(S_true, noise, dupe_ids):
    """CHUNK_TOO_LARGE with a 2048 chunk size must lower the mean per-query peak."""
    smeared = _apply(
        S_true,
        [FaultType.CHUNK_TOO_LARGE],
        noise,
        dupe_ids,
        config_chunk_size=2048,
    )
    peak_after = _mean_top1_score(smeared)
    peak_before = _mean_top1_score(S_true)
    # The fault is described as a box filter, which should flatten the peaks.
    assert peak_after < peak_before, (
        "CHUNK_TOO_LARGE should smear peak scores downward"
    )
def test_threshold_too_high_deflates_scores(S_true, noise, dupe_ids):
    """THRESHOLD_TOO_HIGH must pull the mean score below 65% of the baseline mean.

    The fault is described as a 0.55 multiplier on all scores; the 0.65 bound
    leaves some headroom above that.
    """
    deflated = _apply(S_true, [FaultType.THRESHOLD_TOO_HIGH], noise, dupe_ids)
    ceiling = S_true.mean() * 0.65
    assert deflated.mean() < ceiling, (
        "THRESHOLD_TOO_HIGH should significantly deflate scores"
    )
def test_top_k_too_small_compresses_score_range(S_true, noise, dupe_ids):
    """TOP_K_TOO_SMALL must shrink the overall standard deviation of the scores."""
    compressed = _apply(S_true, [FaultType.TOP_K_TOO_SMALL], noise, dupe_ids)
    spread_after = _score_std(compressed)
    spread_before = _score_std(S_true)
    assert spread_after < spread_before, (
        "TOP_K_TOO_SMALL should compress score variance"
    )
def test_top_k_too_small_less_severe_with_reranking(S_true, noise, dupe_ids):
    """Turning reranking on must soften the TOP_K_TOO_SMALL compression."""
    faults = [FaultType.TOP_K_TOO_SMALL]
    # Score spread keyed by the reranking flag; evaluated off first, then on.
    spread = {
        use_reranking: _score_std(
            _apply(S_true, faults, noise, dupe_ids,
                   config_use_reranking=use_reranking)
        )
        for use_reranking in (False, True)
    }
    # A larger std with reranking means the scores were compressed less.
    assert spread[True] > spread[False], (
        "Reranking should partially restore score spread under TOP_K_TOO_SMALL"
    )
def test_duplicate_flooding_boosts_dupe_columns(S_true, noise, dupe_ids):
    """DUPLICATE_FLOODING must raise the mean score of the duplicate columns."""
    flooded = _apply(S_true, [FaultType.DUPLICATE_FLOODING], noise, dupe_ids)
    dupes_after = flooded[:, dupe_ids].mean()
    dupes_before = S_true[:, dupe_ids].mean()
    assert dupes_after > dupes_before, (
        "DUPLICATE_FLOODING should boost duplicate chunk scores"
    )
def test_duplicate_flooding_reduced_with_reranking(S_true, noise, dupe_ids):
    """With reranking on, the duplicate columns must be boosted less."""

    def dupe_mean(use_reranking):
        # Mean score over the duplicate columns for the given reranking flag.
        flooded = _apply(S_true, [FaultType.DUPLICATE_FLOODING], noise, dupe_ids,
                         config_use_reranking=use_reranking)
        return flooded[:, dupe_ids].mean()

    mean_without = dupe_mean(False)
    mean_with = dupe_mean(True)
    assert mean_with < mean_without, (
        "Reranking should reduce duplicate flooding boost"
    )
def test_context_overflow_zeroes_tail_columns(S_true, noise, dupe_ids):
    """CONTEXT_OVERFLOW must zero every chunk column past the context cutoff.

    Uses a deliberately tight context limit so that most columns fall beyond
    the cutoff.
    """
    tight_limit = 512  # Very small → cuts most columns
    result = _apply(S_true, [FaultType.CONTEXT_OVERFLOW], noise, dupe_ids,
                    config_context_limit=tight_limit)
    # NOTE(review): 16384 presumably mirrors the reference context size used by
    # apply_faults — confirm against server/fault_math.py.
    cutoff = max(1, int(N_CHUNKS * tight_limit / 16384))
    # Fail loudly rather than skip the check: if the constants ever change so
    # that nothing lies beyond the cutoff, this test would verify nothing.
    assert cutoff < N_CHUNKS, (
        "Test setup error: context cutoff must leave tail columns to check"
    )
    # Require every tail entry to be zero, not merely a zero sum.
    assert not result[:, cutoff:].any(), (
        "Columns beyond context cutoff should be zeroed"
    )
def test_no_reranking_adds_noise(S_true, noise, dupe_ids):
    """NO_RERANKING must perturb the scores more when reranking is disabled."""

    def drift(use_reranking):
        # Mean absolute deviation from S_true for the given reranking flag.
        faulted = _apply(S_true, [FaultType.NO_RERANKING], noise, dupe_ids,
                         config_use_reranking=use_reranking)
        return float(np.abs(faulted - S_true).mean())

    diff_off = drift(False)
    diff_on = drift(True)
    # With reranking enabled the fault is expected to be suppressed, so the
    # output should stay closer to S_true.
    assert diff_off > diff_on, (
        "NO_RERANKING should only add noise when reranking is disabled"
    )
def test_chunk_too_small_noise_reduced_by_overlap(S_true, noise, dupe_ids):
    """A larger chunk overlap must shrink the CHUNK_TOO_SMALL perturbation."""

    def drift(overlap):
        # Mean absolute deviation from S_true at chunk size 128 with this overlap.
        faulted = _apply(S_true, [FaultType.CHUNK_TOO_SMALL], noise, dupe_ids,
                         config_chunk_size=128, config_chunk_overlap=overlap)
        return float(np.abs(faulted - S_true).mean())

    diff_low = drift(0)
    diff_high = drift(450)
    assert diff_low > diff_high, (
        "Higher overlap should reduce CHUNK_TOO_SMALL noise impact"
    )