"""Simplified GeoPoison-RAG attack simulation.

Realistic threat model (matching GeoPoison-RAG Phase 1):
- Attacker has shadow queries approximating target query distribution.
- Attacker has access to document embeddings.
- Attacker builds bipartite query-document graph using COSINE SIMILARITY
  (their model of how retrieval works).
- Attacker computes Fiedler vector and places adversarial doc at the
  spectral-optimal position in document space.

Defense argument:
- Baseline (cosine sim): attacker's model is correct → high ASR.
- Multi-manifold (R(q,d)): attacker's model is wrong because R ≠ cosine → lower ASR.
"""
|
|
| import numpy as np |
| import torch |
| from scipy.sparse.linalg import eigsh |
| from sklearn.metrics.pairwise import cosine_similarity |
|
|
| from multi_manifold_retrieval.evaluation.spectral_analysis import compute_document_laplacian |
|
|
|
|
def select_domain_documents(
    passages: list[str],
    keywords: list[str],
    max_docs: int = 200,
) -> tuple[list[int], list[str]]:
    """Select documents belonging to a target domain by keyword matching.

    A passage is selected when any keyword occurs in it as a
    case-insensitive substring. Passages are scanned in order and the scan
    stops as soon as ``max_docs`` matches have been collected.

    Args:
        passages: Candidate passage texts.
        keywords: Substrings identifying the target domain.
        max_docs: Upper bound on the number of selected passages; values
            below 1 select nothing.

    Returns:
        ``(indices, texts)``: the positions of the selected passages in
        ``passages`` and the corresponding texts, both in corpus order.
    """
    indices: list[int] = []
    texts: list[str] = []
    if max_docs <= 0:
        return indices, texts

    # Passages are lower-cased before matching, so the keywords must be as
    # well; otherwise a keyword such as "MRI" could never match anything.
    needles = [kw.lower() for kw in keywords]

    for i, text in enumerate(passages):
        text_lower = text.lower()
        if any(needle in text_lower for needle in needles):
            indices.append(i)
            texts.append(text)
            if len(indices) >= max_docs:
                break
    return indices, texts
|
|
|
|
def build_bipartite_fiedler_placement(
    query_embs: np.ndarray,
    doc_embs: np.ndarray,
    t_nn: int = 20,
) -> tuple[np.ndarray, dict]:
    """GeoPoison-RAG Phase 1: bipartite spectral placement (cosine-based).

    The attacker:
    1. Builds bipartite query-document graph using cosine similarity,
       keeping each query's ``t`` most similar documents.
    2. Computes Fiedler vector of the normalized Laplacian.
    3. Extracts document component of Fiedler vector.
    4. Places adversarial doc at Fiedler-weighted centroid of documents.

    The placement is in DOCUMENT SPACE — the attacker optimizes where to
    place a document, guided by the query-document spectral structure.
    But the attacker assumes retrieval = cosine similarity.

    Args:
        query_embs: Shadow-query embeddings, one row per query.
        doc_embs: Document embeddings, one row per document.
        t_nn: Number of nearest documents kept per query. It is capped at
            ``num_docs - 1`` and floored at 1.

    Returns:
        ``(adv_embedding, info)``. ``adv_embedding`` is the weighted
        document centroid, scaled to unit L2 norm unless it is the zero
        vector. ``info`` holds the method name, the Fiedler eigenvalue and
        summary statistics of the weights and of the placement.

    Raises:
        ValueError: If either embedding matrix has no rows.
    """
    nq = query_embs.shape[0]
    nd = doc_embs.shape[0]
    if nq == 0 or nd == 0:
        raise ValueError(
            "query_embs and doc_embs must each contain at least one row "
            f"(got {nq} queries and {nd} documents)"
        )

    # Query-document cosine similarities, shape (nq, nd).
    S = cosine_similarity(query_embs, doc_embs)

    # Keep the t strongest edges of every query. The floor of 1 matters:
    # with t == 0 the slice ``[-0:]`` would silently keep *every* edge.
    t = max(1, min(t_nn, nd - 1))
    top_idx = np.argpartition(S, -t, axis=1)[:, -t:]
    # Edge weights must be non-negative: a negative cosine surviving the
    # top-t cut could make a node degree negative and its square root NaN.
    kept = np.maximum(np.take_along_axis(S, top_idx, axis=1), 0.0)
    S_sparse = np.zeros_like(S)
    np.put_along_axis(S_sparse, top_idx, kept, axis=1)

    # Bipartite adjacency: queries occupy the first nq nodes, documents the
    # remaining nd.
    n = nq + nd
    A = np.zeros((n, n))
    A[:nq, nq:] = S_sparse
    A[nq:, :nq] = S_sparse.T

    # Symmetric normalized Laplacian I - D^{-1/2} A D^{-1/2}. Isolated
    # nodes get degree 1 so the division is defined (their row of A is all
    # zero anyway). Row/column scaling replaces two dense n x n products.
    degrees = A.sum(axis=1)
    degrees[degrees == 0] = 1.0
    inv_sqrt_deg = 1.0 / np.sqrt(degrees)
    L = np.eye(n) - (inv_sqrt_deg[:, None] * A) * inv_sqrt_deg[None, :]

    # L is a dense symmetric matrix, so the dense solver is used: it is
    # deterministic, returns eigenvalues in ascending order, and avoids the
    # slow "smallest magnitude" Lanczos iteration on a singular operator.
    # n >= 2 is guaranteed by the emptiness check above.
    eigenvalues, eigenvectors = np.linalg.eigh(L)
    fiedler_val = eigenvalues[1]
    fiedler_vec = eigenvectors[:, 1]

    # Document part of the Fiedler vector -> non-negative weights summing
    # to (almost exactly) one; the sign of the eigenvector is irrelevant.
    doc_component = fiedler_vec[nq:]
    weights = np.abs(doc_component)
    weights = weights / (weights.sum() + 1e-12)

    # Fiedler-weighted centroid of the documents, projected to the unit
    # sphere when possible.
    adv_embedding = (weights[:, None] * doc_embs).sum(axis=0)
    norm = np.linalg.norm(adv_embedding)
    if norm > 0:
        adv_embedding = adv_embedding / norm

    adv_row = adv_embedding.reshape(1, -1)
    info = {
        "method": "bipartite_fiedler",
        "fiedler_eigenvalue": float(fiedler_val),
        "weight_entropy": float(-np.sum(weights * np.log(weights + 1e-12))),
        "max_weight": float(weights.max()),
        "adv_mean_cos_to_queries": float(
            cosine_similarity(adv_row, query_embs).mean()
        ),
        "adv_mean_cos_to_docs": float(
            cosine_similarity(adv_row, doc_embs).mean()
        ),
    }

    return adv_embedding, info
|
|
|
|
def compute_doconly_fiedler_placement(doc_embs: np.ndarray) -> tuple[np.ndarray, dict]:
    """Document-only Fiedler placement (no query access).

    Weaker attacker that only has document embeddings.
    Uses document-space Laplacian L_D directly.

    Args:
        doc_embs: Document embeddings, one row per document.

    Returns:
        ``(adv_embedding, info)``. With fewer than three documents the
        plain centroid is used and ``info`` is
        ``{"method": "centroid_fallback"}``. Otherwise the embedding is the
        centroid weighted by the absolute Fiedler-vector entries and
        ``info`` also reports the Fiedler eigenvalue. In both cases the
        embedding is scaled to unit L2 norm unless its norm is not
        positive.
    """

    def _unit(vec: np.ndarray) -> np.ndarray:
        # Leave a zero vector untouched instead of dividing by zero.
        length = np.linalg.norm(vec)
        return vec / length if length > 0 else vec

    n = doc_embs.shape[0]
    if n < 3:
        # Too few nodes to take a second eigenvector below; fall back to
        # the unweighted centroid.
        return _unit(doc_embs.mean(axis=0)), {"method": "centroid_fallback"}

    L_D, _ = compute_document_laplacian(doc_embs)

    # Request a few of the smallest-magnitude eigenpairs and sort them
    # explicitly before picking the second one (the Fiedler pair).
    k = min(3, n - 1)
    eigenvalues, eigenvectors = eigsh(L_D, k=k, which="SM")
    sorted_idx = np.argsort(eigenvalues)
    fiedler_vec = eigenvectors[:, sorted_idx[1]]
    fiedler_val = eigenvalues[sorted_idx[1]]

    # Absolute entries -> non-negative weights; the eigenvector's sign is
    # arbitrary and must not influence the placement.
    weights = np.abs(fiedler_vec)
    weights = weights / (weights.sum() + 1e-12)

    adv_embedding = _unit((weights[:, None] * doc_embs).sum(axis=0))

    return adv_embedding, {
        "method": "doconly_fiedler",
        "fiedler_eigenvalue": float(fiedler_val),
    }
|
|
|
|
def compute_asr_threshold(
    query_embeddings: torch.Tensor,
    corpus_embeddings: torch.Tensor,
    adv_embedding: torch.Tensor,
    operator,
    top_k: int = 10,
    device: str = "cpu",
    batch_size: int = 50,
) -> tuple[float, dict]:
    """Compute ASR@k using per-query threshold (oracle-style).

    For each query, the k-th highest corpus score is the threshold.
    Attack succeeds if the adversarial doc's score >= threshold.
    Mirrors gp_rag/plan_single.py oracle check.

    Args:
        query_embeddings: One row per query.
        corpus_embeddings: One row per corpus document.
        adv_embedding: Embedding of the adversarial document (1-D; it is
            broadcast against every query batch).
        operator: Scoring model. It must provide ``eval()``, be callable as
            ``operator(queries, docs)`` yielding one score per row pair,
            and provide ``compute_pairwise(queries, corpus)`` yielding a
            ``(queries, corpus)`` score matrix. It is left in eval mode.
        top_k: Retrieval depth ``k``. When the corpus holds fewer than
            ``top_k`` documents the lowest corpus score is used as the
            threshold, which makes the reported ASR a lower bound.
        device: Device on which scoring is performed.
        batch_size: Number of queries scored per batch.

    Returns:
        ``(asr, info)``: the fraction of queries for which the attack
        succeeds, and margin statistics (adversarial score minus
        threshold). For an empty query set ``asr`` is 0.0 and the margin
        statistics are NaN.

    Raises:
        ValueError: If ``top_k`` is below 1 or the corpus is empty.
    """
    if top_k < 1:
        raise ValueError(f"top_k must be at least 1, got {top_k}")
    num_corpus = corpus_embeddings.shape[0]
    if num_corpus == 0:
        raise ValueError("corpus_embeddings must contain at least one document")

    num_queries = query_embeddings.shape[0]
    if num_queries == 0:
        # Nothing to attack: avoid dividing by zero further down.
        nan = float("nan")
        return 0.0, {
            "mean_margin": nan,
            "median_margin": nan,
            "p25_margin": nan,
            "fraction_positive_margin": 0.0,
        }

    # torch.topk raises if k exceeds the size of the dimension.
    effective_k = min(top_k, num_corpus)

    corpus_emb = corpus_embeddings.to(device)
    adv_emb = adv_embedding.to(device)

    operator.eval()
    margin_chunks = []

    with torch.no_grad():
        for start in range(0, num_queries, batch_size):
            q_batch = query_embeddings[start:start + batch_size].to(device)
            bs = q_batch.shape[0]

            # Score the adversarial document against every query in the batch.
            adv_expanded = adv_emb.unsqueeze(0).expand(bs, -1)
            adv_scores = operator(q_batch, adv_expanded).reshape(-1)

            # Score every query against the whole corpus.
            corpus_scores = operator.compute_pairwise(q_batch, corpus_emb)

            # The k-th best corpus score is what the adversarial doc must reach.
            topk_vals, _ = torch.topk(corpus_scores, effective_k, dim=1)
            thresholds = topk_vals[:, -1]

            # Subtract in float64 on the CPU: widening is exact, so the sign
            # of the margin agrees with comparing the raw scores, and it
            # avoids one host/device synchronisation per query.
            batch_margins = (
                adv_scores.detach().cpu().to(torch.float64)
                - thresholds.detach().cpu().to(torch.float64)
            )
            margin_chunks.append(batch_margins.numpy())

    margins_arr = np.concatenate(margin_chunks)
    successes = int((margins_arr >= 0).sum())

    asr = successes / num_queries
    info = {
        "mean_margin": float(margins_arr.mean()),
        "median_margin": float(np.median(margins_arr)),
        "p25_margin": float(np.percentile(margins_arr, 25)),
        "fraction_positive_margin": float((margins_arr >= 0).mean()),
    }

    return asr, info
|
|
|
|
def _asr_reduction_percent(base: float, mm: float) -> float:
    """Return the relative drop from ``base`` to ``mm`` ASR, in percent."""
    if base <= 0:
        # With no baseline success the ratio is undefined; NaN is reported
        # instead of a misleading "100%" when both rates are zero.
        return float("nan")
    return (1 - mm / base) * 100


def _evaluate_attack(
    title: str,
    adv_embedding: torch.Tensor,
    placement_info: dict,
    query_embeddings: torch.Tensor,
    domain_corpus: torch.Tensor,
    baseline_operator,
    operator,
    top_k: int,
    device: str,
) -> dict:
    """Score one adversarial embedding under both operators and log the ASRs."""
    print(title, flush=True)

    asr_base, base_info = compute_asr_threshold(
        query_embeddings, domain_corpus, adv_embedding,
        baseline_operator, top_k, device
    )
    print(f" Baseline ASR@{top_k}: {asr_base:.4f} (mean margin: {base_info['mean_margin']:.4f})", flush=True)

    asr_mm, mm_info = compute_asr_threshold(
        query_embeddings, domain_corpus, adv_embedding,
        operator, top_k, device
    )
    print(f" Multi-manifold ASR@{top_k}: {asr_mm:.4f} (mean margin: {mm_info['mean_margin']:.4f})", flush=True)

    return {
        "baseline_asr": asr_base,
        "multi_manifold_asr": asr_mm,
        "baseline_margins": base_info,
        "multi_manifold_margins": mm_info,
        "placement_info": placement_info,
    }


def run_attack_simulation(
    encoder,
    operator,
    baseline_operator,
    passages: list[str],
    passage_embeddings_torch: torch.Tensor,
    target_query_texts: list[str],
    medical_keywords: list[str],
    top_k: int = 10,
    max_domain_docs: int = 200,
    device: str = "cpu",
) -> dict:
    """Run GeoPoison-RAG attack simulation.

    Tests two attacker models:
    1. Bipartite Fiedler (realistic): attacker has shadow queries + docs,
       builds cosine-based bipartite graph, optimizes in document space.
    2. Doc-only Fiedler (weaker): attacker has only document embeddings.

    Both assume cosine similarity governs retrieval. The defense breaks
    this assumption via the cross-manifold operator R.

    Args:
        encoder: Object whose ``encode_queries(texts, show_progress=False)``
            returns the query embeddings as a tensor.
        operator: Multi-manifold scoring operator under evaluation.
        baseline_operator: Baseline scoring operator.
        passages: Corpus passage texts.
        passage_embeddings_torch: Corpus embeddings, indexed like
            ``passages``.
        target_query_texts: Queries the attacker targets.
        medical_keywords: Keywords used to select the target-domain
            documents.
        top_k: Retrieval depth for ASR@k.
        max_domain_docs: Maximum number of domain documents to use.
        device: Device used for scoring.

    Returns:
        A dict with per-attack results under ``"bipartite_attack"`` and
        ``"doconly_attack"``, run metadata, and the bipartite ASRs repeated
        at the top level. If fewer than five domain documents are found, or
        no target queries are given, a dict with a single ``"error"`` key
        is returned instead.
    """
    print("\n=== Attack Simulation ===", flush=True)

    # Restrict the attack to the target domain.
    domain_indices, _ = select_domain_documents(
        passages, medical_keywords, max_domain_docs
    )
    print(f"Selected {len(domain_indices)} domain documents.", flush=True)

    if len(domain_indices) < 5:
        print("Warning: Too few domain documents found.")
        return {"error": "insufficient domain documents"}

    if not target_query_texts:
        # Without queries neither the bipartite graph nor an ASR is defined.
        print("Warning: No target queries provided.")
        return {"error": "no target queries"}

    domain_corpus = passage_embeddings_torch[domain_indices]
    domain_embs_np = domain_corpus.cpu().numpy()

    print(f"Encoding {len(target_query_texts)} target queries...", flush=True)
    query_embeddings = encoder.encode_queries(target_query_texts, show_progress=False)
    q_np = query_embeddings.cpu().numpy()

    # Attacker 1: shadow queries + documents.
    print("\nComputing bipartite Fiedler placement (attacker has shadow queries)...", flush=True)
    adv_bipartite_np, bp_info = build_bipartite_fiedler_placement(
        q_np, domain_embs_np, t_nn=min(20, len(domain_indices) - 1)
    )
    adv_bipartite = torch.tensor(adv_bipartite_np, dtype=torch.float32)
    print(f" Fiedler eigenvalue: {bp_info['fiedler_eigenvalue']:.6f}", flush=True)
    print(f" Adv mean cos to queries: {bp_info['adv_mean_cos_to_queries']:.4f}", flush=True)
    print(f" Adv mean cos to docs: {bp_info['adv_mean_cos_to_docs']:.4f}", flush=True)

    # Attacker 2: documents only.
    print("\nComputing doc-only Fiedler placement (no query access)...", flush=True)
    adv_doconly_np, do_info = compute_doconly_fiedler_placement(domain_embs_np)
    adv_doconly = torch.tensor(adv_doconly_np, dtype=torch.float32)

    bipartite = _evaluate_attack(
        "\n--- Bipartite Fiedler Attack (realistic GeoPoison-RAG) ---",
        adv_bipartite, bp_info, query_embeddings, domain_corpus,
        baseline_operator, operator, top_k, device,
    )
    doconly = _evaluate_attack(
        "\n--- Doc-only Fiedler Attack (weaker attacker) ---",
        adv_doconly, do_info, query_embeddings, domain_corpus,
        baseline_operator, operator, top_k, device,
    )

    asr_bp_base = bipartite["baseline_asr"]
    asr_bp_mm = bipartite["multi_manifold_asr"]
    asr_do_base = doconly["baseline_asr"]
    asr_do_mm = doconly["multi_manifold_asr"]

    results = {
        "bipartite_attack": bipartite,
        "doconly_attack": doconly,
        "num_domain_docs": len(domain_indices),
        "num_target_queries": len(target_query_texts),
        "top_k": top_k,
        # Headline numbers: the realistic (bipartite) attacker.
        "baseline_asr": asr_bp_base,
        "multi_manifold_asr": asr_bp_mm,
    }

    print("\n=== Attack Results Summary ===", flush=True)
    print(" Baseline Multi-Manifold Reduction", flush=True)
    print(f" Bipartite (realistic): {asr_bp_base:.4f} {asr_bp_mm:.4f}"
          f" {_asr_reduction_percent(asr_bp_base, asr_bp_mm):.1f}%", flush=True)
    print(f" Doc-only (weaker): {asr_do_base:.4f} {asr_do_mm:.4f}"
          f" {_asr_reduction_percent(asr_do_base, asr_do_mm):.1f}%", flush=True)

    return results
|
|