# bragee's picture
# Upload model checkpoints and code
# b464490 verified
"""Simplified GeoPoison-RAG attack simulation.
Realistic threat model (matching GeoPoison-RAG Phase 1):
- Attacker has shadow queries approximating target query distribution.
- Attacker has access to document embeddings.
- Attacker builds bipartite query-document graph using COSINE SIMILARITY
(their model of how retrieval works).
- Attacker computes Fiedler vector and places adversarial doc at the
spectral-optimal position in document space.
Defense argument:
- Baseline (cosine sim): attacker's model is correct → high ASR.
- Multi-manifold (R(q,d)): attacker's model is wrong because R ≠ cosine → lower ASR.
"""
import numpy as np
import torch
from scipy.sparse.linalg import eigsh
from sklearn.metrics.pairwise import cosine_similarity
from multi_manifold_retrieval.evaluation.spectral_analysis import compute_document_laplacian
def select_domain_documents(
    passages: list[str],
    keywords: list[str],
    max_docs: int = 200,
) -> tuple[list[int], list[str]]:
    """Select documents belonging to a target domain by keyword matching.

    A passage is selected when at least one keyword occurs in it as a
    substring. Matching is case-insensitive on both sides: passages and
    keywords are lowercased before comparison. Passages are scanned in order
    and scanning stops once ``max_docs`` passages have been selected.

    Args:
        passages: Candidate passage texts.
        keywords: Substrings that mark a passage as in-domain. Note that an
            empty-string keyword matches every passage.
        max_docs: Maximum number of passages to return. A non-positive value
            yields an empty selection.

    Returns:
        ``(indices, texts)``: positions of the selected passages within
        ``passages`` and the corresponding original (non-lowercased) texts,
        in corpus order.
    """
    if max_docs <= 0:
        return [], []

    # Lowercase the keywords once; the passage text is lowercased below, so a
    # keyword containing uppercase characters could otherwise never match.
    needles = [kw.lower() for kw in keywords]

    indices: list[int] = []
    texts: list[str] = []
    for i, text in enumerate(passages):
        haystack = text.lower()
        if any(needle in haystack for needle in needles):
            indices.append(i)
            texts.append(text)
            if len(indices) >= max_docs:
                break
    return indices, texts
def build_bipartite_fiedler_placement(
    query_embs: np.ndarray,
    doc_embs: np.ndarray,
    t_nn: int = 20,
) -> tuple[np.ndarray, dict]:
    """GeoPoison-RAG Phase 1: bipartite spectral placement (cosine-based).

    The attacker:
      1. Builds a bipartite query-document graph using cosine similarity,
         keeping only the ``t`` most similar documents per query.
      2. Computes the Fiedler vector (eigenvector of the second-smallest
         eigenvalue) of the symmetric normalized Laplacian.
      3. Extracts the document component of the Fiedler vector.
      4. Places the adversarial doc at the Fiedler-weighted centroid of the
         documents and L2-normalizes it.

    The placement is in DOCUMENT SPACE: the attacker optimizes where to place
    a document, guided by the query-document spectral structure, while
    assuming retrieval = cosine similarity.

    Args:
        query_embs: Query embeddings, one row per query.
        doc_embs: Document embeddings, one row per document.
        t_nn: Number of nearest documents kept per query. It is clamped to
            the range ``[1, max(1, num_docs - 1)]``.

    Returns:
        ``(adv_embedding, info)`` where ``adv_embedding`` is the (normalized,
        unless it is the zero vector) adversarial embedding and ``info``
        holds the keys ``method``, ``fiedler_eigenvalue``, ``weight_entropy``,
        ``max_weight``, ``adv_mean_cos_to_queries`` and
        ``adv_mean_cos_to_docs``.

    Raises:
        ValueError: If there are no queries or no documents.
    """
    nq = query_embs.shape[0]
    nd = doc_embs.shape[0]
    if nq == 0 or nd == 0:
        raise ValueError(
            "build_bipartite_fiedler_placement requires at least one query "
            "and one document"
        )

    # Cosine similarity between queries and documents (attacker's model).
    S = cosine_similarity(query_embs, doc_embs)  # (nq, nd)

    # Sparsify: keep the top-t documents per query (vectorized over rows).
    t = max(1, min(t_nn, nd - 1))
    top_idx = np.argpartition(S, -t, axis=1)[:, -t:]
    rows = np.arange(nq)[:, None]
    S_sparse = np.zeros_like(S)
    S_sparse[rows, top_idx] = S[rows, top_idx]
    # Edge weights must be non-negative: a negative cosine would otherwise
    # allow negative degrees and NaNs from the square root below.
    np.maximum(S_sparse, 0.0, out=S_sparse)

    # Bipartite adjacency: A = [[0, S], [S^T, 0]]
    n = nq + nd
    A = np.zeros((n, n))
    A[:nq, nq:] = S_sparse
    A[nq:, :nq] = S_sparse.T

    # Normalized Laplacian: L = I - D^{-1/2} A D^{-1/2}.
    # Isolated nodes get degree 1 so the scaling stays finite.
    degrees = A.sum(axis=1)
    degrees[degrees == 0] = 1.0
    inv_sqrt_deg = 1.0 / np.sqrt(degrees)
    # Row/column scaling by broadcasting avoids materializing dense diagonal
    # matrices and two O(n^3) matrix products.
    L = np.eye(n) - inv_sqrt_deg[:, None] * A * inv_sqrt_deg[None, :]

    # L is a dense symmetric matrix, so use the dense symmetric solver. It
    # returns eigenvalues in ascending order and is deterministic, unlike
    # ARPACK's slowly converging "smallest magnitude" mode. n >= 2 is
    # guaranteed by the input check, so index 1 always exists.
    eigenvalues, eigenvectors = np.linalg.eigh(L)
    fiedler_val = eigenvalues[1]
    fiedler_vec = eigenvectors[:, 1]

    # Document component of the Fiedler vector, used as centroid weights.
    # abs() makes the result independent of the eigenvector's arbitrary sign.
    doc_component = fiedler_vec[nq:]
    weights = np.abs(doc_component)
    weights = weights / (weights.sum() + 1e-12)

    # Fiedler-weighted centroid of documents, then L2-normalize.
    adv_embedding = (weights[:, None] * doc_embs).sum(axis=0)
    norm = np.linalg.norm(adv_embedding)
    if norm > 0:
        adv_embedding = adv_embedding / norm

    adv_row = adv_embedding.reshape(1, -1)
    info = {
        "method": "bipartite_fiedler",
        "fiedler_eigenvalue": float(fiedler_val),
        "weight_entropy": float(-np.sum(weights * np.log(weights + 1e-12))),
        "max_weight": float(weights.max()),
        "adv_mean_cos_to_queries": float(
            cosine_similarity(adv_row, query_embs).mean()
        ),
        "adv_mean_cos_to_docs": float(
            cosine_similarity(adv_row, doc_embs).mean()
        ),
    }
    return adv_embedding, info
def compute_doconly_fiedler_placement(doc_embs: np.ndarray) -> tuple[np.ndarray, dict]:
    """Document-only Fiedler placement (no query access).

    Weaker attacker that only has document embeddings. Uses the
    document-space Laplacian ``L_D`` returned by
    ``compute_document_laplacian`` and places the adversarial document at
    the centroid of the documents weighted by the absolute Fiedler-vector
    entries, then L2-normalizes it.

    With fewer than three documents no Fiedler vector is computed; the
    plain (normalized) centroid is returned instead.

    Args:
        doc_embs: Document embeddings, one row per document.

    Returns:
        ``(adv_embedding, info)``. ``info["method"]`` is
        ``"centroid_fallback"`` for fewer than three documents, otherwise
        ``"doconly_fiedler"`` together with ``"fiedler_eigenvalue"``.
        The embedding is left un-normalized only when it is the zero vector.

    Raises:
        ValueError: If ``doc_embs`` contains no documents.
    """
    n = doc_embs.shape[0]
    if n == 0:
        raise ValueError("doc_embs must contain at least one document")

    if n < 3:
        centroid = doc_embs.mean(axis=0)
        # Guard the normalization exactly like the main path below so a zero
        # centroid does not turn into NaNs.
        centroid_norm = np.linalg.norm(centroid)
        if centroid_norm > 0:
            centroid = centroid / centroid_norm
        return centroid, {"method": "centroid_fallback"}

    # NOTE(review): only the first return value of the helper is used here;
    # whether L_D is dense or sparse is not visible from this file.
    L_D, _ = compute_document_laplacian(doc_embs)

    # Request a few of the smallest-magnitude eigenpairs; k < n is required
    # by eigsh, and n >= 3 guarantees k >= 2 so index 1 exists below.
    k = min(3, n - 1)
    eigenvalues, eigenvectors = eigsh(L_D, k=k, which="SM")
    sorted_idx = np.argsort(eigenvalues)
    fiedler_vec = eigenvectors[:, sorted_idx[1]]
    fiedler_val = eigenvalues[sorted_idx[1]]

    # abs() removes the arbitrary sign of the eigenvector.
    weights = np.abs(fiedler_vec)
    weights = weights / (weights.sum() + 1e-12)

    adv_embedding = (weights[:, None] * doc_embs).sum(axis=0)
    norm = np.linalg.norm(adv_embedding)
    if norm > 0:
        adv_embedding = adv_embedding / norm

    return adv_embedding, {
        "method": "doconly_fiedler",
        "fiedler_eigenvalue": float(fiedler_val),
    }
def compute_asr_threshold(
    query_embeddings: torch.Tensor,
    corpus_embeddings: torch.Tensor,
    adv_embedding: torch.Tensor,
    operator,
    top_k: int = 10,
    device: str = "cpu",
    batch_size: int = 50,
) -> tuple[float, dict]:
    """Compute ASR@k using a per-query threshold (oracle-style).

    For each query, the k-th highest corpus score is the threshold. The
    attack succeeds for that query if the adversarial doc's score is
    >= the threshold. Mirrors the gp_rag/plan_single.py oracle check.

    Args:
        query_embeddings: Query embeddings, one row per query.
        corpus_embeddings: Corpus document embeddings, one row per document.
        adv_embedding: Embedding of the single adversarial document.
        operator: Scoring object. It must provide ``eval()``, be callable as
            ``operator(queries, docs)`` for row-aligned pairs, and provide
            ``compute_pairwise(queries, corpus)`` returning one row of
            corpus scores per query.
            NOTE(review): the paired call is assumed to yield one score per
            query (shape ``(batch,)`` or ``(batch, 1)``) - confirm.
        top_k: Rank cutoff. When the corpus holds fewer than ``top_k``
            documents the cutoff is clamped to the corpus size, i.e. the
            threshold becomes the lowest corpus score.
        device: Device the embeddings are moved to before scoring.
        batch_size: Number of queries scored per batch.

    Returns:
        ``(asr, info)``: the fraction of queries for which the attack
        succeeds, and margin statistics (``mean_margin``, ``median_margin``,
        ``p25_margin``, ``fraction_positive_margin``) where the margin is
        adversarial score minus threshold.

    Raises:
        ValueError: If there are no queries, no corpus documents, or
            ``top_k`` is smaller than 1.
    """
    num_queries = query_embeddings.shape[0]
    num_docs = corpus_embeddings.shape[0]
    if num_queries == 0:
        raise ValueError("query_embeddings must contain at least one query")
    if num_docs == 0:
        raise ValueError("corpus_embeddings must contain at least one document")
    if top_k < 1:
        raise ValueError("top_k must be >= 1")

    # torch.topk raises when k exceeds the number of candidates, so clamp.
    k = min(top_k, num_docs)

    corpus_emb = corpus_embeddings.to(device)
    adv_emb = adv_embedding.to(device)
    # Side effect: the operator is switched to (and left in) eval mode.
    operator.eval()

    margin_chunks = []
    with torch.no_grad():
        for start in range(0, num_queries, batch_size):
            q_batch = query_embeddings[start:start + batch_size].to(device)
            bs = q_batch.shape[0]

            # Score the adversarial document against every query in the batch.
            adv_expanded = adv_emb.unsqueeze(0).expand(bs, -1)
            adv_scores = operator(q_batch, adv_expanded).reshape(-1)

            # Score corpus documents; k-th highest corpus score = threshold.
            corpus_scores = operator.compute_pairwise(q_batch, corpus_emb)
            thresholds = torch.topk(corpus_scores, k, dim=1).values[:, -1]

            # Subtract in float64 so margins do not pick up extra rounding
            # from low-precision score dtypes.
            margin_chunks.append(
                (adv_scores.double() - thresholds.double()).cpu()
            )

    margins_arr = torch.cat(margin_chunks).numpy()
    # Success (score >= threshold) is exactly "margin >= 0".
    successes = int((margins_arr >= 0).sum())
    asr = successes / num_queries

    info = {
        "mean_margin": float(margins_arr.mean()),
        "median_margin": float(np.median(margins_arr)),
        "p25_margin": float(np.percentile(margins_arr, 25)),
        "fraction_positive_margin": float((margins_arr >= 0).mean()),
    }
    return asr, info
def run_attack_simulation(
    encoder,
    operator,
    baseline_operator,
    passages: list[str],
    passage_embeddings_torch: torch.Tensor,
    target_query_texts: list[str],
    medical_keywords: list[str],
    top_k: int = 10,
    max_domain_docs: int = 200,
    device: str = "cpu",
) -> dict:
    """Run the GeoPoison-RAG attack simulation and report ASR for both operators.

    Two attacker models are evaluated against the keyword-selected domain
    documents:
      1. Bipartite Fiedler (realistic): shadow queries + documents, placement
         from ``build_bipartite_fiedler_placement``.
      2. Doc-only Fiedler (weaker): document embeddings only, placement from
         ``compute_doconly_fiedler_placement``.

    Each adversarial embedding is scored with ``compute_asr_threshold`` first
    under ``baseline_operator`` and then under ``operator``. Progress and a
    summary table are printed to stdout.

    Args:
        encoder: Object exposing ``encode_queries(texts, show_progress=...)``.
        operator: Multi-manifold scoring operator under test.
        baseline_operator: Baseline scoring operator.
        passages: Corpus passage texts.
        passage_embeddings_torch: Embeddings indexed like ``passages``.
        target_query_texts: Target (shadow) query texts.
        medical_keywords: Keywords used to select the domain documents.
        top_k: Rank cutoff for ASR@k.
        max_domain_docs: Cap on the number of selected domain documents.
        device: Device passed through to ``compute_asr_threshold``.

    Returns:
        A results dict with per-attack entries (``bipartite_attack``,
        ``doconly_attack``), run metadata, and top-level ``baseline_asr`` /
        ``multi_manifold_asr`` mirroring the bipartite attack; or
        ``{"error": "insufficient domain documents"}`` when fewer than five
        domain documents are found.
    """

    def _reduction(base, mm):
        # Relative ASR reduction in percent; the floor avoids dividing by zero.
        return (1 - mm / max(base, 1e-9)) * 100

    def _score_attack(banner, adv_doc, placement):
        # Evaluate one adversarial embedding: baseline first, then multi-manifold.
        print(banner, flush=True)
        base_asr, base_margins = compute_asr_threshold(
            query_embeddings, domain_corpus, adv_doc,
            baseline_operator, top_k, device
        )
        print(f" Baseline ASR@{top_k}: {base_asr:.4f} (mean margin: {base_margins['mean_margin']:.4f})", flush=True)
        mm_asr, mm_margins = compute_asr_threshold(
            query_embeddings, domain_corpus, adv_doc,
            operator, top_k, device
        )
        print(f" Multi-manifold ASR@{top_k}: {mm_asr:.4f} (mean margin: {mm_margins['mean_margin']:.4f})", flush=True)
        return {
            "baseline_asr": base_asr,
            "multi_manifold_asr": mm_asr,
            "baseline_margins": base_margins,
            "multi_manifold_margins": mm_margins,
            "placement_info": placement,
        }

    print("\n=== Attack Simulation ===", flush=True)

    # Step 1: pick the target-domain documents (only the indices are needed).
    domain_indices, _ = select_domain_documents(
        passages, medical_keywords, max_domain_docs
    )
    num_domain = len(domain_indices)
    print(f"Selected {num_domain} domain documents.", flush=True)
    if num_domain < 5:
        print("Warning: Too few domain documents found.")
        return {"error": "insufficient domain documents"}

    domain_corpus = passage_embeddings_torch[domain_indices]
    domain_embs_np = domain_corpus.cpu().numpy()

    # Step 2: encode the target queries (the attacker's shadow queries).
    print(f"Encoding {len(target_query_texts)} target queries...", flush=True)
    query_embeddings = encoder.encode_queries(target_query_texts, show_progress=False)
    q_np = query_embeddings.cpu().numpy()

    # Step 3a: bipartite Fiedler placement (realistic attacker).
    print("\nComputing bipartite Fiedler placement (attacker has shadow queries)...", flush=True)
    adv_bipartite_np, bp_info = build_bipartite_fiedler_placement(
        q_np, domain_embs_np, t_nn=min(20, num_domain - 1)
    )
    adv_bipartite = torch.tensor(adv_bipartite_np, dtype=torch.float32)
    print(f" Fiedler eigenvalue: {bp_info['fiedler_eigenvalue']:.6f}", flush=True)
    print(f" Adv mean cos to queries: {bp_info['adv_mean_cos_to_queries']:.4f}", flush=True)
    print(f" Adv mean cos to docs: {bp_info['adv_mean_cos_to_docs']:.4f}", flush=True)

    # Step 3b: doc-only Fiedler placement (weaker attacker).
    print("\nComputing doc-only Fiedler placement (no query access)...", flush=True)
    adv_doconly_np, do_info = compute_doconly_fiedler_placement(domain_embs_np)
    adv_doconly = torch.tensor(adv_doconly_np, dtype=torch.float32)

    # Steps 4 and 5: ASR of each attack under both operators.
    bipartite = _score_attack(
        "\n--- Bipartite Fiedler Attack (realistic GeoPoison-RAG) ---",
        adv_bipartite,
        bp_info,
    )
    doconly = _score_attack(
        "\n--- Doc-only Fiedler Attack (weaker attacker) ---",
        adv_doconly,
        do_info,
    )

    asr_bp_base = bipartite["baseline_asr"]
    asr_bp_mm = bipartite["multi_manifold_asr"]
    asr_do_base = doconly["baseline_asr"]
    asr_do_mm = doconly["multi_manifold_asr"]

    results = {
        "bipartite_attack": bipartite,
        "doconly_attack": doconly,
        "num_domain_docs": num_domain,
        "num_target_queries": len(target_query_texts),
        "top_k": top_k,
        # For backward compat with summary printing
        "baseline_asr": asr_bp_base,
        "multi_manifold_asr": asr_bp_mm,
    }

    print("\n=== Attack Results Summary ===", flush=True)
    print(" Baseline Multi-Manifold Reduction", flush=True)
    print(f" Bipartite (realistic): {asr_bp_base:.4f} {asr_bp_mm:.4f}"
          f" {_reduction(asr_bp_base, asr_bp_mm):.1f}%", flush=True)
    print(f" Doc-only (weaker): {asr_do_base:.4f} {asr_do_mm:.4f}"
          f" {_reduction(asr_do_base, asr_do_mm):.1f}%", flush=True)
    return results