"""
server/fault_math.py
--------------------
Pure fault-injection math: apply_faults(S_true, ...) -> S_faulted.
Extracted from RAGDebugEnvironment._recompute_S_faulted so that both
the server environment and corpora/stages/verify.py can use identical
transformations without instantiating the full environment class.
All transformations are deterministic given the same noise/dupe_ids arrays.
Noise is pre-generated by the caller (at episode reset, or fresh per-test in
verify.py) and scaled here.
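
Typical usage (a minimal sketch; `rng`, `S_true`, `fault_types`, and
`dupe_ids` stand in for caller-supplied values):

    noise = make_noise(rng, S_true.shape)
    S_faulted = apply_faults(
        S_true, fault_types,
        config_chunk_size=512,
        config_context_limit=16384,
        config_use_reranking=False,
        noise=noise,
        dupe_ids=dupe_ids,
    )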
"""
from __future__ import annotations

from typing import Dict, Optional, Set, Tuple

import numpy as np
from scipy.ndimage import uniform_filter1d

from models import FaultType


def apply_faults(
S: np.ndarray,
fault_types: Set[FaultType],
config_chunk_size: int,
config_context_limit: int,
config_use_reranking: bool,
noise: Dict[FaultType, np.ndarray],
dupe_ids: np.ndarray,
rewrite_boosts: Optional[np.ndarray] = None,
config_chunk_overlap: int = 0,
) -> np.ndarray:
"""
Apply fault math transformations to a similarity matrix.
Parameters
----------
S : np.ndarray, shape (n_queries, n_chunks), float32
The S_true matrix for the active embedding model. Copied internally.
fault_types : set of FaultType
Which faults to apply. WRONG_EMBEDDING_MODEL is not handled here —
it is implicit in which S matrix the caller passes in.
config_chunk_size : int
Current PipelineConfig.chunk_size. Modulates CHUNK_TOO_LARGE and
CHUNK_TOO_SMALL severity.
config_context_limit : int
Current PipelineConfig.context_window_limit. Modulates CONTEXT_OVERFLOW.
config_use_reranking : bool
Current PipelineConfig.use_reranking. Modulates DUPLICATE_FLOODING,
suppresses NO_RERANKING when True, and partially restores score signal
under TOP_K_TOO_SMALL (cross-encoder reranking recovers rank order).
noise : dict mapping FaultType -> np.ndarray (n_queries, n_chunks), float32
Pre-generated unit-normal arrays. Caller generates these once per
episode (for determinism) or fresh per test (for verify.py).
Required keys: CHUNK_TOO_SMALL, THRESHOLD_TOO_LOW, NO_RERANKING.
dupe_ids : np.ndarray of int
Column indices of chunks designated as "duplicates" for this episode.
rewrite_boosts : np.ndarray (n_queries, n_chunks) float32, optional
Persistent boost overlay from REWRITE_QUERY actions. Pass None or
zeros if not applicable.
config_chunk_overlap : int
Current PipelineConfig.chunk_overlap. Higher overlap stabilises
boundary embeddings, reducing score noise under CHUNK_TOO_SMALL.
Returns
-------
np.ndarray, same shape as S, dtype float32, values clipped to [0, 1].
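
    Examples
    --------
    A minimal sketch: the fault set, shapes, and config values below are
    arbitrary, chosen only to show the call pattern.

    >>> rng = np.random.default_rng(0)
    >>> S_true = rng.random((4, 32)).astype(np.float32)
    >>> S_f = apply_faults(
    ...     S_true,
    ...     {FaultType.THRESHOLD_TOO_HIGH},
    ...     config_chunk_size=512,
    ...     config_context_limit=16384,
    ...     config_use_reranking=False,
    ...     noise=make_noise(rng, S_true.shape),
    ...     dupe_ids=np.array([], dtype=int),
    ... )
    >>> bool(np.all(S_f <= S_true))  # deflation never raises a score
    True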
"""
S = S.copy().astype(np.float32)
S_clean = S.copy() # preserve pre-fault scores for cross-encoder reranking blend
n_q, n_c = S.shape
# --- CHUNK_TOO_LARGE ---
# Box filter along chunk axis smears scores across neighboring chunks.
# filter_size scales with chunk_size: larger chunks -> more smearing.
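    # Worked example (from the formula below): chunk_size=512 ->
    # filter_size=4; chunk_size=2048 -> filter_size=16.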
if FaultType.CHUNK_TOO_LARGE in fault_types:
filter_size = max(1, round(4 * config_chunk_size / 512))
if filter_size > 1:
S = uniform_filter1d(S, size=filter_size, axis=1, mode="nearest").astype(np.float32)
# --- CHUNK_TOO_SMALL ---
# Adds Gaussian noise: smaller chunks -> noisier embeddings.
# Higher overlap stabilises content at chunk boundaries, partially
# mitigating the noise (overlap up to 500 can halve the sigma).
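    # Worked example: chunk_size=256, overlap=0 -> sigma = 0.15;
    # overlap=500 -> overlap_reduction=0.5, halving sigma to 0.075.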
if FaultType.CHUNK_TOO_SMALL in fault_types:
overlap_reduction = min(0.5, config_chunk_overlap / 1000.0)
sigma = 0.15 * min(1.0, 512.0 / max(config_chunk_size, 64)) * (1.0 - overlap_reduction)
S = S + sigma * noise[FaultType.CHUNK_TOO_SMALL]
# --- THRESHOLD_TOO_LOW ---
# Adds Gaussian noise: irrelevant chunks score high by chance, cluttering retrieval.
if FaultType.THRESHOLD_TOO_LOW in fault_types:
S = S + 0.10 * noise[FaultType.THRESHOLD_TOO_LOW]
# --- THRESHOLD_TOO_HIGH ---
# Deflates all scores: relevant chunks fall below any reasonable threshold.
if FaultType.THRESHOLD_TOO_HIGH in fault_types:
S = S * 0.55
# --- TOP_K_TOO_SMALL ---
# Compresses score range toward 0.5: rankings become unreliable, tanking
# precision when top_k is large relative to the number of relevant chunks.
# Enabling reranking (cross-encoder) partially restores score signal by
# re-scoring the top_k candidates with a stronger model, so compression
# is less severe when use_reranking=True.
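    # Worked example: a 0.9 score compresses to 0.5 + 0.4*0.24 = 0.596
    # without reranking, or 0.5 + 0.4*0.65 = 0.76 with it.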
if FaultType.TOP_K_TOO_SMALL in fault_types:
compress = 0.24 if not config_use_reranking else 0.65
S = 0.5 + (S - 0.5) * compress
    # --- DUPLICATE_FLOODING ---
    # Boosts the caller-designated duplicate columns (dupe_ids, a random
    # ~14% of chunks per episode). Reranking sharply cuts the boost.
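    # Worked example: without reranking, a 0.85 duplicate becomes
    # min(0.85 + 0.20, 1.0) = 1.0; with reranking, 0.85 + 0.08 = 0.93.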
if FaultType.DUPLICATE_FLOODING in fault_types:
boost = 0.08 if config_use_reranking else 0.20
S[:, dupe_ids] = np.minimum(S[:, dupe_ids] + boost, 1.0)
# --- CONTEXT_OVERFLOW ---
# Zeros out chunks beyond the context window cutoff.
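    # Worked example: the cutoff scales context_limit against a
    # 16384-token reference window, so n_c=200 and context_limit=4096
    # -> cutoff=50, zeroing columns 50 and beyond.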
if FaultType.CONTEXT_OVERFLOW in fault_types:
cutoff = max(1, int(n_c * config_context_limit / 16384))
if cutoff < n_c:
S[:, cutoff:] = 0.0
# --- NO_RERANKING ---
# Adds mild Gaussian noise. Skipped entirely when reranking is enabled.
if FaultType.NO_RERANKING in fault_types and not config_use_reranking:
S = S + 0.10 * noise[FaultType.NO_RERANKING]
# --- CROSS-ENCODER RERANKING ---
# A cross-encoder re-scores candidates using a stronger model that sees
# the full query-document pair, partially recovering the true relevance
# signal corrupted by faults. We simulate this by blending faulted
# scores back toward the original (pre-fault) scores.
# This is non-monotonic for noise-based faults (undoes random
# perturbations) and restores score spread for compression faults
# (enabling effective threshold-based filtering).
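    # Worked example: with rerank_alpha=0.35, a faulted score of 0.30
    # whose pre-fault value is 0.80 blends back to
    # 0.65*0.30 + 0.35*0.80 = 0.475.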
if config_use_reranking:
rerank_alpha = 0.35
S = (1.0 - rerank_alpha) * S + rerank_alpha * S_clean
# Clip to valid range and apply persistent query-rewrite boosts
S = np.clip(S, 0.0, 1.0)
if rewrite_boosts is not None:
S = np.clip(S + rewrite_boosts, 0.0, 1.0)
return S
def make_noise(rng: np.random.Generator, shape: Tuple[int, int]) -> Dict[FaultType, np.ndarray]:
"""
Generate the standard noise dict expected by apply_faults.
Parameters
----------
rng : np.random.Generator
shape : (n_queries, n_chunks)
Returns
-------
dict mapping the three noise-dependent FaultTypes to unit-normal arrays.
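
    Example
    -------
    A minimal sketch with arbitrary shape values:

    >>> noise = make_noise(np.random.default_rng(0), (4, 32))
    >>> noise[FaultType.CHUNK_TOO_SMALL].shape
    (4, 32)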
"""
return {
FaultType.CHUNK_TOO_SMALL: rng.standard_normal(shape).astype(np.float32),
FaultType.THRESHOLD_TOO_LOW: rng.standard_normal(shape).astype(np.float32),
FaultType.NO_RERANKING: rng.standard_normal(shape).astype(np.float32),
}
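

if __name__ == "__main__":
    # Illustrative smoke test, not part of the server API: apply every
    # fault handled above, with and without reranking, and check that the
    # output keeps its shape and stays in [0, 1]. Shapes and config
    # values here are arbitrary.
    rng = np.random.default_rng(7)
    shape = (8, 64)
    S_true = rng.random(shape).astype(np.float32)
    noise = make_noise(rng, shape)
    dupe_ids = rng.choice(shape[1], size=9, replace=False)  # ~14% of 64
    faults = {
        FaultType.CHUNK_TOO_LARGE,
        FaultType.CHUNK_TOO_SMALL,
        FaultType.THRESHOLD_TOO_LOW,
        FaultType.THRESHOLD_TOO_HIGH,
        FaultType.TOP_K_TOO_SMALL,
        FaultType.DUPLICATE_FLOODING,
        FaultType.CONTEXT_OVERFLOW,
        FaultType.NO_RERANKING,
    }
    for use_reranking in (False, True):
        S_f = apply_faults(
            S_true, faults,
            config_chunk_size=1024,
            config_context_limit=4096,
            config_use_reranking=use_reranking,
            noise=noise,
            dupe_ids=dupe_ids,
        )
        assert S_f.shape == shape
        assert 0.0 <= S_f.min() and S_f.max() <= 1.0
    print("fault_math smoke test passed")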