Spaces:
Sleeping
Sleeping
File size: 7,066 Bytes
"""
server/fault_math.py
--------------------
Pure fault-injection math: apply_faults(S_true, ...) -> S_faulted.
Extracted from RAGDebugEnvironment._recompute_S_faulted so that both
the server environment and corpora/stages/verify.py can use identical
transformations without instantiating the full environment class.
All transformations are deterministic given the same noise/dupe_ids arrays.
Noise is pre-generated by the caller (at episode reset, or fresh per-test in
verify.py) and scaled here.
"""
from __future__ import annotations
from typing import Dict, Optional, Set, Tuple
import numpy as np
from scipy.ndimage import uniform_filter1d
from models import FaultType
def apply_faults(
    S: np.ndarray,
    fault_types: Set[FaultType],
    config_chunk_size: int,
    config_context_limit: int,
    config_use_reranking: bool,
    noise: Dict[FaultType, np.ndarray],
    dupe_ids: np.ndarray,
    rewrite_boosts: Optional[np.ndarray] = None,
    config_chunk_overlap: int = 0,
) -> np.ndarray:
    """
    Apply fault math transformations to a similarity matrix.

    The input is never modified; all work happens on an internal float32
    copy. Faults are applied in a fixed order (CHUNK_TOO_LARGE,
    CHUNK_TOO_SMALL, THRESHOLD_TOO_LOW, THRESHOLD_TOO_HIGH, TOP_K_TOO_SMALL,
    DUPLICATE_FLOODING, CONTEXT_OVERFLOW, NO_RERANKING), followed by the
    optional reranking blend, a clip to [0, 1], and the rewrite boosts.

    Parameters
    ----------
    S : np.ndarray, shape (n_queries, n_chunks)
        The S_true matrix for the active embedding model. Must be 2-D.
        Copied (and converted to float32) internally.
    fault_types : set of FaultType
        Which faults to apply. WRONG_EMBEDDING_MODEL is not handled here;
        it is implicit in which S matrix the caller passes in.
    config_chunk_size : int
        Current PipelineConfig.chunk_size. Sets the box-filter width for
        CHUNK_TOO_LARGE and the noise sigma for CHUNK_TOO_SMALL.
    config_context_limit : int
        Current PipelineConfig.context_window_limit. Sets the column
        cutoff for CONTEXT_OVERFLOW (relative to a 16384 reference).
    config_use_reranking : bool
        Current PipelineConfig.use_reranking. Reduces the
        DUPLICATE_FLOODING boost, softens TOP_K_TOO_SMALL compression,
        suppresses NO_RERANKING noise, and enables the final blend back
        toward the pre-fault scores.
    noise : dict mapping FaultType -> np.ndarray (n_queries, n_chunks)
        Pre-generated unit-normal arrays, scaled here. Only the entries
        for active noise-based faults are read (CHUNK_TOO_SMALL,
        THRESHOLD_TOO_LOW, NO_RERANKING).
    dupe_ids : np.ndarray of int
        Column indices of chunks designated as "duplicates" for this
        episode. An empty collection is accepted and means "no duplicates".
    rewrite_boosts : np.ndarray (n_queries, n_chunks), optional
        Persistent boost overlay from REWRITE_QUERY actions. Pass None or
        zeros if not applicable.
    config_chunk_overlap : int
        Current PipelineConfig.chunk_overlap. Higher overlap reduces the
        CHUNK_TOO_SMALL noise sigma (by at most half, reached at 500).

    Returns
    -------
    np.ndarray, same shape as S, dtype float32, values clipped to [0, 1].

    Raises
    ------
    KeyError
        If an active noise-based fault has no entry in ``noise``.
    ValueError
        If ``S`` is not two-dimensional.
    """
    # Single working copy; the caller's matrix is left untouched.
    scores = np.array(S, dtype=np.float32)
    # Unpacking doubles as a 2-D shape check (raises ValueError otherwise).
    _, n_chunks = scores.shape
    # Pre-fault snapshot is only needed for the reranking blend below.
    clean_scores = scores.copy() if config_use_reranking else None

    # --- CHUNK_TOO_LARGE ---
    # Box filter along the chunk axis smears scores across neighbouring
    # chunks. The window grows with chunk_size (4 columns per 512 tokens).
    if FaultType.CHUNK_TOO_LARGE in fault_types:
        filter_size = max(1, round(4 * config_chunk_size / 512))
        if filter_size > 1:
            scores = uniform_filter1d(
                scores, size=filter_size, axis=1, mode="nearest"
            ).astype(np.float32)

    # --- CHUNK_TOO_SMALL ---
    # Gaussian noise: smaller chunks -> noisier embeddings. Overlap
    # mitigates the noise, capped at a 50% reduction (overlap >= 500).
    if FaultType.CHUNK_TOO_SMALL in fault_types:
        overlap_reduction = min(0.5, config_chunk_overlap / 1000.0)
        size_factor = min(1.0, 512.0 / max(config_chunk_size, 64))
        sigma = 0.15 * size_factor * (1.0 - overlap_reduction)
        scores = scores + sigma * noise[FaultType.CHUNK_TOO_SMALL]

    # --- THRESHOLD_TOO_LOW ---
    # Gaussian noise: irrelevant chunks score high by chance.
    if FaultType.THRESHOLD_TOO_LOW in fault_types:
        scores = scores + 0.10 * noise[FaultType.THRESHOLD_TOO_LOW]

    # --- THRESHOLD_TOO_HIGH ---
    # Deflates all scores so relevant chunks fall below the threshold.
    if FaultType.THRESHOLD_TOO_HIGH in fault_types:
        scores = scores * 0.55

    # --- TOP_K_TOO_SMALL ---
    # Compresses the score range toward 0.5. Compression is milder
    # (factor 0.65 instead of 0.24) when reranking is enabled.
    if FaultType.TOP_K_TOO_SMALL in fault_types:
        compress = 0.65 if config_use_reranking else 0.24
        scores = 0.5 + (scores - 0.5) * compress

    # --- DUPLICATE_FLOODING ---
    # Boosts the designated duplicate columns, capped at 1.0. Reranking
    # cuts the boost from 0.20 to 0.08.
    if FaultType.DUPLICATE_FLOODING in fault_types:
        dupe_cols = np.asarray(dupe_ids)
        # An empty id array (e.g. np.array([]), which is float64) cannot be
        # used as an index; with nothing to boost this is simply a no-op.
        if dupe_cols.size:
            boost = 0.08 if config_use_reranking else 0.20
            scores[:, dupe_cols] = np.minimum(scores[:, dupe_cols] + boost, 1.0)

    # --- CONTEXT_OVERFLOW ---
    # Zeros out chunk columns beyond the context-window cutoff; at least
    # one column always survives.
    if FaultType.CONTEXT_OVERFLOW in fault_types:
        cutoff = max(1, int(n_chunks * config_context_limit / 16384))
        if cutoff < n_chunks:
            scores[:, cutoff:] = 0.0

    # --- NO_RERANKING ---
    # Mild Gaussian noise, skipped entirely when reranking is enabled.
    if FaultType.NO_RERANKING in fault_types and not config_use_reranking:
        scores = scores + 0.10 * noise[FaultType.NO_RERANKING]

    # --- CROSS-ENCODER RERANKING ---
    # Simulated by blending the faulted scores 35% of the way back toward
    # the pre-fault scores. Runs after every fault above, so it also
    # partially restores columns zeroed by CONTEXT_OVERFLOW.
    if config_use_reranking:
        rerank_alpha = 0.35
        scores = (1.0 - rerank_alpha) * scores + rerank_alpha * clean_scores

    # Clip to the valid range, then apply persistent query-rewrite boosts.
    scores = np.clip(scores, 0.0, 1.0)
    if rewrite_boosts is not None:
        scores = np.clip(scores + rewrite_boosts, 0.0, 1.0)

    # float64 noise or boosts would have promoted the result; restore the
    # documented float32 dtype (no copy when it is already float32).
    return scores.astype(np.float32, copy=False)
def make_noise(rng: np.random.Generator, shape: Tuple[int, int]) -> Dict[FaultType, np.ndarray]:
    """
    Build the noise dict that apply_faults expects.

    One independent unit-normal float32 array is drawn per noise-based
    fault, in a fixed order, so a given generator state always produces
    the same dict.

    Parameters
    ----------
    rng : np.random.Generator
        Source of randomness; its state advances by three draws.
    shape : (n_queries, n_chunks)
        Shape of every generated array.

    Returns
    -------
    dict keyed by CHUNK_TOO_SMALL, THRESHOLD_TOO_LOW and NO_RERANKING,
    each mapping to a float32 array of the requested shape.
    """
    # Draw order matters for reproducibility: keep this tuple's order.
    noisy_faults = (
        FaultType.CHUNK_TOO_SMALL,
        FaultType.THRESHOLD_TOO_LOW,
        FaultType.NO_RERANKING,
    )
    return {
        fault: rng.standard_normal(shape).astype(np.float32)
        for fault in noisy_faults
    }
|